## Multivariate LSTM

# Import packages

In [1]:
import requests
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yfinance as yf
import pandas as pd

from pandas_datareader import data as wb
from sklearn.svm import SVR
from sklearn.preprocessing import StandardScaler
from sklearn.utils import resample
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import LSTM
from sklearn.metrics import mean_squared_error
from imblearn.under_sampling import RandomUnderSampler

%matplotlib inline

# Default size (width, height) applied to every figure created below
plt.rcParams.update({'figure.figsize': [10, 7.5]})


ModuleNotFoundError: No module named 'pandas_datareader'

# Input data

## FX

In [None]:
# Let yfinance take over pandas_datareader's Yahoo reader
yf.pdr_override()

# Date window shared by both downloads below
fx_window = dict(start='2003-01-01', end='2024-12-31')

# Monthly quotes for the VND=x ticker
data = wb.get_data_yahoo('VND=x', interval='1mo', **fx_window)

# Latest VND -> USD rate: reciprocal of the first column on the last row
# of a second download made without an explicit interval
df_conv = wb.get_data_yahoo('VND=x', **fx_window)
data1 = 1/df_conv
vnd_to_usd = data1.iloc[-1, 0]

# Keep only the adjusted close, stored under the column name USDVND
df = data['Adj Close'].to_frame(name='USDVND')

# Average per calendar year, labelled with the first day of each year
yearly_mean = df['USDVND'].resample('YS').mean()
usdvnd_up = pd.DataFrame(yearly_mean)
usdvnd_up.index = pd.to_datetime(usdvnd_up.index, format='%Y')

## IMF data

In [None]:
def imf_data(endpoint):
    """Fetch one IMF DataMapper indicator for Vietnam (VNM), years 2003-2024.

    Args:
        endpoint: Indicator code appended to the DataMapper API URL,
            e.g. 'NGDPDPC'.

    Returns:
        DataFrame indexed by numeric ``Year`` with a single column named
        after ``endpoint`` (any '/' replaced by a space).

    Raises:
        requests.HTTPError: If the API answers with an error status.
            Previously a failed request made the function return None,
            which only surfaced later as a confusing shape mismatch.
        requests.Timeout: If the server does not answer within 30 seconds.
        KeyError: If the JSON payload lacks the expected
            ``values -> endpoint -> VNM`` entries.
    """
    url = f"https://www.imf.org/external/datamapper/api/v1/{endpoint}/VNM"
    # A timeout keeps the notebook from hanging forever on a stalled connection
    response = requests.get(url, timeout=30)
    response.raise_for_status()

    values = response.json()['values'][endpoint]['VNM']
    df = pd.DataFrame(list(values.items()), columns=['Year', endpoint.replace('/', ' ')])
    df['Year'] = pd.to_numeric(df['Year'])

    # Build a new frame instead of mutating a filtered slice in place
    in_range = df['Year'].between(2003, 2024)
    return df.loc[in_range].set_index('Year')

# IMF DataMapper indicator code -> readable column title
endpoints = {
    'NGDPDPC': 'GDP per Capita (Current Prices)',
    'BCA': 'Current Account Balance',
    'PCPIPCH': 'Average CPI',
}

# Download every indicator and key the resulting frame by its title
dfs = dict(zip(endpoints.values(), map(imf_data, endpoints)))

In [None]:
# Place the indicator frames side by side, label the columns with the
# readable titles and parse the year index into timestamps
indicator_frames = list(dfs.values())
imfdata = pd.concat(indicator_frames, axis=1)
imfdata.columns = list(endpoints.values())
imfdata.index = pd.to_datetime(imfdata.index, format='%Y')

# Prepare data

In [None]:
# Inner join on the index: keep only dates present in both the FX and IMF frames
all_data = usdvnd_up.join(imfdata, how='inner')

In [None]:
# Re-index to month starts (newly created rows are NaN), then fill the
# gaps by linear interpolation
monthly_grid = all_data.resample('MS').asfreq()
all_data = monthly_grid.interpolate(method='linear')

In [None]:
# Plain array of the frame's values, stored as 32-bit floats
dataset = all_data.to_numpy().astype(np.float32)

In [None]:
# Chronological split: the first 66% of rows train the model, the rest test it
train_size = int(len(dataset) * 0.66)
test_size = len(dataset) - train_size

# Normalize the dataset. The scaler is fitted on the training rows only, so
# no information about the test period (its min/max) leaks into training;
# the fitted scaler is then applied to the whole series. Test values may
# therefore fall outside [0, 1].
scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(dataset[:train_size])
dataset = scaler.transform(dataset)

# separate into train and test data
train, test = dataset[:train_size, :], dataset[train_size:, :]

In [None]:
# convert an array of values into a dataset matrix
def create_dataset(dataset, look_back=1):
    """Slice a 2-D series into overlapping input windows and next-step targets.

    Args:
        dataset: 2-D array-like of shape (time steps, features).
        look_back: Number of consecutive time steps in each input window.

    Returns:
        Tuple ``(dataX, dataY)``: ``dataX[i]`` is ``dataset[i:i + look_back]``
        (all features) and ``dataY[i]`` is column 0 of the row that follows
        that window. ``len(dataset) - look_back`` pairs are produced; both
        arrays are empty when the series is not longer than ``look_back``.
    """
    dataset = np.asarray(dataset)
    # The last valid window ends at row len(dataset) - 2, leaving the final
    # row as its target; the previous bound (one lower) dropped that sample.
    n_samples = len(dataset) - look_back
    dataX = [dataset[i:i + look_back, :] for i in range(n_samples)]
    dataY = [dataset[i + look_back, 0] for i in range(n_samples)]
    return np.array(dataX), np.array(dataY)

# Window length: number of past time steps fed to the model per sample
look_back = 5

# Build (inputs, targets) for the training split first, then the test split
(trainX, trainY), (testX, testY) = (
    create_dataset(split, look_back) for split in (train, test)
)

In [None]:
# reshape input to be [samples, time steps, features]
# The feature count is read from the data instead of being hard-coded, so
# adding or removing an indicator column does not break the reshape.
n = dataset.shape[1]  # number of columns
trainX = np.reshape(trainX, (trainX.shape[0], look_back, n))
testX = np.reshape(testX, (testX.shape[0], look_back, n))

# Build model

In [None]:
# One LSTM layer with 64 units followed by a single-unit Dense output
model = Sequential([
    LSTM(64, input_shape=(look_back, n)),
    Dense(1),
])
model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(trainX, trainY, epochs=300, batch_size=16, verbose=1)

# Persist the fitted network to disk
model.save('trained_lstm_model.h5')

# Predictions for both splits, with singleton axes removed
trainPredict = np.squeeze(model.predict(trainX))
testPredict = np.squeeze(model.predict(testX))

In [None]:
# Map scaled first-column values back through the fitted scaler
def inverse_transform(arr):
    """Undo the scaling for values that belong to the first feature column.

    The values are written into column 0 of a zero-filled array with
    ``n`` columns, that array is passed to ``scaler.inverse_transform``,
    and column 0 of the result is returned.
    """
    padded = np.zeros((len(arr), n))
    padded[:, 0] = arr
    restored = scaler.inverse_transform(padded)
    return restored[:, 0]


# Convert predictions and targets of both splits in one pass
trainPredict, testPredict, trainY, testY = (
    inverse_transform(values)
    for values in (trainPredict, testPredict, trainY, testY)
)

In [None]:
# Drop the last prediction and the first actual value, so that
# prediction i is compared with actual value i + 1
testPredict = np.delete(testPredict, -1)
testY = np.delete(testY, 0)

# Dates for the plotted points: the final len(testY) entries of the index
to_row = len(all_data) - len(testY)
date_range = all_data.index[to_row:]

predicted_style = dict(color='blue', marker='o', linestyle='dashed', label='Predicted')
actual_style = dict(color='red', label='Actual')
plt.plot(date_range, testPredict, **predicted_style)
plt.plot(date_range, testY, **actual_style)
plt.legend()
plt.show()

# RMSE of the shifted series, multiplied by the latest VND -> USD rate
testScore = np.sqrt(mean_squared_error(testY, testPredict)) * vnd_to_usd

print('Test Score: %.6f RMSE' % (testScore))

# Forecast 

In [None]:
from tensorflow.keras.models import load_model

def generate_forecasts(model_path, test_data, forecast_steps):
    """
    Generate recursive forecasts beyond the test data using a trained LSTM model.

    Each step predicts the next target value (feature column 0) and then
    slides the input window forward by one time step. The model does not
    predict the remaining feature columns, so for the new time step their
    most recent values are carried forward unchanged.

    Args:
    - model_path: Path to the trained LSTM model file, or an already loaded
      model object exposing a ``predict`` method.
    - test_data: 2-D array of shape (time steps, features) holding the input
      window to start from. It is copied, never modified.
    - forecast_steps: Number of future time steps to forecast.

    Returns:
    - forecasts: 1-D array containing the forecasted target values exactly
      as returned by the model (no inverse scaling is applied here).

    Raises:
    - ValueError: If test_data is not a non-empty 2-D array.
    """
    # Accept a ready model to avoid re-reading it from disk; otherwise load it
    model = model_path if hasattr(model_path, "predict") else load_model(model_path)

    # Work on a copy so the caller's window stays intact
    current_data = np.array(test_data, copy=True)
    if current_data.ndim != 2 or current_data.shape[0] == 0:
        raise ValueError(
            "test_data must be a non-empty 2-D array of shape (time steps, features)"
        )

    forecasts = []
    for _ in range(forecast_steps):
        # Predict the next time step from a batch holding the single window
        next_step_prediction = model.predict(current_data[np.newaxis, ...])
        next_value = next_step_prediction[0, 0]
        forecasts.append(next_value)

        # Build the newest row: only the target column is replaced by the
        # prediction; writing it to the whole row would overwrite every
        # other feature with the target value.
        next_row = current_data[-1].copy()
        next_row[0] = next_value

        # Slide along the time axis only. (np.roll without an axis shifts
        # the flattened array and mixes values between feature columns.)
        current_data = np.concatenate(
            [current_data[1:], next_row[np.newaxis, :]], axis=0
        )

    return np.array(forecasts)

# Example usage:
# Replace 'trained_lstm_model.h5' with the path to your trained LSTM model file
# Replace 'test_data' with your actual test data
# Replace 'forecast_steps' with the number of future time steps to forecast
# forecasts = generate_forecasts('trained_lstm_model.h5', test_data, forecast_steps)


In [None]:
# Seed the forecast with the most recent look_back rows of the scaled test
# split, so the first forecast is the step right after the end of the data.
# (The last window in testX stops before the final observations, which would
# shift every forecast relative to the future dates it is plotted against.)
test_data = test[-look_back:]
forecast_steps = 10

In [None]:
# Roll the saved model forward from the seed window
forecasts = generate_forecasts(
    model_path='trained_lstm_model.h5',
    test_data=test_data,
    forecast_steps=forecast_steps,
)

In [None]:
# Map scaled first-column values back through the fitted scaler
def inverse_transform(arr):
    """Undo the scaling for values that belong to the first feature column.

    The values are written into column 0 of a zero-filled array with
    ``n`` columns, that array is passed to ``scaler.inverse_transform``,
    and column 0 of the result is returned.
    """
    padded = np.zeros((len(arr), n))
    padded[:, 0] = arr
    return scaler.inverse_transform(padded)[:, 0]

# Remove singleton axes from the forecasts, then undo the scaling
forecasts = inverse_transform(np.squeeze(forecasts))

In [None]:
from pandas.tseries.offsets import DateOffset

# Dates at 0, 1, ..., forecast_steps months after the last date in the data
last_observed = all_data.index[-1]
future_dates = [
    last_observed + DateOffset(months=step)
    for step in range(forecast_steps + 1)
]

# Empty frame over the forecast horizon only (the 0-month entry is skipped)
future_dates_df = pd.DataFrame(columns=df.columns, index=future_dates[1:])

In [None]:
# Validation predictions, actual values and the forecast on one chart
dashed_markers = dict(marker='o', linestyle='dashed')
plt.plot(date_range, testPredict, color='blue', label='Validation', **dashed_markers)
plt.plot(date_range, testY, color='red', label='Actual')
plt.plot(future_dates_df.index, forecasts, color='green', label='Forecast', **dashed_markers)
plt.legend()