<a href="https://colab.research.google.com/github/hunarpreet1/TradeAnomaly/blob/main/PriceVolumneAnomaly.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install ta
import ta

In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import date

folder_path = '/content/drive/My Drive/PriceData'

In [None]:
# Read the four input tables from the Drive folder configured in `folder_path`.
priceData = pd.read_csv(os.path.join(folder_path, 'pricedata.csv'))
universe = pd.read_csv(os.path.join(folder_path, 'nse200universe.csv'))
nifty50 = pd.read_csv(os.path.join(folder_path, 'NiftyTRI_Index.csv'))
nifty200 = pd.read_csv(os.path.join(folder_path, 'NSE200TRI_Index.csv'))

In [None]:
universe.dropna()

In [None]:
# around 3% data has volume missing
# around 0.1% data has price missing

# Use the 'date' column of every table as its index ...
priceData.set_index('date', inplace=True)
nifty50.set_index('date', inplace=True)
nifty200.set_index('date', inplace=True)

# ... and parse it so that all three frames share a DatetimeIndex.
priceData.index = pd.to_datetime(priceData.index)
nifty50.index = pd.to_datetime(nifty50.index)
nifty200.index = pd.to_datetime(nifty200.index)

priceData.dropna(inplace=True) # Think about Nans later.....

# The previous `fillna(...).fillna(..., inplace=True)` chain filled a temporary
# copy and threw it away, leaving the index series with their gaps. Assign the
# forward-then-backward filled frames back to the names instead.
nifty50 = nifty50.ffill().bfill()
nifty200 = nifty200.ffill().bfill()

In [None]:
# Cross-sectional volume features: each row's volume relative to the total and
# to the largest single-stock volume recorded on the same date (index level 0).
volume_by_date = priceData.groupby(level=0)["PX_VOLUME"]
priceData["totalVolume"] = volume_by_date.transform("sum")
priceData["maxStockVolume"] = volume_by_date.transform("max")
priceData["volume_ratio"] = priceData["PX_VOLUME"].div(priceData["totalVolume"])
priceData["volume_max_ratio"] = priceData["PX_VOLUME"].div(priceData["maxStockVolume"])

# Row-over-row simple returns of each price field, computed within each security.
for return_col, price_col in (
    ('returnsOpen', 'PX_OPEN'),
    ('returnsClose', 'PX_LAST'),
    ('returnsHigh', 'PX_HIGH'),
    ('returnsLow', 'PX_LOW'),
):
    priceData[return_col] = priceData.groupby('security')[price_col].pct_change()

# The first row of every security has no previous price, so it is dropped here.
priceData.dropna(inplace=True) # Think about Nans later.....

# Extract candlestick / technical features

> Add blockquote



In [None]:
import pandas as pd
import numpy as np
import ta

import pandas as pd

def calculate_mfi(df, period=14, high_col="PX_HIGH", low_col="PX_LOW", close_col="PX_LAST", volume_col="PX_VOLUME", epsilon=1e-6):
    """
    Calculate the Money Flow Index (MFI) without modifying the input frame.

    Rows are processed in the order given, so `df` is expected to hold a single
    instrument in chronological order; differences and rolling sums are taken
    across adjacent rows regardless of what they represent.

    Parameters:
    df (pd.DataFrame): DataFrame with OHLCV data.
    period (int): Lookback period for MFI (default=14).
    high_col, low_col, close_col, volume_col (str): Column names for high, low, close, and volume.
    epsilon (float): Small constant added to the negative-flow sum to avoid division by zero.

    Returns:
    pd.Series: Money Flow Index values, named "MFI" and indexed like `df`
               (NaN until `period` rows are available).
    """
    # Typical Price
    typical_price = (df[high_col] + df[low_col] + df[close_col]) / 3

    # Money Flow
    money_flow = typical_price * df[volume_col]

    # Positive & Negative Money Flow: the flow counts as positive when the
    # typical price rose versus the previous row, negative when it fell.
    # The first row has no previous price and contributes to neither side.
    tp_change = typical_price.diff()
    positive_flow = money_flow.where(tp_change > 0, 0)
    negative_flow = money_flow.where(tp_change < 0, 0)

    # Money Flow Ratio
    money_flow_ratio = positive_flow.rolling(window=period).sum() / (
        negative_flow.rolling(window=period).sum() + epsilon
    )

    # Money Flow Index
    mfi = 100 - (100 / (1 + money_flow_ratio))

    # Intermediate series stay local; previously they were written into `df`
    # as extra columns (TP, MF, ...), silently changing the caller's frame.
    return mfi.rename("MFI")

def _add_security_indicators(security_df):
    """Add the `ta`-based indicator columns and MFI to the rows of ONE security.

    Rows are used in the order given, so they are assumed to be chronological.
    """
    out = security_df.copy()
    close, high, low = out["PX_LAST"], out["PX_HIGH"], out["PX_LOW"]

    # === Trend Indicators ===
    out["EMA_12"] = ta.trend.EMAIndicator(close, window=12).ema_indicator()
    out["EMA_26"] = ta.trend.EMAIndicator(close, window=26).ema_indicator()
    macd = ta.trend.MACD(close)  # built once, used for both MACD columns
    out["MACD"] = macd.macd()
    out["MACD_Signal"] = macd.macd_signal()
    out["RSI"] = ta.momentum.RSIIndicator(close, window=14).rsi()

    # === Volatility Indicators ===
    atr_window = 14
    if len(out) >= atr_window:
        out["ATR"] = ta.volatility.AverageTrueRange(high, low, close, window=atr_window).average_true_range()
    else:
        # NOTE(review): ta's ATR appears to require at least one full window of
        # rows; securities with a shorter history get NaN instead of an error.
        out["ATR"] = np.nan
    bollinger = ta.volatility.BollingerBands(close, window=20, window_dev=2)
    out["Bollinger_Upper"] = bollinger.bollinger_hband()
    out["Bollinger_Lower"] = bollinger.bollinger_lband()

    # === Momentum Indicators ===
    out["Momentum_5"] = ta.momentum.ROCIndicator(close, window=5).roc()
    out["Williams_%R"] = ta.momentum.WilliamsRIndicator(high, low, close, lbp=14).williams_r()

    # Money Flow Index
    out["MFI"] = calculate_mfi(out)

    return out


def extract_features(df):
    """
    Extracts technical features from OHLCV (PX_OPEN, PX_HIGH, PX_LOW, PX_LAST, PX_VOLUME) data.

    Every feature that looks back over previous rows is computed separately
    for each `security`, so windows never mix prices of different securities.
    Within a security, rows are assumed to already be in chronological order.

    Parameters:
    df (pd.DataFrame): DataFrame with columns ['security', 'PX_OPEN', 'PX_HIGH', 'PX_LOW', 'PX_LAST', 'PX_VOLUME'];
                       may contain many securities. The input is not modified.

    Returns:
    pd.DataFrame: Copy of `df` (same row order and index) with added feature columns.
                  An empty input is returned with only the row-wise columns added.
    """
    df = df.copy()

    # === Basic Price Features ===
    df["Daily_Return"] = df.groupby("security")["PX_LAST"].pct_change()
    df["Price_Spread"] = df["PX_HIGH"] - df["PX_LOW"]
    df["Body_Size"] = abs(df["PX_LAST"] - df["PX_OPEN"])
    df["Upper_Shadow"] = df["PX_HIGH"] - df[["PX_OPEN", "PX_LAST"]].max(axis=1)
    df["Lower_Shadow"] = df[["PX_OPEN", "PX_LAST"]].min(axis=1) - df["PX_LOW"]
    previous_close = df.groupby("security")["PX_LAST"].shift(1)
    df["Close_Open_Gap"] = (df["PX_OPEN"] - previous_close) / previous_close
    df["Volume_Change"] = df.groupby("security")["PX_VOLUME"].pct_change()

    # === Simple moving averages (per security) ===
    df["SMA_5"] = df.groupby("security")["PX_LAST"].transform(lambda x: x.rolling(5).mean())
    df["SMA_20"] = df.groupby("security")["PX_LAST"].transform(lambda x: x.rolling(20).mean())

    # === ta-based indicators and MFI (per security) ===
    # Previously these ran over the whole frame, so EMA/MACD/RSI/ATR/Bollinger/
    # ROC/Williams %R/MFI windows bled from one security into the next.
    # A temporary position column lets us restore the original row order after
    # the per-security pieces are concatenated (the date index is not unique).
    order_col = "__row_order__"
    df[order_col] = np.arange(len(df))
    pieces = [
        _add_security_indicators(group)
        for _, group in df.groupby("security", sort=False, dropna=False)
    ]
    if pieces:
        df = pd.concat(pieces).sort_values(order_col, kind="stable")

    return df.drop(columns=order_col)

# === Apply Feature Extraction to Dataset ===
# priceData is the date-indexed frame prepared above, with one row per date and security.
df_features = extract_features(priceData)  # df contains multiple securities
print(df_features.head())


In [None]:
def getStockPriceVolumeData(ticker, start=None, end=None):
  """Return the rows of the notebook-level `priceData` frame for one ticker.

  The ticker is matched against the `security` column after appending
  IN_EQUITY_PREFIX. `start` / `end` are optional inclusive bounds applied as a
  label slice on the frame's index; None leaves that side open.
  """
  security_name = ticker + IN_EQUITY_PREFIX
  ticker_rows = priceData.loc[priceData['security'] == security_name]
  # A None bound leaves that side of the slice open, so a single slice covers
  # all four start/end combinations.
  return ticker_rows.loc[start:end]

def plotPrice(ticker, priceType='PX_OPEN', start=None, end=None, dotDates=None, lineDates=None, saveFig=True, saveSuffix=''):
  """Plot one price column of a ticker, optionally marking dates of interest.

  Parameters:
  ticker (str): Ticker passed to getStockPriceVolumeData.
  priceType (str): Name of the price column to plot.
  start, end: Optional bounds of the plotted period (None = open-ended).
  dotDates (list): Dates drawn as red dots on the price line. Dates outside
                   [start, end] or without a price row are skipped.
  lineDates (list): Dates drawn as dashed vertical "Event" lines. Dates outside
                    [start, end] are skipped.
  saveFig (bool): When True, save the figure as '<ticker>_<saveSuffix>.png'.
  saveSuffix (str): Suffix used in the saved file name.

  Returns:
  None. Prints a message and returns early when there is nothing to plot.
  """
  df = getStockPriceVolumeData(ticker, start=start, end=end)
  if df.empty:
    # Without rows there is no first/last date to fall back on below.
    print(f"No price data to plot for {ticker}")
    return

  plt.close("all")  # Close any previous plots
  fig, ax = plt.subplots(figsize=(10, 5))  # Create a new figure explicitly

  ax.plot(df.index, df[priceType], label=priceType, color='black')

  # Formatting
  ax.set_xlabel("Date")
  ax.set_ylabel(priceType)
  ax.set_title(ticker)
  plt.xticks(rotation=45)
  plt.grid()
  ax.xaxis.set_major_locator(mdates.AutoDateLocator())  # Automatically adjusts date labels
  fig.autofmt_xdate()  # Rotates dates for better readability
  start = pd.Timestamp(start) if start is not None else df.index[0]
  end = pd.Timestamp(end) if end is not None else df.index[-1]

  if dotDates is not None:
    # Keep only dates inside the plotted period that actually have a price row,
    # so the lookup below cannot raise KeyError.
    dot_dates = [pd.Timestamp(d) for d in dotDates]
    dot_dates = [d for d in dot_dates if start <= d <= end and d in df.index]
    ax.scatter(dot_dates, df.loc[dot_dates, priceType], color='red', zorder=3, label="Dots")

  if lineDates is not None:
    line_dates = [pd.Timestamp(d) for d in lineDates if start <= pd.Timestamp(d) <= end]
    print(line_dates)
    for position, line_date in enumerate(line_dates):
      # Only the first line carries a label so the legend shows "Event" once.
      ax.axvline(x=line_date, color='blue', linestyle='--', linewidth=1, alpha=0.7, label="Event" if position == 0 else "")

  # Build the legend only after every artist exists; it used to be created
  # before the dots and event lines were added, so they never appeared in it.
  ax.legend()

  if saveFig:
    fig.savefig(ticker + '_' + saveSuffix + '.png', bbox_inches="tight")

  plt.show()

#Remove Index component from stock price

In [None]:
# Text appended to a bare ticker to build the value matched against priceData['security'].
IN_EQUITY_PREFIX = ' IN Equity'
# Bare tickers (text before the first space) from the '2022-12-30' universe column; empty cells are skipped.
CURR_UNIVERSE = [name.split(' ')[0] for name in universe['2022-12-30'].dropna()]

In [None]:
import pandas as pd
import numpy as np

def extend_index_composition(index_composition_df, start_date=None):
    """
    Expand index composition snapshots to one column per business day.

    Each business day gets the most recent snapshot dated on or before it.
    Days before the first snapshot use the earliest available composition.

    Parameters:
    index_composition_df (pd.DataFrame): Index components with dates as columns and tickers as values.
    start_date (date-like, optional): First day of the extended range. When omitted,
        the start of a notebook-level `price_df` is used if that name exists
        (the historical behaviour); otherwise the earliest snapshot date.

    Returns:
    pd.DataFrame: Extended index composition (business days as columns, tickers as values).
    """
    composition = index_composition_df.copy()
    composition.columns = pd.to_datetime(composition.columns)  # column labels may arrive as strings
    if composition.shape[1] == 0:
        return composition  # nothing to extend

    composition = composition.sort_index(axis=1)  # searchsorted below needs ascending dates
    snapshot_dates = composition.columns

    if start_date is None:
        try:
            start_date = price_df.index.min()  # legacy fallback: notebook-level price_df
        except NameError:
            start_date = snapshot_dates.min()

    all_dates = pd.date_range(start=pd.Timestamp(start_date), end=snapshot_dates.max(), freq="B")  # All business days

    # Position of the snapshot in force on each day: the last snapshot dated
    # <= day. Days before the first snapshot get -1, which is clipped to 0 so
    # that they use the earliest composition (a forward fill alone left them empty).
    positions = np.clip(snapshot_dates.searchsorted(all_dates, side="right") - 1, 0, None)

    extended_composition = composition.iloc[:, positions].copy()
    extended_composition.columns = all_dates
    return extended_composition

def compute_equal_weighted_index_returns(price_df, index_composition_df):
    """
    Compute equal-weighted index returns using daily stock prices and index composition.

    Parameters:
    price_df (pd.DataFrame): Prices with date index and tickers as columns (PX_LAST values).
    index_composition_df (pd.DataFrame): Index components with dates as columns and tickers as values.

    Returns:
    pd.Series: Equal-weighted index returns over time ("Equal_Weighted_Return"),
               with one entry per day that has at least one valid member return.
    """
    if price_df.empty:
        return pd.Series(dtype=float, name="Equal_Weighted_Return")

    # Extend the composition back to the first price date
    # (check with Anay on composition data before 2012....)
    composition = extend_index_composition(index_composition_df, start_date=price_df.index.min())

    returns = price_df.pct_change()  # Compute daily simple returns
    index_returns = {}

    for day in composition.columns:  # Loop through each date
        if day not in returns.index:
            continue  # Skip if no return data

        members = composition[day].dropna()  # Get tickers for that day
        # Ignore members without a price column instead of raising KeyError.
        priced_members = returns.columns.intersection(members)
        valid_returns = returns.loc[day, priced_members].dropna()  # Get valid returns

        if len(valid_returns) > 0:
            index_returns[day] = valid_returns.mean()  # Compute equal-weighted return

    return pd.Series(index_returns, name="Equal_Weighted_Return", dtype=float)

# Example Usage
# NOTE(review): `price_df` and `index_composition_df` are not assigned anywhere in the
# visible part of this notebook, so this cell presumably raises NameError on a fresh
# kernel unless they are defined elsewhere — confirm and build them before running.
index_composition_df = extend_index_composition(index_composition_df)  # Extend composition first
equal_weighted_index_returns = compute_equal_weighted_index_returns(price_df, index_composition_df)


In [None]:
import pandas as pd
import statsmodels.api as sm
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from scipy.stats import probplot

def compute_log_returns(df):
    """Return the natural log of each value divided by the value one row earlier.

    The first row has no predecessor and is therefore NaN.
    """
    ratio_to_previous = df / df.shift(1)
    return np.log(ratio_to_previous)

def compute_simple_returns(df):
    """Return the row-over-row percentage change of a price series or frame."""
    simple_returns = df.pct_change()
    return simple_returns

def compute_idiosyncratic_returns(priceData, indexData, universe=CURR_UNIVERSE):
    """
    Compute idiosyncratic returns efficiently without pivoting.

    For every ticker the stock's log returns are regressed on the index log
    returns (Stock Return = α + β × Index Return + ε); the residuals ε are the
    idiosyncratic returns.

    Parameters:
    priceData (pd.DataFrame): Date-indexed DataFrame with columns ["security", "PX_LAST"].
    indexData (pd.Series): Series with index daily closing prices (indexed by date).
    universe (list): Bare tickers to process; IN_EQUITY_PREFIX is appended to
                     match the `security` column.

    Returns:
    pd.DataFrame: Idiosyncratic returns, one column per security that had at
                  least 30 overlapping observations with the index.
    """
    idiosyncratic_returns = {}
    infinities = [np.inf, -np.inf]

    # Compute log returns for index (non-finite values are treated as missing)
    index_returns = compute_log_returns(indexData).replace(infinities, np.nan).dropna()

    # Iterate over each unique security
    for security in universe:
        print(security + " Regression Started")
        # Use the frame that was passed in; the argument used to be ignored in
        # favour of the notebook-level priceData.
        is_security = priceData["security"] == security + IN_EQUITY_PREFIX
        sec_data = priceData.loc[is_security, "PX_LAST"]
        # A zero price yields an infinite log return, which dropna() alone keeps.
        stock_returns = compute_log_returns(sec_data).replace(infinities, np.nan).dropna()

        # Align data
        aligned_data = pd.concat([stock_returns, index_returns], axis=1, join="inner").dropna()
        if len(aligned_data) < 30:  # Skip if insufficient data
            continue

        # Both columns are named PX_LAST, so they are selected by position.
        X = sm.add_constant(aligned_data.iloc[:, 1])  # Add constant (α) for regression
        y = aligned_data.iloc[:, 0]

        # Regression: Stock Return = α + β × Index Return + ε
        model = sm.OLS(y, X).fit()
        predicted_returns = model.predict(X)

        # Compute idiosyncratic returns (residuals)
        residuals = y - predicted_returns
        idiosyncratic_returns[security] = residuals
        print(f"{security} Regression Finished | R² = {model.rsquared:.4f}")

    # Convert dictionary to DataFrame
    idio_df = pd.DataFrame(idiosyncratic_returns)
    return idio_df

# Regress every ticker of the default universe (CURR_UNIVERSE) on the nifty200 PX_LAST series.
idiosyncratic_returns = compute_idiosyncratic_returns(priceData, nifty200["PX_LAST"])
print(idiosyncratic_returns.head())


In [None]:
universe.dropna()

In [None]:
priceData.columns

In [None]:
CURR_UNIVERSE

#Model 1

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (7,4.5) # Make the default figures a bit bigger
import numpy as np
import random
import pandas as pd
from datetime import datetime
from datetime import date
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.models import Model
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf

In [None]:
'''
Autoencoder Model: An autoencoder is implemented as the class AnomalyDetectorNN, which extends the tf.keras.models.Model class.
The autoencoder is composed of two parts: an encoder and a decoder.
The encoder reduces the dimensionality of the input data, and the decoder restores it to its original shape.
'''

# class AnomalyDetectorNN(Model):
#     def __init__(self, input_dim=30):
#         super(AnomalyDetectorNN, self).__init__()

#         # Encoder: Compress input into a smaller representation
#         self.encoder = tf.keras.Sequential([
#             layers.Dense(128, activation="relu", input_shape=(input_dim,)),  # Input shape added
#             layers.Dense(16, activation="relu")
#         ])

#         # Decoder: Reconstruct input from encoded representation
#         self.decoder = tf.keras.Sequential([
#             layers.Dense(128, activation="relu"),
#             layers.Dense(input_dim, activation="sigmoid")  # Ensures output matches input dimension
#         ])

#     def call(self, x):
#         encoded = self.encoder(x)   # Encode input
#         decoded = self.decoder(encoded)  # Decode back to original shape
#         return decoded

# autoencoder = AnomalyDetectorNN(30)

class AnomalyDetectorNN(Model):
    """Dense autoencoder over windows of shape (window_size, num_features).

    The encoder flattens a window and compresses it to 16 units; the decoder
    expands it again and reshapes the result back to the window shape.
    """

    def __init__(self, window_size=30, num_features=5):
        super().__init__()

        flattened_size = window_size * num_features  # units needed to rebuild one window

        encoder_layers = [
            layers.Flatten(),  # (batch, window_size, num_features) -> (batch, window_size*num_features)
            layers.Dense(128, activation="relu"),
            layers.Dense(16, activation="relu"),  # bottleneck
        ]
        decoder_layers = [
            layers.Dense(128, activation="relu"),
            layers.Dense(flattened_size, activation="sigmoid"),  # sigmoid keeps outputs in (0, 1)
            layers.Reshape((window_size, num_features)),  # back to the window shape
        ]

        self.encoder = tf.keras.Sequential(encoder_layers)
        self.decoder = tf.keras.Sequential(decoder_layers)

    def call(self, x):
        """Encode `x` to the bottleneck and return its reconstruction."""
        return self.decoder(self.encoder(x))


In [None]:
import tensorflow as tf
from tensorflow.keras import layers, Model

# class LSTMAutoencoder(Model):
#     def __init__(self, input_dim=30, latent_dim=16):
#         super(LSTMAutoencoder, self).__init__()

#         # Encoder: Compress input into a smaller representation
#         self.encoder = tf.keras.Sequential([
#             layers.LSTM(128, activation="relu", return_sequences=True, input_shape=(input_dim, 1)),
#             layers.LSTM(latent_dim, activation="relu", return_sequences=False)  # Bottleneck
#         ])

#         # Decoder: LSTM reconstructing original input
#         self.decoder = tf.keras.Sequential([
#             layers.RepeatVector(input_dim),  # Expands bottleneck vector across time steps
#             layers.LSTM(128, activation="relu", return_sequences=True),
#             layers.TimeDistributed(layers.Dense(1))  # Output shape matches input (30,1)
#         ])

#     def call(self, x):
#         encoded = self.encoder(x)  # Compress input into bottleneck representation
#         decoded = self.decoder(encoded)  # Reconstruct sequence from bottleneck
#         return decoded

# # Example usage
# input_dim = 30  # Sequence length (time steps)
# latent_dim = 8  # Compressed representation

# autoencoder = LSTMAutoencoder(input_dim=input_dim, latent_dim=latent_dim)
# #autoencoder.compile(optimizer='adam', loss='mae')

import tensorflow as tf
from tensorflow.keras import Model, layers

# class LSTMAutoencoder(Model):
#     def __init__(self, timeSteps=30, numFeatures=5, latent_dim=8):
#         super(LSTMAutoencoder, self).__init__()

#         # Encoder: Compress input into a latent representation
#         self.encoder = tf.keras.Sequential([
#             layers.LSTM(128, activation="relu", return_sequences=True, input_shape=(timeSteps, numFeatures)),
#             layers.LSTM(latent_dim, activation="relu", return_sequences=False)  # Bottleneck
#         ])

#         # Decoder: Reconstruct original input shape (timeSteps, numFeatures)
#         self.decoder = tf.keras.Sequential([
#             layers.RepeatVector(timeSteps),  # Expand bottleneck across time steps
#             layers.LSTM(128, activation="relu", return_sequences=True),
#             layers.TimeDistributed(layers.Dense(numFeatures))  # Ensure output matches input shape
#         ])

#     def call(self, x):
#         encoded = self.encoder(x)  # Compress input into latent space
#         decoded = self.decoder(encoded)  # Reconstruct input from latent space
#         return decoded

class LSTMAutoencoder(Model):
    """LSTM autoencoder over sequences of shape (timeSteps, numFeatures).

    The encoder reduces a sequence to one vector of size `latent_dim`; the
    decoder repeats that vector for every time step and maps it back to
    `numFeatures` values per step.
    """

    def __init__(self, timeSteps=30, numFeatures=5, latent_dim=16):
        super().__init__()

        # Encoder: sequence -> single latent vector
        encoder_layers = [
            layers.LSTM(128, activation="relu", return_sequences=True, input_shape=(timeSteps, numFeatures)),
            layers.LSTM(latent_dim, activation="relu", return_sequences=False),  # only the final state: the bottleneck
        ]

        # Decoder: latent vector -> sequence of shape (timeSteps, numFeatures)
        decoder_layers = [
            layers.RepeatVector(timeSteps),  # one copy of the latent vector per time step
            layers.LSTM(128, activation="relu", return_sequences=True),
            #layers.LSTM(64, activation="relu", return_sequences=True),  # Extra LSTM layer
            layers.TimeDistributed(layers.Dense(numFeatures)),  # per-step projection to numFeatures
        ]

        self.encoder = tf.keras.Sequential(encoder_layers)
        self.decoder = tf.keras.Sequential(decoder_layers)

    def call(self, x):
        """Compress `x` to the latent vector and return the reconstructed sequence."""
        latent = self.encoder(x)
        return self.decoder(latent)



In [None]:
def normalize_window_roll(data, window_size, step=1, cols_to_keep=None):
    """
    Cut `data` into rolling windows and min-max scale every window on its own.

    Each window is scaled independently with a fresh MinMaxScaler, so every
    column of every window is mapped to [0, 1] using that window's own min/max.

    Parameters:
    - data: pd.DataFrame or pd.Series, with datetime index. A Series is treated
            as a one-column frame.
    - window_size: int, size of rolling window.
    - step: int, step size for moving window.
    - cols_to_keep: list, column names to keep for normalization.

    Returns:
    - X: np.array of shape (num_samples, window_size, num_features); shape
         (0, window_size, num_features) when `data` is shorter than one window.
    - index_map: list of lists, mapping each index to dates.
    """
    # MinMaxScaler needs 2-D input, so promote a Series to a single-column frame.
    if isinstance(data, pd.Series):
        data = data.to_frame()

    # If cols_to_keep is provided, filter only those columns
    if cols_to_keep:
        data = data[cols_to_keep]
        if isinstance(data, pd.Series):  # a single column name was passed
            data = data.to_frame()

    num_features = data.shape[1]
    X = []
    index_map = []

    # "+ 1" so that the last full window (ending on the final row) is included;
    # without it the most recent window was silently dropped.
    for i in range(0, len(data) - window_size + 1, step):
        x_window = data.iloc[i : i + window_size]  # Extract rolling window
        index_map.append(x_window.index.to_list())  # Store index list

        # Normalize across all selected columns
        scaler = MinMaxScaler(feature_range=(0, 1))
        x_window_scaled = scaler.fit_transform(x_window)  # Shape: (window_size, num_features)

        X.append(x_window_scaled)

    if not X:
        # Keep the documented 3-D shape even when no window fits.
        return np.empty((0, window_size, num_features)), index_map

    X = np.array(X)  # Shape: (num_samples, window_size, num_features)

    return X, index_map



In [None]:
import numpy as np
import pandas as pd

def split_data(X, index_map, test_start, test_end, val_start, val_end):
    """
    Partition windowed data into train / validation / test sets by date.

    A window belongs to the test (or validation) set when its FIRST date lies
    inside the corresponding inclusive range; every window that is in neither
    set goes to training.

    Parameters:
    - X: np.array (N_data x seq_length x num_features), the normalized rolling window data.
    - index_map: list of lists containing pd.Timestamp (N_data x seq_length), the dates of each window in X.
    - test_start, test_end: datetime.date or pd.Timestamp, inclusive bounds of the test set.
    - val_start, val_end: datetime.date or pd.Timestamp, inclusive bounds of the validation set.

    Returns:
    - X_train, X_val, X_test: np.array, split data (same 3D shape structure).
    - index_train, index_val, index_test: list of lists of pd.Timestamp.
    """
    # Work with pd.Timestamp bounds throughout.
    test_start, test_end = pd.Timestamp(test_start), pd.Timestamp(test_end)
    val_start, val_end = pd.Timestamp(val_start), pd.Timestamp(val_end)

    # No windows at all: return empty arrays that still carry the window shape.
    if not index_map:
        if X.ndim == 3:
            empty_shape = (0, X.shape[1], X.shape[2])
        else:
            empty_shape = (0, 0, 0)
        return (np.empty(empty_shape),  # X_train
                np.empty(empty_shape),  # X_val
                np.empty(empty_shape),  # X_test
                [], [], [])

    # The first date of every window decides which set it falls into.
    window_starts = np.array([dates[0] for dates in index_map])

    def select(mask):
        """Return the windows and date lists flagged by a boolean mask."""
        rows = np.where(mask)[0]
        if len(rows) == 0:
            return np.empty((0, X.shape[1], X.shape[2])), []
        return X[rows], [index_map[i] for i in rows]

    in_test = (window_starts >= test_start) & (window_starts <= test_end)
    in_val = (window_starts >= val_start) & (window_starts <= val_end)

    X_test, index_test = select(in_test)
    X_val, index_val = select(in_val)
    X_train, index_train = select(~(in_test | in_val))  # everything else trains

    return X_train, X_val, X_test, index_train, index_val, index_test


In [None]:
#Source: https://www.tensorflow.org/tutorials/generative/autoencoder
def predict(model, data, threshold):
  """Flag which samples the model reconstructs with an error below `threshold`.

  The mean absolute error between `model(data)` and `data` is averaged over
  axis 1, and the result is compared with `threshold`. True therefore means
  "below threshold" (not anomalous).
  """
  reconstructed = model(data)
  absolute_error = tf.keras.losses.mae(reconstructed, data)
  sample_error = tf.reduce_mean(absolute_error, axis=1)
  return tf.math.less(sample_error, threshold)

In [None]:
#reconstructions.shape
#X_train.shape
# NOTE(review): train_loss is only assigned inside the per-ticker training loop of a
# later cell, so this expression presumably raises NameError when the notebook is
# run top to bottom — confirm and move it below that cell.
train_loss.shape

In [None]:
#TICKERS = CURR_UNIVERSE
TICKERS = ['HDFC', 'RELIANCE', 'SBIN']
PRICE_TYPE = 'PX_LAST'
#COLS_TO_KEEP = ['PX_LAST','PX_OPEN', 'PX_HIGH', 'PX_LOW', 'volume_max_ratio' ]
#COLS_TO_KEEP = ['PX_LAST','PX_OPEN', 'PX_HIGH', 'PX_LOW', 'volume_ratio' ]
#COLS_TO_KEEP = ['returnsClose','returnsOpen', 'returnsHigh', 'returnsLow', 'volume_max_ratio' ]
COLS_TO_KEEP = [ 'PX_LAST' ]
WINDOW_SIZE = 30  # rows per rolling window fed to the autoencoder
WINDOW_STEP = 5   # rows between the starts of consecutive windows
MODEL_INFO = ""
returnsAtAnomalies = {}

########################### CHANGE MODEL HERE ###########################
# Both classes accept (window length, number of features) as their first two arguments.
MODEL_CLASS = AnomalyDetectorNN
#MODEL_CLASS = LSTMAutoencoder

# Date ranges of the test and validation windows (see split_data); identical for every ticker.
startTest = date(2019, 1, 1)
endTest = date(2021, 1, 1)

startVal = date(2014, 1, 1)
endVal = date(2018, 1, 1)

# NOTE: calculate_returns and evaluate_predictions are defined in a later cell
# ("Quality of Trade"); that cell must have been run before this one.
for ticker in TICKERS:
  try:
    prices = getStockPriceVolumeData(ticker)

    X, index_map = normalize_window_roll(prices, WINDOW_SIZE, WINDOW_STEP, COLS_TO_KEEP)

    X_train, X_val, X_test, index_train, index_val, index_test = split_data(X, index_map, startTest, endTest, startVal, endVal)

    early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss',
                                                    patience=2,
                                                    mode='min')

    # The feature count follows COLS_TO_KEEP, so switching the column list no
    # longer requires editing a hard-coded model dimension as well.
    autoencoder = MODEL_CLASS(WINDOW_SIZE, len(COLS_TO_KEEP))
    autoencoder.compile(optimizer='adam', loss='mae')

    history = autoencoder.fit(X_train,X_train, epochs=80,
                        validation_data=(X_val, X_val),
                        batch_size=16,
                        callbacks=[early_stopping])

    reconstructions = autoencoder.predict(X_train)
    train_loss = tf.reduce_mean(tf.keras.losses.mae(reconstructions, X_train), axis=1)

    plt.hist(train_loss[None,:], bins=50)
    plt.xlabel("Train loss")
    plt.ylabel("No of examples")
    plt.show()

    # Windows whose reconstruction error is at least one standard deviation
    # above the mean training error are treated as anomalous.
    threshold = np.mean(train_loss) + np.std(train_loss)
    #threshold = np.max(train_loss)
    print("Threshold: ", threshold)

    reconstructions = autoencoder.predict(X_test)
    test_loss = tf.reduce_mean(tf.keras.losses.mae(reconstructions, X_test), axis=1)

    plt.hist(test_loss[None, :], bins=50)
    plt.xlabel("Test loss")
    plt.ylabel("No of examples")
    plt.show()

    # predict() returns True for windows BELOW the threshold, i.e. normal ones.
    preds = predict(autoencoder, X_test, threshold)

    anomalousIndexIds = np.where(~np.asarray(preds))[0]

    # Each anomalous window is marked at its last date.
    dotDates = [index_test[i][-1] for i in anomalousIndexIds]
    plotPrice(ticker, PRICE_TYPE, date(2015, 1, 1), None, dotDates, lineDates=[startTest, endTest], saveSuffix=MODEL_CLASS.__name__ + MODEL_INFO)

    returns = calculate_returns(prices, dotDates)
    returnsAtAnomalies[ticker] = returns

  except Exception as e:
    # Best effort: report the failure and continue with the next ticker.
    print(f"Error processing {ticker}: {e}")

if returnsAtAnomalies:
  returnsAtAnomaliesDf = pd.concat(
      [df.assign(Ticker=ticker) for ticker, df in returnsAtAnomalies.items()],
      ignore_index=True
  )
  results = evaluate_predictions(returnsAtAnomaliesDf)
  results.to_csv('results' + MODEL_CLASS.__name__ + MODEL_INFO + '.csv')
else:
  # pd.concat raises on an empty list; keep the frame defined for later cells.
  returnsAtAnomaliesDf = pd.DataFrame(columns=["Ticker"])
  print("No ticker was processed successfully; nothing to evaluate.")


In [None]:
evaluate_predictions(returnsAtAnomaliesDf)

In [None]:
returnsAtAnomaliesDf

#Direction of trade

In [None]:
import pandas as pd
import numpy as np

def signalFromAnomaly(data, anomalyDates):
    """
    Generate trading signals based on anomalies using trend & mean reversion filters.

    Parameters:
    data (pd.DataFrame): DataFrame with 'PX_LAST' prices and a date index
                         (one row per date). The input is not modified.
    anomalyDates (list): Dates (strings or datetime-like) where anomalies were
                         detected. Dates missing from the index are ignored.

    Returns:
    pd.DataFrame: Copy of `data` with indicator columns (SMA_5, SMA_20,
                  BB_Upper, BB_Lower, RSI, Momentum) and a 'Signal' column:
                  1 = buy, -1 = sell, 0 = no trade / not an anomaly date.
    """
    df = data.copy()

    # Ensure the index is a datetime index
    df.index = pd.to_datetime(df.index)
    close = df['PX_LAST']

    # Compute Moving Averages
    df['SMA_5'] = close.rolling(window=5).mean()
    df['SMA_20'] = close.rolling(window=20).mean()

    # Compute Bollinger Bands
    rolling_mean = close.rolling(window=20).mean()
    rolling_std = close.rolling(window=20).std()
    df['BB_Upper'] = rolling_mean + (2 * rolling_std)
    df['BB_Lower'] = rolling_mean - (2 * rolling_std)

    # Compute RSI manually.
    # gain/loss stay pandas Series on the frame's date index. They used to be
    # wrapped in pd.Series(...) with a fresh 0..n-1 index, so assigning the
    # result to df['RSI'] aligned on nothing and produced an all-NaN column,
    # which silently disabled the RSI filter below.
    delta = close.diff()
    gain = delta.where(delta > 0, 0.0)
    loss = (-delta).where(delta < 0, 0.0)

    avg_gain = gain.rolling(window=14, min_periods=1).mean()
    avg_loss = loss.rolling(window=14, min_periods=1).mean()

    rs = avg_gain / (avg_loss + 1e-10)  # Avoid division by zero
    df['RSI'] = 100 - (100 / (1 + rs))

    # Compute Momentum (5-day return)
    df['Momentum'] = close.pct_change(periods=5)

    # Initialize Signal Column
    df['Signal'] = 0

    # Generate Signals
    for anomaly in anomalyDates:
        anomaly_ts = pd.to_datetime(anomaly)
        if anomaly_ts not in df.index:
            continue

        sma5 = df.loc[anomaly_ts, 'SMA_5']
        sma20 = df.loc[anomaly_ts, 'SMA_20']
        rsi = df.loc[anomaly_ts, 'RSI']
        price = df.loc[anomaly_ts, 'PX_LAST']
        upper_bb = df.loc[anomaly_ts, 'BB_Upper']
        lower_bb = df.loc[anomaly_ts, 'BB_Lower']
        momentum = df.loc[anomaly_ts, 'Momentum']

        # Trend-Following: Buy if SMA5 > SMA20 (Uptrend), Sell if SMA5 < SMA20 (Downtrend).
        # Comparisons with NaN are False, so an undefined average gives 0.
        if sma5 > sma20:
            signal = 1  # Buy
        elif sma5 < sma20:
            signal = -1  # Sell
        else:
            signal = 0  # No clear trend

        # Mean Reversion Filters (override the trend signal)
        if price >= upper_bb or rsi > 70:
            signal = -1  # Overbought, Sell
        elif price <= lower_bb or rsi < 30:
            signal = 1  # Oversold, Buy

        # Momentum filter: cancel a trade that goes with the last 5 days' move
        if momentum > 0 and signal == 1:
            signal = 0  # Avoid buying into strong price rise
        elif momentum < 0 and signal == -1:
            signal = 0  # Avoid selling into strong decline

        df.loc[anomaly_ts, 'Signal'] = signal

    return df

#Quality of Trade

In [None]:
import pandas as pd

def calculate_returns(df, anomaly_dates, months=[3, 6]):
    """
    Compute forward percentage returns from anomaly dates, selecting the next available date if missing.

    Parameters:
    df (pd.DataFrame): DataFrame with 'PX_LAST' prices and datetime index (one row per date).
    anomaly_dates (list): List of anomaly dates (as strings or datetime). Dates
                          that are not in the index are skipped.
    months (list): List of months for return calculation (default: [3, 6]).

    Returns:
    pd.DataFrame: One row per anomaly date with columns 'Anomaly Date',
                  'Price at Anomaly' and '<m>M Return' (in percent) for every
                  m in `months`. A return is NaN when no price exists on or
                  after the target date. The columns are present even when
                  there are no rows.
    """
    df = df.copy()
    df.index = pd.to_datetime(df.index)  # Ensure index is datetime
    sorted_dates = df.index.sort_values()  # Ensure index is sorted
    return_columns = [f"{m}M Return" for m in months]
    results = []

    for raw_date in anomaly_dates:
        anomaly_date = pd.to_datetime(raw_date)
        if anomaly_date not in df.index:
            continue  # Skip if anomaly date is missing

        price_at_anomaly = df.loc[anomaly_date, "PX_LAST"]
        row = {"Anomaly Date": anomaly_date, "Price at Anomaly": price_at_anomaly}

        for m, column in zip(months, return_columns):
            future_date = anomaly_date + pd.DateOffset(months=m)

            # Find the next available date if the exact future date is missing
            future_idx = sorted_dates.searchsorted(future_date)
            if future_idx < len(sorted_dates):  # Ensure index is within bounds
                adjusted_future_date = sorted_dates[future_idx]
                future_price = df.loc[adjusted_future_date, "PX_LAST"]
                row[column] = (future_price - price_at_anomaly) * 100 / price_at_anomaly
            else:
                # NaN (not None) keeps the column numeric for later .abs() calls.
                row[column] = float("nan")

        results.append(row)

    # Explicit columns so that an empty result still has the expected schema.
    return pd.DataFrame(results, columns=["Anomaly Date", "Price at Anomaly"] + return_columns)


import pandas as pd
import numpy as np

def evaluate_predictions(returns_df):
    """
    Summarise, per ticker, how often an anomaly was followed by a large move.

    Parameters:
    returns_df (pd.DataFrame): DataFrame containing 'Ticker', '3M Return' and '6M Return'
        columns (returns expressed in percent), one row per anomaly.

    Returns:
    pd.DataFrame: One row per ticker with the number of anomalies and the number of
        anomalies whose absolute 3M / 6M return exceeded 10% and 20%.
    """
    def _count_exceeding(returns, limit):
        # Number of entries whose magnitude is strictly above `limit`.
        return (returns.abs() > limit).sum()

    summary_rows = []
    for ticker, ticker_rows in returns_df.groupby("Ticker"):
        ret_3m = ticker_rows["3M Return"]
        ret_6m = ticker_rows["6M Return"]
        summary_rows.append({
            "Ticker": ticker,
            "Total Anomalies": len(ticker_rows),
            "|3M Return| > 10%": _count_exceeding(ret_3m, 10),
            "|6M Return| > 10%": _count_exceeding(ret_6m, 10),
            "|3M Return| > 20%": _count_exceeding(ret_3m, 20),
            "|6M Return| > 20%": _count_exceeding(ret_6m, 20),
        })

    return pd.DataFrame(summary_rows)


#Volume normalization (DONE)

#Remove index component (NEED INDEX TIME SERIES)

#Use all features (DONE)

# Rough

In [None]:
TICKER = 'SBIN'
PRICE_TYPE = 'PX_LAST'
# Pull the chosen price column from 2010 onwards, label it 'price' and drop
# the few missing observations (e.g. only 2 NaN dates for Reliance).
prices = (
    getStockPriceVolumeData(TICKER, start=date(2010, 1, 1))[PRICE_TYPE]
    .rename('price')
    .dropna()
)

In [None]:
# Open question: should the price series be smoothed before windowing?
# normalize_window_roll is defined elsewhere in the notebook; later cells slice
# X by row and look up dates through index_map[row].
# NOTE(review): 30 is presumably the rolling-window length — confirm, and
# consider a named constant.
X, index_map = normalize_window_roll(prices,30)

In [None]:
# Sanity check: compare the shape of the windowed array with the length of the
# raw price series and the number of index_map entries.
print(X.shape)
print(prices.shape)
print(len(index_map))

In [None]:
# Positional 70/10/20 split of the windows: the training block comes first,
# the test block sits directly after it, and the validation block follows the
# test block.
train_size, test_size, val_size = (
    int(fraction * X.shape[0]) for fraction in (0.7, 0.1, 0.2)
)

# Row offsets of the test and validation blocks inside X.
test_index_offset = train_size
val_index_offset = train_size + test_size

X_train = X[:train_size]
X_test = X[test_index_offset:test_index_offset + test_size]
X_val = X[val_index_offset:val_index_offset + val_size]

In [None]:
# Training the model: the autoencoder is fitted to reconstruct its own input
# windows. An EarlyStopping callback stops the training if the validation loss
# does not improve for 2 consecutive epochs.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=2,
    mode='min',
)
autoencoder.compile(optimizer='adam', loss='mae')

history = autoencoder.fit(
    X_train,
    X_train,
    validation_data=(X_val, X_val),
    epochs=80,
    batch_size=16,
    callbacks=[early_stopping],
)

In [None]:
#Source: https://www.tensorflow.org/tutorials/generative/autoencoder
# Reconstruct the training windows and measure the mean absolute error between
# each reconstruction and its input.
reconstructions = autoencoder.predict(X_train)
train_loss = tf.keras.losses.mae(reconstructions, X_train)

# Histogram of the training losses; train_loss[None,:] adds a leading axis
# before the values are handed to plt.hist.
plt.hist(train_loss[None,:], bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

In [None]:
# Threshold = mean of the training losses plus one standard deviation; it is
# passed to predict() further down.
threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)

In [None]:
# Display the mean training loss on its own, for comparison with the threshold.
np.mean(train_loss)

In [None]:
# Same loss computation as for the training set, now on the test windows.
# Note that `reconstructions` is overwritten here.
reconstructions = autoencoder.predict(X_test)
test_loss = tf.keras.losses.mae(reconstructions, X_test)

# Histogram of the test losses; test_loss[None, :] adds a leading axis before
# the values are handed to plt.hist.
plt.hist(test_loss[None, :], bins=50)
plt.xlabel("Test loss")
plt.ylabel("No of examples")
plt.show()

In [None]:
# predict() is defined elsewhere in the notebook. Later cells treat the entries
# where preds == False as the anomalous test windows.
preds = predict(autoencoder, X_test, threshold)

In [None]:
# Display the raw prediction flags for the test windows.
preds

In [None]:
# Keep only the test windows whose prediction flag is False (treated as anomalies).
anomalous_test_data = X_test[np.where(preds==False)]

In [None]:
# Run the anomalous windows through the encoder and decoder to obtain their
# reconstructions.
encoded_data = autoencoder.encoder(anomalous_test_data).numpy()
decoded_data = autoencoder.decoder(encoded_data).numpy()

# Plot input vs. reconstruction for a few sample anomalous windows, shading the
# gap between the two curves.
for p in [1,10,20]:
  if p >= len(anomalous_test_data):
    # Fewer anomalous windows than this sample index: skip instead of raising
    # an IndexError.
    continue
  plt.plot(anomalous_test_data[p], 'b')
  plt.plot(np.arange(0,X.shape[1]),decoded_data[p], 'r')
  plt.fill_between(np.arange(X.shape[1]), decoded_data[p], anomalous_test_data[p], color='lightcoral')
  plt.legend(labels=["Input", "Reconstruction", "Error"])
  plt.show()

#Visualize on original price plot

In [None]:
# Positions of the anomalous windows within X_test, shifted by the test block's
# offset so that they index into X / index_map.
anomalousIndexIds = [
    int(test_index_offset + test_pos)
    for test_pos in np.where(preds == False)[0]
]

In [None]:
# Take element 0 of each anomalous window's index_map entry (presumably the
# window's first date — confirm against normalize_window_roll).
# Alternative kept for reference: mark every date in each anomalous window.
#dotDates = [date for i in anomalousIndexIds for date in index_map[i]]
dotDates = [index_map[i][0] for i in anomalousIndexIds]

In [None]:
# Inspect the dates that are passed to plotPrice in the next cell.
dotDates

In [None]:
# Plot the price series with the anomaly dates, passing the first entries of
# the validation and test blocks as lineDates.
plotPrice(
    TICKER,
    'PX_LAST',
    date(2013, 1, 1),
    None,
    dotDates,
    lineDates=[
        index_map[val_index_offset][0],
        index_map[test_index_offset][0],
    ],
)

In [None]:
# NOTE(review): lowercase `ticker` is not assigned anywhere in this section
# (the constant defined above is TICKER). This cell presumably relies on a
# variable left over from another cell — confirm, or replace with TICKER.
ticker

In [None]:
# Debug: date in the raw price series at position test_index_offset.
# NOTE(review): test_index_offset counts rows of X, so this only matches
# index_map[test_index_offset][0] if rows of X align one-to-one with prices — confirm.
prices.index[test_index_offset]

In [None]:
# Debug: row offset in X at which the validation block starts.
val_index_offset

In [None]:
# Debug: row offset in X at which the test block starts.
test_index_offset