In [None]:
# Ticker universe handed to get_highly_correlated_pairs_by_sector() further down.
# Symbols are written in the form yfinance is queried with in this notebook
# (".NS" suffix).
# NOTE(review): the recorded run output in this notebook reports failed
# downloads ("possibly delisted") for GMRINFRA.NS, IBULHSGFIN.NS, L&TFH.NS,
# MCDOWELL-N.NS and LTI.NS -- these entries probably need refreshing; confirm
# the current symbols before re-running.
nse_futures_tickers = [
    "ACC.NS", "ADANIENT.NS", "ADANIPORTS.NS", "AMBUJACEM.NS", "APOLLOHOSP.NS",
    "ASHOKLEY.NS", "ASIANPAINT.NS", "AUROPHARMA.NS", "AXISBANK.NS", "BAJAJ-AUTO.NS",
    "BAJAJFINSV.NS", "BAJFINANCE.NS", "BALKRISIND.NS", "BALRAMCHIN.NS", "BANDHANBNK.NS",
    "BANKBARODA.NS", "BEL.NS", "BERGEPAINT.NS", "BHARATFORG.NS", "BHARTIARTL.NS",
    "BHEL.NS", "BIOCON.NS", "BOSCHLTD.NS", "BPCL.NS", "BRITANNIA.NS",
    "CHOLAFIN.NS", "CIPLA.NS", "COALINDIA.NS", "COFORGE.NS", "COLPAL.NS",
    "CONCOR.NS", "CUB.NS", "CUMMINSIND.NS", "DABUR.NS", "DALBHARAT.NS",
    "DEEPAKNTR.NS", "DIVISLAB.NS", "DLF.NS", "DRREDDY.NS", "EICHERMOT.NS",
    "ESCORTS.NS", "EXIDEIND.NS", "FEDERALBNK.NS", "GAIL.NS", "GLENMARK.NS",
    "GMRINFRA.NS", "GODREJCP.NS", "GRANULES.NS", "GRASIM.NS", "GUJGASLTD.NS",
    "HAVELLS.NS", "HCLTECH.NS", "HDFCBANK.NS", "HDFCLIFE.NS",
    "HEROMOTOCO.NS", "HINDALCO.NS", "HINDCOPPER.NS", "HINDPETRO.NS", "HINDUNILVR.NS",
    "IBULHSGFIN.NS", "ICICIBANK.NS", "ICICIGI.NS", "ICICIPRULI.NS", "IDEA.NS",
    "IDFCFIRSTB.NS", "IEX.NS", "IGL.NS", "INDHOTEL.NS", "INDIACEM.NS",
    "INDIAMART.NS", "INDIGO.NS", "INDUSINDBK.NS", "INDUSTOWER.NS", "INFY.NS",
    "INTELLECT.NS", "IOC.NS", "IRCTC.NS", "ITC.NS", "JINDALSTEL.NS",
    "JSWSTEEL.NS", "JUBLFOOD.NS", "KOTAKBANK.NS", "L&TFH.NS", "LALPATHLAB.NS",
    "LAURUSLABS.NS", "LICI.NS", "LT.NS", "LTI.NS", "LTTS.NS",
    "LUPIN.NS", "M&M.NS", "M&MFIN.NS", "MANAPPURAM.NS", "MARICO.NS",
    "MARUTI.NS", "MCDOWELL-N.NS", "METROPOLIS.NS", "MFSL.NS", "MGL.NS",
    "MOTHERSON.NS", "MPHASIS.NS", "MRF.NS", "MUTHOOTFIN.NS", "NATIONALUM.NS",
    "NAUKRI.NS", "NAVINFLUOR.NS", "NESTLEIND.NS", "NMDC.NS", "NTPC.NS",
    "OBEROIRLTY.NS", "OFSS.NS", "ONGC.NS", "PAGEIND.NS", "PEL.NS",
    "PERSISTENT.NS", "PETRONET.NS", "PIDILITIND.NS", "PIIND.NS", "PNB.NS",
    "POLYCAB.NS", "POWERGRID.NS", "PVRINOX.NS", "RAMCOCEM.NS", "RBLBANK.NS",
    "RECLTD.NS", "RELIANCE.NS", "SAIL.NS", "SBICARD.NS", "SBILIFE.NS",
    "SBIN.NS", "SHREECEM.NS", "SIEMENS.NS", "SRF.NS", "SHRIRAMFIN.NS",
    "SUNPHARMA.NS", "SUNTV.NS", "SYNGENE.NS", "TATACHEM.NS", "TATACOMM.NS",
    "TATACONSUM.NS", "TATAMOTORS.NS", "TATAPOWER.NS", "TATASTEEL.NS", "TCS.NS",
    "TECHM.NS", "TITAN.NS", "TORNTPHARM.NS", "TRENT.NS", "TVSMOTOR.NS",
    "UBL.NS", "ULTRACEMCO.NS", "UPL.NS", "VEDL.NS", "VOLTAS.NS",
    "WIPRO.NS", "ZEEL.NS"
]


In [None]:
import yfinance as yf
import pandas as pd
from itertools import combinations
from tqdm import tqdm
import statsmodels.api as sm
import numpy as np

def get_highly_correlated_pairs_by_sector(tickers, start_date="2023-07-01", end_date="2025-07-01", threshold=0.75, save_path="correlated_pairs_by_sector.csv"):
    """
    Find strongly correlated stock pairs within each sector and save them to CSV.

    Tickers are grouped by the ``sector`` field of ``yf.Ticker(t).info``; tickers
    whose lookup fails (or that carry no sector) are grouped under "Unknown".
    For every pair inside a group the correlation of the downloaded "Close"
    price series is computed on their common (non-NaN) dates, and pairs with
    ``abs(correlation) >= threshold`` are kept. ``get_viable_pair`` decides which
    ticker is recorded as ``Stock_X`` and which as ``Stock_Y``.

    Parameters
    ----------
    tickers : iterable of str
        Symbols to analyse.
    start_date, end_date : str
        Date range forwarded to ``yf.download``.
    threshold : float
        Minimum absolute correlation for a pair to be kept.
    save_path : str
        Destination CSV path.

    Returns
    -------
    pandas.DataFrame
        Columns ``Sector, Stock_Y, Stock_X, Correlation, Sign`` (always present,
        even when no pair qualifies, so the saved CSV keeps its header).
    """
    result_columns = ["Sector", "Stock_Y", "Stock_X", "Correlation", "Sign"]

    # Step 1: Fetch sector info for all tickers
    print("Fetching sector info for tickers...")
    ticker_sector_map = {}
    failed_lookups = []
    for t in tickers:
        try:
            sector = yf.Ticker(t).info.get("sector", "Unknown")
        except Exception:
            # Best effort: keep going, but remember which lookups failed so the
            # "Unknown" bucket is not mistaken for a real sector.
            sector = "Unknown"
            failed_lookups.append(t)
        ticker_sector_map[t] = sector

    if failed_lookups:
        print(f"Sector lookup failed for {len(failed_lookups)} tickers (grouped as 'Unknown'): {failed_lookups}")

    sector_groups = {}
    for ticker, sector in ticker_sector_map.items():
        sector_groups.setdefault(sector, []).append(ticker)

    print(f"Sector groups created: {len(sector_groups)} sectors found.")

    all_pairs = []

    # Step 2: For each sector, calculate correlations within that sector
    for sector, sector_tickers in sector_groups.items():
        if len(sector_tickers) < 2:
            continue  # Skip if not enough tickers for pairing

        print(f"\nProcessing sector: {sector} with {len(sector_tickers)} tickers")

        # Download data for this sector
        data = yf.download(sector_tickers, start=start_date, end=end_date)["Close"]

        # Number of unordered pairs, so the progress bar can show completion.
        n_tickers = len(sector_tickers)
        n_pairs = n_tickers * (n_tickers - 1) // 2

        # Compute correlations
        for stock1, stock2 in tqdm(combinations(sector_tickers, 2), total=n_pairs, desc=f"{sector} Pairs"):
            # A ticker whose download failed may be absent from the frame.
            if stock1 not in data.columns or stock2 not in data.columns:
                continue

            df_pair = data[[stock1, stock2]].dropna()
            if len(df_pair) < 100:
                continue  # too few common observations for a meaningful estimate

            corr = df_pair.corr().iloc[0, 1]

            if abs(corr) >= threshold:
                viable = get_viable_pair(df_pair, start_date)
                # If no ordering could be ranked, keep the pair with a fixed
                # orientation instead of failing on the tuple unpack.
                stock_x, stock_y = viable if viable is not None else (stock2, stock1)
                all_pairs.append({
                    "Sector": sector,
                    "Stock_Y": stock_y,
                    "Stock_X": stock_x,
                    "Correlation": round(corr, 4),
                    "Sign": "positive" if corr > 0 else "negative"
                })

    # Step 3: Save results (explicit columns keep the schema stable when empty)
    result_df = pd.DataFrame(all_pairs, columns=result_columns)
    result_df.to_csv(save_path, index=False)
    print(f"\nSaved {len(result_df)} correlated pairs to {save_path}")
    return result_df


def get_viable_pair(data, test_start_date):
    """
    Choose which column should be the regressor (X) and which the response (Y).

    For every ordered pair of columns in ``data`` an OLS regression of Y on X
    (with a constant added via ``sm.add_constant``) is fitted on the rows from
    ``test_start_date`` onward. The ordering with the smallest ratio of
    ``model.bse.iloc[0]`` (standard error of the first coefficient, presumably
    the added constant) to the standard deviation of the residuals is selected.

    Parameters
    ----------
    data : pandas.DataFrame
        Price columns to compare; label-sliced with ``data.loc[test_start_date:]``.
    test_start_date : str or timestamp-like
        First index label included in the regression sample.

    Returns
    -------
    tuple or None
        ``(x, y)`` column labels of the best ordering. If no ordering yields a
        usable ratio (e.g. zero residual spread), the first ordering examined is
        returned so callers can still unpack a pair. ``None`` is returned only
        when ``data`` has fewer than two columns.
    """
    one_year_data = data.loc[test_start_date:]
    best_ratio = float('inf')
    best_pair = None
    fallback_pair = None
    for s1, s2 in combinations(data.columns, 2):
        for y, x in [(s1, s2), (s2, s1)]:
            if fallback_pair is None:
                fallback_pair = (x, y)
            x_data = sm.add_constant(one_year_data[x])
            y_data = one_year_data[y]
            model = sm.OLS(y_data, x_data).fit()
            intercept_se = model.bse.iloc[0]
            std_err = np.std(model.resid)
            # A zero/NaN residual spread makes the ratio undefined; skip it
            # rather than dividing by zero.
            if not std_err > 0:
                continue
            ratio = intercept_se / std_err
            if ratio < best_ratio:
                best_ratio = ratio
                best_pair = (x, y)
    return best_pair if best_pair is not None else fallback_pair


In [None]:
# Run the sector-wise correlation scan over the full ticker universe; the
# function also writes the resulting table to `save_path`.
df = get_highly_correlated_pairs_by_sector(nse_futures_tickers, start_date="2023-07-01", end_date="2025-07-01", threshold=0.75, save_path="correlated_pairs_by_sector.csv")

Fetching sector info for tickers...
Sector groups created: 12 sectors found.

Processing sector: Basic Materials with 25 tickers


  data = yf.download(sector_tickers, start=start_date, end=end_date)["Close"]
[*********************100%***********************]  25 of 25 completed
Basic Materials Pairs: 300it [00:00, 523.13it/s]
  data = yf.download(sector_tickers, start=start_date, end=end_date)["Close"]



Processing sector: Energy with 8 tickers


[*********************100%***********************]  8 of 8 completed
Energy Pairs: 28it [00:00, 268.95it/s]
  data = yf.download(sector_tickers, start=start_date, end=end_date)["Close"]



Processing sector: Industrials with 14 tickers


[*********************100%***********************]  14 of 14 completed
Industrials Pairs: 91it [00:00, 274.51it/s]
  data = yf.download(sector_tickers, start=start_date, end=end_date)["Close"]



Processing sector: Healthcare with 15 tickers


[*********************100%***********************]  15 of 15 completed
Healthcare Pairs: 105it [00:00, 263.89it/s]
  data = yf.download(sector_tickers, start=start_date, end=end_date)["Close"]



Processing sector: Financial Services with 30 tickers


[*********************100%***********************]  30 of 30 completed
Financial Services Pairs: 435it [00:00, 503.51it/s]
  data = yf.download(sector_tickers, start=start_date, end=end_date)["Close"]
[                       0%                       ]


Processing sector: Consumer Cyclical with 19 tickers


[*********************100%***********************]  19 of 19 completed
Consumer Cyclical Pairs: 171it [00:00, 376.24it/s]
  data = yf.download(sector_tickers, start=start_date, end=end_date)["Close"]
[                       0%                       ]


Processing sector: Consumer Defensive with 11 tickers


[*********************100%***********************]  11 of 11 completed
Consumer Defensive Pairs: 55it [00:00, 489.36it/s]
  data = yf.download(sector_tickers, start=start_date, end=end_date)["Close"]
[                       0%                       ]


Processing sector: Communication Services with 9 tickers


[*********************100%***********************]  9 of 9 completed
Communication Services Pairs: 36it [00:00, 509.91it/s]
  data = yf.download(sector_tickers, start=start_date, end=end_date)["Close"]



Processing sector: Technology with 11 tickers


[*********************100%***********************]  11 of 11 completed
Technology Pairs: 55it [00:00, 211.09it/s]
  data = yf.download(sector_tickers, start=start_date, end=end_date)["Close"]



Processing sector: Real Estate with 2 tickers


[*********************100%***********************]  2 of 2 completed
Real Estate Pairs: 1it [00:00, 476.46it/s]
  data = yf.download(sector_tickers, start=start_date, end=end_date)["Close"]



Processing sector: Utilities with 7 tickers


[*********************100%***********************]  7 of 7 completed
Utilities Pairs: 21it [00:00, 198.85it/s]
  data = yf.download(sector_tickers, start=start_date, end=end_date)["Close"]



Processing sector: Unknown with 5 tickers


[*********************100%***********************]  5 of 5 completed
ERROR:yfinance:
5 Failed downloads:
ERROR:yfinance:['GMRINFRA.NS', 'IBULHSGFIN.NS', 'L&TFH.NS', 'MCDOWELL-N.NS', 'LTI.NS']: YFTzMissingError('possibly delisted; no timezone found')
Unknown Pairs: 10it [00:00, 1245.19it/s]


Saved 375 correlated pairs to correlated_pairs_by_sector.csv





In [None]:
# Display the correlated-pairs table produced by the scan.
df

Unnamed: 0,Sector,Stock_Y,Stock_X,Correlation,Sign
0,Basic Materials,ACC.NS,AMBUJACEM.NS,0.8217,positive
1,Basic Materials,ACC.NS,HINDCOPPER.NS,0.7817,positive
2,Basic Materials,ACC.NS,SAIL.NS,0.7645,positive
3,Basic Materials,AMBUJACEM.NS,HINDCOPPER.NS,0.9059,positive
4,Basic Materials,AMBUJACEM.NS,JINDALSTEL.NS,0.8303,positive
...,...,...,...,...,...
370,Utilities,MGL.NS,POWERGRID.NS,0.8014,positive
371,Utilities,MGL.NS,TATAPOWER.NS,0.8062,positive
372,Utilities,NTPC.NS,POWERGRID.NS,0.9543,positive
373,Utilities,NTPC.NS,TATAPOWER.NS,0.9599,positive


In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from concurrent.futures import ProcessPoolExecutor
import time
import os
from typing import Iterable, Tuple, Optional, Dict, Any

# ============================================================================
# SECTION 0: CONFIG
# ============================================================================
# Date range used by run_full_backtest_for_pair() when downloading prices.
DEFAULT_START_DATE = "2023-07-01"
DEFAULT_END_DATE = "2025-07-01"
# NOTE(review): MIN_OVERLAP_DAYS is not referenced by the functions visible in
# this notebook section (generate_initial_pair_trades checks ROLL_WINDOW + 1
# instead) -- confirm whether it is still needed.
MIN_OVERLAP_DAYS = 253            # ~1y trading days (needed for rolling window)
ROLL_WINDOW = 252                  # number of prior rows used for each regression / z-score
Z_ENTRY = 2.5                      # open a trade when |z| exceeds this
Z_EXIT_RESET = 1.0                 # close when z has come back inside this band (mean reversion)
Z_EXIT_STOP = 3.0                  # close when z moves beyond this band (stop-loss)
ADF_P_CUTOFF = 0.05                # entry requires ADF p-value on window residuals below this
SMALL_STD = 1e-8                   # skip days whose residual std is below this (avoids div-by-zero)

# ============================================================================
# SECTION 1: MARKET DATA
# ============================================================================

def download_data(stocks: Iterable[str], start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Download *Close* prices for the given tickers.

    Parameters
    ----------
    stocks : iterable of str
        Tickers to request. Consumed exactly once, so one-shot iterators work.
    start_date, end_date : str
        Date range forwarded to ``yf.download``.

    Returns
    -------
    pandas.DataFrame or None
        Forward-filled close prices indexed by a ``DatetimeIndex``, or ``None``
        when the download raises, yields no rows, or any requested ticker is
        absent or has no data at all.

    Notes
    -----
    * ``keepna=True`` so we preserve index alignment; we then ffill and drop
      rows in which *every* ticker is missing. Leading NaNs of an individual
      ticker are kept -- callers are expected to ``dropna`` the columns they use.
    * yfinance sometimes drops failed columns and sometimes returns them filled
      with NaN; both cases are treated as a failed download.
    """
    # Materialise once: the ticker list is needed for the request *and* for the
    # completeness checks below, and an iterator would be exhausted after the
    # first pass (making those checks vacuous).
    requested = list(stocks)
    if not requested:
        return None

    try:
        df = yf.download(
            requested,
            start=start_date,
            end=end_date,
            progress=False,
            auto_adjust=False,
            actions=False,
            keepna=True,
        )["Close"]
    except Exception:
        return None

    if isinstance(df, pd.Series):
        # yfinance collapses to Series if single ticker; make DataFrame
        df = df.to_frame()

    df = df.ffill().dropna(how="all")  # drop rows where *all* tickers miss
    df.index = pd.to_datetime(df.index)

    # If any requested ticker is missing entirely, bail out
    missing = [t for t in requested if t not in df.columns]
    if missing:
        return None

    # If no rows or insufficient columns -> None
    if df.empty or len(df.columns) < len(requested):
        return None

    # A column that is NaN on every row means that ticker's download failed.
    if df[requested].isna().all().any():
        return None

    return df


# ============================================================================
# SECTION 2: SIGNAL GENERATION
# ============================================================================

def generate_initial_pair_trades(df: pd.DataFrame, stock_x: str, stock_y: str) -> pd.DataFrame:
    """Build a log of completed trades for the pair from a rolling-regression z-score.

    For each day after the first ``ROLL_WINDOW`` rows, ``stock_y`` is regressed
    on ``stock_x`` (plus a constant) over the preceding ``ROLL_WINDOW`` rows; the
    day's own residual divided by the window's residual standard deviation is
    the z-score. A trade opens when ``|z| > Z_ENTRY`` and the ADF p-value of the
    window residuals is below ``ADF_P_CUTOFF``; it closes when z comes back
    inside ``Z_EXIT_RESET`` or moves beyond ``Z_EXIT_STOP``. Only trades that
    have closed are returned.

    Columns are picked by name, so the column order of ``df`` does not matter.
    An empty frame is returned when ``df`` is ``None``, a column is missing, or
    there are fewer than ``ROLL_WINDOW + 1`` common rows.
    """
    # Guard clauses: nothing to do without both price columns.
    if df is None or stock_x not in df.columns or stock_y not in df.columns:
        return pd.DataFrame()

    pair_prices = df[[stock_x, stock_y]].dropna().copy()
    pair_prices.columns = ["X", "Y"]

    n_obs = len(pair_prices)
    if n_obs < ROLL_WINDOW + 1:
        return pd.DataFrame()

    completed_trades = []
    open_side = None  # 'long' (long Y / short X), 'short', or None when flat
    open_trade: Dict[str, Any] = {}

    # Walk forward from the first day that has a full lookback window behind it.
    for pos in range(ROLL_WINDOW, n_obs):
        today = pair_prices.index[pos]
        lookback = pair_prices.iloc[pos - ROLL_WINDOW : pos]  # excludes `today`

        fit = sm.OLS(lookback["Y"], sm.add_constant(lookback["X"])).fit()
        intercept, beta = fit.params
        lookback_resid = fit.resid
        resid_sigma = np.std(lookback_resid)
        if resid_sigma < SMALL_STD:
            continue  # spread is practically flat; z-score would blow up

        # Today's residual against the model fitted on the lookback window.
        today_row = pair_prices.iloc[pos]
        z_score = (today_row["Y"] - (intercept + beta * today_row["X"])) / resid_sigma

        # ---- flat: look for an entry ----------------------------------------
        if open_side is None:
            if abs(z_score) > Z_ENTRY:
                # Only enter when the lookback residuals test as stationary.
                try:
                    adf_p = adfuller(lookback_resid)[1]
                except Exception:
                    adf_p = 1.0
                if adf_p < ADF_P_CUTOFF:
                    # Y below the fitted line -> long Y / short X; otherwise the reverse.
                    open_side = "long" if z_score < 0 else "short"
                    open_trade = {
                        "Entry Date": today,
                        "Stock Y": stock_y,
                        "Stock X": stock_x,
                        "ADF_PValue_Entry": adf_p,
                        "ZScore Entry": z_score,
                        "Beta": beta,
                        "Intercept": intercept,
                        "StdErr_Residual": resid_sigma,
                        "Position": open_side,
                    }
            continue

        # ---- in a trade: look for an exit -----------------------------------
        # Mirror z for longs so both sides share one test: the trade is closed
        # once the stretch falls below the reset band or exceeds the stop band.
        stretch = -z_score if open_side == "long" else z_score
        if stretch < Z_EXIT_RESET or stretch > Z_EXIT_STOP:
            try:
                adf_p_exit = adfuller(lookback_resid)[1]
            except Exception:
                adf_p_exit = 1.0
            completed_trades.append({
                **open_trade,
                "Exit Date": today,
                "ADF_PValue_Exit": adf_p_exit,
                "ZScore Exit": z_score,
            })
            open_side = None
            open_trade = {}

    return pd.DataFrame(completed_trades)


# ============================================================================
# SECTION 3: P&L CALCULATION
# ============================================================================

def calculate_trade_pnls(trades_df: pd.DataFrame, price_df: pd.DataFrame, lot_sizes: Dict[str, int]) -> pd.DataFrame:
    """Attach entry/exit prices, lot sizes (beta-adjusted), and P&L per trade.

    Parameters
    ----------
    trades_df : pandas.DataFrame
        One row per trade with at least ``Entry Date``, ``Exit Date``,
        ``Stock X``, ``Stock Y``, ``Beta`` and ``Position`` ("long" = long Y /
        short X; anything else is treated as short Y / long X).
    price_df : pandas.DataFrame
        Prices indexed by date with one column per ticker.
    lot_sizes : dict
        Ticker -> quantity; tickers not listed default to 1.

    Returns
    -------
    pandas.DataFrame
        The input trade fields plus price, lot-size and ``Trade PnL`` columns.
        Trades without an exit date, or whose entry/exit price is unavailable
        (date not in ``price_df`` or price is NaN), are omitted.

    Notes
    -----
    The ``... Open Price ...`` output columns hold whatever ``price_df``
    contains on the trade dates; the names are kept for compatibility.
    """
    if trades_df.empty:
        return pd.DataFrame()

    results = []
    for _, trade in trades_df.iterrows():
        entry_date = pd.to_datetime(trade["Entry Date"])
        exit_date = pd.to_datetime(trade["Exit Date"])
        stock_x = trade["Stock X"]
        stock_y = trade["Stock Y"]

        # Skip incomplete trades (no exit)
        if pd.isna(exit_date):
            continue

        try:
            entry_px_x = price_df.loc[entry_date, stock_x]
            entry_px_y = price_df.loc[entry_date, stock_y]
            exit_px_x = price_df.loc[exit_date, stock_x]
            exit_px_y = price_df.loc[exit_date, stock_y]
        except KeyError:
            # Date or ticker not present in the price table: skip trade
            continue

        # A NaN price is just as unusable as a missing row; without this check
        # the NaN would flow into the trade P&L and any cumulative sum over it.
        if any(np.any(pd.isna(px)) for px in (entry_px_x, entry_px_y, exit_px_x, exit_px_y)):
            continue

        lot_x = lot_sizes.get(stock_x, 1)
        lot_y = lot_sizes.get(stock_y, 1)

        # Hedge ratio scaling: Y quantity = round(beta * X quantity), at least 1.
        # NOTE(review): generate_initial_pair_trades fits Y = intercept + beta * X,
        # under which one unit of Y would be offset by beta units of X. Scaling
        # the *Y* leg by beta (kept here for backward compatibility) looks like
        # the reverse sizing, and a negative beta is clamped to 1 rather than
        # flipping the X leg -- confirm the intended hedge before relying on
        # these P&L figures.
        beta = trade["Beta"]
        if lot_x > 0 and np.isfinite(beta):
            beta_adjusted_lot_y = max(1, int(round(lot_x * beta)))
        else:
            beta_adjusted_lot_y = lot_y

        if trade["Position"] == "long":  # long Y / short X
            pnl_y = (exit_px_y - entry_px_y) * beta_adjusted_lot_y
            pnl_x = (entry_px_x - exit_px_x) * lot_x
        else:  # short Y / long X
            pnl_y = (entry_px_y - exit_px_y) * beta_adjusted_lot_y
            pnl_x = (exit_px_x - entry_px_x) * lot_x

        trade_pnl = pnl_y + pnl_x

        result = trade.to_dict()
        result.update({
            "Entry Open Price X": entry_px_x,
            "Exit Open Price X": exit_px_x,
            "Entry Open Price Y": entry_px_y,
            "Exit Open Price Y": exit_px_y,
            "Lot Size X": lot_x,
            "Lot Size Y": beta_adjusted_lot_y,
            "Trade PnL": trade_pnl,
        })
        results.append(result)

    return pd.DataFrame(results)


# ============================================================================
# SECTION 4: WORKER (per pair)  ***UPDATED FOR SECTOR SUPPORT***
# ============================================================================

def run_full_backtest_for_pair(
    pair_info: Tuple[str, str, float, str, Optional[str]],
    lot_sizes: Optional[Dict[str, int]] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
):
    """Master worker function for a single pair.

    Downloads prices, generates the trade log and computes P&L for one pair,
    then tags every trade with the pair's correlation, sign and (if given)
    sector.

    Parameters
    ----------
    pair_info : tuple
        Expected layout: (stock_y, stock_x, correlation, sign[, sector])
        We accept either 4-item or 5-item tuples to stay backward compatible.
    lot_sizes : dict, optional
        Ticker -> quantity passed to ``calculate_trade_pnls``. Defaults to an
        empty mapping (every ticker then uses that function's default). When
        mapping this worker over a pool, bind it with ``functools.partial``.
    start_date, end_date : str, optional
        Download range; default to ``DEFAULT_START_DATE`` / ``DEFAULT_END_DATE``.

    Returns
    -------
    pandas.DataFrame or None
        The pair's trades, or ``None`` when data is unavailable, no trade was
        found, or processing failed.

    Raises
    ------
    ValueError
        If ``pair_info`` has neither 4 nor 5 items.
    """
    # Unpack flexibly ---------------------------------------------------------
    if len(pair_info) == 5:
        stock_y, stock_x, correlation, sign, sector = pair_info
    elif len(pair_info) == 4:
        stock_y, stock_x, correlation, sign = pair_info
        sector = None
    else:
        raise ValueError(f"Unexpected pair_info length={len(pair_info)}: {pair_info}")

    # Resolve defaults at call time so the module-level config stays the single
    # source of truth when nothing is overridden.
    start_date = DEFAULT_START_DATE if start_date is None else start_date
    end_date = DEFAULT_END_DATE if end_date is None else end_date
    lot_sizes = {} if lot_sizes is None else lot_sizes

    try:
        price_df = download_data([stock_x, stock_y], start_date, end_date)
        if price_df is None:
            return None

        trades_df = generate_initial_pair_trades(price_df, stock_x, stock_y)
        if trades_df.empty:
            return None

        final_trades = calculate_trade_pnls(trades_df, price_df, lot_sizes)
        if final_trades.empty:
            return None

        # Attach correlation/sign/sector meta so downstream aggregation has context
        final_trades["Correlation"] = correlation
        final_trades["Sign"] = sign
        if sector is not None:
            final_trades["Sector"] = sector

        print(f"  -> Processed {stock_y} & {stock_x} ({sector if sector else 'NA'}). Found {len(final_trades)} trades.")
        return final_trades

    except Exception as exc:
        # Still best-effort (one bad pair must not abort the whole run), but
        # leave a trace so genuine bugs are not hidden behind a silent None.
        print(f"  !! Failed {stock_y} & {stock_x}: {type(exc).__name__}: {exc}")
        return None



In [None]:

# ============================================================================
# SECTION 5: MAIN EXECUTION BLOCK (reads *sector* CSV)  ***KEY CHANGE***
# ============================================================================
if __name__ == '__main__':
    # ------------------------------------------------------------------
    # Input: the pairs CSV written by the correlation scan.
    # Required columns (matched after normalising the header text):
    #   Stock_Y, Stock_X, Correlation, Sign   -- Sector is optional.
    # ------------------------------------------------------------------
    input_csv = "/content/correlated_pairs_by_sector.csv"  # <-- change as needed

    raw_df = pd.read_csv(input_csv)
    # Canonical header form: trimmed, lower-case, spaces turned into underscores.
    colmap = {c: c.strip().lower().replace(" ", "_") for c in raw_df.columns}
    raw_df = raw_df.rename(columns=colmap)

    expected_cols = {"stock_x", "stock_y", "correlation", "sign"}
    if not expected_cols <= set(raw_df.columns):
        raise ValueError(f"Input CSV missing required columns {expected_cols}. Found: {raw_df.columns.tolist()}")

    # Sector is carried along only when the CSV provides it.
    has_sector = "sector" in raw_df.columns

    # One plain tuple per pair, in the order the worker unpacks:
    # (stock_y, stock_x, correlation, sign[, sector])
    tuple_fields = ["stock_y", "stock_x", "correlation", "sign"]
    if has_sector:
        tuple_fields.append("sector")
    pairs_to_process = list(raw_df[tuple_fields].itertuples(index=False, name=None))

    print(f"Starting backtest for {len(pairs_to_process)} pairs using up to {os.cpu_count()} CPU cores.")
    start_time = time.time()

    final_df = pd.DataFrame()
    # Fan the pairs out over worker processes; map() keeps the input order.
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        results_list = list(executor.map(run_full_backtest_for_pair, pairs_to_process))

    # Workers return None (or an empty frame) for pairs without usable trades.
    valid_results = [res for res in results_list if not (res is None or res.empty)]

    if valid_results:
        final_df = pd.concat(valid_results, ignore_index=True)
        if not final_df.empty:
            # Chronological order so the running total reads as an equity curve.
            final_df = final_df.sort_values(by="Entry Date").reset_index(drop=True)
            final_df["Cumulative PnL"] = final_df["Trade PnL"].cumsum()
    else:
        print("\nBacktest complete. No trades were found across any pairs.")

    end_time = time.time()

    print("\n" + "=" * 50)
    print("BACKTESTING COMPLETE")
    print(f"Total execution time: {end_time - start_time:.2f} seconds")
    print(f"Total pairs processed: {len(pairs_to_process)}")
    print(f"Total trades found: {len(final_df)}")
    print("=" * 50)

    print("\nFinal Results DataFrame:")
    print(final_df)

    # Persist next to the input file, but only when there is something to write.
    out_csv = os.path.splitext(input_csv)[0] + "_backtest_results.csv"
    if not final_df.empty:
        final_df.to_csv(out_csv, index=False)
        print(f"\nResults written to: {out_csv}")


Starting backtest for 375 pairs using up to 2 CPU cores.
  -> Processed ACC.NS & SAIL.NS (Basic Materials). Found 1 trades.
  -> Processed PIDILITIND.NS & GRASIM.NS (Basic Materials). Found 1 trades.
  -> Processed JSWSTEEL.NS & HINDALCO.NS (Basic Materials). Found 3 trades.
  -> Processed PIDILITIND.NS & HINDALCO.NS (Basic Materials). Found 2 trades.
  -> Processed HINDALCO.NS & VEDL.NS (Basic Materials). Found 2 trades.
  -> Processed JINDALSTEL.NS & HINDCOPPER.NS (Basic Materials). Found 1 trades.
  -> Processed SAIL.NS & HINDCOPPER.NS (Basic Materials). Found 1 trades.
  -> Processed PIDILITIND.NS & JINDALSTEL.NS (Basic Materials). Found 1 trades.
  -> Processed TATASTEEL.NS & JINDALSTEL.NS (Basic Materials). Found 2 trades.
  -> Processed PIDILITIND.NS & JSWSTEEL.NS (Basic Materials). Found 1 trades.
  -> Processed JSWSTEEL.NS & VEDL.NS (Basic Materials). Found 1 trades.
  -> Processed PIDILITIND.NS & TATASTEEL.NS (Basic Materials). Found 2 trades.
  -> Processed RELIANCE.NS & ADA

In [None]:
# Per-ticker lot sizes (quantity per lot), intended as the `lot_sizes` argument
# of calculate_trade_pnls(); tickers missing from this dict fall back to 1 there.
# NOTE(review): several keys do not match the symbols used in
# nse_futures_tickers (e.g. "L&T.NS" vs "LT.NS", "NALCO.NS" vs "NATIONALUM.NS",
# "LTIM.NS" vs "LTI.NS", "SRTRANSFIN.NS" vs "SHRIRAMFIN.NS") -- presumably the
# same underlyings; verify, otherwise those lookups silently use the default.
# NOTE(review): both run_full_backtest_for_pair definitions in this notebook
# build their own empty `lot_sizes = {}`, so this dict is not picked up by them.
lot_sizes = {
    "AXISBANK.NS": 500, "BANKBARODA.NS": 6500, "FEDERALBNK.NS": 8500, "HDFCBANK.NS": 550,
    "ICICIBANK.NS": 1375, "IDFCFIRSTB.NS": 6500, "INDUSINDBK.NS": 700, "KOTAKBANK.NS": 400,
    "PNB.NS": 10000, "SBIN.NS": 1500, "JIOFIN.NS": 1500, "SBILIFE.NS": 300,
    "BAJFINANCE.NS": 125, "BAJAJFINSV.NS": 250, "CHOLAFIN.NS": 1000, "HDFCAMC.NS": 300,
    "HDFC.NS": 300, "ICICIGI.NS": 500, "ICICIPRULI.NS": 1100, "LICHSGFIN.NS": 1100,
    "MUTHOOTFIN.NS": 300, "PEL.NS": 300, "RECLTD.NS": 5000, "SRTRANSFIN.NS": 425,
    "COFORGE.NS": 200, "HCLTECH.NS": 700, "INFY.NS": 400, "LTIM.NS": 200,
    "MPHASIS.NS": 300, "PERSISTENT.NS": 150, "TCS.NS": 150, "TECHM.NS": 700, "WIPRO.NS": 1600,
    "AUROPHARMA.NS": 850, "BIOCON.NS": 2300, "CIPLA.NS": 650, "DIVISLAB.NS": 200, "DRREDDY.NS": 125,
    "GLAND.NS": 200, "GRANULES.NS": 2200, "IPCALAB.NS": 200, "LAURUSLABS.NS": 800, "LUPIN.NS": 500,
    "SUNPHARMA.NS": 700, "TORNTPHARM.NS": 300, "MANKIND.NS": 300, "FORTIS.NS": 1750, "PPLPHARMA.NS": 550,
    "ADANIGREEN.NS": 650, "ADANIENT.NS": 500, "ADANIPORTS.NS": 1000, "BPCL.NS": 1800,
    "GAIL.NS": 6750, "IOC.NS": 9750, "NTPC.NS": 5700, "ONGC.NS": 3850, "PETRONET.NS": 3000,
    "POWERGRID.NS": 2700, "RELIANCE.NS": 250, "TATAPOWER.NS": 2250,
    "COALINDIA.NS": 2700, "HINDALCO.NS": 2150, "JSWSTEEL.NS": 1500, "NALCO.NS": 7500,
    "NMDC.NS": 3000, "SAIL.NS": 4750, "TATASTEEL.NS": 4250, "VEDL.NS": 2000,
    "ASHOKLEY.NS": 2750, "BAJAJ-AUTO.NS": 125, "BALKRISIND.NS": 400, "BOSCHLTD.NS": 25,
    "EICHERMOT.NS": 175, "HEROMOTOCO.NS": 300, "M&M.NS": 700, "MARUTI.NS": 100,
    "TATAMOTORS.NS": 2850, "TVSMOTOR.NS": 1350, "UNOMINDA.NS": 800,
    "BRITANNIA.NS": 150, "COLPAL.NS": 325, "DABUR.NS": 1000, "GODREJCP.NS": 500,
    "HINDUNILVR.NS": 300, "ITC.NS": 3200, "MARICO.NS": 1300, "NESTLEIND.NS": 25,
    "TATACONSUM.NS": 700, "UBL.NS": 350, "UNITDSPR.NS": 300,
    "ACC.NS": 500, "AMBUJACEM.NS": 2500, "ULTRACEMCO.NS": 150, "GRASIM.NS": 475,
    "SHREECEM.NS": 25, "RAMCOCEM.NS": 600,
    "ABB.NS": 250, "BEL.NS": 2850, "BHEL.NS": 5250, "BHARATFORG.NS": 1000, "IRCTC.NS": 875,
    "L&T.NS": 300, "LTTS.NS": 200, "NBCC.NS": 7500, "SIEMENS.NS": 275, "TATAPROJ.NS": 600,
    "APLAPOLLO.NS": 275, "ANGELONE.NS": 425, "KFIN.NS": 700, "AMBER.NS": 150, "PGELECTRO.NS": 500
}


In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from concurrent.futures import ProcessPoolExecutor
import time
import os

# ==============================================================================
# SECTION 1: CORE LOGIC FUNCTIONS (Now with robust column handling)
# ==============================================================================

def download_data(stocks, start_date, end_date):
    """
    Fetch closing prices for `stocks` between `start_date` and `end_date`.

    The 'Close' table is forward-filled and every row that still contains a
    NaN is dropped, so the returned frame has a value for each column on each
    remaining date. Returns None when the result is empty, has fewer columns
    than requested tickers, or anything goes wrong along the way.
    """
    try:
        raw = yf.download(
            stocks,
            start=start_date,
            end=end_date,
            progress=False,
            auto_adjust=False,
            actions=False,
            keepna=True
        )
        closes = raw['Close']
        # Carry the last known price forward; what is left as NaN afterwards
        # (rows before a ticker's first price) is removed.
        closes = closes.ffill().dropna()
        closes.index = pd.to_datetime(closes.index)

        is_complete = (not closes.empty) and len(closes.columns) >= len(stocks)
        return closes if is_complete else None
    except Exception:
        # Best effort: any failure is reported to the caller as "no data".
        return None

def generate_initial_pair_trades(df, stock_x, stock_y, window=252, z_entry=2.5,
                                 z_exit=1.0, z_stop=3.0, adf_cutoff=0.05, min_std=1e-8):
    """
    Generates trade entry and exit signals based on a rolling regression.

    For each row after the first `window` rows, `stock_y` is regressed on
    `stock_x` (plus a constant) over the preceding `window` rows. The row's own
    residual divided by the window's residual standard deviation is the
    z-score. Columns are selected by name, never by position.

    Args:
        df: price DataFrame containing `stock_x` and `stock_y` columns, or None.
        stock_x: column used as the regressor.
        stock_y: column used as the response.
        window: number of prior rows per regression (default 252).
        z_entry: a trade opens when |z| exceeds this (default 2.5).
        z_exit: a trade closes once z is back inside this band (default 1.0).
        z_stop: a trade closes once z moves beyond this band (default 3.0).
        adf_cutoff: entry requires the ADF p-value of the window residuals to
            be below this (default 0.05).
        min_std: rows whose residual std is below this are skipped (default 1e-8).

    Returns:
        DataFrame with one row per *closed* trade; empty when `df` is None, a
        column is missing, or there are fewer than `window + 1` common rows.
    """
    if df is None:
        return pd.DataFrame()

    # Missing column: report "no trades" instead of raising KeyError.
    if stock_x not in df.columns or stock_y not in df.columns:
        return pd.DataFrame()

    # Explicitly use stock_x and stock_y to select columns and rename them for regression
    df_reg = df[[stock_x, stock_y]].dropna().copy()
    df_reg.columns = ['X', 'Y']

    logs = []
    position = None
    entry_info = {}

    if len(df_reg) < window + 1:
        return pd.DataFrame()

    for i in range(window, len(df_reg)):
        current_day = df_reg.index[i]
        lookback = df_reg.iloc[i - window:i]  # excludes the current row

        y_window = lookback['Y']
        x_window = sm.add_constant(lookback['X'])

        model = sm.OLS(y_window, x_window).fit()
        intercept, beta = model.params
        residuals = model.resid
        std_err_resid = np.std(residuals)

        if std_err_resid < min_std:  # Avoid division by zero for stable pairs
            continue

        y_i = df_reg.iloc[i]['Y']
        x_i = df_reg.iloc[i]['X']
        resid_i = y_i - (intercept + beta * x_i)
        z_score = resid_i / std_err_resid

        if position is None and (z_score > z_entry or z_score < -z_entry):
            # A failing ADF test should only veto this entry, not abort the
            # whole pair, so treat an error as "not stationary".
            try:
                adf_p = adfuller(residuals)[1]
            except Exception:
                adf_p = 1.0
            if adf_p < adf_cutoff:
                position = 'long' if z_score < 0 else 'short'
                entry_info = {
                    'Entry Date': current_day,
                    'Stock Y': stock_y,  # Use original ticker name
                    'Stock X': stock_x,  # Use original ticker name
                    'ADF_PValue_Entry': adf_p,
                    'ZScore Entry': z_score,
                    'Beta': beta,
                    'Intercept': intercept,
                    'StdErr_Residual': std_err_resid,
                    'Position': position
                }
        elif position is not None:
            exit_triggered = False
            if position == 'long' and (z_score > -z_exit or z_score < -z_stop):
                exit_triggered = True
            elif position == 'short' and (z_score < z_exit or z_score > z_stop):
                exit_triggered = True

            if exit_triggered:
                try:
                    adf_p_exit = adfuller(residuals)[1]
                except Exception:
                    adf_p_exit = 1.0
                entry_info.update({
                    'Exit Date': current_day,
                    'ADF_PValue_Exit': adf_p_exit,
                    'ZScore Exit': z_score
                })
                logs.append(entry_info)
                position = None
                entry_info = {}

    return pd.DataFrame(logs)

def calculate_trade_pnls(trades_df, price_df, lot_sizes):
    """
    Calculates profit and loss for each trade.

    Args:
        trades_df: one row per trade with 'Entry Date', 'Exit Date', 'Stock X',
            'Stock Y', 'Beta' and 'Position' ('long' = long Y / short X;
            anything else = short Y / long X).
        price_df: prices indexed by date, one column per ticker.
        lot_sizes: dict ticker -> quantity; missing tickers default to 1.

    Returns:
        DataFrame of the trade fields plus prices, lot sizes and 'Trade PnL'.
        Trades whose entry/exit price is unavailable (date or ticker not in
        `price_df`, or NaN price) are omitted.

    Note:
        The '... Open Price ...' columns hold whatever `price_df` contains on
        the trade dates; the names are kept for compatibility.
    """
    if trades_df.empty:
        return pd.DataFrame()

    results = []

    for _, trade in trades_df.iterrows():
        entry_date = pd.to_datetime(trade['Entry Date'])
        exit_date = pd.to_datetime(trade['Exit Date'])
        stock_x = trade['Stock X']
        stock_y = trade['Stock Y']

        try:
            entry_px_x = price_df.loc[entry_date, stock_x]
            entry_px_y = price_df.loc[entry_date, stock_y]
            exit_px_x = price_df.loc[exit_date, stock_x]
            exit_px_y = price_df.loc[exit_date, stock_y]
        except KeyError:
            continue

        # A NaN price would otherwise turn the trade P&L (and any cumulative
        # sum over it) into NaN.
        if any(np.any(pd.isna(px)) for px in (entry_px_x, entry_px_y, exit_px_x, exit_px_y)):
            continue

        lot_x = lot_sizes.get(stock_x, 1)
        lot_y = lot_sizes.get(stock_y, 1)

        # Beta-scaled quantity for stock Y, decided BEFORE computing PnL.
        # Clamp to at least 1: plain rounding turns e.g. 1 * 0.3 into 0, which
        # would silently drop the Y leg from the trade. Non-positive or
        # non-finite betas fall back to the plain Y lot size.
        beta = trade["Beta"]
        if lot_x > 0 and np.isfinite(beta) and beta > 0:
            beta_adjusted_lot_y = max(1, int(round(lot_x * beta)))
        else:
            beta_adjusted_lot_y = lot_y

        if trade['Position'] == 'long':  # Long Y, Short X
            pnl_y = (exit_px_y - entry_px_y) * beta_adjusted_lot_y
            pnl_x = (entry_px_x - exit_px_x) * lot_x
        else:  # Short Y, Long X
            pnl_y = (entry_px_y - exit_px_y) * beta_adjusted_lot_y
            pnl_x = (exit_px_x - entry_px_x) * lot_x

        trade_pnl = pnl_y + pnl_x

        result = trade.to_dict()
        result.update({
            'Entry Open Price X': entry_px_x,
            'Exit Open Price X': exit_px_x,
            'Entry Open Price Y': entry_px_y,
            'Exit Open Price Y': exit_px_y,
            'Lot Size X': lot_x,
            'Lot Size Y': beta_adjusted_lot_y,
            'Trade PnL': trade_pnl,
        })
        results.append(result)

    return pd.DataFrame(results)

# ==============================================================================
# SECTION 2: PARALLEL EXECUTION FRAMEWORK
# ==============================================================================

def run_full_backtest_for_pair(
    pair_info,
    start_date="2023-07-01",
    end_date="2025-07-01",
    lot_sizes=None,
    verbose=False,
):
    """
    Master worker function: run the complete backtest for a single pair.

    Downloads prices for both legs, generates the candidate trades and then
    prices them. Any failure along the way is reported as ``None`` so that one
    bad pair cannot abort a parallel ``executor.map`` run.

    Parameters
    ----------
    pair_info : tuple
        ``(stock_y, stock_x, correlation, sign)``. Only the two tickers are
        used here; the tuple must have exactly four items.
    start_date, end_date : str, optional
        Date window handed to ``download_data``. The defaults are the window
        that used to be hard-coded in this function.
    lot_sizes : dict, optional
        Ticker -> lot size mapping forwarded to ``calculate_trade_pnls``.
        ``None`` (default) means an empty mapping, in which case
        ``calculate_trade_pnls`` falls back to a lot size of 1 per ticker.
    verbose : bool, optional
        When True, print why a pair was skipped. Off by default to keep the
        output of large parallel runs readable.

    Returns
    -------
    pd.DataFrame or None
        The priced trades for the pair, or ``None`` when data could not be
        downloaded, no trades were generated, or an error occurred.
    """
    stock_y, stock_x, _correlation, _sign = pair_info

    # Build a fresh dict per call instead of sharing a mutable default.
    if lot_sizes is None:
        lot_sizes = {}

    try:
        price_df = download_data([stock_x, stock_y], start_date, end_date)

        if price_df is None:
            if verbose:
                print(f"  -> Skipping {stock_y} & {stock_x}: Failed to download or insufficient data.")
            return None

        # Pass stock_x and stock_y explicitly to the analysis function
        trades_df = generate_initial_pair_trades(price_df, stock_x, stock_y)

        if trades_df.empty:
            return None

        final_trades = calculate_trade_pnls(trades_df, price_df, lot_sizes)

        if not final_trades.empty:
            print(f"  -> Processed {stock_y} & {stock_x}. Found {len(final_trades)} trades.")

        return final_trades

    except Exception as exc:
        # Deliberately best-effort: a failing pair is skipped, not fatal.
        if verbose:
            print(f"!!! An error occurred while processing pair {stock_y} & {stock_x}: {exc}")
        return None

In [None]:
# ==============================================================================
# SECTION 3: MAIN EXECUTION BLOCK
# ==============================================================================

if __name__ == '__main__':
    # --- Input ---
    # Swap in your own CSV of correlated pairs here. Every row is handed to the
    # worker as one tuple, which it unpacks as (Stock Y, Stock X, Correlation, Sign).
    correlated_pairs_data = pd.read_csv("/content/correlated_pairs_by_sector.csv")
    correlated_df = pd.DataFrame(correlated_pairs_data)

    # One plain tuple per row, ready to be shipped to a worker process.
    pairs_to_process = list(map(tuple, correlated_df.itertuples(index=False)))

    print(f"Starting backtest for {len(pairs_to_process)} pairs using up to {os.cpu_count()} CPU cores.")
    start_time = time.time()

    # Start from an empty frame so `final_df` exists even when nothing is found.
    final_df = pd.DataFrame()
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        results_list = list(executor.map(run_full_backtest_for_pair, pairs_to_process))

    # Discard pairs that failed (None) or produced no trades (empty frame).
    valid_results = [frame for frame in results_list if not (frame is None or frame.empty)]

    if valid_results:
        final_df = pd.concat(valid_results, ignore_index=True)
        if not final_df.empty:
            # Order all trades by entry date, then accumulate PnL in that order.
            final_df = final_df.sort_values(by='Entry Date', ignore_index=True)
            final_df['Cumulative PnL'] = final_df['Trade PnL'].cumsum()
    else:
        print("\nBacktest complete. No trades were found across any pairs.")

    end_time = time.time()

    # --- Run summary ---
    print("\n" + "=" * 50)
    print("BACKTESTING COMPLETE")
    print(f"Total execution time: {end_time - start_time:.2f} seconds")
    print(f"Total pairs processed: {len(pairs_to_process)}")
    print(f"Total trades found: {len(final_df)}")
    print("=" * 50)

    print("\nFinal Results DataFrame:")
    print(final_df)


In [None]:
# Persist the aggregated trades (`final_df` from the main execution cell) to
# 'final_results.csv' in the current working directory, without the index column.
final_df.to_csv('final_results.csv', index=False)

In [None]:
# ==============================================================================
# SECTION 3: MAIN EXECUTION BLOCK
# ==============================================================================

if __name__ == '__main__':
    # --- Input ---
    # Swap in your own CSV of correlated pairs here. Every row becomes one
    # tuple that the worker unpacks as (Stock Y, Stock X, Correlation, Sign).
    correlated_pairs_data = pd.read_csv("/content/correlated_pairs (3).csv")
    correlated_df = pd.DataFrame(correlated_pairs_data)

    # The parallel executor needs a list of picklable work items: one tuple per row.
    pairs_to_process = list(map(tuple, correlated_df.itertuples(index=False)))

    print(f"Starting backtest for {len(pairs_to_process)} pairs using up to {os.cpu_count()} CPU cores.")
    start_time = time.time()

    # Run one backtest per pair across worker processes and gather every result.
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        results_list = list(executor.map(run_full_backtest_for_pair, pairs_to_process))

    # --- Final aggregation ---
    # Failed pairs come back as None and trade-less pairs as empty frames; drop both.
    valid_results = [frame for frame in results_list if not (frame is None or frame.empty)]

    if valid_results:
        # Stack all per-pair frames, order by entry date, then accumulate PnL.
        final_df = pd.concat(valid_results, ignore_index=True)
        final_df = final_df.sort_values(by='Entry Date', ignore_index=True)
        final_df['Cumulative PnL'] = final_df['Trade PnL'].cumsum()
    else:
        print("\nBacktest complete. No trades were found across any pairs.")
        final_df = pd.DataFrame()

    end_time = time.time()

    # --- Run summary ---
    print("\n" + "=" * 50)
    print("BACKTESTING COMPLETE")
    print(f"Total execution time: {end_time - start_time:.2f} seconds")
    print(f"Total pairs processed: {len(pairs_to_process)}")
    print(f"Total trades found: {len(final_df)}")
    print("=" * 50)

    print("\nFinal Results DataFrame:")
    print(final_df.head())

In [None]:
import pandas as pd
# Load a saved trade log into `df` for the per-pair analysis cells below.
# NOTE(review): the export cell writes the relative path 'final_results.csv', so this
# absolute path only matches when the working directory is /content — confirm.
# NOTE(review): analyze_pair reads 'Stock_Y' / 'Stock_X' / 'Trade_PnL', while the backtest
# cells build 'Stock Y' / 'Stock X' / 'Trade PnL' — confirm this file has the underscore names.
df = pd.read_csv("/content/final_results.csv")

In [None]:
def analyze_pair(df, stock_y, stock_x):
    """
    Detailed historical trade report for a specific directional pair.

    Selects the rows of ``df`` whose ``Stock_Y`` / ``Stock_X`` match the
    requested symbols (ignoring case and surrounding whitespace), orders them
    by exit date and prints a summary together with cumulative-PnL chart data.

    Parameters
    ----------
    df : pd.DataFrame
        Trade log with the columns ``Stock_Y``, ``Stock_X``, ``Exit Date`` and
        ``Trade_PnL``. The frame is only read; it is never modified.
    stock_y, stock_x : str
        Symbols of the dependent (Y) and independent (X) leg.

    Returns
    -------
    tuple or None
        ``(message, 404)`` when no row matches the pair. On success the report
        is printed and ``None`` is returned.
    """
    stock_y = stock_y.strip().upper()
    stock_x = stock_x.strip().upper()

    # Normalize into temporaries: writing the cleaned symbols back into `df`
    # would silently alter the caller's DataFrame.
    symbols_y = df["Stock_Y"].astype(str).str.strip().str.upper()
    symbols_x = df["Stock_X"].astype(str).str.strip().str.upper()

    mask = (symbols_y == stock_y) & (symbols_x == stock_x)
    pair_df = df.loc[mask].copy()
    if pair_df.empty:
        return f"No historical trades found for {stock_y} & {stock_x}.", 404

    # Clean & convert. Dates in the trade log are written day-first
    # (e.g. "22-11-2024"), so parse them that way; rows whose date or PnL
    # cannot be parsed are dropped.
    pair_df["Exit Date"] = pd.to_datetime(pair_df["Exit Date"], errors="coerce", dayfirst=True)
    pair_df["Trade_PnL"] = pd.to_numeric(pair_df["Trade_PnL"], errors="coerce")
    pair_df = pair_df.dropna(subset=["Exit Date", "Trade_PnL"]).sort_values("Exit Date")

    # Cumulative PnL in chronological (exit date) order
    pair_df["Cumulative_PnL"] = pair_df["Trade_PnL"].cumsum()

    # Stats (guarded because every matched row may have been dropped above)
    total_pnl = pair_df["Trade_PnL"].sum()
    total_trades = len(pair_df)
    winning_trades = (pair_df["Trade_PnL"] > 0).sum()
    win_rate = (winning_trades / total_trades) * 100 if total_trades else 0
    avg_pnl = pair_df["Trade_PnL"].mean() if total_trades else 0

    summary = {
        "pair": f"{stock_y} / {stock_x}",
        "total_pnl": f"{total_pnl:,.2f}",
        "total_trades": total_trades,
        "win_rate": f"{win_rate:.2f}%",
        "avg_pnl": f"{avg_pnl:,.2f}",
    }

    chart_data = {
        "labels": pair_df["Exit Date"].dt.strftime("%Y-%m-%d").tolist(),
        "data": pair_df["Cumulative_PnL"].tolist(),
    }

    print("analysis.html", summary, chart_data)


In [None]:
# Display the Stock_Y column of the loaded trade log.
df["Stock_Y"]

Unnamed: 0,Stock_Y
0,BOSCHLTD.NS
1,BOSCHLTD.NS
2,PAGEIND.NS
3,MRF.NS
4,ASIANPAINT.NS
...,...
1938,ULTRACEMCO.NS
1939,LALPATHLAB.NS
1940,BOSCHLTD.NS
1941,MRF.NS


In [None]:
# Report the historical trades for the directional pair ADANIENT.NS (Y) / AMBUJACEM.NS (X).
# NOTE(review): the recorded output contains keys such as 'winning_trades', 'max_drawdown' and
# 'raw', which only the later keyword-rich analyze_pair definition returns — this cell appears
# to have been executed after that definition (out of order); confirm on a fresh kernel.
analyze_pair(df, "ADANIENT.NS", "AMBUJACEM.NS")

({'pair': 'ADANIENT.NS / AMBUJACEM.NS',
  'summary': {'pair': 'ADANIENT.NS / AMBUJACEM.NS',
   'total_pnl': '78.30',
   'total_trades': 2,
   'winning_trades': 1,
   'losing_trades': 1,
   'win_rate': '50.00%',
   'avg_pnl': '39.15',
   'median_pnl': '39.15',
   'best_pnl': '117.35',
   'worst_pnl': '-39.05',
   'max_drawdown': '-39.05',
   'rows_dropped': 0},
  'chart': {'labels': ['2024-11-22', '2024-12-11'],
   'cumulative_pnl': [117.35, 78.3],
   'trade_pnl': [117.35, -39.05]},
  'rows_used': 2,
  'rows_dropped': 0,
  'raw':    Entry_Date      Stock_Y       Stock_X  ADF_PValue_Entry  ZScore_Entry  \
  0  21-11-2024  ADANIENT.NS  AMBUJACEM.NS          0.017381     -3.439984   
  1  02-12-2024  ADANIENT.NS  AMBUJACEM.NS          0.029124     -2.524588   
  
         Beta    Intercept  StdErr_Residual Position   Exit Date  ...  \
  0  2.955982  1274.562006       151.759682     long  22-11-2024  ...   
  1  2.857411  1328.699738       162.887148     long  11-12-2024  ...   
  
     Exi

In [None]:
import pandas as pd
import numpy as np
from typing import Tuple, Dict, Any

def analyze_pair(
    df: pd.DataFrame,
    stock_y: str,
    stock_x: str,
    *,
    include_reverse: bool = False,
    date_col: str = "Exit Date",
    pnl_col: str = "Trade_PnL",
    symbol_y_col: str = "Stock_Y",
    symbol_x_col: str = "Stock_X",
    format_numbers: bool = True,
) -> Tuple[Dict[str, Any], int]:
    """
    Summarize historical trades for a directional pair (stock_y / stock_x).

    This version assumes the input DataFrame has already been cleaned:
    - Symbol columns contain clean strings (case-insensitive match OK).
    - PnL column is numeric or clean numeric strings (no currency junk).
    - Date column is already datetime, or day-first strings parseable by pandas.

    The input DataFrame is never modified; all work happens on a copy of the
    matching rows.

    Parameters
    ----------
    df : pd.DataFrame
        Trade history.
    stock_y, stock_x : str
        Requested pair symbols.
    include_reverse : bool, default False
        If True, include rows logged as the reverse (Stock_Y==stock_x & Stock_X==stock_y).
        **PnL is NOT sign-flipped automatically.** If you need normalized direction,
        do it before calling or modify below.
    date_col, pnl_col, symbol_y_col, symbol_x_col : str
        Column names in df.
    format_numbers : bool, default True
        If True, the monetary and percentage entries of the summary are
        human-friendly strings (with commas); otherwise they stay numeric.

    Returns
    -------
    result : dict
        On success:
        {
          "pair": "Y / X",
          "summary": {...},
          "chart": {"labels": [...], "cumulative_pnl": [...], "trade_pnl": [...]},
          "rows_used": int,
          "rows_dropped": int,
          "raw": <matched rows plus helper columns _exit_dt, _pnl, _cum_pnl>
        }
        On failure: {"message": str} (plus "rows_dropped" for status 422).
    status_code : int
        200 success; 400 required column missing; 404 none matched;
        422 all matched rows invalid after cleaning.
    """

    # --- normalize requested symbols (don't mutate df) ---
    want_y = str(stock_y).strip().upper()
    want_x = str(stock_x).strip().upper()

    # --- ensure required columns exist ---
    missing = [c for c in (symbol_y_col, symbol_x_col, date_col, pnl_col) if c not in df.columns]
    if missing:
        return {"message": f"Missing required columns: {missing}."}, 400

    # --- build normalized filters ---
    norm_sy = df[symbol_y_col].astype(str).str.strip().str.upper()
    norm_sx = df[symbol_x_col].astype(str).str.strip().str.upper()

    mask = (norm_sy == want_y) & (norm_sx == want_x)
    if include_reverse:
        rev_mask = (norm_sy == want_x) & (norm_sx == want_y)
        mask = mask | rev_mask

    pair_df = df.loc[mask].copy()
    if pair_df.empty:
        return {"message": f"No historical trades found for {want_y} & {want_x}."}, 404

    # --- parse exit date ---
    # Already-datetime columns are used as is; anything else is parsed
    # day-first, with unparseable values becoming NaT (dropped below).
    if pd.api.types.is_datetime64_any_dtype(pair_df[date_col]):
        pair_df["_exit_dt"] = pair_df[date_col]
    else:
        pair_df["_exit_dt"] = pd.to_datetime(pair_df[date_col], errors="coerce", dayfirst=True)

    # --- coerce PnL numeric (lightweight; assumes cleaned numeric strings ok) ---
    pair_df["_pnl"] = pd.to_numeric(pair_df[pnl_col], errors="coerce")

    # --- drop invalid rows (track) ---
    before = len(pair_df)
    pair_df = pair_df.dropna(subset=["_exit_dt", "_pnl"])
    rows_dropped = before - len(pair_df)

    if pair_df.empty:
        return {
            "message": (
                f"All matched rows for {want_y} & {want_x} had invalid {date_col} or {pnl_col} "
                "after cleaning."
            ),
            "rows_dropped": rows_dropped,
        }, 422

    # --- sort chronologically ---
    # A stable sort keeps trades that exit on the same day in their logged
    # order, so the cumulative curve and drawdown are deterministic.
    pair_df = pair_df.sort_values("_exit_dt", kind="mergesort").reset_index(drop=True)

    # --- cumulative pnl ---
    pair_df["_cum_pnl"] = pair_df["_pnl"].cumsum()

    # --- stats (pair_df is guaranteed non-empty from here on) ---
    pnl = pair_df["_pnl"]
    total_pnl = float(pnl.sum())
    total_trades = int(len(pnl))
    wins = int((pnl > 0).sum())
    losses = int((pnl < 0).sum())
    win_rate = wins / total_trades * 100.0
    avg_pnl = float(pnl.mean())
    median_pnl = float(pnl.median())
    best_pnl = float(pnl.max())
    worst_pnl = float(pnl.min())

    # Max drawdown on the equity curve. Equity is 0 before the first trade, so
    # the running peak is floored at 0; otherwise an opening loss would not
    # count as a drawdown at all.
    cum = pair_df["_cum_pnl"].to_numpy()
    running_peak = np.maximum(np.maximum.accumulate(cum), 0.0)
    max_drawdown = float((cum - running_peak).min())

    # --- formatting helpers ---
    def fmt2(x): return f"{x:,.2f}"
    def fmtpct(x): return f"{x:.2f}%"

    summary = {
        "pair": f"{want_y} / {want_x}",
        "total_pnl": total_pnl,
        "total_trades": total_trades,
        "winning_trades": wins,
        "losing_trades": losses,
        "win_rate": win_rate,
        "avg_pnl": avg_pnl,
        "median_pnl": median_pnl,
        "best_pnl": best_pnl,
        "worst_pnl": worst_pnl,
        "max_drawdown": max_drawdown,
        "rows_dropped": rows_dropped,
    }

    if format_numbers:
        summary.update({
            "total_pnl": fmt2(total_pnl),
            "win_rate": fmtpct(win_rate),
            "avg_pnl": fmt2(avg_pnl),
            "median_pnl": fmt2(median_pnl),
            "best_pnl": fmt2(best_pnl),
            "worst_pnl": fmt2(worst_pnl),
            "max_drawdown": fmt2(max_drawdown),
        })

    # --- chart data ---
    chart = {
        "labels": pair_df["_exit_dt"].dt.strftime("%Y-%m-%d").tolist(),
        "cumulative_pnl": pair_df["_cum_pnl"].round(2).tolist(),
        "trade_pnl": pair_df["_pnl"].round(2).tolist(),
    }

    result = {
        "pair": summary["pair"],
        "summary": summary,
        "chart": chart,
        "rows_used": total_trades,
        "rows_dropped": rows_dropped,
        "raw": pair_df,  # caller can decide to show/download
    }
    return result, 200


In [None]:
import pandas as pd
import numpy as np
import yfinance as yf
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from datetime import datetime, timedelta
from concurrent.futures import ProcessPoolExecutor
from functools import partial
import os


# -------------------------------------------------------------------------
# Download price data
# -------------------------------------------------------------------------
def download_data(stocks, start_date, end_date):
    """
    Fetch auto-adjusted closing prices for the given tickers from Yahoo Finance.

    Gaps are forward-filled and any row that still contains a missing value
    is removed.

    Returns a DataFrame with a datetime index and one column per ticker, or
    None when the download fails or no complete row remains.
    """
    try:
        closes = yf.download(
            stocks,
            start=start_date,
            end=end_date,
            progress=False,
            auto_adjust=True,
            actions=False,
        )["Close"]
    except Exception:
        return None

    # A single column may come back as a Series; always work with a frame.
    if isinstance(closes, pd.Series):
        closes = closes.to_frame()

    # Carry the last known price forward, then keep only fully populated rows.
    closes = closes.ffill().dropna(axis=0, how="any")
    if closes.empty:
        return None

    closes.index = pd.to_datetime(closes.index)
    return closes


# -------------------------------------------------------------------------
# Signal thresholds
# -------------------------------------------------------------------------
# Z-score cut-offs applied by evaluate_pair_for_signals to the latest residual.
Z_ENTRY = 2.5   # z > Z_ENTRY → "Potential Short Entry"; z < -Z_ENTRY → "Potential Long Entry"
Z_EXIT = 1      # -Z_EXIT < z < Z_EXIT → "Potential Exit"; anything in between the two bands is ignored


def evaluate_pair_for_signals(price_df, stock_x, stock_y, asof_dt):
    """
    Fast signal evaluation for one directional pair.

      1. Regress Y ~ X (directional; use given stock_y, stock_x).
      2. Compute z-score of residual at asof_dt (use last available <= asof_dt).
      3. If z-score not in actionable zone, return None (skip ADF for speed).
      4. If actionable, run ADF on residuals; require p < 0.05.
      5. Return signal dict else None.

    Parameters
    ----------
    price_df : pd.DataFrame or None
        Prices indexed by date with one column per ticker.
    stock_x, stock_y : str
        Column names of the independent (X) and dependent (Y) leg.
    asof_dt : datetime
        Only rows with index <= asof_dt are used.

    Returns
    -------
    dict or None
        Keys "Date", "Stock Y", "Stock X", "Signal", "Z-Score", "Beta".
        None when there are fewer than 30 rows, a column is missing, X has no
        variation, the fit or ADF test fails, the z-score is not actionable,
        or the residuals are not stationary at the 5% level.

    Note: "Potential Exit" is emitted purely from the z-score; this function
    has no knowledge of whether a position is actually open.
    """
    if price_df is None or price_df.empty:
        return None

    # restrict to <= asof_dt
    price_df = price_df.loc[price_df.index <= asof_dt]
    if len(price_df) < 30:
        return None

    if stock_x not in price_df.columns or stock_y not in price_df.columns:
        return None

    y = price_df[stock_y]
    x = price_df[stock_x]

    # A constant X carries no information for a hedge ratio. It would also make
    # add_constant (default has_constant='skip') omit the intercept column, so
    # the two-value unpacking of model.params below would raise.
    if x.nunique(dropna=True) < 2:
        return None

    X = sm.add_constant(x)
    try:
        model = sm.OLS(y, X).fit()
    except Exception:
        return None

    resid = model.resid
    std_resid = float(np.std(resid))
    if std_resid < 1e-8:
        # Essentially perfect fit: the z-score would be numerically meaningless.
        return None

    latest_day = price_df.index[-1]
    y_i = float(price_df.loc[latest_day, stock_y])
    x_i = float(price_df.loc[latest_day, stock_x])
    intercept, beta = model.params  # order: constant first, then the X slope
    resid_i = y_i - (intercept + beta * x_i)
    z_score = resid_i / std_resid

    # Pre-screen by z-score
    if z_score > Z_ENTRY:
        signal = "Potential Short Entry"
    elif z_score < -Z_ENTRY:
        signal = "Potential Long Entry"
    elif -Z_EXIT < z_score < Z_EXIT:
        signal = "Potential Exit"
    else:
        return None  # not actionable → skip ADF

    # Confirm residual stationarity
    try:
        p_adf = adfuller(resid)[1]
    except Exception:
        return None
    if p_adf >= 0.05:
        return None

    return {
        "Date": latest_day.strftime("%Y-%m-%d"),
        "Stock Y": stock_y,
        "Stock X": stock_x,
        "Signal": signal,
        "Z-Score": round(float(z_score), 2),
        "Beta": round(float(beta), 2),
    }


def run_live_check_for_pair(pair_tuple, asof_dt):
    """
    Per-pair worker for the ProcessPoolExecutor.

    pair_tuple: (stock_y, stock_x); anything that cannot be unpacked into
                exactly two values yields None.
    asof_dt:    datetime the signal is evaluated for.

    Returns the signal dict produced by evaluate_pair_for_signals, or None
    when the pair is malformed, prices are unavailable, or there is no signal.
    """
    try:
        stock_y, stock_x = pair_tuple
    except Exception:
        return None

    # Yahoo Finance treats the end date as exclusive, so push it one day past
    # asof_dt; look back 500 calendar days for the history window.
    window_end = asof_dt + timedelta(days=1)
    window_start = asof_dt - timedelta(days=500)
    date_fmt = "%Y-%m-%d"

    price_df = download_data(
        [stock_x, stock_y],
        window_start.strftime(date_fmt),
        window_end.strftime(date_fmt),
    )

    return None if price_df is None else evaluate_pair_for_signals(price_df, stock_x, stock_y, asof_dt)


def run_live_analysis(correlated_pairs_df, date_str=None):
    """
    Trigger live analysis over correlated_pairs DataFrame using the directional pairs given.

    Parameters
    ----------
    correlated_pairs_df : pd.DataFrame
        Must be non-empty and contain the columns 'Stock_Y' and 'Stock_X'.
    date_str : str, optional
        'YYYY-MM-DD' to analyze using this date (inclusive).
        If omitted or invalid, uses current date/time.

    Returns
    -------
    list of dict
        Signals dated on or after the Monday of the as-of week.
        Z-score is screened before running ADF for performance.

    Raises
    ------
    ValueError
        If the DataFrame is empty or lacks a required column.
    """
    # Parse date param; fall back to "now" when it is missing or unusable.
    asof_dt = None
    if date_str:
        try:
            asof_dt = datetime.strptime(date_str, "%Y-%m-%d")
        except (TypeError, ValueError):
            print(f"Invalid date '{date_str}' passed; using today.")
    if asof_dt is None:
        asof_dt = datetime.now()

    if correlated_pairs_df.empty:
        raise ValueError("correlated_pairs DataFrame is empty.")

    # Ensure required columns exist (the message names the columns actually checked).
    if not {"Stock_Y", "Stock_X"}.issubset(correlated_pairs_df.columns):
        raise ValueError("DataFrame must have columns: 'Stock_Y', 'Stock_X'.")

    pairs_to_process = list(
        correlated_pairs_df[["Stock_Y", "Stock_X"]].itertuples(index=False, name=None)
    )

    worker = partial(run_live_check_for_pair, asof_dt=asof_dt)

    all_signals = []
    with ProcessPoolExecutor(max_workers=os.cpu_count() or 1) as executor:
        for signal in executor.map(worker, pairs_to_process):
            if signal:
                all_signals.append(signal)

    # Filter signals to current week relative to asof_dt.
    # Signal dates parse to midnight, so the Monday boundary must be midnight
    # too; keeping the time of day from datetime.now() would wrongly drop
    # signals dated on that Monday.
    start_of_week = (asof_dt - timedelta(days=asof_dt.weekday())).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    weekly_signals = [
        s for s in all_signals
        if datetime.strptime(s["Date"], "%Y-%m-%d") >= start_of_week
    ]

    return weekly_signals


In [None]:
# Load the candidate pairs for the live scan; run_live_analysis requires the
# columns 'Stock_Y' and 'Stock_X' in this frame.
correlated_pairs_df = pd.read_csv("/content/correlated_pairs.csv")

In [None]:
# Display the loaded pairs table.
correlated_pairs_df

Unnamed: 0,Stock_Y,Stock_X,Correlation,Sign
0,SUNPHARMA.NS,LUPIN.NS,0.9829,positive
1,TORNTPHARM.NS,LUPIN.NS,0.9783,positive
2,SUNPHARMA.NS,TORNTPHARM.NS,0.9767,positive
3,ICICIBANK.NS,BHARTIARTL.NS,0.9728,positive
4,M&M.NS,VEDL.NS,0.9683,positive
...,...,...,...,...
2860,INDHOTEL.NS,ZEEL.NS,-0.8727,negative
2861,LAURUSLABS.NS,INDUSINDBK.NS,-0.8817,negative
2862,TORNTPHARM.NS,ZEEL.NS,-0.8823,negative
2863,SUNPHARMA.NS,ZEEL.NS,-0.8880,negative


In [None]:
# Scan every loaded pair for signals as of 2025-07-17 (inclusive); the result is the
# list of signal dicts dated within that week.
run_live_analysis(correlated_pairs_df, "2025-07-17")

[{'Date': '2025-07-17',
  'Stock Y': 'SUNPHARMA.NS',
  'Stock X': 'LUPIN.NS',
  'Signal': 'Potential Exit',
  'Z-Score': 0.36,
  'Beta': 0.52},
 {'Date': '2025-07-17',
  'Stock Y': 'ICICIBANK.NS',
  'Stock X': 'BHARTIARTL.NS',
  'Signal': 'Potential Exit',
  'Z-Score': -0.23,
  'Beta': 0.51},
 {'Date': '2025-07-17',
  'Stock Y': 'BHARTIARTL.NS',
  'Stock X': 'INDIGO.NS',
  'Signal': 'Potential Exit',
  'Z-Score': -0.34,
  'Beta': 0.3},
 {'Date': '2025-07-17',
  'Stock Y': 'POWERGRID.NS',
  'Stock X': 'GAIL.NS',
  'Signal': 'Potential Exit',
  'Z-Score': 0.12,
  'Beta': 0.97},
 {'Date': '2025-07-17',
  'Stock Y': 'EICHERMOT.NS',
  'Stock X': 'BHARTIARTL.NS',
  'Signal': 'Potential Exit',
  'Z-Score': 0.46,
  'Beta': 1.9},
 {'Date': '2025-07-17',
  'Stock Y': 'VOLTAS.NS',
  'Stock X': 'TRENT.NS',
  'Signal': 'Potential Exit',
  'Z-Score': -0.17,
  'Beta': 0.18},
 {'Date': '2025-07-17',
  'Stock Y': 'GAIL.NS',
  'Stock X': 'BHEL.NS',
  'Signal': 'Potential Exit',
  'Z-Score': -0.77,
  'Be