<a href="https://colab.research.google.com/github/Chuck2Win/attention/blob/4%2F17/attention_scratch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# 배움
# transformer 모델에는 많은 설정이 필요하다. 이를 위해선 json형태(dict)로 지정하고 
# 이를 읽어서 처리하는 간단한 클래스
class Config(dict):
    """Dictionary whose entries can also be read and written as attributes.

    ``config.batch`` is equivalent to ``config['batch']`` and
    ``config.batch = 32`` is equivalent to ``config['batch'] = 32``.
    """

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails. A missing key must
        # surface as AttributeError (not KeyError) so that hasattr() and
        # getattr(obj, name, default) behave as they do for any other object.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None

    # Attribute assignment stores the value as a dictionary item.
    __setattr__ = dict.__setitem__

    @classmethod
    def load(cls, file):
        """Read the JSON document at path ``file`` and return it as ``cls``.

        Using ``cls`` (instead of the hard-coded class name) lets subclasses
        get instances of their own type back.
        """
        with open(file, 'r') as f:
            return cls(json.load(f))
# Hyper-parameters for the notebook, reachable as config.<name> or config['<name>'].
config = Config(
    en_vocab=7170,
    fr_vocab=12238,
    en_seq_len=66,
    fr_seq_len=66,
    embedding=300,
    batch=64,
    epochs=10,
)

In [0]:
import numpy as np
import torch 
import torch.nn as nn
import torch.nn.functional as F
import torchtext
from gensim.models import Word2Vec
import nltk
import os
from google.colab import files
from google.colab import drive
from matplotlib import pyplot as plt
import json

In [168]:
# Mount Google Drive into the Colab filesystem at /content/gdrive.
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
# Move into the project folder on the mounted Drive.
# The target is anchored on the mount point passed to drive.mount()
# ('/content/gdrive') instead of the current working directory, so running
# this cell a second time no longer tries to enter a nested
# 'gdrive/My Drive/attention' below the folder we are already in.
print(os.getcwd())
print(os.listdir('./'))  # everything in the directory we start from
current=os.getcwd()
path=os.path.join('/content','gdrive/My Drive/attention')
os.chdir(path)
print(os.getcwd())
# If the folder does not exist yet, create it first with os.mkdir(path).
print(os.listdir('./'))

In [0]:
# drive에 파일 올리기
#files.upload()

In [0]:
import pandas as pd
from pandas import DataFrame as df
# The file has no header row, so the two tab-separated columns are named here.
data = pd.read_csv(
    './eng-fra.txt',
    sep='\t',
    header=None,
    names=['english', 'french'],
)

In [0]:
# Preview the first rows of the loaded sentence pairs.
data.head()

In [0]:
# nltk의 word tokenize를 활용해보고
# 일단 lower를 진행..

In [0]:
# Tokenise both languages with NLTK after lower-casing.
nltk.download('punkt')
from nltk.tokenize import word_tokenize
# word_tokenize takes one sentence string at a time (not a whole DataFrame),
# so it is applied element by element; `language` is forwarded to it.
english_lower = data['english'].str.lower()
french_lower = data['french'].str.lower()
data['tokenized_eng'] = english_lower.apply(word_tokenize, language='english')
data['tokenized_french'] = french_lower.apply(word_tokenize, language='french')

In [0]:
# torchtext is not used here; the vocabulary is built by hand further down.
# Longest tokenised sentence per language.
print(data['tokenized_eng'].map(len).max())  # 51 when the notebook was written
print(data['tokenized_french'].map(len).max())  # 58 when the notebook was written

In [0]:
# 만약 다 손수 한다면...
def make_vocab(sentences,min_count=3):
    """Build a token -> integer id table from tokenised sentences.

    Ids 0-3 are reserved for ``<unk>``, ``<pad>``, ``<sos>`` and ``<eos>``.
    Every token that occurs at least ``min_count`` times receives the next
    free id, in order of first appearance, so the ids are contiguous
    (``0 .. len(vocab) - 1``) and ``len(vocab)`` is a valid embedding size.

    Args:
        sentences: iterable of token sequences (e.g. a pandas Series of lists).
        min_count: minimum number of occurrences for a token to be kept.

    Returns:
        dict mapping each kept token (and the four special tokens) to its id.
    """
    from collections import Counter

    counts = Counter(token for sentence in sentences for token in sentence)

    vocab = {'<unk>': 0, '<pad>': 1, '<sos>': 2, '<eos>': 3}
    for token, count in counts.items():
        # Number tokens only after filtering: numbering before the filter
        # leaves gaps, and the largest id can then exceed len(vocab).
        # A corpus token that equals a special token keeps the reserved id.
        if count >= min_count and token not in vocab:
            vocab[token] = len(vocab)
    return vocab
# One token -> id table per language, built from the tokenised columns.
english_index = make_vocab(data['tokenized_eng'])
french_index = make_vocab(data['tokenized_french'])

In [0]:
# Vocabulary sizes (special tokens included).
for vocab_table in (english_index, french_index):
    print(len(vocab_table))

In [0]:
# tokenized->vectorization # ㅋㅋㅋ 결국 padding 진행
def vectorizing(sentence,word2index,max_len=64):
    """Turn one tokenised sentence into a fixed-length list of ids.

    The result is ``<sos>`` + token ids + ``<eos>`` + ``<pad>`` filler and is
    always ``max_len + 2`` long. Tokens missing from ``word2index`` map to
    ``word2index['<unk>']``. Sentences longer than ``max_len`` tokens are cut
    to their first ``max_len`` tokens; without that the rows have different
    lengths and cannot be stacked into one tensor.

    Args:
        sentence: sequence of tokens.
        word2index: token -> id mapping that contains ``'<unk>'``.
        max_len: number of token slots between ``<sos>`` and ``<eos>``.

    Returns:
        list of ints of length ``max_len + 2``.
    """
    # Reserved ids; these match the values make_vocab gives the special tokens.
    pad_id, sos_id, eos_id = 1, 2, 3

    tokens = list(sentence)[:max_len]
    ids = [sos_id]
    ids.extend(
        word2index[token] if token in word2index else word2index['<unk>']
        for token in tokens
    )
    ids.append(eos_id)
    ids.extend([pad_id] * (max_len - len(tokens)))
    return ids
# Convert every tokenised sentence into its padded id list.
data['vectorized_eng'] = data['tokenized_eng'].apply(vectorizing, word2index=english_index)
data['vectorized_fr'] = data['tokenized_french'].apply(vectorizing, word2index=french_index)

In [0]:
# Reference: https://paul-hyun.github.io/transformer-01/
# torch.nn.utils.rnn.pad_sequence(sequences, batch_first, padding_value)
# pads every sequence up to the length of the longest one in the list.
L = [
    torch.tensor([1, 2, 3]),
    torch.tensor([1, 2, 3, 4, 5]),
]
torch.nn.utils.rnn.pad_sequence(L, batch_first=True, padding_value=1)

In [0]:
# Ids of the sentence delimiters (2 and 3), followed by the vocabulary size.
for special_token in ('<sos>', '<eos>'):
    print(english_index[special_token])
print(len(english_index))

In [0]:
# Source (english) ids first, then the target (french) ids.
for column in ('vectorized_eng', 'vectorized_fr'):
    print(data[column].head())

In [0]:
### 여기까진 데이터 전처리 ###

In [0]:
# Wrap the padded id lists in a TensorDataset and serve them through a
# DataLoader in batches of 64; the incomplete last batch is dropped.
x = torch.tensor(data['vectorized_eng'].tolist())
y = torch.tensor(data['vectorized_fr'].tolist())
X = torch.utils.data.TensorDataset(x, y)
train_loader = torch.utils.data.DataLoader(dataset=X, batch_size=64, drop_last=True)

In [0]:
# Pull a single (english ids, french ids) batch from the loader for the experiments below.
batch=next(iter(train_loader))

In [0]:
print(batch[0].shape) # torch.Size([64, 66]): batch_size 64, and 64 token slots plus <sos>/<eos>


In [0]:
# positional encoding을 해줘야한다.(실제 모델에서는 pretrained 모델을 활용한다고 했다)
def positional_encoding(seq_len,embedding_dim):
    """Build the fixed sinusoidal position table.

    Row 0 is all zeros and is reserved for padded positions; rows
    ``1 .. seq_len`` hold the encodings of the real positions::

        PE[pos, 2i]     = sin(pos / 10000 ** (2i / embedding_dim))
        PE[pos, 2i + 1] = cos(pos / 10000 ** (2i / embedding_dim))

    Args:
        seq_len: largest position that needs an encoding.
        embedding_dim: width of each encoding vector.

    Returns:
        float tensor of shape ``[seq_len + 1, embedding_dim]``.
    """
    # Column j belongs to the sin/cos pair j // 2, so both columns of a pair
    # must share one frequency (using j itself gives every column its own).
    exponents = torch.tensor(
        [2 * (j // 2) / embedding_dim for j in range(embedding_dim)],
        dtype=torch.float32,
    )
    positions = torch.arange(1, seq_len + 1, dtype=torch.float32).unsqueeze(1)
    angles = positions / (10000.0 ** exponents)  # [seq_len, embedding_dim]

    pe = torch.zeros((seq_len + 1, embedding_dim))
    pe[1:, 0::2] = torch.sin(angles[:, 0::2])
    pe[1:, 1::2] = torch.cos(angles[:, 1::2])
    return pe

In [0]:
# Frozen positional-encoding lookup with padded tokens routed to row 0.
# The table has 66 + 1 rows: row 0 is the all-zero padding row and rows
# 1..66 belong to the real positions.
pe = positional_encoding(66, 300)
print(pe)
pe_embedding = nn.Embedding.from_pretrained(pe, freeze=True)
n_sentences, n_tokens = batch[0].shape
position = torch.arange(1, n_tokens + 1).repeat(n_sentences, 1)
mask = batch[0].eq(1)  # True wherever the token id is 1 (<pad>)
position.masked_fill_(mask, 0)
pe_embedding(position)

In [0]:
# Positions for a single sentence (the first one of the batch).
# Real tokens are numbered from 1 because row 0 of the table is the all-zero
# padding row; counting from 0 would give the first token an empty encoding.
position = torch.arange(batch[0][0].shape[0]) + 1
pos_mask = batch[0][0].eq(1)  # True wherever the token id is 1 (<pad>)
position.masked_fill_(pos_mask, 0)
print(position)
print(pe_embedding(position))

In [0]:
# Position indices for a whole batch.
# `inputs` was used below without being defined anywhere in the notebook;
# it is bound to the english id matrix of the sample batch.
inputs = batch[0]
seq_len = 66
positions = torch.arange(inputs.size(1))
# Repeat the same row of positions for every sentence: [seq_len] -> [n, seq_len].
positions.expand(inputs.size(0), inputs.size(1))

In [0]:
def self_attention(input,Mask=False):
    """Scaled dot-product self-attention with Q = K = V = ``input``.

    Args:
        input: float tensor of shape ``[n, seq_len, embedding_dim]``.
        Mask: when True, position ``i`` may only attend to positions ``<= i``.

    Returns:
        Tensor of shape ``[n, seq_len, embedding_dim]``.
    """
    seq_len, embedding_dim = input.shape[1], input.shape[2]

    # All sentences at once: [n, seq_len, seq_len].
    scores = input.matmul(input.transpose(-2, -1)) / embedding_dim ** 0.5

    # Pairs whose raw score is exactly 0 get no weight. This is how all-zero
    # (padded) rows are dropped; it is recorded before the causal mask is
    # applied, and the surviving weights are not re-normalised afterwards.
    zero_score = scores.eq(0)

    if Mask:
        future = torch.ones((seq_len, seq_len), device=input.device).triu(diagonal=1).bool()
        # -inf (not a small positive number) is what makes softmax assign
        # exactly zero weight to future positions.
        scores = scores.masked_fill(future, float('-inf'))

    weight = F.softmax(scores, dim=-1)
    # Out-of-place: softmax keeps its output for the backward pass, so
    # editing it in place makes backward() fail.
    weight = weight.masked_fill(zero_score, 0)
    return weight.matmul(input)

In [0]:
def attention(encoder_input,decoder_input):
    """Scaled dot-product encoder-decoder attention.

    Queries come from the decoder; keys and values both come from the encoder.
    Both tensors must have the same batch size and embedding width.

    Args:
        encoder_input: tensor of shape ``[n, encoder_seq_len, embedding_dim]``.
        decoder_input: tensor of shape ``[n, decoder_seq_len, embedding_dim]``.

    Returns:
        Tensor of shape ``[n, decoder_seq_len, embedding_dim]``.
    """
    scale = decoder_input.shape[-1] ** 0.5
    # One batched matmul instead of a Python loop over sentences; this also
    # works for an empty batch, where stacking an empty list used to fail.
    scores = decoder_input.matmul(encoder_input.transpose(-2, -1)) / scale
    weight = F.softmax(scores, dim=-1)  # [n, decoder_seq_len, encoder_seq_len]
    return weight.matmul(encoder_input)

In [0]:
class attention_encoder(nn.Module):
    """Single-block encoder: embedding + positions, self-attention, feed-forward.

    Args:
        n_vocab: number of rows of the token embedding.
        seq_len: length of every input sentence. The BatchNorm1d layers are
            created with ``seq_len`` features, so the second axis of the
            input must have exactly this size.
        embedding_dim: width of the token and position vectors.
        freeze_embedding: when True the token embedding is not trained.
            Defaults to False, which is what the previous code effectively
            did: it assigned ``weight.required_grad`` (a misspelling of
            ``requires_grad``), which has no effect.
    """
    def __init__(self,n_vocab,seq_len,embedding_dim,freeze_embedding=False):
        super().__init__()
        self.embedding=nn.Embedding(n_vocab,embedding_dim,padding_idx=1)
        self.embedding.weight.requires_grad_(not freeze_embedding)
        pe=positional_encoding(seq_len,embedding_dim)
        # Fixed sinusoidal table; row 0 is the all-zero row used for padding.
        self.positional_embedding=nn.Embedding.from_pretrained(pe,freeze=True)
        self.fc=nn.Sequential(nn.Linear(embedding_dim,embedding_dim*4,bias=False),nn.ReLU(),nn.Linear(embedding_dim*4,embedding_dim,bias=False))
        self.batch1=nn.BatchNorm1d(seq_len)
        self.batch2=nn.BatchNorm1d(seq_len)

    def forward(self,input):
        """Encode token ids of shape [n, seq_len] into [n, seq_len, embedding_dim]."""
        x=self.embedding(input) # x : [n,seq_len,embedding_dim]
        batch_size,length=input.shape
        # Positions 1..seq_len, created on the same device as the ids so the
        # module also works after .to(device).
        position=torch.arange(1,length+1,device=input.device).repeat(batch_size,1)
        # Padded tokens (id 1) are sent to position 0, the all-zero row.
        position=position.masked_fill(input.eq(1),0)
        x=x+self.positional_embedding(position)
        # Residual connection + normalisation around each sub-layer.
        x=self.batch1(self_attention(x)+x)
        x=self.batch2(self.fc(x)+x)
        return x

In [0]:
class attention_decoder(nn.Module):
    """Single-block decoder: masked self-attention, encoder attention, feed-forward.

    Unlike the encoder, the self-attention here is causal (a position cannot
    attend to later positions).

    Args:
        n_vocab: size of the target vocabulary (embedding rows and output logits).
        seq_len: length of every target sentence. The BatchNorm1d layers are
            created with ``seq_len`` features, so the second axis of the
            input must have exactly this size.
        embedding_dim: width of the token and position vectors.
        freeze_embedding: when True the token embedding is not trained.
            Defaults to False, which is what the previous code effectively
            did: it assigned ``weight.required_grad`` (a misspelling of
            ``requires_grad``), which has no effect.
    """
    def __init__(self,n_vocab,seq_len,embedding_dim,freeze_embedding=False):
        super().__init__()
        self.embedding=nn.Embedding(n_vocab,embedding_dim,padding_idx=1)
        self.embedding.weight.requires_grad_(not freeze_embedding)
        pe=positional_encoding(seq_len,embedding_dim)
        # Fixed sinusoidal table; row 0 is the all-zero row used for padding.
        self.positional_embedding=nn.Embedding.from_pretrained(pe,freeze=True)
        self.batch1=nn.BatchNorm1d(seq_len)
        # NOTE(review): fc1 is not used by forward(); it is kept so that the
        # module's parameters and state_dict keys stay the same.
        self.fc1=nn.Sequential(nn.Linear(embedding_dim,embedding_dim*4),nn.ReLU(),nn.Linear(embedding_dim*4,embedding_dim))
        self.batch2=nn.BatchNorm1d(seq_len)
        self.batch3=nn.BatchNorm1d(seq_len)
        self.fc2=nn.Sequential(nn.Linear(embedding_dim,embedding_dim*4),nn.ReLU(),nn.Linear(embedding_dim*4,embedding_dim))
        self.final_fc=nn.Linear(embedding_dim,n_vocab)

    def forward(self,input,encoder_output):
        """Return vocabulary logits of shape [n, seq_len, n_vocab].

        Args:
            input: target token ids, shape [n, seq_len].
            encoder_output: encoder states, shape [n, encoder_seq_len, embedding_dim].
        """
        # 1. masked decoder self-attention
        x=self.embedding(input) # x : [n,seq_len,embedding_dim]
        batch_size,length=input.shape
        # Positions 1..seq_len on the same device as the ids; padded tokens
        # (id 1) are sent to position 0, the all-zero row.
        position=torch.arange(1,length+1,device=input.device).repeat(batch_size,1)
        position=position.masked_fill(input.eq(1),0)
        x=x+self.positional_embedding(position)
        x=self.batch1(self_attention(x,True)+x)

        # 2. decoder-encoder attention. It has its own normalisation layer
        #    (batch2); batch3 used to be applied here and again in step 3,
        #    leaving batch2 unused.
        x=self.batch2(attention(encoder_output,x)+x)

        # 3. position-wise feed-forward
        x=self.batch3(self.fc2(x)+x)

        # 4. projection to vocabulary logits
        return self.final_fc(x)


In [0]:
# device=torch.device('cuda' if torch.cuda.is_available else 'cpu')
# encoder=attention_encoder(7170,66,300).to(device)
# decoder=attention_decoder(12238,66,300).to(device)
# lr=1e-3
# optimizer=torch.optim.Adam()

In [0]:
# Train the encoder/decoder pair.
encoder=attention_encoder(7170,66,300)
decoder=attention_decoder(12238,66,300)
parameters=list(encoder.parameters())+list(decoder.parameters())
optimizer=torch.optim.Adam(parameters)
epochs=10
cost=[]  # mean training loss of every epoch
for epoch in range(epochs):
    avg_cost=0
    n=0
    for x,y in train_loader:
        optimizer.zero_grad()
        encoder_output=encoder(x)
        decoder_output=decoder(y,encoder_output)  # [batch, seq_len, fr_vocab]
        # Next-token targets: position t has to predict token t+1. The
        # targets are y shifted left by one, with <pad> (id 1) in the last
        # slot so the length still matches the decoder output.
        target=torch.cat([y[:,1:],torch.ones_like(y[:,:1])],dim=1)
        # cross_entropy wants [N, classes] logits and [N] class ids; padded
        # targets are excluded from the loss.
        loss=F.cross_entropy(decoder_output.reshape(-1,decoder_output.shape[-1]),target.reshape(-1),ignore_index=1)
        loss.backward()
        optimizer.step()
        avg_cost+=loss.item()
        n+=1
    cost.append(avg_cost/max(n,1))
    print(f'epoch {epoch+1}/{epochs} loss {cost[-1]:.4f}')

In [0]:
# Push the sample batch once through freshly initialised models.
source_ids, target_ids = batch[0], batch[1]
encoder = attention_encoder(n_vocab=7170, seq_len=66, embedding_dim=300)
decoder = attention_decoder(n_vocab=12238, seq_len=66, embedding_dim=300)
encoder_output = encoder(source_ids)
decoder_output = decoder(target_ids, encoder_output)

In [0]:
# Highest-scoring vocabulary id at every position of the first sentence.
first_sentence_logits = decoder_output[0]
first_sentence_logits.argmax(dim=-1)

In [0]:
# Most probable vocabulary id at every position of every sentence.
decoder_output.softmax(dim=-1).argmax(dim=-1)

In [0]:
# pytorch를 위한 dataset 구성하기
# x=torch.tensor(data['vectorized_eng'].to_numpy())
# https://discuss.pytorch.org/t/dataloader-for-various-length-of-data/6418
# dataset_eng=torch.utils.data.TensorDataset(x)
# 에러.. 어차피 TensorDataset을 만드려면,, 격자 구조가 되야되네..

In [0]:
# x=data.vectorized_eng.to_numpy()
# eng_dataloader=torch.utils.data.DataLoader(x,batch_size=1,drop_last=True)
# batch=next(iter(eng_dataloader))
# print(batch)
# https://pytorch.org/tutorials/beginner/text_sentiment_ngrams_tutorial.html 참조
# 사실상 padding을 해서 torch.utils.data.TensorDataset()으로 만들어서 DataLoader로 진행하는 것이 깔끔은 하다.
# 하지만 그런식으로 하기 싫을 경우도 대비해서 한번 배워보자
# # torch.utils.data.DataLoader()
# def batch_maker(batch):
#     batch=torch.tensor(batch)
#     return batch
# loader=torch.utils.data.DataLoader(x,2,collate_fn=batch_maker,shuffle=True) # collate function은 단순히 mini batch로 묶어주기 전에, 각각의 data에 대해서 어떻게 처리하냐에 대한 것을 알려주네...

In [0]:
# Masking walkthrough, following https://paul-hyun.github.io/transformer-01/
input=torch.tensor([[3091, 3604,  206, 3958, 3760, 3590,    0,    0],
        [ 212, 3605,   53, 3832, 3596, 3682, 3760, 3590]]
        )
print(input.eq(0))  # True wherever the id equals 0
# Repeat the per-token mask along a new query axis: [2, 8] -> [2, 8, 8].
attn_mask=input.eq(0).unsqueeze(1).expand(2,8,8)
scores=torch.tensor([[31.3476, -0.1451, -1.4832, -2.8843, -1.2542,  0.8314, -5.7174, -5.7174],
        [-0.1451, 27.8460,  4.9304,  0.8781,  1.5047,  1.6372,  0.5585,  0.5585],
        [-1.4832,  4.9304, 26.8313,  3.9618,  2.1587,  3.5587,  3.0447,  3.0447],
        [-2.8843,  0.8781,  3.9618, 27.9037,  1.6892,  4.0453,  4.7429,  4.7429],
        [-1.2542,  1.5047,  2.1587,  1.6892, 32.0763,  4.0136, -1.1263, -1.1263],
        [ 0.8314,  1.6372,  3.5587,  4.0453,  4.0136, 38.0982, -2.7216, -2.7216],
        [-5.7174,  0.5585,  3.0447,  4.7429, -1.1263, -2.7216, 56.9788, 56.9788],
        [-5.7174,  0.5585,  3.0447,  4.7429, -1.1263, -2.7216, 56.9788, 56.9788]])
# expand() makes both batch entries share the same storage, so an in-place
# masked_fill_ cannot give each sentence its own result. The out-of-place
# masked_fill returns a fresh [2, 8, 8] tensor instead.
scores=scores.expand(2,8,8).masked_fill(attn_mask,0)
seq=torch.randn((1,10))
subsequent_mask = torch.ones_like(seq).unsqueeze(-1).expand(seq.size(0), seq.size(1), seq.size(1))
print(subsequent_mask)
subsequent_mask = subsequent_mask.triu(diagonal=1) # keeps only the entries strictly above the main diagonal, zeroes the rest
print(subsequent_mask)
subsequent_mask = subsequent_mask.triu(diagonal=2) # keeps only the entries at least two columns to the right of the main diagonal
print(subsequent_mask)

In [0]:
# self_attention example

input = torch.randn((2,3,10)) # two sentences of length 3 with 10-dimensional vectors
# self_attention returns one tensor of shape [2, 3, 10]. Unpacking it into two
# names only ran because the batch size happens to be 2, and it silently
# split the batch instead of returning attention weights.
output=self_attention(input,True)
output[0]