In [0]:
import configparser
from pathlib import Path
config = configparser.ConfigParser()
config_path = Path("/Workspace/Users/runcoyote2017@gmail.com/databricks-etl-demo/conf/core_config.ini")
# Get the catalog name configuration and set the current Catalog and database 
NULL = config.read(config_path)
catalog = config.get("databricks", "catalog_name")
database = 'gold'
spark.catalog.setCurrentCatalog(catalog) 
spark.catalog.setCurrentDatabase(database)

In [0]:
%run ../../lib/utils_notebook

In [0]:
#Read cleaned data from silver layer
caregivers_df = spark.table("silver.cleaned_caregivers")
#Select and transform columns to create the caregivers' gold table
final_caregivers_df = caregivers_df.select("caregiver_id",concat("first_name",lit(" "),coalesce("middle_name",lit("")),lit(" "),coalesce("last_name",lit(""))).alias("full_name"),"gender","birthdate","age","salary","ssn")
# Create a table with cleaned and transform data
create_table(final_caregivers_df,"caregivers")
#Count and print inserted rows
rows_inserted = inserted_rows('caregivers')
print(f"rows inserted:{rows_inserted}")


In [0]:
#Read cleaned data from silver layer
children_df = spark.table("silver.cleaned_children")
#Select and transform columns to create the caregivers' gold table
final_children_df = children_df.select("child_id",concat("first_name",lit(" "),coalesce("middle_name",lit("")),lit(" "),coalesce("last_name",lit(""))).alias("full_name"),"gender","birthdate","caregiver_id")
# Create a table with cleaned and transform data
create_table(final_children_df,"children")
#Count and print inserted rows
rows_inserted = inserted_rows('children')
print(f"rows inserted:{rows_inserted}")

In [0]:
#Query to get the data to create the caregiver_children_summary table
caregiver_children_summary_df = spark.sql(f"""SELECT cg.caregiver_id as caregiver_id,count(ch.child_id) as num_children FROM caregivers cg
INNER JOIN children ch ON cg.caregiver_id = ch.caregiver_id
group by cg.caregiver_id
order by num_children desc""")
# Create an added table summarizing how many children each caregiver has
create_table(caregiver_children_summary_df,"caregiver_children_summary")
#Count and print inserted rows
rows_inserted = inserted_rows('caregiver_children_summary')
print(f"rows inserted:{rows_inserted}")

In [0]:
#Query to get the data to create the caregiver_age_salary_distribution table
caregiver_age_salary_distribution_df  = spark.sql(f"""SELECT
                    CONCAT(FLOOR(age / 5) * 5, '-', FLOOR(age / 5) * 5 + 4) AS age_group,
                    ROUND(AVG(salary), 2) AS avg_salary,
                    COUNT(*) AS num_caregivers
                    FROM
                    caregivers
                    GROUP BY
                    FLOOR(age / 5)
                    ORDER BY
                    FLOOR(age / 5);
                """)
# Create an added table for age and income analysis
create_table(caregiver_age_salary_distribution_df,"caregiver_age_salary_distribution")
#Count and print inserted rows
rows_inserted = inserted_rows('caregiver_age_salary_distribution')
print(f"rows inserted:{rows_inserted}")