In [1]:
from typing import Optional

import numpy as np
import torch
import torch.nn as nn

class DataEmbedding(nn.Module):
    """Combine value, positional and (optional) calendar embeddings of a series.

    The input features are linearly projected to ``feats_model`` channels, the
    output of ``PositionalEmbedding`` is added, and, when time marks are
    supplied, the output of ``TemporalEmbedding`` is added as well. Dropout is
    applied to the sum.

    Parameters
    ----------
    feats_in : int
        Size of the last dimension of the input series.
    feats_model : int
        Embedding width produced by every component.
    embedding_type : str
        Forwarded unchanged to ``TemporalEmbedding``.
    frequency : str
        Forwarded unchanged to ``TemporalEmbedding``.
    dropout : float
        Dropout probability applied to the summed embedding.
    """

    def __init__(self, feats_in: int, feats_model: int, embedding_type: str = 'fixed', frequency: str = 'h', dropout: float = 0.1):
        super(DataEmbedding, self).__init__()
        self.value_embedding = nn.Linear(feats_in, feats_model)  # [..., feats_in] -> [..., feats_model]
        self.position_embedding = PositionalEmbedding(feats_model=feats_model)
        self.temporal_embedding = TemporalEmbedding(feats_model=feats_model, embedding_type=embedding_type, frequency=frequency)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x: torch.Tensor, x_mark: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Embed ``x`` and optionally add the calendar embedding of ``x_mark``.

        Parameters
        ----------
        x : Tensor whose last dimension is ``feats_in`` (the original code
            comments describe it as [B, T, 1]).
        x_mark : optional Tensor of time marks handed to ``TemporalEmbedding``.
            When ``None`` (the default) the calendar term is skipped.

        Returns
        -------
        Tensor with last dimension ``feats_model``, after dropout.
        """
        # Value projection plus the positional term (which only depends on x's length).
        x_emb = self.value_embedding(x) + self.position_embedding(x)

        if x_mark is not None:
            # Out-of-place add: avoids mutating a tensor that autograd may still reference.
            x_emb = x_emb + self.temporal_embedding(x_mark)

        return self.dropout(x_emb)

class TemporalEmbedding(nn.Module):
    """Learned calendar embedding: one lookup table per time field, summed.

    Only ``frequency='h'`` is implemented; any other value raises
    ``NotImplementedError``. ``embedding_type`` is accepted for signature
    compatibility but is not consulted here -- every table is a trainable
    ``nn.Embedding``.
    """

    def __init__(self, feats_model: int, embedding_type: str = 'timeF', frequency: str = 'h'):
        super(TemporalEmbedding, self).__init__()

        # Reject unsupported granularities before allocating any table.
        if frequency != 'h':
            raise NotImplementedError(f"Unsupported frequency: {frequency}")

        # Hourly data carries no minute field.
        self.minute_embed = None
        # One table per calendar field. The day and month tables have one extra
        # row so that 1-based values (1..31, 1..12) can be used as indices directly.
        self.hour_embed = nn.Embedding(24, feats_model)
        self.weekday_embed = nn.Embedding(7, feats_model)
        self.day_embed = nn.Embedding(32, feats_model)
        self.month_embed = nn.Embedding(13, feats_model)

    def forward(self, x_mark: torch.Tensor) -> torch.Tensor:
        """
        Parameters
        ----------
        x_mark : Tensor of shape [B, T, 4]; the last axis is read as
            (hour, weekday, day, month) and cast to integer indices.

        Returns
        -------
        Tensor of shape [B, T, feats_model]: the sum of the four lookups.
        """
        marks = x_mark.long()

        # Accumulate in the fixed order hour -> weekday -> day -> month.
        total = self.hour_embed(marks[:, :, 0])
        total = total + self.weekday_embed(marks[:, :, 1])
        total = total + self.day_embed(marks[:, :, 2])
        total = total + self.month_embed(marks[:, :, 3])
        return total


class PositionalEmbedding(nn.Module):
    """Fixed sinusoidal positional table, stored as a non-trainable buffer.

    Even channels hold ``sin(pos * w_k)`` and odd channels hold
    ``cos(pos * w_k)`` with ``w_k = 10000 ** (-2k / feats_model)``.

    Parameters
    ----------
    feats_model : int
        Number of channels of the table. Both even and odd widths are
        supported.
    max_len : int
        Number of positions precomputed.
    """

    def __init__(self, feats_model: int, max_len: int = 5000):
        super(PositionalEmbedding, self).__init__()

        # Position indices 0, 1, ..., max_len - 1 as a column vector.
        pe = torch.zeros(max_len, feats_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, feats_model, 2).float() * (-np.log(10000.0) / feats_model))
        angles = position * div_term  # [max_len, ceil(feats_model / 2)]

        pe[:, 0::2] = torch.sin(angles)
        # For odd feats_model there is one fewer odd column than even columns,
        # so trim the cosine term to the number of odd slots.
        pe[:, 1::2] = torch.cos(angles[:, : feats_model // 2])

        pe = pe.unsqueeze(0)  # [1, max_len, feats_model]
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Parameters
        ----------
        x : Tensor whose dimension 1 is the sequence length; only
            ``x.size(1)`` is read, the values are ignored.

        Returns
        -------
        Tensor of shape [1, min(x.size(1), max_len), feats_model].
        """
        return self.pe[:, :x.size(1), :]


class Inception_Block_V1(nn.Module):
    """Bank of parallel square 2-D convolutions whose outputs are averaged.

    Branch ``k`` (for ``k`` in ``0 .. num_kernels - 1``) uses kernel size
    ``2k + 1`` with padding ``k``, so every branch keeps the spatial size of
    its input and the results can be averaged element-wise.
    """

    def __init__(self, in_channels, out_channels, num_kernels=6, init_weight=True):
        super(Inception_Block_V1, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.num_kernels = num_kernels
        self.kernels = nn.ModuleList([
            nn.Conv2d(in_channels, out_channels, kernel_size=2 * k + 1, padding=k)
            for k in range(self.num_kernels)
        ])
        if init_weight:
            self._initialize_weights()

    def _initialize_weights(self):
        """Kaiming-normal (fan_out, relu) weights and zero biases for every Conv2d."""
        for module in self.modules():
            if not isinstance(module, nn.Conv2d):
                continue
            nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
            if module.bias is not None:
                nn.init.constant_(module.bias, 0)

    def forward(self, x):
        """Apply every branch to ``x`` and return the element-wise mean."""
        branch_outputs = [self.kernels[k](x) for k in range(self.num_kernels)]
        # Stack on a new trailing axis, then average that axis away.
        return torch.stack(branch_outputs, dim=-1).mean(-1)


class Inception_Block_V2(nn.Module):
    """Bank of parallel 1-D-shaped 2-D convolutions whose outputs are averaged.

    For each ``i`` in ``0 .. num_kernels // 2 - 1`` the block holds a
    horizontal ``1 x (2i + 3)`` and a vertical ``(2i + 3) x 1`` convolution,
    plus one final ``1 x 1`` convolution. Paddings are chosen so every branch
    preserves the spatial size. The block therefore holds
    ``2 * (num_kernels // 2) + 1`` branches in total.
    """

    def __init__(self, in_channels, out_channels, num_kernels=6, init_weight=True):
        super(Inception_Block_V2, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.num_kernels = num_kernels
        kernels = []
        for i in range(self.num_kernels // 2):
            kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=[1, 2 * i + 3], padding=[0, i + 1]))
            kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=[2 * i + 3, 1], padding=[i + 1, 0]))
        kernels.append(nn.Conv2d(in_channels, out_channels, kernel_size=1))
        self.kernels = nn.ModuleList(kernels)
        if init_weight:
            self._initialize_weights()

    def _initialize_weights(self):
        """Kaiming-normal (fan_out, relu) weights and zero biases for every Conv2d."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Apply every branch to ``x`` and return the element-wise mean."""
        # Iterate the ModuleList itself rather than range(num_kernels + 1):
        # the two counts only agree when num_kernels is even, and indexing by
        # num_kernels + 1 ran past the end of the list for odd values.
        res_list = [kernel(x) for kernel in self.kernels]
        return torch.stack(res_list, dim=-1).mean(-1)



In [None]:

import torch
import torch.nn as nn
import torch.nn.functional as F

class ShallowPPGEncoder(nn.Module):
    """Single-stage 1-D convolutional front end that emits a token sequence.

    The stage is Conv1d -> BatchNorm1d -> ELU -> MaxPool1d(2). With an odd
    ``kernel_size`` the ``kernel_size // 2`` padding keeps the convolution
    length-preserving, so the pooling halves the time axis.
    """

    def __init__(self, in_channels=1, cnn_channels=64, kernel_size=15):
        super().__init__()
        stages = [
            nn.Conv1d(in_channels, cnn_channels, kernel_size, padding=kernel_size // 2),
            nn.BatchNorm1d(cnn_channels),
            nn.ELU(),
            nn.MaxPool1d(2),
        ]
        self.temporal_conv = nn.Sequential(*stages)

    def forward(self, x):
        """Map [B, in_channels, T] to channel-last tokens [B, T // 2, cnn_channels]."""
        feature_map = self.temporal_conv(x)  # [B, C, T // 2]
        # Move channels last so downstream modules see one token per time step.
        return feature_map.permute(0, 2, 1)

class FineTemporalBranch(nn.Module):
    """Local temporal branch: width-5 Conv1d -> BatchNorm1d -> ELU -> MaxPool1d(2).

    Takes and returns channel-last tokens; the time axis is halved by the
    pooling step.
    """

    def __init__(self, d_model):
        super().__init__()
        stages = [
            nn.Conv1d(d_model, d_model, kernel_size=5, padding=2),
            nn.BatchNorm1d(d_model),
            nn.ELU(),
            nn.MaxPool1d(2),
        ]
        self.conv = nn.Sequential(*stages)

    def forward(self, x):
        """Map tokens [B, T, d_model] to [B, T // 2, d_model]."""
        channels_first = x.permute(0, 2, 1)        # [B, C, T] layout required by Conv1d
        refined = self.conv(channels_first)
        return refined.permute(0, 2, 1)            # back to [B, T', C]

class CoarseTransformerBranch(nn.Module):
    """Global-context branch: a one-layer, batch-first Transformer encoder."""

    def __init__(self, d_model, nhead):
        super().__init__()
        layer_template = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, batch_first=True)
        # NOTE(review): nn.TransformerEncoder deep-copies the layer it is given,
        # so the weights registered under `encoder_layer` are not the ones run
        # in forward(). The attribute is kept because dropping it would change
        # the state_dict keys of existing checkpoints.
        self.encoder_layer = layer_template
        self.encoder = nn.TransformerEncoder(layer_template, num_layers=1)

    def forward(self, x):
        """Run tokens [B, T, d_model] through the encoder; the shape is preserved."""
        encoded = self.encoder(x)
        return encoded

class IPUnit(nn.Module):
    """Parameter-free pooling: log(1 + mean of squares) taken over dimension 1."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Reduce dimension 1 of ``x`` to the log1p of its mean squared value.

        For tokens shaped [B, T, C] the result is [B, C].
        """
        mean_square = x.pow(2).mean(dim=1)
        return mean_square.log1p()

class PPGDeformer(nn.Module):
    """Two-branch regressor over a shallow convolutional token sequence.

    The input is tokenised by ``ShallowPPGEncoder`` and fed to two branches in
    parallel: ``FineTemporalBranch`` (summarised with ``IPUnit``) and
    ``CoarseTransformerBranch`` (summarised with a mean over time). The two
    summaries are concatenated and mapped to ``output_dim`` by a linear layer.

    Parameters
    ----------
    in_channels : int
        Channel count of the raw input handed to the encoder.
    cnn_channels : int
        Channel count produced by the encoder.
    d_model : int
        Token width used by both branches.
    nhead : int
        Number of attention heads of the coarse branch.
    output_dim : int
        Size of the regression output.
    """

    def __init__(self, in_channels=1, cnn_channels=64, d_model=64, nhead=4, output_dim=2):
        super().__init__()
        self.encoder = ShallowPPGEncoder(in_channels, cnn_channels)
        # Both branches are built for d_model-wide tokens while the encoder
        # emits cnn_channels. When the widths differ, bridge them with a linear
        # map; when they match, use a parameter-free identity so the
        # state_dict layout is unchanged.
        if cnn_channels == d_model:
            self.token_proj = nn.Identity()
        else:
            self.token_proj = nn.Linear(cnn_channels, d_model)
        self.fine_branch = FineTemporalBranch(d_model)
        self.coarse_branch = CoarseTransformerBranch(d_model, nhead)
        self.ip_unit = IPUnit()
        self.regressor = nn.Linear(d_model + d_model, output_dim)

    def forward(self, x):
        """Regress ``output_dim`` values from the input signal.

        Parameters
        ----------
        x : Tensor passed straight to the encoder (per the original shape
            comments, channel-first [B, in_channels, T]).

        Returns
        -------
        Tensor of shape [B, output_dim].
        """
        tokens = self.token_proj(self.encoder(x))  # [B, T', d_model]
        fine_feat = self.fine_branch(tokens)        # [B, T'', d_model]
        coarse_feat = self.coarse_branch(tokens)    # [B, T', d_model]

        fine_summary = self.ip_unit(fine_feat)             # [B, d_model]
        coarse_summary = torch.mean(coarse_feat, dim=1)    # [B, d_model]

        combined = torch.cat([fine_summary, coarse_summary], dim=-1)  # [B, 2 * d_model]
        out = self.regressor(combined)  # [B, output_dim]
        return out
