In [None]:
import json
import torch
import numpy as np
import pandas as pd
from sklearn.manifold import TSNE
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import seaborn as sns
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import Dataset, DataLoader, random_split
from transformers import RobertaTokenizer, RobertaModel
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
# Load the JSON data
train_file_path = 'train.json'
test_file_path = 'test.json'


def load_json(path):
    """Parse the JSON document stored at ``path`` and return the result.

    The file is decoded as UTF-8 explicitly. Without an ``encoding`` argument
    ``open`` falls back to the platform's locale encoding, which can fail or
    mis-decode non-ASCII text on systems whose default is not UTF-8.

    Args:
        path: Location of the JSON file.

    Returns:
        The decoded JSON value (whatever ``json.load`` produces for the file).
    """
    with open(path, 'r', encoding='utf-8') as file:
        return json.load(file)


train_data = load_json(train_file_path)
test_data = load_json(test_file_path)

# Dataset preparation
class CustomDataset(Dataset):
    """Map-style dataset that tokenizes one record's "facts" on each lookup.

    Every record is a mapping. The values under "facts" are joined with single
    spaces and passed to the tokenizer; "label" and "id" are attached to the
    sample when applicable (see ``__getitem__``).

    Args:
        data: Indexable collection of record mappings.
        tokenizer: Callable tokenizer; invoked as ``tokenizer(text, ...)`` and
            expected to return a mapping with 'input_ids' and 'attention_mask'
            tensors.
        max_len: Length every encoded sequence is truncated/padded to.
        is_train: When true, each sample also carries a float 'labels' tensor.
    """

    def __init__(self, data, tokenizer, max_len, is_train=True):
        self.data = data
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.is_train = is_train

    def __len__(self):
        """Return the number of records."""
        return len(self.data)

    def __getitem__(self, index):
        """Encode the record at ``index`` and return it as a dict of tensors.

        Returns:
            dict with 'input_ids' and 'attention_mask' (flattened to 1-D),
            plus 'labels' (float) when ``is_train`` is set and 'id' (long)
            when the record has an "id" key.
        """
        item = self.data[index]

        # A missing or null "facts" entry is treated as empty text.
        facts = item.get("facts") or []
        # Joining a bare string would insert a space between every character,
        # so a single string is wrapped into a one-element list first.
        if isinstance(facts, str):
            facts = [facts]
        text = ' '.join(facts)

        # Call the tokenizer directly: `encode_plus` is documented as
        # deprecated in favour of `__call__`, which takes the same arguments.
        encoding = self.tokenizer(
            text,
            max_length=self.max_len,
            truncation=True,
            padding='max_length',
            return_tensors='pt',
            return_attention_mask=True,
            add_special_tokens=True,
        )

        sample = {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten()
        }

        if self.is_train:
            # NOTE(review): records without a "label" silently default to 0.
            label = item.get("label", 0)
            sample['labels'] = torch.tensor(label, dtype=torch.float)

        if 'id' in item:
            sample['id'] = torch.tensor(item['id'], dtype=torch.long)

        return sample

In [None]:
# RoBERTa tokenizer setup
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
max_len = 512

# Create the Dataset objects
dataset = CustomDataset(data=train_data, tokenizer=tokenizer, max_len=max_len, is_train=True)
test_dataset = CustomDataset(data=test_data, tokenizer=tokenizer, max_len=max_len, is_train=False)


# Train/validation split (80/20). The split uses its own seeded generator so
# that re-running the notebook yields the same partition and validation
# accuracy stays comparable between runs.
split_seed = 42
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(split_seed),
)

# DataLoader setup
batch_size = 32
# The classifier head in this notebook applies nn.BatchNorm1d, which raises in
# training mode when a batch holds a single sample. Drop the trailing batch
# only in the case where it would contain exactly one example.
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    drop_last=(len(train_dataset) % batch_size == 1),
)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# RoBERTa model definition
class RobertaBinaryClassifier(nn.Module):
    """RoBERTa encoder followed by a single-logit classification head.

    The head applies dropout, then batch normalisation, then a linear layer
    with one output unit to the encoder's pooled output.

    Args:
        dropout: Drop probability applied to the pooled output.
        model_name: Checkpoint name or path handed to
            ``RobertaModel.from_pretrained``. Defaults to 'roberta-base'.
    """

    def __init__(self, dropout=0.3, model_name='roberta-base'):
        super().__init__()
        self.roberta = RobertaModel.from_pretrained(model_name)
        hidden_size = self.roberta.config.hidden_size
        self.drop = nn.Dropout(dropout)
        self.bn = nn.BatchNorm1d(hidden_size)
        self.out = nn.Linear(hidden_size, 1)

    def forward(self, input_ids, attention_mask):
        """Return raw (pre-sigmoid) logits for a batch.

        Args:
            input_ids: Token ids forwarded to the encoder.
            attention_mask: Attention mask forwarded to the encoder.

        Returns:
            The linear layer's output with its trailing size-1 axis removed,
            i.e. one logit per example (assumes a batched 2-D pooled output).
        """
        outputs = self.roberta(input_ids=input_ids, attention_mask=attention_mask)
        pooled_output = outputs.pooler_output
        output = self.drop(pooled_output)
        output = self.bn(output)
        # Drop only the last axis so a batch of one still yields shape (1,).
        logits = self.out(output).squeeze(-1)
        return logits


In [None]:
# Model initialization: build the classifier on the selected device, then wrap
# it in nn.DataParallel.
model = nn.DataParallel(RobertaBinaryClassifier().to(device))

# Loss function and optimizer setup
criterion = nn.BCEWithLogitsLoss()
optimizer = AdamW(params=model.parameters(), lr=1e-5)

# Training loop
def _safe_ratio(numerator, denominator):
    """Return ``numerator / denominator``, or NaN when the denominator is zero."""
    return numerator / denominator if denominator else float('nan')


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs, device=None):
    """Train ``model`` for ``num_epochs`` epochs, validating after each one.

    Every batch must be a mapping with 'input_ids', 'attention_mask' and
    'labels' tensors. Predictions are obtained by thresholding the sigmoid of
    the model's logits at 0.5.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``
            and returning one logit per example.
        train_loader: Iterable of training batches.
        val_loader: Iterable of validation batches.
        criterion: Loss called as ``criterion(logits, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.
        device: Device the batch tensors are moved to. When omitted, the
            device of the model's first parameter is used (CPU if the model
            has no parameters).

    Returns:
        list[dict]: One entry per epoch with keys 'epoch', 'train_loss',
        'train_acc', 'val_loss' and 'val_acc'. A metric is NaN when the
        corresponding loader yielded no batches.
    """
    if device is None:
        # Follow the model instead of relying on a module-level global.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    history = []

    for epoch in range(num_epochs):
        # ---- training phase ----
        model.train()
        total_loss = 0.0
        train_batches = 0
        correct_train = 0
        total_train = 0

        for batch in train_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device).float()

            optimizer.zero_grad()

            logits = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(logits, labels)

            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            train_batches += 1

            # Accuracy bookkeeping; detached so it is kept out of the autograd graph.
            probs = torch.sigmoid(logits.detach())
            preds = (probs > 0.5).float()
            correct_train += (preds == labels).sum().item()
            total_train += labels.size(0)

        # Batches are counted explicitly so an empty loader yields NaN
        # instead of raising ZeroDivisionError.
        avg_loss = _safe_ratio(total_loss, train_batches)
        train_acc = _safe_ratio(correct_train, total_train)

        # ---- validation phase ----
        model.eval()
        val_loss_sum = 0.0
        val_batches = 0
        correct_val = 0
        total_val = 0
        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['labels'].to(device).float()

                logits = model(input_ids=input_ids, attention_mask=attention_mask)
                val_loss_sum += criterion(logits, labels).item()
                val_batches += 1

                probs = torch.sigmoid(logits)
                preds = (probs > 0.5).float()
                correct_val += (preds == labels).sum().item()
                total_val += labels.size(0)

        val_loss = _safe_ratio(val_loss_sum, val_batches)
        val_acc = _safe_ratio(correct_val, total_val)

        print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {avg_loss:.4f}, "
              f"Train Accuracy: {train_acc:.4f}, Validation Accuracy: {val_acc:.4f}")

        history.append({
            'epoch': epoch + 1,
            'train_loss': avg_loss,
            'train_acc': train_acc,
            'val_loss': val_loss,
            'val_acc': val_acc,
        })

    return history

# Run training
num_epochs = 10
train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=num_epochs,
)

In [None]:
# Evaluation function
def save_predictions_to_csv(model, test_loader, file_path, device=None):
    """Predict a binary label for every test example and write them to a CSV.

    Each batch must be a mapping with 'input_ids', 'attention_mask' and 'id'
    tensors. A label is 1 when the sigmoid of the model's logit exceeds 0.5,
    otherwise 0. The CSV has the columns 'id' and 'label' and no index column.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``.
        test_loader: Iterable of test batches.
        file_path: Destination of the CSV file.
        device: Device the batch tensors are moved to. When omitted, the
            device of the model's first parameter is used (CPU if the model
            has no parameters).

    Returns:
        pandas.DataFrame: The frame that was written to ``file_path``.

    Raises:
        KeyError: If a batch has no 'id' entry.
    """
    if device is None:
        # Follow the model instead of relying on a module-level global.
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    ids = []
    preds = []

    with torch.no_grad():
        for batch in test_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            batch_ids = batch['id']

            logits = model(input_ids=input_ids, attention_mask=attention_mask)
            # reshape(-1) keeps a 0-d output (single example) iterable.
            probabilities = torch.sigmoid(logits).reshape(-1).cpu()
            predictions = (probabilities > 0.5).long()

            # tolist() stores plain Python ints rather than tensor/numpy scalars.
            ids.extend(batch_ids.reshape(-1).tolist())
            preds.extend(predictions.tolist())

    df = pd.DataFrame({
        'id': ids,
        'label': preds
    })

    df.to_csv(file_path, index=False)
    print(f"Predictions saved to {file_path}")
    return df

# Run evaluation: predict on the test loader and write the results to disk.
save_predictions_to_csv(
    model=model,
    test_loader=test_loader,
    file_path='predictions.csv',
)