In [None]:
import os
from typing import Any

import numpy as np
import pandas as pd
import tqdm

import algotrading_v40.structures.instrument_desc as sid
import algotrading_v40.utils.data_nav as udn
import algotrading_v40.utils.zerodha_data_cleaning as uzdc

# Root directory that holds the raw parquet files to be cleaned.
data_dir = "/Users/chirayuagrawal/algotrading_v40/data/raw/indian_market"

# Walk the tree below ``data_dir`` and keep the full path of every ``.parquet`` file.
parquet_files = [
  os.path.join(folder, name)
  for folder, _subdirs, names in os.walk(data_dir)
  for name in names
  if name.endswith(".parquet")
]

# Sorted, immutable tuple so the files are always processed in the same order.
raw_pqs = tuple(sorted(parquet_files))

In [None]:
def _percentage(numerator: int, denominator: int) -> float:
  """Return ``100 * numerator / denominator`` rounded to 3 decimals, or NaN when ``denominator`` is 0."""
  if denominator == 0:
    # No bars in the slice: a percentage is undefined, so report NaN instead of
    # raising ZeroDivisionError (or emitting a numpy divide warning).
    return float("nan")
  return np.round(100 * numerator / denominator, 3)


def process(df: pd.DataFrame) -> tuple[pd.DataFrame, dict[str, Any]]:
  """Clean one raw bar DataFrame and collect statistics about the cleaning.

  The frame is restricted to rows whose ``date`` year is 2016 or later and then
  passed, in order, through the ``uzdc`` helpers ``fix_unusual_bars``,
  ``set_index_to_bar_close_timestamp``, ``drop_non_standard_indian_trading_hours``
  and ``fix_high_low_values``. The cleaned frame is then validated with
  ``analyse_numeric_columns_quality``.

  Args:
    df: Raw bars with a datetime-like ``date`` column. The ``open``, ``high``,
      ``low``, ``close`` and ``volume`` entries of the quality report are checked.

  Returns:
    A tuple ``(cleaned_df, stats)`` where ``stats`` maps statistic names
    (counts of dropped/fixed bars, bar-count coverage, zero-volume counts and
    percentages, first/last date) to their values. Percentages are NaN when
    the corresponding slice contains no bars, e.g. when an instrument has no
    data from 2022 onwards.

  Raises:
    ValueError: If any checked column has bad values or negatives, or if any
      checked column other than ``volume`` contains zeros.
  """
  df = df.loc[df["date"].dt.year >= 2016]
  fix_unusual_bars_result = uzdc.fix_unusual_bars(df)
  df = fix_unusual_bars_result.df
  df = uzdc.set_index_to_bar_close_timestamp(df)
  drop_non_standard_indian_trading_hours_result = (
    uzdc.drop_non_standard_indian_trading_hours(df)
  )
  df = drop_non_standard_indian_trading_hours_result.df
  fix_high_low_values_result = uzdc.fix_high_low_values(df)
  df = fix_high_low_values_result.df
  count_bars_per_trading_day_result = uzdc.count_bars_per_trading_day(df)

  # Hard validation: the cleaned data must not contain bad or negative values,
  # and only the volume column is allowed to contain zeros.
  quality_result = uzdc.analyse_numeric_columns_quality(df)
  for col in ["open", "high", "low", "close", "volume"]:
    if quality_result[col].n_bad_values > 0:
      raise ValueError(f"Column {col} has bad values")
    if quality_result[col].n_negatives > 0:
      raise ValueError(f"Column {col} has negatives")
    if col != "volume" and quality_result[col].n_zeros > 0:  # volume can have zeros
      raise ValueError(f"Column {col} has zeros")

  # Same quality report restricted to bars whose index year is 2022 or later.
  # This slice can be empty, which is why percentages go through _percentage.
  df_post_2022 = df.loc[df.index.year >= 2022].copy()
  quality_result_post_2022 = uzdc.analyse_numeric_columns_quality(df_post_2022)

  return df, {
    "n_unusual_bars_dropped": fix_unusual_bars_result.n_dropped,
    "n_unusual_bars_fixed": fix_unusual_bars_result.n_date_fixed,
    "n_non_standard_indian_trading_hours_dropped": drop_non_standard_indian_trading_hours_result.n_dropped,
    "n_high_fixed": fix_high_low_values_result.n_high_fixed,
    "n_low_fixed": fix_high_low_values_result.n_low_fixed,
    "percentage_dates_with_less_than_375_bars": np.round(
      100 * count_bars_per_trading_day_result.fraction_dates_with_less_than_375_bars, 3
    ),
    "n_dates": count_bars_per_trading_day_result.n_dates,
    "n_dates_with_less_than_375_bars": count_bars_per_trading_day_result.n_dates_with_less_than_375_bars,
    "percentage_zero_volume_bars": _percentage(
      quality_result["volume"].n_zeros, len(df)
    ),
    "percentage_zero_volume_bars_post_2022": _percentage(
      quality_result_post_2022["volume"].n_zeros, len(df_post_2022)
    ),
    "n_zero_volume_bars": quality_result["volume"].n_zeros,
    "n_zero_volume_bars_post_2022": quality_result_post_2022["volume"].n_zeros,
    "n_bars": len(df),
    "first_date": df.index.min().date(),
    "last_date": df.index.max().date(),
  }

In [None]:
# Clean every raw parquet file, write the cleaned frame to its cleaned path and
# collect one statistics row per instrument.
instrument_desc_to_row = {}
# Cleaned frames of equities only; they feed the missing sessions/bars analysis below.
instrument_desc_to_df = {}
for raw_pq in tqdm.tqdm(raw_pqs):
  instrument_desc = udn.get_instrument_desc_from_path(raw_pq)
  is_equity = isinstance(instrument_desc, sid.EquityDesc)
  df = pd.read_parquet(raw_pq)
  dfp, row = process(df)
  del df  # drop the raw frame as soon as the cleaned one exists
  path = udn.get_cleaned_path_from_instrument_desc(instrument_desc)
  os.makedirs(os.path.dirname(path), exist_ok=True)
  dfp.to_parquet(path)
  if is_equity:
    instrument_desc_to_df[instrument_desc] = dfp
  else:
    print(
      "Excluded",
      instrument_desc,
      "from missing trading bars analysis since it is not an equity which will be traded",
    )
  row["symbol"] = instrument_desc.symbol
  row["type"] = "equity" if is_equity else "index"
  instrument_desc_to_row[instrument_desc] = row

# Missing sessions/bars are counted across all equities at once, for the full
# history and for the slice whose index year is 2022 or later.
count_missing_trading_sessions_result = uzdc.count_missing_trading_sessions(
  instrument_desc_to_df
)
count_missing_trading_bars_result = uzdc.count_missing_trading_bars(
  instrument_desc_to_df
)
# ``desc``/``frame`` instead of ``id``/``df`` so the builtin ``id`` is never shadowed.
instrument_desc_to_df_post_2022 = {
  desc: frame.loc[frame.index.year >= 2022].copy()
  for desc, frame in instrument_desc_to_df.items()
}
count_missing_trading_bars_result_post_2022 = uzdc.count_missing_trading_bars(
  instrument_desc_to_df_post_2022
)
# The cleaned frames are no longer needed; drop the references to free memory.
del instrument_desc_to_df
del instrument_desc_to_df_post_2022

# Attach the cross-instrument counts to the rows of equities only.
for desc, row in instrument_desc_to_row.items():
  if isinstance(desc, sid.EquityDesc):
    row["n_missing_trading_sessions_for_equity"] = (
      count_missing_trading_sessions_result.instrument_desc_to_n_missing_sessions[desc]
    )
    row["n_missing_trading_bars_for_equity"] = (
      count_missing_trading_bars_result.instrument_desc_to_n_missing_bars[desc]
    )
    row["n_missing_trading_bars_for_equity_post_2022"] = (
      count_missing_trading_bars_result_post_2022.instrument_desc_to_n_missing_bars[desc]
    )
rows = list(instrument_desc_to_row.values())
df_inventory_cleaned = pd.DataFrame(rows)
df_inventory_cleaned

In [None]:
df_inventory_raw = pd.read_csv(
  "/Users/chirayuagrawal/algotrading_v40/data/raw/indian_market/inventory.csv",
  index_col=0,
)


def _sorted_symbol_type_pairs(inventory: pd.DataFrame) -> list[tuple]:
  """Return the sorted list of ``(symbol, type)`` tuples taken from ``inventory``."""
  pairs = inventory[["symbol", "type"]].values.tolist()
  return sorted(map(tuple, pairs))


raw_sym_type = _sorted_symbol_type_pairs(df_inventory_raw)
cleaned_sym_type = _sorted_symbol_type_pairs(df_inventory_cleaned)

# Both inventories must list exactly the same (symbol, type) pairs ...
if raw_sym_type != cleaned_sym_type:
  raise ValueError("raw_sym_type != cleaned_sym_type")

# ... and every pair must be unique, otherwise a merge on them would fan out.
if len(raw_sym_type) != len(set(raw_sym_type)):
  raise ValueError("raw_sym_type has duplicates")
# df_inventory_raw and df_inventory_cleaned can be safely merged on symbol and type

In [None]:
# Columns taken from the raw inventory; all remaining columns come from the cleaned one.
raw_inventory_columns = ["company_name", "industry", "symbol", "listing_date", "type"]

# Left join on (symbol, type): one output row per raw-inventory row.
dfm = pd.merge(
  df_inventory_raw[raw_inventory_columns],
  df_inventory_cleaned,
  how="left",
  on=["symbol", "type"],
)
dfm = dfm.sort_values(by=["type", "industry", "symbol"]).reset_index(drop=True)
dfm

In [None]:
# Final column order of the inventory table, assembled from thematic groups.
_identity_columns = [
  "company_name",
  "industry",
  "symbol",
  "listing_date",
  "type",
  "first_date",
  "last_date",
]
_cleaning_columns = [
  "n_unusual_bars_dropped",
  "n_unusual_bars_fixed",
  "n_non_standard_indian_trading_hours_dropped",
  "n_high_fixed",
  "n_low_fixed",
]
_bar_count_columns = [
  "percentage_dates_with_less_than_375_bars",
  "n_dates",
  "n_dates_with_less_than_375_bars",
]
_volume_columns = [
  "percentage_zero_volume_bars_post_2022",
  "percentage_zero_volume_bars",
  "n_zero_volume_bars_post_2022",
  "n_zero_volume_bars",
  "n_bars",
]
_missing_data_columns = [
  "n_missing_trading_sessions_for_equity",
  "n_missing_trading_bars_for_equity_post_2022",
  "n_missing_trading_bars_for_equity",
]

# Select the columns in exactly this order; any column not listed is dropped.
dfm = dfm[
  [
    *_identity_columns,
    *_cleaning_columns,
    *_bar_count_columns,
    *_volume_columns,
    *_missing_data_columns,
  ]
]
dfm

In [None]:
# Write the merged inventory next to the cleaned data, without the row index.
cleaned_inventory_csv = (
  "/Users/chirayuagrawal/algotrading_v40/data/cleaned/indian_market/inventory.csv"
)
dfm.to_csv(cleaned_inventory_csv, index=False)

In [None]:
# Show the result of uzdc.count_missing_trading_bars for the 2022-onwards equity frames as the cell output.
count_missing_trading_bars_result_post_2022