In [None]:
import pandas as pd
import ollama
import os
import re

# 경로 설정
input_dir = "./Data/None"
output_dir = "./Data/ZeroShot"
os.makedirs(output_dir, exist_ok=True)  # 출력 디렉토리 생성

# 클래스 이름 한국어로 매핑
hate_type_map = {
    0: '행복',
    1: '놀람',
    2: '분노',
    3: '공포',
    4: '혐오',
    5: '슬픔',
    6: '중립'
}

# 데이터 생성 함수
def generate_data(hate_type, additional_count, existing_data, output_path):
    generated_set = set(existing_data['document'].dropna())  # 중복 방지용 집합
    hate_type_kor = hate_type_map.get(hate_type, hate_type)  # 한국어로 변환

    while additional_count > 0:
        try:
            # Train 데이터에서 해당 클래스의 랜덤 예시 3개 추출
            examples = existing_data[existing_data['label'] == hate_type]['document'].dropna().sample(1).tolist()
            prompt = f"""
            아래의 형식에 맞추어 {hate_type_kor} 유형의 새로운 문장을 하나 작성해줘.
            **출력 형식: {{새로운 문장}}**
            다른 형식의 출력은 절대 사용하지마.

            이제 새로운 문장을 작성해줘.
            """

            # Ollama 호출
            response = ollama.chat(model="exaone3.5:2.4b", messages=[
                {
                    "role": "user",
                    "content": prompt.strip(),
                }
            ])
            generated_text = response['message']['content'].strip()

            # 정규 표현식으로 { } 안의 내용 추출
            match = re.search(r"\{(.+?)\}", generated_text)
            if match:
                new_comment = match.group(1).strip()

                # 중복 확인
                if new_comment in generated_set:
                    continue

                # 새로운 데이터 추가
                new_row = {"document": new_comment, "label": hate_type}
                existing_data = pd.concat([existing_data, pd.DataFrame([new_row])], ignore_index=True)
                generated_set.add(new_comment)
                additional_count -= 1

                # 중간 저장
                existing_data.to_csv(output_path, index=False, encoding="utf-8-sig")

        except Exception as e:
            print(f"오류 발생: {e}")
            continue

    return existing_data

# 각 파일 처리
for file_name in sorted(os.listdir(input_dir)):
    if file_name.endswith(".csv"):
        input_path = os.path.join(input_dir, file_name)
        output_file = f"ZeroShot_{file_name.split('_')[-1]}"
        output_path = os.path.join(output_dir, output_file)

        # 파일 로드
        gr_data = pd.read_csv(input_path)

        # 클래스별 100% 추가 생성
        current_counts = gr_data['label'].value_counts()
        for hate_type, count in current_counts.items():
            additional_count = count  # 100% 증가
            gr_data = generate_data(hate_type, additional_count, gr_data, output_path)

        # 클래스 추가 후 확인
        final_counts = gr_data['label'].value_counts()
        print(f"{output_path} - 모든 클래스의 데이터가 100% 증가했습니다. 최종 분포: {final_counts}")

# 최종 완료 메시지
print("모든 파일에 대한 데이터 생성 및 저장이 완료되었습니다.")


./Data/ZeroShot/ZeroShot_1.csv - 모든 클래스의 데이터가 100% 증가했습니다. 최종 분포: label
5    36
1    34
2    30
4    28
0    26
3    26
6    20
Name: count, dtype: int64


In [None]:
import pandas as pd
import ollama
import os
import re

# 경로 설정
input_dir = "./Data/None"
output_dir = "./Data/OneShot"
os.makedirs(output_dir, exist_ok=True)  # 출력 디렉토리 생성

# 클래스 이름 한국어로 매핑
hate_type_map = {
    0: '행복',
    1: '놀람',
    2: '분노',
    3: '공포',
    4: '혐오',
    5: '슬픔',
    6: '중립'
}

# 데이터 생성 함수
def generate_data(hate_type, additional_count, existing_data, output_path):
    generated_set = set(existing_data['document'].dropna())  # 중복 방지용 집합
    hate_type_kor = hate_type_map.get(hate_type, hate_type)  # 한국어로 변환

    # 해당 hate 유형의 예시 목록 추출
    examples = existing_data[existing_data['label'] == hate_type]['document'].dropna().tolist()
    example_index = 0  # 순서대로 예시를 사용하기 위한 인덱스

    while additional_count > 0:
        try:
            # 현재 예시 선택
            example = examples[example_index]
            example_index = (example_index + 1) % len(examples)  # 순환적으로 접근

            # 프롬프트 구성
            prompt = f"""
            아래는 {hate_type_kor} 유형의 문장 예시야:
            예시: "{example}"

            위 예시를 참고하여 {hate_type_kor} 유형의 새로운 문장을 하나 작성해줘.
            **출력 형식: {{새로운 문장}}**
            다른 형식의 출력은 절대 사용하지마.

            이제 새로운 문장을 작성해줘.
            """

            # Ollama 호출
            response = ollama.chat(model="exaone3.5:2.4b", messages=[
                {
                    "role": "user",
                    "content": prompt.strip(),
                }
            ])
            generated_text = response['message']['content'].strip()

            # 정규 표현식으로 { } 안의 내용 추출
            match = re.search(r"\{(.+?)\}", generated_text)
            if match:
                new_comment = match.group(1).strip()

                # 중복 확인
                if new_comment in generated_set:
                    continue

                # 새로운 데이터 추가
                new_row = {"document": new_comment, "label": hate_type}
                existing_data = pd.concat([existing_data, pd.DataFrame([new_row])], ignore_index=True)
                generated_set.add(new_comment)
                additional_count -= 1

                # 중간 저장
                existing_data.to_csv(output_path, index=False, encoding="utf-8-sig")

        except Exception as e:
            print(f"오류 발생: {e}")
            continue

    return existing_data

# 각 파일 처리
for file_name in sorted(os.listdir(input_dir)):
    if file_name.endswith(".csv"):
        input_path = os.path.join(input_dir, file_name)
        output_file = f"OneShot_{file_name.split('_')[-1]}"
        output_path = os.path.join(output_dir, output_file)

        # 파일 로드
        gr_data = pd.read_csv(input_path)

        # 클래스별 100% 추가 생성
        current_counts = gr_data['label'].value_counts()
        for hate_type, count in current_counts.items():
            additional_count = count  # 100% 증가
            gr_data = generate_data(hate_type, additional_count, gr_data, output_path)

        # 클래스 추가 후 확인
        final_counts = gr_data['label'].value_counts()
        print(f"{output_path} - 모든 클래스의 데이터가 100% 증가했습니다. 최종 분포: {final_counts}")

# 최종 완료 메시지
print("모든 파일에 대한 데이터 생성 및 저장이 완료되었습니다.")


In [None]:
import pandas as pd
import ollama
import os
import re

# 경로 설정
input_dir = "./Data/None"
output_dir = "./Data/ThreeShot"
os.makedirs(output_dir, exist_ok=True)  # 출력 디렉토리 생성

# 클래스 이름 한국어로 매핑
hate_type_map = {
    0: '행복',
    1: '놀람',
    2: '분노',
    3: '공포',
    4: '혐오',
    5: '슬픔',
    6: '중립'
}

# 데이터 생성 함수
def generate_data(hate_type, additional_count, existing_data, output_path):
    generated_set = set(existing_data['document'].dropna())  # 중복 방지용 집합
    hate_type_kor = hate_type_map.get(hate_type, hate_type)  # 한국어로 변환

    # 해당 hate 유형의 예시 순서대로 추출
    examples = existing_data[existing_data['label'] == hate_type]['document'].dropna().tolist()
    example_count = len(examples)

    while additional_count > 0:
        try:
            # 순서대로 3개의 예시를 구성
            start_index = (example_count - additional_count) % example_count
            current_examples = examples[start_index:start_index + 3]

            # 순환 처리로 3개가 안되면 앞에서 추가
            while len(current_examples) < 3:
                current_examples.append(examples[len(current_examples) % example_count])

            # 프롬프트 구성
            prompt = f"""
            아래는 {hate_type_kor} 유형의 문장 예시야:
            예시 1: "{current_examples[0]}"
            예시 2: "{current_examples[1]}"
            예시 3: "{current_examples[2]}"

            위 예시를 참고하여 {hate_type_kor} 유형의 새로운 문장을 하나 작성해줘.
            **출력 형식: {{새로운 문장}}**
            다른 형식의 출력은 포함하지 말아줘.

            이제 새로운 문장을 작성해줘.
            """

            # Ollama 호출
            response = ollama.chat(model="exaone3.5:2.4b", messages=[
                {
                    "role": "user",
                    "content": prompt.strip(),
                }
            ])
            generated_text = response['message']['content'].strip()

            # 정규 표현식으로 { } 안의 내용 추출
            match = re.search(r"\{(.+?)\}", generated_text)
            if match:
                new_comment = match.group(1).strip()

                # 중복 확인
                if new_comment in generated_set:
                    continue

                # 새로운 데이터 추가
                new_row = {"document": new_comment, "label": hate_type}
                existing_data = pd.concat([existing_data, pd.DataFrame([new_row])], ignore_index=True)
                generated_set.add(new_comment)
                additional_count -= 1

                # 중간 저장
                existing_data.to_csv(output_path, index=False, encoding="utf-8-sig")

        except Exception as e:
            print(f"오류 발생: {e}")
            continue

    return existing_data

# 각 파일 처리
for file_name in sorted(os.listdir(input_dir)):
    if file_name.endswith(".csv"):
        input_path = os.path.join(input_dir, file_name)
        output_file = f"ThreeShot_{file_name.split('_')[-1]}"
        output_path = os.path.join(output_dir, output_file)

        # 파일 로드
        gr_data = pd.read_csv(input_path)

        # 클래스별 100% 추가 생성
        current_counts = gr_data['label'].value_counts()
        for hate_type, count in current_counts.items():
            additional_count = count  # 100% 증가
            gr_data = generate_data(hate_type, additional_count, gr_data, output_path)

        # 클래스 추가 후 확인
        final_counts = gr_data['label'].value_counts()
        print(f"{output_path} - 모든 클래스의 데이터가 100% 증가했습니다. 최종 분포: {final_counts}")

# 최종 완료 메시지
print("모든 파일에 대한 데이터 생성 및 저장이 완료되었습니다.")


In [None]:
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AdamW
from sklearn.metrics import f1_score
import os

# 사용자 정의 데이터셋 클래스
class SimpleDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_len):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        inputs = self.tokenizer(
            self.texts[idx],
            add_special_tokens=True,
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        return {
            "input_ids": inputs["input_ids"].squeeze(0),
            "attention_mask": inputs["attention_mask"].squeeze(0),
            "label": torch.tensor(self.labels[idx], dtype=torch.long),
        }

# 경로 설정
train_dirs = ["./Data/ZeroShot", "./Data/OneShot", "./Data/ThreeShot"]
test_file = "./Data/Test.csv"
summary_file = "./Data/Summary.csv"

# 모델 및 토크나이저 초기화
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=7)  # 이진 분류
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# 테스트 데이터 준비
test_data = pd.read_csv(test_file)

# `document` 열 문자열로 변환 및 결측치 처리
test_data["document"] = test_data["document"].astype(str).fillna("")

test_texts = test_data["document"].tolist()
test_labels = test_data["label"].tolist()  # 'label' 열 사용
test_dataset = SimpleDataset(test_texts, test_labels, tokenizer, max_len=128)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# 결과 저장용 DataFrame 초기화
if os.path.exists(summary_file):
    results_df = pd.read_csv(summary_file)
else:
    results_df = pd.DataFrame(columns=["Train Dir", "File", "F1 Micro", "F1 Macro"])

# 학습 루프
for train_dir in train_dirs:
    for train_file in sorted(os.listdir(train_dir)):
        if train_file.endswith(".csv"):
            input_path = os.path.join(train_dir, train_file)
            train_data = pd.read_csv(input_path)
            
            # `document` 열 문자열로 변환 및 결측치 처리
            train_data["document"] = train_data["document"].astype(str).fillna("")

            train_texts = train_data["document"].tolist()
            train_labels = train_data["label"].tolist()  # 'label' 열 사용

            train_dataset = SimpleDataset(train_texts, train_labels, tokenizer, max_len=128)
            train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

            # 옵티마이저 설정
            optimizer = AdamW(model.parameters(), lr=5e-5)

            # 학습
            model.train()
            for epoch in range(3):  # 3 에포크
                total_loss = 0
                for batch in train_loader:
                    input_ids = batch["input_ids"].to(device)
                    attention_mask = batch["attention_mask"].to(device)
                    labels = batch["label"].to(device)

                    optimizer.zero_grad()
                    outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
                    loss = outputs.loss
                    loss.backward()
                    optimizer.step()

                    total_loss += loss.item()

                print(f"{train_file} - Epoch {epoch + 1} - Loss: {total_loss:.4f}")

            # 평가
            model.eval()
            all_labels = []
            all_preds = []
            with torch.no_grad():
                for batch in test_loader:
                    input_ids = batch["input_ids"].to(device)
                    attention_mask = batch["attention_mask"].to(device)
                    labels = batch["label"].to(device)

                    outputs = model(input_ids, attention_mask=attention_mask)
                    preds = torch.argmax(outputs.logits, axis=1).cpu().numpy()
                    all_preds.extend(preds)
                    all_labels.extend(labels.cpu().numpy())

            f1_micro = f1_score(all_labels, all_preds, average="micro")
            f1_macro = f1_score(all_labels, all_preds, average="macro")
            print(f"Test Results for {train_file}: F1 Micro = {f1_micro:.4f}, F1 Macro = {f1_macro:.4f}")

            # 결과 추가
            results_df = pd.concat([results_df, pd.DataFrame([{
                "File": train_file,
                "F1 Micro": f1_micro,
                "F1 Macro": f1_macro,
            }])], ignore_index=True)

# 결과 저장
results_df.to_csv(summary_file, index=False)
print(f"Summary saved to {summary_file}")
