<a href="https://colab.research.google.com/github/8Chatea8/HeadChecker/blob/main/model/20230904_Mahi_Mahi_data_whole_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 2023.09.04.
담당: 노유현  
데이터: train: data_whole.csv  
모델: 'bert-base-multilingual-cased'  
파라미터: MAX_LEN = 450/batch_size = 8/epoch=4  

In [1]:
import tensorflow as tf
import torch

from transformers import BertTokenizer
from transformers import BertForSequenceClassification, AdamW, BertConfig
from transformers import get_linear_schedule_with_warmup
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split

import pandas as pd
import numpy as np
import random
import time
import datetime

2023-09-18 01:58:32.832917: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-09-18 01:58:32.888328: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-09-18 01:58:32.889787: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# 메모리 삭제 : CUDA out of memery

torch.cuda.empty_cache()

### 데이터 로드

In [5]:
# 데이터 로드
df = pd.read_csv('~/data_whole.csv', encoding='utf-8')

In [6]:
df['Class'].value_counts() # 진짜뉴스 1: 41,332개 / 낚시뉴스 0: 29,663개

Class
1    162468
0    155664
Name: count, dtype: int64

### 데이터 전처리

1. 특수토큰 삽입 [CLS] + 헤드라인 + [SEP] + 본문 + [SEP]
2. 토크나이징(BertTokenizer.from_pretrained('bert-base-multilingual-cased' 사용))
3. MAX_LEN에 맞게 패딩
4. 패딩에 맞게 어텐션 마스크 만듦
5. train, validation으로 나눔(9:1)
6. Dataloader에 피딩

In [7]:
## 헤드라인 데이터 전처리

sentences_h = df['Headline']
sentences_h = ["[CLS] " + str(sentence) + " [SEP]" for sentence in sentences_h]

sentences_c = df['Content']
sentences_c = [ str(sentence) + " [SEP]" for sentence in sentences_c]

sentences = []

for i in range(len(sentences_h)):
    sentence = sentences_h[i] + " " + sentences_c[i]
    sentences.append(sentence)

In [8]:
# Class(라벨) 추출 : 가짜뉴스 0 / 진짜뉴스 1
labels = df['Class'].values
labels

array([0, 1, 1, ..., 0, 1, 0])

In [None]:
# BERT의 토크나이저로 문장을 토큰으로 분리
tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased', do_lower_case=False)
tokenized_texts = [tokenizer.tokenize(sent) for sent in sentences]

print (sentences[0])
print (tokenized_texts[0])

In [None]:
# 입력 토큰의 최대 시퀀스 길이
MAX_LEN = 450

# 토큰을 숫자 인덱스로 변환
input_ids = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_texts]

# 문장을 MAX_LEN 길이에 맞게 자르고, 모자란 부분을 패딩 0으로 채움
input_ids = pad_sequences(input_ids, maxlen=MAX_LEN, dtype="long", truncating="post", padding="post")

input_ids[0]

In [None]:
## 어텐션 마스크

# 어텐션 마스크 초기화
attention_masks = []

# 어텐션 마스크를 패딩이 아니면 1, 패딩이면 0으로 설정
# 패딩 부분은 BERT 모델에서 어텐션을 수행하지 않아 속도 향상
for seq in input_ids:
    seq_mask = [float(i>0) for i in seq]
    attention_masks.append(seq_mask)

print(attention_masks[0])

In [None]:
# 훈련셋과 검증셋으로 분리
df_inputs, validation_inputs, df_labels, validation_labels = train_test_split(input_ids,
                                                                                    labels,
                                                                                    random_state=2018,
                                                                                    test_size=0.1)

# 어텐션 마스크를 훈련셋과 검증셋으로 분리
df_masks, validation_masks, _, _ = train_test_split(attention_masks,
                                                       input_ids,
                                                       random_state=2018,
                                                       test_size=0.1)

# 데이터를 파이토치의 텐서로 변환
df_inputs = torch.tensor(df_inputs)
df_labels = torch.tensor(df_labels)
df_masks = torch.tensor(df_masks)
validation_inputs = torch.tensor(validation_inputs)
validation_labels = torch.tensor(validation_labels)
validation_masks = torch.tensor(validation_masks)

print(df_inputs[0])
print(df_labels[0])
print(df_masks[0])
print(validation_inputs[0])
print(validation_labels[0])
print(validation_masks[0])

In [None]:
# 배치 사이즈
batch_size = 8

# 파이토치의 DataLoader로 입력, 마스크, 라벨을 묶어 데이터 설정
# 학습시 배치 사이즈 만큼 데이터를 가져옴
df_data = TensorDataset(df_inputs, df_masks, df_labels)
df_sampler = RandomSampler(df_data)
df_dataloader = DataLoader(df_data, sampler=df_sampler, batch_size=batch_size)

validation_data = TensorDataset(validation_inputs, validation_masks, validation_labels)
validation_sampler = SequentialSampler(validation_data)
validation_dataloader = DataLoader(validation_data, sampler=validation_sampler, batch_size=batch_size)

### 모델 생성

In [None]:
## 모델 생성

# GPU 디바이스 이름 구함
device_name = tf.test.gpu_device_name()

# GPU 디바이스 이름 검사
if device_name == '/device:GPU:0':
    print('Found GPU at: {}'.format(device_name))
else:
    raise SystemError('GPU device not found')

In [None]:
# 디바이스 설정
if torch.cuda.is_available():
    device = torch.device("cuda")
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('We will use the GPU:', torch.cuda.get_device_name(0))
else:
    device = torch.device("cpu")
    print('No GPU available, using the CPU instead.')

In [None]:
# 분류를 위한 BERT 모델 생성
model = BertForSequenceClassification.from_pretrained("bert-base-multilingual-cased", num_labels=2)
model.cuda()

In [None]:
# 옵티마이저 설정
optimizer = AdamW(model.parameters(),
                  lr = 2e-5, # 학습률
                  eps = 1e-8 # 0으로 나누는 것을 방지하기 위한 epsilon 값
                )

# 에폭수
epochs = 4

# 총 훈련 스텝 : 배치반복 횟수 * 에폭
total_steps = len(df_dataloader) * epochs

# 처음에 학습률을 조금씩 변화시키는 스케줄러 생성
scheduler = get_linear_schedule_with_warmup(optimizer,
                                            num_warmup_steps = 0,
                                            num_training_steps = total_steps)

In [None]:
## 모델학습

# 정확도 계산 함수
def flat_accuracy(preds, labels):

    pred_flat = np.argmax(preds, axis=1).flatten()
    labels_flat = labels.flatten()

    return np.sum(pred_flat == labels_flat) / len(labels_flat)

In [None]:
# 시간 표시 함수
def format_time(elapsed):

    # 반올림
    elapsed_rounded = int(round((elapsed)))

    # hh:mm:ss으로 형태 변경
    return str(datetime.timedelta(seconds=elapsed_rounded))

### 모델 학습

In [None]:
# 재현을 위해 랜덤시드 고정
seed_val = 42
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)

# 그래디언트 초기화
model.zero_grad()

# 에폭만큼 반복
for epoch_i in range(0, epochs):

    # ========================================
    #               Training
    # ========================================

    print("")
    print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
    print('Training...')

    # 시작 시간 설정
    t0 = time.time()

    # 로스 초기화
    total_loss = 0

    # 훈련모드로 변경
    model.train()

    # 데이터로더에서 배치만큼 반복하여 가져옴
    for step, batch in enumerate(df_dataloader):
        # 경과 정보 표시
        if step % 500 == 0 and not step == 0:
            elapsed = format_time(time.time() - t0)
            print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(df_dataloader), elapsed))

        # 배치를 GPU에 넣음
        batch = tuple(t.to(device) for t in batch)

        # 배치에서 데이터 추출
        b_input_ids, b_input_mask, b_labels = batch

        # Forward 수행
        outputs = model(b_input_ids,
                        token_type_ids=None,
                        attention_mask=b_input_mask,
                        labels=b_labels)

        # 로스 구함
        loss = outputs[0]

        # 총 로스 계산
        total_loss += loss.item()

        # Backward 수행으로 그래디언트 계산
        loss.backward()

        # 그래디언트 클리핑
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        # 그래디언트를 통해 가중치 파라미터 업데이트
        optimizer.step()

        # 스케줄러로 학습률 감소
        scheduler.step()

        # 그래디언트 초기화
        model.zero_grad()

    # 평균 로스 계산
    avg_df_loss = total_loss / len(df_dataloader)

    print("")
    print("  Average Data loss: {0:.2f}".format(avg_df_loss))
    print("  Training epcoh took: {:}".format(format_time(time.time() - t0)))

    # ========================================
    #               Validation
    # ========================================

    print("")
    print("Running Validation...")

    #시작 시간 설정
    t0 = time.time()

    # 평가모드로 변경
    model.eval()

    # 변수 초기화
    eval_loss, eval_accuracy = 0, 0
    nb_eval_steps, nb_eval_examples = 0, 0

    # 데이터로더에서 배치만큼 반복하여 가져옴
    for batch in validation_dataloader:
        # 배치를 GPU에 넣음
        batch = tuple(t.to(device) for t in batch)

        # 배치에서 데이터 추출
        b_input_ids, b_input_mask, b_labels = batch

        # 그래디언트 계산 안함
        with torch.no_grad():
            # Forward 수행
            outputs = model(b_input_ids,
                            token_type_ids=None,
                            attention_mask=b_input_mask)

        # 출력 로짓 구함
        logits = outputs[0]

        # CPU로 데이터 이동
        logits = logits.detach().cpu().numpy()
        label_ids = b_labels.to('cpu').numpy()

        # 출력 로짓과 라벨을 비교하여 정확도 계산
        tmp_eval_accuracy = flat_accuracy(logits, label_ids)
        eval_accuracy += tmp_eval_accuracy
        nb_eval_steps += 1

    print("  Accuracy: {0:.2f}".format(eval_accuracy/nb_eval_steps))
    print("  Validation took: {:}".format(format_time(time.time() - t0)))

print("")
print("Training complete!")

In [None]:
# 모델 저장 방법 : save_pretrained(디렉토리)
# 모델 저장 변수 이름.save_pretrained(원하는 디렉토리) 형태

model_test.save_pretrained('/home/hyunkoo/DATA/Uhyeon/data_whole_model_test.pt') # 파이토치 기반 모델

In [None]:
# 모델 불러오기 방법 : from_pretrained(디렉토리)
# 모델 클래스 이름.from_pretrained(저장된 디렉토리) 형태
model_test = BertForSequenceClassification.from_pretrained('/home/hyunkoo/DATA/Uhyeon/data_whole_model_test.pt')

In [None]:
print(model_test)

### 테스트셋 평가

In [None]:
##### 새로운 문장 테스트 #####
model.eval()  # 모델을 평가 모드로 설정

# CPU를 사용합니다.
device = torch.device("cpu")

# 입력 문장을 토큰화하고 BERT 모델에 맞게 변환하는 함수를 정의합니다.
def convert_input_data(sentences):
    tokenized_texts = [tokenizer.tokenize(sent) for sent in sentences]
    MAX_LEN = 450
    input_ids = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_texts]
    input_ids = pad_sequences(input_ids, maxlen=MAX_LEN, dtype="long", truncating="post", padding="post")
    attention_masks = []
    for seq in input_ids:
        seq_mask = [float(i > 0) for i in seq]
        attention_masks.append(seq_mask)
    inputs = torch.tensor(input_ids).to(device)  # 입력 데이터를 CPU로 이동
    masks = torch.tensor(attention_masks).to(device)  # 어텐션 마스크를 CPU로 이동
    return inputs, masks

In [None]:
# 문장을 테스트하는 함수를 정의합니다.
def test_sentences(sentences):
    inputs, masks = convert_input_data(sentences)

    with torch.no_grad():
        outputs = model(inputs, token_type_ids=None, attention_mask=masks)

    logits = outputs.logits
    probabilities = torch.softmax(logits, dim=1)
    probabilities = probabilities.numpy()
    return probabilities

In [None]:
# 파일에서 텍스트를 읽어오는 함수를 정의합니다
def read_text_from_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()
    return text

###### 네이버 뉴스에서 실제 기사를 가져와서 테스트 해보기

###### Test 01 가짜 뉴스 테스트 ######
file_path = '/home/hyunkoo/DATA/Uhyeon/news_test_01.txt'

# 파일에서 텍스트를 읽어옵니다
news_test_01 = read_text_from_file(file_path)
print(news_test_01)

In [None]:
# 모델을 사용하여 텍스트의 유사도를 테스트합니다
###### Test 01 가짜 뉴스 테스트 => 결과 0 출력되어야 함
logits = test_sentences([news_test_01])

print(logits)
print(np.argmax(logits))

In [None]:
###### 네이버 뉴스에서 실제 기사를 가져와서 테스트 해보기

###### Test 02 가짜 뉴스 테스트 ######
file_path = '/home/hyunkoo/DATA/Uhyeon/news_test_02.txt'

# 파일에서 텍스트를 읽어옵니다
news_test_02 = read_text_from_file(file_path)
print(news_test_02)

In [None]:
# 모델을 사용하여 텍스트의 유사도를 테스트합니다
###### Test 02 가짜 뉴스 테스트 => 결과 0 출력되어야 함
logits = test_sentences([news_test_02])

print(logits)
print(np.argmax(logits))

In [None]:
###### 네이버 뉴스에서 실제 기사를 가져와서 테스트 해보기

###### Test 03 진짜 뉴스 테스트 ######
file_path = '/home/hyunkoo/DATA/Uhyeon/news_test_03.txt'

# 파일에서 텍스트를 읽어옵니다
news_test_03 = read_text_from_file(file_path)
print(news_test_03)

In [None]:
# 모델을 사용하여 텍스트의 유사도를 테스트합니다
###### Test 03 진짜 뉴스 테스트 => 결과 1 출력되어야 함
logits = test_sentences([news_test_03])

print(logits)
print(np.argmax(logits))

In [None]:
###### (이슈) 네이버 뉴스에서 실제 기사로 테스트 해본 결과 분류가 제대로 되지 않는 문제 발생 => 결과가 모두 "진짜뉴스 Class 1" 으로 나옴
###### 기존에 학습한 Train 데이터에서 가짜뉴스 5개, 진짜뉴스 5개를 가져와서 분류가 제대로 되는지 확인

# 가짜뉴스 : Class 0 데이터 인덱스
class_0_indices = [0, 6, 12, 17, 20]
for index in class_0_indices:
    # [CLS] + Headline + [SEP] + Content + [SEP]로 조합
    sentences = ["[CLS] " + df['Headline'][index] + " [SEP] " + df['Content'][index] + " [SEP]"]
    logits = test_sentences(sentences)
    print(f"가짜뉴스_Class_0_Index {index}:")
    print("Probabilities:", logits)
    print("Predicted Class:", np.argmax(logits))

# 진짜뉴스 : Class 1 데이터 인덱스
class_1_indices = [1, 2, 3, 4, 5]
for index in class_1_indices:
    # [CLS] + Headline + [SEP] + Content + [SEP]로 조합
    sentences = ["[CLS] " + df['Headline'][index] + " [SEP] " + df['Content'][index] + " [SEP]"]
    logits = test_sentences(sentences)
    print(f"진짜뉴스_class_1_Index {index}:")
    print("Probabilities:", logits)
    print("Predicted Class:", np.argmax(logits))

In [None]:
# 데이터 로드
df = pd.read_csv('/home/hyunkoo/DATA/Uhyeon/data_whole.csv', encoding='utf-8')

# BERT 토크나이저를 로드합니다.
tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased', do_lower_case=False)

# 저장된 모델을 로드합니다.
model = BertForSequenceClassification.from_pretrained('/home/hyunkoo/DATA/Uhyeon/data_whole_model.pt')

# 입력 문장을 토큰화하고 BERT 모델에 맞게 변환하는 함수를 정의합니다.
def convert_input_data(sentences):
    tokenized_texts = [tokenizer.tokenize(sent) for sent in sentences]
    MAX_LEN = 450
    input_ids = [tokenizer.convert_tokens_to_ids(x) for x in tokenized_texts]
    input_ids = pad_sequences(input_ids, maxlen=MAX_LEN, dtype="long", truncating="post", padding="post")
    attention_masks = []
    for seq in input_ids:
        seq_mask = [float(i > 0) for i in seq]
        attention_masks.append(seq_mask)
    inputs = torch.tensor(input_ids)
    masks = torch.tensor(attention_masks)
    return inputs, masks

# 문장을 테스트하는 함수를 정의합니다.
def test_sentences(sentences):
    model.eval()
    inputs, masks = convert_input_data(sentences)
    b_input_ids = inputs.to(device)
    b_input_mask = masks.to(device)
    with torch.no_grad():
        outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask)
    logits = outputs.logits
    probabilities = torch.softmax(logits, dim=1)
    probabilities = probabilities.cpu().numpy()
    return probabilities

# 가짜 뉴스 및 진짜 뉴스의 인덱스를 정의합니다.
class_0_indices = [0, 6, 12, 17, 20]  # 가짜 뉴스
class_1_indices = [1, 2, 3, 4, 5]    # 진짜 뉴스

# 가짜 뉴스 테스트
for index in class_0_indices:
    sentences = ["[CLS] " + df['Headline'][index] + " [SEP] " + df['Content'][index] + " [SEP]"]
    logits = test_sentences(sentences)
    print(f"가짜뉴스_Class_0_Index {index}:")
    print("Probabilities:", logits)
    print("Predicted Class:", np.argmax(logits))

# 진짜 뉴스 테스트
for index in class_1_indices:
    sentences = ["[CLS] " + df['Headline'][index] + " [SEP] " + df['Content'][index] + " [SEP]"]
    logits = test_sentences(sentences)
    print(f"진짜뉴스_class_1_Index {index}:")
    print("Probabilities:", logits)
    print("Predicted Class:", np.argmax(logits))