# Make Dataset for KoELECTRA, KoBERT NER Fine-tuning

# 1. Introduction

1. Few-shot을 위한 few examples 작성
2. LLM을 이용하여 리스트 확장
3. LLM을 이용하여 문장 생성
4. LLM을 이용하여 BIO 태깅
5. Train/Test set split & processing

# 2. Few-shot을 위한 few examples 
    -> 01_fewshot.json

# 3. LLM을 이용한 리스트 확장

## 3.1 openAI API 설정
TODO: Local LLM(예: Ollama)을 사용할 때의 클라이언트 설정 방법 정리 필요

In [1]:
import openai
import os
import ollama

# Module-level default key for the openai package.
openai.api_key = os.getenv("OPENAI_API_KEY")

# The helpers in this notebook call client.chat.completions.create(...), which
# is the OpenAI SDK interface. ollama.Client exposes chat() as a plain method
# and has no .chat.completions attribute, so an OpenAI SDK client is needed.
#
# Local LLM: Ollama serves an OpenAI-compatible API under /v1, so the same SDK
# can target it, e.g.
#   openai.OpenAI(base_url="http://localhost:11434/v1", api_key="ollama")
# (the `model` arguments used in this notebook must then name a local model).
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

## 3.2 프롬프트 작성

In [3]:
def generate_expend_prompt(prompts, model="gpt-4o", max_tokens=1024):
    """Send one prompt to the chat-completions endpoint and collect the replies.

    Args:
        prompts: Text placed in the single user message.
        model: Model name passed to the API.
        max_tokens: Value forwarded as ``max_completion_tokens``.

    Returns:
        One stripped string per returned choice, or an empty list if any
        exception is raised while calling the API or reading the response.
    """
    conversation = [
        {"role": "developer", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompts},
    ]
    try:
        completion = client.chat.completions.create(
            model=model,
            messages=conversation,
            temperature=0.7,
            max_completion_tokens=max_tokens,
        )
        replies = []
        for choice in completion.choices:
            replies.append(choice.message.content.strip())
    except Exception as e:
        # Best-effort: report the failure and hand back an empty result.
        print(f"OpenAI API 호출 중 오류 발생: {e}")
        return []
    return replies

def expend_prompt(class_name, entity_names, k=100):
    """Build the prompt asking for ``k`` new Korean entity names of one class.

    Args:
        class_name: Entity class label, inserted as ``<class_name>``.
        entity_names: Existing example names, listed one per bullet line.
        k: Number of new names requested in the prompt text.

    Returns:
        The full prompt string, ending with the "New entity names:" header.
    """
    intro = f"Below is a list of <{class_name}> entity names in Korean. Please list exactly {k} new <{class_name}> entity names in Korean that are similar.\n\n"
    bullets = "".join(f"- {name}\n" for name in entity_names)
    return intro + "Existing entity names:\n" + bullets + "\nNew entity names:\n"

## 3.3 post processing

- `clean_text()`: 문자열을 유니코드 정규화(NFKC)하고 앞뒤의 불필요한 공백을 제거
- `postprocess_entities()`: API 응답에서 개체명만 추출

In [4]:
import re
import unicodedata

def clean_text(text):
    """Return ``text`` NFKC-normalized, UTF-8 round-tripped, and stripped."""
    normalized = unicodedata.normalize("NFKC", text)
    # Round-trip through UTF-8, silently dropping anything that cannot be encoded.
    encodable = normalized.encode("utf-8", "ignore").decode("utf-8")
    return encodable.strip()

def postprocess_entities(synthetic_entities):
    """Extract unique entity names from numbered-list style LLM responses.

    Only lines that start with a number followed by "." or ")" (for example
    "1. 서대문" or "2) 경복궁") are treated as entities; every other line is
    ignored. For each accepted line the numbering prefix is removed, every
    hyphen is removed (including hyphens inside the name), and the rest is
    NFKC-normalized and stripped.

    Args:
        synthetic_entities: Iterable of raw response strings.

    Returns:
        List of unique cleaned names in first-seen order. The order is
        deterministic, so repeated runs over the same responses produce the
        same output.
    """
    numbered_prefix = re.compile(r'^\d+[\.\)]\s*')
    # A dict keeps insertion order, which gives order-preserving de-duplication.
    seen = {}
    for response in synthetic_entities:
        for raw_line in response.split("\n"):
            line = raw_line.strip()
            prefix = numbered_prefix.match(line)
            if prefix is None:
                continue
            # Drop the numbering prefix, then any hyphens.
            line = line[prefix.end():].replace("-", "").strip()
            line = unicodedata.normalize("NFKC", line).encode("utf-8", "ignore").decode("utf-8").strip()
            if line:
                seen[line] = None
    return list(seen)


## 3.4 Load JSON

In [5]:
import json

# Read the few-shot example entities.
with open("01_fewshot.json", "r", encoding="utf-8") as fewshot_file:
    few_entities = json.loads(fewshot_file.read())

## 3.5 개체명 확장

In [6]:
from tqdm import tqdm

expended_entities = []

# For each few-shot class: build the expansion prompt, query the LLM, clean the
# reply, and store the result under the same class name.
for seed in tqdm(few_entities):
    label = seed['class_name']
    llm_replies = generate_expend_prompt(expend_prompt(label, seed['entity_name']))
    expended_entities.append({
        'class_name': label,
        'entity_name': postprocess_entities(llm_replies),
    })


100%|██████████| 4/4 [00:51<00:00, 12.96s/it]


## 3.6 data 저장
Data 확인용

In [7]:
# Persist the expanded entity lists so they can be inspected or reloaded.
with open("02_expended_data.json", "w", encoding="utf-8") as expanded_file:
    expanded_file.write(json.dumps(expended_entities, ensure_ascii=False, indent=4))

# 4. LLM을 활용하여 문장 생성

## 4.1. sampling
전체 개체명 데이터셋에서 랜덤하게 min_k이상 max_k 이하의 개체명을 샘플링
(현재는 class가 2개뿐이지만, 혹시모르니 일단 진행)

In [8]:
import numpy as np

def sample_entities(all_entities, min_k=1, max_k=4):
    """Pick one random entity name from each of a random subset of classes.

    Args:
        all_entities: List of dicts with a "class_name" and an "entity_name"
            list of candidate names.
        min_k: Smallest number of classes to sample.
        max_k: Largest number of classes to sample.

    Returns:
        List of ``{"class_name": ..., "entity_name": ...}`` dicts, one per
        sampled class, with no class repeated.

    Raises:
        ValueError: If no class has at least one entity name.
    """
    # A class with no names cannot be sampled from, so leave it out.
    candidates = [ents for ents in all_entities if ents["entity_name"]]
    if not candidates:
        raise ValueError("sample_entities: no class with at least one entity name")

    # Classes are drawn without replacement, so k may not exceed their count.
    upper = min(max_k, len(candidates))
    lower = min(min_k, upper)
    k = np.random.randint(lower, upper + 1)
    idxs = np.random.choice(range(len(candidates)), size=k, replace=False)

    entities = []
    for i in idxs:
        ents = candidates[i]
        # str() turns the NumPy string scalar into a plain Python str.
        name = str(np.random.choice(ents["entity_name"]))
        entities.append({"class_name": ents["class_name"], "entity_name": name})
    return entities

## 4.2 프롬프트 구성

In [9]:
def generate_dataset_prompt(prompts, model="gpt-4o", max_tokens=512):
    """Query the chat-completions endpoint once per prompt.

    Args:
        prompts: Iterable of user-message strings.
        model: Model name passed to the API.
        max_tokens: Value forwarded as ``max_completion_tokens``.

    Returns:
        List with the stripped text of the first choice for each prompt, in
        the same order as ``prompts``.
    """
    def ask(user_text):
        # One request per prompt; only the first choice is used.
        completion = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "developer", "content": "You are a helpful assistant."},
                {"role": "user", "content": user_text},
            ],
            max_completion_tokens=max_tokens,
            temperature=0.7,
        )
        return completion.choices[0].message.content.strip()

    return [ask(user_text) for user_text in prompts]

def generate_sentence_prompt(entities, style="dialog"):
    """Build the prompt asking for one sentence that contains the given entities.

    Args:
        entities: List of dicts with "entity_name" and "class_name".
        style: Sentence style word inserted into the instruction.

    Returns:
        Prompt string listing the entities as ``name(class)`` and ending with
        "Sentence:".
    """
    labelled = []
    for ent in entities:
        labelled.append(f"{ent['entity_name']}({ent['class_name']})")
    entities_string = ", ".join(labelled)
    return "".join([
        f"Generate a {style} sentence that includes the following entities in Korean.\n\n",
        f"Entities: {entities_string}\n",
        "Sentence:",
    ])

## 4.3 post processing

- 문장 양 끝의 큰따옴표(") 삭제

In [10]:
def clean_sentence(sentence):
    """Remove every leading and trailing ASCII double quote from ``sentence``."""
    return sentence.strip('"')

### 4.3.1 Data Load

3번단계 API호출 안하고 넘어가기

In [11]:
# Reload the saved expansion result instead of calling the API again.
with open("02_expended_data.json", "r", encoding="utf-8") as expanded_file:
    expended_entities = json.loads(expanded_file.read())

## 4.4 진행

- `num_iterations`: 반복할 횟수
- `batch_size`: 한 번의 반복에서 생성할 문장 수

In [12]:
import time

num_iterations = 100
batch_size = 5

generated_sentences = []

for _ in tqdm(range(num_iterations)):
    # Draw one entity set per sentence and turn each set into a prompt.
    batch_entities = []
    batch_prompts = []
    for slot in range(batch_size):
        picked = sample_entities(expended_entities)
        batch_entities.append(picked)
        batch_prompts.append(generate_sentence_prompt(picked))

    batch_generated = generate_dataset_prompt(batch_prompts, model="gpt-4o", max_tokens=256)

    # Pair each generated sentence with the entities it was asked to contain.
    generated_sentences.extend(
        {"entities": entities, "sentence": clean_sentence(generated)}
        for generated, entities in zip(batch_generated, batch_entities)
    )
    time.sleep(1)  # delay between batches to avoid hitting the rate limit

100%|██████████| 100/100 [09:59<00:00,  6.00s/it]


## 4.5 저장
Data 확인용

In [13]:
# Persist the generated sentences together with their source entities.
with open("03_generated_sentences.json", "w", encoding="utf-8") as sentences_file:
    sentences_file.write(json.dumps(generated_sentences, ensure_ascii=False, indent=4))
#print(json.dumps(data, indent=4, ensure_ascii=False))

# 5. LLM을 활용하여 BIO 태깅

## 5.1. 프롬프트 작성

In [16]:
def generate_bio_prompt(prompts, model="gpt-4o", max_tokens=512):
    """Query the chat-completions endpoint once per BIO-tagging prompt.

    Args:
        prompts: Iterable of user-message strings.
        model: Model name passed to the API.
        max_tokens: Value forwarded as ``max_completion_tokens``.

    Returns:
        List with the stripped text of the first choice for each prompt, in
        the same order as ``prompts``.
    """
    dev_msg = (
        "You are a helpful assistant.\n"
        "DO NOT INCLUDE OTHER COMMNENTS IN THE OUTPUT."
        )

    def tag_one(user_text):
        # One request per prompt; only the first choice is used.
        completion = client.chat.completions.create(
            model=model,
            messages=[
                {"role": "developer", "content": dev_msg},
                {"role": "user", "content": user_text},
            ],
            max_completion_tokens=max_tokens,
            temperature=0.7,
        )
        return completion.choices[0].message.content.strip()

    return [tag_one(user_text) for user_text in prompts]

def construct_bio_prompt(text, entities, tokens):
    """Build the BIO-tagging prompt for one pre-tokenized sentence.

    The prompt lists the sentence, its tokens joined by single spaces, the tag
    set (ASP, OPI, LOC, PLC and O), the entities known to be in the sentence,
    and one worked example of the expected "token/TAG" output format.

    Args:
        text: The original sentence.
        entities: List of dicts with "entity_name" and "class_name".
        tokens: The sentence's tokens, in order.

    Returns:
        The complete prompt string.
    """
    entity_lines = [f"- {ent['entity_name']} ({ent['class_name']})\n" for ent in entities]
    parts = [
        "Sentence has been pre-tokenized into words. Do NOT tokenize it again. Use the given tokens exactly as they are and perform BIO tagging.\n",
        f"Sentence: {text}\n",
        "Tokens: " + " ".join(tokens) + "\n\n",
        "BIO Tagging Rules:\n",
        "- B-ASP: Beginning of an Aspect term\n",
        "- I-ASP: Inside of an Aspect term\n",
        "- B-OPI: Beginning of an Opinion term\n",
        "- I-OPI: Inside of an Opinion term\n",
        "- B-LOC: Beginning of an Location term\n",
        "- I-LOC: Inside of an Location term\n",
        "- B-PLC: Beginning of an Place term\n",
        "- I-PLC: Inside of an Place term\n",
        # The O rule must cover all four classes listed above; describing it
        # as only "not Aspect or Opinion" contradicts the LOC/PLC tags.
        "- O: Not part of any Aspect, Opinion, Location, or Place term\n\n",
        "Aspect refers to the attribute or feature of an entity, and Opinion indicates a subjective evaluation of that aspect.\n",
        "The following entities are present in this sentence:\n",
        *entity_lines,
        "Even if the sentence does not contain these entities, other words can be tagged as Aspect or Opinion terms.\n",
        "Do NOT split or merge the tokens. Use them exactly as given and ensure the number of BIO tags matches the number of tokens.\n",
        "If the number of tokens and BIO tags do not match, return 'ERROR'.\n\n",
        "Here is an example:\n",
        "Example sentence: '이 카페는 공간이 작지만, 가족친화적인 분위기가 정말 좋아요.'\n",
        "Tokens: ['이', '카페는', '공간이', '작지만', ',', '가족친화적인', '분위기가', '정말', '좋아요', '.']\n",
        "BIO Tagging:\n",
        "- Output each token with its corresponding BIO tag, separated by a slash (/).\n",
        "- Example output: '이/O 카페는/O 공간이/B-ASP 작지만/B-OPI ,/O 가족친화적인/B-ASP 분위기가/I-ASP 정말/O 좋아요/B-OPI ./O'\n",
        "- Each token must have exactly one tag.\n\n",
        "IMPORTANT: Write the result in Korean only. No explanations or comments.\n",
    ]
    return "".join(parts)

## 5.2 단어별 토크나이징

In [17]:
import re

def tokenize_sentence(sentence):
    """Split a sentence into word tokens, discarding punctuation and whitespace.

    A token is a maximal run of word characters (letters including Hangul,
    digits, underscore). Punctuation never appears in the result.

    Args:
        sentence: Text to tokenize.

    Returns:
        List of word tokens in order of appearance.
    """
    # Matching word runs directly yields the same tokens as capturing
    # punctuation first and then deleting it.
    return re.findall(r'\w+', sentence)

## 5.3 진행

### 5.3.1. Data Load

4단계 새로 생성 안하고 불러와서 진행할 때

In [18]:
import json

# Reload previously generated sentences instead of regenerating them.
with open("03_generated_sentences.json", "r", encoding="utf-8") as sentences_file:
    generated_sentences = json.loads(sentences_file.read())

### 5.3.2. 진행

In [19]:
from tqdm import tqdm
import time

bio_data = []

batch_size = 5

for start in tqdm(range(0, len(generated_sentences), batch_size)):
    batch = generated_sentences[start:start + batch_size]

    # Tokenize every sentence in the batch, then build one prompt per sentence.
    batch_tokens = [tokenize_sentence(record["sentence"]) for record in batch]
    batch_prompts = [
        construct_bio_prompt(record["sentence"], record["entities"], tokens)
        for record, tokens in zip(batch, batch_tokens)
    ]

    batch_results = generate_bio_prompt(batch_prompts, model="gpt-4o", max_tokens=1024)

    # Keep the raw tagging output next to the sentence and its tokens.
    bio_data.extend(
        {"sentence": record["sentence"], "tokens": tokens, "bio_tagging": bio_result}
        for record, tokens, bio_result in zip(batch, batch_tokens, batch_results)
    )
    time.sleep(1)

100%|██████████| 100/100 [13:33<00:00,  8.14s/it]


## 5.4 저장

In [20]:
import json

# Persist the raw BIO tagging results.
with open("04_bio_tagged_data.json", "w", encoding="utf-8") as tagged_file:
    tagged_file.write(json.dumps(bio_data, indent=4, ensure_ascii=False))

# 6. Train and Test set split

## 6.1. Data Load

로드하고 작업할 때

In [21]:
import json

# Reload the saved BIO tagging results and report how many records there are.
with open("04_bio_tagged_data.json", "r", encoding="utf-8") as tagged_file:
    bio_data = json.loads(tagged_file.read())

print(len(bio_data))

500


## 6.2. Preprocessing

In [22]:
# check_list collects records whose tagging failed validation;
# processed_list collects records whose tags were parsed successfully.
check_list, processed_list = [], []

def _split_tagged(piece):
    """Split one "token/TAG" piece at its last slash; return None if it has no slash."""
    token, sep, tag = piece.rpartition("/")
    if not sep:
        return None
    return token, tag


def extract_tags(tagged_output, tokens):
    """Align "token/TAG" output with the expected tokens and return the tags.

    Args:
        tagged_output: Whitespace-separated "token/TAG" pieces.
        tokens: The expected tokens, in order.

    Returns:
        List with one tag per token, or the string "ERROR" when a piece has no
        "/", a token does not match, or the number of tags differs from the
        number of tokens. A diagnostic line is printed for each failure.
    """
    tagged_tokens = tagged_output.split()
    fixed_tagging = []

    i, j = 0, 0
    while i < len(tagged_tokens) and j < len(tokens):
        current = _split_tagged(tagged_tokens[i])
        if current is None:
            # Covers a literal "ERROR" reply as well as stray commentary.
            print(f"ERROR: Malformed tagged token (no '/'): {tagged_tokens[i]}")
            return "ERROR"
        current_token, current_tag = current
        actual_token = tokens[j]

        if current_token == actual_token:
            fixed_tagging.append(current_tag)
            i += 1
            j += 1
            continue

        # Tolerate one expected token arriving as two consecutive pieces: if
        # gluing this piece to the next reproduces the expected token, accept
        # it with the first piece's tag.
        following = _split_tagged(tagged_tokens[i + 1]) if i + 1 < len(tagged_tokens) else None
        if following is not None and current_token + following[0] == actual_token:
            fixed_tagging.append(current_tag)
            i += 2
            j += 1
            continue

        print(f"ERROR: Token mismatch - Expected: {actual_token}, Actual: {current_token}")
        return "ERROR"

    if len(fixed_tagging) != len(tokens):
        print(f"Length Mismatch: Tokens = {len(tokens)}, Tags = {len(fixed_tagging)}")
        return "ERROR"
    return fixed_tagging

# Parse each raw tagging result. Records that fail go to check_list for manual
# review; the rest go to processed_list with the tag list in place of the raw
# string.
for item in bio_data:
    tokens = item["tokens"]
    bio_tagging = item["bio_tagging"]
    bio_tag_list = extract_tags(bio_tagging, tokens)

    if bio_tag_list == "ERROR":
        check_list.append({
            "sentence": item["sentence"],
            "tokens": tokens,
            "bio_tagging": bio_tagging,
            "token_length": len(tokens),
            # Count the pieces in the raw output, not the length of "ERROR".
            "bio_tagging_length": len(bio_tagging.split())
        })
    else:
        # Store the tags on a copy so bio_data keeps the raw output and this
        # loop can be run again on the same data.
        processed_list.append({**item, "bio_tagging": bio_tag_list})

# Sanity check: every accepted record must have exactly one tag per token.
for item in processed_list:
    bio_tagged = item["bio_tagging"]
    tokens = item["tokens"]

    if len(bio_tagged) != len(tokens):
        print(f"ERROR: Length mismatch - Tokens: {len(tokens)}, Tags: {len(bio_tagged)}")
        print(f"Sentence: {item['sentence']}")

print(f"Total: {len(bio_data)}, Processed: {len(processed_list)}, Errors: {len(check_list)}")


Total: 500, Processed: 500, Errors: 0


In [23]:
# Write the accepted records and the records needing review to separate files.
for out_name, records in (("05_processed_data.json", processed_list), ("06_check_list.json", check_list)):
    with open(out_name, "w", encoding="utf-8") as out_file:
        json.dump(records, out_file, indent=4, ensure_ascii=False)

## 6.3. Split

In [24]:
from sklearn.model_selection import train_test_split

# 80/20 split with a fixed seed.
train_data, test_data = train_test_split(processed_list, test_size=0.2, random_state=42)

print(f"Train: {len(train_data)}, Test: {len(test_data)}")

# Write each subset to its own file in the parent directory.
for out_path, subset in (("../train_data.json", train_data), ("../test_data.json", test_data)):
    with open(out_path, "w", encoding="utf-8") as out_file:
        json.dump(subset, out_file, indent=4, ensure_ascii=False)

Train: 400, Test: 100
