# LSTM for CPI prediction

Imports

In [None]:
# Numerical, plotting and data-frame libraries (third-party, not the standard library)
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

# Third-party library imports
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

Data preparation

In [None]:
# Load the monthly dataset from disk
data = pd.read_csv('Monthly_data.csv')

# Preprocess data:
# the 'Year and Month' label column is removed and every row containing a
# missing value is discarded.
data.drop(columns=['Year and Month'], inplace=True)
data.dropna(inplace=True)

# Setup for prediction task
lag = 1  # Number of months ahead to predict
column_name = f'CPI {lag} Month{"s" if lag > 1 else ""} ahead'
# Target column: CPI moved up by `lag` rows, so the final `lag` rows have no
# target value (NaN) after the shift.
data[column_name] = data['CPI'].shift(-lag)

# Select features and target for modeling.
# The target is the column appended just above (the last one).
# NOTE(review): 'Year and Month' has already been dropped, so the `1:` slice
# below skips whichever column is now first in the frame - confirm that this
# column is really meant to be excluded from the features.
features = data.columns[1:-1]
target = data.columns[-1]

# Scale features and target with two separate scaler objects so that fitting
# the target does not overwrite the parameters learned for the features.
# `scaler` remains the target scaler because later cells call
# `scaler.inverse_transform` on the model's predictions.
# NOTE(review): both scalers are fitted on the complete frame, before the
# train/test split performed in a later cell - confirm this is acceptable.
feature_scaler = MinMaxScaler()
scaler = MinMaxScaler()
data[features] = feature_scaler.fit_transform(data[features])
data[[target]] = scaler.fit_transform(data[[target]])

data

Data used to make predictions

In [None]:
# Define parameters for the sequence and prediction
sequence_length = 20  # The length of input sequences for the model
test_split = 0.2  # Fraction of the data to be used as the test set

# Select the final rows of the data for making future predictions.
# (sequence_length + lag - 1) rows are kept; slicing first and copying the
# slice keeps `data` untouched without duplicating the whole frame.
data_for_prediction = data.iloc[-(sequence_length + lag - 1):, :].copy()

# Prepare the dataset for training and testing the model.
# The last `lag` rows are excluded here.
data_for_model = data.iloc[:-lag, :]

# Number of rows reserved for testing
test_data_size = int(len(data_for_model) * test_split)

# Split on an explicit row position. A negative slice such as
# `[:-test_data_size]` would yield an EMPTY training set whenever
# test_data_size is 0, because `[:-0]` is the same as `[:0]`.
split_index = len(data_for_model) - test_data_size
train_data = data_for_model.iloc[:split_index]
test_data = data_for_model.iloc[split_index:]

# Display the shapes of the prepared splits
print("Training data shape:", train_data.shape)
print("Testing data shape:", test_data.shape)

Create the sequences from the features and target

In [None]:
def create_sequences(data, input_columns, target_column, sequence_length):
    """Create sliding windows of input features with their target values.

    Window ``i`` holds rows ``i .. i + sequence_length - 1`` of
    ``data[input_columns]``; its target is ``data[target_column]`` taken from
    row ``i + sequence_length``, i.e. the row directly after the window.

    NOTE(review): if ``target_column`` is itself already shifted forward
    relative to the inputs, reading it one row past the window adds a further
    step of offset - confirm the intended forecast horizon.

    Args:
        data: DataFrame containing the input and target columns.
        input_columns: Column label(s) used as model inputs.
        target_column: Column label of the value to predict.
        sequence_length: Number of consecutive rows per window (>= 0).

    Returns:
        Tuple ``(X, y)`` of NumPy arrays. With a list-like ``input_columns``,
        ``X`` has shape ``(n_windows, sequence_length, n_columns)`` and ``y``
        has ``n_windows`` entries, where
        ``n_windows = max(len(data) - sequence_length, 0)``. When no window
        fits, ``X`` is an empty array that still carries the trailing
        dimensions, so shape-based code downstream keeps working.

    Raises:
        ValueError: If ``sequence_length`` is negative.
    """
    if sequence_length < 0:
        raise ValueError("sequence_length must be non-negative")

    # Select the columns once instead of on every loop iteration
    feature_values = data[input_columns].to_numpy()
    target_values = data[target_column].to_numpy()

    n_windows = max(len(feature_values) - sequence_length, 0)
    if n_windows == 0:
        empty_shape = (0, sequence_length) + feature_values.shape[1:]
        return np.empty(empty_shape, dtype=feature_values.dtype), target_values[:0].copy()

    X = np.array([feature_values[start:start + sequence_length]
                  for start in range(n_windows)])
    # Targets are the rows following each window; copy so the result never
    # aliases the DataFrame's underlying storage.
    y = target_values[sequence_length:sequence_length + n_windows].copy()

    return X, y

# Columns fed to the network and the column it is trained to predict
input_columns, target_column = features, target

# Build the windowed samples for the training split and for the test split
X_train, y_train = create_sequences(
    train_data, input_columns, target_column, sequence_length
)
X_test, y_test = create_sequences(
    test_data, input_columns, target_column, sequence_length
)

# Report the resulting array shapes as a quick sanity check
print("Training data sequences shape:", X_train.shape)
print("Test data sequences shape:", X_test.shape)

Model 1 - One LSTM layer

In [None]:
# Single-layer recurrent model: one LSTM followed by a one-unit Dense output.
# The per-sample input shape is taken from the training array (every axis
# after the first).
model = Sequential()
model.add(
    LSTM(
        50,
        activation='tanh',
        recurrent_activation='sigmoid',
        input_shape=X_train.shape[1:],
        dropout=0.05,
        recurrent_dropout=0.05,
    )
)
model.add(Dense(1))

# Adam optimizer with mean-squared-error loss
model.compile(loss='mse', optimizer='adam')

model.summary()

# Train while holding out 5% of the training samples for validation
# monitoring; the returned History object is kept for later inspection.
history = model.fit(
    X_train,
    y_train,
    batch_size=32,
    epochs=500,
    validation_split=0.05,
)

# Loss of the fitted model on the training and on the test sequences
train_loss = model.evaluate(X_train, y_train, verbose=0)
test_loss = model.evaluate(X_test, y_test, verbose=0)

print(f'Training Loss: {train_loss}')
print(f'Testing Loss: {test_loss}')


Model 2 - Two LSTM layers

In [None]:
# Disabled experiment: the entire cell below is wrapped in a triple-quoted
# string literal, so none of it is executed. The text describes a stacked
# variant - LSTM(20, return_sequences=True) feeding LSTM(2, activation='relu')
# and a Dense(1) output - compiled with the SGD optimizer and MSE loss and
# trained for 100 epochs with a 20% validation split.
'''
# Model architecture

model = Sequential([
    # First LSTM layer returns sequences to feed into the next LSTM layer
    LSTM(20, activation='tanh', input_shape=(X_test.shape[1], X_test.shape[2]), return_sequences=True),
    # Second LSTM layer only needs to return the last output
    LSTM(2, activation='relu'),
    # Followed by a Dense layer that makes the final prediction
    Dense(1)
])

model.compile(optimizer='sgd', loss='mse')

# Model summary
model.summary()

# Fit the model
history = model.fit(X_train, y_train, epochs=100, validation_split=0.2, batch_size=32)

train_loss = model.evaluate(X_train, y_train, verbose=0)
test_loss = model.evaluate(X_test, y_test, verbose=0)

print(f'Training Loss: {train_loss}')
print(f'Testing Loss: {test_loss}')
'''

In [None]:
# Train the same architecture several times from scratch and average the
# resulting losses, since each run starts from a fresh random initialisation.
n_iterations = 3
train_losses = []
test_losses = []

for i in range(n_iterations):
    print(f"Running iteration: {i+1}/{n_iterations}")

    # Models are rebuilt on every pass; release the global state Keras keeps
    # for previously created models so memory does not grow with each run.
    tf.keras.backend.clear_session()

    # Re-initialize the model at each iteration
    model = Sequential([
        LSTM(200, activation='tanh', recurrent_activation='sigmoid',
             input_shape=(X_train.shape[1], X_train.shape[2]),
             dropout=0.1, recurrent_dropout=0.1),
        Dense(1)
    ])

    model.compile(optimizer='adam', loss='mse')

    # Fit on all training sequences (no validation hold-out), silently
    model.fit(X_train, y_train, epochs=200, validation_split=0.0, batch_size=32, verbose=0)

    # Evaluate the fitted model on both splits
    train_loss = model.evaluate(X_train, y_train, verbose=0)
    test_loss = model.evaluate(X_test, y_test, verbose=0)

    # Store losses
    train_losses.append(train_loss)
    test_losses.append(test_loss)

# Calculate the average losses over all runs
average_train_loss = np.mean(train_losses)
average_test_loss = np.mean(test_losses)

print(f'Average Training Loss: {average_train_loss}')
print(f'Average Testing Loss: {average_test_loss}')


Visualise training loss vs validation loss

In [None]:
import matplotlib.pyplot as plt

# Draw the per-epoch curves stored in `history`: first the training loss,
# then the validation loss, each with its own legend label.
for history_key, curve_label in (
    ('loss', 'Training loss'),
    ('val_loss', 'Validation loss'),
):
    plt.plot(history.history[history_key], label=curve_label)

# Title, axis labels and legend placement
plt.title('Model loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(loc='upper right')
plt.show()


Predictions

In [None]:
# Build sequences from the complete modelling frame (training and test rows
# together) for the final model used to forecast.
X_full, y_full = create_sequences(data_for_model, input_columns, target_column, sequence_length)

# The input shape is read from X_full - the array this model is actually
# fitted on - rather than from X_train produced by an earlier cell.
model = Sequential([
    LSTM(50, activation='tanh', recurrent_activation='sigmoid',
         input_shape=(X_full.shape[1], X_full.shape[2]),
         dropout=0.03, recurrent_dropout=0.03),
    Dense(1)
])

model.compile(optimizer='adam', loss='mse')

# Fit on every available sequence, without a validation hold-out
model.fit(X_full, y_full, epochs=200, validation_split=0.0, batch_size=32, verbose=0)

def create_sequences_X(data, input_columns, sequence_length, n_sequences=None):
    """Create input-only windows (no targets) from the start of ``data``.

    Window ``i`` holds rows ``i .. i + sequence_length - 1`` of
    ``data[input_columns]``, for ``i`` in ``0 .. n_sequences - 1``.

    Args:
        data: DataFrame containing the input columns.
        input_columns: Column label(s) used as model inputs.
        sequence_length: Number of consecutive rows per window.
        n_sequences: How many windows to build. When omitted, the
            module-level ``lag`` value is used, which is what this function
            did before the parameter existed.

    Returns:
        NumPy array stacking the windows; with a list-like ``input_columns``
        and enough rows its shape is
        ``(n_sequences, sequence_length, n_columns)``.
    """
    if n_sequences is None:
        n_sequences = lag  # fall back to the notebook-wide setting

    # Select the columns once instead of on every loop iteration
    feature_values = data[input_columns].to_numpy()

    return np.array([feature_values[start:start + sequence_length]
                     for start in range(n_sequences)])

# Feature columns supplied to the forecasting windows
input_columns = features

# Build the input window(s) from the rows set aside for prediction
X_prediction = create_sequences_X(
    data=data_for_prediction,
    input_columns=input_columns,
    sequence_length=sequence_length,
)

# Model output for those windows (still in scaled units)
predicted_cpi = model.predict(X_prediction)

# Undo the scaling with the scaler's inverse transform
predicted_cpi_inverted = scaler.inverse_transform(predicted_cpi)

predicted_cpi_inverted

Predictions averaged

In [None]:
# Store all predicted CPIs
all_predicted_cpi_inverted = []

for run in range(10):
    X_full, y_full = create_sequences(data_for_model, input_columns, target_column, sequence_length)

    model = Sequential([
        LSTM(200, activation='tanh', recurrent_activation='sigmoid',
             input_shape=(X_full.shape[1], X_full.shape[2]),
             dropout=0.1, recurrent_dropout=0.1),
        Dense(1)
    ])

    # Compile and fit the model
    model.compile(optimizer='adam', loss='mse')
    model.fit(X_full, y_full, epochs=200, validation_split=0.0, batch_size=32, verbose=0)

    # Assuming the create_sequences_X function and necessary inputs are defined correctly
    X_prediction = create_sequences_X(data_for_prediction, input_columns, sequence_length)
    predicted_cpi = model.predict(X_prediction)
    predicted_cpi_inverted = scaler.inverse_transform(predicted_cpi)

    all_predicted_cpi_inverted.append(predicted_cpi_inverted)

# Convert the list of numpy arrays to a 3D numpy array for easier averaging
all_predicted_cpi_inverted_array = np.array(all_predicted_cpi_inverted)

# Calculate the average across the first dimension (runs)
average_predicted_cpi_inverted = np.mean(all_predicted_cpi_inverted_array, axis=0)

print("Average of CPI Inverted:", average_predicted_cpi_inverted)