# NVIDIA NeMo Curator 데이터 큐레이션 종합 실습

이 노트북은 NeMo Curator를 사용한 데이터 큐레이션과 합성 데이터 생성의 전체 워크플로우를 실습합니다.

## 목차
1. [환경 설정](#1.-환경-설정)
2. [기본 데이터 큐레이션](#2.-기본-데이터-큐레이션)
3. [주제 및 부주제 생성](#3.-주제-및-부주제-생성)
4. [Q&A 데이터셋 생성](#4.-Q&A-데이터셋-생성)
5. [수학 문제 생성](#5.-수학-문제-생성)
6. [종합 실습](#6.-종합-실습)

## 1. 환경 설정

### 1.1 필요한 라이브러리 Import

In [None]:
import os
import warnings
import re
from typing import List, Dict, Tuple

# NeMo Curator
from nemo_curator import OpenAIClient, Sequential, ScoreFilter
from nemo_curator.datasets import DocumentDataset
from nemo_curator.modifiers import DocumentModifier, UnicodeReformatter
from nemo_curator.modifiers.pii_modifier import PiiModifier
from nemo_curator.modules.modify import Modify
from nemo_curator.filters import WordCountFilter, DocumentFilter
from nemo_curator.synthetic import NemotronGenerator
from nemo_curator.synthetic.error import YamlConversionError
from nemo_curator.utils.distributed_utils import get_client

# OpenAI
from openai import OpenAI

# Others
import pandas as pd
import dask
import dask.dataframe

warnings.filterwarnings('ignore')
print('라이브러리 import 완료')

### 1.2 Dask 클러스터 초기화

In [None]:
# CPU 기반 Dask 클러스터 시작
client = get_client(cluster_type='cpu')
print(f'Dask 클러스터 시작됨: {client}')

### 1.3 NVIDIA API 클라이언트 설정

> **참고**: API 키는 환경변수로 설정되어 있어야 합니다.

In [None]:
# OpenAI 클라이언트 초기화
openai_client = OpenAI(
    base_url=os.environ.get('NVIDIA_BASE_URL', 'https://integrate.api.nvidia.com/v1'),
    api_key=os.environ.get('NVIDIA_API_KEY', 'your-api-key-here'),
)

# NeMo Curator 클라이언트 초기화
curator_client = OpenAIClient(openai_client)
generator = NemotronGenerator(curator_client)

# 모델 설정
model = 'mistralai/mistral-7b-instruct-v0.3'
model_kwargs = {
    'temperature': 0.1,
    'top_p': 0.9,
    'max_tokens': 1024,
}

print('API 클라이언트 설정 완료')

## 2. 기본 데이터 큐레이션

### 2.1 커스텀 텍스트 정제기 구현

In [None]:
class QuotationTagUnifier(DocumentModifier):
    """
    인용 부호와 HTML 태그를 정규화하고 불필요한 요소를 제거하는 클래스
    """
    
    def modify_document(self, text: str) -> str:
        # 모든 종류의 따옴표를 표준 ASCII 따옴표로 변환
        text = re.sub(r'[''\u2018\u2019]', "'", text)
        text = re.sub(r'[""\u201c\u201d]', '"', text)
        
        # 탭을 공백으로 변환
        text = re.sub(r'\t', ' ', text)
        
        # HTML 태그 제거
        text = re.sub(r'<[^>]+>', '', text)
        
        # URL 제거
        text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)
        
        # 연속된 공백을 하나로 압축
        text = re.sub(r' +', ' ', text)
        
        # 연속된 줄바꿈을 최대 2개로 제한
        text = re.sub(r'\n{3,}', '\n\n', text)
        
        return text.strip()

print('QuotationTagUnifier 클래스 정의 완료')

### 2.2 불완전 문서 필터 구현

In [None]:
class IncompleteDocumentFilter(DocumentFilter):
    """
    불완전한 문서를 필터링하는 클래스
    - 문장이 완결되지 않은 문서
    - 너무 짧은 문서
    - 반복이 많은 문서
    """
    
    def __init__(self, min_words=50, min_sentences=3):
        self.min_words = min_words
        self.min_sentences = min_sentences
    
    def score_document(self, text: str) -> bool:
        # 단어 수 체크
        words = text.split()
        if len(words) < self.min_words:
            return False
        
        # 문장 수 체크
        sentences = re.split(r'[.!?]+', text)
        sentences = [s.strip() for s in sentences if s.strip()]
        if len(sentences) < self.min_sentences:
            return False
        
        # 마지막 문장이 완결되었는지 체크
        if text and text[-1] not in '.!?':
            return False
        
        # 반복 체크 (같은 단어가 전체의 30% 이상)
        word_freq = {}
        for word in words:
            word_freq[word] = word_freq.get(word, 0) + 1
        
        max_freq = max(word_freq.values()) if word_freq else 0
        if max_freq / len(words) > 0.3:
            return False
        
        return True

print('IncompleteDocumentFilter 클래스 정의 완료')

### 2.3 샘플 데이터셋 준비 및 큐레이션 실습

In [None]:
# 샘플 데이터 생성
sample_data = pd.DataFrame({
    'text': [
        '인공지능은 컴퓨터 과학의 한 분야입니다. 기계 학습과 딥러닝은 인공지능의 주요 기술입니다. 이들은 데이터로부터 패턴을 학습합니다.',
        '짧은 텍스트',  # 필터링될 예정
        '<html><body>HTML 태그가 포함된 "특수 문자" 텍스트입니다. 이것은 정제되어야 합니다. https://example.com 링크도 제거됩니다.</body></html>',
        '반복 반복 반복 반복 반복 반복 반복 반복 반복',  # 필터링될 예정
        '자연어 처리는 컴퓨터가 인간의 언어를 이해하고 처리하는 기술입니다. 최근 트랜스포머 모델의 등장으로 큰 발전을 이루었습니다. GPT와 BERT가 대표적인 예입니다.'
    ],
    'id': range(5)
})

# Dask DataFrame으로 변환
dask_df = dask.dataframe.from_pandas(sample_data, npartitions=2)

# DocumentDataset 생성
dataset = DocumentDataset(dask_df)

print(f'원본 데이터셋 크기: {len(dataset.df)}')
print('\n원본 데이터 샘플:')
print(sample_data['text'].head())

In [None]:
# 큐레이션 파이프라인 구성
curation_pipeline = Sequential([
    # 1. Unicode 정규화
    Modify(UnicodeReformatter()),
    
    # 2. 커스텀 텍스트 정제
    Modify(QuotationTagUnifier()),
    
    # 3. 불완전 문서 필터링
    ScoreFilter(IncompleteDocumentFilter(min_words=10, min_sentences=2)),
    
    # 4. 단어 수 기반 필터링
    ScoreFilter(WordCountFilter(min_words=5)),
])

# 파이프라인 실행
curated_dataset = curation_pipeline(dataset)

print('\n큐레이션 완료!')
print(f'큐레이션 후 데이터셋 크기: {len(curated_dataset.df)}')
print('\n큐레이션된 데이터 샘플:')
print(curated_dataset.df.compute()['text'].head())

### 2.4 PII (개인정보) 제거

In [None]:
# PII가 포함된 샘플 데이터
pii_sample = pd.DataFrame({
    'text': [
        '제 이메일은 hong.gildong@example.com이고 전화번호는 010-1234-5678입니다.',
        '주민등록번호는 123456-1234567이며 계좌번호는 110-123-456789입니다.',
        '안전한 텍스트로 개인정보가 없습니다.'
    ],
    'id': range(3)
})

pii_dask = dask.dataframe.from_pandas(pii_sample, npartitions=1)
pii_dataset = DocumentDataset(pii_dask)

print('원본 데이터:')
for text in pii_sample['text']:
    print(f'- {text}')

In [None]:
# PII 제거 파이프라인
pii_remover = PiiModifier(
    supported_entities=[
        'EMAIL_ADDRESS',
        'PHONE_NUMBER',
        'CREDIT_CARD',
        'US_SSN',  # 주민등록번호 유사
    ],
    anonymize_action='replace',  # 'replace', 'redact', 'hash' 중 선택
    device='cpu'
)

pii_cleaned = Modify(pii_remover)(pii_dataset)

print('\nPII 제거 후:')
for text in pii_cleaned.df.compute()['text']:
    print(f'- {text}')

## 3. 주제 및 부주제 생성

### 3.1 매크로 주제 생성

In [None]:
def generate_macro_topics(
    domain: str,
    num_topics: int = 5,
    model: str = 'mistralai/mistral-7b-instruct-v0.3'
) -> List[str]:
    """
    특정 도메인에 대한 매크로 주제들을 생성합니다.
    
    Args:
        domain: 주제를 생성할 도메인 (예: '인공지능', '의료', '금융')
        num_topics: 생성할 주제 수
        model: 사용할 LLM 모델
    
    Returns:
        생성된 주제 리스트
    """
    prompt = f"""
당신은 {domain} 분야의 전문가입니다.
{domain} 분야에서 중요한 {num_topics}개의 주요 주제(macro topics)를 생성해주세요.

각 주제는:
1. 구체적이고 명확해야 합니다
2. 서로 중복되지 않아야 합니다
3. 해당 분야에서 실제로 중요한 주제여야 합니다

형식: 각 주제를 한 줄에 하나씩 나열해주세요.
"""
    
    try:
        response = openai_client.chat.completions.create(
            model=model,
            messages=[{'role': 'user', 'content': prompt}],
            temperature=0.7,
            max_tokens=500
        )
        
        topics_text = response.choices[0].message.content
        topics = [line.strip() for line in topics_text.split('\n') if line.strip() and not line.strip().startswith('#')]
        
        # 번호나 특수문자 제거
        topics = [re.sub(r'^[\d\.\-\*]+\s*', '', topic) for topic in topics]
        
        return topics[:num_topics]
    
    except Exception as e:
        print(f'오류 발생: {e}')
        return []

print('generate_macro_topics 함수 정의 완료')

In [None]:
# 매크로 주제 생성 실습
domain = '인공지능'
macro_topics = generate_macro_topics(domain, num_topics=5)

print(f'{domain} 분야 매크로 주제:')
for i, topic in enumerate(macro_topics, 1):
    print(f'{i}. {topic}')

### 3.2 부주제 생성

In [None]:
def generate_subtopics(
    macro_topic: str,
    num_subtopics: int = 3,
    model: str = 'mistralai/mistral-7b-instruct-v0.3'
) -> List[str]:
    """
    매크로 주제에 대한 부주제들을 생성합니다.
    
    Args:
        macro_topic: 상위 주제
        num_subtopics: 생성할 부주제 수
        model: 사용할 LLM 모델
    
    Returns:
        생성된 부주제 리스트
    """
    prompt = f"""
주제: {macro_topic}

위 주제에 대한 {num_subtopics}개의 구체적인 부주제(subtopics)를 생성해주세요.

각 부주제는:
1. 상위 주제와 명확한 관련성이 있어야 합니다
2. 충분히 구체적이어야 합니다
3. 서로 다른 측면을 다루어야 합니다

형식: 각 부주제를 한 줄에 하나씩 나열해주세요.
"""
    
    try:
        response = openai_client.chat.completions.create(
            model=model,
            messages=[{'role': 'user', 'content': prompt}],
            temperature=0.7,
            max_tokens=400
        )
        
        subtopics_text = response.choices[0].message.content
        subtopics = [line.strip() for line in subtopics_text.split('\n') if line.strip() and not line.strip().startswith('#')]
        
        # 번호나 특수문자 제거
        subtopics = [re.sub(r'^[\d\.\-\*]+\s*', '', topic) for topic in subtopics]
        
        return subtopics[:num_subtopics]
    
    except Exception as e:
        print(f'오류 발생: {e}')
        return []

print('generate_subtopics 함수 정의 완료')

In [None]:
# 부주제 생성 실습
if macro_topics:
    selected_topic = macro_topics[0]
    subtopics = generate_subtopics(selected_topic, num_subtopics=3)
    
    print(f'매크로 주제: {selected_topic}\n')
    print('부주제:')
    for i, subtopic in enumerate(subtopics, 1):
        print(f'  {i}. {subtopic}')

## 4. Q&A 데이터셋 생성

### 4.1 NeMo Curator를 사용한 Q&A 생성

In [None]:
def generate_qa_pairs(
    topic: str,
    num_pairs: int = 5,
    difficulty: str = 'medium'
) -> List[Dict[str, str]]:
    """
    특정 주제에 대한 Q&A 쌍을 생성합니다.
    
    Args:
        topic: Q&A를 생성할 주제
        num_pairs: 생성할 Q&A 쌍의 수
        difficulty: 난이도 ('easy', 'medium', 'hard')
    
    Returns:
        Q&A 쌍 리스트 [{'question': ..., 'answer': ...}, ...]
    """
    difficulty_desc = {
        'easy': '기초적이고 이해하기 쉬운',
        'medium': '중급 수준의',
        'hard': '전문적이고 심화된'
    }
    
    prompt = f"""
주제: {topic}

{difficulty_desc.get(difficulty, '중급 수준의')} 질문과 답변 쌍을 {num_pairs}개 생성해주세요.

요구사항:
1. 질문은 명확하고 구체적이어야 합니다
2. 답변은 정확하고 유익한 정보를 포함해야 합니다
3. 답변은 2-4문장 정도의 적절한 길이여야 합니다

형식:
Q1: [질문]
A1: [답변]

Q2: [질문]
A2: [답변]
"""
    
    try:
        response = openai_client.chat.completions.create(
            model=model,
            messages=[{'role': 'user', 'content': prompt}],
            **model_kwargs
        )
        
        content = response.choices[0].message.content
        
        # Q&A 파싱
        qa_pairs = []
        lines = content.split('\n')
        
        current_q = None
        for line in lines:
            line = line.strip()
            if re.match(r'^Q\d+:', line):
                current_q = re.sub(r'^Q\d+:\s*', '', line)
            elif re.match(r'^A\d+:', line) and current_q:
                answer = re.sub(r'^A\d+:\s*', '', line)
                qa_pairs.append({'question': current_q, 'answer': answer})
                current_q = None
        
        return qa_pairs[:num_pairs]
    
    except Exception as e:
        print(f'오류 발생: {e}')
        return []

print('generate_qa_pairs 함수 정의 완료')

In [None]:
# Q&A 생성 실습
if subtopics:
    test_topic = subtopics[0]
    qa_pairs = generate_qa_pairs(test_topic, num_pairs=3, difficulty='medium')
    
    print(f'주제: {test_topic}\n')
    for i, qa in enumerate(qa_pairs, 1):
        print(f'Q{i}: {qa["question"]}')
        print(f'A{i}: {qa["answer"]}\n')

### 4.2 Reward Model을 사용한 Q&A 평가

In [None]:
def evaluate_qa_quality(
    question: str,
    answer: str,
    model: str = 'mistralai/mistral-7b-instruct-v0.3'
) -> Dict[str, any]:
    """
    Q&A 쌍의 품질을 평가합니다.
    
    Args:
        question: 질문
        answer: 답변
        model: 평가에 사용할 모델
    
    Returns:
        평가 결과 딕셔너리 (점수, 피드백 등)
    """
    prompt = f"""
다음 Q&A 쌍의 품질을 평가해주세요.

질문: {question}
답변: {answer}

다음 기준으로 1-10점 사이의 점수를 매겨주세요:
1. 질문의 명확성 (1-10)
2. 답변의 정확성 (1-10)
3. 답변의 완성도 (1-10)
4. 전체적인 유용성 (1-10)

형식:
명확성: [점수]
정확성: [점수]
완성도: [점수]
유용성: [점수]
총점: [평균 점수]
피드백: [간단한 피드백]
"""
    
    try:
        response = openai_client.chat.completions.create(
            model=model,
            messages=[{'role': 'user', 'content': prompt}],
            temperature=0.3,
            max_tokens=300
        )
        
        content = response.choices[0].message.content
        
        # 점수 파싱
        scores = {}
        for line in content.split('\n'):
            if ':' in line:
                key, value = line.split(':', 1)
                key = key.strip()
                value = value.strip()
                
                # 숫자 추출
                numbers = re.findall(r'\d+\.?\d*', value)
                if numbers and key in ['명확성', '정확성', '완성도', '유용성', '총점']:
                    scores[key] = float(numbers[0])
                elif key == '피드백':
                    scores[key] = value
        
        return scores
    
    except Exception as e:
        print(f'평가 오류: {e}')
        return {}

print('evaluate_qa_quality 함수 정의 완료')

In [None]:
# Q&A 평가 실습
if qa_pairs:
    sample_qa = qa_pairs[0]
    evaluation = evaluate_qa_quality(sample_qa['question'], sample_qa['answer'])
    
    print('평가 대상:')
    print(f"Q: {sample_qa['question']}")
    print(f"A: {sample_qa['answer']}\n")
    
    print('평가 결과:')
    for key, value in evaluation.items():
        print(f'{key}: {value}')

## 5. 수학 문제 생성

### 5.1 난이도별 수학 문제 생성

In [None]:
def generate_math_problems(
    topic: str,
    difficulty: str = 'medium',
    num_problems: int = 3
) -> List[Dict[str, str]]:
    """
    특정 주제의 수학 문제를 생성합니다.
    
    Args:
        topic: 수학 주제 (예: '미적분', '확률', '선형대수')
        difficulty: 난이도 ('easy', 'medium', 'hard')
        num_problems: 생성할 문제 수
    
    Returns:
        문제 리스트 [{'problem': ..., 'solution': ..., 'answer': ...}, ...]
    """
    difficulty_levels = {
        'easy': '초등 ~ 중학교 수준의 기초',
        'medium': '고등학교 수준의',
        'hard': '대학교 수준의 심화'
    }
    
    prompt = f"""
주제: {topic}
난이도: {difficulty_levels.get(difficulty, '고등학교 수준의')}

{num_problems}개의 수학 문제를 생성해주세요.

각 문제는 다음을 포함해야 합니다:
1. 명확한 문제 설명
2. 단계별 풀이 과정
3. 최종 답

형식:
문제 1:
[문제 내용]

풀이 1:
[단계별 풀이]

답 1:
[최종 답]
"""
    
    try:
        response = openai_client.chat.completions.create(
            model=model,
            messages=[{'role': 'user', 'content': prompt}],
            temperature=0.7,
            max_tokens=1500
        )
        
        content = response.choices[0].message.content
        
        # 문제 파싱
        problems = []
        current_problem = {}
        
        lines = content.split('\n')
        current_section = None
        current_text = []
        
        for line in lines:
            if re.match(r'^문제\s*\d+:', line):
                if current_problem:
                    if current_section and current_text:
                        current_problem[current_section] = '\n'.join(current_text).strip()
                    problems.append(current_problem)
                current_problem = {}
                current_section = 'problem'
                current_text = []
            elif re.match(r'^풀이\s*\d+:', line):
                if current_section and current_text:
                    current_problem[current_section] = '\n'.join(current_text).strip()
                current_section = 'solution'
                current_text = []
            elif re.match(r'^답\s*\d+:', line):
                if current_section and current_text:
                    current_problem[current_section] = '\n'.join(current_text).strip()
                current_section = 'answer'
                current_text = []
            else:
                if line.strip():
                    current_text.append(line)
        
        # 마지막 문제 추가
        if current_problem and current_section and current_text:
            current_problem[current_section] = '\n'.join(current_text).strip()
            problems.append(current_problem)
        
        return problems[:num_problems]
    
    except Exception as e:
        print(f'오류 발생: {e}')
        return []

print('generate_math_problems 함수 정의 완료')

In [None]:
# 수학 문제 생성 실습
math_topic = '확률과 통계'
math_problems = generate_math_problems(math_topic, difficulty='medium', num_problems=2)

print(f'주제: {math_topic}\n')
for i, prob in enumerate(math_problems, 1):
    print(f'=== 문제 {i} ===')
    print(f"문제: {prob.get('problem', 'N/A')}")
    print(f"\n풀이: {prob.get('solution', 'N/A')}")
    print(f"\n답: {prob.get('answer', 'N/A')}")
    print('\n')

## 6. 종합 실습

### 6.1 전체 파이프라인 통합

In [None]:
def create_comprehensive_dataset(
    domain: str,
    num_macro_topics: int = 3,
    num_subtopics: int = 2,
    num_qa_per_subtopic: int = 3,
    include_math: bool = False
) -> pd.DataFrame:
    """
    도메인에 대한 종합적인 데이터셋을 생성합니다.
    
    Args:
        domain: 대상 도메인
        num_macro_topics: 매크로 주제 수
        num_subtopics: 매크로 주제당 부주제 수
        num_qa_per_subtopic: 부주제당 Q&A 쌍 수
        include_math: 수학 문제 포함 여부
    
    Returns:
        생성된 데이터셋 (DataFrame)
    """
    all_data = []
    
    print(f'도메인 "{domain}"에 대한 데이터셋 생성 시작...\n')
    
    # 1. 매크로 주제 생성
    print('1단계: 매크로 주제 생성')
    macro_topics = generate_macro_topics(domain, num_macro_topics)
    print(f'생성된 매크로 주제: {len(macro_topics)}개\n')
    
    # 2. 각 매크로 주제에 대해 부주제 및 Q&A 생성
    for i, macro_topic in enumerate(macro_topics, 1):
        print(f'2단계: 매크로 주제 {i}/{len(macro_topics)} 처리 중 - "{macro_topic}"')
        
        # 부주제 생성
        subtopics = generate_subtopics(macro_topic, num_subtopics)
        print(f'  부주제 {len(subtopics)}개 생성됨')
        
        # 각 부주제에 대해 Q&A 생성
        for j, subtopic in enumerate(subtopics, 1):
            print(f'  부주제 {j}/{len(subtopics)} 처리 중')
            
            qa_pairs = generate_qa_pairs(subtopic, num_qa_per_subtopic)
            
            for qa in qa_pairs:
                all_data.append({
                    'domain': domain,
                    'macro_topic': macro_topic,
                    'subtopic': subtopic,
                    'type': 'qa',
                    'question': qa['question'],
                    'answer': qa['answer']
                })
        
        print()
    
    # 3. 수학 문제 생성 (옵션)
    if include_math:
        print('3단계: 수학 문제 생성')
        math_problems = generate_math_problems(domain, num_problems=3)
        
        for prob in math_problems:
            all_data.append({
                'domain': domain,
                'macro_topic': '수학',
                'subtopic': domain,
                'type': 'math',
                'question': prob.get('problem', ''),
                'answer': f"{prob.get('solution', '')}\n\n최종답: {prob.get('answer', '')}"
            })
        print(f'수학 문제 {len(math_problems)}개 생성됨\n')
    
    # DataFrame으로 변환
    df = pd.DataFrame(all_data)
    
    print(f'완료! 총 {len(df)}개의 데이터 생성됨')
    print(f'- Q&A: {len(df[df["type"] == "qa"])}개')
    if include_math:
        print(f'- 수학 문제: {len(df[df["type"] == "math"])}개')
    
    return df

print('create_comprehensive_dataset 함수 정의 완료')

### 6.2 종합 파이프라인 실행

In [None]:
# 종합 데이터셋 생성 실습
final_dataset = create_comprehensive_dataset(
    domain='기계학습',
    num_macro_topics=2,
    num_subtopics=2,
    num_qa_per_subtopic=2,
    include_math=False
)

print('\n데이터셋 미리보기:')
print(final_dataset.head(10))

In [None]:
# 데이터셋 저장
output_path = 'curated_dataset.jsonl'
final_dataset.to_json(output_path, orient='records', lines=True, force_ascii=False)
print(f'데이터셋이 {output_path}에 저장되었습니다.')

# 통계 출력
print('\n=== 데이터셋 통계 ===')
print(f'총 데이터 수: {len(final_dataset)}')
print(f'매크로 주제 수: {final_dataset["macro_topic"].nunique()}')
print(f'부주제 수: {final_dataset["subtopic"].nunique()}')
print('\n매크로 주제별 분포:')
print(final_dataset['macro_topic'].value_counts())

## 정리

이 노트북에서는 다음 내용을 실습했습니다:

1. **환경 설정**: NeMo Curator, Dask, NVIDIA API 클라이언트 초기화
2. **기본 데이터 큐레이션**:
   - 커스텀 텍스트 정제기 구현
   - 문서 필터링
   - PII 제거
3. **주제 생성**:
   - 매크로 주제 생성
   - 부주제 생성
4. **Q&A 데이터셋**:
   - Q&A 쌍 생성
   - Reward Model을 통한 평가
5. **수학 문제 생성**:
   - 난이도별 문제 생성
   - 풀이 및 답안 포함
6. **종합 파이프라인**:
   - 전체 워크플로우 통합
   - 데이터셋 저장 및 분석
