## 1. 데이터 전처리

In [25]:
import tensorflow as tf
import numpy as np
import tensorflow_datasets as tfds
import re


In [2]:

# Cornell 대화 데이터 로드 및 전처리
path_to_zip = tf.keras.utils.get_file(
    'cornell_movie_dialogs.zip',
    origin='http://www.cs.cornell.edu/~cristian/data/cornell_movie_dialogs_corpus.zip',
    extract=True
)
path_to_dataset = path_to_zip.replace('cornell_movie_dialogs.zip', 'cornell movie-dialogs corpus/')
path_to_movie_lines = path_to_dataset + 'movie_lines.txt'
path_to_movie_conversations = path_to_dataset + 'movie_conversations.txt'



Downloading data from http://www.cs.cornell.edu/~cristian/data/cornell_movie_dialogs_corpus.zip


In [3]:
# 문장 전처리 함수
def preprocess_sentence(sentence):
    """문장을 소문자로 변환하고 불필요한 문자를 제거"""
    sentence = sentence.lower().strip()
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = re.sub(r'[" "]+', " ", sentence)
    sentence = re.sub(r"[^a-zA-Z?.!,]+", " ", sentence)
    return sentence.strip()



In [4]:
# 대화 데이터를 로드하고 질문/답변 쌍 생성
def load_conversations():
    """Cornell 대화 데이터를 로드하여 질문/답변 쌍 생성"""
    id2line = {}
    with open(path_to_movie_lines, errors='ignore') as file:
        lines = file.readlines()
    for line in lines:
        parts = line.replace('\n', '').split(' +++$+++ ')
        id2line[parts[0]] = parts[4]

    inputs, outputs = [], []
    with open(path_to_movie_conversations, 'r') as file:
        lines = file.readlines()

    for line in lines:
        parts = line.replace('\n', '').split(' +++$+++ ')
        conversation = [line[1:-1] for line in parts[3][1:-1].split(', ')]

        for i in range(len(conversation) - 1):
            inputs.append(preprocess_sentence(id2line[conversation[i]]))
            outputs.append(preprocess_sentence(id2line[conversation[i + 1]]))
            if len(inputs) >= 50000:  # 최대 샘플 크기 설정
                return inputs, outputs
    return inputs, outputs



In [5]:
# 데이터 로드
questions, answers = load_conversations()



In [76]:
# 데이터를 로드하고 전처리하여 질문을 questions, 답변을 answers에 저장합니다.
questions, answers = load_conversations()
print('전체 샘플 수 :', len(questions))
print('전체 샘플 수 :', len(answers))

전체 샘플 수 : 50000
전체 샘플 수 : 50000


In [77]:
print('전처리 후의 22번째 질문 샘플: {}'.format(questions[21]))
print('전처리 후의 22번째 답변 샘플: {}'.format(answers[21]))

전처리 후의 22번째 질문 샘플: she's not a...
전처리 후의 22번째 답변 샘플: lesbian?  no. i found a picture of jared leto in one of her drawers, so i'm pretty sure she's not harboring same-sex tendencies.


In [6]:
# 토크나이저 설정

tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
    questions + answers, target_vocab_size=2**13
)
START_TOKEN, END_TOKEN = [tokenizer.vocab_size], [tokenizer.vocab_size + 1]
VOCAB_SIZE = tokenizer.vocab_size + 2

'''
GPT-1에서는 Byte Pair Encoding (BPE)를 사용하여 서브워드 단위의 토크나이징을 수행하였으나,
이번 코드에서는 TensorFlow Datasets의 SubwordTextEncoder를 사용
'''


'\nGPT-1에서는 Byte Pair Encoding (BPE)를 사용하여 서브워드 단위의 토크나이징을 수행하였으나,\n이번 코드에서는 TensorFlow Datasets의 SubwordTextEncoder를 사용\n'

In [78]:
# 시작 토큰과 종료 토큰에 고유한 정수를 부여합니다.
START_TOKEN, END_TOKEN = [tokenizer.vocab_size], [tokenizer.vocab_size + 1]
print("슝=3")

슝=3


In [79]:
print('START_TOKEN의 번호 :' ,[tokenizer.vocab_size])
print('END_TOKEN의 번호 :' ,[tokenizer.vocab_size + 1])

START_TOKEN의 번호 : [8331]
END_TOKEN의 번호 : [8332]


In [80]:
# 시작 토큰과 종료 토큰을 고려하여 +2를 하여 단어장의 크기를 산정합니다.
VOCAB_SIZE = tokenizer.vocab_size + 2
print(VOCAB_SIZE)

8333


In [97]:
print("Tokenizer vocabulary size:", tokenizer.vocab_size)

Tokenizer vocabulary size: 8331


In [81]:
# 임의의 22번째 샘플에 대해서 정수 인코딩 작업을 수행.
# 각 토큰을 고유한 정수로 변환
print('정수 인코딩 후의 21번째 질문 샘플: {}'.format(tokenizer.encode(questions[21])))
print('정수 인코딩 후의 21번째 답변 샘플: {}'.format(tokenizer.encode(answers[21])))

정수 인코딩 후의 21번째 질문 샘플: [781, 8114, 8, 37, 8172, 8121, 8121, 8121]
정수 인코딩 후의 21번째 답변 샘플: [7824, 1223, 8138, 8107, 8107, 61, 4304, 4, 336, 10, 1595, 14, 1104, 698, 3263, 263, 16, 71, 14, 107, 2133, 900, 3992, 59, 8180, 8114, 23, 355, 204, 781, 8114, 8, 37, 885, 2289, 8107, 1160, 8120, 1001, 5179, 4214, 342, 8121]


In [82]:
# 샘플의 최대 허용 길이 또는 패딩 후의 최종 길이
MAX_LENGTH = 40
print(MAX_LENGTH)

40


In [83]:
# 토큰화 및 패딩 처리

# 문장 토큰화 및 패딩 처리
def tokenize_and_filter(inputs, outputs):
    """토큰화 및 패딩 처리"""
    tokenized_inputs, tokenized_outputs = [], []

    # Check if inputs are raw strings
    if isinstance(inputs[0], str):
        for (sentence1, sentence2) in zip(inputs, outputs):
            # 정수 인코딩
            sentence1 = START_TOKEN + tokenizer.encode(sentence1) + END_TOKEN
            sentence2 = START_TOKEN + tokenizer.encode(sentence2) + END_TOKEN

            # 최대 길이 초과 샘플 필터링
            if len(sentence1) <= MAX_LENGTH and len(sentence2) <= MAX_LENGTH:
                tokenized_inputs.append(sentence1)
                tokenized_outputs.append(sentence2)
    else:
        # If inputs are already tokenized
        tokenized_inputs = inputs
        tokenized_outputs = outputs

    # 패딩
    tokenized_inputs = tf.keras.preprocessing.sequence.pad_sequences(
        tokenized_inputs, maxlen=MAX_LENGTH, padding='post'
    )
    tokenized_outputs = tf.keras.preprocessing.sequence.pad_sequences(
        tokenized_outputs, maxlen=MAX_LENGTH, padding='post'
    )

    return tokenized_inputs, tokenized_outputs



In [84]:

# 토큰화 및 패딩 처리 호출
questions, answers = tokenize_and_filter(questions, answers)

In [85]:
#questions, answers = tokenize_and_filter(questions, answers)
print('단어장의 크기 :',(VOCAB_SIZE))
print('필터링 후의 질문 샘플 개수: {}'.format(len(questions)))
print('필터링 후의 답변 샘플 개수: {}'.format(len(answers)))

단어장의 크기 : 8333
필터링 후의 질문 샘플 개수: 42376
필터링 후의 답변 샘플 개수: 42376


## 2. Positional Encoding 및 Attention Mechanism

Positional Encoding 클래스

위치 정보를 벡터화하여 시퀀스 데이터에 추가

In [98]:
class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, position, d_model):
        """
        Positional Encoding을 초기화.
        - position: 최대 위치의 개수 (시퀀스 길이)
        - d_model: 임베딩 차원
        """
        super(PositionalEncoding, self).__init__()
        self.pos_encoding = self.positional_encoding(position, d_model)

    def get_angles(self, position, i, d_model):
        """
        각 위치와 임베딩 차원에 대한 각도를 계산.
        - position: 위치 값
        - i: 차원 인덱스
        - d_model: 모델 차원
        """
        return position / np.power(10000, (2 * (i // 2)) / np.float32(d_model))

    def positional_encoding(self, position, d_model):
        """
        Sine과 Cosine을 사용하여 위치 인코딩 계산.
        - position: 시퀀스 길이
        - d_model: 임베딩 차원
        """
        angle_rads = self.get_angles(
            np.arange(position)[:, np.newaxis],  # 위치 (행렬 형태)
            np.arange(d_model)[np.newaxis, :],  # 임베딩 차원
            d_model
        )
        # 짝수 인덱스에 Sine 적용
        sines = np.sin(angle_rads[:, 0::2])
        # 홀수 인덱스에 Cosine 적용
        cosines = np.cos(angle_rads[:, 1::2])

        # Sine과 Cosine 결합
        pos_encoding = np.concatenate([sines, cosines], axis=-1)
        pos_encoding = pos_encoding[np.newaxis, ...]  # 배치 차원 추가
        return tf.cast(pos_encoding, tf.float32)

    def call(self, inputs):
        """
        입력 텐서에 위치 인코딩 추가.
        - inputs: 입력 텐서
        """
        return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]

Scaled Dot-Product Attention

Query와 Key의 유사도를 계산하고, Value에 가중치를 적용

In [99]:
def scaled_dot_product_attention(query, key, value, mask):
    """
    Scaled Dot-Product Attention 계산.
    - query: Query 행렬
    - key: Key 행렬
    - value: Value 행렬
    - mask: 패딩 마스크 (필요 시 적용)

    반환값:
    - output: 가중치가 적용된 Value 행렬
    """
    # Query와 Key의 내적 계산
    matmul_qk = tf.matmul(query, key, transpose_b=True)

    # Key의 차원으로 스케일링
    depth = tf.cast(tf.shape(key)[-1], tf.float32)
    logits = matmul_qk / tf.math.sqrt(depth)

    # 마스크가 있다면 -∞ 값을 추가하여 무시
    if mask is not None:
        logits += (mask * -1e9)

    # Softmax로 Attention 가중치 계산
    attention_weights = tf.nn.softmax(logits, axis=-1)

    # Attention 가중치를 Value에 곱함
    output = tf.matmul(attention_weights, value)
    return output


Multi-Head Attention 클래스

In [100]:
class MultiHeadAttention(tf.keras.layers.Layer):
    def __init__(self, d_model, num_heads):
        """
        Multi-Head Attention 초기화.
        - d_model: 모델의 전체 차원
        - num_heads: 헤드의 개수
        """
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        # d_model이 num_heads로 나누어떨어지는지 확인
        assert d_model % num_heads == 0

        # 각 헤드의 차원
        self.depth = d_model // num_heads

        # Query, Key, Value를 위한 Dense 레이어
        self.query_dense = tf.keras.layers.Dense(units=d_model)
        self.key_dense = tf.keras.layers.Dense(units=d_model)
        self.value_dense = tf.keras.layers.Dense(units=d_model)

        # 최종 출력 Dense 레이어
        self.dense = tf.keras.layers.Dense(units=d_model)

    def split_heads(self, inputs, batch_size):
        """
        입력 텐서를 다중 헤드로 분리.
        - inputs: Dense 레이어 출력
        - batch_size: 배치 크기
        """
        # (batch_size, seq_len, d_model) -> (batch_size, seq_len, num_heads, depth)
        inputs = tf.reshape(inputs, shape=(batch_size, -1, self.num_heads, self.depth))
        # 헤드 축과 시퀀스 축을 교환
        return tf.transpose(inputs, perm=[0, 2, 1, 3])

    def call(self, inputs):
        """
        Multi-Head Attention 계산.
        - inputs: Query, Key, Value, Mask를 포함한 딕셔너리
        """
        query, key, value, mask = inputs['query'], inputs['key'], inputs['value'], inputs['mask']
        batch_size = tf.shape(query)[0]

        # Query, Key, Value 생성 및 다중 헤드로 분리
        query = self.split_heads(self.query_dense(query), batch_size)
        key = self.split_heads(self.key_dense(key), batch_size)
        value = self.split_heads(self.value_dense(value), batch_size)

        # Scaled Dot-Product Attention 수행
        scaled_attention = scaled_dot_product_attention(query, key, value, mask)

        # 헤드 통합
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))

        # 최종 Dense 레이어 통과
        return self.dense(concat_attention)


## 3. GPT 모델 구현

GPT 모델은 Transformer 기반으로 하되, 인코더를 제거하고 디코더만 사용하여 생성 모델로 동작하도록 수정되었습니다. 


주요 변경 사항:

- 인코더 제거: Transformer의 인코더-디코더 구조 중 인코더를 제거하고 디코더만 사용.
- Look-Ahead Mask: GPT는 Causal Language Modeling을 위해 Look-Ahead Mask를 적용.
- 위치 정보 추가: 입력 데이터에 위치 정보를 추가하는 PositionalEncoding 레이어 구현.
- 학습 데이터 전처리: 디코더 입력과 출력 데이터를 시프트하여 구성.

GPT Decoder Layer 정의

In [101]:
# GPT 모델 디코더 레이어 정의
def decoder_layer_gpt(units, d_model, num_heads, dropout):
    """
    GPT의 Decoder Layer를 정의.
    - units: Feed Forward 네트워크의 은닉 유닛 크기
    - d_model: 임베딩 차원
    - num_heads: Multi-Head Attention의 헤드 개수
    - dropout: 드롭아웃 비율
    """
    # 입력 텐서 정의
    inputs = tf.keras.Input(shape=(None, d_model))  # 입력 임베딩
    look_ahead_mask = tf.keras.Input(shape=(1, None, None))  # Look-ahead 마스크

    # Multi-Head Attention
    attention = MultiHeadAttention(d_model, num_heads)(
        {'query': inputs, 'key': inputs, 'value': inputs, 'mask': look_ahead_mask}
    )
    # 잔차 연결과 Layer Normalization
    attention = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention + inputs)

    # Feed Forward 네트워크
    outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention)
    outputs = tf.keras.layers.Dense(units=d_model)(outputs)
    outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)

    # 잔차 연결과 Layer Normalization
    outputs = tf.keras.layers.LayerNormalization(epsilon=1e-6)(outputs + attention)

    # 모델 반환
    return tf.keras.Model(inputs=[inputs, look_ahead_mask], outputs=outputs)



Look-Ahead Mask 생성 함수

Look-ahead Mask는 Transformer에서 미래 정보를 차단하는 데 사용됩니다. 상삼각 행렬로 구성된 마스크는 디코더가 현재 토큰 이후의 정보를 보지 못하게 함.

In [102]:
def create_look_ahead_mask(seq_len):
    """
    Look-ahead 마스크 생성.
    - seq_len: 시퀀스 길이
    반환값:
    - 상삼각 행렬로 이루어진 마스크 텐서
    """
    return 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)


GPT 모델 정의

In [103]:
def gpt_decoder(num_layers, units, d_model, num_heads, dropout, vocab_size, max_seq_len):
    """
    GPT Decoder를 구성.
    - num_layers: Decoder Layer의 개수
    - units: Feed Forward 네트워크의 은닉 유닛 크기
    - d_model: 임베딩 차원
    - num_heads: Multi-Head Attention의 헤드 개수
    - dropout: 드롭아웃 비율
    - vocab_size: 단어 집합 크기
    - max_seq_len: 최대 시퀀스 길이
    """
    # 입력 정의
    inputs = tf.keras.Input(shape=(None,), name='inputs')  # 정수형 시퀀스 입력

    # 임베딩 및 위치 인코딩 추가
    dec_inputs = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
    x = PositionalEncoding(max_seq_len, d_model)(dec_inputs)

    # Look-ahead Mask 생성
    seq_len = tf.shape(inputs)[1]
    look_ahead_mask = create_look_ahead_mask(seq_len)
    look_ahead_mask = look_ahead_mask[tf.newaxis, tf.newaxis, :, :]  # 배치 차원 추가

    # Decoder Layer 반복 적용
    for _ in range(num_layers):
        x = decoder_layer_gpt(units, d_model, num_heads, dropout)([x, look_ahead_mask])

    # 출력 레이어: Vocab Size로 확장
    outputs = tf.keras.layers.Dense(vocab_size)(x)  # Shape: (batch_size, sequence_length, vocab_size)

    # 최종 모델 반환
    return tf.keras.Model(inputs=inputs, outputs=outputs)


In [104]:
# GPT 디코더 모델 생성
gpt_model = gpt_decoder(
    num_layers=12,      # 디코더 레이어의 개수(6->12)
    units=3072,        # Feed Forward 네트워크의 은닉 유닛 크기(2048->3072)
    d_model=768,       # 모델의 임베딩 차원 (768)
    num_heads=12,       # Multi-Head Attention의 헤드 개수(8->12)
    dropout=0.1,       # 드롭아웃 비율
    vocab_size=VOCAB_SIZE,  # 단어 집합 크기
    max_seq_len=MAX_LENGTH  # 최대 시퀀스 길이
)

'''
    d_model=512,       # 모델의 임베딩 차원 (768)
    num_heads=8,       # Multi-Head Attention의 헤드 개수(8->12)
    GPU의 한계로 해당 변수로 실험 진행
'''


# 모델 요약 출력
gpt_model.summary()


Model: "model_25"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
inputs (InputLayer)             [(None, None)]       0                                            
__________________________________________________________________________________________________
tf.compat.v1.shape_1 (TFOpLambd (2,)                 0           inputs[0][0]                     
__________________________________________________________________________________________________
tf.__operators__.getitem_2 (Sli ()                   0           tf.compat.v1.shape_1[0][0]       
__________________________________________________________________________________________________
tf.ones_1 (TFOpLambda)          (None, None)         0           tf.__operators__.getitem_2[0][0] 
                                                                 tf.__operators__.getitem_2

Dataset 준비

In [105]:

# 데이터셋 크기와 배치 크기를 설정
BUFFER_SIZE = 10000  # 데이터셋 크기만큼 설정 (적절히 조정 가능)
BATCH_SIZE = 32      # 배치 크기 설정

# 데이터셋 생성: 입력과 타깃 시퀀스 정의
inputs = answers[:, :-1]  # 입력은 타깃의 마지막 토큰을 제외한 부분
targets = answers[:, 1:]  # 타깃은 첫 번째 토큰을 제외한 부분

# TensorFlow 데이터셋 생성
dataset = tf.data.Dataset.from_tensor_slices((
    {'inputs': inputs},  # 입력 시퀀스
    targets               # 타깃 시퀀스
))
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE).prefetch(tf.data.experimental.AUTOTUNE)



## 4. 학습 데이터 및 모델 컴파일

학습률 스케줄러 정의

In [118]:
d_model=768

# 커스텀 학습률 스케줄러
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000):
        super(CustomSchedule, self).__init__()
        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps**-1.5)
        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

# 학습률 스케줄러 인스턴스 생성
learning_rate = CustomSchedule(d_model)


옵티마이저 및 손실 함수 정의

In [119]:
# Adam 옵티마이저 설정
optimizer = tf.keras.optimizers.Adam(
    learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9
)

# 손실 함수 정의
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(y_true, y_pred):
    mask = tf.math.logical_not(tf.math.equal(y_true, 0))  # 패딩 마스크
    loss_ = loss_object(y_true, y_pred)
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    return tf.reduce_mean(loss_)

모델 컴파일

In [120]:
from tensorflow.keras.metrics import SparseCategoricalAccuracy

# 모델 컴파일
gpt_model.compile(
    optimizer=optimizer,
    loss=loss_function,
    metrics=[tf.keras.metrics.SparseCategoricalAccuracy(name='accuracy')]
)


체크포인트 설정

In [150]:
import os

# 체크포인트 디렉토리 설정
checkpoint_path = "./checkpoints/train"
os.makedirs(checkpoint_path, exist_ok=True)

# 체크포인트 및 매니저 생성
ckpt = tf.train.Checkpoint(transformer=gpt_model, optimizer=optimizer)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

# 이전 체크포인트 복원
if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint)
    print('이전 체크포인트 복원이 완료되었습니다!')


이전 체크포인트 복원이 완료되었습니다!


학습 루프 및 히스토리 저장

In [148]:
# 학습 설정
EPOCHS = 50  # 학습 에포크 수
history_data = {'loss': [], 'accuracy': []}  # 학습 기록 저장용

# 학습 루프
for epoch in range(EPOCHS):
    print(f"에포크 {epoch + 1}/{EPOCHS} 시작")
    
    # 모델 학습
    history = gpt_model.fit(
        dataset,
        epochs=1  # 에포크별로 수행
    )
    
    # 학습 기록 저장
    history_data['loss'].append(history.history['loss'][0])
    history_data['accuracy'].append(history.history['accuracy'][0])
    
    # 체크포인트 저장
    ckpt_save_path = ckpt_manager.save()
    print(f"체크포인트 저장 완료: {ckpt_save_path}")


에포크 1/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-4
에포크 2/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-5
에포크 3/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-6
에포크 4/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-7
에포크 5/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-8
에포크 6/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-9
에포크 7/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-10
에포크 8/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-11
에포크 9/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-12
에포크 10/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-13
에포크 11/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-14
에포크 12/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-15
에포크 13/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-16
에포크 14/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-17
에포크 15/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-18
에포크 16/50 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-19
에포크 17/50 시작
 105/1325 [=>............................] - ETA: 4:50 - loss: 1.9909 - accuracy: 0.0311

KeyboardInterrupt: 

In [151]:
# 17에 폭에서 정지 후 결과 확인
# 학습 설정
EPOCHS = 1  # 학습 에포크 수
history_data = {'loss': [], 'accuracy': []}  # 학습 기록 저장용

# 학습 루프
for epoch in range(EPOCHS):
    print(f"에포크 {epoch + 1}/{EPOCHS} 시작")
    
    # 모델 학습
    history = gpt_model.fit(
        dataset,
        epochs=1  # 에포크별로 수행
    )
    
    # 학습 기록 저장
    history_data['loss'].append(history.history['loss'][0])
    history_data['accuracy'].append(history.history['accuracy'][0])
    
    # 체크포인트 저장
    ckpt_save_path = ckpt_manager.save()
    print(f"체크포인트 저장 완료: {ckpt_save_path}")

에포크 1/1 시작
체크포인트 저장 완료: ./checkpoints/train/ckpt-20


## 5. 학습 데이터 검증

디코더 추론 함수

문장 생성 함수

In [123]:
#사용하면 i만 출력 됨

# def decoder_inference(sentence):
#     # 입력 문장을 전처리합니다.
#     sentence = preprocess_sentence(sentence)
#     sentence = START_TOKEN + tokenizer.encode(sentence) + END_TOKEN
#     sentence = tf.expand_dims(sentence, axis=0)  # 배치 차원 추가

#     # 초기 입력 설정
#     output = tf.expand_dims(START_TOKEN, 0)  # Shape: (1, 1)
#     # output 텐서의 데이터 타입 확인 (int32로 가정)
#     output = tf.cast(output, dtype=tf.int32)

#     for i in range(MAX_LENGTH):
#         # 모델에 입력합니다.
#         predictions = gpt_model({'inputs': output}, training=False)
#         # 마지막 토큰의 확률 분포를 가져옵니다.
#         predictions = predictions[:, -1:, :]  # (batch_size, 1, vocab_size)
#         predicted_id = tf.argmax(predictions, axis=-1, output_type=tf.int32)

#         # 예측된 토큰을 출력 시퀀스에 추가합니다.
#         output = tf.concat([output, predicted_id], axis=-1)

#         # END_TOKEN을 예측하면 종료합니다.
#         if predicted_id == END_TOKEN[0]:
#             break

#     return tf.squeeze(output, axis=0).numpy()

In [152]:
def decoder_inference(sentence):
    # 입력 문장을 전처리합니다.
    sentence = preprocess_sentence(sentence)
    sentence = START_TOKEN + tokenizer.encode(sentence) + END_TOKEN
    sentence = tf.expand_dims(sentence, axis=0)  # 배치 차원 추가

    # 초기 입력 설정
    output = tf.expand_dims(START_TOKEN, 0)  # Shape: (1, 1)
    output = tf.cast(output, dtype=tf.int32)

    for i in range(MAX_LENGTH):
        # 모델에 입력합니다.
        predictions = gpt_model({'inputs': output}, training=False)
        predictions = predictions[:, -1, :]  # (batch_size, vocab_size)

        # 온도 적용
        temperature = 0.7  #매우보수적으로 설정, 0.1일 때 i만 출력
        predictions = predictions / temperature

        # 확률 분포에서 다음 토큰 샘플링
        predicted_id = tf.random.categorical(predictions, num_samples=1)  # Shape: (batch_size, 1)
        predicted_id = tf.cast(predicted_id, tf.int32)  # Shape: (1, 1)

        # 예측된 토큰을 출력 시퀀스에 추가합니다.
        output = tf.concat([output, predicted_id], axis=-1)  # output: (1, sequence_length)

        # END_TOKEN을 예측하면 종료합니다.
        if predicted_id.numpy()[0][0] == END_TOKEN[0]:
            break

    return tf.squeeze(output, axis=0).numpy()


In [153]:
def sentence_generation(sentence):
    prediction = decoder_inference(sentence)
    # 특수 토큰 제거
    prediction = prediction[1:]  # START_TOKEN 제거
    if END_TOKEN[0] in prediction:
        prediction = prediction[:np.where(prediction == END_TOKEN[0])[0][0]]

    predicted_sentence = tokenizer.decode(
        [i for i in prediction if i < tokenizer.vocab_size]
    )

    print('Input:', sentence)
    print('Output:', predicted_sentence)

    return predicted_sentence




모델 테스트

In [154]:
# 예시 문장으로 챗봇의 응답을 확인합니다.
sentence_generation('Where have you been?')
sentence_generation("It's a trap")

Input: Where have you been?
Output: iyou he noi you yeahnoohcan the we i what i hei i ii i yesi yeahi iiyesiti she are not i you you noohyou we 
Input: It's a trap
Output: forget what noi what welli i inononowelli i i why i do i i maybe what sheyou you yeahwhati the you i whatthat iti you i it no


'forget what noi what welli i inononowelli i i why i do i i maybe what sheyou you yeahwhati the you i whatthat iti you i it no'