In [1]:
import os
import pandas as pd
from pathlib import Path
from datetime import datetime
from sklearn.impute import KNNImputer
import pandas as pd
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
import seaborn
import sys 
from typing import List, Optional, Callable, Dict, Union
from collections import defaultdict
import calendar
from joblib import Parallel, delayed

# This code was developed with the assistance of multiple AI agents (e.g., Perplexity, August 2025).


In [2]:
%pwd

'C:\\Users\\mangl\\Desktop\\capstone\\pair_selection'

In [3]:
cols_map = {
  "<ticker>": "ticker",
  "<date>": "date",
  "<time>": "time",
  "<open>": "open_price",
  "<high>": "high_price",
  "<low>": "low_price",
  "<close>": "close_price",
  "<volume>": "volume",
  "<o/i>": "open_interest"
}

In [4]:
DATA_DIR = Path(f"../downloaded_files/Cash Data January 2021")

In [5]:
pd.read_csv(DATA_DIR/'21STCENMGM.csv')

Unnamed: 0,<ticker>,<date>,<time>,<open>,<high>,<low>,<close>,<volume>,<o/i>
0,21STCENMGM,01/01/2021,09:45:00,10.65,10.65,10.65,10.65,11,0
1,21STCENMGM,01/01/2021,09:57:00,10.65,10.65,10.65,10.65,35,0
2,21STCENMGM,01/01/2021,09:59:00,10.65,10.65,10.65,10.65,90,0
3,21STCENMGM,01/01/2021,10:10:00,10.65,10.65,10.65,10.65,10,0
4,21STCENMGM,01/01/2021,10:13:00,10.65,10.65,10.65,10.65,139,0
...,...,...,...,...,...,...,...,...,...
521,21STCENMGM,01/29/2021,13:27:00,11.10,11.10,11.10,11.10,299,0
522,21STCENMGM,01/29/2021,14:47:00,11.15,11.15,11.15,11.15,3,0
523,21STCENMGM,01/29/2021,15:26:00,11.15,11.15,11.15,11.15,20,0
524,21STCENMGM,01/29/2021,15:28:00,11.15,11.15,11.15,11.15,8,0


In [6]:
tickers = [i.rsplit(".", maxsplit=1)[0] for i in os.listdir(DATA_DIR)]
tickers[:10] # few available tickers

['.CNX100',
 '.CNXIT',
 '.NSEBANK',
 '.NSEI',
 '20MICRONS',
 '21STCENMGM',
 '3IINFOTECH',
 '3MINDIA',
 '3PLAND',
 '5PAISA']

In [7]:

def get_stock_data(
    tickers,
    start=None,
    end=None,
    agg_func=None,
    resample_freq=None,
    columns=None,
    impute=True,
    DATA_DIR=None
):
    """
    Load and process intraday stock data from CSV files.
    
    Parameters
    ----------
    tickers : List[str]
        List of stock ticker symbols (without file extensions).
    start : str or pd.Timestamp, optional
        Start datetime for filtering the data.
    end : str or pd.Timestamp, optional
        End datetime for filtering the data.
    agg_func : str or callable, optional
        Aggregation function to apply during resampling (e.g., 'mean', 'ohlc').
        Required if `resample_freq` is provided.
    resample_freq : str, optional
        Resample frequency (e.g., '5min', '15min').
    columns : List[str], optional
        List of columns to retain from the original data.
    impute : bool, default True
        Whether to impute missing data using expanding median.
    DATA_DIR : str or Path, optional
        Directory containing the CSV files.
        
    Returns
    -------
    Dict[str, pd.DataFrame]
        Dictionary mapping each ticker to its corresponding processed DataFrame.
    """
    data_dict = {}
    start = pd.to_datetime(start) if start is not None else None
    end = pd.to_datetime(end) if end is not None else None
    
    # Handle columns parameter - if None, use all available columns
    if columns is None:
        columns = list(cols_map.keys())
    
    # Map requested columns to CSV column names
    csv_cols = [cols_map.get(col, col) for col in columns if col in cols_map]
    
    for ticker in tickers:
        # Read CSV with specified columns
        df = pd.read_csv(Path(DATA_DIR) / f"{ticker}.csv", usecols=csv_cols)
        
        # Rename columns from CSV format to standard format
        rename_dict = {v: k for k, v in cols_map.items() if k in columns and v in df.columns}
        df = df.rename(columns=rename_dict)
        
        # Create datetime column
        df["datetime"] = pd.to_datetime(df["date"] + " " + df["time"], format="%m/%d/%Y %H:%M:%S")
        df = df.sort_values(by="datetime")
        
        # Clean up columns
        df = df.drop(columns=["ticker", "date", "time"], errors="ignore")
        df = df.set_index("datetime")
        df = df.sort_index()
            
        # Force imputation if resampling
        if resample_freq:
            impute = True
            
        # Impute missing data
        if impute:    
            full_index = pd.date_range(start=df.index.min(), end=df.index.max(), freq="1min")
            df = df.reindex(full_index)
            for col in df.columns:
                df[col] = df[col].fillna(df[col].expanding().median())
        
        # Filter by date range
        if start is not None:
            df = df[df.index >= start]
        if end is not None:
            df = df[df.index <= end]
        
        # Resample if requested
        if resample_freq:
            if agg_func is None:
                raise ValueError("agg_func must be provided when resampling.")
            df = df.resample(resample_freq).agg(agg_func)
            
        data_dict[ticker] = df
        
    return data_dict


In [8]:
!pip install duckdb pyarrow polars




[notice] A new release of pip is available: 25.1.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [9]:
import duckdb
import pyarrow
import polars as pl


In [10]:
volume_data = duckdb.sql("""
    SELECT 
        "<ticker>" AS ticker,
        "<volume>" AS volume
    FROM read_csv('../downloaded_files/Cash Data * 2021/*.csv')
""") 

print(f"✅ Loaded {len(volume_data)} volume records")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

✅ Loaded 104251560 volume records


In [None]:
break

In [None]:

# Global set to store illiquid tickers
ILLIQUID_TICKERS = set()

def get_stock_data(tickers, impute=False, DATA_DIR=None):
    """
    Mock function to retrieve stock data for a list of tickers.
    Replace with actual implementation.
    Returns a dictionary of {ticker: DataFrame}.
    """
    result = {}
    for ticker in tickers:
        file_path = Path(DATA_DIR) / f"{ticker}.csv"
        if file_path.exists():
            df = pd.read_csv(file_path)
            result[ticker] = df
    return result

def identify_illiquid_tickers(ticker_dfs, volume_threshold=1000, 
                            zero_volume_days_threshold=0.15, min_nonzero_volume_threshold=100):
    illiquid = set()
    for ticker, df in ticker_dfs.items():
        if 'volume' not in df.columns:
            illiquid.add(ticker)
            continue
        avg_volume = df['volume'].mean()
        fraction_zero_days = (df['volume'] == 0).sum() / len(df) if len(df) > 0 else 1
        nonzero_days = df[df['volume'] > 0]
        avg_nonzero_volume = nonzero_days['volume'].mean() if len(nonzero_days) > 0 else 0
        if (avg_volume < volume_threshold or 
            fraction_zero_days > zero_volume_days_threshold or 
            avg_nonzero_volume < min_nonzero_volume_threshold):
            illiquid.add(ticker)
    return illiquid

def process_stock_data(columns, data_dir_base="../downloaded_files", year=2021,
                      volume_threshold=1000, zero_volume_days_threshold=0.15,
                      min_nonzero_volume_threshold=100):
    months = list(calendar.month_name)[1:]  # Skip empty string at index 0

    def process_ticker(ticker, data_dir):
        if ticker in ILLIQUID_TICKERS:
            return None
        try:
            df = get_stock_data([ticker], impute=False, DATA_DIR=data_dir)[ticker]
            return ticker, df
        except KeyError:
            return None

    def process_month(month):
        DATA_DIR = Path(f"{data_dir_base}/Cash Data {month} {year}")
        if not DATA_DIR.exists():
            return {}
        tickers = [f.rsplit('.', 1)[0] for f in os.listdir(DATA_DIR) if f.endswith('.csv')]
        tickers = list(set(tickers) - ILLIQUID_TICKERS)
        results = Parallel(n_jobs=-1)(
            delayed(process_ticker)(ticker, DATA_DIR) for ticker in tickers
        )
        return {ticker: df for result in results if result is not None for ticker, df in [result]}

    # Parallelize months
    month_dicts = Parallel(n_jobs=-1)(
        delayed(process_month)(month) for month in months
    )

    # Merge per-ticker
    dfs = defaultdict(list)
    for month_dict in month_dicts:
        for ticker, df in month_dict.items():
            dfs[ticker].append(df)

    ticker_dfs = {ticker: pd.concat(df_list, ignore_index=True)
                  for ticker, df_list in dfs.items()}

    # Identify illiquid tickers
    new_illiquid = identify_illiquid_tickers(
        ticker_dfs, volume_threshold, zero_volume_days_threshold, min_nonzero_volume_threshold
    )
    ILLIQUID_TICKERS.update(new_illiquid)

    # Return only liquid tickers
    liquid_dfs = {ticker: df for ticker, df in ticker_dfs.items() if ticker not in ILLIQUID_TICKERS}
    return liquid_dfs

# Example usage
columns = ["volume"]
result = process_stock_data(
    columns,
    volume_threshold=1000,
    zero_volume_days_threshold=0.15,
    min_nonzero_volume_threshold=100
)
print(f"Processed {len(result)} liquid tickers")
print(f"Illiquid tickers: {ILLIQUID_TICKERS}")


In [None]:
len(dfs)

In [None]:
zero_volume_tickers = {ticker  for ticker, df in dfs.items() if df.volume.sum() == 0}
len(dfs)

In [None]:
dfs = {ticker: df for ticker, df in dfs.items() if ticker not in ticker zero_volume_tickers}


In [None]:
# dfs = get_stock_data(
#     tickers,
#     impute=False
# )

Mean trading volume for all the stocks for the month

In [None]:
overall_mean_monthly_traded_volume = sum(df.volume.mean() for _, df in dfs.items())/len(dfs)

 Here we find the stocks whichh have high trading mean trading volumes for atleast half the month

In [None]:
import matplotlib.pyplot as plt

# Calculate high volume day count for each stock
high_volume_day_counts = {
    s: (df.groupby(df.index.date)['volume'].mean() > overall_mean_monthly_traded_volume).sum().item()
    for s, df in dfs.items()
}

# Histogram 
plt.figure(figsize=(10, 6))
plt.hist(high_volume_day_counts.values(), bins=10, edgecolor='black')
plt.xlabel('Number of High-Volume Days')
plt.ylabel('Number of Stocks')
plt.title('Distribution of High-Volume Days Across Stocks')
plt.grid(True)
plt.show()


We observe that for more than half of January, there was sufficient liquidity in the market for some of the stocks. A huge number of stocks have been illiquid 

In [None]:
liquid_stocks = [ s for s, v in high_volume_day_counts.items() if v > 15]

In [None]:
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller
from statsmodels.regression.linear_model import OLS
from statsmodels.tools.tools import add_constant

def calculate_half_life(price_series):
    """
    Calculates the half-life of mean reversion for a price series using linear regression.
    
    Reference:
        Chan, E. P. (2013). Algorithmic Trading: Winning Strategies and Their Rationale. 
        John Wiley & Sons.
    """
    delta_p = price_series.diff().dropna()
    p_lag = price_series.shift(1).dropna()
    
    # Align indices explicitly - keep only indices that appear in both
    common_idx = delta_p.index.intersection(p_lag.index)
    delta_p = delta_p.loc[common_idx]
    p_lag = p_lag.loc[common_idx]
    
    model = OLS(delta_p.values, add_constant(p_lag.values))
    res = model.fit()
    beta = res.params[1]
    halflife = -np.log(2) / beta if beta < 0 else np.inf
    return halflife

def hurst_exponent(ts):
    lags = range(2, 100)
    tau = [np.sqrt(np.std(np.subtract(ts[lag:], ts[:-lag]))) for lag in lags]
    poly = np.polyfit(np.log(lags), np.log(tau), 1)
    return poly[0] * 2.0

def mean_reversion_vector(price_series):
    price_series = price_series.dropna()
    
    adf_stat = adfuller(price_series)[0]
    half_life = calculate_half_life(price_series)
    hurst = hurst_exponent(price_series.values)
    
    returns = price_series.pct_change().dropna()
    vol = returns.std()
    
    return np.array([adf_stat, half_life, hurst, vol])

# Now your embedding extraction code can stay the same:
embedding_list = []
valid_stocks = []

# for ticker in liquid_stocks:
#     df = dfs[ticker]
#     price_series = df['close']
    
#     try:
#         vec = mean_reversion_vector(price_series)
#         if np.all(np.isfinite(vec)):
#             embedding_list.append(vec)
#             valid_stocks.append(ticker)
#     except Exception as e:
#         print(f"Failed for {ticker}: {e}")

# embeddings = np.vstack(embedding_list)


In [None]:
from joblib import Parallel, delayed
from sklearn.preprocessing import StandardScaler

def process_ticker(ticker, price_series):
    """Compute mean-reversion vector for one ticker"""
    try:
        vec = mean_reversion_vector(price_series)
        if np.all(np.isfinite(vec)):
            return ticker, vec
    except Exception as e:
        print(f"Failed for {ticker}: {e}")
    return None

# Prepare inputs first (avoid passing big dfs dict to workers)
inputs = [(ticker, dfs[ticker]['close']) for ticker in liquid_stocks]

results = Parallel(n_jobs=8, backend="loky")(  # try 8 workers first
    delayed(process_ticker)(ticker, price_series) 
    for ticker, price_series in inputs
)

valid_results = [r for r in results if r is not None]
valid_stocks, embedding_list = zip(*valid_results)
embeddings = np.vstack(embedding_list)

scaler = StandardScaler()
embeddings_scaled = scaler.fit_transform(embeddings)


In [None]:
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import AgglomerativeClustering

# Cosine similarity matrix
cos_sim = cosine_similarity(embeddings)

# Cluster 
cluster_model = AgglomerativeClustering(n_clusters=20, linkage='average')

distances = 1 - cos_sim

labels = cluster_model.fit_predict(distances)

# Now labels[i] gives cluster of valid_stocks[i]


In [None]:
! pip install umap-learn


In [None]:

import umap.umap_ as umap 

reducer = umap.plot.interactive(min_dist =0.002, n_components=6, n_neighbors=30)
embeddings_umap = reducer.fit_transform(embeddings)

plt.figure(figsize=(10, 7))
scatter = plt.scatter(embeddings_umap[:, 0], embeddings_umap[:, 1], c=labels, cmap='tab10', s=50, alpha=0.8)
plt.title("Stocks clustered by mean-reversion indicators (UMAP 2D projection)")
plt.xlabel("UMAP Dimension 1")
plt.ylabel("UMAP Dimension 2")
plt.colorbar(scatter, label='Cluster label')
plt.grid(True)
plt.show()


In [None]:
from collections import defaultdict

clustered_tickers = defaultdict(list)
for ticker, label in zip(tickers, labels):
    clustered_tickers[label].append(ticker)

for label in sorted(clustered_tickers):
    print(f"\nCluster {label} ({len(clustered_tickers[label])} tickers):")
    print(", ".join(clustered_tickers[label]))


In [None]:
from itertools import combinations
from statsmodels.tsa.stattools import coint
import pandas as pd

def evaluate_pair(label, ticker1, ticker2):
    try:
        s1 = dfs[ticker1]['close']
        s2 = dfs[ticker2]['close']
        joined = pd.concat([s1, s2], axis=1, join='inner').dropna()
        if len(joined) < 50:
            return None
        s1_aligned = joined.iloc[:, 0]
        s2_aligned = joined.iloc[:, 1]
        score, pvalue, _ = coint(s1_aligned, s2_aligned)
        return {
            'cluster': label,
            'pair': (ticker1, ticker2),
            'cointegration_pvalue': pvalue
        }
    except Exception as e:
        return None

all_pairs = []
for label, ticker_list in clustered_tickers.items():
    if 2 <= len(ticker_list) <= 10:
        for ticker1, ticker2 in combinations(ticker_list, 2):
            all_pairs.append((label, ticker1, ticker2))
pair_results = []
for label, ticker1, ticker2 in all_pairs:
    result = evaluate_pair(label, ticker1, ticker2)
    if result is not None:
        pair_results.append(result)


In [None]:
from statsmodels.regression.linear_model import OLS
from statsmodels.tools.tools import add_constant
from statsmodels.tsa.stattools import adfuller

for result in pair_results:
    t1, t2 = result['pair']
    s1 = dfs[t1]['close']
    s2 = dfs[t2]['close']
    joined = pd.concat([s1, s2], axis=1, join='inner').dropna()
    y = joined.iloc[:,0].values
    x = joined.iloc[:,1].values
    beta = OLS(y, add_constant(x)).fit().params[1]
    spread = y - beta * x
    adf_stat, adf_pvalue, *_ = adfuller(spread)
    result['spread_adf_stat'] = adf_stat
    result['spread_adf_pvalue'] = adf_pvalue
    result['spread_std'] = np.std(spread)


In [None]:
good_pairs = [
    r for r in pair_results
    if r['cointegration_pvalue'] < 0.05 and r['spread_adf_pvalue'] < 0.05
]
#  sort by (1) p-value, (2) spread_std, (3) cluster size
good_pairs = sorted(good_pairs, key=lambda r: (r['cointegration_pvalue'], -r['spread_std']))


In [None]:
import pandas as pd
df_report = pd.DataFrame(good_pairs)
print(df_report[['cluster', 'pair', 'cointegration_pvalue', 'spread_adf_pvalue', 'spread_std']])


In [None]:
pair = good_pairs[0]['pair']
s1 = dfs[pair[0]]['close']
s2 = dfs[pair[1]]['close']
joined = pd.concat([s1, s2], axis=1, join='inner').dropna()
y = joined.iloc[:,0].values
x = joined.iloc[:,1].values
beta = OLS(y, add_constant(x)).fit().params[1]
spread = y - beta * x

plt.figure(figsize=(12,5))
plt.plot(joined.index, spread)
plt.title(f"Spread between {pair[0]} and {pair[1]}")
plt.xlabel("Time")
plt.ylabel("Spread")
plt.show()
