# SP500 Stock Demo — Notebook 01: Data Prep

- Connect to Snowflake and set context
- Subset `CORTEX_DEMO.FSI_STOCKS_INSIGHT.DAILY_STOCK_PRICE` to S&P 500 tickers
- Simulate hourly OHLCV bars from the daily prices for a small demo slice
- Persist curated tables into `SP500_STOCK_DEMO.DATA`


In [None]:
# 0) Imports and session
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col, lit, to_timestamp, dateadd
from snowflake.snowpark.types import StructType, StructField, StringType, TimestampType, FloatType, IntegerType
import pandas as pd
import numpy as np

# Reuse the session the notebook runtime has already established.
session = get_active_session()

# Report the starting context (captured before the USE statements below run).
env = session.sql('select current_user(), current_role(), current_database(), current_schema(), current_warehouse(), current_version()').collect()[0]
context_labels = ('user', 'role', 'db', 'schema', 'wh', 'version')
print({label: env[pos] for pos, label in enumerate(context_labels)})

# Point the session at the demo database, schema and warehouse, in that order.
for use_statement in (
    "USE DATABASE SP500_STOCK_DEMO",
    "USE SCHEMA DATA",
    "USE WAREHOUSE DEMO_WH_M",
):
    session.sql(use_statement).collect()

# Years of daily history to include: 1 keeps test runs fast; change to 7 when ready.
YEARS_BACK = 1


In [None]:
# 1) Load S&P 500 tickers
# Later cells read a SYMBOL column from `sp500_df` (join key and batch list).
#
# Option A: use a small inline list for demo. NOTE: as written this produces a
# TICKER column; rename it to SYMBOL before relying on the cells below.
# sp500_tickers = pd.DataFrame({'TICKER': [
#     'AAPL','MSFT','AMZN','GOOGL','META','NVDA','BRK.B','JPM','JNJ','XOM'
# ]})
# sp500_df = session.create_dataframe(sp500_tickers)
# sp500_df.write.save_as_table('SP500_TICKERS', mode='overwrite')
#
# Option B (active): read the maintained ticker table directly.
sp500_list_table = 'SP_500_LIST'
sp500_df = session.table(sp500_list_table)
sp500_df.show(5)

In [None]:
# 2) Subset daily source to S&P 500 (standardize to TICKER/DATE)
source_table = 'CORTEX_DEMO.FSI_STOCKS_INSIGHT.DAILY_STOCK_PRICE'
daily = session.table(source_table)

# Keep only daily rows whose TICKER matches a SYMBOL in the S&P 500 list.
is_constituent = daily['TICKER'] == sp500_df['SYMBOL']
measure_names = ('OPEN', 'HIGH', 'LOW', 'CLOSE', 'VOLUME')
subset = daily.join(sp500_df, on=is_constituent).select(
    daily['TICKER'].alias('TICKER'),
    daily['DATE'].alias('DATE'),
    *(daily[name] for name in measure_names),
)

# Quick look at the result, then persist it as the curated daily table.
subset.limit(5).show()
subset.write.save_as_table('DAILY_SP500', mode='overwrite')


In [None]:
# Scale the current warehouse up to XLARGE, printing its SHOW WAREHOUSES row
# before and after the change.
current_wh = session.get_current_warehouse()
wh = str(current_wh).strip('"')  # unquoted name for the LIKE pattern
show_wh_sql = f"SHOW WAREHOUSES LIKE '{wh}';"

print(f"Current warehouse: {wh}")
print(session.sql(show_wh_sql).collect())

# The ALTER uses the identifier exactly as the session reports it.
session.sql(f"alter warehouse {current_wh} set WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE").collect()

print(session.sql(show_wh_sql).collect())

In [None]:
# 3) Simulate hourly OHLCV from daily (TICKER/DATE standard)
# Brownian-bridge-like noise; 6 hourly bars per day, stamped 10:00 through 15:00
# (the 16:00 endpoint is excluded).

from snowflake.snowpark.functions import to_timestamp_ltz, current_date, lit

# Window for testing is parameterized by YEARS_BACK (set to 1 by default)
cutoff = session.sql(f"select dateadd('year', -{YEARS_BACK}, current_date()) as D").collect()[0]['D']
limited = session.table('DAILY_SP500').filter(col('DATE') >= lit(cutoff))

# The synthesis runs client-side: the filtered daily rows are pulled into pandas.
pdf = limited.to_pandas()
pdf = pdf.sort_values(['TICKER','DATE'])

# Fixed seed so repeated runs over the same input produce the same bars.
rng = np.random.default_rng(42)

rows = []
for sym, grp in pdf.groupby('TICKER'):
    for _, r in grp.iterrows():
        o, h, l, c = float(r.OPEN), float(r.HIGH), float(r.LOW), float(r.CLOSE)
        v = float(r.VOLUME)
        date = pd.to_datetime(r.DATE)
        hours = pd.date_range(date + pd.Timedelta(hours=10), date + pd.Timedelta(hours=16), freq='1h', inclusive='left')
        steps = len(hours)
        # Distribute the day's total volume across the hourly bars with equal
        # probabilities; non-positive (or NaN) volume yields all-zero bars.
        if v > 0:
            vol_alloc = rng.multinomial(int(v), np.ones(steps)/steps)
        else:
            vol_alloc = np.zeros(steps, dtype=int)
        # Linear open->close drift plus standardized noise scaled by the daily
        # range, then bounded by the daily high/low.
        noise = rng.normal(0, 1, steps)
        noise = (noise - noise.mean()) / (noise.std() + 1e-6)
        path = o + (c - o) * (np.arange(steps)/(steps-1 if steps>1 else 1)) + noise * max((h-l)/6, 1e-6)
        path = np.clip(path, l, h)
        # Create synthetic OHLC intraday using small windows around path
        for i, ts in enumerate(hours):
            base = float(path[i])
            hi = float(min(h, base + abs(base)*0.002))
            lo = float(max(l, base - abs(base)*0.002))
            op = float(base)
            cl = float(base + rng.normal(0, abs(base)*0.0008))
            # Keep the bar internally consistent: the noisy close must stay
            # inside this bar's [LOW, HIGH] (and hence the daily range). Without
            # the clamp, a base sitting on the daily high/low produced
            # CLOSE > HIGH or CLOSE < LOW about half the time.
            cl = float(min(max(cl, lo), hi))
            rows.append([sym, ts, op, hi, lo, cl, int(vol_alloc[i])])

hourly_pdf = pd.DataFrame(rows, columns=['TICKER','TS','OPEN','HIGH','LOW','CLOSE','VOLUME'])

hourly_df = session.create_dataframe(hourly_pdf)
hourly_df = hourly_df.with_column('TS', to_timestamp(col('TS')))

hourly_df.write.save_as_table('HOURLY_SP500_SIM', mode='overwrite')

hourly_df.limit(5).show()


In [None]:
# 3b) Optional: Batched hourly generation (memory-safe) into a separate table
# - Keeps the original full-table flow intact
# - Generates intraday hours for small ticker chunks and writes to HOURLY_SP500_SIM_BATCH
# - Set TEST_FIRST_BATCH=True to only process the first chunk for quick validation

from snowflake.snowpark.functions import col as sp_col, lit as sp_lit

BATCH_TABLE = 'HOURLY_SP500_SIM_BATCH'
BATCH_SIZE = 10           # number of tickers per chunk
TEST_FIRST_BATCH = True   # set False to process all chunks

# Build the ticker list from sp500_df (SYMBOL column).
# Query results carry no guaranteed order, and the batched loop seeds its RNG
# from the chunk index, so chunk membership must be stable for the simulation
# to be reproducible. De-duplicating also prevents a repeated symbol from being
# simulated in two different chunks; NULL symbols are dropped because they can
# neither be sorted with strings nor matched by an IN filter.
all_symbols = sorted({
    r['SYMBOL']
    for r in sp500_df.select('SYMBOL').collect()
    if r['SYMBOL'] is not None
})

def chunk_list(items, size):
    """Yield consecutive slices of ``items``, each holding at most ``size`` elements.

    The final slice may be shorter when ``len(items)`` is not a multiple of
    ``size``. An empty ``items`` yields nothing.
    """
    yield from (items[start:start + size] for start in range(0, len(items), size))

# Reuse the cutoff from the full-table cell when it has already been run.
# Otherwise derive it from YEARS_BACK (falling back to 1 year if that is not
# defined either) so the batched flow covers the same window as the full-table
# flow instead of silently shrinking to a hard-coded one year.
try:
    _ = cutoff
except NameError:
    fallback_years = int(globals().get('YEARS_BACK', 1))
    cutoff = session.sql(f"select dateadd('year', -{fallback_years}, current_date()) as D").collect()[0]['D']

# The first successful write replaces the batch table; later chunks append.
mode = 'overwrite'
chunks = list(chunk_list(all_symbols, BATCH_SIZE))
print({'chunks': len(chunks), 'batch_size': BATCH_SIZE, 'test_first_batch': TEST_FIRST_BATCH})

# Process one ticker chunk at a time so only that chunk's daily rows are held
# in pandas at once.
for idx, symbols in enumerate(chunks):
    # Filter daily subset to batch tickers and cutoff window
    limited_chunk = (
        session.table('DAILY_SP500')
               .filter((sp_col('DATE') >= sp_lit(cutoff)) & (sp_col('TICKER').isin(symbols)))
    )
    pdf = limited_chunk.to_pandas().sort_values(['TICKER','DATE'])

    # Brownian-bridge-like intraday synthesis; the seed is offset by the chunk
    # index so each chunk draws from its own stream.
    rng = np.random.default_rng(42 + idx)
    rows = []
    for sym, grp in pdf.groupby('TICKER'):
        for _, r in grp.iterrows():
            o, h, l, c = float(r.OPEN), float(r.HIGH), float(r.LOW), float(r.CLOSE)
            v = float(r.VOLUME)
            date = pd.to_datetime(r.DATE)
            # Six hourly bars stamped 10:00 through 15:00 (16:00 excluded).
            hours = pd.date_range(date + pd.Timedelta(hours=10), date + pd.Timedelta(hours=16), freq='1h', inclusive='left')
            steps = len(hours)
            # Split the day's volume across the bars with equal probabilities;
            # non-positive (or NaN) volume yields all-zero bars.
            if v > 0:
                vol_alloc = rng.multinomial(int(v), np.ones(steps)/steps)
            else:
                vol_alloc = np.zeros(steps, dtype=int)
            # Linear open->close drift plus standardized noise scaled by the
            # daily range, then bounded by the daily high/low.
            noise = rng.normal(0, 1, steps)
            noise = (noise - noise.mean()) / (noise.std() + 1e-6)
            path = o + (c - o) * (np.arange(steps)/(steps-1 if steps>1 else 1)) + noise * max((h-l)/6, 1e-6)
            path = np.clip(path, l, h)
            for i, ts in enumerate(hours):
                base = float(path[i])
                hi = float(min(h, base + abs(base)*0.002))
                lo = float(max(l, base - abs(base)*0.002))
                op = float(base)
                cl = float(base + rng.normal(0, abs(base)*0.0008))
                # Keep the bar internally consistent: the noisy close must stay
                # inside this bar's [LOW, HIGH] (and hence the daily range).
                # Without the clamp, a base sitting on the daily high/low
                # produced CLOSE > HIGH or CLOSE < LOW about half the time.
                cl = float(min(max(cl, lo), hi))
                rows.append([sym, ts, op, hi, lo, cl, int(vol_alloc[i])])

    if not rows:
        print(f'Chunk {idx+1}/{len(chunks)}: no rows (skipped)')
    else:
        hourly_pdf = pd.DataFrame(rows, columns=['TICKER','TS','OPEN','HIGH','LOW','CLOSE','VOLUME'])
        hourly_df = session.create_dataframe(hourly_pdf).with_column('TS', to_timestamp(col('TS')))
        hourly_df.write.save_as_table(BATCH_TABLE, mode=mode)
        print(f'Chunk {idx+1}/{len(chunks)}: wrote {len(rows)} rows in mode={mode}')
        mode = 'append'  # subsequent chunks append

    # Quick-validation switch: stop after the first chunk.
    if TEST_FIRST_BATCH:
        break

# Best-effort preview of the batch table: show a few rows and the total row
# count. Any failure (for example, the table was never written) is reported
# rather than raised.
try:
    batch_tbl = session.table(BATCH_TABLE)
    batch_tbl.limit(5).show()
    batch_row_count = batch_tbl.count()
    print({'batch_rows': batch_row_count})
except Exception as e:
    print('Batch table preview unavailable:', e)



In [None]:
# 4) View over the simulated hourly table for downstream feature generation
# (this cell creates a view; it does not build any index).
create_view_sql = (
    "CREATE OR REPLACE VIEW HOURLY_SP500_SIM_VIEW AS "
    "    SELECT TICKER, TS, OPEN, HIGH, LOW, CLOSE, VOLUME FROM HOURLY_SP500_SIM"
)
session.sql(create_view_sql).collect()

print('Created tables: DAILY_SP500, HOURLY_SP500_SIM and view HOURLY_SP500_SIM_VIEW')


In [None]:
# Scale the current warehouse back down to SMALL, printing its SHOW WAREHOUSES
# row before and after the change.
current_wh = session.get_current_warehouse()
wh = str(current_wh).strip('"')  # unquoted name for the LIKE pattern
show_wh_sql = f"SHOW WAREHOUSES LIKE '{wh}';"

print(f"Current warehouse: {wh}")
print(session.sql(show_wh_sql).collect())

# The ALTER uses the identifier exactly as the session reports it.
session.sql(f"alter warehouse {current_wh} set WAREHOUSE_SIZE = SMALL WAIT_FOR_COMPLETION = TRUE").collect()

print(session.sql(show_wh_sql).collect())