In [8]:
# ============================
# 0. 구글 드라이브 마운트
# ============================
from google.colab import drive
drive.mount('/content/drive')

# (선택) 드라이브 구조 확인
!ls -lah /content/drive
!ls -lah "/content/drive/MyDrive"


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
total 16K
dr-x------ 4 root root 4.0K Nov 24 02:14 .Encrypted
drwx------ 6 root root 4.0K Nov 24 02:14 MyDrive
dr-x------ 2 root root 4.0K Nov 24 02:14 .shortcut-targets-by-id
drwx------ 5 root root 4.0K Nov 24 02:14 .Trash-0
total 265K
-rw------- 1 root root  176 Nov 21 02:43 '11 21 헤세드 인적사항 사본.gsheet'
-rw------- 1 root root  176 Mar 29  2025 '5 11 업데이트 버스킹 Song List 추천.gsheet'
drwx------ 2 root root 4.0K May  7  2025 'Colab Notebooks'
drwx------ 2 root root 4.0K Nov 24 02:51  data_MP3
drwx------ 2 root root 4.0K Nov 13 06:40  data_MP4
drwx------ 2 root root 4.0K Nov 13 07:30  output
lrw------- 1 root root    0 Nov 19 02:15  멀티캠퍼스_LLMs_Llama_index -> /content/drive/.shortcut-targets-by-id/1L_SL2y6c8AjDF5Plju0IaQnV4pi-pBmJ/멀티캠퍼스_LLMs_Llama_index
-rw------- 1 root root 247K Nov 14 08:04  퍼실AI_baseline.ipynb
-rw------- 1 root 

In [9]:
# ============================
# 1. 패키지 설치
# ============================
# Whisper는 안 쓰니까 제거, 대신 requests만 추가
!pip -q install moviepy requests
!pip -q install transformers accelerate librosa soundfile torchaudio

In [10]:
# ============================
# 2. import
# ============================
import os
from pathlib import Path
import mimetypes
import json
import time
import getpass

import numpy as np
import torch
import librosa
import soundfile as sf
import requests

from transformers import (
    pipeline,
    AutoConfig,
    AutoTokenizer,
    AutoModelForSequenceClassification,
)

from collections import Counter
import csv
import math

In [11]:
# ============================
# 2-1. 리턴제로 STT 클라이언트 정의
# ============================
class RTZROpenAPIClient:
    """
    리턴제로 파일 STT용 최소 클라이언트
    - /v1/authenticate : JWT 발급
    - /v1/transcribe   : 파일 STT 요청
    - /v1/transcribe/{id} : 결과 폴링
    """
    def __init__(self, client_id: str, client_secret: str,
                 base_url: str = "https://openapi.vito.ai"):
        self.base_url = base_url.rstrip("/")
        self.client_id = client_id
        self.client_secret = client_secret

        if not self.client_id or not self.client_secret:
            raise ValueError("RTZR CLIENT_ID / CLIENT_SECRET가 비었습니다.")

        self._sess = requests.Session()
        self._token = None

    @property
    def token(self) -> str:
        """현재 JWT 토큰을 반환, 없거나 오래되면 /v1/authenticate로 갱신"""
        # expire_at 기준으로 30분 전이면 다시 받기 (공식 샘플과 유사 로직)
        if self._token is None or self._token.get("expire_at", 0) < time.time() - 1800:
            resp = self._sess.post(
                f"{self.base_url}/v1/authenticate",
                data={
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                },
            )
            resp.raise_for_status()
            self._token = resp.json()

        access = self._token.get("access_token")
        if not access:
            raise RuntimeError("authenticate 응답에 access_token이 없습니다.")
        return access

    def _auth_headers(self):
        return {"Authorization": f"Bearer {self.token}"}

    def transcribe_file(self, file_path: str, config: dict) -> dict:
        """ /v1/transcribe 로 파일 STT 요청 """
        url = f"{self.base_url}/v1/transcribe"
        with open(file_path, "rb") as f:
            files = {"file": (os.path.basename(file_path), f)}
            data = {"config": json.dumps(config)}
            resp = self._sess.post(url, headers=self._auth_headers(),
                                   files=files, data=data)
            resp.raise_for_status()
            return resp.json()  # {"id": "..."} 형태

    def get_transcription(self, transcribe_id: str) -> dict:
        url = f"{self.base_url}/v1/transcribe/{transcribe_id}"
        resp = self._sess.get(url, headers=self._auth_headers())
        resp.raise_for_status()
        return resp.json()

    def wait_for_result(self, transcribe_id: str,
                        poll_interval_sec: int = 5,
                        timeout_sec: int = 3600) -> dict:
        """폴링하면서 completed/failed 될 때까지 기다렸다가 결과 리턴"""
        deadline = time.time() + timeout_sec
        while True:
            if time.time() > deadline:
                raise TimeoutError("STT 결과 대기 중 timeout")

            result = self.get_transcription(transcribe_id)
            status = result.get("status")
            if status in ("completed", "failed"):
                return result

            time.sleep(poll_interval_sec)

In [12]:
# ============================
# 3. 입력 미디어 설정 (비디오 or 오디오)
# ============================
media_path = "/content/drive/MyDrive/data_MP3/sample3.mp3"
desired_audio_path = "/content/drive/MyDrive/data_MP3/extracted_audio.mp3"  # 비디오일 때만 사용

def is_audio(path: str) -> bool:
    mt, _ = mimetypes.guess_type(path)
    return (mt or "").startswith("audio")

if is_audio(media_path):
    # 이미 오디오 파일이면 추출 생략
    audio_path = media_path
    print(f"[SKIP extract] Detected audio file. Use as-is: {audio_path}")
else:
    # 비디오면 moviepy로 오디오 추출
    from moviepy.editor import VideoFileClip
    Path(Path(desired_audio_path).parent).mkdir(parents=True, exist_ok=True)
    clip = VideoFileClip(media_path)
    if clip.audio is None:
        raise RuntimeError("이 비디오에는 오디오 트랙이 없습니다.")
    clip.audio.write_audiofile(desired_audio_path)  # 확장자에 따라 자동 인코딩
    clip.close()
    audio_path = desired_audio_path
    print(f"[EXTRACT] Audio saved to: {audio_path}")

[SKIP extract] Detected audio file. Use as-is: /content/drive/MyDrive/data_MP3/sample3.mp3


In [13]:
# ============================
# 4. 리턴제로 기반 텍스트 인식
# ============================
clean_audio_path = "/content/drive/MyDrive/data_MP3/cleaned_sample.wav"

# 4-1) 오디오 전처리: 16kHz 모노 + 앞/뒤 무음 제거 + 노멀라이즈
y, sr = librosa.load(audio_path, sr=16000, mono=True)
raw_duration = len(y) / sr
print(f"[DEBUG] 원본 오디오 duration = {raw_duration:.2f} s")

# 앞/뒤 무음만 제거 (중간은 건드리지 않음)
y, _ = librosa.effects.trim(y, top_db=30)

peak = np.max(np.abs(y))
if peak > 0:
    y = y / peak

Path(clean_audio_path).parent.mkdir(parents=True, exist_ok=True)
sf.write(clean_audio_path, y, 16000)
audio_for_asr = clean_audio_path

clean_duration = librosa.get_duration(path=audio_for_asr)
print(f"[DEBUG] 정제된 오디오 duration = {clean_duration:.2f} s")

# 4-2) RTZR API 키 입력 (환경변수 우선, 없으면 직접 입력)
if "RTZR_CLIENT_ID" in os.environ and "RTZR_CLIENT_SECRET" in os.environ:
    RTZR_CLIENT_ID = os.environ["RTZR_CLIENT_ID"]
    RTZR_CLIENT_SECRET = os.environ["RTZR_CLIENT_SECRET"]
    print("[INFO] 환경변수에서 RTZR CLIENT_ID/SECRET을 읽었습니다.")
else:
    RTZR_CLIENT_ID = input("RTZR CLIENT_ID를 입력하세요: ").strip()
    RTZR_CLIENT_SECRET = getpass.getpass(
        "RTZR CLIENT_SECRET를 입력하세요 (입력 시 화면에 표시되지 않습니다): "
    ).strip()

client = RTZROpenAPIClient(RTZR_CLIENT_ID, RTZR_CLIENT_SECRET)

# 4-3) STT 설정 (화자 분리 ON)
#   - Somm ers 모델 + GENERAL 도메인 기준
SPEAKER_COUNT = 0  # 0이면 화자 수 자동 추정

config = {
    "model_name": "sommers",          # 또는 "whisper"
    "domain": "GENERAL",              # 회의/콜이면 "CALL" 고려
    "use_diarization": True,          # 화자 분리
    "diarization": {"spk_count": SPEAKER_COUNT},
    "use_paragraph_splitter": True,
    "paragraph_splitter": {"max": 80},
    "use_itn": False,
    "use_disfluency_filter": False,
    "use_profanity_filter": False,
    # 필요하면 옵션 추가 가능 (keywords, use_word_timestamp 등)
}

print("[DEBUG] RTZR STT config:")
print(json.dumps(config, ensure_ascii=False, indent=2))

# 4-4) STT 요청 + 결과 대기
submit_resp = client.transcribe_file(audio_for_asr, config)
transcribe_id = submit_resp.get("id")
print(f"[INFO] STT 작업 ID: {transcribe_id}")

if not transcribe_id:
    print("[ERROR] transcribe_id가 없습니다. 응답 전체:")
    print(json.dumps(submit_resp, ensure_ascii=False, indent=2))
    raise SystemExit

print("[INFO] 결과 대기 중 (5초 간격 폴링)...")
rtzr_result = client.wait_for_result(transcribe_id, poll_interval_sec=5, timeout_sec=3600)

print(f"[DEBUG] RTZR status = {rtzr_result.get('status')}")

# 4-5) RTZR 결과 → Whisper 스타일 segments_to_use로 변환
utterances = rtzr_result.get("results", {}).get("utterances", [])
print(f"[DEBUG] 총 utterance 개수 = {len(utterances)}")

segments_to_use = []
for utt in utterances:
    start_ms = int(utt.get("start_at", 0))
    dur_ms = int(utt.get("duration", 0))
    start_s = start_ms / 1000.0
    end_s = (start_ms + dur_ms) / 1000.0
    text = utt.get("msg", "").strip()
    spk = utt.get("spk", 0)

    segments_to_use.append({
        "start": start_s,
        "end": end_s,
        "text": text,
        "speaker": spk,  # 나중에 화자별 분석용으로 사용 가능
    })

if segments_to_use:
    last_end = segments_to_use[-1]["end"]
    print(f"[DEBUG] 마지막 세그먼트 end = {last_end:.2f} s")

    print("\n[DEBUG] 처음 5개 세그먼트:")
    for seg in segments_to_use[:5]:
        print(
            f"  [{seg['start']:.2f}-{seg['end']:.2f}] "
            f"Speaker={seg['speaker']} :: {seg['text']}"
        )

    print("\n[DEBUG] 마지막 5개 세그먼트:")
    for seg in segments_to_use[-5:]:
        print(
            f"  [{seg['start']:.2f}-{seg['end']:.2f}] "
            f"Speaker={seg['speaker']} :: {seg['text']}"
        )

# 콘솔에 구간별 자막 형태로 출력
for seg in segments_to_use:
    print(f"[{seg['start']:.2f}–{seg['end']:.2f}] (spk={seg['speaker']}) {seg['text']}")

# 전체 텍스트 합치기
full_text = " ".join(seg["text"] for seg in segments_to_use)
print("Full text:\n", full_text)

# 텍스트 파일로 저장
output_txt_path = '/content/drive/MyDrive/output/sample.txt'
Path(output_txt_path).parent.mkdir(parents=True, exist_ok=True)
with open(output_txt_path, 'w', encoding="utf-8") as f:
    for seg in segments_to_use:
        f.write(f"[{seg['start']:.2f}–{seg['end']:.2f}] (spk={seg['speaker']}) {seg['text']}\n")

print(f"\nTranscription saved to {output_txt_path}")

[DEBUG] 원본 오디오 duration = 3951.06 s
[DEBUG] 정제된 오디오 duration = 3951.06 s
RTZR CLIENT_ID를 입력하세요: wtVlucULYnE0c0ATqMlq
RTZR CLIENT_SECRET를 입력하세요 (입력 시 화면에 표시되지 않습니다): ··········
[DEBUG] RTZR STT config:
{
  "model_name": "sommers",
  "domain": "GENERAL",
  "use_diarization": true,
  "diarization": {
    "spk_count": 0
  },
  "use_paragraph_splitter": true,
  "paragraph_splitter": {
    "max": 80
  },
  "use_itn": false,
  "use_disfluency_filter": false,
  "use_profanity_filter": false
}
[INFO] STT 작업 ID: 2V1MdYJ1R1-zRV6h_HkaEg
[INFO] 결과 대기 중 (5초 간격 폴링)...
[DEBUG] RTZR status = completed
[DEBUG] 총 utterance 개수 = 441
[DEBUG] 마지막 세그먼트 end = 3950.15 s

[DEBUG] 처음 5개 세그먼트:
  [0.53-11.29] Speaker=0 :: 국내 의료계가 또다시 슬렁이고 있습니다. 더블유 티 오 를 중심으로 한 커센 개방의 파고가 그동안 공공제로 분류되어 왔던 의료계에도 숨가쁘게 밀려오고 있기 때문인데요.
  [12.83-21.44] Speaker=0 :: 국민의 생명과 직접 연계된 이 문제. 정부는 물론 당사자인 의료계도 너무 안이하게 대처하고 있는 게 아닌가 하는 비판이 일고 있습니다.
  [22.21-32.85] Speaker=0 :: 그래서 오늘 난상 토론에서는 의료서비스의 기본을 다시 한번 고민해 보고 개방협상에 임하는 우리의 대응방안을 심도 있게 고찰해

In [14]:
# ============================
# 5. 감정(SER) 분석: HuBERT 기반
# ============================
# RTZR에서 사용한 정제 오디오(audio_for_asr)를 그대로 사용
y_ser, sr_ser = librosa.load(audio_for_asr, sr=16000, mono=True)
duration_ser = len(y_ser) / 16000.0
print(f"Loaded for SER: {audio_for_asr}, sr=16000, duration={duration_ser:.2f}s")

model_id = "superb/hubert-large-superb-er"
config_ser = AutoConfig.from_pretrained(model_id)
print("SER labels:", config_ser.id2label)

ser = pipeline(task="audio-classification", model=model_id, top_k=None)

def slice_audio(y, sr, start_s, end_s):
    i0 = int(max(0, start_s) * sr)
    i1 = int(min(len(y) / sr, end_s) * sr)
    return y[i0:i1]

def predict_emotion(wave_1d, sr=16000):
    if len(wave_1d) < int(sr * 0.3):  # 0.3초 미만은 스킵
        return {"label": "unknown", "score": 0.0, "probs": {}}
    out = ser({"array": wave_1d, "sampling_rate": sr})
    top = max(out, key=lambda d: d["score"])
    probs = {d["label"]: float(d["score"]) for d in out}
    return {"label": top["label"], "score": float(top["score"]), "probs": probs}

segment_emotions = []
for seg in segments_to_use:
    start, end, text = seg["start"], seg["end"], seg["text"]
    w = slice_audio(y_ser, 16000, start, end)
    pred = predict_emotion(w, 16000)
    segment_emotions.append({
        "start": start,
        "end": end,
        "text": text,
        "speaker": seg.get("speaker", None),
        "emotion": pred["label"],
        "confidence": pred["score"],
        "probs": pred["probs"],
    })

print("\n[DEBUG] SER 예시 5개:")
for s in segment_emotions[:5]:
    print(f"[{s['start']:.2f}-{s['end']:.2f}] spk={s['speaker']} "
          f"{s['emotion']} ({s['confidence']:.2f}) :: {s['text']}")
print(f"... 총 {len(segment_emotions)}개 세그먼트")

Loaded for SER: /content/drive/MyDrive/data_MP3/cleaned_sample.wav, sr=16000, duration=3951.06s


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json: 0.00B [00:00, ?B/s]

SER labels: {0: 'neu', 1: 'hap', 2: 'ang', 3: 'sad'}


pytorch_model.bin:   0%|          | 0.00/1.26G [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.26G [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/212 [00:00<?, ?B/s]

Device set to use cuda:0
You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset



[DEBUG] SER 예시 5개:
[0.53-11.29] spk=0 hap (0.74) :: 국내 의료계가 또다시 슬렁이고 있습니다. 더블유 티 오 를 중심으로 한 커센 개방의 파고가 그동안 공공제로 분류되어 왔던 의료계에도 숨가쁘게 밀려오고 있기 때문인데요.
[12.83-21.44] spk=0 hap (0.48) :: 국민의 생명과 직접 연계된 이 문제. 정부는 물론 당사자인 의료계도 너무 안이하게 대처하고 있는 게 아닌가 하는 비판이 일고 있습니다.
[22.21-32.85] spk=0 hap (0.56) :: 그래서 오늘 난상 토론에서는 의료서비스의 기본을 다시 한번 고민해 보고 개방협상에 임하는 우리의 대응방안을 심도 있게 고찰해 보도록 하겠습니다. 난상토론 잠시 후에 뵙겠습니다.
[33.43-44.49] spk=0 neu (0.67) :: 먼저 이렇게 자리해 주신 토론자 여러분들께 감사를 드리면서 어 치과의사협회와 한의사협회에서는 협회 내부의 입장이 정리된 관계로 오늘 토론회 이렇게 참석을 해 주셨습니다만 아.
[44.49-52.11] spk=0 hap (0.65) :: 또 그 이외에 국내 의료서비스 분야에는 대안의사협회라든가 대한병원협회, 그리고 대한 간호사협회 등 관련 협회들이 많지 않습니까?
... 총 441개 세그먼트


In [15]:
# ============================
# 6. 텍스트 감성 분석 (XLM-R sentiment)
# ============================
txt_model_id = "cardiffnlp/twitter-xlm-roberta-base-sentiment"
tok = AutoTokenizer.from_pretrained(txt_model_id)
txt_model = AutoModelForSequenceClassification.from_pretrained(txt_model_id)
txt_model.eval()

id2label_txt = {0: "negative", 1: "neutral", 2: "positive"}

def text_sentiment(s):
    if not s.strip():
        return {"label": "neutral", "score": 0.0}
    inputs = tok(s, return_tensors="pt", truncation=True, max_length=256)
    if torch.cuda.is_available():
        txt_model.to("cuda")
        inputs = {k: v.to("cuda") for k, v in inputs.items()}
    with torch.no_grad():
        logits = txt_model(**inputs).logits
    probs = torch.softmax(logits, dim=-1).squeeze().tolist()
    idx = int(np.argmax(probs))
    return {"label": id2label_txt[idx], "score": float(probs[idx])}

for it in segment_emotions:
    ts = text_sentiment(it["text"])
    it["text_sentiment"] = ts["label"]
    it["text_sentiment_conf"] = ts["score"]

config.json:   0%|          | 0.00/841 [00:00<?, ?B/s]

sentencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/1.11G [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.11G [00:00<?, ?B/s]

In [16]:
# ============================
# 7. 감정 라벨 리매핑 (통일 태그)
# ============================
label_map = {
    "happiness": "happy",
    "sadness": "sad",
    "anger": "angry",
    "disgust": "disgust",
    "fear": "fear",
    "surprise": "surprise",
    "neutral": "neutral",
    # 필요시 추가
}

def remap(label):
    if label is None:
        return "unknown"
    l = label.lower()
    return label_map.get(l, l)

for it in segment_emotions:
    it["emotion"] = remap(it["emotion"])

In [17]:
# ============================
# 8. 전체 요약 + CSV 저장
# ============================
def summarize(items, key="emotion"):
    seq = [it[key] for it in items if it.get(key) and it[key] != "unknown"]
    if not seq:
        return "No clear emotion detected."
    counts = Counter(seq)
    top, cnt = counts.most_common(1)[0]
    preview = " → ".join(seq[:5] + (["..."] if len(seq) > 5 else []))
    return f"Dominant({key}): {top} (count={cnt}). Early trend: {preview}"

print("\n", summarize(segment_emotions, key="emotion"))
print(summarize(segment_emotions, key="text_sentiment"))

csv_path = "/content/drive/MyDrive/output/ser_segments.csv"
Path(csv_path).parent.mkdir(parents=True, exist_ok=True)
with open(csv_path, "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(["start","end","speaker","emotion","confidence",
                "text_sentiment","text_sent_conf","text"])
    for it in segment_emotions:
        w.writerow([
            f"{it['start']:.2f}",
            f"{it['end']:.2f}",
            it.get("speaker", ""),
            it["emotion"],
            f"{it['confidence']:.4f}",
            it.get("text_sentiment",""),
            f"{it.get('text_sentiment_conf',0.0):.4f}",
            it["text"]
        ])
print("Saved:", csv_path)


 Dominant(emotion): neu (count=343). Early trend: hap → hap → hap → neu → hap → ...
Dominant(text_sentiment): neutral (count=251). Early trend: negative → negative → neutral → positive → neutral → ...
Saved: /content/drive/MyDrive/output/ser_segments.csv
