# 1. KoELECTRA + LSTM
- 이진 분류를 제대로 못함 (일반 대화를 제대로 분류하지 못함)
- 그에 반해 다중 분류 작업은 준수한 것 같음
- 따라서 이진 분류 부분을 수정하기로 함

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import AutoModel, AutoTokenizer
from torch.optim import AdamW
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from torch.utils.data import WeightedRandomSampler

# KoELECTRA tokenizer 로드
tokenizer = AutoTokenizer.from_pretrained("monologg/koelectra-base-discriminator")

# 클래스명-정수 매핑
label_map_str2int = {
    '협박 대화': 0,
    '갈취 대화': 1,
    '직장 내 괴롭힘 대화': 2,
    '기타 괴롭힘 대화': 3,
    '일반대화': 4
}
label_map_int2str = {v: k for k, v in label_map_str2int.items()}

# Dataset 클래스 정의
class TextDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_len, task='binary'):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.task = task
        self.label_map = label_map_str2int

        # 디버깅을 위한 레이블 분포 출력
        if self.task == 'binary':
            self.binary_labels = [0 if self.label_map[row['class']] == 4 else 1 for _, row in self.data.iterrows()]
            unique, counts = np.unique(self.binary_labels, return_counts=True)
            print(f"Binary label distribution: {dict(zip(unique, counts))}")
        elif self.task == 'multi':
            self.multi_labels = [self.label_map[row['class']] for _, row in self.data.iterrows()]
            unique, counts = np.unique(self.multi_labels, return_counts=True)
            print(f"Multi-class label distribution: {dict(zip(unique, counts))}")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        text = self.data.iloc[index]['conversation']
        label_str = self.data.iloc[index]['class']
        orig_label = self.label_map[label_str]  # 원래 레이블 (0-4)

        inputs = self.tokenizer(text,
                                truncation=True,
                                padding='max_length',
                                max_length=self.max_len,
                                return_tensors="pt")

        input_ids = inputs['input_ids'].squeeze()
        attention_mask = inputs['attention_mask'].squeeze()

        # 일반대화(4) → 0, 나머지(0-3) → 1로 이진 분류
        if self.task == 'binary':
            binary_label = 0 if orig_label == 4 else 1
            return {
                'input_ids': input_ids,
                'attention_mask': attention_mask,
                'label': torch.tensor(binary_label, dtype=torch.long)
            }
        # 다중 분류용 - 일반대화는 제외하고 레이블을 그대로 사용 (0-3)
        elif self.task == 'multi':
            return {
                'input_ids': input_ids,
                'attention_mask': attention_mask,
                'label': torch.tensor(orig_label, dtype=torch.long)
            }

    # 샘플링 가중치 계산용 함수 추가
    def get_labels(self):
        if self.task == 'binary':
            return self.binary_labels
        elif self.task == 'multi':
            return self.multi_labels

# KoELECTRA + LSTM 이진 분류 모델
class KoELECTRA_LSTM_BinaryClassifier(nn.Module):
    def __init__(self, bert_model_name='monologg/koelectra-base-discriminator', hidden_size=128, lstm_layers=1, dropout=0.3, freeze_bert=False):
        super().__init__()
        self.bert = AutoModel.from_pretrained(bert_model_name)
        self.freeze_bert = freeze_bert
        self.lstm = nn.LSTM(input_size=self.bert.config.hidden_size, hidden_size=hidden_size,
                            num_layers=lstm_layers, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(hidden_size * 2, 2)  # 이진 분류: 0(일반), 1(폭력)

    def forward(self, input_ids, attention_mask):
        if self.freeze_bert:
            with torch.no_grad():
                bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        else:
            bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)

        sequence_output = bert_output.last_hidden_state
        lstm_output, _ = self.lstm(sequence_output)
        pooled_output = lstm_output[:, -1, :]
        out = self.dropout(pooled_output)
        logits = self.classifier(out)
        return logits

# KoELECTRA + LSTM 다중 분류 모델 (4개 클래스)
class KoELECTRA_LSTM_MultiClassifier(nn.Module):
    def __init__(self, bert_model_name='monologg/koelectra-base-discriminator', hidden_size=128, lstm_layers=1, dropout=0.3, freeze_bert=False):
        super().__init__()
        self.bert = AutoModel.from_pretrained(bert_model_name)
        self.freeze_bert = freeze_bert
        self.lstm = nn.LSTM(input_size=self.bert.config.hidden_size, hidden_size=hidden_size,
                            num_layers=lstm_layers, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(hidden_size * 2, 4)  # 4개 클래스 (0-3)

    def forward(self, input_ids, attention_mask):
        if self.freeze_bert:
            with torch.no_grad():
                bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        else:
            bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)

        sequence_output = bert_output.last_hidden_state
        lstm_output, _ = self.lstm(sequence_output)
        pooled_output = lstm_output[:, -1, :]
        out = self.dropout(pooled_output)
        logits = self.classifier(out)
        return logits

# 학습 및 검증 함수
def train(model, train_loader, val_loader, optimizer, criterion, device, epochs=5, patience=2):
    best_val_loss = float('inf')
    best_val_f1 = 0.0
    patience_counter = 0

    for epoch in range(epochs):
        # 학습
        model.train()
        total_loss = 0
        train_preds = []
        train_true = []

        for batch in train_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

            # 학습 중 정확도 계산용
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            train_preds.extend(preds)
            train_true.extend(labels.cpu().numpy())

        avg_train_loss = total_loss / len(train_loader)
        train_accuracy = accuracy_score(train_true, train_preds)
        train_f1 = f1_score(train_true, train_preds, average='weighted')

        # 검증
        model.eval()
        val_loss = 0
        val_preds = []
        val_true = []

        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['label'].to(device)

                outputs = model(input_ids=input_ids, attention_mask=attention_mask)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                preds = torch.argmax(outputs, dim=1).cpu().numpy()
                val_preds.extend(preds)
                val_true.extend(labels.cpu().numpy())

        avg_val_loss = val_loss / len(val_loader)
        val_accuracy = accuracy_score(val_true, val_preds)
        val_f1 = f1_score(val_true, val_preds, average='weighted')

        # 클래스별 성능 확인
        print(f"\nEpoch {epoch+1}/{epochs}")
        print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Train F1: {train_f1:.4f}")
        print(f"Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Val F1: {val_f1:.4f}")

        # 검증 데이터의 클래스별 성능 출력
        val_report = classification_report(val_true, val_preds, output_dict=True)
        for cls in sorted(set(val_true)):
            print(f"Class {cls}: F1={val_report[str(cls)]['f1-score']:.4f}, Recall={val_report[str(cls)]['recall']:.4f}")

        # 조기 종료 체크 (F1 스코어 기반으로 변경)
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            patience_counter = 0
            # 최적 모델 저장
            torch.save(model.state_dict(), 'best_model.pt')
            print("새로운 최적 모델 저장!")
        else:
            patience_counter += 1
            print(f"성능 개선 없음: {patience_counter}/{patience}")
            if patience_counter >= patience:
                print("조기 종료!")
                break

    # 최적 모델 로드
    model.load_state_dict(torch.load('best_model.pt'))
    return model

# 학습 준비
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 데이터 로드
data = pd.read_csv("/content/sample_data/merged_dataset.csv")
print(f"Total data size: {len(data)}")

# 클래스 분포 확인
class_counts = data['class'].value_counts()
print("Class distribution:")
print(class_counts)

# 일반대화 포함 → 이진 분류용, 일반대화 제외 → 다중 분류용
data_binary = data.copy()
data_multi = data[data['class'] != '일반대화'].copy()

# 데이터 분할
train_binary, val_binary = train_test_split(data_binary, test_size=0.2, stratify=data_binary['class'], random_state=42)
train_multi, val_multi = train_test_split(data_multi, test_size=0.2, stratify=data_multi['class'], random_state=42)

print(f"Binary training set: {len(train_binary)}, validation set: {len(val_binary)}")
print(f"Multi training set: {len(train_multi)}, validation set: {len(val_multi)}")

# 데이터셋 생성
train_dataset_binary = TextDataset(train_binary, tokenizer, max_len=128, task='binary')
val_dataset_binary = TextDataset(val_binary, tokenizer, max_len=128, task='binary')

train_dataset_multi = TextDataset(train_multi, tokenizer, max_len=128, task='multi')
val_dataset_multi = TextDataset(val_multi, tokenizer, max_len=128, task='multi')

# 불균형 데이터 처리를 위한 가중치 샘플링 설정 (이진 분류용)
binary_labels = train_dataset_binary.get_labels()
unique_binary_labels = np.unique(binary_labels)
binary_class_weights = compute_class_weight(class_weight='balanced',
                                           classes=unique_binary_labels,
                                           y=binary_labels)
# 클래스 인덱스를 dictionary로 변환
binary_weight_dict = {cls: weight for cls, weight in zip(unique_binary_labels, binary_class_weights)}
binary_samples_weight = [binary_weight_dict[lbl] for lbl in binary_labels]
binary_sampler = WeightedRandomSampler(weights=binary_samples_weight,
                                      num_samples=len(binary_samples_weight),
                                      replacement=True)

# 불균형 데이터 처리를 위한 가중치 샘플링 설정 (다중 분류용)
multi_labels = train_dataset_multi.get_labels()
unique_multi_labels = np.unique(multi_labels)
multi_class_weights = compute_class_weight(class_weight='balanced',
                                         classes=unique_multi_labels,
                                         y=multi_labels)
# 클래스 인덱스를 dictionary로 변환
multi_weight_dict = {cls: weight for cls, weight in zip(unique_multi_labels, multi_class_weights)}
multi_samples_weight = [multi_weight_dict[lbl] for lbl in multi_labels]
multi_sampler = WeightedRandomSampler(weights=multi_samples_weight,
                                    num_samples=len(multi_samples_weight),
                                    replacement=True)

# 데이터로더 생성 (가중치 샘플러 적용)
train_loader_binary = DataLoader(train_dataset_binary, batch_size=16, sampler=binary_sampler)
val_loader_binary = DataLoader(val_dataset_binary, batch_size=16, shuffle=False)

train_loader_multi = DataLoader(train_dataset_multi, batch_size=16, sampler=multi_sampler)
val_loader_multi = DataLoader(val_dataset_multi, batch_size=16, shuffle=False)

# 모델 생성 - BERT 미세조정 활성화
model_binary = KoELECTRA_LSTM_BinaryClassifier(freeze_bert=False).to(device)
model_multi = KoELECTRA_LSTM_MultiClassifier(freeze_bert=False).to(device)

# 손실 함수 정의 (클래스 가중치 적용)
binary_loss_weights = torch.tensor(binary_class_weights, dtype=torch.float).to(device)
criterion_binary = nn.CrossEntropyLoss(weight=binary_loss_weights)

multi_loss_weights = torch.tensor(multi_class_weights, dtype=torch.float).to(device)
criterion_multi = nn.CrossEntropyLoss(weight=multi_loss_weights)

# 옵티마이저 설정 (더 낮은 학습률 사용)
optimizer_binary = AdamW(model_binary.parameters(), lr=2e-5)
optimizer_multi = AdamW(model_multi.parameters(), lr=2e-5)

# 학습률 스케줄러 추가
from transformers import get_linear_schedule_with_warmup

# 총 훈련 스텝 계산
num_epochs = 5
total_steps_binary = len(train_loader_binary) * num_epochs
warmup_steps_binary = int(total_steps_binary * 0.1)  # 10% 웜업
scheduler_binary = get_linear_schedule_with_warmup(
    optimizer_binary, num_warmup_steps=warmup_steps_binary, num_training_steps=total_steps_binary
)

total_steps_multi = len(train_loader_multi) * num_epochs
warmup_steps_multi = int(total_steps_multi * 0.1)  # 10% 웜업
scheduler_multi = get_linear_schedule_with_warmup(
    optimizer_multi, num_warmup_steps=warmup_steps_multi, num_training_steps=total_steps_multi
)

# 모델 평가 지표 설정 - 전역 변수로 선언하지 않고 함수 내에서 사용
# 각 모델 학습 함수 내에서 best_val_f1을 정의하도록 수정

# 모델 학습 함수 수정 (스케줄러 적용)
def train_with_scheduler(model, train_loader, val_loader, optimizer, scheduler, criterion, device, epochs=5, patience=2):
    best_val_f1 = 0.0
    patience_counter = 0

    for epoch in range(epochs):
        # 학습
        model.train()
        total_loss = 0
        train_preds = []
        train_true = []

        for batch in train_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(outputs, labels)
            loss.backward()

            # 그래디언트 클리핑 추가
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()
            scheduler.step()  # 스케줄러 업데이트

            total_loss += loss.item()

            # 학습 중 정확도 계산용
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            train_preds.extend(preds)
            train_true.extend(labels.cpu().numpy())

        avg_train_loss = total_loss / len(train_loader)
        train_accuracy = accuracy_score(train_true, train_preds)
        train_f1 = f1_score(train_true, train_preds, average='weighted')

        # 검증
        model.eval()
        val_loss = 0
        val_preds = []
        val_true = []

        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['label'].to(device)

                outputs = model(input_ids=input_ids, attention_mask=attention_mask)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                preds = torch.argmax(outputs, dim=1).cpu().numpy()
                val_preds.extend(preds)
                val_true.extend(labels.cpu().numpy())

        # 클래스별 성능 평가
        val_report = classification_report(val_true, val_preds, output_dict=True)
        avg_val_loss = val_loss / len(val_loader)
        val_accuracy = accuracy_score(val_true, val_preds)
        val_f1 = f1_score(val_true, val_preds, average='weighted')

        print(f"\nEpoch {epoch+1}/{epochs}")
        print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Train F1: {train_f1:.4f}")
        print(f"Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Val F1: {val_f1:.4f}")

        # 각 클래스별 성능 출력
        for cls in sorted(set(val_true)):
            cls_str = str(cls)
            if cls_str in val_report:
                print(f"Class {cls}: F1={val_report[cls_str]['f1-score']:.4f}, Recall={val_report[cls_str]['recall']:.4f}")

        # 조기 종료 체크 (F1 스코어 기반)
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            patience_counter = 0
            # 최적 모델 저장
            torch.save(model.state_dict(), 'best_model.pt')
            print("새로운 최적 모델 저장!")
        else:
            patience_counter += 1
            print(f"성능 개선 없음: {patience_counter}/{patience}")
            if patience_counter >= patience:
                print("조기 종료!")
                break

    # 최적 모델 로드
    model.load_state_dict(torch.load('best_model.pt'))
    return model

# 모델 학습 실행
print("\n[INFO] Training Binary Classifier...")
model_binary = train_with_scheduler(model_binary, train_loader_binary, val_loader_binary,
                   optimizer_binary, scheduler_binary, criterion_binary, device, epochs=num_epochs, patience=2)

print("\n[INFO] Training Multi Classifier...")
model_multi = train_with_scheduler(model_multi, train_loader_multi, val_loader_multi,
                  optimizer_multi, scheduler_multi, criterion_multi, device, epochs=num_epochs, patience=2)

# 추론 함수 개선 - 확률 기반 임계값 추가
def predict(text, binary_threshold=0.6):
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    input_ids = inputs["input_ids"].to(device)
    attention_mask = inputs["attention_mask"].to(device)

    with torch.no_grad():
        # 이진 분류 (일반/폭력)
        binary_logits = model_binary(input_ids=input_ids, attention_mask=attention_mask)
        binary_probs = torch.softmax(binary_logits, dim=1)

        # 확률값 추출
        normal_prob = binary_probs[0][0].item()  # 일반대화 확률
        violent_prob = binary_probs[0][1].item()  # 폭력대화 확률

        print(f"일반대화 확률: {normal_prob:.4f}, 폭력대화 확률: {violent_prob:.4f}")

        # 확률 기반 분류 (임계값 적용)
        binary_pred = 1 if violent_prob > binary_threshold else 0

        print(f"Binary prediction: {binary_pred} ({'폭력대화' if binary_pred == 1 else '일반대화'})")

        # 이진 분류에서 0이면 일반 대화 (원래 레이블 4)
        if binary_pred == 0:
            return 4  # 일반대화

        # 폭력 대화인 경우, 세부 유형 분류
        multi_logits = model_multi(input_ids=input_ids, attention_mask=attention_mask)
        multi_probs = torch.softmax(multi_logits, dim=1)
        multi_pred = torch.argmax(multi_logits, dim=1).item()

        # 확률값 출력
        for i, prob in enumerate(multi_probs[0].cpu().numpy()):
            print(f"Class {i} ({label_map_int2str[i]}) 확률: {prob:.4f}")

        # multi_pred는 0-3 사이의 값으로, 원래 레이블 체계와 일치함
        print(f"Multi prediction: {multi_pred} ({label_map_int2str[multi_pred]})")
        return multi_pred

# 테스트셋 평가
print("\n[INFO] Loading test set...")
test_df = pd.read_csv("/content/sample_data/test.csv")
print(f"Test set size: {len(test_df)}")

predicted_labels = []

# 원래 레이블 매핑과 동일한 매핑 사용
label_map = label_map_int2str

# text 컬럼을 기준으로 예측 수행
test_texts = test_df["text"].tolist()
for i, text in enumerate(test_texts):
    print(f"\nPredicting sample {i+1}/{len(test_texts)}")
    pred = predict(text, binary_threshold=0.6)  # 임계값 적용
    predicted_labels.append(pred)
    print(f"Final prediction: {pred} ({label_map[pred]})")

test_df["prediction"] = predicted_labels
test_df["prediction_label"] = test_df["prediction"].map(label_map)

# 클래스별 예측 분포 확인
pred_counts = test_df["prediction_label"].value_counts()
print("\nPrediction distribution:")
print(pred_counts)

# 실제 레이블이 있는 경우 평가 지표 계산
if "class" in test_df.columns:
    test_df["true_label_id"] = test_df["class"].map(label_map_str2int)
    true_labels = test_df["true_label_id"].tolist()

    print("\n[INFO] Classification Report:")
    print(classification_report(true_labels, predicted_labels, target_names=list(label_map_str2int.keys())))

    accuracy = accuracy_score(true_labels, predicted_labels)
    f1 = f1_score(true_labels, predicted_labels, average='weighted')

    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score (weighted): {f1:.4f}")

# 저장
test_df.to_csv("/content/test_predictions_improved.csv", index=False)
print("\n[INFO] 예측 결과 저장 완료: /content/test_predictions_improved.csv")

AttributeError: partially initialized module 'torch' has no attribute 'fx' (most likely due to a circular import)

# 2. 이진 분류 : KcBERT-base / 다중 분류 : KoELECTRA + LSTM

In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import AutoModel, AutoTokenizer, BertModel, BertTokenizer
from torch.optim import AdamW
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from torch.utils.data import WeightedRandomSampler

# KoELECTRA tokenizer 로드 - 다중 분류기용
electra_tokenizer = AutoTokenizer.from_pretrained("monologg/koelectra-base-discriminator")
# KcBERT tokenizer 로드 - 이진 분류기용
bert_tokenizer = BertTokenizer.from_pretrained("beomi/kcbert-base")

# 클래스명-정수 매핑
label_map_str2int = {
    '협박 대화': 0,
    '갈취 대화': 1,
    '직장 내 괴롭힘 대화': 2,
    '기타 괴롭힘 대화': 3,
    '일반대화': 4
}
label_map_int2str = {v: k for k, v in label_map_str2int.items()}

# 이진 분류용 SharedBERT 모델
class SharedBERT(nn.Module):
    def __init__(self, model_name="beomi/kcbert-base"):
        super().__init__()
        self.bert = BertModel.from_pretrained(model_name)
        for p in self.bert.parameters():
            p.requires_grad = False

    def forward(self, input_ids, attention_mask):
        return self.bert(input_ids=input_ids, attention_mask=attention_mask).pooler_output

# 이진 분류기 정의
class BinaryClassifier(nn.Module):
    def __init__(self, hidden_size=768):
        super().__init__()
        self.classifier = nn.Linear(hidden_size, 2)

    def forward(self, cls_output):
        return self.classifier(cls_output)

# 이진 분류 데이터셋
class BinaryClassificationDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_length=128):
        self.texts = dataframe['conversation'].tolist()
        self.labels = [label_map_str2int[label] for label in dataframe['class'].tolist()]
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.binary_labels = [0 if label == 4 else 1 for label in self.labels]

        # 디버깅을 위한 레이블 분포 출력
        unique, counts = np.unique(self.binary_labels, return_counts=True)
        print(f"Binary label distribution: {dict(zip(unique, counts))}")

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        encoding = self.tokenizer(
            self.texts[idx],
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )
        item = {key: val.squeeze(0) for key, val in encoding.items()}
        item['label'] = torch.tensor(self.binary_labels[idx], dtype=torch.long)
        item['original_label'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item

# 다중 분류용 데이터셋 클래스 정의
class MultiClassificationDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_len):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.label_map = label_map_str2int

        # 폭력 대화만 사용 (일반대화 제외)
        self.data = self.data[self.data['class'] != '일반대화'].copy()

        # 다중 레이블 설정
        self.multi_labels = [self.label_map[row['class']] for _, row in self.data.iterrows()]
        unique, counts = np.unique(self.multi_labels, return_counts=True)
        print(f"Multi-class label distribution: {dict(zip(unique, counts))}")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        text = self.data.iloc[index]['conversation']
        label_str = self.data.iloc[index]['class']
        label = self.label_map[label_str]  # 0-3 레이블

        inputs = self.tokenizer(text,
                               truncation=True,
                               padding='max_length',
                               max_length=self.max_len,
                               return_tensors="pt")

        input_ids = inputs['input_ids'].squeeze()
        attention_mask = inputs['attention_mask'].squeeze()

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'label': torch.tensor(label, dtype=torch.long)
        }

    # 샘플링 가중치 계산용 함수
    def get_labels(self):
        return self.multi_labels

# KoELECTRA + LSTM 다중 분류 모델 (4개 클래스)
class KoELECTRA_LSTM_MultiClassifier(nn.Module):
    def __init__(self, bert_model_name='monologg/koelectra-base-discriminator', hidden_size=128, lstm_layers=1, dropout=0.3, freeze_bert=False):
        super().__init__()
        self.bert = AutoModel.from_pretrained(bert_model_name)
        self.freeze_bert = freeze_bert
        self.lstm = nn.LSTM(input_size=self.bert.config.hidden_size, hidden_size=hidden_size,
                            num_layers=lstm_layers, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(hidden_size * 2, 4)  # 4개 클래스 (0-3)

    def forward(self, input_ids, attention_mask):
        if self.freeze_bert:
            with torch.no_grad():
                bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        else:
            bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)

        sequence_output = bert_output.last_hidden_state
        lstm_output, _ = self.lstm(sequence_output)
        pooled_output = lstm_output[:, -1, :]
        out = self.dropout(pooled_output)
        logits = self.classifier(out)
        return logits

# 이진 분류기 학습 함수
def train_binary_model(shared_bert, binary_head, train_loader, optimizer, device, epochs=10):
    for epoch in range(epochs):
        shared_bert.eval()  # 사전 학습된 BERT는 고정
        binary_head.train()
        total_loss = 0
        train_preds = []
        train_true = []

        for batch in train_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            optimizer.zero_grad()
            with torch.no_grad():
                cls_output = shared_bert(input_ids, attention_mask)
            logits = binary_head(cls_output)
            loss = F.cross_entropy(logits, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            # 학습 중 정확도 계산용
            preds = torch.argmax(logits, dim=1).cpu().numpy()
            train_preds.extend(preds)
            train_true.extend(labels.cpu().numpy())

        avg_train_loss = total_loss / len(train_loader)
        train_accuracy = accuracy_score(train_true, train_preds)
        train_f1 = f1_score(train_true, train_preds, average='weighted')

        print(f"[Binary Epoch {epoch+1}/{epochs}] Loss: {avg_train_loss:.4f}, Accuracy: {train_accuracy:.4f}, F1: {train_f1:.4f}")

    return binary_head

# 다중 분류기 학습 함수
def train_with_scheduler(model, train_loader, val_loader, optimizer, scheduler, criterion, device, epochs=5, patience=2):
    best_val_f1 = 0.0
    patience_counter = 0

    for epoch in range(epochs):
        # 학습
        model.train()
        total_loss = 0
        train_preds = []
        train_true = []

        for batch in train_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(outputs, labels)
            loss.backward()

            # 그래디언트 클리핑 추가
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()
            scheduler.step()  # 스케줄러 업데이트

            total_loss += loss.item()

            # 학습 중 정확도 계산용
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            train_preds.extend(preds)
            train_true.extend(labels.cpu().numpy())

        avg_train_loss = total_loss / len(train_loader)
        train_accuracy = accuracy_score(train_true, train_preds)
        train_f1 = f1_score(train_true, train_preds, average='weighted')

        # 검증
        model.eval()
        val_loss = 0
        val_preds = []
        val_true = []

        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['label'].to(device)

                outputs = model(input_ids=input_ids, attention_mask=attention_mask)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                preds = torch.argmax(outputs, dim=1).cpu().numpy()
                val_preds.extend(preds)
                val_true.extend(labels.cpu().numpy())

        # 클래스별 성능 평가
        val_report = classification_report(val_true, val_preds, output_dict=True)
        avg_val_loss = val_loss / len(val_loader)
        val_accuracy = accuracy_score(val_true, val_preds)
        val_f1 = f1_score(val_true, val_preds, average='weighted')

        print(f"\nEpoch {epoch+1}/{epochs}")
        print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Train F1: {train_f1:.4f}")
        print(f"Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Val F1: {val_f1:.4f}")

        # 각 클래스별 성능 출력
        for cls in sorted(set(val_true)):
            cls_str = str(cls)
            if cls_str in val_report:
                print(f"Class {cls}: F1={val_report[cls_str]['f1-score']:.4f}, Recall={val_report[cls_str]['recall']:.4f}")

        # 조기 종료 체크 (F1 스코어 기반)
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            patience_counter = 0
            # 최적 모델 저장
            torch.save(model.state_dict(), 'best_multi_model.pt')
            print("새로운 최적 모델 저장!")
        else:
            patience_counter += 1
            print(f"성능 개선 없음: {patience_counter}/{patience}")
            if patience_counter >= patience:
                print("조기 종료!")
                break

    # 최적 모델 로드
    model.load_state_dict(torch.load('best_multi_model.pt'))
    return model

# 학습 준비
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 데이터 로드
data = pd.read_csv("/content/sample_data/train_toxic.csv")
print(f"Total data size: {len(data)}")

# 클래스 분포 확인
class_counts = data['class'].value_counts()
print("Class distribution:")
print(class_counts)

# 이진 분류 및 다중 분류를 위한 데이터 분할
train_data, val_data = train_test_split(data, test_size=0.2, stratify=data['class'], random_state=42)
print(f"Training set: {len(train_data)}, validation set: {len(val_data)}")

# 이진 분류 모델 준비
shared_bert = SharedBERT(model_name="beomi/kcbert-base").to(device)
binary_head = BinaryClassifier().to(device)

# 이진 분류 데이터셋 및 데이터로더 생성
train_dataset_binary = BinaryClassificationDataset(train_data, bert_tokenizer, max_length=128)
val_dataset_binary = BinaryClassificationDataset(val_data, bert_tokenizer, max_length=128)

# 이진 분류 데이터로더
train_loader_binary = DataLoader(train_dataset_binary, batch_size=32, shuffle=True)
val_loader_binary = DataLoader(val_dataset_binary, batch_size=32, shuffle=False)

# 이진 분류 모델 학습
optimizer_binary = torch.optim.Adam(binary_head.parameters(), lr=2e-5)

print("\n[INFO] Training Binary Classifier...")
binary_head = train_binary_model(shared_bert, binary_head, train_loader_binary, optimizer_binary, device, epochs=10)

# 이진 분류 모델 저장
torch.save(binary_head.state_dict(), "binary_classifier.pt")
print("Binary classifier model saved as 'binary_classifier.pt'")

# 다중 분류용 데이터셋 준비 - 일반대화 제외
train_dataset_multi = MultiClassificationDataset(train_data, electra_tokenizer, max_len=128)
val_dataset_multi = MultiClassificationDataset(val_data, electra_tokenizer, max_len=128)

# 불균형 데이터 처리를 위한 가중치 샘플링 설정 (다중 분류용)
multi_labels = train_dataset_multi.get_labels()
unique_multi_labels = np.unique(multi_labels)
multi_class_weights = compute_class_weight(class_weight='balanced',
                                         classes=unique_multi_labels,
                                         y=multi_labels)
# 클래스 인덱스를 dictionary로 변환
multi_weight_dict = {cls: weight for cls, weight in zip(unique_multi_labels, multi_class_weights)}
multi_samples_weight = [multi_weight_dict[lbl] for lbl in multi_labels]
multi_sampler = WeightedRandomSampler(weights=multi_samples_weight,
                                    num_samples=len(multi_samples_weight),
                                    replacement=True)

# 다중 분류용 데이터로더 생성
train_loader_multi = DataLoader(train_dataset_multi, batch_size=16, sampler=multi_sampler)
val_loader_multi = DataLoader(val_dataset_multi, batch_size=16, shuffle=False)

# 다중 분류 모델 생성
model_multi = KoELECTRA_LSTM_MultiClassifier(freeze_bert=False).to(device)

# 손실 함수 정의 (클래스 가중치 적용) - 다중 분류용
multi_loss_weights = torch.tensor(multi_class_weights, dtype=torch.float).to(device)
criterion_multi = nn.CrossEntropyLoss(weight=multi_loss_weights)

# 옵티마이저 설정 - 다중 분류용
optimizer_multi = AdamW(model_multi.parameters(), lr=2e-5)

# 학습률 스케줄러 추가
from transformers import get_linear_schedule_with_warmup

# 총 훈련 스텝 계산 - 다중 분류용
num_epochs = 5
total_steps_multi = len(train_loader_multi) * num_epochs
warmup_steps_multi = int(total_steps_multi * 0.1)  # 10% 웜업
scheduler_multi = get_linear_schedule_with_warmup(
    optimizer_multi, num_warmup_steps=warmup_steps_multi, num_training_steps=total_steps_multi
)

# 다중 분류 모델 학습
print("\n[INFO] Training Multi Classifier...")
model_multi = train_with_scheduler(model_multi, train_loader_multi, val_loader_multi,
                  optimizer_multi, scheduler_multi, criterion_multi, device, epochs=num_epochs, patience=2)

# 추론 함수
def predict(text, binary_threshold=0.5):
    # 이진 분류 (일반/폭력)
    inputs_bert = bert_tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    input_ids_bert = inputs_bert["input_ids"].to(device)
    attention_mask_bert = inputs_bert["attention_mask"].to(device)

    with torch.no_grad():
        # SharedBERT로 특성 추출
        cls_output = shared_bert(input_ids_bert, attention_mask_bert)
        # 이진 분류
        binary_logits = binary_head(cls_output)
        binary_probs = F.softmax(binary_logits, dim=1)

        # 확률값 추출
        normal_prob = binary_probs[0][0].item()  # 일반대화 확률
        violent_prob = binary_probs[0][1].item()  # 폭력대화 확률

        print(f"일반대화 확률: {normal_prob:.4f}, 폭력대화 확률: {violent_prob:.4f}")

        # 임계값 기반 분류
        binary_pred = 1 if violent_prob > binary_threshold else 0

        print(f"Binary prediction: {binary_pred} ({'폭력대화' if binary_pred == 1 else '일반대화'})")

        # 일반 대화로 분류된 경우
        if binary_pred == 0:
            return 4  # 일반대화

        # 폭력 대화로 분류된 경우, 세부 유형 분류
        inputs_electra = electra_tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
        input_ids_electra = inputs_electra["input_ids"].to(device)
        attention_mask_electra = inputs_electra["attention_mask"].to(device)

        multi_logits = model_multi(input_ids=input_ids_electra, attention_mask=attention_mask_electra)
        multi_probs = F.softmax(multi_logits, dim=1)
        multi_pred = torch.argmax(multi_logits, dim=1).item()

        # 확률값 출력
        for i, prob in enumerate(multi_probs[0].cpu().numpy()):
            print(f"Class {i} ({label_map_int2str[i]}) 확률: {prob:.4f}")

        print(f"Multi prediction: {multi_pred} ({label_map_int2str[multi_pred]})")
        return multi_pred

# 테스트셋 평가
print("\n[INFO] Loading test set...")
test_df = pd.read_csv("/content/sample_data/test.csv")
print(f"Test set size: {len(test_df)}")

predicted_labels = []

# 원래 레이블 매핑과 동일한 매핑 사용
label_map = label_map_int2str

# text 컬럼을 기준으로 예측 수행
test_texts = test_df["text"].tolist()
for i, text in enumerate(test_texts):
    print(f"\nPredicting sample {i+1}/{len(test_texts)}")
    pred = predict(text, binary_threshold=0.6)  # 임계값 적용
    predicted_labels.append(pred)
    print(f"Final prediction: {pred} ({label_map[pred]})")

test_df["prediction"] = predicted_labels
test_df["prediction_label"] = test_df["prediction"].map(label_map)

# 클래스별 예측 분포 확인
pred_counts = test_df["prediction_label"].value_counts()
print("\nPrediction distribution:")
print(pred_counts)

# 실제 레이블이 있는 경우 평가 지표 계산
if "class" in test_df.columns:
    test_df["true_label_id"] = test_df["class"].map(label_map_str2int)
    true_labels = test_df["true_label_id"].tolist()

    print("\n[INFO] Classification Report:")
    print(classification_report(true_labels, predicted_labels, target_names=list(label_map_str2int.keys())))

    accuracy = accuracy_score(true_labels, predicted_labels)
    f1 = f1_score(true_labels, predicted_labels, average='weighted')

    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score (weighted): {f1:.4f}")

# 결과 저장
test_df.to_csv("/content/test_predictions_combined.csv", index=False)
print("\n[INFO] 예측 결과 저장 완료: /content/test_predictions_combined.csv")

Using device: cuda
Total data size: 7784
Class distribution:
class
일반대화           3834
기타 괴롭힘 대화      1094
갈취 대화           981
직장 내 괴롭힘 대화     979
협박 대화           896
Name: count, dtype: int64
Training set: 6227, validation set: 1557
Binary label distribution: {np.int64(0): np.int64(3067), np.int64(1): np.int64(3160)}
Binary label distribution: {np.int64(0): np.int64(767), np.int64(1): np.int64(790)}

[INFO] Training Binary Classifier...
[Binary Epoch 1/10] Loss: 0.6650, Accuracy: 0.6764, F1: 0.6745
[Binary Epoch 2/10] Loss: 0.6082, Accuracy: 0.8473, F1: 0.8469
[Binary Epoch 3/10] Loss: 0.5676, Accuracy: 0.8429, F1: 0.8424
[Binary Epoch 4/10] Loss: 0.5334, Accuracy: 0.8709, F1: 0.8708
[Binary Epoch 5/10] Loss: 0.5041, Accuracy: 0.8683, F1: 0.8681
[Binary Epoch 6/10] Loss: 0.4772, Accuracy: 0.8918, F1: 0.8917
[Binary Epoch 7/10] Loss: 0.4528, Accuracy: 0.9189, F1: 0.9189
[Binary Epoch 8/10] Loss: 0.4304, Accuracy: 0.9277, F1: 0.9277
[Binary Epoch 9/10] Loss: 0.4096, Accuracy: 0.9345, F1

- 이진 분류 결과 출력

In [None]:

def predict_binary_only(text, binary_threshold=0.5):
    """이진 분류(일반/폭력)만 수행하는 함수"""
    # 이진 분류용 입력 처리
    inputs_bert = bert_tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    input_ids_bert = inputs_bert["input_ids"].to(device)
    attention_mask_bert = inputs_bert["attention_mask"].to(device)

    with torch.no_grad():
        # SharedBERT로 특성 추출
        cls_output = shared_bert(input_ids_bert, attention_mask_bert)
        # 이진 분류
        binary_logits = binary_head(cls_output)
        binary_probs = F.softmax(binary_logits, dim=1)

        # 확률값 추출
        normal_prob = binary_probs[0][0].item()  # 일반대화 확률
        violent_prob = binary_probs[0][1].item()  # 폭력대화 확률

        # 결과 출력
        print(f"일반대화 확률: {normal_prob:.4f}, 폭력대화 확률: {violent_prob:.4f}")

        # 임계값 기반 분류
        binary_pred = 1 if violent_prob > binary_threshold else 0
        binary_label = '폭력대화' if binary_pred == 1 else '일반대화'

        print(f"Binary prediction: {binary_pred} ({binary_label})")

        return binary_pred, binary_label, normal_prob, violent_prob

# 테스트셋에 대해 이진 분류만 수행
print("\n[INFO] 테스트셋에 대한 이진 분류만 수행...")
test_df = pd.read_csv("/content/sample_data/test.csv")
print(f"테스트셋 크기: {len(test_df)}")

# 결과를 저장할 새 컬럼 추가
test_df["binary_pred"] = None
test_df["binary_label"] = None
test_df["normal_prob"] = None
test_df["violent_prob"] = None

# 각 샘플에 대해 이진 분류 수행
for i, text in enumerate(test_df["text"].tolist()):
    print(f"\n샘플 {i+1}/{len(test_df)} 예측 중...")
    binary_pred, binary_label, normal_prob, violent_prob = predict_binary_only(text, binary_threshold=0.6)

    # 결과 저장
    test_df.at[i, "binary_pred"] = binary_pred
    test_df.at[i, "binary_label"] = binary_label
    test_df.at[i, "normal_prob"] = normal_prob
    test_df.at[i, "violent_prob"] = violent_prob

# 이진 분류 예측 분포 확인
binary_counts = test_df["binary_label"].value_counts()
print("\n이진 분류 예측 분포:")
print(binary_counts)

# 실제 레이블이 있는 경우, 이진 분류 평가지표 계산
if "class" in test_df.columns:
    # 원래 레이블을 이진 레이블로 변환 (일반대화=0, 나머지=1)
    test_df["true_binary"] = test_df["class"].apply(lambda x: 0 if x == '일반대화' else 1)

    true_binary = test_df["true_binary"].tolist()
    pred_binary = test_df["binary_pred"].tolist()

    # 이진 분류 성능 평가
    binary_accuracy = accuracy_score(true_binary, pred_binary)
    binary_f1 = f1_score(true_binary, pred_binary, average='weighted')

    print("\n[INFO] 이진 분류 평가 결과:")
    print(f"정확도: {binary_accuracy:.4f}")
    print(f"F1 점수: {binary_f1:.4f}")
    print("\n혼동 행렬:")
    from sklearn.metrics import confusion_matrix
    cm = confusion_matrix(true_binary, pred_binary)
    print(cm)

    # 상세한 분류 보고서
    print("\n분류 보고서:")
    print(classification_report(true_binary, pred_binary, target_names=['일반대화', '폭력대화']))

# 결과 저장
test_df.to_csv("/content/binary_classification_results.csv", index=False)
print("\n[INFO] 이진 분류 결과 저장 완료: /content/binary_classification_results.csv")

AttributeError: partially initialized module 'torch' has no attribute 'fx' (most likely due to a circular import)

- 파일 병합

In [8]:
import pandas as pd

# 파일 경로 정의
predictions_path = "/content/test_predictions_combined.csv"
submission_path = "/content/submission.csv"

try:
    # 1. 파일 로드
    predictions_df = pd.read_csv(predictions_path)
    submission_df = pd.read_csv(submission_path)

    print(f"예측 결과 파일 로드 완료: {len(predictions_df)}개 행")
    print(f"제출 파일 로드 완료: {len(submission_df)}개 행")

    # 2. prediction 컬럼 확인
    if 'prediction' not in predictions_df.columns:
        print(f"'prediction' 컬럼이 없습니다. 사용 가능한 컬럼: {predictions_df.columns.tolist()}")
        raise ValueError("prediction 컬럼이 없습니다.")

    # 3. class 컬럼 업데이트
    submission_df['class'] = predictions_df['prediction'].values[:len(submission_df)]

    # 4. 업데이트된 파일 저장
    submission_df.to_csv(submission_path, index=False)
    print(f"\n{submission_path} 파일이 업데이트되었습니다.")

    # 5. 업데이트된 파일 확인
    print("\n업데이트된 submission.csv 파일의 처음 5개 행:")
    print(submission_df.head(5))

    # 6. 클래스 분포 확인
    if pd.api.types.is_numeric_dtype(submission_df['class']):
        class_counts = submission_df['class'].value_counts().sort_index()
        print("\n클래스 분포:")

        # 클래스 이름 매핑
        label_map = {
            0: '협박 대화',
            1: '갈취 대화',
            2: '직장 내 괴롭힘 대화',
            3: '기타 괴롭힘 대화',
            4: '일반대화'
        }

        for class_num, count in class_counts.items():
            class_name = label_map.get(class_num, f"알 수 없음({class_num})")
            print(f"클래스 {class_num} ({class_name}): {count}개")

except FileNotFoundError as e:
    print(f"파일을 찾을 수 없습니다: {e}")
    print("파일 경로가 올바른지 확인하세요.")
except Exception as e:
    print(f"오류가 발생했습니다: {e}")

예측 결과 파일 로드 완료: 500개 행
제출 파일 로드 완료: 500개 행

/content/submission.csv 파일이 업데이트되었습니다.

업데이트된 submission.csv 파일의 처음 5개 행:
  file_name  class
0     t_000      2
1     t_001      2
2     t_002      2
3     t_003      3
4     t_004      3

클래스 분포:
클래스 0 (협박 대화): 72개
클래스 1 (갈취 대화): 92개
클래스 2 (직장 내 괴롭힘 대화): 109개
클래스 3 (기타 괴롭힘 대화): 122개
클래스 4 (일반대화): 105개


# 3. 룰 기반 필터 추가

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import AutoModel, AutoTokenizer, BertModel, BertTokenizer
from torch.optim import AdamW
import pandas as pd
import numpy as np
import re  # Added for regex pattern matching
from sklearn.metrics import accuracy_score, f1_score, classification_report
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from torch.utils.data import WeightedRandomSampler

# Rule-based filter function
def rule_based_filter(text):
    # 이름+씨 패턴이 있고, 공격적인 말이 같이 있으면 괴롭힘
    if re.search(r"[가-힣]{2,4}씨", text):
        return 2  # '직장 내 괴롭힘 대화' 클래스에 해당하는 인덱스
    return None  # 없으면 None 반환해서 모델 기반 분류 진행

# KoELECTRA tokenizer 로드 - 다중 분류기용
electra_tokenizer = AutoTokenizer.from_pretrained("monologg/koelectra-base-discriminator")
# KcBERT tokenizer 로드 - 이진 분류기용
bert_tokenizer = BertTokenizer.from_pretrained("beomi/kcbert-base")

# 클래스명-정수 매핑
label_map_str2int = {
    '협박 대화': 0,
    '갈취 대화': 1,
    '직장 내 괴롭힘 대화': 2,
    '기타 괴롭힘 대화': 3,
    '일반대화': 4
}
label_map_int2str = {v: k for k, v in label_map_str2int.items()}

# 이진 분류용 SharedBERT 모델
class SharedBERT(nn.Module):
    def __init__(self, model_name="beomi/kcbert-base"):
        super().__init__()
        self.bert = BertModel.from_pretrained(model_name)
        for p in self.bert.parameters():
            p.requires_grad = False

    def forward(self, input_ids, attention_mask):
        return self.bert(input_ids=input_ids, attention_mask=attention_mask).pooler_output

# 이진 분류기 정의
class BinaryClassifier(nn.Module):
    def __init__(self, hidden_size=768):
        super().__init__()
        self.classifier = nn.Linear(hidden_size, 2)

    def forward(self, cls_output):
        return self.classifier(cls_output)

# 이진 분류 데이터셋
class BinaryClassificationDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_length=128):
        self.texts = dataframe['conversation'].tolist()
        self.labels = [label_map_str2int[label] for label in dataframe['class'].tolist()]
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.binary_labels = [0 if label == 4 else 1 for label in self.labels]

        # 디버깅을 위한 레이블 분포 출력
        unique, counts = np.unique(self.binary_labels, return_counts=True)
        print(f"Binary label distribution: {dict(zip(unique, counts))}")

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        encoding = self.tokenizer(
            self.texts[idx],
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )
        item = {key: val.squeeze(0) for key, val in encoding.items()}
        item['label'] = torch.tensor(self.binary_labels[idx], dtype=torch.long)
        item['original_label'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item

# 다중 분류용 데이터셋 클래스 정의
class MultiClassificationDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_len):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.label_map = label_map_str2int

        # 폭력 대화만 사용 (일반대화 제외)
        self.data = self.data[self.data['class'] != '일반대화'].copy()

        # 다중 레이블 설정
        self.multi_labels = [self.label_map[row['class']] for _, row in self.data.iterrows()]
        unique, counts = np.unique(self.multi_labels, return_counts=True)
        print(f"Multi-class label distribution: {dict(zip(unique, counts))}")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        text = self.data.iloc[index]['conversation']
        label_str = self.data.iloc[index]['class']
        label = self.label_map[label_str]  # 0-3 레이블

        inputs = self.tokenizer(text,
                               truncation=True,
                               padding='max_length',
                               max_length=self.max_len,
                               return_tensors="pt")

        input_ids = inputs['input_ids'].squeeze()
        attention_mask = inputs['attention_mask'].squeeze()

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'label': torch.tensor(label, dtype=torch.long)
        }

    # 샘플링 가중치 계산용 함수
    def get_labels(self):
        return self.multi_labels

# KoELECTRA + LSTM 다중 분류 모델 (4개 클래스)
class KoELECTRA_LSTM_MultiClassifier(nn.Module):
    def __init__(self, bert_model_name='monologg/koelectra-base-discriminator', hidden_size=128, lstm_layers=1, dropout=0.3, freeze_bert=False):
        super().__init__()
        self.bert = AutoModel.from_pretrained(bert_model_name)
        self.freeze_bert = freeze_bert
        self.lstm = nn.LSTM(input_size=self.bert.config.hidden_size, hidden_size=hidden_size,
                            num_layers=lstm_layers, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(hidden_size * 2, 4)  # 4개 클래스 (0-3)

    def forward(self, input_ids, attention_mask):
        if self.freeze_bert:
            with torch.no_grad():
                bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        else:
            bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)

        sequence_output = bert_output.last_hidden_state
        lstm_output, _ = self.lstm(sequence_output)
        pooled_output = lstm_output[:, -1, :]
        out = self.dropout(pooled_output)
        logits = self.classifier(out)
        return logits

# 이진 분류기 학습 함수
def train_binary_model(shared_bert, binary_head, train_loader, optimizer, device, epochs=10):
    for epoch in range(epochs):
        shared_bert.eval()  # 사전 학습된 BERT는 고정
        binary_head.train()
        total_loss = 0
        train_preds = []
        train_true = []

        for batch in train_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            optimizer.zero_grad()
            with torch.no_grad():
                cls_output = shared_bert(input_ids, attention_mask)
            logits = binary_head(cls_output)
            loss = F.cross_entropy(logits, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            # 학습 중 정확도 계산용
            preds = torch.argmax(logits, dim=1).cpu().numpy()
            train_preds.extend(preds)
            train_true.extend(labels.cpu().numpy())

        avg_train_loss = total_loss / len(train_loader)
        train_accuracy = accuracy_score(train_true, train_preds)
        train_f1 = f1_score(train_true, train_preds, average='weighted')

        print(f"[Binary Epoch {epoch+1}/{epochs}] Loss: {avg_train_loss:.4f}, Accuracy: {train_accuracy:.4f}, F1: {train_f1:.4f}")

    return binary_head

# 다중 분류기 학습 함수
def train_with_scheduler(model, train_loader, val_loader, optimizer, scheduler, criterion, device, epochs=5, patience=2):
    best_val_f1 = 0.0
    patience_counter = 0

    for epoch in range(epochs):
        # 학습
        model.train()
        total_loss = 0
        train_preds = []
        train_true = []

        for batch in train_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(outputs, labels)
            loss.backward()

            # 그래디언트 클리핑 추가
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()
            scheduler.step()  # 스케줄러 업데이트

            total_loss += loss.item()

            # 학습 중 정확도 계산용
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            train_preds.extend(preds)
            train_true.extend(labels.cpu().numpy())

        avg_train_loss = total_loss / len(train_loader)
        train_accuracy = accuracy_score(train_true, train_preds)
        train_f1 = f1_score(train_true, train_preds, average='weighted')

        # 검증
        model.eval()
        val_loss = 0
        val_preds = []
        val_true = []

        with torch.no_grad():
            for batch in val_loader:
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                labels = batch['label'].to(device)

                outputs = model(input_ids=input_ids, attention_mask=attention_mask)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                preds = torch.argmax(outputs, dim=1).cpu().numpy()
                val_preds.extend(preds)
                val_true.extend(labels.cpu().numpy())

        # 클래스별 성능 평가
        val_report = classification_report(val_true, val_preds, output_dict=True)
        avg_val_loss = val_loss / len(val_loader)
        val_accuracy = accuracy_score(val_true, val_preds)
        val_f1 = f1_score(val_true, val_preds, average='weighted')

        print(f"\nEpoch {epoch+1}/{epochs}")
        print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Train F1: {train_f1:.4f}")
        print(f"Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Val F1: {val_f1:.4f}")

        # 각 클래스별 성능 출력
        for cls in sorted(set(val_true)):
            cls_str = str(cls)
            if cls_str in val_report:
                print(f"Class {cls}: F1={val_report[cls_str]['f1-score']:.4f}, Recall={val_report[cls_str]['recall']:.4f}")

        # 조기 종료 체크 (F1 스코어 기반)
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            patience_counter = 0
            # 최적 모델 저장
            torch.save(model.state_dict(), 'best_multi_model.pt')
            print("새로운 최적 모델 저장!")
        else:
            patience_counter += 1
            print(f"성능 개선 없음: {patience_counter}/{patience}")
            if patience_counter >= patience:
                print("조기 종료!")
                break

    # 최적 모델 로드
    model.load_state_dict(torch.load('best_multi_model.pt'))
    return model

# 학습 준비
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 데이터 로드
data = pd.read_csv("/content/sample_data/merged_dataset.csv")
print(f"Total data size: {len(data)}")

# 클래스 분포 확인
class_counts = data['class'].value_counts()
print("Class distribution:")
print(class_counts)

# 이진 분류 및 다중 분류를 위한 데이터 분할
train_data, val_data = train_test_split(data, test_size=0.2, stratify=data['class'], random_state=42)
print(f"Training set: {len(train_data)}, validation set: {len(val_data)}")

# 이진 분류 모델 준비
shared_bert = SharedBERT(model_name="beomi/kcbert-base").to(device)
binary_head = BinaryClassifier().to(device)

# 이진 분류 데이터셋 및 데이터로더 생성
train_dataset_binary = BinaryClassificationDataset(train_data, bert_tokenizer, max_length=128)
val_dataset_binary = BinaryClassificationDataset(val_data, bert_tokenizer, max_length=128)

# 이진 분류 데이터로더
train_loader_binary = DataLoader(train_dataset_binary, batch_size=32, shuffle=True)
val_loader_binary = DataLoader(val_dataset_binary, batch_size=32, shuffle=False)

# 이진 분류 모델 학습
optimizer_binary = torch.optim.Adam(binary_head.parameters(), lr=2e-5)

print("\n[INFO] Training Binary Classifier...")
binary_head = train_binary_model(shared_bert, binary_head, train_loader_binary, optimizer_binary, device, epochs=10)

# 이진 분류 모델 저장
torch.save(binary_head.state_dict(), "binary_classifier.pt")
print("Binary classifier model saved as 'binary_classifier.pt'")

# 다중 분류용 데이터셋 준비 - 일반대화 제외
train_dataset_multi = MultiClassificationDataset(train_data, electra_tokenizer, max_len=128)
val_dataset_multi = MultiClassificationDataset(val_data, electra_tokenizer, max_len=128)

# 불균형 데이터 처리를 위한 가중치 샘플링 설정 (다중 분류용)
multi_labels = train_dataset_multi.get_labels()
unique_multi_labels = np.unique(multi_labels)
multi_class_weights = compute_class_weight(class_weight='balanced',
                                         classes=unique_multi_labels,
                                         y=multi_labels)
# 클래스 인덱스를 dictionary로 변환
multi_weight_dict = {cls: weight for cls, weight in zip(unique_multi_labels, multi_class_weights)}
multi_samples_weight = [multi_weight_dict[lbl] for lbl in multi_labels]
multi_sampler = WeightedRandomSampler(weights=multi_samples_weight,
                                    num_samples=len(multi_samples_weight),
                                    replacement=True)

# 다중 분류용 데이터로더 생성
train_loader_multi = DataLoader(train_dataset_multi, batch_size=16, sampler=multi_sampler)
val_loader_multi = DataLoader(val_dataset_multi, batch_size=16, shuffle=False)

# 다중 분류 모델 생성
model_multi = KoELECTRA_LSTM_MultiClassifier(freeze_bert=False).to(device)

# 손실 함수 정의 (클래스 가중치 적용) - 다중 분류용
multi_loss_weights = torch.tensor(multi_class_weights, dtype=torch.float).to(device)
criterion_multi = nn.CrossEntropyLoss(weight=multi_loss_weights)

# 옵티마이저 설정 - 다중 분류용
optimizer_multi = AdamW(model_multi.parameters(), lr=2e-5)

# 학습률 스케줄러 추가
from transformers import get_linear_schedule_with_warmup

# 총 훈련 스텝 계산 - 다중 분류용
num_epochs = 5
total_steps_multi = len(train_loader_multi) * num_epochs
warmup_steps_multi = int(total_steps_multi * 0.1)  # 10% 웜업
scheduler_multi = get_linear_schedule_with_warmup(
    optimizer_multi, num_warmup_steps=warmup_steps_multi, num_training_steps=total_steps_multi
)

# 다중 분류 모델 학습
print("\n[INFO] Training Multi Classifier...")
model_multi = train_with_scheduler(model_multi, train_loader_multi, val_loader_multi,
                  optimizer_multi, scheduler_multi, criterion_multi, device, epochs=num_epochs, patience=2)

# 추론 함수 - 룰 기반 필터링 추가
def predict(text, binary_threshold=0.5):
    # 룰 기반 필터 먼저 적용
    rule_result = rule_based_filter(text)
    if rule_result is not None:
        print(f"Rule-based filter triggered: {rule_result} ({label_map_int2str[rule_result]})")
        return rule_result

    # 이진 분류 (일반/폭력)
    inputs_bert = bert_tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    input_ids_bert = inputs_bert["input_ids"].to(device)
    attention_mask_bert = inputs_bert["attention_mask"].to(device)

    with torch.no_grad():
        # SharedBERT로 특성 추출
        cls_output = shared_bert(input_ids_bert, attention_mask_bert)
        # 이진 분류
        binary_logits = binary_head(cls_output)
        binary_probs = F.softmax(binary_logits, dim=1)

        # 확률값 추출
        normal_prob = binary_probs[0][0].item()  # 일반대화 확률
        violent_prob = binary_probs[0][1].item()  # 폭력대화 확률

        print(f"일반대화 확률: {normal_prob:.4f}, 폭력대화 확률: {violent_prob:.4f}")

        # 임계값 기반 분류
        binary_pred = 1 if violent_prob > binary_threshold else 0

        print(f"Binary prediction: {binary_pred} ({'폭력대화' if binary_pred == 1 else '일반대화'})")

        # 일반 대화로 분류된 경우
        if binary_pred == 0:
            return 4  # 일반대화

        # 폭력 대화로 분류된 경우, 세부 유형 분류
        inputs_electra = electra_tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
        input_ids_electra = inputs_electra["input_ids"].to(device)
        attention_mask_electra = inputs_electra["attention_mask"].to(device)

        multi_logits = model_multi(input_ids=input_ids_electra, attention_mask=attention_mask_electra)
        multi_probs = F.softmax(multi_logits, dim=1)
        multi_pred = torch.argmax(multi_logits, dim=1).item()

        # 확률값 출력
        for i, prob in enumerate(multi_probs[0].cpu().numpy()):
            print(f"Class {i} ({label_map_int2str[i]}) 확률: {prob:.4f}")

        print(f"Multi prediction: {multi_pred} ({label_map_int2str[multi_pred]})")
        return multi_pred

# 테스트셋 평가
print("\n[INFO] Loading test set...")
test_df = pd.read_csv("/content/sample_data/test.csv")
print(f"Test set size: {len(test_df)}")

predicted_labels = []

# 원래 레이블 매핑과 동일한 매핑 사용
label_map = label_map_int2str

# text 컬럼을 기준으로 예측 수행
test_texts = test_df["text"].tolist()
for i, text in enumerate(test_texts):
    print(f"\nPredicting sample {i+1}/{len(test_texts)}")
    pred = predict(text, binary_threshold=0.6)  # 임계값 적용
    predicted_labels.append(pred)
    print(f"Final prediction: {pred} ({label_map[pred]})")

test_df["prediction"] = predicted_labels
test_df["prediction_label"] = test_df["prediction"].map(label_map)

# 클래스별 예측 분포 확인
pred_counts = test_df["prediction_label"].value_counts()
print("\nPrediction distribution:")
print(pred_counts)

# 실제 레이블이 있는 경우 평가 지표 계산
if "class" in test_df.columns:
    test_df["true_label_id"] = test_df["class"].map(label_map_str2int)
    true_labels = test_df["true_label_id"].tolist()

    print("\n[INFO] Classification Report:")
    print(classification_report(true_labels, predicted_labels, target_names=list(label_map_str2int.keys())))

    accuracy = accuracy_score(true_labels, predicted_labels)
    f1 = f1_score(true_labels, predicted_labels, average='weighted')

    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Score (weighted): {f1:.4f}")

# 결과 저장
test_df.to_csv("/content/test_predictions_combined.csv", index=False)
print("\n[INFO] 예측 결과 저장 완료: /content/test_predictions_combined.csv")

Using device: cuda
Total data size: 7637
Class distribution:
class
일반대화           3687
기타 괴롭힘 대화      1094
갈취 대화           981
직장 내 괴롭힘 대화     979
협박 대화           896
Name: count, dtype: int64
Training set: 6109, validation set: 1528
Binary label distribution: {np.int64(0): np.int64(2949), np.int64(1): np.int64(3160)}
Binary label distribution: {np.int64(0): np.int64(738), np.int64(1): np.int64(790)}

[INFO] Training Binary Classifier...
[Binary Epoch 1/10] Loss: 0.7073, Accuracy: 0.5562, F1: 0.5056
[Binary Epoch 2/10] Loss: 0.6389, Accuracy: 0.7464, F1: 0.7464
[Binary Epoch 3/10] Loss: 0.5930, Accuracy: 0.7959, F1: 0.7958
[Binary Epoch 4/10] Loss: 0.5550, Accuracy: 0.8253, F1: 0.8253
[Binary Epoch 5/10] Loss: 0.5214, Accuracy: 0.8520, F1: 0.8520
[Binary Epoch 6/10] Loss: 0.4914, Accuracy: 0.8798, F1: 0.8799
[Binary Epoch 7/10] Loss: 0.4639, Accuracy: 0.9057, F1: 0.9057
[Binary Epoch 8/10] Loss: 0.4386, Accuracy: 0.9204, F1: 0.9205
[Binary Epoch 9/10] Loss: 0.4155, Accuracy: 0.9311, F1