# Market Data API Ingestion (Dynamic)

### Pre Run Activity
- Make sure to tap the 3 dot icon on top left and enable the necessary 'External Access' integrations for the notebook.
- Update 'market_config.json' with the required run settings before starting the run.

## Runtime Config Set-Up

In [None]:
# Path of the JSON config file that drives this notebook run.
GLOBAL_CONFIG_PATH = "market_config.json"

### Importing Libraries

In [None]:
import requests
import json
import pandas as pd
from datetime import date, timedelta, datetime
import math
import warnings
from bs4 import BeautifulSoup
import time
import random
import snowflake.connector
from urllib.parse import urlparse
import hashlib
from snowflake.connector.pandas_tools import write_pandas

### Auto Run-Name Initialization Code

In [None]:
def get_config(CONFIG_PATH="market_config.json"):
    """
    Load the notebook configuration from a JSON file.

    Args:
        CONFIG_PATH: Path of the JSON config file to read.

    Returns:
        The parsed JSON document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Decode as UTF-8 explicitly so the result does not depend on the
    # platform's default text encoding.
    with open(CONFIG_PATH, encoding="utf-8") as f:
        return json.load(f)
# Load the config once to find out which run this notebook should execute.
config = get_config(GLOBAL_CONFIG_PATH) #DEBUG

# 'run_auto_pickup' holds the run name; it is matched against
# config['runs'][*]['run_name'] in get_config_run.
GLOBAL_RUN_NAME = config['run_auto_pickup']
print(f'GLOBAL_RUN_NAME: {GLOBAL_RUN_NAME}')

### Testing Endpoint

In [None]:
# Smoke-test the Alpha Vantage endpoint with a fixed symbol before running the
# configurable pipeline below.
# The API key is taken from the config loaded above (same lookup that
# create_url_string uses) rather than being committed in the notebook.
# NOTE(review): the key that used to be hard-coded here has been exposed in
# source and should be rotated.
resp = requests.get(
    'https://www.alphavantage.co/query',
    params={
        'function': 'TIME_SERIES_DAILY',
        'outputsize': 'full',
        'symbol': 'ORCL',
        'apikey': config['market_api_key_list'][config['market_api_key_auto_pickup']],
    },
    timeout=30,  # never let the cell hang on a stalled connection
)
resp.raise_for_status()

# Parse the body once; the top-level keys are shown as the cell output.
payload = resp.json()
payload.keys()

## Main Code

In [None]:
def get_config_run(run_name, config_path=None):
    """
    Return the entry of config['runs'] whose 'run_name' equals `run_name`.

    Args:
        run_name: Name of the run to look up.
        config_path: Optional config file path. Defaults to
            GLOBAL_CONFIG_PATH so the lookup reads the same file the
            run name was picked up from.

    Returns:
        The matching run dict, or None when no run has that name.
    """
    config = get_config(GLOBAL_CONFIG_PATH if config_path is None else config_path)
    return next((r for r in config['runs'] if r['run_name'] == run_name), None)

def get_config_snowflake(config_path=None):
    """
    Return the 'snowflake' section of the config file.

    Args:
        config_path: Optional config file path. Defaults to
            GLOBAL_CONFIG_PATH so every helper reads the same file.

    Returns:
        The value stored under the "snowflake" key.

    Raises:
        KeyError: If the config has no "snowflake" section.
    """
    config = get_config(GLOBAL_CONFIG_PATH if config_path is None else config_path)
    return config["snowflake"]

def get_snowlfake_conn(schema_name):
    """
    Open a Snowflake connection using the credentials from the config file.

    Args:
        schema_name: Schema to set on the connection.

    Returns:
        The connection object returned by snowflake.connector.connect.
        The caller is responsible for closing it.
    """
    sf_settings = get_config_snowflake()

    # Copy the credential fields across in a fixed order, then add the schema
    # chosen by the caller.
    connect_kwargs = {
        field: sf_settings[field]
        for field in ("user", "password", "account", "warehouse", "database")
    }
    connect_kwargs["schema"] = schema_name  # SQL Specific

    return snowflake.connector.connect(**connect_kwargs)
    
# Module-level configuration used by the cells below.
# Read from GLOBAL_CONFIG_PATH so this matches the file the run name came from.
GLOBAL_CONFIG = get_config(GLOBAL_CONFIG_PATH)
GLOBAL_CONFIG_RUN = get_config_run(GLOBAL_RUN_NAME)
config_snowflake = get_config_snowflake()

# Fail here with a clear message instead of a "'NoneType' object is not
# subscriptable" error in a later cell.
if GLOBAL_CONFIG_RUN is None:
    raise ValueError(
        f"Run '{GLOBAL_RUN_NAME}' was not found under 'runs' in {GLOBAL_CONFIG_PATH}"
    )

# Show the selected run, but only the key names of the sections that hold
# secrets (API keys, Snowflake password) so they do not end up in cell output.
print(GLOBAL_CONFIG_RUN)
print(f"config sections: {sorted(GLOBAL_CONFIG)}")
print(f"snowflake settings: {sorted(config_snowflake)}")

In [None]:
def create_url_string(config_run, config):
    """
    Build the request URL for one configured run.

    Args:
        config_run: Run settings; reads the keys "endpoint", "function",
            "symbol", "datatype" and "outputsize".
        config: Global config; reads "market_api_key_list" and
            "market_api_key_auto_pickup" (the latter selects the key to use).

    Returns:
        "<endpoint>?function=...&symbol=...&datatype=...&outputsize=...&apikey=..."
        with every query value URL-encoded.

    Raises:
        KeyError: If one of the keys listed above is missing.
    """
    # Function-scope import: the notebook's import cell only pulls in urlparse.
    from urllib.parse import urlencode

    endpoint = str(config_run["endpoint"]).strip()

    # urlencode escapes reserved characters (&, =, spaces, ...) in the values.
    # The previous approach deleted all spaces/newlines from a multi-line
    # f-string, which silently altered values and left them unescaped.
    query = urlencode({
        "function": config_run["function"],
        "symbol": config_run["symbol"],
        "datatype": config_run["datatype"],
        "outputsize": config_run["outputsize"],
        "apikey": config["market_api_key_list"][config["market_api_key_auto_pickup"]],
    })
    url_string = f"{endpoint}?{query}"

    # NOTE(review): this prints the API key as part of the URL; consider
    # masking it if notebook output is shared.
    print(url_string)
    return url_string

# Build the request URL for the selected run from the module-level config.
# config_run = get_config_run(GLOBAL_RUN_NAME)
url_string = create_url_string(GLOBAL_CONFIG_RUN, GLOBAL_CONFIG)
# create_url_string already prints the URL; this repeats it with a label.
print(f'url_string: {url_string}')

In [None]:
def fetch_market_data(url_string, timeout=30):
    """
    Fetch the daily time series from the market-data API.

    Args:
        url_string: Fully built request URL (see create_url_string).
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The value stored under 'Time Series (Daily)' in the JSON response:
        a dict of dicts, one inner dict per data point.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        KeyError: If the response has no 'Time Series (Daily)' entry. The
            message includes the start of the payload, since the API then
            presumably returned an error or throttling notice instead of
            data (verify against the provider docs).
    """
    response = requests.get(url_string, timeout=timeout)
    response.raise_for_status()
    data = response.json()  # parse the body once and reuse it

    print(type(data), data.keys())

    series_key = 'Time Series (Daily)'
    if series_key not in data:
        raise KeyError(
            f"{series_key!r} missing from API response; got: {str(data)[:500]}"
        )

    time_series_dict = data[series_key]  # returns a dict of dicts
    total_results = len(time_series_dict)
    print(f"No. of Time Series data points: {total_results}")

    return time_series_dict

# Rebuild the URL and download the time series for the selected run.
url_string = create_url_string(GLOBAL_CONFIG_RUN, GLOBAL_CONFIG)
# print(f'url_string: {url_string}')
time_series_dict = fetch_market_data(url_string)
# Dumps every returned data point to the cell output.
print(time_series_dict)

In [None]:
def marketdata_to_df(time_series_dict, symbol=None, function=None):
    """
    Convert the API's time-series dict into a flat DataFrame.

    Args:
        time_series_dict: Mapping of date string -> dict of price fields
            ('1. open', '2. high', '3. low', '4. close', '5. volume').
        symbol: Value for the "symbol" column. Defaults to
            GLOBAL_CONFIG_RUN["symbol"].
        function: Value for the "function" column. Defaults to
            GLOBAL_CONFIG_RUN["function"].

    Returns:
        DataFrame with one row per date and the columns date, open, high,
        low, close, volume, symbol, function. Values are kept exactly as
        delivered by the API (no type conversion), so the price_id hash
        computed downstream stays stable.
    """
    # Fall back to the notebook-level run config only when the caller does
    # not supply the values, so the function can also be used on its own.
    if symbol is None:
        symbol = GLOBAL_CONFIG_RUN["symbol"]
    if function is None:
        function = GLOBAL_CONFIG_RUN["function"]

    # The outer dict keys (dates) become the index; reset_index turns them
    # into a regular column named 'index'.
    df = pd.DataFrame.from_dict(time_series_dict, orient='index').reset_index()

    df = df.rename(columns={
        'index': 'date',
        '1. open': 'open',
        '2. high': 'high',
        '3. low': 'low',
        '4. close': 'close',
        '5. volume': 'volume',
    })
    df["symbol"] = symbol
    df["function"] = function
    print(df.columns)

    return df

# Flatten the downloaded series into a DataFrame and preview the first rows.
df = marketdata_to_df(time_series_dict)
df.head(3)

## Pushing to RAW Schema

In [None]:
def preprocess_dataframe(df):
    """
    Return a copy of `df` with a "price_id" primary-key column added.

    The id is the SHA-1 hex digest of "<date>_<symbol>_<function>" for each
    row. The input frame is left unmodified. The ingested_at value is not
    set here; it is filled in at SQL level.
    """

    def _price_id(record):
        # Compose the natural key of the row and hash it.
        natural_key = f"{record['date']}_{record['symbol']}_{record['function']}"
        return hashlib.sha1(natural_key.encode()).hexdigest()

    enriched = df.copy()
    enriched["price_id"] = enriched.apply(_price_id, axis=1)

    print(f'df.columns: {enriched.columns}')
    return enriched

# Add the price_id key column and preview the result.
df = preprocess_dataframe(df)
df.head(3)

In [None]:
# Loads the DataFrame into a staging table that the SQL cells below read from.
def create_temp_table_from_df(df, temp_table):
    """
    Replace the staging table `temp_table` with the contents of `df`.

    Opens its own Snowflake connection on the RAW schema, drops any existing
    table of that name, lets write_pandas create and fill the table, and
    closes the connection again.

    NOTE(review): write_pandas is called without any temporary-table option
    and the connection is closed before returning, while later SQL cells
    still read the table, so it is presumably a regular table in the
    connection's schema rather than a session-scoped one. Confirm against
    the connector docs.

    Args:
        df: DataFrame to upload.
        temp_table: Name of the staging table. It is interpolated into the
            DROP statement unescaped, so it must come from trusted code.

    Raises:
        RuntimeError: If write_pandas reports that the load did not succeed.
    """
    conn = get_snowlfake_conn(schema_name="RAW")

    try:
        # Cursor creation sits inside the try so the connection is closed
        # even if it fails.
        cur = conn.cursor()
        try:
            # Drop old staging table if it exists
            cur.execute(f"DROP TABLE IF EXISTS {temp_table}")
        finally:
            cur.close()

        # Write dataframe to the staging table
        success, _, nrows, _ = write_pandas(
            conn,
            df,
            table_name=temp_table,
            auto_create_table=True,
            overwrite=True,
            quote_identifiers=False,
            use_logical_type=True,
        )

        # Do not report a successful load when the connector says otherwise.
        if not success:
            raise RuntimeError(f"write_pandas failed to load table {temp_table}")

        print(f"[INFO] Temp table created: {temp_table}, Rows inserted: {nrows}")

    finally:
        conn.close()
    
# Upload the prepared DataFrame; the SQL cells below read MARKET_DATA_TEMP.
create_temp_table_from_df(df, temp_table="MARKET_DATA_TEMP")

In [None]:
-- Sanity check: preview a few rows of the staging table loaded by the Python cell above.
SELECT * FROM MARKET_DATA_TEMP LIMIT 3;

In [None]:
-- Target table for ingested price rows; created only if it does not exist yet.
-- Uncomment the DROP below to rebuild the table from scratch.
-- DROP TABLE SIGNAL_EXTRACTION_DB.RAW.MARKET_DATA;
CREATE TABLE IF NOT EXISTS SIGNAL_EXTRACTION_DB.RAW.MARKET_DATA (
    date DATE,
    open NUMBER(10,4),
    high NUMBER(10,4),
    low NUMBER(10,4),
    close NUMBER(10,4),
    volume NUMBER(20,4),
    symbol STRING,
    function STRING,
    -- SHA-1 hex digest of "<date>_<symbol>_<function>", computed in preprocess_dataframe.
    -- NOTE(review): de-duplication relies on the MERGE below matching on price_id;
    -- confirm whether the PRIMARY KEY constraint itself is enforced.
    price_id STRING PRIMARY KEY,
    -- Filled automatically when a row is inserted without an explicit value.
    ingested_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP
);

In [None]:
-- Insert-only merge: staging rows whose price_id is not yet present in the
-- target are inserted; rows that already exist are left untouched.
MERGE INTO SIGNAL_EXTRACTION_DB.RAW.MARKET_DATA AS target
USING MARKET_DATA_TEMP AS source
ON target.price_id = source.price_id
-- The WHEN MATCHED branch is intentionally disabled: past values for a symbol
-- are treated as immutable, so existing rows never need an update.
-- WHEN MATCHED THEN UPDATE SET
--     target.date     = source.date,
--     target.open     = source.open,
--     target.high     = source.high,
--     target.low      = source.low,
--     target.close    = source.close,
--     target.volume   = source.volume,
--     target.symbol   = source.symbol,
--     target.function = source.function,
--     target.ingested_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
    INSERT (
        date, open, high, low, close, volume, symbol, function, price_id, ingested_at
    )
    VALUES (
        source.date, source.open, source.high, source.low, source.close, 
        source.volume, source.symbol, source.function, source.price_id, CURRENT_TIMESTAMP
    );

In [None]:
-- Sanity check: preview a few rows of the target table after the merge.
SELECT * FROM SIGNAL_EXTRACTION_DB.RAW.MARKET_DATA LIMIT 3;

In [None]:
-- Number of rows with a non-null date per symbol in the target table.
SELECT SYMBOL, COUNT(DATE) as ROW_COUNT
FROM SIGNAL_EXTRACTION_DB.RAW.MARKET_DATA
GROUP BY SYMBOL;

## Future Development

In [None]:
-- TODO: Change the hash functions so that we can take low frequency values as well.