In [6]:
import asyncio
import json
import logging
import os
import tempfile
import time

import websockets
from pydub import AudioSegment
from requests import Session

API_BASE = "https://openapi.vito.ai"
CLIENT_ID = os.environ.get("RTZR_CLIENT_ID")
CLIENT_SECRET = os.environ.get("RTZR_CLIENT_SECRET")

if not CLIENT_ID or not CLIENT_SECRET:
    raise ValueError("RTZR_CLIENT_ID and RTZR_CLIENT_SECRET must be set")

# ---- 설정 ----
SAMPLE_RATE = 8000          # RTZR 예제에 맞춘 샘플레이트
BYTES_PER_SAMPLE = 2        # 16bit PCM = 2 bytes
CHUNK_SIZE = 2048           # 한 번에 보낼 오디오 바이트 크기 (작을수록 더 자주 전송)
# ----------------


# 파일을 "실시간처럼" 스트리밍하는 클래스
class FileStreamer:
    def __init__(self, file: str):
        file_name = os.path.basename(file)
        i = file_name.rindex(".")
        audio_file_8k_path = os.path.join(
            tempfile.gettempdir(),
            file_name[:i]
        ) + "_" + str(SAMPLE_RATE) + ".wav"

        self.filepath = audio_file_8k_path

        # 원본 파일을 8kHz, mono wav로 변환
        audio = AudioSegment.from_file(file=file, format=file[i + 1 :])
        audio = audio.set_frame_rate(SAMPLE_RATE)
        audio = audio.set_channels(1)
        audio.export(audio_file_8k_path, format="wav")

        self.file = open(audio_file_8k_path, "rb")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file.close()
        os.remove(self.filepath)

    async def read(self, size: int) -> bytes:
        """
        예제 코드에는 '실시간처럼' 보이게 하려고 sleep 이 있었는데,
        그게 속도를 엄청 느리게 만들어서 제거함.
        여기서는 그냥 파일에서 size 만큼 바로 읽어서 반환.
        """
        if size > CHUNK_SIZE:
            size = CHUNK_SIZE
        content = self.file.read(size)
        return content


class RTZROpenAPIClient:
    def __init__(self, client_id: str, client_secret: str):
        super().__init__()
        self._logger = logging.getLogger(__name__)
        self.client_id = client_id
        self.client_secret = client_secret
        self._sess = Session()
        self._token = None

    @property
    def token(self) -> str:
        """
        액세스 토큰을 캐싱해서 쓰는 부분.
        expire_at 지나면 다시 발급.
        """
        if self._token is None or self._token["expire_at"] < time.time():
            resp = self._sess.post(
                API_BASE + "/v1/authenticate",
                data={"client_id": self.client_id, "client_secret": self.client_secret},
            )
            resp.raise_for_status()
            self._token = resp.json()
        return self._token["access_token"]

    async def streaming_transcribe(self, filename: str, config: dict | None = None):
        if config is None:
            config = dict(
                sample_rate=str(SAMPLE_RATE),
                encoding="LINEAR16",
                use_itn="true",
                use_disfluency_filter="false",
                use_profanity_filter="false",
            )

        # RTZR 스트리밍 엔드포인트
        STREAMING_ENDPOINT = "wss://{}/v1/transcribe:streaming?{}".format(
            API_BASE.split("://")[1],
            "&".join(map("=".join, config.items())),
        )

        # websockets 버전 호환되는 extra_headers 형태 (리스트[튜플])
        conn_kwargs = dict(
            extra_headers=[("Authorization", "bearer " + self.token)]
        )

        async def streamer(websocket):
            # 파일을 조금씩 읽어서 websocket으로 전송
            with FileStreamer(filename) as f:
                while True:
                    buff = await f.read(CHUNK_SIZE)
                    if not buff:
                        break
                    await websocket.send(buff)
                # 전송 종료 표시
                await websocket.send("EOS")

        async def transcriber(websocket):
            async for raw in websocket:
                msg = json.loads(raw)

                # 중간 결과는 무시
                if not msg.get("final"):
                    continue

                text = msg["alternatives"][0]["text"]
                print("final:", text)
                break  # 최종 결과 받은 뒤 루프 종료

        # 웹소켓 연결 후, 송신(streamer) + 수신(transcriber) 동시에 수행
        async with websockets.connect(STREAMING_ENDPOINT, **conn_kwargs) as websocket:
            await asyncio.gather(
                streamer(websocket),
                transcriber(websocket),
            )


In [7]:
# parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
# parser.add_argument("stream", help="File to stream to the API")
# args = parser.parse_args()
# import nest_asyncio
# nest_asyncio.apply()
client = RTZROpenAPIClient(CLIENT_ID, CLIENT_SECRET)
filename = "./record.wav"
await client.streaming_transcribe(filename)
# asyncio.run(client.streaming_transcribe(filename))
# asyncio.run(client.streaming_transcribe(args.stream))

final: 아이템 사용해 줘
