<a href="https://colab.research.google.com/github/IRevan/AI/blob/master/UPAA/UPAA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from torch.utils.data import Dataset, DataLoader

# 데이터 제너레이터 정의
class CustomDataset(Dataset):
# CustomDataset 클래스의 __init__ 메서드에서 데이터 읽어오는 부분 수정
    def __init__(self, file_path):
        with open(file_path, 'r', encoding='utf-8') as file:
            data = [line.strip().split('\t') for line in file]
        remove_set = [['']]
        data = [i for i in data if i not in remove_set]
        self.questions = ['<sos> ' + pair[0] + ' <eos>' for pair in data]  # '<sos>'와 '<eos>' 추가
        self.answers = ['<sos> ' + pair[-1] + ' <eos>' for pair in data]  # '<sos>'와 '<eos>' 추가


    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        return self.questions[idx], self.answers[idx]

file_path = '/content/drive/MyDrive/Colab Notebooks/cleaned_output.txt'

# 특별 토큰을 포함한 어휘 리스트 생성
all_tokens = [word for sentence in CustomDataset(file_path).questions + CustomDataset(file_path).answers for word in sentence.split()]
vocab = list(set(all_tokens + ['<sos>', '<eos>']))
word_to_index = {word: idx for idx, word in enumerate(vocab)}
index_to_word = {idx: word for idx, word in enumerate(vocab)}


# 모델 정의
class SimpleChatbot(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(SimpleChatbot, self).__init__()
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.rnn = nn.GRU(hidden_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, input, lengths):
        embedded = self.embedding(input)
        packed = pack_padded_sequence(embedded, lengths, batch_first=True, enforce_sorted=False)
        output, _ = self.rnn(packed)
        padded, _ = pad_packed_sequence(output, batch_first=True, padding_value=0)
        output = self.fc(padded)
        return output

    def chat(self, initial_input, max_len=20):
        # 초기 입력에 대한 처리
        initial_input_tensor = torch.tensor([word_to_index[word] for word in initial_input.split()], dtype=torch.long).unsqueeze(0).cuda()
        initial_embedded = self.embedding(initial_input_tensor)

        # 초기 은닉 상태를 0으로 초기화
        hidden = torch.zeros(1, 1, hidden_size).cuda()

        decoder_output, hidden = self.rnn(initial_embedded, hidden)

        predicted_sequence = [word_to_index['<sos>']]

        for _ in range(20):
            output = self.fc(decoder_output)
            _, top_index = output.topk(1)
            predicted_sequence.append(top_index.item())

            decoder_input = torch.tensor([top_index.item()], dtype=torch.long).unsqueeze(0).cuda()
            decoder_embedded = self.embedding(decoder_input)

            decoder_output, hidden = self.rnn(decoder_embedded, hidden)

            if top_index.item() == word_to_index['<eos>']:
                break

        # 예측된 시퀀스를 단어로 변환
        predicted_words = [index_to_word[idx] for idx in predicted_sequence]

        return predicted_words

# 하이퍼파라미터 설정
input_size = len(vocab)  # 어휘 사전 크기
hidden_size = 32
output_size = len(vocab)  # 어휘 사전 크기

# 데이터 로드 및 전처리
dataset = CustomDataset(file_path)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

# 모델, 손실 함수, 최적화 함수 초기화
model = SimpleChatbot(input_size, hidden_size, output_size).cuda()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# 학습
epochs = 10
for epoch in range(epochs):
    total_loss = 0
    for batch in dataloader:
        questions, answers = batch

        # 텍스트를 토큰 인덱스로 변환
        questions_tensor = [torch.tensor([word_to_index[word] for word in sentence.split()], dtype=torch.long).cuda() for sentence in questions]
        answers_tensor = [torch.tensor([word_to_index[word] for word in sentence.split()], dtype=torch.long).cuda() for sentence in answers]

        # 패딩을 처리하기 위해 각 시퀀스의 길이를 구합니다.
        questions_lengths = [len(seq) for seq in questions_tensor]
        answers_lengths = [len(seq) for seq in answers_tensor]

        # 패딩된 텐서 생성
        questions_padded = nn.utils.rnn.pad_sequence(questions_tensor, batch_first=True)
        answers_padded = nn.utils.rnn.pad_sequence(answers_tensor, batch_first=True)

        optimizer.zero_grad()
        output = model(questions_padded, questions_lengths)
        loss = criterion(output.view(-1, output_size), answers_padded.view(-1))
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f'Epoch {epoch}/{epochs}, Loss: {total_loss}')

Epoch 0/10, Loss: 2957.921025276184
Epoch 1/10, Loss: 559.7160369455814
Epoch 2/10, Loss: 226.8856780230999
Epoch 3/10, Loss: 99.25282522290945
Epoch 4/10, Loss: 35.21456088125706
Epoch 5/10, Loss: 13.565986992791295
Epoch 6/10, Loss: 6.35005354648456
Epoch 7/10, Loss: 3.799939677119255
Epoch 8/10, Loss: 2.6369836900848895
Epoch 9/10, Loss: 1.9485031655058265


In [6]:
# 챗봇 기능 테스트
user_input = "3D프린팅융합기술센터"
model.eval()  # evaluation 모드로 설정

# 초기 입력에 대한 처리
initial_input_tensor = torch.tensor([word_to_index[word] for word in user_input.split()], dtype=torch.long).unsqueeze(0).cuda()
initial_embedded = model.embedding(initial_input_tensor)

# 초기 은닉 상태를 0으로 초기화
hidden = torch.zeros(1, 1, hidden_size).cuda()

# 디코더 실행
decoder_output, hidden = model.rnn(initial_embedded, hidden)

# chat 메서드의 예측 결과 초기화 부분 수정
predicted_sequence = [word_to_index['<sos>']]

for _ in range(20):  # max_len은 20으로 설정
    output = model.fc(decoder_output)
    _, top_index = output.topk(1)
    predicted_sequence.append(top_index.item())

    # 디코더 입력 업데이트
    decoder_input = torch.tensor([top_index.item()], dtype=torch.long).unsqueeze(0).cuda()
    decoder_embedded = model.embedding(decoder_input)

    # 다음 스텝 예측을 위해 다음 히든 상태 계산
    decoder_output, hidden = model.rnn(decoder_embedded, hidden)  # 수정된 부분

    # <eos>를 만나면 종료
    if top_index.item() == word_to_index['<eos>']:
        break

# 예측된 시퀀스를 단어로 변환
predicted_words = [index_to_word[idx] for idx in predicted_sequence]

print(f"사용자: {user_input}")
print(f"챗봇: {' '.join(predicted_words)}")

사용자: 3D프린팅융합기술센터
챗봇: <sos> 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터 3D프린팅융합기술센터


In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from torch.utils.data import Dataset, DataLoader
from gensim.models import FastText
import re

# 한국어 FastText 모델 다운로드 경로
korean_fasttext_model_path = '/content/drive/MyDrive/Colab Notebooks/fasttext/ko.bin'

# 한국어 FastText 모델 로드
korean_fasttext_model = FastText.load_fasttext_format(korean_fasttext_model_path)

def preprocess_text(text):
    # 특수 문자 제거
    text = re.sub(r'[^ㄱ-ㅎㅏ-ㅣ가-힣\s]', '', text)
    # 공백 기준으로 단어 분리
    words = text.split()
    return words

def embed_korean_text(text):
    words = preprocess_text(text)
    # 각 단어에 대한 임베딩 벡터 평균을 구함
    vectors = [korean_fasttext_model.wv[word] for word in words if word in korean_fasttext_model.wv]
    if vectors:
        return sum(vectors) / len(vectors)
    else:
        return None

# .txt 파일에서 규정 텍스트 읽어오기 (한국어)
file_path = '/content/drive/MyDrive/Colab Notebooks/cleaned_output.txt'
with open(file_path, 'r', encoding='utf-8') as file:
    korean_regulation_text = file.read()

# 규정 텍스트를 한국어로 임베딩
korean_embedding = embed_korean_text(korean_regulation_text)

# 데이터 제너레이터 정의
class CustomDataset(Dataset):
    def __init__(self, file_path):
        with open(file_path, 'r', encoding='utf-8') as file:
            data = [line.strip().split('\t') for line in file]
        remove_set = [['']]
        data = [i for i in data if i not in remove_set]
        self.questions = ['<sos> ' + pair[0] + ' <eos>' for pair in data]
        self.answers = ['<sos> ' + pair[0] + ' <eos>' for pair in data]

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        return self.questions[idx], self.answers[idx]

file_path = '/content/drive/MyDrive/Colab Notebooks/cleaned_output.txt'

# 특별 토큰을 포함한 어휘 리스트 생성
all_tokens = [word for sentence in CustomDataset(file_path).questions + CustomDataset(file_path).answers for word in sentence.split()]
vocab = list(set(all_tokens + ['<sos>', '<eos>']))
word_to_index = {word: idx for idx, word in enumerate(vocab)}
index_to_word = {idx: word for idx, word in enumerate(vocab)}

  korean_fasttext_model = FastText.load_fasttext_format(korean_fasttext_model_path)


In [19]:
class SimpleChatbot(nn.Module):
    def __init__(self, embedding_model, hidden_size, output_size):
        super(SimpleChatbot, self).__init__()
        self.embedding_model = embedding_model
        self.rnn = nn.GRU(embedding_model.wv.vector_size, hidden_size, batch_first=True)  # Update input size here
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, input, lengths):
    # Extract embeddings for each word separately and pad sequences
        embedded = [torch.stack([torch.tensor(self.embedding_model.wv[word], dtype=torch.float32) for word in str(sentence).split()]) for sentence in input]

    # Pad sequences
        padded = nn.utils.rnn.pad_sequence(embedded, batch_first=True).cuda()

    # Rest of the code remains the same
        packed = pack_padded_sequence(padded, lengths, batch_first=True, enforce_sorted=False)
        output, _ = self.rnn(packed)
        padded, _ = pad_packed_sequence(output, batch_first=True, padding_value=0)
        output = self.fc(padded)
        return output

    def chat(self, initial_input, max_len=20):
        # 초기 입력에 대한 처리
        initial_input_tensor = torch.tensor([word_to_index[word] for word in initial_input.split(" ")], dtype=torch.long).unsqueeze(0).cuda()
        initial_embedded = self.embedding_model.wv[initial_input].reshape(1, -1)

        # 초기 은닉 상태를 0으로 초기화
        hidden = torch.zeros(1, 1, hidden_size).cuda()

        # 디코더 실행
        decoder_output, hidden = self.rnn(initial_embedded, hidden)

        # chat 메서드의 예측 결과 초기화 부분 수정
        predicted_sequence = [word_to_index['<sos>']]

        for _ in range(max_len):
            output = self.fc(decoder_output)
            _, top_index = output.topk(1)
            predicted_sequence.append(top_index.item())

            # 디코더 입력 업데이트
            decoder_input = torch.tensor([[top_index.item()]], dtype=torch.long).cuda()
            decoder_embedded = self.embedding_model.wv[index_to_word[top_index.item()]].reshape(1, -1)

            # 다음 스텝 예측을 위해 다음 히든 상태 계산
            decoder_output, hidden = self.rnn(decoder_embedded, hidden)

            # <eos>를 만나면 종료
            if top_index.item() == word_to_index['<eos>']:
                break

        # 예측된 시퀀스를 단어로 변환
        predicted_words = [index_to_word[idx] for idx in predicted_sequence]

        return predicted_words

# 하이퍼파라미터 설정
input_size = korean_fasttext_model.vector_size  # 임베딩 차원
hidden_size = 32
output_size = len(vocab)

# 데이터 로드 및 전처리
dataset = CustomDataset(file_path)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

# 모델, 손실 함수, 최적화 함수 초기화
model = SimpleChatbot(korean_fasttext_model, hidden_size, output_size).cuda()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# 학습
epochs = 2
for epoch in range(epochs):
    total_loss = 0
    for batch in dataloader:
        questions, answers = batch

        # 텍스트를 토큰 인덱스로 변환
        questions_tensor = [torch.tensor([word_to_index[word] for word in sentence.split()], dtype=torch.long).cuda() for sentence in questions]
        answers_tensor = [torch.tensor([word_to_index[word] for word in sentence.split()], dtype=torch.long).cuda() for sentence in answers]

        # 패딩을 처리하기 위해 각 시퀀스의 길이를 구합니다.
        questions_lengths = [len(seq) for seq in questions_tensor]
        answers_lengths = [len(seq) for seq in answers_tensor]

        # 패딩된 텐서 생성
        questions_padded = nn.utils.rnn.pad_sequence(questions_tensor, batch_first=True).cuda()
        answers_padded = nn.utils.rnn.pad_sequence(answers_tensor, batch_first=True).cuda()

        optimizer.zero_grad()
        output = model(questions_padded, questions_lengths)
        loss = criterion(output.view(-1, output_size), answers_padded.view(-1))
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f'Epoch {epoch}/{epochs}, Loss: {total_loss}')

model.eval()

Epoch 0/2, Loss: 4214.158621072769
Epoch 1/2, Loss: 2107.456081032753


SimpleChatbot(
  (rnn): GRU(200, 32, batch_first=True)
  (fc): Linear(in_features=32, out_features=23847, bias=True)
)

In [20]:
def chat_with_bot(chatbot_model, initial_input, max_len=20):
    # Initialize the chatbot model for conversation
    chatbot_model.eval()

    # Convert the initial input to a PyTorch tensor
    initial_input_tensor = torch.tensor([word_to_index[word] for word in initial_input.split()], dtype=torch.long).unsqueeze(0).cuda()

    # Start the conversation
    print(f"User: {initial_input}")

    # Initialize the chat method to get the bot's response
    response_words = chatbot_model.chat(initial_input_tensor, max_len)

    # Convert the predicted sequence to words
    response_words = [index_to_word[idx] for idx in response_words]

    # Print the generated response
    response = ' '.join(response_words[1:])  # Exclude the <sos> token
    print(f"Chatbot: {response}")

# Example usage:
initial_input = "장학금 수여"
chat_with_bot(model, initial_input)

User: 장학금 수여


TypeError: ignored