In [35]:
import yfinance as yf
import pandas as pd
from datetime import datetime

In [36]:
import logging
from datetime import datetime
import traceback
from sqlalchemy import text


In [37]:
stocks = [{"symbol": "^NSEI", "name": "Nifty 50", "sector": "Index"},               #List of Stocks whose data I require
          {"symbol": "HDFCBANK.NS", "name": "HDFC Bank", "sector": "Banking"},    
          {"symbol": "TCS.NS","name": "TCS","sector": "IT"},
          {"symbol": "RELIANCE.NS","name": "Reliance Industries","sector": "Energy"},
          {"symbol": "HINDUNILVR.NS","name": "Hindustan Unilever","sector": "FMCG"},
          {"symbol": "MARUTI.NS","name": "Maruti Suzuki","sector": "Auto"},
          {"symbol": "BHARTIARTL.NS","name": "Bharti Airtel","sector": "Telecom"}]


# Tables Structure
- Table 1: Intraday Market Data
- Table 2: Stock Meta Data


In [38]:
intraday_data = []    #This will store a list of dateframes 

for stock in stocks:
    ticker = yf.Ticker(stock["symbol"])
    
    df = ticker.history(period="1d",interval="5m")
    
    if df.empty:
        continue
    
    df = df.reset_index()    # datetime was used as index labels earlier
    
    df["symbol"] = stock["symbol"]
    
    df = df[["Datetime","symbol","Open","High","Low","Close","Volume"]]     #Restructuring Table
    
    intraday_data.append(df)   


In [39]:
intraday_market_data = pd.concat(intraday_data,ignore_index=True)


In [40]:
intraday_market_data.head()

Unnamed: 0,Datetime,symbol,Open,High,Low,Close,Volume
0,2026-01-27 09:15:00+05:30,^NSEI,25079.0,25123.449219,25007.849609,25009.949219,0
1,2026-01-27 09:20:00+05:30,^NSEI,25007.5,25008.699219,24938.849609,24946.400391,0
2,2026-01-27 09:25:00+05:30,^NSEI,24946.300781,24970.699219,24933.050781,24958.550781,0
3,2026-01-27 09:30:00+05:30,^NSEI,24958.400391,25000.199219,24940.199219,24989.5,0
4,2026-01-27 09:35:00+05:30,^NSEI,24989.949219,25095.150391,24977.800781,25091.5,0


In [41]:
intraday_market_data.columns = [col.lower() for col in intraday_market_data.columns]   #Normalizing Column names to lower case


In [42]:
intraday_market_data.dtypes

datetime    datetime64[ns, Asia/Kolkata]
symbol                            object
open                             float64
high                             float64
low                              float64
close                            float64
volume                             int64
dtype: object

In [43]:
intraday_market_data.dtypes

datetime    datetime64[ns, Asia/Kolkata]
symbol                            object
open                             float64
high                             float64
low                              float64
close                            float64
volume                             int64
dtype: object

In [44]:
metadata_rows = []   # List will contain  metadata of each stock in dictionary format

for stock in stocks:
    ticker = yf.Ticker(stock["symbol"])
    info = ticker.fast_info                # Acquiring Meta Data ofor each stock in dictionary format
    
    metadata_rows.append({                 # creating a dictionory for each stock metadata and appending the dictionary to list created
        "symbol": stock["symbol"],
        "name": stock["name"],
        "sector": stock["sector"],
        "market_cap": info.get("marketCap"),
        "shares_outstanding": info.get("shares"),
        "ingestion_time": datetime.now()
    })


In [45]:
stock_metadata = pd.DataFrame(metadata_rows)

In [46]:
stock_metadata.head()

Unnamed: 0,symbol,name,sector,market_cap,shares_outstanding,ingestion_time
0,^NSEI,Nifty 50,Index,,,2026-01-27 15:42:49.554242
1,HDFCBANK.NS,HDFC Bank,Banking,14254330000000.0,15386800000.0,2026-01-27 15:42:49.847722
2,TCS.NS,TCS,IT,11425920000000.0,3618088000.0,2026-01-27 15:42:50.132069
3,RELIANCE.NS,Reliance Industries,Energy,18681580000000.0,13532470000.0,2026-01-27 15:42:50.406329
4,HINDUNILVR.NS,Hindustan Unilever,FMCG,5641133000000.0,2349591000.0,2026-01-27 15:42:50.681884


In [47]:
len(stock_metadata.iloc[0]['name'])

8

In [48]:
stock_metadata.dtypes

symbol                        object
name                          object
sector                        object
market_cap                   float64
shares_outstanding           float64
ingestion_time        datetime64[ns]
dtype: object

In [49]:
stock_metadata["market_cap"] = stock_metadata["market_cap"].round()
stock_metadata["shares_outstanding"] = stock_metadata["shares_outstanding"].round()

In [50]:
stock_metadata["market_cap"] = pd.to_numeric(stock_metadata["market_cap"], errors="coerce")
stock_metadata["shares_outstanding"] = pd.to_numeric(stock_metadata["shares_outstanding"], errors="coerce")

In [51]:
stock_metadata["market_cap"] = stock_metadata["market_cap"].astype("Int64")
stock_metadata["shares_outstanding"] = stock_metadata["shares_outstanding"].astype("Int64")

In [52]:
stock_metadata.dtypes

symbol                        object
name                          object
sector                        object
market_cap                     Int64
shares_outstanding             Int64
ingestion_time        datetime64[ns]
dtype: object

In [53]:
stock_metadata

Unnamed: 0,symbol,name,sector,market_cap,shares_outstanding,ingestion_time
0,^NSEI,Nifty 50,Index,,,2026-01-27 15:42:49.554242
1,HDFCBANK.NS,HDFC Bank,Banking,14254327950117.0,15386795741.0,2026-01-27 15:42:49.847722
2,TCS.NS,TCS,IT,11425920385002.0,3618087519.0,2026-01-27 15:42:50.132069
3,RELIANCE.NS,Reliance Industries,Energy,18681578472618.0,13532472635.0,2026-01-27 15:42:50.406329
4,HINDUNILVR.NS,Hindustan Unilever,FMCG,5641133431484.0,2349591262.0,2026-01-27 15:42:50.681884
5,MARUTI.NS,Maruti Suzuki,Auto,4793067240630.0,314402574.0,2026-01-27 15:42:51.071378
6,BHARTIARTL.NS,Bharti Airtel,Telecom,12020910241652.0,6091471619.0,2026-01-27 15:42:51.325092


In [54]:
intraday_market_data.head()

Unnamed: 0,datetime,symbol,open,high,low,close,volume
0,2026-01-27 09:15:00+05:30,^NSEI,25079.0,25123.449219,25007.849609,25009.949219,0
1,2026-01-27 09:20:00+05:30,^NSEI,25007.5,25008.699219,24938.849609,24946.400391,0
2,2026-01-27 09:25:00+05:30,^NSEI,24946.300781,24970.699219,24933.050781,24958.550781,0
3,2026-01-27 09:30:00+05:30,^NSEI,24958.400391,25000.199219,24940.199219,24989.5,0
4,2026-01-27 09:35:00+05:30,^NSEI,24989.949219,25095.150391,24977.800781,25091.5,0


In [55]:
intraday_market_data.isnull().sum()

datetime    0
symbol      0
open        0
high        0
low         0
close       0
volume      0
dtype: int64

In [56]:
intraday_market_data.duplicated().sum()

np.int64(0)

In [57]:
logging.basicConfig(
    filename="stock_metadata_pipeline.log",
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
    force=True
)


In [58]:
from sqlalchemy import create_engine

engine = create_engine(
    "mssql+pyodbc://@ADNANKHADER\\SQLEXPRESS/stock_market_analytics"
    "?driver=ODBC+Driver+17+for+SQL+Server"
)
if engine.connect():
    print("Connection Successful")


  if engine.connect():


Connection Successful


In [59]:
try:
    logging.info("Starting metadata load into STAGING table")

    stock_metadata.to_sql(
        name="staging_stock_metadata",
        con=engine,
        if_exists="append",
        index=False
    )

    logging.info(
        f"Inserted {len(stock_metadata)} rows into staging_stock_metadata"
    )

except Exception as e:
    logging.error("Failed to load metadata into staging table")
    logging.error(str(e))
    logging.error(traceback.format_exc())
    raise



In [60]:
merge_metadata_sql = text("""
MERGE stock_metadata AS target
USING staging_stock_metadata AS source
ON target.symbol = source.symbol

WHEN MATCHED THEN
    UPDATE SET
        market_cap = source.market_cap,
        shares_outstanding = source.shares_outstanding,
        ingestion_time = source.ingestion_time

WHEN NOT MATCHED BY TARGET THEN
    INSERT (
        symbol,
        name,
        sector,
        market_cap,
        shares_outstanding,
        ingestion_time
    )
    VALUES (
        source.symbol,
        source.name,
        source.sector,
        source.market_cap,
        source.shares_outstanding,
        source.ingestion_time
    );
""")


In [61]:
try:
    logging.info("Starting MERGE into stock_metadata")

    with engine.begin() as conn:
        conn.execute(merge_metadata_sql)

    logging.info("MERGE into stock_metadata completed successfully")

except Exception as e:
    logging.error("MERGE into stock_metadata failed")
    logging.error(str(e))
    logging.error(traceback.format_exc())
    raise


In [62]:
try:
    logging.info("Truncating staging_stock_metadata table")

    with engine.begin() as conn:
        conn.execute(text("TRUNCATE TABLE staging_stock_metadata;"))

    logging.info("staging_stock_metadata truncated successfully")

except Exception as e:
    logging.error("Failed to truncate staging_stock_metadata")
    logging.error(str(e))
    logging.error(traceback.format_exc())
    raise


In [63]:
logging.info("Metadata pipeline run completed successfully")

In [64]:
try:
    logging.info("Starting intraday data load into staging table")

    intraday_market_data.to_sql(
        name="staging_intraday_market_data",
        con=engine,
        if_exists="append",
        index=False
    )

    logging.info(
        f"Inserted {len(intraday_market_data)} rows into staging table"
    )

except Exception as e:
    logging.error("Failed to insert data into staging table")
    logging.error(str(e))
    logging.error(traceback.format_exc())
    raise


In [65]:
merge_sql = text("""
MERGE intraday_market_data AS target
USING staging_intraday_market_data AS source
ON  target.symbol = source.symbol
AND target.[datetime] = source.[datetime]

WHEN NOT MATCHED BY TARGET THEN
    INSERT (
        [datetime],
        symbol,
        [open],
        high,
        low,
        [close],
        volume
    )
    VALUES (
        source.[datetime],
        source.symbol,
        source.[open],
        source.high,
        source.low,
        source.[close],
        source.volume
    );
""")



In [66]:
try:
    logging.info("Starting MERGE into intraday_market_data")

    with engine.begin() as conn:
        conn.execute(merge_sql)

    logging.info("MERGE completed successfully")

except Exception as e:
    logging.error("MERGE failed")
    logging.error(str(e))
    logging.error(traceback.format_exc())
    raise

In [67]:
try:
    logging.info("Truncating staging table")

    with engine.begin() as conn:
        conn.execute(text("TRUNCATE TABLE staging_intraday_market_data;"))

    logging.info("Staging table truncated successfully")

except Exception as e:
    logging.error("Failed to truncate staging table")
    logging.error(str(e))
    logging.error(traceback.format_exc())
    raise


In [68]:
logging.info("Intraday stock pipeline run completed successfully")