In [2]:
import datetime
import json

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
import yfinance as yf
from sklearn.preprocessing import MinMaxScaler
from ta import add_all_ta_features
from ta.utils import dropna
from tensorflow import keras


# Defaults used by AI_Model when no symbol / date range is supplied.
default_symbol = '^NSEI'
default_start_date = datetime.datetime(2013, 1, 21)
default_end_date = datetime.datetime(2024, 4, 20)


class AI_Model():
    """LSTM forecaster for the next ``future_days`` values of ``target_col``.

    The class bundles the whole workflow: downloading price history with
    yfinance, adding ``ta`` technical-analysis features, min-max scaling,
    windowing into a ``tf.data`` dataset, training, and prediction.
    """

    def __init__(self,model_path=None,scaler_path=None,symbol=default_symbol,start_date=default_start_date,end_date=default_end_date,verbose=True,last_trained_date=None):
        """Create the wrapper and either load or build the network.

        Args:
            model_path: Path of a saved Keras model. Used only together with
                ``scaler_path``.
            scaler_path: Path of a joblib-dumped fitted scaler.
            symbol: Ticker passed to ``yf.download``.
            start_date: Default first date for ``fetch_latest_data``.
            end_date: Stored on the instance; not read by any method here.
            verbose: When true, ``log`` prints its messages.
            last_trained_date: Optional date used by ``preprocess_in_pipe``.

        When both paths are given the model and scaler are loaded from disk;
        otherwise a fresh, untrained model is built.
        """
        self.symbol = symbol
        self.start = start_date
        self.end = end_date
        self.data = []
        self.is_data_loaded = False
        self.scaler = MinMaxScaler()
        # True only once a fitted scaler has been restored from disk.
        self.is_scaler_loaded = False
        self.future_days = 7  # forecast horizon (model output size)
        self.sequence_length = 21  # number of past rows fed to the model
        self.batch_size = 16  # windows per batch
        self.target_col = 'Open'  # column the model predicts
        self.model = None
        self.is_model_loaded = False
        self.model_path = model_path
        self.verbose = verbose
        self.input_shape = 92  # feature count per time step
        self.is_model_pre_train= False
        self.last_trained_date = last_trained_date
        self.scaler_path = scaler_path

        if(model_path and scaler_path):
            self.load_model()
        else:
            self.build_model()

    def log(self,message):
        """Print ``message`` when the instance is verbose."""
        if(self.verbose):
            print(message)

    def build_model(self):
        """Build and compile a fresh single-layer LSTM regressor."""
        self.model = keras.models.Sequential([
        keras.layers.LSTM(512,input_shape=[None,self.input_shape]),
        keras.layers.Dense(self.future_days)
        ])

        opt = tf.keras.optimizers.SGD(learning_rate=0.2,momentum=0.9)
        self.model.compile(loss=keras.losses.Huber(),optimizer=opt,metrics=['mae'])
        self.is_model_loaded = True

    def load_model(self):
        """Load the model and scaler from ``model_path`` / ``scaler_path``.

        Both artifacts are loaded before any state is changed, so a failure
        on either one leaves the instance marked as not loaded instead of
        half-initialised. Errors are printed, not raised.
        """
        try:
            model = keras.models.load_model(self.model_path)
            # NOTE: joblib.load unpickles the file; only load trusted scalers.
            scaler = joblib.load(self.scaler_path)
        except Exception as e:
            self.log("Failed to load Model error : ")
            print(e)
            self.is_model_loaded = False
            return

        self.model = model
        self.scaler = scaler
        self.is_scaler_loaded = True
        self.is_model_loaded = True
        self.is_model_pre_train = True
        self.log("Model Loaded Successfully")

    def _target_index(self, columns):
        """Return the integer position of ``target_col`` in ``columns``.

        Falls back to 0 (the previously hard-coded position) when there are
        no column labels or the label does not map to a single position.
        """
        if columns is not None and self.target_col in columns:
            loc = columns.get_loc(self.target_col)
            if isinstance(loc, (int, np.integer)):
                return int(loc)
        return 0

    def ahead_timeseries_from_array(self,data):
        """Window ``data`` into ``(inputs, targets)`` batches.

        Each window spans ``sequence_length + future_days`` rows; the first
        part is the model input and the last ``future_days`` values of the
        target column are the labels.
        """
        ahead = self.future_days
        target_idx = self._target_index(getattr(data, 'columns', None))

        def _split(window):
            # Pass the instance settings explicitly instead of relying on the
            # defaults of split_input_and_target.
            return self.split_input_and_target(window, ahead=ahead, target_col=target_idx)

        ahead_ds = tf.keras.utils.timeseries_dataset_from_array(
                    data,
                    targets=None,
                    sequence_length=self.sequence_length+ahead,
                    batch_size=self.batch_size
        ).map(_split)

        return ahead_ds

    # Function to fetch historical data for a symbol
    def fetch_data(self,start_date,end_data):
        """Download history for ``self.symbol`` and return it.

        A non-empty result is also stored on ``self.data`` and flips
        ``is_data_loaded``; an empty result leaves the instance untouched.
        """
        data = yf.download(self.symbol, start=start_date, end=end_data)
        if(len(data)>0):
            self.is_data_loaded = True
            self.data = data
        else:
            self.log("No data returned for the requested range.")
        return data

    def fetch_latest_data(self,from_date=None):
        """Download data from ``from_date`` (default ``self.start``) up to yesterday."""
        start_date = from_date
        if not start_date:
            start_date = self.start
        latest_date = datetime.datetime.now() - datetime.timedelta(days=1)
        latest_data = self.fetch_data(start_date=start_date,end_data=latest_date)
        return latest_data

    def preprocess_data(self,data_df,fit=None):
        """Add technical-analysis features and scale every column.

        Args:
            data_df: Frame with Open/High/Low/Close/Volume columns.
            fit: ``True`` refits the scaler and dumps it to ``scaler.pkl``;
                ``False`` only transforms. ``None`` (default) refits unless a
                fitted scaler was loaded from disk, so a pre-trained model
                keeps the scaling it was trained with.

        Returns:
            DataFrame of scaled features with the same index and columns.
        """
        data_df = dropna(data_df)

        data_ta = add_all_ta_features(data_df, open="Open", high="High", low="Low", close="Close", volume='Volume', fillna=True)

        if fit is None:
            fit = not self.is_scaler_loaded

        if fit:
            scaled = self.scaler.fit_transform(data_ta)
            joblib.dump(self.scaler, 'scaler.pkl')
        else:
            scaled = self.scaler.transform(data_ta)

        return pd.DataFrame(scaled, index=data_ta.index, columns=data_ta.columns)

    def split_input_and_target(self,ds, ahead=7, target_col=0):
        """Split batched windows into inputs and the trailing ``ahead`` targets.

        ``ds`` is indexed as ``(batch, time, feature)``.
        """
        return ds[:, :-ahead], ds[:, -ahead:, target_col]

    def train_model(self, train_data,validation_data=None, epochs=100,momentum=0.9,learning_rate=0.2,opt=None,loss=None,patience=5,moniter='mae',is_save=False):
        """Compile and fit the model with early stopping.

        Args:
            train_data: Dataset of ``(inputs, targets)`` batches.
            validation_data: Optional validation dataset; when given, early
                stopping monitors ``val_loss`` instead of ``moniter``.
            epochs: Maximum number of epochs.
            momentum, learning_rate: Settings for the default SGD optimizer
                (ignored when ``opt`` is supplied).
            opt: Optimizer to use instead of SGD.
            loss: Loss to use instead of Huber.
            patience: Early-stopping patience in epochs.
            moniter: Metric monitored when there is no validation data.
            is_save: Save the model to ``trained_model-latest.h5`` after fit.

        Returns:
            The Keras ``History`` object (also stored on ``self.hist``).
        """
        if opt is None:
            # Honour the caller's hyper-parameters instead of fixed values.
            opt = tf.keras.optimizers.SGD(learning_rate=learning_rate, momentum=momentum)
        if loss is None:
            loss = keras.losses.Huber()
        self.model.compile(loss=loss, optimizer=opt, metrics=['mae'])
        if validation_data is not None:
            moniter='val_loss'
        early_stopping = keras.callbacks.EarlyStopping(patience=patience,monitor=moniter)
        hist = self.model.fit(train_data, epochs=epochs, callbacks=[early_stopping],validation_data=validation_data)

        # Save the trained model
        if(is_save):
            self.model.save('trained_model-latest.h5')
        self.hist = hist
        self.is_model_pre_train = True

        return hist

    def predict_future_prices(self,test_data):
        """Return model predictions, or ``None`` when no trained model is available."""
        if(self.is_model_loaded and self.is_model_pre_train):
            predictions = self.model.predict(test_data)
            return predictions
        self.log("Model is Not Loaded please load the model first.")
        return None

    def inverse_preprocess(self,data):
        """Undo the scaler transform on ``data``."""
        return self.scaler.inverse_transform(data)

    def numpy_to_df(self,data,index,cols):
        """Wrap an array in a DataFrame with the given index and columns."""
        return pd.DataFrame(data,index=index,columns=cols)

    def preprocess_in_pipe(self):
        """Fetch data since ``last_trained_date`` for an already trained model.

        Returns the fetched data, or ``None`` when the model is untrained or
        no ``last_trained_date`` is set.
        """
        if(self.is_model_pre_train and self.last_trained_date):
            return self.fetch_latest_data(from_date=self.last_trained_date)
        return None

    def pred_df_from_dummy_inverse(self,y_pred):
        """Convert scaled target predictions back to original units.

        The scaler needs a full feature row to invert, so the last
        ``len(y_pred)`` rows of ``dummy_for_inverse.csv`` are used as a
        carrier: the target column is overwritten with ``y_pred``, the frame
        is inverse-transformed, and only the target column is returned.

        Returns ``None`` (after printing the error) if anything fails, e.g.
        the CSV is missing.
        """
        length = len(y_pred)
        try:
            # .copy() so the assignment below writes to an independent frame.
            dummy_df = pd.read_csv('dummy_for_inverse.csv',index_col='Date').iloc[-length:].copy()
            dummy_df[self.target_col] = y_pred
            i_ds = self.inverse_preprocess(dummy_df)
            return i_ds[:, self._target_index(dummy_df.columns)]
        except Exception as e:
            print(e)
            return None
        
        



In [3]:
# No model/scaler paths are given, so this builds a fresh, untrained model
# with the default symbol and date range.
rnn_model = AI_Model()

In [4]:
# Download history from the model's start date up to yesterday; a non-empty
# result is stored on rnn_model.data.
rnn_model.fetch_latest_data()

  df.index += _pd.TimedeltaIndex(dst_error_hours, 'h')
[*********************100%%**********************]  1 of 1 completed


In [31]:
# Export the 20 most recent 'Open' values for use outside the notebook.
rnn_model.data['Open'].iloc[-20:].to_json("till_now.json")

In [5]:
# Add technical-analysis features and min-max scale every column.
# Side effect: the scaler is fitted on this data and written to scaler.pkl.
preprocessed_data = rnn_model.preprocess_data(rnn_model.data)

  self._psar[i] = high2


In [None]:
# Chronological, non-overlapping split. Date-label slicing is inclusive on
# both ends, so every boundary date is placed in exactly one partition and the
# training set no longer contains the validation/test periods.
# NOTE(review): the scaler was still fitted on the full history upstream.
train_df = preprocessed_data[:'2023-12-30']
val_df = preprocessed_data['2023-12-31':'2024-03-31']
test_df = preprocessed_data['2024-04-01':]

# Each split needs at least sequence_length + future_days rows to yield a window.
# NOTE(review): confirm test_df is long enough for test_ds to be non-empty.
train_ds = rnn_model.ahead_timeseries_from_array(train_df)
val_ds = rnn_model.ahead_timeseries_from_array(val_df)
test_ds = rnn_model.ahead_timeseries_from_array(test_df)

In [6]:
# Train on every available window. No validation data is passed, so early
# stopping monitors the training 'mae'.
full_ds = rnn_model.ahead_timeseries_from_array(preprocessed_data)
rnn_model.train_model(full_ds)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

Epoch 83/100
Epoch 84/100
Epoch 85/100
Epoch 86/100
Epoch 87/100


<keras.callbacks.History at 0x1e6e58c0670>

In [11]:
# Persist the trained network to an .h5 file.
rnn_model.model.save('trained_model-latest.h5')

In [23]:
# Persist the fitted scaler alongside the model so forecasts can be
# inverse-transformed in a later session.
joblib.dump(rnn_model.scaler, 'scaler-latest.pkl')

['scaler-latest.pkl']

In [12]:
# test_df = preprocessed_data['2024-04-01':]
# Latest `sequence_length` rows as a single batch: (1, sequence_length, n_features).
X = np.expand_dims(preprocessed_data.iloc[-rnn_model.sequence_length:].to_numpy(), axis=0)
y_pred = rnn_model.predict_future_prices(X)
# Back to price units; this reads dummy_for_inverse.csv from disk.
predictions = rnn_model.pred_df_from_dummy_inverse(y_pred[0])



In [37]:
# NOTE(review): the 350 offset is an unexplained manual adjustment; document
# where it comes from or remove it.
predictions-350

array([22280.78154657, 22261.86345068, 22367.22545753, 22398.21606567,
       22545.60980975, 22504.12041078, 22553.29450248])

In [21]:
# Sanity check: expect (1, sequence_length, n_features).
X.shape

(1, 21, 92)

In [22]:
# Dump the model input window as nested lists (`json` is imported in the
# top import cell).
with open("x_df.json", 'w') as fp:
    json.dump(X.tolist(), fp)

In [None]:
# Train with a validation set; early stopping then monitors 'val_loss'.
rnn_model.train_model(train_ds,val_ds)

In [None]:
# First `sequence_length` rows of the test split as a single batch.
X = np.expand_dims(test_df.to_numpy()[:rnn_model.sequence_length], axis=0)
y_pred = rnn_model.predict_future_prices(X)

In [None]:
tdf = test_df.copy()
# Write the forecast in one positional assignment. The chained form
# `tdf['Open'][:n] = ...` may modify a temporary copy and leave tdf unchanged.
tdf.iloc[:rnn_model.future_days, tdf.columns.get_loc('Open')] = y_pred[0]

In [None]:
# Undo the scaling on the frame carrying the predictions (tdf) ...
i_ds = rnn_model.inverse_preprocess(tdf)
i_df = rnn_model.numpy_to_df(i_ds,tdf.index,tdf.columns)
# ... and on the untouched test frame, to get the actual values in the same units.
i_test_ds = rnn_model.inverse_preprocess(test_df)
i_test_df = rnn_model.numpy_to_df(i_test_ds,test_df.index,test_df.columns)

In [None]:
# pyplot is already imported in the top import cell. Draw on an explicit Axes
# instead of relying on the implicit "current figure".
fig, ax = plt.subplots(figsize=(10, 6))
i_df['Open'].iloc[:rnn_model.future_days].plot(ax=ax, label='Predicted', marker='o', style='--')
i_test_df['Open'].iloc[:rnn_model.future_days].plot(ax=ax, label='Actual', marker='o', style='--')
ax.set_xlabel('Date')
ax.set_ylabel('Open Price')
ax.set_title('Comparison between Predicted and Actual Open Prices')
ax.legend()
plt.show()

In [None]:
# Write the scaled test frame that pred_df_from_dummy_inverse reads as its
# carrier for inverse scaling.
# NOTE(review): an earlier cell already calls pred_df_from_dummy_inverse, so on
# a fresh run this file must exist before that cell executes.
test_df.to_csv("dummy_for_inverse.csv")

In [None]:
# Save the fitted scaler under the name the pre-trained cells below load it from.
joblib.dump(rnn_model.scaler, 'scaler.pkl')

### Load the pre-trained model

In [None]:
# last_trained_date = datetime.datetime(2024, 2, 1)



In [None]:
# Reload a saved model/scaler pair and forecast from the last window of the test split.
# NOTE(review): this notebook saves the model as 'trained_model-latest.h5';
# confirm that 'trained_model.h5' exists and is the intended file.
pre_nn_model = AI_Model(model_path='trained_model.h5',scaler_path='scaler.pkl')
X = test_df[-pre_nn_model.sequence_length:].to_numpy()[np.newaxis,:pre_nn_model.sequence_length]
y_pred = pre_nn_model.predict_future_prices(X)
# Convert the scaled forecast back to price units.
predictions = pre_nn_model.pred_df_from_dummy_inverse(y_pred[0])

In [None]:
# seaborn is imported once, as `sns`, in the top import cell.
# Inverse-scale the last `future_days` actual opens so both lines share units.
actual = pre_nn_model.pred_df_from_dummy_inverse(
    test_df['Open'].iloc[-pre_nn_model.future_days:].to_numpy()
)
# Label the two lines so they can be told apart.
sns.lineplot(data=predictions, label='Predicted')
sns.lineplot(data=actual, label='Actual')

### With fine-tuning

In [None]:
# NOTE(review): the second assignment replaces the loaded model with a fresh,
# untrained one, so nothing is actually fine-tuned. Keep only one of the two
# lines once the intent is settled.
base_model = AI_Model(model_path='trained_model.h5',scaler_path='scaler.pkl')
base_model = AI_Model()

# X = test_df[-pre_nn_model.sequence_length:].to_numpy()[np.newaxis,:pre_nn_model.sequence_length]
# y_pred = pre_nn_model.predict_future_prices(X)
# predictions = pre_nn_model.pred_df_from_dummy_inverse(y_pred[0])

In [None]:
# Download history up to yesterday into base_model.data.
base_model.fetch_latest_data()

In [None]:
# Hinge is a classification loss (it expects -1/1 labels); this model regresses
# scaled prices, so use Huber, the same loss the class uses everywhere else.
# Note that train_model() recompiles the model with its own loss/optimizer.
base_model.model.compile(loss=keras.losses.Huber(),optimizer='adam',metrics=['mae'])
base_model.model.summary()

In [None]:
# Feature-engineer and scale base_model's data, then window it for training.
preprocess = base_model.preprocess_data(base_model.data)
train_ds = base_model.ahead_timeseries_from_array(preprocess)


In [None]:
# NOTE(review): val_ds is created in the earlier rnn_model split cell, so that
# cell must have been run first.
base_model.train_model(train_ds,val_ds)

In [None]:
# Plot the per-epoch metrics recorded by the last train_model call.
pd.DataFrame(base_model.hist.history).plot()

In [None]:
# Latest window for base_model. The length comes from base_model itself, so
# this cell no longer depends on pre_nn_model being defined.
X = preprocess[-base_model.sequence_length:].to_numpy()[np.newaxis,:base_model.sequence_length]
# y_pred = base_model.predict_future_prices(X)
predictions= base_model.predict_future_prices(X)


In [None]:
# seaborn is imported as `sns` in the top import cell.
# Inverse-scale both series through the same model (base_model). `predictions`
# came from base_model, so mixing in pre_nn_model's scaler would put the two
# lines on different scales.
actual = base_model.pred_df_from_dummy_inverse(test_df['Open'][-7:].to_numpy())
preds = base_model.pred_df_from_dummy_inverse(predictions[0])

# Convert preds and actual to pandas Series objects
preds_series = pd.Series(preds, index=range(len(preds)))
actual_series = pd.Series(actual, index=range(len(actual)))

# Plot the predicted and actual prices
plt.figure(figsize=(10, 6))

# Plot the predicted prices with custom style
sns.lineplot(data=preds_series, label='Predicted', marker='o', linestyle='--')

# Plot the actual prices with custom style
sns.lineplot(data=actual_series, label='Actual', marker='o', linestyle='--')

# Add annotations for maximum and minimum predicted prices
max_pred_price = preds_series.max()
min_pred_price = preds_series.min()

max_x, max_y = preds_series.idxmax(), max_pred_price
min_x, min_y = preds_series.idxmin(), min_pred_price

# Adjust annotation position if it exceeds y-axis limits
if max_y > plt.gca().get_ylim()[1]:
    max_y = plt.gca().get_ylim()[1]
if min_y < plt.gca().get_ylim()[0]:
    min_y = plt.gca().get_ylim()[0]

plt.annotate(f'Max Predicted Price: {max_pred_price:.2f}', xy=(max_x, max_y-40), xytext=(max_x+.1, max_y - 150),
             arrowprops=dict(facecolor='green', shrink=0.05))

plt.annotate(f'Min Predicted Price: {min_pred_price:.2f}', xy=(min_x, min_y+40), xytext=(min_x-.1, min_y + 140),
             arrowprops=dict(facecolor='red', shrink=0.1))

# Set plot labels and title
plt.xlabel('Date')
plt.ylabel('Price')
plt.title('Comparison between Predicted and Actual Prices')

# Show legend
plt.legend()

# Show plot
plt.show()

In [None]:
# Position (0-based) of the highest predicted price in the forecast.
preds.argmax()

In [None]:
pip install optionlab --user

In [None]:
from optionlab import run_strategy

# NOTE(review): yield_rate and inflation are not referenced anywhere in the
# visible notebook.
yield_rate = 7.179
inflation = 4.85

# Strategy inputs: a short call at 22800 and a short put at 22150 around a
# spot price of 22519, evaluated from start_date to target_date.
# NOTE(review): interest_rate=0.0002 looks very low next to yield_rate above;
# confirm the unit optionlab expects.
inputs_data = {
    'country':'India',
    "stock_price": 22519,
    "start_date": "2024-04-12",
    "target_date": "2024-04-18",
    "volatility": 0.1153,
    "interest_rate": 0.0002,
    "min_stock": 22068.1,
    "max_stock": 23460,
    "strategy": [
        {"type": "call", "strike": 22800, "premium": 34.80, "n": 50, "action": "sell"},
        {"type": "put", "strike": 22150, "premium": 15.20, "n": 50, "action": "sell"},
        
    ],
}

out = run_strategy(inputs_data)


In [None]:
# Report the probability of profit as a percentage with one decimal.
print(f"Probability of Profit (PoP): {out.probability_of_profit * 100.0:.1f}%") # 74.5%, according to the calculations

In [None]:
# Summary of the run: cost, min/max return over the price domain, and
# in-the-money probability.
out.strategy_cost,out.minimum_return_in_the_domain,out.maximum_return_in_the_domain,out.in_the_money_probability