In [21]:
import psycopg2 as ps2
import yfinance as yf
import sqlalchemy
import pandas as pd
import numpy as np
import datetime as dt
import scipy.stats as stats
import json
import pyarrow.parquet as pq
import re
import talib as ta
import warnings
warnings.filterwarnings('ignore')

## imports for reading multiple files
import glob
import os
import dask
import dask_expr
dask.config.set({'dataframe.query-planning':True})
import dask.dataframe as ddf

## setup logger to log flow of data creation
import logging
logger = logging.getLogger('data_ETLC_log')
logger.setLevel(logging.DEBUG)
# getLogger returns the same logger object every time this cell is re-run in the kernel;
# attaching the FileHandler again would write every message to data_ETL.log once per re-run.
if not any(isinstance(handler, logging.FileHandler) for handler in logger.handlers):
    formatter = logging.Formatter('%(asctime)s | %(name)s | %(levelname)s | %(message)s')
    file_h = logging.FileHandler('data_ETL.log')
    file_h.setFormatter(formatter)
    logger.addHandler(file_h)

# Create Connection and Cursor to PostgreSQL database

In [22]:
## create connection with database fin_data and username fin_user
## Connection settings are read from the standard libpq environment variables when they are set
## (PGHOST, PGDATABASE, PGUSER, PGPASSWORD); the literals below are only fallbacks.
## TODO: drop the password fallback once PGPASSWORD is provided by the environment.
try:
    con = ps2.connect(
        host=os.environ.get("PGHOST", "127.0.0.1"),
        dbname=os.environ.get("PGDATABASE", "fin_data"),
        user=os.environ.get("PGUSER", "fin_user"),
        password=os.environ.get("PGPASSWORD", "fin_user"),
    )
    print(type(con))
    logger.info(f"Created connection of type {type(con)}")
except Exception:
    print("Could not connect to database")
    logger.error("Could not connect to database", exc_info=True)
    # every later cell needs `con`; re-raise instead of continuing into a NameError
    raise

## create cursor
try:
    cur = con.cursor()
    print(type(cur))
    logger.info(f"Created cursor object of type {type(cur)}")
except Exception:
    print("Could not create cursor object")
    logger.error("Could not create cursor object", exc_info=True)
    raise

## set autocommit to true: each statement executed below is committed on its own
con.set_session(autocommit=True)

<class 'psycopg2.extensions.connection'>
<class 'psycopg2.extensions.cursor'>


# Create Equity Table and insert values

In [23]:
## tickers to track; '^NSEI' is the NIFTY 50 index (no sector), the rest are NSE-listed stocks
names = ['^NSEI','RELIANCE.NS', 'BHARTIARTL.NS', 'ASIANPAINT.NS','HDFCBANK.NS', 'ICICIBANK.NS', 'SBIN.NS','TCS.NS', 'MPHASIS.NS', 'TECHM.NS']
try:
    cur.execute("CREATE TABLE IF NOT EXISTS equity (equity_id serial PRIMARY KEY, company_name varchar, ticker varchar UNIQUE NOT NULL, sector varchar)")
    logger.info("Equity table created succesfully.")
except Exception as exc:
    logger.error("Could not create table in database.", exc_info=True)
    raise Exception("Could not create table in database.") from exc

for ticker_name in names:
    try:
        # fetch the metadata once per ticker and reuse it for both name and sector
        info = yf.Ticker(ticker_name).info
        company_name = info['longName']
        logger.info(f"Equity retrieved for {company_name}.")
    except Exception as exc:
        # company_name is undefined (first ticker) or stale (previous ticker) here: report the ticker
        logger.error(f"Could not retrieve data for {ticker_name}", exc_info=True)
        raise Exception(f"Could not retrieve data for {ticker_name}") from exc
    # the index has no sector
    sector = None if ticker_name == '^NSEI' else info['sector']
    # ticker is UNIQUE: skipping existing tickers keeps the cell re-runnable instead of failing
    cur.execute(
        "INSERT INTO equity (company_name, ticker, sector) values (%s, %s, %s) ON CONFLICT (ticker) DO NOTHING",
        (company_name, ticker_name, sector),
    )
    logger.info(f"Data for equity {company_name} added to equity table in database.")

# Functions to get equity_id from ticker names or file path using equity table
# Functions to find and flag outliers

In [25]:
## root directory of the raw per-equity source files
path = "D:\\Study\\DE\\Project\\data_source"

## lookup ticker -> equity_id built from the equity table; it supplies the foreign key that replaces
## ticker names / file names downstream. Tickers are normalised to look like the file stems:
## surrounding '^' stripped and everything from the first '.' dropped ('^NSEI' -> 'NSEI', 'TCS.NS' -> 'TCS')
equity_id_names = pd.read_sql("SELECT equity_id, ticker from equity", con=con, index_col='ticker')
equity_id_name_dict = {}
for raw_ticker, raw_equity_id in equity_id_names['equity_id'].items():
    cleaned_ticker = raw_ticker.strip("^").split('.')[0]
    equity_id_name_dict[cleaned_ticker] = int(raw_equity_id)
print(equity_id_name_dict)

## Map the file_path column (added by include_path_column in dask read_csv / read_json) to the
## equity_id foreign key: each source file holds one equity and is named after its cleaned ticker,
## so the file stem is looked up in equity_id_name_dict.

## data integrity check -> raises TypeError if argument is not a dataframe or series from dask or pandas
## data integrity check -> raises ValueError if dataframe does not contain a column named file_path

def file_path_to_id(df):
    """Replace the ``file_path`` column of ``df`` by the integer ``equity_id`` it refers to.

    Parameters
    ----------
    df : pandas or dask DataFrame with a ``file_path`` column holding paths such as
        ``.../data_source/RELIANCE.csv`` (``.csv`` or ``.json`` files).

    Returns
    -------
    A DataFrame of the same kind in which ``file_path`` is renamed to ``equity_id`` (int),
    sorted by ``equity_id``. The caller's frame is not modified.

    Raises
    ------
    TypeError
        ``df`` is not a pandas/dask DataFrame or Series.
    ValueError
        ``df`` has no ``file_path`` column.
    """
    if not isinstance(df, (ddf.DataFrame, pd.DataFrame, pd.Series, ddf.Series)):
        # no exception is active here, so exc_info would only add "NoneType: None" to the log
        logger.error(f"Arguement of type {type(df)} was provided")
        raise TypeError("Type of given arguement is not acceptable for this function")
    # a Series has no .columns; report it as a missing column instead of an AttributeError
    columns = list(getattr(df, 'columns', []))
    if 'file_path' not in columns:
        # extra positional args to logger.error are %-format arguments, so use a single message
        logger.error(f"No column named file_path found in given dataframe. Columns present in given dataframe are:{columns}")
        raise ValueError("Given dataframe does not contain a column named file_path.")

    # keep only the file stem: '.../data_source/RELIANCE.csv' -> 'RELIANCE'
    # ('/' and '\\' are both accepted as separators so Windows-style paths resolve too)
    stem_pattern = re.compile(r'([^/\\]+)\.(json|csv)$')
    stems = df['file_path'].apply(lambda p: stem_pattern.search(p).group(1))
    # assign() returns a new frame, leaving the caller's dataframe untouched
    df = df.assign(file_path=stems).rename(columns={'file_path': 'equity_id'})
    # a stem missing from equity_id_name_dict maps to NaN, which makes astype(int) raise
    # (at compute time for dask input)
    df['equity_id'] = df['equity_id'].map(equity_id_name_dict).astype(int)
    logger.info("Dataframe file_path column is modified succesfully.")
    return df.sort_values('equity_id')

## Replace the ticker column by the integer equity_id from equity_id_name_dict.
## Tickers are normalised like the dictionary keys: the '.NS' suffix removed and '^' stripped
## (e.g. 'RELIANCE.NS' -> 'RELIANCE', '^NSEI' -> 'NSEI').
## data integrity check -> same type/column checks as file_path_to_id

def ticker_name_to_id(df):
    """Replace the ``ticker`` column of ``df`` by the integer ``equity_id`` it refers to.

    Parameters
    ----------
    df : pandas or dask DataFrame with a ``ticker`` column (e.g. ``'RELIANCE.NS'``).

    Returns
    -------
    A DataFrame with ``ticker`` renamed to ``equity_id`` (int), in the same column position.
    The caller's frame is not modified.

    Raises
    ------
    TypeError
        ``df`` is not a pandas/dask DataFrame or Series.
    ValueError
        ``df`` has no ``ticker`` column.
    """
    if not isinstance(df, (ddf.DataFrame, pd.DataFrame, pd.Series, ddf.Series)):
        # no exception is active here, so exc_info would only add "NoneType: None" to the log
        logger.error(f"Arguement of type {type(df)} was provided")
        raise TypeError("Given arguement is not a Dataframe")
    # a Series has no .columns; report it as a missing column instead of an AttributeError
    columns = list(getattr(df, 'columns', []))
    if 'ticker' not in columns:
        # extra positional args to logger.error are %-format arguments, so use a single message
        logger.error(f"No column named ticker found in given dataframe. Columns present in given dataframe are:{columns}")
        raise ValueError("Given dataframe does not contain a column named ticker.")

    # regex=False: '.NS' is a literal suffix (str.replace defaulted to regex=True before pandas 2.0,
    # where '.' would match any character)
    cleaned = df['ticker'].str.replace(".NS", "", regex=False).str.strip("^")
    # assign() replaces the column in place within a new frame, so column order is preserved
    df = df.assign(ticker=cleaned.map(equity_id_name_dict)).rename(columns={'ticker': 'equity_id'})
    # unknown tickers map to NaN, which makes astype(int) raise
    df['equity_id'] = df['equity_id'].astype(int)
    logger.info("Dataframe ticker column is modified succesfully.")
    return df

## function to find outliers in one column of a dataframe
## using the z-score method: |value - mean| / std above a threshold marks an outlier
def find_outlier(df, col, threshold=3, group_col=None):
    """Add a boolean ``is_outlier_<col>`` column flagging z-score outliers in ``col``.

    Parameters
    ----------
    df : pandas or dask DataFrame; dask input is computed to pandas first because the
        z-score needs the mean/std of the whole column.
    col : name of the numeric column to test.
    threshold : absolute z-score above which a value is flagged (default 3).
    group_col : optional column name; when given, z-scores are computed within each group
        (e.g. per equity) so that series on very different scales do not distort each
        other's mean/std. ``None`` (default) scores the whole column at once.

    Returns
    -------
    pandas DataFrame with the new ``is_outlier_<col>`` column. For pandas input the
    column is added to the passed frame itself.

    Raises
    ------
    TypeError
        ``df`` is not a pandas/dask DataFrame or Series.
    """
    if not isinstance(df, (ddf.DataFrame, pd.DataFrame, pd.Series, ddf.Series)):
        # no exception is active here, so exc_info would only add "NoneType: None" to the log
        logger.error(f"Arguement of type {type(df)} was provided")
        raise TypeError("Given arguement is not a Dataframe")
    if isinstance(df, ddf.DataFrame):
        df = df.compute()
    if group_col is None:
        z = np.abs(stats.zscore(df[col]))
    else:
        z = np.abs(df.groupby(group_col)[col].transform(stats.zscore))
    # positional boolean array; NaN z-scores (e.g. zero std) compare False and are not flagged
    df[f'is_outlier_{col}'] = np.asarray(z > threshold)
    return df

## function to combine the per-column outlier flags into one final 'outlier' column
## if the final column is True, the row contains at least one outlier and should be removed
def overall_outlier(df, outlier_cols=None):
    """Add a boolean ``outlier`` column that is True when any flag column is True.

    Parameters
    ----------
    df : pandas or dask DataFrame holding the flag columns; dask input is computed to pandas.
    outlier_cols : iterable of flag column names to combine. Defaults to
        ``is_outlier_Open/High/Low/Close/Volume`` (the flags find_outlier creates for OHLCV data).

    Returns
    -------
    pandas DataFrame with the ``outlier`` column added (to the passed frame itself for pandas input).

    Raises
    ------
    TypeError
        ``df`` is not a pandas/dask DataFrame or Series.
    KeyError
        one of the flag columns is missing.
    """
    if not isinstance(df, (ddf.DataFrame, pd.DataFrame, pd.Series, ddf.Series)):
        # no exception is active here, so exc_info would only add "NoneType: None" to the log
        logger.error(f"Arguement of type {type(df)} was provided")
        raise TypeError("Given arguement is not a Dataframe")
    if isinstance(df, ddf.DataFrame):
        df = df.compute()
    if outlier_cols is None:
        outlier_cols = ['is_outlier_Open', 'is_outlier_High', 'is_outlier_Low', 'is_outlier_Close', 'is_outlier_Volume']
    # row-wise OR over the flag columns
    df['outlier'] = np.asarray(df[list(outlier_cols)].eq(True).any(axis=1))
    return df

{'NSEI': 1, 'RELIANCE': 2, 'BHARTIARTL': 3, 'ASIANPAINT': 4, 'HDFCBANK': 5, 'ICICIBANK': 6, 'SBIN': 7, 'TCS': 8, 'MPHASIS': 9, 'TECHM': 10}


## Load split data and dividends data files

In [26]:
## read the dividends csv with pandas and validate it, then:
## - round dividend values to 4 decimal digits
## - remove .NS from ticker names and replace the ticker by its equity_id (ticker_name_to_id)
## - save the result as parquet

div_path = r"D:\Study\DE\Project\data_source\splits_dividends_data\dividends_data.csv"
div_data = pd.read_csv(div_path)

## data integrity check -> check for duplicate/null records, if more than 10 throw error, else drop and warn
## counts are taken BEFORE dropping (counting afterwards always reports 0)
## NOTE: warnings.filterwarnings('ignore') in the first cell silences warnings.warn; the log entry is the durable record
num_duplicates = int(div_data.duplicated().sum())
if num_duplicates > 0:
    if num_duplicates > 10:
        print(f"High number of duplicate records found: {num_duplicates}")
        logger.error(f"High number of duplicate records found: {num_duplicates}")
        raise ValueError("Duplicate records present in dividends dataframe")
    div_data = div_data.drop_duplicates(keep='first')
    logger.info(f"Dropped {num_duplicates} records from dividends dataframe due to duplication")
    warnings.warn(f"Dropped {num_duplicates} records from dividends dataframe due to duplication")

num_null_values = int(div_data.isna().sum().sum())
if num_null_values > 0:
    if num_null_values > 10:
        print(f"High number of null records found: {num_null_values}")
        logger.error(f"High number of null records found: {num_null_values}")
        raise ValueError("Null records present in dividends dataframe")
    rows_before_dropna = div_data.shape[0]
    # DataFrame has no drop_na(); dropna() removes every row containing a null
    div_data = div_data.dropna()
    num_null_rows = rows_before_dropna - div_data.shape[0]
    logger.info(f"Dropped {num_null_rows} records from dividends dataframe due to null values")
    warnings.warn(f"Dropped {num_null_rows} records from dividends dataframe due to null values")

## data integrity check -> standardize date format to ensure consistency
div_data['Date'] = pd.to_datetime(div_data['Date']).dt.date
div_data['dividend'] = div_data['dividend'].round(4).astype(float)
div_data = ticker_name_to_id(div_data)

print(f"Number of records loaded from dividend data: {div_data.shape[0]}")
logger.info(f"Number of records loaded from dividend data: {div_data.shape[0]}")
div_data.to_parquet(path='D:\\Study\\DE\\Project\\data_destination\\dividend_data.parquet', compression='snappy')
print(div_data.head())

Number of records loaded from dividend data: 292
         Date  dividend  equity_id
0  1996-05-24    0.7430          2
1  1997-05-16    0.8049          2
2  1998-05-06    0.8668          2
3  1999-05-07    0.9287          2
4  2000-04-06    0.9906          2


In [27]:
## read the splits csv with pandas and validate it, then:
## - rename splitRatio -> split_ratio
## - check every split ratio is a string like "number:number"
## - remove .NS from ticker names and replace the ticker by its equity_id (ticker_name_to_id)
## - save the result as parquet

split_path = r"D:\Study\DE\Project\data_source\splits_dividends_data\splits_data.csv"
split_data = pd.read_csv(split_path)
split_data = split_data.rename(columns={"splitRatio": "split_ratio"})

## data integrity check -> check for duplicate/null records, if more than 10 throw error, else drop and warn
## counts are taken BEFORE dropping (counting afterwards always reports 0)
## NOTE: warnings.filterwarnings('ignore') in the first cell silences warnings.warn; the log entry is the durable record
num_duplicates = int(split_data.duplicated().sum())
if num_duplicates > 0:
    if num_duplicates > 10:
        print(f"High number of duplicate records found: {num_duplicates}")
        logger.error(f"High number of duplicate records found: {num_duplicates}")
        raise ValueError("Duplicate records present in splits dataframe")
    split_data = split_data.drop_duplicates(keep='first')
    logger.info(f"Dropped {num_duplicates} records from splits dataframe due to duplication")
    warnings.warn(f"Dropped {num_duplicates} records from splits dataframe due to duplication")

num_null_values = int(split_data.isna().sum().sum())
if num_null_values > 0:
    if num_null_values > 10:
        print(f"High number of null records found: {num_null_values}")
        logger.error(f"High number of null records found: {num_null_values}")
        raise ValueError("Null records present in splits dataframe")
    rows_before_dropna = split_data.shape[0]
    # DataFrame.dropna() has no `keep` argument; it removes every row containing a null
    split_data = split_data.dropna()
    num_null_rows = rows_before_dropna - split_data.shape[0]
    logger.info(f"Dropped {num_null_rows} records from splits dataframe due to null values")
    warnings.warn(f"Dropped {num_null_rows} records from splits dataframe due to null values")

## data integrity check -> every split_ratio must look like "2:1", else raise ValueError.
## str.fullmatch applies the regex to the whole value; the offending row labels are printed.
valid_split_ratio = split_data['split_ratio'].astype(str).str.fullmatch(r"\d+:\d+")
if not valid_split_ratio.all():
    print(split_data.index[~valid_split_ratio].tolist())
    logger.error("Split ratio data not in correct format. Correct format is a string like -> number:number")
    raise ValueError("Split ratio data not in correct format. Correct format is a string like -> number:number")

## data integrity check -> standardize date format to ensure consistency
split_data['Date'] = pd.to_datetime(split_data['Date']).dt.date

split_data = ticker_name_to_id(split_data)

print(f"Number of records from splits data: {split_data.shape[0]}")
logger.info(f"Number of records loaded from splits data: {split_data.shape[0]}")
# save the splits (not the dividends) to split_data.parquet
split_data.to_parquet(path='D:\\Study\\DE\\Project\\data_destination\\split_data.parquet', compression='snappy')
print(split_data.head())

Number of records from splits data: 18
         Date split_ratio  equity_id
0  1997-10-27         2:1          2
1  2009-11-26         2:1          2
2  2017-09-07         2:1          2
3  2009-07-24         2:1          3
4  2003-08-22         3:2          4


## Create tables in database for split and dividend data, insert values from dataframes

In [28]:
## create the split/dividend tables (IF NOT EXISTS keeps the DDL re-runnable) and insert
## one row per dataframe record; the inserts themselves are not deduplicated
split_table_ddl = """CREATE TABLE IF NOT EXISTS split_data (split_id serial PRIMARY KEY, 
date date NOT NULL, 
split_ratio varchar,
equity_id int,
FOREIGN KEY(equity_id) REFERENCES equity(equity_id))"""
cur.execute(split_table_ddl)
logger.info("Succesfully created table names split_data")

cur.executemany(
    "INSERT INTO split_data (Date, split_ratio, equity_id) values(%s, %s, %s)",
    [(rec.Date, rec.split_ratio, rec.equity_id) for rec in split_data.itertuples(index=False)],
)
logger.info(f"Succesfully inserted {split_data.shape[0]} rows to split_data table in database")

dividend_table_ddl = """CREATE TABLE IF NOT EXISTS dividend_data (dividend_id SERIAL PRIMARY KEY,
date date NOT NULL,
dividend numeric,
equity_id int,
FOREIGN KEY(equity_id) REFERENCES equity(equity_id))"""
cur.execute(dividend_table_ddl)
logger.info("Succesfully created table names dividend_data")

cur.executemany(
    "INSERT INTO dividend_data (date, dividend, equity_id) values(%s, %s, %s)",
    [(rec.Date, rec.dividend, rec.equity_id) for rec in div_data.itertuples(index=False)],
)
logger.info(f"Succesfully inserted {div_data.shape[0]} rows to dividend_data table in database")

## Load data from source, merge and insert in database (execute once)

In [29]:
## Read every csv in the source directory with dask; include_path_column adds a file_path column
## (one file per equity) that file_path_to_id turns into the equity_id foreign key.
## data integrity check -> standardize date format to ensure consistency
## data integrity check -> check for duplicate and null records, if more than 10 throw error, else drop and warn
## data integrity check -> flag z-score outliers per price/volume column and drop flagged rows
## NOTE(review): z-scores span all equities at once (index and stocks trade on very different scales);
##               consider scoring per equity_id.

df_csv = ddf.read_csv("D:\\Study\\DE\\Project\\data_source\\*.csv", include_path_column="file_path")

# materialise the raw data once and reuse it for all integrity counts
csv_raw = df_csv.compute()
num_duplicates = csv_raw.shape[0] - csv_raw.drop_duplicates().shape[0]
num_na = csv_raw.isna().sum().sum()
del csv_raw

if num_duplicates > 0:
    if num_duplicates > 10:
        print(f"High number of duplicate records found: {num_duplicates}")
        logger.error(f"High number of duplicate records found: {num_duplicates}")
        raise ValueError("Duplicate records present in csv dataframe")
    df_csv = df_csv.drop_duplicates(keep='first')
    logger.info(f"Dropped {num_duplicates} records from csv dataframe due to duplication")
    warnings.warn(f"Dropped {num_duplicates} records from csv dataframe due to duplication")
if num_na > 0:
    if num_na > 10:
        print(f"High number of null records found: {num_na}")
        logger.error(f"High number of null records found: {num_na}")
        raise ValueError("Null records present in csv dataframe")
    # dask DataFrame has no drop_na(); dropna() is the method
    df_csv = df_csv.dropna()
    logger.info(f"Dropped {num_na} records from csv dataframe due to null values")
    warnings.warn(f"Dropped {num_na} records from csv dataframe due to null values")

df_csv['Date'] = ddf.to_datetime(df_csv['Date']).dt.date
df_csv = df_csv.round(2)
df_csv = file_path_to_id(df_csv)

## flag outliers column by column; find_outlier converts the dask frame to pandas on first use.
## The column list comes from the dask metadata, so no compute is needed to obtain it.
for col in list(df_csv.columns):
    if col in ['Date', 'equity_id']:
        continue
    df_csv = find_outlier(df_csv, col)

df_csv = overall_outlier(df_csv)
num_outliers = int(df_csv['outlier'].sum())
print(f"Outliers found: {num_outliers}")
logger.info(f"Outliers found: {num_outliers}")

## if too many outliers are present (more than 2.5% of rows), then raise exception
if num_outliers > (df_csv.shape[0] / 100) * 2.5:
    logger.error("Number of outliers is more than 2.5 percent")
    raise Exception("Too many outliers found.")

## keep only non-outlier rows and drop the helper flag columns (new frame, no inplace on a slice)
outlier_flag_cols = ['is_outlier_Open', 'is_outlier_High', 'is_outlier_Low', 'is_outlier_Close', 'is_outlier_Volume', 'outlier']
df_csv = df_csv[~df_csv['outlier']].drop(columns=outlier_flag_cols)

logger.info("Successfully created single combined dataframe of all csv files, outliers are removed")
df_csv.head()

Outliers found: 162


Unnamed: 0,Date,Open,High,Low,Close,Volume,equity_id
0,2018-03-01,10479.95,10525.5,10447.15,10458.35,176000,1
987,2022-03-07,15867.95,15944.6,15711.45,15863.15,585400,1
986,2022-03-04,16339.45,16456.0,16133.8,16245.35,456100,1
985,2022-03-03,16723.2,16768.95,16442.95,16498.05,442100,1
984,2022-03-02,16593.1,16678.5,16478.65,16605.95,517700,1


In [30]:
## Read every json file in the source directory with dask (each file is one equity stored as a JSON
## array of records); include_path_column adds file_path, which file_path_to_id maps to equity_id.
## data integrity check -> standardize date format to ensure consistency
## data integrity check -> check for duplicate and null records, if more than 10 throw error, else drop and warn
## data integrity check -> same z-score outlier removal as for df_csv

# 'records' is the documented orient value ('record' is not). dask defaults lines=True for
# orient='records', so lines=False is passed explicitly for JSON-array files.
df_json = ddf.read_json("D:\\Study\\DE\\Project\\data_source\\*.json", include_path_column="file_path", orient='records', lines=False)

# materialise the raw data once and reuse it for all integrity counts
json_raw = df_json.compute()
num_duplicates = json_raw.shape[0] - json_raw.drop_duplicates().shape[0]
num_na = json_raw.isna().sum().sum()
del json_raw

if num_duplicates > 0:
    if num_duplicates > 10:
        print(f"High number of duplicate records found: {num_duplicates}")
        logger.error(f"High number of duplicate records found: {num_duplicates}")
        raise ValueError("Duplicate records present in json dataframe")
    df_json = df_json.drop_duplicates(keep='first')
    logger.info(f"Dropped {num_duplicates} records from json dataframe due to duplication")
    warnings.warn(f"Dropped {num_duplicates} records from json dataframe due to duplication")
if num_na > 0:
    if num_na > 10:
        print(f"High number of null records found: {num_na}")
        logger.error(f"High number of null records found: {num_na}")
        raise ValueError("Null records present in json dataframe")
    # dask DataFrame has no drop_na(); dropna() is the method
    df_json = df_json.dropna()
    logger.info(f"Dropped {num_na} records from json dataframe due to null values")
    warnings.warn(f"Dropped {num_na} records from json dataframe due to null values")

df_json['Date'] = ddf.to_datetime(df_json['Date']).dt.date
df_json = file_path_to_id(df_json)
df_json = df_json.round(2)

## flag outliers column by column; the column list comes from the dask metadata (no compute needed)
for col in list(df_json.columns):
    if col in ['Date', 'equity_id']:
        continue
    df_json = find_outlier(df_json, col)

df_json = overall_outlier(df_json)
num_outliers = int(df_json['outlier'].sum())
print(f"Outliers found: {num_outliers}")
logger.info(f"Outliers found: {num_outliers}")

## if too many outliers are present (more than 2.5% of rows), then raise exception
if num_outliers > (df_json.shape[0] / 100) * 2.5:
    logger.error("Number of outliers is more than 2.5 percent")
    raise Exception("Too many outliers found.")

## keep only non-outlier rows and drop the helper flag columns (new frame, no inplace on a slice)
outlier_flag_cols = ['is_outlier_Open', 'is_outlier_High', 'is_outlier_Low', 'is_outlier_Close', 'is_outlier_Volume', 'outlier']
df_json = df_json[~df_json['outlier']].drop(columns=outlier_flag_cols)

logger.info("Successfully created single combined dataframe of all json files")
df_json.head()

Outliers found: 176


Unnamed: 0,Date,Open,High,Low,Close,Volume,equity_id
0,2018-03-01,303.57,304.88,295.42,296.1,18986889,6
989,2022-03-04,681.27,696.6,675.35,678.36,18697223,6
988,2022-03-03,707.5,711.78,683.69,688.47,23190595,6
987,2022-03-02,706.9,714.74,697.78,704.78,25252492,6
986,2022-02-28,712.77,734.26,705.33,732.24,20547432,6


In [31]:
## stack the cleaned csv- and json-sourced OHLC frames into one dask dataframe and save it as
## parquet, partitioned on equity_id
source_frames = [df_csv, df_json]
df = ddf.concat(source_frames)
logger.info("Succesfully created concatenated dataframe of all csv and json files")
ohlc_destination = 'D:\\Study\\DE\\Project\\data_destination'
df.to_parquet(path=ohlc_destination, compression='snappy', partition_on=['equity_id'])
logger.info("OHLC Feed successfully saved to data_destination directory in parquet format.")
df.head()

Unnamed: 0,Date,Open,High,Low,Close,Volume,equity_id
0,2018-03-01,10479.95,10525.5,10447.15,10458.35,176000,1
0,2018-03-01,376.22,382.84,373.33,375.08,5837005,3
0,2018-03-01,1073.45,1077.48,1061.95,1072.4,736217,4
0,2018-03-01,900.94,907.65,895.52,898.57,1954920,5
0,2018-03-01,845.28,854.31,841.9,844.08,4478295,2


In [32]:
## create the OHLC_Feed table (no-op if it already exists) and insert every row of the combined dataframe
cur.execute("""CREATE TABLE IF NOT EXISTS OHLC_Feed (entry_id SERIAL PRIMARY KEY,
    equity_id int,
    date date,
    open numeric,
    high numeric,
    low numeric,
    close numeric,
    volume int,
    FOREIGN KEY(equity_id) REFERENCES equity(equity_id) )""")
logger.info("Successfully created table named OHLC_Feed in database")

# df is a dask dataframe: df.shape[0] is a lazy dask scalar whose repr, not the count, would be
# logged, so count the rows while inserting instead
rows_inserted = 0
for _, row in df.iterrows():
    cur.execute(
        "INSERT INTO ohlc_feed (equity_id, date, open, high, low, close, volume) values (%s, %s, %s, %s, %s, %s, %s)",
        (row['equity_id'], row['Date'], row['Open'], row['High'], row['Low'], row['Close'], row['Volume']),
    )
    rows_inserted += 1
logger.info(f"Successfully inserted {rows_inserted} rows to table named OHLC_Feed in database")

# Read daily updated entries from source, write new data to table

# Execute once daily, preferably after market is closed

In [33]:
## Daily update: append the latest row of every source file to ohlc_feed.
## - only regular files ending in .csv or .json whose name (before the first '.') is a known ticker
##   are used, processed in equity_id order
## - csv files: the last line is split on ',' in column order Date,Open,High,Low,Close,Volume
## - json files: the file holds a list of records and the last record is used
## data integrity check -> dates parsed with strptime("%Y-%m-%d")
## data integrity check -> prices rounded to 2 decimals, volume converted to integer
## data integrity check -> a row already stored for the same equity_id and date is not inserted again,
##                         so re-running the cell does not create duplicates

OHLCV_FIELDS = ('Date', 'Open', 'High', 'Low', 'Close', 'Volume')


def _read_last_csv_record(file_path):
    """Return the last line of a csv file as a dict keyed positionally by OHLCV_FIELDS."""
    with open(file_path, 'r') as file:
        last_line = file.readlines()[-1]
    return dict(zip(OHLCV_FIELDS, last_line.rstrip('\r\n').split(',')))


def _read_last_json_record(file_path):
    """Return the last record of a json file whose top level is a list of records."""
    with open(file_path, 'r') as file:
        return json.load(file)[-1]


def _clean_ohlcv_record(record):
    """Convert a raw record into a (date, open, high, low, close, volume) tuple for insertion."""
    return (
        dt.datetime.strptime(record['Date'], "%Y-%m-%d"),
        round(float(record['Open']), 2),
        round(float(record['High']), 2),
        round(float(record['Low']), 2),
        round(float(record['Close']), 2),
        int(record['Volume']),
    )


last_record_readers = {'.csv': _read_last_csv_record, '.json': _read_last_json_record}

file_list = []
for file_name in os.listdir(path):
    extension = os.path.splitext(file_name)[1]
    if extension not in last_record_readers or not os.path.isfile(os.path.join(path, file_name)):
        continue
    if file_name.split('.')[0] not in equity_id_name_dict:
        # an unmapped file would otherwise abort the whole cell with a KeyError in the sort below
        logger.warning(f"Skipping file {file_name}: no equity_id for ticker {file_name.split('.')[0]}")
        continue
    file_list.append(file_name)
file_list.sort(key=lambda name: equity_id_name_dict[name.split('.')[0]])

for file_name in file_list:
    ticker_name = file_name.split('.')[0]
    equity_id = equity_id_name_dict[ticker_name]
    try:
        raw_record = last_record_readers[os.path.splitext(file_name)[1]](os.path.join(path, file_name))
        logger.info(f"Successfully read last row from file: {file_name}")
    except Exception:
        # skip the unreadable file; continuing with an empty record would only fail further down
        logger.error(f"Could not read data from file:{file_name}", exc_info=True)
        continue

    last_row = _clean_ohlcv_record(raw_record)
    trade_date = last_row[0].date()

    cur.execute("SELECT 1 FROM ohlc_feed WHERE equity_id = %s AND date = %s", (equity_id, trade_date))
    if cur.fetchone() is not None:
        logger.info(f"Row for {trade_date} from file: {file_name} already present in ohlc_feed table, not inserted again")
        print(f"Data from file {file_name} for {trade_date} is already in the table, skipped")
        continue

    cur.execute(
        "INSERT INTO ohlc_feed (equity_id, date, open, high, low, close, volume) values (%s, %s, %s, %s, %s, %s, %s)",
        (equity_id, *last_row),
    )
    logger.info(f"Successfully inserted last row from file: {file_name} to ohlc_feed table in database")

    print(f"Data from file {file_name} is saved to table for ticker name {ticker_name}")
    print(last_row)
    print(r"#" * 100)
    print("\n")

Data from file NSEI.csv is saved to table for ticker name NSEI
[datetime.datetime(2024, 2, 22, 0, 0), 22020.3, 22252.5, 21875.25, 22217.45, 0]
####################################################################################################


Data from file RELIANCE.csv is saved to table for ticker name RELIANCE
[datetime.datetime(2024, 2, 22, 0, 0), 2936.3, 2969.9, 2916.0, 2963.5, 9246346]
####################################################################################################


Data from file BHARTIARTL.csv is saved to table for ticker name BHARTIARTL
[datetime.datetime(2024, 2, 22, 0, 0), 1137.6, 1138.75, 1097.65, 1135.55, 8642337]
####################################################################################################


Data from file ASIANPAINT.csv is saved to table for ticker name ASIANPAINT
[datetime.datetime(2024, 2, 22, 0, 0), 2983.0, 3027.2, 2932.1, 3017.4, 2615297]
####################################################################################

# Calculating indicators for analysis

In [34]:
## load extra history: the indicators use 14-period windows, so rows before the first reported
## date are needed for warm-up; 110 calendar days are read from ohlc_feed
start_date = (dt.date.today() - dt.timedelta(days=110)).strftime("%Y-%m-%d")

try:
    # bound parameter instead of interpolating the date into the SQL text
    df_ind = pd.read_sql("select * from ohlc_feed where date > %s", con=con, params=(start_date,), index_col="entry_id")
    logger.info(f"Successfully read {df_ind.shape[0]} rows from ohlc_feed table in database")
    print(f"Number of records read from database: {df_ind.shape[0]}")
except Exception as exc:
    logger.error("Could not query database", exc_info=True)
    raise Exception("Could not query database") from exc

# order rows per equity by date (required by the rolling/indicator calculations) and drop entry_id
df_ind = df_ind.sort_values(["equity_id", "date"]).reset_index(drop=True)
df_ind['date'] = pd.to_datetime(df_ind['date']).dt.date

# distinct first/last dates across equities
range_start_dates = df_ind.groupby('equity_id')['date'].min().unique()
range_end_dates = df_ind.groupby('equity_id')['date'].max().unique()
print("Calculations are performed on date range printed below.")
logger.info(f"Range of calculation is from {range_start_dates} to {range_end_dates}.")
print(range_start_dates)
print(range_end_dates)

Number of records read from database: 674
Calculations are performed on date range printed below.
[datetime.date(2023, 11, 6)]
[datetime.date(2024, 2, 22)]


In [35]:
## per-equity technical indicators computed on df_ind


def _add_indicators(df_eq):
    """Return one equity's rows with short-term indicator columns added.

    Parameters
    ----------
    df_eq : rows of df_ind for a single equity_id (date-ordered, since df_ind is sorted by
        equity_id, date) with open, high, low, close and volume columns.

    Returns
    -------
    A new DataFrame with equity_id, date and the indicators (rounded to 2 decimals) plus
    daily_log_return. open/high/low/volume/close are dropped because they already live in
    ohlc_feed, and rows with a NaN indicator (warm-up windows) are removed.
    """
    # groupby hands out pieces of df_ind; work on a copy so the assignments below never write
    # to (or raise SettingWithCopyWarning about) the parent frame
    df_eq = df_eq.copy()

    ## exponential moving average via pandas ewm (exponentially weighted) and rolling standard deviation.
    ## A 14-day simple moving average is not stored: bb_mid from the bollinger bands has the same value.
    df_eq['exp_moving_avg_close'] = df_eq['close'].ewm(span=14, ignore_na=True).mean()
    df_eq['moving_std_dev_close'] = df_eq['close'].rolling(window=14).std()

    ## momentum indicators
    df_eq['rsi_close'] = ta.RSI(df_eq['close'], timeperiod=14)
    df_eq['stochastic_osc_kline'], df_eq['stochastic_osc_dline'] = ta.STOCH(high=df_eq['high'], low=df_eq['low'], close=df_eq['close'], fastk_period=5, slowk_period=5, slowd_period=5)

    ## volatility indicators, ATR and bollinger bands
    df_eq['avg_true_range'] = ta.ATR(high=df_eq['high'], low=df_eq['low'], close=df_eq['close'], timeperiod=14)
    df_eq['bb_up'], df_eq['bb_mid'], df_eq['bb_low'] = ta.BBANDS(df_eq['close'], timeperiod=14)

    ## volume indicators
    df_eq['on_balance_volume'] = ta.OBV(df_eq['close'], df_eq['volume'])
    df_eq['money_flow_index'] = ta.MFI(high=df_eq['high'], low=df_eq['low'], close=df_eq['close'], volume=df_eq['volume'], timeperiod=14)

    ## drop columns which are already present in ohlc_feed, round, remove warm-up rows
    df_eq = df_eq.drop(columns=['open', 'high', 'low', 'volume']).round(2).dropna()

    ## daily log return on the remaining (rounded) closes; the first row has no predecessor and is dropped
    df_eq['daily_log_return'] = np.log(df_eq['close'] / df_eq['close'].shift(1))
    return df_eq.drop(columns=['close']).dropna()


indicator_frames = []
for equity_id, df_eq in df_ind.groupby('equity_id'):
    df_eq = _add_indicators(df_eq)
    print(f"For equity id: {equity_id}, Calculated {df_eq.shape[0]} number of rows")
    indicator_frames.append(df_eq)
    logger.info(f"Succesfully performed calculations on dataframe for equity_id: {equity_id}, Calculated {df_eq.shape[0]} number of rows and {df_eq.shape[1]} number of columns")

## concatenate once at the end; growing df_calc inside the loop copied it on every iteration
df_calc = pd.concat(indicator_frames) if indicator_frames else pd.DataFrame()

For equity id: 1, Calculated 4 number of rows
For equity id: 2, Calculated 60 number of rows
For equity id: 3, Calculated 60 number of rows
For equity id: 4, Calculated 60 number of rows
For equity id: 5, Calculated 50 number of rows
For equity id: 6, Calculated 60 number of rows
For equity id: 7, Calculated 59 number of rows
For equity id: 8, Calculated 51 number of rows
For equity id: 9, Calculated 60 number of rows
For equity id: 10, Calculated 60 number of rows


In [36]:
## sanity-check the calculated dataframe, report the covered date range and save it as parquet
num_nan_calc = df_calc.isna().sum().sum()
print(f"Number NaN values in calculated dataframe: {num_nan_calc}")
print(f"Number of rows in calculated dataframe: {df_calc.shape[0]}")
print(f"Number of columns in calculated dataframe: {df_calc.shape[1]}")

logger.info(f"Number NaN values in calculated dataframe: {num_nan_calc}")
logger.info(f"Number of rows in calculated dataframe: {df_calc.shape[0]}")
logger.info(f"Number of columns in calculated dataframe: {df_calc.shape[1]}")

# distinct first and last calculated dates across equities, computed once for print and log
calc_start_dates = df_calc.groupby('equity_id')['date'].min().unique().astype(str)
calc_end_dates = df_calc.groupby('equity_id')['date'].max().unique().astype(str)

print(r"#" * 100)
print("Calculations are available for date range printed below.")
print(f"Starting from : {calc_start_dates}")
# the latest date is labelled as the end of the range
print(f"Ending at : {calc_end_dates}")
print(r"#" * 100)

logger.info("Calculations are available for date range printed below.")
logger.info(f"Starting from : {calc_start_dates}")
logger.info(f"Ending at : {calc_end_dates}")

df_calc.to_parquet(path="D:\\Study\\DE\\Project\\data_destination\\short_term_analysis.parquet", compression='snappy')

Number NaN values in calculated dataframe: 0
Number of rows in calculated dataframe: 524
Number of columns in calculated dataframe: 14
####################################################################################################
Calculations are available for date range printed below.
Starting from : ['2023-11-29']
Starting from : ['2024-02-22']
####################################################################################################


# Insert calculated data into a new table in database

In [37]:
## create the short_term_analysis table (no-op if it exists) and insert every calculated row
try:
    cur.execute("""CREATE TABLE IF NOT EXISTS short_term_analysis (entry_id SERIAL PRIMARY KEY, equity_id int, date date, exp_moving_avg_close numeric, moving_std_dev_close numeric,
    rsi_close numeric, stochastic_osc_kline numeric, stochastic_osc_dline numeric, avg_true_range numeric, bb_up numeric, bb_mid numeric, bb_low numeric,
    on_balance_volume numeric, money_flow_index numeric, daily_log_return numeric, FOREIGN KEY(equity_id) REFERENCES equity(equity_id))""")
    logger.info("Successfully created table named short_term_analysis")
except Exception as exc:
    logger.error("Could not create table short_term_analysis in database", exc_info=True)
    raise Exception("Could not create table short_term_analysis") from exc

# one column list drives both the INSERT text and the value tuples, so they cannot drift apart
short_term_columns = ['equity_id', 'date', 'exp_moving_avg_close', 'moving_std_dev_close',
                      'rsi_close', 'stochastic_osc_kline', 'stochastic_osc_dline', 'avg_true_range',
                      'bb_up', 'bb_mid', 'bb_low', 'on_balance_volume', 'money_flow_index', 'daily_log_return']
insert_short_term_sql = (
    f"INSERT INTO short_term_analysis ({', '.join(short_term_columns)}) "
    f"values ({', '.join(['%s'] * len(short_term_columns))})"
)

for _, row in df_calc.iterrows():
    try:
        cur.execute(insert_short_term_sql, tuple(row[column] for column in short_term_columns))
    except Exception as exc:
        logger.error("Could not insert data into table short_term_analysis", exc_info=True)
        raise Exception("Could not insert data") from exc

In [38]:
## release the cursor first, then the connection that owns it
for db_handle in (cur, con):
    db_handle.close()