<a href="https://colab.research.google.com/github/Devashish-23/deep-learning-project-/blob/main/BFM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ================================================================
# TCS.NS Forecasting — RF | XGB | ARIMA | LSTM
# Target = next-day log-return; Evaluate/Forecast in PRICE space
# Adds RSI, MACD, BBWidth, Volatility, ATR, Stoch, OBV, EMAs
# + Percentage error summary (MAPE, MdAPE, P95 APE, Direction Accuracy)
# ================================================================
import re, math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import MinMaxScaler
import xgboost as xgb
from statsmodels.tsa.arima.model import ARIMA

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

# ---------------------------
# 0) Config
# ---------------------------
FILE_PATH = "/content/TCS_NS.csv"  # input CSV read below
SEED = 42
# Seed both NumPy and Keras/TensorFlow so repeated runs start from the same state.
np.random.seed(SEED)
tf.keras.utils.set_random_seed(SEED)

# ---------------------------
# 1) Load & Clean
# ---------------------------
df = pd.read_csv(FILE_PATH)
# Normalise headers (trim + lower-case) so later lookups use one spelling.
df.columns = [name.strip().lower() for name in df.columns]

# Parse the date column; unparseable values become NaT and those rows are
# removed before sorting chronologically.
df['date'] = pd.to_datetime(df['date'], errors='coerce')
has_date = df['date'].notna()
df = df.loc[has_date].sort_values('date').reset_index(drop=True)

# numeric parser
def parse_number(x):
    """Convert a raw CSV cell to ``float``; return NaN when it is not numeric.

    Thousands separators (``,``) and the rupee sign (``₹``) are removed from
    text values before conversion.

    Args:
        x: Cell value; may be missing, already numeric, or text.

    Returns:
        float: The parsed number, or ``np.nan`` for missing/unparseable input.
    """
    if pd.isna(x):
        return np.nan
    if isinstance(x, (int, float, np.number)):
        return float(x)
    s = str(x).strip().replace(',', '').replace('₹', '')
    try:
        return float(s)
    except ValueError:
        # Only a failed text -> float conversion is expected here. Anything
        # else (e.g. KeyboardInterrupt) must propagate instead of being
        # silently turned into NaN.
        return np.nan

# Every column except the date is expected to be numeric; run each one
# through parse_number so stray text becomes NaN.
num_cols = [name for name in df.columns if name != 'date']
for column in num_cols:
    df[column] = df[column].map(parse_number)

# Keep only rows where every field survived parsing.
complete = df.notna().all(axis=1)
df = df.loc[complete].reset_index(drop=True)
print("Cleaned data shape:", df.shape)
first_day = df['date'].min().date()
final_day = df['date'].max().date()
print("Date range:", first_day, "→", final_day)

# Convenience aliases for the OHLCV column names. They are lower-case because
# the CSV headers were stripped and lower-cased on load.
CLOSE = 'close_tcs.ns'
OPEN  = 'open_tcs.ns'
HIGH  = 'high_tcs.ns'
LOW   = 'low_tcs.ns'
VOL   = 'volume_tcs.ns'

# ---------------------------
# 2) Core transforms: log-returns
# ---------------------------
# close_lag1[t] is the close of the previous row (NaN on the first row).
df['close_lag1'] = df[CLOSE].shift(1)
# log_ret[t] = ln(close[t] / close[t-1]): the return realised ON row t.
df['log_ret'] = np.log(df[CLOSE] / df['close_lag1'])

# ---------------------------
# 3) Technical Indicators (no external libs)
# ---------------------------

# Row-over-row percentage change of each OHLCV column
for price_col in (OPEN, HIGH, LOW, CLOSE, VOL):
    df[price_col + '_pct'] = df[price_col].pct_change()

# Intraday ratios: close relative to open, and high relative to low
df['close_over_open'] = df[CLOSE].div(df[OPEN])
df['high_over_low'] = df[HIGH].div(df[LOW])

# EMAs (for MACD and as features)
def ema(series, span):
    """Return the exponential moving average of ``series``.

    Args:
        series: pandas Series to smooth.
        span: Span passed to ``Series.ewm``.

    Returns:
        pandas Series of the same length, computed with ``adjust=False``
        (recursive form seeded with the first observation).
    """
    smoother = series.ewm(span=span, adjust=False)
    return smoother.mean()

# Trend EMAs of the close used directly as features
for span in (5, 10, 20, 50, 200):
    df[f'ema_{span}'] = ema(df[CLOSE], span)

# MACD(12,26,9): fast EMA minus slow EMA, a 9-span signal line, and the
# histogram (MACD minus signal)
for span in (12, 26):
    df[f'ema_{span}'] = ema(df[CLOSE], span)
df['macd'] = df['ema_12'].sub(df['ema_26'])
df['macd_sig'] = ema(df['macd'], 9)
df['macd_hist'] = df['macd'].sub(df['macd_sig'])

# RSI(14)
def rsi(series, period=14):
    """Relative Strength Index with exponentially smoothed gains and losses.

    Gains and losses are smoothed with ``ewm(alpha=1/period)``; the first
    ``period`` rows are NaN because ``min_periods=period`` observations are
    required.

    Args:
        series: pandas Series of prices.
        period: Smoothing period (default 14).

    Returns:
        pandas Series with values in [0, 100]. A window with gains but no
        losses yields exactly 100. A completely flat window (no gains and no
        losses) is undefined and stays NaN.
    """
    delta = series.diff()
    gain = delta.clip(lower=0)
    loss = -delta.clip(upper=0)
    avg_gain = gain.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
    avg_loss = loss.ewm(alpha=1/period, min_periods=period, adjust=False).mean()
    # Guard the division; the zero-loss case is resolved explicitly below.
    rs = avg_gain / avg_loss.replace(0, np.nan)
    values = 100 - (100 / (1 + rs))
    # No losses at all means RS is infinite, i.e. RSI = 100. Leaving it as NaN
    # would silently discard the strongest up-trend rows downstream.
    only_gains = (avg_loss == 0) & (avg_gain > 0)
    return values.mask(only_gains, 100.0)

# 14-period RSI of the close
df['rsi_14'] = rsi(df[CLOSE], 14)

# Bollinger Bands (20, 2σ) + Width
# mid/std are the 20-row rolling mean and standard deviation of the close;
# bb_width is the band width (upper - lower) expressed relative to mid.
mid = df[CLOSE].rolling(20).mean()
std = df[CLOSE].rolling(20).std()
upper = mid + 2*std
lower = mid - 2*std
df['bb_width'] = (upper - lower) / mid

# Rolling volatility of returns: standard deviation of log_ret over 5/10/20 rows
for w in [5,10,20]:
    df[f'vol_{w}'] = df['log_ret'].rolling(w).std()

# ATR(14)
# True range = max(high - low, |high - prev close|, |low - prev close|);
# atr_14 is its plain 14-row rolling mean.
tr1 = df[HIGH] - df[LOW]
tr2 = (df[HIGH] - df[CLOSE].shift(1)).abs()
tr3 = (df[LOW]  - df[CLOSE].shift(1)).abs()
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
df['atr_14'] = tr.rolling(14).mean()

# Stochastic Oscillator %K(14) and %D(3)
# %K places the close inside the 14-row low/high range (scaled by 100);
# %D is the 3-row mean of %K. When high14 == low14 the division is by zero;
# the resulting inf/NaN rows are filtered out in section 4.
low14  = df[LOW].rolling(14).min()
high14 = df[HIGH].rolling(14).max()
df['stoch_k'] = 100 * (df[CLOSE] - low14) / (high14 - low14)
df['stoch_d'] = df['stoch_k'].rolling(3).mean()

# OBV
# Cumulative volume signed by the direction of the close-to-close change
# (+1 up, -1 down, 0 unchanged; the first row has no change and counts as 0).
delta_close = df[CLOSE].diff()
direction = np.sign(delta_close).fillna(0)
df['obv'] = (direction * df[VOL]).cumsum()

# Moving-average columns may already exist in the CSV. When they do, derive
# their one-row slope and the spreads between neighbouring horizons.
ma_names = [f'ma_{span}' for span in (5, 10, 20, 50, 200)]
for ma_col in ma_names:
    if ma_col not in df.columns:
        continue
    df[ma_col + '_slope'] = df[ma_col].diff()

# Spreads are only built when the full set of moving averages is available.
if all(name in df.columns for name in ma_names):
    for fast, slow in ((5, 10), (10, 20), (20, 50), (50, 200)):
        df[f'ma{fast}_{slow}_spread'] = df[f'ma_{fast}'] - df[f'ma_{slow}']

# Lagged returns (t-1..t-10): log_ret_lagK[t] = log_ret[t-K]
for lag in range(1, 11):
    df['log_ret_lag' + str(lag)] = df['log_ret'].shift(lag)

# Label: next-day return, y_next_ret[t] = log_ret[t+1] (NaN on the final row)
df['y_next_ret'] = df['log_ret'].shift(-1)

# Keep only fully populated rows: this removes the indicator warm-up rows
# and the final row, whose label is missing.
fully_populated = df.notna().all(axis=1)
df_feat = df.loc[fully_populated].reset_index(drop=True)

# ---------------------------
# 4) Features & Labels (clean inf/NaN)
# ---------------------------
# Price dynamics: percentage changes of each OHLCV column plus intraday ratios
feat_cols = [f'{col}_pct' for col in (OPEN, HIGH, LOW, CLOSE, VOL)]
feat_cols.extend(['close_over_open', 'high_over_low'])

# EMAs
feat_cols.extend(['ema_5', 'ema_10', 'ema_20', 'ema_50', 'ema_200'])

# MACD suite
feat_cols.extend(['macd', 'macd_sig', 'macd_hist'])

# RSI, BBWidth, Volatility, ATR, Stoch, OBV
feat_cols.extend([
    'rsi_14', 'bb_width', 'vol_5', 'vol_10', 'vol_20',
    'atr_14', 'stoch_k', 'stoch_d', 'obv',
])

# Optional moving-average columns: each level followed by its slope, then the
# spreads, included only when present in the frame
for span in (5, 10, 20, 50, 200):
    for name in (f'ma_{span}', f'ma_{span}_slope'):
        if name in df_feat.columns:
            feat_cols.append(name)
feat_cols.extend(
    name
    for name in ('ma5_10_spread', 'ma10_20_spread', 'ma20_50_spread', 'ma50_200_spread')
    if name in df_feat.columns
)

# Lagged returns
feat_cols.extend(f'log_ret_lag{k}' for k in range(1, 11))

# Treat ±inf like missing values, then keep only rows where every feature and
# the label are finite. X, y and df_feat stay row-aligned.
X = df_feat[feat_cols].replace([np.inf, -np.inf], np.nan)
y = df_feat['y_next_ret'].replace([np.inf, -np.inf], np.nan)
valid_mask = X.notna().all(axis=1) & y.notna()
X = X[valid_mask].reset_index(drop=True)
y = y[valid_mask].reset_index(drop=True)
df_feat = df_feat.loc[valid_mask].reset_index(drop=True)

# Split (time-aware): the first 80% of rows train, the remaining 20% test;
# no shuffling, so the test period follows the training period.
split_idx = int(len(df_feat)*0.8)
train_idx = np.arange(0, split_idx)
test_idx  = np.arange(split_idx, len(df_feat))

X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

# Evaluation in PRICE space
# For every test row t these arrays pair close[t-1] (base) with close[t]
# (actual), i.e. they describe the return log_ret[t] realised ON row t.
# NOTE(review): the label y_next_ret[t] is the FOLLOWING return (t -> t+1),
# so a forecast made from the features of row t refers to one row later
# than this pairing — confirm the intended alignment wherever these arrays
# are combined with model output.
close_for_test_base = df_feat[CLOSE].iloc[test_idx - 1].reset_index(drop=True)
actual_close_tplus1 = df_feat[CLOSE].iloc[test_idx].reset_index(drop=True)

def price_metrics_from_returns(close_t, pred_ret, actual_t1):
    """Turn predicted log-returns into prices and score them against actuals.

    Args:
        close_t: Base prices the predicted returns are applied to.
        pred_ret: Predicted log-returns, one per base price.
        actual_t1: Realised prices to compare the implied prices with.

    Returns:
        tuple: ``(rmse, mape, acc, pred_price)`` where ``mape`` is in percent,
        ``acc`` is ``100 - mape`` and ``pred_price`` is a NumPy array equal to
        ``close_t * exp(pred_ret)``.
    """
    # Normalise every input to a plain float array. A pandas Series passed as
    # pred_ret (e.g. a statsmodels forecast) would otherwise leak its own
    # index into pred_price, making the return type depend on the caller.
    base = np.asarray(close_t, dtype=float)
    actual = np.asarray(actual_t1, dtype=float)
    pred_price = base * np.exp(np.asarray(pred_ret, dtype=float))
    rmse = math.sqrt(mean_squared_error(actual, pred_price))
    mape = mean_absolute_percentage_error(actual, pred_price) * 100
    acc = 100 - mape
    return rmse, mape, acc, pred_price

results = {}
pred_price_traces = {}  # store predicted price series on test set

# ---------------------------
# 5) RandomForest (slightly regularized)
# ---------------------------
rf = RandomForestRegressor(
    n_estimators=700, max_depth=None, min_samples_leaf=5,
    random_state=SEED, n_jobs=-1
)
# Row t is labelled with y_next_ret[t], the return realised on row t+1. The
# label of the last training row is therefore the first test day's return,
# so that row is left out to keep the test period unseen.
rf.fit(X_train.iloc[:-1], y_train.iloc[:-1])
# The shared evaluation arrays pair close[t-1] with close[t] for each test
# row t. The forecast that belongs to that pair is the one made from the
# features of row t-1; predicting from row t would apply tomorrow's
# forecast to yesterday's close.
rf_ret_test = rf.predict(X.iloc[test_idx - 1])
rmse, mape, acc, rf_pred_prices = price_metrics_from_returns(close_for_test_base, rf_ret_test, actual_close_tplus1)
results['RandomForest'] = (rmse, mape, acc)
pred_price_traces['RandomForest'] = rf_pred_prices

# ---------------------------
# 6) XGBoost (tuned a bit)
# ---------------------------
xgbr = xgb.XGBRegressor(
    n_estimators=900, learning_rate=0.03, max_depth=6,
    subsample=0.9, colsample_bytree=0.9,
    reg_lambda=2.0, reg_alpha=0.0,
    random_state=SEED
)
# Row t is labelled with y_next_ret[t], the return realised on row t+1. The
# label of the last training row is therefore the first test day's return,
# so that row is left out to keep the test period unseen.
xgbr.fit(X_train.iloc[:-1], y_train.iloc[:-1])
# The shared evaluation arrays pair close[t-1] with close[t] for each test
# row t. The forecast that belongs to that pair is the one made from the
# features of row t-1; predicting from row t would apply tomorrow's
# forecast to yesterday's close.
xgb_ret_test = xgbr.predict(X.iloc[test_idx - 1])
rmse, mape, acc, xgb_pred_prices = price_metrics_from_returns(close_for_test_base, xgb_ret_test, actual_close_tplus1)
results['XGBoost'] = (rmse, mape, acc)
pred_price_traces['XGBoost'] = xgb_pred_prices

# ---------------------------
# 7) ARIMA on returns (short-memory AR)
# ---------------------------
ret_series = df_feat['log_ret']
ret_train, ret_test = ret_series.iloc[train_idx], ret_series.iloc[test_idx]
# modest AR order
arima = ARIMA(ret_train, order=(5,0,0))
arima_fit = arima.fit()
# forecast(len(ret_test)) would be ONE multi-step path from the end of
# training, yet every value is applied to the real previous close as if it
# were a one-step-ahead forecast. Instead, extend the fitted model with the
# observed test returns (parameters are NOT re-estimated) and read its
# in-sample one-step-ahead predictions: each uses data up to t-1 only.
arima_eval = arima_fit.append(ret_test, refit=False)
arima_ret_test = np.asarray(
    arima_eval.predict(start=len(ret_train), end=len(ret_train) + len(ret_test) - 1)
)
rmse, mape, acc, arima_pred_prices = price_metrics_from_returns(close_for_test_base, arima_ret_test, actual_close_tplus1)
results['ARIMA'] = (rmse, mape, acc)
pred_price_traces['ARIMA'] = arima_pred_prices

# ---------------------------
# 8) LSTM on returns (sequence)
# ---------------------------
scaler = MinMaxScaler((0,1))
ret_all = ret_series.values.reshape(-1,1)
# Fit the scaling range on training-period returns only. Fitting on the whole
# series would let the min/max of the test period leak into the model inputs.
# The first split_idx rows all lie inside the LSTM training windows, because
# the LSTM split point (N_STEPS + 80% of the windows) falls later than
# split_idx.
scaler.fit(ret_all[:split_idx])
ret_scaled = scaler.transform(ret_all)

def seq_xy(data, n_steps=60):
    """Build sliding windows and their next-step targets from column 0.

    Args:
        data: 2-D array; only column 0 is read.
        n_steps: Window length.

    Returns:
        tuple: ``(X, y)`` where ``X[i]`` holds ``data[i:i+n_steps, 0]`` and
        ``y[i]`` is ``data[i+n_steps, 0]``. Both are empty arrays when
        ``data`` has no more than ``n_steps`` rows.
    """
    target_rows = range(n_steps, len(data))
    windows = [data[end - n_steps:end, 0] for end in target_rows]
    targets = [data[end, 0] for end in target_rows]
    return np.array(windows), np.array(targets)

N_STEPS = 60  # look-back window length (rows) fed to the LSTM
X_all, y_all = seq_xy(ret_scaled, N_STEPS)

# Chronological 80/20 split of the windows
split_pos = int(len(X_all)*0.8)
Xtr, ytr = X_all[:split_pos], y_all[:split_pos]
Xte, yte = X_all[split_pos:], y_all[split_pos:]

# Keras LSTM layers take (samples, timesteps, features); there is one feature
Xtr = Xtr.reshape((len(Xtr), N_STEPS, 1))
Xte = Xte.reshape((len(Xte), N_STEPS, 1))

# Two stacked LSTM layers (the first returns the full sequence so the second
# can consume it), each followed by dropout, then a small dense head that
# outputs a single value: the scaled next return.
lstm = Sequential([
    Input(shape=(N_STEPS,1)),
    LSTM(96, return_sequences=True),
    Dropout(0.2),
    LSTM(96),
    Dropout(0.2),
    Dense(64, activation='relu'),
    Dense(1)
])
lstm.compile(optimizer=tf.keras.optimizers.Adam(1e-3), loss='mse')
# Both callbacks watch val_loss: training stops after 12 epochs without
# improvement (restoring the best weights) and the learning rate is halved
# after 6 such epochs, down to 1e-5.
callbacks = [
    EarlyStopping(monitor='val_loss', patience=12, restore_best_weights=True),
    ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=6, min_lr=1e-5, verbose=1)
]
# validation_split=0.1 holds out part of (Xtr, ytr) to compute val_loss.
lstm.fit(Xtr, ytr, validation_split=0.1, epochs=100, batch_size=32, verbose=0, callbacks=callbacks)

# Predict scaled returns for the held-out windows and map them back to
# log-return units.
lstm_ret_pred_scaled = lstm.predict(Xte, verbose=0)
lstm_ret_pred = scaler.inverse_transform(lstm_ret_pred_scaled).ravel()

# Align LSTM price evaluation
# Window j targets the return at row j + N_STEPS, so the first test target is
# row N_STEPS + split_pos. Each predicted return is applied to the close one
# row earlier and compared with the close on the target row.
abs_start = N_STEPS + split_pos
close_bases_lstm = df_feat[CLOSE].iloc[abs_start-1 : abs_start-1 + len(lstm_ret_pred)].reset_index(drop=True)
actuals_lstm     = df_feat[CLOSE].iloc[abs_start   : abs_start   + len(lstm_ret_pred)].reset_index(drop=True)

rmse, mape, acc, lstm_pred_prices = price_metrics_from_returns(close_bases_lstm, lstm_ret_pred, actuals_lstm)
results['LSTM'] = (rmse, mape, acc)
pred_price_traces['LSTM'] = lstm_pred_prices

# ---------------------------
# 9) Report metrics
# ---------------------------
# One line per model, in the order the models were evaluated.
print("\n=== Model Comparison (Test, Price space) ===")
for model_name, scores in results.items():
    rmse, mape, acc = scores
    print(f"{model_name:12s} | RMSE={rmse:.2f} | MAPE={mape:.2f}% | Accuracy≈{acc:.2f}%")

# ---------------------------
# 10) Multi-horizon forecasts (t+1, t+5, t+20)
# ---------------------------
# Anchor of the forward forecasts: the final row of the full cleaned frame
# `df`. That row is not part of df_feat, which drops it because its
# next-day label is missing.
last_date = df['date'].iloc[-1]
last_close = df[CLOSE].iloc[-1]
print("\nLast known trading date:", last_date.date(), "Close:", last_close)

def next_business_dates(start_date, steps_list):
    """Map each step count to the date that many business days after a start.

    Business days are Monday to Friday; holidays are not taken into account.

    Args:
        start_date: ``pandas.Timestamp`` to count from.
        steps_list: Iterable of integer business-day offsets.

    Returns:
        dict: ``{steps: pandas.Timestamp}`` for every entry of ``steps_list``.
    """
    anchor = start_date.date()
    # roll='backward' first snaps a weekend start to the preceding Friday.
    # With the default roll='raise', np.busday_offset raises ValueError when
    # the start date itself is not a business day.
    return {
        s: pd.Timestamp(np.busday_offset(anchor, s, roll='backward', weekmask='1111100'))
        for s in steps_list
    }

# Forecast horizons in business days, and the calendar date each one lands on
# when counted from the last known trading date.
target_steps = [1,5,20]
target_dates = next_business_dates(last_date, target_steps)

# Helper to roll forward returns for RF/XGB (freeze non-lag features; update lagged returns)
lag_keys = [f'log_ret_lag{k}' for k in range(1,11)]
def roll_forward_returns(model, base_feat_row, steps):
    """Iteratively forecast ``steps`` next-day log-returns from one feature row.

    After every step the lagged-return features are advanced by one day; all
    other features keep the values of ``base_feat_row``.

    Args:
        model: Fitted regressor exposing ``predict`` on a frame with
            ``feat_cols`` columns.
        base_feat_row: pandas Series holding one row of features (not mutated).
        steps: Number of consecutive days to forecast.

    Returns:
        numpy.ndarray: The ``steps`` predicted log-returns, in time order.
    """
    row = base_feat_row.copy()
    preds = []
    # lag_vals is ordered newest-first: [lag1, lag2, ..., lag10]
    lag_vals = [row[k] for k in lag_keys]
    # Return realised on the row's own day; it becomes lag1 once the row is
    # advanced. It is recovered from the close pct-change feature because
    # ln(1 + pct_change) is the log-return.
    cur_ret = float(np.log1p(row[f'{CLOSE}_pct']))
    for _ in range(steps):
        pred_ret = model.predict(pd.DataFrame([row.values], columns=feat_cols))[0]
        preds.append(pred_ret)
        # Advance one day: the day just left becomes lag1, every older lag
        # moves one slot back and the oldest is dropped. (Appending at the
        # END would write the newest value into lag10 and reverse time.)
        lag_vals = [cur_ret] + lag_vals[:-1]
        cur_ret = pred_ret
        for key, val in zip(lag_keys, lag_vals):
            row[key] = val
    return np.array(preds)

# Build last feature row for RF/XGB.
# df_feat (and therefore X) excludes the final trading day because its
# next-day label is missing, so X.iloc[-1] describes the day BEFORE
# last_close. Take the features of the final day from the full frame so the
# first forecast really is the day after last_close.
horizon = max(target_steps)
last_feat_row = df[feat_cols].iloc[-1].astype(float).replace([np.inf, -np.inf], np.nan)
if last_feat_row.isna().any():
    # Final-day features are incomplete: fall back to the last fully valid row.
    last_feat_row = X.iloc[-1].copy()

# RF forward
rf_fwd_rets  = roll_forward_returns(rf, last_feat_row, horizon)
rf_prices    = last_close * np.exp(np.cumsum(rf_fwd_rets))
# XGB forward
xgb_fwd_rets = roll_forward_returns(xgbr, last_feat_row, horizon)
xgb_prices   = last_close * np.exp(np.cumsum(xgb_fwd_rets))

# ARIMA forward (returns)
# arima_fit only knows the training returns, so forecasting from it directly
# would describe the days right after the training period. Feed it every
# return observed since then (parameters are not re-estimated) before
# forecasting past the last known date.
n_fit = len(ret_train)
last_train_date = df_feat['date'].iloc[n_fit - 1]
post_train = df.loc[df['date'] > last_train_date, 'log_ret']
post_train = post_train.replace([np.inf, -np.inf], np.nan).dropna()
# append() requires an index that continues the training index (0..n_fit-1).
post_train = pd.Series(post_train.to_numpy(),
                       index=pd.RangeIndex(n_fit, n_fit + len(post_train)))
arima_live = arima_fit.append(post_train, refit=False)
arima_fwd_rets = np.asarray(arima_live.forecast(horizon))
arima_prices   = last_close * np.exp(np.cumsum(arima_fwd_rets))

# LSTM forward (returns)
# Seed the window with the most recent N_STEPS returns of the FULL history,
# including the final day's return that df_feat does not contain.
ret_hist = df['log_ret'].replace([np.inf, -np.inf], np.nan).dropna()
ret_series_all = ret_hist.values.reshape(-1,1)
last_returns_scaled = scaler.transform(ret_series_all[-N_STEPS:]).reshape(1,N_STEPS,1)
lstm_fwd_scaled = []
seq = last_returns_scaled.copy()
for _ in range(horizon):
    nxt = lstm.predict(seq, verbose=0)[0,0]
    lstm_fwd_scaled.append(nxt)
    # Slide the window: drop the oldest step and append the new prediction.
    seq = np.concatenate([seq[:,1:,:], np.array([[[nxt]]])], axis=1)
lstm_fwd_rets = scaler.inverse_transform(np.array(lstm_fwd_scaled).reshape(-1,1)).ravel()
lstm_prices   = last_close * np.exp(np.cumsum(lstm_fwd_rets))

def step_pick(price_path, steps):
    """Pick the forecast price at each requested horizon.

    Args:
        price_path: Sequence of prices where position 0 is one step ahead.
        steps: Iterable of 1-based horizons to extract.

    Returns:
        dict: ``{step: price}`` with prices converted to ``float``.
    """
    picked = {}
    for step in steps:
        picked[step] = float(price_path[step - 1])
    return picked

# Forecast price at every target horizon, per model
future_preds = {
    'RandomForest': step_pick(rf_prices,  target_steps),
    'XGBoost'     : step_pick(xgb_prices, target_steps),
    'ARIMA'       : step_pick(arima_prices, target_steps),
    'LSTM'        : step_pick(lstm_prices, target_steps),
}

print("\n=== Future Predictions (Price) ===")
for model, vals in future_preds.items():
    # Build one "date: price" cell per configured horizon instead of
    # hard-coding the keys 1/5/20, which raises KeyError as soon as
    # target_steps is changed.
    cells = " | ".join(f"{target_dates[s].date()}: {vals[s]:.2f}" for s in target_steps)
    print(f"{model:12s} | {cells}")

# ---------------------------
# 11) Last-10-Days Actual vs Predicted tables (aligned)
# ---------------------------
def tail_compare(dates, actual, pred, n=10):
    """Tabulate the last ``n`` actual vs. predicted closes with their errors.

    Args:
        dates: pandas Series of datetimes aligned with ``actual`` and ``pred``.
        actual: Array-like of realised closes.
        pred: Array-like of predicted closes.
        n: Number of trailing rows to keep.

    Returns:
        pandas.DataFrame with columns ``Date``, ``Actual_Close``,
        ``Pred_Close``, ``Abs_Err`` and ``Pct_Err_%``.
    """
    actual_tail = np.array(actual)[-n:]
    pred_tail = np.array(pred)[-n:]
    table = pd.DataFrame({
        'Date': dates[-n:].dt.date,
        'Actual_Close': actual_tail,
        'Pred_Close': pred_tail,
    })
    abs_err = (table['Pred_Close'] - table['Actual_Close']).abs()
    table['Abs_Err'] = abs_err
    table['Pct_Err_%'] = 100 * abs_err / table['Actual_Close']
    return table

# Dates of the shared test rows (RF / XGB / ARIMA use the same alignment)
test_dates_for_price = df_feat['date'].iloc[test_idx].reset_index(drop=True)

rf_tail, xgb_tail, arima_tail = (
    tail_compare(test_dates_for_price, actual_close_tplus1, pred_price_traces[name])
    for name in ('RandomForest', 'XGBoost', 'ARIMA')
)

# The LSTM test window starts at abs_start and has its own dates and actuals
lstm_rows = slice(abs_start, abs_start + len(pred_price_traces['LSTM']))
lstm_dates   = df_feat['date'].iloc[lstm_rows].reset_index(drop=True)
lstm_actuals = df_feat[CLOSE].iloc[lstm_rows].reset_index(drop=True)
lstm_tail    = tail_compare(lstm_dates, lstm_actuals, pred_price_traces['LSTM'])

for label, table in (
    ('RandomForest', rf_tail),
    ('XGBoost', xgb_tail),
    ('ARIMA', arima_tail),
    ('LSTM', lstm_tail),
):
    print(f"\n--- Last 10 Days: {label} ---\n", table.to_string(index=False))

# ---------------------------
# 12) Percentage error summary (test set)
#     - MdAPE (median absolute % error)
#     - P95 APE (95th percentile of absolute % error)
#     - Direction accuracy (correct sign of next-day return)
# ---------------------------
summary_rows = []

def ape_stats(actual_prices, pred_prices):
    """Summarise absolute percentage errors of predicted prices.

    Args:
        actual_prices: Array of realised prices.
        pred_prices: Array of predicted prices, aligned with ``actual_prices``.

    Returns:
        tuple: ``(median APE, 95th-percentile APE, APE vector)``, all in
        percent.
    """
    abs_pct_err = 100 * np.abs(pred_prices - actual_prices) / actual_prices
    return np.median(abs_pct_err), np.percentile(abs_pct_err, 95), abs_pct_err

# RF, XGB and ARIMA are scored against the same test actuals
models_order = ['RandomForest','XGBoost','ARIMA']
shared_actuals = actual_close_tplus1.values
for m in models_order:
    mdape, p95, ape_vec = ape_stats(shared_actuals, pred_price_traces[m])
    summary_rows += [[m, float(mdape), float(p95)]]

# The LSTM test window differs, so it is scored against its own actuals
mdape, p95, ape_vec_lstm = ape_stats(lstm_actuals.values, pred_price_traces['LSTM'])
summary_rows += [['LSTM', float(mdape), float(p95)]]

# Direction accuracy: compare sign of next-day return between actual and predicted (price-based)
def direction_accuracy(actual_prices, pred_prices):
    """Percentage of days on which the predicted move has the right sign.

    Both moves are measured from the previous ACTUAL close: the realised
    move is ``actual[i] - actual[i-1]`` and the predicted move is
    ``pred[i] - actual[i-1]``. Comparing ``pred[i] - pred[i-1]`` instead would
    only measure how the forecast series drifts, not whether the model called
    the next day's direction.

    Args:
        actual_prices: Realised prices on consecutive days.
        pred_prices: Predicted prices for the same days.

    Returns:
        float: Accuracy in percent, or NaN when fewer than two aligned
        observations are available (the first day has no previous close).
    """
    actual = np.asarray(actual_prices, dtype=float)
    pred = np.asarray(pred_prices, dtype=float)
    n = min(len(actual), len(pred))
    if n < 2:
        return np.nan
    prev_close = actual[:n - 1]
    act_dir = np.sign(actual[1:n] - prev_close)
    prd_dir = np.sign(pred[1:n] - prev_close)
    return 100.0 * (act_dir == prd_dir).mean()

# Direction accuracy per model: the three shared-alignment models first,
# then the LSTM against its own actuals
dir_rows = [
    [m, float(direction_accuracy(actual_close_tplus1.values, pred_price_traces[m]))]
    for m in models_order
]
da_lstm = direction_accuracy(lstm_actuals.values, pred_price_traces['LSTM'])
dir_rows.append(['LSTM', float(da_lstm)])

summary_df = pd.DataFrame(summary_rows, columns=['Model','MdAPE_%','P95_APE_%'])
dir_df = pd.DataFrame(dir_rows, columns=['Model','Direction_Accuracy_%'])

for title, table in (
    ("\n=== Percentage Error Summary (Test) ===", summary_df),
    ("\n=== Direction Accuracy (Test) ===", dir_df),
):
    print(title)
    print(table.to_string(index=False))
