# Stock Price Data Upload to Snowflake

This notebook demonstrates an **idempotent** upload process that:

1. ✅ Reads parquet files with stock price data
2. ✅ Connects to Snowflake using environment variables
3. ✅ Creates target table if it doesn't exist (never drops!)
4. ✅ Stages data to a temporary table
5. ✅ Uses MERGE to insert new records or update existing ones
6. ✅ Handles date columns properly (DATE as date, TIMESTAMP as timestamp)
7. ✅ Can be re-run multiple times without creating duplicates

**Key Features:**
- **Idempotent**: Safe to run multiple times
- **No data loss**: Never drops or truncates tables
- **Upsert logic**: Updates existing records, inserts new ones
- **Proper date handling**: DATE columns stored as DATE type in Snowflake

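**Target Table:** `STOCK_PRICE_HISTORY` in the Snowflake session's current database and schema (e.g. `SNOWFLAKE_INTELLIGENCE_HOL.DATA.STOCK_PRICE_HISTORY`); the fully qualified name is printed when the table is created.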


In [17]:
import os
import logging
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional, Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
import yfinance as yf
import pyarrow as pa
import pyarrow.parquet as pq



In [18]:
# Configuration: which ticker to upload and where its parquet file lives.
# The data directory can be overridden with the PRICE_HISTORY_DIR environment
# variable (read from the real environment; .env is only loaded in a later cell)
# so the notebook is not tied to one machine. The default keeps the original path.
ticker = "NVDA"
PRICE_HISTORY_DIR = Path(
    os.environ.get(
        "PRICE_HISTORY_DIR",
        "/Users/jdacosta/Library/CloudStorage/GoogleDrive-john.dacosta@snowflake.com/My Drive/_local/Downloads/_cursor_demos/Snowflake_Intelligence_HOL/jdacosta/pricehistory/data/price-history",
    )
)
file = PRICE_HISTORY_DIR / ticker / f"price-history-{ticker}.parquet"


In [19]:
# Load the full price history for the configured ticker into a DataFrame.
df = pd.read_parquet(path=file)

In [20]:
# Import Snowflake connection utilities
from snowflake_connection import SnowflakeConnectionManager
from dotenv import load_dotenv
import os

# Load environment variables from a .env file (if present).
# load_dotenv() returns False when no .env file was found. Credentials may still
# come from the real environment, so warn instead of failing; this makes a later
# connection error much easier to diagnose.
env_loaded = load_dotenv()
if not env_loaded:
    print("⚠ No .env file found; relying on variables already set in the environment")
env_loaded


True

In [21]:
# Preview the earliest 10 rows of the loaded history.
df.iloc[:10]


Unnamed: 0,DATE,OPEN_PRICE,HIGH_PRICE,LOW_PRICE,CLOSE_PRICE,VOLUME,DIVIDENDS,STOCK_SPLITS,TICKER,DOWNLOAD_TIMESTAMP
0,1999-01-22 00:00:00-05:00,0.040114,0.04477,0.035577,0.037607,2714688000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
1,1999-01-25 00:00:00-05:00,0.040591,0.042024,0.037607,0.041547,510480000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
2,1999-01-26 00:00:00-05:00,0.042024,0.04286,0.037726,0.038323,343200000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
3,1999-01-27 00:00:00-05:00,0.038442,0.039398,0.036293,0.038204,244368000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
4,1999-01-28 00:00:00-05:00,0.038204,0.038442,0.037845,0.038084,227520000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
5,1999-01-29 00:00:00-05:00,0.038084,0.038204,0.036293,0.036293,244032000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
6,1999-02-01 00:00:00-05:00,0.036293,0.037248,0.036293,0.03701,154704000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
7,1999-02-02 00:00:00-05:00,0.036293,0.037248,0.03307,0.034145,264096000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
8,1999-02-03 00:00:00-05:00,0.033667,0.035339,0.033428,0.034861,75120000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
9,1999-02-04 00:00:00-05:00,0.035339,0.037726,0.034861,0.036771,181920000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646


In [22]:
# Preview the most recent 10 rows of the loaded history.
df.iloc[-10:]

Unnamed: 0,DATE,OPEN_PRICE,HIGH_PRICE,LOW_PRICE,CLOSE_PRICE,VOLUME,DIVIDENDS,STOCK_SPLITS,TICKER,DOWNLOAD_TIMESTAMP
6707,2025-09-22 00:00:00-04:00,175.300003,184.550003,174.710007,183.610001,269637000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
6708,2025-09-23 00:00:00-04:00,181.970001,182.419998,176.210007,178.429993,192559600,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
6709,2025-09-24 00:00:00-04:00,179.770004,179.779999,175.399994,176.970001,143564100,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
6710,2025-09-25 00:00:00-04:00,174.479996,180.259995,173.130005,177.690002,191586700,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
6711,2025-09-26 00:00:00-04:00,178.169998,179.770004,174.929993,178.190002,148573700,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
6712,2025-09-29 00:00:00-04:00,180.429993,184.0,180.320007,181.850006,193063500,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
6713,2025-09-30 00:00:00-04:00,182.080002,187.350006,181.479996,186.580002,236981000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
6714,2025-10-01 00:00:00-04:00,185.240005,188.139999,183.899994,187.240005,173844900,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
6715,2025-10-02 00:00:00-04:00,189.600006,191.050003,188.059998,188.889999,136805800,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
6716,2025-10-03 00:00:00-04:00,189.190002,190.360001,185.380005,187.619995,137340500,0.0,0.0,NVDA,2025-10-06 15:16:48.386646


In [23]:
# Sanity-check the loaded frame: shape, columns, DATE dtype and coverage,
# and how many rows repeat the (TICKER, DATE) pair on the raw DATE values.
date_col = df['DATE']
key_dupes = df.duplicated(subset=['TICKER', 'DATE']).sum()

print("DataFrame Info:")
print(f"Shape: {df.shape}")
print(f"Columns: {list(df.columns)}")
print(f"\nDate column dtype: {date_col.dtype}")
print(f"Date range: {date_col.min()} to {date_col.max()}")
print(f"Duplicates: {key_dupes}")


DataFrame Info:
Shape: (6717, 10)
Columns: ['DATE', 'OPEN_PRICE', 'HIGH_PRICE', 'LOW_PRICE', 'CLOSE_PRICE', 'VOLUME', 'DIVIDENDS', 'STOCK_SPLITS', 'TICKER', 'DOWNLOAD_TIMESTAMP']

Date column dtype: datetime64[ns, America/New_York]
Date range: 1999-01-22 00:00:00-05:00 to 2025-10-03 00:00:00-04:00
Duplicates: 0


In [24]:
# Prepare data for Snowflake - ensure DATE is proper date type.
# The MERGE later keys on (TICKER, DATE), so the staged frame must hold plain
# dates and exactly one row per (TICKER, DATE).
df_upload = df.copy()

# Convert DATE column to date (remove time and timezone).
# In this data DATE is tz-aware (America/New_York per the dtype printed above);
# .dt.date keeps the local calendar date.
df_upload['DATE'] = pd.to_datetime(df_upload['DATE']).dt.date

# Convert DOWNLOAD_TIMESTAMP to timestamp without timezone
# (tz-aware values keep their local wall time; naive values are unchanged).
df_upload['DOWNLOAD_TIMESTAMP'] = pd.to_datetime(df_upload['DOWNLOAD_TIMESTAMP']).dt.tz_localize(None)

# The earlier duplicate check ran on full timestamps, but truncating to dates can
# create new collisions. Duplicate source keys would be inserted twice on first
# load and make later MERGEs nondeterministic, so fail fast here.
dup_keys = df_upload.duplicated(subset=['TICKER', 'DATE'], keep=False)
if dup_keys.any():
    raise ValueError(
        f"{int(dup_keys.sum())} staged rows share a (TICKER, DATE) key; "
        "deduplicate before running the MERGE"
    )

print("Data prepared for upload:")
print(f"DATE dtype: {df_upload['DATE'].dtype}")
print(f"DOWNLOAD_TIMESTAMP dtype: {df_upload['DOWNLOAD_TIMESTAMP'].dtype}")
df_upload.head(3)


Data prepared for upload:
DATE dtype: object
DOWNLOAD_TIMESTAMP dtype: datetime64[us]


Unnamed: 0,DATE,OPEN_PRICE,HIGH_PRICE,LOW_PRICE,CLOSE_PRICE,VOLUME,DIVIDENDS,STOCK_SPLITS,TICKER,DOWNLOAD_TIMESTAMP
0,1999-01-22,0.040114,0.04477,0.035577,0.037607,2714688000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
1,1999-01-25,0.040591,0.042024,0.037607,0.041547,510480000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646
2,1999-01-26,0.042024,0.04286,0.037726,0.038323,343200000,0.0,0.0,NVDA,2025-10-06 15:16:48.386646


In [25]:
# Open a session through the project's connection manager and echo the session
# context, so it is clear which database/schema/warehouse the upload targets.
manager = SnowflakeConnectionManager()

try:
    session = manager.connect()
    print("✓ Connected to Snowflake")
    for label, read_context in (
        ("Database", session.get_current_database),
        ("Schema", session.get_current_schema),
        ("Warehouse", session.get_current_warehouse),
    ):
        print(f"  {label}: {read_context()}")
except Exception as e:
    # Surface the failure in the notebook output, then let it propagate.
    print(f"✗ Connection failed: {e}")
    raise


✓ Connected to Snowflake
  Database: "SNOWFLAKE_INTELIGENCE_HOL"
  Schema: "DATA"
  Warehouse: "COMPUTE_WH"


In [26]:
# Create or verify main table exists (idempotent - never drops).
# Use the actual database and schema from the connection.
current_database = session.get_current_database()
current_schema = session.get_current_schema()
if not current_database or not current_schema:
    # Without this guard the name below would become "None.None.STOCK_PRICE_HISTORY".
    raise RuntimeError(
        "Snowflake session has no current database/schema; "
        "set them in the connection configuration before uploading"
    )
table_name = "STOCK_PRICE_HISTORY"
target_table = f"{current_database}.{current_schema}.{table_name}"

print(f"Target table: {target_table}")

# Notes on the DDL:
# - DOWNLOAD_TIMESTAMP is declared TIMESTAMP_NTZ explicitly. The bare TIMESTAMP
#   alias follows the TIMESTAMP_TYPE_MAPPING parameter, and the staged values are
#   timezone-naive.
# - Snowflake does not enforce PRIMARY KEY on standard tables; it is informational
#   only. Uniqueness of (TICKER, DATE) comes from the MERGE below.
# - CREATE ... IF NOT EXISTS does not alter an already-existing table.
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {target_table} (
    DATE DATE,
    OPEN_PRICE FLOAT,
    HIGH_PRICE FLOAT,
    LOW_PRICE FLOAT,
    CLOSE_PRICE FLOAT,
    VOLUME NUMBER,
    DIVIDENDS FLOAT,
    STOCK_SPLITS FLOAT,
    TICKER VARCHAR(10),
    DOWNLOAD_TIMESTAMP TIMESTAMP_NTZ,
    PRIMARY KEY (TICKER, DATE)
)
"""

try:
    session.sql(create_table_sql).collect()
    print(f"✓ Table {target_table} ready")
except Exception as e:
    print(f"✗ Failed to create table: {e}")
    raise


Target table: "SNOWFLAKE_INTELIGENCE_HOL"."DATA".STOCK_PRICE_HISTORY
✓ Table "SNOWFLAKE_INTELIGENCE_HOL"."DATA".STOCK_PRICE_HISTORY ready


In [27]:
# Create a session-scoped staging table for this ticker's rows.
# It is created as TEMPORARY so Snowflake drops it automatically when the
# session ends; an interrupted run (e.g. a KeyboardInterrupt before the cleanup
# cell) then leaves no permanent *_TEMP_* table behind.
# Characters that are not ASCII letters/digits (e.g. in "BRK-B" or "^GSPC") are
# replaced with "_" so the suffix stays a valid unquoted identifier.
temp_suffix = "".join(
    ch if (ch.isascii() and ch.isalnum()) else "_" for ch in ticker.upper()
)
temp_table = f"{target_table}_TEMP_{temp_suffix}"

# Drop a leftover staging table from a previous run, if any.
session.sql(f"DROP TABLE IF EXISTS {temp_table}").collect()
print("✓ Cleaned up any existing temp table")

# Upload data to the staging table
try:
    # Convert pandas to Snowpark DataFrame
    snowpark_df = session.create_dataframe(df_upload)

    # Save to a temporary table (visible only to this session, which also runs the MERGE)
    snowpark_df.write.mode("overwrite").save_as_table(temp_table, table_type="temporary")

    # Get count
    temp_count = session.sql(f"SELECT COUNT(*) as cnt FROM {temp_table}").collect()[0]['CNT']
    print(f"✓ Staged {temp_count:,} records to {temp_table}")

except Exception as e:
    print(f"✗ Failed to stage data: {e}")
    raise


✓ Cleaned up any existing temp table


KeyboardInterrupt: 

In [None]:
# Check current state of target table: record how many rows this ticker already
# has so the MERGE cell can report the net change.
# Best-effort: on failure fall back to 0, but surface the error instead of
# claiming there are no records.
try:
    existing_count_result = session.sql(f"""
        SELECT COUNT(*) as cnt 
        FROM {target_table}
        WHERE TICKER = '{ticker}'
    """).collect()

    existing_count = existing_count_result[0]['CNT'] if existing_count_result else 0
    print(f"Existing records for {ticker}: {existing_count:,}")
except Exception as e:
    existing_count = 0
    print(f"⚠ Could not count existing records for {ticker} ({e}); assuming 0")


Existing records for NVDA: 0


In [None]:
# Perform idempotent MERGE (upsert)
# This will INSERT new records and UPDATE existing ones based on (TICKER, DATE).
# Re-running with the same staged data leaves the same rows in the target.
merge_sql = f"""
MERGE INTO {target_table} AS target
USING {temp_table} AS source
ON target.TICKER = source.TICKER 
   AND target.DATE = source.DATE
WHEN MATCHED THEN
    UPDATE SET
        target.OPEN_PRICE = source.OPEN_PRICE,
        target.HIGH_PRICE = source.HIGH_PRICE,
        target.LOW_PRICE = source.LOW_PRICE,
        target.CLOSE_PRICE = source.CLOSE_PRICE,
        target.VOLUME = source.VOLUME,
        target.DIVIDENDS = source.DIVIDENDS,
        target.STOCK_SPLITS = source.STOCK_SPLITS,
        target.DOWNLOAD_TIMESTAMP = source.DOWNLOAD_TIMESTAMP
WHEN NOT MATCHED THEN
    INSERT (
        DATE, OPEN_PRICE, HIGH_PRICE, LOW_PRICE, CLOSE_PRICE,
        VOLUME, DIVIDENDS, STOCK_SPLITS, TICKER, DOWNLOAD_TIMESTAMP
    )
    VALUES (
        source.DATE, source.OPEN_PRICE, source.HIGH_PRICE, source.LOW_PRICE, source.CLOSE_PRICE,
        source.VOLUME, source.DIVIDENDS, source.STOCK_SPLITS, source.TICKER, source.DOWNLOAD_TIMESTAMP
    )
"""

try:
    merge_result = session.sql(merge_sql).collect()
    print("✓ MERGE completed successfully")

    # Get updated count
    final_count_result = session.sql(f"""
        SELECT COUNT(*) as cnt 
        FROM {target_table}
        WHERE TICKER = '{ticker}'
    """).collect()

    final_count = final_count_result[0]['CNT']
    # The count delta only reflects inserted rows: rows updated in place do not
    # change the total, so this is reported as "net new", not "new/updated".
    new_records = final_count - existing_count

    # MERGE returns a result row with per-action row counts (per Snowflake docs:
    # rows inserted / rows updated). Show it as-is rather than guessing key names.
    merge_stats = merge_result[0].as_dict() if merge_result else {}

    print("\nResults:")
    print(f"  Previous count: {existing_count:,}")
    print(f"  Final count: {final_count:,}")
    print(f"  Net new records: {new_records:,}")
    for action, n_rows in merge_stats.items():
        print(f"  MERGE {action}: {n_rows}")

except Exception as e:
    print(f"✗ MERGE failed: {e}")
    raise


✓ MERGE completed successfully

Results:
  Previous count: 0
  Final count: 6,717
  New/Updated records: 6,717


In [None]:
# Remove the staging table now that the MERGE has run. A failure here is only
# reported as a warning; the cell does not re-raise.
drop_sql = f"DROP TABLE IF EXISTS {temp_table}"
try:
    session.sql(drop_sql).collect()
    print(f"✓ Cleaned up temp table {temp_table}")
except Exception as e:
    print(f"⚠ Warning: Could not drop temp table: {e}")


✓ Cleaned up temp table "SNOWFLAKE_INTELIGENCE_HOL"."DATA".STOCK_PRICE_HISTORY_TEMP_NVDA


In [None]:
# Verify what is now stored for this ticker: date coverage, row count and
# close-price range, followed by the 5 most recent rows as a spot check.
rule = "=" * 60

verify_sql = f"""
SELECT 
    TICKER,
    MIN(DATE) as MIN_DATE,
    MAX(DATE) as MAX_DATE,
    COUNT(*) as TOTAL_RECORDS,
    MIN(CLOSE_PRICE) as MIN_PRICE,
    MAX(CLOSE_PRICE) as MAX_PRICE
FROM {target_table}
WHERE TICKER = '{ticker}'
GROUP BY TICKER
"""

result_df = session.sql(verify_sql).to_pandas()
print("\n" + rule)
print("VERIFICATION - Data in Snowflake:")
print(rule)
print(result_df.to_string(index=False))

# Show sample of latest records
sample_sql = f"""
SELECT DATE, OPEN_PRICE, HIGH_PRICE, LOW_PRICE, CLOSE_PRICE, VOLUME, TICKER
FROM {target_table}
WHERE TICKER = '{ticker}'
ORDER BY DATE DESC
LIMIT 5
"""

sample_df = session.sql(sample_sql).to_pandas()
print("\n" + rule)
print("Latest 5 records in Snowflake:")
print(rule)
print(sample_df.to_string(index=False))



VERIFICATION - Data in Snowflake:
TICKER   MIN_DATE   MAX_DATE  TOTAL_RECORDS  MIN_PRICE  MAX_PRICE
  NVDA 1999-01-22 2025-10-03           6717    0.03128 188.889999

Latest 5 records in Snowflake:
      DATE  OPEN_PRICE  HIGH_PRICE  LOW_PRICE  CLOSE_PRICE    VOLUME TICKER
2025-10-03  189.190002  190.360001 185.380005   187.619995 137340500   NVDA
2025-10-02  189.600006  191.050003 188.059998   188.889999 136805800   NVDA
2025-10-01  185.240005  188.139999 183.899994   187.240005 173844900   NVDA
2025-09-30  182.080002  187.350006 181.479996   186.580002 236981000   NVDA
2025-09-29  180.429993  184.000000 180.320007   181.850006 193063500   NVDA


In [None]:
# Close the Snowflake connection held by the manager and print a completion banner.
manager.close()
banner = "=" * 60
print("\n✓ Snowflake session closed")
print("\n" + banner)
print("UPLOAD COMPLETE - Process is idempotent and can be re-run safely!")
print(banner)



✓ Snowflake session closed

UPLOAD COMPLETE - Process is idempotent and can be re-run safely!
