In [0]:
# Installing Required Packages
#%pip install -r /dbfs/FileStore/tables/requirements_dev.txt
%pip install pyotp
%pip install logzero
%pip install websocket-client    
%pip uninstall pycrypto
%pip install pycryptodome    
%pip install azure-storage-blob

In [0]:
# Importing required libraries
from SmartApi import SmartConnect
import pyotp
import pandas as pd
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col, lit
from pyspark.sql.types import *
from pyspark.sql.window import *

In [0]:
# Obtain the Spark session (reusing the active one if it already exists) and
# make Spark interpret/render session timestamps in Indian Standard Time.
session_builder = SparkSession.builder.appName("SmartConnect")
spark = session_builder.getOrCreate()
spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")

In [0]:
# Passing credentials
# === Step 1: Read the AngelOne credentials from the Databricks secret store ===
# All four secrets live in the single "kv-trading-data-analytic" scope
# (an earlier layout used one secret scope per secret).
api_key = dbutils.secrets.get(scope = "kv-trading-data-analytic", key = "AngelOne-API-Key" )
client_code = dbutils.secrets.get(scope = "kv-trading-data-analytic", key = "AngelOne-client-code")
password =dbutils.secrets.get(scope = "kv-trading-data-analytic",key = "AngelOne-password")
totp_key = dbutils.secrets.get(scope = "kv-trading-data-analytic", key = "AngelOne-Totp-key")
# === Step 2: Generate TOTP ===
# The one-time password is only valid for a short window, so it is generated
# immediately before the login call.
totp = pyotp.TOTP(totp_key).now()

# === Step 3: Create connection and login ===
obj = SmartConnect(api_key=api_key)
data = obj.generateSession(client_code, password, totp)

# NOTE(review): per the SmartAPI SDK examples, a rejected login is reported via
# a falsy 'status' in the returned dict rather than an exception. Fail fast here
# so a bad login does not surface later as confusing per-symbol fetch errors.
if not isinstance(data, dict) or not data.get("status"):
    login_message = data.get("message") if isinstance(data, dict) else data
    raise RuntimeError(f"AngelOne login failed: {login_message}")



In [0]:
# Function to fetch OHLC data for the given inputs.
#   symbol and symbol_token: identify the stock whose data is fetched.
#   interval: "ONE_DAY" is used in this notebook, but other intervals are supported.
#             For more details see https://smartapi.angelbroking.com/docs/Historical
#   from_date and to_date: the range of data to fetch (this notebook fetches 1 year).

def GetHistoricalOHLCData(symbol,symbol_token, interval, from_date, to_date, exchange="NSE"):
    """Fetch historical candles for one instrument as a pandas DataFrame.

    Parameters
    ----------
    symbol : str
        Trading symbol; copied into the ``Symbol`` column of the result.
    symbol_token : int or str
        Instrument token sent to the API as ``symboltoken``.
    interval : str
        Candle interval passed through to the API (e.g. ``"ONE_DAY"``).
    from_date, to_date : str
        Range boundaries passed through as ``fromdate`` / ``todate``.
    exchange : str, optional
        Exchange code sent to the API. Defaults to ``"NSE"``.

    Returns
    -------
    pandas.DataFrame
        Columns ``datetime, open, high, low, close, volume, Symbol``.
        An empty DataFrame (no columns) is returned when the API returns no
        candles or when any error occurs; the problem is printed, not raised.
    """
    historicParam = {
        "exchange": exchange,
        # The SmartAPI documentation shows the token as a string, so normalise
        # it here; callers in this notebook pass integer dictionary keys.
        "symboltoken": str(symbol_token),
        "interval": interval,
        "fromdate": from_date,
        "todate": to_date
    }
    try:
        response = obj.getCandleData(historicParam)
        candles = response.get('data') if isinstance(response, dict) else None

        # Handle "no data" explicitly instead of relying on a column-count
        # mismatch exception further down.
        if not candles:
            print(f"No candle data returned for {symbol} (token {symbol_token})")
            return pd.DataFrame()

        df = pd.DataFrame(candles)
        df.columns = ['datetime', 'open', 'high', 'low', 'close', 'volume']
        df['datetime'] = pd.to_datetime(df['datetime'])
        df["Symbol"] = symbol
        return df
    except Exception as e:
        # Best effort by design: one failing symbol must not abort the caller;
        # an empty frame signals "nothing fetched".
        print("Error fetching data:", e)
        return pd.DataFrame()

In [0]:
# Lookup table of instruments to download: key = symbol token, value = trading symbol.
# The fetch loop iterates over this dictionary and requests data for each entry.

SymToken_Symbol_pairs = {
    10585: "MOM30IETF-EQ",
    10690: "NIFTYQLITY-EQ",
    25851: "VAL30IETF-EQ",
    23806: "ABSLPSE-EQ",
    9168: "UTISXN50-EQ",
    2328: "CPSEETF-EQ",
    14428: "GOLDBEES-EQ",
    18284: "HNGSNGBEES-EQ",
    7074: "MAHKTECH-EQ",
    11241: "HDFCGROWTH-EQ",
    21254: "LOWVOLIETF-EQ",
    11255: "HDFCQUAL-EQ",
    3001: "BSE500IETF-EQ",
    13198: "COMMOIETF-EQ",
    12578: "FINIETF-EQ",
    10723: "INFRAIETF-EQ",
    10676: "MNC-EQ",
    19640: "ALPHAETF-EQ",
    23855: "MIDSMALL-EQ",
    22832: "SMALLCAP-EQ",
    19237: "MONIFTY500-EQ",
    23184: "MOREALTY-EQ",
    23181: "MOSMALL250-EQ",
    10825: "MOVALUE-EQ",
    7422: "MONQ50-EQ",
    22739: "MON100-EQ",
    24081: "TOP100CASE-EQ",
    10576: "NIFTYBEES-EQ",
    25606: "MOMENTUM50-EQ",
    7412: "ALPHA-EQ",
    22344: "ALPL30IETF-EQ",
    7844: "AUTOIETF-EQ",
    11439: "BANKBEES-EQ",
    2636: "DIVOPPBEES-EQ",
    24461: "EVINDIA-EQ",
    5220: "BFSI-EQ",
    5306: "FMCGIETF-EQ",
    6297: "HEALTHY-EQ",
    10508: "MOHEALTH-EQ",
    2435: "CONSUMBEES-EQ",
    24944: "MODEFENCE-EQ",
    8882: "TNIDETF-EQ",
    7979: "MAKEINDIA-EQ",
    19084: "ITBEES-EQ",
    24861: "METALIETF-EQ",
    21423: "MOM100-EQ",
    8413: "MIDCAPETF-EQ",
    7456: "MIDQ50ADD-EQ",
    8077: "MIDCAP-EQ",
    4529: "NEXT50IETF-EQ",
    24533: "OILIETF-EQ",
    4973: "PHARMABEES-EQ",
    11386: "PVTBANIETF-EQ",
    15032: "PSUBNKBEES-EQ",
    25171: "TOP10ADD-EQ",
    1200: "ESG-EQ",
    17475: "NV20IETF-EQ",
    25080: "MULTICAP-EQ",
    25996: "EMULTIMQ-EQ",
    3507: "MAFANG-EQ",
    5782: "MASPTOP50-EQ",
    522: "ICICIB22-EQ",
    17702: "MIDSELIETF-EQ",
    8080: "SILVERBEES-EQ",
    4378: "SENSEXIETF-EQ",
    17044: "SHARIABEES-EQ",
}

In [0]:
# Dynamic calculation of FromDate and ToDate, passed to GetHistoricalOHLCData to fetch 1 year of data
from datetime import date, timedelta, datetime, timezone

# "Today" is taken in Indian Standard Time (fixed UTC+05:30) rather than the
# cluster's local timezone, so the window does not slip by a day when the
# driver clock runs in another zone (e.g. UTC) around midnight IST.
IST = timezone(timedelta(hours=5, minutes=30))
ToDate = datetime.now(IST).date()
FromDate = ToDate - timedelta(days=365)

# Appending a midnight time so the values have the "YYYY-MM-DD HH:MM" shape
FromDate_formatted = f"{FromDate} 00:00"
ToDate_formatted = f"{ToDate} 00:00"
print(FromDate_formatted, ToDate_formatted)

In [0]:
# Creating an empty PySpark df; the data of each symbol is converted to a PySpark df and appended to it.
# The declared types describe the data that is actually appended: the pandas
# frames built by GetHistoricalOHLCData carry a parsed datetime column, and
# prices are decimal values, so integer/date types would only "work" through
# the implicit type widening that union() performs.
# NOTE(review): Volume is declared as LongType on the assumption that pandas
# yields int64 for it - confirm against a sample fetch.
schema = StructType([
    StructField("Date", TimestampType(), True),
    StructField("Open", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Low", DoubleType(), True),
    StructField("Close", DoubleType(), True),
    StructField("Volume", LongType(), True),
    StructField("Symbol", StringType(), True)
])
final_df = spark.createDataFrame([], schema)

In [0]:
# Sanity check: fetch a single symbol and print the resulting pandas frame.
# The symbol/token pair is taken explicitly from the dictionary (its first
# entry) so this cell also works on a fresh kernel, instead of relying on
# loop variables that are only defined by a later cell.
sample_token, sample_symbol = next(iter(SymToken_Symbol_pairs.items()))
DailyData = GetHistoricalOHLCData(sample_symbol,sample_token,"ONE_DAY",FromDate_formatted,ToDate_formatted)
my_df = pd.DataFrame(DailyData)
print(my_df)

In [0]:
# Iterating through each symbol in the dictionary to fetch 1 year of data per symbol and appending everything to one df (final_df)
import time
interval = "ONE_DAY"
for token, symbol in SymToken_Symbol_pairs.items():
    DailyData = GetHistoricalOHLCData(symbol,token,interval,FromDate_formatted,ToDate_formatted)
    my_df = pd.DataFrame(DailyData)
    # Pause between requests (presumably to stay within the API rate limit);
    # done before the empty check so failed fetches are throttled as well.
    time.sleep(1)
    if my_df.empty:
        # GetHistoricalOHLCData returns an empty frame when nothing could be
        # fetched; Spark cannot build a DataFrame from it, so skip the symbol
        # instead of aborting the whole run.
        print(f"Skipping {symbol}: no data fetched")
        continue
    spark_df = spark.createDataFrame(my_df)
    # union() matches columns by position, so final_df keeps its own column names.
    final_df = final_df.union(spark_df)


In [0]:
# Total number of rows accumulated in final_df (displayed as the cell output).
final_df.count()

In [0]:
# Spot-check a single known Friday for one symbol, to verify that the stored
# dates are correct and have not been shifted by timezone handling.
is_target_symbol = col("symbol") == 'MOM30IETF-EQ'
is_target_date = col("date")== "2025-03-07 00:00"
spot_check_df = final_df.filter(is_target_symbol & is_target_date)
spot_check_df.orderBy(col("date").desc()).show()

In [0]:
# Persist the combined data as CSV (with a header row) under the raw-data
# mount; "overwrite" mode replaces whatever the target path currently holds.
raw_data_path = "/mnt/rawdata/"
csv_writer = final_df.write.mode("overwrite").format("csv").option("header","True")
csv_writer.save(raw_data_path)