DLinear

In [None]:
import torch
import torch.nn as nn
import numpy as np

class DLinear(nn.Module):
    """
    Диссекция DLinear: 
    - Универсальная модель для univariate и multivariate задач.
    - Поддерживает флаг --individual (для мультивариантной задачи с независимыми линейными слоями).
    """
    def __init__(self, seq_len: int, pred_len: int, individual: bool = False, n_vars: int = 1):
        """
        seq_len: длина входной последовательности
        pred_len: длина прогноза
        individual: если True, для каждого признака обучается отдельный линейный слой (аналог --individual в официальном коде)
        n_vars: количество переменных (признаков), используется при individual=True
        """
        super(DLinear, self).__init__()
        self.seq_len = seq_len
        self.pred_len = pred_len
        self.output_dim = pred_len
        self.individual = individual
        self.n_vars = n_vars

        # Усреднение по времени (для тренда)
        self.Linear_Seasonal = nn.Linear(seq_len, pred_len)
        self.Linear_Trend = nn.Linear(seq_len, pred_len)

        if self.individual:
            # Для каждого признака — отдельный линейный слой
            self.Linear_Seasonal.weight = nn.Parameter(torch.zeros(n_vars, pred_len, seq_len))
            self.Linear_Seasonal.bias = nn.Parameter(torch.zeros(n_vars, pred_len))
            self.Linear_Trend.weight = nn.Parameter(torch.zeros(n_vars, pred_len, seq_len))
            self.Linear_Trend.bias = nn.Parameter(torch.zeros(n_vars, pred_len))
        else:
            # Общий линейный слой для всех признаков
            self.Linear_Seasonal.weight = nn.Parameter(torch.zeros(pred_len, seq_len))
            self.Linear_Seasonal.bias = nn.Parameter(torch.zeros(pred_len))
            self.Linear_Trend.weight = nn.Parameter(torch.zeros(pred_len, seq_len))
            self.Linear_Trend.bias = nn.Parameter(torch.zeros(pred_len))

        # Инициализация весов
        nn.init.kaiming_uniform_(self.Linear_Seasonal.weight, mode='fan_in', nonlinearity='relu')
        nn.init.kaiming_uniform_(self.Linear_Trend.weight, mode='fan_in', nonlinearity='relu')

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        x: (batch_size, seq_len, n_vars)
        return: (batch_size, pred_len, n_vars)
        """
        # Декомпозиция: вычитание среднего (тренда)
        seasonal_init = x
        trend = seasonal_init.mean(dim=1, keepdim=True)
        seasonal = seasonal_init - trend

        # Транспонируем для линейного слоя: (batch, n_vars, seq_len)
        seasonal = seasonal.transpose(1, 2)  # (b, n_vars, seq_len)
        trend = trend.transpose(1, 2)       # (b, n_vars, seq_len)

        if self.individual:
            # Применяем линейные слои индивидуально для каждого признака
            seasonal_output = torch.stack([
                self.Linear_Seasonal[i](seasonal[:, i, :]) for i in range(self.n_vars)
            ], dim=1)  # (b, n_vars, pred_len)

            trend_output = torch.stack([
                self.Linear_Trend[i](trend[:, i, :]) for i in range(self.n_vars)
            ], dim=1)  # (b, n_vars, pred_len)
        else:
            # Применяем один линейный слой ко всем признакам
            seasonal_output = self.Linear_Seasonal(seasonal)  # (b, n_vars, pred_len)
            trend_output = self.Linear_Trend(trend)          # (b, n_vars, pred_len)

        # Складываем сезонность и тренд
        x = seasonal_output + trend_output
        # Транспонируем обратно: (batch, pred_len, n_vars)
        x = x.transpose(1, 2)
        return x

# Пример использования
if __name__ == "__main__":
    # Параметры задачи
    seq_len = 96
    pred_len = 48
    n_vars = 7  # например, 7 признаков
    batch_size = 32

    # --- Univariate ---
    print("=== Univariate ===")
    model_uni = DLinear(seq_len=seq_len, pred_len=pred_len, individual=False, n_vars=1)
    x_uni = torch.randn(batch_size, seq_len, 1)
    out_uni = model_uni(x_uni)
    print(f"Input: {x_uni.shape} -> Output: {out_uni.shape}")

    # --- Multivariate (non-individual) ---
    print("\n=== Multivariate (non-individual) ===")
    model_multi = DLinear(seq_len=seq_len, pred_len=pred_len, individual=False, n_vars=n_vars)
    x_multi = torch.randn(batch_size, seq_len, n_vars)
    out_multi = model_multi(x_multi)
    print(f"Input: {x_multi.shape} -> Output: {out_multi.shape}")

    # --- Multivariate (individual) ---
    print("\n=== Multivariate (individual) ===")
    model_indiv = DLinear(seq_len=seq_len, pred_len=pred_len, individual=True, n_vars=n_vars)
    x_indiv = torch.randn(batch_size, seq_len, n_vars)
    out_indiv = model_indiv(x_indiv)
    print(f"Input: {x_indiv.shape} -> Output: {out_indiv.shape}")

NLinear

In [None]:
import torch
import torch.nn as nn

class NLinear(nn.Module):
    """
    Non-stationary Linear Model (NLinear)
    Основано на статье: https://arxiv.org/abs/2205.13504
    """
    def __init__(self, seq_len: int, pred_len: int, individual: bool = False, n_vars: int = 1):
        """
        seq_len: длина входной последовательности
        pred_len: длина прогноза
        individual: если True, для каждого признака обучается отдельный линейный слой
        n_vars: количество переменных (признаков)
        """
        super(NLinear, self).__init__()
        self.seq_len = seq_len
        self.pred_len = pred_len
        self.individual = individual
        self.n_vars = n_vars

        if self.individual:
            self.Linear = nn.ModuleList()
            for i in range(n_vars):
                self.Linear.append(nn.Linear(seq_len, pred_len))
        else:
            self.Linear = nn.Linear(seq_len, pred_len)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        x: (batch_size, seq_len, n_vars)
        return: (batch_size, pred_len, n_vars)
        """
        if self.individual:
            # Применяем линейный слой к каждому признаку отдельно
            output = torch.stack(
                [self.Linear[i](x[:, :, i]) for i in range(self.n_vars)], dim=-1
            )
        else:
            # Транспонируем: (b, n_vars, seq_len)
            x = x.transpose(1, 2)
            output = self.Linear(x)  # (b, n_vars, pred_len)
            output = output.transpose(1, 2)  # (b, pred_len, n_vars)

        return output

# Пример использования
if __name__ == "__main__":
    seq_len = 96
    pred_len = 48
    n_vars = 7
    batch_size = 32

    model = NLinear(seq_len=seq_len, pred_len=pred_len, individual=False, n_vars=n_vars)
    x = torch.randn(batch_size, seq_len, n_vars)
    out = model(x)
    print(f"NLinear | Input: {x.shape} -> Output: {out.shape}")

PatchTST

In [None]:
import torch
import torch.nn as nn

class PatchTST(nn.Module):
    """
    PatchTST: Transformer with Patching
    Основано на статье: https://arxiv.org/abs/2211.14730
    """
    def __init__(
        self,
        seq_len: int,
        pred_len: int,
        patch_len: int = 16,
        stride: int = 8,
        d_model: int = 128,
        n_heads: int = 8,
        e_layers: int = 3,
        dropout: float = 0.1,
        individual: bool = False,
        n_vars: int = 1
    ):
        super(PatchTST, self).__init__()
        self.seq_len = seq_len
        self.pred_len = pred_len
        self.patch_len = patch_len
        self.stride = stride
        self.d_model = d_model
        self.n_vars = n_vars
        self.individual = individual

        # Вычисляем количество патчей
        self.num_patches = (seq_len - patch_len) // stride + 1

        # Линейный слой для проецирования патчей в d_model
        self.patch_projection = nn.Linear(patch_len, d_model)

        # Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=d_model * 2,
            dropout=dropout,
            activation='gelu',
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=e_layers)

        # Предсказание длины pred_len
        self.predictor = nn.Linear(d_model, pred_len)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        x: (batch_size, seq_len, n_vars)
        return: (batch_size, pred_len, n_vars)
        """
        batch_size, seq_len, n_vars = x.shape

        # Патчинг: разбиваем на патчи (batch, n_vars, num_patches, patch_len)
        patches = x.unfold(dimension=1, size=self.patch_len, step=self.stride)  # (b, n_vars, num_patches, patch_len)
        patches = patches.transpose(1, 2)  # (b, num_patches, n_vars, patch_len)

        # Проекция патчей в d_model
        patches = self.patch_projection(patches)  # (b, num_patches, n_vars, d_model)

        # Суммируем признаки вдоль оси n_vars (или используем отдельно, если individual=True)
        # Для простоты: усредняем по признакам (в оригинале может быть индивидуальный выход)
        patches = patches.mean(dim=2)  # (b, num_patches, d_model)

        # Пропускаем через Transformer
        out = self.transformer(patches)  # (b, num_patches, d_model)

        # Предсказание
        out = self.predictor(out)  # (b, num_patches, pred_len)

        # Суммируем или усредняем по патчам
        out = out.mean(dim=1)  # (b, pred_len)
        out = out.unsqueeze(-1).repeat(1, 1, self.n_vars)  # (b, pred_len, n_vars)

        return out

# Пример использования
if __name__ == "__main__":
    seq_len = 96
    pred_len = 48
    patch_len = 16
    stride = 8
    n_vars = 7
    batch_size = 32

    model = PatchTST(
        seq_len=seq_len,
        pred_len=pred_len,
        patch_len=patch_len,
        stride=stride,
        n_vars=n_vars
    )
    x = torch.randn(batch_size, seq_len, n_vars)
    out = model(x)
    print(f"PatchTST | Input: {x.shape} -> Output: {out.shape}")