# SpeechBrain

In [None]:
# ============================================================
# 0) Environment setup: lightweight install that minimizes version conflicts
#  - torch/torchaudio: keep the versions preinstalled on Colab (matched to CUDA)
#  - install only speechbrain on top; pin the remaining packages to ranges
# ============================================================
!pip -q install "speechbrain>=0.5.16,<0.6.0" \
                "librosa>=0.10,<0.11" \
                "soundfile>=0.12,<0.14" \
                "webrtcvad>=2.0.10,<3.0" \
                "scikit-learn>=1.3,<1.6" \
                "numpy>=1.23,<2.0"

In [None]:
# ============================================================
# 1) 의존성 임포트
# ============================================================
import os
import json
import csv
import numpy as np
import soundfile as sf
import librosa
import webrtcvad

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
from sklearn.mixture import BayesianGaussianMixture
from sklearn.cluster import SpectralClustering, AgglomerativeClustering
from sklearn.metrics.pairwise import cosine_similarity

import torch
from speechbrain.pretrained import EncoderClassifier

In [None]:
# ============================================================
# 2) Mount Google Drive & define input/output paths
# ============================================================
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

# Recording to diarize, plus the local directory and base name used for
# every exported file.
INPUT_WAV = '/content/drive/MyDrive/AI_NLP_FINAL/1105_오전회의.wav'
WORKDIR   = '/content'
BASENAME  = 'result'

# One output path per export format, all sharing BASENAME.
OUT_RTTM  = os.path.join(WORKDIR, f'{BASENAME}.rttm')
OUT_CSV   = os.path.join(WORKDIR, f'{BASENAME}.csv')
OUT_VTT   = os.path.join(WORKDIR, f'{BASENAME}.vtt')
OUT_JSON  = os.path.join(WORKDIR, f'{BASENAME}.jsonl')

In [None]:
# ============================================================
# 3) Device selection
# ============================================================
# Use the GPU when torch reports CUDA as available, otherwise the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Device:', device)
if device == 'cuda':
    # Report the name of the first CUDA device.
    print(torch.cuda.get_device_name(0))

In [None]:
# ============================================================
# 4) Load audio + resample (16 kHz, mono)
# ============================================================
# target_sr is the sample rate shared by every later step (VAD framing,
# window sizes, sample -> seconds conversion).
target_sr = 16000
wav, sr = librosa.load(INPUT_WAV, sr=target_sr, mono=True)
# Keep the signal as a contiguous float32 array for the later PCM
# conversion and tensor creation.
wav = np.ascontiguousarray(wav, dtype=np.float32)

In [None]:
# ============================================================
# 5) Detect speech regions with VAD (webrtcvad)
#    - 30 ms frames, aggressiveness=2 (medium)
#    - minimum region 0.3 s, adjacent regions merged within 0.15 s
# ============================================================
vad = webrtcvad.Vad(2)
frame_dur_ms = 30
# Number of samples per VAD frame (480 at 16 kHz with 30 ms frames).
frame_len = int(target_sr * frame_dur_ms / 1000)

def float_to_pcm16(x):
    """Convert a float signal to 16-bit integer PCM samples.

    Values are limited to [-1.0, 1.0], scaled by 32767 and cast to
    ``np.int16`` (the cast drops the fractional part).
    """
    bounded = np.clip(x, -1.0, 1.0)
    scaled = bounded * 32767.0
    return scaled.astype(np.int16)

# Whole signal as a 16-bit PCM byte string, sliced per frame below.
pcm16 = float_to_pcm16(wav).tobytes()

# Classify consecutive, non-overlapping frames. The range stops at the last
# full frame, so a trailing partial frame is never passed to the VAD.
# Each entry is (start_sample, end_sample, is_speech).
frames = []
for i in range(0, len(wav) - frame_len + 1, frame_len):
    start = i
    end   = i + frame_len
    chunk = pcm16[start*2:end*2]  # int16 => 2 bytes/sample
    is_speech = vad.is_speech(chunk, sample_rate=target_sr)
    frames.append((start, end, is_speech))

# Speech regions as (start_sample, end_sample) tuples.
speech_regions = []
min_region_ms       = 300
merge_tolerance_ms  = 150
# Both thresholds converted from milliseconds to samples.
min_region = int(target_sr * (min_region_ms / 1000))
merge_tol  = int(target_sr * (merge_tolerance_ms / 1000))

# Group consecutive speech frames into regions. `active` holds the region
# currently being extended, or None while in non-speech.
active = None
for (start, end, is_speech) in frames:
    if is_speech and active is None:
        active = [start, end]
    elif is_speech and active is not None:
        active[1] = end
    elif (not is_speech) and active is not None:
        # A non-speech frame closes the region; it is kept only if it is
        # long enough. Note this filter runs before the merge step below.
        if active[1] - active[0] >= min_region:
            speech_regions.append(tuple(active))
        active = None
# Flush a region that is still open when the frames run out.
if active is not None and active[1] - active[0] >= min_region:
    speech_regions.append(tuple(active))

# Merge adjacent regions whose gap is at most merge_tol samples.
merged = []
for seg in speech_regions:
    if not merged:
        merged.append(list(seg))
    else:
        if seg[0] - merged[-1][1] <= merge_tol:
            merged[-1][1] = seg[1]
        else:
            merged.append(list(seg))
speech_regions = [tuple(x) for x in merged]

print(f'VAD speech regions: {len(speech_regions)} segments')

# ============================================================
# 6) Build sliding windows, then extract embeddings (SpeechBrain ECAPA)
#    - window 1.5 s, hop 0.50 s (finer granularity)
# ============================================================
win_sec = 1.5
hop_sec = 0.50   # adjusted from 0.75 to 0.50
# Window and hop lengths in samples.
win = int(target_sr * win_sec)
hop = int(target_sr * hop_sec)

# (start_sample, end_sample) of every window to embed.
windows = []
for (s, e) in speech_regions:
    cur = s
    # Full-length windows that fit entirely inside the region.
    while cur + win <= e:
        windows.append((cur, cur + win))
        cur += hop
    # Also add the tail if it is long enough (>= 0.6 * win, i.e. 0.9 s).
    # This is also the only window a region shorter than `win` can get.
    if e - cur > win * 0.5:
        end = min(e, cur + win)
        if end - cur >= int(0.6 * win):
            windows.append((cur, end))

print(f'Windows to embed: {len(windows)}')

# Pretrained speaker encoder loaded from the
# "speechbrain/spkrec-ecapa-voxceleb" source, placed on `device`.
classifier = EncoderClassifier.from_hparams(
    source="speechbrain/spkrec-ecapa-voxceleb",
    run_opts={"device": device}
)

# One embedding per window, computed one window at a time without gradients.
emb_list = []
if len(windows) > 0:
    for (s, e) in windows:
        seg = torch.tensor(wav[s:e]).unsqueeze(0)  # [1, time]
        seg = seg.to(device)
        with torch.no_grad():
            emb = classifier.encode_batch(seg)     # [1, 1, 192] or [1, 192]
        emb = emb.squeeze().cpu().numpy()          # -> (192,)
        emb_list.append(emb)

# Stack into (n_windows, dim); fall back to an empty (0, 192) array when
# there were no windows so later shape checks still work.
embeddings = np.stack(emb_list, axis=0) if len(emb_list) > 0 else np.empty((0, 192))
print('Embeddings shape:', embeddings.shape)

# ============================================================
# (New) Embedding preprocessing: standardization + PCA (whitened) + L2 norm
# ============================================================
def preprocess_embeddings(E, pca_dim=64):
    """Standardize, PCA-whiten and L2-normalize a matrix of embeddings.

    Args:
        E: 2-D array of shape (n_samples, n_features).
        pca_dim: Upper bound on the number of PCA components to keep.

    Returns:
        A ``(Z, pipe)`` tuple. ``Z`` holds the transformed embeddings with
        unit-length rows; ``pipe`` is the fitted scaler+PCA pipeline, or
        ``None`` when nothing was fitted (zero or one input row).
    """
    n_samples = E.shape[0]
    if n_samples == 0:
        return E, None
    if n_samples == 1:
        # A single row cannot be standardized or decorrelated (its variance
        # is zero), so only the L2 normalization is applied.
        Z = E / (np.linalg.norm(E, axis=1, keepdims=True) + 1e-8)
        return Z, None

    # PCA requires n_components <= min(n_samples, n_features). Centred data
    # has rank at most n_samples - 1, and whitening a zero-variance component
    # would only amplify noise, so the sample bound is n_samples - 1.
    n_components = max(1, min(pca_dim, E.shape[1], n_samples - 1))
    pipe = Pipeline([
        ('scaler', StandardScaler(with_mean=True, with_std=True)),
        ('pca', PCA(n_components=n_components, whiten=True))
    ])
    Z = pipe.fit_transform(E)
    # Unit-length rows; the epsilon guards against an all-zero row.
    Z = Z / (np.linalg.norm(Z, axis=1, keepdims=True) + 1e-8)
    return Z, pipe

# ============================================================
# (New) Eigengap-based estimate of K (for spectral clustering)
# ============================================================
def estimate_k_eigengap(Z, max_k=12):
    """Estimate the number of clusters from the largest Laplacian eigengap.

    A cosine-similarity affinity matrix is built from the rows of ``Z``,
    rescaled to [0, 1], and turned into the symmetric normalized graph
    Laplacian. With its eigenvalues sorted ascending, the returned ``k``
    (2 <= k <= max_k) maximizes ``evals[k] - evals[k - 1]``.

    Args:
        Z: 2-D array-like of shape (n_samples, n_features).
        max_k: Largest number of clusters considered.

    Returns:
        The estimated cluster count as an ``int``; 1 when there are too few
        samples to evaluate any gap or the eigendecomposition fails.
    """
    if len(Z) < 2:
        return 1
    Z = np.nan_to_num(np.asarray(Z, dtype=float), nan=0.0, posinf=0.0, neginf=0.0)

    # Cosine similarity: unit-normalize rows (all-zero rows stay zero), then
    # take the Gram matrix.
    norms = np.linalg.norm(Z, axis=1, keepdims=True)
    norms[norms == 0.0] = 1.0
    U = Z / norms
    A = U @ U.T

    # Cosine similarities may be negative, which would give negative node
    # degrees and NaNs under the square root. Min-max rescale to [0, 1]
    # (the same mapping the spectral clustering affinity uses) so every
    # degree is >= 1 once the diagonal is set.
    A = (A - A.min()) / (A.max() - A.min() + 1e-8)
    np.fill_diagonal(A, 1.0)

    deg = A.sum(axis=1)
    d_inv_sqrt = 1.0 / (np.sqrt(deg) + 1e-8)
    L = np.eye(A.shape[0]) - (d_inv_sqrt[:, None] * A) * d_inv_sqrt[None, :]
    # Remove floating-point asymmetry before the symmetric eigen-solver.
    L = 0.5 * (L + L.T)
    try:
        # eigvalsh returns real eigenvalues in ascending order.
        evals = np.linalg.eigvalsh(L)
    except np.linalg.LinAlgError:
        return 1

    ks = range(2, min(max_k, len(evals) - 1) + 1)
    if not ks:
        return 1
    gaps = [(k, float(evals[k] - evals[k - 1])) for k in ks]
    k_star = max(gaps, key=lambda x: x[1])[0]
    return int(k_star)


# ============================================================
# 7) Estimate the number of speakers (K) + clustering (hardened)
#    Options:
#      - USE_SPECTRAL: use spectral clustering
#      - USE_DP_GMM  : use a Dirichlet-process GMM as a secondary K estimate
#      - FIXED_K     : set to an integer to cluster with exactly that K
# ============================================================
MAX_K        = 12
USE_SPECTRAL = True
USE_DP_GMM   = True
FIXED_K      = None   # e.g. set to 4 to force four speakers

if len(embeddings) > 0:
    Z, preproc_pipe = preprocess_embeddings(embeddings, pca_dim=64)
    n_windows = len(Z)

    # --- 7-1) Choose K ---
    if FIXED_K is not None:
        best_k = int(FIXED_K)
    else:
        # The eigengap estimate is only attempted with at least 6 windows.
        k_eigen = estimate_k_eigengap(Z, max_k=MAX_K) if n_windows >= 6 else 1
        best_k = max(1, k_eigen)

        if USE_DP_GMM and best_k < MAX_K and n_windows >= 4:
            dpgmm = BayesianGaussianMixture(
                # A mixture cannot be fitted with more components than
                # samples, so the component count is capped at n_windows.
                n_components=min(MAX_K, max(3, best_k + 2), n_windows),
                covariance_type='full',
                weight_concentration_prior_type='dirichlet_process',
                max_iter=1000, random_state=0
            ).fit(Z)
            weights = dpgmm.weights_
            # Components whose weight exceeds 1/(5 * n_components) count as
            # in use.
            active = (weights > (1.0 / (5 * len(weights))))  # loose threshold
            k_dp = int(max(1, active.sum()))
            # Average the two estimates and keep the result inside [1, MAX_K].
            best_k = int(np.clip(round(0.5 * (best_k + k_dp)), 1, MAX_K))

    # Neither clusterer can produce more clusters than there are windows;
    # this also guards a FIXED_K larger than the data (or smaller than 1).
    best_k = max(1, min(best_k, n_windows))

    # --- 7-2) Clustering ---
    if best_k == 1 or n_windows < 2:
        labels = np.zeros(n_windows, dtype=int)
    else:
        if USE_SPECTRAL:
            # Cosine affinity, min-max rescaled to [0, 1] with a unit diagonal.
            A = cosine_similarity(Z)
            A = (A - A.min()) / (A.max() - A.min() + 1e-8)
            np.fill_diagonal(A, 1.0)
            spec = SpectralClustering(
                n_clusters=best_k,
                affinity='precomputed',
                assign_labels='kmeans',
                random_state=0
            )
            labels = spec.fit_predict(A)
        else:
            # Alternative: AHC in Euclidean space (cosine-based linkage has
            # scikit-learn configuration constraints).
            ahc = AgglomerativeClustering(
                n_clusters=best_k, linkage='average'
            )
            labels = ahc.fit_predict(Z)

    # --- 7-3) Clean up tiny clusters (fewer than max(2, 1% of windows)) ---
    if len(Z) > 0:
        uniq, counts = np.unique(labels, return_counts=True)
        tiny = set(uniq[counts < max(2, int(0.01 * len(labels)))])
        if tiny:
            centroids = {c: Z[labels == c].mean(axis=0) for c in uniq}
            keep = [c for c in uniq if c not in tiny]
            for c in tiny:
                if not keep:
                    # Every cluster is tiny: nothing to absorb into.
                    continue
                # Absorb into the kept cluster whose centroid has the highest
                # cosine similarity to this one.
                target = max(
                    keep,
                    key=lambda kk: np.dot(centroids[c], centroids[kk]) /
                                   (np.linalg.norm(centroids[c]) * np.linalg.norm(centroids[kk]) + 1e-8)
                )
                labels[labels == c] = target

        # Re-index labels to 0..K-1 and report the number actually used.
        _, inv = np.unique(labels, return_inverse=True)
        labels = inv
        best_k = len(np.unique(labels))
else:
    labels = np.array([])
    best_k = 0

print(f'Estimated #speakers (K): {best_k}')

# ============================================================
# 8) Project window labels onto the timeline + post-processing merge
#    - gap merge threshold 0.15 s (finer)
#    - minimum segment length 0.25 s (keeps short turns)
# ============================================================
# One [start_sample, end_sample, speaker] entry per window, in window order.
# windows come from a 0.5 s hop over 1.5 s windows, so consecutive entries
# overlap in time.
segments = []
for (idx, (s, e)) in enumerate(windows):
    spk = f'spk{labels[idx]}' if len(labels) > 0 else 'spk0'
    segments.append([s, e, spk])

def merge_labeled_segments(segs, gap_samples=int(0.15 * target_sr)):
    """Fuse consecutive same-speaker segments separated by a small gap.

    Args:
        segs: Sequence of ``[start, end, speaker]`` lists, visited in order.
        gap_samples: Largest ``start - previous_end`` distance (in samples)
            at which two same-speaker segments are still joined. The default
            is computed once, when the function is defined.

    Returns:
        A new list of ``[start, end, speaker]`` lists.
    """
    if not segs:
        return []
    result = [segs[0][:]]
    for start, stop, speaker in segs[1:]:
        last = result[-1]
        _, last_stop, last_speaker = last
        same_turn = speaker == last_speaker and start - last_stop <= gap_samples
        if not same_turn:
            result.append([start, stop, speaker])
            continue
        # Extend the previous segment; never shrink it.
        last[1] = max(last_stop, stop)
    return result

# Fuse consecutive windows that carry the same speaker label.
segments = merge_labeled_segments(segments)

def clamp_to_regions(segs, regions):
    """Clip labeled segments to the given regions and re-merge the pieces.

    Args:
        segs: ``[start, end, speaker]`` items; the forward-only region
            pointer assumes they are ordered by start.
        regions: ``(start, end)`` pairs in ascending order.

    Returns:
        The clipped pieces, sorted by start and passed through
        ``merge_labeled_segments``.
    """
    if not segs:
        return []
    clipped = []
    n_regions = len(regions)
    first = 0
    for seg_start, seg_end, speaker in segs:
        # Skip regions that end at or before this segment starts.
        while first < n_regions and regions[first][1] <= seg_start:
            first += 1
        for j in range(first, n_regions):
            # Stop at the first region that does not begin before the
            # segment ends.
            if not regions[j][0] < seg_end:
                break
            lo = max(seg_start, regions[j][0])
            hi = min(seg_end, regions[j][1])
            if hi > lo:
                clipped.append([lo, hi, speaker])
    clipped.sort(key=lambda item: item[0])
    return merge_labeled_segments(clipped)

# Restrict the labeled segments to the VAD speech regions.
segments = clamp_to_regions(segments, speech_regions)

# Drop segments shorter than min_keep samples.
min_keep = int(0.25 * target_sr)   # relaxed from 0.35 to 0.25
segments = [seg for seg in segments if seg[1] - seg[0] >= min_keep]

print(f'Final diarized segments: {len(segments)}')

# ============================================================
# 9) Save the result files (RTTM / CSV / VTT / JSONL)
# ============================================================
def samples_to_time(s):
    """Convert a sample count or offset to seconds using ``target_sr``."""
    seconds = s / target_sr
    return seconds

# Recording identifier: the input file name without directory or extension.
file_id = os.path.splitext(os.path.basename(INPUT_WAV))[0]

# RTTM
with open(OUT_RTTM, 'w', encoding='utf-8') as f:
    for s, e, spk in segments:
        # Start time and duration in seconds, written with 3 decimals.
        start = samples_to_time(s)
        dur   = samples_to_time(e - s)
        line = f"SPEAKER {file_id} 1 {start:.3f} {dur:.3f} <NA> <NA> {spk} <NA> <NA>\n"
        f.write(line)

# CSV: header row, then one row per segment with times in seconds
# (3 decimals).
with open(OUT_CSV, 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
    writer.writerow(['start', 'end', 'duration', 'speaker'])
    for s, e, spk in segments:
        start = samples_to_time(s)
        end   = samples_to_time(e)
        writer.writerow([f'{start:.3f}', f'{end:.3f}', f'{end - start:.3f}', spk])

# VTT
def to_vtt_timestamp(t):
    """Format a time in seconds as a WebVTT timestamp ``HH:MM:SS.mmm``.

    The value is rounded to whole milliseconds first and only then split
    into fields, so a fraction that rounds up carries into the seconds,
    minutes and hours instead of producing a four-digit millisecond field.

    Args:
        t: Time in seconds. Negative values are clamped to zero.

    Returns:
        The formatted timestamp string.
    """
    total_ms = max(0, int(round(t * 1000)))
    total_s, ms = divmod(total_ms, 1000)
    total_m, s = divmod(total_s, 60)
    h, m = divmod(total_m, 60)
    return f"{h:02}:{m:02}:{s:02}.{ms:03}"

# One cue per segment: a 1-based cue number, the timing line, and the
# speaker label as the cue text.
with open(OUT_VTT, 'w', encoding='utf-8') as f:
    f.write("WEBVTT\n\n")
    for idx, (s, e, spk) in enumerate(segments, 1):
        f.write(f"{idx}\n")
        f.write(f"{to_vtt_timestamp(samples_to_time(s))} --> {to_vtt_timestamp(samples_to_time(e))}\n")
        f.write(f"{spk}\n\n")

# JSONL: one JSON object per line with start/end in seconds (rounded to
# 3 decimals) and the speaker label.
with open(OUT_JSON, 'w', encoding='utf-8') as f:
    for s, e, spk in segments:
        rec = {
            'start': round(samples_to_time(s), 3),
            'end'  : round(samples_to_time(e), 3),
            'speaker': spk
        }
        f.write(json.dumps(rec, ensure_ascii=False) + "\n")

# Print summary
dur_total = len(wav) / target_sr
# Distinct speaker labels that survive in the final segments.
speakers = sorted({spk for _,_,spk in segments})
print("\n================ SUMMARY ================")
print(f"Processed file : {INPUT_WAV}")
print(f"Audio duration : {dur_total:.1f} sec")
print(f"Speakers (est) : {len(speakers)} -> {speakers}")
print("Saved files:")
print(" -", OUT_RTTM)
print(" -", OUT_CSV)
print(" -", OUT_VTT)
print(" -", OUT_JSON)
print("=========================================")

#Whisper

In [None]:
# ============================================================
# Settings
# ============================================================
# NOTE(review): this path differs from the '/content/result.jsonl' written
# by the diarization step above — presumably the file was copied/renamed
# to Drive by hand; confirm.
DIAR_JSONL = "/content/drive/MyDrive/AI_NLP_FINAL/SpeechBrain_result2.jsonl"  # <- SpeechBrain result (JSONL, times in seconds)
AUDIO_PATH = "/content/drive/MyDrive/AI_NLP_FINAL/1105_오전회의.wav"
WORKDIR    = "/content"  # folder where the output files are saved

# Output files (CSV / JSONL only)
import os
BASENAME = "whisper_speaker"
OUT_CSV   = os.path.join(WORKDIR, f"{BASENAME}.csv")
OUT_JSONL = os.path.join(WORKDIR, f"{BASENAME}.jsonl")

In [None]:
# ============================================================
# 1) Load the diarization JSONL (times in seconds)
# ============================================================
import json

speaker_regions = []  # [(start_sec, end_sec, speaker)]
with open(DIAR_JSONL, "r", encoding="utf-8") as f:
    for line in f:
        rec = json.loads(line)
        s = float(rec["start"])
        e = float(rec["end"])
        spk = str(rec["speaker"])
        # Keep only regions with positive duration.
        if e > s:
            speaker_regions.append((s, e, spk))

# Sorted by start time; the speaker-assignment loop relies on this order
# to stop scanning early.
speaker_regions.sort(key=lambda x: x[0])
print(f"[INFO] Loaded diarization regions: {len(speaker_regions)}")

In [None]:
# ============================================================
# 2) Load Whisper + run ASR with timestamps
# ============================================================
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline

# float16 on GPU, float32 on CPU.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
model_id = "openai/whisper-large-v3"

print(f"[INFO] Loading Whisper: {model_id} on {device} ({torch_dtype})")
model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id,
    torch_dtype=torch_dtype,
    low_cpu_mem_usage=True,
    use_safetensors=True
).to(device)
processor = AutoProcessor.from_pretrained(model_id)

# ASR pipeline built from the loaded model and the processor's tokenizer and
# feature extractor, configured to return timestamps.
asr = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    torch_dtype=torch_dtype,
    device=device,
    return_timestamps=True,
)

print(f"[INFO] Transcribing '{AUDIO_PATH}' ...")
# Long-form transcription in 30 s chunks with a (4, 2) second stride,
# forcing Korean transcription.
whisper_out = asr(
    AUDIO_PATH,
    chunk_length_s=30,
    stride_length_s=(4, 2),
    generate_kwargs={"language": "ko", "task": "transcribe"},
)

# Timestamped pieces: prefer the "chunks" key, fall back to "segments".
chunks = whisper_out.get("chunks", []) or whisper_out.get("segments", [])
print(f"[INFO] Whisper chunks: {len(chunks)}")

In [None]:

# ============================================================
# 3) Map Whisper segments onto the diarization regions
# ============================================================
def overlap_len(a0, a1, b0, b1):
    """Return the length of the overlap of [a0, a1] and [b0, b1], or 0.0."""
    shared = min(a1, b1) - max(a0, b0)
    return shared if shared > 0.0 else 0.0

assigned = []  # [(start, end, speaker, text)]
for ch in chunks:
    # The timestamp may be stored under either key, as a dict with
    # "start"/"end" or as a 2-item sequence.
    ts = ch.get("timestamp", None) or ch.get("timestamps", None)
    if isinstance(ts, dict):
        st, ed = ts.get("start"), ts.get("end")
    elif isinstance(ts, (list, tuple)) and len(ts) == 2:
        st, ed = ts
    else:
        st = ed = None

    # Skip chunks missing either boundary (their text is dropped).
    if st is None or ed is None:
        continue
    s_sec, e_sec = float(st), float(ed)
    text = (ch.get("text") or "").strip()
    # Skip empty text and zero/negative-length chunks.
    if not text or e_sec <= s_sec:
        continue

    # Pick the diarization region with the largest temporal overlap.
    best_spk, best_ov = None, 0.0
    for (ds, de, spk) in speaker_regions:
        if de < s_sec:
            continue
        # Regions are sorted by start, so none after this one can overlap.
        if ds > e_sec:
            break
        ov = overlap_len(s_sec, e_sec, ds, de)
        if ov > best_ov:
            best_ov = ov
            best_spk = spk

    # Drop chunks with no overlapping region or less than 0.10 s of overlap.
    if best_spk is None or best_ov < 0.10:
        continue
    assigned.append([s_sec, e_sec, best_spk, text])

print(f"[INFO] Speaker-assigned chunks: {len(assigned)}")

# ============================================================
# 4) Merge adjacent (<= 0.5 s apart) segments of the same speaker
# ============================================================
def merge_speaker_chunks(rows, gap_thresh=0.5):
    """Merge consecutive chunks spoken by the same speaker into turns.

    Args:
        rows: Sequence of ``(start, end, speaker, text)`` records (lists or
            tuples); times are in the same unit as ``gap_thresh``.
        gap_thresh: Largest ``start - previous_end`` gap at which two
            same-speaker chunks are still joined.

    Returns:
        A new list of ``[start, end, speaker, text]`` lists ordered by start
        time. The input records are not modified.
    """
    if not rows:
        return []
    ordered = sorted(rows, key=lambda x: x[0])
    # list(...) copies the first record and also accepts tuple records.
    merged = [list(ordered[0])]
    for s, e, spk, txt in ordered[1:]:
        prev = merged[-1]
        _, pe, pspk, ptxt = prev
        if spk == pspk and (s - pe) <= gap_thresh:
            prev[1] = max(pe, e)
            # Join the two texts with a single space so sentences do not run
            # together ("문장.다음"). No separator is added after the CJK
            # full stop, or when either side is empty.
            sep = '' if (not ptxt or not txt or ptxt.endswith('。')) else ' '
            prev[3] = (ptxt + sep + txt).strip()
        else:
            merged.append([s, e, spk, txt])
    return merged

# Collapse the speaker-assigned chunks into speaker turns.
turns = merge_speaker_chunks(assigned, gap_thresh=0.5)
print(f"[INFO] Merged speaker turns: {len(turns)}")

# ============================================================
# 5) Save: CSV / JSONL
# ============================================================
import csv

# CSV: header row, then one row per turn with times written to 3 decimals.
with open(OUT_CSV, "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(["start", "end", "duration", "speaker", "text"])
    for s, e, spk, txt in turns:
        w.writerow([f"{s:.3f}", f"{e:.3f}", f"{(e-s):.3f}", spk, txt])

# JSONL: one JSON object per turn; ensure_ascii=False keeps the text
# unescaped.
with open(OUT_JSONL, "w", encoding="utf-8") as f:
    for s, e, spk, txt in turns:
        rec = {"start": round(s, 3), "end": round(e, 3), "speaker": spk, "text": txt}
        f.write(json.dumps(rec, ensure_ascii=False) + "\n")

# Final report: inputs used, number of turns, and where the outputs went.
print("\n================ SUMMARY ================")
print(f"Input audio          : {AUDIO_PATH}")
print(f"Diarization jsonl    : {DIAR_JSONL}")
print(f"Assigned speaker turns: {len(turns)}")
print("Saved files:")
print(" -", OUT_CSV)
print(" -", OUT_JSONL)
print("=========================================")