In [None]:
# Data download: load the train and test splits of the IMDB dataset via torchtext.
from torchtext.datasets import IMDB
train_dataset = IMDB(split='train')
# NOTE(review): `test_datatse` looks like a typo for `test_dataset`; the same
# spelling is used where the test DataLoader is built, so rename both together.
test_datatse= IMDB(split = 'test')


In [2]:
from torch.utils.data.dataset import random_split
import torch
# Seed the global RNG first so the random split below is reproducible.
torch.manual_seed(1)
# Materialise the training split as a list and divide it into 20,000 training
# and 5,000 validation examples.
# NOTE(review): this rebinds `train_dataset`, so re-running this cell without
# re-running the download cell would try to split the 20,000-item subset again.
train_dataset, valid_dataset = random_split( list(train_dataset), [20000, 5000])

In [None]:
import re
from collections import Counter, OrderedDict

def tokenizer(text):
    """Split a raw review into lowercase word tokens followed by emoticons.

    HTML tags are stripped first. Emoticons (":", ";" or "=" followed by an
    optional "-" and one of ")", "(", "D", "P") are collected from the
    tag-free text before lowercasing and are appended after the word tokens
    with the "-" removed. Runs of non-word characters act as separators;
    digits and underscores count as word characters and are kept.

    Args:
        text: Raw input string.

    Returns:
        list[str]: Word tokens in order of appearance, then the emoticons.
    """
    without_tags = re.sub('<[^>]*>', '', text)

    # Emoticons are matched case-sensitively, so ":D" / ":P" are recognised.
    smileys = [
        face.replace('-', '')
        for face in re.findall(r'[:;=](?:-)?[)(DP]', without_tags)
    ]

    words = re.sub(r'[\W]+', ' ', without_tags.lower()).split()
    return words + smileys

# Count how often each token occurs across the training reviews; the label
# of each (label, text) pair is ignored here.
token_counts = Counter(
    token
    for _label, review in train_dataset
    for token in tokenizer(review)
)
print(f"어휘사전 크기 {len(token_counts)}")

In [None]:
from torchtext.vocab import vocab
# Sort tokens by descending frequency; the vocabulary is built in this order.
sorted_by_freq_tuples = sorted(
    token_counts.items(), key=lambda item: item[1], reverse=True
)
# Preview only the most frequent tokens: printing every (token, count) pair
# would flood the cell output with the entire vocabulary.
print(sorted_by_freq_tuples[:10])

ordered_dict = OrderedDict(sorted_by_freq_tuples)

# NOTE: this rebinds `vocab` from the torchtext factory function to the
# resulting vocabulary object; later cells use the object. The import at the
# top of this cell restores the function whenever the cell is re-run.
vocab = vocab(ordered_dict)
# Reserve index 0 for padding and index 1 for unknown tokens; lookups of
# tokens that are not in the vocabulary fall back to index 1.
vocab.insert_token("<pad>", 0)
vocab.insert_token("<unk>",1)
vocab.set_default_index(1)

print([vocab[token] for token in ['this','is','an','example']])

In [5]:
import torch
from torch import nn as nn
def text_pipeline(x):
    """Convert a raw review string into a list of vocabulary indices."""
    return [vocab[token] for token in tokenizer(x)]


def label_pipeline(x):
    """Map a dataset label to a float target: 1.0 for positive, else 0.0.

    Both outcomes are floats so that a batch consisting only of negative
    labels still becomes a floating-point tensor.

    Args:
        x: Raw label. 'pos' is treated as positive. The integer 2 is also
            accepted as positive — newer torchtext releases reportedly yield
            1 (negative) / 2 (positive) instead of strings; confirm against
            the installed version.

    Returns:
        float: 1.0 for a positive label, 0.0 for anything else.
    """
    return 1. if x in ('pos', 2) else 0.

def collate_batch(batch):
    """Collate ``(label, text)`` pairs into padded tensors for the model.

    Uses the notebook-level ``label_pipeline`` and ``text_pipeline`` helpers.

    Args:
        batch: Iterable of ``(label, text)`` pairs.

    Returns:
        tuple: ``(padded_text_list, label_list, lengths)`` where
            ``padded_text_list`` is an int64 tensor of token indices,
            batch-first and right-padded with 0 to the longest sequence,
            ``label_list`` is a float32 tensor of targets, and
            ``lengths`` is an int64 tensor of the unpadded sequence lengths.
    """
    label_list, text_list, lengths = [], [], []
    for _label, _text in batch:
        label_list.append(label_pipeline(_label))
        processed_text = torch.tensor(text_pipeline(_text), dtype=torch.int64)
        text_list.append(processed_text)
        lengths.append(processed_text.size(0))

    # Explicit dtype: without it the tensor type is inferred from the Python
    # values, so a batch whose labels are all integer 0 would come out as
    # int64 and be rejected by a float loss such as BCELoss.
    label_list = torch.tensor(label_list, dtype=torch.float32)
    lengths = torch.tensor(lengths, dtype=torch.int64)
    # pad_sequence pads with 0 by default.
    padded_text_list = nn.utils.rnn.pad_sequence(text_list, batch_first=True)

    return padded_text_list, label_list, lengths

from torch.utils.data import DataLoader
# Small shuffled loader (batches of 4) used to inspect what collate_batch produces.
dataloader = DataLoader(train_dataset, batch_size =4, shuffle=True, collate_fn=collate_batch)

In [None]:
# Draw one batch from the demo loader and show its three parts
# (padded token ids, labels, sequence lengths), one per line.
text_batch, label_batch, length_batch = next(iter(dataloader))
print(text_batch, label_batch, length_batch, sep='\n')

In [7]:
from re import T


# Mini-batch size shared by the three loaders.
batch_size = 32
# Only the training loader shuffles; evaluation metrics do not depend on the
# order in which samples are visited.
train_dl = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_batch)
valid_dl = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_batch)
# `test_datatse` is used exactly as returned by IMDB(split='test'). If that is
# an iterable-style dataset, DataLoader can raise ValueError for shuffle=True
# (depends on the torch/torchtext versions), so shuffling is left off.
test_dl = DataLoader(test_datatse, batch_size=batch_size, shuffle=False, collate_fn=collate_batch)


In [None]:
# Toy embedding lookup: two sequences of four token ids are mapped to
# 3-dimensional vectors from a 10-row table. Index 0 is the padding index,
# so the trailing 0 in the second sequence maps to an all-zero vector.
text_encoded_input = torch.tensor([[1, 2, 4, 5], [4, 3, 2, 0]], dtype=torch.long)
embedding = nn.Embedding(10, 3, padding_idx=0)
print(embedding(text_encoded_input))

In [None]:
class RNN(nn.Module):
    """Two-layer batch-first recurrent network with a single-output linear head.

    Only the final hidden state of the last recurrent layer is fed to the head.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the recurrent hidden state.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        # nn.GRU takes the same arguments and can be swapped in directly.
        # nn.LSTM takes them too, but returns (h_n, c_n) as its second output,
        # so forward() would then have to unpack that tuple.
        self.rnn = nn.RNN(input_size, hidden_size, num_layers=2, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Return one value per sequence for ``x`` of shape (batch, seq, input_size)."""
        final_hidden = self.rnn(x)[1]
        # final_hidden is stacked per layer; [-1] selects the last layer.
        return self.fc(final_hidden[-1])

# Smoke test: build the model, print its layers, and run a random input of
# 5 sequences, 3 time steps and 64 features (the RNN above is batch-first).
model = RNN(64,32)
print(model)
model(torch.randn(5,3,64))


In [None]:
import numpy as np
import torch

class RNN(nn.Module):
    """LSTM-based binary classifier over padded token-id sequences.

    Pipeline: embedding -> single-layer LSTM -> linear + ReLU -> linear ->
    sigmoid. This definition replaces the earlier demo class named ``RNN``.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedded_dim: Size of each token embedding.
        rnn_hidden_size: Width of the LSTM hidden state.
        fc_hidden_size: Width of the hidden fully connected layer.
    """

    def __init__(self, vocab_size, embedded_dim, rnn_hidden_size, fc_hidden_size):
        super().__init__()
        # Index 0 is the padding index: its embedding stays at zero.
        self.embedding = nn.Embedding(vocab_size, embedded_dim, padding_idx=0)
        # The third positional argument of nn.LSTM is num_layers, so passing
        # fc_hidden_size there would stack that many LSTM layers. Use the
        # default single layer and state batch_first explicitly.
        self.rnn = nn.LSTM(embedded_dim, rnn_hidden_size, batch_first=True)
        self.fc1 = nn.Linear(rnn_hidden_size, fc_hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(fc_hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, text, lengths):
        """Return a probability per sequence, shape (batch, 1).

        Args:
            text: Batch-first tensor of token indices, padded with 0.
            lengths: Tensor with the unpadded length of each sequence.
        """
        out = self.embedding(text)
        # Packing lets the LSTM skip padded positions. The lengths are
        # coerced to a CPU int64 tensor so callers may pass them from any
        # device or as floats.
        out = nn.utils.rnn.pack_padded_sequence(
            out,
            lengths.to("cpu", dtype=torch.int64),
            enforce_sorted=False,
            batch_first=True,
        )
        _, (hidden, _cell) = self.rnn(out)
        # Final hidden state of the last LSTM layer.
        out = hidden[-1]
        out = self.fc1(out)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.sigmoid(out)
        return out

# Model hyperparameters; vocab_size counts every entry of the vocabulary,
# including the "<pad>" and "<unk>" tokens inserted when it was built.
vocab_size = len(vocab)
embed_dim = 20
rnn_hidden_size = 64
fc_hidden_size = 64
# Seed before construction so the parameter initialisation is reproducible.
torch.manual_seed(1)
model = RNN(vocab_size, embed_dim, rnn_hidden_size, fc_hidden_size)
# NOTE(review): a bare expression is only displayed when it is the last
# statement of a cell, so this line has no visible effect here.
model

# Pick the best available device: Apple MPS, then CUDA, then CPU.
# The fallback must be a device that actually exists — falling back to "mps"
# when MPS is unavailable makes every later `.to(device)` call fail.
# getattr guards against torch builds that have no `torch.backends.mps`.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)
# Move the model's parameters to the selected device; as the last expression
# of the cell, the returned module is also displayed.
model.to(device)


In [None]:
def train(dataloader, log_every=100):
    """Run one training epoch and return ``(accuracy, mean_loss)``.

    Relies on the notebook-level globals ``model``, ``optimizer``,
    ``loss_fn`` and ``device``.

    Args:
        dataloader: Iterable yielding ``(text_batch, label_batch, lengths)``.
        log_every: Print the running accuracy every this many batches;
            ``0`` or ``None`` disables progress output.

    Returns:
        tuple[float, float]: Fraction of correct predictions (threshold 0.5)
        and the per-sample average loss, both over the samples actually
        seen in this epoch. ``(0.0, 0.0)`` if the loader yields nothing.
    """
    model.train()
    total_acc, total_loss, n_samples = 0, 0, 0
    for batch_idx, (text_batch, label_batch, lengths) in enumerate(dataloader, start=1):
        text_batch = text_batch.to(device)
        # The loss is computed against float targets.
        label_batch = label_batch.to(device).float()
        # `lengths` is passed through unchanged (integer, on the CPU): the
        # model in this notebook moves it to the CPU itself before packing,
        # so a device round trip or a float cast adds nothing.

        optimizer.zero_grad()
        pred = model(text_batch, lengths)[:, 0]
        loss = loss_fn(pred, label_batch)
        loss.backward()
        optimizer.step()

        batch_n = label_batch.size(0)
        total_acc += ((pred >= 0.5).float() == label_batch).float().sum().item()
        # loss is a per-batch value; weight it by batch size to get a
        # per-sample average at the end.
        total_loss += loss.item() * batch_n
        n_samples += batch_n

        if log_every and batch_idx % log_every == 0:
            print(f"no.batch = {batch_idx}, acc :{total_acc / n_samples:.4f}")

    # Normalise by the samples seen rather than len(dataloader.dataset), so
    # datasets without __len__ and loaders that drop batches also work.
    n_samples = max(n_samples, 1)
    return total_acc / n_samples, total_loss / n_samples
def evaluate(dataloader):
    """Evaluate the global ``model`` and return ``(accuracy, mean_loss)``.

    Relies on the notebook-level globals ``model``, ``loss_fn`` and
    ``device``. Gradients are disabled and the model is put in eval mode.

    Args:
        dataloader: Iterable yielding ``(text_batch, label_batch, lengths)``.

    Returns:
        tuple[float, float]: Fraction of correct predictions (threshold 0.5)
        and the per-sample average loss over the samples seen.
        ``(0.0, 0.0)`` if the loader yields nothing.
    """
    model.eval()
    total_acc, total_loss, n_samples = 0, 0, 0
    with torch.no_grad():
        for text_batch, label_batch, lengths in dataloader:
            text_batch = text_batch.to(device)
            # Cast targets to float: an integer label tensor would be
            # rejected by a float loss such as BCELoss.
            label_batch = label_batch.to(device).float()
            pred = model(text_batch, lengths)[:, 0]
            loss = loss_fn(pred, label_batch)

            batch_n = label_batch.size(0)
            total_acc += ((pred >= 0.5).float() == label_batch).float().sum().item()
            total_loss += loss.item() * batch_n
            n_samples += batch_n

    # Count samples while iterating instead of calling
    # len(list(dataloader.dataset)), which walks the whole dataset again
    # (twice) just to obtain its size.
    n_samples = max(n_samples, 1)
    return total_acc / n_samples, total_loss / n_samples

# Binary cross-entropy on the model's sigmoid outputs, optimised with Adam.
# train() and evaluate() read `loss_fn` and `optimizer` as globals.
loss_fn = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 0.001)

epochs = 10
# Re-seed so the shuffling order of the training loader is reproducible.
torch.manual_seed(1)
for epoch in range(epochs):
    acc_train, loss_train = train(train_dl)
    acc_valid, loss_valid = evaluate(valid_dl)
    # ".4f" gives four decimals (a bare "4f" is a minimum width of 4), and
    # the two halves of the message need an explicit separator.
    print(f'epoch : {epoch}, accuracy : {acc_train:.4f}, '
          f'valid acc : {acc_valid:.4f}')

# Final check on the held-out test split.
acc_test, loss_test = evaluate(test_dl)
print(f"test acc : {acc_test:.4f}")

In [1]:
# NOTE(review): scratch cell — its execution count (1) is out of sequence with
# the rest of the notebook and it only prints constants; candidate for removal.
print(1,2,3)
print(1,2,3)

1 2 3
