In [1]:
import os
import time
from flask import Flask, request, abort, render_template, redirect, url_for, flash, jsonify, send_file
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import (
    MessageEvent, AudioMessage, TextMessage, TextSendMessage, AudioSendMessage
)
import requests
import logging

#---------------------------語音辨識模型載入
import tempfile
import whisper
# 載入 Whisper 模型
model = whisper.load_model("tiny")

#---------------------------語言模型載入
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import opencc
# 載入模型和分詞器
model_name = "Qwen/Qwen2-0.5B"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# 檢查是否有可用的 GPU，如果有則將模型載入到 GPU 上，否則使用 CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AutoModelForCausalLM.from_pretrained(model_name).to(device)
#model = AutoModelForCausalLM.from_pretrained(model_name, ignore_mismatched_sizes=True).to(device)

# 建立一個 OpenCC 轉換器，用於將簡體中文轉換為繁體中文
converter = opencc.OpenCC('s2t')  # 's2t' 表示從簡體轉為繁體
def answer_question(question):
    # 將輸入的問題進行編碼，並生成 attention_mask
    inputs = tokenizer(question, return_tensors="pt", padding=True)   
    # 將編碼後的輸入向量傳到到與模型相同的 device 上（GPU 或 CPU）
    inputs = {key: value.to(device) for key, value in inputs.items()}   
    # 設定 pad_token_id 為 eos_token_id，以避免在處理填充時出現混淆
    pad_token_id = tokenizer.eos_token_id   
    # 使用 LLM 模型生成答案，這裡設置 max_new_tokens 來限制生成的最大長度
    outputs = model.generate(
        inputs['input_ids'], 
        attention_mask=inputs['attention_mask'], 
        max_new_tokens=256, 
        pad_token_id=pad_token_id
    )   
    # 解碼生成的答案，將模型輸出的 token 轉換為人類可讀的內容
    answer = tokenizer.decode(outputs[0], skip_special_tokens=True)   
    # 將簡體中文答案轉換為繁體中文
    answer_traditional = converter.convert(answer)   
    return answer_traditional



import subprocess
from datetime import datetime
from pathlib import Path

app = Flask(__name__)
app.secret_key = 'os.urandom(24)'  # 用於未確定 key 前，先預設一組 key

# 設置Log記錄
logging.basicConfig(level=logging.INFO)

# 設定 LINE CHANNEL ACCESS TOKEN 和 CHANNEL SECRET
LINE_CHANNEL_ACCESS_TOKEN = 'w4627SjiixmfjJ7LNg6U8q9L8Nh+NXgaN4ELtQ9FkxjO8oO0aVdT8L9J9eGT/qNM9IrLMzjcngjmCtPy+Qa70dxtU0e4e8F6NA6hwbIM3lppgmzwNMiC257n6Eq8eLt+buQ8lSfFFNQF1AJvRZGRIgdB04t89/1O/w1cDnyilFU='
LINE_CHANNEL_SECRET = '1d982942ffefc23710b07c6abc050cb1'
SERVER_URL = 'your_server_url'  # 設定你的 SERVER_URL

STT_API_URL = 'http://180.218.16.187:30303/recognition_long_audio'
TTS_API_URL = 'http://180.218.16.187:30303/getTTSfromText'
LLM_API_URL = 'http://61.66.218.215:30315/llm_chat'
SERVER_PORT = 10000  # 免費空間 Render.com 預設 PORT

line_bot_api = LineBotApi(LINE_CHANNEL_ACCESS_TOKEN)
line_handler = WebhookHandler(LINE_CHANNEL_SECRET)

# 語音轉文字 (需先架設好 Whisper Server)
def get_text_from_audio(audio_path):
    payload = {'doStyle': '0'}
    files = [
        ('audio', (os.path.basename(audio_path), open(audio_path, 'rb'), 'audio/mpeg'))
    ]
    headers = {}
    response = requests.request('POST', STT_API_URL, headers=headers, data=payload, files=files)

    if response.status_code == 200:
        try:
            data = response.json()
            logging.info(f"STT=> {data}")
            return data.get('result', '無法辨識音訊')
        except requests.exceptions.JSONDecodeError:
            logging.error("Failed to decode JSON response")
            return '無法辨識音訊'
    else:
        logging.error(f"STT API request failed with status code {response.status_code}")
        return '錄音語音品質不佳，請再試試。'

# LLM語言模型 (需先架設好 LLM Server)
def get_response_from_llm(query):
    payload = {'token': 'TEST',
        'query': query,
        'prompt_name': '艾妮機器人',
        'max_tokens': '1024'}
    files = []
    headers = {}
    response = requests.post(LLM_API_URL, headers=headers, data=payload, files=files)
    data = response.json()
    logging.info(f"LLM=> {data}")
    return data.get('result', '無法獲取回應')

# 文字轉語音  (需先架設好 TTS Server)
def get_audio_from_text(text):
    payload = {
        'tone': '0',  # 語音音高
        'speed': '0',  # 語音速度
        'content': text,  # 語音內容
        'gender': '1'  # 語音性別
    }
    headers = {}
    response = requests.post(TTS_API_URL, headers=headers, data=payload)
    audio_path = f'static/{int(time.time())}.mp3'
    with open(audio_path, 'wb') as f:
        f.write(response.content)
    return audio_path

def runcmd(command):
    try:
        # 使用 subprocess 執行命令行指令
        ret = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", timeout=3600)
        if ret.returncode == 0:  # 檢查是否成功執行
            print("成功:", ret.stdout)
        else:
            print("錯誤:", ret.stderr)
            return False, ret.stderr  # 回傳錯誤訊息
        return True, ret.stdout  # 回傳成功訊息
    except subprocess.TimeoutExpired as e:
        print(f"命令超時: {e}")
        return False, f"命令超時: {e}"  # 回傳超時錯誤
    except Exception as e:
        print(f"發生錯誤: {e}")
        return False, f"發生錯誤: {e}"  # 回傳一般錯誤

@app.route("/", methods=["GET", "POST"])
def home():
    return "Line Bot 已啟動並運作"

def add_line_handlers(handler):
    @handler.add(MessageEvent, message=AudioMessage)
    def handle_audio_message(event):
        message_content = line_bot_api.get_message_content(event.message.id)
        audio_path = f'static/{int(time.time())}.mp3'

        with open(audio_path, 'wb') as fd:
            for chunk in message_content.iter_content():
                fd.write(chunk)

        text = get_text_from_audio(audio_path)
        logging.info(f"STT: {text}")

        llm_response = get_response_from_llm(text)[0]['content']
        logging.info(f"LLM Reply: {llm_response}")

        reply_audio_path = get_audio_from_text(llm_response)
        logging.info(f"TTS: {reply_audio_path}")

        if os.path.exists(reply_audio_path):
            line_bot_api.reply_message(
                event.reply_token,
                [
                    TextSendMessage(text=llm_response),
                    AudioSendMessage(
                        original_content_url=f'{SERVER_URL}/{reply_audio_path}',
                        duration=330
                    )
                ]
            )
        else:
            line_bot_api.reply_message(
                event.reply_token,
                TextSendMessage(text="合成語音時錯誤，請檢查 TTS Server")
            )

    @handler.add(MessageEvent, message=TextMessage)
    def handle_text_message(event):
        text = event.message.text
        llm_response = get_response_from_llm(text)[0]['content']
        logging.info(f"LLM Reply: {llm_response}")

        reply_audio_path = get_audio_from_text(llm_response)
        logging.info(f"TTS: {reply_audio_path}")

        if os.path.exists(reply_audio_path):
            line_bot_api.reply_message(
                event.reply_token,
                [
                    TextSendMessage(text=llm_response),
                    AudioSendMessage(
                        original_content_url=f'{SERVER_URL}/{reply_audio_path}',
                        duration=330
                    )
                ]
            )
        else:
            line_bot_api.reply_message(
                event.reply_token,
                TextSendMessage(text="合成語音時錯誤，請檢查 TTS Server")
            )

@app.route("/webhook", methods=["POST"])
def callback():
    if not line_handler:
        abort(500, "LINE bot has not been configured.")

    signature = request.headers["X-Line-Signature"]
    body = request.get_data(as_text=True)

    try:
        line_handler.handle(body, signature)
    except InvalidSignatureError:
        abort(400)
    return "OK"

@app.route("/transcribe", methods=["POST"])
def transcribe():
    # 檢查請求中是否包含檔案
    if 'file' not in request.files:
        return jsonify({"error": "未上傳音訊檔案"}), 400   
    audio_file = request.files['file']
    # 將上傳的音訊檔案保存為臨時檔案
    with tempfile.NamedTemporaryFile(delete=False) as temp_audio_file:
        audio_file.save(temp_audio_file.name)   
    # 載入音訊檔案並進行轉逐字稿
    audio = whisper.load_audio(temp_audio_file.name)
    result = model.transcribe(audio, language='zh')
    # 刪除臨時檔案
    os.remove(temp_audio_file.name)
    # 回傳語音辨識結果
    return jsonify({"transcription": result['text']})

@app.route('/synthesize', methods=['POST'])
def synthesize():
    # 建立存放上傳文件的文件路徑
    upload_folder = "static/" + datetime.now().strftime('%Y-%m-%d')
    upload_folder = Path(upload_folder)
    upload_folder.mkdir(exist_ok=True, parents=True)  # 如果文件路徑不存在，則新增
    if request.method == 'POST':
        # 取得 POST 請求中的參數
        content = request.form.get('content', '上傳資料內容有誤')  # 要合成的文字內容
        gender = request.form.get('gender', '1')  # 選擇的語音性別
        tone = request.form.get('tone', '0')  # 調整的音調
        speed = request.form.get('speed', '0')  # 調整的語速       
        # 根據性別選擇語音
        voices = {
            '1': "zh-TW-YunJheNeural",  # 男聲參數
            '0': "zh-TW-HsiaoYuNeural"  # 女聲參數
        }
        voice = voices.get(gender, "zh-TW-HsiaoChenNeural")  # 如果性別無法匹配或是沒有設定，預設使用 "zh-TW-HsiaoChenNeural"
        # 設定音調和語速
        pitch = f"{'+' if int(tone) >= 0 else ''}{tone}Hz"  # 音調設定，例如 +5Hz 或 -5Hz
        rate = f"{'+' if int(speed) >= 0 else ''}{speed}%"  # 語速設定，例如 +10% 或 -10%
        # 定義生成的音頻文件名稱
        fileName = f"{int(time.time() * 1000)}.mp3"
        outputFileName = str(upload_folder / fileName)  # 完整的文件路徑
        # 構建 CLI 命令以進行語音合成
        CLI = f'edge-tts --text "{content}" --write-media "{outputFileName}" --voice "{voice}" --pitch="{pitch}" --rate="{rate}"'
        print(CLI)       
        # 執行命令並處理結果
        success, message = runcmd(CLI)
        if success:
            return send_file(outputFileName, mimetype='audio/mpeg')  # 成功則回傳語音島按
        else:
            return jsonify({"error": message}), 500  # 如果失敗，回傳錯誤訊息
    return '請使用POST方法上傳數據'  # 如果不是 POST 請求，提示用戶

# 定義一個路由 '/qa'，使用 POST 方法來處理問答請求
@app.route('/qa', methods=['POST'])
def qa():
    # 檢查請求中是否包含 'text' 字段，如果沒有則回傳錯誤訊息
    if 'text' not in request.json:
        return jsonify({"error": "未提供文本"}), 400  # 400 表示客戶端錯誤
    # 從請求中提取問題文字
    question = request.json['text']   
    # 呼叫 answer_question 函數來生成答案
    answer = answer_question(question)   
    # 將答案以 JSON 格式回傳
    return jsonify({"answer": answer})

if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=SERVER_PORT, use_reloader=False)



  checkpoint = torch.load(fp, map_location=device)


tokenizer_config.json:   0%|          | 0.00/1.29k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.78M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/1.67M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/7.03M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/661 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/988M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/138 [00:00<?, ?B/s]

 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:10000
 * Running on http://192.168.0.199:10000
INFO:werkzeug:[33mPress CTRL+C to quit[0m
INFO:werkzeug:127.0.0.1 - - [07/Sep/2024 21:40:01] "POST /qa HTTP/1.1" 200 -
