In [None]:

from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col, hash, row_number
from snowflake.snowpark.types import DateType, DecimalType, IntegerType, StringType
from snowflake.snowpark.window import Window

session = get_active_session()

# Stage holding the raw JSON files, and the stage location the cleaned,
# de-duplicated result is written to.
SOURCE_STAGE = '@"SNOWFLAKE_CONNECTED_GCS_US"."PUBSUB_EVENTS"."MY_GCS_STAGE"/'
OUTPUT_PATH = "@PARQUET_FILES_DB.PUBLIC.ALPHA_VANTAGE_MARTS/2025_12_27/stocks.parquet"

# Read every JSON file on the stage; each document is exposed as column $1.
df = session.read.json(SOURCE_STAGE)
df = df.select(
    col("$1")["Meta Data"].alias("METADATA"),
    col("$1")["Time Series (Daily)"].alias("TIME_SERIES_DAILY"),
)

# Pull the company symbol and refresh date out of the metadata object and keep
# the per-day time-series object for flattening.
df = df.select(
    col("METADATA")["2. Symbol"].alias("COMPANY"),
    col("METADATA")["3. Last Refreshed"].alias("LAST_REFRESHED"),
    col("TIME_SERIES_DAILY"),
)

# Flatten the time-series object into one row per (file, day): KEY is the day
# and VALUE the object holding that day's prices. Cast to typed columns.
# NOTE(review): DATE is the raw flatten KEY and, unlike LAST_REFRESHED, is not
# cast to DateType; casting it would change the parquet schema and the
# UNIQUE_ID hash values, so confirm with downstream consumers before changing.
df = df.flatten(col("TIME_SERIES_DAILY")).select(
    col("COMPANY").cast(StringType()).alias("COMPANY"),
    col("LAST_REFRESHED").cast(DateType()).alias("LAST_REFRESHED"),
    col("KEY").alias("DATE"),
    col("VALUE")["4. close"].cast(DecimalType(19, 4)).alias("day_closed_value"),
    col("VALUE")["5. volume"].cast(DecimalType(38, 0)).alias("volume"),
)

# Surrogate key identifying one company on one day.
df = df.with_column("UNIQUE_ID", hash(col("COMPANY"), col("DATE")))

# Row count before de-duplication. Only a notebook cell's last expression is
# displayed, so the count must be printed explicitly to be seen.
# Presumably (number of files) * 100 rows, e.g. 9 files -> 900 — verify
# against the stage contents.
print(f"Rows before de-duplication: {df.count()}")

# The API returns a rolling window of the last 100 days, so files fetched on
# different days overlap. Keep exactly one row per UNIQUE_ID, preferring the
# file with the most recent LAST_REFRESHED. drop_duplicates() on a column
# subset would keep an arbitrary (non-deterministic) row per key.
latest_first = Window.partition_by(col("UNIQUE_ID")).order_by(
    col("LAST_REFRESHED").desc()
)
df = (
    df.with_column("_ROW_NUM", row_number().over(latest_first))
    .filter(col("_ROW_NUM") == 1)
    .drop("_ROW_NUM")
)

# One row per distinct (company, day). Assuming one file per company per
# consecutive day, each company contributes 100 + (its file count) - 1 rows.
print(f"Rows after de-duplication: {df.count()}")

df.show()

# Overwrite a single parquet file at OUTPUT_PATH.
df.write.mode("overwrite").option("single", True).parquet(OUTPUT_PATH)