#### Project
- 낚시성 기사 탐지
- data -> Fake : 낚시성 기사, Real : 일반 기사

In [62]:
import pandas as pd
from Korpora import Korpora
from tabulate import tabulate
import json
import os 
import torch
from collections import Counter
from konlpy.tag import *

In [2]:
# 장치 사용
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

print(f"Using device: {DEVICE}")

Using device: cpu


In [28]:
TRAIN_PATH = "./data/Train/"
# TRAIN_DATA = "./data/Train"
# VAL_DATA = "./data/Validation"

label_list = os.listdir(TRAIN_PATH)
print(label_list)

['Fake', 'Real']


In [80]:
dict = {'text': [], 'label': []}

for label in label_list:
    folder_path = os.path.join(TRAIN_PATH, label)
    file_list = os.listdir(folder_path)
    
    for file in file_list:
        file_path = os.path.join(folder_path, file)
        
        with open(file_path, encoding='utf-8') as f:
            data = json.load(f)
            df = pd.json_normalize(data, record_path=["sourceDataInfo", "sentenceInfo"])
            
            full_article = ' '.join(df['sentenceContent'])
            
            dict['text'].append(full_article)
            dict['label'].append(label)

KeyboardInterrupt: 

In [41]:
trainDF = pd.DataFrame(dict)
trainDF.head(7)

Unnamed: 0,text,label
0,손목시계는 이제 시간을 알려주는 도구 이상의 의미를 갖고 있다.,Fake
1,"어떤 이에게는 신분과 지위를 상징하고, 또 다른 이에게는 패션과 개성을 상징한다.",Fake
2,"여기에 가난을 없애고 목마름을 해소시키며 암을 예방해준다면, 그리고 가격이 40달러...",Fake
3,이 심플하고 기능적인 손목시계에 기부의 의미를 추가한 시계가 바로 페이스 워치(Fa...,Fake
4,페이스 워치는 미국 시카고의 브랜딩 회사 Mirza Minds의 대표 Fam Mir...,Fake
5,제3세계의 빈곤과 선진국의 경제발전을 다 경험한 Fam Mirza 대표는 세계의 빈...,Fake
6,"Fam Mirza 대표는 인스타그램, 페이스북, 트위터등을 통해 사람들의 감정에 그...",Fake


In [42]:
trainDF.tail(7)

Unnamed: 0,text,label
593705,김순남의 곡들은 천지윤의 해금과 박윤우의 기타에 여현우의 클라리넷이 합세한 트리오 ...,Real
593706,서양음악에 국악과 재즈가 어우러져 독특한 울림을 자아낸다.,Real
593707,"윤이상의 곡들은 천지윤의 해금과 조윤성의 피아노가 때론 함께 노닐고, 때론 서로 겨...",Real
593708,전통 악기에 재즈의 리듬감과 현대음악의 조성이 섞이면서 세련된 음색을 빚어낸다.,Real
593709,악기 두 대의 조촐한 편성이지만 조금도 단조롭지 않다.,Real
593710,천지윤은 협업 경험이 풍부한 국악인이다. ‘이날치’의 베이시스트 장영규와 국악그룹 ...,Real
593711,"9일 오전엔 두 앨범에 참여한 박윤우, 여현우, 조윤성과 함께 크로스오버 가수 박현...",Real


- train, text data 나누기

In [76]:
from sklearn.model_selection import train_test_split

X = trainDF[['text']]
y = trainDF['label']  

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

# 데이터 확인
print(f"Train 데이터 개수: {len(X_train)}")
print(f"Test 데이터 개수: {len(X_test)}")
print(f"총 Data 수 : {len(X_train) + len(X_test)}")

Train 데이터 개수: 474969
Test 데이터 개수: 118743
총 Data 수 : 593712


- 구두점, 불용어 제거

In [77]:
import string

string.punctuation

'!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'

In [78]:
# 구두점 제거 함수
def remove_punctuation(text):
    punc = string.punctuation
    if text in punc :
        text = text.replace(punc, "")
    return text

In [79]:
# Train 데이터에서 구두점 제거
X_train = X_train.apply(remove_punctuation)

# Test 데이터에서 구두점 제거
X_test = X_test.apply(remove_punctuation)

TypeError: 'in <string>' requires string as left operand, not Series

In [72]:
# 제거된 결과 확인
print("구두점 제거 후 Train 데이터 샘플:")
X_train.head()

구두점 제거 후 Train 데이터 샘플:


Unnamed: 0,text
536407,"최고 시속 300㎞ 이상으로 달린 창사-광저우, 류저우-구이양 간 고속철과 구이양~..."
147040,한편 토론자로 나선 김점용 시인( 주간)은 “미당의 기념사업을 철회할 경우 윤리적 ...
453530,"에서 왕을 보필하는 최측근, 조 내관 역이 그의 몫이다."
14285,"각각에 대해 경제통으로 꼽히는 중앙일보시사미디어 편집위원들이 분석기사를 쓰고, 해당..."
385465,"뱅쇼는 프랑스어로 ‘Em거운 와인’이라는 뜻으로 독일에서는 글루바인Gluhwien,..."


In [73]:
print("구두점 제거 후 Test 데이터 샘플:")
print(X_test.head())

구두점 제거 후 Test 데이터 샘플:
                                                     text
418800             이화학술원 해외 석좌교수인 그는 2007년부터 이화여대 강단에 섰다.
389748                          즉 감염자 99.9%는 걸린 후 나은 셈이다.
375766  심한 정계정맥류에서 고환의 정맥이 부어오르고 임파선이 커지며 주변조직을 압박하는 경...
591316  1998년 파리 초연 이후 지난해 한국 공연을 통해 18년 만에 작품에 복귀한 71...
140416         범행 뒤 역삼동 또다른 화장실에 머물렀던 범인은 추가 범행을 계획했던 걸까?


- 불용어 제거

In [54]:
# 불용어 리스트 생성
stop_path = "./data/stop_words.txt"

In [55]:
with open(stop_path, "r", encoding="utf-8") as f:
    wordlist = f.readlines()

wordlist

['프\n',
 '뗴\n',
 '잠시\n',
 '채\n',
 '즉시\n',
 '드\n',
 '하도록시키다\n',
 '제\n',
 '하는바\n',
 '쓰\n',
 '으로써\n',
 '연이서\n',
 '삐\n',
 '결론을 낼 수 있다\n',
 '지만\n',
 '조용히\n',
 '보는데서\n',
 'ㅟ\n',
 'ㅘ\n',
 '탸\n',
 '깨\n',
 '똬\n',
 '여보세요\n',
 '잇따라\n',
 'ㅐ\n',
 '삼\n',
 '거바\n',
 '이 때문에\n',
 '소생\n',
 '쉿\n',
 '함께\n',
 '난\n',
 '정도에 이르다\n',
 '언제\n',
 '괴\n',
 '매번\n',
 '하는 김에\n',
 '거기\n',
 '초\n',
 '까\n',
 '바꾸어말하자면\n',
 '솨\n',
 '처\n',
 '꾸\n',
 '까닭으로\n',
 '그럼에도\n',
 '아이참\n',
 '곳\n',
 '더욱더\n',
 '그중에서\n',
 '흥\n',
 '양자\n',
 '차\n',
 'ㅈ\n',
 '라서\n',
 '나\n',
 '비로소\n',
 '쐬\n',
 '요만한걸\n',
 '이 외에\n',
 '허\n',
 '하는 것도\n',
 '겨\n',
 '절대\n',
 '지든지\n',
 '하물며\n',
 '요만한 것\n',
 '좌\n',
 '지말고\n',
 '이리하여\n',
 '좋아\n',
 '이었다\n',
 '하네요\n',
 '따위\n',
 '아하\n',
 '할때\n',
 '그렇지만\n',
 '두\n',
 '어떻게\n',
 '내일\n',
 '퐈\n',
 '료\n',
 '무엇때문에\n',
 '종\n',
 '게\n',
 '버\n',
 '하기에\n',
 '하기 보다는\n',
 '한켠으로는\n',
 '시각\n',
 '펴\n',
 '만은 아니다\n',
 '심지어\n',
 '설\n',
 '도달하다\n',
 '퍽\n',
 'ㅒ\n',
 '겹\n',
 '어기여차\n',
 '에도\n',
 '트\n',
 '타인\n',
 '나왔는데\n',
 '볘\n',
 '퇴\n',
 '세상에\n',
 '

In [56]:
stopwords = []

for word in wordlist :
    stopwords.append(word.replace("\n", ""))
print(len(wordlist), len(stopwords))
print(stopwords[-10:])

1193 1193
['만일', '상대적으로 말하자면', '더구나', '붕붕', '바꾸어말하면', '주룩주룩', '한항목', '예를 들면', '아야', '지금']


In [64]:
# 토큰화
counter = Counter()
tokenizer = Okt()

In [65]:
train_tokens = [tokenizer.morphs(review) for review in X_train]
test_tokens = [tokenizer.morphs(review) for review in X_test]

KeyboardInterrupt: 

In [None]:
len(train_tokens), len(test_tokens)

- 단어사전 생성

In [None]:
def build_vocab(corpus, n_vocab, special_tokens) :
    counter = Counter()
    for tokens in corpus :
        counter.update(tokens)
    vocab = special_tokens
    for token, count in counter.most_common(n_vocab) :
        vocab.append(token)
    return vocab

In [None]:
vocab = build_vocab(corpus = train_tokens, n_vocab = 20000, special_tokens = ["<PAD>", "<UNK>"])
token_to_id = {token:idx for idx, token in enumerate(vocab)}
id_to_token = {idx:token for idx, token in enumerate(vocab)}

print(vocab[:20])
print(len(vocab))

print(token_to_id)
print(id_to_token)

- 정수 인코딩 및 패딩

In [None]:
def pad_sequences(sequences, max_length, pad_value) :
    result = list()
    for sequence in sequences :
        sequence = sequence[:max_length]
        pad_length = max_length - len(sequence)
        padded_sequence = sequence + [pad_value] * pad_length
        result.append(padded_sequence)
    return np.asarray(result)

In [None]:
unk_id = token_to_id['<UNK>']
train_ids = [[token_to_id.get(token, unk_id)for token in text] for text in train_tokens]
test_ids = [[token_to_id.get(token, unk_id)for token in text] for text in test_tokens]

max_length = 200
pad_id = token_to_id["<PAD>"]
train_ids = pad_sequences(train_ids, max_length, pad_id)
test_ids = pad_sequences(test_ids, max_length, pad_id)

print(train_ids[0])
print(test_ids[0])

- Data Loader

In [None]:
import torch
from torch.utils.data import TensorDataset, DataLoader

train_ids = torch.tensor(train_ids)
test_ids = torch.tensor(test_ids)

train_labels = torch.tensor(X_train.label.values, dtype = torch.float32)
test_labels = torch.tensor(X_test.label.values, dtype=torch.float32)

train_dataset = TensorDataset(train_ids, train_labels)
test_dataset = TensorDataset(test_ids, test_labels)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

- 학습모델

In [None]:
import torch
from torch import nn, optim

# SentenceClassifier 정의
class SentenceClassifier(nn.Module):
    def __init__(self, n_vocab, hidden_dim, embedding_dim, n_layers, dropout=0.5, bidirectional=True, model_type="lstm", pretrained_embedding=None):
        super().__init__()


        if pretrained_embedding is not None:
            self.embedding = nn.Embedding.from_pretrained(
                torch.tensor(pretrained_embedding, dtype=torch.float32),
                padding_idx=0
            )
        else:
            self.embedding = nn.Embedding(
                num_embeddings=n_vocab,
                embedding_dim=embedding_dim,
                padding_idx=0
            )
        
        # LSTM 모델 사용
        self.model = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout,
            batch_first=True
        )

        # Bidirectional 설정에 따른 분류기
        if bidirectional:
            self.classifier = nn.Linear(hidden_dim * 2, 1)
        else:
            self.classifier = nn.Linear(hidden_dim, 1)
        
        self.dropout = nn.Dropout(dropout)

    def forward(self, inputs):
        # Embedding lookup
        embeddings = self.embedding(inputs)
        
        # LSTM 모델에 입력
        output, _ = self.model(embeddings)
        
        # 마지막 시퀀스 출력 사용
        last_output = output[:, -1, :]
        last_output = self.dropout(last_output)
        
        # 로짓 계산
        logits = self.classifier(last_output)
        return logits

# 모델 초기화
n_vocab = len(token_to_id)  # 사전 크기
hidden_dim = 674  # LSTM 은닉층 크기
embedding_dim = 128  # Embedding 차원
n_layers = 2  # LSTM 레이어 수

device = "cuda" if torch.cuda.is_available() else "cpu"

classifier = SentenceClassifier(
    n_vocab=n_vocab, hidden_dim=hidden_dim, embedding_dim=embedding_dim, n_layers=n_layers
).to(device)

# 손실 함수 및 옵티마이저 설정
criterion = nn.BCEWithLogitsLoss().to(device)  # 이진 분류 손실 함수
optimizer = optim.RMSprop(classifier.parameters(), lr=0.001)

# 학습 함수 정의
def train(model, datasets, criterion, optimizer, device, interval):
    model.train()
    losses = []

    for step, (input_ids, labels) in enumerate(datasets):
        input_ids = input_ids.to(device)
        labels = labels.to(device).unsqueeze(1)  # 이진 분류이므로 차원 추가

        logits = model(input_ids)
        loss = criterion(logits, labels)
        losses.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if step % interval == 0:
            print(f"Train Loss {step}: {np.mean(losses)}")

# 테스트 함수 정의
def test(model, datasets, criterion, device):
    model.eval()
    losses = []
    corrects = []

    with torch.no_grad():  # 테스트에서는 그래디언트 계산 안함
        for step, (input_ids, labels) in enumerate(datasets):
            input_ids = input_ids.to(device)
            labels = labels.to(device).unsqueeze(1)

            logits = model(input_ids)
            loss = criterion(logits, labels)
            losses.append(loss.item())

            yhat = torch.sigmoid(logits) > 0.5  # Sigmoid로 이진 분류
            corrects.extend(torch.eq(yhat, labels).cpu().tolist())

    print(f"Val Loss: {np.mean(losses)}, Val Accuracy: {np.mean(corrects)}")

# 학습 설정
epochs = 5
interval = 500  # 500번마다 출력

# 학습 및 테스트 반복
for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")
    train(classifier, train_loader, criterion, optimizer, device, interval)
    test(classifier, test_loader, criterion, device)

- Data Loader

In [None]:
import torch
from torch.utils.data import TensorDataset, DataLoader

# train_ids 및 test_ids는 이미 존재한다고 가정합니다.
train_ids = torch.tensor(train_ids)
test_ids = torch.tensor(test_ids)

# label 열이 'label'이라는 이름으로 존재한다고 가정하여 수정
train_labels = torch.tensor(trainDF.loc[X_train.index, 'label'].values, dtype=torch.float32)
test_labels = torch.tensor(trainDF.loc[X_test.index, 'label'].values, dtype=torch.float32)

# TensorDataset을 사용해 데이터셋 구성
train_dataset = TensorDataset(train_ids, train_labels)
test_dataset = TensorDataset(test_ids, test_labels)

# DataLoader를 사용해 배치 데이터 구성
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)


- 학습모델

In [None]:
class SentenceClassifier(nn.Module):
    def __init__(self, n_vocab, hidden_dim, embedding_dim, n_layers, dropout=0.5, bidirectional=True, model_type="lstm", pretrained_embedding=None):
        super().__init__()
        if pretrained_embedding is not None:
            self.embedding = nn.Embedding.from_pretrained(
                torch.tensor(pretrained_embedding, dtype=torch.float32),
                padding_idx=0
            )
        else:
            self.embedding = nn.Embedding(
                num_embeddings=n_vocab,
                embedding_dim=embedding_dim,
                padding_idx=0
            )
        
        # LSTM 모델 사용
        self.model = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout,
            batch_first=True
        )

        if bidirectional:
            self.classifier = nn.Linear(hidden_dim * 2, 1)
        else:
            self.classifier = nn.Linear(hidden_dim, 1)
        
        self.dropout = nn.Dropout(dropout)

    def forward(self, inputs):
        embeddings = self.embedding(inputs)
        output, _ = self.model(embeddings)
        last_output = output[:, -1, :]
        last_output = self.dropout(last_output)
        logits = self.classifier(last_output)
        return logits

# 모델 초기화
n_vocab = len(token_to_id)  # 사전 크기
hidden_dim = 674  # LSTM 은닉층 크기
embedding_dim = 128  # Embedding 차원
n_layers = 2  # LSTM 레이어 수

device = "cuda" if torch.cuda.is_available() else "cpu"

classifier = SentenceClassifier(
    n_vocab=n_vocab, hidden_dim=hidden_dim, embedding_dim=embedding_dim, n_layers=n_layers
).to(device)

# 손실 함수 및 옵티마이저 설정
criterion = nn.BCEWithLogitsLoss().to(device)
optimizer = optim.RMSprop(classifier.parameters(), lr=0.001)

# 모델 저장을 위한 경로
SAVE_FILE = "best_model_state_dict.pth"
SAVE_MODEL = "best_model.pth"

best_val_loss = float('inf')

# 학습 함수 정의
# 학습 함수 정의 (Train 정확도 추가)
def train(model, datasets, criterion, optimizer, device, interval):
    model.train()
    losses = []
    corrects = []

    for step, (input_ids, labels) in enumerate(datasets):
        input_ids = input_ids.to(device)
        labels = labels.to(device).unsqueeze(1)

        logits = model(input_ids)
        loss = criterion(logits, labels)
        losses.append(loss.item())

        # 예측값 계산 (sigmoid로 확률화하고 0.5를 기준으로 이진 분류)
        predictions = torch.sigmoid(logits) > 0.5
        corrects.extend(torch.eq(predictions, labels).cpu().tolist())  # 정답과 예측 비교

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if step % interval == 0:
            train_loss = np.mean(losses)  # 현재까지의 평균 손실 계산
            train_accuracy = np.mean(corrects)  # 현재까지의 평균 정확도 계산
            print(f"Train Step {step}: Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.4f}")

# 테스트 함수 정의
def test(model, datasets, criterion, device):
    global best_val_loss  # 전역 변수로 설정하여 저장 기준으로 사용
    model.eval()
    losses = []
    corrects = []

    with torch.no_grad():
        for step, (input_ids, labels) in enumerate(datasets):
            input_ids = input_ids.to(device)
            labels = labels.to(device).unsqueeze(1)

            logits = model(input_ids)
            loss = criterion(logits, labels)
            losses.append(loss.item())

            yhat = torch.sigmoid(logits) > 0.5
            corrects.extend(torch.eq(yhat, labels).cpu().tolist())

    val_loss = np.mean(losses)
    val_accuracy = np.mean(corrects)

    print(f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")

    # 가장 좋은 성능일 때 모델 저장
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), SAVE_FILE)  # state_dict 저장
        torch.save(model, SAVE_MODEL)  # 전체 모델 저장
        print(f"Best model saved with loss: {val_loss:.4f}")

# 학습 설정
epochs = 5
interval = 500

# 학습 및 테스트 반복
for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")
    train(classifier, train_loader, criterion, optimizer, device, interval)
    test(classifier, test_loader, criterion, device)