---
## Working v2 Code Cells

In [1]:
# 0) Constants and engine (v2 layout)
import os
import re

import numpy as np
import pandas as pd
import s3fs
from sqlalchemy.engine import create_engine

# Connection options shared by s3fs.S3FileSystem(**FS_OPTS) and pandas'
# storage_options=FS_OPTS.
# Credentials are taken from the environment when MINIO_ROOT_USER /
# MINIO_ROOT_PASSWORD are set, so real secrets never need to be committed with
# the notebook; the previous demo values remain as fallbacks so an unconfigured
# environment behaves exactly as before.
FS_OPTS = {
    "key": os.environ.get("MINIO_ROOT_USER", "minio-root-user"),
    "secret": os.environ.get("MINIO_ROOT_PASSWORD", "minio-root-password"),
    "client_kwargs": {"endpoint_url": "http://minio:9000"},
}
# Glob matching every curated partition file; written without the s3://
# scheme because it is handed to fs.glob() and prefixed with s3:// on read.
SRC_GLOB = "demo-bucket/warehouse/finance/yahoo/curated_price/date=*/part-*.parquet"
# SQLAlchemy engine used for all Trino statements in this notebook.
# NOTE(review): the URL path presumably selects catalog "iceberg" and schema
# "curated" — confirm against the Trino SQLAlchemy dialect in use.
TRINO_URL = "trino://user@trino:8080/iceberg/curated"
ENGINE = create_engine(TRINO_URL)
print("Constants set. SRC_GLOB =", SRC_GLOB)

Constants set. SRC_GLOB = demo-bucket/warehouse/finance/yahoo/curated_price/date=*/part-*.parquet


In [2]:
# 1) Load curated Parquet (v2 path/partition), Pandas only
# List every file matching SRC_GLOB, fail loudly when nothing matches, then
# read each file with pandas and stack the pieces into one frame.
fs = s3fs.S3FileSystem(**FS_OPTS)
paths = fs.glob(SRC_GLOB)
if not paths:
    raise RuntimeError(f"No files found at s3://{SRC_GLOB}")
part_frames = (
    pd.read_parquet(f"s3://{key}", storage_options=FS_OPTS)
    for key in paths
)
df = pd.concat(part_frames, ignore_index=True)
# Strip stray surrounding whitespace from the column names.
df.columns = list(map(str.strip, df.columns))
print("Rows loaded:", len(df), " Columns:", list(df.columns))

Rows loaded: 152  Columns: ['ts', 'adj close', 'close', 'high', 'low', 'open', 'volume', 'ticker', 'ingest_date', 'date']


In [3]:
# 2) Pandas mean(close) by ticker
# Coerce close to numeric (unparseable values become NaN), drop the rows
# without a usable close, then average per ticker.
close_numeric = pd.to_numeric(df["close"], errors="coerce")
priced = df.assign(close=close_numeric).dropna(subset=["close"])
means_py = priced.groupby("ticker", as_index=False)["close"].mean()
means_py = means_py.rename(columns={"close":"avg_close"})
means_py = means_py.sort_values("ticker").reset_index(drop=True)
means_py.head(20)

Unnamed: 0,ticker,avg_close
0,AAPL,231.261577
1,AMZN,227.85
2,GOOGL,209.49158
3,MSFT,510.584736


In [4]:
# 3) Ensure schema & table (Iceberg via Trino)
# Both statements use IF NOT EXISTS, so they do nothing when the schema or
# table is already present.
CREATE_SCHEMA_SQL = "CREATE SCHEMA IF NOT EXISTS iceberg.curated"
CREATE_TABLE_SQL = """CREATE TABLE IF NOT EXISTS fact_price (
  ticker VARCHAR,
  ts TIMESTAMP(3) WITH TIME ZONE,
  open DOUBLE, high DOUBLE, low DOUBLE, close DOUBLE,
  volume BIGINT,
  ingest_date VARCHAR
) WITH (partitioning = ARRAY['day(ts)'])
"""
with ENGINE.begin() as conn:
    # Schema first: the table statement is unqualified and relies on it.
    for ddl in (CREATE_SCHEMA_SQL, CREATE_TABLE_SQL):
        conn.exec_driver_sql(ddl)
print("Schema/table ensured.")

Schema/table ensured.


In [5]:
# 4) Insert into Iceberg in chunks (from v2 curated Parquet)
def sql_literal(s: str) -> str:
    """Escape a value for use inside a single-quoted SQL string literal.

    The value is converted with ``str()`` and every single quote is doubled.
    The surrounding quotes are not added; the caller supplies them.
    """
    text = str(s)
    # Splitting on the quote and re-joining with two quotes doubles each one.
    return "''".join(text.split("'"))

# Project the curated frame onto the fact_price columns and coerce each column
# so every cell can be rendered as a SQL literal.
use = df[["ticker","ts","open","high","low","close","volume","ingest_date"]].copy()
use["ts"] = pd.to_datetime(use["ts"], utc=True)
for c in ["open","high","low","close"]:
    use[c] = pd.to_numeric(use[c], errors="coerce")
# Nullable Int64 keeps missing volumes as <NA> rather than turning the column
# into floats.
use["volume"] = pd.to_numeric(use["volume"], errors="coerce").astype("Int64")
use["ingest_date"] = use["ingest_date"].astype(str)


def _num_sql(value, as_int=False):
    """Render a numeric cell as a SQL literal; missing values become NULL."""
    if pd.isna(value):
        return "NULL"
    return str(int(value)) if as_int else f"{value}"


def _values_tuple(ticker, ts, open_, high, low, close, volume, ingest_date):
    """Build one parenthesised VALUES entry for the fact_price INSERT."""
    if pd.isna(ts):
        # A missing timestamp would otherwise be rendered as the text 'NaT',
        # which is not a valid ISO-8601 value; insert NULL instead.
        ts_sql = "NULL"
    else:
        ts_sql = f"from_iso8601_timestamp('{sql_literal(ts.isoformat())}')"
    return (
        "("
        f"'{sql_literal(ticker)}', "
        f"{ts_sql}, "
        f"{_num_sql(open_)}, "
        f"{_num_sql(high)}, "
        f"{_num_sql(low)}, "
        f"{_num_sql(close)}, "
        f"{_num_sql(volume, as_int=True)}, "
        f"'{sql_literal(ingest_date)}'"
        ")"
    )


# NOTE: this cell only appends; running it again inserts the same rows again.
rows, chunk = 0, 400
insert_prefix = "INSERT INTO fact_price (ticker, ts, open, high, low, close, volume, ingest_date) VALUES "
# Slice with iloc instead of np.array_split: array_split on a DataFrame goes
# through DataFrame.swapaxes, which is deprecated in recent pandas releases.
# An empty frame yields no iterations, so nothing is sent to Trino.
for start in range(0, len(use), chunk):
    part = use.iloc[start:start + chunk]
    # itertuples yields plain tuples in column order, matching _values_tuple's
    # positional parameters, without building a Series per row.
    values = [_values_tuple(*rec) for rec in part.itertuples(index=False, name=None)]
    # One transaction per chunk: a failure leaves earlier chunks committed.
    with ENGINE.begin() as conn:
        conn.exec_driver_sql(insert_prefix + ",".join(values))
    rows += len(values)
print("Inserted rows:", rows)

Inserted rows: 152


In [6]:
# 5) SQL verification and compare with Pandas
# Compute the per-ticker average in Trino, then line it up against the pandas
# result; the outer merge keeps tickers present on only one side.
verify_sql = """        SELECT ticker, AVG(close) AS avg_close
        FROM fact_price
        GROUP BY ticker
        ORDER BY ticker
    """
with ENGINE.connect() as conn:
    res = conn.exec_driver_sql(verify_sql).fetchall()
means_sql = pd.DataFrame(res, columns=["ticker","avg_close"])
means_sql = means_sql.sort_values("ticker").reset_index(drop=True)

cmp = pd.merge(means_py, means_sql, on="ticker", how="outer", suffixes=("_py","_sql"))
# Absolute difference between the two averages; NaN where one side is missing.
cmp["diff"] = cmp["avg_close_py"].sub(cmp["avg_close_sql"]).abs()
cmp.head(20)

Unnamed: 0,ticker,avg_close_py,avg_close_sql,diff
0,AAPL,231.261577,231.261577,0.0
1,AMZN,227.85,227.85,0.0
2,GOOGL,209.49158,209.49158,0.0
3,MSFT,510.584736,510.584736,0.0
