<a href="https://colab.research.google.com/github/NathnSong/MysteryNovel-TextMining/blob/main/%EB%8C%80%EC%82%AC%EC%8A%A4%ED%81%AC%EB%A6%BD%ED%8A%B8%EC%B6%94%EC%B6%9C.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 대사스크립트추출
### 화자, 청자, 대사 추출
- MLP-KTLim/llama-3-Korean-Bllossom-8B
- Llma 모델을 한국어로 튜닝한 모델을 사용
- 기존 꺽쇠(「, 」) 있는지를 조건으로 확인하여 화자, 청자, 대사를 추출하는 것에 한계가 있음을 확인, LLM을 활용

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# !pip uninstall torch transformers -y
# !pip install torch==2.1.2 transformers==4.37.2

In [None]:
cd '/content/drive/MyDrive/MysteryNovel-TextMining'

## 대사스크립트 추출

In [None]:
# 모델 불러오기
import os
import torch
import re
import pandas as pd
from transformers import AutoTokenizer, AutoModelForCausalLM

model_id = "MLP-KTLim/llama-3-Korean-Bllossom-8B"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)
model.eval()

In [None]:
import json
act_file = "Act2.txt"
print(f"\n******** {act_file} 처리 시작 ********")
# 대사스크립트 번호 적용
save_path = f"대사스크립트_{act_file.replace('.txt', '').replace('Act', '')}.csv"

def sliding_windows(text, window_size=600, stride=300):
  return [text[i:i+window_size] for i in range(0, len(text), stride)]

# 텍스트 불러오기
with open(act_file, encoding="utf-8") as f:
    novel_text = f.read()

chunks = sliding_windows(novel_text)


# 정규표현식으로 꺽쇠로 감싸진 대사 추출
def extract_dialogues(text_chunk):
  dialogues = re.findall(r'「(.*?)」', text_chunk, re.DOTALL)
  return dialogues

# 프롬프트
def extract_speaker_info_bllossom(text_chunk, dialogue):
  PREFIX = """
  당신은 문학 작품 속 '대사'를 분석하는 AI입니다.

  [규칙]
  - 아래 문맥에서 꺽쇠(「, 」)로 감싸진 '대사'만 추출하세요.
  - '대사' 내용은 절대 바꾸지 마세요.
  - 추론이 어려운 경우에는 '불명'으로 답하세요.

  - 화자 : 대사 직전/전후에 '는', '말했다', '물었다' 등과 함께 언급되거나, 문맥상 발화자인 인물.
  - 청자 : 발화 상대방으로 문맥상 자연스럽게 연결되는 인물.

  [출력형식]
  {"화자": "화자 이름", "청자": "청자 이름 또는 불명", "내용": "대사 내용"}
  """

  FEWSHOT = """
  [예시1]
  문맥:
  "베라는 말했다. '그걸 믿으라고?' 필립은 웃었다."
  대사: 「그걸 믿으라고?」
  결과: {"화자": "베라", "청자": "필립", "내용": "그걸 믿으라고?"}

  [예시2]
  문맥:
  블로어는 좀 난처한 표정을  지었다. 붉은 벽돌 같은 얼굴빛이 더욱 짙어졌다. 그리고 말하기 거북한 듯 빠른 말투로 지껄였다.
  「암스트롱, 당신은 그녀에게 약을 주었겠지요?」
  「약을?」암스트롱 의사는 블로어의 얼굴을 쳐다보았다.
  「어젯밤에 말이오. 수면제를 주었지요?」
  대사 :「주었소. 보통 수면제였소.」
  결과 : {"화자": "암스트롱", "청자": "블로어", "내용": "주었소. 보통 수면제였소."}

  [예시3]
  문맥:
  롬버드는 엷은 미소를 떠올리며 말했다.
  「블로어, 당신도 꽤  영리하군. 나는 처음부터 사건이 일어날  것을 예상하고 있었소.」
  「어젯밤에는 그런 말을 하지 않았잖소?」
  롬버드는 머리를 저었다.
  「우리들에게 숨기는 게 있지요?」
  「그렇소.」
    결과 : {"화자": "블로어", "청자": "롬버드", "내용": "우리들에게 숨기는 게 있지요?"}
    결과 : {"화자": "롬버드", "청자": "블로어", "내용": "그렇소"}
  """

  def make_prompt(text_chunk, dialogue):
      return f"{PREFIX}\n\n{FEWSHOT}\n\n[대상 문맥]\n{text_chunk}\n\n[대사]\n「{dialogue}」"

  prompt_text = make_prompt(text_chunk, dialogue)
  messages = [
      {"role": "system", "content": prompt_text}
  ]

  input_ids = tokenizer.apply_chat_template(
      messages,
      add_generation_prompt=True,
      return_tensors="pt"
  ).to(model.device)

  attention_mask = (input_ids != tokenizer.pad_token_id).long()

  outputs = model.generate(
      input_ids,
      attention_mask=attention_mask,
      max_new_tokens=512,
      do_sample=False,
      temperature=0.6,
      top_p=0.9,
      eos_token_id=[tokenizer.eos_token_id]
  )

  return tokenizer.decode(outputs[0][input_ids.shape[-1]:], skip_special_tokens=True)

def parse_result(raw_output):
  rows = []
  rows = []
  # 여러 줄로 json 객체가 나올 수 있으니 줄마다 처리
  for line in raw_output.strip().splitlines():
      line = line.strip()
      if not line:
          continue
      try:
          row = json.loads(line)
          rows.append({
              "화자": str(row.get("화자", "")).strip(),
              "청자": str(row.get("청자", "")).strip(),
              "내용": str(row.get("내용", "")).strip()
          })
      except Exception as e:
        print(f" JSON parsing 실패: {line}\n오류: {e}")
        continue
  return rows
def clean_content(x):
    # dict나 이상한 타입이 있으면 문자열로 바꾸기
    if isinstance(x, dict) or isinstance(x, list):
        return str(x)
    elif pd.isnull(x):
        return ""
    return x

chunks = sliding_windows(novel_text)

# 이전에 저장된 데이터 불러오기 (있으면 이어서)
if os.path.exists(save_path):
    existing_df = pd.read_csv(save_path)

    # 저장되어 있는 chunk_idx 기준으로
    processed_count = existing_df["chunk_idx"].max()+1
    all_data = existing_df.to_dict("records")
    print(f" 기존 데이터 {processed_count}개 불러옴, 이어서 처리 시작")
else:
    all_data = []
    processed_count = 0

# 처리 반복
for idx, chunk in enumerate(chunks):
    if idx < processed_count:
        continue  # 이미 처리된 chunk는 스킵

    dialogues = extract_dialogues(chunk)

    if not dialogues:
      print(f" 대사 없음! 빈 문자열 추가 : {idx+1}")
      # 저장한 데이터 횟수 체크를 위해 idx 같이 저장
      all_data.append({"화자" : "", "청자" : "", "내용": "", "chunk_idx" : idx})
      continue
    else:
      print(f"처리 중 ...: {idx+1}/{len(chunks)}")
      chunk_rows = []
      for dialogue in dialogues:
        raw = extract_speaker_info_bllossom(chunk, dialogue)
        print("모델 응답:\n", raw[:300].strip())
        rows = parse_result(raw)

        # 내용이 비어있으면 직접 dialogue 채워넣기
        for row in rows:
          if not row.get("내용"):
            row["내용"] = dialogue
          row["chunk_idx"] = idx
        chunk_rows.extend(rows)

      # 여기서 chunk 데이터 추가
      all_data.extend(chunk_rows)

    # chunk 자동 저장
    if (idx+1) % 5 == 0:
      temp_df = pd.DataFrame(all_data)
      temp_df["내용"] = temp_df["내용"].apply(clean_content)
      temp_df.to_csv(save_path, index=False, encoding='utf-8-sig')
      print(f"임시 저장 완료: {idx+1}개 완료")

# 최종 저장

print(type(all_data))
print(type(all_data[0]))
print(all_data[0])
final_df = pd.DataFrame(all_data)

# value 값 안에 dict 가 들어가는 문제 해결
final_df["화자"] = final_df["화자"].apply(clean_content)
final_df["청자"] = final_df["청자"].apply(clean_content)
final_df["내용"] = final_df["내용"].apply(clean_content)

final_df.to_csv(save_path, index=False, encoding='utf-8-sig')
print("대사 추출 최종 완료:", save_path)

In [None]:
import json
act_file = "Act4.txt"
print(f"\n******** {act_file} 처리 시작 ********")
# 대사스크립트 번호 적용
save_path = f"대사스크립트_{act_file.replace('.txt', '').replace('Act', '')}.csv"

def sliding_windows(text, window_size=600, stride=300):
  return [text[i:i+window_size] for i in range(0, len(text), stride)]

# 텍스트 불러오기
with open(act_file, encoding="utf-8") as f:
    novel_text = f.read()

chunks = sliding_windows(novel_text)


# 정규표현식으로 꺽쇠로 감싸진 대사 추출
def extract_dialogues(text_chunk):
  dialogues = re.findall(r'「(.*?)」', text_chunk, re.DOTALL)
  return dialogues

# 프롬프트
def extract_speaker_info_bllossom(text_chunk, dialogue):
  PREFIX = """
  당신은 문학 작품 속 '대사'를 분석하는 AI입니다.

  [규칙]
  - 아래 문맥에서 꺽쇠(「, 」)로 감싸진 '대사'만 추출하세요.
  - '대사' 내용은 절대 바꾸지 마세요.
  - 추론이 어려운 경우에는 '불명'으로 답하세요.

  - 화자 : 대사 직전/전후에 '는', '말했다', '물었다' 등과 함께 언급되거나, 문맥상 발화자인 인물.
  - 청자 : 발화 상대방으로 문맥상 자연스럽게 연결되는 인물.

  [출력형식]
  {"화자": "화자 이름", "청자": "청자 이름 또는 불명", "내용": "대사 내용"}
  """

  FEWSHOT = """
  [예시1]
  문맥:
  "베라는 말했다. '그걸 믿으라고?' 필립은 웃었다."
  대사: 「그걸 믿으라고?」
  결과: {"화자": "베라", "청자": "필립", "내용": "그걸 믿으라고?"}

  [예시2]
  문맥:
  블로어는 좀 난처한 표정을  지었다. 붉은 벽돌 같은 얼굴빛이 더욱 짙어졌다. 그리고 말하기 거북한 듯 빠른 말투로 지껄였다.
  「암스트롱, 당신은 그녀에게 약을 주었겠지요?」
  「약을?」암스트롱 의사는 블로어의 얼굴을 쳐다보았다.
  「어젯밤에 말이오. 수면제를 주었지요?」
  대사 :「주었소. 보통 수면제였소.」
  결과 : {"화자": "암스트롱", "청자": "블로어", "내용": "주었소. 보통 수면제였소."}

  [예시3]
  문맥:
  롬버드는 엷은 미소를 떠올리며 말했다.
  「블로어, 당신도 꽤  영리하군. 나는 처음부터 사건이 일어날  것을 예상하고 있었소.」
  「어젯밤에는 그런 말을 하지 않았잖소?」
  롬버드는 머리를 저었다.
  「우리들에게 숨기는 게 있지요?」
  「그렇소.」
    결과 : {"화자": "블로어", "청자": "롬버드", "내용": "우리들에게 숨기는 게 있지요?"}
    결과 : {"화자": "롬버드", "청자": "블로어", "내용": "그렇소"}
  """

  def make_prompt(text_chunk, dialogue):
      return f"{PREFIX}\n\n{FEWSHOT}\n\n[대상 문맥]\n{text_chunk}\n\n[대사]\n「{dialogue}」"

  prompt_text = make_prompt(text_chunk, dialogue)
  messages = [
      {"role": "system", "content": prompt_text}
  ]

  input_ids = tokenizer.apply_chat_template(
      messages,
      add_generation_prompt=True,
      return_tensors="pt"
  ).to(model.device)

  attention_mask = (input_ids != tokenizer.pad_token_id).long()

  outputs = model.generate(
      input_ids,
      attention_mask=attention_mask,
      max_new_tokens=512,
      do_sample=False,
      temperature=0.6,
      top_p=0.9,
      eos_token_id=[tokenizer.eos_token_id]
  )

  return tokenizer.decode(outputs[0][input_ids.shape[-1]:], skip_special_tokens=True)

def parse_result(raw_output):
  rows = []
  rows = []
  # 여러 줄로 json 객체가 나올 수 있으니 줄마다 처리
  for line in raw_output.strip().splitlines():
      line = line.strip()
      if not line:
          continue
      try:
          row = json.loads(line)
          rows.append({
              "화자": str(row.get("화자", "")).strip(),
              "청자": str(row.get("청자", "")).strip(),
              "내용": str(row.get("내용", "")).strip()
          })
      except Exception as e:
        print(f" JSON parsing 실패: {line}\n오류: {e}")
        continue
  return rows
def clean_content(x):
    # dict나 이상한 타입이 있으면 문자열로 바꾸기
    if isinstance(x, dict) or isinstance(x, list):
        return str(x)
    elif pd.isnull(x):
        return ""
    return x

chunks = sliding_windows(novel_text)

# 이전에 저장된 데이터 불러오기 (있으면 이어서)
if os.path.exists(save_path):
    existing_df = pd.read_csv(save_path)

    # 저장되어 있는 chunk_idx 기준으로
    processed_count = existing_df["chunk_idx"].max()+1
    all_data = existing_df.to_dict("records")
    print(f" 기존 데이터 {processed_count}개 불러옴, 이어서 처리 시작")
else:
    all_data = []
    processed_count = 0

# 처리 반복
for idx, chunk in enumerate(chunks):
    if idx < processed_count:
        continue  # 이미 처리된 chunk는 스킵

    dialogues = extract_dialogues(chunk)

    if not dialogues:
      print(f" 대사 없음! 빈 문자열 추가 : {idx+1}")
      # 저장한 데이터 횟수 체크를 위해 idx 같이 저장
      all_data.append({"화자" : "", "청자" : "", "내용": "", "chunk_idx" : idx})
      continue
    else:
      print(f"처리 중 ...: {idx+1}/{len(chunks)}")
      chunk_rows = []
      for dialogue in dialogues:
        raw = extract_speaker_info_bllossom(chunk, dialogue)
        print("모델 응답:\n", raw[:300].strip())
        rows = parse_result(raw)

        # 내용이 비어있으면 직접 dialogue 채워넣기
        for row in rows:
          if not row.get("내용"):
            row["내용"] = dialogue
          row["chunk_idx"] = idx
        chunk_rows.extend(rows)

      # 여기서 chunk 데이터 추가
      all_data.extend(chunk_rows)

    # chunk 자동 저장
    if (idx+1) % 5 == 0:
      temp_df = pd.DataFrame(all_data)
      temp_df["내용"] = temp_df["내용"].apply(clean_content)
      temp_df.to_csv(save_path, index=False, encoding='utf-8-sig')
      print(f"임시 저장 완료: {idx+1}개 완료")

# 최종 저장

print(type(all_data))
print(type(all_data[0]))
print(all_data[0])
final_df = pd.DataFrame(all_data)

# value 값 안에 dict 가 들어가는 문제 해결
final_df["화자"] = final_df["화자"].apply(clean_content)
final_df["청자"] = final_df["청자"].apply(clean_content)
final_df["내용"] = final_df["내용"].apply(clean_content)

final_df.to_csv(save_path, index=False, encoding='utf-8-sig')
print("대사 추출 최종 완료:", save_path)

## Act1

In [None]:
import pandas as pd

# CSV 파일 로드
df = pd.read_csv("대사스크립트_1.csv")
df = df.fillna("불명")

# 1단계 : 내용이 같고, 하나는 화자 or 청자가 불명이고, 하나는 명확할 때 -> 불명 쪽 삭제
mask_same_content = df.duplicated(subset = ['내용'], keep=False)
sub_df = df[mask_same_content].copy()

to_remove = []
for content, group in sub_df.groupby("내용"):
  if len(group) <= 1:
    continue

  # 명확한 화자 or 청자 존재
  has_identified = group[(group['화자'] != '불명') & (group['청자'] != '불명')]

  # 불명인 쪽만 골라낸다
  has_fully_unknown = group[(group['화자'] == '불명') | (group['청자'] == '불명')]

  # 명확한 사람도 있고, 완전 불명도 있으면 -> 불명 제거
  if not has_identified.empty and not has_fully_unknown.empty:
    to_remove.extend(has_fully_unknown.index.tolist())

df = df.drop(index = to_remove)

df = df.drop_duplicates(subset=['화자', '청자', '내용'], keep = 'first')

#저장
df.to_csv("대사_중복제거1.csv", index=False)
print("중복 제거 완료!")

In [None]:
import pandas as pd

# 1. 불러오기 및 chunk_index 제거
df = pd.read_csv("대사_중복제거1.csv")
df = df.fillna("불명")

# 2. 중복 내용 그룹 필터링
duplicates = df[df.duplicated(subset=['내용'], keep=False)]

# 3. 수동 선택 저장용 리스트
rows_to_keep = []

# 4. 그룹별 수동 선택
for content, group in duplicates.groupby('내용'):
    print(f"\n내용: {content}\n")
    group = group.reset_index()  # 원래 인덱스를 새로운 컬럼으로 유지
    for i, row in group.iterrows():
        print(f"{i + 1}. 화자: {row['화자']}, 청자: {row['청자']}")
    print("0. 삭제")

    while True:
        try:
            choice = int(input("몇 번을 남기시겠어요? (0 = 전체 삭제): "))
            if choice == 0:
                break
            elif 1 <= choice <= len(group):
                selected_index = group.loc[choice - 1, 'index']
                rows_to_keep.append(selected_index)
                break
            else:
                print("잘못된 번호입니다.")
        except ValueError:
            print("숫자로 입력해주세요.")

# 5. 선택한 행 + 중복 아니었던 행
df_keep = df.loc[rows_to_keep]
df_non_dup = df.drop(index=duplicates.index)
final_df = pd.concat([df_keep, df_non_dup])

# chunk_idx 기준 정렬
if 'chunk_idx' in final_df.columns:
    final_df = final_df.sort_values(by='chunk_idx').reset_index(drop=True)
    final_df = final_df.drop(columns=['chunk_idx'])

# 7. 저장
final_df.to_csv("대사_중복제거1.csv", index=False)
print("\n수동 중복 제거 완료 + 순서 정렬 + chunk_index 완전 제거!")


## Act3

In [None]:
import pandas as pd

# CSV 파일 로드
df = pd.read_csv("대사스크립트_3.csv")
df = df.fillna("불명")

# 1단계 : 내용이 같고, 하나는 화자 or 청자가 불명이고, 하나는 명확할 때 -> 불명 쪽 삭제
mask_same_content = df.duplicated(subset = ['내용'], keep=False)
sub_df = df[mask_same_content].copy()

to_remove = []
for content, group in sub_df.groupby("내용"):
  if len(group) <= 1:
    continue

  # 명확한 화자 or 청자 존재
  has_identified = group[(group['화자'] != '불명') & (group['청자'] != '불명')]

  # 불명인 쪽만 골라낸다
  has_fully_unknown = group[(group['화자'] == '불명') | (group['청자'] == '불명')]

  # 명확한 사람도 있고, 완전 불명도 있으면 -> 불명 제거
  if not has_identified.empty and not has_fully_unknown.empty:
    to_remove.extend(has_fully_unknown.index.tolist())

df = df.drop(index = to_remove)

df = df.drop_duplicates(subset=['화자', '청자', '내용'], keep = 'first')

#저장
df.to_csv("대사_중복제거3.csv", index=False)
print("중복 제거 완료!")

In [None]:
import pandas as pd

# 1. 불러오기 및 chunk_index 제거
df = pd.read_csv("대사_중복제거3.csv")
df = df.fillna("불명")

# 2. 중복 내용 그룹 필터링
duplicates = df[df.duplicated(subset=['내용'], keep=False)]

# 3. 수동 선택 저장용 리스트
rows_to_keep = []

# 4. 그룹별 수동 선택
for content, group in duplicates.groupby('내용'):
    print(f"\n내용: {content}\n")
    group = group.reset_index()  # 원래 인덱스를 새로운 컬럼으로 유지
    for i, row in group.iterrows():
        print(f"{i + 1}. 화자: {row['화자']}, 청자: {row['청자']}")
    print("0. 삭제")

    while True:
        try:
            choice = int(input("몇 번을 남기시겠어요? (0 = 전체 삭제): "))
            if choice == 0:
                break
            elif 1 <= choice <= len(group):
                selected_index = group.loc[choice - 1, 'index']
                rows_to_keep.append(selected_index)
                break
            else:
                print("잘못된 번호입니다.")
        except ValueError:
            print("숫자로 입력해주세요.")

# 5. 선택한 행 + 중복 아니었던 행
df_keep = df.loc[rows_to_keep]
df_non_dup = df.drop(index=duplicates.index)
final_df = pd.concat([df_keep, df_non_dup])

# chunk_idx 기준 정렬
if 'chunk_idx' in final_df.columns:
    final_df = final_df.sort_values(by='chunk_idx').reset_index(drop=True)
    final_df = final_df.drop(columns=['chunk_idx'])

# 7. 저장
final_df.to_csv("대사_중복제거3.csv", index=False)
print("\n수동 중복 제거 완료 + 순서 정렬 + chunk_index 완전 제거!")

## Act2

In [None]:
import pandas as pd

# CSV 파일 로드
df = pd.read_csv("대사스크립트_2.csv")
df = df.fillna("불명")

# 1단계 : 내용이 같고, 하나는 화자 or 청자가 불명이고, 하나는 명확할 때 -> 불명 쪽 삭제
mask_same_content = df.duplicated(subset = ['내용'], keep=False)
sub_df = df[mask_same_content].copy()

to_remove = []
for content, group in sub_df.groupby("내용"):
  if len(group) <= 1:
    continue

  # 명확한 화자 or 청자 존재
  has_identified = group[(group['화자'] != '불명') & (group['청자'] != '불명')]

  # 불명인 쪽만 골라낸다
  has_fully_unknown = group[(group['화자'] == '불명') | (group['청자'] == '불명')]

  # 명확한 사람도 있고, 완전 불명도 있으면 -> 불명 제거
  if not has_identified.empty and not has_fully_unknown.empty:
    to_remove.extend(has_fully_unknown.index.tolist())

df = df.drop(index = to_remove)

df = df.drop_duplicates(subset=['화자', '청자', '내용'], keep = 'first')

#저장
df.to_csv("대사_중복제거2.csv", index=False)
print("중복 제거 완료!")

In [None]:
1import pandas as pd

# 1. 불러오기 및 chunk_index 제거
df = pd.read_csv("대사_중복제거2.csv")
df = df.fillna("불명")

# 2. 중복 내용 그룹 필터링
duplicates = df[df.duplicated(subset=['내용'], keep=False)]

# 3. 수동 선택 저장용 리스트
rows_to_keep = []

# 4. 그룹별 수동 선택
for content, group in duplicates.groupby('내용'):
    print(f"\n내용: {content}\n")
    group = group.reset_index()  # 원래 인덱스를 새로운 컬럼으로 유지
    for i, row in group.iterrows():
        print(f"{i + 1}. 화자: {row['화자']}, 청자: {row['청자']}")
    print("0. 삭제")

    while True:
        try:
            choice = int(input("몇 번을 남기시겠어요? (0 = 전체 삭제): "))
            if choice == 0:
                break
            elif 1 <= choice <= len(group):
                selected_index = group.loc[choice - 1, 'index']
                rows_to_keep.append(selected_index)
                break
            else:
                print("잘못된 번호입니다.")
        except ValueError:
            print("숫자로 입력해주세요.")

# 5. 선택한 행 + 중복 아니었던 행
df_keep = df.loc[rows_to_keep]
df_non_dup = df.drop(index=duplicates.index)
final_df = pd.concat([df_keep, df_non_dup])

# chunk_idx 기준 정렬
if 'chunk_idx' in final_df.columns:
    final_df = final_df.sort_values(by='chunk_idx').reset_index(drop=True)
    final_df = final_df.drop(columns=['chunk_idx'])

# 7. 저장
final_df.to_csv("대사_중복제거2.csv", index=False)
print("\n수동 중복 제거 완료 + 순서 정렬 + chunk_index 완전 제거!")

## Act4

In [None]:
import pandas as pd

# CSV 파일 로드
df = pd.read_csv("대사스크립트_4.csv")
df = df.fillna("불명")

# 1단계 : 내용이 같고, 하나는 화자 or 청자가 불명이고, 하나는 명확할 때 -> 불명 쪽 삭제
mask_same_content = df.duplicated(subset = ['내용'], keep=False)
sub_df = df[mask_same_content].copy()

to_remove = []
for content, group in sub_df.groupby("내용"):
  if len(group) <= 1:
    continue

  # 명확한 화자 or 청자 존재
  has_identified = group[(group['화자'] != '불명') & (group['청자'] != '불명')]

  # 불명인 쪽만 골라낸다
  has_fully_unknown = group[(group['화자'] == '불명') | (group['청자'] == '불명')]

  # 명확한 사람도 있고, 완전 불명도 있으면 -> 불명 제거
  if not has_identified.empty and not has_fully_unknown.empty:
    to_remove.extend(has_fully_unknown.index.tolist())

df = df.drop(index = to_remove)

df = df.drop_duplicates(subset=['화자', '청자', '내용'], keep = 'first')

#저장
df.to_csv("대사_중복제거4.csv", index=False)
print("중복 제거 완료!")

In [None]:
1import pandas as pd

# 1. 불러오기 및 chunk_index 제거
df = pd.read_csv("대사_중복제거4.csv")
df = df.fillna("불명")

# 2. 중복 내용 그룹 필터링
duplicates = df[df.duplicated(subset=['내용'], keep=False)]

# 3. 수동 선택 저장용 리스트
rows_to_keep = []

# 4. 그룹별 수동 선택
for content, group in duplicates.groupby('내용'):
    print(f"\n내용: {content}\n")
    group = group.reset_index()  # 원래 인덱스를 새로운 컬럼으로 유지
    for i, row in group.iterrows():
        print(f"{i + 1}. 화자: {row['화자']}, 청자: {row['청자']}")
    print("0. 삭제")

    while True:
        try:
            choice = int(input("몇 번을 남기시겠어요? (0 = 전체 삭제): "))
            if choice == 0:
                break
            elif 1 <= choice <= len(group):
                selected_index = group.loc[choice - 1, 'index']
                rows_to_keep.append(selected_index)
                break
            else:
                print("잘못된 번호입니다.")
        except ValueError:
            print("숫자로 입력해주세요.")

# 5. 선택한 행 + 중복 아니었던 행
df_keep = df.loc[rows_to_keep]
df_non_dup = df.drop(index=duplicates.index)
final_df = pd.concat([df_keep, df_non_dup])

# chunk_idx 기준 정렬
if 'chunk_idx' in final_df.columns:
    final_df = final_df.sort_values(by='chunk_idx').reset_index(drop=True)
    final_df = final_df.drop(columns=['chunk_idx'])

# 7. 저장
final_df.to_csv("대사_중복제거4.csv", index=False)
print("\n수동 중복 제거 완료 + 순서 정렬 + chunk_index 완전 제거!")

## 인물 빈도수 추출

In [None]:
import pandas as pd
from collections import Counter

file_list = [
    "대사_중복제거1.csv",
    "대사_중복제거2.csv",
    "대사_중복제거3.csv",
    "대사_중복제거4.csv"
]

# 2. CSV 합치기
dfs = [pd.read_csv(file).fillna("불명") for file in file_list]
merged_df = pd.concat(dfs, ignore_index=True)

# 3. 화자 + 청자 합쳐서 빈도 계산
characters = merged_df['화자'].tolist() + merged_df['청자'].tolist()
character_counts = Counter([c for c in characters if c != "불명"])


# 정렬된 결과를 DataFrame으로 보기 좋게 정리
freq_df = pd.DataFrame(character_counts.items(), columns=["등장인물", "등장횟수"])
freq_df = freq_df.sort_values(by="등장횟수", ascending=False).reset_index(drop=True)

# # 저장 (선택 사항)
# freq_df.to_csv("등장인물_빈도수.csv", index=False)

# 출력
freq_df

In [None]:
# 이름 정규화
alias_dict = {
    # 주요인물 10인
    "베러": "베러 크레이슨",
    "크레이슨": "베러 크레이슨",
    "크레이슨 선생님": "베러 크레이슨",
    "배러": "베러 크레이슨",
    "베라": "베러 크레이슨",

    "워그레이브": "로렌스 워그레이브",
    "워그레이브 판사": "로렌스 워그레이브",
    "판사": "로렌스 워그레이브",

    "브랜트": "에밀리 브랜트",
    "에밀리": "에밀리 브랜트",
    "에밀리 브랜트": "에밀리 브랜트",
    "미스 브랜트": "에밀리 브랜트",
    "미스 브란트": "에밀리 브랜트",
    "밀리 브랜트": "에밀리 브랜트",

    "롬버드": "필립 롬버드",
    "롬보드": "필립 롬버드",
    "롬버드 대위": "필립 롬버드",
    "필립": "필립 롬버드",
    "필립 롬버드": "필립 롬버드",

    "매커서": "존 고든 매커서",
    "매커서 장군": "존 고든 매커서",
    "매커스 장군": "존 고든 매커서",
    "매커슨": "존 고든 매커서",
    "매커센": "존 고든 매커서",
    "장군": "존 고든 매커서",

    "암스트롱": "에드워드 암스트롱",
    "암스트롱 의사": "에드워드 암스트롱",
    "의사": "에드워드 암스트롱",

    "머스턴": "앤서니 머스턴",
    "앤터니 머스턴": "앤서니 머스턴",
    "앤터니": "앤서니 머스턴",

    "블로어": "윌리엄 헨리 블로어",

    "로저스": "토머스 로저스",
    "로저스 부인": "에설 로저스",
    "하인 로저스": "토머스 로저스",
    "로저스 부부": "토머스 로저스",
    "에설": "에설 로저스",

    # "렉 경": "토머스 렉 경",
    # "토머스 렉 경": "토머스 렉 경",
    # "부경찰국장": "토머스 렉 경",
    # "메인 경감": "메인 경감",
    "전직 경감": "윌리엄 헨리 블로어",
    "군인": "존 고든 매커서",
    # "하녀": "비트리스 테일러",

    "로버드 대위": "필립 롬버드",
    "대위": "필립 롬버드"
}

def normalize_name(name, alias_dict):
    for alias, canonical in alias_dict.items():
        if alias in name:
            return canonical
    return name

def apply_alias_dict(rows, alias_dict):
    for row in rows:
        row["화자"] = normalize_name(row["화자"], alias_dict)
        row["청자"] = normalize_name(row["청자"], alias_dict)
    return rows


file_list = [
    "대사_중복제거1.csv",
    "대사_중복제거2.csv",
    "대사_중복제거3.csv",
    "대사_중복제거4.csv"
]

# 각 파일 정규화 + 덮어쓰기
for file in file_list:
    df = pd.read_csv(file).fillna("불명")
    rows = df.to_dict("records")
    normalized_rows = apply_alias_dict(rows, alias_dict)
    normalized_df = pd.DataFrame(normalized_rows)

    # 덮어쓰기
    normalized_df.to_csv(file, index=False)
    print(f"이름 정규화 완료 {file}")