In [None]:
import tensorflow as tf
import os

# Pick the distribution strategy. Order of preference:
#   1. the TPU endpoint advertised through the COLAB_TPU_ADDR environment variable,
#   2. whatever TPUClusterResolver() can auto-detect when called without arguments,
#   3. no TPU at all -> TensorFlow's default strategy (CPU or GPU).
try:
    tpu_address = os.environ['COLAB_TPU_ADDR']
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='grpc://' + tpu_address)
    print("Running on TPU:", tpu_address)
except KeyError:
    try:
        resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
        print("Running on auto-detected TPU")
    except ValueError:
        # TPUClusterResolver raises ValueError when no TPU is specified or found.
        # Previously this case still went on to build a TPUStrategy and crashed,
        # despite announcing a fallback to the default strategy.
        resolver = None
        print("TPU not found, using default strategy")

if resolver is not None:
    # The TPU system has to be connected and initialized before TPUStrategy is created.
    tf.config.experimental_connect_to_cluster(resolver)
    tf.tpu.experimental.initialize_tpu_system(resolver)
    strategy = tf.distribute.TPUStrategy(resolver)
else:
    # Default (no-op) strategy: `with strategy.scope():` keeps working on CPU/GPU.
    strategy = tf.distribute.get_strategy()

In [None]:
# Definition of the small CNN that is compiled further down
def create_model():
  """Return an uncompiled Keras Sequential CNN.

  The stack is: two 3x3 convolutions with 256 filters and ReLU (the first one
  declares a 28x28x1 input), a flatten, two ReLU dense layers with 256 and 128
  units, and a final 10-unit dense layer without activation.
  """
  layers = tf.keras.layers
  stack = [
      layers.Conv2D(256, 3, activation='relu', input_shape=(28,28,1)),
      layers.Conv2D(256, 3, activation='relu'),
      layers.Flatten(),
      layers.Dense(256, activation='relu'),
      layers.Dense(128, activation='relu'),
      # No activation on the last layer: the model outputs raw scores.
      layers.Dense(10),
  ]
  return tf.keras.Sequential(stack)

# GPU / CPU alternative: run the same create_model() and model.compile(...)
# calls with identical arguments, just without the strategy.scope() wrapper.

# Create and compile the model inside the distribution strategy scope.
with strategy.scope():
  model = create_model()
  model.compile(
      # from_logits=True because the last Dense layer has no activation.
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer='adam',
      metrics=['sparse_categorical_accuracy'],
  )

In [None]:
# Imports for data loading, preprocessing and the BERT model
import pandas as pd
import numpy as np
import urllib.request
import os
from tqdm import tqdm
import tensorflow as tf
from transformers import BertTokenizer, TFBertModel

In [None]:
# Download the NSMC train/test rating files and load them as DataFrames
for url, local_name in (
    ("https://raw.githubusercontent.com/e9t/nsmc/master/ratings_train.txt", "ratings_train.txt"),
    ("https://raw.githubusercontent.com/e9t/nsmc/master/ratings_test.txt", "ratings_test.txt"),
):
    urllib.request.urlretrieve(url, filename=local_name)

train_data = pd.read_table('ratings_train.txt')
test_data = pd.read_table('ratings_test.txt')

# Report how many reviews each split contains
print('훈련용 리뷰 개수 :',len(train_data))
print('테스트용 리뷰 개수 :',len(test_data))

In [None]:
# Drop reviews with a duplicated 'document' text, then drop rows containing any null
train_data = train_data.drop_duplicates(subset=['document']).dropna(how='any')
print('훈련용 리뷰 개수 :',len(train_data))

# Same cleaning for the test split
test_data = test_data.drop_duplicates(subset=['document']).dropna(how='any')
print('테스트용 리뷰 개수 :',len(test_data))

In [None]:
# Convert raw sentences into fixed-length BERT input features
def convert_examples_to_features(sentences, labels, max_seq_len, tokenizer):
  """Encode sentences into padded BERT inputs and collect their labels.

  Args:
    sentences: sized iterable of text examples (must support len()).
    labels: iterable of labels, paired with `sentences` by position.
    max_seq_len: length every encoded sequence is padded or truncated to.
    tokenizer: object exposing `pad_token_id` and
      `encode(text, max_length=..., padding=..., truncation=...)`,
      e.g. a Hugging Face BertTokenizer.

  Returns:
    ((input_ids, attention_masks, token_type_ids), data_labels): three integer
    arrays with one row of length `max_seq_len` per example, plus an int32
    array of labels.

  Raises:
    AssertionError: if an encoded example does not have length `max_seq_len`.
  """
  input_ids, attention_masks, token_type_ids, data_labels = [], [], [], []
  pad_id = tokenizer.pad_token_id

  for example, label in tqdm(zip(sentences, labels), total=len(sentences)):
    # input_id is the integer encoding of the sentence. Padding and truncation
    # are requested explicitly: `pad_to_max_length` is deprecated, and passing
    # `max_length` alone leaves truncation to a backward-compatibility default.
    input_id = tokenizer.encode(example, max_length=max_seq_len, padding='max_length', truncation=True)
    assert len(input_id) == max_seq_len, "Error with input length {} vs {}".format(len(input_id), max_seq_len)

    # attention_mask is 1 for real tokens and 0 for padding. Only the trailing
    # run of pad ids counts as padding: counting every pad id would shift the
    # mask if a pad id also occurred inside the text itself.
    padding_count = 0
    for token in reversed(input_id):
      if token != pad_id:
        break
      padding_count += 1
    attention_mask = [1] * (max_seq_len - padding_count) + [0] * padding_count

    # token_type_id marks which sentence a token belongs to; every example is
    # a single sentence here, so it is all zeros.
    token_type_id = [0] * max_seq_len

    assert len(attention_mask) == max_seq_len, "Error with attention mask length {} vs {}".format(len(attention_mask), max_seq_len)
    assert len(token_type_id) == max_seq_len, "Error with token type length {} vs {}".format(len(token_type_id), max_seq_len)

    input_ids.append(input_id)
    attention_masks.append(attention_mask)
    token_type_ids.append(token_type_id)
    data_labels.append(label)

  # Convert the collected lists to numpy arrays
  input_ids = np.array(input_ids, dtype=int)
  attention_masks = np.array(attention_masks, dtype=int)
  token_type_ids = np.array(token_type_ids, dtype=int)
  data_labels = np.asarray(data_labels, dtype=np.int32)

  return (input_ids, attention_masks, token_type_ids), data_labels

In [None]:
# Training data: encode the training reviews into fixed-length BERT inputs
max_seq_len = 128
tokenizer = BertTokenizer.from_pretrained('klue/bert-base')
train_X, train_y = convert_examples_to_features(
    sentences=train_data['document'],
    labels=train_data['label'],
    max_seq_len=max_seq_len,
    tokenizer=tokenizer,
)

In [None]:
# Test data: encode the test reviews with the same sequence length and tokenizer name
max_seq_len = 128
tokenizer = BertTokenizer.from_pretrained('klue/bert-base')
test_X, test_y = convert_examples_to_features(
    sentences=test_data['document'],
    labels=test_data['label'],
    max_seq_len=max_seq_len,
    tokenizer=tokenizer,
)

In [None]:
# Print the first training sample
# train_X is (input_ids, attention_masks, token_type_ids); take row 0 of each.
input_id, attention_mask, token_type_id = (feature[0] for feature in train_X)
label = train_y[0]

first_sample_report = (
    ('단어에 대한 정수 인코딩 :', input_id),
    ('어텐션 마스크 :', attention_mask),
    ('세그먼트 인코딩 :', token_type_id),
    ('각 인코딩의 길이 :', len(input_id)),
    # Decode the ids back to text to sanity-check the encoding
    ('정수 인코딩 복원 :', tokenizer.decode(input_id)),
    ('출력 샘플의 레이블 :', label),
)
for caption, value in first_sample_report:
    print(caption, value)

In [None]:
# Many-to-one model built on BERT
class TFBertForSequenceClassification(tf.keras.Model):
  """Pretrained BERT followed by a single sigmoid unit.

  `call` takes an (input_ids, attention_mask, token_type_ids) triple and
  returns the output of a one-unit dense layer with sigmoid activation.
  """

  def __init__(self, model_name):
    """Load the pretrained BERT called `model_name` and create the classifier head."""
    super().__init__()
    # from_pt=True asks transformers to load the weights from a PyTorch checkpoint.
    self.bert = TFBertModel.from_pretrained(model_name, from_pt=True)
    head_init = tf.keras.initializers.TruncatedNormal(stddev=0.02)
    self.classifier = tf.keras.layers.Dense(
        1,
        activation='sigmoid',
        kernel_initializer=head_init,
        name='classifier',
    )

  def call(self, inputs):
    """Run BERT on the input triple and classify element 1 of its output."""
    input_ids, attention_mask, token_type_ids = inputs
    bert_outputs = self.bert(
        input_ids=input_ids,
        attention_mask=attention_mask,
        token_type_ids=token_type_ids,
    )
    # Element 1 is presumably the pooled [CLS] representation -- confirm
    # against the TFBertModel output documentation.
    return self.classifier(bert_outputs[1])

In [None]:
# Build and compile the BERT classifier inside the distribution strategy scope
with strategy.scope():
  model = TFBertForSequenceClassification('klue/bert-base')
  optimizer = tf.keras.optimizers.Adam(learning_rate=5e-5)
  # Binary cross-entropy matches the single sigmoid output of the classifier.
  loss = tf.keras.losses.BinaryCrossentropy()
  model.compile(
      loss=loss,
      optimizer=optimizer,
      metrics=['accuracy'],
  )

In [None]:
# Train the model: 2 epochs, batch size 64, 20% of the training data held out for validation
model.fit(
    x=train_X,
    y=train_y,
    batch_size=64,
    epochs=2,
    validation_split=0.2,
)

In [None]:
# Loss and accuracy on the test set
results = model.evaluate(x=test_X, y=test_y, batch_size=1024)
print("test loss, test acc: ", results)

In [None]:
# Save the model
import tensorflow as tf

# The whole model is written to this directory.
# NOTE(review): the path is under /content/drive, so Google Drive presumably
# has to be mounted first -- confirm.
MODEL_SAVE_DIR = '/content/drive/MyDrive/AIML/Codes/NLP/Modeling/KoBERT_Practice_Model'
model.save(MODEL_SAVE_DIR)

In [None]:
# Predict the sentiment of a single review
def sentiment_predict(new_sentence):
  """Print the predicted sentiment of `new_sentence` with its confidence.

  Relies on the notebook-level `tokenizer`, `max_seq_len` and `model`.
  A score above 0.5 is reported as a positive review with probability `score`,
  anything else as a negative review with probability `1 - score`.

  Args:
    new_sentence: the review text to classify.

  Returns:
    None; the result is printed.
  """
  # Padding and truncation are requested explicitly: `pad_to_max_length` is
  # deprecated, and `max_length` alone leaves truncation to a
  # backward-compatibility default.
  input_id = tokenizer.encode(new_sentence, max_length=max_seq_len, padding='max_length', truncation=True)

  # Only the trailing run of pad ids is padding; counting every pad id would
  # shift the mask if a pad id also occurred inside the text itself.
  padding_count = 0
  for token in reversed(input_id):
    if token != tokenizer.pad_token_id:
      break
    padding_count += 1
  attention_mask = [1] * (max_seq_len - padding_count) + [0] * padding_count
  token_type_id = [0] * max_seq_len

  # Wrap each feature in a batch of size one.
  input_ids = np.array([input_id])
  attention_masks = np.array([attention_mask])
  token_type_ids = np.array([token_type_id])

  encoded_input = [input_ids, attention_masks, token_type_ids]
  # predict() returns one row per example; take the single value of the first row.
  score = model.predict(encoded_input)[0][0]

  if(score > 0.5):
    print("{:.2f}% 확률로 긍정 리뷰입니다.\n".format(score * 100))
  else:
    print("{:.2f}% 확률로 부정 리뷰입니다.\n".format((1 - score) * 100))

In [None]:
# Try the trained classifier on one sample review
sample_review = "이 영화 존잼입니다 대박"
sentiment_predict(sample_review)