# 0. 라이브러리 로드

In [6]:
import torch
from torch import nn
import math
from typing import Callable, Optional
from torch import Tensor
import torch.nn.functional as F
import numpy as np

#1. PatchTST_layers

In [7]:
class Transpose(nn.Module):
    """Module form of ``Tensor.transpose`` so it can be placed in ``nn.Sequential``.

    Args:
        *dims: Positional arguments forwarded unchanged to ``Tensor.transpose``.
        contiguous: When true, ``.contiguous()`` is called on the transposed
            tensor before it is returned.
    """

    def __init__(self, *dims, contiguous=False):
        super().__init__()
        self.dims = dims
        self.contiguous = contiguous

    def forward(self, x):
        """Return ``x`` transposed over ``self.dims``."""
        swapped = x.transpose(*self.dims)
        # transpose only returns a strided view; materialise it on request.
        return swapped.contiguous() if self.contiguous else swapped

def get_activation_fn(activation):
    """Return an activation object for ``activation``.

    Args:
        activation: Either a callable (for example an ``nn.Module`` subclass),
            which is called with no arguments and whose result is returned,
            or one of the strings ``"relu"`` / ``"gelu"`` (case-insensitive).

    Returns:
        The result of ``activation()``, or a new ``nn.ReLU`` / ``nn.GELU``.

    Raises:
        ValueError: If ``activation`` is neither callable nor a supported name.
    """
    if callable(activation):
        return activation()
    # Only strings have .lower(); anything else falls through to ValueError.
    if isinstance(activation, str):
        name = activation.lower()
        if name == "relu":
            return nn.ReLU()
        if name == "gelu":
            return nn.GELU()
    raise ValueError(f'{activation} is not available. You can use "relu", "gelu", or a callable')


# decomposition
class moving_avg(nn.Module):
    """Moving average block to highlight the trend of a time series.

    The input is padded along dim 1 by repeating its first and last slices
    ``(kernel_size - 1) // 2`` times each, then averaged with ``nn.AvgPool1d``.
    With ``stride=1`` and an odd ``kernel_size`` the output therefore has the
    same length along dim 1 as the input; an even ``kernel_size`` yields one
    element fewer.

    Args:
        kernel_size: Window size used for the average.
        stride: Step between consecutive windows.
    """

    def __init__(self, kernel_size, stride):
        super().__init__()
        self.kernel_size = kernel_size
        # 1-D average pooling performs the actual moving-average computation.
        self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0)

    def forward(self, x):
        """Return the moving average of ``x`` along dim 1.

        ``x`` must be a 3-D tensor (the indexing and ``permute`` below require
        it); presumably laid out as (batch, seq_len, channels) - confirm
        against the caller.
        """
        # Edge padding compensates for the samples the pooling window consumes.
        pad_len = (self.kernel_size - 1) // 2
        front = x[:, 0:1, :].repeat(1, pad_len, 1)
        end = x[:, -1:, :].repeat(1, pad_len, 1)
        padded = torch.cat([front, x, end], dim=1)

        # AvgPool1d pools over the last dim, so move dim 1 there and back.
        pooled = self.avg(padded.permute(0, 2, 1))
        return pooled.permute(0, 2, 1)

class series_decomp(nn.Module):
    """Moving-average decomposition of a time series.

    Splits the input into a moving-average component and the residual left
    after subtracting it.

    Args:
        kernel_size: Window size passed to ``moving_avg`` (used with
            ``stride=1``). ``moving_avg`` must be available at module level.
    """

    def __init__(self, kernel_size):
        super().__init__()
        self.moving_avg = moving_avg(kernel_size, stride=1)

    def forward(self, x):
        """Return ``(residual, moving_mean)`` for ``x``."""
        moving_mean = self.moving_avg(x)
        # Residual = original series minus its moving average.
        res = x - moving_mean
        return res, moving_mean


# pos_encoding
def PositionalEncoding(q_len, d_model, normalize=True):
    """Build a ``(q_len, d_model)`` sine/cosine positional-encoding tensor.

    Even columns hold ``sin(position * div_term)`` and odd columns hold
    ``cos(position * div_term)``.

    Args:
        q_len: Number of positions (rows).
        d_model: Encoding width (columns); odd values are supported.
        normalize: If true, the result is shifted to zero mean and divided by
            ``10 * std``.

    Returns:
        A float tensor of shape ``(q_len, d_model)``.
    """
    pe = torch.zeros(q_len, d_model)
    position = torch.arange(0, q_len).unsqueeze(1)  # (q_len, 1) column vector
    div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
    angles = position * div_term
    pe[:, 0::2] = torch.sin(angles)
    # For odd d_model there is one fewer odd column than even column.
    pe[:, 1::2] = torch.cos(angles)[:, : d_model // 2]
    if normalize:
        pe = pe - pe.mean()
        pe = pe / (pe.std() * 10)
    return pe


sinCosPosEncoding = PositionalEncoding


def Coord2dPosEncoding(q_len, d_model, exponential=False, normalize=True, eps=1e-3, verbose=False):
    """Build a ``(q_len, d_model)`` coordinate-based positional encoding.

    The encoding is ``2 * r**x * c**x - 1`` where ``r`` and ``c`` are evenly
    spaced values in [0, 1] over rows and columns. The exponent ``x`` is
    nudged by 0.001 per step, for at most 100 steps, until the mean of the
    encoding is within ``eps`` of zero.

    Args:
        q_len: Number of rows.
        d_model: Number of columns.
        exponential: Start the search from ``x = 0.5`` instead of ``x = 1``.
        normalize: If true, shift to zero mean and divide by ``10 * std``.
        eps: Tolerance on the absolute mean that stops the search early.
        verbose: If true, print the step index, exponent and mean each step.

    Returns:
        A float tensor of shape ``(q_len, d_model)``.
    """
    x = .5 if exponential else 1
    rows = torch.linspace(0, 1, q_len).reshape(-1, 1)
    cols = torch.linspace(0, 1, d_model).reshape(1, -1)
    for i in range(100):
        cpe = 2 * (rows ** x) * (cols ** x) - 1
        mean = cpe.mean().item()
        if verbose:
            print(f'{i:4.0f} {x:5.3f} {mean:+6.3f}')
        if abs(mean) <= eps:
            break  # mean is close enough to zero
        elif mean > eps:
            x += .001
        else:
            x -= .001

    if normalize:
        cpe = cpe - cpe.mean()
        cpe = cpe / (cpe.std() * 10)
    return cpe


def Coord1dPosEncoding(q_len, exponential=False, normalize=True):
    """Build a ``(q_len, 1)`` coordinate-based positional encoding.

    Values are ``2 * t**p - 1`` for ``t`` evenly spaced in [0, 1], with
    ``p = 0.5`` when ``exponential`` is true and ``p = 1`` otherwise.

    Args:
        q_len: Number of positions.
        exponential: Use the square-root spacing instead of linear spacing.
        normalize: If true, shift to zero mean and divide by ``10 * std``.

    Returns:
        A float tensor of shape ``(q_len, 1)``.
    """
    cpe = (2 * (torch.linspace(0, 1, q_len).reshape(-1, 1) ** (.5 if exponential else 1)) - 1)
    if normalize:
        cpe = cpe - cpe.mean()
        cpe = cpe / (cpe.std() * 10)
    return cpe


def positional_encoding(pe, learn_pe, q_len, d_model):
    """Create the positional-encoding parameter selected by ``pe``.

    Args:
        pe: ``None`` or one of ``'zero'``, ``'zeros'``, ``'normal'``,
            ``'gauss'``, ``'uniform'``, ``'lin1d'``, ``'exp1d'``, ``'lin2d'``,
            ``'exp2d'`` (the legacy spelling ``'ex2d'`` is also accepted),
            ``'sincos'``.
        learn_pe: Whether the returned parameter requires gradients. Forced
            to ``False`` when ``pe`` is ``None``.
        q_len: Number of positions.
        d_model: Encoding width, used by the 2-D variants.

    Returns:
        An ``nn.Parameter`` of shape ``(q_len, d_model)`` or ``(q_len, 1)``
        depending on ``pe``.

    Raises:
        ValueError: If ``pe`` is not one of the supported values.
    """
    if pe is None:
        # pe=None and learn_pe=False can be used to measure the impact of pe.
        W_pos = torch.empty((q_len, d_model))
        nn.init.uniform_(W_pos, -0.02, 0.02)
        learn_pe = False
    elif pe == 'zero':
        W_pos = torch.empty((q_len, 1))
        nn.init.uniform_(W_pos, -0.02, 0.02)
    elif pe == 'zeros':
        W_pos = torch.empty((q_len, d_model))
        nn.init.uniform_(W_pos, -0.02, 0.02)
    elif pe == 'normal' or pe == 'gauss':
        W_pos = torch.zeros((q_len, 1))
        torch.nn.init.normal_(W_pos, mean=0.0, std=0.1)
    elif pe == 'uniform':
        W_pos = torch.zeros((q_len, 1))
        nn.init.uniform_(W_pos, a=0.0, b=0.1)
    elif pe == 'lin1d':
        W_pos = Coord1dPosEncoding(q_len, exponential=False, normalize=True)
    elif pe == 'exp1d':
        W_pos = Coord1dPosEncoding(q_len, exponential=True, normalize=True)
    elif pe == 'lin2d':
        W_pos = Coord2dPosEncoding(q_len, d_model, exponential=False, normalize=True)
    elif pe == 'exp2d' or pe == 'ex2d':
        W_pos = Coord2dPosEncoding(q_len, d_model, exponential=True, normalize=True)
    elif pe == 'sincos':
        W_pos = PositionalEncoding(q_len, d_model, normalize=True)
    else:
        raise ValueError(
            f"{pe} is not a valid pe (positional encoder). Available types: 'gauss'=='normal', "
            "'zeros', 'zero', 'uniform', 'lin1d', 'exp1d', 'lin2d', 'exp2d', 'sincos', None."
        )
    return nn.Parameter(W_pos, requires_grad=learn_pe)




.contiguous(): transpose로 인해 변경된 텐서의 메모리 레이아웃이 연속적이지 않게 될 수 있기 때문에, 이를 다시 연속적인 메모리 레이아웃으로 만듦.  
callable(object): 전달받은 object 인자가 호출 가능한지 여부  
.lower(): 소문자로 바꾸기  
torch.arange(0, d_model, 2): 0부터 시작하여 2씩 증가하는 숫자들로 1차원 텐서를 생성하며, 끝값 d_model은 포함하지 않음 (예: d_model=6이면 [0, 2, 4])  
Coord2dPosEncoding의 진행 상황 출력문: verbose가 True일 때만 실행되도록 의도된 것으로, i, x, cpe.mean() 값을 지정된 형식에 따라 출력합니다.  
nn.init.uniform_(W_pos, -0.02, 0.02):
텐서의 모든 값을 균일 분포를 따르는 난수로 초기화.
이 경우, 모든 값은 -0.02와 0.02 사이의 범위에서 무작위로 선택. _ 접미사는 이 연산이 W_pos 텐서 자체를 변경(in-place)한다는 것을 나타냅니다.

# 2. RevIN

In [8]:
import torch
import torch.nn as nn

class RevIN(nn.Module):
    """Reversible instance normalization.

    ``forward(x, 'norm')`` records per-instance statistics of ``x`` and
    returns the normalized tensor; ``forward(x, 'denorm')`` undoes that
    normalization using the statistics recorded by the most recent ``'norm'``
    call.
    """

    def __init__(self, num_features: int, eps=1e-5, affine=True, subtract_last=False):
        """
        :param num_features: the number of features or channels
        :param eps: a value added for numerical stability
        :param affine: if True, RevIN has learnable affine parameters
        :param subtract_last: if True, centre on the last step along dim 1
            instead of on the mean
        """
        super().__init__()
        self.num_features = num_features
        self.eps = eps
        self.affine = affine
        self.subtract_last = subtract_last
        if self.affine:
            self._init_params()

    def forward(self, x, mode: str):
        """Normalize or denormalize ``x``.

        :param x: input tensor; statistics are reduced over every dim except
            the first and the last
        :param mode: ``'norm'`` to normalize, ``'denorm'`` (or the legacy
            spelling ``'dnorm'``) to invert the last normalization
        :raises NotImplementedError: for any other ``mode``
        """
        if mode == 'norm':
            self._get_statistics(x)
            x = self._normalize(x)
        elif mode in ('denorm', 'dnorm'):
            x = self._denormalize(x)
        else:
            raise NotImplementedError
        return x

    def _init_params(self):
        # Learnable per-feature scale and shift, initialized to the identity.
        self.affine_weight = nn.Parameter(torch.ones(self.num_features))
        self.affine_bias = nn.Parameter(torch.zeros(self.num_features))

    def _get_statistics(self, x):
        # Reduce over all dims between the first and the last one.
        dim2reduce = tuple(range(1, x.ndim - 1))
        if self.subtract_last:
            self.last = x[:, -1, :].unsqueeze(1)
        else:
            self.mean = torch.mean(x, dim=dim2reduce, keepdim=True).detach()
        self.stdev = torch.sqrt(torch.var(x, dim=dim2reduce, keepdim=True, unbiased=False) + self.eps).detach()

    def _normalize(self, x):
        if self.subtract_last:
            x = x - self.last
        else:
            x = x - self.mean
        # Scale in both branches so that _denormalize, which always multiplies
        # by stdev, is an exact inverse.
        x = x / self.stdev
        if self.affine:
            x = x * self.affine_weight
            x = x + self.affine_bias
        return x

    def _denormalize(self, x):
        if self.affine:
            x = x - self.affine_bias
            # eps**2 keeps the division finite if a weight is exactly zero.
            x = x / (self.affine_weight + self.eps * self.eps)
        x = x * self.stdev
        if self.subtract_last:
            x = x + self.last
        else:
            x = x + self.mean
        return x


#3. Backbone

In [10]:
class PatchTST_backbone(nn.Module):
    """Patching front-end, encoder and prediction head of PatchTST.

    The forward pass optionally normalizes the input with ``RevIN``, cuts the
    last dimension into patches of ``patch_len`` taken every ``stride`` steps,
    runs the patches through ``TSTiEncoder`` and the head, and finally undoes
    the normalization.

    Args:
        c_in: Number of input variables; also used as ``RevIN`` feature count.
        context_window: Input sequence length, used to compute the patch count.
        target_window: Output length passed to the flatten head.
        patch_len: Length of each patch.
        stride: Step between consecutive patches.
        padding_patch: ``'end'`` to replicate-pad ``stride`` values at the end
            of the sequence (adds one patch); ``None`` for no padding.
        pretrain_head: If true, use the dropout + 1x1 ``Conv1d`` head.
        head_type: ``'flatten'`` selects ``Flatten_Head`` when
            ``pretrain_head`` is false. No head is created for other values.
        revin, affine, subtract_last: Enable and configure ``RevIN``.
        padding_var, attn_mask, res_attention, pre_norm, store_attn:
            Forwarded unchanged to ``TSTiEncoder``.
        Remaining arguments (``max_seq_len``, ``n_layers``, ``d_model``,
        ``n_heads``, ``d_k``, ``d_v``, ``d_ff``, ``attn_dropout``,
        ``dropout``, ``act``, ``key_padding_mask``, ``pe``, ``learn_pe``,
        ``verbose``, ``**kwargs``) are forwarded unchanged to ``TSTiEncoder``;
        ``fc_dropout`` / ``head_dropout`` go to the respective head. ``norm``
        is accepted but not used in this class.
    """

    def __init__(self, c_in: int, context_window: int, target_window: int, patch_len: int, stride: int,
                 max_seq_len: Optional[int] = 1024, n_layers: int = 3, d_model=128, n_heads=16,
                 d_k: Optional[int] = None, d_v: Optional[int] = None, d_ff: int = 256, norm: str = 'BatchNorm',
                 attn_dropout: float = 0., dropout: float = 0., act: str = "gelu", key_padding_mask: bool = 'auto',
                 pe: str = 'zeros', learn_pe: bool = True, fc_dropout: float = 0., head_dropout=0, padding_patch=None,
                 pretrain_head: bool = False, head_type='flatten', individual=False, revin=True, affine=True,
                 subtract_last=False, verbose: bool = False, padding_var: Optional[int] = None,
                 attn_mask: Optional[Tensor] = None, res_attention: bool = True, pre_norm: bool = False,
                 store_attn: bool = False, **kwargs):
        super().__init__()

        # RevIN: store an instance (not the class) so it can be called in forward.
        self.revin = revin
        if self.revin:
            self.revin_layer = RevIN(c_in, affine=affine, subtract_last=subtract_last)

        # Patching
        self.patch_len = patch_len
        self.stride = stride
        self.padding_patch = padding_patch
        patch_num = int((context_window - patch_len) / stride + 1)
        if padding_patch == 'end':  # can be modified to general case
            self.padding_patch_layer = nn.ReplicationPad1d((0, stride))
            patch_num += 1

        # Backbone
        self.backbone = TSTiEncoder(c_in, patch_num=patch_num, patch_len=patch_len, max_seq_len=max_seq_len,
                                    n_layers=n_layers, d_model=d_model, n_heads=n_heads, d_k=d_k, d_v=d_v, d_ff=d_ff,
                                    attn_dropout=attn_dropout, dropout=dropout, act=act,
                                    key_padding_mask=key_padding_mask, padding_var=padding_var,
                                    attn_mask=attn_mask, res_attention=res_attention, pre_norm=pre_norm,
                                    store_attn=store_attn, pe=pe, learn_pe=learn_pe, verbose=verbose, **kwargs)

        # Head
        self.head_nf = d_model * patch_num
        self.n_vars = c_in
        self.pretrain_head = pretrain_head
        self.head_type = head_type
        self.individual = individual

        if self.pretrain_head:
            self.head = self.create_pretrain_head(self.head_nf, c_in, fc_dropout)
        elif head_type == 'flatten':
            # NOTE(review): Flatten_Head is defined outside this view; the
            # argument order is taken from the original call - confirm.
            self.head = Flatten_Head(self.individual, self.n_vars, self.head_nf, target_window,
                                     head_dropout=head_dropout)

    def forward(self, z):
        """Run the full pipeline on ``z``.

        ``z`` is expected as (batch size, nvars, sequence length).
        """
        # norm: RevIN reduces over dim 1, so move the sequence axis there.
        if self.revin:
            z = z.permute(0, 2, 1)
            z = self.revin_layer(z, 'norm')
            z = z.permute(0, 2, 1)

        # do patching
        if self.padding_patch == 'end':
            z = self.padding_patch_layer(z)

        # (bs, nvars, patch_num, patch_len)
        z = z.unfold(dimension=-1, size=self.patch_len, step=self.stride)

        # (bs, nvars, patch_len, patch_num)
        z = z.permute(0, 1, 3, 2)

        # model
        z = self.backbone(z)
        z = self.head(z)

        # denorm: map the prediction back to the scale of the original data.
        if self.revin:
            z = z.permute(0, 2, 1)
            z = self.revin_layer(z, 'denorm')
            z = z.permute(0, 2, 1)
        return z

    def create_pretrain_head(self, head_nf, vars, dropout):
        """Return the pretraining head: dropout followed by a 1x1 ``Conv1d``."""
        return nn.Sequential(nn.Dropout(dropout),
                             nn.Conv1d(head_nf, vars, 1))





