In [1]:
!pip install openpyxl



In [2]:
import tensorflow as tf
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Input, Embedding, LSTM, Dense, Dropout, Concatenate
from nltk.translate.bleu_score import sentence_bleu
import os
from tensorflow.keras.callbacks import ModelCheckpoint, LambdaCallback, EarlyStopping
from tensorflow.keras.callbacks import Callback

# 데이터 로드
df = pd.read_excel("/content/drive/MyDrive/자연어 처리 학습 피드백.xlsx", names=["각도차이", "피드백"])
angle_differences = df.loc[:, "각도차이"].apply(eval).tolist()
feedbacks = df.loc[:, "피드백"].tolist()

# Excel 파일에서 각도 차이와 피드백 텍스트를 각각 리스트로 저장

# 데이터 증강 함수
def augment_data(angles, feedbacks): #각도차이와 피드백을 받아서 데이터를 증강시킴
    augmented_angles = []
    augmented_feedbacks = []
    for angle, feedback in zip(angles, feedbacks): 
        augmented_angles.append(angle)
        augmented_feedbacks.append(feedback)

        for _ in range(3):
            noisy_angle = [a + np.random.normal(0, 0.8) for a in angle]  #원래 각도차이에서 평균이 0 표준편차가 0.8을 더한 새로운 앵글 데이터 증강
            augmented_angles.append(noisy_angle)
            augmented_feedbacks.append(feedback)

    return augmented_angles, augmented_feedbacks

angle_differences, feedbacks = augment_data(angle_differences, feedbacks)

# 데이터 전처리 함수 피드백 데이터를 토큰화 해서 정수 시퀀스로 바꿔줌
def preprocess_data(angles, feedbacks):
    max_length = max(len(angle) for angle in angles) #앵글각도차이 최대길이

    padded_angles = [angle + [0] * (max_length - len(angle)) for angle in angles] #만약 앵글각도차이의 리스트 길이가 최대 앵글 각도차이와 다르면 제로패딩

    angle_tensor = np.array(padded_angles)  #패딩된 앵글 정보들을 array로 바꿈
    angle_sign = np.sign(angle_tensor)  # 각도의 부호(+,-) 정보 추출
    angle_tensor = np.abs(angle_tensor)  #부호를 따로 저장해 두었으므로 절대값을 사용

    tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='', lower=True, oov_token="<UNK>") #토크나이저 함수 필터할 단어는 ''으로 없음으로 두고 lower로 모든 문자를 소문자로, 단어사전에 없는 토큰은 unk로 설정
    tokenizer.fit_on_texts(['<start> ' + f.lower() + ' <end>' for f in feedbacks]) #각 피드백에 시작과 끝에 <start> <end> 토큰 추가
    feedback_tensor = tokenizer.texts_to_sequences(['<start> ' + f.lower() + ' <end>' for f in feedbacks]) #피드백 데이터를 시퀀스로 바꿈
    feedback_tensor = tf.keras.preprocessing.sequence.pad_sequences(feedback_tensor, padding='post') #정수 시퀀스들을 패딩해서 길이를 맞춤 (post로 설정하여 뒤에 0이 붙도록) 앞에 0이 오도록하는것은 pre

    return angle_tensor, angle_sign, feedback_tensor, tokenizer



angle_tensor, angle_sign, feedback_tensor, tokenizer = preprocess_data(angle_differences, feedbacks)
#각도차이 ,      각도의 부호 , 피드백 시퀀스 ,  사용한 토크나이저

# 데이터를 훈련셋과 테스트 셋으로 만듬
X_angle = angle_tensor
X_sign = angle_sign
y = feedback_tensor
X_angle_train, X_angle_test, X_sign_train, X_sign_test, y_train, y_test = train_test_split(
    X_angle, X_sign, y, test_size=0.2, random_state=42)

#s2s 모델 정의
class Seq2SeqModel(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, units): #vocab size= 단어 사전의 단어 개수 , embedding_dim= 단어 임베딩시 사용할 차원의 수 units = lstm을 통과한 출력차원의 수 (dense층의 입력차원)
        super(Seq2SeqModel, self).__init__()
        self.angle_input = Input(shape=(None,)) #input shape을 none로 설정해서 input dim에 제한을 없앰 (timestep: lstm의 개수를 길이에 따라 동적으로 지정) 각도입력 (배치사이즈,타임스탭,feature)input레이어에서 별도 설정없으면 fit시에 배치사이즈가 동적으로 설정
        self.sign_input = Input(shape=(None,))# 부호 입력데이터 생성
        self.decoder_input = Input(shape=(None,))#디코더(피드백) 입력

        self.angle_dense = Dense(units, activation='relu')#dense를 지나서 units의 차원으로 출력 (batchsize,timesteps,feature)를 출력
        self.sign_dense = Dense(units, activation='relu')
        self.concat = Concatenate()

        self.embedding = Embedding(vocab_size, embedding_dim)
        self.encoder = LSTM(units, return_sequences=True, return_state=True)
        self.decoder = LSTM(units, return_sequences=True, return_state=True)
        self.dropout = Dropout(0.3)
        self.dense = Dense(vocab_size, activation='softmax')

    def call(self, inputs, training=False): #output함수
        angle_input, sign_input, decoder_input = inputs #각도차이, 부호 ,피드백을 인풋으로 설정

        angle_features = self.angle_dense(angle_input) #앵글 인풋을 dense층을 거쳐서 units차원으로 feature을 추출
        sign_features = self.sign_dense(sign_input) # 부호데이터를 dense층을 지나게함
        encoder_input = self.concat([angle_features, sign_features]) #앵글 각도와 부호를 콘캣 해서 인코더의 인풋으로 활용

        encoder_output, state_h, state_c = self.encoder(tf.expand_dims(encoder_input, axis=1)) #encoder_input데이터를 배치사이즈,1,timesteps,feature 형태의 차원으로 바꾼후 lstm통과
        #각각의 변수에 (lstm의 모든 출력을 포함하는 텐서, 현재 타임스탭에서의 정보를 포함하는 은닉상태, 장기의존성 학습을 위한 셀 상태)

        decoder_hidden = self.embedding(decoder_input) #피드백 데이터를 디코더 input레이어를 통과시켜 (batchsize,timestep,feature)형태로 나오게 한 후 임배딩
        decoder_output, _, _ = self.decoder(decoder_hidden, initial_state=[state_h, state_c]) #lstm층을 통과시키는데 여기서 initial_state를 인코더에서 받은 초기 상태로 설정
        
        decoder_output = self.dropout(decoder_output, training=training) #디코더 lstm출력을 트레이닝중에는 드랍아웃을 시켜 과적합 방지
        output = self.dense(decoder_output) #디코더 아웃풋을 댄스층을 거쳐 softmax로
        return output

# 하이퍼파라미터 설정
vocab_size = len(tokenizer.word_index) + 1
embedding_dim = 256
units = 128
batch_size = 32
epochs = 100

optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
model = Seq2SeqModel(vocab_size, embedding_dim, units)
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

#모델 생성 및 컴파일

# 커스텀 콜백정의
class EfficientModelCheckpoint(Callback):
    def __init__(self, patience=10):
        super().__init__()
        self.patience = patience
        self.best_val_loss = float('inf')
        self.best_weights = None
        self.wait = 0

    def on_epoch_end(self, epoch, logs=None):
        current_val_loss = logs.get('val_loss')
        if current_val_loss < self.best_val_loss:
            self.best_val_loss = current_val_loss
            self.best_weights = self.model.get_weights()
            self.wait = 0
            print(f"\nEpoch {epoch+1}: 새로운 최상의 모델 발견 (val_loss: {current_val_loss:.4f})")
        else:
            self.wait += 1
            if self.wait >= self.patience:
                self.model.stop_training = True
                print(f"\n{self.patience} 에포크 동안 개선이 없어 학습을 종료합니다.")
                print(f"최상의 val_loss: {self.best_val_loss:.4f}")
                self.model.set_weights(self.best_weights)

#val loss가 개선되지 않으면 학습을 조기 종료후에 최상의 모델 가중치를 저장

# 콜백 인스턴스 생성
efficient_checkpoint = EfficientModelCheckpoint(patience=10)

# 모델 학습
history = model.fit(
    [X_angle_train, X_sign_train, y_train[:, :-1]], y_train[:, 1:],
    validation_split=0.2,
    batch_size=batch_size,
    epochs=epochs,
    callbacks=[efficient_checkpoint]
)

# 학습 후 최상의 모델 저장
model.save('best_model', save_format='tf')

# 학습이 완료된 후 최종 모델을 저장합니다.

# 피드백 생성 함수
def generate_feedback(model, tokenizer, angles):
    angle_tensor = np.abs(angles) #각도의 절댓값을 받고
    angle_sign = np.sign(angles) #부호를 받아서
    start_token = tokenizer.word_index['<start>'] #start토큰과 end토큰을 정의
    end_token = tokenizer.word_index['<end>']

    decoder_input = tf.constant([[start_token]]) #시퀀스의 시작을 생성
    result = []

    for _ in range(50): #50번 반복
        predictions = model([np.expand_dims(angle_tensor, 0), np.expand_dims(angle_sign, 0), decoder_input]) #모델에  앵글텐서,앵글 부호 텐서,디코더 인풋 텐서를 입력으로 넣어 입력디코더의 다음 단어를 예측
        predicted_id = tf.argmax(predictions[0, -1, :]) #현재 시퀀스의 마지막 타임스텝에서의 예측 확률 분포를 찾아서 가장 확률이 높은 단어의 인덱스를 저장 
        predicted_id = int(predicted_id) #정수값으로 넣음

        if predicted_id == end_token: #end_token에 도달하면 시퀀스 생성을 종료
            break

        if predicted_id in tokenizer.index_word:
            result.append(tokenizer.index_word[predicted_id]) #predicted_id가 tokenizer.index_word에 존재하면 해당 단어를 result 리스트에 추가
        else: #아니면 unk(oov)추가
            result.append("<UNK>")

        decoder_input = tf.concat([decoder_input, [[predicted_id]]], axis=1) #디코더 인풋에 predicted를 추가해서 다음 단어예측

    return ' '.join(result)

# 학습된 모델을 사용하여 새로운 각도 입력에 대한 피드백을 생성하는 함수

# BLEU 점수 계산 함수
def calculate_bleu(reference, candidate):
    reference = reference.split()
    candidate = candidate.split()
    return sentence_bleu([reference], candidate)

# BLEU 점수를 계산하는 함수, 생성된 피드백에 대한 품질 평가

# 테스트
test_angles = [16, -29, 22, 5, -18, 27, 20, -7]
generated_feedback = generate_feedback(model, tokenizer, test_angles)
print(f"입력 각도: {test_angles}")
print(f"생성된 피드백: {generated_feedback}")

# 모델 평가
test_loss, test_accuracy = model.evaluate([X_angle_test, X_sign_test, y_test[:, :-1]], y_test[:, 1:])
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

# 테스트 데이터셋에 대한 모델의 성능을 평가

# BLEU 점수 계산 (전체 테스트 세트에 대해)
bleu_scores = []
for angles, true_feedback in zip(X_angle_test, y_test):
    generated = generate_feedback(model, tokenizer, angles)
    true = ' '.join([tokenizer.index_word[idx] for idx in true_feedback if idx != 0 and idx in tokenizer.index_word])
    bleu_scores.append(calculate_bleu(true, generated))

average_bleu = np.mean(bleu_scores)
print(f"Average BLEU Score: {average_bleu}")

# 전체 테스트 세트에 대해 BLEU 점수를 계산하고 평균값을 매김
# 모델이 생성한 피드백의 전반적인 품질을 평가

Epoch 1/100
Epoch 1: 새로운 최상의 모델 발견 (val_loss: 3.3837)
Epoch 2/100
Epoch 2: 새로운 최상의 모델 발견 (val_loss: 2.2720)
Epoch 3/100
Epoch 3: 새로운 최상의 모델 발견 (val_loss: 1.9932)
Epoch 4/100
Epoch 4: 새로운 최상의 모델 발견 (val_loss: 1.8309)
Epoch 5/100
Epoch 5: 새로운 최상의 모델 발견 (val_loss: 1.7307)
Epoch 6/100
Epoch 6: 새로운 최상의 모델 발견 (val_loss: 1.6557)
Epoch 7/100
Epoch 7: 새로운 최상의 모델 발견 (val_loss: 1.5867)
Epoch 8/100
Epoch 8: 새로운 최상의 모델 발견 (val_loss: 1.5330)
Epoch 9/100
Epoch 9: 새로운 최상의 모델 발견 (val_loss: 1.4689)
Epoch 10/100
Epoch 10: 새로운 최상의 모델 발견 (val_loss: 1.4120)
Epoch 11/100
Epoch 11: 새로운 최상의 모델 발견 (val_loss: 1.3524)
Epoch 12/100
Epoch 12: 새로운 최상의 모델 발견 (val_loss: 1.2911)
Epoch 13/100
Epoch 13: 새로운 최상의 모델 발견 (val_loss: 1.2128)
Epoch 14/100
Epoch 14: 새로운 최상의 모델 발견 (val_loss: 1.1325)
Epoch 15/100
Epoch 15: 새로운 최상의 모델 발견 (val_loss: 1.0408)
Epoch 16/100
Epoch 16: 새로운 최상의 모델 발견 (val_loss: 0.9639)
Epoch 17/100
Epoch 17: 새로운 최상의 모델 발견 (val_loss: 0.8945)
Epoch 18/100
Epoch 18: 새로운 최상의 모델 발견 (val_loss: 0.8258)
Epoch 19/1

The hypothesis contains 0 counts of 4-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()
The hypothesis contains 0 counts of 3-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()
The hypothesis contains 0 counts of 2-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()


Average BLEU Score: 0.4499625694704042
