# Transformer를 이용한 감정분석기 - 영어 Data

제작자: Park Chanjun (박찬준) <br>
소속: Korea University Natural Language Processing & Artificial Intelligence Lab<br>
Email: bcj1210@naver.com<br>
참고자료: https://keras.io/


## Multi head Attention


In [0]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

class MultiHeadSelfAttention(layers.Layer):
    def __init__(self, embed_dim, num_heads=8):
        super(MultiHeadSelfAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"
            )
        self.projection_dim = embed_dim // num_heads #Multi-head Attention에서는 query, key, value를 바로 사용하는 것이 아닌 h번의 Linear projection을 따라 서로 다른 representation의 조합으로부터 Attention을 계산하는 방법이다. 
        self.query_dense = layers.Dense(embed_dim) #쿼리
        self.key_dense = layers.Dense(embed_dim) #키
        self.value_dense = layers.Dense(embed_dim) #밸류
        self.combine_heads = layers.Dense(embed_dim)

    def attention(self, query, key, value):
        score = tf.matmul(query, key, transpose_b=True) #Q와 V를 곱한다.
        dim_key = tf.cast(tf.shape(key)[-1], tf.float32) #텐서를 새로운 자료형으로 변환합니다.(tf.shape(key)[-1] = 차원)
        scaled_score = score / tf.math.sqrt(dim_key) #Sclae 작업, K차원의 루트값으로
        weights = tf.nn.softmax(scaled_score, axis=-1) #Softmax
        output = tf.matmul(weights, value) #V 곱하기
        return output, weights

    def separate_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim)) #Multihead Attention
        return tf.transpose(x, perm=[0, 2, 1, 3]) #x를 전치합니다. perm에 따라 차원의 순서를 구성합니다.

    def call(self, inputs):
        # x.shape = [batch_size, seq_len, embedding_dim]
        batch_size = tf.shape(inputs)[0] #배치크기 
        query = self.query_dense(inputs)  # (batch_size, seq_len, embed_dim)
        key = self.key_dense(inputs)  # (batch_size, seq_len, embed_dim)
        value = self.value_dense(inputs)  # (batch_size, seq_len, embed_dim)
        query = self.separate_heads(
            query, batch_size
        )  # (batch_size, num_heads, seq_len, projection_dim) => tf.transpose(x, perm=[0, 2, 1, 3])의 결과
        key = self.separate_heads(
            key, batch_size
        )  # (batch_size, num_heads, seq_len, projection_dim)
        value = self.separate_heads(
            value, batch_size
        )  # (batch_size, num_heads, seq_len, projection_dim)
        attention, weights = self.attention(query, key, value) #Self Attention
        attention = tf.transpose(
            attention, perm=[0, 2, 1, 3]
        )  # (batch_size, seq_len, num_heads, projection_dim)
        concat_attention = tf.reshape(
            attention, (batch_size, -1, self.embed_dim)
        )  # (batch_size, seq_len, embed_dim)
        output = self.combine_heads(
            concat_attention
        )  # (batch_size, seq_len, embed_dim)
        return output



## Transformer Layer


In [0]:
class TransformerBlock(layers.Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerBlock, self).__init__()
        self.att = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training):
        attn_output = self.att(inputs) #Multihead Attn 블록
        attn_output = self.dropout1(attn_output, training=training) #드롭아웃
        out1 = self.layernorm1(inputs + attn_output) #LM + Residual
        ffn_output = self.ffn(out1) #FF 블록
        ffn_output = self.dropout2(ffn_output, training=training) #드롭아웃
        return self.layernorm2(out1 + ffn_output) #LM + Residual



## 임베딩 Layer


In [0]:
class TokenAndPositionEmbedding(layers.Layer):
    def __init__(self, maxlen, vocab_size, emded_dim):
        super(TokenAndPositionEmbedding, self).__init__()
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=emded_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=emded_dim)

    def call(self, x):
        maxlen = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1) #포지션 정보
        positions = self.pos_emb(positions) #포지션 임베딩
        x = self.token_emb(x) #토큰임베딩
        return x + positions #합치기

## 데이터셋 준비 및 전처리


In [4]:
vocab_size = 20000  # Only consider the top 20k words
maxlen = 200  # Only consider the first 200 words of each movie review
(x_train, y_train), (x_val, y_val) = keras.datasets.imdb.load_data(num_words=vocab_size) #데이터셋 로딩
print(len(x_train), "Training sequences")
print(len(x_val), "Validation sequences")
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=maxlen)#패딩
x_val = keras.preprocessing.sequence.pad_sequences(x_val, maxlen=maxlen)#패딩

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/imdb.npz
25000 Training sequences
25000 Validation sequences


## 모델 구축


In [0]:
embed_dim = 32  # Embedding size for each token
num_heads = 2  # Number of attention heads
ff_dim = 32  # Hidden layer size in feed forward network inside transformer

inputs = layers.Input(shape=(maxlen,)) #처음 입력
embedding_layer = TokenAndPositionEmbedding(maxlen, vocab_size, embed_dim) #객체 생성
x = embedding_layer(inputs)  #포지셔널 임베딩
transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim) #객체 생성
x = transformer_block(x) #트랜스포머 
x = layers.GlobalAveragePooling1D()(x) #Average Pooling
x = layers.Dropout(0.1)(x) #드롯아웃
x = layers.Dense(20, activation="relu")(x) #FFNN
x = layers.Dropout(0.1)(x) #드롭아웃
outputs = layers.Dense(2, activation="softmax")(x) #Softmax

model = keras.Model(inputs=inputs, outputs=outputs) #모델 생성

## 학습


In [0]:
model.compile("adam", "sparse_categorical_crossentropy", metrics=["accuracy"])

history = model.fit(
    x_train, y_train, batch_size=32, epochs=5, validation_data=(x_val, y_val)
)

## 성능측정


In [0]:
#모델 정보 출력
model.summary() 

#성능 측정
test_loss,test_acc=model.evaluate(x_val,y_val)
print("Test_acc: ",test_acc)

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 200)]             0         
_________________________________________________________________
token_and_position_embedding (None, 200, 32)           646400    
_________________________________________________________________
transformer_block (Transform (None, 200, 32)           6464      
_________________________________________________________________
global_average_pooling1d (Gl (None, 32)                0         
_________________________________________________________________
dropout_2 (Dropout)          (None, 32)                0         
_________________________________________________________________
dense_6 (Dense)              (None, 20)                660       
_________________________________________________________________
dropout_3 (Dropout)          (None, 20)                0     