In [0]:
%pip install --quiet "snowflake-snowpark-python[pandas]"
dbutils.library.restartPython()


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
from pyspark.sql.functions import col
import pandas as pd

# Read Spark tables
silver_pdf = spark.table("workspace.default.stocks_silver").toPandas()
forecast_pdf = spark.table("workspace.default.stocks_forecast").toPandas()

# Rename columns to match Snowflake schema
silver_pdf2 = silver_pdf.rename(columns={
    "symbol":"SYMBOL","date":"DT","close":"CLOSE","volume":"VOLUME","return":"RET_PCT"
})
forecast_pdf2 = forecast_pdf.rename(columns={
    "symbol":"SYMBOL","ds":"DS","yhat":"YHAT","yhat_lower":"YHAT_LOWER","yhat_upper":"YHAT_UPPER"
})

# Ensure date columns are proper dates
silver_pdf2["DT"] = pd.to_datetime(silver_pdf2["DT"]).dt.date
forecast_pdf2["DS"] = pd.to_datetime(forecast_pdf2["DS"]).dt.date

print("✅ Data ready:", silver_pdf2.shape, forecast_pdf2.shape)

✅ Data ready: (497472, 8) (1014, 5)


In [0]:

import os, math, tempfile, shutil
import snowflake.connector
import pandas as pd

def put_and_copy(df: pd.DataFrame, table: str, cols: list[str], chunk_rows=100_000):
    """
    Writes df to CSV chunks (columns forced to `cols` order),
    PUTs to @%table, then COPY INTO table.
    """
    # --- Force column order up-front & normalize dates if present ---
    df = df.loc[:, cols].copy()
    if "DT" in df.columns:
        df["DT"] = pd.to_datetime(df["DT"]).dt.strftime("%Y-%m-%d")
    if "DS" in df.columns:
        df["DS"] = pd.to_datetime(df["DS"]).dt.strftime("%Y-%m-%d")

    tmpdir = tempfile.mkdtemp(prefix=f"sf_load_{table}_")
    try:
        total = len(df)
        nchunks = max(1, math.ceil(total / chunk_rows))
        print(f"📦 {table}: {total} rows → {nchunks} chunk(s)")

        # Write CSV chunks with header
        filepaths = []
        for i in range(nchunks):
            start = i * chunk_rows
            end   = min(start + chunk_rows, total)
            chunk = df.iloc[start:end].copy()
            # ⬇️ Force column order again on each chunk
            chunk = chunk.loc[:, cols]
            path  = os.path.join(tmpdir, f"{table.lower()}_{i:04d}.csv")
            chunk.to_csv(path, index=False)
            filepaths.append(path)
        print(f"🗂️  {table}: wrote {len(filepaths)} CSV file(s). First file: {os.path.basename(filepaths[0])}")

        # Connect to Snowflake
        conn = snowflake.connector.connect(
            user=SF_USER, password=SF_PASSWORD, account=SF_ACCOUNT,
            warehouse=SF_WAREHOUSE, database=SF_DATABASE, schema=SF_SCHEMA, role=SF_ROLE,
            login_timeout=60, client_session_keep_alive=True
        )
        cur = conn.cursor()
        cur.execute(f"USE WAREHOUSE {SF_WAREHOUSE}")
        cur.execute(f"USE DATABASE {SF_DATABASE}")
        cur.execute(f"USE SCHEMA {SF_SCHEMA}")

        # Clean old staged files for this table (optional safety)
        cur.execute(f"REMOVE @%{table} pattern='.*.csv.gz'")

        # PUT local CSVs → table stage (auto-compress to .gz)
        for path in filepaths:
            cur.execute(f"PUT file://{path} @%{table} AUTO_COMPRESS=TRUE OVERWRITE=TRUE")

        # COPY INTO table: now positions match exactly, thanks to enforced order
        cols_sql = ",".join(cols)
        copy_sql = f"""
            COPY INTO {table} ({cols_sql})
            FROM @%{table}
            FILE_FORMAT=(TYPE=CSV FIELD_OPTIONALLY_ENCLOSED_BY='\"' SKIP_HEADER=1 COMPRESSION=GZIP)
            ON_ERROR='ABORT_STATEMENT'
        """
        cur.execute(copy_sql)

        # Verify count
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        cnt = cur.fetchone()[0]
        print(f"✅ Loaded {table}: total rows = {cnt}")

        # Clean stage files (optional)
        cur.execute(f"REMOVE @%{table} pattern='.*.csv.gz'")
        cur.close(); conn.close()
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)


In [0]:
# Forecast load first (1 chunk in your case)
put_and_copy(
    df   = forecast_pdf2,
    table= "STOCK_FORECAST",
    cols = ["SYMBOL","DS","YHAT","YHAT_LOWER","YHAT_UPPER"],
    chunk_rows = 50_000
)


📦 STOCK_FORECAST: 1014 rows → 1 chunk(s)
🗂️  STOCK_FORECAST: wrote 1 CSV file(s). First file: stock_forecast_0000.csv
✅ Loaded STOCK_FORECAST: total rows = 1014


In [0]:

put_and_copy(
    df   = silver_pdf2,
    table= "STOCK_SILVER",
    cols = ["SYMBOL","DT","CLOSE","VOLUME","RET_PCT"],
    chunk_rows = 100_000
)


📦 STOCK_SILVER: 497472 rows → 5 chunk(s)
🗂️  STOCK_SILVER: wrote 5 CSV file(s). First file: stock_silver_0000.csv
✅ Loaded STOCK_SILVER: total rows = 994944
