In [1]:
# Step 1: Upload the file
from google.colab import files
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Upload the dataset
uploaded = files.upload()  # This will prompt you to upload the TXT file
# Load the data into a DataFrame
file_name = list(uploaded.keys())[0]
data = pd.read_csv(
    file_name,
    delim_whitespace=True,  # or sep="\t", depending on your file
    header=0,               # Adjust if your header is on a different row
    skiprows=0              # Adjust if needed
)

Saving Normalized_WT5.txt to Normalized_WT5.txt


  data = pd.read_csv(


In [21]:
import numpy as np
# General form for slime mould algorithm
def slime_mould_algorithm(obj_func, N=30, max_iter=100, lb=-10, ub=10, dim=5):
    """
    Implementation of the Slime Mould Algorithm (SMA).

    Args:
        obj_func (function): The objective function to minimize.
        N (int): Number of slime moulds (population size).
        max_iter (int): Maximum number of iterations.
        lb (float or np.array): Lower boundary of the search space.
        ub (float or np.array): Upper boundary of the search space.
        dim (int): Dimensionality of the search space.

    Returns:
        best_position (np.array): Best solution found.
        best_fitness (float): Fitness value of the best solution.
    """
    # Initialize population
    population = np.random.uniform(lb, ub, (N, dim))
    fitness = np.apply_along_axis(obj_func, 1, population)

    # Initialize best solution
    best_idx = np.argmin(fitness)
    best_position = population[best_idx]
    best_fitness = fitness[best_idx]

    for t in range(max_iter):
        # Rank population and assign weights
        sorted_indices = np.argsort(fitness)
        population = population[sorted_indices]
        fitness = fitness[sorted_indices]

        # Update best position
        if fitness[0] < best_fitness:
            best_position = population[0]
            best_fitness = fitness[0]

        # Calculate weights
        W = 1 + np.log(1 + (fitness[-1] - fitness) / (fitness[-1] - fitness[0] + 1e-10))

        # Update positions
        for i in range(N):
            r = np.random.rand()
            vb = np.random.uniform(-1, 1)
            vc = np.random.uniform(0, 1)
            if r < vc:
                # Exploitation phase
                new_position = population[0] + vb * (W[i] * (population[i] - population[0]))
            else:
                # Exploration phase
                new_position = np.random.uniform(lb, ub, dim)

            # Boundary control
            population[i] = np.clip(new_position, lb, ub)

        # Evaluate fitness of updated population
        fitness = np.apply_along_axis(obj_func, 1, population)

    return best_position, best_fitness


# Example: Sphere function as the objective
def sphere_function(x):
    return np.sum(x**2)

# Run SMA
best_position, best_fitness = slime_mould_algorithm(sphere_function, N=30, max_iter=100, lb=-5, ub=5, dim=10)

print("Best Position:", best_position)
print("Best Fitness:", best_fitness)


Best Position: [ 1.21133407 -3.35736336  3.45981292  3.12402484 -0.23516792  3.52729965
 -2.5285493  -4.82434544  1.19207925 -0.42090593]
Best Fitness: 3.3661506511051416


In [9]:
# Step 2: Select the first 1440 rows and extract relevant columns
data = data.iloc[:1440]  # Select only the first 1440 rows
time = np.arange(1, 1441)  # Generate time steps (1 to 1440)
# Clean column names
data.columns = data.columns.str.strip()  # Remove leading and trailing whitespaces
print("Cleaned Column Names:", data.columns)  # Check cleaned column names

# Access the 'y' column
power = data["y (% relative to rated power)"]  # Ensure this matches the cleaned column name


# Step 3: Plot Figure 6 - Offshore Wind Power Timing Diagram
train_size = int(0.8 * len(data))  # 80% train, 20% test split
train_power = power[:train_size]
test_power = power[train_size:]

Cleaned Column Names: Index(['Sequence No.', 'V', 'D', 'air density', 'Humidity', 'I', 'S_a', 'S_b',
       'y (% relative to rated power)'],
      dtype='object')


In [10]:
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Flatten, Reshape

class Autoencoder:
    def __init__(self, window_size, latent_dim):
        """
        Initialize the Autoencoder class.
        Args:
            window_size (int): Number of time steps in each sequence (input dimension).
            latent_dim (int): Size of the latent space (latent dimension).
        """
        self.window_size = window_size
        self.latent_dim = latent_dim
        self.autoencoder = None
        self.encoder = None
        self.decoder = None
    def build(self):
        """
        Build the Autoencoder, Encoder, and Decoder models.
        """
        # Encoder
        input_layer = Input(shape=(self.window_size, 1), name="Input_Layer")
        flattened = Flatten(name="Flatten_Layer")(input_layer)  # Flatten time-series input
        hidden1 = Dense(128, activation='relu', name="Hidden_Layer_1")(flattened)
        hidden2 = Dense(64, activation='relu', name="Hidden_Layer_2")(hidden1)
        latent_layer = Dense(self.latent_dim, activation='relu', name="Latent_Layer")(hidden2)
        # Decoder
        hidden3 = Dense(64, activation='relu', name="Decoder_Layer_1")(latent_layer)
        hidden4 = Dense(128, activation='relu', name="Decoder_Layer_2")(hidden3)
        output_flat = Dense(self.window_size, activation='linear', name="Output_Flat")(hidden4)
        output_layer = Reshape((self.window_size, 1), name="Output_Reshape")(output_flat)
        # Autoencoder Model
        self.autoencoder = Model(inputs=input_layer, outputs=output_layer, name="Autoencoder")
        # Encoder Model
        self.encoder = Model(inputs=input_layer, outputs=latent_layer, name="Encoder")

        # Decoder Model
        encoded_input = Input(shape=(self.latent_dim,), name="Encoded_Input")
        decoder_hidden1 = self.autoencoder.get_layer("Decoder_Layer_1")(encoded_input)
        decoder_hidden2 = self.autoencoder.get_layer("Decoder_Layer_2")(decoder_hidden1)
        decoder_flat = self.autoencoder.get_layer("Output_Flat")(decoder_hidden2)
        decoder_output = self.autoencoder.get_layer("Output_Reshape")(decoder_flat)
        self.decoder = Model(inputs=encoded_input, outputs=decoder_output, name="Decoder")

    def compile(self, optimizer='adam', loss='mse'):
        """
        Compile the Autoencoder model.

        Args:
            optimizer (str): Optimizer for training (default: 'adam').
            loss (str): Loss function to minimize (default: 'mse').
        """
        if not self.autoencoder:
            raise ValueError("The model must be built before compiling.")
        self.autoencoder.compile(optimizer=optimizer, loss=loss)

    def train(self, x_train, epochs=100, batch_size=32, shuffle=True, validation_split=0.1):
        """
        Train the Autoencoder model.
        Args:
            x_train (np.array): Training input data (e.g., sequences of time steps).
            epochs (int): Number of epochs to train (default: 100).
            batch_size (int): Batch size for training (default: 16).
            shuffle (bool): Whether to shuffle the training data (default: True).
            validation_split (float): Fraction of data for validation (default: 0.1).

        Returns:
            History object: Contains training history.
        """
        if not self.autoencoder:
            raise ValueError("The model must be built and compiled before training.")
        return self.autoencoder.fit(
            x_train, x_train,  # Input and target are the same for autoencoders
            epochs=epochs,
            batch_size=batch_size,
            shuffle=shuffle,
            validation_split=validation_split
        )
    def summary(self):
        """
        Print the summary of the Autoencoder model.
        """
        if not self.autoencoder:
            raise ValueError("The model must be built before accessing the summary.")
        self.autoencoder.summary()


In [11]:
# Parameters
window_size = 144  # Time steps in each sequence
latent_dim = 16   # Size of the latent space

def create_sliding_windows(data, window_size):
    """
    Create sliding windows from the data.

    Args:
        data (np.array): Input data array (1D time-series).
        window_size (int): Number of time steps in each window.

    Returns:
        np.array: Array of sliding windows with shape (num_samples, window_size, 1).
    """
    windows = []
    for i in range(len(data) - window_size + 1):
        windows.append(data[i:i + window_size])
    return np.array(windows).reshape(-1, window_size, 1)

# Simulated input data (replace with your actual y column values)
sliding_windows = create_sliding_windows(power, window_size)

# Create and build the Autoencoder
autoencoder_model = Autoencoder(window_size, latent_dim)
autoencoder_model.build()

# Compile the Autoencoder
autoencoder_model.compile(optimizer='adam', loss='mse')

# Train the Autoencoder
history = autoencoder_model.train(
    x_train=sliding_windows,
    epochs=100,
    batch_size=16,
    shuffle=True,
    validation_split=0.1
)

# Display the model summary
autoencoder_model.summary()

# Use the trained Autoencoder to "denoise" (reconstruct) the data
data_denoised = autoencoder_model.autoencoder.predict(sliding_windows)

Epoch 1/100
[1m73/73[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 21ms/step - loss: 2269.3003 - val_loss: 116.1201
Epoch 2/100
[1m73/73[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 373.6405 - val_loss: 102.8658
Epoch 3/100
[1m73/73[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 186.6708 - val_loss: 88.2368
Epoch 4/100
[1m73/73[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 113.1669 - val_loss: 69.0535
Epoch 5/100
[1m73/73[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 85.4543 - val_loss: 53.8679
Epoch 6/100
[1m73/73[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 67.8037 - val_loss: 40.3808
Epoch 7/100
[1m73/73[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 54.2087 - val_loss: 24.1823
Epoch 8/100
[1m73/73[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step - loss: 40.9080 - val_loss: 22.4475
Epoch 9/100
[1m73/73[0

[1m41/41[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step


In [13]:
# Reconstruct the full denoised sequence from sliding windows
denoised_full = np.zeros((1440,))
count = np.zeros((1440,))  # Track the number of contributions for each time step

# Aggregate overlapping windows
for i in range(len(data_denoised)):
    denoised_full[i:i + window_size] += data_denoised[i].flatten()
    count[i:i + window_size] += 1

# Average overlapping regions
denoised_full /= count

In [14]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler

# ------------------------------
# Step 1: Prepare Data
# ------------------------------

# PARAMETERS
window_size = 144  # e.g., 144 time steps for input
forecast_horizon = 1  # Predict 1 step ahead for single-step forecasting

# Replace the last column with the denoised data
data.iloc[:, -1] = denoised_full


X_full = data.iloc[:, 1:].values  # All columns except the first and last
y_full = data.iloc[:, -1].values   # Only the last column (target)



# ---------------------------------------------------------
# Step 2: Normalize Features and Target
# ---------------------------------------------------------
# Normalize all features together, including the target column
scaler = MinMaxScaler(feature_range=(0, 1))
data_scaled = scaler.fit_transform(data)  # Scale columns together
print(data_scaled.shape)
# Split into train and test sets (80% train, 20% test)
train_ratio = 0.8
train_size = int(train_ratio * len(data_scaled))
data_train_scaled = data_scaled[:train_size]
data_test_scaled = data_scaled[train_size:]
print(data_train_scaled.shape)

print(f"Train size: {len(data_train_scaled)}, Test size: {len(data_test_scaled)}")

# ---------------------------------------------------------
# Step 3: Create Sliding Windows
# ---------------------------------------------------------
def create_sliding_windows(data, window_size, forecast_horizon):
    """
    Create input (X) and output (y) arrays using sliding windows.

    Args:
        data (np.array): Scaled data (features and target together).
        window_size (int): Number of time steps for input.
        forecast_horizon (int): How many steps ahead to predict.

    Returns:
        (X, y): Arrays for input (X) and target (y).
    """
    X, y = [], []
    for i in range(len(data) - window_size - forecast_horizon + 1):
        X.append(data[i : i + window_size, :-1])  # All features except target
        y.append(data[i + window_size : i + window_size + forecast_horizon, -1])  # Only target column
    return np.array(X), np.array(y)

# Create sliding windows for train and test sets
X_train, y_train = create_sliding_windows(data_train_scaled, window_size, forecast_horizon)
X_test, y_test = create_sliding_windows(data_test_scaled, window_size, forecast_horizon)

print("X_train shape:", X_train.shape)  # (samples, window_size, num_features)
print("y_train shape:", y_train.shape)  # (samples, forecast_horizon)
print("X_test shape:", X_test.shape)
print("y_test shape:", y_test.shape)

# ---------------------------------------------------------
# Step 4: Summary of Input and Output
# ---------------------------------------------------------
# X_train will now contain all input features (except `y`) for the sliding window
# y_train will contain the corresponding single-step target values (`y`)

print(f"Number of input features: {X_train.shape[-1]}")
print(f"Number of time steps in each input window: {X_train.shape[1]}")
print(f"Forecast horizon: {y_train.shape[-1]} (single-step prediction)")


(1440, 9)
(1152, 9)
Train size: 1152, Test size: 288
X_train shape: (1008, 144, 8)
y_train shape: (1008, 1)
X_test shape: (144, 144, 8)
y_test shape: (144, 1)
Number of input features: 8
Number of time steps in each input window: 144
Forecast horizon: 1 (single-step prediction)


In [15]:
# Step 5: Dataset Preparation

import torch
from torch.utils.data import Dataset, DataLoader
import math

class WindPowerDataset(Dataset):
    def __init__(self, X, y):
        """
        Initializes the dataset with input features and targets.

        Args:
            X (np.array): Input features of shape (num_samples, window_size).
            y (np.array): Targets of shape (num_samples,).
        """
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Create PyTorch datasets
train_dataset = WindPowerDataset(X_train, y_train.flatten())
print(X_train.shape)
test_dataset = WindPowerDataset(X_test, y_test.flatten())

# Create DataLoaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

print(f"Number of training samples: {len(train_dataset)}")
print(f"Number of testing samples: {len(test_dataset)}")


(1008, 144, 8)
Number of training samples: 1008
Number of testing samples: 144


In [18]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import math
from typing import Dict, Tuple, List
import time

class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, max_len: int = 5000):
        super(PositionalEncoding, self).__init__()

        # Ensure d_model is even
        d_model = (d_model // 2) * 2

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)

        # Calculate number of dimensions for sin/cos
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        # Ensure proper dimensionality
        pe[:, 0:d_model:2] = torch.sin(position * div_term)
        pe[:, 1:d_model:2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)
        self.d_model = d_model

    def forward(self, x):
        return x + self.pe[:x.size(0), :self.d_model]

class TransformerModel(nn.Module):
    def __init__(self, input_dim, d_model=64, nhead=4, num_layers=3, dim_feedforward=128, max_len=144, dropout=0.1):
        super(TransformerModel, self).__init__()

        # Ensure d_model is divisible by nhead
        d_model = (d_model // nhead) * nhead
        # Ensure d_model is even for positional encoding
        d_model = (d_model // 2) * 2

        self.embedding = nn.Linear(input_dim, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_len=max_len)
        encoder_layers = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers=num_layers)
        self.fc_out = nn.Linear(d_model, 1)

    def forward(self, x):
        x = self.embedding(x)
        x = x.permute(1, 0, 2)
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        x = x[-1, :, :]
        return self.fc_out(x).squeeze()

class HyperparameterOptimizer:
    def __init__(self, train_loader, val_loader, input_dim: int, device: str = 'cuda'):
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.input_dim = input_dim
        self.device = device

        # Modified bounds to ensure even numbers and proper divisibility
        self.param_bounds = {
            'd_model': (32, 256),  # Will be adjusted to be divisible by nhead
            'nhead': (2, 8),       # Must be power of 2
            'num_layers': (1, 6),
            'dim_feedforward': (64, 512),
            'dropout': (0.0, 0.5),
            'learning_rate': (1e-4, 1e-2)
        }

    def decode_solution(self, position: np.ndarray) -> Dict:
        # Get number of heads first
        nhead = 2 ** int(np.interp(position[1], [0, 1], [1, 3]))  # This gives 2, 4, or 8

        # Get d_model and ensure it's divisible by nhead and even
        d_model = int(np.interp(position[0], [0, 1], self.param_bounds['d_model']))
        d_model = max(32, ((d_model // nhead) * nhead) // 2 * 2)

        return {
            'd_model': d_model,
            'nhead': nhead,
            'num_layers': int(np.interp(position[2], [0, 1], self.param_bounds['num_layers'])),
            'dim_feedforward': int(np.interp(position[3], [0, 1], self.param_bounds['dim_feedforward'])),
            'dropout': float(np.interp(position[4], [0, 1], self.param_bounds['dropout'])),
            'learning_rate': float(np.interp(position[5], [0, 1], self.param_bounds['learning_rate']))
        }

    def evaluate_model(self, hyperparams: Dict, epochs: int = 5) -> float:
        try:
            model = TransformerModel(
                input_dim=self.input_dim,
                d_model=hyperparams['d_model'],
                nhead=hyperparams['nhead'],
                num_layers=hyperparams['num_layers'],
                dim_feedforward=hyperparams['dim_feedforward'],
                dropout=hyperparams['dropout']
            ).to(self.device)

            criterion = nn.MSELoss()
            optimizer = torch.optim.Adam(model.parameters(), lr=hyperparams['learning_rate'])

            best_val_loss = float('inf')
            patience = 3
            patience_counter = 0

            for epoch in range(epochs):
                model.train()
                for batch_x, batch_y in self.train_loader:
                    batch_x, batch_y = batch_x.to(self.device), batch_y.to(self.device)
                    optimizer.zero_grad()
                    output = model(batch_x)
                    loss = criterion(output, batch_y)
                    loss.backward()
                    optimizer.step()

                model.eval()
                val_loss = 0
                with torch.no_grad():
                    for batch_x, batch_y in self.val_loader:
                        batch_x, batch_y = batch_x.to(self.device), batch_y.to(self.device)
                        output = model(batch_x)
                        val_loss += criterion(output, batch_y).item()

                val_loss /= len(self.val_loader)

                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    patience_counter = 0
                else:
                    patience_counter += 1
                    if patience_counter >= patience:
                        break

            return best_val_loss
        except Exception as e:
            print(f"Error during model evaluation: {str(e)}")
            return float('inf')

    def slime_mould_algorithm(self, N=20, max_iter=30):
        """
        Modified SMA for transformer hyperparameter optimization.
        """
        dim = len(self.param_bounds)  # Number of hyperparameters
        lb = np.zeros(dim)  # Lower bound is 0 for all parameters
        ub = np.ones(dim)   # Upper bound is 1 for all parameters

        # Define objective function for the algorithm
        def obj_func(position):
            hyperparams = self.decode_solution(position)
            return self.evaluate_model(hyperparams)

        # Initialize population
        population = np.random.uniform(lb, ub, (N, dim))
        fitness = np.apply_along_axis(obj_func, 1, population)

        # Initialize best solution
        best_idx = np.argmin(fitness)
        best_position = population[best_idx].copy()
        best_fitness = fitness[best_idx]

        for t in range(max_iter):
            print(f"Iteration {t + 1}/{max_iter}, Best Loss: {best_fitness:.6f}")

            # Rank population and assign weights
            sorted_indices = np.argsort(fitness)
            population = population[sorted_indices]
            fitness = fitness[sorted_indices]

            # Update best position
            if fitness[0] < best_fitness:
                best_position = population[0].copy()
                best_fitness = fitness[0]

            # Calculate weights
            W = 1 + np.log(1 + (fitness[-1] - fitness) / (fitness[-1] - fitness[0] + 1e-10))

            # Update positions
            for i in range(N):
                r = np.random.rand()
                vb = np.random.uniform(-1, 1)
                vc = np.random.uniform(0, 1)
                if r < vc:
                    # Exploitation phase
                    new_position = population[0] + vb * (W[i] * (population[i] - population[0]))
                else:
                    # Exploration phase
                    new_position = np.random.uniform(lb, ub, dim)

                # Boundary control
                population[i] = np.clip(new_position, lb, ub)

            # Evaluate fitness of updated population
            fitness = np.apply_along_axis(obj_func, 1, population)

        return self.decode_solution(best_position), best_fitness

# Generate sample data
def generate_sample_data(num_samples=1000, seq_length=24, input_dim=5):
    X = np.random.randn(num_samples, seq_length, input_dim)
    y = np.sum(X[:, -1, :], axis=1)
    return torch.FloatTensor(X), torch.FloatTensor(y)



In [20]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

train_loader = train_loader
val_loader = test_loader
input_dim = X_train.shape[2]
optimizer = HyperparameterOptimizer(train_loader, val_loader, input_dim, device)

print("Starting hyperparameter optimization...")
start_time = time.time()

best_hyperparams, best_loss = optimizer.slime_mould_algorithm(
    N=10,
    max_iter=5
)

end_time = time.time()

print("\nOptimization completed!")
print(f"Time taken: {(end_time - start_time)/60:.2f} minutes")
print("\nBest Hyperparameters:")
for param, value in best_hyperparams.items():
    print(f"{param}: {value}")
print(f"Best Validation Loss: {best_loss:.6f}")

Using device: cuda
Starting hyperparameter optimization...




Iteration 1/5, Best Loss: 0.009315
Iteration 2/5, Best Loss: 0.009315
Iteration 3/5, Best Loss: 0.009315
Iteration 4/5, Best Loss: 0.009315
Iteration 5/5, Best Loss: 0.009315

Optimization completed!
Time taken: 2.11 minutes

Best Hyperparameters:
d_model: 164
nhead: 4
num_layers: 1
dim_feedforward: 151
dropout: 0.022613644455269033
learning_rate: 0.003320770274556317
Best Validation Loss: 0.009315
