# Complete Medallion Architecture Pipeline Demo

This notebook demonstrates the complete data pipeline flow:
- **Bronze Layer**: Raw data extraction
- **Silver Layer**: Enriched transformations
- **Gold Layer**: Business intelligence with health alerts
- **Idempotency**: Verifying merge_upsert behavior

## 1. Setup and Configuration

In [1]:
import pandas as pd
from deltalake import DeltaTable
import src.config as cfg
from src.pipelines import (
    run_extraction_load_bronze_pipeline,
    run_transformation_silver_pipeline,
    run_transformation_gold_pipeline
)

pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', 50)

print("✅ Setup complete")
print(f"Bucket: {cfg.BUCKET_NAME}")
print("Cities: buenos_aires, rosario, cordoba")

✅ Setup complete
Bucket: lucagelmini-bucket
Cities: buenos_aires, rosario, cordoba


## 2. Bronze Layer - Raw Data Extraction

Extracts raw data from the Open-Meteo API for 3 cities:
- Historical weather (past 7 days)
- Forecast weather (next 7 days)
- Air quality forecast (next 5 days)
- Nearest weather stations
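The list above does not show how the requests are built. Below is a minimal sketch of how per-city URLs could be assembled. It assumes the public Open-Meteo forecast endpoint (using `past_days` for the historical window) and the air-quality endpoint. The city coordinates are approximate and are **not** taken from the pipeline's config, and the helper names `weather_url` and `air_quality_url` are hypothetical. The real extraction code in `src.pipelines` may use other endpoints (for example, a dedicated archive API for historical data) or other parameters. The hourly variable names mirror the bronze columns shown further down.

```python
from urllib.parse import urlencode

# ASSUMPTION: approximate coordinates; the pipeline's own values are not shown.
CITIES = {
    "buenos_aires": (-34.61, -58.38),
    "rosario": (-32.95, -60.65),
    "cordoba": (-31.42, -64.18),
}

FORECAST_URL = "https://api.open-meteo.com/v1/forecast"
AIR_QUALITY_URL = "https://air-quality-api.open-meteo.com/v1/air-quality"


def weather_url(city: str, past_days: int = 7, forecast_days: int = 7) -> str:
    """Hypothetical helper: hourly weather for the past and next few days."""
    lat, lon = CITIES[city]
    params = {
        "latitude": lat,
        "longitude": lon,
        # Same variable names as the bronze 'historical' columns.
        "hourly": "temperature_2m,precipitation,windspeed_10m",
        "past_days": past_days,
        "forecast_days": forecast_days,
    }
    return f"{FORECAST_URL}?{urlencode(params)}"


def air_quality_url(city: str, forecast_days: int = 5) -> str:
    """Hypothetical helper: hourly PM10 / PM2.5 forecast."""
    lat, lon = CITIES[city]
    params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": "pm10,pm2_5",
        "forecast_days": forecast_days,
    }
    return f"{AIR_QUALITY_URL}?{urlencode(params)}"


for name in CITIES:
    print(weather_url(name))
    print(air_quality_url(name))
```

`urlencode` percent-encodes the commas in the `hourly` list (`%2C`), which the server decodes as ordinary query parameters.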

In [2]:
print("\n" + "="*60)
print("RUNNING BRONZE EXTRACTION PIPELINE")
print("="*60 + "\n")

run_extraction_load_bronze_pipeline()

print("\n✅ Bronze layer extraction complete")


RUNNING BRONZE EXTRACTION PIPELINE

Loader initialized for:  s3://lucagelmini-bucket/bronze
Processing data for buenos_aires...
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'buenos_aires'
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'buenos_aires'
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'buenos_aires'
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'buenos_aires'
Processing data for rosario...
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'rosario'
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'rosario'
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'rosario'
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'rosario'
Processing data for cordoba...
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'cordoba'
Succesfuly overwritten p

### Bronze Layer - Data Verification

In [3]:
bronze_tables = ['historical', 'forecast', 'air_quality', 'nearest_stations']

print("\nBRONZE LAYER TABLES:")
print("=" * 80)

bronze_stats = []
for table in bronze_tables:
    table_path = f"s3://{cfg.BUCKET_NAME}/bronze/{table}"
    dt = DeltaTable(table_path, storage_options=cfg.storage_options)
    df = dt.to_pandas()
    
    cities_count = df['city'].nunique() if 'city' in df.columns else 'N/A'
    
    bronze_stats.append({
        'Table': table,
        'Records': len(df),
        'Cities': cities_count,
        'Columns': len(df.columns)
    })

bronze_df = pd.DataFrame(bronze_stats)
print(bronze_df.to_string(index=False))
print("\n")


BRONZE LAYER TABLES:
           Table  Records  Cities  Columns
      historical     1152       3        8
        forecast     1008       3        8
     air_quality      720       3        8
nearest_stations       32       3        8




### Bronze Layer - Sample Data

In [4]:
historical_path = f"s3://{cfg.BUCKET_NAME}/bronze/historical"
historical_dt = DeltaTable(historical_path, storage_options=cfg.storage_options)
historical_df = historical_dt.to_pandas()

print("\nSample Historical Data (first 5 rows):")
print("=" * 80)
print(historical_df[['time', 'city', 'temperature_2m', 'precipitation', 'windspeed_10m']].head())


Sample Historical Data (first 5 rows):
               time     city  temperature_2m  precipitation  windspeed_10m
0  2025-11-30T00:00  cordoba            21.7            0.0           10.6
1  2025-11-30T01:00  cordoba            22.2            0.2           10.2
2  2025-11-30T02:00  cordoba            22.2            0.0            6.1
3  2025-11-30T03:00  cordoba            21.8            0.0            6.1
4  2025-11-30T04:00  cordoba            21.5            0.0            5.8


## 3. Silver Layer - Enriched Transformations

Creates 4 enriched tables:
1. **weather_summary**: Daily historical weather aggregations
2. **weather_forecast**: Daily forecast weather aggregations
3. **air_quality_daily**: Daily air quality aggregations + AQI
4. **hourly_historical_analysis**: Hourly temperature patterns
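The daily tables collapse hourly bronze rows into one row per city and calendar day. The sketch below shows that grouping in plain Python. It assumes the bronze column names shown in the sample output further down (`time`, `city`, `temperature_2m`, `precipitation`) and the silver column names shown in the weather summary sample. The function name `daily_summary` and the one-decimal rounding are illustrative choices, not the pipeline's actual code.

```python
from collections import defaultdict
from statistics import mean


def daily_summary(hourly_rows):
    """Hypothetical sketch: aggregate hourly rows into one row per (city, date)."""
    groups = defaultdict(list)
    for row in hourly_rows:
        # 'time' is an ISO timestamp such as '2025-11-30T01:00'; keep the date part.
        day = row["time"].split("T")[0]
        groups[(row["city"], day)].append(row)

    summary = []
    for (city, day), rows in sorted(groups.items()):
        temps = [r["temperature_2m"] for r in rows]
        summary.append({
            "date": day,
            "city": city,
            "temp_min": min(temps),
            "temp_max": max(temps),
            "temp_avg": mean(temps),
            # Rounded to one decimal to hide float noise (e.g. 0.6999999999999993).
            "temp_range": round(max(temps) - min(temps), 1),
            "total_precipitation": round(sum(r["precipitation"] for r in rows), 1),
        })
    return summary
```

In pandas, the same step is a `groupby(["city", "date"])` with named aggregations. Fed only the five sample rows from the bronze preview, the sketch yields a single partial-day row for Córdoba on 2025-11-30 (min 21.5, max 22.2, precipitation 0.2). It does not reproduce the full-day figures in the silver table.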

In [5]:
print("\n" + "="*60)
print("RUNNING SILVER TRANSFORMATION PIPELINE")
print("="*60 + "\n")

run_transformation_silver_pipeline()

print("\n✅ Silver layer transformation complete")


RUNNING SILVER TRANSFORMATION PIPELINE

Loader initialized for:  s3://lucagelmini-bucket/silver
Processing silver layer for buenos_aires...
Processing silver layer for rosario...
Processing silver layer for cordoba...

✅ Silver layer transformation complete


### Silver Layer - Data Verification

In [6]:
silver_tables = ['weather_summary', 'weather_forecast', 'air_quality_daily', 'hourly_historical_analysis']

print("\nSILVER LAYER TABLES:")
print("=" * 80)

silver_stats = []
for table in silver_tables:
    table_path = f"s3://{cfg.BUCKET_NAME}/silver/{table}"
    dt = DeltaTable(table_path, storage_options=cfg.storage_options)
    df = dt.to_pandas()
    
    cities_count = df['city'].nunique() if 'city' in df.columns else 'N/A'
    
    silver_stats.append({
        'Table': table,
        'Records': len(df),
        'Cities': cities_count,
        'Columns': len(df.columns)
    })

silver_df = pd.DataFrame(silver_stats)
print(silver_df.to_string(index=False))
print("\n")


SILVER LAYER TABLES:
                     Table  Records  Cities  Columns
           weather_summary       27       3       12
          weather_forecast       21       3       12
         air_quality_daily       15       3       16
hourly_historical_analysis       72       3        8




### Silver Layer - Weather Summary Sample

In [7]:
weather_summary_path = f"s3://{cfg.BUCKET_NAME}/silver/weather_summary"
weather_summary_dt = DeltaTable(weather_summary_path, storage_options=cfg.storage_options)
weather_summary_df = weather_summary_dt.to_pandas()

print("\nWeather Summary Sample (first 5 rows):")
print("=" * 80)
print(weather_summary_df[['date', 'city', 'temp_min', 'temp_max', 'temp_avg', 'temp_range', 'total_precipitation']].head())


Weather Summary Sample (first 5 rows):
         date     city  temp_min  temp_max   temp_avg  temp_range  \
0  2025-12-07  cordoba      16.9      22.3  19.250000         5.4   
1  2025-11-29  cordoba      20.5      32.1  25.270833        11.6   
2  2025-11-30  cordoba      19.2      28.6  22.912500         9.4   
3  2025-12-01  cordoba      17.6      23.8  20.983333         6.2   
4  2025-12-02  cordoba      15.0      26.5  20.837500        11.5   

   total_precipitation  
0                 27.7  
1                 12.6  
2                  8.0  
3                  8.0  
4                  0.0  


### Silver Layer - Air Quality with AQI

In [8]:
air_quality_path = f"s3://{cfg.BUCKET_NAME}/silver/air_quality_daily"
air_quality_dt = DeltaTable(air_quality_path, storage_options=cfg.storage_options)
air_quality_df = air_quality_dt.to_pandas()

print("\nAir Quality Daily Sample (first 5 rows):")
print("=" * 80)
print(air_quality_df[['date', 'city', 'pm10_avg', 'pm2_5_avg', 'aqi_simplified']].head())

print("\nAQI Statistics:")
print(air_quality_df['aqi_simplified'].describe())


Air Quality Daily Sample (first 5 rows):
         date     city   pm10_avg  pm2_5_avg  aqi_simplified
0  2025-12-07  cordoba   8.537500   8.220833       32.883333
1  2025-12-08  cordoba   7.175000   6.962500       27.850000
2  2025-12-09  cordoba  15.150000  14.541667       58.166667
3  2025-12-10  cordoba  16.337500  15.708333       62.833333
4  2025-12-11  cordoba  10.592308   9.923077       39.692308

AQI Statistics:
count    15.000000
mean     29.640256
std      15.122914
min      15.883333
25%      18.225000
50%      25.723077
75%      36.287821
max      62.833333
Name: aqi_simplified, dtype: float64
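In every sample row above, `aqi_simplified` equals 4 × `pm2_5_avg` (for example, 8.220833 × 4 ≈ 32.883333). The check below only confirms that relationship for the five printed rows. It does not establish the formula the silver pipeline actually uses, which is not shown in this notebook. The helper name `ratio_is_constant` is hypothetical.

```python
# (pm2_5_avg, aqi_simplified) pairs copied from the sample output above.
SAMPLE = [
    (8.220833, 32.883333),
    (6.962500, 27.850000),
    (14.541667, 58.166667),
    (15.708333, 62.833333),
    (9.923077, 39.692308),
]


def ratio_is_constant(pairs, expected=4.0, tol=1e-5):
    """True if aqi / pm2_5 equals `expected` (within `tol`) for every pair."""
    return all(abs(aqi / pm25 - expected) <= tol for pm25, aqi in pairs)


print(ratio_is_constant(SAMPLE))  # True
```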


### Silver Layer - Hourly Historical Analysis

In [9]:
hourly_path = f"s3://{cfg.BUCKET_NAME}/silver/hourly_historical_analysis"
hourly_dt = DeltaTable(hourly_path, storage_options=cfg.storage_options)
hourly_df = hourly_dt.to_pandas()

print("\nHourly Historical Analysis (hours 6-12 for Buenos Aires):")
print("=" * 80)
ba_hourly = hourly_df[(hourly_df['city'] == 'buenos_aires') & (hourly_df['hour'].between(6, 12))]
print(ba_hourly[['hour', 'city', 'temp_min', 'temp_max', 'temp_avg', 'days_count']])


Hourly Historical Analysis (hours 6-12 for Buenos Aires):
    hour          city  temp_min  temp_max  temp_avg  days_count
54     6  buenos_aires      17.2      27.1  21.43125           9
55     7  buenos_aires      16.9      26.6  21.23125           9
56     8  buenos_aires      16.5      26.2  20.86875           9
57     9  buenos_aires      16.4      25.7  20.55000           9
58    10  buenos_aires      17.3      25.5  20.88750           9
59    11  buenos_aires      18.0      26.1  21.92500           9
60    12  buenos_aires      18.4      27.9  23.56875           9
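Each row of `hourly_historical_analysis` summarizes one hour of the day across all historical days for a city. A `days_count` of 9 is consistent with the nine days (2025-11-29 to 2025-12-07) per city in `weather_summary`. The sketch below shows one way to derive these columns from bronze-style hourly rows. The function name `hourly_profile` is hypothetical, and counting *distinct* days is an assumption about what `days_count` means.

```python
from collections import defaultdict
from statistics import mean


def hourly_profile(hourly_rows):
    """Hypothetical sketch: one row per (city, hour of day) across all days."""
    groups = defaultdict(list)
    for row in hourly_rows:
        day, clock = row["time"].split("T")  # e.g. '2025-11-30', '06:00'
        hour = int(clock.split(":")[0])
        groups[(row["city"], hour)].append((day, row["temperature_2m"]))

    profile = []
    for (city, hour), items in sorted(groups.items()):
        temps = [t for _, t in items]
        profile.append({
            "hour": hour,
            "city": city,
            "temp_min": min(temps),
            "temp_max": max(temps),
            "temp_avg": mean(temps),
            # Number of distinct calendar days that contributed a reading for this hour.
            "days_count": len({d for d, _ in items}),
        })
    return profile
```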


## 4. Gold Layer - Business Intelligence

Creates the **forecast_combined** table with:
- Combined weather and air quality forecasts
- **Health alerts** (GOOD, LOW_ALERT, MODERATE_ALERT, HIGH_ALERT)
- **Allergy risk** (LOW, MODERATE, HIGH)
- **Outdoor activity score** (0-100)
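The notebook lists the four health-alert levels but not their thresholds. Here is a minimal sketch of such a classifier. The cutoffs (50 / 100 / 150) are **hypothetical** and were picked only so the sketch agrees with the sample output below, where AQI 43.5 is `GOOD` and AQI 58.2 is `LOW_ALERT`. The gold pipeline's real thresholds, and its allergy-risk and outdoor-score rules, are not shown.

```python
def health_alert(aqi: float) -> str:
    """Hypothetical sketch: map a simplified AQI to a health-alert level."""
    # HYPOTHETICAL cutoffs, consistent with the sample rows only.
    if aqi <= 50:
        return "GOOD"
    if aqi <= 100:
        return "LOW_ALERT"
    if aqi <= 150:
        return "MODERATE_ALERT"
    return "HIGH_ALERT"


for value in (32.9, 58.2, 120.0, 180.0):
    print(value, health_alert(value))
```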

In [10]:
print("\n" + "="*60)
print("RUNNING GOLD TRANSFORMATION PIPELINE")
print("="*60 + "\n")

run_transformation_gold_pipeline()

print("\n✅ Gold layer transformation complete")


RUNNING GOLD TRANSFORMATION PIPELINE

Loader initialized for:  s3://lucagelmini-bucket/gold
Processing gold layer for buenos_aires...
Processing gold layer for rosario...
Processing gold layer for cordoba...

✅ Gold layer transformation complete


### Gold Layer - Data Verification

In [11]:
forecast_combined_path = f"s3://{cfg.BUCKET_NAME}/gold/forecast_combined"
forecast_combined_dt = DeltaTable(forecast_combined_path, storage_options=cfg.storage_options)
forecast_combined_df = forecast_combined_dt.to_pandas()

print("\nGOLD LAYER TABLE:")
print("=" * 80)
print(f"Table: forecast_combined")
print(f"Records: {len(forecast_combined_df)}")
print(f"Cities: {forecast_combined_df['city'].nunique()}")
print(f"Columns: {len(forecast_combined_df.columns)}")
print("\n")


GOLD LAYER TABLE:
Table: forecast_combined
Records: 15
Cities: 3
Columns: 19
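`weather_forecast` has 21 rows (7 days × 3 cities) and `air_quality_daily` has 15 (5 days × 3 cities). The 15 rows in `forecast_combined` are therefore consistent with an inner join on `(city, date)`, which keeps only the days covered by both forecasts. The sketch below assumes that join semantics. The notebook does not show how the gold pipeline combines the tables, and `combine_forecasts` is a hypothetical name.

```python
def combine_forecasts(weather_rows, air_rows):
    """Hypothetical sketch: inner-join daily weather and air-quality rows on (city, date)."""
    air_by_key = {(r["city"], r["date"]): r for r in air_rows}
    combined = []
    for w in weather_rows:
        a = air_by_key.get((w["city"], w["date"]))
        if a is None:
            continue  # no air-quality forecast for this day: dropped, as in an inner join
        combined.append({**w, **a})  # 'city' and 'date' are identical in both rows
    return combined


weather = [{"city": "cordoba", "date": f"2025-12-{d:02d}", "temp_avg": 20.0} for d in range(7, 14)]
air = [{"city": "cordoba", "date": f"2025-12-{d:02d}", "aqi_simplified": 30.0} for d in range(7, 12)]
print(len(combine_forecasts(weather, air)))  # 5
```

With pandas DataFrames, the equivalent is `weather.merge(air, on=["city", "date"], how="inner")`.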




### Gold Layer - Business Intelligence Features

In [12]:
print("\nForecast Combined Sample (selected columns):")
print("=" * 80)
print(forecast_combined_df[[
    'date', 'city', 'temp_avg', 'aqi_simplified',
    'health_alert', 'allergy_risk', 'outdoor_score'
]].head(10))


Forecast Combined Sample (selected columns):
         date     city   temp_avg  aqi_simplified health_alert allergy_risk  \
0  2025-12-07  cordoba  20.150000       32.883333         GOOD          LOW   
1  2025-12-08  cordoba  21.300000       27.850000         GOOD          LOW   
2  2025-12-09  cordoba  22.254167       58.166667    LOW_ALERT          LOW   
3  2025-12-10  cordoba  24.316667       62.833333    LOW_ALERT     MODERATE   
4  2025-12-11  cordoba  22.004167       39.692308         GOOD          LOW   
5  2025-12-07  rosario  23.637500       19.483333         GOOD          LOW   
6  2025-12-08  rosario  21.237500       16.266667         GOOD          LOW   
7  2025-12-09  rosario  20.050000       22.833333         GOOD          LOW   
8  2025-12-10  rosario  22.816667       26.066667         GOOD          LOW   
9  2025-12-11  rosario  21.541667       43.538462         GOOD          LOW   

   outdoor_score  
0             63  
1             66  
2             70  
3            

### Gold Layer - Health Alert Distribution

In [13]:
print("\nHealth Alert Distribution:")
print("=" * 80)
health_counts = forecast_combined_df['health_alert'].value_counts()
print(health_counts)

print("\n\nAllergy Risk Distribution:")
print("=" * 80)
allergy_counts = forecast_combined_df['allergy_risk'].value_counts()
print(allergy_counts)

print("\n\nOutdoor Score Statistics:")
print("=" * 80)
print(forecast_combined_df['outdoor_score'].describe())


Health Alert Distribution:
health_alert
GOOD         13
LOW_ALERT     2
Name: count, dtype: int64


Allergy Risk Distribution:
allergy_risk
LOW         14
MODERATE     1
Name: count, dtype: int64


Outdoor Score Statistics:
count    15.00000
mean     75.40000
std      12.74923
min      58.00000
25%      66.50000
50%      70.00000
75%      89.50000
max      92.00000
Name: outdoor_score, dtype: float64


## 5. Idempotency Test - Merge Upsert Behavior

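Re-runs the bronze, silver, and gold pipelines and compares the record counts of representative tables before and after. The goal is to confirm that re-processing the same data does not create duplicate rows.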
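Two write strategies are involved. The bronze logs report `overwritten partition: date_retrieved = ... AND city = ...`, and the warning in the results cell attributes bronze writes to `insert_overwrite`. This section credits `merge_upsert` for the other layers. The sketch below models both strategies, plus a plain append for contrast, on lists of dicts. It is a conceptual model only: the function names are hypothetical and unrelated to the Delta Lake API or the pipeline's helpers.

```python
def append(table, batch):
    """Plain append: re-running with the same batch duplicates rows."""
    return table + batch


def overwrite_partition(table, batch, partition_keys):
    """Drop every row in a partition the batch touches, then add the batch."""
    touched = {tuple(r[k] for k in partition_keys) for r in batch}
    kept = [r for r in table if tuple(r[k] for k in partition_keys) not in touched]
    return kept + batch


def merge_upsert(table, batch, key_cols):
    """Update rows whose key already exists and insert the rest."""
    merged = {tuple(r[k] for k in key_cols): r for r in table}
    for r in batch:
        merged[tuple(r[k] for k in key_cols)] = r
    return list(merged.values())


batch = [
    {"date_retrieved": "2025-12-07", "city": "rosario", "date": "2025-12-07", "temp": 23.6},
    {"date_retrieved": "2025-12-07", "city": "rosario", "date": "2025-12-08", "temp": 21.2},
]
first = append([], batch)
print(len(append(first, batch)))                                           # 4
print(len(overwrite_partition(first, batch, ["date_retrieved", "city"])))  # 2
print(len(merge_upsert(first, batch, ["city", "date"])))                   # 2
```

Both partition overwrite and upsert are idempotent for a same-day rerun. However, a partition keyed on `date_retrieved` gains a new partition when the pipeline runs on a different day, which is why the results cell treats changed bronze counts as potentially expected.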

In [14]:
print("\n" + "="*60)
print("IDEMPOTENCY TEST - RUNNING PIPELINES AGAIN")
print("="*60 + "\n")

# Store initial counts
initial_counts = {
    'bronze_historical': len(DeltaTable(f"s3://{cfg.BUCKET_NAME}/bronze/historical", storage_options=cfg.storage_options).to_pandas()),
    'silver_weather_summary': len(DeltaTable(f"s3://{cfg.BUCKET_NAME}/silver/weather_summary", storage_options=cfg.storage_options).to_pandas()),
    'silver_air_quality': len(DeltaTable(f"s3://{cfg.BUCKET_NAME}/silver/air_quality_daily", storage_options=cfg.storage_options).to_pandas()),
    'gold_forecast_combined': len(DeltaTable(f"s3://{cfg.BUCKET_NAME}/gold/forecast_combined", storage_options=cfg.storage_options).to_pandas())
}

print("Initial Record Counts:")
for key, value in initial_counts.items():
    print(f"  {key}: {value}")

print("\nRunning pipelines again...\n")


IDEMPOTENCY TEST - RUNNING PIPELINES AGAIN

Initial Record Counts:
  bronze_historical: 1152
  silver_weather_summary: 27
  silver_air_quality: 15
  gold_forecast_combined: 15

Running pipelines again...



In [15]:
# Run bronze pipeline again
print("Re-running Bronze pipeline...")
run_extraction_load_bronze_pipeline()
print("✅ Bronze pipeline complete\n")

Re-running Bronze pipeline...
Loader initialized for:  s3://lucagelmini-bucket/bronze
Processing data for buenos_aires...
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'buenos_aires'
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'buenos_aires'
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'buenos_aires'
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'buenos_aires'
Processing data for rosario...
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'rosario'
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'rosario'
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'rosario'
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'rosario'
Processing data for cordoba...
Succesfuly overwritten partition: date_retrieved = '2025-12-07' AND city = 'cordoba'
Succesfuly overwritten partitio

In [16]:
# Run silver pipeline again
print("Re-running Silver pipeline...")
run_transformation_silver_pipeline()
print("✅ Silver pipeline complete\n")

Re-running Silver pipeline...
Loader initialized for:  s3://lucagelmini-bucket/silver
Processing silver layer for buenos_aires...
Processing silver layer for rosario...
Processing silver layer for cordoba...
✅ Silver pipeline complete



In [17]:
# Run gold pipeline again
print("Re-running Gold pipeline...")
run_transformation_gold_pipeline()
print("✅ Gold pipeline complete\n")

Re-running Gold pipeline...
Loader initialized for:  s3://lucagelmini-bucket/gold
Processing gold layer for buenos_aires...
Processing gold layer for rosario...
Processing gold layer for cordoba...
✅ Gold pipeline complete



In [18]:
# Check counts after re-running
final_counts = {
    'bronze_historical': len(DeltaTable(f"s3://{cfg.BUCKET_NAME}/bronze/historical", storage_options=cfg.storage_options).to_pandas()),
    'silver_weather_summary': len(DeltaTable(f"s3://{cfg.BUCKET_NAME}/silver/weather_summary", storage_options=cfg.storage_options).to_pandas()),
    'silver_air_quality': len(DeltaTable(f"s3://{cfg.BUCKET_NAME}/silver/air_quality_daily", storage_options=cfg.storage_options).to_pandas()),
    'gold_forecast_combined': len(DeltaTable(f"s3://{cfg.BUCKET_NAME}/gold/forecast_combined", storage_options=cfg.storage_options).to_pandas())
}

print("\nIDEMPOTENCY TEST RESULTS:")
print("=" * 80)
print(f"{'Table':<35} {'Initial':<10} {'After Rerun':<15} {'Status'}")
print("=" * 80)

all_match = True
for key in initial_counts.keys():
    initial = initial_counts[key]
    final = final_counts[key]
    status = "✅ IDENTICAL" if initial == final else "❌ CHANGED"
    if initial != final:
        all_match = False
    print(f"{key:<35} {initial:<10} {final:<15} {status}")

print("=" * 80)
if all_match:
    print("\n✅ IDEMPOTENCY VERIFIED: All record counts remain unchanged!")
    print("   merge_upsert successfully prevents duplicates.")
else:
    print("\n⚠️ WARNING: Some counts changed. This may be expected for bronze layer (insert_overwrite).")


IDEMPOTENCY TEST RESULTS:
Table                               Initial    After Rerun     Status
bronze_historical                   1152       1152            ✅ IDENTICAL
silver_weather_summary              27         27              ✅ IDENTICAL
silver_air_quality                  15         15              ✅ IDENTICAL
gold_forecast_combined              15         15              ✅ IDENTICAL

✅ IDEMPOTENCY VERIFIED: All record counts remain unchanged!
   merge_upsert successfully prevents duplicates.
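Equal counts are necessary but not sufficient. A rerun that dropped one row and duplicated another would leave the totals unchanged. A more direct check asserts that each table's natural key is unique. The sketch below assumes that key is `(city, date)` for the daily silver and gold tables; `duplicate_keys` is a hypothetical helper.

```python
from collections import Counter


def duplicate_keys(rows, key_cols):
    """Return {key: count} for every key that appears more than once."""
    counts = Counter(tuple(r[k] for k in key_cols) for r in rows)
    return {key: n for key, n in counts.items() if n > 1}


rows = [
    {"city": "cordoba", "date": "2025-12-07"},
    {"city": "cordoba", "date": "2025-12-07"},
    {"city": "rosario", "date": "2025-12-07"},
]
print(duplicate_keys(rows, ["city", "date"]))  # {('cordoba', '2025-12-07'): 2}
```

On a pandas DataFrame such as `forecast_combined_df`, the same check is `forecast_combined_df.duplicated(subset=["city", "date"]).sum() == 0`.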


## 6. Business Use Cases - Sample Queries

Demonstrates practical applications of the gold-layer data.

### Use Case 1: Find High Alert Days

In [19]:
print("\nHIGH ALERT DAYS (Health concerns):")
print("=" * 80)

high_alert_days = forecast_combined_df[
    forecast_combined_df['health_alert'].isin(['HIGH_ALERT', 'MODERATE_ALERT'])
]

if len(high_alert_days) > 0:
    print(high_alert_days[['date', 'city', 'temp_avg', 'aqi_simplified', 'health_alert']].to_string(index=False))
else:
    print("✅ No high alert days found - conditions are safe!")


HIGH ALERT DAYS (Health concerns):
✅ No high alert days found - conditions are safe!


### Use Case 2: Best Days for Outdoor Activities

In [20]:
print("\nBEST DAYS FOR OUTDOOR ACTIVITIES (Top 5):")
print("=" * 80)

best_days = forecast_combined_df.nlargest(5, 'outdoor_score')
print(best_days[[
    'date', 'city', 'outdoor_score', 'temp_avg', 
    'aqi_simplified', 'total_precipitation', 'health_alert'
]].to_string(index=False))


BEST DAYS FOR OUTDOOR ACTIVITIES (Top 5):
      date         city  outdoor_score  temp_avg  aqi_simplified  total_precipitation health_alert
2025-12-07 buenos_aires             92 22.241667       15.900000                  0.0         GOOD
2025-12-08      rosario             91 21.237500       16.266667                  0.0         GOOD
2025-12-08 buenos_aires             91 21.191667       16.966667                  0.0         GOOD
2025-12-07      rosario             90 23.637500       19.483333                  0.0         GOOD
2025-12-10 buenos_aires             89 23.158333       20.516667                  0.0         GOOD


### Use Case 3: High Allergy Risk Days

In [21]:
print("\nHIGH ALLERGY RISK DAYS:")
print("=" * 80)

high_allergy = forecast_combined_df[
    forecast_combined_df['allergy_risk'].isin(['HIGH', 'MODERATE'])
]

if len(high_allergy) > 0:
    print(high_allergy[[
        'date', 'city', 'allergy_risk', 'pm10_avg', 
        'pm2_5_avg', 'avg_windspeed'
    ]].to_string(index=False))
else:
    print("✅ No high allergy risk days found!")


HIGH ALLERGY RISK DAYS:
      date    city allergy_risk  pm10_avg  pm2_5_avg  avg_windspeed
2025-12-10 cordoba     MODERATE   16.3375  15.708333       6.741667


### Use Case 4: City Comparison - Average Conditions

In [22]:
print("\nCITY COMPARISON - AVERAGE FORECAST CONDITIONS:")
print("=" * 80)

city_comparison = forecast_combined_df.groupby('city').agg({
    'temp_avg': 'mean',
    'aqi_simplified': 'mean',
    'outdoor_score': 'mean',
    'total_precipitation': 'sum'
}).round(2)

city_comparison.columns = ['Avg Temp (°C)', 'Avg AQI', 'Avg Outdoor Score', 'Total Precip (mm)']
print(city_comparison)


CITY COMPARISON - AVERAGE FORECAST CONDITIONS:
              Avg Temp (°C)  Avg AQI  Avg Outdoor Score  Total Precip (mm)
city                                                                      
buenos_aires          22.39    19.00               82.2                8.0
cordoba               22.01    44.29               65.4              138.0
rosario               21.86    25.64               78.6               18.4


## 7. Summary

### Architecture Overview

```
BRONZE (Raw)          SILVER (Enriched)              GOLD (Business)
────────────────────────────────────────────────────────────────────────

📦 historical      →  weather_summary
                   →  hourly_historical_analysis

📦 forecast        →  weather_forecast             ┐
📦 air_quality     →  air_quality_daily            ├→ forecast_combined ⭐
                                                   ┘   • health_alert
                                                       • allergy_risk
                                                       • outdoor_score
```
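The diagram can also be encoded as data. One practical use is working out which downstream tables need rebuilding after a bronze table changes. The sketch below transcribes the diagram's edges into a dictionary. `bronze/nearest_stations` is omitted because the diagram shows no consumer for it, and `LINEAGE`, `upstream`, and `downstream` are hypothetical names not used by the pipeline.

```python
# Table lineage transcribed from the diagram: each table maps to its direct inputs.
LINEAGE = {
    "silver/weather_summary": ["bronze/historical"],
    "silver/hourly_historical_analysis": ["bronze/historical"],
    "silver/weather_forecast": ["bronze/forecast"],
    "silver/air_quality_daily": ["bronze/air_quality"],
    "gold/forecast_combined": ["silver/weather_forecast", "silver/air_quality_daily"],
}


def upstream(table, lineage=LINEAGE):
    """All tables that `table` depends on, directly or transitively."""
    seen = set()
    stack = list(lineage.get(table, []))
    while stack:
        t = stack.pop()
        if t not in seen:
            seen.add(t)
            stack.extend(lineage.get(t, []))
    return seen


def downstream(table, lineage=LINEAGE):
    """All tables that depend on `table`, i.e. would need rebuilding if it changes."""
    return {t for t in lineage if table in upstream(t, lineage)}


print(sorted(downstream("bronze/forecast")))
```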