In [None]:
import os
import sys

# Set up the import path.
# The vendored Matcha-TTS checkout is appended to sys.path before the
# `cosyvoice` imports below run — presumably because cosyvoice imports
# modules from it (TODO confirm). The path is resolved against the current
# working directory, so the notebook is expected to be started from the
# repository root.
current_dir = os.getcwd()
matcha_path = os.path.join(current_dir, 'third_party/Matcha-TTS')
if matcha_path not in sys.path:
    sys.path.append(matcha_path)
    print(f"Added Matcha-TTS path: {matcha_path}")

import argparse
import gradio as gr
import numpy as np
import torch
import torchaudio
import random
import librosa
from cosyvoice.cli.cosyvoice import CosyVoice
from cosyvoice.utils.file_utils import load_wav, logging
from cosyvoice.utils.common import set_all_random_seed

In [None]:
# Global configuration
# Inference modes offered in the UI; the same strings are used as keys of
# `instruct_dict` and as the dispatch values inside `generate_audio`.
inference_mode_list = ['预训练音色', '3s极速复刻', '跨语种复刻', '自然语言控制']
# Step-by-step usage hint displayed for each inference mode.
instruct_dict = {
    '预训练音色': '1. 选择预训练音色\n2. 点击生成音频按钮',
    '3s极速复刻': '1. 选择prompt音频文件，或录入prompt音频，注意不超过30s，若同时提供，优先选择prompt音频文件\n2. 输入prompt文本\n3. 点击生成音频按钮',
    '跨语种复刻': '1. 选择prompt音频文件，或录入prompt音频，注意不超过30s，若同时提供，优先选择prompt音频文件\n2. 点击生成音频按钮',
    '自然语言控制': '1. 选择预训练音色\n2. 输入instruct文本\n3. 点击生成音频按钮'
}
# (label, value) pairs for the streaming-inference radio button.
stream_mode_list = [('否', False), ('是', True)]
# Peak amplitude cap applied to the prompt audio in `postprocess`.
max_val = 0.8
# Sample rate the prompt audio is loaded at, and sample rate reported for
# every generated audio chunk.
prompt_sr, target_sr = 16000, 22050
# `target_sr` zero samples (one second of silence at `target_sr`), yielded as
# a placeholder when request validation fails.
default_data = np.zeros(target_sr)

# Initialize the model
model_dir = 'pretrained_models/CosyVoice-300M'  # change as needed
cosyvoice = CosyVoice(model_dir)
# Speaker names offered in the "pretrained voice" dropdown.
sft_spk = cosyvoice.list_avaliable_spks()

In [4]:
def generate_seed():
    """Draw a new random seed and wrap it as an update payload for the seed field.

    Returns:
        dict: ``{"__type__": "update", "value": <int in [1, 100000000]>}``.
    """
    new_seed = random.randint(1, 100000000)
    payload = dict(__type__="update", value=new_seed)
    return payload

def postprocess(speech, top_db=60, hop_length=220, win_length=440):
    """Trim, peak-limit and pad a prompt waveform.

    Steps:
      1. ``librosa.effects.trim`` is applied with the given ``top_db``,
         ``frame_length=win_length`` and ``hop_length``.
      2. If the absolute peak exceeds ``max_val``, the signal is rescaled so
         that its peak equals ``max_val``.
      3. ``int(target_sr * 0.2)`` zero samples are appended along dim 1.

    Args:
        speech: waveform exposing ``.abs().max()`` and accepted by
            ``torch.concat`` along dim 1 — assumes a ``(1, num_samples)``
            torch tensor; TODO confirm against ``load_wav``.
        top_db: threshold forwarded to ``librosa.effects.trim``.
        hop_length: hop length forwarded to ``librosa.effects.trim``.
        win_length: forwarded to ``librosa.effects.trim`` as ``frame_length``.

    Returns:
        The processed waveform with the zero tail appended.
    """
    # NOTE(review): the tail length is derived from target_sr although the
    # callers in this file pass audio loaded at prompt_sr — confirm intent.
    trimmed, _ = librosa.effects.trim(
        speech,
        top_db=top_db,
        frame_length=win_length,
        hop_length=hop_length,
    )
    peak = trimmed.abs().max()
    if peak > max_val:
        trimmed = trimmed / peak * max_val
    zero_tail = torch.zeros(1, int(target_sr * 0.2))
    return torch.concat([trimmed, zero_tail], dim=1)

def change_instruction(mode_checkbox_group):
    """Look up the usage hint shown for the selected inference mode.

    Args:
        mode_checkbox_group: the selected mode; must be a key of ``instruct_dict``.

    Returns:
        The instruction text stored in ``instruct_dict`` for that mode.
    """
    hint = instruct_dict[mode_checkbox_group]
    return hint

In [5]:
def generate_audio(tts_text, mode_checkbox_group, sft_dropdown, prompt_text, prompt_wav_upload, prompt_wav_record, instruct_text,
                   seed, stream, speed):
    """Validate a synthesis request and yield the generated audio chunks.

    This is a generator. On success it yields one ``(target_sr, samples)``
    tuple per item produced by the matching ``cosyvoice.inference_*`` call,
    where ``samples`` is the flattened ``tts_speech`` output. When validation
    fails, a Gradio warning is raised, a single ``(target_sr, default_data)``
    placeholder is yielded and the generator stops without running inference.

    Args:
        tts_text: text to synthesize.
        mode_checkbox_group: selected inference mode (one of the strings in
            ``inference_mode_list``).
        sft_dropdown: pretrained speaker, used by the pretrained-voice and
            instruct modes.
        prompt_text: transcript of the prompt audio (zero-shot mode).
        prompt_wav_upload: path of an uploaded prompt audio file, or None.
        prompt_wav_record: path of a recorded prompt audio file, or None.
        instruct_text: instruction text (instruct mode).
        seed: value passed to ``set_all_random_seed`` before inference.
        stream: forwarded to the inference call as ``stream``.
        speed: forwarded to the inference call as ``speed``.

    Yields:
        tuple: ``(sample_rate, samples)``.
    """
    # An uploaded file takes precedence over a microphone recording.
    if prompt_wav_upload is not None:
        prompt_wav = prompt_wav_upload
    elif prompt_wav_record is not None:
        prompt_wav = prompt_wav_record
    else:
        prompt_wav = None
    # if instruct mode, please make sure that model is iic/CosyVoice-300M-Instruct and not cross_lingual mode
    if mode_checkbox_group in ['自然语言控制']:
        if cosyvoice.frontend.instruct is False:
            # The notebook has no argparse `args`; the model path lives in the
            # module-level `model_dir`.
            gr.Warning('您正在使用自然语言控制模式, {}模型不支持此模式, 请使用iic/CosyVoice-300M-Instruct模型'.format(model_dir))
            yield (target_sr, default_data)
            return  # stop here: the request cannot be served
        if instruct_text == '':
            gr.Warning('您正在使用自然语言控制模式, 请输入instruct文本')
            yield (target_sr, default_data)
            return
        if prompt_wav is not None or prompt_text != '':
            gr.Info('您正在使用自然语言控制模式, prompt音频/prompt文本会被忽略')
    # if cross_lingual mode, please make sure that model is iic/CosyVoice-300M and tts_text prompt_text are different language
    if mode_checkbox_group in ['跨语种复刻']:
        if cosyvoice.frontend.instruct is True:
            gr.Warning('您正在使用跨语种复刻模式, {}模型不支持此模式, 请使用iic/CosyVoice-300M模型'.format(model_dir))
            yield (target_sr, default_data)
            return
        if instruct_text != '':
            gr.Info('您正在使用跨语种复刻模式, instruct文本会被忽略')
        if prompt_wav is None:
            gr.Warning('您正在使用跨语种复刻模式, 请提供prompt音频')
            yield (target_sr, default_data)
            return
        gr.Info('您正在使用跨语种复刻模式, 请确保合成文本和prompt文本为不同语言')
    # if in zero_shot cross_lingual, please make sure that prompt_text and prompt_wav meets requirements
    if mode_checkbox_group in ['3s极速复刻', '跨语种复刻']:
        if prompt_wav is None:
            gr.Warning('prompt音频为空，您是否忘记输入prompt音频？')
            yield (target_sr, default_data)
            return  # without this, torchaudio.info(None) below would raise
        # Read the file metadata once and reuse it for the check and the message.
        prompt_sample_rate = torchaudio.info(prompt_wav).sample_rate
        if prompt_sample_rate < prompt_sr:
            gr.Warning('prompt音频采样率{}低于{}'.format(prompt_sample_rate, prompt_sr))
            yield (target_sr, default_data)
            return
    # sft mode only use sft_dropdown
    if mode_checkbox_group in ['预训练音色']:
        if instruct_text != '' or prompt_wav is not None or prompt_text != '':
            gr.Info('您正在使用预训练音色模式，prompt文本/prompt音频/instruct文本会被忽略！')
    # zero_shot mode only use prompt_wav prompt text
    if mode_checkbox_group in ['3s极速复刻']:
        if prompt_text == '':
            gr.Warning('prompt文本为空，您是否忘记输入prompt文本？')
            yield (target_sr, default_data)
            return
        if instruct_text != '':
            gr.Info('您正在使用3s极速复刻模式，预训练音色/instruct文本会被忽略！')

    # Dispatch to the inference routine of the selected mode. The seed is set
    # right before inference so that identical requests are repeatable.
    if mode_checkbox_group == '预训练音色':
        logging.info('get sft inference request')
        set_all_random_seed(seed)
        for i in cosyvoice.inference_sft(tts_text, sft_dropdown, stream=stream, speed=speed):
            yield (target_sr, i['tts_speech'].numpy().flatten())
    elif mode_checkbox_group == '3s极速复刻':
        logging.info('get zero_shot inference request')
        prompt_speech_16k = postprocess(load_wav(prompt_wav, prompt_sr))
        set_all_random_seed(seed)
        for i in cosyvoice.inference_zero_shot(tts_text, prompt_text, prompt_speech_16k, stream=stream, speed=speed):
            yield (target_sr, i['tts_speech'].numpy().flatten())
    elif mode_checkbox_group == '跨语种复刻':
        logging.info('get cross_lingual inference request')
        prompt_speech_16k = postprocess(load_wav(prompt_wav, prompt_sr))
        set_all_random_seed(seed)
        for i in cosyvoice.inference_cross_lingual(tts_text, prompt_speech_16k, stream=stream, speed=speed):
            yield (target_sr, i['tts_speech'].numpy().flatten())
    else:
        logging.info('get instruct inference request')
        set_all_random_seed(seed)
        for i in cosyvoice.inference_instruct(tts_text, sft_dropdown, instruct_text, stream=stream, speed=speed):
            yield (target_sr, i['tts_speech'].numpy().flatten())

In [6]:
def batch_generate_audio(text_list, mode_checkbox_group, sft_dropdown, prompt_text, 
                        prompt_wav_upload, prompt_wav_record, instruct_text, seed, stream, speed):
    """Synthesize audio for up to five uploaded text files.

    Each entry of ``text_list`` is synthesized through ``generate_audio``.
    All chunks yielded for one entry are concatenated, so the result holds the
    complete audio rather than only the first yielded chunk.

    Args:
        text_list: list of ``{"filename": ..., "content": ...}`` dicts as
            produced by ``read_multiple_txt_files``; entries beyond the fifth
            are ignored (with a warning).
        mode_checkbox_group, sft_dropdown, prompt_text, prompt_wav_upload,
        prompt_wav_record, instruct_text, seed, stream, speed: forwarded
            unchanged to ``generate_audio`` for every entry.

    Returns:
        list: ten items — five audio values (``(sample_rate, samples)`` or
        None) followed by five filename labels (``""`` for unused slots).
        If anything raises, a warning is shown and all ten slots are empty.
    """
    # Must match the number of audio/filename output slots built in the UI.
    max_files = 5

    if not text_list:
        gr.Warning("请先上传文本文件！")
        return [None] * max_files + [""] * max_files

    audio_outputs = [None] * max_files
    filename_outputs = [""] * max_files

    try:
        if len(text_list) > max_files:
            gr.Warning("已达到最大处理数量(5个文件)，剩余文件将被忽略")

        for idx, text_item in enumerate(text_list[:max_files]):
            content = text_item["content"]
            filename = text_item["filename"]

            # generate_audio is a generator that may yield several chunks;
            # drain it completely instead of keeping only the first chunk.
            sample_rate = None
            chunks = []
            for sample_rate, audio_data in generate_audio(content, mode_checkbox_group, sft_dropdown, prompt_text,
                                                          prompt_wav_upload, prompt_wav_record, instruct_text,
                                                          seed, stream, speed):
                chunks.append(audio_data)

            if chunks:
                merged = chunks[0] if len(chunks) == 1 else np.concatenate(chunks)
                audio_outputs[idx] = (sample_rate, merged)
                filename_outputs[idx] = f"文件: {filename}"

        # Five audio values followed by five filename labels.
        return audio_outputs + filename_outputs

    except Exception as e:
        gr.Warning(f"生成音频时出错: {str(e)}")
        return [None] * max_files + [""] * max_files

In [13]:
# Compute the new tab titles after a batch run
def update_tab_titles(*results):
    """Build one label update per output tab from the batch results.

    Args:
        *results: the first five values are the audio outputs, the remaining
            values are the matching filename labels.

    Returns:
        list: one ``gr.update(label=...)`` per (audio, filename) pair. The
        label is the filename when the slot has both an audio value and a
        non-empty filename, otherwise the default ``"音频 <n>"`` title.
    """
    audio_results, filename_results = results[:5], results[5:]
    return [
        gr.update(label=filename if (audio is not None and filename) else f"音频 {slot}")
        for slot, (audio, filename) in enumerate(zip(audio_results, filename_results), start=1)
    ]

In [26]:
def read_txt_file(file_obj):
    """Read the text content of an uploaded txt file.

    Args:
        file_obj: the uploaded file. May be None, a path (``str`` or
            ``os.PathLike``), or an object exposing the path as ``.name``.

    Returns:
        str: the file content with surrounding whitespace stripped, or ``""``
        when nothing was uploaded or the file could not be read (a Gradio
        warning is shown in the latter case).
    """
    if file_obj is None or file_obj == "":
        return ""
    try:
        # Path-like values are checked first: pathlib paths also have a
        # `.name` attribute, but it only holds the basename.
        if isinstance(file_obj, (str, os.PathLike)):
            path = os.fspath(file_obj)
        else:
            path = file_obj.name
        # utf-8-sig decodes plain UTF-8 and additionally drops a leading BOM,
        # which str.strip() would otherwise leave in the text.
        with open(path, 'r', encoding='utf-8-sig') as f:
            return f.read().strip()
    except Exception as e:
        gr.Warning(f"读取文件失败: {str(e)}")
        return ""

def read_multiple_txt_files(files):
    """Read the text content of several uploaded txt files.

    Args:
        files: the uploaded files. Each entry may be a path (``str`` or
            ``os.PathLike``) or an object exposing the path as ``.name``.

    Returns:
        tuple: ``(texts, update)`` where ``texts`` is a list of
        ``{"filename": <basename>, "content": <stripped text>}`` dicts and
        ``update`` is a ``gr.update`` toggling visibility of its target
        component: visible when the files were read, hidden when ``files`` is
        empty or any file could not be read (in which case ``texts`` is empty
        and a Gradio warning is shown).
    """
    if not files:
        return [], gr.update(visible=False)
    try:
        texts = []
        for file in files:
            # Path-like values are checked first: pathlib paths also have a
            # `.name` attribute, but it only holds the basename.
            if isinstance(file, (str, os.PathLike)):
                path = os.fspath(file)
            else:
                path = file.name
            # utf-8-sig decodes plain UTF-8 and additionally drops a leading
            # BOM, which str.strip() would otherwise leave in the text.
            with open(path, 'r', encoding='utf-8-sig') as f:
                text = f.read().strip()
            texts.append({"filename": os.path.basename(path), "content": text})
        return texts, gr.update(visible=True)
    except Exception as e:
        gr.Warning(f"读取文件失败: {str(e)}")
        return [], gr.update(visible=False)

# Build the Gradio UI and start the server.
# Layout: a "single text" tab and a "batch" tab at the top, followed by the
# inference parameters and prompt inputs that both tabs share. Event handlers
# are wired at the end of the `with demo:` block.
demo = gr.Blocks()
with demo:
    gr.Markdown("### 代码库 [CosyVoice](https://github.com/FunAudioLLM/CosyVoice) \
                预训练模型 [CosyVoice-300M](https://www.modelscope.cn/models/iic/CosyVoice-300M) \
                [CosyVoice-300M-Instruct](https://www.modelscope.cn/models/iic/CosyVoice-300M-Instruct) \
                [CosyVoice-300M-SFT](https://www.modelscope.cn/models/iic/CosyVoice-300M-SFT)")
    gr.Markdown("#### 请输入需要合成的文本，选择推理模式，并按照提示步骤进行操作")

    with gr.Tabs():
        # First tab: single-text processing
        with gr.Tab("单文本处理"):
            with gr.Row():
                tts_text = gr.Textbox(label="输入合成文本", 
                                    lines=3,
                                    value="我是通义实验室语音团队全新推出的生成式语音大模型，提供舒适自然的语音合成能力。")
                txt_file = gr.File(label="上传TXT文本文件", 
                                file_types=[".txt"],
                                type="filepath")
            
            # The generate button and the audio output live inside this tab
            generate_button = gr.Button("生成音频")
            with gr.Column(visible=True):
                audio_output = gr.Audio(
                    label="合成音频",
                    type="numpy",
                    interactive=False,
                    autoplay=True,
                    show_download_button=True
                )
        
        # Second tab: batch processing
        with gr.Tab("批量处理"):
            txt_files = gr.Files(label="上传多个TXT文本文件", 
                                file_types=[".txt"],
                                type="filepath")
            # Both components start hidden; the txt_files.change handler below
            # fills file_contents and toggles the batch button's visibility.
            file_contents = gr.JSON(label="已上传文件", visible=False)
            batch_generate_button = gr.Button("批量生成音频", visible=False)
            
            # Organize the audio outputs with Tabs
            with gr.Column(visible=True) as output_column:
                with gr.Tabs() as audio_tabs:  # Tabs container
                    audio_outputs = []
                    filename_outputs = []
                    tabs = []  # keeps references to the tabs
                    
                    # Create 5 tabs (batch_generate_audio and update_tab_titles
                    # both assume exactly five output slots)
                    for i in range(5):
                        with gr.Tab(f"音频 {i+1}") as tab:  
                            audio = gr.Audio(
                                label="生成的音频",
                                type="numpy",
                                interactive=False,  # disable interaction (upload)
                                autoplay=False,      # autoplay is turned off for batch results
                                show_download_button=True  # show the download button
                            )
                            filename = gr.Text(label="文件名", visible=False)
                            audio_outputs.append(audio)
                            filename_outputs.append(filename)
                            tabs.append(tab)  # keep the tab reference
    
    # Parameters shared by both tabs
    # NOTE(review): several `scale` values below are floats; Gradio documents
    # `scale` as an integer — confirm the layout renders as intended.
    with gr.Row():
        mode_checkbox_group = gr.Radio(choices=inference_mode_list, 
                                     label='选择推理模式', 
                                     value=inference_mode_list[0])
        instruction_text = gr.Text(label="操作步骤", 
                                 value=instruct_dict[inference_mode_list[0]], 
                                 scale=0.5)
        sft_dropdown = gr.Dropdown(choices=sft_spk, 
                                 label='选择预训练音色', 
                                 value=sft_spk[0], 
                                 scale=0.25)
        stream = gr.Radio(choices=stream_mode_list, 
                        label='是否流式推理', 
                        value=stream_mode_list[0][1])
        speed = gr.Number(value=1, 
                        label="速度调节(仅支持非流式推理)", 
                        minimum=0.5, 
                        maximum=2.0, 
                        step=0.1)
        with gr.Column(scale=0.25):
            seed_button = gr.Button(value="\U0001F3B2")
            seed = gr.Number(value=0, label="随机推理种子")

    with gr.Row():
        prompt_wav_upload = gr.Audio(sources='upload', 
                                   type='filepath', 
                                   label='选择prompt音频文件，注意采样率不低于16khz')
        prompt_wav_record = gr.Audio(sources='microphone', 
                                   type='filepath', 
                                   label='录制prompt音频文件')
    
    prompt_text = gr.Textbox(label="输入prompt文本", 
                            lines=1, 
                            placeholder="请输入prompt文本，需与prompt音频内容一致，暂时不支持自动识别...", 
                            value='')
    
    instruct_text = gr.Textbox(label="输入instruct文本", 
                              lines=1, 
                              placeholder="请输入instruct文本.", 
                              value='')

    # Event wiring
    # Uploading a single txt file copies its content into the text box.
    txt_file.change(fn=read_txt_file,
                   inputs=[txt_file],
                   outputs=[tts_text])
    
    # Uploading batch files stores their contents and updates the batch button.
    txt_files.change(
        fn=read_multiple_txt_files,
        inputs=[txt_files],
        outputs=[file_contents, batch_generate_button]
    )
    
    batch_generate_button.click(
        fn=batch_generate_audio,
        inputs=[file_contents, mode_checkbox_group, sft_dropdown, prompt_text, 
                prompt_wav_upload, prompt_wav_record, instruct_text, seed, stream, speed],
        outputs=audio_outputs + filename_outputs
    ).then(  # follow-up step that updates the tab titles
        fn=update_tab_titles,
        inputs=audio_outputs + filename_outputs,
        outputs=tabs  # uses the saved tab references
    )
                   
    seed_button.click(generate_seed, 
                     inputs=[], 
                     outputs=seed)
    
    # NOTE(review): generate_audio is a generator that can yield several
    # chunks, while audio_output is not declared with streaming=True —
    # confirm that the complete audio (not just one chunk) ends up displayed.
    generate_button.click(generate_audio,
                        inputs=[tts_text, mode_checkbox_group, sft_dropdown, 
                               prompt_text, prompt_wav_upload, prompt_wav_record, 
                               instruct_text, seed, stream, speed],
                        outputs=[audio_output])
    
    # Switching the inference mode refreshes the usage hint.
    mode_checkbox_group.change(fn=change_instruction, 
                             inputs=[mode_checkbox_group], 
                             outputs=[instruction_text])
                             
# prevent_thread_lock=True makes launch() return instead of blocking, so the
# notebook cell finishes while the server keeps running; stop it with
# demo.close().
demo.queue(max_size=4, default_concurrency_limit=2)
demo.launch(server_name='127.0.0.1', server_port=8000, prevent_thread_lock=True)

2024-11-20 11:04:28,004 DEBUG load_ssl_context verify=True cert=None trust_env=True http2=False
2024-11-20 11:04:28,006 DEBUG Starting new HTTPS connection (1): huggingface.co:443
2024-11-20 11:04:28,007 DEBUG load_verify_locations cafile='/home/ad/miniconda3/envs/cosyvoice_env/lib/python3.8/site-packages/certifi/cacert.pem'
2024-11-20 11:04:28,077 DEBUG connect_tcp.started host='api.gradio.app' port=443 local_address=None timeout=3 socket_options=None
2024-11-20 11:04:28,116 DEBUG load_ssl_context verify=True cert=None trust_env=True http2=False
2024-11-20 11:04:28,116 DEBUG load_verify_locations cafile='/home/ad/miniconda3/envs/cosyvoice_env/lib/python3.8/site-packages/certifi/cacert.pem'
2024-11-20 11:04:28,120 DEBUG connect_tcp.started host='127.0.0.1' port=8000 local_address=None timeout=None socket_options=None
2024-11-20 11:04:28,120 DEBUG connect_tcp.complete return_value=<httpcore._backends.sync.SyncStream object at 0x7f7a04942ac0>
2024-11-20 11:04:28,120 DEBUG send_request_he

Running on local URL:  http://127.0.0.1:8000

To create a public link, set `share=True` in `launch()`.


2024-11-20 11:04:28,135 DEBUG Starting new HTTPS connection (3): huggingface.co:443




2024-11-20 11:04:28,281 DEBUG connect_tcp.complete return_value=<httpcore._backends.sync.SyncStream object at 0x7f7a04a23100>
2024-11-20 11:04:28,281 DEBUG start_tls.started ssl_context=<ssl.SSLContext object at 0x7f78fd72a0c0> server_hostname='api.gradio.app' timeout=3
2024-11-20 11:04:28,646 DEBUG start_tls.complete return_value=<httpcore._backends.sync.SyncStream object at 0x7f7a04e930d0>
2024-11-20 11:04:28,646 DEBUG send_request_headers.started request=<Request [b'GET']>
2024-11-20 11:04:28,647 DEBUG send_request_headers.complete
2024-11-20 11:04:28,647 DEBUG send_request_body.started request=<Request [b'GET']>
2024-11-20 11:04:28,648 DEBUG send_request_body.complete
2024-11-20 11:04:28,648 DEBUG receive_response_headers.started request=<Request [b'GET']>
2024-11-20 11:04:28,830 DEBUG receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', [(b'Date', b'Wed, 20 Nov 2024 03:04:28 GMT'), (b'Content-Type', b'application/json'), (b'Content-Length', b'21'), (b'Connectio

tn 我是通义实验室语音团队全新推出的生成式语音大模型，提供舒适自然的语音合成能力。 to 我是通义实验室语音团队全新推出的生成式语音大模型，提供舒适自然的语音合成能力。


  0%|          | 0/1 [00:00<?, ?it/s]2024-11-20 11:04:40,033 INFO synthesis text 我是通义实验室语音团队全新推出的生成式语音大模型，提供舒适自然的语音合成能力。
2024-11-20 11:04:42,168 INFO yield speech len 7.453605442176871, rtf 0.2863396855645648
100%|██████████| 1/1 [00:02<00:00,  2.15s/it]
2024-11-20 11:06:36,266 INFO get sft inference request


tn 都说了多少遍星期一需要go work，不然没有salary用来live a better life，等到了sunday你自然会得到一天的假期 to 都说了多少遍星期一需要go work，不然没有salary用来live a better life，等到了sunday你自然会得到一天的假期


  0%|          | 0/1 [00:00<?, ?it/s]2024-11-20 11:06:36,320 INFO synthesis text 都说了多少遍星期一需要go work，不然没有salary用来live a better life，等到了sunday你自然会得到一天的假期。
2024-11-20 11:06:38,742 INFO yield speech len 8.335963718820862, rtf 0.29059364278813754
100%|██████████| 1/1 [00:02<00:00,  2.43s/it]
2024-11-20 11:06:51,148 INFO get sft inference request


tn 都说了多少遍星期一需要go work，不然没有salary用来live a better life，等到了sunday你自然会得到一天的假期 to 都说了多少遍星期一需要go work，不然没有salary用来live a better life，等到了sunday你自然会得到一天的假期


  0%|          | 0/1 [00:00<?, ?it/s]2024-11-20 11:06:51,179 INFO synthesis text 都说了多少遍星期一需要go work，不然没有salary用来live a better life，等到了sunday你自然会得到一天的假期。
2024-11-20 11:06:54,002 INFO yield speech len 10.135510204081633, rtf 0.2785665712789776
100%|██████████| 1/1 [00:02<00:00,  2.83s/it]
2024-11-20 11:07:06,128 INFO get sft inference request


tn 都说了多少遍星期一需要go work，不然没有salary用来live a better life，等到了sunday你自然会得到一天的假期 to 都说了多少遍星期一需要go work，不然没有salary用来live a better life，等到了sunday你自然会得到一天的假期


  0%|          | 0/1 [00:00<?, ?it/s]2024-11-20 11:07:06,158 INFO synthesis text 都说了多少遍星期一需要go work，不然没有salary用来live a better life，等到了sunday你自然会得到一天的假期。
2024-11-20 11:07:09,678 INFO yield speech len 11.215238095238096, rtf 0.3138340554078636
100%|██████████| 1/1 [00:03<00:00,  3.52s/it]
2024-11-20 11:07:24,104 INFO get sft inference request


tn 都说了多少遍星期一需要go work，不然没有salary用来live a better life，等到了sunday你自然会得到一天的假期 to 都说了多少遍星期一需要go work，不然没有salary用来live a better life，等到了sunday你自然会得到一天的假期


  0%|          | 0/1 [00:00<?, ?it/s]2024-11-20 11:07:24,134 INFO synthesis text 都说了多少遍星期一需要go work，不然没有salary用来live a better life，等到了sunday你自然会得到一天的假期。
2024-11-20 11:07:27,027 INFO yield speech len 10.437369614512471, rtf 0.27716458657004345
100%|██████████| 1/1 [00:02<00:00,  2.90s/it]


In [None]:
# Stop the server started by demo.launch() above.
demo.close()

In [None]:
import os
def clear_port(port=8000):
    """Force-kill every process that ``lsof`` reports for the given TCP/UDP port.

    The PIDs are obtained from ``lsof -t -i:<port>`` (invoked without a shell)
    and each one is sent SIGKILL. A status message is printed; nothing is
    returned. Unix-only, like the shell command it replaces.

    NOTE(review): if the current kernel itself is the process using the port,
    its PID is presumably in the list too and the kernel would be killed —
    prefer ``demo.close()`` for a server running in this kernel.

    Args:
        port: port number in the range 1-65535 (an int or a numeric string).
    """
    import signal
    import subprocess

    failure_message = "清理端口失败，请手动重启内核"

    # Validate first so that nothing but a plain port number ever reaches the
    # lsof command line.
    try:
        port = int(port)
    except (TypeError, ValueError):
        print(failure_message)
        return
    if not 0 < port < 65536:
        print(failure_message)
        return

    try:
        listing = subprocess.run(
            ["lsof", "-t", f"-i:{port}"],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            universal_newlines=True,
        )
    except OSError:
        # lsof is missing or could not be executed.
        print(failure_message)
        return

    pids = [int(token) for token in listing.stdout.split() if token.isdigit()]

    kill_failed = False
    for pid in pids:
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            # The process exited on its own in the meantime.
            continue
        except OSError:
            # Typically a permission error; report it instead of hiding it.
            kill_failed = True

    if kill_failed:
        print(failure_message)
    else:
        print(f"端口 {port} 已清理")

clear_port(8000)