In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sympy.codegen import Print


# ------------------------------
# Custom Truncated Normal Initialization
# ------------------------------
def truncated_normal_(tensor, mean=0.0, std=0.2, a=-2.0, b=2.0):
    """
    Custom truncated normal initialization.
    This method ensures values stay within the range [a, b].
    """
    lower, upper = (a - mean) / std, (b - mean) / std
    tensor.data = torch.distributions.Normal(mean, std).rsample(tensor.shape)
    tensor.data = torch.clip(tensor.data, min=a, max=b)
    return tensor

# ------------------------------
# Input Embedding Module (CNN-Based)
# ------------------------------
class InputEmbedding(nn.Module):
    def __init__(self, input_dim, embed_dim=256, kernel_sizes=[4, 3], strides=[2, 2]):
        super(InputEmbedding, self).__init__()

        self.conv1 = nn.Conv1d(input_dim, embed_dim, kernel_sizes[0], stride=strides[0])
        self.bn1 = nn.BatchNorm1d(embed_dim)
        self.relu = nn.ReLU()

        self.conv2 = nn.Conv1d(embed_dim, embed_dim, kernel_sizes[1], stride=strides[1])
        self.bn2 = nn.BatchNorm1d(embed_dim)

        self.cls_token = nn.Parameter(truncated_normal_(torch.empty(1, 1, embed_dim)))
        self.pos_embedding = nn.Parameter(truncated_normal_(torch.empty(1, embed_dim + 1, embed_dim)))

    def forward(self, x):
        """
        x: (batch_size, channels, time_steps)
        Output: (batch_size, seq_len+1, embed_dim)
        """
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.relu(self.bn2(self.conv2(x)))

        x = x.permute(0, 2, 1)  # Reshape for transformer (batch_size, seq_len, embed_dim)

        batch_size = x.shape[0]
        cls_tokens = self.cls_token.expand(batch_size, -1, -1)
        x = torch.cat((cls_tokens, x), dim=1)

        x = x + self.pos_embedding[:, :x.shape[1], :]

        return x

# ------------------------------
# Multi-Head Self-Attention
# ------------------------------
class MultiHeadSelfAttention(nn.Module):
    def __init__(self, embed_dim, num_heads, attn_dropout=0.1):
        super().__init__()
        assert embed_dim % num_heads == 0, "Embedding dimension must be divisible by number of heads"

        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        self.qkv_proj = nn.Linear(embed_dim, 3 * embed_dim)
        self.out_proj = nn.Linear(embed_dim, embed_dim)
        self.attn_dropout = nn.Dropout(attn_dropout)

    def forward(self, x, mask=None):
        batch_size, seq_len, embed_dim = x.shape

        qkv = self.qkv_proj(x)
        q, k, v = torch.chunk(qkv, 3, dim=-1)

        q = q.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        k = k.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
        v = v.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

        attn_scores = torch.matmul(q, k.transpose(-2, -1)) / (self.head_dim ** 0.5)

        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, float('-inf'))

        attn_weights = F.softmax(attn_scores, dim=-1)
        attn_weights = self.attn_dropout(attn_weights)

        attn_output = torch.matmul(attn_weights, v)
        attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, seq_len, embed_dim)

        return self.out_proj(attn_output)

# ------------------------------
# Feed-Forward Network (MLP Block)
# ------------------------------
class FeedForwardNetwork(nn.Module):
    def __init__(self, embed_dim, ffn_dim, ffn_dropout=0.0):
        super().__init__()
        self.fc1 = nn.Linear(embed_dim, ffn_dim)
        self.gelu = nn.GELU()
        self.fc2 = nn.Linear(ffn_dim, embed_dim)
        self.dropout = nn.Dropout(ffn_dropout)

    def forward(self, x):
        return self.dropout(self.fc2(self.gelu(self.fc1(x))))

# ------------------------------
# DropPath (Stochastic Depth)
# ------------------------------
class DropPath(nn.Module):
    def __init__(self, drop_prob=None):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        if self.drop_prob == 0. or not self.training:
            return x
        keep_prob = 1 - self.drop_prob
        random_tensor = keep_prob + torch.rand(x.shape, device=x.device)
        random_tensor.floor_()
        return x.div(keep_prob) * random_tensor

# ------------------------------
# Transformer Block
# ------------------------------
class TransformerBlock(nn.Module):
    def __init__(self, embed_dim, num_heads=16, ffn_dim=1024, drop_path_rate=0.1, attn_dropout=0.1):
        super().__init__()
        self.norm1 = nn.LayerNorm(embed_dim)
        self.attn = MultiHeadSelfAttention(embed_dim, num_heads, attn_dropout)
        self.drop_path1 = DropPath(drop_path_rate)

        self.norm2 = nn.LayerNorm(embed_dim)
        self.ffn = FeedForwardNetwork(embed_dim, ffn_dim)
        self.drop_path2 = DropPath(drop_path_rate)

    def forward(self, x, mask=None):
        x = x + self.drop_path1(self.attn(self.norm1(x), mask))
        x = x + self.drop_path2(self.ffn(self.norm2(x)))
        return x

# ------------------------------
# Transformer Encoder
# ------------------------------
class TransformerEncoder(nn.Module):
    def __init__(self, embed_dim=256, num_blocks=4, num_heads=16, ffn_dim=1024, drop_path_rate=0.1):
        super().__init__()
        self.layers = nn.ModuleList([
            TransformerBlock(embed_dim, num_heads, ffn_dim, drop_path_rate)
            for _ in range(num_blocks)
        ])

    def forward(self, x, mask=None):
        for layer in self.layers:
            x = layer(x, mask)
        return x

# ------------------------------
# SOH-TEC Model
# ------------------------------
class SOHTEC(nn.Module):
    def __init__(self, input_dim=5, embed_dim=256, num_blocks=4, num_heads=16, ffn_dim=1024, drop_path_rate=0.1):
        super(SOHTEC, self).__init__()

        self.embedding = InputEmbedding(input_dim, embed_dim)
        self.encoder = TransformerEncoder(embed_dim, num_blocks, num_heads, ffn_dim, drop_path_rate)
        self.mlp_head = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Linear(embed_dim, embed_dim // 2),
            nn.ReLU(),
            nn.Linear(embed_dim // 2, 1)  # Regression output for SOH estimation
        )

    def forward(self, x):
        x = self.embedding(x)
        x = self.encoder(x)
        return self.mlp_head(x[:, 0])  # Using CLS token for SOH prediction


# Fuel Cell

In [3]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# DATA =====================================================================================================

for hours in [100, 10, 0.25]:

    print(f'For {hours} hour splits:\n')

    train_df = pd.read_csv("data/fuel_cell/FC1_train_val_filtered.csv")

    features = ['I (A)']
    X = train_df[features].values

    y = train_df['Utot (V)'].values if 'Utot (V)' in train_df.columns else None
    max_y = max(y)

    time = train_df['Time (h)'].values

    scaler_data = StandardScaler()
    X = scaler_data.fit_transform(X)
    y = y / max_y

    SEQ_LEN = 100
    NUM_FEATURES = len(features)

    def create_variable_length_sequences(X, y, time, time_window=10.0, max_length=400):
        sequences = []
        targets = []
        current_sequence = []
        current_target = []
        start_time = time[0]

        for i in range(len(time)):
            if time[i] - start_time > time_window:
                if len(current_sequence) >= max_length:  # Ensure uniform length
                    sequences.append(np.array(current_sequence[:max_length], dtype=np.float32))
                    targets.append(np.array(current_target[:max_length], dtype=np.float32))
                current_sequence = []
                current_target = []
                start_time = time[i]

            current_sequence.append(X[i])
            if y is not None:
                current_target.append(y[i])

        if len(current_sequence) >= max_length:
            sequences.append(np.array(current_sequence[:max_length], dtype=np.float32))
            targets.append(np.array(current_target[:max_length], dtype=np.float32))

        return sequences, targets


    X_var_len, y_var_len = create_variable_length_sequences(X, y, time, hours)

    X_seq = X_var_len

    train_size = int(0.8 * len(X_seq))
    val_size = int(0.1 * len(X_seq))
    test_size = len(X_seq) - train_size - val_size

    X_train, y_train = X_var_len[:train_size], y_var_len[:train_size]
    X_val, y_val = X_var_len[train_size:train_size+val_size], y_var_len[train_size:train_size+val_size]
    X_test, y_test = X_var_len[train_size+val_size:], y_var_len[train_size+val_size:]

    class VariableLengthSOHDataset(Dataset):
        def __init__(self, X_list, y_list):
            self.X_list = [torch.tensor(x, dtype=torch.float32).transpose(0, 1) for x in X_list]  # (1, time_steps)
            self.y_list = [torch.tensor(y, dtype=torch.float32) for y in y_list]

        def __len__(self):
            return len(self.X_list)

        def __getitem__(self, idx):
            return self.X_list[idx], self.y_list[idx]


    # Custom Collate Function for Padding
    def collate_fn(batch):
        X_batch, y_batch = zip(*batch)

        print("Batch Sizes Before Padding:")
        for i, x in enumerate(X_batch):
            print(f"Sample {i}: {x.shape}")  # Should be (time_steps, 1)

        X_padded = pad_sequence(X_batch, batch_first=True, padding_value=0.0)  # (batch, max_time_steps, 1)
        y_padded = pad_sequence(y_batch, batch_first=True, padding_value=-1)

        print(f"Padded Batch Shape: {X_padded.shape}")  # Should be (batch, max_time_steps, 1)
        return X_padded, y_padded


    train_dataset = VariableLengthSOHDataset(X_train, y_train)
    val_dataset = VariableLengthSOHDataset(X_val, y_val)
    test_dataset = VariableLengthSOHDataset(X_test, y_test)

    BATCH_SIZE = 32

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)


    # MODEL ================================================================================================

    model = SOHTEC(input_dim=NUM_FEATURES, embed_dim=256).to(device)

    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=5e-5, weight_decay=1e-4)

    def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=20):
        best_val_loss = float("inf")

        for epoch in range(num_epochs):
            model.train()
            train_loss = 0.0

            for batch_X, batch_y in train_loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)

                optimizer.zero_grad()
                outputs = model(batch_X).squeeze()

                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()
                train_loss += loss.item()

            # Validation
            model.eval()
            val_loss = 0.0
            with torch.no_grad():
                for batch_X, batch_y in val_loader:
                    batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                    outputs = model(batch_X).squeeze()
                    val_loss += criterion(outputs, batch_y).item()

            train_loss /= len(train_loader)
            val_loss /= len(val_loader)

            print(f"Epoch {epoch+1}/{num_epochs}: Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

            # Save best model
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save(model.state_dict(), "best_soh_tec_model.pth")

    # Train the model
    train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=20)

    # TESTS ================================================================================================    

    def evaluate_model(model, test_loader):
        model.load_state_dict(torch.load("best_soh_tec_model.pth"))
        model.eval()

        all_preds, all_targets = [], []
        with torch.no_grad():
            for batch_X, batch_y in test_loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                outputs = model(batch_X).squeeze()

                all_preds.append(outputs.cpu().numpy())
                all_targets.append(batch_y.cpu().numpy())

        all_preds = np.concatenate(all_preds)
        all_targets = np.concatenate(all_targets)

        rmse = np.sqrt(mean_squared_error(all_targets, all_preds))
        mae = mean_absolute_error(all_targets, all_preds)
        r2 = r2_score(all_targets, all_preds)

        print(f"Test RMSE: {rmse:.4f}")
        print(f"Test MAE: {mae:.4f}")
        print(f"Test R²: {r2:.4f}")

    # Evaluate model
    evaluate_model(model, test_loader)


For 100 hour splits:

Batch Sizes Before Padding:
Sample 0: torch.Size([1, 400])
Sample 1: torch.Size([1, 400])
Sample 2: torch.Size([1, 400])
Sample 3: torch.Size([1, 400])
Sample 4: torch.Size([1, 400])
Sample 5: torch.Size([1, 400])
Sample 6: torch.Size([1, 400])
Padded Batch Shape: torch.Size([7, 1, 400])


  return F.mse_loss(input, target, reduction=self.reduction)


RuntimeError: The size of tensor a (7) must match the size of tensor b (400) at non-singleton dimension 1