In [1]:
import numpy as np
import spacy
from pprint import pprint
import yaml
import os
import sys
# NOTE(review): absolute, machine-specific path. The project-local packages imported
# below (helper, model_code) are presumably found through this entry, so the notebook
# will not run on another machine as-is — consider a relative or configurable root.
sys.path.insert(0, '/Users/leon/Income/python files/Telesales-QA-Framework')
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForSequenceClassification, AutoTokenizer
# NOTE(review): the wildcard imports below hide the origin of names used later in this
# notebook that are never imported explicitly (e.g. opening_lexicons, closing_lexicons,
# dump_yaml, load_yaml, construct_sentence_vector, construct_dim_vector,
# nlp_sentiment_v2, disbert_arch, torch). Presumably they come from these modules —
# confirm and switch to explicit imports.
from helper.lexicons import *
from helper.config_helper import *
from helper.aspect_matching import *
from helper.sentiment_helper import *
from model_code.distilbert import *

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# model path
sentiment_path = "../../model_collection/sentiment_model/fine_tune_disbert.pt"

In [3]:
# Load model
# Tokenizer and base sequence-classification model are read from local directories.
sentiment_tokenizer = AutoTokenizer.from_pretrained("../../model_collection/sentiment_tokenizer")
sentiment_model = AutoModelForSequenceClassification.from_pretrained("../../model_collection/sentiment_model/")
# Wrap the base model with disbert_arch, then restore the weights stored at sentiment_path.
sentiment_model = disbert_arch(sentiment_model)
# NOTE(review): `torch` is not imported explicitly anywhere in this notebook; presumably
# it is re-exported by one of the wildcard imports — confirm, or import it directly.
sentiment_model.load_state_dict(torch.load(sentiment_path))

# spaCy pipeline and the sentence encoder used for the similarity computations below.
nlp = spacy.load('en_core_web_lg')
sentence_model = SentenceTransformer("../../model_collection/sentence_model")

In [4]:
def cosine_sim(X, Y):
    """Return the cosine similarity of two vectors.

    Args:
        X: first vector (array-like of numbers).
        Y: second vector, same length as X.

    Returns:
        dot(X, Y) / (norm(X) * norm(Y)). If either vector has zero norm the
        similarity is undefined; 0.0 is returned in that case instead of
        dividing by zero (which would produce nan and a RuntimeWarning).
    """
    denominator = np.linalg.norm(X) * np.linalg.norm(Y)
    if denominator == 0:
        # At least one vector is all zeros: treat it as "no similarity".
        return 0.0
    return np.dot(X, Y) / denominator

In [5]:
# Inspired by https://www.nature.com/articles/s41598-021-01460-7
def find_thresholds(labels, category_dic, encoding_model):
    """Build the label-by-label matrix of average cosine similarities.

    Entry [i][j] is the mean cosine similarity between the encoded sample
    sentences of labels[i] and those of labels[j]. When both labels are the
    same, only pairs with k < m are averaged, so a sentence is never compared
    with itself and each pair is counted once.

    Args:
        labels: list of all the labels (keys of category_dic) to compare.
        category_dic: dictionary mapping each label to its list of sample
            sentences.
        encoding_model: sentence model exposing encode(sentence), which
            returns the vector for one sentence.

    Returns:
        A len(labels) x len(labels) list of lists of average similarities.

    Raises:
        ZeroDivisionError: if a label pair has no sentence pair to average
            (an empty category, or a single-sentence category on the diagonal).
    """
    # Encode every sentence exactly once per label. Encoding inside the pair
    # loops would repeat the (expensive) model call for every comparison.
    encodings = {}
    for label in labels:
        if label not in encodings:
            encodings[label] = [encoding_model.encode(sentence) for sentence in category_dic[label]]

    label_counts = len(labels)
    cor_matrix = [[0] * label_counts for _ in range(label_counts)]

    for i, label_i in enumerate(labels):
        for j, label_j in enumerate(labels):
            same_label = label_i == label_j
            sim_score = 0  # running sum of similarities for this label pair
            num_sims = 0  # number of similarities added to sim_score
            for k, encoding_1 in enumerate(encodings[label_i]):
                for m, encoding_2 in enumerate(encodings[label_j]):
                    if same_label and k >= m:
                        # within one category: skip self-pairs and mirrored duplicates
                        continue
                    sim_score += cosine_sim(encoding_1, encoding_2)
                    num_sims += 1
            if num_sims == 0:
                # Same exception type the bare division raises, but naming the culprit.
                raise ZeroDivisionError(
                    "no sentence pairs to average for labels %r and %r" % (label_i, label_j)
                )
            cor_matrix[i][j] = sim_score / num_sims
    return cor_matrix

In [6]:
# Find thresholds for categories of greeting
opening_labels = ['standard_greeting', 'purpose_of_call', 'ask_for_permission']
opening_thresholds = find_thresholds(opening_labels, opening_lexicons, sentence_model)

In [7]:
# print result of correlation matrix of greeting categories
print("Greeting categories correlation matrix")
pprint(opening_thresholds)

Greeting categories correlation matrix
[[0.27602134676029283, 0.1503592333799504, 0.2741976907062862],
 [0.1503592333799504, 0.2815880780418714, 0.1262115628868785],
 [0.2741976907062862, 0.1262115628868785, 0.42921302503063563]]


In [9]:
# Find thresholds for categories of ending
closing_labels = ['follow_up', 'data_enrichment', 'standard_closing']
closing_thresholds = find_thresholds(closing_labels, closing_lexicons, sentence_model)

In [10]:
# print result of correlation matrix of ending categories
print("Ending categories correlation matrix")
pprint(closing_thresholds)

Ending categories correlation matrix
[[0.4106628816678292, 0.25458838500910336, 0.2014425235490004],
 [0.25458838500910336, 0.3254513197711536, 0.14600112595671447],
 [0.2014425235490004, 0.14600112595671447, 0.290414386930374]]


In [11]:
def extract_diagonal_corr(corr_mat, labels, part_type = "opening"):
    """Collect the diagonal of a correlation matrix into a nested dictionary.

    Args:
        corr_mat: square matrix (indexable as corr_mat[i][i]) with one row per label.
        labels: label list, in the same order as the rows of corr_mat.
        part_type: key under which the per-label values are nested.

    Returns:
        {part_type: {label: float(corr_mat[i][i])}} for every label.

    Raises:
        AssertionError: if len(corr_mat) != len(labels).
    """
    assert len(corr_mat) == len(labels)
    diagonal = {
        label: float(corr_mat[idx][idx])
        for idx, label in enumerate(labels)
    }
    return {part_type: diagonal}

In [12]:
# For each part, save the diagonal correlations to yaml file
def save_corr(corr_dict, save_address):
    """Hand corr_dict and save_address to dump_yaml.

    Args:
        corr_dict: dictionary of correlations to store.
        save_address: destination passed through to dump_yaml
            (presumably a YAML file path — confirm against the helper).
    """
    dump_yaml(corr_dict, save_address)

In [13]:
def save_config(input_data, config_label, save_address):
    """Store input_data under a single top-level label via dump_yaml.

    Args:
        input_data: the configuration content to store.
        config_label: key that input_data is nested under.
        save_address: destination passed through to dump_yaml.
    """
    dump_yaml({config_label: input_data}, save_address)

In [16]:
opening_dict = extract_diagonal_corr(opening_thresholds, opening_labels, "opening")
ending_dict = extract_diagonal_corr(closing_thresholds, closing_labels, "closing")

In [17]:
dict_ls = [opening_dict, ending_dict]

In [18]:
save_config(dict_ls, 'similarity_threshold', '../config/general_config.yaml')

In [16]:
# load from yaml file
# NOTE(review): the threshold_config shown in the output below does not match what the
# cells above would write: the section key is 'ending' (the code passes "closing"), the
# last label is 'closing' (the code uses 'standard_closing'), and the
# ask_for_permission / data_enrichment values differ from the printed matrices. The
# execution counts also repeat (16-18), so the file content appears to come from a
# different run — restart the kernel, run top to bottom, and confirm.
# Indexing with [0] assumes load_yaml returns a list of documents — TODO confirm.
load_ls = load_yaml('../config/general_config.yaml')
threshold_config = load_ls[0]['similarity_threshold']

In [17]:
threshold_config

[{'opening': {'standard_greeting': 0.27602134676029283,
   'purpose_of_call': 0.2815880780418714,
   'ask_for_permission': 0.41886348923047384}},
 {'ending': {'follow_up': 0.4106628816678292,
   'data_enrichment': 0.2213484893242518,
   'closing': 0.290414386930374}}]

In [18]:
threshold_config[0]['opening'].keys()

dict_keys(['standard_greeting', 'purpose_of_call', 'ask_for_permission'])

# Multi-label classification of transcript sentences

In [19]:
# use a sample transcript
sample_text = ['hello good afternoon just speak to miss leon michael from income ntuc free for one or two minutes if you are not busy', 'okay calling behalf of your adviser xiao guo', 'okay because we having this anniversary plan for the family i just check again you are single or married \n']

In [20]:
def match_multi_categories(input_sentence, model, sim_threshold_dict, lexicon_dict):
    """Label a sentence with every category whose similarity meets its threshold.

    Args:
        input_sentence: the sentence to classify.
        model: model handed to construct_sentence_vector / construct_dim_vector
            (presumably the sentence encoder — confirm against the helpers).
        sim_threshold_dict: {category: minimum similarity required to match}.
        lexicon_dict: {category: descriptive words/sentences for that category}.

    Returns:
        A 3-tuple ([input_sentence, result_labels], classes, similarity_ls):
        result_labels are the categories with similarity >= threshold (an empty
        list when nothing matches), classes are all categories in lexicon_dict
        order, and similarity_ls holds the similarity for each entry of classes.

    Raises:
        AssertionError: if the two dictionaries do not have the same keys.
    """
    # Dict key views compare like sets, so this only checks that both dicts
    # cover the same categories — it says nothing about their order.
    assert sim_threshold_dict.keys() == lexicon_dict.keys()
    new_vector = construct_sentence_vector(input_sentence, model) # new input sentence vector
    classes = []
    similarity_ls = []
    result_labels = []
    # similarity of the input sentence vector to each category vector
    for aspect, descriptive_words in lexicon_dict.items():
        cur_vector = construct_dim_vector(descriptive_words, model)
        cur_similarity = cosine_sim(new_vector, cur_vector)
        classes.append(aspect)
        similarity_ls.append(cur_similarity)
        # Look the threshold up by category name rather than by position, so a
        # different key order in the two dicts cannot pair a similarity with
        # another category's threshold.
        if cur_similarity >= sim_threshold_dict[aspect]:
            result_labels.append(aspect)
    # Always return the same 3-part shape so callers can unpack unconditionally.
    return [input_sentence, result_labels], classes, similarity_ls

In [21]:
threshold_dict = threshold_config[0]['opening']
lexicon_dict = opening_lexicons

In [22]:
match_multi_categories(sample_text[2], sentence_model, threshold_dict, lexicon_dict)

(['okay because we having this anniversary plan for the family i just check again you are single or married \n',
  ['purpose_of_call']],
 ['standard_greeting', 'purpose_of_call', 'ask_for_permission'],
 [0.21059322, 0.49155855, 0.25504497])

In [23]:
def batch_match_multi_categories(sentence_ls, model, sim_threshold_dict, lexicon_dict):
    """Run match_multi_categories over a list of sentences.

    Args:
        sentence_ls: sentences to classify.
        model: model forwarded to match_multi_categories.
        sim_threshold_dict: {category: similarity threshold}, forwarded as-is.
        lexicon_dict: {category: descriptive words/sentences}, forwarded as-is.

    Returns:
        (result_ls, sentence_ls, label_ls): result_ls holds one
        [sentence, labels] entry per sentence, sentence_ls is the input list,
        and label_ls holds just the labels of each entry (an empty list for a
        sentence without any match).
    """
    result_ls = []
    label_ls = []
    for sentence in sentence_ls:
        # Use the arguments of this function, not notebook-level globals.
        outcome = match_multi_categories(sentence, model, sim_threshold_dict, lexicon_dict)
        if len(outcome) == 3:
            result = outcome[0]
        else:
            # A shorter return value signals "nothing matched"; record it as a
            # sentence with no labels instead of failing on unpacking.
            result = [sentence, []]
        result_ls.append(result)
        label_ls.append(result[1])
    return result_ls, sentence_ls, label_ls

In [24]:
aspect_ls, sentence_ls, label_ls = batch_match_multi_categories(sample_text, sentence_model, threshold_dict, lexicon_dict)

In [25]:
aspect_ls

[['hello good afternoon just speak to miss leon michael from income ntuc free for one or two minutes if you are not busy',
  ['standard_greeting', 'ask_for_permission']],
 ['okay calling behalf of your adviser xiao guo',
  ['standard_greeting', 'purpose_of_call']],
 ['okay because we having this anniversary plan for the family i just check again you are single or married \n',
  ['purpose_of_call']]]

In [26]:
label_ls

[['standard_greeting', 'ask_for_permission'],
 ['standard_greeting', 'purpose_of_call'],
 ['purpose_of_call']]

### Sentiment Analysis

In [27]:
sentiment_result = nlp_sentiment_v2(sentence_ls, label_ls, sentiment_tokenizer, sentiment_model)
sentiment_result

[['hello good afternoon just speak to miss leon michael from income ntuc free for one or two minutes if you are not busy',
  ['standard_greeting', 'ask_for_permission'],
  'neutral'],
 ['okay calling behalf of your adviser xiao guo',
  ['standard_greeting', 'purpose_of_call'],
  'polite'],
 ['okay because we having this anniversary plan for the family i just check again you are single or married \n',
  ['purpose_of_call'],
  'neutral']]

### Assign score

In [28]:
def result_reformat(result_ls):
    """Regroup per-sentence results by category.

    Args:
        result_ls: list of entries shaped [sentence, category_list, sentiment].

    Returns:
        {category: [[sentence, sentiment], ...]} with one pair for every
        sentence that carries that category, in input order.
    """
    grouped = {}
    for entry in result_ls:
        sentence, sentiment = entry[0], entry[2]
        for category in entry[1]:
            # create the bucket on first sight of a category, then append
            grouped.setdefault(category, []).append([sentence, sentiment])
    return grouped

In [38]:
def assign_score(result_ls, target_label, section_name = 'opening'):
    """Decide pass/fail for every target category of one section.

    Args:
        result_ls: the result after sentiment analysis; entries shaped
            [sentence, category_list, sentiment].
        target_label: the categories to score for this section.
        section_name: key under which the scores are nested.

    Returns:
        {section_name: {category: bool}}. A category is True when it occurs in
        result_ls and none of its sentences has the sentiment 'impolite';
        categories that never occur are False.
    """
    grouped = result_reformat(result_ls)
    passed_by_category = {}
    for category, mentions in grouped.items():
        sentiments = [mention[1] for mention in mentions]
        passed_by_category[category] = 'impolite' not in sentiments
    # categories absent from the results default to a failed check
    section_scores = {key: passed_by_category.get(key, False) for key in target_label}
    return {section_name: section_scores}

In [39]:
score_result = assign_score(sentiment_result, opening_lexicons.keys())

In [40]:
score_result

{'opening': {'standard_greeting': True,
  'purpose_of_call': True,
  'ask_for_permission': True}}

In [43]:
# Save result to config
save_config(score_result, 'qa_result', '../qa_result/opening_result.yaml')

In [47]:
# read score_reference config file
target_score_ls = load_yaml('../config/score_reference.yaml')

In [48]:
target_score_ls[0]

{'opening': {'standard_greeting': 1,
  'purpose_of_call': 2,
  'ask_for_permission': 2},
 'closing': {'follow_up': 2, 'data_enrichment': 2, 'closing': 2}}

In [56]:
def save_score(score_dic, reference_dict, section_name = "opening"):
    """Translate pass/fail results into points for one section.

    Args:
        score_dic: {section: {category: passed}}.
        reference_dict: {section: {category: points awarded when passed}}.
        section_name: the section to evaluate in both dictionaries.

    Returns:
        ({section_name: {category: points}}, total): a passed category earns
        its reference points, any other category earns 0; total is the sum of
        the earned points.

    Raises:
        KeyError: if section_name is missing from either dictionary, or a
            passed category has no entry in the reference section.
    """
    outcomes = score_dic[section_name]
    points = reference_dict[section_name]
    awarded = {
        category: (points[category] if passed == True else 0)
        for category, passed in outcomes.items()
    }
    return {section_name: awarded}, sum(awarded.values())

In [58]:
result_tuple = save_score(score_result, target_score_ls[0])

In [60]:
# NOTE(review): this targets the same file as the earlier
# save_config(score_result, 'qa_result', ...) cell. Whether dump_yaml overwrites or
# appends is not visible here — confirm the earlier content is not lost unintentionally.
dump_yaml(result_tuple[0], '../qa_result/opening_result.yaml')