In [1]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import LSTM
import numpy as np
import pandas as pd
import re

In [2]:
# Load the interval price data; the relative path is resolved against the
# current working directory.
df = pd.read_csv("Interval_Data.csv")
# NOTE(review): this binds a second name to the same DataFrame (no copy is
# made), so column assignments made through `df_extracted` are also visible
# through `df`.
df_extracted = df
# Preview the first rows.
df_extracted.head()

Unnamed: 0,Date,Interval_Price,midpoint
0,2022-07-01,"[135.660004, 139.039993]",137.349998
1,2022-07-05,"[136.929993, 141.610001]",139.269997
2,2022-07-06,"[141.080002, 144.119995]",142.599998
3,2022-07-07,"[143.279999, 146.550003]",144.915001
4,2022-07-08,"[145.0, 147.550003]",146.275002


In [3]:
def str_to_interval(interval_str):
    """Parse a string holding two numbers, e.g. "[135.66, 139.04]", into a pd.Interval.

    Parameters
    ----------
    interval_str : str or any
        Text containing exactly two numeric literals (integers, decimals,
        optionally signed, optionally with an exponent). Any non-string value
        is returned unchanged.

    Returns
    -------
    pd.Interval, None, or the input itself
        ``pd.Interval(first, second)`` when exactly two numbers are found
        (built with pandas' default ``closed`` setting, as before);
        ``None`` when a string contains any other count of numbers;
        the input itself when it is not a string.

    Raises
    ------
    ValueError
        Propagated from ``pd.Interval`` when the first number is greater
        than the second.
    """
    if not isinstance(interval_str, str):
        return interval_str

    # The optional sign is grouped so that it applies to integers as well as
    # decimals; the earlier pattern attached it to the decimal alternative only,
    # so "-5" was read as 5.
    # The (?<!\d) guard keeps a '-' or '+' that directly follows a digit acting
    # as a separator, so "135-139" still yields 135 and 139.
    number_pattern = r"(?<!\d)[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?"
    numbers = re.findall(number_pattern, interval_str)
    if len(numbers) != 2:
        return None

    low, high = (float(token) for token in numbers)
    return pd.Interval(low, high)

# Parse each 'Interval_Price' string (e.g. "[135.660004, 139.039993]") into a
# pd.Interval object.
df_extracted['Interval_Price'] = df_extracted['Interval_Price'].apply(str_to_interval)

# Derive the lower bound, upper bound and their midpoint from the parsed
# intervals.
# NOTE(review): str_to_interval returns None when a string does not contain
# exactly two numbers; such a row would raise AttributeError in the lambdas
# below.
df_extracted['Low'] = df_extracted['Interval_Price'].apply(lambda x: x.left)
df_extracted['High'] = df_extracted['Interval_Price'].apply(lambda x: x.right)
df_extracted['Mid'] = df_extracted['Interval_Price'].apply(lambda x: (x.left + x.right) / 2)

# Drop the parsed interval column and the CSV's own 'midpoint' column; 'Mid'
# above is recomputed from the bounds instead.
# NOTE(review): this rebinds `df_extracted` to a frame without
# 'Interval_Price', so re-running this cell without first re-running the load
# cell raises KeyError on the first line.
df_extracted = df_extracted.drop(columns=['Interval_Price','midpoint'])
df_extracted.head()

Unnamed: 0,Date,Low,High,Mid
0,2022-07-01,135.660004,139.039993,137.349998
1,2022-07-05,136.929993,141.610001,139.269997
2,2022-07-06,141.080002,144.119995,142.599998
3,2022-07-07,143.279999,146.550003,144.915001
4,2022-07-08,145.0,147.550003,146.275002


In [4]:
from sklearn.preprocessing import MinMaxScaler
# Rescale the 'Low' column into the [0, 1] range. The values are reshaped to a
# single-column 2-D array before being handed to the scaler.
# NOTE(review): the scaler is fit on the whole 'Low' column here; if a
# train/test split is applied afterwards, consider fitting on the training
# portion only so the test range does not influence the scaling.
data = df_extracted
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_Low = scaler.fit_transform(data['Low'].to_numpy().reshape(-1, 1))

In [5]:
def split_train_test(data, portion):
    """Cut `data` into a leading training part and a trailing test part.

    Parameters
    ----------
    data : sliceable sequence (list, array, DataFrame, ...)
        Ordered observations; the order is kept, nothing is shuffled.
    portion : float
        Fraction of `data` assigned to the training part. The cut position is
        ``int(len(data) * portion)``, i.e. truncated toward zero.

    Returns
    -------
    tuple
        ``(train_set, test_set)`` where `train_set` is everything before the
        cut position and `test_set` is everything from it onward.
    """
    cut = int(len(data) * portion)
    return data[:cut], data[cut:]


In [6]:
def generate_data_matrix(data, step=1):
    """Build sliding-window inputs and next-value targets from column 0 of `data`.

    Parameters
    ----------
    data : 2-D array
        Indexed as ``data[rows, 0]``; only the first column is used.
    step : int, default 1
        Number of consecutive rows in each input window.

    Returns
    -------
    tuple of np.ndarray
        ``(x_data, y_data)``: each row of `x_data` holds `step` consecutive
        values and the matching entry of `y_data` is the value right after
        that window.

    Notes
    -----
    Windows start at positions ``0 .. len(data) - step - 2``, so the final
    window that would still have a target is not emitted. When that range is
    empty both returned arrays are empty.
    """
    window_starts = range(len(data) - step - 1)
    windows = [data[start:start + step, 0] for start in window_starts]
    targets = [data[start + step, 0] for start in window_starts]
    return np.array(windows), np.array(targets)


In [7]:
from matplotlib import pyplot as plt

def visualize_LSTM_forecasts(original_dataset, normalized_dataset, train_forecast, test_forecast, offset):
    """Plot the actual series together with the training and testing forecasts.

    Parameters
    ----------
    original_dataset : object with an ``.index`` attribute
        Supplies the index used for all three plotted frames.
    normalized_dataset : 2-D array
        Scaled series; it is passed through the module-level
        ``scaler.inverse_transform`` to obtain the "Actual Data" line and also
        fixes the shape of the forecast canvases.
    train_forecast, test_forecast : array-like
        Forecast values written into the canvases as-is.
        NOTE(review): presumably already in original units, since they are
        drawn next to the inverse-transformed actual data; confirm at the
        call site.
    offset : int
        Row at which the training forecast starts; the testing forecast
        starts at ``len(train_forecast) + 2 * offset + 1`` and stops one row
        before the end.

    Relies on the module-level names `scaler` and `plt`.
    """
    def blank_canvas():
        # All-NaN array shaped like the normalized data, so only the rows that
        # receive forecast values carry numbers.
        canvas = np.empty_like(normalized_dataset)
        canvas[:, :] = np.nan
        return canvas

    n_train = len(train_forecast)

    # Training forecasts occupy the rows directly after the first `offset` rows.
    train_canvas = blank_canvas()
    train_canvas[offset:offset + n_train, :] = train_forecast

    # Testing forecasts are placed after the training rows plus a second
    # offset-sized gap, leaving the last row empty.
    test_canvas = blank_canvas()
    test_start = n_train + (offset * 2) + 1
    test_canvas[test_start:len(normalized_dataset) - 1, :] = test_forecast

    row_index = original_dataset.index
    train_df = pd.DataFrame(train_canvas, index=row_index)
    test_df = pd.DataFrame(test_canvas, index=row_index)
    actual_df = pd.DataFrame(scaler.inverse_transform(normalized_dataset), index=row_index)

    # Draw the three lines in a fixed order: actual, training, testing.
    plt.figure(figsize=(18, 10))
    labelled_frames = (
        (actual_df, "Actual Data"),
        (train_df, "Training Forecast"),
        (test_df, "Testing Forecast"),
    )
    for frame, series_label in labelled_frames:
        plt.plot(frame, label=series_label)
    plt.legend(loc='upper left')
    plt.title('LSTM Forecast Visualization')
    plt.show()


In [8]:
def visualize_LSTM_forecasts_30days(original_dataset, normalized_dataset, testing_dataset, offset, horizon=30):
    """Recursively forecast the last `horizon` rows and plot them against the actual data.

    The forecast is seeded with the `offset` values of `testing_dataset` that
    immediately precede its last `horizon` rows. Each predicted value is
    appended to the window and fed back in for the next step, so later steps
    are based on earlier predictions rather than on observed values.

    Parameters
    ----------
    original_dataset : object with an ``.index`` attribute
        Supplies the index for the plotted frames; its last `horizon` index
        entries label the forecast.
    normalized_dataset : 2-D array
        Scaled full series; inverse-transformed with the module-level
        `scaler` for plotting.
    testing_dataset : np.ndarray
        Scaled test series from which the seed window is taken.
        NOTE(review): assumed to be a single-column array, matching what
        `scaler` was fit on; confirm at the call site.
    offset : int
        Length of the input window handed to the model.
    horizon : int, default 30
        Number of steps to forecast. The default reproduces the previous
        fixed 30-step behaviour.

    Raises
    ------
    ValueError
        If `offset` or `horizon` is not positive, or `testing_dataset` has
        fewer than ``offset + horizon`` rows. Previously the short-input case
        surfaced as an opaque reshape error.

    Relies on the module-level names `model`, `scaler` and `plt`.
    """
    if offset <= 0 or horizon <= 0:
        raise ValueError(
            f"offset and horizon must be positive, got offset={offset}, horizon={horizon}"
        )
    n_test = len(testing_dataset)
    if n_test < offset + horizon:
        raise ValueError(
            f"testing_dataset has {n_test} rows; at least offset + horizon = "
            f"{offset + horizon} are required"
        )

    # Seed window: the `offset` values right before the final `horizon` rows.
    seed_end = n_test - horizon
    window = list(testing_dataset[seed_end - offset:seed_end].reshape(-1))

    predictions = []
    for _ in range(horizon):
        # Always predict from the most recent `offset` values, observed or predicted.
        model_input = np.array(window[-offset:]).reshape(1, offset, 1)
        predicted_value = model.predict(model_input, verbose=0)
        window.extend(predicted_value[0])
        predictions.extend(predicted_value)

    # Bring everything back to original units for plotting.
    dataset_df = pd.DataFrame(scaler.inverse_transform(normalized_dataset), index=original_dataset.index)
    history = dataset_df.iloc[:-horizon]
    actual_tail = dataset_df.iloc[-horizon:]
    forecast_df = pd.DataFrame(
        scaler.inverse_transform(predictions),
        index=original_dataset.index[-horizon:],
    )

    # Every line gets a label so the legend identifies all three series.
    plt.figure(figsize=(20, 10))
    plt.plot(history, label='Historical Data')
    plt.plot(actual_tail, label=f'Actual Last {horizon} Days')
    plt.plot(forecast_df, label=f'{horizon} Days Forecast')
    plt.legend(loc='upper left')
    plt.title(f'LSTM {horizon} Days Forecast Visualization')
    plt.show()