In [None]:
import os
import sys
import time
import uuid
import io
import base64
import tempfile
import shutil
import subprocess
import warnings
import IPython.display as ipd
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple, Union

# 警告を非表示にする
warnings.filterwarnings('ignore')

# 必要なライブラリをインストール
try:
    import ipywidgets as widgets
    import torch
    import torchaudio
    import numpy as np
    from tqdm.auto import tqdm
    import soundfile as sf
    from IPython.display import display, Audio, HTML, Javascript
except ImportError:
    print("必要なライブラリをインストールしています...")
    !pip install -q ipywidgets torch torchaudio numpy tqdm soundfile huggingface_hub

    # Jupyter環境を初期化し直す必要がある場合の処理
    display(HTML("""
    <div style="background-color: #ffffcc; padding: 10px; border: 1px solid #ffcc00; border-radius: 5px;">
        <p><strong>注意:</strong> ライブラリがインストールされました。正常に動作させるには、このセルを実行した後、
        <b>Runtime > Restart runtime</b> を選択してノートブックを再起動してください。</p>
    </div>
    """))
    import ipywidgets as widgets
    import torch
    import torchaudio
    import numpy as np
    from tqdm.auto import tqdm
    import soundfile as sf

# Fish Speech の初期化
print("Fish Speechモデルを初期化しています...")

# 必要なディレクトリ構造を作成
os.makedirs("checkpoints/fish-speech-1.5", exist_ok=True)
os.makedirs("references", exist_ok=True)
os.makedirs("outputs", exist_ok=True)

# モデルのダウンロード（存在しない場合のみ）
model_files = [
    "model.pth",
    "tokenizer.tiktoken", 
    "config.json",
    "firefly-gan-vq-fsq-8x1024-21hz-generator.pth"
]

def download_models():
    """モデルファイルをダウンロードする関数"""
    from huggingface_hub import hf_hub_download
    
    print("Fish Speech モデルをダウンロードしています...")
    for file in tqdm(model_files, desc="モデルダウンロード"):
        if not os.path.exists(f"checkpoints/fish-speech-1.5/{file}"):
            try:
                hf_hub_download(
                    repo_id="fishaudio/fish-speech-1.5",
                    filename=file,
                    local_dir="checkpoints/fish-speech-1.5",
                    local_dir_use_symlinks=False
                )
            except Exception as e:
                print(f"モデルのダウンロード中にエラーが発生しました: {e}")
                raise e

try:
    download_models()
except Exception as e:
    print(f"モデルのダウンロードに失敗しました: {e}")
    print("手動でモデルをダウンロードしてください:")
    print("https://huggingface.co/fishaudio/fish-speech-1.5")

# Fish Speech のモジュールをインポート
sys.path.append(os.getcwd())
try:
    from fish_speech.models.vqgan.inference import load_model as load_decoder_model
    from fish_speech.models.text2semantic.inference import launch_thread_safe_queue
    from fish_speech.inference_engine import TTSInferenceEngine
    from fish_speech.utils.schema import ServeTTSRequest, ServeReferenceAudio
except ImportError:
    print("Fish Speech モジュールをロードできませんでした。")
    print("このノートブックが Fish Speech リポジトリのルートディレクトリで実行されていることを確認してください。")
    raise

def initialize_models():
    """TTS モデルを初期化する関数"""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    precision = torch.bfloat16
    compile_flag = False
    
    print(f"デバイス: {device}")
    
    # LLama モデルの初期化
    llama_queue = launch_thread_safe_queue(
        checkpoint_path="checkpoints/fish-speech-1.5",
        device=device,
        precision=precision,
        compile=compile_flag,
    )
    
    # Decoder モデルの初期化
    decoder_model = load_decoder_model(
        config_name="firefly_gan_vq",
        checkpoint_path="checkpoints/fish-speech-1.5/firefly-gan-vq-fsq-8x1024-21hz-generator.pth",
        device=device,
    )
    
    # 推論エンジンの初期化
    inference_engine = TTSInferenceEngine(
        llama_queue=llama_queue,
        decoder_model=decoder_model,
        precision=precision,
        compile=compile_flag,
    )
    
    # ウォームアップ実行
    print("モデルをウォームアップしています...")
    list(inference_engine.inference(
        ServeTTSRequest(
            text="こんにちは",
            references=[],
            reference_id=None,
            max_new_tokens=1024,
            chunk_length=200,
            top_p=0.7,
            repetition_penalty=1.5,
            temperature=0.7,
            format="wav",
        )
    ))
    
    print("モデルの初期化が完了しました！")
    return inference_engine

# モデルの初期化
inference_engine = initialize_models()

# 音声クローン関数
def create_voice_clone(audio_file, text, clone_id=None):
    """音声ファイルとテキストから音声クローンを作成する関数"""
    if clone_id is None:
        clone_id = str(uuid.uuid4().hex)
    
    # リファレンスディレクトリの作成
    ref_dir = os.path.join("references", clone_id)
    os.makedirs(ref_dir, exist_ok=True)
    
    # 音声ファイルをコピー
    audio_ext = os.path.splitext(audio_file)[1]
    ref_audio_path = os.path.join(ref_dir, f"reference{audio_ext}")
    shutil.copy(audio_file, ref_audio_path)
    
    # テキストファイルを作成
    ref_text_path = os.path.join(ref_dir, "reference.lab")
    with open(ref_text_path, "w", encoding="utf-8") as f:
        f.write(text)
    
    return clone_id

# 利用可能な話者リストを取得する関数
def get_available_speakers():
    """references フォルダにある話者IDのリストを取得する"""
    speakers = []
    ref_dir = Path("references")
    if ref_dir.exists():
        speakers = [d.name for d in ref_dir.iterdir() if d.is_dir()]
    return speakers

# 音声生成関数
def generate_speech(text, reference_id=None, reference_audio=None, reference_text=None, 
                   max_new_tokens=1024, chunk_length=200, top_p=0.7, 
                   repetition_penalty=1.2, temperature=0.7, seed=None, 
                   use_memory_cache="on"):
    """テキストから音声を生成する関数"""
    references = []
    if reference_audio and reference_text:
        with open(reference_audio, "rb") as audio_file:
            audio_bytes = audio_file.read()
        references = [ServeReferenceAudio(audio=audio_bytes, text=reference_text)]
    
    req = ServeTTSRequest(
        text=text,
        reference_id=reference_id if reference_id else None,
        references=references,
        max_new_tokens=max_new_tokens,
        chunk_length=chunk_length,
        top_p=top_p,
        repetition_penalty=repetition_penalty,
        temperature=temperature,
        seed=seed if seed and seed > 0 else None,
        use_memory_cache=use_memory_cache,
        format="wav",
    )
    
    # 音声生成
    audio_data = None
    error_msg = None
    
    for result in inference_engine.inference(req):
        if result.code == "final":
            audio_data = result.audio
            break
        elif result.code == "error":
            error_msg = str(result.error)
            break
    
    if error_msg:
        return None, error_msg
    
    if audio_data is None:
        return None, "音声が生成されませんでした"
    
    # 音声データを返す
    sample_rate, audio = audio_data
    return (sample_rate, audio), None

# 音声ファイルを保存する関数
def save_audio(audio_data, filename="generated_audio.wav"):
    """音声データをファイルに保存する関数"""
    sample_rate, audio = audio_data
    output_path = os.path.join("outputs", filename)
    sf.write(output_path, audio, sample_rate)
    return output_path

# ダウンロードボタン機能のために、一時ファイルを作成して HTML でダウンロードリンクを生成
def get_download_link(audio_data, filename="generated_audio.wav"):
    """音声データからダウンロードリンクを生成する関数"""
    sample_rate, audio = audio_data
    
    # WAVデータを一時ファイルとして保存
    temp_dir = tempfile.mkdtemp()
    temp_file = os.path.join(temp_dir, filename)
    sf.write(temp_file, audio, sample_rate)
    
    # ファイルを読み込んでBase64エンコード
    with open(temp_file, "rb") as f:
        audio_data_b64 = base64.b64encode(f.read()).decode()
    
    # 一時ディレクトリを削除
    shutil.rmtree(temp_dir)
    
    # Base64エンコードされたデータを使ってダウンロードリンクを生成
    download_link = f"""
    <a href="data:audio/wav;base64,{audio_data_b64}" 
       download="{filename}" 
       class="download-button"
       style="display: inline-block; 
              background-color: #4CAF50; 
              color: white; 
              padding: 8px 16px; 
              text-align: center; 
              text-decoration: none; 
              font-size: 14px; 
              margin: 4px 2px; 
              cursor: pointer; 
              border-radius: 4px;">
        ダウンロード: {filename}
    </a>
    """
    return download_link

# UI 構築
def build_ui():
    """ipywidgetsを使用してUIを構築する関数"""
    # スタイルの設定
    style = {'description_width': '150px'}
    layout = widgets.Layout(width='100%')
    
    # ヘッダー
    header = widgets.HTML(
        value="""
        <h1 style="color: #2c3e50; font-family: Arial, sans-serif;">Fish Speech TTS アプリケーション</h1>
        <p style="color: #7f8c8d; font-family: Arial, sans-serif; margin-bottom: 20px;">
          FishAudioが開発したテキスト音声変換アプリケーション
        </p>
        <hr style="margin-bottom: 20px;">
        """
    )
    
    # 進捗インジケーター
    progress_bar = widgets.IntProgress(
        value=0,
        min=0,
        max=100,
        description='進捗:',
        style={'bar_color': '#5cb85c'},
        layout=widgets.Layout(width='70%', visibility='hidden')
    )
    status_label = widgets.HTML(
        value='',
        layout=widgets.Layout(margin='5px 0px')
    )
    
    # 1. テキスト入力タブ
    text_input = widgets.Textarea(
        value='こんにちは、私はAIアシスタントです。お手伝いできることがあれば教えてください。',
        placeholder='ここにテキストを入力してください',
        description='テキスト:',
        style=style,
        layout=layout,
        rows=5
    )
    
    text_file_upload = widgets.FileUpload(
        accept='.txt,.md,.csv,.json',
        description='または、テキストファイルをアップロード:',
        style=style,
        layout=layout,
        multiple=False
    )
    
    speaker_dropdown = widgets.Dropdown(
        options=[('なし', None)] + [(s, s) for s in get_available_speakers()],
        value=None,
        description='話者ID:',
        style=style,
        layout=widgets.Layout(width='50%')
    )
    
    generate_button = widgets.Button(
        description='音声生成',
        button_style='primary',
        layout=widgets.Layout(width='200px')
    )
    
    output_audio = widgets.Output()
    download_area = widgets.HTML()
    error_output = widgets.HTML()
    
    # テンプレート音声クローンタブは削除
    
    # 2. 音声アップロードクローンタブ
    upload_audio = widgets.FileUpload(
        accept='.wav,.mp3,.ogg,.flac',
        description='音声ファイル:',
        style=style,
        layout=layout
    )
    
    reference_text = widgets.Textarea(
        value='',
        placeholder='アップロードした音声に対応するテキストを入力してください',
        description='参照テキスト:',
        style=style,
        layout=layout,
        rows=3
    )
    
    upload_clone_id = widgets.Text(
        value='',
        placeholder='任意のID (空白の場合は自動生成)',
        description='クローンID:',
        style=style,
        layout=widgets.Layout(width='50%')
    )
    
    upload_clone_button = widgets.Button(
        description='音声クローン作成',
        button_style='primary',
        layout=widgets.Layout(width='200px', margin='10px 0px')
    )
    
    clone_progress = widgets.IntProgress(
        value=0,
        min=0,
        max=100,
        description='進捗:',
        style={'bar_color': '#5cb85c'},
        layout=widgets.Layout(width='70%', visibility='hidden')
    )
    
    upload_clone_output = widgets.HTML()
    
    # 3. 詳細設定タブ
    advanced_settings = widgets.VBox([
        widgets.IntSlider(
            value=200,
            min=0,
            max=300,
            step=10,
            description='チャンク長:',
            style=style,
            layout=layout
        ),
        widgets.IntSlider(
            value=1024,
            min=256,
            max=2048,
            step=128,
            description='最大トークン数:',
            style=style,
            layout=layout
        ),
        widgets.FloatSlider(
            value=0.7,
            min=0.1,
            max=1.0,
            step=0.05,
            description='Top-p:',
            style=style,
            layout=layout
        ),
        widgets.FloatSlider(
            value=1.2,
            min=1.0,
            max=2.0,
            step=0.1,
            description='繰り返しペナルティ:',
            style=style,
            layout=layout
        ),
        widgets.FloatSlider(
            value=0.7,
            min=0.1,
            max=1.0,
            step=0.05,
            description='温度:',
            style=style,
            layout=layout
        ),
        widgets.IntText(
            value=0,
            description='シード:',
            style=style,
            layout=widgets.Layout(width='50%')
        ),
        widgets.Dropdown(
            options=[('オン', 'on'), ('オフ', 'off')],
            value='on',
            description='メモリキャッシュ:',
            style=style,
            layout=widgets.Layout(width='50%')
        ),
        widgets.HTML(
            value="""
            <div style="background-color: #f8f9fa; padding: 10px; border-radius: 5px; margin-top: 15px;">
                <p style="margin: 0; font-size: 0.9em; color: #6c757d;">
                    <b>パラメーター説明:</b><br>
                    - <b>チャンク長</b>: 長いテキストを分割する長さ (0は分割なし)<br>
                    - <b>最大トークン数</b>: 生成する最大トークン数<br>
                    - <b>Top-p</b>: サンプリング多様性 (高いほど多様)<br>
                    - <b>繰り返しペナルティ</b>: 繰り返しを抑制する強さ<br>
                    - <b>温度</b>: 生成の多様性 (高いほど多様)<br>
                    - <b>シード</b>: 再現性のための乱数シード (0はランダム)<br>
                </p>
            </div>
            """
        )
    ])
    
    # タブの作成
    tabs = widgets.Tab()
    tabs.children = [
        widgets.VBox([
            text_input,
            text_file_upload,
            widgets.HBox([speaker_dropdown, generate_button]),
            progress_bar,
            status_label,
            output_audio,
            download_area,
            error_output
        ]),
        widgets.VBox([
            upload_audio,
            reference_text,
            widgets.HBox([upload_clone_id, upload_clone_button]),
            clone_progress,
            upload_clone_output
        ]),
        advanced_settings
    ]
    
    tabs.set_title(0, 'テキスト音声変換')
    tabs.set_title(1, '音声クローン作成')
    tabs.set_title(2, '詳細設定')
    
    # イベントハンドラの設定
    def on_generate_click(b):
        error_output.value = ""
        download_area.value = ""
        with output_audio:
            output_audio.clear_output()
            print("音声を生成しています...")
            
            # 詳細設定から値を取得
            chunk_length = advanced_settings.children[0].value
            max_new_tokens = advanced_settings.children[1].value
            top_p = advanced_settings.children[2].value
            repetition_penalty = advanced_settings.children[3].value
            temperature = advanced_settings.children[4].value
            seed = advanced_settings.children[5].value
            use_memory_cache = advanced_settings.children[6].value
            
            # 音声生成
            audio_data, error = generate_speech(
                text_input.value,
                reference_id=speaker_dropdown.value,
                max_new_tokens=max_new_tokens,
                chunk_length=chunk_length,
                top_p=top_p,
                repetition_penalty=repetition_penalty,
                temperature=temperature,
                seed=seed,
                use_memory_cache=use_memory_cache
            )
            
            output_audio.clear_output()
            
            if error:
                error_output.value = f'<div style="color: red; margin-top: 10px;">エラー: {error}</div>'
                return
            
            # 音声を再生
            display(Audio(data=audio_data[1], rate=audio_data[0]))
            
            # ダウンロードリンクを表示
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"generated_{timestamp}.wav"
            save_audio(audio_data, filename)
            download_area.value = get_download_link(audio_data, filename)
    
    def on_create_clone_click(b):
        clone_output.value = ""
        
        if not template_recorder.value:
            clone_output.value = '<div style="color: red; margin-top: 10px;">エラー: 録音データがありません。テンプレートを録音してください。</div>'
            return
        
        # 録音データを一時ファイルに保存
        temp_dir = tempfile.mkdtemp()
        temp_audio_file = os.path.join(temp_dir, "template.wav")
        
        with open(temp_audio_file, "wb") as f:
            f.write(template_recorder.value)
        
        # テンプレートテキスト
        template_value = "魚は水の中で泳ぎ、鳥は空を飛びます。私たちは言葉を通じてコミュニケーションをとります。この音声は、私の声をクローンするためのサンプルです。"
        
        # クローンID
        clone_id = clone_id_input.value if clone_id_input.value else None
        
        # 音声クローン作成
        try:
            clone_id = create_voice_clone(temp_audio_file, template_value, clone_id)
            clone_output.value = f'''
            <div style="background-color: #d4edda; color: #155724; padding: 10px; border-radius: 4px; margin-top: 10px;">
                音声クローンが作成されました！<br>
                クローンID: <b>{clone_id}</b><br>
                このIDを「テキスト音声変換」タブの話者IDドロップダウンで選択することで、あなたの声でテキストを読み上げることができます。
            </div>
            '''
            
            # 話者リストを更新
            speaker_dropdown.options = [('なし', None)] + [(s, s) for s in get_available_speakers()]
        except Exception as e:
            clone_output.value = f'<div style="color: red; margin-top: 10px;">エラー: {str(e)}</div>'
        finally:
            # 一時ファイルを削除
            shutil.rmtree(temp_dir)
    
    def on_upload_clone_click(b):
        upload_clone_output.value = ""
        
        if not upload_audio.value:
            upload_clone_output.value = '<div style="color: red; margin-top: 10px;">エラー: 音声ファイルをアップロードしてください。</div>'
            return
        
        if not reference_text.value:
            upload_clone_output.value = '<div style="color: red; margin-top: 10px;">エラー: 参照テキストを入力してください。</div>'
            return
        
        # ファイル名を取得
        file_info = next(iter(upload_audio.value.values()))
        file_name = file_info['metadata']['name']
        
        # 一時ファイルに保存
        temp_dir = tempfile.mkdtemp()
        temp_audio_file = os.path.join(temp_dir, file_name)
        
        with open(temp_audio_file, "wb") as f:
            f.write(file_info['content'])
        
        # クローンID
        clone_id = upload_clone_id.value if upload_clone_id.value else None
        
        # 音声クローン作成
        try:
            clone_id = create_voice_clone(temp_audio_file, reference_text.value, clone_id)
            upload_clone_output.value = f'''
            <div style="background-color: #d4edda; color: #155724; padding: 10px; border-radius: 4px; margin-top: 10px;">
                音声クローンが作成されました！<br>
                クローンID: <b>{clone_id}</b><br>
                このIDを「テキスト音声変換」タブの話者IDドロップダウンで選択することで、あなたの声でテキストを読み上げることができます。
            </div>
            '''
            
            # 話者リストを更新
            speaker_dropdown.options = [('なし', None)] + [(s, s) for s in get_available_speakers()]
        except Exception as e:
            upload_clone_output.value = f'<div style="color: red; margin-top: 10px;">エラー: {str(e)}</div>'
        finally:
            # 一時ファイルを削除
            shutil.rmtree(temp_dir)
    
    # イベントハンドラの登録
    generate_button.on_click(on_generate_click)
    create_clone_button.on_click(on_create_clone_click)
    upload_clone_button.on_click(on_upload_clone_click)
    
    # UIを表示
    return widgets.VBox([header, tabs])

# メインアプリケーションの構築と表示
app = build_ui()
display(app)