<a href="https://colab.research.google.com/github/Dkepffl/Advanced_Analysis/blob/main/Transformer/Transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Reporting**
- (2024.11.15 16:06) 코드를 Part 별로 정돈.
- (2024.11.18 2:06) 논문 읽으면서 채우긴 함
  - 근데 Positional Encoding이 뭐죠?
  - Masking은?
  - Layer Normalization은 뭐죠?

## **Import Packages**

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import torchvision.transforms as transforms
from torchvision.datasets import CIFAR100
from torch.utils.data import DataLoader

import numpy as np

## **Data loader**

In [None]:
# Data loading
path = './datasets/'

# Only convert images to tensors; no other preprocessing is applied.
transform = transforms.Compose([transforms.ToTensor()])

# CIFAR-100 train / test splits, stored under `path`.
train_data = CIFAR100(root=path, train=True, transform=transform, download=True)
test_data = CIFAR100(root=path, train=False, transform=transform, download=True)

batch_size = 100

# Only the training split is shuffled; the test split keeps its order.
train_loader = DataLoader(
    train_data,
    batch_size=batch_size,
    shuffle=True,
    num_workers=0,
)
test_loader = DataLoader(
    test_data,
    batch_size=batch_size,
    shuffle=False,
    num_workers=0,
)

Downloading https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz to ./datasets/cifar-100-python.tar.gz


100%|██████████| 169M/169M [00:04<00:00, 39.3MB/s]


Extracting ./datasets/cifar-100-python.tar.gz to ./datasets/
Files already downloaded and verified


In [None]:
# Shape of one image tensor and the number of target classes in the dataset.
_first_image, _first_label = train_data[0]
input_shape = _first_image.shape  # [3, 32, 32]
output_shape = len(train_data.classes)  # 100

In [None]:
##########################################
#### there is nothing to do upto here ####
##########################################

## **Transformer 구현**

### **Positional Encoding**

In [None]:
# refer to Section 3.5 in the paper
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding (Section 3.5 of the paper).

    The lookup table is built once, at construction time, as

        PE(pos, 2i)   = sin(pos / 10000^(2i / d_model))
        PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model))

    where ``pos`` is the position in the sequence and ``i`` indexes pairs of
    feature dimensions.

    Args:
        device: device on which the table is created.
        max_len: largest sequence length that can be encoded.
        d_model: size of the encoding vector of each position.
    """

    def __init__(self,device, max_len=512, d_model=16):
        super().__init__()
        self.device = device
        self.max_len = max_len
        self.d_model = d_model

        # (max_len, 1) column of positions and the even dimension indices 2i.
        position = torch.arange(max_len, dtype=torch.float32, device=device).unsqueeze(1)
        even_dims = torch.arange(0, d_model, 2, dtype=torch.float32, device=device)
        angle = position / torch.pow(10000.0, even_dims / d_model)

        pos_enc = torch.zeros(max_len, d_model, device=device)
        pos_enc[:, 0::2] = torch.sin(angle)
        # For an odd d_model there is one fewer odd column than even columns.
        pos_enc[:, 1::2] = torch.cos(angle[:, : d_model // 2])

        # A non-persistent buffer follows ``model.to(device)`` but is neither a
        # trainable parameter nor part of the state dict.
        self.register_buffer("pos_enc", pos_enc, persistent=False)

    def forward(self,x):
        """Return the encodings for the first ``seq_len`` positions.

        Args:
            x: transformed input embedding, x.shape = [batch_size, seq_len, data_dim].
               Only ``seq_len`` is read from it.

        Returns:
            Tensor of shape [seq_len, d_model]; the caller adds it to the embedding.

        Raises:
            ValueError: if ``seq_len`` exceeds ``max_len``.
        """
        seq_len = x.shape[1]
        if seq_len > self.max_len:
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.max_len}"
            )
        return self.pos_enc[:seq_len, :]

### **ScaledDotProductAttention**

In [None]:
# refer to Section 3.2.1 and Fig 2 (left) in the paper
class ScaledDotProductAttention(nn.Module):
    """Scaled dot-product attention: softmax(Q K^T / sqrt(d_k)) V.

    Refer to Section 3.2.1 and Fig 2 (left) in the paper.
    """

    def __init__(self):
        super().__init__()

    def forward(self,q,k,v,mask=None):
        """Compute the attention value from transformed query, key and value.

        Args:
            q, k, v: transformed query, key, value with
                q.shape, k.shape, v.shape = [batch_size, num_head, seq_len, d=d_model/num_head].
            mask: optional masking matrix broadcastable to
                [batch_size, num_head, seq_len_q, seq_len_k]. Where it is
                False the score is killed; where it is True the score is kept.

        Returns:
            Attention value with the same leading dimensions as ``q`` and the
            last dimension of ``v``.
        """
        d_k = q.size(-1)
        # Swap the last two axes of k so that every query meets every key.
        scores = torch.matmul(q, k.transpose(-2, -1)) / (d_k ** 0.5)

        if mask is not None:
            # A very negative finite score becomes ~0 weight after softmax while
            # avoiding the NaNs that -inf produces on fully masked rows.
            blocked = ~mask.to(torch.bool)
            scores = scores.masked_fill(blocked, torch.finfo(scores.dtype).min)

        weights = torch.softmax(scores, dim=-1)
        attention_value = torch.matmul(weights, v)

        return attention_value

### **Multi Head Attention**

In [None]:
# refer to Section 3.2.2 and Fig 2 (right) in the paper
class MultiHeadAttention(nn.Module):
    """Multi-head attention (Section 3.2.2 and Fig 2 (right) in the paper).

    Query, key and value are each projected to ``d_model`` features, split into
    ``num_head`` heads of size ``d = d_model // num_head``, attended per head,
    concatenated again and passed through the output projection.

    Args:
        d_model: feature size of the inputs and of the output.
        num_head: number of attention heads; must divide ``d_model``.
    """

    def __init__(self,d_model=16,num_head=4):
        super().__init__()
        assert d_model % num_head == 0, "check if d_model is divisible by num_head"

        self.d_model = d_model
        self.num_head = num_head  # number of heads
        self.d = d_model//num_head  # d_k = d_v = d_model / num_head

        self.attention = ScaledDotProductAttention()

        # One d_model -> d_model projection holds the weights of all heads; the
        # result is split into heads in forward().
        self.w_q = nn.Linear(self.d_model, self.d_model)
        self.w_k = nn.Linear(self.d_model, self.d_model)
        self.w_v = nn.Linear(self.d_model, self.d_model)

        self.w_o = nn.Linear(self.d*self.num_head, self.d_model)

    def _split_heads(self, x):
        """Reshape [batch, seq_len, d_model] into [batch, num_head, seq_len, d]."""
        batch_size, seq_len, _ = x.shape
        return x.view(batch_size, seq_len, self.num_head, self.d).transpose(1, 2)

    def forward(self,q,k,v,mask=None):
        """Compute the multi-head attention value.

        Args:
            q, k, v: pre-transformed query, key, value with
                q.shape, k.shape, v.shape = [batch_size, seq_len, d_model].
            mask: optional masking matrix handed to the attention module
                unchanged; False kills a value, True leaves it.

        Returns:
            Tensor of shape [batch_size, seq_len_q, d_model].
        """
        q_heads = self._split_heads(self.w_q(q))
        k_heads = self._split_heads(self.w_k(k))
        v_heads = self._split_heads(self.w_v(v))

        heads = self.attention(q_heads, k_heads, v_heads, mask)

        # Undo the head split: [batch, num_head, seq, d] -> [batch, seq, d_model].
        batch_size, _, seq_len, _ = heads.shape
        concat = heads.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)

        output = self.w_o(concat)
        return output

### **PositionwiseFeedForwardNetwork**

In [None]:
# refer to Section 3.3 in the paper
class PositionwiseFeedForwardNetwork(nn.Module):
    """Position-wise feed-forward network (Section 3.3 in the paper).

    FFN(x) = max(0, x W1 + b1) W2 + b2, applied to the last dimension of ``x``.

    Args:
        d_model: input and output feature size.
        d_ff: hidden feature size between the two linear maps.
    """

    def __init__(self,d_model=16,d_ff=32):
        super().__init__()
        self.d_model = d_model
        self.d_ff = d_ff
        self.linear1 = nn.Linear(self.d_model, self.d_ff)  # W1, b1
        self.linear2 = nn.Linear(self.d_ff, self.d_model)  # W2, b2

    def forward(self,x):
        """Return FFN(x); the shape of ``x`` is preserved."""
        hidden = torch.relu(self.linear1(x))  # max(0, x W1 + b1)
        output = self.linear2(hidden)

        return output

### **Masking**

This masking, combined with the fact that the output embeddings are offset by one position, ensures that the
 predictions for position i can depend only on the known outputs at positions less than i.

네?

In [None]:
class Masking(nn.Module):
    """Build the causal (look-ahead) mask used by the decoder self-attention.

    Position ``i`` may attend to positions ``<= i`` only, so the mask is a
    lower-triangular boolean matrix: True keeps a score, False kills it.

    Args:
        device: stored for reference; the mask itself is created on the device
            of the tensor passed to ``forward``.
    """

    def __init__(self,device):
        super().__init__()
        self.device = device

    def forward(self,x):
        """Return a [seq_len, seq_len] boolean mask for ``x``.

        Args:
            x: tensor with x.shape = [batch_size, seq_len, data_dim]; only
               ``seq_len`` and the device are read.

        Returns:
            Lower-triangular bool tensor of shape [seq_len, seq_len]. It
            broadcasts over the batch and head dimensions of attention scores.
        """
        seq_len = x.shape[1]
        mask = torch.ones(seq_len, seq_len, device=x.device).tril().bool()

        return mask

### **Layer Normalization**
- [Pytorch LayerNorm Document](https://pytorch.org/docs/stable/generated/torch.nn.LayerNorm.html)

In [None]:
# do not use torch.nn.LayerNorm
class LayerNormalization(nn.Module):
    """Layer normalization over the last (feature) dimension.

    Every feature vector is normalized on its own to zero mean and unit
    (biased) variance, then scaled by ``gamma`` and shifted by ``beta``.

    Args:
        d_model: size of the last dimension of the input.
        eps: small constant added to the variance for numerical stability.
    """

    def __init__(self, d_model=16, eps=1e-5):
        super().__init__()
        self.d_model = d_model
        self.eps = eps
        # Learnable scale and shift, initialised to 1 and 0 respectively.
        self.gamma = nn.Parameter(torch.ones(self.d_model))
        self.beta = nn.Parameter(torch.zeros(self.d_model))

    def forward(self,x):
        """Normalize ``x`` along its last dimension; the shape is preserved."""
        # Statistics are per position (last axis only), not over the whole batch.
        mean = x.mean(dim=-1, keepdim=True)
        var = x.var(dim=-1, unbiased=False, keepdim=True)
        normed = (x - mean) / torch.sqrt(var + self.eps)
        normed = normed * self.gamma + self.beta

        return normed

### **Layerwise Encoder&Decoder**

 **Residual Dropout** We apply dropout [27] to the output of each sub-layer, before it is added to the
 sub-layer input and normalized. In addition, we apply dropout to the sums of the embeddings and the
 positional encodings in both the encoder and decoder stacks. For the base model, we use a rate of
 $P_{drop} = 0.1$.

In [None]:
class EncoderLayer(nn.Module):
    """A single encoder block (Section 3.1 and Figure 1 in the paper).

    It consists of two sub-layers, multi-head self-attention and a
    position-wise feed-forward network. Each sub-layer output goes through
    dropout, is added to the sub-layer input (residual connection) and is then
    layer-normalized.

    Args:
        d_model: feature size of the hidden representation.
        num_head: number of attention heads.
        d_ff: hidden size of the feed-forward network.
        drop_prob: dropout probability applied to each sub-layer output.
    """

    def __init__(self,d_model=16,num_head=4,d_ff=32, drop_prob=0.1):
        super().__init__()
        self.d_model = d_model
        self.num_head = num_head
        self.d_ff = d_ff
        self.drop_prob = drop_prob

        # Sub-layer 1: self-attention.
        self.attention = MultiHeadAttention(d_model, num_head)
        self.dropout1 = nn.Dropout(drop_prob)
        self.norm1 = LayerNormalization(d_model)

        # Sub-layer 2: position-wise feed-forward network.
        self.ffn = PositionwiseFeedForwardNetwork(d_model, d_ff)
        self.dropout2 = nn.Dropout(drop_prob)
        self.norm2 = LayerNormalization(d_model)

    def forward(self,enc):
        """Apply the block to ``enc`` ([batch_size, seq_len, d_model]); the shape is preserved."""
        attended = self.attention(enc, enc, enc)
        enc = self.norm1(enc + self.dropout1(attended))

        transformed = self.ffn(enc)
        output = self.norm2(enc + self.dropout2(transformed))

        return output

In [None]:
class DecoderLayer(nn.Module):
    """A single decoder block (Section 3.1 and Figure 1 in the paper).

    It consists of three sub-layers: masked multi-head self-attention,
    multi-head attention over the encoder output, and a position-wise
    feed-forward network. Each sub-layer output goes through dropout, is added
    to the sub-layer input (residual connection) and is then layer-normalized.

    Args:
        d_model: feature size of the hidden representation.
        num_head: number of attention heads.
        d_ff: hidden size of the feed-forward network.
        drop_prob: dropout probability applied to each sub-layer output.
    """

    def __init__(self,d_model=16,num_head=4,d_ff=32,drop_prob=0.1):
        super().__init__()
        self.d_model=d_model
        self.num_head=num_head
        self.d_ff = d_ff
        self.drop_prob = drop_prob

        # Sub-layer 1: masked self-attention over the decoder input.
        self.self_attention = MultiHeadAttention(d_model, num_head)
        self.dropout1 = nn.Dropout(drop_prob)
        self.norm1 = LayerNormalization(d_model)

        # Sub-layer 2: attention with decoder queries and encoder keys/values.
        self.cross_attention = MultiHeadAttention(d_model, num_head)
        self.dropout2 = nn.Dropout(drop_prob)
        self.norm2 = LayerNormalization(d_model)

        # Sub-layer 3: position-wise feed-forward network.
        self.ffn = PositionwiseFeedForwardNetwork(d_model, d_ff)
        self.dropout3 = nn.Dropout(drop_prob)
        self.norm3 = LayerNormalization(d_model)

    def forward(self,enc_output,dec,dec_mask):
        """Apply the block.

        Args:
            enc_output: encoder output, [batch_size, src_len, d_model].
            dec: decoder hidden state, [batch_size, tgt_len, d_model].
            dec_mask: mask for the decoder self-attention (False kills a value).

        Returns:
            Tensor with the same shape as ``dec``.
        """
        self_attended = self.self_attention(dec, dec, dec, dec_mask)
        dec = self.norm1(dec + self.dropout1(self_attended))

        cross_attended = self.cross_attention(dec, enc_output, enc_output)
        dec = self.norm2(dec + self.dropout2(cross_attended))

        transformed = self.ffn(dec)
        output = self.norm3(dec + self.dropout3(transformed))

        return output

#### **찐 Encoder&Decoder**

In [None]:
class Encoder(nn.Module):
    """The whole encoder, i.e. the left side of Figure 1 (Section 3.1 in the paper).

    In this homework the encoder inputs are not tokens but vectors that already
    live in the input dimension, so there is no embedding table. Instead, a
    single linear transformation maps the input to the hidden dimension. The
    positional encoding is added, dropout is applied to the sum, and the result
    passes through ``num_layer`` independent encoder blocks.

    Args:
        device: device handed to the positional encoding.
        input_dim: feature size of the raw input.
        num_layer: number of stacked encoder blocks.
        max_len: maximum supported sequence length.
        d_model: hidden feature size.
        num_head: number of attention heads per block.
        d_ff: hidden size of the feed-forward networks.
        drop_prob: dropout probability.
    """
    def __init__(self,device,input_dim=3,num_layer=3,max_len=512,d_model=16,num_head=4,d_ff=32,drop_prob=.1):
        super().__init__()
        self.device = device
        self.input_dim = input_dim
        self.num_layer = num_layer
        self.max_len = max_len
        self.d_model = d_model
        self.num_head = num_head
        self.d_ff = d_ff
        self.drop_prob = drop_prob

        self.input_transform = nn.Linear(self.input_dim, self.d_model)
        self.positionalencoding = PositionalEncoding(self.device, self.max_len, self.d_model)
        self.dropout = nn.Dropout(self.drop_prob)
        # A ModuleList of separate instances gives every block its own weights.
        self.encoderlayers = nn.ModuleList(
            [
                EncoderLayer(self.d_model, self.num_head, self.d_ff, self.drop_prob)
                for _ in range(self.num_layer)
            ]
        )

    def forward(self,x):
        """Encode ``x`` ([batch_size, seq_len, input_dim]) into [batch_size, seq_len, d_model]."""
        input_embedding = self.input_transform(x)
        # The positional table is [seq_len, d_model] and broadcasts over the batch.
        pos_emb = self.positionalencoding(input_embedding).to(input_embedding.device)
        hidden = self.dropout(input_embedding + pos_emb)

        for layer in self.encoderlayers:
            hidden = layer(hidden)

        return hidden

In [None]:
class Decoder(nn.Module):
    """The whole decoder, i.e. the right side of Figure 1 (Section 3.1 in the paper).

    In this homework the decoder inputs are not tokens but vectors that already
    live in the input dimension, so there is no embedding table. Instead, a
    single linear transformation maps the input to the hidden dimension. The
    positional encoding is added, dropout is applied to the sum, the result
    passes through ``num_layer`` independent decoder blocks, and a final linear
    classifier maps back to ``input_dim`` logits per position.

    Args:
        device: device handed to the positional encoding.
        input_dim: feature size of the raw decoder input and of the output logits.
        num_layer: number of stacked decoder blocks.
        max_len: maximum supported sequence length.
        d_model: hidden feature size.
        num_head: number of attention heads per block.
        d_ff: hidden size of the feed-forward networks.
        drop_prob: dropout probability.
    """
    def __init__(self,device,input_dim=3,num_layer=3,max_len=512,d_model=16,num_head=4,d_ff=32,drop_prob=.1):
        super().__init__()
        self.device = device
        self.input_dim = input_dim
        self.num_layer = num_layer
        self.max_len = max_len
        self.d_model = d_model
        self.num_head = num_head
        self.d_ff = d_ff
        self.drop_prob = drop_prob

        self.input_transform = nn.Linear(self.input_dim, self.d_model)
        self.positionalencoding = PositionalEncoding(self.device, self.max_len,d_model)
        self.dropout = nn.Dropout(self.drop_prob)
        # A ModuleList of separate instances gives every block its own weights.
        self.decoderlayers = nn.ModuleList(
            [
                DecoderLayer(self.d_model, self.num_head, self.d_ff, self.drop_prob)
                for _ in range(self.num_layer)
            ]
        )
        # The training loop compares the output with a target of feature size
        # input_dim, so the classifier projects d_model -> input_dim.
        self.classifier = nn.Linear(self.d_model, self.input_dim)

    def forward(self,enc_output,y,y_mask):
        """Decode ``y`` conditioned on the encoder output.

        Args:
            enc_output: encoder output, [batch_size, src_len, d_model].
            y: decoder input, [batch_size, tgt_len, input_dim].
            y_mask: mask for the decoder self-attention (False kills a value).

        Returns:
            Unnormalized logits of shape [batch_size, tgt_len, input_dim].
        """
        input_embedding = self.input_transform(y)
        # The positional table is [tgt_len, d_model] and broadcasts over the batch.
        pos_emb = self.positionalencoding(input_embedding).to(input_embedding.device)
        output = self.dropout(input_embedding + pos_emb)

        for layer in self.decoderlayers:
            output = layer(enc_output, output, y_mask)

        return self.classifier(output)

### **Transformer**

In [None]:
# refer to Section 3.1 and Figure 1 in the paper
# sum up encoder and decoder
class Transformer(nn.Module):
    """Encoder-decoder Transformer (Section 3.1 and Figure 1 in the paper).

    Args:
        device: device handed to the encoder, decoder and mask builder.
        input_dim: feature size of the raw encoder / decoder inputs.
        num_layer: number of blocks in each of the encoder and decoder.
        max_len: maximum supported sequence length.
        d_model: hidden feature size.
        num_head: number of attention heads per block.
        d_ff: hidden size of the feed-forward networks.
        drop_prob: dropout probability.
    """

    def __init__(self,device,input_dim=3,num_layer=3,max_len=512,d_model=16,num_head=4,d_ff=32,drop_prob=.1):
        super().__init__()
        self.encoder = Encoder(device,input_dim,num_layer,max_len,d_model,num_head,d_ff,drop_prob)
        self.decoder = Decoder(device,input_dim,num_layer,max_len,d_model,num_head,d_ff,drop_prob)
        self.masking = Masking(device)

    def forward(self,x,y):
        """Run the encoder on ``x`` and the decoder on ``y``.

        Args:
            x: encoder input, [batch_size, src_len, input_dim].
            y: decoder input, [batch_size, tgt_len, input_dim].

        Returns:
            The decoder output, returned as-is. No softmax or sigmoid is applied
            here because the training setup uses ``BCEWithLogitsLoss``, which
            expects raw logits.
        """
        enc_output = self.encoder(x)
        dec_mask = self.masking(y)  # keeps position i from seeing later positions
        dec_output = self.decoder(enc_output, y, dec_mask)

        return dec_output

NameError: name 'nn' is not defined

## **Model Train Setting**

In [None]:
##########################################
#### there is nothing to do from here ####
##########################################

In [None]:
class ScheduledOptimizer:
    """Wrap an optimizer and set its learning rate after every step.

    After step number ``s`` the learning rate of every parameter group becomes

        d_model^-0.5 * min(s^-0.5, s * warmup_steps^-1.5)

    ``lr`` is only available once ``update_parameter_and_learning_rate`` has
    been called at least once.
    """

    def __init__(self,optimizer,d_model=16,warmup_steps=4000):
        self.optimizer, self.d_model = optimizer, d_model
        self.warmup_steps = warmup_steps
        self.step_num = 0

    def zero_grad(self):
        """Clear the gradients held by the wrapped optimizer."""
        self.optimizer.zero_grad()

    def update_parameter_and_learning_rate(self):
        """Take one optimizer step, then write the new learning rate."""
        # The step runs first, so the rate computed below applies to the next step.
        self.optimizer.step()
        self.step_num += 1

        decay = self.step_num ** (-.5)
        warmup = self.step_num * self.warmup_steps ** (-1.5)
        self.lr = self.d_model ** (-.5) * min(decay, warmup)

        for group in self.optimizer.param_groups:
            group['lr'] = self.lr

### **Setting**

In [None]:
# Use the first GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# modify num_layer, d_model, num_head, d_ff while debugging your code
model = Transformer(
    device=device,
    input_dim=3,
    num_layer=3,
    max_len=512,
    d_model=16,
    num_head=4,
    d_ff=32,
    drop_prob=.1,
)
model = model.to(device)

loss = nn.BCEWithLogitsLoss(reduction="mean")
optimizer = torch.optim.Adam(
    model.parameters(),
    betas=(.9, .98),
    eps=1e-9,
)
# The wrapper overwrites the learning rate of `optimizer` after every step.
scheduled_optimizer = ScheduledOptimizer(optimizer, d_model=16)

In [None]:
# Number of training epochs
num_epoch = 1 # single epoch for now, just checking that the pipeline runs

## **Train Model**

In [None]:
# Per-epoch average losses, appended to by the training loop.
train_loss_list, test_loss_list = [], []

# Count only the parameters that receive gradients.
total_params = sum(
    param.numel() for param in model.parameters() if param.requires_grad
)
print("num_param:", total_params)

### **Train/Evaluate 함수(일단 모델 돌아가면 수정)**

반복문 길어서 보기 힘들면 함수로 따로 빼기

In [None]:
# Disabled draft: this whole cell is a single string literal, so train() is
# never defined or executed. The draft reads `i` without defining it, i.e. it
# relies on a module-level epoch counter.
'''
def train():
# train
    model.train()

    # initialize(epoch마다)
    total_loss = 0
    count = 0

    for batch_idx, (image, label) in enumerate(train_loader):
        image = image.reshape(-1,3,1024).transpose(1,2)
        x, y = image[:,:512,:].to(device), image[:,512:,:].to(device)

        y_ = torch.zeros([y.shape[0],1,3],requires_grad=False).to(device)
        y_ = torch.cat([y_,y[:,:-1,:]],dim=1)

        logit = model.forward(x,y_)
        cost = loss(logit, y)

        total_loss += cost.item() * y.shape[0] * y.shape[1] * y.shape[2]

        scheduled_optimizer.zero_grad()
        cost.backward()
        scheduled_optimizer.update_parameter_and_learning_rate()

    ave_loss = total_loss/len(train_data)
    train_loss_list.append(ave_loss)

    if i % 1 == 0:
        print("\nEpoch %d Train: %.3f w/ Learning Rate: %.5f"%(i,ave_loss, scheduled_optimizer.lr))
'''

In [None]:
# Disabled draft: this whole cell is a single string literal, so evaluate() is
# never defined or executed. The draft reads `i` without defining it, i.e. it
# relies on a module-level epoch counter.
'''
def evaluate():
  ## test
    model.eval()

    total_loss = 0
    count = 0

    with torch.no_grad():
        for batch_idx, (image, label) in enumerate(test_loader):

            image = image.reshape(-1,3,1024).transpose(1,2)
            x, y = image[:,:512,:].to(device), image[:,512:,:].to(device)

            y_ = torch.zeros([y.shape[0],1,3],requires_grad=False).to(device)
            y_ = torch.cat([y_,y[:,:-1,:]],dim=1)

            logit = model.forward(x,y_)
            cost = loss(logit, y)

            total_loss += cost.item() * y.shape[0] * y.shape[1] * y.shape[2]

    ave_loss = total_loss/len(test_data)
    test_loss_list.append(ave_loss)

    if i % 1 == 0:
        print("Epoch %d Test: %.3f"%(i,ave_loss))
'''

In [None]:
# Disabled draft: a string literal, not executed. It is the epoch loop that
# would drive train() and evaluate() if they were defined.
'''
for i in range(num_epoch):
  train()
  evaluate()
'''

### **학습 결과**

In [None]:
def _prepare_batch(image):
    """Split an image batch into (encoder input, decoder input, target).

    Each image is reshaped to a sequence of 1024 positions with 3 features.
    The first 512 positions feed the encoder and the last 512 are the target.
    The decoder input is the target shifted right by one position, with a zero
    vector in front.
    """
    pixels = image.reshape(-1,3,1024).transpose(1,2)
    x, y = pixels[:,:512,:].to(device), pixels[:,512:,:].to(device)

    start = torch.zeros([y.shape[0],1,3],requires_grad=False).to(device)
    y_ = torch.cat([start,y[:,:-1,:]],dim=1)
    return x, y_, y


def _run_epoch(loader, num_samples, training):
    """Run one pass over ``loader`` and return the summed loss per sample.

    With ``training=True`` the model is put in train mode and the parameters
    are updated after every batch. Otherwise the model is put in eval mode and
    gradients are disabled.
    """
    model.train(training)
    total_loss = 0

    with torch.set_grad_enabled(training):
        for image, _label in loader:
            x, y_, y = _prepare_batch(image)

            # Call the module itself rather than .forward() so hooks still run.
            logit = model(x,y_)
            cost = loss(logit, y)

            # `cost` is a mean over all elements; scale it back to a sum so the
            # epoch total can be divided by the number of samples.
            total_loss += cost.item() * y.numel()

            if training:
                scheduled_optimizer.zero_grad()
                cost.backward()
                scheduled_optimizer.update_parameter_and_learning_rate()

    return total_loss/num_samples


for i in range(num_epoch):
    # train
    ave_loss = _run_epoch(train_loader, len(train_data), training=True)
    train_loss_list.append(ave_loss)
    print("\nEpoch %d Train: %.3f w/ Learning Rate: %.5f"%(i,ave_loss, scheduled_optimizer.lr))

    # test
    ave_loss = _run_epoch(test_loader, len(test_data), training=False)
    test_loss_list.append(ave_loss)
    print("Epoch %d Test: %.3f"%(i,ave_loss))