In [1]:
import json
import math
import os
import re
import threading
import time
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

# LLM
import torch
from transformers import pipeline

# LINE bot / web server
from flask import Flask, abort, request
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import TextSendMessage
from waitress import serve

In [2]:
# 語言模型 class(模型載入、回答輸出、數據資料庫串接)
class Llama3():
    """Wrapper around a locally served Llama 3 text-generation pipeline.

    Covers three duties: loading the model, producing a reply for a chat
    message list, and collecting stored report text for the stocks that a
    question mentions.
    """

    # Sub-folders of ``stock_report`` consulted, in this order, for every stock.
    # ('finace' is the folder name used on disk; it is kept as-is.)
    _REPORT_FOLDERS = ('week', 'finace')

    def __init__(self) -> None:
        # Both attributes stay None until load_model() has been called.
        self.pipe = None
        self.terminators = None

    def load_model(self):
        """Build the text-generation pipeline and the list of stop-token ids.

        The model is loaded from "fine_tuned_llama" in float16 with 4-bit
        quantization and reduced CPU memory usage while loading.
        """
        self.pipe = pipeline("text-generation", model="fine_tuned_llama",
            model_kwargs={
                "torch_dtype": torch.float16,
                "quantization_config": {"load_in_4bit": True},
                "low_cpu_mem_usage": True,
            },
        )
        # Generation stops on either the tokenizer's EOS token or "<|eot_id|>".
        self.terminators = [
            self.pipe.tokenizer.eos_token_id,
            self.pipe.tokenizer.convert_tokens_to_ids("<|eot_id|>")
        ]

    def message(self, txt, Q, prompt):
        """Pack a system prompt and a user turn into a chat message list.

        Args:
            txt: Context text placed in front of the question.
            Q: The user's question.
            prompt: Content of the system message.

        Returns:
            A two-element list of ``{"role": ..., "content": ...}`` dicts
            (system first, then user).
        """
        messages = [
            {"role": "system", "content": f"{prompt}"},
            {"role": "user", "content": f"{txt} \n \n {Q}"},
        ]

        return messages

    def answer(self, messages: list):
        """Generate a reply for ``messages`` and return its text.

        Args:
            messages: Chat message list as produced by ``message``.

        Returns:
            The ``content`` of the last message in the first generated
            conversation returned by the pipeline.
        """
        outputs = self.pipe(
            messages,
            max_new_tokens=1024,
            eos_token_id=self.terminators,
            do_sample=True,
            temperature=0.2,
            top_p=0.6,
        )
        response = outputs[0]["generated_text"][-1]["content"]

        return response

    @staticmethod
    def _read_report(path, key=" "):
        """Return the string stored under ``key`` in the JSON file at ``path``.

        Returns None when the file is missing or unreadable, is not valid
        JSON, is not a JSON object, or holds no string under ``key``.
        """
        try:
            with open(path, 'r', encoding='utf-8') as file:
                data = json.load(file)
        except (OSError, ValueError):
            # OSError: missing/unreadable file. ValueError: malformed JSON or
            # undecodable bytes. Reports are optional, so these are skipped;
            # anything else (e.g. KeyboardInterrupt) is allowed to propagate.
            return None

        if not isinstance(data, dict):
            return None
        report = data.get(key)
        return report if isinstance(report, str) else None

    def call_data_deport(self, keywords_pairs):
        """Collect the stored report text for every stock in ``keywords_pairs``.

        Args:
            keywords_pairs: Iterable of ``(code, name)`` pairs.

        Returns:
            One string. For each pair it contains a ``{code}{name}XXX`` header
            followed by every report that could be read from
            ``stock_report//<folder>//{code}.json``; each piece ends with a
            blank line. Missing or unusable report files are skipped.
        """
        pieces = []
        for pair in keywords_pairs:
            code, name = pair[0], pair[1]
            pieces.append(f'{code}{name}XXX\n\n')
            for folder in self._REPORT_FOLDERS:
                report = self._read_report(f'stock_report//{folder}//{code}.json')
                if report is not None:
                    pieces.append(f'{report}\n\n')

        return ''.join(pieces)

In [3]:
# 資料預處理 class(第一層預處理(大小寫、停用字)、關鍵字掃描、同義詞轉換)
class DataProcess():
    """Text pre-processing for incoming chat messages.

    Provides case normalisation, a scan for the stocks (by code or name) that
    a message mentions, and detection of the fixed ``<stock><suffix>`` message
    formats that are answered without the language model.
    """

    # Message suffix that selects each reply type in classify_type().
    _TYPE_SUFFIXES = {
        1: "_XX報告",
        2: "_X報告",
        3: "_X營收",
    }

    def __init__(self) -> None:
        # A forward slash resolves on every OS. The previous literal used a
        # backslash, which only worked on Windows and relied on '\s' being an
        # invalid (and therefore untouched) escape sequence.
        self.XX_df = pd.read_csv('key_syn/stocks.csv')
        # Codes are compared against message text, so store them as strings
        # once here instead of re-casting the column on every call.
        self.XX_df['XX_id'] = self.XX_df['XX_id'].astype(str)

    def _stock_pairs(self):
        """Return ``(code, name)`` tuples for every row with a usable name.

        Rows whose name is not a non-empty string (e.g. a blank CSV cell) are
        skipped: they cannot be searched for in a message.
        """
        return [
            (code, name)
            for code, name in zip(self.XX_df['XX_id'], self.XX_df['name'])
            if isinstance(name, str) and name
        ]

    def basic(self, input_text:str):
        """Return ``input_text`` with every run of ASCII letters lower-cased."""
        return re.sub(r'[A-Za-z]+', lambda found: found.group().lower(), input_text)

    def scan_stock(self, input_text:str):
        """Find the stocks mentioned in ``input_text`` by code or by name.

        The search is a plain, case-sensitive substring test.

        Returns:
            A list of ``[code, name]`` lists, one per matching stock, in
            stock-table order. A stock is listed once even when both its code
            and its name occur in the text.
        """
        # One name per code; when a code is repeated the last row wins.
        name_by_code = dict(self._stock_pairs())

        return [
            [code, name]
            for code, name in name_by_code.items()
            if code in input_text or name in input_text
        ]

    def classify_type(self, input_text: str):
        """Classify a message as one of the fixed formats, or as free text.

        A message matches type 1, 2 or 3 when, after ``basic`` normalisation,
        it consists of exactly one stock code or stock name followed by that
        type's suffix.

        Returns:
            ``(type_number, code)`` for a fixed-format message, where ``code``
            is the stock code even if the message used the name; otherwise
            ``(4, None)``.
        """
        pairs = self._stock_pairs()

        # Normalised keyword -> stock code. Keywords go through basic() so they
        # are compared in the same form as the (lower-cased) message.
        code_for = {self.basic(code): code for code, _ in pairs}
        code_for_name = {}
        for code, name in pairs:
            # For a repeated name the first row wins.
            code_for_name.setdefault(self.basic(name), code)
        # A keyword that is both a code and a name is resolved as a name.
        code_for.update(code_for_name)

        text = self.basic(input_text)

        for type_number, suffix in self._TYPE_SUFFIXES.items():
            suffix = self.basic(suffix)
            if not text.endswith(suffix):
                continue
            # Everything before the suffix must be exactly one known keyword;
            # an exact lookup cannot confuse "330" with "2330".
            code = code_for.get(text[:-len(suffix)])
            if code is not None:
                return type_number, code

        # No fixed format matched.
        return 4, None

In [5]:
# System prompts handed to the language model. Which one is used depends on how
# many stocks the incoming message mentions: none (cus_prompt), exactly one
# (single_prompt) or several (compare_prompt).
cus_prompt, compare_prompt, single_prompt = '', '', ''

In [None]:
# LINE bot application setup
# Channel credentials are read from the environment so that real secrets never
# have to be written into the notebook; each falls back to '' when unset.
access_token = os.environ.get('LINE_CHANNEL_ACCESS_TOKEN', '')
secret = os.environ.get('LINE_CHANNEL_SECRET', '')

# LINE SDK clients built from the credentials above
line_bot_api = LineBotApi(access_token)
handler = WebhookHandler(secret)

# Shared objects: text pre-processing and the language model (loaded once here)
DP = DataProcess()
LL = Llama3()
LL.load_model()

app = Flask(__name__)

# Reply sent when the user asks for "help".
_HELP_TEXT = "可以這樣問:\n\n1. \n2. \n3. \n4. \n5. \n            "


def _report_field(path, key):
    """Load the JSON file at ``path`` and return the value stored under ``key``."""
    with open(path, 'r', encoding='utf-8') as file:
        return json.load(file).get(key)


def _monthly_revenue_text(key_code):
    """Format one stock's monthly-revenue row as ``column: value`` lines.

    Returns None when ``key_code`` is not numeric or the table has no row
    for it.
    """
    try:
        numeric_code = int(key_code)
    except (TypeError, ValueError):
        return None

    df = pd.read_csv('XX//XX.csv')
    filtered_data = df[df[' '] == numeric_code]
    # 'Unnamed: 0' is presumably a saved index column; it is not part of the
    # answer, and a file written without it is tolerated.
    filtered_data = filtered_data.drop(columns='Unnamed: 0', errors='ignore')
    if filtered_data.empty:
        return None

    # Positional access keeps each value in its own column's dtype.
    return "\n".join(
        f"{col}: {filtered_data.iloc[0, position]}"
        for position, col in enumerate(filtered_data.columns)
    )


def _answer_for(msg):
    """Produce the reply text for one incoming text message.

    Returns the help text, a stored report, the monthly-revenue summary or a
    language-model answer, depending on the message. May return None when a
    stored answer is missing.
    """
    # Remove spaces and line breaks, then normalise letter case.
    msg_cleaned = re.sub(r'\s+', '', msg)
    input_text = DP.basic(msg_cleaned)
    app.logger.debug("incoming text: %s", input_text)

    if input_text == 'help':
        return _HELP_TEXT

    # Fixed formats (types 1-3) are answered straight from stored data.
    input_type, key_code = DP.classify_type(input_text)
    if input_type == 1:
        # Financial report
        return _report_field(f'XX//XX//{key_code}.json', "")
    if input_type == 2:
        # Weekly report
        return _report_field(f'XX//XX//{key_code}.json', " ")
    if input_type == 3:
        # Monthly revenue
        return _monthly_revenue_text(key_code)

    # Free text: answer with the language model, using the stored reports of
    # every mentioned stock as context.
    scan_list = DP.scan_stock(msg_cleaned)
    data_txt = LL.call_data_deport(scan_list) if scan_list else ''

    # The system prompt depends on how many stocks were mentioned.
    if len(scan_list) == 0:
        prompt = cus_prompt
    elif len(scan_list) == 1:
        prompt = single_prompt
    else:
        prompt = compare_prompt

    return LL.answer(LL.message(data_txt, input_text, prompt))


@app.route("/", methods=['POST'])
def linebot():
    """LINE webhook endpoint: reply to every text message in the request.

    The signature is checked before the payload is used; an invalid signature
    yields HTTP 400. Events that are not text messages are ignored, so a
    request with an empty ``events`` list still gets the plain 'OK' response.
    """
    body = request.get_data(as_text=True)
    signature = request.headers['X-Line-Signature']
    try:
        handler.handle(body, signature)
    except InvalidSignatureError:
        abort(400)

    for event in json.loads(body).get('events', []):
        message = event.get('message') or {}
        reply_token = event.get('replyToken')
        # Only text messages that can be replied to are handled.
        if message.get('type') != 'text' or not reply_token:
            continue

        answer = _answer_for(message.get('text', ''))
        # A text message needs a non-empty string; skip the reply otherwise.
        if isinstance(answer, str) and answer:
            line_bot_api.reply_message(reply_token, TextSendMessage(answer))

    # The response body is required for webhook verification; do not remove.
    return 'OK'

if __name__ == "__main__":
    # Serve the Flask app through waitress (instead of Flask's own app.run()),
    # listening on every network interface at port 5000.
    serve(app, port=5000, host='0.0.0.0')