<a href="https://colab.research.google.com/github/hyojgun/hyojeong/blob/main/LSTM_tem.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import pandas as pd
import os
import string

df = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/data/ArticlesApril2017.csv")
print(df.columns)

Index(['abstract', 'articleID', 'articleWordCount', 'byline', 'documentType',
       'headline', 'keywords', 'multimedia', 'newDesk', 'printPage', 'pubDate',
       'sectionName', 'snippet', 'source', 'typeOfMaterial', 'webURL'],
      dtype='object')


In [4]:
import numpy as np
import glob
import pandas as pd
import string
from torch.utils.data.dataset import Dataset

class TextGeneration(Dataset):
    def clean_text(self, txt):
        """
        모든 단어를 소문자로 변환하고, 텍스트에서 구두점을 제거합니다.

        Args:
            txt (str): 원본 텍스트 문자열.

        Returns:
            str: 소문자로 변환되고 구두점이 제거된 텍스트.
        """
        txt = "".join(v for v in txt if v not in string.punctuation).lower()
        return txt

    def __init__(self):
        """
        TextGeneration 데이터셋 초기화 함수입니다.

        - 지정된 CSV 파일에서 모든 헤드라인을 불러옵니다.
        - 'Unknown'으로 표시된 헤드라인을 제거합니다.
        - 텍스트 전처리를 통해 모든 헤드라인을 정리하여 코퍼스를 만듭니다.
        - 단어를 고유한 인덱스로 지정하는 bag-of-words (BOW) 딕셔너리를 생성합니다.
        - 모델 학습에 사용할 입력 시퀀스를 생성합니다.
        """
        all_headlines = []

        # 'Articles'가 포함된 파일에서 모든 텍스트 불러오기
        for filename in glob.glob("/content/drive/MyDrive/Colab Notebooks/data/*.csv"):
            if 'Articles' in filename:
                article_df = pd.read_csv(filename)

                # 데이터프레임에서 'headline' 열의 값을 리스트에 추가
                all_headlines.extend(list(article_df.headline.values))
                break

        # 'Unknown' 값 제거
        all_headlines = [h for h in all_headlines if h != "Unknown"]

        # 구두점 제거 및 전처리된 문장 리스트 생성
        self.corpus = [self.clean_text(x) for x in all_headlines]
        self.BOW = {}

        # 모든 문장의 단어를 추출하여 고유번호를 지정
        for line in self.corpus:
            for word in line.split():
                if word not in self.BOW.keys():
                    self.BOW[word] = len(self.BOW.keys())

        # 모델 입력으로 사용할 데이터 생성
        self.data = self.generate_sequence(self.corpus)

    def generate_sequence(self, txt):
        """
        단어 시퀀스를 생성하여 입력-정답 데이터 쌍을 만듭니다.

        Args:
            txt (list of str): 코퍼스의 문장 리스트.

        Returns:
            list of tuple: 입력 시퀀스와 해당 정답 단어 쌍 리스트.
        """
        seq = []

        for line in txt:
            line = line.split()
            line_bow = [self.BOW[word] for word in line]

            # 단어 2개를 입력, 다음 단어를 정답으로 설정
            data = [([line_bow[i], line_bow[i+1]], line_bow[i+2])
                    for i in range(len(line_bow) - 2)]

            seq.extend(data)

        return seq

    def __len__(self):
        """
        데이터셋의 총 길이를 반환합니다.

        Returns:
            int: 데이터셋의 총 길이.
        """
        return len(self.data)

    def __getitem__(self, i):
        """
        인덱스 i에 해당하는 데이터와 레이블을 반환합니다.

        Args:
            i (int): 데이터 인덱스.

        Returns:
            tuple: 입력 데이터와 정답 레이블.
            - 입력 데이터 (np.array): 모델의 입력으로 사용할 단어 시퀀스.
            - 정답 레이블 (np.array): 모델의 출력으로 사용할 다음 단어.
        """
        data = np.array(self.data[i][0])  # 입력 데이터
        label = np.array(self.data[i][1]).astype(np.float32)  # 출력 데이터

        return data, label


In [5]:
import torch
import torch.nn as nn

class LSTM(nn.Module):
    def __init__(self, num_embeddings):
        """
        LSTM 네트워크 초기화 함수입니다.

        Args:
            num_embeddings (int): 단어 사전의 크기 (임베딩할 단어 개수).
        """
        super(LSTM, self).__init__()

        # 임베딩 층: 단어를 고차원의 희소 벡터에서 밀집된 벡터로 변환하여 학습의 효율성을 높입니다.
        # 예를 들어, num_embeddings가 10,000이고 embedding_dim이 16이라면, 각 단어를 16차원의 벡터로 표현
        self.embed = nn.Embedding(
            num_embeddings=num_embeddings,  # 단어 사전 크기
            embedding_dim=16                # 각 단어의 임베딩 벡터 차원
        )

        # LSTM 층: 임베딩 벡터를 입력받아 시계열 패턴을 학습합니다.
        # input_size는 임베딩 차원과 동일하게 설정하며, hidden_size는 LSTM 셀의 차원 크기입니다.
        self.lstm = nn.LSTM(
            input_size=16,                  # LSTM 입력 차원 (임베딩 벡터 차원과 일치)
            hidden_size=64,                 # LSTM 숨김층의 차원
            num_layers=5,                   # LSTM 층의 개수 (깊이)
            batch_first=True                # 배치 차원이 첫 번째 위치
        )

        # 완전 연결층 (fc1): LSTM 출력 차원을 단어 사전 크기에 맞추기 위해 변환합니다.
        # 여기서는 hidden_size가 64이고, 두 개의 LSTM 출력을 연결해 입력 크기는 128로 설정
        self.fc1 = nn.Linear(128, num_embeddings)

        # 완전 연결층 (fc2): 최종 예측값을 생성
        self.fc2 = nn.Linear(num_embeddings, num_embeddings)

        # ReLU 활성화 함수: 비선형성을 추가해 학습 능력을 향상시킵니다.
        self.relu = nn.ReLU()

    def forward(self, x):
        """
        순전파 함수로, 입력 x에 대해 예측값을 계산합니다.

        Args:
            x (torch.Tensor): 입력 데이터 (문장 속 단어의 인덱스 시퀀스)

        Returns:
            torch.Tensor: 모델의 예측 결과 (단어 사전 크기와 동일한 차원으로 출력)
        """
        # 임베딩 층을 통해 단어 인덱스를 밀집 벡터로 변환합니다.
        x = self.embed(x)

        # LSTM 층을 통과하며 시계열 정보를 학습합니다.
        x, _ = self.lstm(x)

        # LSTM의 출력을 2차원으로 변형하여 완전 연결층에 맞게 변환
        x = torch.reshape(x, (x.shape[0], -1))

        # 첫 번째 완전 연결층을 거쳐 예측 벡터 생성
        x = self.fc1(x)

        # 활성화 함수 적용
        x = self.relu(x)

        # 두 번째 완전 연결층을 거쳐 최종 출력
        x = self.fc2(x)

        return x


In [10]:
import tqdm

from torch.utils.data.dataloader import DataLoader
from torch.optim.adam import Adam

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

dataset = TextGeneration()
model = LSTM(num_embeddings=len(dataset.BOW)).to(device)
loader = DataLoader(dataset, batch_size=64)
optim = Adam(model.parameters(), lr=0.001)

for epoch in range(250):
  iterator =tqdm.tqdm(loader)
  for data, label in iterator:
    optim.zero_grad()

    pred = model(torch.tensor(data, dtype= torch.long).to(device))

    loss = nn.CrossEntropyLoss()(
        pred, torch.tensor(label, dtype= torch.long).to(device))

    loss.backward()
    optim.step()

    iterator.set_description(f"epoch:{epoch+1} loss:{loss.item()}")
torch.save(model.state_dict(), "lstm.pth")

  pred = model(torch.tensor(data, dtype= torch.long).to(device))
  pred, torch.tensor(label, dtype= torch.long).to(device))
epoch:1 loss:7.415966987609863: 100%|██████████| 104/104 [00:01<00:00, 89.45it/s]
epoch:2 loss:6.923974514007568: 100%|██████████| 104/104 [00:01<00:00, 89.59it/s]
epoch:3 loss:6.337023735046387: 100%|██████████| 104/104 [00:01<00:00, 88.48it/s]
epoch:4 loss:5.80476713180542: 100%|██████████| 104/104 [00:01<00:00, 90.10it/s]
epoch:5 loss:5.407581806182861: 100%|██████████| 104/104 [00:01<00:00, 90.97it/s]
epoch:6 loss:5.3897318840026855: 100%|██████████| 104/104 [00:01<00:00, 91.42it/s]
epoch:7 loss:5.158023834228516: 100%|██████████| 104/104 [00:01<00:00, 91.68it/s]
epoch:8 loss:5.206933975219727: 100%|██████████| 104/104 [00:01<00:00, 91.52it/s]
epoch:9 loss:5.47351598739624: 100%|██████████| 104/104 [00:01<00:00, 91.00it/s]
epoch:10 loss:6.110394477844238: 100%|██████████| 104/104 [00:01<00:00, 90.93it/s]
epoch:11 loss:6.845939636230469: 100%|██████████| 104/10

In [11]:
def generate(model, BOW, string="finding an ", strlen=10):
  device = "cuda" if torch.cuda.is_available() else "cpu"

  print(f"input word: {string}")

  with torch.no_grad():
    for p in range(strlen):
      words = torch.tensor(
          [BOW[w] for w in string.split()], dtype=torch.long).to(device)


      input_tensor = torch.unsqueeze(words[-2:], dim=0)
      output = model(input_tensor)
      output_word = (torch.argmax(output).cpu().numpy())
      string += list(BOW.keys())[output_word]
      string += " "

    print(f'predicted sentence: {string}')


model.load_state_dict(torch.load("lstm.pth", map_location=device))
pred= generate(model, dataset.BOW)

  model.load_state_dict(torch.load("lstm.pth", map_location=device))


input word: finding an 
predicted sentence: finding an australia developer looks no legal to its answer’s exhusband epa 


In [12]:
# 예시로 vocab_size=10000 이고 embedding_dim=16 일 때
embedding_layer = nn.Embedding(num_embeddings=10000, embedding_dim=16)

# 단어 인덱스 3을 임베딩 벡터로 변환
word_index = torch.LongTensor([3])
embedding_vector = embedding_layer(word_index)
print(embedding_vector)  # 출력은 16차원의 밀집된 벡터


tensor([[-0.8955, -0.8043, -0.6432,  0.5089, -1.5682, -1.6388, -0.8074,  0.6936,
          0.9364,  0.2896,  0.8949, -1.0001,  0.1107, -1.2239, -1.5864, -0.5754]],
       grad_fn=<EmbeddingBackward0>)


In [19]:
# 온도 샘플링 추가
import torch
import torch.nn.functional as F

def generate(model, BOW, string="finding an ", strlen=10, temperature=0.5):
    """
    시작 문장으로부터 모델을 통해 단어를 생성하여 문장을 예측합니다.

    Args:
        model (torch.nn.Module): 학습된 LSTM 모델.
        BOW (dict): 단어와 인덱스의 매핑이 저장된 Bag of Words 딕셔너리.
        string (str): 예측을 시작할 초기 단어들로 이루어진 문장.
        strlen (int): 예측할 단어의 개수.
        temperature (float): 샘플링 시 온도 값, 값이 높을수록 예측이 다양해집니다.

    Returns:
        str: 예측된 문장을 반환합니다.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # 시작 문장을 출력
    print(f"Input word: {string}")

    with torch.no_grad():
        for p in range(strlen):
            # 현재 문장의 각 단어를 BOW 인덱스로 변환하여 텐서로 만듦
            try:
                words = torch.tensor([BOW[w] for w in string.split()], dtype=torch.long).to(device)
            except KeyError as e:
                print(f"KeyError: 단어 '{e}'가 BOW에 없습니다.")
                return

            # LSTM 입력을 위해 최근 두 단어만 선택하여 차원 추가
            input_tensor = torch.unsqueeze(words[-2:], dim=0)

            # 모델에 입력 텐서를 전달하여 예측 결과 생성
            output = model(input_tensor)

            # 온도 조정하여 소프트맥스를 적용하여 확률 분포 생성
            output = output / temperature
            probabilities = F.softmax(output, dim=-1).squeeze()

            # 확률 분포에서 랜덤 샘플링하여 단어 선택
            output_word = torch.multinomial(probabilities, 1).item()

            # 예측된 단어를 BOW에서 찾아 문자열에 추가
            string += list(BOW.keys())[output_word] + " "

    # 최종 예측된 문장을 출력
    print(f'Predicted sentence: {string.strip()}')
    return string.strip()

# 함수 호출
pred = generate(model, dataset.BOW)


Input word: finding an 
Predicted sentence: finding an australia developer looks no legal a who biden in explores
