In [5]:
from pathlib import Path
import psycopg2
import shutil
import time

# -----------------------------
# Timer start
# -----------------------------
start_time = time.perf_counter()

WEATHER_LANDING_DIR = Path("/app/data/landing/weather")
WEATHER_ARCHIVE_DIR = Path("/app/data/archive/weather")

# Create the archive directory up front. Without it, shutil.move() fails only
# AFTER the rows have been committed, leaving the file in the landing directory
# so that the next run ingests it a second time.
WEATHER_ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)

# Snapshot the file list before moving anything out of the directory, and sort
# it so the processing order is deterministic.
csv_paths = sorted(WEATHER_LANDING_DIR.glob("*.csv"))

# NOTE(review): connection settings (including the password) are hard-coded;
# consider sourcing them from the environment or a secrets store.
conn = psycopg2.connect(
    dbname="etl_db",
    user="postgres",
    password="postgres",
    host="postgres",
    port=5432,
)
cur = conn.cursor()
files_ingested = 0

try:
    # Staging table, created once per session. ON COMMIT PRESERVE ROWS keeps it
    # usable across the per-file commits below.
    cur.execute("""
        CREATE TEMP TABLE weather_daily_raw (
            station_id TEXT,
            obs_date   DATE,
            element    TEXT,
            value      NUMERIC,
            m_flag     TEXT,
            q_flag     TEXT,
            s_flag     TEXT
        ) ON COMMIT PRESERVE ROWS;
    """)
    conn.commit()

    for csv_path in csv_paths:
        print(f"Ingesting {csv_path.name}")

        # One transaction per file: either the whole file lands in the bronze
        # table or none of it does.
        try:
            # 1️⃣ COPY CSV → temp table
            with open(csv_path, "r") as f:
                cur.copy_expert(
                    """
                    COPY weather_daily_raw
                    FROM STDIN
                    WITH CSV HEADER
                    """,
                    f
                )

            # 2️⃣ Insert into final table with constant source_file
            cur.execute(
                """
                INSERT INTO bronze.weather_daily (
                    station_id,
                    obs_date,
                    element,
                    value,
                    m_flag,
                    q_flag,
                    s_flag,
                    source_file
                )
                SELECT
                    station_id,
                    obs_date,
                    element,
                    value,
                    m_flag,
                    q_flag,
                    s_flag,
                    %s
                FROM weather_daily_raw
                """,
                (csv_path.name,)
            )

            # 3️⃣ Clear temp table
            cur.execute("TRUNCATE weather_daily_raw")

            conn.commit()
        except Exception:
            # Undo the partial load of this file, then surface the error.
            conn.rollback()
            raise

        # Archive only after a successful commit. This step is deliberately
        # outside the rollback handler: a rollback after commit would do nothing.
        shutil.move(csv_path, WEATHER_ARCHIVE_DIR / csv_path.name)
        print(f"Archived → {csv_path.name}")
        files_ingested += 1
finally:
    # Always release the cursor and connection, including when a file fails,
    # so a failed run does not leave an open session behind in the kernel.
    try:
        cur.close()
    finally:
        conn.close()

# -----------------------------
# Timer end
# -----------------------------
end_time = time.perf_counter()
elapsed = end_time - start_time

print("✅ Ultra-fast weather ingest complete")
# Report the count so a run that found no files is not mistaken for a real load.
print(f"📦 Files ingested: {files_ingested}")
print(f"⏱️ Total runtime: {elapsed:.2f} seconds")

✅ Ultra-fast weather ingest complete
⏱️ Total runtime: 0.03 seconds


In [6]:
from sqlalchemy import create_engine
import pandas as pd

# SQLAlchemy engine for the etl_db Postgres database (psycopg2 driver).
engine = create_engine("postgresql+psycopg2://postgres:postgres@postgres:5432/etl_db")

# Row count of the bronze weather table, returned as a one-row DataFrame.
df = pd.read_sql(sql="SELECT count(*) FROM bronze.weather_daily", con=engine)

# Bare last expression so the notebook renders the frame.
df


Unnamed: 0,count
0,4695629


In [7]:
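# Disabled (commented out): if re-enabled, this cell empties bronze.weather_daily,
# which is destructive. It uses the `engine` object created in cell In [6].
# from sqlalchemy import text

# with engine.begin() as conn:
#     conn.execute(text("TRUNCATE TABLE bronze.weather_daily"))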
