In [1]:
import getpass
import io
import os
from datetime import datetime

import pandas as pd, sqlalchemy
from sqlalchemy import create_engine, text

In [2]:
# Build the connection URL from parts so the password is never stored in the
# notebook or its saved outputs. It is taken from the PGPASSWORD environment
# variable when set, and prompted for interactively otherwise. URL.create also
# escapes special characters in the password (e.g. "$", "@", ":").
engine = create_engine(
    sqlalchemy.engine.URL.create(
        drivername="postgresql+psycopg2",
        username="appuser",
        password=os.environ.get("PGPASSWORD") or getpass.getpass("Postgres password for appuser: "),
        host="pg-postgresql.jrycastillo.svc",
        port=5432,
        database="appdb",
    )
)
print(engine)  # should show postgresql+psycopg2, NOT sqlite

Engine(postgresql+psycopg2://appuser:***@pg-postgresql.jrycastillo.svc:5432/appdb)


In [4]:
print(f"{engine.url}")  # rendered with the password masked as ***

postgresql+psycopg2://appuser:***@pg-postgresql.jrycastillo.svc:5432/appdb


In [5]:
# Record library versions alongside the results for reproducibility.
for label, module in (("pandas:", pd), ("sqlalchemy:", sqlalchemy)):
    print(label, module.__version__)

pandas: 2.1.4
sqlalchemy: 2.0.15


In [None]:
# Create one parent table per year, range-partitioned on created_at.
# Each CREATE runs in its own transaction, so one failure does not block the others.
for year in range(2019, 2026):
    table_name = f"ais_mtable_y{year}"
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        created_at TIMESTAMP NOT NULL,
        ts TIMESTAMP,
        static_timestamp TEXT,
        prev_mmsi TEXT,
        latitude DOUBLE PRECISION,
        longitude DOUBLE PRECISION,
        speed DOUBLE PRECISION,
        course DOUBLE PRECISION,
        heading DOUBLE PRECISION,
        imo TEXT,
        name TEXT,
        call_sign TEXT,
        flag TEXT,
        draught TEXT,
        ship_and_cargo_type TEXT,
        ship_type TEXT,
        length TEXT,
        width TEXT,
        eta TEXT,
        destination TEXT,
        status TEXT,
        maneuver TEXT,
        accuracy BIGINT,
        rot TEXT,
        collection_type TEXT,
        mmsi TEXT
    ) PARTITION BY RANGE (created_at);
    """
    try:
        with engine.begin() as conn:
            conn.execute(text(create_table_sql))
    except Exception as e:
        # Best effort: report the failure and continue with the next year.
        print(f"Failed to create table '{table_name}':", e)
    else:
        # Reported only after engine.begin() has committed, so a failed commit
        # can no longer print both "created" and "Failed".
        print(f"Parent table '{table_name}' created (or already exists).")

In [None]:
# Create twelve monthly partitions per yearly parent table, each covering
# [first day of the month, first day of the next month).
for year in range(2019, 2026):
    parent_table = f"ais_mtable_y{year}"

    for month in range(1, 13):
        start_date = datetime(year, month, 1)
        # The TO bound of a range partition is exclusive; December rolls over to
        # January 1st of the following year.
        end_date = datetime(year + month // 12, month % 12 + 1, 1)

        partition_name = f"{parent_table}_p{month:02d}"
        create_partition_sql = (
            f"CREATE TABLE IF NOT EXISTS {partition_name} PARTITION OF {parent_table} "
            f"FOR VALUES FROM ('{start_date:%Y-%m-%d}') TO ('{end_date:%Y-%m-%d}');"
        )

        try:
            with engine.begin() as conn:
                conn.execute(text(create_partition_sql))
        except Exception as e:
            # Best effort: report the failure and continue with the next month.
            print(f"Failed to create partition '{partition_name}': {e}")
        else:
            # Reported only once the transaction has committed.
            print(f"✅ Created partition: {partition_name}")

In [None]:
import os
import pandas as pd
import dask.dataframe as dd
from sqlalchemy import create_engine
from datetime import datetime
import time

def format_time(seconds):
    """Render a duration in seconds as text such as ``"1 hr 2 min 5 sec"``.

    Fractional seconds are truncated. Hours appear only when positive; minutes
    appear when positive or whenever hours are shown; seconds always appear.
    """
    total = int(seconds)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)

    parts = []
    if hours > 0:
        parts.append(f"{hours} hr")
    if hours > 0 or minutes > 0:
        parts.append(f"{minutes} min")
    parts.append(f"{secs} sec")
    return " ".join(parts)

In [None]:
upload_log_file = "upload_log.csv"

# First run only: write an empty log whose header fixes the columns that the
# upload loop appends to.
if not os.path.exists(upload_log_file):
    (
        pd.DataFrame(columns=["filename", "unique_years", "time", "rows", "uploaded_datetime"])
        .to_csv(upload_log_file, index=False)
    )
    print(f"🆕 Created log file: {upload_log_file}")

upload_log = pd.read_csv(upload_log_file, parse_dates=["uploaded_datetime"])
print(f"📖 Loaded log file: {upload_log_file}")

In [None]:
# Folder holding the cleaned AIS Parquet files to upload.
parquet_folder = "/home/jovyan/shared/val/task/250728-LM-AIS_2022-2024_cleaned"
# os.listdir returns entries in arbitrary order; sort so the upload loop (which
# handles a limited number of new files per run) proceeds in a stable order.
parquet_files = sorted(f for f in os.listdir(parquet_folder) if f.endswith(".parquet"))

In [None]:
def df_to_copybuf(df: pd.DataFrame) -> io.StringIO:
    """Serialize ``df`` as header-less, index-less CSV in an in-memory text buffer.

    The returned buffer is positioned at offset 0, ready to be read as the
    source of a ``COPY ... FROM STDIN WITH CSV``. Missing values become empty
    fields.
    """
    csv_text = df.to_csv(index=False, header=False)
    return io.StringIO(csv_text)


BATCH_ROWS = 200_000
# Number of not-yet-logged files to upload per run of this cell. Defaults to 1
# (one file per run); set to None to upload every pending file in one go.
MAX_FILES_PER_RUN = 1


def _copy_in_batches(cur, table_name: str, frame: pd.DataFrame) -> None:
    """COPY ``frame`` into ``table_name`` in slices of at most BATCH_ROWS rows.

    Columns are sent by name in ``frame``'s order, so every column of ``frame``
    must exist in the target table. Does not commit; the caller owns the
    transaction.
    """
    cols = ",".join(f'"{c}"' for c in frame.columns)
    copy_sql = f"COPY {table_name} ({cols}) FROM STDIN WITH CSV"
    # Ceiling division, so a frame whose length is an exact multiple of
    # BATCH_ROWS does not report a phantom extra batch.
    n_batches = -(-len(frame) // BATCH_ROWS)
    for i in range(n_batches):
        chunk = frame.iloc[i * BATCH_ROWS : (i + 1) * BATCH_ROWS]
        cur.copy_expert(copy_sql, df_to_copybuf(chunk))
        print(f"   -> batch {i+1}/{n_batches}, rows={len(chunk)}")


uploaded_this_run = 0
for file in parquet_files:
    if MAX_FILES_PER_RUN is not None and uploaded_this_run >= MAX_FILES_PER_RUN:
        break
    if file in upload_log["filename"].values:
        print(f"Skipping {file}, already uploaded.")
        continue

    parquet_path = os.path.join(parquet_folder, file)
    print(f"Processing {file}...")
    start_time = time.time()

    df = pd.read_parquet(parquet_path, engine="pyarrow")
    df["created_at"] = pd.to_datetime(df["created_at"])

    # The year of created_at selects the target table. NaT rows give a NaN year
    # (and would turn the other years into floats such as 2024.0), and they could
    # not be stored anyway because created_at is NOT NULL, so skip and report them.
    years = df["created_at"].dt.year
    n_missing = int(years.isna().sum())
    if n_missing:
        print(f"⚠️ {n_missing} rows in {file} have no created_at and are skipped")
    unique_years_list = sorted(int(y) for y in years.dropna().unique())

    total_rows = 0
    print("UNIQUE YEARS:", unique_years_list)

    # One connection and one transaction for the whole file: either every year of
    # this file is committed (and then logged below) or nothing is, so re-running
    # after a failure cannot insert the earlier years a second time.
    conn = engine.raw_connection()
    try:
        cur = conn.cursor()
        try:
            for year in unique_years_list:
                table_name = f"ais_mtable_y{year}"
                print(f"Uploading to table: {table_name}")
                df_year = df[years == year]

                rows_count = len(df_year)
                if rows_count == 0:
                    continue

                _copy_in_batches(cur, table_name, df_year)
                total_rows += rows_count
                print(f"Inserted {rows_count} rows for year {year}")
        finally:
            cur.close()
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        # Always hand the connection back to the pool, even when a COPY fails.
        conn.close()

    elapsed_time_seconds = time.time() - start_time
    elapsed_time = format_time(elapsed_time_seconds)
    uploaded_datetime = datetime.now()

    upload_log = pd.concat([upload_log, pd.DataFrame({
        "filename": [file],
        "unique_years": [unique_years_list],
        "time": [elapsed_time],
        "rows": [total_rows],
        "uploaded_datetime": [uploaded_datetime],
    })], ignore_index=True)
    # Persist after every file so the skip check above resumes an interrupted run.
    upload_log.to_csv(upload_log_file, index=False)
    uploaded_this_run += 1

    print(f"Finished uploading {file} in {elapsed_time}, total rows={total_rows}.\n")


In [None]:
import os
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta

TABLE = "public.ais_mtable_y2024_p02"
DATE_COLUMN = "created_at"
CHUNKSIZE = 2_000_000              # rows per fetch, and therefore per Parquet file
BASE_DIR = "output_dl_postgres"
COMPRESSION = "snappy"

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
table_slug = TABLE.replace(".", "_")
output_dir = os.path.join(BASE_DIR, f"{table_slug}_{timestamp}")
os.makedirs(output_dir, exist_ok=True)
print(f"Saving Parquet files to: {output_dir}")


def _export_day(day_start, day_end):
    """Write rows with ``day_start <= DATE_COLUMN < day_end`` to Parquet chunk files.

    Rows are fetched CHUNKSIZE at a time and each fetched batch becomes one file
    in ``output_dir``. Returns ``(chunk_count, total_rows)`` for the day. The
    cursor and connection are released even if a fetch or a write fails.
    """
    raw_conn = engine.raw_connection()
    try:
        # With psycopg2 a named cursor is server-side, so fetchmany() streams the
        # result instead of loading the whole day into client memory at execute().
        cur = raw_conn.cursor(name="stream_cursor")
        try:
            cur.execute(
                f"SELECT * FROM {TABLE} WHERE {DATE_COLUMN} >= %s AND {DATE_COLUMN} < %s",
                (day_start, day_end),
            )
            chunk_count = 0
            total_rows = 0
            while True:
                rows = cur.fetchmany(CHUNKSIZE)
                if not rows:
                    break
                df_chunk = pd.DataFrame(rows, columns=[desc[0] for desc in cur.description])
                file_name = f"{table_slug}_{day_start.date()}_{chunk_count}.parquet"
                df_chunk.to_parquet(
                    os.path.join(output_dir, file_name),
                    index=False,
                    engine="pyarrow",
                    compression=COMPRESSION,
                )
                chunk_count += 1
                total_rows += len(df_chunk)
                print(f"   ✅ Chunk {chunk_count} saved → {file_name} ({len(df_chunk)} rows)")
            return chunk_count, total_rows
        finally:
            cur.close()
    finally:
        raw_conn.close()


with engine.connect() as conn:
    min_date, max_date = conn.execute(
        text(f"SELECT MIN({DATE_COLUMN}), MAX({DATE_COLUMN}) FROM {TABLE}")
    ).fetchone()

if min_date is None:
    # MIN/MAX return NULL when the table has no rows.
    print(f"⚠️ {TABLE} has no rows; nothing to export")
else:
    min_date = pd.Timestamp(min_date).normalize()
    max_date = pd.Timestamp(max_date).normalize()
    print(f"Date range: {min_date} → {max_date}")

    # One iteration per calendar day from min_date through max_date inclusive.
    for day_idx, current in enumerate(pd.date_range(min_date, max_date, freq="D"), start=1):
        next_day = current + timedelta(days=1)
        print(f"\n📅 Processing day {day_idx}: {current.date()} → {next_day.date()}")

        chunk_count, total_rows_day = _export_day(current, next_day)
        if chunk_count == 0:
            print(f"   ⚠️ No rows found for {current.date()}")
        else:
            print(f"   📊 Finished {current.date()} → {chunk_count} chunks, {total_rows_day} rows")

print("\n🎉 Export complete!")