# Deep Learning Ensemble of Time-Series (TS) Classifiers

## 1. Informer

In [None]:
import torch
import torch.nn as nn
from einops import rearrange, repeat
from einops.layers.torch import Rearrange

class TemporalAggregation(nn.Module):
    """Pool a feature sequence into a single vector per sample.

    A pointwise (kernel size 1) convolution mixes the channels, after which
    the result is averaged over dimension 2.
    """

    def __init__(self, dim_model):
        super().__init__()
        # Kernel size 1: a per-position linear map over the channel axis.
        self.proj = nn.Conv1d(dim_model, dim_model, kernel_size=1)

    def forward(self, x):
        # Conv1d treats dim 1 as channels, so x is expected to be laid out as
        # (batch, dim_model, length); the mean then collapses the length axis
        # and yields (batch, dim_model).
        return self.proj(x).mean(dim=2)

class InformerEncoder(nn.Module):
    """Informer-style encoder with a classification head.

    Pipeline: pointwise convolutional embedding -> learned positional
    encoding -> ``num_stacks`` ``InformerStack`` layers -> temporal mean
    pooling (``TemporalAggregation``) -> linear classifier.

    Args:
        input_dim: Number of channels of the input series.
        dim_model: Width of the hidden representation.
        n_heads: Number of attention heads, forwarded to every stack.
        num_stacks: Number of stacked encoder layers.
        factor: Forwarded to every stack. It is also the number of positions
            held by the learned positional encoding, i.e. the longest
            sequence ``forward`` accepts.
        distil: Forwarded to every stack.
        num_classes: Size of the classifier output. When omitted, the
            module-level ``num_classes`` global is used, which is how the
            class was originally wired up.

    Raises:
        NameError: If ``num_classes`` is neither passed nor defined as a
            module-level global.
    """

    def __init__(self, input_dim, dim_model, n_heads, num_stacks, factor, distil,
                 num_classes=None):
        super().__init__()
        if num_classes is None:
            # Legacy path: earlier callers relied on a global defined before
            # instantiation. Keep that working, but fail with a clear message.
            try:
                num_classes = globals()["num_classes"]
            except KeyError:
                raise NameError(
                    "name 'num_classes' is not defined; pass num_classes=... explicitly"
                ) from None

        self.input_dim = input_dim
        self.dim_model = dim_model
        self.n_heads = n_heads
        self.num_stacks = num_stacks
        self.factor = factor
        self.distil = distil
        self.num_classes = num_classes

        self.proj = nn.Conv1d(input_dim, dim_model, 1)
        # Stored channel-first, (1, dim_model, positions), to match the
        # output layout of ``self.proj``.
        self.pos_enc = nn.Parameter(torch.randn(1, dim_model, factor))

        self.stacks = nn.ModuleList(
            [InformerStack(dim_model, n_heads, factor, distil) for _ in range(num_stacks)]
        )

        self.aggregation = TemporalAggregation(dim_model)
        self.classifier = nn.Linear(dim_model, num_classes)

    def forward(self, x):
        """Classify a batch of series.

        Args:
            x: Tensor laid out as (batch, input_dim, length).

        Returns:
            Logits of shape (batch, num_classes).

        Raises:
            ValueError: If ``length`` exceeds the number of learned positions.
        """
        x = self.proj(x)  # (b, dim_model, l)

        seq_len = x.size(2)
        max_len = self.pos_enc.size(2)
        if seq_len > max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds the {max_len} learned positions"
            )
        # Add the positional encoding while still channel-first so its
        # (1, dim_model, l) slice broadcasts over the batch. Out-of-place on
        # purpose: avoids mutating a tensor autograd may still need.
        x = x + self.pos_enc[:, :, :seq_len]

        # The stacks apply LayerNorm over the last axis, so they need (b, l, c).
        x = rearrange(x, 'b c l -> b l c')
        for stack in self.stacks:
            x = stack(x)

        # TemporalAggregation is Conv1d-based and pools dim 2: back to (b, c, l).
        x = rearrange(x, 'b l c -> b c l')
        x = self.aggregation(x)  # (b, dim_model)
        return self.classifier(x)

class InformerStack(nn.Module):
    """One pre-norm residual layer: attention sub-layer, then feed-forward sub-layer."""

    def __init__(self, dim_model, n_heads, factor, distil):
        super().__init__()
        self.attn = ProbSparseSelfAttention(dim_model, n_heads, factor)
        self.ff = FFN(dim_model, distil)
        # LayerNorm(dim_model) normalises the last axis of its input.
        self.norm1 = nn.LayerNorm(dim_model)
        self.norm2 = nn.LayerNorm(dim_model)

    def forward(self, x):
        # Each sub-layer sees a normalised copy of the running activation and
        # its result is added back onto the un-normalised activation.
        attended = self.attn(self.norm1(x))
        x = x + attended
        transformed = self.ff(self.norm2(x))
        return x + transformed

class ProbSparseSelfAttention(nn.Module):
    """Query-sparse multi-head self-attention in the spirit of Informer.

    Every query is scored with the sparsity measure
    ``max_j(score_ij) - mean_j(score_ij)``. Only the ``factor`` highest
    scoring queries per head attend over all keys; the remaining queries
    receive the mean of the values. Scores are computed densely, so this
    reproduces the selection rule but not the sub-quadratic cost.

    Args:
        dim_model: Feature width; must be divisible by ``n_heads``.
        n_heads: Number of attention heads.
        factor: Number of queries per head that get full attention
            (clamped to the sequence length).

    Raises:
        ValueError: If ``dim_model`` is not divisible by ``n_heads``.
    """

    def __init__(self, dim_model, n_heads, factor):
        super().__init__()
        if dim_model % n_heads != 0:
            raise ValueError(
                f"dim_model ({dim_model}) must be divisible by n_heads ({n_heads})"
            )
        self.dim_model = dim_model
        self.n_heads = n_heads
        self.factor = factor
        self.head_dim = dim_model // n_heads

        self.qkv = nn.Conv1d(dim_model, 3 * dim_model, 1)
        self.proj = nn.Conv1d(dim_model, dim_model, 1)
        self.scale = self.head_dim ** -0.5

    def forward(self, x):
        """Apply attention.

        Args:
            x: Tensor of shape (batch, length, dim_model).

        Returns:
            Tensor of shape (batch, length, dim_model).
        """
        b, l, _ = x.size()
        h, d = self.n_heads, self.head_dim

        # Conv1d expects channels on dim 1, hence the transpose.
        qkv = self.qkv(x.transpose(1, 2))  # (b, 3 * c, l)
        # Split channels as (3, heads, head_dim); each of q, k, v is (b, h, l, d).
        q, k, v = qkv.view(b, 3, h, d, l).permute(1, 0, 2, 4, 3).unbind(0)

        scores = torch.matmul(q, k.transpose(-2, -1)) * self.scale  # (b, h, l, l)

        # Sparsity measure: a peaked score row means an "active" query.
        sparsity = scores.max(dim=-1).values - scores.mean(dim=-1)  # (b, h, l)
        n_active = max(1, min(int(self.factor), l))
        top_idx = sparsity.topk(n_active, dim=-1).indices  # (b, h, u)

        # Full softmax attention for the selected queries only.
        active_scores = scores.gather(
            2, top_idx.unsqueeze(-1).expand(b, h, n_active, l)
        )
        active_out = torch.matmul(active_scores.softmax(dim=-1), v)  # (b, h, u, d)

        # Default output for the remaining queries: the mean value vector.
        context = v.mean(dim=2, keepdim=True).expand(b, h, l, d).contiguous()
        context = context.scatter(
            2, top_idx.unsqueeze(-1).expand(b, h, n_active, d), active_out
        )

        # Merge heads back into channels and return to (b, l, c).
        context = context.permute(0, 1, 3, 2).reshape(b, h * d, l)
        return self.proj(context).transpose(1, 2)

class FFN(nn.Module):
    """Position-wise feed-forward block: expand 4x, GELU, project back.

    Args:
        dim_model: Feature width of the input and of the output.
        distil: When truthy, the block's input is added to its output
            (an internal skip connection).
    """

    def __init__(self, dim_model, distil):
        super().__init__()
        self.fc1 = nn.Conv1d(dim_model, dim_model * 4, 1)
        self.fc2 = nn.Conv1d(dim_model * 4, dim_model, 1)
        self.distil = distil

    def forward(self, x):
        """Transform ``x`` of shape (batch, length, dim_model); shape is preserved."""
        # The enclosing stack works channel-last, but Conv1d expects channels
        # on dim 1, so transpose around the two pointwise convolutions.
        hidden = self.fc1(x.transpose(1, 2))
        # Tensors have no ``.gelu()`` method; use the functional form.
        hidden = nn.functional.gelu(hidden)
        out = self.fc2(hidden).transpose(1, 2)
        if self.distil:
            out = x + out
        return out


## 2. Autoformer

In [None]:
import torch
import torch.nn as nn
import math
from einops import rearrange, repeat
from einops.layers.torch import Rearrange

class TemporalAggregation(nn.Module):
    """Channel-mixing 1x1 convolution followed by a mean over dimension 2."""

    def __init__(self, dim_model):
        super().__init__()
        self.proj = nn.Conv1d(in_channels=dim_model, out_channels=dim_model, kernel_size=1)

    def forward(self, x):
        # Conv1d reads dim 1 as channels, so the input is expected as
        # (batch, dim_model, length).
        projected = self.proj(x)
        # Averaging dim 2 removes the length axis: result is (batch, dim_model).
        pooled = torch.mean(projected, dim=2)
        return pooled

class AutoformerEncoder(nn.Module):
    """Autoformer-style encoder with a classification head.

    Pipeline: pointwise convolutional embedding -> learned positional
    encoding -> ``num_stacks`` ``AutoformerStack`` layers -> temporal mean
    pooling (``TemporalAggregation``) -> linear classifier.

    Args:
        input_dim: Number of channels of the input series.
        dim_model: Width of the hidden representation.
        n_heads: Number of heads, forwarded to every stack.
        num_stacks: Number of stacked encoder layers.
        factor: Forwarded to every stack. It is also the number of positions
            held by the learned positional encoding, i.e. the longest
            sequence ``forward`` accepts.
        num_classes: Size of the classifier output. When omitted, the
            module-level ``num_classes`` global is used, which is how the
            class was originally wired up.

    Raises:
        NameError: If ``num_classes`` is neither passed nor defined as a
            module-level global.
    """

    def __init__(self, input_dim, dim_model, n_heads, num_stacks, factor,
                 num_classes=None):
        super().__init__()
        if num_classes is None:
            # Legacy path: earlier callers relied on a global defined before
            # instantiation. Keep that working, but fail with a clear message.
            try:
                num_classes = globals()["num_classes"]
            except KeyError:
                raise NameError(
                    "name 'num_classes' is not defined; pass num_classes=... explicitly"
                ) from None

        self.input_dim = input_dim
        self.dim_model = dim_model
        self.n_heads = n_heads
        self.num_stacks = num_stacks
        self.factor = factor
        self.num_classes = num_classes

        self.proj = nn.Conv1d(input_dim, dim_model, 1)
        # Stored channel-first, (1, dim_model, positions), to match the
        # output layout of ``self.proj``.
        self.pos_enc = nn.Parameter(torch.randn(1, dim_model, factor))

        self.stacks = nn.ModuleList(
            [AutoformerStack(dim_model, n_heads, factor) for _ in range(num_stacks)]
        )

        self.aggregation = TemporalAggregation(dim_model)
        self.classifier = nn.Linear(dim_model, num_classes)

    def forward(self, x):
        """Classify a batch of series.

        Args:
            x: Tensor laid out as (batch, input_dim, length).

        Returns:
            Logits of shape (batch, num_classes).

        Raises:
            ValueError: If ``length`` exceeds the number of learned positions.
        """
        x = self.proj(x)  # (b, dim_model, l)

        seq_len = x.size(2)
        max_len = self.pos_enc.size(2)
        if seq_len > max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds the {max_len} learned positions"
            )
        # Add the positional encoding while still channel-first so its
        # (1, dim_model, l) slice broadcasts over the batch.
        x = x + self.pos_enc[:, :, :seq_len]

        # The stacks apply LayerNorm over the last axis, so they need (b, l, c).
        x = rearrange(x, 'b c l -> b l c')
        for stack in self.stacks:
            x = stack(x)

        # TemporalAggregation is Conv1d-based and pools dim 2: back to (b, c, l).
        x = rearrange(x, 'b l c -> b c l')
        x = self.aggregation(x)  # (b, dim_model)
        return self.classifier(x)

class AutoformerStack(nn.Module):
    """Pre-norm residual layer pairing auto-correlation with a feed-forward block."""

    def __init__(self, dim_model, n_heads, factor):
        super().__init__()
        self.attn = AutoCorrelation(dim_model, n_heads, factor)
        self.ff = FFN(dim_model)
        # Both norms act on the last axis, which has size dim_model.
        self.norm1 = nn.LayerNorm(dim_model)
        self.norm2 = nn.LayerNorm(dim_model)

    def forward(self, x):
        # Residual 1: correlation sub-layer on the normalised input.
        after_attn = x + self.attn(self.norm1(x))
        # Residual 2: feed-forward sub-layer on the re-normalised result.
        after_ff = after_attn + self.ff(self.norm2(after_attn))
        return after_ff

class AutoCorrelation(nn.Module):
    """Auto-correlation mechanism with time-delay aggregation (Autoformer style).

    The correlation between queries and keys is evaluated for every time lag
    at once through the FFT. The ``k`` lags with the highest correlation,
    averaged over heads and channels, are kept per sample; their correlations
    are soft-maxed into weights; and the values, circularly shifted by each
    lag, are combined with those weights.

    Args:
        dim_model: Feature width; must be divisible by ``n_heads``.
        n_heads: Number of heads the channels are split into.
        factor: Controls how many lags are aggregated:
            ``k = int(factor * ln(length))`` clamped to ``[1, length]``.

    Raises:
        ValueError: If ``dim_model`` is not divisible by ``n_heads``.
    """

    def __init__(self, dim_model, n_heads, factor):
        super().__init__()
        if dim_model % n_heads != 0:
            raise ValueError(
                f"dim_model ({dim_model}) must be divisible by n_heads ({n_heads})"
            )
        self.dim_model = dim_model
        self.n_heads = n_heads
        self.factor = factor
        self.head_dim = dim_model // n_heads

        self.qkv = nn.Conv1d(dim_model, 3 * dim_model, 1)
        self.proj = nn.Conv1d(dim_model, dim_model, 1)
        # Kept for interface parity with the other attention modules; the
        # correlation weights below are normalised by a softmax instead.
        self.scale = self.head_dim ** -0.5

    def forward(self, x):
        """Apply auto-correlation.

        Args:
            x: Tensor of shape (batch, length, dim_model).

        Returns:
            Tensor of shape (batch, length, dim_model).
        """
        b, l, _ = x.size()
        h, d = self.n_heads, self.head_dim

        # Conv1d expects channels on dim 1, hence the transpose.
        qkv = self.qkv(x.transpose(1, 2))  # (b, 3 * c, l)
        q, k, v = qkv.view(b, 3, h, d, l).unbind(dim=1)  # each (b, h, d, l)

        # Correlation for all lags in O(l log l): multiply spectra, invert.
        q_freq = torch.fft.rfft(q, dim=-1)
        k_freq = torch.fft.rfft(k, dim=-1)
        corr = torch.fft.irfft(q_freq * torch.conj(k_freq), n=l, dim=-1)  # (b, h, d, l)

        # One correlation score per sample and lag.
        lag_score = corr.mean(dim=(1, 2))  # (b, l)
        top_k = min(l, max(1, int(self.factor * math.log(l))))
        weights, delays = lag_score.topk(top_k, dim=-1)  # both (b, top_k)
        weights = weights.softmax(dim=-1)

        # Time-delay aggregation. Gathering index (t + tau) % l is the same
        # as torch.roll(v, -tau, dims=-1), but supports a per-sample tau.
        positions = torch.arange(l, device=x.device).unsqueeze(0)  # (1, l)
        out = torch.zeros_like(v)
        for i in range(top_k):
            shifted = (positions + delays[:, i:i + 1]) % l  # (b, l)
            shifted = shifted.view(b, 1, 1, l).expand(b, h, d, l)
            out = out + v.gather(-1, shifted) * weights[:, i].view(b, 1, 1, 1)

        # Merge heads back into channels and return to (b, l, c).
        out = out.reshape(b, h * d, l)
        return self.proj(out).transpose(1, 2)

class FFN(nn.Module):
    """Position-wise feed-forward block: expand 4x, GELU, project back.

    Args:
        dim_model: Feature width of the input and of the output.
    """

    def __init__(self, dim_model):
        super().__init__()
        self.fc1 = nn.Conv1d(dim_model, dim_model * 4, 1)
        self.fc2 = nn.Conv1d(dim_model * 4, dim_model, 1)

    def forward(self, x):
        """Transform ``x`` of shape (batch, length, dim_model); shape is preserved."""
        # The enclosing stack works channel-last, but Conv1d expects channels
        # on dim 1, so transpose around the two pointwise convolutions.
        hidden = self.fc1(x.transpose(1, 2))
        # Tensors have no ``.gelu()`` method; use the functional form.
        hidden = nn.functional.gelu(hidden)
        return self.fc2(hidden).transpose(1, 2)


## 3. TFT

In [None]:
import torch
import torch.nn as nn
from einops import rearrange, repeat
from einops.layers.torch import Rearrange

class TemporalAggregation(nn.Module):
    '''
    Temporal pooling head: applies a kernel-size-1 convolution across the
    channels and then takes the mean over dimension 2, so every sample ends
    up with a single feature vector.
    '''

    def __init__(self, dim_model):
        super().__init__()
        pointwise = nn.Conv1d(dim_model, dim_model, 1)
        self.proj = pointwise

    def forward(self, x):
        # Input is expected channel-first, (batch, dim_model, length), because
        # Conv1d convolves over the last axis; the reduction then drops it.
        return torch.mean(self.proj(x), 2)

class TFTEncoder(nn.Module):
    """TFT-style encoder with a classification head.

    Pipeline: pointwise convolutional embedding -> learned positional
    encoding -> ``num_stacks`` ``TFTStack`` layers -> temporal mean pooling
    (``TemporalAggregation``) -> linear classifier.

    Args:
        input_dim: Number of channels of the input series.
        dim_model: Width of the hidden representation.
        n_heads: Number of attention heads, forwarded to every stack.
        num_stacks: Number of stacked encoder layers.
        factor: Forwarded to every stack. It is also the number of positions
            held by the learned positional encoding, i.e. the longest
            sequence ``forward`` accepts.
        num_classes: Size of the classifier output. When omitted, the
            module-level ``num_classes`` global is used, which is how the
            class was originally wired up.

    Raises:
        NameError: If ``num_classes`` is neither passed nor defined as a
            module-level global.
    """

    def __init__(self, input_dim, dim_model, n_heads, num_stacks, factor,
                 num_classes=None):
        super().__init__()
        if num_classes is None:
            # Legacy path: earlier callers relied on a global defined before
            # instantiation. Keep that working, but fail with a clear message.
            try:
                num_classes = globals()["num_classes"]
            except KeyError:
                raise NameError(
                    "name 'num_classes' is not defined; pass num_classes=... explicitly"
                ) from None

        self.input_dim = input_dim
        self.dim_model = dim_model
        self.n_heads = n_heads
        self.num_stacks = num_stacks
        self.factor = factor
        self.num_classes = num_classes

        self.proj = nn.Conv1d(input_dim, dim_model, 1)
        # Stored channel-first, (1, dim_model, positions), to match the
        # output layout of ``self.proj``.
        self.pos_enc = nn.Parameter(torch.randn(1, dim_model, factor))

        self.stacks = nn.ModuleList(
            [TFTStack(dim_model, n_heads, factor) for _ in range(num_stacks)]
        )

        self.aggregation = TemporalAggregation(dim_model)
        self.classifier = nn.Linear(dim_model, num_classes)

    def forward(self, x):
        """Classify a batch of series.

        Args:
            x: Tensor laid out as (batch, input_dim, length).

        Returns:
            Logits of shape (batch, num_classes).

        Raises:
            ValueError: If ``length`` exceeds the number of learned positions.
        """
        x = self.proj(x)  # (b, dim_model, l)

        seq_len = x.size(2)
        max_len = self.pos_enc.size(2)
        if seq_len > max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds the {max_len} learned positions"
            )
        # Add the positional encoding while still channel-first so its
        # (1, dim_model, l) slice broadcasts over the batch.
        x = x + self.pos_enc[:, :, :seq_len]

        # The stacks apply LayerNorm over the last axis, so they need (b, l, c).
        x = rearrange(x, 'b c l -> b l c')
        for stack in self.stacks:
            x = stack(x)

        # TemporalAggregation is Conv1d-based and pools dim 2: back to (b, c, l).
        x = rearrange(x, 'b l c -> b c l')
        x = self.aggregation(x)  # (b, dim_model)
        return self.classifier(x)

class TFTStack(nn.Module):
    """Single encoder layer: gated attention and feed-forward, each pre-normed and residual."""

    def __init__(self, dim_model, n_heads, factor):
        super().__init__()
        self.attn = TFTAttention(dim_model, n_heads, factor)
        self.ff = FFN(dim_model)
        # LayerNorm over the trailing dim_model-sized axis.
        self.norm1 = nn.LayerNorm(dim_model)
        self.norm2 = nn.LayerNorm(dim_model)

    def forward(self, x):
        # Run the two sub-layers in order; each receives the normalised
        # activation and contributes an additive update.
        sublayers = ((self.norm1, self.attn), (self.norm2, self.ff))
        for norm, sublayer in sublayers:
            x = x + sublayer(norm(x))
        return x

class TFTAttention(nn.Module):
    """Multi-head self-attention blended with a time-invariant summary via a learned gate.

    Two components are computed from the input:

    * temporal: ordinary scaled dot-product multi-head self-attention;
    * static: ``static_proj`` applied to the time-average of
      ``static_conv(x)``, i.e. one vector per sample shared by all steps.

    A per-position scalar gate ``sigmoid(gating(x))`` mixes them as
    ``gate * static + (1 - gate) * temporal`` before the output projection.

    Args:
        dim_model: Feature width; must be divisible by ``n_heads``.
        n_heads: Number of attention heads.
        factor: Stored for interface parity with the sibling attention
            modules; not used by the computation.

    Raises:
        ValueError: If ``dim_model`` is not divisible by ``n_heads``.
    """

    def __init__(self, dim_model, n_heads, factor):
        super().__init__()
        if dim_model % n_heads != 0:
            raise ValueError(
                f"dim_model ({dim_model}) must be divisible by n_heads ({n_heads})"
            )
        self.dim_model = dim_model
        self.n_heads = n_heads
        self.factor = factor
        self.head_dim = dim_model // n_heads

        self.qkv = nn.Conv1d(dim_model, 3 * dim_model, 1)
        self.proj = nn.Conv1d(dim_model, dim_model, 1)
        self.scale = self.head_dim ** -0.5

        self.gating = nn.Linear(dim_model, 1)
        self.static_conv = nn.Conv1d(dim_model, dim_model, 1)
        self.static_proj = nn.Conv1d(dim_model, dim_model, 1)

    def forward(self, x):
        """Apply gated attention.

        Args:
            x: Tensor of shape (batch, length, dim_model).

        Returns:
            Tensor of shape (batch, length, dim_model).
        """
        b, l, _ = x.size()
        h, d = self.n_heads, self.head_dim

        # Keep the original input: the static branch and the gate both read
        # it, so it must not be overwritten by the qkv projection.
        x_cf = x.transpose(1, 2)  # (b, c, l) for the Conv1d layers

        qkv = self.qkv(x_cf)  # (b, 3 * c, l)
        q, k, v = qkv.view(b, 3, h, d, l).permute(1, 0, 2, 4, 3).unbind(0)  # (b, h, l, d)

        # Temporal component: scaled dot-product attention per head.
        attn = (torch.matmul(q, k.transpose(-2, -1)) * self.scale).softmax(dim=-1)
        temporal_comp = torch.matmul(attn, v)  # (b, h, l, d)
        temporal_comp = temporal_comp.transpose(1, 2).reshape(b, l, h * d)

        # Static component: time-averaged features, broadcast over all steps.
        static_comp = self.static_conv(x_cf).mean(dim=2, keepdim=True)  # (b, c, 1)
        static_comp = self.static_proj(static_comp).transpose(1, 2)  # (b, 1, c)

        # One scalar gate per position, broadcast over the channels.
        gate = self.gating(x).sigmoid()  # (b, l, 1)
        out = gate * static_comp + (1 - gate) * temporal_comp  # (b, l, c)

        return self.proj(out.transpose(1, 2)).transpose(1, 2)

class FFN(nn.Module):
    """Position-wise feed-forward block: expand 4x, GELU, project back.

    Args:
        dim_model: Feature width of the input and of the output.
    """

    def __init__(self, dim_model):
        super().__init__()
        self.fc1 = nn.Conv1d(dim_model, dim_model * 4, 1)
        self.fc2 = nn.Conv1d(dim_model * 4, dim_model, 1)

    def forward(self, x):
        """Transform ``x`` of shape (batch, length, dim_model); shape is preserved."""
        # The enclosing stack hands over channel-last tensors, while Conv1d
        # wants channels on dim 1: transpose in, transpose back out.
        channel_first = x.transpose(1, 2)
        # ``Tensor.gelu()`` does not exist; the functional form does.
        hidden = nn.functional.gelu(self.fc1(channel_first))
        return self.fc2(hidden).transpose(1, 2)


## 4. Ensemble

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score

# Instantiate models
input_dim = 1
dim_model = 512
n_heads = 8
num_stacks = 2
factor = 50
num_classes = 10

informer_model = InformerEncoder(input_dim, dim_model, n_heads, num_stacks, factor, True)
autformer_model = AutoformerEncoder(input_dim, dim_model, n_heads, num_stacks, factor)
tft_model = TFTEncoder(input_dim, dim_model, n_heads, num_stacks, factor)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer_informer = optim.Adam(informer_model.parameters())
optimizer_autformer = optim.Adam(autformer_model.parameters())
optimizer_tft = optim.Adam(tft_model.parameters())


def _train_step(model, optimizer, loss_fn, inputs, targets):
    """Run one full-batch optimisation step and return ``(outputs, loss)``."""
    model.train()
    optimizer.zero_grad()
    outputs = model(inputs)
    loss = loss_fn(outputs, targets)
    loss.backward()
    optimizer.step()
    return outputs, loss


# Train models
# NOTE(review): input_data / target_data / test_data / test_labels are not
# defined in this part of the notebook; they are assumed to come from an
# earlier cell -- confirm before running.
num_epochs = 10

for epoch in range(num_epochs):
    # Train Informer
    informer_output, loss_informer = _train_step(
        informer_model, optimizer_informer, criterion, input_data, target_data
    )

    # Train Autoformer
    autformer_output, loss_autformer = _train_step(
        autformer_model, optimizer_autformer, criterion, input_data, target_data
    )

    # Train TFT
    tft_output, loss_tft = _train_step(
        tft_model, optimizer_tft, criterion, input_data, target_data
    )

# Evaluate ensemble
informer_model.eval()
autformer_model.eval()
tft_model.eval()

with torch.no_grad():
    informer_output = informer_model(test_data)
    autformer_output = autformer_model(test_data)
    tft_output = tft_model(test_data)

    # Combine predictions using soft voting: average the per-model class
    # probabilities. Averaging raw logits instead would let whichever model
    # produces the largest-magnitude logits dominate the vote.
    member_probs = [
        nn.functional.softmax(logits, dim=1)
        for logits in (informer_output, autformer_output, tft_output)
    ]
    ensemble_probs = torch.stack(member_probs).mean(dim=0)
    ensemble_preds = torch.argmax(ensemble_probs, dim=1)

    # Calculate accuracy
    accuracy = accuracy_score(test_labels, ensemble_preds.cpu().numpy())
    print(f"Ensemble accuracy: {accuracy}")
