In [1]:
!pip install pyts
!pip install keras-tuner
import os
import sys
import torch
import random
import numpy as np
import pandas as pd
from uu import Error
import torch.nn as nn
from tqdm import trange
from torch import optim
import keras_tuner as kt
from datetime import datetime
from google.colab import drive
from scipy.fft import fft, ifft
import matplotlib.pyplot as plt
import torch.nn.functional as F
from keras.optimizers import Adam
from keras.models import Sequential
from pyts.image import GramianAngularField
from keras_tuner.tuners import RandomSearch
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.model_selection import train_test_split
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.layers import InputLayer, LSTM, BatchNormalization, Conv2D, Dense, Flatten, MaxPooling2D, Dropout, Reshape, GRU

drive.mount('/content/drive/')
project = '/content/drive/My Drive/final_project'
sys.path.append(project)

from RICODataset import *
from aux_functions import *

def plot_image(data, title, save_path, index):
    """
    Function to visualize and save a single FFT image.
    :param data: 2D or 3D array representing the images.
    :param title: Title of the plot.
    :param save_path: Path to save the image.
    :param index: Index of the image to be plotted.
    """
    plt.figure(figsize=(6, 6))
    if data.ndim == 4 and data.shape[3] == 1:  # Single-channel 3D data
        plt.imshow(data[index, :, :, 0], cmap='gray', origin='lower')
    elif data.ndim == 3:  # data is 2D
        plt.imshow(data[index], cmap='gray', origin='lower')

    plt.title(title)
    f = data.shape[2]
    time = datetime.now().strftime("%d_%H%M")
    plt.savefig(os.path.join(save_path, f'images/img2img_cnn/F{f}_Images'))


def transform_to_image(data):
    fft_data = fft(data, axis=1)
    image_data = np.abs(fft_data).reshape(data.shape[0], data.shape[1], -1, 1)
    return image_data

# Function to display images
def display_images(images):
    fig, axes = plt.subplots(1, len(images), figsize=(15, 5))
    for img, ax in zip(images, axes):
        ax.imshow(img.squeeze(), cmap='gray')
        ax.axis('on')
    fig.suptitle('Input FFT Images', fontsize=12)
    plt.savefig(os.path.join(project,f'images/img2img_cnn/F6_Images.png'))
    plt.show()

def create_sequences(data, input_length, label_length):
    sequences = []
    labels = []
    for i in range(len(data) - input_length - label_length):
        sequences.append(data[i:(i + input_length)])  # Input sequence
        labels.append(data[(i + input_length):(i + input_length + label_length)])  # Label sequence
    return np.array(sequences), np.array(labels)


def create_cnn_model(input_shape, output_size):
    model = Sequential()
    model.add(Conv2D(64, kernel_size=3, activation='relu', input_shape=input_shape))
    model.add(Conv2D(32, kernel_size=3, activation='relu'))
    model.add(Flatten())
    model.add(Dense(output_size))
    return model

def plot_predictions_with_input(input_data, actual, predicted):
  plt.figure(figsize=(12, 6))

  # Select a random index from the test set
  idx = np.random.randint(0, actual.shape[0])
  print(f'input.shape: {input_data.shape}')

  #Inverse tranasforming the values
  scaler = full_dataset.get_scaler()
  input_inv = scaler.inverse_transform(input_data.reshape(-1, input_data.shape[-1])).reshape(input_data.shape)
  actual_inv = scaler.inverse_transform(actual.reshape(-1, actual.shape[-1])).reshape(actual.shape)
  predicted_inv = scaler.inverse_transform(predicted.reshape(-1, predicted.shape[-1])).reshape(predicted.shape)


  # Inverse FFT to transform input data back to time series
  time_series_input = ifft(input_inv[idx]).squeeze().real
  print(f'input.shape: {actual.shape}')

  # Prepare indices for plotting
  input_indices = np.arange(len(time_series_input))
  forecast_indices = np.arange(len(time_series_input), len(time_series_input) + len(predicted[idx]))

  plt.plot(input_indices, time_series_input[:,idx], label='Input')
  plt.plot(forecast_indices, actual_inv[idx,:,idx], label='Actual values', color='green')
  plt.plot(forecast_indices, predicted_inv[idx,:,idx], label='Predictions', color='red')
  plt.title(f'Prediction for Sequence {idx}')
  plt.xlabel('Point in sequence')
  plt.ylabel('Value')
  plt.legend(loc='best')
  plt.savefig(os.path.join(project,f'images/img2img_cnn/F6_{mse:.4f}.png'))
  plt.show()


Collecting pyts
  Downloading pyts-0.13.0-py3-none-any.whl (2.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m10.3 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: pyts
Successfully installed pyts-0.13.0
Collecting keras-tuner
  Downloading keras_tuner-1.4.6-py3-none-any.whl (128 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m128.9/128.9 kB[0m [31m857.9 kB/s[0m eta [36m0:00:00[0m
Collecting kt-legacy (from keras-tuner)
  Downloading kt_legacy-1.0.5-py3-none-any.whl (9.6 kB)
Installing collected packages: kt-legacy, keras-tuner
Successfully installed keras-tuner-1.4.6 kt-legacy-1.0.5
Mounted at /content/drive/
Mounted at /content/drive


In [2]:
df_1 = pd.read_hdf(os.path.join(project, 'src/RICO1_Dataset.hdf'), key='all')

full_dataset = RICOFullDataset(data=df_1,
                               ori_seq_len=240,
                               tgt_seq_len=240,
                               stride=1,
                               channels=['B.RTD1'],
                               standardize=True)

dataset_1 = RICODataset(
    full_dataset,
    kind="full",
    get_every=10
)



In [9]:
# 1. curing data [DONE] return: data w/ shape (102, 24, 1)
  # load and sample 10min interval
rtd1 = pd.read_csv(os.path.join(project, 'src/rico1_full.tsv'), header=0, usecols=range(1,25))
#print(rtd1)
rtd1 = rtd1.values
#print(rtd1) # (N=102, L=24)
train, test = train_test_split(rtd1, test_size=0.2, random_state=42, shuffle=True)
print(f'train: {train.shape}, test: {test.shape}')

# X_train[0]->y_train[0] , X_train[1]->y_train[1] however, y_train[0] == X_train[1]
# grabbing len-forecast number of values as evaluation is based on "new predictions approach"
# shape (seq_len, batch_size, input_size)
rtd1 = torch.from_numpy(rtd1.reshape(rtd1.shape[0], rtd1.shape[1],1))
print(f'rtd1.shape:{rtd1.shape}')

forecast_horizon = 6
inputs, targets = rtd1[:,:-forecast_horizon,:].permute(1,0,2).float(), rtd1[:,-forecast_horizon:,:].permute(1,0,2).float()

print(f"Inputs shape :", inputs.shape)
print(f"Targets shape :", targets.shape)


train: (81, 24), test: (21, 24)
rtd1.shape:torch.Size([102, 24, 1])
Inputs shape : torch.Size([18, 102, 1])
Targets shape : torch.Size([6, 102, 1])


In [13]:
forecast = 6

x, y = create_sequences(rtd1, rtd1.shape[1]-forecast, forecast)

# Transform to FFT image
x_img = transform_to_image(x)

x_train_img, x_test_img, y_train, y_test = train_test_split(x_img, y, test_size=0.2, random_state=42)

# Display the first 5 FFT transformed images
display_images(x_img[:5])

model = create_cnn_model(x_train_img.shape[1:], y_train.shape[1] * y_train.shape[2])
model.compile(optimizer='adam', loss='mean_squared_error')

# Training the model
model.fit(x_train_img, y_train.reshape(y_train.shape[0], -1), epochs=100, batch_size=16)

# Predicting with the test set
predictions = model.predict(x_test_img)
#print(f'predictions.shape:{predictions.shape}, y_test.shape:{y_test.shape}')

# Evaluating the model on the test set
mse = mean_squared_error(y_test.reshape(y_test.shape[0], -1), predictions)
print(f"Mean Squared Error: {mse}")

# Reshape y_test to match the predictions shape for plotting
predictions = predictions.reshape(y_test.shape[0], y_test.shape[1], y_test.shape[2])

# Plotting the first 5 sequences of input, actual values, and predictions
plot_predictions_with_input(x_test_img, y_test, predictions)


  return np.array(sequences), np.array(labels)
  return np.array(sequences), np.array(labels)


ValueError: ignored

# Data transformation to images

In [None]:
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset

class FFTDataset(Dataset):
    def __init__(self, inputs, targets):
        self.inputs = inputs
        self.targets = targets

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        return self.inputs[idx], self.targets[idx]

class FFTNet(nn.Module):
    def __init__(self, input_shape, output_horizon):
        super(FFTNet, self).__init__()
        self.output_horizon = output_horizon
        # Define Convolutional layers
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)

        # Compute the size of the features after convolutional layers
        conv_output_size = self._get_conv_output(input_shape)

        # Define Fully connected layers
        self.fc1 = nn.Linear(conv_output_size, 128)
        self.fc2 = nn.Linear(128, output_horizon * input_shape[1])

    def _get_conv_output(self, shape):
        with torch.no_grad():
            input = torch.autograd.Variable(torch.rand(1, *shape))
            output = self._forward_conv(input)
            return int(torch.prod(torch.tensor(output.size())))

    def _forward_conv(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv3(x))
        x = F.max_pool2d(x, 2)
        return x

    def forward(self, x):
        x = self._forward_conv(x)
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        x = x.view(-1, self.output_horizon, x.size(1) // self.output_horizon)
        return x

    def train_model(self, train_loader, optimizer, criterion, num_epochs):
        self.train()
        for epoch in range(num_epochs):
            total_loss = 0
            for inputs, targets in train_loader:
                optimizer.zero_grad()
                outputs = self(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            avg_loss = total_loss / len(train_loader)
            print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")

    def evaluate_model(self, test_loader, criterion):
        self.eval()
        total_loss = 0
        with torch.no_grad():
            for inputs, targets in test_loader:
                outputs = self(inputs)
                loss = criterion(outputs, targets)
                total_loss += loss.item()
        avg_loss = total_loss / len(test_loader)
        return avg_loss

    def predict(self, data_loader):
        self.eval()
        predictions = []
        with torch.no_grad():
            for inputs in data_loader:
                outputs = self(inputs)
                predictions.append(outputs.cpu().numpy())
        return np.concatenate(predictions)


############################################

# Creating dataset instances
train_dataset = FFTDataset(input_train, target_train)
test_dataset = FFTDataset(input_test, target_test)
print(f'train_dataset.shape:{len(train_dataset)}, test_dataset.shape:{len(test_dataset)}')

#Creating DataLoader instances
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)


# Create the model
fft_model = FFTNet(train_loader.shape, forecast_horizon)

# Define optimizer and loss function
optimizer = torch.optim.Adam(fft_model.parameters(), lr=0.001)
loss_function = nn.MSELoss()

# Train the model
fft_model.train_model(train_dataset, optimizer, loss_function, num_epochs=10)

# Evaluate the model
test_loss = fft_model.evaluate_model(test_loader, loss_function)
print(f"Test Loss: {test_loss:.4f}")

# Predict using the model
predictions = fft_model.predict(test_loader)



train_dataset.shape:18, test_dataset.shape:18


AttributeError: ignored