# Fish-Speech-1.5 Hugging Face Spaces APIを使用した音声変換アプリ

## 準備
以下のセルを実行して、必要なライブラリをインストールします。初回のみ実行してください。

In [None]:
# 必要なライブラリのインストール（最初に一度だけ実行）
!pip install torch torchaudio numpy scipy ipywidgets
!pip install requests
!pip install soundfile

## アプリの実装（Hugging Face Spaces API版）
Fish-Speechの公式Spaces APIを使用した音声合成

In [None]:
import requests
import numpy as np
import torch
import torchaudio
import os
import io
import tempfile
import time
import json
from IPython.display import Audio, display, HTML
import ipywidgets as widgets
import scipy.io.wavfile as wavfile
import soundfile as sf

# Fish Speech APIを呼び出す関数
def text_to_speech_api(text, language="ja", speaker=None):
    """
    Fish Speech Spaces APIを使用して、テキストを音声に変換する
    
    Parameters:
    -----------
    text : str
        変換するテキスト
    language : str
        言語コード（'ja', 'en', 'zh'など）
    speaker : str or None
        話者ID（Noneの場合はデフォルト）
        
    Returns:
    --------
    str
        生成された音声ファイルのパス
    """
    # APIエンドポイント
    API_URL = "https://fishaudio-fish-speech-1.hf.space/api/predict"
    
    # 一時ファイルを作成
    temp_dir = tempfile.gettempdir()
    output_path = os.path.join(temp_dir, 'output_api.wav')
    
    try:
        # APIリクエストのデータを準備
        if not speaker:
            # 言語に基づいてスピーカーを選択
            if language == "ja":
                speaker = "ja_speaker_1"
            elif language == "en":
                speaker = "en_speaker_1"
            elif language == "zh":
                speaker = "zh_speaker_1"
            else:
                speaker = "default"
        
        payload = {
            "data": [
                text,  # テキスト
                "auto",  # 言語自動検出（または直接言語コードを指定）
                speaker,  # 話者ID
                0.6,  # 温度
                1.2,  # トップK
                0.7,  # トップP
                False  # 声色推論（不要ならFalse）
            ]
        }
        
        print(f"APIリクエスト送信中...")
        
        # POSTリクエストを送信
        response = requests.post(API_URL, json=payload)
        
        # レスポンスを確認
        if response.status_code == 200:
            response_data = response.json()
            
            # 音声データのURLを取得
            audio_url = response_data.get('data', [])[1]
            
            if audio_url and audio_url.startswith('http'):
                # 音声データをダウンロード
                audio_response = requests.get(audio_url)
                
                if audio_response.status_code == 200:
                    # 音声データを一時ファイルに保存
                    with open(output_path, 'wb') as f:
                        f.write(audio_response.content)
                    
                    print(f"音声生成完了: {output_path}")
                    return output_path
                else:
                    raise Exception(f"音声データのダウンロードに失敗しました: {audio_response.status_code}")
            else:
                raise Exception(f"音声URLが見つかりません: {response_data}")
        else:
            raise Exception(f"APIリクエストに失敗しました: {response.status_code} - {response.text}")
    
    except Exception as e:
        print(f"APIエラー: {e}")
        
        # エラーが発生した場合はデモ音声を生成
        print("デモ音声を生成します...")
        
        # デモ音声（正弦波）
        sample_rate = 24000
        duration = min(len(text) * 0.1, 10)  # テキストの長さに比例（最大10秒）
        t = np.linspace(0, duration, int(duration * sample_rate), endpoint=False)
        
        if language == "ja":
            freq = 440  # 日本語の場合
        else:
            freq = 392  # その他の言語
            
        waveform = 0.5 * np.sin(2 * np.pi * freq * t) * np.exp(-t/duration)
        wavfile.write(output_path, sample_rate, waveform.astype(np.float32))
        
        print(f"デモ音声を生成しました: {output_path}")
        return output_path

# フォールバック用の速度調整関数
def adjust_audio_speed(input_file, output_file, speed=1.0):
    """
    音声ファイルの再生速度を調整する
    
    Parameters:
    -----------
    input_file : str
        入力音声ファイルのパス
    output_file : str
        出力音声ファイルのパス
    speed : float
        速度調整率（1.0が標準）
        
    Returns:
    --------
    str
        出力音声ファイルのパス
    """
    try:
        # 音声ファイルの読み込み
        data, samplerate = sf.read(input_file)
        
        if speed != 1.0:
            # 速度調整のためのリサンプリング
            # リサンプリングの比率を計算
            resample_ratio = 1.0 / speed
            
            # PyTorchのリサンプラーを使用
            audio_tensor = torch.tensor(data).unsqueeze(0)
            
            if len(audio_tensor.shape) == 3:  # (1, channels, samples)
                audio_tensor = audio_tensor.squeeze(0)
            
            # モノラルかステレオかを確認
            if len(audio_tensor.shape) == 1:  # モノラル
                audio_tensor = audio_tensor.unsqueeze(0)
            
            # リサンプラーの作成
            resampler = torchaudio.transforms.Resample(
                orig_freq=samplerate,
                new_freq=int(samplerate * resample_ratio)
            )
            
            # リサンプリング
            resampled_audio = resampler(audio_tensor)
            
            # NumPy配列に変換
            if resampled_audio.shape[0] == 1:  # モノラル
                output_data = resampled_audio.squeeze(0).numpy()
            else:  # ステレオ
                output_data = resampled_audio.numpy().T
        else:
            # 速度調整なし
            output_data = data
        
        # 音声ファイルの保存
        sf.write(output_file, output_data, samplerate)
        
        return output_file
    except Exception as e:
        print(f"音声速度調整エラー: {e}")
        return input_file

# ファイルからテキストを読み込む関数
def read_text_file(uploaded_file):
    """
    アップロードされたファイルからテキストを読み込む
    
    Parameters:
    -----------
    uploaded_file : UploadedFile
        アップロードされたテキストファイル
        
    Returns:
    --------
    str
        ファイルの内容
    """
    content = uploaded_file.read()
    # エンコーディングを自動検出（日本語ファイルの場合、utf-8やshift-jisなど様々な可能性がある）
    encodings = ['utf-8', 'shift-jis', 'euc-jp', 'iso-2022-jp']
    
    for encoding in encodings:
        try:
            text = content.decode(encoding)
            return text
        except UnicodeDecodeError:
            continue
    
    # どのエンコーディングでも失敗した場合
    raise ValueError("ファイルのエンコーディングを検出できませんでした。")

# ダウンロードリンクを作成する関数
def create_download_link(audio_data, filename, text):
    """
    音声データをダウンロードするためのHTMLリンクを作成
    
    Parameters:
    -----------
    audio_data : bytes
        ダウンロードするオーディオデータ
    filename : str
        ダウンロード時のファイル名
    text : str
        リンクに表示するテキスト
        
    Returns:
    --------
    str
        HTMLリンク
    """
    b64 = io.BytesIO(audio_data)
    payload = b64.getvalue()
    import base64
    b64_str = base64.b64encode(payload).decode()
    return f'<a href="data:audio/wav;base64,{b64_str}" download="{filename}">{text}</a>'

# 話者IDのオプション
speaker_options = [
    ("日本語標準", "ja"),
    ("英語標準", "en"),
    ("中国語標準", "zh")
]

# ファイルアップロード時の処理
def on_upload_change(change):
    """
    ファイルがアップロードされたときの処理
    """
    if not change.new:
        return
    
    # 処理状況を表示
    status_output.value = "ファイルを処理中..."
    
    try:
        # アップロードされたファイルからテキストを読み込む
        uploaded_file = list(change.new.values())[0]
        text = read_text_file(uploaded_file)
        
        # テキストプレビューを表示
        text_preview.value = text if len(text) <= 1000 else text[:1000] + "..."
        
        # テキストを音声に変換
        speed = float(speed_slider.value)
        language = speaker_dropdown.value
        
        # APIを使用して音声生成
        audio_path = text_to_speech_api(text, language)
        
        # 速度調整が必要な場合
        if speed != 1.0:
            temp_dir = tempfile.gettempdir()
            speed_adjusted_path = os.path.join(temp_dir, f'output_speed_{speed}.wav')
            audio_path = adjust_audio_speed(audio_path, speed_adjusted_path, speed)
        
        # 音声を表示
        audio_output.clear_output()
        with audio_output:
            display(Audio(audio_path))
        
        # ダウンロードリンクを作成
        with open(audio_path, 'rb') as f:
            audio_data = f.read()
        
        download_link.value = create_download_link(audio_data, 'output.wav', '音声ファイルをダウンロード')
        
        status_output.value = "処理完了！"
    
    except Exception as e:
        status_output.value = f"エラーが発生しました: {str(e)}"

# 速度変更時の処理
def on_speed_change(change):
    """
    速度スライダーが変更されたときの処理
    """
    speed_value.value = f"発話速度: {change.new}x"

# UIの作成
upload_button = widgets.FileUpload(
    accept='.txt',
    multiple=False,
    description='テキストファイルを選択'
)

speaker_dropdown = widgets.Dropdown(
    options=speaker_options,
    value="ja",
    description='言語:',
)

speed_slider = widgets.FloatSlider(
    value=1.0,
    min=0.5,
    max=2.0,
    step=0.1,
    description='速度:',
    continuous_update=False
)

speed_value = widgets.HTML(value="発話速度: 1.0x")
status_output = widgets.HTML(value="ファイルをアップロードしてください。")
text_preview = widgets.Textarea(
    description='テキスト:',
    placeholder='アップロードされたテキストがここに表示されます',
    disabled=True,
    layout=widgets.Layout(width='100%', height='200px')
)

audio_output = widgets.Output()
download_link = widgets.HTML()

# イベントハンドラの登録
upload_button.observe(on_upload_change, names='value')
speed_slider.observe(on_speed_change, names='value')

# ボタンクリック時の処理
def on_convert_button_click(b):
    text = text_input.value
    if not text:
        status_output.value = "テキストが入力されていません。"
        return
    
    status_output.value = "テキストを処理中..."
    
    try:
        # テキストプレビューを表示
        text_preview.value = text if len(text) <= 1000 else text[:1000] + "..."
        
        # テキストを音声に変換
        speed = float(speed_slider.value)
        language = speaker_dropdown.value
        
        # APIを使用して音声生成
        audio_path = text_to_speech_api(text, language)
        
        # 速度調整が必要な場合
        if speed != 1.0:
            temp_dir = tempfile.gettempdir()
            speed_adjusted_path = os.path.join(temp_dir, f'output_speed_{speed}.wav')
            audio_path = adjust_audio_speed(audio_path, speed_adjusted_path, speed)
        
        # 音声を表示
        audio_output.clear_output()
        with audio_output:
            display(Audio(audio_path))
        
        # ダウンロードリンクを作成
        with open(audio_path, 'rb') as f:
            audio_data = f.read()
        
        download_link.value = create_download_link(audio_data, 'output.wav', '音声ファイルをダウンロード')
        
        status_output.value = "処理完了！"
    
    except Exception as e:
        status_output.value = f"エラーが発生しました: {str(e)}"

# 直接テキスト入力機能も追加
text_input = widgets.Textarea(
    description='直接入力:',
    placeholder='ここに直接テキストを入力して変換することもできます',
    layout=widgets.Layout(width='100%', height='150px')
)

# テキスト入力による変換ボタン
convert_button = widgets.Button(
    description='テキストを変換',
    button_style='primary',
    tooltip='入力されたテキストを音声に変換します'
)

# イベントハンドラの登録
convert_button.on_click(on_convert_button_click)

# UIの表示
display(widgets.HTML("<h1>Fish-Speech-1.5 日本語テキスト→音声変換アプリ（API版）</h1>"))
display(widgets.HTML("<p>日本語のテキストファイルをアップロードして、Fish Speech APIを使用して音声に変換します。</p>"))
display(widgets.HTML("<p style='color: blue;'><strong>Note:</strong> このバージョンは公式のFish Speech API (Hugging Face Spaces)を使用しています。</p>"))
display(widgets.HBox([upload_button, speaker_dropdown, widgets.VBox([speed_slider, speed_value])]))
display(status_output)
display(text_preview)
display(widgets.HTML("<h3>生成された音声:</h3>"))
display(audio_output)
display(download_link)

# 直接入力UIの表示
display(widgets.HTML("<h3>または直接テキストを入力:</h3>"))
display(text_input)
display(convert_button)