In [3]:
import redis
import json

class RedisChannel:
    """String constants naming the Redis pub/sub channels used in this notebook.

    NOTE(review): judging by the names, ``do_*`` channels presumably carry
    work requests and ``*_done`` channels completion notices -- confirm
    against the consuming services.
    """

    # Text-to-speech channel names.
    do_tts_service, tts_done_service = "do-tts-service", "tts-done-service"
    # Speech-recognition channel names.
    do_asr_service, asr_done_service = "do-asr-service", "asr-done-service"
    
def publisher(channel, data, client=None):
    """Serialize ``data`` to JSON and publish it on a Redis channel.

    Args:
        channel: Name of the channel to publish on.
        data: Any JSON-serializable payload.
        client: Optional object exposing ``publish(channel, message)``.
            When omitted, the module-level ``redis_client_pub`` is used,
            which is the original behaviour. Passing a client explicitly
            removes the dependency on notebook-global state.

    Raises:
        TypeError: If ``data`` cannot be serialized by ``json.dumps``.
        NameError: If no ``client`` is given and ``redis_client_pub`` has
            not been defined yet.
    """
    message = json.dumps(data)
    if client is None:
        # Fall back to the notebook-wide connection, resolved at call time.
        client = redis_client_pub
    client.publish(channel, message)
    print(f"Published: {data}")
# Module-level Redis connection that publisher() resolves at call time;
# this notebook targets a server on localhost:51201.
redis_client_pub = redis.Redis(host='localhost', port=51201)
# Alternative connection on port 6379, kept commented out for reference:
# redis_client_pub = redis.Redis(host='localhost', port=6379)

# Earlier smoke test with a bare integer payload, left disabled:
# # publisher(RedisChannel.do_tts_service, 3333)

# Sample payload: a dict with a single "text" key holding a multi-line
# Chinese passage. The triple-quoted literal keeps its newlines and leading
# indentation, so they are part of the published text.
message = {
    "text":"""
            畫面中，前方黑衣騎士才過一個大轉彎，突然右方一輛路邊停車的轎車開車門，
            騎士沒閃過，直接被吃車門擊落，連人帶車噴飛在車道上，落地前安全帽掉落，
            騎士還在地面上滑行滾了一圈才停下，地面上留下長長一道刮痕。
           """}
# Publish the payload on the "do-tts-service" channel.
publisher(RedisChannel.do_tts_service, message)

Published: {'text': '\n            畫面中，前方黑衣騎士才過一個大轉彎，突然右方一輛路邊停車的轎車開車門，\n            騎士沒閃過，直接被吃車門擊落，連人帶車噴飛在車道上，落地前安全帽掉落，\n            騎士還在地面上滑行滾了一圈才停下，地面上留下長長一道刮痕。\n           '}


: 

In [None]:
# # -*- coding: utf-8 -*-

# Previous approach (disabled): derive the packages directory from a script
# path instead of the current working directory.
# import os
# import sys

# project_path = os.getcwd()
# print(project_path)
# # Get the absolute path of the current script
# # current_script_path = os.path.abspath(__file__)
# current_script_path = os.path.abspath(f"{project_path}/services/tts.service.py")
# # Get the directory containing the current script
# current_directory = os.path.dirname(current_script_path)
# # Step back to the asr directory (the parent of the parent of the current directory)
# asr_directory = os.path.dirname(current_directory)
# # Build the target directory path
# packages_path = os.path.join(asr_directory, 'packages')
# # Add it to sys.path
# sys.path.append(packages_path)
# # Print the added path to confirm
# print("Added to sys.path:", packages_path)

import os
import sys

# Current approach: treat the working directory as the project root and make
# <cwd>/packages and <cwd>/packages/vits/text importable. The kernel must
# therefore be started from the project root for the `vits` imports to resolve.
project_path = os.getcwd()
print(project_path)
sys.path.append(f'{project_path}/packages')
sys.path.append(f'{project_path}/packages/vits/text')

In [None]:

from vits.text import text_to_sequence

In [None]:
# -*- coding: utf-8 -*-


from vits.text import text_to_sequence
import vits.commons as commons
import vits.utils as utils
from vits.models import SynthesizerTrn
from torch import no_grad, LongTensor
import torch

import wave
import numpy as np
from datetime import datetime

class VitsService:
    """Thin wrapper that loads a VITS ``SynthesizerTrn`` model for inference.

    Construction reads the hyper-parameter file, builds the network on the
    selected device and loads the checkpoint weights into it.
    """

    def __init__(
        self,
        hparams_file_path="./models/community/model_config.json",
        checkpoint_path="./models/community/G_953000.pth",
    ):
        """Remember the model file locations, pick a device and load the model.

        Args:
            hparams_file_path: Path handed to ``utils.get_hparams_from_file``.
            checkpoint_path: Path handed to ``utils.load_checkpoint``.
        """
        self._hparams_file_path = hparams_file_path
        self._checkpoint_path = checkpoint_path

        # Use the first CUDA device when torch reports one, otherwise the CPU.
        if torch.cuda.is_available():
            self.device = "cuda:0"
        else:
            self.device = "cpu"

        # Tag wrapped around the input text for each selectable language.
        self.language_marks = {
            "Japanese": "",
            "日本語": "[JA]",
            "中文": "[ZH]",
            "English": "[EN]",
            "Mix": "",
        }
        self.lang = ['日本語', '中文', 'English', 'Mix']

        self.hps = utils.get_hparams_from_file(self._hparams_file_path)
        self.set_model()

    def set_model(self):
        """Build the synthesizer from ``self.hps`` and load the checkpoint into it."""
        hps = self.hps
        synthesizer = SynthesizerTrn(
            len(hps.symbols),
            hps.data.filter_length // 2 + 1,
            hps.train.segment_size // hps.data.hop_length,
            n_speakers=hps.data.n_speakers,
            **hps.model,
        )
        self.net_g = synthesizer.to(self.device)
        # Switch to evaluation mode before the weights are loaded.
        self.net_g.eval()
        utils.load_checkpoint(self._checkpoint_path, self.net_g, None)

        # Speaker table taken from the hyper-parameters; indexed by speaker name in tts_fn.
        self.speaker_ids = hps.speakers

    def get_text(self, text, hps, is_symbol):
        """Turn ``text`` into a ``LongTensor`` of symbol ids.

        Args:
            text: Input string.
            hps: Hyper-parameters providing ``symbols`` and ``data`` settings.
            is_symbol: When truthy, no text cleaners are applied.

        Returns:
            The id sequence as a ``LongTensor``; ``0`` is interspersed between
            ids when ``hps.data.add_blank`` is set.
        """
        sequence = text_to_sequence(
            text,
            hps.symbols,
            [] if is_symbol else hps.data.text_cleaners,
        )
        if hps.data.add_blank:
            sequence = commons.intersperse(sequence, 0)
        return LongTensor(sequence)

    def create_tts_fn(self):
        """Return a closure ``tts_fn(text, speaker, language, speed)`` bound to this service."""

        def tts_fn(text, speaker, language, speed):
            """Synthesize ``text``; returns ``("Success", (sampling_rate, audio))``."""
            if language is not None:
                mark = self.language_marks[language]
                text = mark + text + mark
            speaker_id = self.speaker_ids[speaker]
            token_ids = self.get_text(text, self.hps, False)
            with no_grad():
                batch = token_ids.unsqueeze(0).to(self.device)
                batch_lengths = LongTensor([token_ids.size(0)]).to(self.device)
                sid = LongTensor([speaker_id]).to(self.device)
                inferred = self.net_g.infer(
                    batch,
                    batch_lengths,
                    sid=sid,
                    noise_scale=.667,
                    noise_scale_w=0.8,
                    length_scale=1.0 / speed,
                )
                audio = inferred[0][0, 0].data.cpu().float().numpy()
            return "Success", (self.hps.data.sampling_rate, audio)

        return tts_fn


In [None]:
import datetime
import wave
import numpy as np

In [None]:
# Load the YunzeNeural hyper-parameters and checkpoint, then build the
# synthesis closure from the service.
tts = VitsService(
    hparams_file_path="./models/YunzeNeural/config.json",
    checkpoint_path="./models/YunzeNeural/G_latest.pth"
)
tts_fn = tts.create_tts_fn()
# Module-level synthesis settings.
# `speaker` is looked up in the model's speaker table by tts_fn.
speaker = 'YunzeNeural'
# `language` must be one of the keys of VitsService.language_marks.
language='中文'
# tts_fn passes 1.0 / speed to the model as length_scale.
speed = 1.0

In [None]:
def create_voice_array(tts_fn, speaker ,text: str, language="中文", speed=1.0) -> list:
    """Synthesize ``text`` in punctuation-aligned chunks and return the samples.

    Text of at most 30 characters is synthesized in one call. Longer text is
    cut into chunks that end at the last punctuation mark found in each
    30-character window (or after 31 characters when the window has none),
    each chunk is synthesized separately and the audio is concatenated.

    Args:
        tts_fn: Callable ``tts_fn(text=, speaker=, language=, speed=)`` that
            returns ``(status, (sampling_rate, audio))`` where ``audio``
            supports ``.tolist()``.
        speaker: Speaker key forwarded to ``tts_fn``.
        text: Text to synthesize. Literal ``\\n`` sequences, real newlines
            and surrounding whitespace are removed first.
        language: Language key forwarded to ``tts_fn``. Previously read from
            a notebook global; the default matches the value the notebook
            configures.
        speed: Speaking speed, converted with ``float()`` and forwarded to
            ``tts_fn``. Previously read from a notebook global; the default
            matches the value the notebook configures.

    Returns:
        A flat ``list`` of audio samples.
    """
    max_len = 30
    # Chunks at least this many samples long get their edges trimmed.
    # NOTE(review): the 300 / 8500 sample trims presumably cut padding at the
    # chunk seams -- the values are carried over unchanged; confirm.
    trim_min_samples, trim_head, trim_tail = 20000, 300, 8500
    split_symbols = ["，", "、", "。", "：", "）", "（", "？", ":", ")", "(", "「", "」", "！"]

    text = text.replace('\\n', '').strip().replace('\n', '')

    if len(text) <= max_len:
        _, output = tts_fn(text=text, speaker=speaker, language=language, speed=float(speed))
        return output[1].tolist()

    # Same length as `text`, with every split symbol replaced by "@", so that
    # positions found in `masked` index directly into `text`.
    masked = text
    for symbol in split_symbols:
        masked = masked.replace(symbol, "@")

    samples = []
    start = 0
    while start < len(text):
        last_split = masked[start:start + max_len].rfind("@")
        if last_split == -1:
            end = start + max_len + 1
        else:
            end = start + last_split + 1
        # The original loop stopped once at most one character was left and
        # silently dropped it; fold that remainder into this chunk instead.
        if len(text) - end <= 1:
            end = len(text)

        _, output = tts_fn(text=text[start:end], speaker=speaker, language=language, speed=float(speed))
        voice = output[1].tolist()
        if len(voice) >= trim_min_samples:
            voice = voice[trim_head:-trim_tail]
        samples += voice
        start = end
    return samples

In [None]:
def _synthesize_chunks_for_wav(tts_fn, speaker, text, language, speed):
    """Synthesize ``text`` chunk by chunk; return ``(samples, sampling_rate)``."""
    max_len = 30
    # NOTE(review): the 300 / 8500 sample trims presumably cut padding at the
    # chunk seams -- the values are carried over unchanged; confirm.
    trim_min_samples, trim_head, trim_tail = 20000, 300, 8500
    split_symbols = ["，", "、", "。", "：", "）", "（", "？", ":", ")", "(", "「", "」", "！"]

    text = text.replace('\\n', '').strip().replace('\n', '')

    if len(text) <= max_len:
        _, output = tts_fn(text=text, speaker=speaker, language=language, speed=float(speed))
        return output[1].tolist(), output[0]

    # Same length as `text`, with every split symbol replaced by "@", so that
    # positions found in `masked` index directly into `text`.
    masked = text
    for symbol in split_symbols:
        masked = masked.replace(symbol, "@")

    samples = []
    sampling_rate = None
    start = 0
    while start < len(text):
        last_split = masked[start:start + max_len].rfind("@")
        if last_split == -1:
            end = start + max_len + 1
        else:
            end = start + last_split + 1
        # The original loop stopped once at most one character was left and
        # silently dropped it; fold that remainder into this chunk instead.
        if len(text) - end <= 1:
            end = len(text)

        _, output = tts_fn(text=text[start:end], speaker=speaker, language=language, speed=float(speed))
        sampling_rate = output[0]
        voice = output[1].tolist()
        if len(voice) >= trim_min_samples:
            voice = voice[trim_head:-trim_tail]
        samples += voice
        start = end
    return samples, sampling_rate


def create_mp3(tts_fn, speaker, text, filename, language="中文", speed=1.0):
    """Synthesize ``text`` and write it to ``<filename>.wav``.

    Despite the name, the output is a mono 16-bit PCM WAV file, not an MP3.

    Args:
        tts_fn: Callable ``tts_fn(text=, speaker=, language=, speed=)`` that
            returns ``(status, (sampling_rate, audio))``.
        speaker: Speaker key forwarded to ``tts_fn``.
        text: Text to synthesize; chunked at punctuation when longer than
            30 characters.
        filename: Output path without extension. When falsy, a
            ``%Y-%m-%d-%H-%M-%S`` timestamp is used instead.
        language: Language key forwarded to ``tts_fn``. Previously read from
            a notebook global; the default matches the value the notebook
            configures.
        speed: Speaking speed forwarded to ``tts_fn``. Previously read from a
            notebook global; the default matches the configured value.
    """
    samples, sampling_rate = _synthesize_chunks_for_wav(tts_fn, speaker, text, language, speed)

    # Clip before scaling: samples outside [-1, 1] would otherwise overflow
    # the int16 range and wrap around.
    pcm = np.int16(np.clip(np.array(samples), -1.0, 1.0) * 32767)

    if not filename:
        # Imported locally because an earlier cell runs `import datetime`,
        # which rebinds the global name to the module and breaks
        # `datetime.now()`.
        from datetime import datetime as datetime_cls
        file_str = datetime_cls.now().strftime('%Y-%m-%d-%H-%M-%S') + ".wav"
    else:
        file_str = filename + ".wav"

    with wave.open(file_str, "wb") as wf:
        wf.setnchannels(1)  # mono
        wf.setsampwidth(2)  # 2 bytes per sample (16-bit)
        wf.setframerate(sampling_rate)
        wf.writeframes(pcm.tobytes())

In [None]:
# Sample input for the chunked synthesis below: a ten-item health
# questionnaire. Item 1 uses ASCII punctuation ("?", ","), the remaining
# items use full-width punctuation.
text = '''
1. 你的睡眠如何? 是否一覺到天亮? 是否每天定時會醒? 如果會醒, 是幾點會醒? 是否多夢? 

2。 你感覺餓嗎？有欲望想吃什麼特別的食物或是喜愛什麼味道的食物？或是不餓，完全沒有胃口。

3。 你便秘嗎？每天有大便嗎？大便顏色是什麼？是下利嗎？很臭還是無味？

4。 你的小便是什麼顏色？頻尿嗎？還是小不出來？還是沒有尿意？平均一天幾次？

5。 你很渴嗎？如渴，最想喝什麼溫度的水？如不渴，時常會忘記喝水嗎？還是再怎麼喝也不能止渴呢？

6。 你平時覺得身體很熱還是很冷？手腳冰冷嗎？

7。 你容易出汗嗎？會半夜盜汗嗎？會時常流汗不止嗎？還是不出汗的身體呢？

8。 精神好嗎？還是一直疲憊中？早上起床時，是精神奕奕呢？還是無法起床呢？精神能夠集中嗎？

9。 你性功能好嗎？

10。無論妳有無月經，都要詳細說明妳的月經情形，是延後還是每次都提前呢？痛不痛呢？生過小孩嗎？
'''

In [None]:
# Synthesize the questionnaire text; create_mp3 appends ".wav" to the given
# base name, so the output file is "testing.wav" in the working directory.
create_mp3(tts_fn, speaker, text, "testing")

In [None]:
# -*- coding: utf-8 -*-

import os
import sys

# Treat the working directory as the project root and make <cwd>/packages
# importable so the `vits` imports below resolve.
project_path = os.getcwd()
print(project_path)
sys.path.append(f'{project_path}/packages')
# Disabled script-relative variant (a notebook has no __file__):
# Get the absolute path of the current script
# current_script_path = os.path.abspath(__file__)
# # Get the directory containing the current script
# current_directory = os.path.dirname(current_script_path)
# # Step back to the asr directory (the parent of the parent of the current directory)
# asr_directory = os.path.dirname(current_directory)
# # Build the target directory path
# packages_path = os.path.join(asr_directory, 'packages')
# # Add it to sys.path
# sys.path.append(packages_path)

from vits.text import text_to_sequence
import vits.commons as commons
import vits.utils as utils
from vits.models import SynthesizerTrn
from torch import no_grad, LongTensor
import torch

import wave
import numpy as np
from datetime import datetime

class VitsService:
    """Text-to-speech service built on a VITS ``SynthesizerTrn`` checkpoint.

    Construction reads the hyper-parameter file, builds the network on the
    selected device and loads the checkpoint weights. ``create_wav`` and
    ``create_voice_array`` synthesize arbitrary-length text by cutting it
    into punctuation-aligned chunks.
    """

    # Text up to this many characters is synthesized in a single call; longer
    # text is scanned in windows of this size for a punctuation split point.
    _MAX_CHUNK_LEN = 30
    # Characters at which a chunk may end.
    _SPLIT_SYMBOLS = ("，", "、", "。", "：", "）", "（", "？", ":", ")", "(", "「", "」", "！")
    # Chunks with at least _TRIM_MIN_SAMPLES samples lose _TRIM_HEAD leading
    # and _TRIM_TAIL trailing samples before being concatenated.
    # NOTE(review): presumably this cuts padding at the chunk seams -- the
    # values are carried over unchanged; confirm.
    _TRIM_MIN_SAMPLES = 20000
    _TRIM_HEAD = 300
    _TRIM_TAIL = 8500

    def __init__(self,hparams_file_path = "./models/community/config.json",checkpoint_path="./models/community/G_latest.pth"):
        """Remember the model file locations, pick a device and load the model.

        Args:
            hparams_file_path: Path handed to ``utils.get_hparams_from_file``.
            checkpoint_path: Path handed to ``utils.load_checkpoint``.
        """
        self._hparams_file_path = hparams_file_path
        self._checkpoint_path = checkpoint_path

        print(hparams_file_path)
        # Use the first CUDA device when torch reports one, otherwise the CPU.
        self.device = "cuda:0" if torch.cuda.is_available() else "cpu"
        # Tag wrapped around the input text for each selectable language.
        self.language_marks = {
            "Japanese": "",
            "日本語": "[JA]",
            "中文": "[ZH]",
            "English": "[EN]",
            "Mix": "",
            }
        self.lang = ['日本語', '中文', 'English', 'Mix']

        self.hps = utils.get_hparams_from_file(self._hparams_file_path)
        self.set_model()
        # Built last so the closure is only exposed once the model is ready.
        self.tts_fn = self.create_tts_fn()

    def set_model(self):
        """Build the synthesizer from ``self.hps`` and load the checkpoint into it."""
        self.net_g = SynthesizerTrn(
            len(self.hps.symbols),
            self.hps.data.filter_length // 2 + 1,
            self.hps.train.segment_size // self.hps.data.hop_length,
            n_speakers=self.hps.data.n_speakers,
            **self.hps.model).to(self.device)
        # Switch to evaluation mode before the weights are loaded.
        self.net_g.eval()

        utils.load_checkpoint(self._checkpoint_path, self.net_g, None)

        # Speaker table taken from the hyper-parameters; indexed by speaker name in tts_fn.
        self.speaker_ids = self.hps.speakers

    def get_text(self,text, hps, is_symbol):
        """Turn ``text`` into a ``LongTensor`` of symbol ids.

        Args:
            text: Input string.
            hps: Hyper-parameters providing ``symbols`` and ``data`` settings.
            is_symbol: When truthy, no text cleaners are applied.

        Returns:
            The id sequence as a ``LongTensor``; ``0`` is interspersed between
            ids when ``hps.data.add_blank`` is set.
        """
        text_norm = text_to_sequence(text, hps.symbols, [] if is_symbol else hps.data.text_cleaners)
        if hps.data.add_blank:
            text_norm = commons.intersperse(text_norm, 0)
        text_norm = LongTensor(text_norm)
        return text_norm

    def create_tts_fn(self):
        """Return a closure ``tts_fn(text, speaker, language, speed)`` bound to this service."""
        def tts_fn(text, speaker, language, speed):
            """Synthesize ``text``; returns ``("Success", (sampling_rate, audio))``."""
            if language is not None:
                text = self.language_marks[language] + text + self.language_marks[language]
            speaker_id = self.speaker_ids[speaker]
            stn_tst = self.get_text(text, self.hps, False)
            with no_grad():
                x_tst = stn_tst.unsqueeze(0).to(self.device)
                x_tst_lengths = LongTensor([stn_tst.size(0)]).to(self.device)
                sid = LongTensor([speaker_id]).to(self.device)
                audio = self.net_g.infer(x_tst, x_tst_lengths, sid=sid, noise_scale=.667, noise_scale_w=0.8,
                                    length_scale=1.0 / speed)[0][0, 0].data.cpu().float().numpy()
            del stn_tst, x_tst, x_tst_lengths, sid
            return "Success", (self.hps.data.sampling_rate, audio)

        return tts_fn

    def process_text_and_generate_voice_array(self, tts_fn, speaker, text, language, speed):
        """Process the text and generate the list of audio samples.

        Literal ``\\n`` sequences, real newlines and surrounding whitespace
        are removed. Text of at most ``_MAX_CHUNK_LEN`` characters is
        synthesized in one call; longer text is cut into chunks ending at the
        last split symbol of each window (or one character past the window
        when it contains none).

        Args:
            tts_fn: Callable ``tts_fn(text=, speaker=, language=, speed=)``
                returning ``(status, (sampling_rate, audio))``.
            speaker: Speaker key forwarded to ``tts_fn``.
            text: Text to synthesize.
            language: Language key forwarded to ``tts_fn``.
            speed: Speaking speed, converted with ``float()``.

        Returns:
            ``(samples, sampling_rate)`` where ``samples`` is a flat list and
            ``sampling_rate`` comes from the last ``tts_fn`` call.
        """
        text = text.replace('\\n', '').strip().replace('\n', '')

        if len(text) <= self._MAX_CHUNK_LEN:
            _, output = tts_fn(text=text, speaker=speaker, language=language, speed=float(speed))
            return output[1].tolist(), output[0]

        # Same length as `text`, with every split symbol replaced by "@", so
        # that positions found in `masked` index directly into `text`.
        masked = text
        for symbol in self._SPLIT_SYMBOLS:
            masked = masked.replace(symbol, "@")

        samples = []
        sampling_rate = None
        start = 0
        while start < len(text):
            last_split = masked[start:start + self._MAX_CHUNK_LEN].rfind("@")
            if last_split == -1:
                end = start + self._MAX_CHUNK_LEN + 1
            else:
                end = start + last_split + 1
            # The original loop stopped once at most one character was left
            # and silently dropped it; fold that remainder into this chunk.
            if len(text) - end <= 1:
                end = len(text)

            _, output = tts_fn(text=text[start:end], speaker=speaker, language=language, speed=float(speed))
            sampling_rate = output[0]
            voice = output[1].tolist()
            if len(voice) >= self._TRIM_MIN_SAMPLES:
                voice = voice[self._TRIM_HEAD:-self._TRIM_TAIL]
            samples += voice
            start = end

        return samples, sampling_rate

    def create_wav(self, text, filename=None, speaker = "community", language = "中文", speed=1.0):
        """Create a WAV file from the text and return its path.

        Args:
            text: Text to synthesize.
            filename: Base name (without extension) under ``audio/``. When
                falsy, a ``%Y-%m-%d-%H-%M-%S`` timestamp is used.
            speaker: Speaker key forwarded to the synthesis closure.
            language: Language key forwarded to the synthesis closure.
            speed: Speaking speed forwarded to the synthesis closure.

        Returns:
            Path of the written mono 16-bit PCM file, ``audio/<name>.wav``.
        """
        samples, sampling_rate = self.process_text_and_generate_voice_array(self.tts_fn, speaker, text, language, speed)
        # Clip before scaling: samples outside [-1, 1] would otherwise
        # overflow the int16 range and wrap around.
        pcm = np.int16(np.clip(np.array(samples), -1.0, 1.0) * 32767)

        # Build the full file path.
        file_str = filename if filename else datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
        full_path = os.path.join('audio', f"{file_str}.wav")

        # Make sure the target directory exists.
        os.makedirs(os.path.dirname(full_path), exist_ok=True)

        # Write the WAV file.
        with wave.open(full_path, "wb") as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)
            wf.setframerate(sampling_rate)
            wf.writeframes(pcm.tobytes())
        # Return the full file path.
        return full_path

    def create_voice_array(self, text, speaker = "community", language = "中文", speed=1.0):
        """Generate the list of audio samples for the text (no file is written)."""
        samples, _ = self.process_text_and_generate_voice_array(self.tts_fn, speaker, text, language, speed)
        return samples

In [None]:
# Instantiate the service with the YunzeNeural hyper-parameters and
# checkpoint; the constructor loads the model immediately.
tts = VitsService(
    hparams_file_path="./models/YunzeNeural/config.json",
    checkpoint_path="./models/YunzeNeural/G_latest.pth"
    )

In [None]:

# Synthesize one short sentence with the YunzeNeural voice. No filename is
# passed, so create_wav names the file after the current timestamp, writes it
# under ./audio/ and returns the path.
speaker = 'YunzeNeural'
tts.create_wav(text='水藥，須冷藏保存，若無法冷藏，請放置陰涼處',speaker=speaker)