In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import yfinance as yf
import tensorflow as tf
from datetime import datetime , timedelta
from tqdm import tqdm
from time import time
from ta.momentum import RSIIndicator
from sklearn.preprocessing import StandardScaler
import os
import warnings

# Module-level scaler instance. get_ticker_data() calls fit_transform on it for
# every window it builds, so its fitted state always reflects the latest call.
scaler = StandardScaler()

#### First, we fetch the ticker symbols of the NIFTY 50 constituents

In [None]:
# Scrape every HTML table from the NIFTY 50 Wikipedia article.
url = 'https://en.wikipedia.org/wiki/NIFTY_50'
table = pd.read_html(url)

# Select the constituents table by its 'Symbol' column instead of by position:
# a fixed index silently picks the wrong table when the page layout changes.
nifty50_df = next((candidate for candidate in table if 'Symbol' in candidate.columns), None)
if nifty50_df is None:
    raise ValueError(f"No table with a 'Symbol' column was found at {url}")

nifty50_tickers = nifty50_df['Symbol'].tolist()

# The '.NS' suffix is appended so the symbols can be passed to yfinance below.
nifty50_tickers = [ticker + '.NS' for ticker in nifty50_tickers]

print(nifty50_tickers)

In [None]:
from tensorflow.keras.models import Sequential #This is required to make the Neural Network layer by layer
from tensorflow.keras.layers import * #Used to import LSTM layer , Dense ( fully connected layer ) , Dropout ( Regularization to prevent overfitting)
from tensorflow.keras.callbacks import ModelCheckpoint #Automatically saves the best model even if later epochs overfit or degrade
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.metrics import RootMeanSquaredError
from tensorflow.keras.optimizers import Adam #Used to adjust the model weights during training
from tensorflow.keras.models import load_model
from keras_tuner import RandomSearch

In [None]:
def build_model(hp):
    """Build and compile the LSTM binary classifier for one tuner trial.

    Args:
        hp: Hyperparameter container supplied by the tuner. It is queried, in
            order, for 'lstm_units', 'dropout', 'dense_units' and 'optimizer'.

    Returns:
        A compiled Sequential model that takes inputs of shape (30, 6) and
        ends in a single sigmoid unit.
    """
    # Layers are listed in the order the hyperparameters are sampled.
    network = Sequential([
        Input(shape=(30, 6)),  # 6 input features
        LSTM(units=hp.Int('lstm_units', min_value=32, max_value=128, step=16)),
        Dropout(rate=hp.Float('dropout', 0.0, 0.5, step=0.1)),
        Dense(units=hp.Int('dense_units', 8, 64, step=8), activation='relu'),
        Dense(1, activation='sigmoid'),
    ])

    network.compile(
        optimizer=hp.Choice('optimizer', ['adam', 'rmsprop']),
        loss='binary_crossentropy',
        metrics=['accuracy'],
    )
    return network

In [None]:
def get_ticker_data(symbol , start_date , end_date):
    """Download price history for `symbol` and build the model's feature frame.

    Args:
        symbol: Ticker symbol passed to yfinance.
        start_date: First date of the requested window (must support
            subtraction of a timedelta).
        end_date: End of the requested window, forwarded to yfinance as `end`.

    Returns:
        DataFrame with the columns 'Return', 'Volatility', 'MA_50', 'MA_200',
        'RSI' and 'nifty_returns', restricted to rows on or after `start_date`
        and with every row containing a NaN removed. 'Volatility', 'MA_50',
        'MA_200' and 'RSI' are standardised with the module-level `scaler`,
        which is refit on each call. An empty frame with those columns is
        returned when no rows fall inside the window.
    """
    feature_cols = ['Return','Volatility','MA_50','MA_200','RSI','nifty_returns']
    scaled_cols = ['Volatility','MA_50','MA_200','RSI']

    # Fetch extra history before start_date so the rolling indicators
    # (the longest needs 200 rows) are already populated inside the window.
    buffer_start_date = start_date - timedelta(days=300)
    data = yf.Ticker(symbol).history(start = buffer_start_date , end = end_date)

    # Start the index download a few days early: pct_change() needs a prior
    # close, otherwise the first in-window index return is always NaN and the
    # start_date row is lost in the final dropna().
    nifty_start_date = start_date - timedelta(days=10)
    nifty_data = yf.Ticker('^NSEI').history(start = nifty_start_date , end = end_date)

    # Nothing downloaded: return an empty frame so callers' length checks skip it.
    if data.empty:
        return pd.DataFrame(columns=feature_cols)

    data['Return'] = data['Close'].pct_change()
    data['MA_50'] = data['Close'].rolling(window=50).mean()
    data['MA_200'] = data['Close'].rolling(window=200).mean()
    data['RSI'] = RSIIndicator(close=data['Close'],window=14).rsi()
    data['Volatility'] = data['Return'].rolling(window=14).std()

    # Drop the warm-up rows. The boundary is localised to the index's own
    # timezone: a UTC-midnight boundary falls after a local-midnight bar in any
    # timezone ahead of UTC and would exclude the start_date session.
    window_start = pd.to_datetime(start_date).tz_localize(getattr(data.index, 'tz', None))
    data = data.loc[window_start:]

    # fit_transform raises on zero rows; callers treat a short frame as
    # "not enough data", so hand back an empty frame instead.
    if data.empty:
        return pd.DataFrame(columns=feature_cols)

    nifty_data['nifty_returns'] = nifty_data['Close'].pct_change()
    data = data.join(nifty_data['nifty_returns'])

    data[scaled_cols] = scaler.fit_transform(data[scaled_cols])
    return data[feature_cols].dropna()

In [None]:
def dataframe_to_np(X , y):
    """Convert a list of feature frames and their labels into NumPy arrays.

    Args:
        X: Iterable of objects exposing `.values` (DataFrames in this notebook).
        y: Sequence of labels, one per element of `X`.

    Returns:
        Tuple `(X_np, y_np)`: the stacked `.values` of every frame and the
        labels as an array.
    """
    stacked = []
    for frame in X:
        stacked.append(frame.values)
    return np.array(stacked), np.array(y)

In [None]:
def train_model(symbol, train_input, train_label, validation_input, validation_labels, overwrite=True):
    """Tune, train and score the classifier for one ticker.

    Runs a random hyperparameter search with `build_model`, trains the best
    model found for 10 more epochs while checkpointing to
    'model1_{symbol}/model.keras', reloads that checkpoint and scores it on
    the validation set.

    Args:
        symbol: Ticker symbol; used in log messages, the tuner project name
            and the checkpoint path.
        train_input: Training inputs passed to the tuner and to `fit`.
        train_label: Training labels.
        validation_input: Validation inputs.
        validation_labels: Validation labels (compared against predictions
            thresholded at 0.5).
        overwrite: When True (default) any tuner project already stored under
            'lstm_tuning' for this symbol is discarded, so the search never
            reuses trials that were scored on data from an earlier run. Pass
            False to resume a previous search.

    Returns:
        Fraction of validation samples whose thresholded prediction equals
        the label.
    """
    print(f'Searching for best hyper-params for {symbol} model')
    tuner = RandomSearch(
        build_model,
        objective='val_accuracy',
        max_trials=20,
        executions_per_trial=1,
        directory='lstm_tuning',
        project_name=f'nifty_predict_{symbol}',  # one project per symbol so searches do not collide
        overwrite=overwrite,
    )

    # Run hyperparameter search
    tuner.search(train_input, train_label,
                 validation_data=(validation_input, validation_labels),
                 epochs=10)

    # Get best model
    model1 = tuner.get_best_models(num_models=1)[0]

    print(f'Hyper-Params for best model of {symbol} have been found')
    # Keep training the best model, saving only the best checkpoint seen
    checkpoint_path = f'model1_{symbol}/model.keras'
    cp = ModelCheckpoint(checkpoint_path, save_best_only=True)
    model1.fit(train_input, train_label,
               validation_data=(validation_input, validation_labels),
               epochs=10, callbacks=[cp])

    # Reload best version and evaluate
    model1 = load_model(checkpoint_path)
    val_predictions = model1.predict(validation_input).flatten()
    predicted_labels = (val_predictions > 0.5).astype(int)
    # Flatten the labels too: a column vector would broadcast against the flat
    # predictions into an (n, n) comparison and corrupt the mean.
    true_labels = np.asarray(validation_labels).ravel()
    accuracy = np.mean(predicted_labels == true_labels)

    return accuracy


In [None]:
today = datetime.today().date()
base_date = today - timedelta(days=180)             # start of the first training window
validation_base_date = today - timedelta(days=70)   # start of the first validation window
accuracy_dict = {}
# NOTE(review): positions 13, 14 and 40 of nifty50_tickers are skipped; the
# reason is not recorded in this notebook -- confirm it is still intended.
tickers = nifty50_tickers[:13] + nifty50_tickers[15:40] + nifty50_tickers[41:]


def _collect_windows(symbol, first_start_date, stop_date, window_days=51, sequence_length=30):
    """Slide a fixed-length date window forward one day at a time and build samples.

    For each window the last row's 'Return' sign is the label and the
    `sequence_length` rows before it are the features. Windows with fewer than
    `sequence_length + 1` rows are skipped.

    Args:
        symbol: Ticker symbol passed to get_ticker_data.
        first_start_date: Start date of the first window.
        stop_date: Windows are generated while their end date is before this.
        window_days: Calendar-day span of each window.
        sequence_length: Number of feature rows per sample.

    Returns:
        Tuple `(inputs, labels)` of NumPy arrays from dataframe_to_np.
    """
    # NOTE(review): windows advance by calendar day; if non-trading days add no
    # rows, consecutive windows can yield near-identical samples -- confirm
    # whether these should be de-duplicated.
    samples = []
    labels = []
    one_day = timedelta(days=1)
    window_start = first_start_date
    window_end = window_start + timedelta(days=window_days)

    while window_end < stop_date:
        window = get_ticker_data(symbol, window_start, window_end)
        window_start += one_day
        window_end += one_day

        # Need sequence_length feature rows plus one trailing row for the label.
        if len(window) < sequence_length + 1:
            continue

        labels.append(int(window['Return'].iloc[-1] > 0))
        samples.append(window.iloc[:-1].tail(sequence_length))

    return dataframe_to_np(samples, labels)


for symbol in tickers:
    print('Entered the Loop for',symbol)

    # Validation windows start at validation_base_date and end before today.
    validation_input, validation_labels = _collect_windows(symbol, validation_base_date, today)
    print('Validation Data Collected for', symbol)

    # Training windows start at base_date and end before validation_base_date.
    input_data, output_labels = _collect_windows(symbol, base_date, validation_base_date)
    print('Training Data Collected for',symbol)

    # Without samples on both sides there is nothing to fit or score.
    if len(input_data) == 0 or len(validation_input) == 0:
        print(f'Skipping {symbol}: no usable training or validation windows')
        continue

    # Build and train model
    accuracy = train_model(symbol ,input_data , output_labels , validation_input , validation_labels)
    accuracy_dict[symbol] = accuracy

    print('Loop done for',symbol)

print('Execution Done ----- XXXX -----')

In [None]:
# Rank the per-symbol validation accuracies, highest first.
sorted_accuracies = sorted(
    accuracy_dict.items(),
    key=lambda item: item[1],
    reverse=True,
)

# Report every symbol with its accuracy formatted as a percentage.
for symbol, accuracy in sorted_accuracies:
    print(f"{symbol}: {accuracy:.2%}")

In [None]:
# Keep only the symbols whose validation accuracy is strictly above the cut-off.
threshold = 0.6
high_accuracy_symbols = dict(
    (name, score)
    for name, score in accuracy_dict.items()
    if score > threshold
)

print(high_accuracy_symbols)

In [None]:
def get_today_return(symbol):
    """Return the relative change between the two most recent closes of `symbol`.

    Args:
        symbol: Ticker symbol passed to yfinance.

    Returns:
        `last_close / first_close - 1` over the rows returned for
        `period='2d'`.

    Raises:
        IndexError: If no rows are returned for the symbol.
    """
    ticker = yf.Ticker(symbol)
    closes = ticker.history(period='2d')['Close']
    # Use .iloc for positional access: integer keys on a date-indexed Series
    # are deprecated as positions and are meant to be treated as labels.
    today_return = (closes.iloc[-1] / closes.iloc[0]) - 1

    return today_return

In [None]:
today = datetime.today().date()
start_date = today - timedelta(days=50)
# NOTE(review): the feature window ends at today - 1 day while the outcome is
# taken from the two most recent closes; confirm both refer to the same session.
end_date = today - timedelta(days=1)
correct = 0       # symbols whose predicted direction matched the outcome
total = 0         # symbols that were actually evaluated
total_return = 0  # sum of the latest returns of every evaluated symbol

for symbol in high_accuracy_symbols:
    data = get_ticker_data(symbol, start_date, end_date)

    if len(data) < 30:
        print(f'Not Sufficient data for {symbol}')
        continue

    # Count the symbol only once it is evaluated, so skipped symbols do not
    # inflate the accuracy denominator.
    total += 1

    # Single-sample batch built from the 30 most recent feature rows.
    data = data.tail(30)
    input_np = np.array([data.values])

    model = load_model(f'model1_{symbol}/model.keras')

    prediction = model.predict(input_np)[0][0]
    actual = get_today_return(symbol)
    if int(actual > 0) == int(prediction > 0.5):
        print(f'Prediction correct for {symbol}')
        correct += 1
    else:
        print(f'Prediction Wrong for {symbol}')

    # The return is added regardless of the predicted direction.
    total_return += actual

if total:
    print('Our Accuracy for today is',correct / total)
else:
    # Avoid dividing by zero when nothing could be evaluated.
    print('Our Accuracy for today is not available: no symbols were evaluated')
print('Our Total correct predictions are',correct)
print('Total Companies in our Portfolio are',total)
print('Our Total Return for today is',total_return)
print('Predictions Done')