In [3]:
# 이 예제는 `2. RNN 실습 - 영화평 분류`

# from tensorflow.keras.models import Sequential
# from tensorflow.keras.layers import SimpleRNN, Dense, Input, Embedding
# model = Sequential()
# model.add(Input(shape=(80,))) # 입력하는 영화평의 길이를 80으로 제한, 길면 자르고, 짧으면 zero padding
# model.add(Embedding(input_dim=10000, output_dim=32))
# model.add(SimpleRNN(64))
# model.add(Dense(2, activation='softmax')) # model.add(Dense(1, activation='sigmoid'))
# model.summary()

# 이 모델의 test set accuracy는 77%

# Multi-Head Attention 구현

In [16]:
import tensorflow as tf
from tensorflow.keras.layers import Dense, Layer
import numpy as np

class MultiHeadAttention(Layer):
    def __init__(self, num_heads, key_dim):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads # 헤드의 수
        self.key_dim = key_dim # 각 헤드의 차원
        self.depth = key_dim // num_heads # 각 헤드의 깊이, shape 변경

        # head의 수 만큼 차원을 생성
        self.wq = Dense(key_dim) # q = self.wq(q) 
        self.wk = Dense(key_dim)
        self.wv = Dense(key_dim)

        self.dense = Dense(key_dim) # 출력 레이어


    def split_heads(self, x, batch_size):
        # 텐서의 모양 batch_size, seq_len, key_dim -> batch_size, num_heads, seq_len, depth
        x = tf.reshape(x, (batch_size, self.num_heads, -1, self.depth))
        return x


    # 모델을 만들어 반환
    # MultiHeadAttention(3, 32)(v, k, q) <- v, k, q는 모두 pos_enc_output이 전달
    def call(self, q, k, v):
        # 입력 텐서가 weight와 결합되어서 q, k, v로 변환
        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        # ndarray.shape -> 2,3,4
        # tf.shape(텐서) -> 2,3,4 -> 2가 배치크기, shape 중에서 0번째꺼
        batch_size = tf.shape(q)[0]

        # 각 헤드로 분할
        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        # scaled dot product 계산
        matmul_qk = tf.matmul(q, k, transpose_b=True) # Q @ K.T # 행렬의 곱
        dk = tf.cast(tf.shape(k)[-1], tf.float32)
        scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
        scaled_attention = tf.matmul(attention_weights, v)

        # 각 헤드의 출력을 원래의 형태로 변환
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.key_dim))
        output = self.dense(concat_attention)

        return output

# Transformer를 이용한 영화평 분류(사용자 정의 멀티헤드어텐션 사용)

In [17]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential, Model

inputs = layers.Input(shape=(80,))

input_embedding = layers.Embedding(input_dim=10000, output_dim=32)(inputs)
positions = tf.range(start=0, limit=80)
pos_encoding = layers.Embedding(input_dim=80, output_dim=32)(positions)
pos_enc_output = pos_encoding + input_embedding

# 직접 구현한 어텐션은 헤드의 수를 2 또는 4개로 하세요.
attention_output = MultiHeadAttention(num_heads=2, key_dim=32)(pos_enc_output, pos_enc_output, pos_enc_output) # (pos_enc_output, pos_enc_output, pos_enc_output) 로 call 함수 호출
x = layers.add([pos_enc_output, attention_output])
x = layers.BatchNormalization()(x)

ffnn = Sequential([layers.Dense(64, activation="relu"),
                   layers.Dense(32, activation="relu")])(x)
x = layers.add([ffnn, x])
x = layers.BatchNormalization()(x)
x = layers.GlobalAveragePooling1D()(x)
x = layers.Dropout(0.1)(x) # Dropout 은 과적합 방지를 위해 사용하나 잘 사용하지 않는 추세

x = layers.Dense(64, activation="relu")(x)
x = layers.Dropout(0.1)(x) # Dropout 은 과적합 방지를 위해 사용하나 잘 사용하지 않는 추세

outputs = layers.Dense(2, activation="softmax")(x)
model = Model(inputs=inputs, outputs=outputs)

model.summary()

# Transformer를 이용한 영화평 분류

In [18]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential, Model

inputs = layers.Input(shape=(80,))

input_embedding = layers.Embedding(input_dim=10000, output_dim=32)(inputs)
positions = tf.range(start=0, limit=80)
pos_encoding = layers.Embedding(input_dim=80, output_dim=32)(positions)
pos_enc_output = pos_encoding + input_embedding

attention_output = layers.MultiHeadAttention(num_heads=3, key_dim=32)(pos_enc_output, pos_enc_output)
x = layers.add([pos_enc_output, attention_output])
x = layers.BatchNormalization()(x)

ffnn = Sequential([layers.Dense(64, activation="relu"),
                   layers.Dense(32, activation="relu")])(x)
x = layers.add([ffnn, x])
x = layers.BatchNormalization()(x)
x = layers.GlobalAveragePooling1D()(x)
x = layers.Dropout(0.1)(x)

x = layers.Dense(64, activation="relu")(x)
x = layers.Dropout(0.1)(x)

outputs = layers.Dense(2, activation="softmax")(x)
model = Model(inputs=inputs, outputs=outputs)

model.summary()

In [19]:
# model.compile(loss='binary_crossentropy', optimizer='adam',
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam',
              metrics=['accuracy'])

In [20]:
from tensorflow.keras.datasets import imdb
(X_train, y_train), (X_test, y_test) = imdb.load_data(num_words=10000)
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)

(25000,) (25000,) (25000,) (25000,)


In [21]:
from tensorflow.keras.preprocessing.sequence import pad_sequences
X_train_pad = pad_sequences(X_train, maxlen=80, truncating='post', padding='post')
X_test_pad = pad_sequences(X_test, maxlen=80, truncating='post', padding='post')

In [22]:
%%time
model.fit(X_train_pad, y_train, epochs=10, batch_size=200)

Epoch 1/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 12ms/step - accuracy: 0.7555 - loss: 0.4891
Epoch 2/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 12ms/step - accuracy: 0.8600 - loss: 0.3268
Epoch 3/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 11ms/step - accuracy: 0.8902 - loss: 0.2668
Epoch 4/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 11ms/step - accuracy: 0.9061 - loss: 0.2206
Epoch 5/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 12ms/step - accuracy: 0.9192 - loss: 0.1782
Epoch 6/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 12ms/step - accuracy: 0.9353 - loss: 0.1483
Epoch 7/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 12ms/step - accuracy: 0.9514 - loss: 0.1166
Epoch 8/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 12ms/step - accuracy: 0.9642 - loss: 0.0914
Epoch 9/10
[1m125/125[0m [32m

<keras.src.callbacks.history.History at 0x7d0b48ec0d30>

In [23]:
model.evaluate(X_test_pad, y_test)

[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 9ms/step - accuracy: 0.7549 - loss: 1.5605


[1.560515284538269, 0.7549200057983398]

In [24]:
print(X_test_pad.shape)

(25000, 80)


In [25]:
import numpy as np
pred = model.predict(X_test_pad)
# pred = (pred > 0.5).astype(int)
pred = np.argmax(pred, axis=1)

[1m782/782[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 4ms/step


In [26]:
from sklearn.metrics import confusion_matrix
print(confusion_matrix(y_test, pred))

[[ 8442  4058]
 [ 2069 10431]]


In [27]:
from sklearn.metrics import accuracy_score
accuracy_score(y_test, pred)

0.75492

In [None]:
# 옵티마이저를 sgd로 바꿔보세요. accuracy: 0.57784
# 전체 단어의 개수를 1000개로 바꿔보세요. accuracy:
# 영화평의 길이를 200개로 바꿔보세요. accuracy:
# pad_sequence의 truncating과 padding을 pre로 바꿔보세요. accuracy:

In [None]:
# 아래 택스트를 긍/부정 분류하세요.
text = "My God the actors who potrayed the VIP people cannot act. I cringed everytime they said a line. It felt like they were just reading them. Even the intonation was off. It was like when we were kids and had to read a play in class and we exagerated the intonation. Terrible, just awful."

In [None]:
word_index = imdb.get_word_index()
# print(word_index)
# {'fawn': 34701, 'tsukino': 52006, 'nunnery': 52007,

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/imdb_word_index.json
[1m1641221/1641221[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [None]:
word_to_idx = {k:(v+3) for k,v in word_index.items()}
word_to_idx["<PAD>"] = 0
word_to_idx["<START>"] = 1
word_to_idx["<UNK>"] = 2  # unknown
word_to_idx["<UNUSED>"] = 3

In [None]:
input_text = text.lower().split()
print(input_text)

['my', 'god', 'the', 'actors', 'who', 'potrayed', 'the', 'vip', 'people', 'cannot', 'act.', 'i', 'cringed', 'everytime', 'they', 'said', 'a', 'line.', 'it', 'felt', 'like', 'they', 'were', 'just', 'reading', 'them.', 'even', 'the', 'intonation', 'was', 'off.', 'it', 'was', 'like', 'when', 'we', 'were', 'kids', 'and', 'had', 'to', 'read', 'a', 'play', 'in', 'class', 'and', 'we', 'exagerated', 'the', 'intonation.', 'terrible,', 'just', 'awful.']


In [None]:
# 단어를 숫자로 변환합니다.
# word_to_idx에 없는 단어는 2(<UNK>)로 지정하며,
# 인덱스가 10000보다 크면 3(<UNUSED>)로 지정합니다.
def encoding(review_text):
  encoded = []
  for word in review_text:
    try:
      idx = word_to_idx[word]
      if idx>10000:
        encoded.append(3)
      else:
        encoded.append(idx)
    except:
      encoded.append(2)
  return encoded

input_encoded = encoding(input_text)

In [None]:
np.array(input_encoded)[np.newaxis, :]

array([[ 61, 558,   4, 156,  37,   2,   4,   3,  84, 566,   2,  13,   3,
          3,  36, 301,   6,   2,  12, 421,  40,  36,  71,  43, 886,   2,
         60,   4,   3,  16,   2,  12,  16,  40,  54,  75,  71, 362,   5,
         69,   8, 332,   6, 297,  11, 707,   5,  75,   3,   4,   2,   2,
         43,   2]])

In [None]:
input_pad = pad_sequences(
    np.array(input_encoded)[np.newaxis, :],
    maxlen=80, truncating='post', padding='post')

In [None]:
model.predict(input_pad) # LSTM을 이용한 인공신경망 모형은 부정(0)으로 분류함

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 28ms/step


array([[0.8983014 , 0.10169865]], dtype=float32)