In [None]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output

import time

# Display the installed TensorFlow version as the cell output.
tf.__version__

# Word 감정분석
* Text를 모델에 학습시키기 위한 전처리 과정을 확인해보자

### 데이터셋 구성
* 각 단어들을 정의 (긍정과 부정을 나타내는 단어들)
* 단어에 대한 정답을 부정(0) 긍정(1)로 정의해서 데이터셋 구성

In [None]:
# Each training phrase is kept next to its label (0 = negative, 1 = positive)
# so the two can never drift out of alignment.
_labeled_words = [
    ('good', 1),
    ('bad', 0),
    ('amazing', 1),
    ('so good', 1),
    ('bull shit', 0),
    ('awesome', 1),
    ('how dare', 0),
    ('very much', 1),
    ('nice', 1),
    ('god damn it', 0),
    ('very very very happy', 1),
    ('what the fuck', 0),
]

# Split the pairs back into the inputs (list of str) and targets (int32 array).
x_train_words = [word for word, _ in _labeled_words]
y_train = np.array([label for _, label in _labeled_words], dtype=np.int32)

* 데이터셋 확인

In [None]:
# negative sample
# Index 1 is 'bad' (label 0); index 0 would show 'good', which is a positive sample.
index = 1
print("word: {}\nlabel: {}".format(x_train_words[index], y_train[index]))

In [None]:
# positive sample
# Index 0 is 'good' (label 1); index 1 would show 'bad', which is a negative sample.
index = 0
print("word: {}\nlabel: {}".format(x_train_words[index], y_train[index]))

### 텍스트데이터 처리를 위한 Tokenizer 사용

In [None]:
# Import through the public `tensorflow.keras` namespace; `tensorflow.python.*`
# is a private implementation path that is not part of the supported API.
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

* 캐릭터(알파벳) 단위로 단어를 자르는 Tokenizer

In [None]:
# char_level=True: tokens are individual characters rather than whole words.
tokenizer = Tokenizer(char_level=True)

In [None]:
# Build the tokenizer's vocabulary (word_index) from the training phrases.
tokenizer.fit_on_texts(x_train_words)

In [None]:
# Vocabulary size plus one extra slot (presumably so that id 0 stays free
# for padding; padded positions are filled with 0 further below).
num_chars = 1 + len(tokenizer.word_index)
print("number of characters:", num_chars)

* 각 캐릭터(알파벳)의 index값

In [None]:
# Display the token -> integer id mapping learned by the tokenizer.
tokenizer.word_index

* 학습을 위한 train word를 idx로 변환

In [None]:
# Convert every training phrase into its sequence of integer token ids.
x_train_tokens = tokenizer.texts_to_sequences(x_train_words)

In [None]:
# Show one phrase next to its token-id sequence.
index = 2
sample_text, sample_tokens = x_train_words[index], x_train_tokens[index]
print("text: {}".format(sample_text))
print("token: {}".format(sample_tokens))

In [None]:
# Number of tokens in each (unpadded) training sequence, stored as int32.
x_train_seq_length = np.array(list(map(len, x_train_tokens)), dtype=np.int32)
# Second name bound to the same array; used below to find the longest sequence.
num_seq_length = x_train_seq_length

* Input 데이터의 길이는 모두 같아야 하므로 최대 길이에 맞춰 길이를 조정해준다.

In [None]:
# Length of the longest training sequence; every sequence is padded to this.
max_seq_length = num_seq_length.max()
print(max_seq_length)

In [None]:
# Side on which sequences are padded/truncated: 'pre' (used here) or 'post'.
pad = 'pre'

In [None]:
# Bring every token sequence to the same length (max_seq_length), padding and
# truncating on the side selected by `pad`.
x_train_pad = pad_sequences(
    x_train_tokens,
    maxlen=max_seq_length,
    padding=pad,
    truncating=pad,
)

* 학습을 위해 처리된 데이터를 확인해보자

In [None]:
# Compare the raw phrase, its token ids and the padded ids for one sample.
index = 7
sample_text = x_train_words[index]
sample_tokens = x_train_tokens[index]
sample_padded = x_train_pad[index]
print("text: {}\n".format(sample_text))
print("token: {}\n".format(sample_tokens))
print("pad: {}".format(sample_padded))

### 토큰이 어떻게 처리되었는지 확인해보자

In [None]:
# Invert the tokenizer's token -> id mapping into id -> token for decoding.
idx = tokenizer.word_index
inverse_map = {token_id: token for token, token_id in idx.items()}
print(inverse_map)

In [None]:
def tokens_to_string(tokens):
  """Decode a sequence of token ids back into text.

  Ids equal to 0 are skipped; every other id is looked up in the
  module-level `inverse_map` and the results are concatenated without a
  separator.

  Args:
    tokens: iterable of integer token ids.

  Returns:
    The decoded string.

  Raises:
    KeyError: if a non-zero id is missing from `inverse_map`.
  """
  return "".join(inverse_map[token_id] for token_id in tokens if token_id != 0)

In [None]:
# Round-trip check: decode a token sequence and compare it with the original text.
index = 10
sample_text = x_train_words[index]
sample_tokens = x_train_tokens[index]
print("original text:\n{}\n".format(sample_text))
print("tokens:\n{}\n".format(sample_tokens))
print("tokens to string:\n{}".format(tokens_to_string(sample_tokens)))

### 데이터셋 구성
* 데이터 학습을 위한 데이터셋 구성

In [None]:
# Training hyperparameters.
batch_size = 4            # samples per gradient step
max_epochs = 50           # passes over the training phrases
learning_rate = 1e-3      # step size handed to the optimizer
num_units = 16            # number of nodes in the RNN hidden layer
num_classes = 2           # two classes: negative (0) and positive (1)
initializer_scale = 0.1   # not referenced by the cells visible in this notebook

In [None]:
# Input pipeline: (padded ids, original length, label) triples that are
# shuffled, repeated indefinitely and grouped into mini-batches.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train_pad, x_train_seq_length, y_train))
    .shuffle(buffer_size=100)
    .repeat()
    .batch(batch_size=batch_size)
)
print(train_dataset)

In [None]:
# Character-level sentiment classifier, assembled layer by layer.
model = tf.keras.Sequential()
# Identity-initialised, frozen embedding: intended as a fixed one-hot
# encoding of the character ids rather than a learned embedding.
model.add(layers.Embedding(num_chars, num_chars,
                           embeddings_initializer='identity', trainable=False))
# Recurrent layer that consumes the character sequence.
model.add(layers.SimpleRNN(units=num_units))
# One sigmoid output per class.
model.add(layers.Dense(units=num_classes, activation='sigmoid'))

In [None]:
# Optimizer, loss and bookkeeping objects used by the custom training loop.
optimizer = tf.keras.optimizers.Adam(learning_rate)
# The keyword is `from_logits` (there is no `logits` argument). It is False
# because the model's output layer already applies a sigmoid.
loss_obj = tf.keras.losses.BinaryCrossentropy(from_logits=False)
# Running mean of the training loss across all steps.
mean_loss = tf.keras.metrics.Mean("loss")
# One running-mean loss value is appended per training step.
loss_history = []

### 학습 진행
* tf.GradientTape을 이용한 학습 진행

In [None]:
# Custom training loop driven by tf.GradientTape.
# total_steps counts optimizer steps (iterations), NOT epochs.
total_steps = int(len(x_train_words) / batch_size * max_epochs)
for step, (seq_pad, seq_length, labels) in enumerate(train_dataset.take(total_steps)):
  start_time = time.time()
  with tf.GradientTape() as tape:
    # Despite the name, these are sigmoid outputs (the last layer applies the activation).
    logits = model(seq_pad, training=True)
    loss_value = loss_obj(tf.one_hot(labels, depth=num_classes), logits)

  # Track the running mean of the loss for the plot below.
  mean_loss(loss_value)
  loss_history.append(mean_loss.result().numpy())

  # Differentiate only w.r.t. trainable weights: the embedding is frozen, so
  # using `model.variables` would pair it with a `None` gradient.
  trainable_vars = model.trainable_variables
  grads = tape.gradient(loss_value, trainable_vars)
  optimizer.apply_gradients(zip(grads, trainable_vars))

  if step % 3 == 0:
    clear_output(wait=True)
    duration = time.time() - start_time
    examples_per_sec = batch_size / float(duration)
    epochs = batch_size * step / float(len(x_train_words))
    print("epochs: {:.2f}, step: {}, loss: {:g}, ({:.2f} examples/sec; {:.3f} sec/batch)".format(epochs+1, step, loss_value, examples_per_sec, duration))

print("training done!")

In [None]:
# Plot the running-mean training loss per step.
# `loss_history` is deliberately NOT rebound to an ndarray: the training cell
# appends to it, so it has to stay a list for that cell to be re-runnable.
plt.plot(np.asarray(loss_history), label='train')
plt.xlabel('step')
plt.ylabel('mean loss')
plt.legend()
plt.show()

### 모델 평가
* 작은 데이터셋이므로 train_set으로 다시 평가해보자

In [None]:
# Evaluation pipeline: the whole training set delivered as one unshuffled batch.
train_dataset_eval = (
    tf.data.Dataset.from_tensor_slices((x_train_pad, x_train_seq_length, y_train))
    .batch(batch_size=len(x_train_pad))
)

In [None]:
# Evaluate on the full training set via `train_dataset_eval` (a single
# unshuffled batch holding every sample). The training pipeline would only
# yield one random mini-batch here.
# The loss matches the one used for training (binary cross-entropy on the
# sigmoid outputs), so the reported value is comparable with the training loss.
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=False)
acc_object = tf.keras.metrics.CategoricalAccuracy()
val_acc_object = tf.keras.metrics.CategoricalAccuracy()

val_mean_loss = tf.keras.metrics.Mean("loss")
val_mean_accuracy = tf.keras.metrics.Mean("accuracy")

for (step, (seq_pad, seq_length, labels)) in enumerate(train_dataset_eval.take(1)):
    one_hot_labels = tf.one_hot(labels, depth=num_classes)
    predictions = model(seq_pad, training=False)
    val_loss_value = loss_object(one_hot_labels, predictions)
    val_acc_value = val_acc_object(one_hot_labels, predictions)

    val_mean_loss(val_loss_value)
    val_mean_accuracy(val_acc_value)

    print("valid loss: {:.4g}, valid accuracy: {:.4g}%".format(val_mean_loss.result(),
                                                             val_mean_accuracy.result() * 100))

In [None]:
# Run the model on the single full-size evaluation batch and turn the
# per-class scores into integer class ids.
for step, batch in enumerate(train_dataset_eval.take(1)):
  seq_pad, seq_length, labels = batch
  logits = model(seq_pad)
  predicted_class = tf.argmax(logits, 1)
  predictions = tf.cast(predicted_class, tf.int32)

In [None]:
# Display the predicted class id for every training sample.
predictions

* 단어를 입력했을때 모델의 예측값

In [None]:
# Decode every padded sequence back to text and print the predicted sentiment.
for token_ids, predicted in zip(seq_pad, predictions):
  sentiment = "positive" if predicted.numpy() == 1 else "negative"
  print("{} : {}".format(tokens_to_string(token_ids.numpy()), sentiment))