# `FRE 521D_Assignment 2_Group 3`
### Members: Janine, Juliette, Margaret & Clare

## Task 1: Pipeline Architecture Design

### (a) Data Flow Diagram

The ETL pipeline follows a layered Extract–Transform–Load–Aggregate architecture as illustrated in Figure 1.
Weather data is extracted from the Open-Meteo Historical Weather API using country centroid coordinates from a CSV file. The pipeline enforces rate limiting and retry logic to ensure reliable API access.

During the transformation stage, JSON responses are flattened, cleaned, validated, and standardized. The transformed data is then loaded into a daily weather table using upsert operations to prevent duplication.

Finally, daily weather data is aggregated into monthly and annual summary tables. These aggregated datasets are joined with crop production data to produce an integrated analytical view used for business analysis.

---
```
┌─────────────────────────────────────────────────────────────────────────────────────┐
│                        ETL PIPELINE ARCHITECTURE                                    │
├─────────────────────────────────────────────────────────────────────────────────────┤
│                                                                                     │
│  ┌─────────────┐     ┌─────────────┐      ┌─────────────┐      ┌─────────────┐      │
│  │   EXTRACT   │────>│  TRANSFORM  │─────>│    LOAD     │─────>│  AGGREGATE  │      │
│  └─────────────┘     └─────────────┘      └─────────────┘      └─────────────┘      │
│        │                   │                   │                   │                │
│        ▼                   ▼                   ▼                   ▼                │
│  ┌──────────────┐    ┌───────────┐       ┌──────────────┐    ┌───────────────────┐  │
│  │  Open-Meteo  │    │ Flatten   │       │daily_ Weather│    │ monthly_ weather  │  │
│  │   API        │    │ JSON      │       │ Table        │    │                   │  │
│  │              │    │           │       │              │    │- Monthly & Annual │  │
│  │ - Rate limit │    │ - Parse   │       │ - Upsert     │    │- Calculate Metrics│  │
│  │ - Retry      │    │ - Validate│       │ - Dedupe     │    │- Join with        │  │
│  │ - Country CSV│    │ - Clean   │       │ - Index      │    │  production data  │  │
│  └──────────────┘    └───────────┘       └──────────────┘    └───────────────────┘  │
│        │                   │                   │                       │            │
│        ▼                   ▼                   ▼───────────────────────▼───────┐    │
│     Raw Layer          Cleaned Layer        Aggregate Layers           │       │    │
│        │                   │                   │                       │       │    │
│        ▼                   ▼                   ▼                       ▼       ▼    │
│  ┌───────────┐       ┌───────────┐       ┌───────────┐  ┌────────────────────────┐  │
│  │  Logging  │       │  Logging  │       │  Logging  │  │        Logging         │  │
│  │ - Success │       │ - Records │       │ - Inserts │  │- weather & crop Data   │  │
│  │ - Errors  │       │ - Nulls   │       │ - Commits │  │- Country + Year level  │  │
│  │ - Timing  │       │ - Types   │       │ - Errors  │  │    ┌───────────────┐   │  │
│  └───────────┘       └───────────┘       └───────────┘  └────│ANALYSIS READY │───┘  │
│                                                              └───────────────┘      │
└─────────────────────────────────────────────────────────────────────────────────────┘
```
**Figure 1:** ETL pipeline data flow illustrating extraction from the Open-Meteo API, transformation through raw and cleaned layers, aggregation, and integration with crop production data.

```
┌─────────────────────────────────────────────────────────────────────┐
│                    TABLE RELATIONSHIPS                              │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   A-1 TABLES                         A-2 TABLES                     │
│   ──────────                         ──────────                     │
│                                                                     │
│   ┌──────────────┐                   ┌──────────────┐               │
│   │crop_production│                   │daily_weather │              │
│   │              │                   │              │               │
│   │ iso3_code ───┼───────────────────┼─ iso3_code   │               │
│   │ year      ───┼───┐               │ date         │               │
│   │ crop         │   │               └──────────────┘               │
│   │ production   │   │                      │                       │
│   │ yield        │   │                      │ Aggregate             │
│   └──────────────┘   │                      ▼                       │
│          │           │               ┌──────────────┐               │
│          │           │               │annual_weather│               │
│          │           │               │              │               │
│          │           └───────────────┼─ iso3_code   │               │
│          │                           │ year ────────┼───┐           │
│          │                           │ weather vars │   │           │
│          │                           └──────────────┘   │           │
│          │                                              │           │
│          │              JOIN ON                         │           │
│          │         iso3_code + year                     │           │
│          │                                              │           │
│          ▼                                              ▼           │
│   ┌─────────────────────────────────────────────────────────┐       │
│   │              climate_agriculture_analysis               │       │
│   │                   (Integrated View)                     │       │
│   │                                                         │       │
│   │  - Country attributes (name, region, income group)      │       │
│   │  - Crop metrics (production, yield, area, fertilizer)   │       │
│   │  - Climate metrics (temp, precip, GDD, extremes)        │       │
│   │  - Derived: water balance, temp bucket                  │       │
│   └─────────────────────────────────────────────────────────┘       │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
```
**Figure 2:** Table relationships showing integration between Assignment 1 crop production tables and new weather tables using iso3_code and year.

### (b) Schema for New Weather Tables

The ETL pipeline introduces several new tables to store weather data:

**Daily Weather Table (`daily_weather`)**

Stores cleaned daily weather observations including:
- iso3_code
- date
- temperature metrics (mean, max, min)
- precipitation and rain totals
- evapotranspiration (ET0)

This table enforces uniqueness on (`iso3_code, date`) and serves as the cleaned data layer.

**Monthly Weather Table (`monthly_weather`)**

Stores monthly aggregated climate summaries by country and month.

**Annual Weather Table (`annual_weather`)**

Stores annual aggregated climate indicators by country and year, including derived metrics such as:
- Growing Degree Days (GDD)
- Precipitation variability
- Extreme temperature counts

### (c) Relationship to Assignment 1 Tables

Weather data is integrated with existing Assignment 1 crop production tables using shared temporal and geographic keys.

The `daily_weather` table is aggregated into the `annual_weather` table by iso3_code and year.

The `annual_weather` table is then joined with the `crop_production` table on:

**iso3_code + year**


This produces the integrated view `climate_agriculture_analysis`, which combines:
- Country attributes (name, region, income group)
- Crop metrics (production, yield, area, fertilizer use)
- Climate metrics (temperature, precipitation, GDD, extremes)

### (d) Error Handling and API Rate Limit Strategy

To ensure reliable data extraction, the pipeline implements:
- A minimum 5-second delay between API requests to comply with rate limits
- Retry logic with up to three attempts for failed requests
- Exponential backoff between retries

Errors, failures, and processing times are logged at each ETL stage to allow monitoring and troubleshooting.

### (e) Data Lineage and Logging

The pipeline tracks data lineage through both metadata storage and operational logging.

Each extracted weather record includes:
- Source identifier (Open-Meteo API)
- Extraction timestamp

Logging is implemented across the ETL process to capture:
- Extraction success or failure
- Record counts
- Data validation results (nulls and type issues)
- Load confirmations and database errors

This ensures transparency, reproducibility, and quality control throughout the pipeline.


## Task 2: ETL Pipeline Implementation

Our Task 2 work plan:

1. Extract : CSV ingestion, API ingestion, Rate limiting, Logging
2. Tranform : JSON flattening, Raw layer, Cleaned layer, Type enforcement, Validation
3. Load : Table creation, Constraints (CHECK + PK), Batch UPSERT loading, Verification

In [1]:
# ============================================
# ETL PIPELINE SETUP
# ============================================

# Standard imports for ETL work
import pandas as pd
import numpy as np
import requests
import json
import time
import os
from datetime import datetime
from sqlalchemy import text


# For database connection 
from sqlalchemy import create_engine

# Load the SQL magic extension
%load_ext sql
%config SqlMagic.style = '_DEPRECATED_DEFAULT'
%config SqlMagic.autopandas = False
%sql mysql+pymysql://mfre521d_user:mfre521d_user_pw@127.0.0.1:3306/mfre521d

# Display settings 
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', 50)

print(f"Pandas version: {pd.__version__}")
print(f"Current time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("Setup complete!")

Pandas version: 2.2.3
Current time: 2026-01-30 10:22:31
Setup complete!


In [2]:
# ============================================
# DATABASE CONNECTION
# ============================================

# Database connection parameters
DB_USER = "mfre521d_user"
DB_PASSWORD = "mfre521d_user_pw"
DB_HOST = "localhost"
DB_PORT = "3306"
DB_NAME = "mfre521d"

# Create connection string
connection_string = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

# Create SQLAlchemy engine
engine = create_engine(connection_string)

# Connect SQL magic
%sql {connection_string}

print("Connection established!")

Connection established!


In [3]:
def run_query(sql):
    return pd.read_sql(sql, engine)


In [4]:
%%sql

SELECT 'Connection successful!' AS status, NOW() AS current_ts;

   mysql+pymysql://mfre521d_user:***@127.0.0.1:3306/mfre521d
 * mysql+pymysql://mfre521d_user:***@localhost:3306/mfre521d
1 rows affected.


status,current_ts
Connection successful!,2026-01-30 18:22:31


In [5]:
# ============================================
# LOAD COUNTRY CENTROIDS CSV SAFELY
# ============================================

def read_csv_safely(filepath):
    """
    Read CSV with full control (lecture style).
    Returns all columns as strings for safe processing.
    """
    df = pd.read_csv(
        filepath,
        dtype=str,               # everything as string
        keep_default_na=False,  # no auto converting NA values
        encoding='utf-8',
        low_memory=False
    )
    
    print(f"Loaded {len(df)} rows and {len(df.columns)} columns from {filepath}")
    return df


# Load centroid file 
centroids_path = "../data/country_centroids.csv"
df_centroids_raw = read_csv_safely(centroids_path)

# Quick inspection 
print("\nColumn names:")
print(df_centroids_raw.columns.tolist())

print("\nFirst 5 rows:")
df_centroids_raw.head()


Loaded 34 rows and 5 columns from ../data/country_centroids.csv

Column names:
['iso3_code', 'country_name', 'latitude', 'longitude', 'hemisphere']

First 5 rows:


Unnamed: 0,iso3_code,country_name,latitude,longitude,hemisphere
0,USA,United States,39.8283,-98.5795,Northern
1,CAN,Canada,56.1304,-106.3468,Northern
2,MEX,Mexico,23.6345,-102.5528,Northern
3,BRA,Brazil,-14.235,-51.9253,Southern
4,ARG,Argentina,-38.4161,-63.6167,Southern


In [37]:
# ============================================
# OPEN-METEO API EXTRACTION FUNCTION
# ============================================

BASE_URL = "https://archive-api.open-meteo.com/v1/archive"

# Variables required 
WEATHER_VARS = [
    "temperature_2m_mean",
    "temperature_2m_max",
    "temperature_2m_min",
    "precipitation_sum",
    "rain_sum",
    "et0_fao_evapotranspiration"
]

START_DATE = "2015-01-01"
END_DATE = "2023-12-31"


def fetch_weather_data(lat, lon, start_date, end_date, max_retries=5):
    """
    Fetch historical daily weather data from Open-Meteo API.
    Implements retry logic and error handling (lecture style).
    """
    
    params = {
        "latitude": lat,
        "longitude": lon,
        "start_date": start_date,
        "end_date": end_date,
        "daily": ",".join(WEATHER_VARS),
        "timezone": "UTC"
    }
    
    attempt = 0
    
    while attempt < max_retries:
        try:
            response = requests.get(BASE_URL, params=params)
            
            print(f"Request URL: {response.url}")
            print(f"Status Code: {response.status_code}")
            
            if response.status_code == 200:
                return response.json()
            
            else:
                print(f"API error (attempt {attempt+1}): {response.status_code}")
        
        except Exception as e:
            print(f"Request failed (attempt {attempt+1}): {e}")
        
        # Exponential backoff
        wait_time =  30 * (attempt + 1)
        print(f"Retrying after {wait_time} seconds...")
        time.sleep(wait_time)
        
        attempt += 1
    
    print("Max retries reached. Returning None.")
    return None


In [38]:
# ============================================
# TESTING API CALL WITH ONE COUNTRY
# ============================================

# Pick first country (USA)
test_row = df_centroids_raw.iloc[0]

test_lat = test_row['latitude']
test_lon = test_row['longitude']
test_country = test_row['iso3_code']

print(f"Testing API for country: {test_country}")

test_data = fetch_weather_data(
    lat=test_lat,
    lon=test_lon,
    start_date=START_DATE,
    end_date=END_DATE
)

# Inspect structure
if test_data:
    print("\nTop-level keys:")
    print(test_data.keys())
    
    print("\nDaily data keys:")
    print(test_data['daily'].keys())
    
    print("\nFirst 5 dates:")
    print(test_data['daily']['time'][:5])


Testing API for country: USA
Request URL: https://archive-api.open-meteo.com/v1/archive?latitude=39.8283&longitude=-98.5795&start_date=2015-01-01&end_date=2023-12-31&daily=temperature_2m_mean%2Ctemperature_2m_max%2Ctemperature_2m_min%2Cprecipitation_sum%2Crain_sum%2Cet0_fao_evapotranspiration&timezone=UTC
Status Code: 200

Top-level keys:
dict_keys(['latitude', 'longitude', 'generationtime_ms', 'utc_offset_seconds', 'timezone', 'timezone_abbreviation', 'elevation', 'daily_units', 'daily'])

Daily data keys:
dict_keys(['time', 'temperature_2m_mean', 'temperature_2m_max', 'temperature_2m_min', 'precipitation_sum', 'rain_sum', 'et0_fao_evapotranspiration'])

First 5 dates:
['2015-01-01', '2015-01-02', '2015-01-03', '2015-01-04', '2015-01-05']


In [39]:
# ============================================
# FLATTEN WEATHER API JSON INTO TABULAR FORMAT
# ============================================
def flatten_weather_json(weather_json, iso3_code, verbose=False):
    """
    Convert API JSON daily data into tabular format.
    Adds country code and metadata.
    """

    # Extract daily section
    daily_data = weather_json['daily']

    # Build dataframe
    df_raw = pd.DataFrame({
        "iso3_code": iso3_code,
        "date": daily_data['time'],
        "temperature_mean": daily_data['temperature_2m_mean'],
        "temperature_max": daily_data['temperature_2m_max'],
        "temperature_min": daily_data['temperature_2m_min'],
        "precipitation": daily_data['precipitation_sum'],
        "rain": daily_data['rain_sum'],
        "evapotranspiration": daily_data['et0_fao_evapotranspiration']
    })

    # Add metadata
    df_raw['source'] = 'Open-Meteo API'
    df_raw['extracted_at'] = datetime.now()

    if verbose:
        print(f"Created raw dataframe with {len(df_raw)} rows for {iso3_code}")

    return df_raw


In [40]:
# ============================================
# TEST FLATTENING FUNCTION
# ============================================

df_raw_test = flatten_weather_json(test_data, test_country, verbose=True)

print("\nRaw dataframe shape:")
print(df_raw_test.shape)

print("\nFirst 5 rows:")
df_raw_test.head()


Created raw dataframe with 3287 rows for USA

Raw dataframe shape:
(3287, 10)

First 5 rows:


Unnamed: 0,iso3_code,date,temperature_mean,temperature_max,temperature_min,precipitation,rain,evapotranspiration,source,extracted_at
0,USA,2015-01-01,-7.7,0.6,-12.1,0.0,0.0,1.1,Open-Meteo API,2026-01-30 11:12:00.713490
1,USA,2015-01-02,-4.3,2.0,-8.8,0.0,0.0,1.11,Open-Meteo API,2026-01-30 11:12:00.713490
2,USA,2015-01-03,-2.5,4.0,-5.3,0.3,0.0,0.97,Open-Meteo API,2026-01-30 11:12:00.713490
3,USA,2015-01-04,-12.2,-6.6,-16.3,4.1,0.0,0.84,Open-Meteo API,2026-01-30 11:12:00.713490
4,USA,2015-01-05,-10.0,-3.8,-13.6,0.0,0.0,0.76,Open-Meteo API,2026-01-30 11:12:00.713490


In [41]:
# ============================================
# EXTRACT WEATHER DATA FOR ALL COUNTRIES 
# ============================================

all_raw_data = []

start_time = datetime.now()

for idx, row in df_centroids_raw.iterrows():
    
    iso3 = row['iso3_code']
    lat = row['latitude']
    lon = row['longitude']

    # Fetch weather data
    weather_json = fetch_weather_data(
        lat=lat,
        lon=lon,
        start_date=START_DATE,
        end_date=END_DATE
    )
    
    if weather_json is not None:
        df_raw_country = flatten_weather_json(weather_json, iso3)
        all_raw_data.append(df_raw_country)

    # Rate limiting (5 seconds)
    time.sleep(5)

end_time = datetime.now()
print("ETL extraction completed.")
print(f"Countries processed: {len(all_raw_data)}")
print(f"Total runtime: {end_time - start_time}")


Request URL: https://archive-api.open-meteo.com/v1/archive?latitude=39.8283&longitude=-98.5795&start_date=2015-01-01&end_date=2023-12-31&daily=temperature_2m_mean%2Ctemperature_2m_max%2Ctemperature_2m_min%2Cprecipitation_sum%2Crain_sum%2Cet0_fao_evapotranspiration&timezone=UTC
Status Code: 200
Request URL: https://archive-api.open-meteo.com/v1/archive?latitude=56.1304&longitude=-106.3468&start_date=2015-01-01&end_date=2023-12-31&daily=temperature_2m_mean%2Ctemperature_2m_max%2Ctemperature_2m_min%2Cprecipitation_sum%2Crain_sum%2Cet0_fao_evapotranspiration&timezone=UTC
Status Code: 200
Request URL: https://archive-api.open-meteo.com/v1/archive?latitude=23.6345&longitude=-102.5528&start_date=2015-01-01&end_date=2023-12-31&daily=temperature_2m_mean%2Ctemperature_2m_max%2Ctemperature_2m_min%2Cprecipitation_sum%2Crain_sum%2Cet0_fao_evapotranspiration&timezone=UTC
Status Code: 200
Request URL: https://archive-api.open-meteo.com/v1/archive?latitude=-14.235&longitude=-51.9253&start_date=2015-01

In [42]:
# ============================================
# COMBINE ALL RAW DATA
# ============================================

df_raw_all = pd.concat(all_raw_data, ignore_index=True)

print("\nCombined raw dataset shape:")
print(df_raw_all.shape)

print("\nSample rows:")
df_raw_all.head()



Combined raw dataset shape:
(111758, 10)

Sample rows:


Unnamed: 0,iso3_code,date,temperature_mean,temperature_max,temperature_min,precipitation,rain,evapotranspiration,source,extracted_at
0,USA,2015-01-01,-7.7,0.6,-12.1,0.0,0.0,1.1,Open-Meteo API,2026-01-30 11:12:01.869906
1,USA,2015-01-02,-4.3,2.0,-8.8,0.0,0.0,1.11,Open-Meteo API,2026-01-30 11:12:01.869906
2,USA,2015-01-03,-2.5,4.0,-5.3,0.3,0.0,0.97,Open-Meteo API,2026-01-30 11:12:01.869906
3,USA,2015-01-04,-12.2,-6.6,-16.3,4.1,0.0,0.84,Open-Meteo API,2026-01-30 11:12:01.869906
4,USA,2015-01-05,-10.0,-3.8,-13.6,0.0,0.0,0.76,Open-Meteo API,2026-01-30 11:12:01.869906


In [43]:
# ============================================
#  CLEANED LAYER - TYPE CONVERSIONS
# ============================================

df_clean = df_raw_all.copy()

# Convert date column
df_clean['date'] = pd.to_datetime(df_clean['date'])

# Convert numeric columns
numeric_cols = [
    'temperature_mean',
    'temperature_max',
    'temperature_min',
    'precipitation',
    'rain',
    'evapotranspiration'
]

for col in numeric_cols:
    df_clean[col] = pd.to_numeric(df_clean[col], errors='coerce')

# Derive year and month
df_clean['year'] = df_clean['date'].dt.year
df_clean['month'] = df_clean['date'].dt.month

print("Type conversion completed!")

print("\nData types:")
print(df_clean.dtypes)


Type conversion completed!

Data types:
iso3_code                     object
date                  datetime64[ns]
temperature_mean             float64
temperature_max              float64
temperature_min              float64
precipitation                float64
rain                         float64
evapotranspiration           float64
source                        object
extracted_at          datetime64[us]
year                           int32
month                          int32
dtype: object


In [44]:
# ============================================
# NULL VALUE CHECKS
# ============================================

print("\nNull counts per column:")
print(df_clean.isnull().sum())



Null counts per column:
iso3_code             0
date                  0
temperature_mean      0
temperature_max       0
temperature_min       0
precipitation         0
rain                  0
evapotranspiration    1
source                0
extracted_at          0
year                  0
month                 0
dtype: int64


In [45]:
# ============================================
# BASIC RANGE VALIDATION
# ============================================

print("\nTemperature range:")
print(df_clean[['temperature_min', 'temperature_mean', 'temperature_max']].describe())

print("\nPrecipitation range:")
print(df_clean['precipitation'].describe())

print("\nEvapotranspiration range:")
print(df_clean['evapotranspiration'].describe())



Temperature range:
       temperature_min  temperature_mean  temperature_max
count    111758.000000     111758.000000    111758.000000
mean         13.871539         18.569677        23.711112
std           9.914468         10.038286        10.531857
min         -42.500000        -37.900000       -33.900000
25%           7.400000         12.600000        18.000000
50%          16.500000         21.800000        27.000000
75%          21.300000         25.800000        30.900000
max          34.800000         40.000000        45.900000

Precipitation range:
count    111758.000000
mean          3.057758
std           8.026577
min           0.000000
25%           0.000000
50%           0.000000
75%           2.400000
max         316.700000
Name: precipitation, dtype: float64

Evapotranspiration range:
count    111757.000000
mean          4.064746
std           2.273780
min           0.030000
25%           2.490000
50%           3.970000
75%           5.330000
max          15.330000
Name:

In [46]:
# ============================================
# CHECK EXISTING TABLES IN SQL DATABASE
# ============================================

query = "SHOW TABLES;"

df_tables = pd.read_sql(query, engine)

print("Tables in database:")
print(df_tables)


Tables in database:
                Tables_in_mfre521d
0                     AirQuality_2
1             air_quality_readings
2                   annual_weather
3    annual_weather_quality_checks
4     climate_agriculture_analysis
5                        countries
6             country_name_mapping
7                  crop_production
8          crop_weather_integrated
9                            crops
10                   daily_summary
11                   daily_weather
12                  food_nutrition
13                 monthly_summary
14                 monthly_weather
15  monthly_weather_quality_checks
16            pollution_thresholds
17                     sensor_info
18           temperature_anomalies
19            temperature_readings
20              validated_readings
21                weather_stations


In [47]:
# ============================================
# CREATE DAILY_WEATHER TABLE
# ============================================

create_daily_weather_table = """
CREATE TABLE IF NOT EXISTS daily_weather (
    iso3_code VARCHAR(3) NOT NULL,
    date DATE NOT NULL,

    temperature_mean FLOAT,
    temperature_max FLOAT,
    temperature_min FLOAT,
    precipitation FLOAT,
    rain FLOAT,
    evapotranspiration FLOAT,

    source VARCHAR(50),
    extracted_at DATETIME,

    year INT,
    month INT,

    -- Primary Key for idempotency
    PRIMARY KEY (iso3_code, date),

    -- Basic validation constraints
    CHECK (year BETWEEN 2015 AND 2023),
    CHECK (month BETWEEN 1 AND 12),
    CHECK (precipitation >= 0),
    CHECK (rain >= 0),
    CHECK (evapotranspiration >= 0)
);
"""

with engine.connect() as connection:
    connection.execute(text(create_daily_weather_table))
    connection.commit()

print("daily_weather table created successfully!")


daily_weather table created successfully!


In [48]:
# ============================================
# UPSERT QUERY FOR DAILY_WEATHER
# ============================================

insert_daily_weather = """
INSERT INTO daily_weather (
    iso3_code,
    date,
    temperature_mean,
    temperature_max,
    temperature_min,
    precipitation,
    rain,
    evapotranspiration,
    source,
    extracted_at,
    year,
    month
)
VALUES (
    :iso3_code,
    :date,
    :temperature_mean,
    :temperature_max,
    :temperature_min,
    :precipitation,
    :rain,
    :evapotranspiration,
    :source,
    :extracted_at,
    :year,
    :month
)
ON DUPLICATE KEY UPDATE
    temperature_mean = VALUES(temperature_mean),
    temperature_max = VALUES(temperature_max),
    temperature_min = VALUES(temperature_min),
    precipitation = VALUES(precipitation),
    rain = VALUES(rain),
    evapotranspiration = VALUES(evapotranspiration),
    source = VALUES(source),
    extracted_at = VALUES(extracted_at),
    year = VALUES(year),
    month = VALUES(month);
"""


In [49]:
# ============================================
# LOAD CLEANED DATA INTO DATABASE 
# ============================================

# --- Make SQL-friendly types ---
df_sql = df_clean.copy()

# DATE column: Python date (not pandas Timestamp)
df_sql["date"] = pd.to_datetime(df_sql["date"]).dt.date

# extracted_at: Python datetime
# df_sql["extracted_at"] = pd.to_datetime(df_sql["extracted_at"]).dt.to_pydatetime()

# Convert to object dtype so None is preserved for null values (not converted back to NaN)
df_sql = df_sql.astype(object)

# Replace all pandas/numpy missing values with None -> MySQL NULL
df_sql = df_sql.where(pd.notnull(df_sql), None)
batch_size = 1000
total_rows = len(df_sql)

print(f"Loading {total_rows} rows into daily_weather...")

with engine.begin() as connection:
    for start in range(0, total_rows, batch_size):
        end = start + batch_size
        batch = df_sql.iloc[start:end].to_dict(orient="records")
        connection.execute(text(insert_daily_weather), batch)

print("Data loading completed successfully!")



Loading 111758 rows into daily_weather...
Data loading completed successfully!


In [50]:
# ============================================
# VERIFY LOAD
# ============================================

check_query = "SELECT COUNT(*) AS row_count FROM daily_weather;"

df_check = pd.read_sql(check_query, engine)

print(df_check)


   row_count
0     111758


In [51]:
# ============================================
#  DAILY_WEATHER TABLE PREVIEW
# ============================================

preview_query = "SELECT * FROM daily_weather LIMIT 5;"

df_preview = pd.read_sql(preview_query, engine)

df_preview


Unnamed: 0,iso3_code,date,temperature_mean,temperature_max,temperature_min,precipitation,rain,evapotranspiration,source,extracted_at,year,month
0,ARG,2015-01-01,17.7,20.7,14.3,0.0,0.0,5.86,Open-Meteo API,2026-01-30 11:12:28,2015,1
1,ARG,2015-01-02,17.4,24.0,10.8,0.0,0.0,6.11,Open-Meteo API,2026-01-30 11:12:28,2015,1
2,ARG,2015-01-03,21.5,28.5,16.2,0.0,0.0,7.98,Open-Meteo API,2026-01-30 11:12:28,2015,1
3,ARG,2015-01-04,20.0,30.3,10.9,0.0,0.0,7.71,Open-Meteo API,2026-01-30 11:12:28,2015,1
4,ARG,2015-01-05,28.1,36.4,21.9,0.3,0.3,9.25,Open-Meteo API,2026-01-30 11:12:28,2015,1


## Task 3: Data Aggregation and Integration

#### Create Aggregated Views for Monthly & Annual Weather Summaries (by country)

In [62]:
%%sql
-- Create aggregated views for monthly weather by country

DROP VIEW IF EXISTS monthly_weather;
CREATE VIEW monthly_weather AS
SELECT
    dw.iso3_code,
    dw.year,
    dw.month,

    COUNT(*) AS n_days,

    -- Temp (C)
    AVG(dw.temperature_mean) AS temperature_mean_mon,
    MAX(dw.temperature_max) AS temperature_max_mon,
    MIN(dw.temperature_min) AS temperature_min_mon,

    -- Precipitation
    SUM(dw.precipitation) AS precipitation_mon,
    SUM(dw.rain) AS rain_mon,
    SUM(dw.evapotranspiration) AS evapotranspiration_mon,

    -- Derived Metrics
    SUM(GREATEST(dw.temperature_mean - 10, 0)) AS gdd_mon,
    STDDEV_SAMP(dw.precipitation) AS precipitation_sd_mon,
    SUM(
    CASE
        WHEN dw.temperature_max >= 35
        OR dw.temperature_min <= 0
        THEN 1
        ELSE 0
    END) AS extreme_temp_days_mon
    
FROM daily_weather dw
GROUP BY
    dw.iso3_code,
    dw.year,
    dw.month;

   mysql+pymysql://mfre521d_user:***@127.0.0.1:3306/mfre521d
 * mysql+pymysql://mfre521d_user:***@localhost:3306/mfre521d
0 rows affected.
0 rows affected.


[]

In [63]:
%%sql 

-- Preview monthly_weather view

SELECT * FROM monthly_weather LIMIT 5;

   mysql+pymysql://mfre521d_user:***@127.0.0.1:3306/mfre521d
 * mysql+pymysql://mfre521d_user:***@localhost:3306/mfre521d
5 rows affected.


iso3_code,year,month,n_days,temperature_mean_mon,temperature_max_mon,temperature_min_mon,precipitation_mon,rain_mon,evapotranspiration_mon,gdd_mon,precipitation_sd_mon,extreme_temp_days_mon
ARG,2015,1,31,23.564516190559637,36.4,10.5,24.49999949336052,24.49999949336052,224.99999856948853,420.50000190734863,2.0301649455010664,1
ARG,2015,2,28,22.32857138769967,33.1,10.7,105.00000034272672,105.00000034272672,154.66000199317932,345.1999988555908,8.053225685947822,0
ARG,2015,3,31,21.299999944625363,33.3,7.7,31.10000000149012,31.10000000149012,138.50999999046326,350.29999828338623,2.941709714135207,0
ARG,2015,4,30,16.976666577657063,30.0,6.8,63.40000028163195,63.40000028163195,85.85000029206276,209.2999973297119,7.409441016807826,0
ARG,2015,5,31,13.441935400809012,25.3,2.5,8.000000029802322,8.000000029802322,59.13000059127808,113.39999675750732,1.079127485154056,0


In [64]:
%%sql
-- Create aggregated views for annual weather by country

DROP VIEW IF EXISTS annual_weather;
CREATE VIEW annual_weather AS
SELECT
    dw.iso3_code,
    dw.year,

    COUNT(*) AS n_days,

    -- Temp (C)
    AVG(dw.temperature_mean) AS temperature_mean_yr,
    MAX(dw.temperature_max) AS temperature_max_yr,
    MIN(dw.temperature_min) AS temperature_min_yr,

    -- Precipitation
    SUM(dw.precipitation) AS precipitation_yr,
    SUM(dw.rain) AS rain_yr,
    SUM(dw.evapotranspiration) AS evapotranspiration_yr,

    -- Derived metrics
    SUM(GREATEST(dw.temperature_mean - 10, 0)) AS gdd_yr,
    STDDEV_SAMP(dw.precipitation) AS precipitation_sd_yr,
    SUM(
    CASE
        WHEN dw.temperature_max >= 35
        OR dw.temperature_min <= 0
        THEN 1
        ELSE 0
    END) AS extreme_temp_days_yr

FROM daily_weather dw
GROUP BY
    dw.iso3_code,
    dw.year;


   mysql+pymysql://mfre521d_user:***@127.0.0.1:3306/mfre521d
 * mysql+pymysql://mfre521d_user:***@localhost:3306/mfre521d
0 rows affected.
0 rows affected.


[]

In [65]:
%%sql 

-- Preview annual_weather view

SELECT * FROM annual_weather LIMIT 5;

   mysql+pymysql://mfre521d_user:***@127.0.0.1:3306/mfre521d
 * mysql+pymysql://mfre521d_user:***@localhost:3306/mfre521d
5 rows affected.


iso3_code,year,n_days,temperature_mean_yr,temperature_max_yr,temperature_min_yr,precipitation_yr,rain_yr,evapotranspiration_yr,gdd_yr,precipitation_sd_yr,extreme_temp_days_yr
ARG,2015,365,16.160821923164473,38.6,-2.7,481.6999982222914,481.6999982222914,1468.16000315547,2400.8999996185303,4.25614549115477,13
ARG,2016,366,15.430327842144369,37.4,-1.8,640.3999985903502,640.3999985903502,1369.090000152588,2248.1999883651733,4.892452938506748,13
ARG,2017,365,16.25013696984069,37.8,-0.7,636.7000013664365,636.7000013664365,1444.9100015163422,2410.8999948501587,5.83258680589989,12
ARG,2018,365,16.403835635315883,38.1,-2.3,409.1999992281199,409.1999992281199,1514.3000038862228,2545.800006866455,3.2378715043956925,23
ARG,2019,365,16.173698587286964,36.7,-3.5,341.3999990448356,341.3999990448356,1560.100000411272,2455.6999855041504,3.878644737566522,15


In [66]:
%%sql

-- Quality Check (monthly_weather)

DROP VIEW IF EXISTS monthly_weather_quality_checks;

CREATE VIEW monthly_weather_quality_checks AS
SELECT
    dw.iso3_code,
    dw.year,
    dw.month,
    COUNT(*) AS n_days,
    SUM(CASE WHEN dw.temperature_mean IS NULL THEN 1 ELSE 0 END) AS missing_temp_mean_days,
    SUM(CASE WHEN dw.precipitation  IS NULL THEN 1 ELSE 0 END) AS missing_precip_days
FROM daily_weather dw
GROUP BY
    dw.iso3_code,
    dw.year,
    dw.month;

-- Preview monthly_weather view

SELECT * FROM monthly_weather_quality_checks LIMIT 5;

   mysql+pymysql://mfre521d_user:***@127.0.0.1:3306/mfre521d
 * mysql+pymysql://mfre521d_user:***@localhost:3306/mfre521d
0 rows affected.
0 rows affected.
5 rows affected.


iso3_code,year,month,n_days,missing_temp_mean_days,missing_precip_days
ARG,2015,1,31,0,0
ARG,2015,2,28,0,0
ARG,2015,3,31,0,0
ARG,2015,4,30,0,0
ARG,2015,5,31,0,0


In [67]:
%%sql

-- Quality Check (annual_weather)

DROP VIEW IF EXISTS annual_weather_quality_checks;

CREATE VIEW annual_weather_quality_checks AS
SELECT
    dw.iso3_code,
    dw.year,
    COUNT(*) AS n_days,
    SUM(CASE WHEN dw.temperature_mean IS NULL THEN 1 ELSE 0 END) AS missing_temp_mean_days,
    SUM(CASE WHEN dw.precipitation  IS NULL THEN 1 ELSE 0 END) AS missing_precip_days
FROM daily_weather dw
GROUP BY
    dw.iso3_code,
    dw.year;



   mysql+pymysql://mfre521d_user:***@127.0.0.1:3306/mfre521d
 * mysql+pymysql://mfre521d_user:***@localhost:3306/mfre521d
0 rows affected.
0 rows affected.


[]

#### Create Integrated View (crop_production & annual_weather)

In [68]:
%%sql
DROP VIEW IF EXISTS crop_weather_integrated;

CREATE VIEW crop_weather_integrated AS
SELECT
    cp.country_id,
    c.country_name,
    c.iso3_code,
    c.region,
    c.income_group,
    cp.year,
    cp.crop_id,

    -- Crop outcomes/inputs
    cp.area_harvested_ha,
    cp.production_tonnes,
    cp.yield_kg_ha,
    cp.fertilizer_use_kg_ha,
    cp.irrigation_pct,
    cp.notes,

    -- Annual weather metrics
    aw.n_days,
    aw.temperature_mean_yr,
    aw.temperature_max_yr,
    aw.temperature_min_yr,
    aw.precipitation_yr,
    aw.rain_yr,
    aw.evapotranspiration_yr,
    aw.gdd_yr,
    aw.precipitation_sd_yr,
    aw.extreme_temp_days_yr
    
FROM crop_production cp
JOIN countries c
  ON cp.country_id = c.country_id
JOIN annual_weather aw
  ON aw.iso3_code = c.iso3_code
 AND aw.year = cp.year;



   mysql+pymysql://mfre521d_user:***@127.0.0.1:3306/mfre521d
 * mysql+pymysql://mfre521d_user:***@localhost:3306/mfre521d
0 rows affected.
0 rows affected.


[]

In [69]:
%%sql

-- Preview integrated view
SELECT * FROM crop_production LIMIT 5;

   mysql+pymysql://mfre521d_user:***@127.0.0.1:3306/mfre521d
 * mysql+pymysql://mfre521d_user:***@localhost:3306/mfre521d
5 rows affected.


country_id,year,crop_id,area_harvested_ha,production_tonnes,yield_kg_ha,fertilizer_use_kg_ha,irrigation_pct,notes
1,1990,1,1316340.0,9954010.0,7561.9,93.01,23.0,
1,1990,2,3109820.0,16759800.0,5389.31,101.2,33.5,
1,1990,3,4433170.0,13023200.0,2937.68,88.06,8.5,
1,1990,4,4710050.0,14925800.0,3168.91,113.43,43.8,
1,1991,1,1248610.0,11071000.0,8866.61,89.27,24.5,


In [70]:
%%sql
SELECT
  COUNT(*) AS crop_rows,
  SUM(CASE WHEN aw.n_days IS NOT NULL THEN 1 ELSE 0 END) AS rows_with_weather,
  SUM(CASE WHEN aw.n_days IS NULL THEN 1 ELSE 0 END) AS rows_missing_weather,
  ROUND(100.0 * SUM(CASE WHEN aw.n_days IS NOT NULL THEN 1 ELSE 0 END) / COUNT(*), 2) AS pct_with_weather
FROM crop_production cp
JOIN countries c
  ON cp.country_id = c.country_id
LEFT JOIN annual_weather aw
  ON aw.iso3_code = c.iso3_code
 AND aw.year = cp.year;



   mysql+pymysql://mfre521d_user:***@127.0.0.1:3306/mfre521d
 * mysql+pymysql://mfre521d_user:***@localhost:3306/mfre521d
1 rows affected.


crop_rows,rows_with_weather,rows_missing_weather,pct_with_weather
4187,1106,3081,26.42


In [71]:
%%sql

-- Confirm no missing data from 2015-2023
SELECT
  COUNT(*) AS crop_rows_2015plus,
  SUM(CASE WHEN aw.n_days IS NOT NULL THEN 1 ELSE 0 END) AS matched_rows,
  SUM(CASE WHEN aw.n_days IS NULL THEN 1 ELSE 0 END) AS unmatched_rows
FROM crop_production cp
JOIN countries c ON cp.country_id = c.country_id
LEFT JOIN annual_weather aw
  ON aw.iso3_code = c.iso3_code
 AND aw.year = cp.year
WHERE cp.year >= 2015;



   mysql+pymysql://mfre521d_user:***@127.0.0.1:3306/mfre521d
 * mysql+pymysql://mfre521d_user:***@localhost:3306/mfre521d
1 rows affected.


crop_rows_2015plus,matched_rows,unmatched_rows
1106,1106,0


### Task 3 Documentation

#### Derived Metrics Assumptions

#### Growing Degree Days (GDD)
We derived GDD using the following recommended formula from Michigan State University Agricultural Research Center (see source below): GDD = Avg Daily Temp - Base Temp.
Base Temp Calculation: Common baselines for crops are 5 °C for cool-season plants and 10 °C for warm-season plants. Given the crop data is representative of mainly warm-weather crops (maize, soybeans, rice), we chose a single 10C base for consistency of analysis. Note that results may understate accumulated heat for wheat, which is a cool-season crop.
Source: https://www.canr.msu.edu/uploads/files/Research_Center/NW_Mich_Hort/General/CalculatingGrowingDegreeDays.pdf

#### Precipitation Variability
Derived as the sample standard deviation of daily precipitation within each country–year–month group (and country-year group for annual data)

#### Extreme Temperature Days
Extreme temperature days are defined as days with either unusually high temperatures (maximum ≥ 35°C) or freezing conditions (minimum ≤ 0°C).These thresholds are commonly used by the IPCC to capture heat stress and frost risk, respectively, and are applied consistently across countries to ensure comparability across heterogeneous climate regimes.
Source: https://www.ipcc.ch/report/ar6/wg1/chapter/chapter-12/ 


#### Integrated Crop & Weather View Assumptions
In constructing crop_weather_integrated, weather observations were matched to crop production records at the country+year level by using the countries lookup table to map each crop_production.country_id to the country ISO3 code, then joining annual_weather on (iso3_code, year).  This assumes (i) ISO3 codes uniquely and consistently identify countries across both datasets and that any boundary or coding changes over time are negligible for the analysis period, and (ii) annual weather metrics derived from the country centroid data are a reasonable proxy for nationwide growing conditions affecting reported production and yields. 

Crop production data are integrated with annual weather using an INNER JOIN on country (ISO3 code) and year. This approach restricts the integrated dataset to country–year observations with available weather data, ensuring that all retained crop records have complete corresponding climate information. Weather data are available only from 2015 onward; therefore, crop observations prior to 2015 are excluded from the integrated dataset.

## TASK 4


### Q1

In [74]:
pd.read_sql("SHOW TABLES", engine)


Unnamed: 0,Tables_in_mfre521d
0,AirQuality_2
1,air_quality_readings
2,annual_weather
3,annual_weather_quality_checks
4,climate_agriculture_analysis
5,countries
6,country_name_mapping
7,crop_production
8,crop_weather_integrated
9,crops


In [83]:
centroids_path = "../data/country_centroids.csv"
df_centroids_raw = pd.read_csv(centroids_path, dtype=str)


df_centroids = df_centroids_raw[["iso3_code", "hemisphere"]].copy()

df_centroids["hemisphere"] = (
    df_centroids["hemisphere"]
    .str.upper()
    .str[0:2]
    .replace({"NO": "NH", "SO": "SH"})
)

df_centroids.head()

df_weather = pd.read_sql(
    """
    SELECT
        iso3_code,
        year,
        month,
        precipitation
    FROM daily_weather
    WHERE year BETWEEN 2015 AND 2023
    """,
    engine
)

df_weather.head()



df_weather = df_weather.merge(
    df_centroids,
    on="iso3_code",
    how="left"
)

df_weather.head()

df_weather_gs = df_weather[
    ((df_weather["hemisphere"] == "NH") & (df_weather["month"].between(4, 8))) |
    ((df_weather["hemisphere"] == "SH") & (df_weather["month"].isin([10, 11, 12, 1, 2])))
].copy()

df_weather_gs.head()


Unnamed: 0,iso3_code,year,month,precipitation,hemisphere
0,ARG,2015,1,0.0,SH
1,ARG,2015,1,0.0,SH
2,ARG,2015,1,0.0,SH
3,ARG,2015,1,0.0,SH
4,ARG,2015,1,0.3,SH


In [86]:
df_precip_agg = (
    df_weather_gs
    .groupby(["iso3_code", "year"])
    .agg(
        gs_precip_total=("precipitation", "sum"),
        gs_precip_mean=("precipitation", "mean"),
        gs_precip_sd=("precipitation", "std")
    )
    .reset_index()
)

# 计算 precipitation variability（CV）
df_precip_agg["gs_precip_cv"] = (
    df_precip_agg["gs_precip_sd"] / df_precip_agg["gs_precip_mean"]
)

df_precip_agg.head()


df_wheat = pd.read_sql(
    """
    SELECT
        c.iso3_code,
        cp.year,
        cp.yield_kg_ha
    FROM crop_production cp
    JOIN countries c
      ON cp.country_id = c.country_id
    WHERE cp.crop_id = 3
      AND cp.year BETWEEN 2015 AND 2023
    """,
    engine
)

df_wheat.head()

df_analysis = df_wheat.merge(
    df_precip_agg,
    on=["iso3_code", "year"],
    how="inner"
)

df_analysis.head()


country_years = (
    df_analysis.groupby("iso3_code")["year"]
    .nunique()
    .reset_index(name="n_years")
)

major_countries = country_years[country_years["n_years"] >= 7]["iso3_code"]

df_major = df_analysis[df_analysis["iso3_code"].isin(major_countries)].copy()


In [87]:
def safe_corr(x, y):
    if x.isna().sum() > 2 or y.isna().sum() > 2:
        return np.nan
    if x.nunique() < 3 or y.nunique() < 3:
        return np.nan
    return x.corr(y)

impact = (
    df_major
    .groupby("iso3_code")
    .apply(lambda g: safe_corr(g["yield_kg_ha"], g["gs_precip_cv"]))
    .reset_index(name="corr_yield_vs_precip_cv")
)

impact["abs_corr"] = impact["corr_yield_vs_precip_cv"].abs()

impact_sorted = impact.sort_values("abs_corr", ascending=False)

impact_sorted.head(10)


  .apply(lambda g: safe_corr(g["yield_kg_ha"], g["gs_precip_cv"]))


Unnamed: 0,iso3_code,corr_yield_vs_precip_cv,abs_corr
16,MMR,-0.896212,0.896212
1,AUS,-0.673484,0.673484
18,PAK,-0.643195,0.643195
12,JPN,0.515184,0.515184
17,NPL,-0.442606,0.442606
13,KOR,0.295654,0.295654
2,BGD,0.295642,0.295642
6,DEU,0.290784,0.290784
9,GBR,-0.287341,0.287341
15,MEX,-0.284857,0.284857


From the table, the relationship between growing season precipitation and wheat yields from 2015 to 2023 for major wheat-producing countries. Countries were divided into Northern and Southern Hemispheres based on latitude, and different growing seasons were used for each hemisphere.

We measured precipitation variability during the growing season and compared it with wheat yields for each country.

The results show that countries such as Myanmar, Australia, and Pakistan have a strong relationship between precipitation variability and wheat yields. This means wheat production in these countries is more sensitive to changes in rainfall during the growing season.

### Q2

In [105]:
# extreme weather: top 20% of precipitation CV
thresholds = (
    df_precip_agg
    .groupby("iso3_code")["gs_precip_cv"]
    .quantile(0.8)
    .reset_index(name="extreme_threshold")
)

df_extreme = df_precip_agg.merge(
    thresholds,
    on="iso3_code",
    how="left"
)

df_extreme["extreme_year"] = (
    df_extreme["gs_precip_cv"] >= df_extreme["extreme_threshold"]
)

df_extreme.head()


Unnamed: 0,iso3_code,year,gs_precip_total,gs_precip_mean,gs_precip_sd,gs_precip_cv,extreme_threshold,extreme_year
0,ARG,2015,308.1,2.040397,5.154133,2.526044,3.471737,False
1,ARG,2016,411.4,2.706579,6.60863,2.441691,3.471737,False
2,ARG,2017,261.9,1.734437,5.388581,3.106818,3.471737,False
3,ARG,2018,189.9,1.257616,3.674442,2.921752,3.471737,False
4,ARG,2019,162.3,1.074834,4.738005,4.408126,3.471737,True


In [108]:
df_prod = pd.read_sql(
    """
    SELECT
        c.iso3_code,
        cp.year,
        cp.crop_id,
        cp.production_tonnes
    FROM crop_production cp
    JOIN countries c
      ON cp.country_id = c.country_id
    WHERE cp.crop_id IN (0, 1, 2, 3)   -- Maize, Rice, Soybeans Wheat
      AND cp.year BETWEEN 2015 AND 2023
    """,
    engine
)


df_q2 = df_prod.merge(
    df_extreme[["iso3_code", "year", "extreme_year"]],
    on=["iso3_code", "year"],
    how="inner"
)

crop_map = {
    0: "Maize",
    1: "Rice",
    2:"Soybeans",
    3: "Wheat"
}

df_q2["crop"] = df_q2["crop_id"].map(crop_map)



In [109]:
summary = (
    df_q2
    .groupby(["crop", "extreme_year"])["production_tonnes"]
    .mean()
    .reset_index()
)

normal = summary[summary["extreme_year"] == False] \
    .rename(columns={"production_tonnes": "avg_normal"})

extreme = summary[summary["extreme_year"] == True] \
    .rename(columns={"production_tonnes": "avg_extreme"})

out = normal.merge(extreme, on="crop", how="inner")

out["avg_production_loss"] = out["avg_normal"] - out["avg_extreme"]

out


Unnamed: 0,crop,extreme_year_x,avg_normal,extreme_year_y,avg_extreme,avg_production_loss
0,Rice,False,20539690.0,True,22083540.0,-1543854.0
1,Soybeans,False,15437330.0,True,14490660.0,946670.8
2,Wheat,False,10589770.0,True,9159287.0,1430482.0


An extreme weather year is defined as a year in which a country’s growing-season precipitation variability (CV) is in the top 20% of its values between 2015 and 2023. Growing seasons are defined as April–August for Northern Hemisphere countries and October–February for Southern Hemisphere countries. This country-specific threshold is used to account for differences in baseline climate conditions.

To analyze production shocks, I merge the extreme weather indicator with country-level crop production data and compare average production in extreme years and normal years for three major crops: wheat, rice, and soybeans.

The results show that extreme weather affects crops differently. Wheat production is lower in extreme weather years, with an average production loss of about 1.43 million tonnes. Soybeans also experience a production decline of approximately 0.95 million tonnes during extreme years. In contrast, rice production is higher in extreme weather years, increasing by around 1.54 million tonnes, suggesting greater resilience to precipitation variability, possibly due to irrigation or adaptation to wetter conditions.

Maize does not appear in the final comparison because no maize production observations coincide with years classified as extreme under the chosen definition. This suggests that maize production may be less exposed to precipitation variability during the growing season in the sample period.

This analysis has several limitations. Country-level weather data may hide important regional differences within countries, and precipitation variability captures only one aspect of extreme weather. In addition, crop production is influenced by non-weather factors such as irrigation, technology, and policy, so the results should be interpreted as associations rather than causal effects.

### Q3

In [110]:
df_weather_y = pd.read_sql(
    """
    SELECT
        iso3_code,
        year,
        precipitation,
        evapotranspiration
    FROM daily_weather
    WHERE year BETWEEN 2015 AND 2023
    """,
    engine
)
# DATA ClEAN
for col in ["precipitation", "evapotranspiration"]:
    df_weather_y[col] = pd.to_numeric(df_weather_y[col], errors="coerce")

df_wb = (
    df_weather_y
    .groupby(["iso3_code", "year"], as_index=False)
    .agg(
        precip_total=("precipitation", "sum"),
        et_total=("evapotranspiration", "sum")
    )
)

# water balance：P - ET
df_wb["water_balance"] = df_wb["precip_total"] - df_wb["et_total"]

# deficit：ET - P
df_wb["water_deficit"] = df_wb["et_total"] - df_wb["precip_total"]


df_irrig = pd.read_sql(
    """
    SELECT
        c.iso3_code,
        cp.year,
        cp.crop_id,
        cp.irrigation_pct
    FROM crop_production cp
    JOIN countries c
      ON cp.country_id = c.country_id
    WHERE cp.year BETWEEN 2015 AND 2023
    """,
    engine
)

df_irrig["irrigation_pct"] = pd.to_numeric(df_irrig["irrigation_pct"], errors="coerce")

major_crop_ids = [0, 1, 2, 3]
df_irrig = df_irrig[df_irrig["crop_id"].isin(major_crop_ids)].copy()

df_irrig_y = (
    df_irrig
    .groupby(["iso3_code", "year"], as_index=False)
    .agg(irrigation_pct_mean=("irrigation_pct", "mean"))
)

# Merge
df_q3 = df_wb.merge(df_irrig_y, on=["iso3_code", "year"], how="inner")


def trend_slope(g, xcol="year", ycol="water_deficit"):
    g = g.dropna(subset=[xcol, ycol])
    if g[xcol].nunique() < 3:
        return np.nan
    x = g[xcol].values
    y = g[ycol].values
    # 线性趋势斜率
    return np.polyfit(x, y, 1)[0]

trend = (
    df_q3.groupby("iso3_code")
    .apply(lambda g: pd.Series({
        "deficit_slope": trend_slope(g, "year", "water_deficit"),
        "avg_irrigation_pct": g["irrigation_pct_mean"].mean(),
        "latest_year": g["year"].max(),
        "latest_deficit": g.loc[g["year"].idxmax(), "water_deficit"]
    }))
    .reset_index()
)

# SET irrigation threshold
LOW_IRRIG_THR = 20

risk = trend[
    (trend["deficit_slope"] > 0) &                      
    (trend["avg_irrigation_pct"] < LOW_IRRIG_THR) &     
    (trend["latest_deficit"] > 0)                       
].sort_values(["deficit_slope", "latest_deficit"], ascending=False)

risk.head(20)


  .apply(lambda g: pd.Series({


Unnamed: 0,iso3_code,deficit_slope,avg_irrigation_pct,latest_year,latest_deficit
29,UGA,7.1265,8.433333,2023.0,654.51


Uganda (UGA) has an increasing water deficit, low irrigation coverage (about 8%), and a positive deficit in the most recent year.
This suggests Uganda faces potential food security risks under changing climate conditions.

### !! Question 3 interpretation-written part is not completely done yet.