In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from typing import Optional

# --- Вспомогательные модули ---

class TimestepEmbedding(nn.Module):
    """
    Создаёт эмбеддинг временного шага, аналогично Stable Diffusion.
    """
    def __init__(self, embedding_dim: int, out_dim: int):
        super().__init__()
        self.linear_1 = nn.Linear(embedding_dim, out_dim)
        self.act = nn.SiLU()
        self.linear_2 = nn.Linear(out_dim, out_dim)

    def forward(self, x: torch.Tensor):
        x = self.linear_1(x)
        x = self.act(x)
        x = self.linear_2(x)
        return x

def get_timestep_embedding(
    timesteps: torch.Tensor,
    embedding_dim: int,
    flip_sin_to_cos: bool = False,
    downscale_freq_shift: float = 1,
    scale: float = 1,
    max_period: int = 10000,
) -> torch.Tensor:
    """
    Создаёт синусоидальные эмбеддинги временных шагов.
    """
    assert len(timesteps.shape) == 1, "Timesteps should be a 1d-array"

    half_dim = embedding_dim // 2
    exponent = -math.log(max_period) * torch.arange(
        start=0, end=half_dim, dtype=torch.float32, device=timesteps.device
    )
    exponent = exponent / (half_dim - downscale_freq_shift)

    emb = torch.exp(exponent).to(timesteps.dtype)
    emb = timesteps[:, None].float() * emb[None, :]

    # масштабирование
    emb = scale * emb

    # конкатенация синуса и косинуса
    emb = torch.cat([torch.sin(emb), torch.cos(emb)], dim=-1)

    # изменение порядка
    if flip_sin_to_cos:
        emb = torch.cat([emb[:, half_dim:], emb[:, :half_dim]], dim=-1)

    # дополнение нулями при нечетной размерности
    if embedding_dim % 2 == 1:
        emb = torch.nn.functional.pad(emb, (0, 1, 0, 0))
    return emb


class AdaLayerNorm(nn.Module):
    """
    Адаптивная нормализация с модуляцией из временного эмбеддинга.
    """
    def __init__(self, embedding_dim: int, out_dim: int):
        super().__init__()
        self.silu = nn.SiLU()
        # Генерируем параметры shift и scale
        self.linear = nn.Linear(embedding_dim, out_dim * 2)
        self.layer_norm = nn.LayerNorm(out_dim, elementwise_affine=False)

    def forward(self, x: torch.Tensor, emb: torch.Tensor):
        emb = self.silu(emb)
        emb = self.linear(emb)
        # Разделяем на shift и scale
        shift, scale = emb.chunk(2, dim=1)
        # Применяем нормализацию и модуляцию
        x = self.layer_norm(x) * (1 + scale.unsqueeze(1)) + shift.unsqueeze(1)
        return x


# --- Основные блоки модели ---

class MultiheadAttention(nn.Module):
    """
    Простой многоголовый механизм внимания с поддержкой cross-attention.
    """
    def __init__(self, dim: int, num_heads: int, context_dim: Optional[int] = None):
        super().__init__()
        self.dim = dim
        self.num_heads = num_heads
        self.head_dim = dim // num_heads
        self.context_dim = context_dim or dim
        
        self.to_q = nn.Linear(dim, dim, bias=False)
        self.to_k = nn.Linear(self.context_dim, dim, bias=False)
        self.to_v = nn.Linear(self.context_dim, dim, bias=False)
        self.to_out = nn.Linear(dim, dim)

    def forward(self, x: torch.Tensor, context: Optional[torch.Tensor] = None):
        """
        Args:
            x: Тензор запросов [B, N, D]
            context: Тензор ключей/значений [B, M, D_context]. Если None, используется self-attention.
        Returns:
            Выход внимания [B, N, D]
        """
        context = context if context is not None else x
        
        q = self.to_q(x)  # [B, N, D]
        k = self.to_k(context)  # [B, M, D]
        v = self.to_v(context)  # [B, M, D]
        
        # Разделяем на головы
        batch_size, seq_len_q, _ = q.shape
        seq_len_kv = k.shape[1]
        
        q = q.view(batch_size, seq_len_q, self.num_heads, self.head_dim).transpose(1, 2)  # [B, H, N, D_head]
        k = k.view(batch_size, seq_len_kv, self.num_heads, self.head_dim).transpose(1, 2)  # [B, H, M, D_head]
        v = v.view(batch_size, seq_len_kv, self.num_heads, self.head_dim).transpose(1, 2)  # [B, H, M, D_head]
        
        # Вычисляем attention scores
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim)  # [B, H, N, M]
        attn_weights = F.softmax(scores, dim=-1)  # [B, H, N, M]
        
        # Применяем attention к значениям
        out = torch.matmul(attn_weights, v)  # [B, H, N, D_head]
        out = out.transpose(1, 2).contiguous().view(batch_size, seq_len_q, self.dim)  # [B, N, D]
        return self.to_out(out)  # [B, N, D]


class DiffusionBlock(nn.Module):
    """
    Основной блок диффузионной модели.
    Сочетает residual connection, нормализацию, внимание и MLP.
    """
    def __init__(self, dim: int, num_heads: int, condition_dim: int, time_emb_dim: int):
        super().__init__()
        self.dim = dim
        self.num_heads = num_heads
        self.condition_dim = condition_dim
        self.time_emb_dim = time_emb_dim

        # Адаптивная нормализация для модуляции временем
        self.norm1 = AdaLayerNorm(time_emb_dim, dim)
        # Внимание: целевые данные обращаются к условию (cross-attention)
        self.attn1 = MultiheadAttention(dim, num_heads, context_dim=condition_dim)
        
        self.norm2 = AdaLayerNorm(time_emb_dim, dim)
        # Само-внимание внутри целевых данных
        self.attn2 = MultiheadAttention(dim, num_heads)
        
        self.norm3 = AdaLayerNorm(time_emb_dim, dim)
        # MLP
        self.mlp = nn.Sequential(
            nn.Linear(dim, dim * 4),
            nn.GELU(),
            nn.Linear(dim * 4, dim),
        )

    def forward(self, noisy_target: torch.Tensor, timestep_emb: torch.Tensor, condition: torch.Tensor):
        """
        Args:
            noisy_target: [B, seq_len_target, dim]
            timestep_emb: [B, time_emb_dim]
            condition: [B, seq_len_condition, condition_dim]
        Returns:
            Обновлённые целевые данные [B, seq_len_target, dim]
        """
        # 1. Cross-Attention с условием
        h = self.norm1(noisy_target, timestep_emb)
        h = self.attn1(h, context=condition)
        noisy_target = noisy_target + h  # Residual connection

        # 2. Self-Attention
        h = self.norm2(noisy_target, timestep_emb)
        h = self.attn2(h)
        noisy_target = noisy_target + h  # Residual connection

        # 3. MLP
        h = self.norm3(noisy_target, timestep_emb)
        h = self.mlp(h)
        noisy_target = noisy_target + h  # Residual connection

        return noisy_target


class CustomDiffusionModel(nn.Module):
    """
    Кастомная диффузионная модель для предсказания шума в временных рядах.
    """
    def __init__(
        self,
        target_seq_len: int = 32,
        target_dim: int = 1,
        condition_seq_len: int = 256,
        condition_dim: int = 256, # Предполагается, что это выход TradingGDTProcessor
        hidden_dim: int = 512,
        num_layers: int = 6,
        num_heads: int = 8,
        time_embedding_dim: int = 256,
    ):
        super().__init__()
        self.target_seq_len = target_seq_len
        self.target_dim = target_dim
        self.condition_seq_len = condition_seq_len
        self.condition_dim = condition_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.time_embedding_dim = time_embedding_dim

        # Проекции входов в скрытое пространство
        self.target_proj = nn.Linear(target_dim, hidden_dim)
        # Условие уже в правильной размерности, но добавим проекцию на всякий случай
        self.condition_proj = nn.Linear(condition_dim, hidden_dim) 

        # Временной эмбеддинг
        self.time_proj = nn.Sequential(
            # Сначала синусоидальный эмбеддинг
            # Затем MLP для увеличения размерности
            nn.Linear(time_embedding_dim, time_embedding_dim * 4),
            nn.SiLU(),
            nn.Linear(time_embedding_dim * 4, time_embedding_dim),
        )
        # Или можно использовать готовый TimestepEmbedding
        # self.time_proj = TimestepEmbedding(time_embedding_dim, time_embedding_dim)

        # Стек диффузионных блоков
        self.layers = nn.ModuleList([
            DiffusionBlock(
                dim=hidden_dim,
                num_heads=num_heads,
                condition_dim=hidden_dim, 
                time_emb_dim=time_embedding_dim
            ) for _ in range(num_layers)
        ])

        # Финальный слой для предсказания шума
        self.final_layer = nn.Linear(hidden_dim, target_dim)

    def forward(self, noisy_target: torch.Tensor, timestep: torch.Tensor, condition: torch.Tensor):
        """
        Прямой проход модели.

        Args:
            noisy_target (torch.Tensor): Зашумлённые целевые данные [B, target_seq_len, target_dim].
            timestep (torch.Tensor): Временной шаг (скаляр или тензор [B]).
            condition (torch.Tensor): Условие (обработанная история) [B, condition_seq_len, condition_dim].

        Returns:
            torch.Tensor: Предсказанный шум [B, target_seq_len, target_dim].
        """
        batch_size = noisy_target.shape[0]
        
        # 1. Проекция входов
        # noisy_target: [B, 32, 1] -> [B, 32, hidden_dim]
        h_target = self.target_proj(noisy_target)
        # condition: [B, 256, 256] -> [B, 256, hidden_dim]
        h_condition = self.condition_proj(condition)

        # 2. Создание и проекция временного эмбеддинга
        if not torch.is_tensor(timestep):
            timestep = torch.tensor([timestep], dtype=torch.long, device=noisy_target.device)
        
        if timestep.ndim == 0:
            timestep = timestep.repeat(batch_size)
            
        # Создаём синусоидальный эмбеддинг
        t_emb = get_timestep_embedding(timestep, self.time_embedding_dim)
        # Проектируем его
        t_emb = self.time_proj(t_emb) # [B, time_embedding_dim]

        # 3. Проход через слои
        for layer in self.layers:
            h_target = layer(h_target, t_emb, h_condition)

        # 4. Финальное предсказание шума
        noise_pred = self.final_layer(h_target) # [B, 32, 1]

        return noise_pred


# --- Пример использования ---
if __name__ == "__main__":
    # Параметры модели
    model_params = {
        'target_seq_len': 32,
        'target_dim': 1,
        'condition_seq_len': 256,
        'condition_dim': 256,
        'hidden_dim': 256,
        'num_layers': 4,
        'num_heads': 8,
        'time_embedding_dim': 128,
    }

    # Создание модели
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = CustomDiffusionModel(**model_params).to(device)
    print(f"Model created with {sum(p.numel() for p in model.parameters() if p.requires_grad):,} trainable parameters.")

    # Примерные данные
    batch_size = 2
    noisy_targets = torch.randn(batch_size, 32, 1, device=device)
    timesteps = torch.randint(0, 1000, (batch_size,), device=device, dtype=torch.long)
    conditions = torch.randn(batch_size, 256, 256, device=device) # Выход TradingGDTProcessor

    # Прямой проход
    model.eval()
    with torch.no_grad():
        predicted_noise = model(noisy_targets, timesteps, conditions)
        
    print(f"Input noisy_targets shape: {noisy_targets.shape}")
    print(f"Input timesteps shape: {timesteps.shape}")
    print(f"Input conditions shape: {conditions.shape}")
    print(f"Predicted noise shape: {predicted_noise.shape}")
    print("Forward pass successful!")


Model created with 5,192,321 trainable parameters.
Input noisy_targets shape: torch.Size([2, 32, 1])
Input timesteps shape: torch.Size([2])
Input conditions shape: torch.Size([2, 256, 256])
Predicted noise shape: torch.Size([2, 32, 1])
Forward pass successful!
