In [None]:
import pickle
from src.utils.text_chunking import *
from sentence_transformers import util
from src.utils.embeddings import *

In [2]:
def _load_pickle(path):
    """Return the object stored in the pickle file at `path`."""
    # NOTE(review): unpickling can run arbitrary code; only use with trusted files.
    with open(path, 'rb') as handle:
        return pickle.load(handle)


# Load the four pickled embedding objects from disk, one file per variable.
word_definition_embeddings = _load_pickle(r'C:\path\to\word_definition_embeddings')
faq_embeddings = _load_pickle(r"C:\path\to\faq_embeddings")
insurance_chunk_embeddings = _load_pickle(r"C:\path\to\insurance_chunk_embeddings")
record_chunk_embeddings = _load_pickle(r"C:\path\to\record_chunk_embeddings")

In [3]:
import os
import chardet

In [4]:
def detect_encoding(file_path):
    """Return the encoding that chardet reports for the file at `file_path`.

    The whole file is read as bytes and handed to ``chardet.detect``; the
    ``'encoding'`` entry of its result is returned unchanged.
    """
    with open(file_path, "rb") as raw_file:
        raw_bytes = raw_file.read()
        detection = chardet.detect(raw_bytes)
    return detection['encoding']
    
def read_txt(file_path):
    """Read the text file at `file_path` and return its full contents as a string.

    The file is decoded with whatever encoding ``detect_encoding`` returns for it.
    """
    detected = detect_encoding(file_path)
    with open(file_path, 'r', encoding=detected) as text_file:
        return text_file.read()

def read_file(folder_path):
    """Load every ``.txt`` file directly inside `folder_path`.

    Returns a dict mapping each file name (not the full path) to the text
    that ``read_txt`` returns for that file.
    """
    txt_names = filter(lambda name: name.endswith(".txt"), os.listdir(folder_path))
    return {
        name: read_txt(os.path.join(folder_path, name))
        for name in txt_names
    }

In [5]:
# Read every .txt file in the relative "processed_data" folder into a {file name: text} dict.
corpus = read_file(folder_path ="processed_data")

In [6]:
# Text of data.txt.
# NOTE(review): the global `data` is not referenced again in the visible cells; the same
# text is read again as `word_definition` further down.
data = corpus["data.txt"]

In [7]:
# Text of T00005025.script.txt.
# NOTE(review): `record_text` (singular) is not referenced again in the visible cells;
# later cells work with `record_texts`, which is read from caption.txt.
record_text = corpus['T00005025.script.txt']

In [8]:
from src.utils.retrievals import *

In [9]:
# Retrieval is presumably provided by the star import from src.utils.retrievals.
# This instance is used below for build_index() and retrieval().
r = Retrieval()

In [10]:
# Raw text of data.txt; the next cell passes it to Chunking.
word_definition = corpus["data.txt"]

In [11]:
# Run Chunking.text_chunking over the data.txt text with the given separators,
# chunk_size and chunk_overlap (presumably yielding a list of chunks — TODO confirm).
# NOTE(review): `word_definition` is rebound from the raw text to the chunking result,
# so re-running this cell on its own would feed that result back in as the text.
chunking = Chunking(text=word_definition)
word_definition = chunking.text_chunking(word_definition, separators=['名詞解釋', '。'], chunk_size=50, chunk_overlap=10)

In [12]:
# Files whose text is concatenated into the insurance corpus.
# NOTE(review): "normal3.txt" appears twice and "normal2.txt" never does — likely a typo.
# The duplicate is harmless (the dict below collapses it). The list is left untouched
# because insurance_chunk_embeddings is loaded from a pickle and presumably has to stay
# aligned with the chunks produced here; confirm before correcting, then regenerate.
insurance = ["invest1.txt","invest2.txt","invest3.txt","normal1.txt","normal3.txt","normal3.txt"]

# Keep only the listed files that actually exist in the corpus, in list order.
insurance_docs = {key: corpus[key] for key in insurance if key in corpus}
insurance_text = " ".join(insurance_docs.values())
chunking = Chunking(text=insurance_text)

# "KEYWORD\\KEYWORD" uses an escaped backslash: the unescaped "\K" is an invalid escape
# sequence (a warning today, an error in future Python versions). The value is unchanged.
insurance_chunk = chunking.text_chunking(insurance_text,
                                         separators=["KEYWORD", "KEYWORD", "KEYWORD\\KEYWORD", "KEYWORD", "KEYWORD", "KEYWORD", "KEYWORD", "KEYWORD", "？"],
                                         chunk_size=50,
                                         chunk_overlap=10)

In [47]:
# Raw string: in a normal literal "\t" is a TAB character and "\c" is an invalid escape,
# so the un-prefixed path never pointed at path\to\caption.txt.
with open(r"path\to\caption.txt", encoding='utf-8') as f:
    lines = f.readlines()

# Each line is split once at its first space and only the part after it is kept.
# Lines without a space (for example a blank trailing line) have nothing after the
# separator and are skipped instead of raising IndexError.
caption_parts = [rest.strip() for _, sep, rest in (line.partition(" ") for line in lines) if sep]
texts = "".join(caption_parts)

In [48]:
# Build an index from the pickled insurance chunk embeddings, then query it with the
# caption text (k=30, threshold=0.88).
# NOTE(review): assumes insurance_chunk_embeddings[i] belongs to insurance_chunk[i] — TODO confirm.
# NOTE(review): `q` is not referenced again in the visible cells.
index = r.build_index(embeddings=insurance_chunk_embeddings)
q = r.retrieval(query=texts, text=insurance_chunk, index=index, k=30, threshold=0.88)

In [49]:
# Group the insurance chunks by rule keyword: each keyword maps to every chunk that
# contains it as a substring.
# "KEYWORD\\KEYWORD" uses an escaped backslash: the unescaped "\K" is an invalid escape
# sequence (a warning today, an error in future Python versions). The value is unchanged.
rule_keywords = ["KEYWORD", "KEYWORD", "KEYWORD\\KEYWORD", "KEYWORD", "KEYWORD", "KEYWORD", "KEYWORD", "KEYWORD"]

Rule = {
    keyword: [text for text in insurance_chunk if keyword in text]
    for keyword in rule_keywords
}

In [50]:
import re

In [51]:
# Confirmation phrases to strip from the rule example sentences.
# '請問您是否同意' was listed twice; the duplicate is dropped (it has no effect on matching).
remove_words = ['請問正確嗎', '請問您是否同意', '請問您清楚嗎', '請問是否正確']

In [52]:
def word_filter(data, remove_words):
    """Delete every phrase in `remove_words` from every sentence in `data`.

    Args:
        data: dict mapping a key to a list of sentence strings. It is updated
            in place: each list is replaced by a new, cleaned list.
        remove_words: iterable of literal phrases to delete. Each phrase is
            regex-escaped, so special characters are matched literally.

    Returns:
        The same `data` dict that was passed in.
    """
    # Honor the caller's list. Previously it was overwritten by a hard-coded list,
    # so the argument had no effect.
    phrases = [phrase for phrase in remove_words if phrase]
    if not phrases:
        # Nothing to remove; leave the sentences untouched.
        return data

    pattern = re.compile('|'.join(map(re.escape, phrases)))
    for key, sentences in data.items():
        data[key] = [pattern.sub('', sentence) for sentence in sentences]

    return data

In [None]:
# Strip the confirmation phrases from the example sentences of every rule.
# NOTE(review): word_filter assigns into the dict it receives, so `Rule` and
# `filtered_Rule` refer to the same, modified object.
filtered_Rule = word_filter(Rule, remove_words=remove_words)
filtered_Rule

錄音檔vs規則

In [135]:
# Raw string: in a normal literal "\t" is a TAB character and "\c" is an invalid escape,
# so the un-prefixed path never pointed at path\to\caption.txt.
with open(r"path\to\caption.txt", encoding='utf-8') as f:
    lines = f.readlines()

# Each line is split once at its first space and only the part after it is kept.
# Lines without a space (for example a blank trailing line) have nothing after the
# separator and are skipped instead of raising IndexError.
record_parts = [rest.strip() for _, sep, rest in (line.partition(" ") for line in lines) if sep]
record_texts = "".join(record_parts)


In [136]:
from sklearn.metrics.pairwise import cosine_similarity

def load_rule_embeddings(folder):
    """Load every ``.pkl`` file in `folder` into a dict keyed by rule name.

    Args:
        folder: directory to scan (not recursive).

    Returns:
        dict mapping each file name with its trailing ``.pkl`` removed to the
        object unpickled from that file. Other files are ignored.
    """
    suffix = ".pkl"
    rule_embeddings = {}
    for rule_file in os.listdir(folder):
        if not rule_file.endswith(suffix):
            continue
        # Strip only the trailing suffix; str.replace would also delete ".pkl"
        # occurring in the middle of a name (e.g. "a.pkl.v2.pkl" -> "a.v2").
        rule_name = rule_file[:-len(suffix)]
        # NOTE(review): unpickling can run arbitrary code; only use with trusted files.
        with open(os.path.join(folder, rule_file), "rb") as f:
            rule_embeddings[rule_name] = pickle.load(f)
    return rule_embeddings

def calculate_sim(vector, matrix):
    """Return the cosine similarity of `vector` to each row of `matrix` as a 1-D array.

    `vector` is reshaped to a single row before being compared with `matrix`.
    """
    query_row = vector.reshape(1, -1)
    similarities = cosine_similarity(query_row, matrix)
    return similarities.flatten()

In [137]:
# Folder scanned by load_rule_embeddings for one .pkl file per rule.
# NOTE(review): absolute, user-specific Windows path — the notebook cannot run on another
# machine without editing it; consider a path relative to the project.
# NOTE(review): run() and topic_classification() reload the embeddings from the folder
# themselves, so this global `rule_embeddings` is not what they use.
rule_folder = r"C:\Users\YT0283\Desktop\stt_anaysis\Rule_embedding"
rule_embeddings = load_rule_embeddings(rule_folder)

In [138]:
# NOTE(review): run() and topic_classification() each construct their own WordEmbedding,
# so this global instance is not used by them.
we = WordEmbedding()

In [139]:
def regex_test(max_scores, texts_chunks, rule_key='KEYWORD'):
    """Check whether the best-matching chunk for one rule mentions a ten-day period.

    Args:
        max_scores: dict mapping rule name -> {"score": ..., "chunk": n}, where
            n is a 1-based position in `texts_chunks`, or None if no chunk matched.
        texts_chunks: list of chunk strings that the chunk numbers refer to.
        rule_key: rule whose best chunk is inspected. The previous code filtered
            on 'KEYWORD' but then indexed '契撤期', which always raised KeyError;
            a single key is now used for both.

    Returns:
        True if the chunk contains 十日, 10日, 十天 or 10天. False otherwise,
        including when the rule is absent or has no matching chunk.
    """
    best = max_scores.get(rule_key)
    if not best or best.get('chunk') is None:
        return False

    pattern = r"十日|10日|十天|10天"
    # Stored chunk numbers are 1-based, hence the -1.
    return bool(re.search(pattern, texts_chunks[best['chunk'] - 1]))
    

In [140]:
def run(folder, record_texts):
    """Score every chunk of a recording against every rule and print a report.

    The text is chunked, each chunk is embedded, and its cosine similarity to
    the mean embedding of each rule is printed. For each rule the best-scoring
    chunk is remembered, a summary is printed, and finally `regex_test` is
    applied to the result to print a verdict.

    Args:
        folder: directory holding one ``.pkl`` embedding file per rule.
        record_texts: recording text to analyse.

    Returns:
        None. All results are printed.
    """
    # numpy is never imported explicitly in the visible cells, so import it here
    # rather than rely on a star import to provide `np`.
    import numpy as np

    chunking = Chunking(text=record_texts)
    we = WordEmbedding()
    texts_chunks = chunking.text_chunking(record_texts,
                                          separators=["嗎", "是否確認清楚", "瞭解瞭解", "同意同意", "了解了解", "請問"],
                                          chunk_size=50,
                                          chunk_overlap=10)

    rule_embeddings = load_rule_embeddings(folder)
    # The mean embedding of a rule does not depend on the chunk, so compute it once
    # per rule instead of once per (chunk, rule) pair.
    rule_centroids = {
        rule: np.array(embeddings).mean(axis=0).reshape(1, -1)
        for rule, embeddings in rule_embeddings.items()
    }
    max_scores = {rule: {"score": 0, "chunk": None} for rule in rule_centroids}

    for i, chunk in enumerate(texts_chunks):
        # Chunk numbers are 1-based everywhere, so the per-chunk log lines match
        # the numbers shown in the summary (the log previously printed 0-based).
        chunk_number = i + 1
        chunk_embedding = we.embedding(chunk)
        print("-"*50)
        print(f"chunk{chunk_number}:{chunk}")
        for rule, centroid in rule_centroids.items():
            sim = calculate_sim(chunk_embedding, centroid)
            print(f"錄音檔與{rule}之相似度:{sim[0]:.4f}")
            # Scores start at 0, so only strictly positive similarities are recorded.
            if sim[0] > max_scores[rule]["score"]:
                max_scores[rule]["score"] = sim[0]
                max_scores[rule]["chunk"] = chunk_number

    print("="*50)
    print("規則分數:")
    for rule, data in max_scores.items():
        print(f"{rule}:{data['score']:.4f} in chunk{data['chunk']}")

    # Evaluate the check once and branch on the result.
    content_ok = regex_test(max_scores, texts_chunks)
    print("="*50)
    if content_ok:
        print("內容正確")
    else:
        print("內容可能有誤")
        print(f"請檢查chunk{max_scores['KEYWORD']['chunk']}")

In [None]:
# Print the per-chunk similarity report and the final verdict for the caption text.
run(folder=rule_folder, record_texts=record_texts)

In [142]:
def topic_classification(folder, record_texts):
    """Assign each chunk of a recording to its most similar rule.

    Args:
        folder: directory holding one ``.pkl`` embedding file per rule.
        record_texts: recording text to analyse.

    Returns:
        dict mapping chunk text -> {"score": best similarity, "rule": rule name}.
        A chunk with no strictly positive similarity keeps score 0 and rule None.
        The dict is keyed by chunk text, so identical chunks share one entry.
    """
    # numpy is never imported explicitly in the visible cells, so import it here
    # rather than rely on a star import to provide `np`.
    import numpy as np

    chunking = Chunking(text=record_texts)
    we = WordEmbedding()
    texts_chunks = chunking.text_chunking(record_texts,
                                          separators=["嗎", "瞭解瞭解", "同意同意", "了解了解"],
                                          chunk_size=50,
                                          chunk_overlap=10)

    rule_embeddings = load_rule_embeddings(folder)
    # The mean embedding of a rule does not depend on the chunk, so compute it once
    # per rule instead of once per (chunk, rule) pair.
    rule_centroids = {
        rule: np.array(embeddings).mean(axis=0).reshape(1, -1)
        for rule, embeddings in rule_embeddings.items()
    }

    max_scores = {chunk: {"score": 0, "rule": None} for chunk in texts_chunks}
    for chunk in texts_chunks:
        chunk_embedding = we.embedding(chunk)
        best = max_scores[chunk]
        for rule, centroid in rule_centroids.items():
            sim = calculate_sim(chunk_embedding, centroid)
            if sim[0] > best["score"]:
                best["score"] = sim[0]
                best["rule"] = rule
    return max_scores

In [143]:
# Map every chunk of the caption text to its best-matching rule and similarity score.
topic_dict = topic_classification(folder=rule_folder, record_texts=record_texts)

In [None]:
# Attach to each chunk the filtered example sentences of its best-matching rule.
# A single pass is enough: the earlier outer loop over topic_dict repeated this
# identical work once per chunk without changing the result.
for value in topic_dict.values():
    topic = value['rule']
    # Chunks without a matched rule (None), or with a rule that has no examples, are skipped.
    if topic in filtered_Rule:
        value["topic"] = {topic: filtered_Rule[topic]}


topic_dict

LLM

In [None]:
# Print the classification record stored for each chunk, in dict order.
# Each record is the value dict of topic_dict, not just a rule name.
for chunk in topic_dict:
    rule = topic_dict[chunk]
    print(rule)

In [None]:
import requests
import json

url = "http://ditgpu01.aegon.com.tw/ollama-test/api/chat"
# The headers are the same for every request, so build them once.
headers = {"Content-Type": "application/json"}
# (connect, read) timeouts in seconds. Without a timeout, requests waits forever on an
# unreachable or stalled server; with one it raises requests.exceptions.Timeout instead.
request_timeout = (10, 600)

# Ask the model, chunk by chunk, whether the chunk expresses its matched rule.
# `rule` is the whole record stored in topic_dict for the chunk (score, rule name and,
# when present, the example sentences), interpolated into the prompt as a dict.
for chunk, rule in topic_dict.items():
  prompt = f"""
  你現在是一個對話檢查員，檢查客服人員是否在與顧客的對話中有提到銷售商品時的應注意事項。

  首先你會收到以下規則以及這些規則的範文:
  1.'KEYWORD'
  2.'KEYWORD'
  3.'KEYWORD'
  4.'KEYWORD'
  5.'KEYWORD'
  6.'KEYWORD'
  7.'聲明'
  8.'標的說明'

  再來會收到對話紀錄的一個段落，請比對並確認段落是否清楚地表達了規則。

  注意到：範文可能同時涉及多個規則，因此段落有至少提及一個規則即為正確。

  並遵照以下格式以繁體中文輸出:
  1.**是否正確?(是/否)**
  若1.的答案為否
  請回應:
  **建議修正:**

  段落:{chunk}
  規則:{rule}

  """
  payload = {
    "model": "llama3.1",
    "messages": [
      {
        "role": "user",
        "content": prompt
      }
    ],
    # Request one complete JSON response rather than a stream.
    "stream": False
  }

  response = requests.post(url, json=payload, headers=headers, timeout=request_timeout)

  # ensure_ascii=False keeps the Chinese text readable in the printed JSON.
  formatted = json.dumps(response.json(), indent=4, ensure_ascii=False)

  print(chunk)
  print(formatted)