# 參數設定

In [1]:
import os
from dotenv import load_dotenv
load_dotenv()
load_dotenv('/eason/.server.env')

import pymongo
# MongoDB client for localhost:27081; the credentials are read from the
# `mongo_user` / `mongo_pw` environment variables.
# NOTE(review): this name rebinds `MongoClient` to a client *instance*, not the pymongo class.
MongoClient = pymongo.MongoClient(f"mongodb://{os.getenv('mongo_user')}:{os.getenv('mongo_pw')}@localhost:27081")

# Database name and the three collection handles used throughout this file.
DB_NAME = "chatbot_experiment_2022_09"
GPT3_chat_history_col = MongoClient[DB_NAME]["GPT3_Chat"]
GPT3_chat_user_col = MongoClient[DB_NAME]["Users"]
GPT3_chat_bots_col = MongoClient[DB_NAME]["Bots"]


from translate import translate

from linebot import (
    LineBotApi, WebhookHandler
)
from linebot.exceptions import (
    InvalidSignatureError
)
from linebot.models import (
    MessageEvent,
    TextMessage,
    TextSendMessage,
    FlexSendMessage,
    TemplateSendMessage,
    MessageTemplateAction,
    ButtonsTemplate,
    PostbackEvent,
    PostbackTemplateAction,
    AudioMessage,
    AudioSendMessage,
    Sender
)

from flask import Flask, request, abort, render_template, send_from_directory
import subprocess



# Flask application; templates are looked up in the 'dist' folder.
app = Flask(__name__, template_folder = 'dist')

# LINE Messaging API client and webhook handler, configured from the
# CHANNEL_ACCESS_TOKEN / CHANNEL_SECRET environment variables.
line_bot_api = LineBotApi(os.getenv('CHANNEL_ACCESS_TOKEN'))
handler = WebhookHandler(os.getenv('CHANNEL_SECRET'))


# 處理 GPT 3

In [5]:
import openai

# The OpenAI key is read from the environment (os.getenv yields None when OPENAI_API_KEY is unset).
openai.api_key = os.getenv("OPENAI_API_KEY")


from google.cloud import speech
import sys, pathlib
from pydub import AudioSegment
import requests
import json
import os
import uuid
import datetime
import numpy as np

import io
def voice_reco(client, filename, language_code="zh-TW", sample_rate_hertz=16000):
    """Transcribe one LINEAR16 audio file with Google Cloud Speech.

    Args:
        client: Object exposing ``recognize(config=..., audio=...)``
            (a ``speech.SpeechClient`` in this file).
        filename: Path of the audio file whose bytes are sent for recognition.
        language_code: Language tag passed to the recognizer.
        sample_rate_hertz: Sample rate declared in the recognition config.

    Returns:
        The transcript of the first alternative of the first result, or ""
        when the response contains no result/alternative.
    """
    with open(filename, "rb") as audio_file:
        content = audio_file.read()

    audio = speech.RecognitionAudio(content=content)
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=sample_rate_hertz,
        language_code=language_code,
    )
    response = client.recognize(config=config, audio=audio)
    try:
        return response.results[0].alternatives[0].transcript
    except IndexError:
        # Nothing was recognized: treat as an empty transcript instead of
        # swallowing every possible exception with a bare ``except``.
        return ""

def _remove_if_exists(path):
    """Delete *path*, ignoring the case where it is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def process_voice_file(file_path):
    """Transcribe ``{file_path}.wav`` and delete the temporary audio files.

    Audio shorter than 60 seconds is recognized in one request. Longer audio
    is cut into 59-second pieces that start every 57 seconds (2 seconds of
    overlap); each piece is exported to its own temporary file, recognized,
    and removed, and the transcripts are concatenated.

    Args:
        file_path: Path of the downloaded audio; the converted audio is
            expected at ``{file_path}.wav``.

    Returns:
        The joined transcript, or ``"[ERROR]\\n"`` followed by a traceback
        when a ``ValueError`` is raised while processing.
    """
    import traceback

    wav_path = f'{file_path}.wav'
    text = ""
    try:
        # Instantiates a client
        client = speech.SpeechClient.from_service_account_json(os.getenv('GOOGLE_SERVICE_JSON_PATH'))
        wav = AudioSegment.from_wav(wav_path)
        if wav.duration_seconds < 60:
            all_texts = [voice_reco(client, wav_path)]
        else:
            # pydub slices are indexed in MILLISECONDS, so the second-based
            # limits have to be scaled by 1000.
            segments = []
            while wav.duration_seconds > 60:
                segments.append(wav[:59 * 1000])
                wav = wav[57 * 1000:]
            segments.append(wav)

            all_texts = []
            for segment in segments:
                # Every piece gets its own file so it never overwrites the
                # source WAV or a sibling piece.
                chunk_path = f'{file_path}_{uuid.uuid4()}.wav'
                try:
                    # export() hands back the open output file; close it.
                    segment.export(chunk_path, format="wav").close()
                    all_texts.append(voice_reco(client, chunk_path))
                finally:
                    _remove_if_exists(chunk_path)
        text = "".join(all_texts)
    except ValueError:
        text = "[ERROR]\n" + traceback.format_exc()
    finally:
        # Clean up on every path, including failures.
        _remove_if_exists(wav_path)
        _remove_if_exists(file_path)

    return text



def process_voice_message(event):
    """Download a LINE audio message, convert it to WAV and reply to its transcript.

    The message content is written to a uniquely named file under ``/tmp``,
    converted with ``ffmpeg`` to ``<file>.wav``, transcribed by
    ``process_voice_file`` and the transcript is passed to
    ``send_GPT3_response``.

    Args:
        event: LINE message event; ``event.message.id`` identifies the audio.
    """
    file_path = f'/tmp/{uuid.uuid4()}'

    message_content = line_bot_api.get_message_content(event.message.id)
    with open(file_path, 'wb') as fd:
        for chunk in message_content.iter_content():
            fd.write(chunk)

    # Argument list instead of a shell string: no shell parsing of the paths.
    subprocess.call(
        ['ffmpeg', '-i', file_path, f'{file_path}.wav'],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.STDOUT)

    text = process_voice_file(file_path)
    send_GPT3_response(text = text, event = event)

from bson.objectid import ObjectId
def create_prompt(prompt, prev_msgs, text, num = -3):
    """Build the completion prompt from a prefix, recent history and new input.

    Args:
        prompt: Prefix text; a trailing newline is added when it is missing.
            An empty (or None) prefix contributes nothing.
        prev_msgs: Sequence of dicts with ``input_text_en`` and
            ``response_text_en`` keys, oldest first.
        text: The new user input (English).
        num: Slice start applied to ``prev_msgs`` (``-3`` keeps the last three).

    Returns:
        The prompt string, ending with ``"Friend: "`` for the model to continue.
    """
    prompt = prompt or ""
    # endswith() is safe on an empty string, unlike prompt[-1].
    if prompt and not prompt.endswith("\n"):
        prompt += "\n"
    history = "".join(
        f"You: {msg['input_text_en']}\nFriend: {msg['response_text_en']}\n"
        for msg in prev_msgs[num:]
    )
    return f"{prompt}{history}You: {text}\nFriend: "

# Deprecated
def update_pre_prompt():
    """Deprecated placeholder; this function has no statements besides this docstring.

    The retired helpers are kept below as text only and are never executed:

    def update_pre_prompt(user_id, text):
        GPT3_chat_user_col.update_one({
                "user_id": user_id
            },{
                "$set": {"pre_prompt": text}
            })

    def update_chat_with(user_id, chat_with, chat_from = "You"):
        GPT3_chat_user_col.update_one({
                "user_id": user_id
            },{"$set":{
                "chat_with": chat_with,
                "chat_from": chat_from,
            }})"""
# NOTE(review): this `pass` sits at module level (column 0), so it is a
# top-level no-op rather than part of the function above.
pass

def generate_GPT3_response(event, text, bot):
    """Ask the completion model for the bot's next line in this conversation.

    The prompt is the bot's ``prefix`` followed by the last three stored
    exchanges between this user (at the user's current ``sn``) and this bot,
    then the new input.

    Args:
        event: LINE event; ``event.source.user_id`` identifies the user.
        text: The user's input, already in English.
        bot: Bot document; ``id`` and ``prefix`` are read.

    Returns:
        The first completion choice with newline characters removed.
    """
    user = get_user(event.source.user_id)

    prev_msgs = sorted(
        GPT3_chat_history_col.find({
            'user.user_id': event.source.user_id,
            'user.sn': user['sn'], #serial number
            'bot_id': bot['id'],
        }),
        key=lambda msg: msg['time'],
    )

    prompt = create_prompt(bot['prefix'], prev_msgs, text, -3)

    response = openai.Completion.create(
      engine="text-davinci-001",
      prompt=prompt,
      temperature=0.6,
      max_tokens=120,
      top_p=1.0,
      frequency_penalty=0.5,
      presence_penalty=0.5,
      stop=["You:"]
    )

    response_text = response.choices[0].text.replace("\n", "")

    print(prompt + response_text, end = '\n')

    return response_text

def norm_text(text):
    """Return *text* with every ``&#39;`` entity replaced by an apostrophe."""
    return text.replace("&#39;", "'")

def get_bots(bot_id):
    """Fetch the document in the Bots collection whose ``id`` equals *bot_id*."""
    query = {'id': bot_id}
    return GPT3_chat_bots_col.find_one(query)

def send_GPT3_response(text, event):
    """Translate the user's text, collect one reply per bot and send them.

    The text is translated to English, every bot configured for the user
    (default ``['棒男孩']``) generates its answer in its own thread via
    ``thread_GPT3``, and the collected messages are sent as one LINE reply.

    Args:
        text: The user's original text.
        event: LINE message event providing the user id and reply token.
    """
    user = get_user(event.source.user_id)
    user_profile = line_bot_api.get_profile(event.source.user_id)

    translated_result = translate(text, target="en")

    text_en = norm_text(translated_result['translatedText'])
    text_source = translated_result['detectedSourceLanguage']
    if text_source == 'und':
        text_source = "zh-tw"

    # .get(): user documents created by get_user carry no 'bots' key, so a
    # plain user['bots'] raises KeyError for them.
    bots_id = user.get('bots') or ['棒男孩']
    # Drop ids without a matching bot document instead of failing in the worker.
    bots = [bot for bot in map(get_bots, bots_id) if bot is not None]

    message = []
    # TODO: if the replies must keep the bots' order, collect them through a mapping instead.
    tasks = [
        doThreading(thread_GPT3, args = (message, event, text, text_en, bot, user_profile, user, text_source))
        for bot in bots
    ]
    for t in tasks:
        t.join()

    print(message)
    if message:
        # Only reply when at least one bot produced a message.
        line_bot_api.reply_message(event.reply_token, message)


def thread_GPT3(message, event, text, text_en, bot, user_profile, user, text_source, max_try=4):
    """Generate one bot's reply, store the exchange and append the LINE message.

    Args:
        message: Shared list; the resulting ``TextSendMessage`` is appended to it.
        event: LINE message event (user id and message id are read).
        text: The user's original text.
        text_en: The user's text in English.
        bot: Bot document; ``id``, ``name`` and ``img_url`` are read here.
        user_profile: LINE profile; ``display_name`` is stored with the chat.
        user: User document; ``sn`` is stored with the chat.
        text_source: Detected language of the user's text; decides the
            language the reply is translated back into.
        max_try: Maximum number of generation attempts while the reply is
            empty. Without a bound, an always-empty completion would call
            the API forever.
    """
    response_text = ""
    for _ in range(max_try):
        response_text_en = generate_GPT3_response(event, text_en, bot)
        response_text_en = norm_text(response_text_en)
        print(f"response_text_en = {response_text_en}, text_source = {text_source}")

        if text_source[:2] == "zh":
            response_text = translate(response_text_en, target="zh-TW")['translatedText']
        elif text_source != 'en':
            response_text = translate(response_text_en, target=text_source)['translatedText']
        else:
            response_text = response_text_en

        if response_text != "":
            break
    else:
        # Every attempt came back empty: give up without storing or sending.
        print(f"No non-empty response from {bot['id']} after {max_try} tries")
        return

    print("----")
    print(f"From: {bot['id']}")
    print('--Origin--')
    print(f'{text_en} => {response_text_en}')
    print('--Translated--')
    print(f'{text} => {response_text}')
    print('========')

    # NOTE(review): input_type is always stored as 'text', also for
    # transcribed voice messages — confirm this is intended.
    GPT3_chat_history_col.insert_one({
        'input_text': text,
        'input_text_en': text_en,
        'response_text_en': response_text_en,
        'response_text': response_text,
        'event_message_id': event.message.id,
        'user': {
            "display_name": user_profile.display_name,
            "user_id": event.source.user_id,
            "sn": user['sn'],
        },
        'bot_id': bot['id'],
        "time": datetime.datetime.now(),
        "input_type": 'text',
    })

    message.append(
        TextSendMessage(
            text=response_text,
            sender = Sender(name = bot['name'].replace("_", " "),
                            icon_url = bot['img_url']))
            )

# 回應邏輯

In [16]:

# Every status a user document may carry; index 0 is the status given to
# newly created users.
ALL_STATUS = [
    # Onboarding
    "New Starter", "Ready to go",
    # Condition A
    "Condition_A_Pretest", "Condition_A_Chatting",
    "Condition_A_Posttest", "Condition_A_Finish",
    # Condition B
    "Condition_B_Pretest", "Condition_B_Chatting",
    "Condition_B_Posttest", "Condition_B_Finish",
    # Condition C
    "Condition_C_Pretest", "Condition_C_Chatting",
    "Condition_C_Posttest", "Condition_C_Finish",
    # Wrap-up
    "Final_Test",
]
def get_user(user_id):
    """Return the user document for *user_id*, creating it on first contact.

    Args:
        user_id: LINE user id.

    Returns:
        The stored user document. A newly created one starts with ``sn`` 0,
        the first entry of ``ALL_STATUS``, an empty ``status_history`` and an
        empty ``bots`` list.
    """
    user = GPT3_chat_user_col.find_one({
        'user_id': user_id
    })
    if user is None:
        # The LINE profile is only needed to create the document, so it is
        # fetched here rather than on every call.
        user_profile = line_bot_api.get_profile(user_id)
        user = {
            'user_id': user_id,
            'display_name': user_profile.display_name,
            'sn': 0,
            'status': ALL_STATUS[0],
            'status_history': [],
            # send_GPT3_response reads user['bots'] and treats an empty list
            # as "use the default bot".
            'bots': [],
        }
        GPT3_chat_user_col.insert_one(user)

    return user

def process_text_message(event):
    """Handle a text message: run it as a command, otherwise chat with the bots.

    Returns True when ``process_command`` handled the text; otherwise the
    text is forwarded to ``send_GPT3_response`` and None is returned.
    """
    text = event.message.text
    handled = process_command(event, text)
    if handled:
        return True

    send_GPT3_response(text, event)

def update_user_status(user_id, newStatus):
    """Set the user's status and push the previous one onto ``status_history``.

    Args:
        user_id: LINE user id of the document to update.
        newStatus: The status to store.

    Nothing is written when no document matches *user_id*.
    """
    query = {'user_id': user_id}
    user = GPT3_chat_user_col.find_one(query)
    if user is None:
        # find_one found nothing; indexing None below would raise TypeError.
        print(f"update_user_status: no user with user_id={user_id!r}")
        return
    GPT3_chat_user_col.update_one(query, {
        '$set': {'status': newStatus},
        '$push': {'status_history': user['status']}
    })

# NOTE(review): looks like a notebook leftover — this calls get_user for a
# hard-coded user id whenever the module is executed and binds the
# module-level name `user`; confirm whether it is still needed.
user = get_user("Ub830fb81ec2de64d825b4ab2f6b7472e")

# Bare expression: has no effect outside an interactive notebook cell.
user

import random

def randomly_assign_next_task(user):
    """Pick, in random order, a pretest status the user has not had yet.

    The three ``Condition_*_Pretest`` statuses are shuffled and the first one
    missing from ``user['status_history']`` is stored through
    ``update_user_status`` and returned. Returns False, without updating
    anything, when all three are already in the history.
    """
    candidates = ["Condition_A_Pretest", "Condition_B_Pretest", "Condition_C_Pretest"]
    random.shuffle(candidates)

    already_done = user['status_history']
    chosen = next((name for name in candidates if name not in already_done), False)
    if not chosen:
        return False

    update_user_status(user['user_id'], chosen)
    return chosen

def send_pre_test_info(event, user, tast_name):
    """Reply with the Flex message that asks the user to take the pretest.

    Args:
        event: LINE event whose ``reply_token`` is used for the reply.
        user: User document; ``user_id`` is placed in the links.
        tast_name: Value put in the ``test`` query parameter of the pretest link.
    """
    user_id = user['user_id']
    # NOTE(review): the alt text below is the onboarding ("starter") wording
    # and link, not a pretest-specific one — confirm whether that is intended.
    alt_text = f"歡迎來到此聊天機器人實驗，請先點選下方連結閱讀實驗指引並確認開始實驗\n\nhttps://exp1.eason.best/starter?id={user_id}\n\n完成後請點選「完成」"
    contents = {
        "type": "bubble",
        "hero": {
            "type": "image",
            "url": "https://i.imgur.com/ufIifp6.png",
            "size": "full",
            "aspectRatio": "20:13",
            "aspectMode": "cover",
        },
        "body": {
            "type": "box",
            "layout": "vertical",
            "contents": [
                {
                    "type": "text",
                    "text": "前測",
                    "weight": "bold",
                    "size": "xl"
                },
                {
                    "type": "box",
                    "layout": "vertical",
                    "margin": "lg",
                    "spacing": "sm",
                    "contents": [
                        {
                            "type": "box",
                            "layout": "baseline",
                            "spacing": "sm",
                            "contents": [
                                {
                                    "type": "text",
                                    "text": "點選下方「進行前測」做好感度評估，之後點選「下一步」繼續實驗",
                                    "wrap": True,
                                    "color": "#666666",
                                    "size": "sm",
                                    "flex": 5
                                }
                            ]
                        }
                    ]
                }
            ]
        },
        "footer": {
            "type": "box",
            "layout": "vertical",
            "spacing": "sm",
            "contents": [
                {
                    "type": "button",
                    "style": "link",
                    "height": "sm",
                    "action": {
                        "type": "uri",
                        "label": "進行前測",
                        # Send the real user id; the link previously carried
                        # the literal placeholder "userid".
                        "uri": f"https://exp1.eason.best/pretest?id={user_id}&test={tast_name}"
                    }
                },
                {
                    "type": "button",
                    "style": "link",
                    "height": "sm",
                    "action": {
                        "type": "message",
                        "label": "下一步",
                        "text": "我已完成前測"
                    }
                },
                {
                    "type": "box",
                    "layout": "vertical",
                    "contents": [],
                    "margin": "sm"
                }
            ],
            "flex": 0
        }
    }
    line_bot_api.reply_message(
        event.reply_token,
        FlexSendMessage(alt_text = alt_text, contents = contents))
    
# Chat inputs that start a new conversation. The Chinese list is matched
# against the raw text, the English list against the lower-cased text.
change_topic_command_zh = ['換個話題']
change_topic_command_en = ['change topic']

def increase_chat_sn(user_id):
    """Increment the ``sn`` field of the user document matching *user_id* by one."""
    selector = {"user_id": user_id}
    bump = {"$inc": {'sn': 1}}
    GPT3_chat_user_col.update_one(selector, bump)

def _starter_flex_message(user_id):
    """Build the onboarding Flex message linking to the experiment guide."""
    # The guide link carries the real user id; it previously sent the
    # literal placeholder "userid".
    guide_uri = f"https://exp1.eason.best/starter?id={user_id}"
    return FlexSendMessage(
        alt_text = f"歡迎來到此聊天機器人實驗，請先點選下方連結閱讀實驗指引並確認開始實驗\n\nhttps://exp1.eason.best/starter?id={user_id}\n\n完成後請點選「完成」",
        contents = {
            "type": "bubble",
            "hero": {
                "type": "image",
                "url": "https://i.imgur.com/ufIifp6.png",
                "size": "full",
                "aspectRatio": "20:13",
                "aspectMode": "cover",
            },
            "body": {
                "type": "box",
                "layout": "vertical",
                "contents": [
                    {
                        "type": "text",
                        "text": "歡迎來到聊天機器人實驗",
                        "weight": "bold",
                        "size": "xl"
                    },
                    {
                        "type": "box",
                        "layout": "vertical",
                        "margin": "lg",
                        "spacing": "sm",
                        "contents": [
                            {
                                "type": "box",
                                "layout": "baseline",
                                "spacing": "sm",
                                "contents": [
                                    {
                                        "type": "text",
                                        "text": "點選下方「閱讀指引」觀看實驗說明並勾選同意，之後點選「下一步」開始實驗",
                                        "wrap": True,
                                        "color": "#666666",
                                        "size": "sm",
                                        "flex": 5
                                    }
                                ]
                            }
                        ]
                    }
                ]
            },
            "footer": {
                "type": "box",
                "layout": "vertical",
                "spacing": "sm",
                "contents": [
                    {
                        "type": "button",
                        "style": "link",
                        "height": "sm",
                        "action": {
                            "type": "uri",
                            "label": "閱讀實驗指引",
                            "uri": guide_uri
                        }
                    },
                    {
                        "type": "button",
                        "style": "link",
                        "height": "sm",
                        "action": {
                            "type": "message",
                            "label": "下一步",
                            "text": "我已完成閱讀"
                        }
                    },
                    {
                        "type": "box",
                        "layout": "vertical",
                        "contents": [],
                        "margin": "sm"
                    }
                ],
                "flex": 0
            }
        })


def process_command(event, text):
    """Handle status-driven steps and chat commands for an incoming text.

    Args:
        event: LINE message event (user id and reply token are read).
        text: The text the user sent.

    Returns:
        True when the text was handled here (a reply has been sent), False
        when it should be treated as ordinary chat input.
    """
    user_id = event.source.user_id
    user = get_user(user_id)

    if user['status'] == 'New Starter':
        if text == "我已完成閱讀":
            line_bot_api.reply_message(event.reply_token,
               TextSendMessage(text='''偵測到您尚未完成閱讀，請閱讀完所有內容後於最下方點選「開始實驗」，收到頁面告知可以回來才算完成喔！'''))
        elif text == "Ready to go":
            line_bot_api.reply_message(event.reply_token,
               TextSendMessage(text='''已更新狀態為 "Ready to go"'''))
            update_user_status(user['user_id'], "Ready to go")
        else:
            line_bot_api.reply_message(event.reply_token, _starter_flex_message(user_id))
        return True
    elif user['status'] == "Ready to go":
        # Arrange a group
        # NOTE(review): randomly_assign_next_task returns False once all three
        # pretests are in the history; that value is still passed on as the
        # test name — confirm the intended behaviour for that case.
        task_type = randomly_assign_next_task(user)
        send_pre_test_info(event, user, task_type)
        return True
    elif text in change_topic_command_zh:
        increase_chat_sn(user_id)
        line_bot_api.reply_message(event.reply_token,
           TextSendMessage(text='''已更換話題，讓我們繼續聊天吧～\n如果我又持續說重複的話，請輸入「換個話題」以繼續對話'''))
        return True
    elif text.lower() in change_topic_command_en:
        increase_chat_sn(user_id)
        line_bot_api.reply_message(event.reply_token,
           TextSendMessage(text='''Topic changed, let's continue chatting.\nIf I keep saying weird things again, please enter 'change topic'.'''))
        return True

    return False




@app.route("/api/v1/line_bot", methods=['POST'])
def callback():
    """LINE webhook endpoint: hand the request body and its signature to the handler.

    Responds 'OK' after handling; aborts with HTTP 400 when the handler
    raises InvalidSignatureError.
    """
    # Signature header sent along with the webhook request
    line_signature = request.headers['X-Line-Signature']

    # Raw request body as text
    payload = request.get_data(as_text=True)
    app.logger.info("Request body: " + payload)

    try:
        handler.handle(payload, line_signature)
    except InvalidSignatureError:
        print("Invalid signature. Please check your channel access token/channel secret.")
        abort(400)

    return 'OK'


@handler.add(MessageEvent, message=TextMessage)
def handleTextMessage(event):
    """Process an incoming text message on a worker thread."""
    # `(event)` is just `event`; a one-element tuple needs the trailing comma.
    doThreading(process_text_message, args = (event,))

@handler.add(MessageEvent, message=AudioMessage)
def handleAudioMessage(event):
    """Process an incoming audio message on a worker thread."""
    # `(event)` is just `event`; a one-element tuple needs the trailing comma.
    doThreading(process_voice_message, args = (event,))
    



@app.route("/api/v1/status", methods=['GET'])
def get_status():
    """Report the estimated number of user and chat documents."""
    return {
        "user_count": GPT3_chat_user_col.estimated_document_count(),
        "chat_count": GPT3_chat_history_col.estimated_document_count(),
    }
    








# Scratch list; the only uses visible in this file are commented-out debugging appends.
AAA = []
if __name__ == "__main__":
    # CORS is enabled only when this file is run directly as a script.
    from flask_cors import CORS
    CORS(app)
    # NOTE(review): os.getenv yields a string (or None when API_PORT is unset) —
    # confirm the value app.run receives for `port` is what is intended.
    app.run(port=os.getenv('API_PORT'))

In [None]:
#os.system('cd ../frontend && rm -rf dist && unzip -qq dist && echo "Update Frontend Success"')