In [1]:
import yfinance as yf
import pandas as pd
import snowflake.connector
import os
import datetime
from concurrent.futures import ThreadPoolExecutor

In [2]:
# Connect snowflake to python
conn = snowflake.connector.connect(
    user = 'CHAITANYA35',
    password = '',
    account = '',
    warehouse = 'COMPUTE_WH',
    database = 'ISM',
    schema = 'DATA'
)

cursor = conn.cursor()

In [3]:
def get_stock_data(stock_symbol):
    stock_name = stock_symbol.upper() + ".NS"
    try:
        stock_data = yf.Ticker(stock_name)
        all_data = stock_data.history(period="max", auto_adjust=True)

        if all_data.empty:
            print(f"No Data found fot {stock_symbol}")
            return pd.DataFrame()

        all_data.reset_index(inplace=True)
        all_data = all_data.rename(columns={
            'Date': 'price_date',
            'Open': 'open_price',
            'High': 'high_price',
            'Low': 'low_price',
            'Close': 'close_price',
            'Volume': 'volume'
        })

        # add stock_symbol column as well.
        all_data['stock_symbol'] = stock_symbol.upper()
        all_data['price_date'] = pd.to_datetime(all_data['price_date']).dt.strftime('%Y-%m-%d')

        # reorder all columns.
        all_data = all_data[['stock_symbol', 'price_date', 'open_price', 'high_price', 'low_price', 'close_price', 'volume']]

        return all_data

    except Exception as e:
        print(f"Error fetching data for {stock_name}: {e}")
        return pd.DataFrame()

In [4]:
def save_and_upload(stock_symbol, group):
    try:
        folder = "temp_parquet_files_parallel_load"
        os.makedirs(folder,exist_ok=True)

        file_name = os.path.join(folder,f"{stock_symbol}.parquet")
        group.to_parquet(file_name, engine='pyarrow', index=False)

        absolute_path = os.path.abspath(file_name)
        basename = os.path.basename(file_name)
        
        cursor.execute(f"PUT file://{absolute_path} @stage_stock_prices_parallel_load OVERWRITE=True")
        cursor.execute(f"""
            COPY INTO stock_prices_parallel_load
            FROM @stage_stock_prices_parallel_load/{basename}
            FILE_FORMAT = (TYPE = 'PARQUET')
            MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE
            """)
        
    except Exception as e:
        print(f"Error processing {stock_symbol}: {e}")       

In [5]:
def log_start(process_name):
    start_time = datetime.datetime.now()
    cursor.execute("""
    INSERT INTO DATA_LOAD_LOG (PROCESS_NAME, START_TIME, STATUS)
    VALUES (%s, %s, %s)
    """, (process_name, start_time, 'Running'))

def log_end(status, message = ''):
    end_time = datetime.datetime.now()
    cursor.execute("""
    UPDATE DATA_LOAD_LOG
    SET END_TIME = %s, STATUS = %s, MESSAGE = %s
    WHERE ID IN (SELECT ID FROM DATA_LOAD_LOG WHERE STATUS='Running')
    """, (end_time, status,message))


In [7]:
if __name__ == "__main__":
    process_name = "Stock_Data_Parallel_Load"

    try:   
        log_start(process_name)
        cursor.execute("select stock_symbol from stock_symbol limit 5")
        stock_list = [row[0] for row in cursor.fetchall()]
    
        df_all_stocks_data = pd.DataFrame()    
        for stock in stock_list:
            df = get_stock_data(stock)
            if not df.empty:
                df_all_stocks_data = pd.concat([df_all_stocks_data,df], ignore_index=True) 
    
        # Check dataframe information
        print(df_all_stocks_data.count()) 
    
        # Truncate Table First.
        cursor.execute("Truncate table stock_prices_parallel_load")
    
        # Split by stock symbol
        stock_groups = [(stock, group) for stock, group in df_all_stocks_data.groupby('stock_symbol')]
    
        cursor.execute("CREATE STAGE IF NOT EXISTS stage_stock_prices_parallel_load")
    
        # Parallel Processing
        with ThreadPoolExecutor(max_workers=8) as executor:
            futures = [executor.submit(save_and_upload, stock, group) for stock, group in stock_groups]    
        
        log_end('Success', 'Data Load Sucessfull')
        Print('Data Loaded Sucessfully')
        conn.commit()

    except Exception as e:
        log_end('Failed', 'Data Load Failed')
        print('Data Load Failed')
        conn.commit()

    finally:
        cursor.close()
        conn.close()


stock_symbol    2279957
price_date      2279957
open_price      2279956
high_price      2279956
low_price       2279956
close_price     2279956
volume          2279957
dtype: int64
Data Load Failed
