# Transformer_and_BERT_GPT

# Transformer로 감정 분류(Classification task) 수행하기

Transformer는 분명 seq2seq 형태의 task에서 강점을 보입니다. 하지만 Attention과 Transformer는 classification task에도 적용할 수 있습니다.

Transformer의 Encoder 부분만을 활용하여 분류 태스크를 적용할 수 있습니다. 전체 모델 그림은 다음과 같습니다.

<img src=https://s3.ap-northeast-2.amazonaws.com/urclass-images/aoJFaVFMvRAsntP5k16yq-1640699853834.png>

## Code

In [1]:
import tensorflow as tf
import numpy as np

In [2]:
def create_padding_mask(seq):
    """Return a float mask that flags the padding positions of ``seq``.

    Entries of ``seq`` equal to 0 are treated as padding and become 1.0 in
    the mask; all other entries become 0.0. The mask lets the model avoid
    treating padding as real input.

    Args:
        seq: integer tensor of token ids, indexed as (batch_size, seq_len).

    Returns:
        float32 tensor of shape (batch_size, 1, 1, seq_len).
    """
    is_padding = tf.math.equal(seq, 0)
    mask = tf.cast(is_padding, tf.float32)

    # Insert two singleton axes: (batch_size, seq_len) -> (batch_size, 1, 1, seq_len).
    return mask[:, tf.newaxis, tf.newaxis, :]

# Quick check of the mask: every 0 (padding) entry of `x` should map to 1.0.
x = tf.constant([[7, 6, 5, 4, 0], [1, 2, 3, 0, 0], [1, 8, 0, 0, 0]])
create_padding_mask(x)

<tf.Tensor: shape=(3, 1, 1, 5), dtype=float32, numpy=
array([[[[0., 0., 0., 0., 1.]]],


       [[[0., 0., 0., 1., 1.]]],


       [[[0., 0., 1., 1., 1.]]]], dtype=float32)>

In [3]:
def get_angles(pos, i, d_model):
    """Compute the values that go inside sin/cos for the positional encoding.

    Args:
        pos: position index (scalar or array).
        i: embedding-dimension index (scalar or array).
        d_model: dimension of the model.

    Returns:
        ``pos / 10000 ** (2 * (i // 2) / d_model)``, broadcast over
        ``pos`` and ``i``.
    """
    # Dimensions 2k and 2k+1 share one exponent, hence the integer division.
    exponent = (2 * (i // 2)) / np.float32(d_model)
    angle_rates = 1 / np.power(10000, exponent)
    return pos * angle_rates

def positional_encoding(position, d_model):
    """Build the sinusoidal positional-encoding table.

    Args:
        position: number of positions to encode (rows of the table).
        d_model: dimension of the model (columns of the table).

    Returns:
        float32 tensor of shape (1, position, d_model).
    """
    positions = np.arange(position)[:, np.newaxis]   # column of positions
    dimensions = np.arange(d_model)[np.newaxis, :]   # row of dimension indices
    table = get_angles(positions, dimensions, d_model)

    # Even columns (2i) hold the sine, odd columns (2i+1) hold the cosine.
    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])

    # Prepend a leading axis of size 1 before converting to a tensor.
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)

### Scaled Dot-Product Attention 구현

In [4]:
def scaled_dot_product_attention(q, k, v, mask=None):
    """Calculate the attention weights and the attended output.

    q, k, v must have matching leading dimensions.
    k, v must have matching penultimate dimension, i.e.: seq_len_k = seq_len_v.
    The mask has different shapes depending on its type (padding or look ahead)
    but it must be broadcastable for addition.

    Args:
        q: query shape == (..., seq_len_q, depth)
        k: key shape == (..., seq_len_k, depth)
        v: value shape == (..., seq_len_v, depth_v)
        mask: Float tensor with shape broadcastable
            to (..., seq_len_q, seq_len_k). Defaults to None, in which
            case no masking is applied.

    Returns:
        output, attention_weights
    """
    matmul_qk = tf.matmul(q, k, transpose_b=True)  # (..., seq_len_q, seq_len_k)

    # Scale the logits by sqrt(depth of the keys).
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    # Where the mask is 1, add a large negative number (-1e9) standing in for
    # -inf. After the softmax those positions get weights close to 0, so they
    # contribute almost nothing to the weighted sum over the values.
    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    # softmax is normalized on the last axis (seq_len_k) so that the scores
    # add up to 1.
    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)  # (..., seq_len_q, seq_len_k)

    output = tf.matmul(attention_weights, v)  # (..., seq_len_q, depth_v)

    return output, attention_weights

### MultiHead Attention

Lecture Note에서는 keras에서 제공하는 MultiHead Attention을 사용했습니다. 여기서는, Multi Head Attention을 직접 구현해보겠습니다. 그에 앞서, Encoder Layer에서 함께 사용할 Position-wise Feed-Forward Network를 먼저 정의합니다.

In [5]:
def point_wise_feed_forward_network(**kargs):
    """Build the position-wise feed-forward network (two stacked Dense layers).

    Args:
        d_model: dimension of the model; width of the output layer.
        dff: width of the hidden layer (the paper uses 2048).

    Returns:
        A ``tf.keras.Sequential`` of Dense(dff, relu) followed by Dense(d_model).
    """
    hidden = tf.keras.layers.Dense(kargs['dff'], activation='relu')  # (batch_size, seq_len, dff)
    projection = tf.keras.layers.Dense(kargs['d_model'])  # (batch_size, seq_len, d_model)
    return tf.keras.Sequential([hidden, projection])

In [6]:
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention built on ``scaled_dot_product_attention``.

    Reads ``num_heads`` and ``d_model`` from the keyword arguments;
    ``d_model`` must be divisible by ``num_heads``.
    """

    def __init__(self, **kargs):
        super().__init__()
        self.num_heads = kargs['num_heads']
        self.d_model = kargs['d_model']

        # Each head gets an equal slice of the model dimension.
        assert self.d_model % self.num_heads == 0

        self.depth = self.d_model // self.num_heads

        # Linear projections for query, key and value.
        self.wq = tf.keras.layers.Dense(self.d_model)
        self.wk = tf.keras.layers.Dense(self.d_model)
        self.wv = tf.keras.layers.Dense(self.d_model)

        # Final projection applied to the concatenated heads.
        self.dense = tf.keras.layers.Dense(kargs['d_model'])

    def split_heads(self, x, batch_size):
        """Split the last dimension into (num_heads, depth).

        The result is transposed to (batch_size, num_heads, seq_len, depth).
        """
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        """Run multi-head attention.

        Args:
            v, k, q: value, key and query tensors.
            mask: mask forwarded to ``scaled_dot_product_attention``.

        Returns:
            (output, attention_weights), where output has shape
            (batch_size, seq_len_q, d_model) and attention_weights has shape
            (batch_size, num_heads, seq_len_q, seq_len_k).
        """
        batch_size = tf.shape(q)[0]

        # Project to (batch_size, seq_len, d_model), then split into heads:
        # (batch_size, num_heads, seq_len, depth).
        q_heads = self.split_heads(self.wq(q), batch_size)
        k_heads = self.split_heads(self.wk(k), batch_size)
        v_heads = self.split_heads(self.wv(v), batch_size)

        # context.shape == (batch_size, num_heads, seq_len_q, depth)
        context, attention_weights = scaled_dot_product_attention(
            q_heads, k_heads, v_heads, mask)

        # Move the head axis next to depth and merge the two back into d_model.
        context = tf.transpose(context, perm=[0, 2, 1, 3])  # (batch_size, seq_len_q, num_heads, depth)
        merged = tf.reshape(context, (batch_size, -1, self.d_model))  # (batch_size, seq_len_q, d_model)

        return self.dense(merged), attention_weights

### Encoder Layer

In [7]:
class EncoderLayer(tf.keras.layers.Layer):
    """One encoder block: self-attention and feed-forward sub-layers.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalization. The keyword arguments are forwarded to
    ``MultiHeadAttention`` and ``point_wise_feed_forward_network``;
    ``rate`` is the dropout rate.
    """

    def __init__(self, **kargs):
        super().__init__()

        self.mha = MultiHeadAttention(**kargs)
        self.ffn = point_wise_feed_forward_network(**kargs)

        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

        dropout_rate = kargs['rate']
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x, mask):
        """Apply the block to ``x``; input and output are (batch_size, input_seq_len, d_model)."""
        # Self-attention sub-layer: x serves as value, key and query.
        attended, _ = self.mha(x, x, x, mask)
        out1 = self.layernorm1(x + self.dropout1(attended))

        # Feed-forward sub-layer.
        transformed = self.dropout2(self.ffn(out1))
        out2 = self.layernorm2(out1 + transformed)

        return out2

### Encoder

In [8]:
class Encoder(tf.keras.layers.Layer):
    """Token embedding + positional encoding followed by a stack of EncoderLayers.

    Reads ``d_model``, ``num_layers``, ``vocab_size``,
    ``maximum_position_encoding`` and ``rate`` from the keyword arguments;
    all keyword arguments are also forwarded to every ``EncoderLayer``.
    """

    def __init__(self, **kwargs):
        super().__init__()

        self.d_model = kwargs['d_model']
        self.num_layers = kwargs['num_layers']

        self.embedding = tf.keras.layers.Embedding(kwargs['vocab_size'], self.d_model)
        # Precomputed table of shape (1, maximum_position_encoding, d_model).
        self.pos_encoding = positional_encoding(
            kwargs['maximum_position_encoding'], self.d_model)

        self.enc_layers = [EncoderLayer(**kwargs)
                           for _ in range(self.num_layers)]

        self.dropout = tf.keras.layers.Dropout(kwargs['rate'])

    def call(self, x, mask):
        """Encode token ids ``x``; returns (batch_size, input_seq_len, d_model)."""
        seq_len = tf.shape(x)[1]

        # Embed, scale by sqrt(d_model), then add the positional encoding
        # for the first seq_len positions.
        hidden = self.embedding(x)  # (batch_size, input_seq_len, d_model)
        hidden *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        hidden += self.pos_encoding[:, :seq_len, :]

        hidden = self.dropout(hidden)

        for layer in self.enc_layers:
            hidden = layer(hidden, mask)

        return hidden  # (batch_size, input_seq_len, d_model)

### Transformer

여기서는 가장 첫 번째 타임 스텝의 은닉 값을 가져와서 classification_layer에 통과시키겠습니다.

(batch_size, d_model) 크기의 텐서(첫 번째 타임 스텝의 은닉 값)가 바로 이진 분류를 수행하는 Dense Layer로 들어가게끔 구현했습니다.
- 이 부분은 설계에 따라 변경될 수 있습니다. 다음과 같이 2개의 Dense Layer를 쌓아서 이진 분류를 수행할 수도 있습니다.
  - Dense(64, activation='relu')
  - Dense(1, activation='sigmoid')


In [9]:
class Transformer(tf.keras.Model):
    """Encoder-only Transformer for binary classification.

    The encoder output at the first time step is fed to a single sigmoid
    unit. All keyword arguments are forwarded to ``Encoder``.
    """

    def __init__(self, **kargs):
        super(Transformer, self).__init__()

        self.encoder = Encoder(**kargs)
        self.classification_layer = tf.keras.layers.Dense(1, activation='sigmoid')

    def call(self, inputs, training=None):
        """Classify a batch of token-id sequences.

        Args:
            inputs: integer token ids, indexed as (batch_size, inp_seq_len);
                id 0 is treated as padding by ``create_padding_mask``.
            training: optional training flag. It is not used directly in this
                method; it defaults to None so the model can also be called
                without it.

        Returns:
            Sigmoid outputs of shape (batch_size, 1).
        """
        # Keras models prefer if you pass all your inputs in the first argument

        enc_padding_mask = self.create_masks(inputs)

        enc_output = self.encoder(inputs, enc_padding_mask)  # (batch_size, inp_seq_len, d_model)

        # Integer indexing drops the time axis, leaving one vector per example.
        first_step = enc_output[:, 0, :]  # (batch_size, d_model)

        final_output = self.classification_layer(first_step)  # (batch_size, 1)

        return final_output

    def create_masks(self, inputs):
        """Return the encoder padding mask for ``inputs``."""
        # Encoder padding mask
        enc_padding_mask = create_padding_mask(inputs)

        return enc_padding_mask

### Hyper-Parameter 설정

In [10]:
# Training configuration.
BATCH_SIZE = 16
MAX_SEQUENCE = 80
EPOCHS = 3
VALID_SPLIT = 0.1
MAX_FEATURES = 20000  # used below as 'vocab_size'
MAXLEN = 80  # used below as 'maximum_position_encoding'

# Keyword arguments consumed by Transformer / Encoder / EncoderLayer /
# MultiHeadAttention / point_wise_feed_forward_network.
kargs = dict(
    num_layers=2,
    d_model=512,
    num_heads=8,
    dff=2048,
    vocab_size=MAX_FEATURES,
    maximum_position_encoding=MAXLEN,
    rate=0.1,
)

### IMDB Review Data 불러오기

In [11]:
from tensorflow.keras.datasets import imdb

# Load the IMDB review data; MAX_FEATURES is passed to the loader as `num_words`.
print('Loading data...')
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=MAX_FEATURES)

Loading data...
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/imdb.npz


In [12]:
# Bring every review to length MAXLEN so the batches are rectangular.
# NOTE(review): `padding`/`truncating` are left at the library defaults. The
# classifier reads the encoder output at index 0, so if the default pads at the
# front, that slot may be a padding token for short reviews — confirm this is
# intended.
x_train = tf.keras.preprocessing.sequence.pad_sequences(x_train, maxlen=MAXLEN)
x_test = tf.keras.preprocessing.sequence.pad_sequences(x_test, maxlen=MAXLEN)

### 모델 선언

In [13]:
# Build the classifier from the hyper-parameter dict and configure training.
model = Transformer(**kargs)
# `metrics` is given as a list, the documented form; a bare string is not
# accepted by every Keras version.
model.compile(optimizer=tf.keras.optimizers.Adam(1e-4),
              loss='binary_crossentropy',
              metrics=['accuracy'])

In [14]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

# Stop training once validation accuracy stops improving.
# NOTE(review): patience (10) is larger than EPOCHS (3), so early stopping
# presumably never fires in this run — confirm.
earlystop_callback = EarlyStopping(
    monitor='val_accuracy',
    min_delta=0.0001,
    patience=10,
)

checkpoint_path = 'weights.h5'

# Save only the weights, and only when validation accuracy improves.
cp_callback = ModelCheckpoint(
    checkpoint_path,
    monitor='val_accuracy',
    verbose=1,
    save_best_only=True,
    save_weights_only=True,
)

### 모델 학습

In [15]:
# Train the model, holding out VALID_SPLIT of the training data for validation;
# the callbacks monitor 'val_accuracy'.
history = model.fit(x_train, y_train, 
                    batch_size=BATCH_SIZE, epochs=EPOCHS,
                    validation_split=VALID_SPLIT, callbacks=[earlystop_callback, cp_callback])

Epoch 1/3
Epoch 1: val_accuracy improved from -inf to 0.83800, saving model to weights.h5
Epoch 2/3
Epoch 2: val_accuracy did not improve from 0.83800
Epoch 3/3
Epoch 3: val_accuracy did not improve from 0.83800
