<a href="https://colab.research.google.com/github/basugautam/Reproducibility-Challenge-Project/blob/main/Untitled2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt

# Load Dataset

dataset_path = r"C:\\LCTSF"
file_name = "timeseries_data.csv"
file_path = os.path.join(dataset_path, file_name)

if not os.path.isfile(file_path):


data = pd.read_csv(file_path, parse_dates=["timestamp"], index_col="timestamp")

# Data Preprocessing
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)

def create_sequences(data, seq_length):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i + seq_length])
        y.append(data[i + seq_length])
    return np.array(X), np.array(y)

seq_length = 24  # Forecasting 24 time steps ahead
X, y = create_sequences(data_scaled, seq_length)
X_train, X_test = X[:int(0.8*len(X))], X[int(0.8*len(X)):]
y_train, y_test = y[:int(0.8*len(y))], y[int(0.8*len(y)):]

# Convert to PyTorch tensors
X_train, y_train = torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32)
X_test, y_test = torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.float32)

# Define Transformer Model
class TransformerForecaster(nn.Module):
    def __init__(self, input_dim, d_model=64, nhead=4, num_layers=2):
        super(TransformerForecaster, self).__init__()
        self.encoder = nn.Linear(input_dim, d_model)
        self.transformer = nn.Transformer(d_model, nhead, num_layers)
        self.decoder = nn.Linear(d_model, input_dim)

    def forward(self, src):
        src = self.encoder(src)
        src = src.permute(1, 0, 2)  # Transformer expects seq_len, batch, feature
        output = self.transformer(src, src)
        output = output.permute(1, 0, 2)
        return self.decoder(output[:, -1, :])

# Initialize Model
model = TransformerForecaster(input_dim=X_train.shape[2])
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Loss function with Loss Shaping Constraints
def loss_shaping(y_pred, y_true):
    error = y_pred - y_true
    squared_error = torch.mean(torch.square(error))
    bound_loss = torch.mean(torch.clamp(error, min=-0.1, max=0.1))  # Constrain error bounds
    return squared_error + 0.5 * bound_loss

# Training Loop with Dynamic Constraint Adaptation
epochs = 50
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    y_pred = model(X_train)
    loss = loss_shaping(y_pred, y_train)
    loss.backward()
    optimizer.step()
    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item()}")

# Evaluate Model
model.eval()
y_pred_test = model(X_test).detach().numpy()
y_test = y_test.numpy()
y_pred_test_rescaled = scaler.inverse_transform(y_pred_test)
y_test_rescaled = scaler.inverse_transform(y_test)

# Plot Results
plt.figure(figsize=(10, 5))
plt.plot(y_test_rescaled[:100], label='Actual')
plt.plot(y_pred_test_rescaled[:100], label='Predicted')
plt.legend()
plt.show()


IndentationError: expected an indented block after 'if' statement on line 16 (<ipython-input-1-da5ba91fdb46>, line 19)

In [2]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt

# Load Dataset

dataset_path = r"C:\\LCTSF"
file_name = "timeseries_data.csv"
file_path = os.path.join(dataset_path, file_name)

if not os.path.isfile(file_path):
    raise FileNotFoundError(f"Dataset file not found at {file_path}. Ensure the file exists and the path is correct.")

data = pd.read_csv(file_path, parse_dates=["timestamp"], index_col="timestamp")

# Data Preprocessing
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)

def create_sequences(data, seq_length):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i + seq_length])
        y.append(data[i + seq_length])
    return np.array(X), np.array(y)

seq_length = 24  # Forecasting 24 time steps ahead
X, y = create_sequences(data_scaled, seq_length)
X_train, X_test = X[:int(0.8*len(X))], X[int(0.8*len(X)):]
y_train, y_test = y[:int(0.8*len(y))], y[int(0.8*len(y)):]

# Convert to PyTorch tensors
X_train, y_train = torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32)
X_test, y_test = torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.float32)

# Define Transformer Model
class TransformerForecaster(nn.Module):
    def __init__(self, input_dim, d_model=64, nhead=4, num_layers=2):
        super(TransformerForecaster, self).__init__()
        self.encoder = nn.Linear(input_dim, d_model)
        self.transformer = nn.Transformer(d_model, nhead, num_layers)
        self.decoder = nn.Linear(d_model, input_dim)

    def forward(self, src):
        src = self.encoder(src)
        src = src.permute(1, 0, 2)  # Transformer expects seq_len, batch, feature
        output = self.transformer(src, src)
        output = output.permute(1, 0, 2)
        return self.decoder(output[:, -1, :])

# Initialize Model
model = TransformerForecaster(input_dim=X_train.shape[2])
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Loss function with Loss Shaping Constraints and Duality-based Optimization
def loss_shaping(y_pred, y_true):
    error = y_pred - y_true
    squared_error = torch.mean(torch.square(error))
    bound_loss = torch.mean(torch.clamp(error, min=-0.1, max=0.1))  # Constrain error bounds
    duality_term = torch.mean(torch.exp(-torch.abs(error)))  # Duality-based term
    return squared_error + 0.5 * bound_loss + 0.2 * duality_term

# Training Loop with Dynamic Constraint Adaptation
epochs = 50
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    y_pred = model(X_train)
    loss = loss_shaping(y_pred, y_train)
    loss.backward()
    optimizer.step()
    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item()}")

# Evaluate Model
model.eval()
y_pred_test = model(X_test).detach().numpy()
y_test = y_test.numpy()
y_pred_test_rescaled = scaler.inverse_transform(y_pred_test)
y_test_rescaled = scaler.inverse_transform(y_test)

# Compute Evaluation Metrics
mse = np.mean((y_pred_test_rescaled - y_test_rescaled) ** 2)
std_error = np.std(y_pred_test_rescaled - y_test_rescaled)
print(f"Mean Squared Error (MSE): {mse}")
print(f"Standard Deviation of Errors: {std_error}")

# Plot Results
plt.figure(figsize=(10, 5))
plt.plot(y_test_rescaled[:100], label='Actual')
plt.plot(y_pred_test_rescaled[:100], label='Predicted')
plt.legend()
plt.show()


FileNotFoundError: Dataset file not found at C:\\LCTSF/timeseries_data.csv. Ensure the file exists and the path is correct.

In [None]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
import random

# Set random seeds for reproducibility
def set_seed(seed=42):
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)

# Load Dataset

dataset_path = r"C:\\LCTSF"
file_name = "timeseries_data.csv"
file_path = os.path.join(dataset_path, file_name)

if not os.path.isfile(file_path):
    raise FileNotFoundError(f"Dataset file not found at {file_path}. Ensure the file exists and the path is correct.")

data = pd.read_csv(file_path, parse_dates=["timestamp"], index_col="timestamp")

# Data Preprocessing
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)

def create_sequences(data, seq_length):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i + seq_length])
        y.append(data[i + seq_length])
    return np.array(X), np.array(y)

seq_length = 24  # Forecasting 24 time steps ahead
X, y = create_sequences(data_scaled, seq_length)
X_train, X_test = X[:int(0.8*len(X))], X[int(0.8*len(X)):]
y_train, y_test = y[:int(0.8*len(y))], y[int(0.8*len(y)):]

# Convert to PyTorch tensors
X_train, y_train = torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32)
X_test, y_test = torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.float32)

# Define Transformer Model
class TransformerForecaster(nn.Module):
    def __init__(self, input_dim, d_model=64, nhead=4, num_layers=2):
        super(TransformerForecaster, self).__init__()
        self.encoder = nn.Linear(input_dim, d_model)
        self.transformer = nn.Transformer(d_model, nhead, num_layers)
        self.decoder = nn.Linear(d_model, input_dim)

    def forward(self, src):
        src = self.encoder(src)
        src = src.permute(1, 0, 2)  # Transformer expects seq_len, batch, feature
        output = self.transformer(src, src)
        output = output.permute(1, 0, 2)
        return self.decoder(output[:, -1, :])

# Initialize Model
model = TransformerForecaster(input_dim=X_train.shape[2])
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Loss function with Loss Shaping Constraints and Duality-based Optimization
def loss_shaping(y_pred, y_true):
    error = y_pred - y_true
    squared_error = torch.mean(torch.square(error))
    bound_loss = torch.mean(torch.clamp(error, min=-0.1, max=0.1))  # Constrain error bounds
    duality_term = torch.mean(torch.exp(-torch.abs(error)))  # Duality-based term
    return squared_error + 0.5 * bound_loss + 0.2 * duality_term

# Training Loop with Dynamic Constraint Adaptation
epochs = 50
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    y_pred = model(X_train)
    loss = loss_shaping(y_pred, y_train)
    loss.backward()
    optimizer.step()
    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item()}")

# Evaluate Model
model.eval()
y_pred_test = model(X_test).detach().numpy()
y_test = y_test.numpy()
y_pred_test_rescaled = scaler.inverse_transform(y_pred_test)
y_test_rescaled = scaler.inverse_transform(y_test)

# Compute Evaluation Metrics
mse = np.mean((y_pred_test_rescaled - y_test_rescaled) ** 2)
std_error = np.std(y_pred_test_rescaled - y_test_rescaled)
print(f"Mean Squared Error (MSE): {mse}")
print(f"Standard Deviation of Errors: {std_error}")

# Plot Results
plt.figure(figsize=(10, 5))
plt.plot(y_test_rescaled[:100], label='Actual')
plt.plot(y_pred_test_rescaled[:100], label='Predicted')
plt.legend()
plt.show()


In [None]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
import random

# Set random seeds for reproducibility
def set_seed(seed=42):
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)

# Load Dataset
dataset_path = r"C:\\LCTSF"
file_name = "timeseries_data.csv"
file_path = os.path.join(dataset_path, file_name)

if not os.path.isfile(file_path):
    raise FileNotFoundError(f"Dataset file not found at {file_path}. Ensure the file exists and the path is correct.")

data = pd.read_csv(file_path, parse_dates=["timestamp"], index_col="timestamp")

# Data Preprocessing
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)

def create_sequences(data, seq_length):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i + seq_length])
        y.append(data[i + seq_length])
    return np.array(X), np.array(y)

seq_length = 24  # Forecasting 24 time steps ahead
X, y = create_sequences(data_scaled, seq_length)
X_train, X_test = X[:int(0.8*len(X))], X[int(0.8*len(X)):]
y_train, y_test = y[:int(0.8*len(y))], y[int(0.8*len(y)):]

# Convert to PyTorch tensors
X_train, y_train = torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32)
X_test, y_test = torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.float32)

# Define Transformer Model with Loss Shaping Constraints and Optimization
class TransformerForecaster(nn.Module):
    def __init__(self, input_dim, d_model=64, nhead=4, num_layers=2):
        super(TransformerForecaster, self).__init__()
        self.encoder = nn.Linear(input_dim, d_model)
        self.transformer = nn.Transformer(d_model, nhead, num_layers)
        self.decoder = nn.Linear(d_model, input_dim)

    def forward(self, src):
        src = self.encoder(src)
        src = src.permute(1, 0, 2)  # Transformer expects seq_len, batch, feature
        output = self.transformer(src, src)
        output = output.permute(1, 0, 2)
        return self.decoder(output[:, -1, :])

# Initialize Model
model = TransformerForecaster(input_dim=X_train.shape[2])
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Loss Function with Loss Shaping Constraints and Primal-Dual Optimization
def loss_shaping(y_pred, y_true):
    error = y_pred - y_true
    squared_error = torch.mean(torch.square(error))
    bound_loss = torch.mean(torch.clamp(error, min=-0.1, max=0.1))  # Constrain error bounds
    duality_term = torch.mean(torch.exp(-torch.abs(error)))  # Primal-Dual optimization term
    return squared_error + 0.5 * bound_loss + 0.2 * duality_term

# Training Loop with Adaptive Constraints
epochs = 50
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    y_pred = model(X_train)
    loss = loss_shaping(y_pred, y_train)
    loss.backward()
    optimizer.step()
    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item()}")

# Evaluate Model
model.eval()
y_pred_test = model(X_test).detach().numpy()
y_test = y_test.numpy()
y_pred_test_rescaled = scaler.inverse_transform(y_pred_test)
y_test_rescaled = scaler.inverse_transform(y_test)

# Compute Evaluation Metrics
mse = np.mean((y_pred_test_rescaled - y_test_rescaled) ** 2)
std_error = np.std(y_pred_test_rescaled - y_test_rescaled)
print(f"Mean Squared Error (MSE): {mse}")
print(f"Standard Deviation of Errors: {std_error}")

# Plot Results
plt.figure(figsize=(10, 5))
plt.plot(y_test_rescaled[:100], label='Actual')
plt.plot(y_pred_test_rescaled[:100], label='Predicted')
plt.legend()
plt.show()


In [None]:
# Google Colab commands to clone GitHub repository
!git clone https://github.com/adnanzaidi548/forecasting_project.git
%cd forecasting_project


In [None]:
# Google Colab commands to clone GitHub repository
!git clone https://github.com/adnanzaidi548/forecasting_project.git
import os
os.chdir("forecasting_project")  # Change directory properly


In [None]:
!git clone https://your_github_token@github.com/adnanzaidi548/forecasting_project.git
import os
os.chdir("forecasting_project")


In [None]:
# Clone the repository using a Personal Access Token (replace 'your_github_token')
!git clone https://your_github_token@github.com/adnanzaidi548/forecasting_project.git

# Change to the repository directory
import os
os.chdir("forecasting_project")

# Clone the repository using a Personal Access Token (replace 'your_github_token')
!git clone https://your_github_token@github.com/adnanzaidi548/forecasting_project.git

# Change to the repository directory
import os
os.chdir("forecasting_project")

# Verify the files in the current directory
!ls
README.md
data/
notebooks/



In [None]:
import os

# Check if requirements.txt exists
if os.path.isfile("requirements.txt"):
    print("requirements.txt found. Installing dependencies...")
    !pip install -r requirements.txt
else:
    print("requirements.txt not found. Skipping package installation.")


In [None]:
import pandas as pd

# Upload dataset manually in Google Colab
from google.colab import files
uploaded = files.upload()

# Get the uploaded file name dynamically
dataset_file = list(uploaded.keys())[0]  # Get the filename

# Load dataset
data = pd.read_csv(dataset_file)

# Display basic information about the dataset
print("Dataset loaded successfully!")
print(data.info())
print(data.head())  # Show first few rows


In [None]:
import os
import pandas as pd

# Define dataset path
dataset_path = r"C:\LCTSF"
file_name = "timeseries_data.csv"
file_path = os.path.join(dataset_path, file_name)

# Check if the file exists
if not os.path.isfile(file_path):
    raise FileNotFoundError(f"Dataset file not found at {file_path}. Ensure the file exists.")

# Load the dataset
data = pd.read_csv(file_path, parse_dates=["timestamp"], index_col="timestamp")

# Display dataset info
print("Dataset loaded successfully!")
print(data.info())
print(data.head())

# Check for missing values
print("\nChecking for missing values:")
print(data.isnull().sum())

# Convert timestamp column to datetime format if not already done
if not isinstance(data.index, pd.DatetimeIndex):
    data.index = pd.to_datetime(data.index)
    print("\nTimestamp column converted successfully!")

# Display summary statistics
print("\nDataset Summary Statistics:")
print(data.describe())

# Plot time series (if applicable)
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 5))
plt.plot(data, label="Time Series Data")
plt.xlabel("Timestamp")
plt.ylabel("Values")
plt.title("Time Series Data Visualization")
plt.legend()
plt.show()


In [None]:
import pandas as pd
from google.colab import files

# Step 1: Upload the file manually if not using a direct path
uploaded = files.upload()  # Allows user to upload CSV manually

# Step 2: Load the dataset into a DataFrame
file_name = list(uploaded.keys())[0]  # Get the uploaded file name
data = pd.read_csv(file_name)

# Step 3: Display dataset info
print("Dataset loaded successfully!")
print(data.info())
print(data.head())

# Step 4: Convert timestamp column (if present) to datetime format
if "timestamp" in data.columns:
    data["timestamp"] = pd.to_datetime(data["timestamp"])
    data.set_index("timestamp", inplace=True)
    print("\nTimestamp column converted and set as index.")

# Step 5: Check for missing values
print("\nChecking for missing values:")
print(data.isnull().sum())

# Step 6: Summary statistics
print("\nDataset Summary Statistics:")
print(data.describe())

# Step 7: Plot time series data
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 5))
plt.plot(data, label="Time Series Data")
plt.xlabel("Timestamp")
plt.ylabel("Values")
plt.title("Time Series Data Visualization")
plt.legend()
plt.show()


In [None]:
import pandas as pd

# Step 1: Load the dataset (Ensure you've uploaded the CSV file to Colab first)
file_name = "timeseries_data.csv"  # Replace with your actual filename
data = pd.read_csv(file_name)

# Step 2: Verify the data by displaying the first 5 rows
print("First 5 rows of the dataset:")
print(data.head())

# Step 3: Check dataset structure
print("\nDataset Info:")
print(data.info())  # Displays data types and missing values

# Step 4: Check for missing values
print("\nMissing Values in Each Column:")
print(data.isnull().sum())

# Step 5: Display statistical summary of numerical columns
print("\nStatistical Summary of Dataset:")
print(data.describe())

# Step 6: If there's a timestamp column, convert it to datetime format
if "timestamp" in data.columns:
    data["timestamp"] = pd.to_datetime(data["timestamp"])
    data.set_index("timestamp", inplace=True)
    print("\nTimestamp column converted and set as index.")

# Step 7: Display updated DataFrame structure
print("\nUpdated Dataset Info:")
print(data.info())


In [None]:
import pandas as pd

# Load dataset
file_name = "timeseries_data.csv"  # Ensure the file is uploaded in Colab
data = pd.read_csv(file_name)

# Step 1: Inspect column names
print("Column Names in the Dataset:")
print(data.columns)

# Step 2: Check for missing values
print("\nMissing Values in Each Column:")
print(data.isnull().sum())

# Step 3: Display dataset info to check data types and structure
print("\nDataset Information:")
print(data.info())

# Step 4: If there's a timestamp column, check its format
time_columns = ["timestamp", "date", "time"]  # Common time-related column names
for col in time_columns:
    if col in data.columns:
        print(f"\nChecking first 5 values in '{col}':")
        print(data[col].head())


In [None]:
# Step 2: Set the Date Column as Index
data.set_index('Date', inplace=True)

# Step 3: Resample the Data (if necessary)
# Resample to daily frequency (use 'D' for daily, 'H' for hourly, etc.)
data_resampled = data.resample('D').mean()

# Check the changes
print("Resampled Data (Daily Frequency):")
print(data_resampled.head())


In [None]:
import numpy as np
from scipy.stats import zscore

# Calculate Z-scores for each column
z_scores = np.abs(zscore(data_resampled))

# Identify outliers (e.g., Z-score > 3)
outliers = (z_scores > 3).all(axis=1)

# Print the rows with outliers
print("Outliers based on Z-score:")
print(data_resampled[outliers])

# Optionally, remove the outliers (if necessary)
data_cleaned = data_resampled[~outliers]


In [None]:
# For demonstration purposes, we assume the dataset is called "timeseries_data.csv"
# Dataset URL or path can be updated as per your data source.


In [None]:
from google.colab import files
uploaded = files.upload()


In [None]:
import pandas as pd

# Assuming the dataset is uploaded to the current working directory
data = pd.read_csv('timeseries_data.csv')  # Replace with your file name/path if needed


In [None]:
# Show the first few rows of the dataset to ensure it loaded correctly
print("First 5 rows of the dataset:")
print(data.head())

# Check the column names to ensure the time-related column (e.g., 'Date') is present
print("\nColumn names in the dataset:")
print(data.columns)


In [None]:
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler

# Assuming data is already preprocessed and resampled to the correct frequency
# Prepare data for training
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data_resampled.values)

seq_length = 24  # Example sequence length for time steps

# Function to create sequences
def create_sequences(data, seq_length):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i + seq_length])
        y.append(data[i + seq_length])
    return np.array(X), np.array(y)

# Split into training and testing sets (80% training, 20% testing)
X, y = create_sequences(data_scaled, seq_length)
X_train, X_test = X[:int(0.8*len(X))], X[int(0.8*len(X)):]
y_train, y_test = y[:int(0.8*len(y))], y[int(0.8*len(y)):]

# Convert to PyTorch tensors
X_train, y_train = torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32)
X_test, y_test = torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.float32)


In [None]:
class TransformerForecaster(nn.Module):
    def __init__(self, input_dim, d_model=64, nhead=4, num_layers=2):
        super(TransformerForecaster, self).__init__()
        self.encoder = nn.Linear(input_dim, d_model)
        self.transformer = nn.Transformer(d_model, nhead, num_layers)
        self.decoder = nn.Linear(d_model, input_dim)

    def forward(self, src):
        src = self.encoder(src)
        src = src.permute(1, 0, 2)  # Transformer expects seq_len, batch, feature
        output = self.transformer(src, src)
        output = output.permute(1, 0, 2)
        return self.decoder(output[:, -1, :])


In [None]:
class RNNForecaster(nn.Module):
    def __init__(self, input_dim, hidden_dim=64, num_layers=2):
        super(RNNForecaster, self).__init__()
        self.rnn = nn.RNN(input_dim, hidden_dim, num_layers, batch_first=True)
        self.decoder = nn.Linear(hidden_dim, input_dim)

    def forward(self, x):
        output, _ = self.rnn(x)
        return self.decoder(output[:, -1, :])


In [None]:
class LSTMForecaster(nn.Module):
    def __init__(self, input_dim, hidden_dim=64, num_layers=2):
        super(LSTMForecaster, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.decoder = nn.Linear(hidden_dim, input_dim)

    def forward(self, x):
        output, _ = self.lstm(x)
        return self.decoder(output[:, -1, :])


In [None]:
def train_model(model, X_train, y_train, optimizer, loss_fn, epochs=50):
    model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        y_pred = model(X_train)
        loss = loss_fn(y_pred, y_train)
        loss.backward()
        optimizer.step()
        if epoch % 10 == 0:
            print(f"Epoch {epoch}, Loss: {loss.item()}")
    return model

def evaluate_model(model, X_test, y_test, scaler):
    model.eval()
    with torch.no_grad():
        y_pred = model(X_test).detach().numpy()
    y_pred_rescaled = scaler.inverse_transform(y_pred)
    y_test_rescaled = scaler.inverse_transform(y_test.numpy())
    mse = np.mean((y_pred_rescaled - y_test_rescaled) ** 2)
    return mse


In [None]:
# Initialize the models
transformer_model = TransformerForecaster(input_dim=X_train.shape[2])
rnn_model = RNNForecaster(input_dim=X_train.shape[2])
lstm_model = LSTMForecaster(input_dim=X_train.shape[2])

# Define loss function and optimizers
loss_fn = nn.MSELoss()
transformer_optimizer = torch.optim.Adam(transformer_model.parameters(), lr=0.001)
rnn_optimizer = torch.optim.Adam(rnn_model.parameters(), lr=0.001)
lstm_optimizer = torch.optim.Adam(lstm_model.parameters(), lr=0.001)

# Train the models
transformer_model = train_model(transformer_model, X_train, y_train, transformer_optimizer, loss_fn)
rnn_model = train_model(rnn_model, X_train, y_train, rnn_optimizer, loss_fn)
lstm_model = train_model(lstm_model, X_train, y_train, lstm_optimizer, loss_fn)

# Evaluate the models
transformer_mse = evaluate_model(transformer_model, X_test, y_test, scaler)
rnn_mse = evaluate_model(rnn_model, X_test, y_test, scaler)
lstm_mse = evaluate_model(lstm_model, X_test, y_test, scaler)

# Print MSE for each model
print(f"Transformer MSE: {transformer_mse}")
print(f"RNN MSE: {rnn_mse}")
print(f"LSTM MSE: {lstm_mse}")

# Select the best model based on MSE
best_model = None
if transformer_mse < rnn_mse and transformer_mse < lstm_mse:
    best_model = transformer_model
elif rnn_mse < transformer_mse and rnn_mse < lstm_mse:
    best_model = rnn_model
else:
    best_model = lstm_model

print(f"The best model is: {best_model.__class__.__name__}")


In [None]:
import torch
import torch.optim as optim
import numpy as np
from sklearn.metrics import mean_squared_error

# Define your models (Transformer, RNN, LSTM) with the required architecture
class TransformerForecaster(nn.Module):
    def __init__(self, input_dim, d_model=64, nhead=4, num_layers=2):
        super(TransformerForecaster, self).__init__()
        self.encoder = nn.Linear(input_dim, d_model)
        self.transformer = nn.Transformer(d_model, nhead, num_layers)
        self.decoder = nn.Linear(d_model, input_dim)

    def forward(self, src):
        src = self.encoder(src)
        src = src.permute(1, 0, 2)  # Transformer expects seq_len, batch, feature
        output = self.transformer(src, src)
        output = output.permute(1, 0, 2)
        return self.decoder(output[:, -1, :])

class RNNForecaster(nn.Module):
    def __init__(self, input_dim, hidden_dim=64, num_layers=2):
        super(RNNForecaster, self).__init__()
        self.rnn = nn.RNN(input_dim, hidden_dim, num_layers, batch_first=True)
        self.decoder = nn.Linear(hidden_dim, input_dim)

    def forward(self, x):
        out, _ = self.rnn(x)
        return self.decoder(out[:, -1, :])

class LSTMForecaster(nn.Module):
    def __init__(self, input_dim, hidden_dim=64, num_layers=2):
        super(LSTMForecaster, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.decoder = nn.Linear(hidden_dim, input_dim)

    def forward(self, x):
        out, _ = self.lstm(x)
        return self.decoder(out[:, -1, :])

# Function to calculate the MSE for a given model
def calculate_mse(model, X_test, y_test, scaler):
    model.eval()
    y_pred = model(X_test).detach().numpy()
    y_test_rescaled = y_test.numpy()
    y_pred_rescaled = scaler.inverse_transform(y_pred)
    y_test_rescaled = scaler.inverse_transform(y_test_rescaled)
    mse = mean_squared_error(y_test_rescaled, y_pred_rescaled)
    return mse

# Initialize the models
transformer_model = TransformerForecaster(input_dim=X_train.shape[2])
rnn_model = RNNForecaster(input_dim=X_train.shape[2])
lstm_model = LSTMForecaster(input_dim=X_train.shape[2])

# Define the optimizer for each model
optimizer_transformer = optim.Adam(transformer_model.parameters(), lr=0.001)
optimizer_rnn = optim.Adam(rnn_model.parameters(), lr=0.001)
optimizer_lstm = optim.Adam(lstm_model.parameters(), lr=0.001)

# Train the models (you can implement your training loop here)

# After training, evaluate the models on the test data
transformer_mse = calculate_mse(transformer_model, X_test, y_test, scaler)
rnn_mse = calculate_mse(rnn_model, X_test, y_test, scaler)
lstm_mse = calculate_mse(lstm_model, X_test, y_test, scaler)

# Print the MSE values
print(f"Transformer MSE: {transformer_mse}")
print(f"RNN MSE: {rnn_mse}")
print(f"LSTM MSE: {lstm_mse}")

# Select the model with the lowest MSE
mse_dict = {
    "Transformer": transformer_mse,
    "RNN": rnn_mse,
    "LSTM": lstm_mse
}

best_model_name = min(mse_dict, key=mse_dict.get)
print(f"The best model is: {best_model_name}")


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error


In [None]:
def set_seed(seed=42):
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)


In [None]:
# Load and preprocess data
dataset_path = "path_to_your_dataset.csv"
data = pd.read_csv(dataset_path, parse_dates=["timestamp"], index_col="timestamp")

# Data scaling
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)

# Create sequences for forecasting
def create_sequences(data, seq_length):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i + seq_length])
        y.append(data[i + seq_length])
    return np.array(X), np.array(y)

seq_length = 24  # Forecast 24 time steps ahead
X, y = create_sequences(data_scaled, seq_length)

# Split data into training and testing sets
X_train, X_test = X[:int(0.8*len(X))], X[int(0.8*len(X)):]
y_train, y_test = y[:int(0.8*len(y))], y[int(0.8*len(y)):]

# Convert to PyTorch tensors
X_train, y_train = torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32)
X_test, y_test = torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.float32)


In [None]:
class TransformerForecaster(nn.Module):
    def __init__(self, input_dim, d_model=64, nhead=4, num_layers=2, output_dim=1):
        super(TransformerForecaster, self).__init__()

        # Encoder layer
        self.encoder = nn.Linear(input_dim, d_model)  # Linear layer to embed the input

        # Transformer
        self.transformer = nn.Transformer(d_model, nhead, num_layers)

        # Decoder layer
        self.decoder = nn.Linear(d_model, output_dim)

    def forward(self, src):
        # Pass through encoder layer
        src = self.encoder(src)

        # Reshape for transformer (seq_len, batch_size, feature)
        src = src.permute(1, 0, 2)  # (seq_len, batch_size, d_model)

        # Pass through transformer
        output = self.transformer(src, src)

        # Reshape for prediction (batch_size, seq_len, feature)
        output = output.permute(1, 0, 2)

        # Decoder for the final output
        return self.decoder(output[:, -1, :])  # Take the last sequence step for prediction


In [None]:
# Initialize model, optimizer, and loss function
model = TransformerForecaster(input_dim=X_train.shape[2], d_model=64, nhead=4, num_layers=2, output_dim=1)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

# Training loop
epochs = 50
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    y_pred = model(X_train)
    loss = criterion(y_pred, y_train)
    loss.backward()
    optimizer.step()

    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item()}")

# Evaluate the model
model.eval()
with torch.no_grad():
    y_pred_test = model(X_test).numpy()
    y_test = y_test.numpy()

# Inverse scale the data
y_pred_test_rescaled = scaler.inverse_transform(y_pred_test)
y_test_rescaled = scaler.inverse_transform(y_test)

# Calculate evaluation metrics
mse = mean_squared_error(y_test_rescaled, y_pred_test_rescaled)
print(f"Mean Squared Error (MSE): {mse}")

# Plot results
plt.figure(figsize=(10, 5))
plt.plot(y_test_rescaled[:100], label='Actual')
plt.plot(y_pred_test_rescaled[:100], label='Predicted')
plt.legend()
plt.show()


In [None]:
!pip install transformers


In [None]:
import torch
import torch.nn as nn
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from transformers import TransformerModel, TransformerConfig
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error


In [None]:
# Load and preprocess data
data = pd.read_csv('timeseries_data.csv', parse_dates=["timestamp"], index_col="timestamp")

# Data scaling
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)

# Create sequences for forecasting
def create_sequences(data, seq_length):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i + seq_length])
        y.append(data[i + seq_length])
    return np.array(X), np.array(y)

seq_length = 24  # Forecast 24 time steps ahead
X, y = create_sequences(data_scaled, seq_length)

# Split data into training and testing sets
X_train, X_test = X[:int(0.8*len(X))], X[int(0.8*len(X)):]
y_train, y_test = y[:int(0.8*len(y))], y[int(0.8*len(y)):]

# Convert to PyTorch tensors
X_train, y_train = torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32)
X_test, y_test = torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.float32)


In [None]:
class TransformerForecaster(nn.Module):
    def __init__(self, input_dim, d_model=64, nhead=4, num_layers=2, output_dim=1):
        super(TransformerForecaster, self).__init__()

        # Encoder layer
        self.encoder = nn.Linear(input_dim, d_model)  # Linear layer to embed the input

        # Transformer
        self.transformer = nn.Transformer(d_model, nhead, num_layers)

        # Decoder layer
        self.decoder = nn.Linear(d_model, output_dim)

    def forward(self, src):
        # Pass through encoder layer
        src = self.encoder(src)

        # Reshape for transformer (seq_len, batch_size, feature)
        src = src.permute(1, 0, 2)  # (seq_len, batch_size, d_model)

        # Pass through transformer
        output = self.transformer(src, src)

        # Reshape for prediction (batch_size, seq_len, feature)
        output = output.permute(1, 0, 2)

        # Decoder for the final output
        return self.decoder(output[:, -1, :])  # Take the last sequence step for prediction


In [None]:
from transformers import InformerForSequenceClassification, InformerTokenizer

# Load the pre-trained Informer model (time series forecasting model)
model = InformerForSequenceClassification.from_pretrained('InformerModel')
tokenizer = InformerTokenizer.from_pretrained('InformerModel')

# Use Informer for sequential prediction
inputs = tokenizer("your_sequence_input", return_tensors="pt")

# Forward pass through the model
outputs = model(**inputs)


In [None]:
# Initialize model, optimizer, and loss function
model = TransformerForecaster(input_dim=X_train.shape[2], d_model=64, nhead=4, num_layers=2, output_dim=1)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

# Training loop
epochs = 50
for epoch in range(epochs):
    model.train()
    optimizer.zero_grad()
    y_pred = model(X_train)
    loss = criterion(y_pred, y_train)
    loss.backward()
    optimizer.step()

    if epoch % 10 == 0:
        print(f"Epoch {epoch}, Loss: {loss.item()}")

# Evaluate the model
model.eval()
with torch.no_grad():
    y_pred_test = model(X_test).numpy()
    y_test = y_test.numpy()

# Inverse scale the data
y_pred_test_rescaled = scaler.inverse_transform(y_pred_test)
y_test_rescaled = scaler.inverse_transform(y_test)

# Calculate evaluation metrics
mse = mean_squared_error(y_test_rescaled, y_pred_test_rescaled)
print(f"Mean Squared Error (MSE): {mse}")

# Plot results
plt.figure(figsize=(10, 5))
plt.plot(y_test_rescaled[:100], label='Actual')
plt.plot(y_pred_test_rescaled[:100], label='Predicted')
plt.legend()
plt.show()


In [None]:
from sklearn.model_selection import ParameterGrid

# Define the hyperparameters grid
param_grid = {
    'd_model': [64, 128],  # Dimensionality of the model
    'nhead': [4, 8],       # Number of attention heads
    'num_layers': [2, 3],  # Number of transformer layers
    'lr': [0.001, 0.0005], # Learning rates
    'batch_size': [16, 32] # Batch size
}

# Generate the combinations of parameters
grid = ParameterGrid(param_grid)


In [None]:
import torch.optim as optim
import torch.nn as nn
from sklearn.metrics import mean_squared_error

# Function to train the model
def train_model(params, X_train, y_train, X_test, y_test):
    model = TransformerForecaster(input_dim=X_train.shape[2],
                                  d_model=params['d_model'],
                                  nhead=params['nhead'],
                                  num_layers=params['num_layers'],
                                  output_dim=1)

    optimizer = optim.Adam(model.parameters(), lr=params['lr'])
    criterion = nn.MSELoss()

    # Training loop
    epochs = 50
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        y_pred = model(X_train)
        loss = criterion(y_pred, y_train)
        loss.backward()
        optimizer.step()

    # Evaluate the model on test data
    model.eval()
    with torch.no_grad():
        y_pred_test = model(X_test)
        y_pred_test = y_pred_test.numpy().flatten()
        y_test_numpy = y_test.numpy().flatten()

        # Calculate MSE
        mse = mean_squared_error(y_test_numpy, y_pred_test)

    return mse, model

# Initialize best MSE and model
best_mse = float('inf')
best_model = None
best_params = None

# Grid search over hyperparameters
for params in grid:
    print(f"Training model with params: {params}")
    mse, model = train_model(params, X_train, y_train, X_test, y_test)

    # Update best model if current model has lower MSE
    if mse < best_mse:
        best_mse = mse
        best_model = model
        best_params = params

print(f"Best Model Parameters: {best_params}")
print(f"Best Model MSE: {best_mse}")


In [None]:
# Function to calculate MSE on test set
def evaluate_model(model, X_test, y_test):
    model.eval()
    with torch.no_grad():
        y_pred_test = model(X_test)
        y_pred_test = y_pred_test.numpy().flatten()
        y_test_numpy = y_test.numpy().flatten()

        # Calculate Mean Squared Error
        mse = mean_squared_error(y_test_numpy, y_pred_test)
        print(f"Mean Squared Error (MSE): {mse}")

    return mse

# Evaluate best model
evaluate_model(best_model, X_test, y_test)


In [None]:
from transformers import InformerForSequenceClassification, InformerTokenizer, Trainer, TrainingArguments

# Load the pre-trained Informer model
model = InformerForSequenceClassification.from_pretrained('huggingface/informer')

# Tokenize the data
tokenizer = InformerTokenizer.from_pretrained('huggingface/informer')
inputs = tokenizer(data['feature_column'].tolist(), padding=True, truncation=True, return_tensors="pt")

# Set up Trainer for fine-tuning
training_args = TrainingArguments(
    output_dir='./results',          # output directory
    num_train_epochs=3,              # number of training epochs
    per_device_train_batch_size=16,  # batch size for training
    per_device_eval_batch_size=32,   # batch size for evaluation
    warmup_steps=500,                # number of warmup steps for learning rate scheduler
    weight_decay=0.01,               # strength of weight decay
    logging_dir='./logs',            # directory for storing logs
)

trainer = Trainer(
    model=model,                         # the instantiated 🤗 Transformers model to be trained
    args=training_args,                  # training arguments, defined above
    train_dataset=train_dataset,         # training dataset
    eval_dataset=eval_dataset            # evaluation dataset
)

# Fine-tune the model
trainer.train()

# Evaluate the model
trainer.evaluate()


In [None]:
import numpy as np

def create_sequences(data, time_steps, forecast_horizon):
    """
    Create sequences for time series forecasting.
    data: The time series data (e.g., historical electricity demand)
    time_steps: The number of time steps to look back for each sequence
    forecast_horizon: The number of time steps to predict (future values)

    Returns:
    X: Sequences of input data (features)
    y: Corresponding target values (future time steps)
    """
    X, y = [], []

    for i in range(len(data) - time_steps - forecast_horizon + 1):
        # Input sequence: past time_steps
        X.append(data[i:i + time_steps])

        # Output sequence: future forecast_horizon
        y.append(data[i + time_steps: i + time_steps + forecast_horizon])

    return np.array(X), np.array(y)

# Example usage:
time_steps = 24  # The number of hours to look back
forecast_horizon = 12  # The number of hours to predict (e.g., next 12 hours)
data = np.array([i for i in range(100)])  # Example data (replace with your actual dataset)

X, y = create_sequences(data, time_steps, forecast_horizon)
print(f"Shape of X: {X.shape}")
print(f"Shape of y: {y.shape}")


In [None]:
# Assuming 'data' is a DataFrame with multiple features (e.g., electricity demand, temperature, etc.)
import pandas as pd

# Example with multiple features (e.g., electricity demand and temperature)
data = pd.DataFrame({
    'demand': np.random.rand(100),  # Example demand data
    'temperature': np.random.rand(100)  # Example temperature data
})

# Convert to numpy array (if needed)
data_values = data.values

# Create sequences for multivariate time series
X, y = create_sequences(data_values, time_steps, forecast_horizon)
print(f"Shape of X: {X.shape}")  # (number_of_sequences, time_steps, num_features)
print(f"Shape of y: {y.shape}")  # (number_of_sequences, forecast_horizon)


In [None]:
from sklearn.preprocessing import MinMaxScaler

# Normalize the data (optional, but recommended)
scaler = MinMaxScaler(feature_range=(0, 1))
data_normalized = scaler.fit_transform(data_values)

# Now create sequences with the normalized data
X, y = create_sequences(data_normalized, time_steps, forecast_horizon)


In [None]:
# Split into training and test sets (80% training, 20% testing)
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]


In [None]:
from sklearn.preprocessing import MinMaxScaler

# Assuming 'data' is your original data (e.g., a pandas DataFrame or a numpy array)
# Example: For a univariate time series, it will be a 1D array or DataFrame column.
# If your data is multivariate, it will be a 2D array.

# Create a MinMaxScaler instance to scale the data between 0 and 1
scaler = MinMaxScaler(feature_range=(0, 1))

# Fit the scaler and transform the data to a normalized form
data_scaled = scaler.fit_transform(data)

# If 'data' is a DataFrame, you can convert the scaled data back to DataFrame
# with the same column names
import pandas as pd
data_scaled_df = pd.DataFrame(data_scaled, columns=data.columns)

# View the first few rows


In [None]:
# Example for multivariate data (e.g., electricity demand, temperature, etc.)
data = pd.DataFrame({
    'demand': [10, 12, 14, 15, 18],
    'temperature': [30, 32, 31, 33, 34]
})

# Normalize the data
scaler = MinMaxScaler(feature_range=(0, 1))
data_scaled = scaler.fit_transform(data)

# Convert the scaled data back to DataFrame
data_scaled_df = pd.DataFrame(data_scaled, columns=data.columns)

# Display the normalized data
print(data_scaled_df)


In [None]:
# To revert to original scale
data_original = scaler.inverse_transform(data_scaled)

# Convert back to DataFrame if needed
data_original_df = pd.DataFrame(data_original, columns=data.columns)

# Display the original data after inverse scaling
print(data_original_df)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# Transformer-based model for time series forecasting
class TransformerTimeSeries(nn.Module):
    def __init__(self, input_size, d_model, nhead, num_layers, output_size):
        super(TransformerTimeSeries, self).__init__()

        self.input_size = input_size
        self.d_model = d_model
        self.nhead = nhead
        self.num_layers = num_layers
        self.output_size = output_size

        # Transformer Encoder Layer
        self.encoder = nn.TransformerEncoderLayer(d_model=self.d_model, nhead=self.nhead)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder, num_layers=self.num_layers)

        # Fully connected output layer to predict the next time step
        self.fc_out = nn.Linear(self.d_model, self.output_size)

    def forward(self, src):
        # src: (batch_size, seq_len, input_size)

        # Apply transformer encoding
        src = src.permute(1, 0, 2)  # Change shape to (seq_len, batch_size, input_size)
        output = self.transformer_encoder(src)

        # Take the output from the last time step
        output = output[-1, :, :]

        # Output layer to predict the next time step
        output = self.fc_out(output)

        return output

# Example of defining model parameters
input_size = 1  # Number of features in your input data (univariate case)
d_model = 64  # Dimension of model (embedding size)
nhead = 8  # Number of attention heads
num_layers = 3  # Number of layers in the encoder
output_size = 1  # Output size (1 for single value prediction)

# Instantiate the transformer model
model = TransformerTimeSeries(input_size, d_model, nhead, num_layers, output_size)
print(model)


In [None]:
# Example of hyperparameter tuning
learning_rate = 0.001
batch_size = 64
epochs = 50

# Optimizer (Adam)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Loss Function (Mean Squared Error for regression)
loss_function = nn.MSELoss()

# Optionally, you can use learning rate scheduling
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)


In [None]:
# Function to train the model
def train_model(model, train_data, train_labels, epochs=50, batch_size=64):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for i in range(0, len(train_data), batch_size):
            batch_data = train_data[i:i+batch_size]
            batch_labels = train_labels[i:i+batch_size]

            # Convert to PyTorch tensors
            batch_data = torch.tensor(batch_data, dtype=torch.float32)
            batch_labels = torch.tensor(batch_labels, dtype=torch.float32)

            # Zero the gradients
            optimizer.zero_grad()

            # Forward pass
            output = model(batch_data)

            # Compute the loss
            loss = loss_function(output, batch_labels)

            # Backward pass
            loss.backward()

            # Optimizer step
            optimizer.step()

            total_loss += loss.item()

        # Learning rate scheduler step
        scheduler.step()

        # Print loss every few epochs
        if (epoch+1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(train_data):.4f}')

# Train the model
train_model(model, X_train, y_train, epochs=50, batch_size=64)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import numpy as np


In [None]:
class TransformerTimeSeries(nn.Module):
    def __init__(self, input_size, d_model, nhead, num_layers, output_size):
        super(TransformerTimeSeries, self).__init__()

        self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)

        self.fc_out = nn.Linear(d_model, output_size)

    def forward(self, src):
        src = src.permute(1, 0, 2)  # Convert shape to (seq_len, batch_size, input_size)
        output = self.transformer_encoder(src)
        output = self.fc_out(output[-1, :, :])  # Take last time step's output
        return output


In [None]:
# Generate synthetic time series data (Replace this with real dataset)
np.random.seed(42)
time_series = np.sin(np.arange(0, 1000) * 0.05) + np.random.normal(0, 0.1, 1000)  # Example data

# Normalize the data
scaler = MinMaxScaler(feature_range=(0, 1))
time_series_scaled = scaler.fit_transform(time_series.reshape(-1, 1))

# Function to create sequences
def create_sequences(data, time_steps):
    X, y = [], []
    for i in range(len(data) - time_steps):
        X.append(data[i:i+time_steps])
        y.append(data[i+time_steps])
    return np.array(X), np.array(y)

time_steps = 24  # Past 24 data points to predict the next one
X, y = create_sequences(time_series_scaled, time_steps)

# Split into train and validation sets
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, shuffle=False)

# Convert to PyTorch tensors
X_train, X_val = torch.tensor(X_train, dtype=torch.float32), torch.tensor(X_val, dtype=torch.float32)
y_train, y_val = torch.tensor(y_train, dtype=torch.float32), torch.tensor(y_val, dtype=torch.float32)


In [None]:
input_size = 1  # One feature (univariate)
d_model = 64  # Model embedding size
nhead = 8  # Number of attention heads
num_layers = 3  # Number of transformer layers
output_size = 1  # Predicting one value

# Initialize the model
model = TransformerTimeSeries(input_size, d_model, nhead, num_layers, output_size)

# Loss function and optimizer
loss_function = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


In [None]:
def train_model(model, X_train, y_train, X_val, y_val, epochs=50, batch_size=32):
    model.train()

    train_losses, val_losses = [], []

    for epoch in range(epochs):
        total_loss = 0
        num_batches = len(X_train) // batch_size

        for i in range(0, len(X_train), batch_size):
            batch_X = X_train[i:i+batch_size]
            batch_y = y_train[i:i+batch_size]

            # Reshape for Transformer input
            batch_X = batch_X.view(batch_X.shape[0], batch_X.shape[1], 1)

            # Zero gradients
            optimizer.zero_grad()

            # Forward pass
            predictions = model(batch_X)
            loss = loss_function(predictions, batch_y)

            # Backpropagation
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        # Compute validation loss
        model.eval()
        with torch.no_grad():
            X_val_reshaped = X_val.view(X_val.shape[0], X_val.shape[1], 1)
            val_predictions = model(X_val_reshaped)
            val_loss = loss_function(val_predictions, y_val).item()

        train_losses.append(total_loss / num_batches)
        val_losses.append(val_loss)

        print(f"Epoch [{epoch+1}/{epochs}], Train Loss: {total_loss/num_batches:.4f}, Val Loss: {val_loss:.4f}")

    return train_losses, val_losses

# Train the model
train_model(model, X_train, y_train, X_val, y_val, epochs=50, batch_size=32)


In [None]:
# Split test set from validation data
X_test, y_test = X_val[-int(0.2 * len(X_val)):], y_val[-int(0.2 * len(y_val)):]  # 20% of validation set as test set

# Evaluate the model on test data
model.eval()  # Set model to evaluation mode
with torch.no_grad():
    X_test_reshaped = X_test.view(X_test.shape[0], X_test.shape[1], 1)
    test_predictions = model(X_test_reshaped)
    test_loss = loss_function(test_predictions, y_test).item()

print(f"Test Loss (MSE): {test_loss:.4f}")


In [None]:
# Hyperparameter tuning: Try different configurations
hyperparams = [
    {'d_model': 64, 'nhead': 4, 'num_layers': 2, 'lr': 0.001},
    {'d_model': 128, 'nhead': 8, 'num_layers': 3, 'lr': 0.0005},
    {'d_model': 256, 'nhead': 16, 'num_layers': 4, 'lr': 0.0001},
]

best_model = None
best_loss = float('inf')

for params in hyperparams:
    print(f"Training with d_model={params['d_model']}, nhead={params['nhead']}, num_layers={params['num_layers']}, lr={params['lr']}")

    # Initialize new model with different hyperparameters
    model = TransformerTimeSeries(input_size, params['d_model'], params['nhead'], params['num_layers'], output_size)

    # Optimizer with adjusted learning rate
    optimizer = optim.Adam(model.parameters(), lr=params['lr'])

    # Train model with current hyperparameters
    train_losses, val_losses = train_model(model, X_train, y_train, X_val, y_val, epochs=30, batch_size=32)

    # Evaluate on test set
    model.eval()
    with torch.no_grad():
        X_test_reshaped = X_test.view(X_test.shape[0], X_test.shape[1], 1)
        test_predictions = model(X_test_reshaped)
        test_loss = loss_function(test_predictions, y_test).item()

    print(f"Validation Loss: {val_losses[-1]:.4f}, Test Loss: {test_loss:.4f}\n")

    # Select best model based on test loss
    if test_loss < best_loss:
        best_loss = test_loss
        best_model = model

print(f"Best Model Test Loss: {best_loss:.4f}")


In [None]:
import torch

# Save the best model state dictionary
torch.save(best_model.state_dict(), "time_series_forecasting_model.pth")

print("Model saved successfully!")


In [None]:
# Recreate the model architecture (make sure it matches the saved model)
loaded_model = TransformerTimeSeries(input_size, best_d_model, best_nhead, best_num_layers, output_size)

# Load the saved weights
loaded_model.load_state_dict(torch.load("time_series_forecasting_model.pth"))

# Set model to evaluation mode
loaded_model.eval()

print("Model loaded successfully!")


In [None]:
dummy_input = torch.randn(1, time_steps, input_size)  # Dummy input for tracing
torch.onnx.export(loaded_model, dummy_input, "time_series_forecasting_model.onnx", export_params=True)

print("Model exported to ONNX format!")


In [None]:
import torch
import torch.nn as nn

class LossShapingMSE(nn.Module):
    def __init__(self, upper_bounds):
        """
        Custom MSE loss with loss shaping constraints.

        Args:
            upper_bounds (torch.Tensor): A tensor defining upper bounds on the errors at each prediction step.
        """
        super(LossShapingMSE, self).__init__()
        self.mse = nn.MSELoss(reduction='none')  # Calculate per-time-step MSE
        self.upper_bounds = upper_bounds  # Upper error limits

    def forward(self, y_pred, y_true):
        """
        Compute the constrained loss.

        Args:
            y_pred (torch.Tensor): Model predictions (batch_size, time_steps)
            y_true (torch.Tensor): Ground truth values (batch_size, time_steps)

        Returns:
            torch.Tensor: Loss value with constraints applied.
        """
        loss = self.mse(y_pred, y_true)  # Compute per-step MSE
        penalty = torch.maximum(loss - self.upper_bounds, torch.tensor(0.0, device=loss.device))  # Apply shaping constraints
        return loss.mean() + penalty.mean()  # Combine standard MSE with constraint penalty


In [None]:
# Example: Defining loss shaping constraints (adjust based on dataset characteristics)
time_steps = 24  # Example number of forecasting steps
upper_bounds = torch.tensor([0.05] * time_steps)  # Constraint: Maximum allowable error at each time step

# Initialize loss function
loss_function = LossShapingMSE(upper_bounds)

# Define optimizer (Adam)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 50
for epoch in range(num_epochs):
    model.train()
    optimizer.zero_grad()

    # Forward pass
    y_pred = model(X_train)

    # Compute loss with shaping constraints
    loss = loss_function(y_pred, y_train)

    # Backpropagation
    loss.backward()
    optimizer.step()

    print(f"Epoch {epoch+1}, Loss: {loss.item()}")


In [None]:
import torch
import numpy as np

def verify_loss_shaping(model, X_test, y_test, loss_function, upper_bounds):
    """
    Verify that the trained model adheres to loss shaping constraints.

    Args:
        model (torch.nn.Module): Trained forecasting model.
        X_test (torch.Tensor): Input test sequences.
        y_test (torch.Tensor): Ground truth test values.
        loss_function (LossShapingMSE): Custom loss function.
        upper_bounds (torch.Tensor): Error shaping constraints.

    Returns:
        None (Prints results)
    """
    model.eval()  # Set model to evaluation mode

    # Forward pass to get predictions
    with torch.no_grad():
        y_pred = model(X_test)

    # Compute per-time-step MSE
    errors = (y_pred - y_test) ** 2
    mse_per_step = errors.mean(dim=0)  # Average error for each time step

    # Check constraint violations
    violations = (errors > upper_bounds).sum(dim=0) / errors.shape[0] * 100  # % of samples exceeding bounds

    print("\n=== Loss Shaping Verification ===")
    print(f"Per-Time-Step MSE:\n{mse_per_step.numpy()}")
    print(f"Violation Percentage (Higher means worse):\n{violations.numpy()} %")

    # Final check
    if violations.sum() == 0:
        print("\n✅ Model adheres to loss shaping constraints!")
    else:
        print("\n⚠️ Some time steps exceed the predefined error bounds!")

# Example usage
verify_loss_shaping(model, X_test, y_test, loss_function, upper_bounds)


In [None]:
class LossShapingMSE(nn.Module):
    def __init__(self, upper_bounds):
        """
        Implements MSE loss with loss shaping constraints.

        Args:
            upper_bounds (torch.Tensor): A tensor specifying acceptable error bounds at each time step.
        """
        super(LossShapingMSE, self).__init__()
        self.mse = nn.MSELoss(reduction='none')  # Compute per-time-step loss
        self.upper_bounds = upper_bounds  # Shape: (time_steps,)

    def forward(self, y_pred, y_true):
        errors = (y_pred - y_true) ** 2  # Squared errors
        mse_per_step = errors.mean(dim=0)  # Mean per time step

        # Apply loss shaping: Clip errors that exceed predefined bounds
        constrained_loss = torch.where(mse_per_step > self.upper_bounds, self.upper_bounds, mse_per_step)

        return constrained_loss.mean()  # Final loss is the mean of constrained losses

# Example usage:
time_steps = 2
upper_bounds = torch.tensor([0.2, 0.3], dtype=torch.float32)  # Define bounds for each time step

loss_shaping = LossShapingMSE(upper_bounds)
loss_value_shaped = loss_shaping(y_pred, y_true)

print("Loss Shaping MSE Loss:", loss_value_shaped.item())


In [None]:
class LossShapingMSE(nn.Module):
    def __init__(self, upper_bounds):
        """
        Implements MSE loss with loss shaping constraints.

        Args:
            upper_bounds (torch.Tensor): A tensor specifying acceptable error bounds at each time step.
        """
        super(LossShapingMSE, self).__init__()
        self.mse = nn.MSELoss(reduction='none')  # Compute per-time-step loss
        self.upper_bounds = upper_bounds  # Shape: (time_steps,)

    def forward(self, y_pred, y_true):
        errors = (y_pred - y_true) ** 2  # Squared errors
        mse_per_step = errors.mean(dim=0)  # Mean per time step

        # Apply loss shaping: Clip errors that exceed predefined bounds
        constrained_loss = torch.where(mse_per_step > self.upper_bounds, self.upper_bounds, mse_per_step)

        return constrained_loss.mean()  # Final loss is the mean of constrained losses

# Example usage:
time_steps = 2
upper_bounds = torch.tensor([0.2, 0.3], dtype=torch.float32)  # Define bounds for each time step

loss_shaping = LossShapingMSE(upper_bounds)
loss_value_shaped = loss_shaping(y_pred, y_true)

print("Loss Shaping MSE Loss:", loss_value_shaped.item())


In [None]:
Step 4: Modify the Base Loss Function with Loss Shaping Constraints
To incorporate the loss shaping constraints into the standard loss function (such as MSE), we need to penalize errors that exceed the predefined bounds ϵi\epsilon_i at each time step.
Modified Loss Function:
The idea is to add a penalty term for predictions that exceed the upper bounds:
Modified Loss=1N∑i=1N(yi−y^i)2+λ∑i=1Nmax⁡(0,∣yi−y^i∣−ϵi)2\text{Modified Loss} = \frac{1}{N} \sum_{i=1}^{N} (y_i - \hat{y}_i)^2 + \lambda \sum_{i=1}^{N} \max(0, |y_i - \hat{y}_i| - \epsilon_i)^2
Where:
•	λ\lambda is a hyperparameter that controls the strength of the penalty for violating the error bounds.
•	The second summation term adds a penalty for any error ∣yi−y^i∣|y_i - \hat{y}_i| that exceeds ϵi\epsilon_i. If the error is within the bound, the penalty is zero.
Explanation of Components:
1.	First Term (Standard Loss): This is the typical MSE or MAE, representing the error for each prediction step.
2.	Second Term (Shaping Penalty): This adds a penalty for any prediction error that exceeds the allowed bound ϵi\epsilon_i.
o	If ∣yi−y^i∣|y_i - \hat{y}_i| is greater than ϵi\epsilon_i, the penalty term becomes positive, and the model is penalized for violating the constraint.


In [None]:
import torch
import torch.nn as nn

class LossShapingMSE(nn.Module):
    def __init__(self, upper_bounds, lambda_penalty=0.1):
        """
        Implements MSE loss with loss shaping constraints.

        Args:
            upper_bounds (torch.Tensor): A tensor specifying acceptable error bounds at each time step.
            lambda_penalty (float): A hyperparameter controlling the penalty strength for exceeding error bounds.
        """
        super(LossShapingMSE, self).__init__()
        self.mse = nn.MSELoss(reduction='none')  # Compute per-time-step loss
        self.upper_bounds = upper_bounds  # Shape: (time_steps,)
        self.lambda_penalty = lambda_penalty  # Penalty factor for exceeding error bounds

    def forward(self, y_pred, y_true):
        errors = (y_pred - y_true) ** 2  # Squared errors (MSE per element)
        mse_per_step = errors.mean(dim=0)  # Mean loss per time step

        # Compute the penalty term: max(0, |error| - epsilon)^2
        absolute_errors = torch.abs(y_pred - y_true)  # Absolute error at each step
        exceedance = torch.clamp(absolute_errors - self.upper_bounds, min=0)  # Errors exceeding bounds
        penalty = self.lambda_penalty * (exceedance ** 2).mean(dim=0)  # Compute penalty for exceedance

        # Final loss: standard MSE + penalty term
        modified_loss = mse_per_step.mean() + penalty.mean()

        return modified_loss

# Example usage:
time_steps = 2
upper_bounds = torch.tensor([0.2, 0.3], dtype=torch.float32)  # Define max allowable errors per time step
lambda_penalty = 0.5  # Set a penalty factor

loss_shaping = LossShapingMSE(upper_bounds, lambda_penalty)

# Example Predictions and Ground Truth
y_pred = torch.tensor([[3.5, 2.8], [4.2, 3.1]], dtype=torch.float32)
y_true = torch.tensor([[3.0, 3.0], [4.0, 3.0]], dtype=torch.float32)

loss_value_shaped = loss_shaping(y_pred, y_true)

print("Loss Shaping MSE Loss


In [None]:
import tensorflow as tf
from tensorflow.keras.losses import MeanSquaredError

def loss_shaping_loss(y_true, y_pred, epsilon, lambda_param):
    """
    Custom loss function with loss shaping constraints.

    Args:
        y_true (tf.Tensor): True values of shape (batch_size, time_steps).
        y_pred (tf.Tensor): Predicted values of shape (batch_size, time_steps).
        epsilon (tf.Tensor): Error threshold at each time step (shape: time_steps,).
        lambda_param (float): Hyperparameter controlling the penalty strength.

    Returns:
        tf.Tensor: Computed loss value.
    """
    mse_loss = tf.reduce_mean(tf.square(y_true - y_pred), axis=0)  # Standard MSE loss

    # Compute absolute error
    abs_error = tf.abs(y_true - y_pred)

    # Compute exceedance (values exceeding the defined threshold)
    exceedance = tf.nn.relu(abs_error - epsilon)  # max(0, |y_true - y_pred| - epsilon)

    # Compute the shaping penalty (quadratic penalty for exceedance)
    penalty = lambda_param * tf.reduce_mean(tf.square(exceedance), axis=0)

    # Final loss: MSE + penalty term
    total_loss = tf.reduce_mean(mse_loss) + tf.reduce_mean(penalty)

    return total_loss

# Example Usage
batch_size = 2
time_steps = 3

# Define epsilon (error thresholds for each time step)
epsilon = tf.constant([0.2, 0.3, 0.4], dtype=tf.float32)

# Define lambda (penalty strength)
lambda_param = 0.5

# Example ground truth and predicted values
y_true = tf.constant_


In [None]:
Explanation of Components:
1.	First Term (Standard Loss): This is the typical MSE or MAE, representing the error for each prediction step.
2.	Second Term (Shaping Penalty): This adds a penalty for any prediction error that exceeds the allowed bound ϵi\epsilon_i.
o	If ∣yi−y^i∣|y_i - \hat{y}_i| is greater than ϵi\epsilon_i, the penalty term becomes positive, and the model is penalized for violating the constraint.
o	The penalty term grows quadratically as the error increases beyond the threshold.
3.	Hyperparameter λ\lambda: This controls the trade-off between minimizing the regular loss function (MSE) and enforcing the constraints on the errors. Larger values of λ\lambda will enforce stricter error bounds.
________________________________________
Step 5: Implementing the Custom Loss Function in Code
Once we’ve defined the custom loss function with the loss shaping constraints, the next step is to implement it in the programming environment.
Here’s how you can implement this custom loss function in TensorFlow/Keras (for deep learning models):
A. TensorFlow Implementation of the Custom Loss Function
import tensorflow as tf

def loss_shaping_loss(y_true, y_pred, epsilon, lambda_param):
    """
    Custom loss function with loss shaping constraints


In [None]:
import tensorflow as tf
import numpy as np

def loss_shaping_loss(y_true, y_pred, epsilon, lambda_param):
    """
    Custom loss function with loss shaping constraints.

    Arguments:
    - y_true: True values (ground truth) for the time series.
    - y_pred: Predicted values for the time series.
    - epsilon: The upper bounds on errors at each time step.
    - lambda_param: The penalty weight for exceeding the bounds.

    Returns:
    - Total loss: The combination of standard loss and penalty terms.
    """
    # Calculate the standard MSE loss
    mse_loss = tf.reduce_mean(tf.square(y_true - y_pred))

    # Calculate the absolute error term at each time step
    error = tf.abs(y_true - y_pred)

    # Apply the loss shaping constraint: penalize errors beyond epsilon
    penalty = tf.reduce_mean(tf.square(tf.maximum(0.0, error - epsilon)))

    # Total loss with shaping penalty
    total_loss = mse_loss + lambda_param * penalty
    return total_loss

# ------------------- VERIFICATION -------------------

# Example Data: True values and predicted values
y_true = tf.constant([[3.0, 3.5, 4.0], [4.0, 4.5, 5.0]], dtype=tf.float32)
y_pred = tf.constant([[3.2, 3.8, 4.5], [4.2, 4.3, 5.5]], dtype=tf.float32)

# Define error threshold epsilon (per time step)
epsilon = tf.constant([0.2, 0.3, 0.4], dtype=tf.float32)

# Lambda parameter to control penalty strength
lambda_param = 0.5

# Compute loss
loss_value = loss_shaping_loss(y_true, y_pred, epsilon, lambda_param)

print("Loss Shaping Custom Loss:", loss_value.numpy())

# ------------------- MODEL INTEGRATION -------------------

# Example Model with Custom Loss Function
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense

# Dummy data dimensions
time_steps = 3
features = 1

# Create a simple LSTM model for time series forecasting
model = Sequential([
    LSTM(50, activation='relu', return_sequences=True, input_shape=(time_steps, features)),
    LSTM(50, activation='relu'),
    Dense(1)
])

# Compile the model with custom loss
model.compile(optimizer='adam',
              loss=lambda y_true, y_pred: loss_shaping_loss(y_true, y_pred, epsilon, lambda_param))

print("Model compiled successfully with loss shaping constraints!")

