In [1]:
import pandas as pd
import numpy as np
import os
import quantstats as qs
import json
import pyodbc
from datetime import datetime, timedelta, date

In [2]:
class SQL:
    def __init__(self, driver, server, database):
        self.driver = driver
        self.server = server
        self.database = database

    def append_table(self, table_name, dataframe):
        try:
            cxn = pyodbc.connect(
                "DRIVER=" + self.driver + ";"
                "SERVER=" + self.server + ";"
                "DATABASE=" + self.database + ";"
                "TRUSTED_CONNECTION=yes;"
            )

            cursor = cxn.cursor()
            columns_query = f"SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '{table_name}' ORDER BY COLUMN_NAME"
            cursor.execute(columns_query)
            columns_info = cursor.fetchall()

            table_columns = [column[0] for column in columns_info]
            common_columns = [col for col in dataframe.columns if col in table_columns]
            column_list = ", ".join("[" + column[0] + "]" for column in columns_info)

            placeholders = ", ".join("?" for _ in common_columns)
            query = f"INSERT INTO {table_name} ({column_list}) VALUES ({placeholders})"
            dataframe_subset = dataframe[common_columns]

            prepared_data = []
            for row in dataframe_subset.itertuples(index=False):
                prepared_row = []
                
                for i, value in enumerate(row):
                    column_name = common_columns[i]
                    data_type = next((column[1] for column in columns_info if column[0] == column_name), None)
                    
                    if isinstance(value, pd.Timestamp):
                        value = value.to_pydatetime()
                    
                    elif pd.isna(value):
                        value = None
                    
                    elif data_type == "decimal":
                        value = decimal.Decimal(str(value))
                    
                    elif data_type == "float":
                        value = float(value)
                    
                    elif data_type == "int":
                        value = int(value)
                        
                    else:
                        pass
                    
                    prepared_row.append(value)
                
                prepared_data.append(prepared_row)

            if not prepared_data:
                print("Data to be inserted into StockHistory is empty!")
            
            else:
                pass

            cursor.executemany(query, prepared_data)
            cursor.commit()
            cursor.close()
            cxn.close()
            
        except Exception as e:
            print(f"An error occurred while appending to the table StockHistory: {e}")
            
class StockDataDump:
    def __init__(self, config_file_path):
        """
        Initialize StockDataDump object.

        Parameters:
        - config_file_path (str): Path to the configuration JSON file.
        """
        # Read JSON file
        self.config_file_path = config_file_path
        with open(config_file_path, encoding="utf-8") as f:
            self.config = json.load(f)
        
        # Initialize an empty DataFrame to store data
        self.all_data = pd.DataFrame()
        
    def read_data(self):
        # Initialize an empty list to store DataFrames
        data_frames = []
        
        # Iterate over each stock symbol
        for file in self.config["all_files"]:
            # Read data from CSV file
            data = pd.read_csv(file)
            
            # Extract asset name from file name
            asset_name = os.path.basename(file).split(".csv")[0]
            
            # Add the symbol column
            if asset_name in ['SPY', 'MTUM', 'IWN', 'EFA', 'EEM', 'XHB', 'XLB', 'XLE', 'XLY', 'XLK', 'XLV', 'XLI', 'XLU', 'XLP', 'XLF', 'XLC', 'XLRE']:
                asset_category = "ETF"
            elif asset_name in ['DBC', 'GLD']:
                asset_category = "Gold"
            elif asset_name in ['BIL', 'IEF', 'BWX', 'LQD', 'TLT']:
                asset_category = "Treasury"
            elif asset_name == 'VNQ':
                asset_category = "REIT"
            else:
                asset_category = "Stock"
            
            # Add asset name, category and date columns
            data["asset_category"] = asset_category
            
            if "Stock" in file:
                data["asset_name"] = data["Symbol"]
            else:
                data["asset_name"] = asset_name
                
            
            # Append DataFrame to the list
            data_frames.append(data)
        
        # Concatenate data frames without resetting index
        self.all_data = pd.concat(data_frames, ignore_index=True)
        
        # Rename columns and drop unnecessary columns
        self.all_data.rename(
            columns={
                "Open": "open_price",
                "High": "high_price",
                "Low": "low_price",
                "Close": "close_price",
                "Volume": "volume",
                "Date": "current_date"
            },
            inplace=True
        )
        
        self.all_data.drop(["Symbol", "Adj Close"], axis=1, inplace=True)
        
        # Filter data to include only the past year's data
        self.all_data["current_date"] = pd.to_datetime(self.all_data["current_date"], format='%Y-%m-%d %H:%M:%S').dt.date
        one_year_ago = date.today() - pd.DateOffset(years=1)
        self.all_data = self.all_data[self.all_data["current_date"] >= one_year_ago]
        
        return

    def calculate_metrics(self):
        self.read_data()
        
        # Group data by asset_name
        grouped_data = self.all_data.groupby('asset_name')
        
        # Calculate percentage returns for different periods
        self.all_data['percentage_1_d_returns'] = grouped_data['close_price'].pct_change(periods=1) * 100
        self.all_data['percentage_1_m_returns'] = grouped_data['close_price'].pct_change(periods=20) * 100
        self.all_data['percentage_3_m_returns'] = grouped_data['close_price'].pct_change(periods=60) * 100
        self.all_data['percentage_1_y_returns'] = grouped_data['close_price'].pct_change(periods=252) * 100
        
        # Calculate percentage volatility for different periods
        self.all_data['percentage_1_m_volatility'] = grouped_data['close_price'].pct_change(periods=20).rolling(window=20).std() * np.sqrt(252) * 100
        self.all_data['percentage_3_m_volatility'] = grouped_data['close_price'].pct_change(periods=60).rolling(window=60).std() * np.sqrt(252) * 100
        self.all_data['percentage_1_y_volatility'] = grouped_data['close_price'].pct_change(periods=252).rolling(window=252).std() * np.sqrt(252) * 100
        
        # Calculate ratios
        grouped_data = self.all_data.groupby("asset_name")
   
        self.all_data["ratio_sharpe"] = self.all_data["asset_name"].map(grouped_data.apply(lambda x: qs.stats.sharpe(x["close_price"])))
        self.all_data["ratio_sortino"] = self.all_data["asset_name"].map(grouped_data.apply(lambda x: qs.stats.sortino(x["close_price"])))
        self.all_data["ratio_win_loss"] = self.all_data["asset_name"].map(grouped_data.apply(lambda x: qs.stats.win_loss_ratio(x["close_price"])))
        self.all_data["ratio_drawdown"] = self.all_data["asset_name"].map(grouped_data.apply(lambda x: qs.stats.max_drawdown(x["close_price"])))
        
        self.all_data = self.all_data[sorted(self.all_data.columns)]
        self.all_data.reset_index(drop=True, inplace=True)
        
        return
    
    def dump_historical_data(self):
        self.calculate_metrics()
        
        sql = SQL(
            self.config["driver"],
            self.config["server"],
            self.config["database"]
        )
        
        sql.append_table("end_of_day_asset_details", self.all_data)
        print("Historical EOD data dumped successully!")
        
        return
    
config_file_path = r"C:\Users\DELL\Desktop\Projects\Codeshastra X\Config File\Data_Dumping_Configuration.JSON"
processor = StockDataDump(config_file_path)
all_data_with_metrics = processor.dump_historical_data()

  self.all_data = self.all_data[self.all_data["current_date"] >= one_year_ago]
  res = returns.mean() / divisor
  res = returns.mean() / downside
  res = returns.mean() / downside
