In [2]:
!pip -q install python-dotenv langchain-google-genai pydantic tqdm

In [None]:
import os  # OS 환경변수
import re  # 정규식
import json  # JSON 처리
import pandas as pd  # 판다스
from collections import defaultdict  # 기본값 딕셔너리
from typing import Optional, List, Dict, Any  # 타입 힌트

from dotenv import load_dotenv  # .env 로더
from pydantic import BaseModel, ValidationError  # Pydantic
from tqdm import tqdm  # 진행바
from langchain_google_genai import ChatGoogleGenerativeAI  # Gemini
from langchain_core.prompts import ChatPromptTemplate  # 프롬프트 템플릿

# =========================
# 0) ENV 로드
# =========================
load_dotenv("/content/drive/MyDrive/project_team1/.env")  # ✅ .env 로드
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")  # ✅ 키 로드
if not GEMINI_API_KEY:  # 키 없으면
    raise ValueError("❌ GEMINI_API_KEY가 없습니다. project_team1/.env를 확인하세요.")  # 중단

# =========================
# 1) 트랜스크립트 파싱
# =========================
def parse_transcript(transcript_path: str) -> List[Dict[str, str]]:  # 트랜스크립트 파싱 함수
    dialogues = []  # 결과 리스트
    current_speaker = "UNKNOWN"  # 현재 화자
    current_text = []  # 누적 텍스트

    speaker_time_pattern = re.compile(r'^(SPEAKER_\d+)\s+(\d{2}:\d{2}:\d{2})$')  # SPEAKER+시간
    time_only_pattern = re.compile(r'^(\d{2}:\d{2}:\d{2})$')  # 시간만

    with open(transcript_path, "r", encoding="utf-8") as f:  # 파일 열기
        lines = f.readlines()  # 라인 읽기

    for line in lines:  # 반복
        line = line.strip()  # 공백 제거
        if not line:  # 빈 줄이면
            continue  # 스킵

        speaker_match = speaker_time_pattern.match(line)  # 매칭
        time_match = time_only_pattern.match(line)  # 매칭

        if speaker_match:  # 새 화자 시작
            if current_text:  # 누적이 있으면
                dialogues.append({"speaker": current_speaker, "text": " ".join(current_text)})  # 저장
                current_text = []  # 초기화
            current_speaker = speaker_match.group(1)  # 화자 갱신

        elif time_match:  # 시간만(세그먼트 구분)
            if current_text:  # 누적이 있으면
                dialogues.append({"speaker": current_speaker, "text": " ".join(current_text)})  # 저장
                current_text = []  # 초기화

        else:  # 일반 텍스트
            current_text.append(line)  # 누적

    if current_text:  # 마지막 누적 처리
        dialogues.append({"speaker": current_speaker, "text": " ".join(current_text)})  # 저장

    return dialogues  # 반환

# =========================
# 2) Config 로드 + Regex 분석(JSON)
# =========================
def load_config(config_path: str) -> Dict:  # config 로드
    with open(config_path, "r", encoding="utf-8") as f:  # 파일 열기
        return json.load(f)  # 로드

def analyze_text_regex(dialogues: List[Dict[str, str]], config: Dict) -> Dict:  # regex 기반 카운트 분석
    keyword_map = {}  # 키워드 맵

    if "emotion_polarity" in config:  # 감정 설정
        if "positive" in config["emotion_polarity"]:
            keyword_map["positive"] = config["emotion_polarity"]["positive"]["keywords"]
        if "negative" in config["emotion_polarity"]:
            keyword_map["negative"] = config["emotion_polarity"]["negative"]["keywords"]

    if "attention" in config:  # attention 설정
        if "mention_x" in config["attention"]:
            keyword_map["mention_x"] = config["attention"]["mention_x"]["keywords"]
        if "mention_other" in config["attention"]:
            keyword_map["mention_other"] = ["해은", "규민", "나연", "희두", "원빈", "지수", "태희", "지연", "나언", "현규", "지현"]

    if "initiative" in config:  # initiative 설정
        keyword_map["initiative"] = config["initiative"]["keywords"]

    total_counts = defaultdict(int)  # 전체 카운트
    speaker_stats = defaultdict(lambda: defaultdict(int))  # 화자별 카운트

    for entry in dialogues:  # 반복
        speaker = entry["speaker"]  # 화자
        text = entry["text"]  # 텍스트

        for category, keywords in keyword_map.items():  # 카테고리 반복
            for kw in keywords:  # 키워드 반복
                if kw in text:  # 포함되면
                    c = text.count(kw)  # 횟수
                    total_counts[category] += c  # 누적
                    speaker_stats[speaker][category] += c  # 누적

    return {  # 결과 반환
        "summary": dict(total_counts),  # dict 변환
        "by_speaker": {k: dict(v) for k, v in speaker_stats.items()},  # dict 변환
    }

def save_json(obj: Any, path: str) -> None:  # JSON 저장
    with open(path, "w", encoding="utf-8") as f:  # 파일 열기
        json.dump(obj, f, ensure_ascii=False, indent=2)  # 저장

# =========================
# 3) Gemini: 스키마
# =========================
class SpeakerMapping(BaseModel):  # 화자 매핑 스키마
    speaker_id: str  # SPEAKER_XX
    real_name: str  # ✅ 반드시 후보 중 하나(Unknown 금지)
    reason: str  # 근거
    confidence: float  # 확신도(0~1)

class SpeakerMappingList(BaseModel):  # 리스트 스키마
    mappings: List[SpeakerMapping]  # 매핑 리스트

class DialogueAnalysis(BaseModel):  # 대화 분석 스키마
    target_person: Optional[str] = "None"  # 대상 인물
    sentiment: str  # positive/negative/neutral
    category: str  # emotion/attention/initiative
    summary: str  # 요약

# =========================
# 4) 캐시: 화자 매핑 저장/불러오기(JSON)
# =========================
def load_mapping_cache(cache_path: str) -> Optional[Dict[str, Any]]:  # 캐시 로드
    if not os.path.exists(cache_path):  # 없으면
        return None  # None
    with open(cache_path, "r", encoding="utf-8") as f:  # 열기
        return json.load(f)  # 로드

def save_mapping_cache(cache_path: str, mapping_obj: Dict[str, Any]) -> None:  # 캐시 저장
    with open(cache_path, "w", encoding="utf-8") as f:  # 열기
        json.dump(mapping_obj, f, ensure_ascii=False, indent=2)  # 저장

# =========================
# 5) 샘플 선별: 정보량 높은 대사만 사용(정확도 업)
# =========================
def build_speaker_samples(dialogues: List[Dict[str, str]], max_samples: int = 20, min_len: int = 20) -> Dict[str, List[str]]:  # 샘플 생성
    speaker_samples = {}  # 결과 dict

    for d in dialogues:  # 반복
        sid = d["speaker"]  # 화자
        text = d["text"].strip()  # 텍스트

        if len(text) < min_len:  # 너무 짧으면
            continue  # 스킵

        score = 0  # 점수
        if re.search(r"[0-9]", text):  # 숫자 단서
            score += 1
        if re.search(r"(오빠|언니|누나|형)", text):  # 호칭 단서
            score += 2
        if re.search(r"(집|회사|학교|공항|한국|연대|북문)", text):  # 장소 단서
            score += 1
        if re.search(r"(헤어|만나|연락|데이트|사귀|좋아|싫어)", text):  # 관계 단서
            score += 1

        if score < 1:  # 단서 부족이면
            continue  # 스킵

        speaker_samples.setdefault(sid, [])  # 초기화
        if len(speaker_samples[sid]) >= max_samples:  # 상한이면
            continue  # 스킵

        speaker_samples[sid].append(text)  # 추가

    return speaker_samples  # 반환

# =========================
# 6) 화자 식별(Unknown 금지): 반드시 후보 중 하나로 찍기
#    - 캐시에 {speaker_id: {name, confidence, reason}} 저장
# =========================
def identify_speakers_force_choice(
    dialogues: List[Dict[str, str]],
    profile_text: str,
    performer_names: List[str],
    llm: ChatGoogleGenerativeAI,
    cache_path: str,
) -> Dict[str, Dict[str, Any]]:
    cached = load_mapping_cache(cache_path)  # 캐시 로드
    if cached:  # 캐시가 있으면
        return cached  # 그대로 반환

    speaker_samples = build_speaker_samples(dialogues, max_samples=20, min_len=20)  # 샘플 생성

    samples_str = ""  # 문자열
    for sid, texts in speaker_samples.items():  # 반복
        samples_str += f"[{sid} 샘플]\n" + "\n".join(texts) + "\n\n"  # 합치기

    prompt = ChatPromptTemplate.from_messages([
        ("system",
         "너는 연애 리얼리티 프로그램 분석가야.\n"
         "각 SPEAKER_XX가 아래 후보 중 누구인지 반드시 1명으로 결정해.\n"
         "중요 규칙:\n"
         f"- real_name은 반드시 {performer_names} 중 하나만 반환해. (Unknown 금지)\n"
         "- 확신이 낮아도 '가장 그럴듯한 쪽'을 선택해.\n"
         "- 대신 confidence(0~1)로 불확실함을 반드시 표시해.\n"
         "- reason에는 근거 단서가 있으면 쓰고, 단서가 부족하면 '단서 부족/중립 발화'라고 적어.\n\n"
         f"[출연진 정보]\n{profile_text}"
        ),
        ("user", "{samples_str}")
    ])

    structured = llm.with_structured_output(schema=SpeakerMappingList)  # 구조화
    result = structured.invoke(prompt.invoke({"samples_str": samples_str}))  # 실행

    mapping_obj = {}  # 캐시에 저장할 구조
    for m in result.mappings:  # 반복
        mapping_obj[m.speaker_id] = {  # speaker별 저장
            "name": m.real_name,  # 이름
            "confidence": m.confidence,  # 확신도
            "reason": m.reason,  # 근거
        }

    save_mapping_cache(cache_path, mapping_obj)  # 캐시 저장
    return mapping_obj  # 반환

# =========================
# 7) 대화 분석(Gemini): LLM 1회 생성 후 재사용
# =========================
def analyze_dialogue_with_ai(
    dialogue_text: str,
    current_speaker: str,
    config_text: str,
    performer_names: List[str],
    llm: ChatGoogleGenerativeAI,
) -> Optional[DialogueAnalysis]:
    prompt = ChatPromptTemplate.from_messages([
        ("system",
         "너는 대화 분석기야.\n"
         "화자({current_speaker})의 말을 분석해서 (1) 대상 인물, (2) 감정, (3) 발화 유형을 분류해.\n"
         "중요: target_person은 후보 목록에 있는 이름만 선택해. 애매하면 'None'.\n"
         "분석 기준: {analysis_config_text}\n"
         "대상 인물 후보: {performer_names}"
        ),
        ("user", "{dialogue_text}")
    ])

    structured = llm.with_structured_output(schema=DialogueAnalysis)  # 구조화

    try:
        return structured.invoke(prompt.invoke({
            "current_speaker": current_speaker,
            "analysis_config_text": config_text,
            "performer_names": performer_names,
            "dialogue_text": dialogue_text,
        }))
    except ValidationError as e:
        print(f"Validation Error: speaker={current_speaker}, text={dialogue_text[:50]}...")
        print(f"Error details: {e}")
        return None
    except Exception as e:
        msg = str(e)
        print(f"Unexpected Error: speaker={current_speaker}, text={dialogue_text[:50]}...")
        print(f"Error details: {msg}")
        if "PERMISSION_DENIED" in msg or "leaked" in msg:
            raise RuntimeError("❌ API 키가 차단되었습니다. 새 키 발급 후 .env 교체하세요.")
        return None

# =========================
# 8) 한 케이스 실행
#    - mode="pair": 후보 2명, 결과도 2명만 저장
#    - mode="all": 후보 전체 출연진, 결과도 전체 저장
# =========================
def run_case(
    case_name: str,
    transcript_path: str,
    performer_names: List[str],
    config_path: str,
    profile_path: str,
    out_dir: str,
    mode: str = "pair",
) -> None:
    print(f"\n===== RUN CASE: {case_name} ({mode}) =====")  # 로그

    dialogues = parse_transcript(transcript_path)  # 파싱
    print(f"Parsed segments: {len(dialogues)}")  # 로그

    config = load_config(config_path)  # config 로드

    # ✅ 1) Regex 결과 저장
    regex_result = analyze_text_regex(dialogues, config)  # regex 분석
    regex_out_path = os.path.join(out_dir, f"{case_name}_{mode}_regex.json")  # 경로
    save_json(regex_result, regex_out_path)  # 저장
    print(f"✅ Regex JSON saved: {regex_out_path}")  # 로그

    # ✅ 2) Gemini 준비
    profile_df = pd.read_csv(profile_path)  # 프로필 로드
    profile_text = profile_df.to_string(index=False)  # 텍스트화
    analysis_config_text = json.dumps(config, ensure_ascii=False)  # 문자열화
    llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0, google_api_key=GEMINI_API_KEY)  # LLM 1회 생성

    # ✅ 3) 화자 매핑(Unknown 금지)
    mapping_cache_path = os.path.join(out_dir, f"{case_name}_{mode}_speaker_mapping.json")  # 캐시 경로
    mapping_obj = identify_speakers_force_choice(dialogues, profile_text, performer_names, llm, mapping_cache_path)  # 매핑
    print(f"✅ Speaker mapping cached: {mapping_cache_path}")  # 로그

    # ✅ 4) 대사 분석 결과 저장(CSV)
    csv_out_path = os.path.join(out_dir, f"{case_name}_{mode}_ai.csv")  # CSV 경로
    results = []  # 결과
    processed_count = 0  # 재개

    if os.path.exists(csv_out_path):
        try:
            existing_df = pd.read_csv(csv_out_path)
            processed_count = len(existing_df)
            results = existing_df.to_dict("records")
            print(f"Resume CSV: {processed_count} rows")  # 로그
        except Exception as e:
            print(f"Existing CSV read error, start fresh: {e}")  # 로그

    # ✅ mode별 저장 기준
    target_set = set(performer_names)  # 후보 집합
    save_only_targets = (mode == "pair")  # pair면 후보 2명만 저장

    for i, d in enumerate(tqdm(dialogues)):
        if i < processed_count:
            continue

        sid = d["speaker"]  # SPEAKER ID
        speaker_name = mapping_obj.get(sid, {}).get("name")  # 매핑 이름
        speaker_conf = mapping_obj.get(sid, {}).get("confidence")  # 확신도

        if not speaker_name:  # 혹시 누락되면
            continue  # 스킵

        if save_only_targets and speaker_name not in target_set:  # pair모드에서는 후보 2명만
            continue

        analysis = analyze_dialogue_with_ai(d["text"], speaker_name, analysis_config_text, performer_names, llm)
        if not analysis:
            continue

        results.append({
            "speaker": speaker_name,
            "speaker_confidence": speaker_conf,
            "text": d["text"],
            "target": analysis.target_person,
            "sentiment": analysis.sentiment,
            "category": analysis.category,
            "summary": analysis.summary,
        })

        if (i + 1) % 10 == 0:
            pd.DataFrame(results).to_csv(csv_out_path, index=False, encoding="utf-8-sig")

    pd.DataFrame(results).to_csv(csv_out_path, index=False, encoding="utf-8-sig")
    print(f"✅ Gemini CSV saved: {csv_out_path}")

# =========================
# 9) main: 2커플 + 전체출연진 모드까지 실행
# =========================
def main():
    config_path = "/content/drive/MyDrive/project_team1/table/all_text_anlst.json"
    profile_path = "/content/drive/MyDrive/project_team1/table/character_profile.csv"
    out_dir = "/content/drive/MyDrive/project_team1/result"

    profile_df = pd.read_csv(profile_path)  # 전체 출연진 로드
    all_performers = profile_df["name"].dropna().unique().tolist()  # 전체 후보 리스트

    # ✅ 1) 커플 2명 모드: 희두/나연
    run_case(
        case_name="환연2_희두나연",
        transcript_path="/content/drive/MyDrive/project_team1/transcript/환연2_희두나연.txt",
        performer_names=["남희두", "이나연"],
        config_path=config_path,
        profile_path=profile_path,
        out_dir=out_dir,
        mode="pair",
    )

    # ✅ 2) 커플 2명 모드: 해은/규민
    run_case(
        case_name="환연2_해은규민",
        transcript_path="/content/drive/MyDrive/project_team1/transcript/환연2_해은규민.txt",
        performer_names=["성해은", "정규민"],
        config_path=config_path,
        profile_path=profile_path,
        out_dir=out_dir,
        mode="pair",
    )

    # ✅ 3) 전체 출연진 모드(10명+)
    run_case(
        case_name="환연2_전체",
        transcript_path="/content/drive/MyDrive/project_team1/transcript/환연2_1화_20화_요약.txt",
        performer_names=all_performers,
        config_path=config_path,
        profile_path=profile_path,
        out_dir=out_dir,
        mode="all",
    )
   
if __name__ == "__main__":
    main()
