In [1]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [2]:
cd /content/drive/MyDrive/Projects/toxic_comment_classification

/content/drive/MyDrive/Projects/toxic_comment_classification


In [3]:
from collections import defaultdict, Counter
import numpy as np
import string
import pandas as pd
import re
import os
import torch
from argparse import Namespace
import tqdm

## 설정값 지정

In [123]:
args = Namespace(
    # 데이터 정보
    raw_train_dataset_csv="data/train.csv",
    #raw_test_dataset_csv="data/raw_test.csv",
    proportion_subset_of_train=0.1,
    train_proportion=0.7,
    val_proportion=0.15,
    test_proportion=0.15,
    comment_csv='data/comments.csv',
    # 날짜와 경로 정보
    frequency_cutoff=25,
    model_state_file='model.pth',
    save_dir='model_storage/',
    vectorizer_file='vectorizer.json',
    # 훈련 하이퍼파라미터
    batch_size=128,
    early_stopping_criteria=5,
    learning_rate=0.001,
    num_epochs=100,
    seed=1337,
    # 실행 옵션
    cuda=True,
    expand_filepaths_to_save_dir=True,
)

## 데이터 로드

In [124]:
# 원본 데이터를 읽습니다
train_comments = pd.read_csv(args.raw_train_dataset_csv)
train_comments.head()

Unnamed: 0,comment,toxicity
0,I am testing. ) 207.97.213.169,0
1,honestly ==\nyou need to crawl under a rock an...,1
2,""":The Russian idiom """"to let the he-goat into ...",0
3,"""\nCreated and dominated\n[Hamilton] created a...",0
4,This one and this one. 24.177.120.138,0


In [125]:
train_comments.loc[1, "comment"]

'honestly ==\nyou need to crawl under a rock and DIE YOU FAT BASTARD\n\n=='

## 훈련, 검증, 테스트 세트 만들기

In [126]:
# Toxicity 기준으로 데이터를 나누어 클래스 비율을 유지하도록 합니다.
by_comment = defaultdict(list)
for _, row in train_comments.iterrows():
    by_comment[row.toxicity].append(row.to_dict())

In [127]:
# 훈련, 검증, 테스트 분할 데이터를 만듭니다.
final_list = []
np.random.seed(args.seed)

for _, item_list in sorted(by_comment.items()):
    np.random.shuffle(item_list)
    n_total = len(item_list)
    n_train = int(args.train_proportion * n_total)
    n_val = int(args.val_proportion * n_total)
    n_test = int(args.test_proportion * n_total)

    for item in item_list[:n_train]:
        item['split'] = 'train'

    for item in item_list[n_train:n_train+n_val]:
        item['split'] = 'val'

    for item in item_list[n_train+n_val:]:
        item['split'] = 'test'

    # 최종 리스트에 추가합니다
    final_list.extend(item_list)

final_comments = pd.DataFrame(final_list)

In [128]:
final_comments

Unnamed: 0,comment,toxicity,split
0,""":(indent reset) Please do. While these look g...",0,train
1,I would like to known about Pointer in JAVA la...,0,train
2,"""==Copyright problems==\nHello, . Concerning y...",0,train
3,Elbing disambig \n\nAny objections to redirect...,0,train
4,Help \n\nWhy am I blocked HELP HELP HELP one m...,0,train
...,...,...,...
79780,"niggerballs \n\nWhy did you delete the page, y...",1,test
79781,You're very foolish you know that - right if y...,1,test
79782,Awwwwww. Doesn't poor little Femto like gettin...,1,test
79783,WikiTheClown\nWelcome to Wikipedia. Although e...,1,test


## 데이터 정제

In [129]:
def preprocess_text(text):
    text = text.lower()
    text = re.sub(r"([.,!?])", r" \1 ", text)
    text = re.sub(r"[^a-zA-Z.,!?]+", r" ", text)
    return text

final_comments.comment = final_comments.comment.apply(preprocess_text)
final_comments['toxicity'] = final_comments.toxicity.apply({0: 'nottoxic', 1: 'toxic'}.get)
final_comments.to_csv(args.comment_csv, index=False)

In [130]:
final_comments

Unnamed: 0,comment,toxicity,split
0,indent reset please do . while these look goo...,nottoxic,train
1,i would like to known about pointer in java la...,nottoxic,train
2,"copyright problems hello , . concerning your ...",nottoxic,train
3,elbing disambig any objections to redirecting ...,nottoxic,train
4,help why am i blocked help help help one more ...,nottoxic,train
...,...,...,...
79780,"niggerballs why did you delete the page , you ...",toxic,test
79781,you re very foolish you know that right if you...,toxic,test
79782,awwwwww . doesn t poor little femto like getti...,toxic,test
79783,wikitheclown welcome to wikipedia . although e...,toxic,test


## 파이토치 데이터셋

### `Dataset`

In [137]:
from torch.utils.data import Dataset

class CommentDataset(Dataset):
    def __init__(self, comment_df, vectorizer):
        """
        매개변수:
            comment_df (pandas.DataFrame): 데이터셋
            vectorizer (CommentVectorizer): CommentVectorizer 객체
        """

        self.comment_df = comment_df
        self._vectorizer = vectorizer

        self.train_df = self.comment_df[self.comment_df.split=='train']
        self.train_size = len(self.train_df)

        self.val_df = self.comment_df[self.comment_df.split=='val']
        self.val_size = len(self.val_df)

        self.test_df= self.comment_df[self.comment_df.split=='test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {'train': (self.train_df, self.train_size),
                             'val': (self.val_df, self.val_size),
                             'test': (self.test_df, self.test_size)}

        self.set_split('train') # train, test, val 결정

    @classmethod
    def load_dataset_and_make_vectorizer(cls, comment_csv):
        """ 데이터셋을 로드하고 새로운 CommentVectorizer 객체를 만듭니다

        매개변수:
            comment_csv (str): 데이터셋의 위치
        반환값:
            CommentDataset의 인스턴스
        """
        comment_df = pd.read_csv(comment_csv)
        train_comment_df = comment_df[comment_df.split=='train']
        return cls(comment_df, CommentVectorizer.from_dataframe(train_comment_df))

    @classmethod
    def load_dataset_and_load_vectorizer(cls, comment_csv, vectorizer_filepath):
        """ 데이터셋을 로드하고 새로운 CommentVectorizer 객체를 만듭니다.
        캐시된 CommentVectorizer 객체를 재사용할 때 사용합니다.

        매개변수:
            comment_csv (str): 데이터셋의 위치
            vectorizer_filepath (str): CommentVectorizer 객체의 저장 위치
        반환값:
            CommentDataset의 인스턴스
        """
        comment_df = pd.read_csv(comment_csv)
        vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
        return cls(comment_df, vectorizer)

    @staticmethod
    def load_vectorizer_only(vectorizer_filepath):
        """ 파일에서 CommentVectorizer 객체를 로드하는 정적 메서드

        매개변수:
            vectorizer_filepath (str): 직렬화된 CommentVectorizer 객체의 위치
        반환값:
            CommentVectorizer의 인스턴스
        """
        with open(vectorizer_filepath) as fp:
            return CommentVectorizer.from_serializable(json.load(fp))

    def save_vectorizer(self, vectorizer_filepath):
        """ CommentVectorizer 객체를 json 형태로 디스크에 저장합니다

        매개변수:
            vectorizer_filepath (str): CommentVectorizer 객체의 저장 위치
        """
        with open(vectorizer_filepath, "w") as fp:
            json.dump(self._vectorizer.to_serializable(), fp)

    def get_vectorizer(self):
        """ 벡터 변환 객체를 반환합니다 """
        return self._vectorizer

    def set_split(self, split="train"):
        """ 데이터프레임에 있는 열을 사용해 분할 세트를 선택합니다

        매개변수:
            split (str): "train", "val", "test" 중 하나
        """
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def __len__(self):
        """ 데이터셋 사이즈 반환 """

        return self._target_size

    def __getitem__(self, index):
        """ 파이토치 데이터셋의 핵심 메서드

        매개변수:
            index (int): 데이터 포인트의 인덱스
        반환값:
            데이터 포인트의 특성(x_data)과 레이블(y_target)로 이루어진 딕셔너리
        """
        # TRY IT YOURSELF
        row = self._target_df.iloc[index]

        comment_vector = self._vectorizer.vectorize(row.comment)

        toxicity_index = self._vectorizer.toxicity_vocab.lookup_token(row.toxicity)

        return {'x_data': comment_vector,
               'y_target': toxicity_index}

    def get_num_batches(self, batch_size):
        """ 배치 크기가 주어지면 데이터셋으로 만들 수 있는 배치 개수를 반환합니다

        매개변수:
            batch_size (int)
        반환값:
            배치 개수
        """
        return len(self) // batch_size

### `Vocabulary`

In [138]:
class Vocabulary(object):

    def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
        """
        매개변수:
            token_to_idx (dict): 기존 토큰-인덱스 매핑 딕셔너리
            add_unk (bool): UNK 토큰을 추가할지 지정하는 플래그
            unk_token (str): Vocabulary에 추가할 UNK 토큰
        """

        if token_to_idx is None:
            token_to_idx = {}
        self._token_to_idx = token_to_idx

        self._idx_to_token = {idx: token
                             for token, idx in self._token_to_idx.items()}

        self._add_unk = add_unk
        self._unk_token = unk_token

        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token)


    def to_serializable(self):
        """ 직렬화할 수 있는 딕셔너리를 반환합니다 """
        return {'token_to_idx': self._token_to_idx,
                'add_unk': self._add_unk,
                'unk_token': self._unk_token}

    @classmethod
    def from_serializable(cls, contents):
        """ 직렬화된 딕셔너리에서 Vocabulary 객체를 만듭니다 """
        return cls(**contents)

    def add_token(self, token):
        """ 토큰을 기반으로 매핑 딕셔너리를 업데이트합니다

        매개변수:
            token (str): Vocabulary에 추가할 토큰
        반환값:
            index (int): 토큰에 상응하는 정수
        """
        if token in self._token_to_idx:
            index = self._token_to_idx[token]
        else:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token

        return index

    def lookup_token(self, token):
        """ 토큰에 대응하는 인덱스를 추출합니다.
        토큰이 없으면 UNK 인덱스를 반환합니다.

        매개변수:
            token (str): 찾을 토큰
        반환값:
            index (int): 토큰에 해당하는 인덱스
        노트:
            UNK 토큰을 사용하려면 (Vocabulary에 추가하기 위해)
            `unk_index`가 0보다 커야 합니다.
        """
        # TRY IT YOURSELF
        if self.unk_index >=0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

    def lookup_index(self, index):
        """ 인덱스에 해당하는 토큰을 반환합니다.

        매개변수:
            index (int): 찾을 인덱스
        반환값:
            token (str): 인텍스에 해당하는 토큰
        에러:
            KeyError: 인덱스가 Vocabulary에 없을 때 발생합니다.
        """
        if index not in self._idx_to_token:
            raise KeyError("Vocabulary에 인덱스(%d)가 없습니다." % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)

### `Vectorizer`

In [142]:
class CommentVectorizer(object):
    """ 어휘 사전을 생성하고 관리합니다 """
    def __init__(self, comment_vocab, toxicity_vocab):
        """
        매개변수:
            comment_vocab (Vocabulary): 단어를 정수에 매핑하는 Vocabulary
            toxicity_vocab (Vocabulary): 클래스 레이블을 정수에 매핑하는 Vocabulary
        """
        self.comment_vocab = comment_vocab
        self.toxicity_vocab = toxicity_vocab

    def vectorize(self, comment):
        """ 리뷰에 대한 웟-핫 벡터를 만듭니다

        매개변수:
            Comment (str): 리뷰
        반환값:
            one_hot (np.ndarray): 원-핫 벡터
        """
        one_hot = np.zeros(len(self.comment_vocab), dtype=np.float32)# TRY IT YOURSELF

        for token in comment.split(" "):
            if token not in string.punctuation:
                one_hot[self.comment_vocab.lookup_token(token)] = 1

                # TRY IT YOURSELF

        return one_hot

    @classmethod
    def from_dataframe(cls, comment_df, cutoff=25):
        """ 데이터셋 데이터프레임에서 Vectorizer 객체를 만듭니다

        매개변수:
            comment_df (pandas.DataFrame): 리뷰 데이터셋
            cutoff (int): 빈도 기반 필터링 설정값
        반환값:
            CommentVectorizer 객체
        """
        comment_vocab = Vocabulary(add_unk=True)
        toxicity_vocab = Vocabulary(add_unk=False)

        # 점수를 추가합니다
        for toxicity in sorted(set(comment_df.toxicity)):
            toxicity_vocab.add_token(toxicity)

        # count > cutoff인 단어를 추가합니다
        word_counts = Counter()
        for comment in comment_df.comment:
            for word in comment.split(" "):
                if word not in string.punctuation:
                    word_counts[word] += 1
            # TRY IT YOURSELF

        for word, count in word_counts.items():
            if count > cutoff:
                comment_vocab.add_token(word)
            # TRY IT YOURSELF

        return cls(comment_vocab, toxicity_vocab)

    @classmethod
    def from_serializable(cls, contents):
        """ 직렬화된 딕셔너리에서 CommentVectorizer 객체를 만듭니다

        매개변수:
            contents (dict): 직렬화된 딕셔너리
        반환값:
            CommentVectorizer 클래스 객체
        """
        continueomment_vocab = Vocabulary.from_serializable(contents['comment_vocab'])
        toxicity_vocab =  Vocabulary.from_serializable(contents['toxicity_vocab'])

        return cls(comment_vocab=comment_vocab, toxicity_vocab=toxicity_vocab)

    def to_serializable(self):
        """ 캐싱을 위해 직렬화된 딕셔너리를 만듭니다

        반환값:
            contents (dict): 직렬화된 딕셔너리
        """
        return {'comment_vocab': self.comment_vocab.to_serializable(),
                'toxicity_vocab': self.comment_vocab.to_serializable()}

### `DataLoader`

In [140]:
from torch.utils.data import DataLoader

def generate_batches(dataset, batch_size, shuffle=True,
                     drop_last=True, device="cpu"):
    """
    파이토치 DataLoader를 감싸고 있는 제너레이터 함수.
    각 텐서를 지정된 장치로 이동합니다.
    """
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                           shuffle=shuffle, drop_last=drop_last)
    # TRY IT YOURSELF

    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = data_dict[name].to(device)
        yield out_data_dict

## 로지스틱 회귀 모델 기반 감성 분류기

In [144]:
import torch.nn as nn

class CommentClassifier(nn.Module):
    """ 로지스틱 회귀 모델 """
    def __init__(self, num_features):
        """
        매개변수:
            num_features (int): 입력 특성 벡트의 크기
        """
        super(CommentClassifier, self).__init__()
        self.fc1 = nn.Linear(in_features=num_features,
                             out_features=1)# TRY IT YOURSELF

    def forward(self, x_in, apply_sigmoid=False):
        """ 분류기의 정방향 계산

        매개변수:
            x_in (torch.Tensor): 입력 데이터 텐서
                x_in.shape는 (batch, num_features)입니다.
            apply_sigmoid (bool): 시그모이드 활성화 함수를 위한 플래그
                크로스-엔트로피 손실을 사용하려면 False로 지정합니다
        반환값:
            결과 텐서. tensor.shape은 (batch,)입니다.
        """
        y_out = self.fc1(x_in).squeeze()# TRY IT YOURSELF
        if apply_sigmoid:
            y_out = torch.sigmoid(y_out)
        return y_out

In [143]:
dataset = CommentDataset.load_dataset_and_make_vectorizer(args.comment_csv)
vectorizer = dataset.get_vectorizer()
vectorizer.toxicity_vocab._token_to_idx

{'nottoxic': 0, 'toxic': 1}

## 설정

In [145]:
def set_seed_everywhere(seed, cuda):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)

def handle_dirs(dirpath):
    if not os.path.exists(dirpath):
        os.makedirs(dirpath)

In [146]:
if args.expand_filepaths_to_save_dir:
    args.vectorizer_file = os.path.join(args.save_dir,
                                        args.vectorizer_file)

    args.model_state_file = os.path.join(args.save_dir,
                                         args.model_state_file)

# 재현성을 위해 시드 설정
set_seed_everywhere(args.seed, args.cuda)

# 디렉토리 처리
handle_dirs(args.save_dir)

### 헬퍼 함수

In [147]:
import torch.optim as optim

def make_train_state(args):
    return {'stop_early': False,
            'early_stopping_step': 0,
            'early_stopping_best_val': 1e8,
            'learning_rate': args.learning_rate,
            'epoch_index': 0,
            'train_loss': [],
            'train_acc': [],
            'val_loss': [],
            'val_acc': [],
            'test_loss': -1,
            'test_acc': -1,
            'model_filename': args.model_state_file}

def update_train_state(args, model, train_state):
    """ 훈련 상태를 업데이트합니다.

    Components:
     - 조기 종료: 과대 적합 방지
     - 모델 체크포인트: 더 나은 모델을 저장합니다

    :param args: 메인 매개변수
    :param model: 훈련할 모델
    :param train_state: 훈련 상태를 담은 딕셔너리
    :returns:
        새로운 훈련 상태
    """

    # 적어도 한 번 모델을 저장합니다
    if train_state['epoch_index'] == 0:
        torch.save(model.state_dict(), train_state['model_filename'])
        train_state['stop_early'] = False

    # 성능이 향상되면 모델을 저장합니다
    elif train_state['epoch_index'] >= 1:
        loss_tm1, loss_t = train_state['val_loss'][-2:]

        # 손실이 나빠지면
        if loss_t >= train_state['early_stopping_best_val']:
            # 조기 종료 단계 업데이트
            train_state['early_stopping_step'] += 1
        # 손실이 감소하면
        else:
            # 최상의 모델 저장
            if loss_t < train_state['early_stopping_best_val']:
                torch.save(model.state_dict(), train_state['model_filename'])

            # 조기 종료 단계 재설정
            train_state['early_stopping_step'] = 0

        # 조기 종료 여부 확인
        train_state['stop_early'] = \
            train_state['early_stopping_step'] >= args.early_stopping_criteria

    return train_state


def compute_accuracy(y_pred, y_target):
    y_target = y_target.cpu()
    y_pred_indices = (torch.sigmoid(y_pred)>0.5).cpu().long()#.max(dim=1)[1]
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100

## 데이터셋, 모델, 손실, 옵티마이저, 훈련 상태 딕셔너리 만들기

In [148]:
# CUDA 체크
if not torch.cuda.is_available():
    args.cuda = False

args.device = torch.device("cuda" if args.cuda else "cpu")

# 데이터셋과 Vectorizer
dataset = CommentDataset.load_dataset_and_make_vectorizer(args.comment_csv)
vectorizer = dataset.get_vectorizer()

# 모델
# TRY IT YOURSELF
classifier = CommentClassifier(num_features=len(vectorizer.comment_vocab))
classifier = classifier.to(args.device)

# 손실 함수와 옵티마이저
loss_func = nn.BCEWithLogitsLoss()# TRY IT YOURSELF
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)# TRY IT YOURSELF
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
                                                 mode='min', factor=0.5,
                                                 patience=1)# TRY IT YOURSELF

train_state = make_train_state(args)

In [105]:
vectorizer.comment_vocab

TypeError: ignored

## 훈련 반복

In [149]:
epoch_bar = tqdm.notebook.tqdm(desc='training routine',
                          total=args.num_epochs,
                          position=0)

dataset.set_split('train')
train_bar = tqdm.notebook.tqdm(desc='split=train',
                          total=dataset.get_num_batches(args.batch_size),
                          position=1,
                          leave=True)
dataset.set_split('val')
val_bar = tqdm.notebook.tqdm(desc='split=val',
                        total=dataset.get_num_batches(args.batch_size),
                        position=1,
                        leave=True)


for epoch_index in range(args.num_epochs):
    train_state['epoch_index'] = epoch_index

    # 훈련 세트에 대한 순회

    # 훈련 세트와 배치 제너레이터 준비, 손실과 정확도를 0으로 설정
    dataset.set_split('train')
    batch_generator = generate_batches(dataset,
                                       batch_size=args.batch_size,
                                       device=args.device)
    # TRY IT YOURSELF
    running_loss = 0.0
    running_acc = 0.0
    classifier.train()

    for batch_index, batch_dict in enumerate(batch_generator):
        # 훈련 과정은 5단계로 이루어집니다

        # --------------------------------------
        # 단계 1. 그레이디언트를 0으로 초기화합니다
        # TRY IT YOURSELF
        optimizer.zero_grad()

        # 단계 2. 출력을 계산합니다
        # TRY IT YOURSELF
        y_pred = classifier(x_in=batch_dict['x_data'].float())

        # 단계 3. 손실을 계산합니다
        # TRY IT YOURSELF
        loss = loss_func(y_pred, batch_dict['y_target'].float())
        loss_t = loss.item()
        running_loss += (loss_t - running_loss) / (batch_index + 1)
        # 단계 4. 손실을 사용해 그레이디언트를 계산합니다
        # TRY IT YOURSELF
        loss.backward()

        # 단계 5. 옵티마이저로 가중치를 업데이트합니다
        # TRY IT YOURSELF
        # -----------------------------------------
        optimizer.step()

        # 정확도를 계산합니다
        acc_t = compute_accuracy(y_pred, batch_dict['y_target'])
        running_acc += (acc_t - running_acc) / (batch_index + 1)

        # 진행 바 업데이트
        train_bar.set_postfix(loss=running_loss,
                              acc=running_acc,
                              epoch=epoch_index)
        train_bar.update()

    train_state['train_loss'].append(running_loss)
    train_state['train_acc'].append(running_acc)

    # 검증 세트에 대한 순회

    # 검증 세트와 배치 제너레이터 준비, 손실과 정확도를 0으로 설정
    dataset.set_split('val')
    batch_generator = generate_batches(dataset,
                                       batch_size=args.batch_size,
                                       device=args.device)
    running_loss = 0.
    running_acc = 0.
    classifier.eval()

    for batch_index, batch_dict in enumerate(batch_generator):

        # 단계 1. 출력을 계산합니다
        y_pred = classifier(x_in=batch_dict['x_data'].float())

        # 단계 2. 손실을 계산합니다
        loss = loss_func(y_pred, batch_dict['y_target'].float())
        loss_t = loss.item()
        running_loss += (loss_t - running_loss) / (batch_index + 1)

        # 단계 3. 정확도를 계산합니다
        acc_t = compute_accuracy(y_pred, batch_dict['y_target'])
        running_acc += (acc_t - running_acc) / (batch_index + 1)

        val_bar.set_postfix(loss=running_loss,
                            acc=running_acc,
                            epoch=epoch_index)
        val_bar.update()

    train_state['val_loss'].append(running_loss)
    train_state['val_acc'].append(running_acc)

    train_state = update_train_state(args=args, model=classifier,
                                     train_state=train_state)

    scheduler.step(train_state['val_loss'][-1])

    train_bar.n = 0
    val_bar.n = 0
    epoch_bar.update()

    if train_state['stop_early']:
        break

    train_bar.n = 0
    val_bar.n = 0
    epoch_bar.update()

training routine:   0%|          | 0/100 [00:00<?, ?it/s]

split=train:   0%|          | 0/436 [00:00<?, ?it/s]

split=val:   0%|          | 0/93 [00:00<?, ?it/s]

## 테스트 세트 평가

In [150]:
# 가장 좋은 모델을 사용해 테스트 세트의 손실과 정확도를 계산합니다
classifier.load_state_dict(torch.load(train_state['model_filename']))
classifier = classifier.to(args.device)

dataset.set_split('test')
batch_generator = generate_batches(dataset,
                                   batch_size=args.batch_size,
                                   device=args.device)
running_loss = 0.
running_acc = 0.
classifier.eval()
all_y_pred_proba = []
all_y_target = []
for batch_index, batch_dict in enumerate(batch_generator):
    # 출력을 계산합니다
    # TRY IT YOURSELF
    y_pred = classifier(x_in = batch_dict['x_data'].float())
    y_pred = torch.sigmoid(y_pred)
    y_target = batch_dict['y_target']
    all_y_pred_proba.extend(y_pred.cpu().detach().numpy())
    all_y_target.extend(y_target.cpu().numpy())


# train_state['test_loss'] = running_loss
# train_state['test_acc'] = running_acc

In [151]:
all_y_pred = [0 if i < 0.5 else 1 for i in all_y_pred_proba]

In [154]:
count_ones = all_y_pred.count(1)

print(f"Number of 1's in the list: {count_ones}")


Number of 1's in the list: 781


In [155]:
from sklearn.metrics import f1_score
# Calculate F1 score
f1 = f1_score(all_y_target, all_y_pred, average='binary')  # Use 'micro', 'macro', or 'weighted' if appropriate

print(f"F1 Score: {f1}")

F1 Score: 0.7171769590036325


In [156]:
  from sklearn.metrics import accuracy_score

  # Assuming you have lists of true labels (all_y_target) and predicted labels (all_y_pred)
  accuracy = accuracy_score(all_y_target, all_y_pred)

  print(f"Accuracy: {accuracy}")



Accuracy: 0.9542170698924731


In [157]:
from sklearn.metrics import recall_score

# Assuming you have lists of true labels (all_y_target) and predicted labels (all_y_pred)
tpr = recall_score(all_y_target, all_y_pred, average='binary')  # Use 'micro', 'macro', or 'weighted' if appropriate

print(f"True Positive Rate (TPR): {tpr}")


True Positive Rate (TPR): 0.6029668411867365


In [75]:
# 가장 좋은 모델을 사용해 테스트 세트의 손실과 정확도를 계산합니다
classifier.load_state_dict(torch.load(train_state['model_filename']))
classifier = classifier.to(args.device)

dataset.set_split('test')
batch_generator = generate_batches(dataset,
                                   batch_size=args.batch_size,
                                   device=args.device)
running_loss = 0.
running_acc = 0.
classifier.eval()

for batch_index, batch_dict in enumerate(batch_generator):
    # 출력을 계산합니다
    # TRY IT YOURSELF
    y_pred = classifier(x_in = batch_dict['x_data'].float())
    print(batch_dict['y_target'])
    # 손실을 계산합니다
    loss = loss_func(y_pred, batch_dict['y_target'].float())# TRY IT YOURSELF
    loss_t = loss.item()# TRY IT YOURSELF
    running_loss += (loss_t - running_loss) / (batch_index + 1)

    # 정확도를 계산합니다
    acc_t = compute_accuracy(y_pred, batch_dict['y_target'])# TRY IT YOURSELF
    running_acc += (acc_t - running_acc) / (batch_index + 1)

train_state['test_loss'] = running_loss
train_state['test_acc'] = running_acc

tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0], device='cuda:0')
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0], device='cuda:0')
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 

In [61]:
from sklearn.metrics import f1_score

# ...

running_f1 = 0.
classifier.eval()

for batch_index, batch_dict in enumerate(batch_generator):
    # Outputs
    y_pred = classifier(x_in=batch_dict['x_data'].float())

    # Loss
    loss = loss_func(y_pred, batch_dict['y_target'].float())
    loss_t = loss.item()
    running_loss += (loss_t - running_loss) / (batch_index + 1)

    # F1 score
    y_true = batch_dict['y_target'].cpu().numpy().astype(int)
    y_pred_probs = torch.sigmoid(y_pred).cpu().detach().numpy()
    y_pred_labels = (y_pred_probs >= 0.5).astype(int)
    f1_t = f1_score(y_true, y_pred_labels, average='micro')  # You can change 'micro' to 'macro' or 'weighted' as needed
    running_f1 += (f1_t - running_f1) / (batch_index + 1)

train_state['test_loss'] = running_loss
train_state['test_f1'] = running_f1


In [65]:
print("테스트 손실: {:.3f}".format(train_state['test_loss']))
print("테스트 정확도: {:.2f}".format(train_state['test_acc']))

테스트 손실: 0.000
테스트 정확도: 100.00


In [72]:
# ...

classifier.load_state_dict(torch.load(train_state['model_filename']))
classifier = classifier.to(args.device)

dataset.set_split('test')

all_y_true = []
all_y_pred_probs = []

classifier.eval()

for batch_index, batch_dict in enumerate(batch_generator):
    # Outputs
    y_pred = classifier(x_in=batch_dict['x_data'].float())

    # Loss
    loss = loss_func(y_pred, batch_dict['y_target'].float())
    loss_t = loss.item()
    running_loss += (loss_t - running_loss) / (batch_index + 1)

    # Collect true labels and predicted probabilities
    all_y_true.extend(batch_dict['y_target'].cpu().numpy().astype(int))
    all_y_pred_probs.extend(torch.sigmoid(y_pred).cpu().detach().numpy())

# Convert the lists to numpy arrays
all_y_true = np.array(all_y_true)
all_y_pred_probs = np.array(all_y_pred_probs)



In [73]:
all_y_pred_probs

array([], dtype=float64)

## 샘플 리뷰에 대한 예측

In [None]:
def predict_comment(Comment, classifier, vectorizer, decision_threshold=0.5):
    """ 리뷰 점수 예측하기

    매개변수:
        Comment (str): 리뷰 텍스트
        classifier (CommentClassifier): 훈련된 모델
        vectorizer (CommentVectorizer): Vectorizer 객체
        decision_threshold (float): 클래스를 나눌 결정 경계
    """
    Comment = preprocess_text(Comment)

    vectorized_Comment = torch.tensor(vectorizer.vectorize(# TRY IT YOURSELF
    result = # TRY IT YOURSELF

    probability_value = # TRY IT YOURSELF
    index = 1
    if probability_value < decision_threshold:
        index = 0

    return vectorizer.comment_vocab.lookup_index(index)

In [None]:
test_Comment = "this is a pretty awesome book"

classifier = classifier.cpu()
prediction = predict_comment(test_Comment, classifier, vectorizer,
                            decision_threshold=0.5)
print("{} -> {}".format(test_Comment, prediction))

## 분류기의 가중치 분석하기

In [None]:
# 가중치 정렬
fc1_weights = classifier.fc1.weight.detach()[0]
_, indices = torch.sort(fc1_weights, dim=0, descending=True)
indices = indices.numpy().tolist()

# 긍정적인 상위 20개 단어
print("긍정 리뷰에 영향을 미치는 단어:")
print("--------------------------------------")
for i in range(20):
    print(vectorizer.Comment_vocab.lookup_index(indices[i]))

print("====\n\n\n")

# 부정적인 상위 20개 단어
print("부정 리뷰에 영향을 미치는 단어:")
print("--------------------------------------")
indices.reverse()
for i in range(20):
    print(vectorizer.Comment_vocab.lookup_index(indices[i]))