# In [None]:
# First download the 7Train1.csv data file, then upload it to the top level of
# your Google Drive ("My Drive"); the code below reads it from there.


import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import math
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
from google.colab import drive

# Mount Google Drive so the CSV stored under "My Drive" can be read.
drive.mount('/content/drive')

# Load data: a header-less CSV; only its first column is used.
data = pd.read_csv("/content/drive/My Drive/7Train1.csv", header=None)
# Take an explicit writable copy: the clean-up below assigns into the array,
# which must neither fail on a read-only view nor silently modify `data`.
bandwidth = data.iloc[:, 0].to_numpy(copy=True)
length = len(bandwidth)
# NOTE: computed before the clean-up below, so it still includes the
# readings that are about to be zeroed.
mean = np.mean(bandwidth)
len_train = math.floor(length * 0.8)  # first 80% of the samples are for training

# Readings above this value are replaced by 0 (presumably measurement
# outliers -- TODO confirm the threshold with the data owner).
max_valid_bandwidth = 40
bandwidth[bandwidth > max_valid_bandwidth] = 0

# Scale to [-1, 1]. The scaler is fitted on the training portion only, so the
# min/max of the held-out test samples cannot leak into training; the same
# fitted transform is then applied to the whole series.
scaler = MinMaxScaler(feature_range=(-1, 1))
scaler.fit(bandwidth[:len_train].reshape(-1, 1))
bandwidth_normalized = scaler.transform(bandwidth.reshape(-1, 1))

# Convert to a float32 PyTorch tensor with one column.
data_tensor = torch.FloatTensor(bandwidth_normalized).view(-1, 1)

# Number of future samples that make up each label.
predict_size = 1

# Function to create in-out sequences
def create_inout_sequences(input_data, window_size,predict_size):
    """Slice a series into sliding (input window, label) pairs.

    Each pair consists of ``window_size`` consecutive items followed by the
    next ``predict_size`` items; consecutive pairs are shifted by one
    position.

    Args:
        input_data: Any sliceable sequence supporting ``len()``.
        window_size: Number of items in each input window.
        predict_size: Number of items in each label.

    Returns:
        A list of ``(window, label)`` tuples; empty when ``input_data`` is
        shorter than ``window_size + predict_size``.
    """
    sample_span = window_size + predict_size
    n_samples = len(input_data) - sample_span + 1
    # range() of a non-positive count is empty, so short inputs yield [].
    return [
        (
            input_data[start:start + window_size],
            input_data[start + window_size:start + sample_span],
        )
        for start in range(n_samples)
    ]

# Parameters
window_size = 5  # number of past samples in each input window
batch_size = 4   # number of windows per training batch

# Training pairs are built only from the leading len_train samples
train_inout_seq = create_inout_sequences(
    input_data=data_tensor[:len_train],
    window_size=window_size,
    predict_size=predict_size,
)


# Custom dataset class
class TimeSeriesDataset(Dataset):
    """Dataset that serves items from a pre-built sequence collection.

    The collection is stored by reference (not copied) and indexed directly.
    """

    def __init__(self, sequences) -> None:
        # Collection of samples; anything supporting len() and indexing.
        self.sequences = sequences

    def __len__(self) -> int:
        """Return the number of stored samples."""
        n_samples = len(self.sequences)
        return n_samples

    def __getitem__(self, idx):
        """Return the sample stored at position ``idx``."""
        sample = self.sequences[idx]
        return sample

# Wrap the training pairs in a Dataset and batch them in their original order
train_dataset = TimeSeriesDataset(sequences=train_inout_seq)
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=False,   # keep the chronological order of the windows
    drop_last=True,  # the training loop reshapes with a fixed batch_size
)
# Test pairs are built from everything after the first len_train samples
test_inout_seq = create_inout_sequences(
    input_data=data_tensor[len_train:],
    window_size=window_size,
    predict_size=predict_size,
)


# Define LSTM model with adjustable number of layers and dropout
class LSTM(nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step.

    The recurrent layer is built with ``batch_first=False``, so inputs are
    laid out as ``(seq_len, batch, input_size)``.

    Args:
        input_size: Number of features per time step.
        hidden_layer_size: Size of the LSTM hidden state.
        output_size: Number of values predicted per sequence.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability between stacked LSTM layers.
    """

    def __init__(self, input_size=1, hidden_layer_size=128, output_size=predict_size, num_layers=2, dropout=0.5):
        super().__init__()
        self.hidden_layer_size = hidden_layer_size
        self.num_layers = num_layers
        # nn.LSTM applies dropout only between stacked layers and warns when
        # a non-zero value is passed for a single layer, so disable it there.
        inter_layer_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(input_size, hidden_layer_size, num_layers,
                            dropout=inter_layer_dropout, batch_first=False)
        self.linear = nn.Linear(hidden_layer_size, output_size)

    def forward(self, input_seq, hidden_state):
        """Run the sequence through the LSTM and predict from its last step.

        Args:
            input_seq: Tensor of shape ``(seq_len, batch, input_size)``.
            hidden_state: ``(h_0, c_0)`` tuple as returned by ``init_hidden``
                or by a previous call.

        Returns:
            ``(predictions, hidden_state)``. ``predictions`` has shape
            ``(batch, output_size)``; for a batch of one the batch dimension
            is dropped, giving ``(output_size,)``.
        """
        lstm_out, hidden_state = self.lstm(input_seq, hidden_state)
        # lstm_out is (seq_len, batch, hidden): index the FIRST axis to take
        # the last time step of every batch element. Indexing the second
        # axis would instead pick out a single sample of the batch.
        predictions = self.linear(lstm_out[-1])
        # squeeze(0) is a no-op unless the batch dimension has size 1, which
        # keeps the (output_size,) result single-sequence callers rely on.
        return predictions.squeeze(0), hidden_state

    def init_hidden(self,batch_size):
        """Return zero-filled ``(h_0, c_0)`` states for ``batch_size`` sequences.

        The states are created with the dtype and device of the model's
        parameters so they stay compatible if the model is moved or cast.
        """
        reference = self.linear.weight
        state_shape = (self.num_layers, batch_size, self.hidden_layer_size)
        return (reference.new_zeros(state_shape),
                reference.new_zeros(state_shape))

# Initialize the model, loss function, and optimizer
model = LSTM(dropout=0.2, num_layers=2)  # Adjust the dropout rate and number of layers as needed
loss_function = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training the model
epochs = 30
train_losses = []  # mean per-batch loss of every epoch

model.train()  # make sure dropout is active while training
num_batches = len(train_loader)

for epoch in range(epochs):
    hidden_state = model.init_hidden(batch_size)  # Initialize hidden state for each epoch

    epoch_train_loss = 0.0
    for seq, labels in train_loader:
        optimizer.zero_grad()

        # The loader yields (batch, window, 1) but the LSTM (batch_first=False)
        # expects (window, batch, 1). The axes must be swapped with transpose;
        # a plain view() would only reinterpret the memory and mix values of
        # different samples into the same time step.
        seq = seq.transpose(0, 1).contiguous()
        labels = labels.view(batch_size, -1)

        y_pred, hidden_state = model(seq, hidden_state)

        # Detach hidden state to prevent backpropagating through the entire history
        hidden_state = (hidden_state[0].detach(), hidden_state[1].detach())

        single_loss = loss_function(y_pred, labels)
        single_loss.backward()
        optimizer.step()

        epoch_train_loss += single_loss.item()

    # Each batch loss is already a mean, so average over the number of
    # batches (not the number of sequences); guard against an empty loader.
    epoch_mean_loss = epoch_train_loss / max(num_batches, 1)
    train_losses.append(epoch_mean_loss)

    if epoch % 5 == 0:
        # Report the epoch average rather than the last batch's noisy loss.
        print(f'Epoch {epoch+1}/{epochs}, Loss: {epoch_mean_loss:.8f}')

# Making predictions
model.eval()  # switch to inference behaviour (e.g. dropout off)
hidden_state = model.init_hidden(1)  # fresh state for a batch of one

predictions = []
# No gradients are needed for inference; one context covers the whole loop.
with torch.no_grad():
    for seq, _ in test_inout_seq:
        # One window at a time, laid out as (window, batch=1, feature=1)
        seq = seq.view(-1, 1, 1)
        y_pred, hidden_state = model(seq, hidden_state)
        # The state is carried over from one test window to the next
        hidden_state = (hidden_state[0].detach(), hidden_state[1].detach())
        # Keep only the last of the predict_size predicted values
        predictions.append(y_pred[predict_size - 1].item())

# Convert predictions back to original scale (the scaler expects a column)
predictions_column = np.array(predictions).reshape(-1, 1)
predictions = scaler.inverse_transform(predictions_column).flatten()

# Ground truth for each prediction: the test window i covers samples
# len_train+i .. len_train+i+window_size-1, and the value compared is the
# last of the predict_size samples that follow it.
first_target_index = len_train + window_size + predict_size - 1
actual_values = bandwidth[first_target_index:]

# MAE and RMSE over all test data
mae = mean_absolute_error(actual_values, predictions)
mse = mean_squared_error(actual_values, predictions)
rmse = np.sqrt(mse)

# Error ratios: each error expressed as a percentage of the mean actual value
mean_actual = np.mean(actual_values)
error_ratio_rmse = (rmse / mean_actual) * 100
error_ratio_mae = (mae / mean_actual) * 100

report_lines = (
    f'Mean Absolute Error (MAE): {mae:.4f}',
    f'Root Mean Squared Error (RMSE): {rmse:.4f}',
    f'Error Ratio RMSE: {error_ratio_rmse:.4f}%',
    f'Error Ratio MAE: {error_ratio_mae:.4f}%',
)
for report_line in report_lines:
    print(report_line)

# Slice to get (at most) the first 195 values for plotting
plot_count = 195
predictions_195 = predictions[:plot_count]
actual_values_195 = actual_values[:plot_count]

# x axis: position in the original series of each plotted target value.
# Its length follows the number of points actually available, so a test set
# with fewer than 195 values no longer makes plt.plot fail on mismatched
# x/y lengths; the start also accounts for predict_size.
plot_start = len_train + window_size + predict_size - 1
plot_index = range(plot_start, plot_start + len(predictions_195))

# Plot the results
plt.figure(figsize=(10, 6))
plt.plot(plot_index, actual_values_195, label='Actual Data')
plt.plot(plot_index, predictions_195, label='LSTM Predictions')

plt.legend()
plt.xlabel("Index")
plt.ylabel("Bandwidth")
# Title is derived from the data instead of a hard-coded count and offset.
plt.title(
    f"LSTM Predictions vs Actual Data "
    f"(First {len(predictions_195)} values from {plot_start} onwards)"
)
plt.show()
