# Configuration

The model file (lstm_model_month_deep_15.pkl) is stored in Google Drive.

In [1]:
# When True, the training/prediction branch is skipped and the notebook only
# fetches the latest rates for comparison with earlier predictions.
monitor_pred = True

# Google Drive folder holding the model file and the generated CSV files.
root = "/content/drive/MyDrive/"

# Model registry, keyed by model name.
MODEL = {
    "Deep_30_15": dict(
        path=root + "lstm_model_month_deep_15.pkl",  # pickled model file
        period=30,    # seasonal differencing period
        n_steps=30,   # timesteps per input window
        n_ahead=15,   # timesteps predicted per window
        update=True,  # re-fit the loaded model on the latest data
    ),
}

# Data preprocessing modules

In [2]:
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta


def get_usd_inr_history(days):
    """
    Download the recent history of the "INR=X" ticker from yahoo finance.
    :param days: Integer number of past days to request; it is sent to
                 yfinance as the period string "<days>d".
    :return: The dataframe returned by ``Ticker.history`` (raw, unprocessed).
    """
    lookback = "{}d".format(days)
    return yf.Ticker("INR=X").history(lookback)


def preprocess(df):
    """
    Reduce the raw yahoo finance frame to the two columns used downstream.

    The date index is turned into a column, the month number is derived from
    it, and only ``Open`` and ``Month`` are kept (in that order). Missing
    values are forward-filled and the result is rounded to 2 decimal places.
    The frame passed in is not modified.

    :param df: The raw dataframe df fetched directly from yahoo finance. After
               ``reset_index()`` it must expose a ``Date`` and an ``Open`` column.
    :return: A new preprocessed dataframe with the columns ``Open`` and ``Month``.
    :raises KeyError: If ``Date`` or ``Open`` is not available.
    """
    # reset_index() returns a new frame, so the caller's df stays untouched
    frame = df.reset_index()

    # Month number (1-12) derived from the date column
    frame["Month"] = pd.to_datetime(frame["Date"]).dt.month

    # Select the needed columns explicitly instead of dropping a fixed list of
    # unwanted ones: extra or missing auxiliary columns (High, Volume, ...)
    # then cannot break this step or leak into the result.
    frame = frame[["Open", "Month"]]

    # Replace nulls with the previous value. DataFrame.ffill() replaces the
    # deprecated fillna(method='ffill') spelling.
    frame = frame.ffill()

    # round to 2 decimal places
    return frame.round(2)


def post_processing(df, period=30):
    """
    Replace the ``Open`` price by its seasonal difference.

    This function expects that the programmer has used preprocess() on df,
    otherwise it raises an error. The frame passed in is left unchanged, so
    it can be post-processed again (for example with another period).

    :param df: preprocessed dataframe using function preprocess(); it must
               contain exactly the columns ``Open`` and ``Month``.
    :param period: Differencing lag in rows. ``Adj[i] = Open[i] - Open[i - period]``.
                   Defaults to 30, the value that was previously hard-coded.
    :return: New dataframe with the columns ``Month`` and ``Adj``; the first
             ``period`` rows (where the difference is undefined) are removed.
    :raises ValueError: If df does not have exactly the expected columns or
                        period is not a positive number.
    """
    required_cols = ['Open', 'Month']

    if len(df.columns) != 2 or any(col not in df.columns for col in required_cols):
        raise ValueError("Invalid dataframe. First pre-process using 'preprocess' function.")

    if period < 1:
        raise ValueError("period must be a positive integer")

    # drop() returns a new frame, so adding "Adj" below does not modify the
    # caller's dataframe
    adjusted = df.drop(columns="Open")

    # seasonally adjusted series: difference against the value `period` rows back
    adjusted["Adj"] = df["Open"].diff(period)

    # the first `period` rows have no lagged value and therefore hold NaN
    return adjusted.iloc[period:]


def prepare_data(data, n_steps, n_ahead=1):
    """
    Slice post processed data into (input window, label) pairs for the LSTM network.

    Sample ``k`` pairs the rows ``k .. k + n_steps - 1`` with the values of
    column 1 in the ``n_ahead`` rows that follow that window.

    :param data: Post processed data as a 2D array (rows are timesteps).
    :param n_steps: Number of timesteps in every input window
    :param n_ahead: Number of timesteps to be predicted
    :return: Input X and Label y for the LSTM network
    """
    # every window of n_steps consecutive rows ...
    window_count = len(data) - n_steps + 1
    windows = []
    for start in range(window_count):
        windows.append(data[start:start + n_steps])
    # ... minus the trailing ones that have no complete label after them
    X = np.array(windows)[:-n_ahead]

    # labels: runs of n_ahead values of column 1, starting right after the first window
    label_count = len(data[n_steps:]) - n_ahead + 1
    labels = []
    for start in range(label_count):
        labels.append(data[n_steps:, 1][start:start + n_ahead])
    y = np.array(labels)

    return X, y


def create_test_input(df, n_steps, n_ahead):
    """
    Build the LSTM input for forecasting: the most recent ``n_steps`` rows of df.

    No label exists for this window; it is only fed to the model to predict
    the values that follow it.

    :param df: Post processed data (dataframe or 2D array, rows are timesteps).
    :param n_steps: Number of timesteps in the input window.
    :param n_ahead: Number of timesteps to be predicted. It does not influence
                    the result and is only kept so existing calls keep working.
    :return: Test input numpy array of shape (1, n_steps, number of columns).
    :raises ValueError: If n_steps is not positive or df has fewer than n_steps rows.
    """
    if n_steps < 1 or len(df) < n_steps:
        raise ValueError(
            "Need at least n_steps={} rows to build the test input, got {}.".format(n_steps, len(df))
        )

    # The last sliding window is simply the final n_steps rows; there is no
    # need to materialise every window first. np.array() copies, so the
    # returned array never aliases the caller's data.
    window = np.array(df[-n_steps:])

    # the window is 2D, the LSTM model needs a 3D input (batch of one sample)
    return window.reshape(1, window.shape[0], window.shape[1])


def create_train_test(df, fraction, n_steps, n_ahead):
    """
    Split post processed data chronologically and window both parts.

    The first ``int(fraction * rows)`` rows form the training part, the
    remaining rows the testing part; each part is passed to prepare_data().

    :param df: Post processed data.
    :param fraction: Fraction of df assigned for training.
    :param n_steps: Number of timesteps in every input window
    :param n_ahead: Number of timesteps to be predicted
    :return: X_train, y_train, X_test and y_test
    """
    boundary = int(fraction * df.shape[0])
    head, tail = df[:boundary], df[boundary:]

    # window each part independently so that no sample spans the boundary
    train_pair = prepare_data(np.array(head), n_steps, n_ahead)
    test_pair = prepare_data(np.array(tail), n_steps, n_ahead)

    return (*train_pair, *test_pair)


def retrieve(history, yhat, n_steps, n_ahead, period, split_idx=None, y_test=None):
    """
    Reverse the seasonal differencing: turn seasonally adjusted values back
    into prices by adding the price observed ``period`` rows earlier.

    Two modes are supported:

    * Evaluation (``split_idx`` given): ``yhat`` and ``y_test`` hold one row of
      ``n_ahead`` values per test sample, as produced by create_train_test().
    * Forecast (``split_idx`` is None): ``yhat`` holds the ``n_ahead`` values that
      follow the end of ``history``. Step ``k`` of the forecast is rebuilt with
      ``history[len(history) - period + k]``, so ``n_ahead`` must not exceed ``period``.

    :param history: The complete "Open" price (pandas Series or 1D array).
    :param yhat: Predictions from the model.
    :param n_steps: Number of timesteps in every input window.
    :param n_ahead: Number of timesteps to be predicted.
    :param period: Seasonal differencing period.
    :param split_idx: index at which the fraction of training dataset ends.
    :param y_test: the actual values that is expected to be predicted.
    :return: Tuple (yhat2, y_test2) of reconstructed values; y_test2 is y_test
             passed through unchanged in forecast mode.
    :raises ValueError: If split_idx is given without y_test, or if the
                        forecast lags are not fully contained in history.
    """
    # Work positionally on a plain array so that index labels play no role
    values = np.asarray(history)

    # Compare against None explicitly: a split at index 0 is valid, and the
    # truth value of a numpy array (y_test) is ambiguous and raises.
    if split_idx is not None:
        if y_test is None:
            raise ValueError("y_test not given")

        # values[j] is the lag of adjusted value j; test labels start n_steps
        # rows after the split
        lag_arr = values[:-period][split_idx:][n_steps:]
        lag_matrix = np.array([lag_arr[i:i + n_ahead] for i in range(len(lag_arr) - n_ahead + 1)])
        y_test2 = y_test + lag_matrix
    else:
        first_lag = len(values) - period
        if first_lag < 0 or n_ahead > period:
            raise ValueError(
                "Cannot rebuild {} steps with period {} from {} history values.".format(
                    n_ahead, period, len(values))
            )

        # The forecast covers the n_ahead rows right after the end of history,
        # so its lags start `period` rows before the end.
        lag_matrix = values[first_lag:first_lag + n_ahead]
        # Just passing y_test (normally None) through
        y_test2 = y_test

    yhat2 = yhat + lag_matrix

    return yhat2, y_test2

def next_working_day(date_str, date_format):
    """
    Return the first Monday-to-Friday date strictly after the given date.

    Only weekends are skipped; public holidays are not taken into account.

    :param date_str: The reference date as a string.
    :param date_format: strptime/strftime format of date_str; the result uses
                        the same format.
    :return: The next working day as a string in date_format.
    :raises ValueError: If date_str does not match date_format.
    """
    candidate = datetime.strptime(date_str, date_format).date() + timedelta(days=1)

    # weekday(): Monday is 0 ... Sunday is 6, so 5 and 6 are the weekend
    while candidate.weekday() >= 5:
        candidate += timedelta(days=1)

    return candidate.strftime(date_format)

# Model training
In addition to training the model, this cell saves the model weights, the corresponding price history, and the 15-day predictions to Google Drive.

In [4]:
import pandas as pd
import os
import pickle
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense


# Entry point of the notebook cell.
#   monitor_pred False: download history, (re)train the stored model, forecast
#                       the next n_ahead business days and save everything.
#   monitor_pred True:  download the rates published since the saved history
#                       so they can be compared with the earlier forecast.
if not monitor_pred:
    # fetching configurations
    print("Fetching configurations...")
    model_name = "Deep_30_15"
    n_steps = MODEL[model_name]["n_steps"]
    n_ahead = MODEL[model_name]["n_ahead"]
    period = MODEL[model_name]["period"]
    model_path = MODEL[model_name]["path"]

    # calculating past days (amount of history requested from yahoo finance)
    days = round(n_ahead/2)*n_steps + n_ahead

    # getting historical exchange rates from yahoo finance
    print("Loading {} days historical data from yahoo finance...".format(days))
    data = get_usd_inr_history(days)

    # save the loaded data to google drive
    print("Saving loaded data to google drive as history.csv...")
    data.to_csv(root + "history.csv")

    # processing raw data
    print("Processing the raw data...")
    data_processed = preprocess(data)
    data_pst_processed = np.array(post_processing(data_processed))

    # prepare data for feeding to LSTM network
    print("Modifying data for feeding into LSTM model...")
    X, y = prepare_data(data=data_pst_processed, n_steps=n_steps, n_ahead=n_ahead)

    # create test input (Note: it has no label to compare, we just need an input to feed to LSTM)
    print("Creating test input for LSTM model...")
    X_test = create_test_input(df=data_pst_processed, n_steps=n_steps, n_ahead=n_ahead)

    # load the model
    if os.path.isfile(model_path):
        print("Loading exiting model: {}".format(model_path))

        # NOTE(security): pickle.load executes code embedded in the file;
        # only load model files that you created yourself.
        with open(model_path, "rb") as file:
            model = pickle.load(file)

        # if model requires an update then train it on latest data
        if MODEL[model_name]["update"]:
            print("Updating model parameters...")
            model.fit(X, y, epochs=50, verbose=1)

            print("Save the updated model...")
            with open(model_path, "wb") as file:
                pickle.dump(model, file)

        # predict seasonally adjusted values
        print("Making predictions...")
        yhat = model.predict(X_test, verbose=0)

        # retrieve the values
        print("Reversing seasonal adjustments...")
        yhat2, _ = retrieve(history=data["Open"], yhat=yhat,
                            n_steps=n_steps, n_ahead=n_ahead, period=period,
                            split_idx=None, y_test=None)

        # creates future dates (We omit saturdays and sundays. Only need business days.)
        print("Generating forecast dates...")
        forecast_dates = [np.datetime64('today') + i for i in range(1, n_ahead+1) if
                          np.is_busday(np.datetime64('today') + i)]

        # the weekend days filtered out above leave the list short; keep
        # appending business days until there is one date per forecast step
        while len(forecast_dates) < n_ahead:
            if np.is_busday(forecast_dates[-1] + 1):
                forecast_dates.append(forecast_dates[-1] + 1)
            else:
                # this means that the next day is Saturday
                forecast_dates.append(forecast_dates[-1] + 3)

        forecast_dates = np.array(forecast_dates)

        # Zip these dates and yhat2 (retrieved values) together
        print("Zipping forecasts and dates...")
        yhat2 = yhat2.flatten()
        forecast_dates = forecast_dates.flatten()
        pred = {"Date": forecast_dates,
                "Open": yhat2}

        print("Creating a dataframe and saving it...")
        final_df = pd.DataFrame(pred)

        # save the df
        final_df.to_csv(root + "prediction.csv")

        print("Program finished successfully!")
    else:
        # Previously this case ended silently; report it so a wrong path or a
        # missing Drive mount is noticed.
        print("Model file not found: {}. No predictions were made.".format(model_path))
else:
    print("Monitoring past prediction mode...")
    # get the history (root already ends with "/", same as in the training branch)
    history_path = root + "history.csv"
    hist = pd.read_csv(history_path)

    # the Date column holds date and time separated by whitespace; keep the date part
    latest_date_str = hist["Date"].iloc[-1].split()[0]
    latest_date_str_format = '%Y-%m-%d'
    # not used below in this cell; kept as a module-level name in case later
    # cells refer to it
    latest_day = datetime.strptime(latest_date_str, latest_date_str_format)

    # get the set of data from the last day till today
    yf_inr = yf.Ticker("INR=X")
    new_data = yf_inr.history(start=next_working_day(latest_date_str, latest_date_str_format))

    # keep only Date and Open columns
    new_data = new_data.drop(columns=["High", "Low", "Close", "Volume", "Dividends", "Stock Splits"])

    # save the data in Google drive
    new_data.to_csv(root + "compare.csv")

    print("Latest Open prices saved to Google Drive. (file name is compare.csv)")
    print("Program finished successfully!")

Monitoring past prediction mode...
Latest Open prices saved to Google Drive. (file name is compare.csv)
Program finished successfully!
