In [1]:
import os

import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

In [2]:
# Connect to both databases ---
# Credentials are read from the environment (standard libpq variable names)
# so the password is never stored in the notebook or its saved outputs.
# PGPASSWORD is required: a missing value raises KeyError here, before any
# connection is attempted. The other settings fall back to the previous
# hardcoded values (postgres @ localhost:5432).
_pg_settings = dict(
    drivername="postgresql",
    username=os.environ.get("PGUSER", "postgres"),
    password=os.environ["PGPASSWORD"],
    host=os.environ.get("PGHOST", "localhost"),
    port=int(os.environ.get("PGPORT", "5432")),
)

# URL.create() escapes special characters in the password, which a
# hand-written "postgresql://user:password@host/db" string does not.
staging_engine = create_engine(URL.create(database="staging_database", **_pg_settings))
final_engine = create_engine(URL.create(database="final_database", **_pg_settings))

#### Migration 1: forecast_rainfall

In [3]:
# 2.1 Extract: read the forecast columns from staging's agg_forecasts ---
forecast_sql = "SELECT date, region, rain_forecasted FROM agg_forecasts"
with staging_engine.connect() as staging_conn:
    df = pd.read_sql(forecast_sql, staging_conn)

n_fetched = len(df)
print("Rows fetched from staging:", n_fetched)

Rows fetched from staging: 3490


In [4]:
# 2.2 Load into final_database ---
# Upsert keyed on (date, region): re-running the cell refreshes existing rows
# instead of failing on the unique constraint.
forecast_cols = ["date", "region", "rain_forecasted"]

# One plain-Python parameter dict per row. pandas represents SQL NULL as
# NaN/NaT after read_sql; map those back to None so the driver writes NULL
# rather than a literal NaN value.
forecast_records = [
    {col: (None if pd.isna(val) else val) for col, val in rec.items()}
    for rec in df[forecast_cols].astype(object).to_dict("records")
]

with final_engine.begin() as conn:  # begin() commits on success, rolls back on error
    # SQLAlchemy does not treat an empty parameter list as "zero executions",
    # so only execute when there is at least one row to send.
    if forecast_records:
        # Passing the whole list hands the batch to the driver in a single
        # execute() call instead of one Python-level call per DataFrame row.
        # region is part of the conflict target, so re-assigning it in the
        # SET clause is effectively a no-op; the statement is left unchanged.
        conn.execute(
            text("""
                INSERT INTO forecast_rainfall (date, region, rain_forecasted)
                VALUES (:date, :region, :rain_forecasted)
                ON CONFLICT (date, region) DO UPDATE
                SET region = EXCLUDED.region,
                    rain_forecasted = EXCLUDED.rain_forecasted
            """),
            forecast_records,
        )

print("Data migrated successfully!")

Data migrated successfully!


#### Migration 2: hospitals

In [6]:
# Hospital master data from staging: DISTINCT rows of hospitals_region joined
# to emd_data (which supplies hospital_name). The staging column is spelled
# `longtitude` in this query; the alias exposes it as hosp_longitude.
query = """
SELECT DISTINCT hr.hospital_id, 
       ed.hospital_name, 
       hr.latitude AS hosp_latitude,
       hr.longtitude AS hosp_longitude,
       hr.region
FROM hospitals_region AS hr
INNER JOIN emd_data AS ed ON hr.hospital_id = ed.hospital_id;
"""

with staging_engine.connect() as staging_conn:
    hospitals_fetched = pd.read_sql(query, staging_conn)

# normalize region values (lower-case) without touching the fetched frame
df = hospitals_fetched.assign(region=hospitals_fetched["region"].str.lower())

n_fetched = len(df)
print("Rows fetched from staging:", n_fetched)

# Upsert the hospital rows into final_database, keyed on hospital_id.
hospital_cols = ["hospital_id", "hospital_name", "hosp_latitude", "hosp_longitude", "region"]

# One plain-Python parameter dict per row. pandas represents SQL NULL as
# NaN/NaT after read_sql; map those back to None so the driver writes NULL
# rather than a literal NaN value.
hospital_records = [
    {col: (None if pd.isna(val) else val) for col, val in rec.items()}
    for rec in df[hospital_cols].astype(object).to_dict("records")
]

with final_engine.begin() as conn:  # begin() commits on success, rolls back on error
    # SQLAlchemy does not treat an empty parameter list as "zero executions",
    # so only execute when there is at least one row to send.
    if hospital_records:
        # Passing the whole list hands the batch to the driver in a single
        # execute() call instead of one Python-level call per DataFrame row.
        conn.execute(
            text("""
                INSERT INTO hospitals (hospital_id, hospital_name, hosp_latitude, hosp_longitude, region)
                VALUES (:hospital_id, :hospital_name, :hosp_latitude, :hosp_longitude, :region)
                ON CONFLICT (hospital_id) DO UPDATE
                SET hospital_name = EXCLUDED.hospital_name,
                    hosp_latitude = EXCLUDED.hosp_latitude,
                    hosp_longitude = EXCLUDED.hosp_longitude,
                    region = EXCLUDED.region
            """),
            hospital_records,
        )

print("hospitals migrated:", len(df))

Rows fetched from staging: 8
hospitals migrated: 8


#### Migration 3: emd

In [8]:
# EMD rows from staging dated 2023-01-01 through 2024-12-31, skipping
# hospital_id 'WH'; the attendance column is exposed as admissions.
query = """
SELECT date,
		hospital_id,
		attendance AS admissions
FROM emd_data
WHERE date BETWEEN '2023-01-01' AND '2024-12-31'
	AND hospital_id <> 'WH';
"""

with staging_engine.connect() as staging_conn:
    df = pd.read_sql(query, staging_conn)

n_fetched = len(df)
print("Rows fetched from staging:", n_fetched)

# Upsert the EMD rows into final_database, keyed on (date, hospital_id).
emd_cols = ["date", "hospital_id", "admissions"]

# One plain-Python parameter dict per row. pandas represents SQL NULL as
# NaN/NaT after read_sql; map those back to None so the driver writes NULL
# rather than a literal NaN value.
emd_records = [
    {col: (None if pd.isna(val) else val) for col, val in rec.items()}
    for rec in df[emd_cols].astype(object).to_dict("records")
]

with final_engine.begin() as conn:  # begin() commits on success, rolls back on error
    # SQLAlchemy does not treat an empty parameter list as "zero executions",
    # so only execute when there is at least one row to send.
    if emd_records:
        # Passing the whole list hands the batch to the driver in a single
        # execute() call instead of one Python-level call per DataFrame row.
        conn.execute(
            text("""
                INSERT INTO emd (date, hospital_id, admissions)
                VALUES (:date, :hospital_id, :admissions)
                ON CONFLICT (date, hospital_id) DO UPDATE
                SET admissions = EXCLUDED.admissions
            """),
            emd_records,
        )

print("emd migrated:", len(df))

Rows fetched from staging: 5848
emd migrated: 5848


#### Migration 4: station_hospital

In [10]:
# Station-to-hospital mapping from staging, joined to stns_region for the
# station coordinates and region.
query = """
SELECT sh.station_id,
		sh.hospital_id,
		sr.latitude AS stn_latitude,
		sr.longitude AS stn_longitude,
		sr.region
FROM stns_hospital AS sh
INNER JOIN stns_region AS sr ON sh.station_id = sr.station_id;
"""

with staging_engine.connect() as staging_conn:
    stations_fetched = pd.read_sql(query, staging_conn)

# normalize region values (lower-case) without touching the fetched frame
df = stations_fetched.assign(region=stations_fetched["region"].str.lower())

n_fetched = len(df)
print("Rows fetched from staging:", n_fetched)

# Upsert the station rows into final_database, keyed on station_id.
station_cols = ["station_id", "hospital_id", "stn_latitude", "stn_longitude", "region"]

# One plain-Python parameter dict per row. pandas represents SQL NULL as
# NaN/NaT after read_sql; map those back to None so the driver writes NULL
# rather than a literal NaN value.
station_records = [
    {col: (None if pd.isna(val) else val) for col, val in rec.items()}
    for rec in df[station_cols].astype(object).to_dict("records")
]

with final_engine.begin() as conn:  # begin() commits on success, rolls back on error
    # SQLAlchemy does not treat an empty parameter list as "zero executions",
    # so only execute when there is at least one row to send.
    if station_records:
        # Passing the whole list hands the batch to the driver in a single
        # execute() call instead of one Python-level call per DataFrame row.
        conn.execute(
            text("""
                INSERT INTO station_hospital (station_id, hospital_id, stn_latitude, stn_longitude, region)
                VALUES (:station_id, :hospital_id, :stn_latitude, :stn_longitude, :region)
                ON CONFLICT (station_id) DO UPDATE
                SET hospital_id = EXCLUDED.hospital_id,
                    stn_latitude = EXCLUDED.stn_latitude,
                    stn_longitude = EXCLUDED.stn_longitude,
                    region = EXCLUDED.region
            """),
            station_records,
        )

print("station_hospital migrated:", len(df))

Rows fetched from staging: 70
station_hospital migrated: 70


#### Migration 5: rainfall 

In [17]:
# Rainfall lives in one staging table per year; both queries select the same
# four columns and expose rain_intensity as rainfall_intensity.

# 2023 table
query_2023 = """
SELECT date,
		station_id,
		total_rainfall,
		rain_intensity AS rainfall_intensity
FROM rainfall_2023;
"""

# 2024 table
query_2024 = """
SELECT date,
       station_id,
       total_rainfall,
       rain_intensity AS rainfall_intensity
FROM rainfall_2024;
"""

# Run both queries on a single staging connection, 2023 first.
with staging_engine.connect() as staging_conn:
    yearly_frames = [
        pd.read_sql(yearly_sql, staging_conn)
        for yearly_sql in (query_2023, query_2024)
    ]
df_2023, df_2024 = yearly_frames

# Stack the two years into one frame with a fresh 0..n-1 index
df_rainfall = pd.concat(yearly_frames, ignore_index=True)

n_fetched = len(df_rainfall)
print("Rows fetched from staging:", n_fetched)

# --- Station filter ---
# Keep only rainfall rows whose station_id already exists in
# final_database.station_hospital.
# NOTE(review): presumably rainfall.station_id references station_hospital,
# so unknown stations would be rejected on insert -- confirm against the schema.
with final_engine.connect() as final_conn:
    valid_stations_df = pd.read_sql("SELECT station_id FROM station_hospital", final_conn)
valid_station_set = set(valid_stations_df["station_id"])

# Boolean mask: True where the row's station is known to the final database
is_known_station = df_rainfall["station_id"].isin(valid_station_set)
df_rainfall = df_rainfall.loc[is_known_station]
print("Rows after filtering invalid stations:", len(df_rainfall))
# --- End station filter ---

# Upsert the rainfall rows into final_database, keyed on (date, station_id).
rainfall_cols = ["date", "station_id", "total_rainfall", "rainfall_intensity"]

# One plain-Python parameter dict per row. pandas represents SQL NULL as
# NaN/NaT after read_sql; map those back to None so the driver writes NULL
# rather than a literal NaN value.
rainfall_records = [
    {col: (None if pd.isna(val) else val) for col, val in rec.items()}
    for rec in df_rainfall[rainfall_cols].astype(object).to_dict("records")
]

with final_engine.begin() as conn:  # begin() commits on success, rolls back on error
    # SQLAlchemy does not treat an empty parameter list as "zero executions",
    # so only execute when there is at least one row to send.
    if rainfall_records:
        # Passing the whole list hands the batch to the driver in a single
        # execute() call instead of one Python-level call per DataFrame row.
        conn.execute(
            text("""
                INSERT INTO rainfall (date, station_id, total_rainfall, rainfall_intensity)
                VALUES (:date, :station_id, :total_rainfall, :rainfall_intensity)
                ON CONFLICT (date, station_id) DO UPDATE
                SET total_rainfall = EXCLUDED.total_rainfall,
                    rainfall_intensity = EXCLUDED.rainfall_intensity
            """),
            rainfall_records,
        )

print("rainfall migrated:", len(df_rainfall))

Rows fetched from staging: 46357
Rows after filtering invalid stations: 45461
rainfall migrated: 45461
