# Gold - Star Schema

### 1.0 - Configuration

In [0]:
%run ./00_utils

### 2.0 - Dimension: Locations

In [0]:
# Project silver.locations onto the column names used by the location dimension.
locations_src = spark.sql(f"""
    SELECT 
        id as location_id,
        name as location_name,
        locality,
        latitude,
        longitude,
        country_code,
        country_name,
        owner_name,
        provider_name,
        timezone
    FROM {CATALOG}.silver.locations
""")

# Existing table: upsert keyed on location_id (matched rows are overwritten,
# unmatched rows are inserted). Missing table: seed it from the projection.
if spark.catalog.tableExists(f"{CATALOG}.gold.dim_locations"):
    locations_src.createOrReplaceTempView("source_locations")
    spark.sql(f"""
        MERGE INTO {CATALOG}.gold.dim_locations AS target
        USING source_locations AS source
        ON target.location_id = source.location_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
    print("Merged dim_locations")
else:
    locations_src.write.saveAsTable(f"{CATALOG}.gold.dim_locations")
    print("Created dim_locations")

# Report the size of the dimension after the load.
count = spark.sql(f"SELECT COUNT(*) FROM {CATALOG}.gold.dim_locations").first()[0]
print(f"dim_locations: {count} rows")

### 3.0 - Dimension: Parameters

In [0]:
# Select the parameter attributes that make up the parameter dimension.
parameters_src = spark.sql(f"""
    SELECT 
        parameter_id,
        parameter_name,
        parameter_units,
        parameter_display_name
    FROM {CATALOG}.silver.parameters
""")

# Existing table: upsert keyed on parameter_id (matched rows are overwritten,
# unmatched rows are inserted). Missing table: seed it from the selection.
if spark.catalog.tableExists(f"{CATALOG}.gold.dim_parameters"):
    parameters_src.createOrReplaceTempView("source_parameters")
    spark.sql(f"""
        MERGE INTO {CATALOG}.gold.dim_parameters AS target
        USING source_parameters AS source
        ON target.parameter_id = source.parameter_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
    print("Merged dim_parameters")
else:
    parameters_src.write.saveAsTable(f"{CATALOG}.gold.dim_parameters")
    print("Created dim_parameters")

# Report the size of the dimension after the load.
count = spark.sql(f"SELECT COUNT(*) FROM {CATALOG}.gold.dim_parameters").first()[0]
print(f"dim_parameters: {count} rows")

### 4.0 - Dimension: Sensors

In [0]:
# Distinct (sensor_id, sensor_name) pairs from silver.sensors feed the sensor dimension.
sensors_src = spark.sql(f"""
    SELECT DISTINCT
        sensor_id,
        sensor_name
    FROM {CATALOG}.silver.sensors
""")

# Existing table: upsert keyed on sensor_id (matched rows are overwritten,
# unmatched rows are inserted). Missing table: seed it from the selection.
if spark.catalog.tableExists(f"{CATALOG}.gold.dim_sensors"):
    sensors_src.createOrReplaceTempView("source_sensors")
    spark.sql(f"""
        MERGE INTO {CATALOG}.gold.dim_sensors AS target
        USING source_sensors AS source
        ON target.sensor_id = source.sensor_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
    print("Merged dim_sensors")
else:
    sensors_src.write.saveAsTable(f"{CATALOG}.gold.dim_sensors")
    print("Created dim_sensors")

# Report the size of the dimension after the load.
count = spark.sql(f"SELECT COUNT(*) FROM {CATALOG}.gold.dim_sensors").first()[0]
print(f"dim_sensors: {count} rows")

### 5.0 - Dimension: Date

In [0]:
# Derive one calendar row per distinct date seen in silver.measurements;
# measurements without a timestamp are excluded by the WHERE clause.
dates_src = spark.sql(f"""
    SELECT DISTINCT
        CAST(DATE(datetime_utc) AS DATE) as date_id,
        YEAR(datetime_utc) as year,
        MONTH(datetime_utc) as month,
        DAY(datetime_utc) as day,
        DAYOFWEEK(datetime_utc) as day_of_week,
        DAYNAME(datetime_utc) as day_name,
        WEEKOFYEAR(datetime_utc) as week_of_year,
        QUARTER(datetime_utc) as quarter
    FROM {CATALOG}.silver.measurements
    WHERE datetime_utc IS NOT NULL
""")

# Existing table: insert-only merge keyed on date_id, so dates already present
# are left untouched. Missing table: seed it from the derived rows.
if spark.catalog.tableExists(f"{CATALOG}.gold.dim_date"):
    dates_src.createOrReplaceTempView("source_dates")
    spark.sql(f"""
        MERGE INTO {CATALOG}.gold.dim_date AS target
        USING source_dates AS source
        ON target.date_id = source.date_id
        WHEN NOT MATCHED THEN INSERT *
    """)
    print("Merged dim_date")
else:
    dates_src.write.saveAsTable(f"{CATALOG}.gold.dim_date")
    print("Created dim_date")

# Report the size of the dimension after the load.
count = spark.sql(f"SELECT COUNT(*) FROM {CATALOG}.gold.dim_date").first()[0]
print(f"dim_date: {count} rows")

### 6.0 - Fact: Measurements

In [0]:
# Build the fact rows: one row per measurement, with parameter_id looked up from
# silver.sensors through the sensor id, and date_id derived from datetime_utc.
# NOTE(review): the inner join drops measurements whose sensors_id has no row in
# silver.sensors, and it would fan out if silver.sensors ever held more than one
# row per sensor_id — presumably sensor_id is unique there; confirm upstream.
source_df = spark.sql(f"""
    SELECT 
        m.sensors_id as sensor_id,
        m.locations_id as location_id,
        s.parameter_id,
        CAST(DATE(m.datetime_utc) AS DATE) as date_id,
        m.datetime_utc,
        m.value,
        m.ingested_at
    FROM {CATALOG}.silver.measurements m
    JOIN {CATALOG}.silver.sensors s ON m.sensors_id = s.sensor_id
""")

if not spark.catalog.tableExists(f"{CATALOG}.gold.fact_measurements"):
    # First run: create the fact table directly from the joined rows.
    source_df.write.saveAsTable(f"{CATALOG}.gold.fact_measurements")
    print("Created fact_measurements")
else:
    source_df.createOrReplaceTempView("source_facts")
    # Insert-only merge keyed on (sensor_id, datetime_utc); rows already present
    # are never updated. datetime_utc is compared with the null-safe operator
    # <=> because a plain "=" evaluates to NULL when the timestamp is NULL, so
    # such rows would never match and would be inserted again on every run.
    # sensor_id cannot be NULL here: it passed the inner-join equality above.
    spark.sql(f"""
        MERGE INTO {CATALOG}.gold.fact_measurements AS target
        USING source_facts AS source
        ON target.sensor_id = source.sensor_id
           AND target.datetime_utc <=> source.datetime_utc
        WHEN NOT MATCHED THEN INSERT *
    """)
    print("Merged fact_measurements")

# Report the size of the fact table after the load.
count = spark.sql(f"SELECT COUNT(*) FROM {CATALOG}.gold.fact_measurements").collect()[0][0]
print(f"fact_measurements: {count} rows")

### 7.0 - Verify Star Schema

In [0]:
# Show up to five rows from each gold table: the four dimensions, then the fact.
for table_name in (
    "dim_locations",
    "dim_parameters",
    "dim_sensors",
    "dim_date",
    "fact_measurements",
):
    print(f"{table_name} samples")
    spark.sql(f"SELECT * FROM {CATALOG}.gold.{table_name} LIMIT 5").display()

# Single result set with one row per gold table and its row count.
print("Row Counts")
spark.sql(f"""
    SELECT 'dim_locations' as table_name, COUNT(*) as rows FROM {CATALOG}.gold.dim_locations
    UNION ALL
    SELECT 'dim_parameters', COUNT(*) FROM {CATALOG}.gold.dim_parameters
    UNION ALL
    SELECT 'dim_sensors', COUNT(*) FROM {CATALOG}.gold.dim_sensors
    UNION ALL
    SELECT 'dim_date', COUNT(*) FROM {CATALOG}.gold.dim_date
    UNION ALL
    SELECT 'fact_measurements', COUNT(*) FROM {CATALOG}.gold.fact_measurements
""").display()