# LTSF-Linear family
- Linear : one linear layer
- DLinear : Decomposition Linear -> trend와 seasonality 패턴을 핸들링
- NLinear : 정규화된 Linear로서 train-test set 분포 변화를 다룬다. 

In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch.utils.data import DataLoader
from torch.utils.data import Dataset



import warnings
warnings.filterwarnings('ignore')

import time
import random
from tqdm import tqdm
import matplotlib.pyplot as plt

from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler


### (1) Linear
- 단 하나의 선형 레이어로 구성된 모델이지만 트랜스포머 모델보다 성능이 우수하다.

In [None]:
class LTSF_Linear(torch.nn.Module):
    def __init__(self, window_size, forcast_size, individual, feature_size):
        super(LTSF_Linear, self).__init__()
        self.window_size = window_size
        self.forcast_size  = forcast_size
        self.individual = individual  # ????
        self.channels = feature_size
        if self.individual: # ?????
            self.Linear = torch.nn.ModuleList() # python List와
            for i in range(self.channels):
                self.Linear.append(torch.nn.Linear(self.window_size, self.forcast_size))
        else:
            self.Linear = torch.nn.Linear(self.window_size, self.forcast_size)

        
    def forward(self, x): # 모델이 학습 데이터를 입력받아서 forward 연산을 진행시키는 함수
        if self.individual:
            output = torch.zeros([x.size(0), self.pred_len, x.size(2)], dtype=x.dtype).to(x.device)
            for i in range(self.channels):
                output[:,:,i] = self.Linear[i](x[:,:,i])
            x = output
        else:
            x = self.Linear(x.permute(0,2,1)).permute(0,2,1)
        return x

In [2]:
class moving_avg(nn.Module): # Module은 또한 다른 모듈들을 담고있다, tree구조로 품고 있다. submodule들을 특성들로 할당할 수 있다.
    """
    Moving average block to highlight the trend of time series
    """
    def __init__(self, kernel_size, stride):
        super(moving_avg, self).__init__()
        self.kernel_size = kernel_size
        self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride = stride, padding = 0)

    def forward(self, x):
        # time series에서 양쪽 끝에 패딩하기
        front = x[:, 0:1, :].repeat(1, (self.kernel_size-1) // 2, 1) # kernel_size : 연산을 수행할 때 윈도우의 크기를 의미
        end = x[:, -1:, :].repeat(1, (self.kernel_size - 1) // 2, 1)
        x = torch.cat([front, x, end], dim=1)
        x = self.avg(x.permute(0, 2, 1)) # torch.permute() 는 원하는 순서로 원본 텐서를 재정렬하고 새로운 차원으로 반환할 수 있다.
        x = x.permute(0,2,1)
        return x
    


class series_decomp(nn.Module): # nn.Module을 상속받는다는 의미 같다.
    """
    Series decomposition block
    """
    def __init__(self, kernel_size):
        super(series_decomp, self).__init__() # 자식 클래스가 상속받는 부모 클래스를 자식 클래스에 불러오겠다는 의미 -> 그냥 상속받는다는 의미 super().__init()__과 차이없음.
        self.moving_avg = moving_avg(kernel_size, stried=1)

    def forward(self, x): 
        moving_mean = self.moving_avg(x)
        res = x - moving_mean # 잔차 구하는거 같다.
        return res, moving_mean # 잔차와 trend 반환
    


class Model(nn.Module):
    """
    Decomposition-Linear
    """
    def __init__(self, configs): # configs에 어떤 정보 받아져있는지 확인해볼 필요 있다.
        super(Model, self).__init__()
        self.seq_len = configs.seq_len
        self.pred_len = configs.pred_len

        # Decomposition Kernel Size
        kernel_size = 25 # ???????????????????????????????????
        self.decomposition = series_decomp(kernel_size) # 위에 25로 설정했으니 25개의 moving avg 찍는다.
        self.individual = configs.individual # ???????????????????????
        self.channels = configs.enc_in # ????????????????????????

        if self.individual:
            self.Linear_Seasonal = nn.ModuleList()
            self.Linear_Trend = nn.ModuleList()

            for i in range(self.channels):
                self.Linear_Seasonal.append(nn.Linear(self.seq_len, self.pred_len))
                self.Linear_Trend.append(nn.Linear(self.seq_len, self.pred_len))

                # Use this two lines if you want to visualize the weights
                # self.Linear_Seasonal[i].weight = nn.Parameter((1/self.seq_len)*torch.ones([self.pred_len,self.seq_len]))
                # self.Linear_Trend[i].weight = nn.Parameter((1/self.seq_len)*torch.ones([self.pred_len,self.seq_len]))

        else:
            self.Linear_Seasonal = nn.Linear(self.seq_len, self.pred_len)
            self.Linear_Trend = nn.Linear(self.seq_len, self.pred_len)

            # Use this two lines if you want to visualize the weights
            # self.Linear_Seasonal.weight = nn.Parameter((1/self.seq_len)*torch.ones([self.pred_len,self.seq_len]))
            # self.Linear_Trend.weight = nn.Parameter((1/self.seq_len)*torch.ones([self.pred_len,self.seq_len]))

    
    def forward(self, x):
        # x : [Batch, Input length, Channel]
        seasonal_init, trend_init = self.decomposition(x)
        seasonal_init, trend_init = seasonal_init.permute(0,2,1), trend_init.permute(0,2,1)
        if self.individual:
            seasonal_output = torch.zeros([seasonal_init.size(0), seasonal_init.size(1), self.pred_len], dtype=seasonal_init.dtype).to(seasonal_init.device)
            trend_output = torch.zeros([trend_init.size(0), trend_init.size(1), self.pred_len], dtype=trend_init.dtype).to(trend_init.device)
            for i in range(self.channels):
                seasonal_output[:,i,:] = self.Linear_Seasonal[i](seasonal_init[:,i,:])
                trend_output[:,i,:] = self.Linear_Trend[i](trend_init[:,i,:])

        else:
            seasonal_output = self.Linear_Seasonal(seasonal_init)
            trend_output = self.Linear_Trend(trend_init)

        x = seasonal_output + trend_output
        return x.permute(0, 2, 1) # to [Batch, Output length, Channel]

                



    

