# Whisper + Pyannote 음성 전사 및 화자 분리

이 노트북은 OpenAI Whisper와 Pyannote.audio를 결합하여 음성 파일을 텍스트로 변환하고, 각 발화에 화자를 자동으로 할당하는 파이프라인입니다.

## 주요 기능
- **음성 인식(STT)**: Whisper를 사용한 고품질 음성-텍스트 변환
- **화자 분리(Diarization)**: Pyannote를 사용한 자동 화자 구분
- **발화 병합**: 같은 화자의 연속된 발언을 자동으로 병합
- **다양한 출력**: TXT 및 SRT 자막 형식으로 결과 저장

## 사전 요구사항
- Google Colab 환경 (GPU 권장)
- Hugging Face 계정 및 액세스 토큰 (Pyannote 모델 사용을 위해 필요)

## 실행 환경
- Python 3.x
- CUDA 지원 (선택사항, GPU 사용 시 처리 속도 향상)


## 1단계: 환경 설정 및 패키지 설치

이 셀에서는 필요한 모든 패키지를 설치합니다.

### 설치 내용
- **ffmpeg**: 오디오 파일 변환 도구
- **numpy<2.0**: Pyannote와의 호환성을 위해 2.0 미만 버전 사용
- **openai-whisper**: OpenAI의 음성 인식 모델
- **pyannote.audio**: 화자 분리 및 음성 분석 라이브러리

### 주의사항
numpy, whisper, pyannote를 한 번에 설치하여 의존성 충돌을 방지합니다.


In [None]:
# ffmpeg 설치 (오디오 파일 변환 도구)
!apt -y update && apt -y install ffmpeg

# 핵심 패키지 설치
# numpy 버전을 2.0 미만으로 고정하여 pyannote와의 호환성 확보
# whisper와 pyannote를 동시에 설치하면 pip가 호환되는 버전을 자동으로 선택
!pip install "numpy<2.0" openai-whisper pyannote.audio

## 2단계: 음성 파일 업로드

Colab 환경에서 처리할 음성 파일을 업로드합니다.

### 지원 형식
- M4A, MP3, WAV 등 다양한 오디오 형식 지원
- 업로드된 파일은 자동으로 Colab 런타임 환경에 저장됩니다

### 출력
업로드된 파일 이름이 `filename` 변수에 저장됩니다.


In [None]:
from google.colab import files

# 파일 업로드 창 실행
uploaded = files.upload()

# 업로드된 첫 번째 파일의 이름 저장
filename = list(uploaded.keys())[0]
print("Uploaded:", filename)

## 3단계: 오디오 파일 형식 변환 (WAV)

Pyannote 모델의 최적 성능을 위해 오디오 파일을 WAV 형식으로 변환합니다.

### 변환 사양
- **샘플링 레이트**: 16kHz (Pyannote 권장)
- **채널**: 모노 (1채널)
- **코덱**: PCM 16-bit

### 이유
Pyannote는 표준화된 WAV 형식에서 가장 안정적으로 작동하며, 16kHz 모노 오디오는 음성 분석에 충분한 품질을 제공합니다.

In [None]:
import os

# 원본 파일명 저장
original_filename = filename 

# WAV 파일명 생성 (확장자만 변경)
base_name = os.path.splitext(original_filename)[0]
wav_filename = f"{base_name}.wav" 

print(f"Original file: {original_filename}")
print(f"Converting to: {wav_filename} (16kHz, mono, 16-bit PCM)")

# ffmpeg를 사용한 오디오 변환
# -i: 입력 파일 지정
# -ar 16000: 샘플링 레이트를 16kHz로 설정 (화자 분리 최적화)
# -ac 1: 모노 채널로 변환 (화자 분리에 스테레오 불필요)
# -c:a pcm_s16le: 16-bit PCM 코덱 사용 (표준 WAV 형식)
# -y: 기존 파일이 있으면 덮어쓰기
!ffmpeg -i "{original_filename}" -ar 16000 -ac 1 -c:a pcm_s16le -y "{wav_filename}"

print("Conversion complete!")

## 4단계: Whisper 음성 인식 (STT)

OpenAI Whisper 모델을 사용하여 음성을 텍스트로 변환합니다.

### 모델 정보
- **사용 모델**: turbo (빠른 처리 속도와 높은 정확도의 균형)
- **다른 옵션**: tiny, base, small, medium, large (정확도↑, 속도↓)

### 출력
- `segments`: 타임스탬프가 포함된 텍스트 세그먼트 리스트
- 각 세그먼트에는 시작/종료 시간과 텍스트가 포함됩니다


In [None]:
import whisper

# Whisper 모델 로드 (turbo: 빠르고 정확한 다국어 모델)
wmodel = whisper.load_model("turbo")
print(f"Transcribing {wav_filename} with Whisper...")

# 음성 파일 전사 (transcribe)
# 변환된 WAV 파일 사용하여 최적의 인식 결과 확보
whisper_result = wmodel.transcribe(wav_filename) 
segments = whisper_result["segments"]
print("Num segments:", len(segments))

## 5단계: Pyannote 화자 분리 (Speaker Diarization)

Pyannote를 사용하여 오디오에서 화자를 자동으로 구분합니다.

### 사전 준비
1. [Hugging Face](https://huggingface.co)에서 계정 생성
2. [Pyannote 모델 페이지](https://huggingface.co/pyannote/speaker-diarization-3.1)에서 약관 동의
3. [토큰 페이지](https://huggingface.co/settings/tokens)에서 액세스 토큰 발급
4. 아래 코드의 `YOUR_HF_TOKEN_HERE`를 발급받은 토큰으로 교체

### 동작 방식
- 오디오를 분석하여 여러 화자의 발화 구간을 자동으로 감지
- 각 구간에 화자 레이블(SPEAKER_00, SPEAKER_01 등) 자동 할당
- GPU 사용 가능 시 자동으로 CUDA 활성화


In [None]:
import torch
from pyannote.audio import Pipeline
from pyannote.audio.pipelines.utils.hook import ProgressHook 

# Hugging Face 액세스 토큰 입력 (필수)
HF_TOKEN = "YOUR_HF_TOKEN_HERE"

# Pyannote 화자 분리 파이프라인 로드
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1", 
    use_auth_token=HF_TOKEN  
)

# GPU 사용 가능 여부에 따라 자동으로 디바이스 설정
pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

# 화자 분리 실행 (진행률 표시)
print(f"Starting diarization for: {wav_filename}")
with ProgressHook() as hook:
    diar = pipeline(wav_filename, hook=hook) 

# Pyannote 결과를 리스트 형식으로 변환
# 각 항목: {start: 시작시간, end: 종료시간, spk: 화자ID}
spk_turns = []
for turn, _, spk in diar.itertracks(yield_label=True):
    spk_turns.append({"start": float(turn.start), "end": float(turn.end), "spk": spk})

print("Num diarization turns:", len(spk_turns))
print("First 5 turns:", spk_turns[:5])

## 6단계: 화자 할당 및 발화 병합

Whisper의 텍스트 세그먼트와 Pyannote의 화자 정보를 결합합니다.

### 처리 과정

#### 6-1. 화자 할당
- Whisper의 각 텍스트 세그먼트에 대해 시간대가 가장 많이 겹치는 화자를 찾아 할당
- 겹침이 없는 경우 "UNKNOWN"으로 표시

#### 6-2. 발화 병합
- 같은 화자의 연속된 짧은 발언들을 하나로 병합
- **병합 기준**: 0.6초 이내의 간격
- **효과**: 자연스러운 대화 흐름 재현, 불필요한 세그먼트 분할 제거

#### 6-3. 결과 저장
최종 결과를 두 가지 형식으로 저장:
1. **TXT 파일**: 간단한 텍스트 형식 `[화자] 내용`
2. **SRT 파일**: 타임스탬프 포함 자막 파일 (영상 편집용)


In [None]:
import numpy as np

# ========================================
# 6-1. Whisper 세그먼트에 화자 할당
# ========================================

def overlap(a_start, a_end, b_start, b_end):
    """
    두 시간 구간의 겹치는 길이를 계산합니다.
    
    Args:
        a_start, a_end: 첫 번째 구간의 시작과 끝 (초)
        b_start, b_end: 두 번째 구간의 시작과 끝 (초)
    
    Returns:
        겹치는 시간 (초), 겹치지 않으면 0.0
    """
    return max(0.0, min(a_end, b_end) - max(a_start, b_start))

def assign_speaker(seg, turns):
    """
    Whisper 텍스트 세그먼트에 Pyannote 화자를 할당합니다.
    시간대가 가장 많이 겹치는 화자를 선택합니다.
    
    Args:
        seg: Whisper 세그먼트 (start, end, text 포함)
        turns: Pyannote 화자 구간 리스트
    
    Returns:
        할당된 화자 ID (예: "SPEAKER_00") 또는 "UNKNOWN"
    """
    best_spk, best_ol = "UNKNOWN", 0.0
    for t in turns:
        ol = overlap(seg["start"], seg["end"], t["start"], t["end"])
        if ol > best_ol:
            best_ol = ol
            best_spk = t["spk"]
    return best_spk

# 모든 Whisper 세그먼트에 화자 할당
labeled = []
for s in segments:
    spk = assign_speaker(s, spk_turns) 
    labeled.append({
        "start": s["start"],
        "end": s["end"],
        "speaker": spk,
        "text": s["text"].strip()
    })

print("--- 6-1. 화자 할당 완료 (첫 15개) ---")
for x in labeled[:15]:
    mm = int(x['start'] // 60); ss = int(x['start'] % 60)
    print(f"[{mm:02d}:{ss:02d}] {x['speaker']}: {x['text']}")


# ========================================
# 6-2. 같은 화자의 연속된 발언 병합
# ========================================

# 병합 기준: 같은 화자의 발언이 이 시간(초) 이내에 있으면 하나로 합침
MERGE_THRESHOLD_SEC = 0.6 

merged = []
for x in labeled:
    # 직전 발언이 같은 화자이고, 간격이 임계값 이하면 병합
    if (merged and 
        merged[-1]["speaker"] == x["speaker"] and 
        (x["start"] - merged[-1]["end"]) <= MERGE_THRESHOLD_SEC):
        
        # 기존 발언에 이어붙이기
        merged[-1]["end"] = x["end"]
        merged[-1]["text"] += (" " if merged[-1]["text"] else "") + x["text"]
    else:
        # 새로운 발언으로 추가
        merged.append(x.copy())

print(f"\n--- 6-2. 병합 완료 (총 {len(merged)}개) ---")
for x in merged[:10]:
    mm = int(x['start'] // 60); ss = int(x['start'] % 60)
    print(f"[{mm:02d}:{ss:02d}] {x['speaker']}: {x['text']}")


# ========================================
# 6-3. 최종 결과 파일 저장 (TXT, SRT)
# ========================================

def to_srt_time(t):
    """
    초 단위 시간을 SRT 자막 형식의 타임스탬프로 변환합니다.
    형식: HH:MM:SS,mmm (예: 01:23:45,678)
    """
    h = int(t // 3600)
    m = int((t % 3600) // 60)
    s = int(t % 60)
    ms = int((t - int(t)) * 1000)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"

# 출력 파일명 생성 (원본 파일명 기반)
base_name = os.path.splitext(wav_filename)[0]

# (1) TXT 파일 저장: 간단한 [화자] 텍스트 형식
txt_filename = f"{base_name}_transcript.txt"
with open(txt_filename, "w", encoding="utf-8") as f:
    for x in merged:
        f.write(f"[{x['speaker']}] {x['text']}\n")

print(f"\n--- 6-3. 저장 완료 ---")
print(f"Saved: {txt_filename}")

# (2) SRT 파일 저장: 타임스탬프 포함 자막 파일
srt_filename = f"{base_name}_transcript.srt"
with open(srt_filename, "w", encoding="utf-8") as f:
    for i, x in enumerate(merged, 1):
        f.write(str(i) + "\n")
        f.write(f"{to_srt_time(x['start'])} --> {to_srt_time(x['end'])}\n")
        f.write(f"[{x['speaker']}] {x['text']}\n\n")
print(f"Saved: {srt_filename}")