# Background Knowledge:   
## The main goal is to     
## 1. Spot customers with conversations related to 'cancel' or 'move', 
## 2. Spot any other possible topics which co-occur with 'cancel' or 'move' to help understand the reason for deactivations   
     
## This is a semi-supervised task, the label is the manual category_id tagging from call agents, and each conversations would have unequal quantity of ids.    
## If a conversation contains 'cancel' related ids then it is assgined with label 'cancel', same thing applys to 'move'.

## The reason for doing topic modeling instead of classification is    
### 1. The agent's prior assumption can not be fully trusted    
### 2. We do not have accurate ids mapping for other topics like 'promotion inquiry' or 'billing inquiry'   
### 3. The total unique number of ids is increasing and some old ids are aborted, which makes the long term dependency for category_id as label unreliable.

## Data preparation fucntion blocks
## (preprocessing of both customer and agent raw conversations using sparknlp):

In [0]:
!pip install bertopic

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-69db614e-3176-43fa-892c-a98fb223ca0a/bin/python -m pip install --upgrade pip' command.[0m


In [0]:
class Verint:
    """
        This class wraps data operations for verint
    """

    ### Table names & queries on Databricks ###
    verint_table = "VERINT.CBU_ROG_CONVERSATION_SUMFCT"
    booked_table = "VERINT.SESSIONS_BOOKED"

    
    @staticmethod
    def load_data_filter(min_date = None, max_date = None, categories = [], instances = [], spark_s = None):
        """
        Loading cbu_rog_conversation_sumfct data to spark dataframes
        :param condition: Optional condition to be passed to verint_query e.g. "where []"
        :return cbu_rog_conversation_sumfct dataframes respectively
        """
        #91 add distinct for sid_key
        if instances != [] and categories != []:
            print('Categories and instances provided')
            #Category id can be used to filter our churned customers
            verint_query = "SELECT * FROM ( \
                (SELECT distinct speech_id FROM \
                (SELECT distinct sid_key FROM VERINT.SESSIONS_CATEGORIES WHERE category_id in ('{0}') AND instance_id in ('{1}')) as category \
                INNER JOIN \
                (SELECT CONCAT(unit_num, '0', channel_num) as speech_id, sid_key FROM {1}) as booked \
                ON booked.sid_key = category.sid_key) as merged \
                INNER JOIN \
                (SELECT distinct SPEECH_ID_VERINT,TEXT_AGENT_FULL, TEXT_CUSTOMER_FULL, TEXT_OVERLAP, TEXT_ALL, CUSTOMER_ID, CTN, \
                AGENT_EMP_ID,CONNECTION_ID,RECEIVING_SKILL,LANGUAGE_INDICATOR, CONVERSATION_DATE, YEAR(conversation_date) as Year, \
                MONTH(conversation_date) as Month FROM {3} WHERE conversation_date >= '{4}') \
                as sumfct ON sumfct.speech_id_verint = merged.speech_id) as final"\
                .format(("','".join(categories)),("','".join(instances)),Verint.booked_table,Verint.verint_table, min_date)
        #91 add distinct for sid_key
        elif instances == [] and categories != []:
            print('Categories provided')
            verint_query = "SELECT * FROM ( \
                (SELECT distinct speech_id FROM \
                (SELECT distinct sid_key FROM VERINT.SESSIONS_CATEGORIES WHERE category_id in ('{0}')) as category \
                INNER JOIN \
                (SELECT CONCAT(unit_num, '0', channel_num) as speech_id, sid_key FROM {1}) as booked \
                ON booked.sid_key = category.sid_key) as merged \
                INNER JOIN \
                (SELECT distinct SPEECH_ID_VERINT,TEXT_AGENT_FULL, TEXT_CUSTOMER_FULL, TEXT_OVERLAP, TEXT_ALL, CUSTOMER_ID, CTN, \
                AGENT_EMP_ID,CONNECTION_ID,RECEIVING_SKILL,LANGUAGE_INDICATOR, CONVERSATION_DATE,  YEAR(conversation_date) as Year, \
                MONTH(conversation_date) as Month FROM {2} WHERE conversation_date >= '{3}') \
                as sumfct ON sumfct.speech_id_verint = merged.speech_id) as final" \
                .format(("','".join(categories)),Verint.booked_table,Verint.verint_table, min_date)
        #91 add distinct for sid_key
        elif instances != [] and categories == []:
            print('Instances provided')
            verint_query = "SELECT * FROM ( \
                (SELECT distinct speech_id FROM \
                (SELECT distinct sid_key FROM VERINT.SESSIONS_CATEGORIES WHERE instance_id in ('{0}')) as category \
                INNER JOIN \
                (SELECT CONCAT(unit_num, '0', channel_num) as speech_id, sid_key FROM {1}) as booked \
                ON booked.sid_key = category.sid_key) as merged \
                INNER JOIN \
                (SELECT distinct SPEECH_ID_VERINT,TEXT_AGENT_FULL, TEXT_CUSTOMER_FULL, TEXT_OVERLAP, TEXT_ALL, CUSTOMER_ID, CTN, \
                AGENT_EMP_ID,CONNECTION_ID,RECEIVING_SKILL,LANGUAGE_INDICATOR, CONVERSATION_DATE,  YEAR(conversation_date) as Year, \
                MONTH(conversation_date) as Month FROM {2} WHERE conversation_date >= '{3}') \
                as sumfct ON sumfct.speech_id_verint = merged.speech_id"\
                .format(("','".join(instances)),Verint.booked_table,Verint.verint_table, min_date)
       
        else:
            print('Only date provided')
            verint_query = "SELECT * FROM (SELECT distinct SPEECH_ID_VERINT,TEXT_AGENT_FULL, TEXT_CUSTOMER_FULL, TEXT_OVERLAP, \
                TEXT_ALL, CUSTOMER_ID, CTN, AGENT_EMP_ID, CONNECTION_ID, RECEIVING_SKILL, \
                LANGUAGE_INDICATOR, CONVERSATION_DATE, YEAR(conversation_date) as Year, MONTH(conversation_date) as Month \
                FROM {1} WHERE conversation_date >= '{2}') as final" \
                .format(Verint.booked_table, Verint.verint_table, min_date)
        
        
        if max_date is not None:
            verint_query = "{0} where {1}".format(verint_query, "conversation_date < '{}'".format(max_date))
            
        
        return spark_s.sql(verint_query) 
    @staticmethod
    def load_data_filter_not(min_date = None, max_date = None, categories = [], instances = [], spark_s = None):
        """
        Loading cbu_rog_conversation_sumfct data to spark dataframes
        :param condition: Optional condition to be passed to verint_query e.g. "where []"
        :return cbu_rog_conversation_sumfct dataframes respectively
        """
        #91 add distinct for sid_key
        if instances != [] and categories != []:
            print('Categories and instances provided')
            #Category id can be used to filter our churned customers
            verint_query = "SELECT * FROM ( \
                (SELECT distinct speech_id FROM \
                (SELECT distinct sid_key FROM VERINT.SESSIONS_CATEGORIES WHERE category_id not in ('{0}') AND instance_id not in ('{1}')) as category \
                INNER JOIN \
                (SELECT CONCAT(unit_num, '0', channel_num) as speech_id, sid_key FROM {1}) as booked \
                ON booked.sid_key = category.sid_key) as merged \
                INNER JOIN \
                (SELECT distinct SPEECH_ID_VERINT,TEXT_AGENT_FULL, TEXT_CUSTOMER_FULL, TEXT_OVERLAP, TEXT_ALL, CUSTOMER_ID, CTN, \
                AGENT_EMP_ID,CONNECTION_ID,RECEIVING_SKILL,LANGUAGE_INDICATOR, CONVERSATION_DATE, YEAR(conversation_date) as Year, \
                MONTH(conversation_date) as Month FROM {3} WHERE conversation_date >= '{4}') \
                as sumfct ON sumfct.speech_id_verint = merged.speech_id) as final"\
                .format(("','".join(categories)),("','".join(instances)),Verint.booked_table,Verint.verint_table, min_date)
        #91 add distinct for sid_key
        elif instances == [] and categories != []:
            print('Categories provided')
            verint_query = "SELECT * FROM ( \
                (SELECT distinct speech_id FROM \
                (SELECT distinct sid_key FROM VERINT.SESSIONS_CATEGORIES WHERE category_id not in ('{0}')) as category \
                INNER JOIN \
                (SELECT CONCAT(unit_num, '0', channel_num) as speech_id, sid_key FROM {1}) as booked \
                ON booked.sid_key = category.sid_key) as merged \
                INNER JOIN \
                (SELECT distinct SPEECH_ID_VERINT,TEXT_AGENT_FULL, TEXT_CUSTOMER_FULL, TEXT_OVERLAP, TEXT_ALL, CUSTOMER_ID, CTN, \
                AGENT_EMP_ID,CONNECTION_ID,RECEIVING_SKILL,LANGUAGE_INDICATOR, CONVERSATION_DATE,  YEAR(conversation_date) as Year, \
                MONTH(conversation_date) as Month FROM {2} WHERE conversation_date >= '{3}') \
                as sumfct ON sumfct.speech_id_verint = merged.speech_id) as final" \
                .format(("','".join(categories)),Verint.booked_table,Verint.verint_table, min_date)
        #91 add distinct for sid_key
        elif instances != [] and categories == []:
            print('Instances provided')
            verint_query = "SELECT * FROM ( \
                (SELECT distinct speech_id FROM \
                (SELECT distinct sid_key FROM VERINT.SESSIONS_CATEGORIES WHERE instance_id not in ('{0}')) as category \
                INNER JOIN \
                (SELECT CONCAT(unit_num, '0', channel_num) as speech_id, sid_key FROM {1}) as booked \
                ON booked.sid_key = category.sid_key) as merged \
                INNER JOIN \
                (SELECT distinct SPEECH_ID_VERINT,TEXT_AGENT_FULL, TEXT_CUSTOMER_FULL, TEXT_OVERLAP, TEXT_ALL, CUSTOMER_ID, CTN, \
                AGENT_EMP_ID,CONNECTION_ID,RECEIVING_SKILL,LANGUAGE_INDICATOR, CONVERSATION_DATE,  YEAR(conversation_date) as Year, \
                MONTH(conversation_date) as Month FROM {2} WHERE conversation_date >= '{3}') \
                as sumfct ON sumfct.speech_id_verint = merged.speech_id"\
                .format(("','".join(instances)),Verint.booked_table,Verint.verint_table, min_date)
       
        else:
            print('Only date provided')
            verint_query = "SELECT * FROM (SELECT distinct SPEECH_ID_VERINT,TEXT_AGENT_FULL, TEXT_CUSTOMER_FULL, TEXT_OVERLAP, \
                TEXT_ALL, CUSTOMER_ID, CTN, AGENT_EMP_ID, CONNECTION_ID, RECEIVING_SKILL, \
                LANGUAGE_INDICATOR, CONVERSATION_DATE, YEAR(conversation_date) as Year, MONTH(conversation_date) as Month \
                FROM {1} WHERE conversation_date >= '{2}') as final" \
                .format(Verint.booked_table, Verint.verint_table, min_date)
        
        if max_date is not None:
            verint_query = "{0} where {1}".format(verint_query, "conversation_date < '{}'".format(max_date))
            
        return spark_s.sql(verint_query) 

class Household:
    """
        This class wraps data operations for household
    """

    ### Table names & queries on Databricks ###
    # household_table = "APP_IBRO.IBRO_HOUSEHOLD_ACTIVITY"
    household_table= "ml_etl_output_data.IBRO_HOUSEHOLD_ACTIVITY_CHURN_SCHEDULED"
    # nonchurn_table = "DEFAULT.IBRO_HOUSEHOLD_ACTIVITY_NONCHURN_SCHEDULED"
    # change the nonchurn table for new cluster
    nonchurn_table = "ml_etl_output_data.IBRO_HOUSEHOLD_ACTIVITY_NONCHURN_SCHEDULED"
    all_household= "ml_etl_output_data.IBRO_HOUSEHOLD_ACTIVITY_SCHEDULED"

    @staticmethod
    def load_data(spark_s = None, churn = None):
        """
        Loading ibro_household_activity data to spark dataframes
        :param condition: Optional condition to be passed to household_query e.g. "where []"
        :return ibro_household_activity dataframes respectively
        """

        # those columns can be used to filter our churned customers
        # find out CAN, treated as account_number
        # ARPA_OUT = -1 means deac, ARPA_OUT = 1 means winback
        if churn is None:
            household_query = "select * from {0}".format(Household.all_household)
            
        elif churn == True:
            household_query = "select * from {0}".format(Household.household_table)

        elif churn == False:
            household_query = "select * from {0}".format(Household.nonchurn_table)

        return spark_s.sql(household_query) 
    

In [0]:
ORD_NUM = [
'first',
'second',
'third',
'fourth',
'fifth',
'sixth',
'seventh',
'eighth',
'ninth',
'tenth',
'eleventh',
'twelfth',
'thirteenth',
'fourteenth',
'fifteenth',
'sixteenth',
'seventeenth',
'eighteenth',
'nineteenth',
'twentieth',
'thirtieth',
'twenty first',
'twenty second',
'twenty third',
'twenty fourth',
'twenty fifth',
'twenty sixth',
'twenty seventh',
'twenty eighth',
'twenty ninth',
'thirty first']

NUMBERS = [
'zero',
'one',
'two',
'three',
'four',
'five',
'six',
'seven',
'eight',
'nine',
'ten',
'eleven',
'twelve',
'thirteen',
'fourteen',
'fifteen',
'sixteen',
'seventeen',
'eighteen',
'nineteen',
'twenty',
'thirty',
'forty',
'fifty',
'sixty',
'seventy',
'eighty',
'ninety',
'hundred',
'thousand']

MONTH = [
'january',
'jan',
'february',
'feb',
'march',
'mar',
'april',
'apr',
'may',
'june',
'jun',
'july',
'jul',
'august',
'aug',
'september',
'sep',
'october',
'oct',
'november',
'nov',
'december',
'dec']

DAYS = [
'monday',
'mon',
'tuesday',
'tue',
'wednesday',
'wed',
'thursday',
'thu',
'friday',
'fri',
'saturday',
'sat',
'sunday',
'sun']


BOLDCHAT_STOPWORDS = ['week', 'thank', 'thanks', 'ask', 'ago', 'im', 'sure', 'sound', 'mean', 'lot',
                      'look', 'ok', 'okay', 'thats', 'yes', 'want', 'couple', 'correct', 'use', 'nice',
                      'good', 'day', 'let', 'great', 'fine', 'hi', 'hope', 'already', 'also', 'able',
                      'could', 'else', 'etc', 'guy', 'hello', 'hey', 'however', 'lol', 'maybe',
                      'might', 'morning', 'nope', 'oh', 'perfect', 'please', 'right', 'since', 'someone',
                      'sorry', 'still', 'though', 'thx', 'well', 'today', 'wonder', 'would', 'yeah',
                      'yet', 'even', 'believe', 'think', 'wish', 'mind', 'plz', 'glad', 'say',
                      'possible', 'sept', 'be', 'i', 'the', 'a', 'will', 'mo', 'instead', 'pls', 'go',
                      'of', 'the', 'that', 'to', 'do', 'for', 'if', 'need', 'tomorrow', 'simply',
                     'in', 'but', 'or', 'and', 'about', 'just', 'have', 'an', 'on', 'know', 'not',
                     'yesterday', 'today', 'try', 'google', 'research', 'appreciate', 'help', 'like',
                     'oct', 'aug', 'jun', 'july', 'nov', 'dec', 'stay', 'safe', 'tell', 'bye',
                     'speak', 'chat']
BOLDCHAT_FINAL_STOPWORDS=['still', 'for', 'sound', 'appreciate', 'could', 'please', 'about', 'july', 'want', 
'since', 'yesterday', 'though', 'hey', 'aug', 'hi', 'yeah', 'a', 'right', 'wish', 'great', 'nov', 
'simply', 'perfect', 'even', 'lot', 'might', 'morning', 'today', 'be', 'help', 'mo', 'someone', 'fine',
 'that', 'to', 'not', 'believe', 'ok', 'else', 'glad', 'hello', 'nope', 'sorry', 'google', 'thats', 
 'plz', 'guy', 'hope', 'possible', 'well', 'jun', 'yes', 'wonder', 'mind', 'research', 'lol', 'sept',
  'couple', 'already', 'pls', 'i', 'thx', 'but', 'the', 'try', 'im', 'instead', 'just', 'have', 'and',
 'or', 'thanks', 'week', 'let', 'like', 'also', 'in', 'if', 'think', 'oct', 'thank', 'need', 'will',
 'correct', 'maybe', 'however', 'of', 'oh', 'nice', 'okay', 'dec', 'do', 'on', 'an', 'ask', 'look',
 'would', 'bye', 'good', 'ago', 'tomorrow', 'day', 'mean', 'etc', 'sure', 'yet']

ALL_BOLDCHAT_STOPWORDS = set(ORD_NUM + NUMBERS + MONTH + DAYS + BOLDCHAT_FINAL_STOPWORDS)


FINAL_INTERNAL_STOPWORDS_ALIGN = ['actually', 'better', 'was', 'how', 'for', 'could', 'please', 'then',
  'whereupon', 'so', 'consider', 'as', 'any', 'used', 'brief', 'why', 'several', 'hi', 'six', 'by', 
  'hence', 'yourself', 'k', 'looking', 'right', 'saying', 'fifth', 'wish', 'did', "there's", 'whose',
  'uses', 'inward', 'appear', 'doing', 'n’t', 'doesnt', 'her', 'yourselves', 'nothing', 'not', 'believe',
  '‘ve', 'thereby', 'got', 'took', "c's", 'forty', 'each', "we've", 'whether', 'gets', 'obviously',
  'serious', 'inner', 'perhaps', 'me', 'necessary', 'gotten', 'nevertheless', 'furthermore', 'looks',
  'somebody', 'rather', 'elsewhere', 'former', 'seemed', 'away', 'up', 'than', 'except', 'via', 'can',
  'everything', "you've", 'already', 'along', 'currently', 'while', 'selves', 'anyone', 'our', 
  'thus', 'Shes', 'must', 'either', 'c', "c'mon", 'try', 'wants', 'welcome', '’ve', 'between', 
  'ever', 'z', 'does', 'whereafter', 'twelve', 'apart', 'ca', 'gone', 'awfully', 'came', 'let', 
  'like', 'in', 'u', 'getting', 'again', 'taken', 'itself', 'themselves', 'thank', 'need', 'until',
  'whence', 'she', 'no', 'us', "they'll", 'normally', 'amongst', 'greetings', 'nearly', 'despite',
  'hither', "i'll", 'consequently', 'were', 'whole', 'couldnt', 'knows', 'nine', 'everywhere', 
  'under', 'mainly', 'thanx', 'an', 'corresponding', 'therein', 'would', 'containing', 'causes', 
  'beyond', 'near', 'become', "you're", 'etc', 'liked', "doesn't", 'seriously', 'sure', 's', 
  'asking', 'uucp', 'sixty', 'gives', 'merely', 'myself', 'they', 'about', "hasn't", 'ie', 'st', 
  'indicates', 'far', "i'd", 'since', 'put', 'amount', 'whatever', 'whereby', 'though', 'lately',
  'nd', 'thru', "isn't", "that's", 'a', "haven't", "hadn't", "'ll", 'never', 'cant', 'saw', 'viz',
  'theres', 'their', 'unlikely', 'even', 'ours', 'twenty', "can't", 'am', 'mo', 'jus', 'someone',
  'that', "'s", 'last', 'to', 'd', "he's", "couldn't", 'becoming', 'placed', 'upon', 'one', 
  'this', 'meanwhile', 'more', 'else', 'usually', 'definitely', 'hello', "who's", 'himself', 
  'moreover', 'bottom', 'tends', 'possible', 'well', 'regardless', "shouldn't", 'became',
  'reasonably', 'same', 'b', 'everybody', 'gonna', 'j', 'alone', 'self', '‘d', 'theyd', 
  'these', "wasn't", 'before', 'needs', 'goes', 'with', 'but', 'because', 'went', 'following',
  'hers', 'really', 'thereupon', 'thorough', 'third', 'always', 'described', "we'll", 'some',
  "'ve", 'associated', 'ivent', 'thanks', "i've", 'seen', 'think', 'hereby', 'his', "what's",
  'above', 'going', 'part', 'twice', 'wont', 'th', 'few', 'hows', 'formerly', "'d", 'insofar',
  'sensible', 'happens', 'maybe', 'however', '’d', 'seeming', 'having', 'co', 'somewhere', 
  'him', 'neither', 'okay', 'do', 'whereas', 'according', 'particularly', 'ask', 'howbeit',
  'o', 'besides', 'wherein', "they've", 'contains', 'next', 'throughout', 'against', 'edu',
  'et', 'is', 'unless', 'yet', 'therefore', 'many', 'eight', 'still', 'name', 'certainly', 
  'nobody', 'mine', 'top', 'sup', 'somehow', 'you', 'anybody', 'eg', 'overall', 'been', 'fifty', 
  'sometimes', 'where', 'hereafter', 'three', 'g', 'quite', 'towards', 'made', 'may', 'gotta', 'side',
  'thereafter', 'beside', 'noone', 'none', 'be', 'anywhere', 'further', 'help', '‘ll', 'accordingly',
  'specifying', 'toward', 'due', "you'll", 'hopefully', 'my', 'what', 'p', 'youre', 'pl', 'beforehand',
  'there', 'secondly', 'shes', "we're", 'thence', 'ignored', 'forth', 'indicated', 'together', 'four',
  'hardly', 'useful', 'kept', 'sub', 'yes', 'among', 'at', 'q', "n't", 'ex', 'such', 'considering', 
  't', 'using', 'something', 'ourselves', 'r', 'keep', "wouldn't", 'y', 'another', 'clearly', 'onto', 
  'im', 'given', 'latterly', '‘s', 'dont', 'especially', 'seeing', 'appropriate', 'later', 'had', 'or', 
  'only', 'provides', 'ididnt', 'hundred', 'whenever', 'anyways', 'also', 'itd', 'see', '‘re', 'havent', 
  'best', 'whither', "aren't", 'less', 'allows', 'lest', 'much', 'others', 'n', 'should', 'immediate', 
  'although', 'around', 'sometime', 'example', 'seem', 'entirely', "they're", 'que', 'Im', 'arent', 
  'of', 'l', 'oh', 'down', 'five', 'them', 'allow', 'on', "you'd", 'followed', '‘m', 'yours', 'regard', 
  'somewhat', 'tried', 'very', 'thoroughly', 'various', 'empty', 'shall', "weren't", 'e', "it'll", 
  'through', 'wherever', 'most', 'cannot', "it's", 'here', 'thatll', 'namely', "don't", 'mostly', 
  'during', 'per', 'v', 'whoever', 'out', 'we', "let's", 'course', 'qv', 'alls', 'appreciate', '’re', 
  'want', 'both', 're', 'ten', 'h', 'm', 'anyhow', 'after', 'back', 'trying', 'wasnt', 'everyone', 
  'done', 'often', 'regards', 'concerning', 'w', 'presumably', 'oops', 'hasnt', "they'd", "ain't", 
  'latter', 'might', 'seems', 'now', 'take', 'unto', 'says', 'certain', 'fifteen', "won't", 'every', 
  'own', '’m', 'truly', 'becomes', 'sent', 'likely', 'all', 'ok', 'make', 'sorry', 'which', 'full', 
  'indeed', 'thats', 'nor', 'give', 'other', 'almost', 'unfortunately', 'hereupon', 'who', 'wonder', 
  'nowhere', "here's", "i'm", 'willing', 'youll', '’s', "'re", 'rd', 'com', 'changes', 'said', 'too', 
  'exactly', 'tries', 'x', "didn't", 'ive', 'he', 'i', 'vs', "it'd", 'particular', 'its', 'inasmuch', 
  'those', 'probably', 'the', 'contain', 'it', 'first', 'herein', 'hes', '’ll', 'instead', 'ones', 
  'indicate', 'just', 'specify', 'over', "a's", 'have', 'and', "we'd", "'m", 'specified', 'when', 
  'relatively', 'eleven', 'below', 'if', 'f', 'theirs', 'behind', 'whom', 'didnt', 'are', 'will', 
  "where's", 'from', 'into', 'enough', 'least', 'your', 'anyway', 'n‘t', 'un', "t's", 'cause', 
  'herself', 'keeps', 'ought', 'has', 'within', 'comes', 'once', 'novel', 'front', 'anything', 'known', 
  'afterwards', 'aside', 'look', 'being', 'off', 'across', 'mean', 'respectively', 'value', 'two', 
  'otherwise', 'Dont', 'isnt', 'regarding', 'without', 'follows', 'ltd', 'pci','#pci#','#PCI#']

FRENCH_STOPWORDS = ["a","abord","absolument","afin","ah","ai","aie","aient","aies","ailleurs","ainsi","ait","allaient",
                    "allo","allons","allô","alors","anterieur","anterieure","anterieures","apres","après","as","assez",
                    "attendu","au","aucun","aucune","aucuns","aujourd","aujourd'hui","aupres","auquel","aura","aurai",
                    "auraient","aurais","aurait","auras","aurez","auriez","aurions","aurons","auront","aussi","autant",
                    "autre","autrefois","autrement","autres","autrui","aux","auxquelles","auxquels","avaient","avais",
                    "avait","avant","avec","avez","aviez","avions","avoir","avons","ayant","ayez","ayons","b","bah",
                    "bas","basee","bat","beau","beaucoup","bien","bigre","bon","boum","bravo","brrr","c","car","ce",
                    "ceci","cela","celle","celle-ci","celle-là","celles","celles-ci","celles-là","celui","celui-ci",
                    "celui-là","celà","cent","cependant","certain","certaine","certaines","certains","certes","ces","cet",
                    "cette","ceux","ceux-ci","ceux-là","chacun","chacune","chaque","cher","chers","chez","chiche","chut","chère",
                    "chères","ci","cinq","cinquantaine","cinquante","cinquantième","cinquième","clac","clic","combien",
                    "comme","comment","comparable","comparables","compris","concernant","contre","couic","crac","d","da",
                    "dans","de","debout","dedans","dehors","deja","delà","depuis","dernier","derniere","derriere","derrière",
                    "des","desormais","desquelles","desquels","dessous","dessus","deux","deuxième","deuxièmement","devant",
                    "devers","devra","devrait","different","differentes","differents","différent","différente","différentes",
                    "différents","dire","directe","directement","dit","dite","dits","divers","diverse","diverses","dix","dix-huit",
                    "dix-neuf","dix-sept","dixième","doit","doivent","donc","dont","dos","douze","douzième","dring","droite",
                    "du","duquel","durant","dès","début","désormais","e","effet","egale","egalement","egales","eh","elle",
                    "elle-même","elles","elles-mêmes","en","encore","enfin","entre","envers","environ","es","essai","est",
                    "et","etant","etc","etre","eu","eue","eues","euh","eurent","eus","eusse","eussent","eusses","eussiez",
                    "eussions","eut","eux","eux-mêmes","exactement","excepté","extenso","exterieur","eûmes","eût","eûtes","f",
                    "fais","faisaient","faisant","fait","faites","façon","feront","fi","flac","floc","fois","font","force",
                    "furent","fus","fusse","fussent","fusses","fussiez","fussions","fut","fûmes","fût","fûtes","g","gens","h",
                    "ha","haut","hein","hem","hep","hi","ho","holà","hop","hormis","hors","hou","houp","hue","hui","huit",
                    "huitième","hum","hurrah","hé","hélas","i","ici","il","ils","importe","j","je","jusqu","jusque","juste",
                    "k","l","la","laisser","laquelle","las","le","lequel","les","lesquelles","lesquels","leur","leurs","longtemps",
                    "lors","lorsque","lui","lui-meme","lui-même","là","lès","m","ma","maint","maintenant","mais","malgre",
                    "malgré","maximale","me","meme","memes","merci","mes","mien","mienne","miennes","miens","mille","mince",
                    "mine","minimale","moi","moi-meme","moi-même","moindres","moins","mon","mot","moyennant","multiple",
                    "multiples","même","mêmes","n","na","naturel","naturelle","naturelles","ne","neanmoins","necessaire",
                    "necessairement","neuf","neuvième","ni","nombreuses","nombreux","nommés","non","nos","notamment","notre",
                    "nous","nous-mêmes","nouveau","nouveaux","nul","néanmoins","nôtre","nôtres","o","oh","ohé","ollé","olé",
                    "on","ont","onze","onzième","ore","ou","ouf","ouias","oust","ouste","outre","ouvert","ouverte","ouverts",
                    "o|","où","p","paf","pan","par","parce","parfois","parle","parlent","parler","parmi","parole","parseme",
                    "partant","particulier","particulière","particulièrement","pas","passé","pendant","pense","permet","personne",
                    "personnes","peu","peut","peuvent","peux","pff","pfft","pfut","pif","pire","pièce","plein","plouf",
                    "plupart","plus","plusieurs","plutôt","possessif","possessifs","possible","possibles","pouah","pour",
                    "pourquoi","pourrais","pourrait","pouvait","prealable","precisement","premier","première","premièrement",
                    "pres","probable","probante","procedant","proche","près","psitt","pu","puis","puisque","pur","pure","q",
                    "qu","quand","quant","quant-à-soi","quanta","quarante","quatorze","quatre","quatre-vingt","quatrième",
                    "quatrièmement","que","quel","quelconque","quelle","quelles","quelqu'un","quelque","quelques","quels",
                    "qui","quiconque","quinze","quoi","quoique","r","rare","rarement","rares","relative","relativement",
                    "remarquable","rend","rendre","restant","reste","restent","restrictif","retour","revoici","revoilà",
                    "rien","s","sa","sacrebleu","sait","sans","sapristi","sauf","se","sein","seize","selon","semblable",
                    "semblaient","semble","semblent","sent","sept","septième","sera","serai","seraient","serais","serait",
                    "seras","serez","seriez","serions","serons","seront","ses","seul","seule","seulement","si","sien","sienne",
                    "siennes","siens","sinon","six","sixième","soi","soi-même","soient","sois","soit","soixante","sommes",
                    "son","sont","sous","souvent","soyez","soyons","specifique","specifiques","speculatif","stop","strictement",
                    "subtiles","suffisant","suffisante","suffit","suis","suit","suivant","suivante","suivantes","suivants",
                    "suivre","sujet","superpose","sur","surtout","t","ta","tac","tandis","tant","tardive","te","tel","telle",
                    "tellement","telles","tels","tenant","tend","tenir","tente","tes","tic","tien","tienne","tiennes","tiens",
                    "toc","toi","toi-même","ton","touchant","toujours","tous","tout","toute","toutefois","toutes","treize",
                    "trente","tres","trois","troisième","troisièmement","trop","très","tsoin","tsouin","tu","té","u","un","une",
                    "unes","uniformement","unique","uniques","uns","v","va","vais","valeur","vas","vers","via","vif","vifs",
                    "vingt","vivat","vive","vives","vlan","voici","voie","voient","voilà","voire","vont","vos","votre","vous",
                    "vous-mêmes","vu","vé","vôtre","vôtres","w","x","y","z","zut","à","â","ça","ès","étaient","étais","était",
                    "étant","état","étiez","étions","été","étée","étées","étés","êtes","être","ô"]

ENGLISH_STOPWORDS = ['a', 'about', 'above', 'after', 'again', 'against', 'all', 'am', 'an', 'and', 'any', 'are', "aren't", 'as', 
                     'at', 'be', 'because', 'been', 'before', 'being', 'below', 'between', 'both', 'but', 'by', "can't", 'cannot', 
                     'could', "couldn't", 'did', "didn't", 'do', 'does', "doesn't", 'doing', "don't", 'down', 'during', 'each', 'few', 
                     'for', 'from', 'further', 'had', "hadn't", 'has', "hasn't", 'have', "haven't", 'having', 'he', "he'd", "he'll", 
                     "he's", 'her', 'here', "here's", 'hers', 'herself', 'him', 'himself', 'his', 'how', "how's", 'i', "i'd", "i'll", 
                     "i'm", "i've", 'if', 'in', 'into', 'is', "isn't", 'it', "it's", 'its', 'itself', "let's", 'me', 'more', 'most',
                     "mustn't", 'my', 'myself', 'no', 'nor', 'not', 'of', 'off', 'on', 'once', 'only', 'or', 'other', 'ought', 'our', 
                     'ours', 'ourselves', 'out', 'over', 'own', 'same', "shan't", 'she', "she'd", "she'll", "she's", 'should', 
                     "shouldn't", 'so', 'some', 'such', 'than', 'that', "that's", 'the', 'their', 'theirs', 'them', 'themselves', 
                     'then', 'there', "there's", 'these', 'they', "they'd", "they'll", "they're", "they've", 'this', 'those', 'through', 
                     'to', 'too', 'under', 'until', 'up', 'very', 'was', "wasn't", 'we', "we'd", "we'll", "we're", "we've", 'were', 
                     "weren't", 'what', "what's", 'when', "when's", 'where', "where's", 'which', 'while', 'who', "who's", 'whom', 'why', 
                     "why's", 'with', "won't", 'would', "wouldn't", 'you', "you'd", "you'll", "you're", "you've", 'your', 'yours', 
                     'yourself', 'yourselves']

VERINT_STOPWORDS = ['okay', 'believe', 'june', 'to', 'use', 'happen', 'without', 'dont', 'yet', 'blah',
                    'morning', 'thanks', 'little', 'please', 'perfect', 'sorry', 'think', 'ca', 'blah blah',
                    'guy', 'else', 'call', 'hey', 'however', 'thats', 'thx', 'say', 'every', 'try', 'blah blah blah',
                    'sure', 'come', 'uh', 'well', 'seem', 'yeah', 'know', 'fine', 'might', 'hello', 'password', 'passwords',
                    'want', 'tell', 'ok', 'since', 'cant', 'take', 'sir', 'two', 'maybe', 'im', 'great', 'hold',
                    'oh', 'one', 'chat', 'already', 'nope', 'thank', 'someone', 'roger', 'today', 'give',
                    'good', 'hi', 'bye', 'could', 'also', 'would', 'look', 'find', 'may', 'show', 'alright',
                    'wonder', 'still', 'need', 'actually', 'mean', 'ago', 'rogers', 'go', 'yes',
                    'right', 'though', 'get', 'wait', 'etc', 'see', 'let', 'even', 'august', 'july', 'thing',
                    'lol', 'like', 'um', 'stuff', 'pci', 'com', 'dot', 'yahoo', 'underscore', 'date', 'birth',
                   'postal', 'code', 'number', 'email', 'dollar', 'cent', 'buck', 'gmail', 'hotmail',"leave", "message", 
                   "tone", "leave", "hang", "press", "pound", "option", "est", "que","pas","vous","euh","oui","pour","moi","parce",
                   "mais", "people" , "est" , "year",  "company", "kind" , "point" , "plus" , "card" , "way" , "person" , "non" , 
                   "big" , "letter" , "pour" , "happy" , "start" , "live" , "door" , "money", "best" , "car" , "family" ,"dollars" , 
                   "school" , "son" , "building" , "absolutely" ,  "god" , "everybody","husband", "wife", "mom", "dad", "black",
                   "gonna", "son", "daughter", "white", "guys", "bien", "mois", "puis", "vai", "cinq", "bye", "goodbye", "blue",
                   "color", "anymore", "worry", "somebody"]

VERINT_FINAL_STOPWORDS=['still', 'mom', 'actually', 'pour', 'pas', 'guys', 'little', 'could', 'please', 'july', 'want', 'point',
 'blah blah', 'tone', 'since', 'car', 'moi', 'though', 'hey', 'sir', 'hi', 'birth', 'vous', 'letter',
  'yeah', 'worry', 'big', 'underscore', 'right', 'great', 'may', 'cant', 'perfect', 'even', 'might',
  'today', 'morning', 'door', 'take', 'vai', 'someone', 'fine', 'happy', 'to', 'believe', 'every',
  'one', 'school', 'gmail', 'ok', 'wife', 'else', 'hotmail', 'hello', 'nope', 'sorry', 'cent', 'son',
  'thats', 'guy', 'give', 'dad', 'well', 'press', 'somebody', 'wonder', 'yes', 'everybody', 'gonna',
  'cinq', 'lol', 'com', 'people', 'already', 'dot', 'june', 'stuff', 'person', 'blah', 'black', 
  'uh', 'thx', 'yahoo', 'pound', 'kind', 'money', 'color', 'try', 'im', 'um', 'puis', 'blah blah blah',
  'dollar', 'dont', 'mois', 'est', 'absolutely', 'ca', 'thanks', 'let', 'like', 'also', 'august',
  'see', 'happen', 'best', 'passwords', 'dollars', 'think', 'alright', 'buck', 'thank', 'need',
  'parce', 'pci', 'seem', 'que', 'thing', 'euh', 'maybe', 'hang', 'however', 'oh', 'daughter',
  'mais', 'blue', 'okay', 'husband', 'building', 'look', 'bye', 'would', 'live', 'white', 'good', 
  'ago', 'goodbye', 'mean', 'god', 'bien', 'etc', 'two', 'sure', 'yet', 'without', 'oui']
ALL_VERINT_STOPWORDS = set(ORD_NUM + NUMBERS + MONTH + DAYS + VERINT_FINAL_STOPWORDS + BOLDCHAT_FINAL_STOPWORDS + FRENCH_STOPWORDS + ENGLISH_STOPWORDS)


In [0]:
import logging
import sparknlp
from pyspark.sql.functions import *
from pyspark.sql.types import *
from sparknlp.base import *
from sparknlp.annotator import *
from datetime import datetime
import re
import string

class VerintETL:
    """
    This class covers the required functionalities to perform
    primilinary ETL jobs on cbu_rog_conversation_sumfct
    to prepare an integraded dateset for the Normalizer Pipeline.
    """

    sumfct_keep_cols = [
                        'TEXT_CUSTOMER_FULL',
                        'TEXT_AGENT_FULL',
                        'TEXT_ALL',
                        'RECEIVING_SKILL',
                        'CTN',
                        'CUSTOMER_ID',
                        'SPEECH_ID_VERINT',
                        'CONVERSATION_DATE',
                        'Year',
                        'Month',
                        'CONNECTION_ID'
                        ]
    # interaction_id corrupted
    # sumfct_keep_cols suggested ['TEXT_CUSTOMER_FULL','TEXT_AGENT_FULL','TEXT_ALL','CUSTOMER_ID','CTN',
    #      'SPEECH_ID_VERINT', 'CONNECTION_ID', 'RECEIVING_SKILL', 'CONVERSATION_DATE','Year','Month']
    def __init__(self, verint_sumfct_df):
        """
        :param verint_df: Spark DataFrame containing raw cbu_rog_conversation_sumfct for a given time period
        """
        self.logger = logging.getLogger(__name__)
        verint_sumfct_df = verint_sumfct_df.distinct()
        self.verint_sumfct_df = verint_sumfct_df
        # self.household_df = household_df

        # load data later than set date
        # last_load = "2021-03-10"
        #self.verint_df = self.__last_load_input_data(self.verint_sumfct_df, last_load)
        #self.logger.info("Number of verint records: {}".format(self.verint_df.count()))
        # filter for only consumer vqs
        #self.verint_df = self.__filter_input_data(self.verint_df, self.consumer_vq_df)
        #self.logger.info("Number of verint records: {}".format(self.verint_df.count()))

        # keep necessary columns only
        self.verint_df = self.verint_sumfct_df.select(VerintETL.sumfct_keep_cols)
        print("count rows: {}".format(self.verint_df.count()))
        self.verint_df = self.verint_df.distinct()
        
        # self.merge_df = self.__merge_df(self.verint_df, self.household_df)
        
        # self.merge_df = self.merge_df.distinct()
        print("distinct count rows: {}".format(self.verint_df.count()))
        print("full consumer rows")
        self.logger.info("Number of verint records: {}".format(self.verint_df.count()))
        try:
            assert (self.verint_df.count() > 0)
        except AssertionError:
            self.logger.error("The joined Verint dataframe is empty.")

    def __last_load_input_data(self, verint_sumfct_df, last_load):
        """
        Filtering data later than set date
        :return: DF that loaded later than set date
        """

        sum_date_counts = verint_sumfct_df.groupBy('record_insert_dt').count().sort('record_insert_dt').collect()
        sum_date_dict = {}

        for row in sum_date_counts:
            sum_date_dict[row['record_insert_dt']] = row['count']
        verint_sumfct_df = verint_sumfct_df.where(verint_sumfct_df.record_insert_dt > last_load)
        return verint_sumfct_df


    def __filter_input_data(self, verint_sumfct_df, consumer_vq_df):
        """
        Filtering for only consumer vqs
        :return: joined verint sumfct DF
        """
        # join condition
        print("join")
        join_cond = [consumer_vq_df.VQ == verint_sumfct_df.receiving_skill]
        return verint_sumfct_df.join(consumer_vq_df, join_cond, 'inner')
    

    def __get_batch_start_date_and_end_date(self, verint_df):
        print("new data rows")
        print(verint_df.count())
        if verint_df.count() == 0:
            sys.exit('No new data loaded')
        load_date_counts = verint_df.groupBy('record_insert_dt').count().sort('record_insert_dt').collect()
        load_date_dict = {}
        for row in load_date_counts:
            load_date_dict[row['record_insert_dt']] = row['count']
        new_last_load = sorted(load_date_dict.keys())[-1]
        new_first_load = sorted(load_date_dict.keys())[0]
        print(f'load date range from {new_first_load} to {new_last_load}')
        new_last_load = new_last_load.replace("-", "")
        new_first_load = new_first_load.replace("-", "")
        return (new_last_load, new_first_load)

    ############################
    #### Spark Transformers ####
    ############################

    def __filter_by_receiving_skill(self, receiving_skill):
        """
            Transformer function for filtering the verint data based on receiving_skill (cabel, wireless, etc.)
        """
        print("cable skill")
        def transform(verint_df):
            return verint_df.where("receiving_skill LIKE '%\_{}\_%'".format(receiving_skill))
        return transform


    def __wireless_cable_vqs(self, verint_df):
        """
        Get wireless and vqs
        :return: filtered verint DF
        """
        print("wireless or cable vqs")
        return verint_df.where(
            "receiving_skill LIKE 'ROG\_EN\_%' AND (receiving_skill LIKE '%\_WIR%' OR receiving_skill LIKE '%\_CBL%')")

    def __wireless_cable(self, cbl, wir):
        def cbl_wir(verint_df):
            return verint_df.where(
                "receiving_skill LIKE '%\_{0}%' OR receiving_skill LIKE '%\_{1}%'".format(cbl, wir))
        return cbl_wir


    def __msg_not_empty_spark(self, token_col):
        "Removing all the records with no value for the customer message (empty messages)" #91
        def curry(verint_df):
            msg_not_empty_udf = udf(lambda msg: len(msg) > 1, BooleanType())
            return verint_df.where(msg_not_empty_udf(verint_df[token_col])).drop(token_col)
        return curry
      
    def __msg_not_empty(self, msg_col):
        "Removing all the records with no value for the customer message (empty messages)" 
        def curry(verint_df):
            msg_not_empty_udf = udf(lambda msg: len(msg.strip()) > 0 , BooleanType())
            return verint_df.where(msg_not_empty_udf(verint_df[msg_col]))
        return curry

    def __extract_cus_msg_spark(self, verint_df):
        verint_df_col=list(verint_df.columns)
        if "CLEAN_TEXT_CUSTOMER" in verint_df_col:
            input_col="AGENT"
        else:
            input_col="CUSTOMER"
        if input_col=="CUSTOMER":
            first="TEXT_CUSTOMER_FULL"
        else:
            first="TEXT_AGENT_FULL"
        documentAssembler = DocumentAssembler() \
            .setInputCol(first) \
            .setOutputCol("document")

        documentNormalizer1 = DocumentNormalizer() \
            .setInputCols("document") \
            .setOutputCol("normalizedDocument1") \
            .setAction("clean") \
            .setPatterns(["e mail"]) \
            .setReplacement("email") \
            .setPolicy("pretty_all") \
            .setLowercase(True)
        
        documentNormalizer2 = DocumentNormalizer() \
            .setInputCols("normalizedDocument1") \
            .setOutputCol("normalizedDocument2") \
            .setAction("clean") \
            .setPatterns(["caller d"]) \
            .setReplacement("caller id") \
            .setPolicy("pretty_all") \
            .setLowercase(True)

        documentNormalizer3 = DocumentNormalizer() \
            .setInputCols("normalizedDocument2") \
            .setOutputCol("normalizedDocument3") \
            .setAction("clean") \
            .setPatterns(["datum"]) \
            .setReplacement("data") \
            .setPolicy("pretty_all") \
            .setLowercase(True)
        
        documentNormalizer4 = DocumentNormalizer() \
            .setInputCols("normalizedDocument3") \
            .setOutputCol("normalizedDocument4") \
            .setAction("clean") \
            .setPatterns(["’"]) \
            .setReplacement("'") \
            .setPolicy("pretty_all")
          
        tokenizer = Tokenizer() \
            .setInputCols(["normalizedDocument4"]) \
            .setOutputCol("token")
      
        normalizer1 = Normalizer() \
            .setInputCols(["token"]) \
            .setOutputCol("nonDigitTokens") \
            .setLowercase(True)\
            .setCleanupPatterns(["""[0-9]"""])
 
        lemmatizer = LemmatizerModel.pretrained() \
            .setInputCols(["nonDigitTokens"]) \
            .setOutputCol("lemma")
  
        stopwords_cleaner = StopWordsCleaner() \
            .setInputCols(["lemma"]) \
            .setOutputCol("cleanTokens1")\
            .setStopWords(list(ALL_VERINT_STOPWORDS))
        #91
        stopWords = StopWordsCleaner()\
            .setInputCols(["cleanTokens1"])\
            .setOutputCol("cleanTokens2")\
            .setStopWords(FINAL_INTERNAL_STOPWORDS_ALIGN)
        
        normalizer2 = Normalizer() \
            .setInputCols(["cleanTokens2"]) \
            .setOutputCol("onlyAlphaTokens_"+input_col) \
            .setLowercase(True)\
            .setCleanupPatterns(["""[^A-Za-z]"""]) #91 only keep alphabet letters, could remove "+"       
        
        #91 outputasarray -> true
        finisher = Finisher() \
            .setInputCols(["onlyAlphaTokens_"+input_col]) \
            .setOutputCols("CLEAN_TEXT_"+input_col) \
            .setOutputAsArray(False)\
            .setCleanAnnotations(False)\
            .setAnnotationSplitSymbol(" ")

        nlp_pipeline = Pipeline(
            stages=
            [documentAssembler, documentNormalizer1, documentNormalizer2,documentNormalizer3, documentNormalizer4,  tokenizer, normalizer1, lemmatizer,  stopwords_cleaner, stopWords, normalizer2, finisher]
        )
        verint_df_col.append("CLEAN_TEXT_"+input_col)
        verint_df_col.append("onlyAlphaTokens_"+input_col)
        print(verint_df_col)
        return nlp_pipeline\
            .fit(verint_df)\
            .transform(verint_df).select(verint_df_col)            

    def __drop_for_extracting_cus_msg(self, verint_df):
        print("drop for extracting customer msg")
        return verint_df.drop('TEXT_AGENT_FULL', 'TEXT_OVERLAP', 'TEXT_UNKNOWN')

    def __text_df(self, verint_df):
        return verint_df.where(length('CLEAN_TEXT_CUSTOMER') > 0)


    def __no_text_df(self, verint_df):
        return verint_df.where(length('CLEAN_TEXT_CUSTOMER') == 0)

    def __competitor_mention(self, verint_df):
        print("competitor mention")
        comp_udf = udf(VerintETL.competitor_mention, StringType())
        return verint_df.withColumn("COMPETITOR_MENTION", comp_udf(verint_df['CLEAN_TEXT_CUSTOMER']))
    
    def __product_mention(self, verint_df):
        print("product mention")
        comp_udf = udf(VerintETL.product_mention, StringType())
        return verint_df.withColumn("PRODUCT_MENTION", comp_udf(verint_df['CLEAN_TEXT_CUSTOMER']))

    def __rogers_fido_mention(self, verint_df):
        print("rogers fido mention")
        comp_udf = udf(VerintETL.rogers_fido_mention, StringType())
        return verint_df.withColumn("ROGERS_FIDO_MENTION", comp_udf(verint_df['CLEAN_TEXT_CUSTOMER']))



    ##########################
    ### Cable Service ETL ####
    ##########################

    def get_verint_df(self):
        return self.verint_df
    
    # def get_household_df(self):
    #     return self.household_df

    def en_rogers_fido_mention_etl(self, df):
        return df \
            .transform(self.__filter_by_receiving_skill('EN')) \
            .transform(self.__rogers_fido_mention)

    ##########################
    ### Wireless Service ETL ####
    ##########################

    def en_product_mention_etl(self, df):
        return df \
            .transform(self.__filter_by_receiving_skill('EN')) \
            .transform(self.__product_mention) 

        # removed a line: .transform(self.__wireless_cable('CBL', 'WIR')) \

            
    def cbl_ser_etl_spark(self):
        return self.verint_df \
            .transform(self.__extract_cus_msg_spark)\
            .transform(self.__msg_not_empty_spark("onlyAlphaTokens_CUSTOMER"))\
            .transform(self.__extract_cus_msg_spark)\
            .transform(self.__msg_not_empty_spark("onlyAlphaTokens_AGENT"))\
            .transform(self.__competitor_mention)\
            .transform(self.__rogers_fido_mention)
    ##########################
    ### Wireless Service ETL ####
    ##########################
      
    def wir_ser_etl_spark(self):
        return self.verint_df \
            .transform(self.__wireless_cable_vqs) \
            .transform(self.__filter_by_receiving_skill('WIR')) \
            .transform(self.__extract_cus_msg_spark)\
            .transform(self.__msg_not_empty_spark("onlyAlphaTokens_CUSTOMER"))\
            .transform(self.__drop_for_extracting_cus_msg)
    ###########################
    #### helper functions ####
    ###########################
    def no_text_result(df):
        def add_na(ph):
            return 'N/A'
        add_na_udf = udf(add_na, StringType())
        df = df.withColumn('Top_1_topic', lit('Undefined'))
        df = df.withColumn('Top_2_topic', lit('Undefined'))
        df = df.withColumn('Top_1_prob', lit(1.0))
        df = df.withColumn('Top_2_prob', lit(1.0))
        df = df.withColumn('Top_1_keyword', add_na_udf(df.Top_1_topic))
        return df

    @staticmethod
    def competitor_mention(msg):
        comp_list = ['Bell','Telus','Cogeco','Freedom','Virgin','TekSavvy','Shaw','Public Mobile','Chatr','Koodo','Fonus'] # 'Fido', 'Rogers'
        men_list = []
        msg_list = msg.split()
        for comp in comp_list:
            if comp.lower() in msg_list:
                men_list.append(comp)
        return " | ".join(men_list)
    
    @staticmethod
    def rogers_fido_mention(msg):
        comp_list = ['Rogers','Fido']
        men_list = []
        msg_list = msg.split()
        for comp in comp_list:
            if comp.lower() in msg_list:
                men_list.append(comp)
        return " | ".join(men_list)
    
    @staticmethod
    def product_mention(msg):
        men_list = []
        prod_dict = {"ignite smartstream": "Ignite SmartStream", "ignite smart stream": "Ignite SmartStream",
                     "smartstream": "Ignite SmartStream", "smart stream": "Ignite SmartStream",
                     "ignite tv": "Ignite TV", "tv": "Ignite TV",
                     "ignite internet": "Ignite Internet", "internet": "Ignite Internet",
                     "ignite bundles": "Ignite Bundles", "bundles": "Ignite Bundles", 
                     "smart home": "Smart Home", "smarthome": "Smart Home",
                     "wireless home internet": "Wireless Home Internet", "wireless home": "Wireless Home Internet",
                     "wireless internet": "Wireless Home Internet"
                     }
        for key, val in prod_dict.items():
            if key.lower() in msg:
                if val.lower() not in men_list:
                    men_list.append(val)
        return " | ".join(men_list)

    

## Generate testing data:

In [0]:
import os
import logging
from datetime import datetime, timedelta
from pyspark.sql.functions import *
import sparknlp

spark = sparknlp.start()

print("Spark NLP version", sparknlp.version())
print("Apache Spark version:", spark.version)

from sparknlp.base import *
from pyspark.ml import Pipeline
data = spark.createDataFrame([["Spark NLP is an open-source text processing library."]]).toDF("text")
documentAssembler = DocumentAssembler().setInputCol("text").setOutputCol("document")
result = documentAssembler.transform(data)
print(result)

logger = logging.getLogger("Churn_ETL_test")


####Additional testing
###we have to process the data for 2 days ago. yesterday's data is not available
# today = datetime.today() - timedelta(hours=5)
#today = datetime.today()
today = datetime.strptime("2023-03-08", '%Y-%m-%d')
min_date = (today - timedelta(days=91)).strftime('%Y-%m-%d')
max_date = (today - timedelta(days=1)).strftime('%Y-%m-%d')

# min_date = (today - timedelta(days=5)).strftime('%Y-%m-%d') 
# max_date = (today - timedelta(days=4)).strftime('%Y-%m-%d')

# min_date <= conversation_date < max_date

# " ***** min_date should be read from a parameter storage on Azure, assigning manually for now **** "

# min_date = '2022-09-10'
# max_date = '2022-09-11'
print(min_date)
print(max_date)

###################################
## Creating a VerintETL Object ##
## to perform ETL jobs on the    ##
## input data.                   ##
###################################


spark_session = spark
#verint= Verint.load_data(min_date = min_date, max_date = max_date, spark_s = spark_session) 
# # 1. load cumfct table

#verint= Verint.load_data(min_date = min_date, max_date = max_date, categories = ('101000264', '101001648', '109000006', '109000011'), spark_s = spark_session) 
#verint= Verint.load_data_filter(min_date = min_date, max_date = max_date, categories = ['101000264', '101001648', '109000006', '109000011'], spark_s = spark_session) 
verint1= Verint.load_data_filter(min_date = min_date, max_date = max_date, spark_s = spark_session)
print(verint1.count())

#moves
# verint2 = Verint.load_data_filter(min_date='2022-03-01', max_date='2022-07-01', categories = ['109000011'],spark_s = spark_session)
# verint2= verint2.drop(verint2.speech_id)
# print(verint2.count())

# verint=verint1.union(verint2).drop_duplicates()
verint = verint1
print("=======================verint columns===============================")
print(verint.columns)

# 2. merge before etl transformation
#make sure non-churn customers does not appear in churn customers
household_df_nonchurn = Household.load_data(spark_s = spark_session, churn = False)
print("=======================household columns===============================")
print(household_df_nonchurn.columns)
household_df_churn = Household.load_data(spark_s = spark_session, churn = True)
print("=======================household columns===============================")
print(household_df_churn.columns)
household_df_churn=household_df_churn.select("HASH_LKP_ACCOUNT").distinct()
#retain only left dataset for non-matched records during join
household_df=household_df_nonchurn.join(household_df_churn, household_df_nonchurn.HASH_LKP_ACCOUNT ==  household_df_churn.HASH_LKP_ACCOUNT,"leftanti")

ibro_verint_df = verint.join(household_df, verint.CUSTOMER_ID == household_df.HASH_LKP_ACCOUNT, 'inner')\
    .drop(household_df.CUSTOMER_ID)\
    .drop(household_df.CUSTOMER_COMPANY)\
    .drop(household_df.CUSTOMER_ACCOUNT)\
    .drop(household_df.REPORT_DATE)
## moving .drop(verint.account_number) because no such column

print("=======================merged ibro and verint===============================")
print(ibro_verint_df.columns)
print("ibro_verint count rows: {}".format(ibro_verint_df.count()))

# verint_etl = VerintETL(verint)

verint_etl = VerintETL(ibro_verint_df) 
verint_df = verint_etl.cbl_ser_etl_spark() # preprocess
final_df = verint_etl.en_product_mention_etl(verint_df) # 3. merge as transformation

print("======================preprocess===============================")
print(final_df.count())
temp_path="dbfs:/mnt/ml-model-output-scores-data/mlmodel_output/digital/CD4MT/Verint/GLDA_Topic_Modelling_Output/Connected_Home/cancel_and_move_bertopic"
final_df.write.mode("overwrite").option("header", True).parquet(temp_path)



In [0]:
temp_path="dbfs:/mnt/ml-model-output-scores-data/mlmodel_output/digital/CD4MT/Verint/GLDA_Topic_Modelling_Output/Connected_Home/cancel_and_move_bertopic"
final_df=spark.read.parquet(temp_path)

## Join with inference results by Guided-LDA model for side-to-side model comparsion:

In [0]:
cm_glda=spark.sql("""select 
SPEECH_ID_VERINT,
top_topic,
2nd_topic,
3rd_topic,
4th_topic,
5th_topic,
lda_inference_result,
cancel,
move,
cancel_mention,
move_mention from default.glda_model_df_final_one_month""")
print(cm_glda.count())

396353


In [0]:
comparsion_df=final_df.join(cm_glda,"SPEECH_ID_VERINT",how="inner")
print(comparsion_df.count())
#comparsion_df_pd=comparsion_df.toPandas()

361714


## Generate training data for bert-topic:

In [0]:
min_date = '2022-02-22'
max_date = '2022-06-25'

###################################
## Creating a VerintETL Object ##
## to perform ETL jobs on the    ##
## input data.                   ##
###################################
spark_session = spark
#verint= Verint.load_data(min_date = min_date, max_date = max_date, spark_s = spark_session) 
# # 1. load cumfct table

#verint= Verint.load_data(min_date = min_date, max_date = max_date, categories = ('101000264', '101001648', '109000006', '109000011'), spark_s = spark_session) 
#verint= Verint.load_data_filter(min_date = min_date, max_date = max_date, spark_s = spark_session) 
#cancel only
verint1= Verint.load_data_filter(min_date = min_date, max_date = max_date, categories = ['101000264', '101001648', '109000006'], spark_s = spark_session)

#move only
verint3=Verint.load_data_filter(min_date = min_date, max_date = max_date, categories = ['109000011'], spark_s = spark_session)

#micellous merge, for shorter time sake, change min date to 4-19, as long as we get miscellous it is fine
verint2 = Verint.load_data_filter_not(min_date = '2022-04-19', max_date = max_date, categories = ['101000264', '101001648', '109000006', '109000011'], spark_s = spark_session)

# 2. merge before etl transformation
household_df = Household.load_data(spark_s = spark_session, churn = False)
print("=======================household columns===============================")
print(household_df.columns)

ibro_verint_df1 = verint1.join(household_df, verint1.CUSTOMER_ID == household_df.HASH_LKP_ACCOUNT, 'inner')\
    .drop(household_df.CUSTOMER_ID)\
    .drop(household_df.CUSTOMER_COMPANY)\
    .drop(household_df.CUSTOMER_ACCOUNT)\
    .drop(household_df.REPORT_DATE).limit(30000)
ibro_verint_df2 = verint2.join(household_df, verint2.CUSTOMER_ID == household_df.HASH_LKP_ACCOUNT, 'inner')\
    .drop(household_df.CUSTOMER_ID)\
    .drop(household_df.CUSTOMER_COMPANY)\
    .drop(household_df.CUSTOMER_ACCOUNT)\
    .drop(household_df.REPORT_DATE).limit(90000)
ibro_verint_df3 = verint3.join(household_df, verint3.CUSTOMER_ID == household_df.HASH_LKP_ACCOUNT, 'inner')\
    .drop(household_df.CUSTOMER_ID)\
    .drop(household_df.CUSTOMER_COMPANY)\
    .drop(household_df.CUSTOMER_ACCOUNT)\
    .drop(household_df.REPORT_DATE).limit(30000)
ibro_verint_df=ibro_verint_df1.union(ibro_verint_df3)
ibro_verint_df=ibro_verint_df.union(ibro_verint_df2)
## moving .drop(verint.account_number) because no such column
print("=======================merged ibro and verint===============================")
print(ibro_verint_df.columns)
print("ibro_verint count rows: {}".format(ibro_verint_df.count()))

# verint_etl = VerintETL(verint)

verint_etl = VerintETL(ibro_verint_df) 
verint_df = verint_etl.cbl_ser_etl_spark() # preprocess
train_final_df = verint_etl.en_product_mention_etl(verint_df) # 3. merge as transformation

train_temp_path="dbfs:/mnt/ml-model-output-scores-data/mlmodel_output/digital/CD4MT/Verint/GLDA_Topic_Modelling_Output/Connected_Home/cancel_and_move_bertopic_training"
train_final_df.write.mode("overwrite").option("header", True).parquet(train_temp_path)

Categories provided
Categories provided
Categories provided
['ACCOUNT_NUMBER', 'REPORT_DATE', 'ARPA_OUT', 'CUSTOMER_ID', 'CUSTOMER_COMPANY', 'CUSTOMER_ACCOUNT', 'ENTERPRISE_ID', 'MULTI_BRAND', 'VOL_INVOL_IND', 'PLATFORM', 'ACTIVITY_GRADE_CODE', 'HASH_LKP_ACCOUNT', 'HASH_LKP_ECID']
['speech_id', 'SPEECH_ID_VERINT', 'TEXT_AGENT_FULL', 'TEXT_CUSTOMER_FULL', 'TEXT_OVERLAP', 'TEXT_ALL', 'CUSTOMER_ID', 'CTN', 'AGENT_EMP_ID', 'CONNECTION_ID', 'RECEIVING_SKILL', 'LANGUAGE_INDICATOR', 'CONVERSATION_DATE', 'Year', 'Month', 'ACCOUNT_NUMBER', 'ARPA_OUT', 'ENTERPRISE_ID', 'MULTI_BRAND', 'VOL_INVOL_IND', 'PLATFORM', 'ACTIVITY_GRADE_CODE', 'HASH_LKP_ACCOUNT', 'HASH_LKP_ECID']
ibro_verint count rows: 144832
count rows: 141605
distinct count rows: 140992
full consumer rows
lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[ | ][ / ][OK!]
['TEXT_CUSTOMER_FULL', 'TEXT_AGENT_FULL', 'TEXT_ALL', 'RECEIVING_SKILL', 'CTN', 'CUSTOMER_ID', 'SPEECH_ID_VERINT', 'CONVE

In [0]:
train_temp_path="dbfs:/mnt/ml-model-output-scores-data/mlmodel_output/digital/CD4MT/Verint/GLDA_Topic_Modelling_Output/Connected_Home/cancel_and_move_bertopic_training"
train_final_df=spark.read.parquet(train_temp_path)

## Generate evaluation metrics for Guided-LDA model on testing data:

In [0]:
#17 topics (inlcude miscelluous) from GLDA
#top 1 calculation : precision, recall, accuracy 
tp_cancel = comparsion_df.filter((col("cancel") == True)).filter(col("top_topic") == "Deactivation").count()
print(f"TP count: {tp_cancel}")

tn_cancel = comparsion_df.filter((col("cancel") == False) ).filter(col("top_topic") != "Deactivation").count()
print(f"TN count: {tn_cancel}")

fp_cancel = comparsion_df.filter((col("cancel") == False)).filter(col("top_topic") == "Deactivation").count()
print(f"FP count: {fp_cancel}")

fn_cancel = comparsion_df.filter((col("cancel") == True)).filter(col("top_topic") != "Deactivation").count()
print(f"FN count: {fn_cancel}")

precision_cancel = tp_cancel / (tp_cancel + fp_cancel)
print(f"Precision: {precision_cancel}")

recall_cancel = tp_cancel / (tp_cancel + fn_cancel)
print(f"Recall: {recall_cancel}")
Accuracy = (tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel) 
print(f"Accuracy: {(tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel)}")

cancel = [('cancel', tp_cancel, tn_cancel, fp_cancel, fn_cancel, precision_cancel, recall_cancel, Accuracy)]
f1 = spark.createDataFrame(data=cancel, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])
f1.createOrReplaceTempView("f1")

TP count: 3976
TN count: 291638
FP count: 2733
FN count: 63367
Precision: 0.5926367565956179
Recall: 0.05904102876319736
Accuracy: 0.8172589393830485


In [0]:
tp_moves = comparsion_df.filter((col("move") == True) ).filter(col("top_topic") == "Move Request").count()
print(f"TP count: {tp_moves}")

tn_moves = comparsion_df.filter((col("move") == False)).filter(col("top_topic") != "Move Request").count()
print(f"TN count: {tn_moves}")

fp_moves = comparsion_df.filter((col("move") == False)).filter(col("top_topic") == "Move Request").count()
print(f"FP count: {fp_moves}")

fn_moves = comparsion_df.filter((col("move") == True)).filter(col("top_topic") != "Move Request").count()
print(f"FN count: {fn_moves}")

precision_moves = tp_moves / (tp_moves + fp_moves)
print(f"Precision: {precision_moves}")
Accuracy = (tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)
recall_moves = tp_moves / (tp_moves + fn_moves)
print(f"Recall: {recall_moves}")
print(f"Accuracy: {(tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)}")

move = [('moves', tp_moves, tn_moves, fp_moves, fn_moves, precision_moves, recall_moves, Accuracy)]
f2 = spark.createDataFrame(data=move, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])

TP count: 5522
TN count: 256795
FP count: 3944
FN count: 95453
Precision: 0.5833509402070568
Recall: 0.05468680366427334
Accuracy: 0.7252055491355048


In [0]:
#top 3 calculation : precision, recall, accuracy 
tp_cancel = comparsion_df.filter((col("cancel") == True)).filter((col("top_topic") == "Deactivation") | (col("2nd_topic") == "Deactivation") | (col("3rd_topic") == "Deactivation") ).count()
print(f"TP count: {tp_cancel}")

tn_cancel = comparsion_df.filter((col("cancel") == False)).filter((col("top_topic") != "Deactivation") | (col("2nd_topic") != "Deactivation") | (col("3rd_topic") != "Deactivation") ).count()
print(f"TN count: {tn_cancel}")

fp_cancel = comparsion_df.filter((col("cancel") == False) ).filter((col("top_topic") == "Deactivation") | (col("2nd_topic") == "Deactivation") | (col("3rd_topic") == "Deactivation") ).count()
print(f"FP count: {fp_cancel}")

fn_cancel = comparsion_df.filter((col("cancel") == True) ).filter((col("top_topic") != "Deactivation") | (col("2nd_topic") != "Deactivation") | (col("3rd_topic") != "Deactivation") ).count()
print(f"FN count: {fn_cancel}")

precision_cancel = tp_cancel / (tp_cancel + fp_cancel)
print(f"Precision: {precision_cancel}")

recall_cancel = tp_cancel / (tp_cancel + fn_cancel)
print(f"Recall: {recall_cancel}")
Accuracy = (tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel) 
print(f"Accuracy: {(tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel)}")

cancel = [('cancel', tp_cancel, tn_cancel, fp_cancel, fn_cancel, precision_cancel, recall_cancel, Accuracy)]
f5 = spark.createDataFrame(data=cancel, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])
f5.createOrReplaceTempView("f5")

TP count: 14421
TN count: 294371
FP count: 13189
FN count: 67343
Precision: 0.5223107569721116
Recall: 0.1763734650946627
Accuracy: 0.7931491508358077


In [0]:
tp_moves = comparsion_df.filter((col("move") == True) ).filter((col("top_topic") == "Move Request") | (col("2nd_topic") == "Move Request") | (col("3rd_topic") == "Move Request") ).count()
print(f"TP count: {tp_moves}")

tn_moves = comparsion_df.filter((col("move") == False) ).filter((col("top_topic") != "Move Request") | (col("2nd_topic") != "Move Request") | (col("3rd_topic") != "Move Request") ).count()
print(f"TN count: {tn_moves}")

fp_moves = comparsion_df.filter((col("move") == False)).filter((col("top_topic") == "Move Request") | (col("2nd_topic") == "Move Request") | (col("3rd_topic") == "Move Request") ).count()
print(f"FP count: {fp_moves}")

fn_moves = comparsion_df.filter((col("move") == True) ).filter((col("top_topic") != "Move Request") | (col("2nd_topic") != "Move Request") | (col("3rd_topic") != "Move Request") ).count()
print(f"FN count: {fn_moves}")

precision_moves = tp_moves / (tp_moves + fp_moves)
print(f"Precision: {precision_moves}")
Accuracy = (tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)
recall_moves = tp_moves / (tp_moves + fn_moves)
print(f"Recall: {recall_moves}")
print(f"Accuracy: {(tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)}")

move = [('moves', tp_moves, tn_moves, fp_moves, fn_moves, precision_moves, recall_moves, Accuracy)]
f6 = spark.createDataFrame(data=move, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])

TP count: 18824
TN count: 260739
FP count: 22810
FN count: 100975
Precision: 0.45213047028870634
Recall: 0.1571298591807945
Accuracy: 0.6931062010968196


In [0]:
#top 5 calculation : precision, recall, accuracy
tp_cancel = comparsion_df.filter((col("cancel") == True) ).filter((col("top_topic") == "Deactivation") | (col("2nd_topic") == "Deactivation") | (col("3rd_topic") == "Deactivation") | (col("4th_topic") == "Deactivation") | (col("5th_topic") == "Deactivation")).count()
print(f"TP count: {tp_cancel}")

tn_cancel = comparsion_df.filter((col("cancel") == False) ).filter((col("top_topic") != "Deactivation") | (col("2nd_topic") != "Deactivation") | (col("3rd_topic") != "Deactivation") | (col("4th_topic") != "Deactivation") | (col("5th_topic") != "Deactivation")).count()
print(f"TN count: {tn_cancel}")

fp_cancel = comparsion_df.filter((col("cancel") == False) ).filter((col("top_topic") == "Deactivation") | (col("2nd_topic") == "Deactivation") | (col("3rd_topic") == "Deactivation") | (col("4th_topic") == "Deactivation") | (col("5th_topic") == "Deactivation")).count()
print(f"FP count: {fp_cancel}")

fn_cancel = comparsion_df.filter((col("cancel") == True)).filter((col("top_topic") != "Deactivation") | (col("2nd_topic") != "Deactivation") | (col("3rd_topic") != "Deactivation") | (col("4th_topic") != "Deactivation") | (col("5th_topic") != "Deactivation")).count()
print(f"FN count: {fn_cancel}")

precision_cancel = tp_cancel / (tp_cancel + fp_cancel)
print(f"Precision: {precision_cancel}")

recall_cancel = tp_cancel / (tp_cancel + fn_cancel)
print(f"Recall: {recall_cancel}")
Accuracy = (tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel) 
print(f"Accuracy: {(tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel)}")

cancel = [('cancel', tp_cancel, tn_cancel, fp_cancel, fn_cancel, precision_cancel, recall_cancel, Accuracy)]
f = spark.createDataFrame(data=cancel, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])
f.createOrReplaceTempView("f")

TP count: 24041
TN count: 294371
FP count: 29674
FN count: 67343
Precision: 0.44756585683701017
Recall: 0.26307668738510026
Accuracy: 0.7664655091483743


In [0]:
tp_moves = comparsion_df.filter((col("move") == True) ).filter((col("top_topic") == "Move Request") | (col("2nd_topic") == "Move Request") | (col("3rd_topic") == "Move Request") | (col("4th_topic") == "Move Request") | (col("5th_topic") == "Move Request")).count()
print(f"TP count: {tp_moves}")

tn_moves = comparsion_df.filter((col("move") == False) ).filter((col("top_topic") != "Move Request") | (col("2nd_topic") != "Move Request") | (col("3rd_topic") != "Move Request") | (col("4th_topic") != "Move Request") | (col("5th_topic") != "Move Request")).count()
print(f"TN count: {tn_moves}")

fp_moves = comparsion_df.filter((col("move") == False) ).filter((col("top_topic") == "Move Request") | (col("2nd_topic") == "Move Request") | (col("3rd_topic") == "Move Request") | (col("4th_topic") == "Move Request") | (col("5th_topic") == "Move Request")).count()
print(f"FP count: {fp_moves}")

fn_moves = comparsion_df.filter((col("move") == True) ).filter((col("top_topic") != "Move Request") | (col("2nd_topic") != "Move Request") | (col("3rd_topic") != "Move Request") | (col("4th_topic") != "Move Request") | (col("5th_topic") != "Move Request")).count()
print(f"FN count: {fn_moves}")

precision_moves = tp_moves / (tp_moves + fp_moves)
print(f"Precision: {precision_moves}")
Accuracy = (tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)
recall_moves = tp_moves / (tp_moves + fn_moves)
print(f"Recall: {recall_moves}")
print(f"Accuracy: {(tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)}")

move = [('moves', tp_moves, tn_moves, fp_moves, fn_moves, precision_moves, recall_moves, Accuracy)]
f3 = spark.createDataFrame(data=move, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])

TP count: 32372
TN count: 260739
FP count: 51237
FN count: 100975
Precision: 0.3871831979810786
Recall: 0.242765116575551
Accuracy: 0.6581986558071332


In [0]:
comparsion_df

Out[33]: DataFrame[SPEECH_ID_VERINT: string, TEXT_CUSTOMER_FULL: string, TEXT_AGENT_FULL: string, TEXT_ALL: string, RECEIVING_SKILL: string, CTN: string, CUSTOMER_ID: string, CONVERSATION_DATE: date, Year: int, Month: int, CONNECTION_ID: string, CLEAN_TEXT_CUSTOMER: string, CLEAN_TEXT_AGENT: string, COMPETITOR_MENTION: string, ROGERS_FIDO_MENTION: string, PRODUCT_MENTION: string, top_topic: string, 2nd_topic: string, 3rd_topic: string, 4th_topic: string, 5th_topic: string, lda_inference_result: array<double>, cancel: boolean, move: boolean, cancel_mention: boolean, move_mention: boolean]

In [0]:
#only label for cancel / for move
comparsion_df_pd=comparsion_df.select(["SPEECH_ID_VERINT","TEXT_CUSTOMER_FULL","CLEAN_TEXT_CUSTOMER","cancel","move","cancel_mention","move_mention"]).toPandas()

In [0]:
comparsion_df_pd

Unnamed: 0,SPEECH_ID_VERINT,TEXT_CUSTOMER_FULL,CLEAN_TEXT_CUSTOMER,cancel,move,cancel_mention,move_mention
0,504040085168657,"SO LITTLE BIT HARD TO, RENTED YOUR AND A VAN O...",bit hard rent van channel year flight pre purc...,False,False,False,False
1,504026085339031,"OKAY I'M JUST WONDERING, I RECEIVE MY BILL WHY...",receive bill bill high walk term month bill us...,False,False,True,True
2,504060086115429,"WHAT'S NOT DIDN'T DO IT UP, OKAY, MY BILL IT G...",bill go know wait eva bit leave team change av...,False,False,False,False
3,504028086101940,GOOD MORNING THIS IS RINA FROM ROGERS HOW CAN ...,rina rogers get save bill change postal code i...,False,False,False,True
4,504060087279166,"LOUIS, GIVE CALLING I HAVE NO ALONE ENTER THAT...",louis call enter ignite tv elia restore minute...,False,False,False,False
...,...,...,...,...,...,...,...
361709,504050086626925,HI MY NAME IS FROM YOUR FROM OUR NUMBER PROPER...,number property manager avenue road get new co...,False,False,False,True
361710,504062085795792,MY MY INTERNET NO NO WIRELESS AND THEN I DON'T...,internet wireless know line low internet get c...,False,False,False,False
361711,504048085081139,"I HAVE A PROBLEM, BY BOX, SAYS SORRY OR CABLE ...",problem box say cable box button work call num...,False,False,False,False
361712,504034086911591,"YEAH EIGHT BOX, I JUST WANT TO KNOW HOW MUCH M...",box know account number monthly bill prepay bi...,False,False,False,True


In [0]:
train_final_df_pd=train_final_df.select(["SPEECH_ID_VERINT","CLEAN_TEXT_CUSTOMER","TEXT_CUSTOMER_FULL"]).toPandas()

In [0]:
train_final_df_pd

Unnamed: 0,SPEECH_ID_VERINT,CLEAN_TEXT_CUSTOMER,TEXT_CUSTOMER_FULL
0,504034068351565,ahead frustrate get direction smart home monit...,"AHEAD, THANK YOU THANK YOU, FINE I'M JUST VERY..."
1,504030066153475,account online confirm special arrangement que...,"THANK YOU, I'M LOOKING AT MY ACCOUNT ONLINE AN..."
2,504030066727042,question pay end month say hold get limit know...,"YES MY NAME IS, QUESTION IS, YOU ARE YOU HELP ..."
3,504064067040979,national call phone number,"HI MY NAME IS HER NATIONAL, EIGHT EIGHT FORTY..."
4,504032065162553,awesome extension number id number rate cancel...,"HERE I JUST GIVE ME YOUR NAME, I, HERE YOUR LA..."
...,...,...,...
138078,504028077386450,get phone year rebate nothings work provide ch...,WELL WE NEED I NEED SOMEONE I HERE THERE LIKE ...
138079,504028077069451,speak rogers come television lick reason numbe...,I WAS JUST SPEAKING TO SOMEONE FROM ROGERS THE...
138080,504058067758651,call debbie income call click card oclock walk...,YEAH I'M CALLING TO DEBBIE MONEY INCOMING I I ...
138081,504068064869794,high month corporate charlie telephone number ...,"HIGH LET ME SEE, MY NAME IS DONE DAYS THE MY W..."


## Modeling using guided version of bertopic:    
### Guided means a list of anchor topics and words corresponding to each of the topic are provided to the model, since customized instead of random topics are wanted.

In [0]:
#guided bertopic on raw conversation
from umap import UMAP
from hdbscan import HDBSCAN
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import CountVectorizer

from bertopic import BERTopic
from bertopic.representation import KeyBERTInspired
from bertopic.vectorizers import ClassTfidfTransformer
#guided version has seed_topic_list


seed_topic_list = [
    #promotion inquiry
    ['internet promo', 'set expire', 'come end', 'expire month', 'promotion expire', 'promotion internet', 
    'offer available', 'offer end offer', 'internet promotion end', 'internet plan expire',
    'internet promotion expire', 'plan expire', 'offer deal', 'loyal customer', 'promotion expire soon', 
    'know promotion', 'internet promo end', 'internet promotion', 'price internet', 'soon offer', 
    'expire soon offer', 'new promotion', 'promo end'], 

    #billing inquiry 
    ['charge month', 'charge late payment', 'late payment charge', 'high month', 'charge late', 
    'month home', 'pay time', 'cycle end', 'late payment', 'new monthly', 'billing billing', 
    'say bill', 'bill bill', 'bill increase', 'bill go', 'monthly bill', 'increase bill', 
    'internet bill', 'account balance', 'bill high', 'bill issue', 'current bill', 'overage charge',
    'bill inquiry'],

    #cancel
    ['cancel','cancellation','cancel service', 'account cancel', 'account cancel account', 'account return', 'address cancel', 
     'canada post', 'cancel account', 'cancel cable', 'cancel cancel', 'cancel cause', 'cancel charge',
     'cancel credit', 'cancel home','cancel home phone','cancel internet', 'cancel internet service', 
     'cancel month', 'cancel pay', 'cancel phone', 'cancel return', 'cancel service', 'cancel service cancel',
     'close account','cancel time', 'cause cancel', 'credit cancel', 'end month', 'home monitoring','home monitor',
     'internet cancel', 'modem cancel', 'month cancel', 'new account' ,'pay cancel', 'pay return', 'phone cancel', 
     'return account', 'service cancel','service return', 'store return','time cancel', 'time return','cancel rogers', 
     'rogers cancel', 'account cancelation', 'cancel plan', 'canceling service','question cancel', 'service end month',
     'internet service cancel', 'cancel internet cancel','anymore cancel','cancel home internet','service cancel internet',
     'service cancel service', 'cancel subscription', 'cancel tv','internet cancel internet'
    ],

    #Plan Inquiry - Internet
    ['option account', 'deal internet', 'plan home internet', 'internet ignite', 'cost upgrade', 
    'change internet plan', 'change internet package', 'question upgrade', 'change plan change', 
    'internet unlimited', 'internet plan change', 'data usage', 'new plan', 'internet time', 
    'roger internet', 'internet deal', 'upgrade internet speed', 'internet package', 'plan home', 
    'home internet plan', 'internet internet', 'home internet'],

    #myrogers
    ['rogers online', 'password rogers', 'telecommunication service view', 'message able', 
    'display info', 'online message', 'say able display', 'account number register', 
    'manage company telecommunication', 'work fix', 'online message able', 'telecommunication service',
    'manage company', 'reset voicemail', 'fix able display', 'service view', 'number register',
    'password rogers online', 'company telecommunication', 'able display', 'fix issue', 'fix able',
    'reset voicemail password', 'rogers online message'],

    #bundle inquiry
    ['tv internet phone', 'phone tv', 'tv bundle', 'bundle tv internet', 'ignite bundle', 
    'change flex', 'ignite premier', 'internet home phone', 'ignite popular', 'ignite popular bundle',
    'bundle ignite', 'popular bundle', 'phone bundle', 'home phone tv', 'channel flex', 'bundle tv', 
    'channel flex channel', 'change flex channel', 'internet phone', 'ignite tv bundle'],

    #equipment inquiry
    ['modem return', 'modem send', 'modem tell', 'charge return', 'send return', 'return cable box',
    'tell send', 'return label', 'store open', 'receive label', 'equipment return', 'wait return',
    'return cable', 'equipment equipment', 'box return', 'label rogers', 'internet modem', 'return old', 
    'confirm receive', 'ship label', 'service address', 'modem store', 'return modem', 'return equipment', 
    'label rogers send'],

    #Tech Support
    ['new modem work', 'issue time', 'status provide', 'box status', 'serial number check', 
    'provide serial', 'number check', 'provide serial number', 'speed slow', 'work internet', 
    'area internet', 'modem work', 'internet work internet', 'internet connection', 
    'provide serial', 'phone work', 'internet issue'],

    #technician installation inquiry
    ['come way', 'receive text', 'come rogers', 'reschedule appointment', 'number tell', 
    'technician come', 'install internet', 'technician install', 'cancel installation', 
    'installation date'],

    #tv addon inuqiry
    ['add channel add', 'theme pack', 'crave tv', 'tv rogers', 'channel tv', 'nfl ticket', 
    'channel tv package', 'add channel', 'channel add channel', 'rogers nhl'],

    #plan inquiry tv
    ['package change package', 'tv package popular', 'package package', 'change package', 
    'package popular'],

    #payment inquiry
    ['charge pay', 'payment payment', 'credit card info', 'pay internet', 'card info', 
    'payment online', 'change payment', 'pay bill', 'payment arrangement', 'internet pay'],

    #account inquiry
    ['number change', 'number link', 'set myroger account', 'account change', 'number end', 
    'place order', 'account number set', 'number account', 'old account', 'line new', 'change phone',
    'number set', 'receive service', 'link account', 'line change', 'change service', 'internet usage',
    'rogers account'],

    #plan inquiry home phone
    ['standard long distance', 'distance charge', 'distance plan', 'long distance phone', 
    'distance phone', 'long distance plan', 'long distance', 'long distance charge', 'home phone', 
    'phone plan', 'standard long'],

    #moves
    ['address','move','moving','new address','old address','address address','address account','change address','account address', 
     'address send', 'address new', 'account new','new place','service address', 'new work','change job', 
     'send address', 'new house', 'address change', 'service new','technical support' ,'tech support' ,
     'send technician','home address','new service', 'address cause', 'current address', 
     'set account', 'address file','new account' ,'home internet', 'address internet', 'set address', 
     'address phone', 'toronto ontario', 'address service','work home' ,'new modem',
     'set new' ,'transfer service','address old', 'internet new', 'new new', 'new home', 'new job',
     'mailing address','mail address', 'address check', 'send new', 'new location', 'address long', 'address new address',
     'previous address', 'new apartment','internet service','cause address', 'address pay', 'check address', 
     'brand new', 'sell house', 'address home', 'address street', 'modem new',
     'send check', 'house house', 'address cancel', 'address location', 'address relocate', 'account relocate',
     'location relocate', 'change address','new house', 'new condo', 'new apartment', 'new place',
     'new city', 'new province', 'moving in','move in','leaving home','service area', 'leave home', 
     'moving back','move back', 'moving out','move out', 'old place','new address new','service new address',
     'service hold', 'transfer service new', 
     'move new', 'moving new', 'move place', 'moving place', 'move address', 'moving address'
     ],

     #customer notification
    ['safe sign internet', 'safe sign', 'sign internet', 'email state', 'email receive email', 
    'email rogers', 'sign internet service', 'service limited', 'stay safe sign', 
    'automatically apply']

] 

# Step 1 - Extract embeddings
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

# Step 2 - Reduce dimensionality
umap_model = UMAP(n_neighbors=15, n_components=5, min_dist=0.0, metric='cosine')

# Step 3 - Cluster reduced embeddings
hdbscan_model = HDBSCAN(min_cluster_size=15, metric='euclidean', cluster_selection_method='eom', prediction_data=True)

# Step 4 - Tokenize topics
vectorizer_model = CountVectorizer(ngram_range=(1, 3))#stop words already removed at preprocessing, no need to do it again

# Step 5 - Create topic representation
ctfidf_model = ClassTfidfTransformer()

# Step 6 - (Optional) Fine-tune topic representations with 
# a `bertopic.representation` model
representation_model = KeyBERTInspired()

# All steps together
guided_topic_model_processed = BERTopic(
  #min_topic_size=100,
  nr_topics=30, #customize cluster size to 30, otherwise would get hundreds
  language="english", 
  calculate_probabilities=True,
  top_n_words=20, 
  n_gram_range=(1, 3), #most of the phrases and words we want to capture are bigrams, a few are unigrams and trigrams
  verbose=True,
  seed_topic_list=seed_topic_list,
  embedding_model=embedding_model,          # Step 1 - Extract embeddings
  umap_model=umap_model,                    # Step 2 - Reduce dimensionality
  hdbscan_model=hdbscan_model,              # Step 3 - Cluster reduced embeddings
  vectorizer_model=vectorizer_model,        # Step 4 - Tokenize topics
  ctfidf_model=ctfidf_model,                # Step 5 - Extract topic words
  representation_model=representation_model # Step 6 - (Optional) Fine-tune topic represenations
)

guided_topics_processed, guided_probs_processed = guided_topic_model_processed.fit_transform(train_final_df_pd["CLEAN_TEXT_CUSTOMER"])

Batches:   0%|          | 0/4316 [00:00<?, ?it/s]

2023-03-15 20:48:41,730 - BERTopic - Transformed documents to Embeddings


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

2023-03-15 20:50:13,259 - BERTopic - Reduced dimensionality
2023-03-15 20:52:09,587 - BERTopic - Clustered reduced embeddings
2023-03-15 20:59:49,148 - BERTopic - Reduced number of topics from 126 to 30


In [0]:
freq = guided_topic_model_processed.get_topic_info()
freq

Unnamed: 0,Topic,Count,Name
0,-1,104269,-1_cell phone_home phone_customer_service
1,0,7255,0_remote control_tv box_get channel_turn tv
2,1,6002,1_modem_new modem_internet connection_router
3,2,3769,2_bill pay_pay bill_get bill_bill month
4,3,2979,3_cancel service_cancel internet service_servi...
5,4,2163,4_street_line_bill_customer
6,5,2091,5_move address_move new address_call move_move...
7,6,1658,6_password rogers_password say_password get_pa...
8,7,1443,7_call maam_maam call_maam_maam maam
9,8,1102,8_call behalf mother_fathers account_mothers a...


## As shown in the following block, topics are fine clustered:       
### Such as topic 0 for TV addon, topic 1 for equipment and modem inquiry, topic 2 for billing inquiry, topic 5 and 13 for move and topic 3, 9, 12, 15 for cancel, topic 11 for iginte TV, topic 14 for technician inquiry,

In [0]:
freq.head(20)

Unnamed: 0,Topic,Count,Name
0,-1,104269,-1_cell phone_home phone_customer_service
1,0,7255,0_remote control_tv box_get channel_turn tv
2,1,6002,1_modem_new modem_internet connection_router
3,2,3769,2_bill pay_pay bill_get bill_bill month
4,3,2979,3_cancel service_cancel internet service_servi...
5,4,2163,4_street_line_bill_customer
6,5,2091,5_move address_move new address_call move_move...
7,6,1658,6_password rogers_password say_password get_pa...
8,7,1443,7_call maam_maam call_maam_maam maam
9,8,1102,8_call behalf mother_fathers account_mothers a...


In [0]:
guided_topic_model_processed.get_topic(5)

Out[32]: [('move address', 0.82902545),
 ('move new address', 0.814141),
 ('call move', 0.7979211),
 ('move service', 0.79291254),
 ('address move', 0.7853836),
 ('move move', 0.77695674),
 ('move change', 0.7602319),
 ('move', 0.75284326),
 ('service move', 0.75245863),
 ('move new', 0.74628544)]

In [0]:
guided_topic_model_processed.get_topic(3)

Out[33]: [('cancel service', 0.9475236),
 ('cancel internet service', 0.9303482),
 ('service cancel', 0.9178182),
 ('internet cancel', 0.90097195),
 ('cancel internet', 0.8775258),
 ('get cancel', 0.78922474),
 ('canceling', 0.7803995),
 ('cancel', 0.77065384),
 ('cancel cancel', 0.7691685),
 ('call cancel', 0.76684797)]

In [0]:
guided_topic_model_processed.get_topic(9)

Out[34]: [('appointment cancel', 0.7422272),
 ('cancel appointment', 0.73458594),
 ('schedule appointment', 0.700595),
 ('reschedule appointment', 0.67970234),
 ('appointment schedule', 0.67012626),
 ('schedule technician', 0.60416454),
 ('cancel internet', 0.5515235),
 ('appointment', 0.54247195),
 ('appointment get', 0.5397885),
 ('cancel service', 0.5311317)]

In [0]:
guided_topic_model_processed.get_topic(12)

Out[35]: [('cancel home monitor', 0.8969857),
 ('smart home monitor', 0.69981027),
 ('home monitor', 0.610461),
 ('home monitor system', 0.5986349),
 ('smart home', 0.54229116),
 ('monitor system', 0.4799768),
 ('monitor', 0.47123423),
 ('cancel', 0.43586034),
 ('screen', 0.3232578),
 ('smart', 0.26737633)]

In [0]:
guided_topic_model_processed.get_topic(13)

Out[36]: [('postal code', 0.9705958),
 ('code postal code', 0.95317274),
 ('postal code get', 0.9524472),
 ('postal code postal', 0.9513249),
 ('code postal', 0.93877685),
 ('service postal code', 0.9077622),
 ('postal code know', 0.9038202),
 ('postal code go', 0.89465874),
 ('postal code move', 0.8835842),
 ('internet postal code', 0.8776128)]

In [0]:
guided_topic_model_processed.get_topic(15)

Out[37]: [('call cancel fido', 0.8503492),
 ('cancel fido internet', 0.81184363),
 ('fido cancel', 0.7642471),
 ('cancel fido', 0.76025105),
 ('call fido', 0.6783703),
 ('fido phone number', 0.6667167),
 ('internet cancel', 0.6639839),
 ('call cancel', 0.6633958),
 ('fido phone', 0.6352875),
 ('internet service fido', 0.6327174)]

### After the model was trained, use it to transfrom the test dataset and evaluate the metrics on cancel and move categorys:

In [0]:
inference_guided_topics_processed, inference_guided_probs_processed = guided_topic_model_processed.transform(comparsion_df_pd["CLEAN_TEXT_CUSTOMER"])

Batches:   0%|          | 0/11304 [00:00<?, ?it/s]

2023-03-15 22:32:09,538 - BERTopic - Reduced dimensionality
2023-03-15 22:40:27,469 - BERTopic - Calculated probabilities with HDBSCAN
2023-03-15 22:40:27,470 - BERTopic - Predicted clusters


In [0]:
inference_guided_probs_processed[0]

Out[70]: array([0.21190969, 0.0414405 , 0.11500228, 0.0697221 , 0.1311359 ,
       0.00980058, 0.02268176, 0.00196047, 0.01042774, 0.04141208,
       0.01857227, 0.00333116, 0.01104373, 0.00197972, 0.00440377,
       0.00970661, 0.02157798, 0.01606316, 0.00204341, 0.00619947,
       0.00591042, 0.01272498, 0.00268563, 0.00181886, 0.00823875,
       0.00179017, 0.00226856, 0.00383816, 0.0029784 ])

In [0]:
guided_topic_model_processed.visualize_distribution(inference_guided_probs_processed[0], min_probability=0.015)

In [0]:
#subset=comparsion_df_pd["CLEAN_TEXT_CUSTOMER"]
inference_guided_topics_processed, inference_guided_probs_processed

In [0]:
inference_guided_probs_processed

Out[41]: array([[3.50832086e-02, 1.14950777e-01, 8.43513216e-02, ...,
        6.41267627e-03, 9.40349561e-04, 1.00084706e-03],
       [9.73018869e-40, 1.78827518e-39, 9.16852249e-40, ...,
        1.30339012e-40, 1.05139739e-40, 1.26813533e-40],
       [4.02585683e-24, 1.79675506e-23, 4.53224908e-24, ...,
        1.07321202e-24, 5.83158995e-28, 1.40065202e-28],
       ...,
       [2.47531418e-01, 2.07300433e-02, 1.50011490e-02, ...,
        3.23742598e-04, 2.90870733e-04, 3.21338222e-04],
       [1.82756725e-02, 2.49313500e-02, 1.37514497e-02, ...,
        1.28894147e-03, 1.15547632e-03, 1.27073263e-03],
       [2.68042123e-02, 8.52529825e-01, 1.15970913e-02, ...,
        5.22197095e-07, 1.42191062e-07, 1.00113239e-07]])

### Save the topic probability for future usage:

In [0]:
import pandas as pd
lis1=[]
for i in range(29):
    lis1.append("topic_"+str(i))
print(lis1)
saved_probs=pd.DataFrame(data=inference_guided_probs_processed,columns=lis1)
saved_probs_spark=spark.createDataFrame(saved_probs)
saved_probs_spark_path="dbfs:/mnt/ml-model-output-scores-data/mlmodel_output/digital/CD4MT/Verint/GLDA_Topic_Modelling_Output/Connected_Home/df_bert_spark_saved_probs"
saved_probs_spark.write.mode("overwrite").option("header", True).parquet(saved_probs_spark_path)

['topic_0', 'topic_1', 'topic_2', 'topic_3', 'topic_4', 'topic_5', 'topic_6', 'topic_7', 'topic_8', 'topic_9', 'topic_10', 'topic_11', 'topic_12', 'topic_13', 'topic_14', 'topic_15', 'topic_16', 'topic_17', 'topic_18', 'topic_19', 'topic_20', 'topic_21', 'topic_22', 'topic_23', 'topic_24', 'topic_25', 'topic_26', 'topic_27', 'topic_28']


In [0]:
comparsion_df_pd

Unnamed: 0,SPEECH_ID_VERINT,CLEAN_TEXT_CUSTOMER,cancel,move,cancel_mention,move_mention
0,504060085267811,know account suspend go come account suspend k...,False,False,False,False
1,504042085789731,know tactic know say signal recheck avenue say...,False,False,False,False
2,504052084807090,personal blockage david problem know go know m...,False,False,False,False
3,504062085789419,internet hall phone rogers associate associate...,False,False,False,False
4,504068084886774,picket rogers internet order mart internet iss...,False,False,False,False
...,...,...,...,...,...,...
361709,504050086869885,rogers guess use go able modem innovation mont...,False,False,True,True
361710,504032085471372,love book speak tell hear month mary postal co...,False,False,False,False
361711,504036086648471,account use charge problem account reason mont...,True,False,True,False
361712,504042085504653,market watch nancy go remember answer question...,False,False,False,False


## Create cancel and move evaluation dataset:

In [0]:
import pandas as pd
df_bert = pd.DataFrame({"SPEECH_ID_VERINT": comparsion_df_pd["SPEECH_ID_VERINT"], 
                   "CLEAN_TEXT_CUSTOMER": comparsion_df_pd["CLEAN_TEXT_CUSTOMER"], 
                   "cancel": comparsion_df_pd["cancel"], #label
                   "move": comparsion_df_pd["move"], #label
                   "cancel_main_bert_prob": [row[3] for row in inference_guided_probs_processed],
                   "cancel_appoint_bert_prob": [row[9] for row in inference_guided_probs_processed],
                   "cancel_monitor_bert_prob": [row[12] for row in inference_guided_probs_processed],
                   "cancel_fido_bert_prob": [row[15] for row in inference_guided_probs_processed],
                   "move_main_bert_prob": [row[5] for row in inference_guided_probs_processed],
                   "move_postal_bert_prob": [row[13] for row in inference_guided_probs_processed]
                       })

pd.set_option('display.max_colwidth', 50)
df_bert

Unnamed: 0,SPEECH_ID_VERINT,CLEAN_TEXT_CUSTOMER,cancel,cancel_mention,move,move_mention,cancel_main_bert_prob,cancel_appoint_bert_prob,cancel_monitor_bert_prob,cancel_fido_bert_prob,move_main_bert_prob,move_postal_bert_prob
0,504060085267811,know account suspend go come account suspend k...,False,False,False,False,1.586867e-01,1.214076e-02,8.009574e-03,5.917426e-03,3.546651e-02,5.813683e-02
1,504042085789731,know tactic know say signal recheck avenue say...,False,False,False,False,1.225734e-39,2.632841e-40,3.641440e-40,1.733897e-40,4.122045e-40,5.397220e-40
2,504052084807090,personal blockage david problem know go know m...,False,False,False,False,9.948376e-24,1.486336e-16,8.956138e-01,1.221124e-19,3.086363e-24,4.250572e-24
3,504062085789419,internet hall phone rogers associate associate...,False,False,False,False,7.051140e-05,4.278857e-06,1.962694e-10,8.426420e-11,4.579164e-08,1.518287e-05
4,504068084886774,picket rogers internet order mart internet iss...,False,False,False,False,4.812843e-13,2.417653e-09,1.101782e-13,6.856982e-14,1.218361e-13,2.012626e-13
...,...,...,...,...,...,...,...,...,...,...,...,...
361709,504050086869885,rogers guess use go able modem innovation mont...,False,True,False,True,0.000000e+00,0.000000e+00,3.372735e-02,0.000000e+00,0.000000e+00,0.000000e+00
361710,504032085471372,love book speak tell hear month mary postal co...,False,False,False,False,1.964775e-58,3.018073e-59,2.559357e-59,1.867847e-01,6.296916e-59,8.753205e-59
361711,504036086648471,account use charge problem account reason mont...,True,True,False,False,3.911282e-03,7.655489e-04,6.740866e-03,5.352024e-04,1.156537e-03,1.646747e-03
361712,504042085504653,market watch nancy go remember answer question...,False,False,False,False,1.540757e-02,3.148782e-03,5.986110e-03,2.122089e-03,4.651569e-03,6.569071e-03


In [0]:
import numpy as np
df_bert["label_cancel"]= np.where((df_bert["cancel"]==True)), True, False)
df_bert["label_move"]= np.where((df_bert["move"]==True)), True, False)

In [0]:
df_bert_spark=spark.createDataFrame(df_bert)
df_bert_spark_path="dbfs:/mnt/ml-model-output-scores-data/mlmodel_output/digital/CD4MT/Verint/GLDA_Topic_Modelling_Output/Connected_Home/df_bert_spark_inference"
df_bert_spark.write.mode("overwrite").option("header", True).parquet(df_bert_spark_path)

In [0]:
df_bert_spark_path="dbfs:/mnt/ml-model-output-scores-data/mlmodel_output/digital/CD4MT/Verint/GLDA_Topic_Modelling_Output/Connected_Home/df_bert_spark_inference"
df_bert_spark=spark.read.parquet(df_bert_spark_path)

## Evaluation on bertopic for label cancel and label move:

In [0]:
#calculate metrics or bert
#threshold for cancel at 0.40
tp_cancel = df_bert_spark.filter(col("label_cancel") == True).filter((col("cancel_main_bert_prob") >= 0.40) | (col("cancel_appoint_bert_prob") >= 0.40) | (col("cancel_monitor_bert_prob") >= 0.40) | (col("cancel_fido_bert_prob") >= 0.40)).count()
print(f"TP count: {tp_cancel}")

tn_cancel = df_bert_spark.filter(col("label_cancel") == False).filter((col("cancel_main_bert_prob") < 0.40) & (col("cancel_appoint_bert_prob") < 0.40) & (col("cancel_monitor_bert_prob") < 0.40) & (col("cancel_fido_bert_prob") < 0.40)).count()
print(f"TN count: {tn_cancel}")

fp_cancel = df_bert_spark.filter(col("label_cancel") == False).filter((col("cancel_main_bert_prob") >= 0.40) | (col("cancel_appoint_bert_prob") >= 0.40) | (col("cancel_monitor_bert_prob") >= 0.40) | (col("cancel_fido_bert_prob") >= 0.40)).count()
print(f"FP count: {fp_cancel}")

fn_cancel = df_bert_spark.filter(col("label_cancel") == True).filter((col("cancel_main_bert_prob") < 0.40) & (col("cancel_appoint_bert_prob") < 0.40) & (col("cancel_monitor_bert_prob") < 0.40) & (col("cancel_fido_bert_prob") < 0.40)).count()
print(f"FN count: {fn_cancel}")

precision_cancel = tp_cancel / (tp_cancel + fp_cancel)
print(f"Precision: {precision_cancel}")

recall_cancel = tp_cancel / (tp_cancel + fn_cancel)
print(f"Recall: {recall_cancel}")
Accuracy = (tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel) 
print(f"Accuracy: {(tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel)}")

cancel = [('cancel', tp_cancel, tn_cancel, fp_cancel, fn_cancel, precision_cancel, recall_cancel, Accuracy)]
f7 = spark.createDataFrame(data=cancel, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])

TP count: 4908
TN count: 282293
FP count: 12078
FN count: 62435
Precision: 0.28894383610031793
Recall: 0.07288062604873558
Accuracy: 0.7940002322276716


In [0]:
#calculate metrics or bert
#threshold for cancel at 0.10
tp_cancel = df_bert_spark.filter(col("label_cancel") == True).filter((col("cancel_main_bert_prob") >= 0.10) | (col("cancel_appoint_bert_prob") >= 0.10) | (col("cancel_monitor_bert_prob") >= 0.10) | (col("cancel_fido_bert_prob") >= 0.10)).count()
print(f"TP count: {tp_cancel}")

tn_cancel = df_bert_spark.filter(col("label_cancel") == False).filter((col("cancel_main_bert_prob") < 0.10) & (col("cancel_appoint_bert_prob") < 0.10) & (col("cancel_monitor_bert_prob") < 0.10) & (col("cancel_fido_bert_prob") < 0.10)).count()
print(f"TN count: {tn_cancel}")

fp_cancel = df_bert_spark.filter(col("label_cancel") == False).filter((col("cancel_main_bert_prob") >= 0.10) | (col("cancel_appoint_bert_prob") >= 0.10) | (col("cancel_monitor_bert_prob") >= 0.10) | (col("cancel_fido_bert_prob") >= 0.10)).count()
print(f"FP count: {fp_cancel}")

fn_cancel = df_bert_spark.filter(col("label_cancel") == True).filter((col("cancel_main_bert_prob") < 0.10) & (col("cancel_appoint_bert_prob") < 0.10) & (col("cancel_monitor_bert_prob") < 0.10) & (col("cancel_fido_bert_prob") < 0.10)).count()
print(f"FN count: {fn_cancel}")

precision_cancel = tp_cancel / (tp_cancel + fp_cancel)
print(f"Precision: {precision_cancel}")

recall_cancel = tp_cancel / (tp_cancel + fn_cancel)
print(f"Recall: {recall_cancel}")
Accuracy = (tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel) 
print(f"Accuracy: {(tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel)}")

cancel = [('cancel', tp_cancel, tn_cancel, fp_cancel, fn_cancel, precision_cancel, recall_cancel, Accuracy)]
f7 = spark.createDataFrame(data=cancel, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])

TP count: 13325
TN count: 249411
FP count: 44960
FN count: 54018
Precision: 0.2286179977695805
Recall: 0.19786763286458875
Accuracy: 0.7263639228782961


In [0]:
#calculate metrics or bert
#threshold for cancel at 0.05
tp_cancel = df_bert_spark.filter(col("label_cancel") == True).filter((col("cancel_main_bert_prob") >= 0.05) | (col("cancel_appoint_bert_prob") >= 0.05) | (col("cancel_monitor_bert_prob") >= 0.05) | (col("cancel_fido_bert_prob") >= 0.05)).count()
print(f"TP count: {tp_cancel}")

tn_cancel = df_bert_spark.filter(col("label_cancel") == False).filter((col("cancel_main_bert_prob") < 0.05) & (col("cancel_appoint_bert_prob") < 0.05) & (col("cancel_monitor_bert_prob") < 0.05) & (col("cancel_fido_bert_prob") < 0.05)).count()
print(f"TN count: {tn_cancel}")

fp_cancel = df_bert_spark.filter(col("label_cancel") == False).filter((col("cancel_main_bert_prob") >= 0.05) | (col("cancel_appoint_bert_prob") >= 0.05) | (col("cancel_monitor_bert_prob") >= 0.05) | (col("cancel_fido_bert_prob") >= 0.05)).count()
print(f"FP count: {fp_cancel}")

fn_cancel = df_bert_spark.filter(col("label_cancel") == True).filter((col("cancel_main_bert_prob") < 0.05) & (col("cancel_appoint_bert_prob") < 0.05) & (col("cancel_monitor_bert_prob") < 0.05) & (col("cancel_fido_bert_prob") < 0.05)).count()
print(f"FN count: {fn_cancel}")

precision_cancel = tp_cancel / (tp_cancel + fp_cancel)
print(f"Precision: {precision_cancel}")

recall_cancel = tp_cancel / (tp_cancel + fn_cancel)
print(f"Recall: {recall_cancel}")
Accuracy = (tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel) 
print(f"Accuracy: {(tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel)}")

cancel = [('cancel', tp_cancel, tn_cancel, fp_cancel, fn_cancel, precision_cancel, recall_cancel, Accuracy)]
f7 = spark.createDataFrame(data=cancel, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])

TP count: 16687
TN count: 223901
FP count: 70470
FN count: 50656
Precision: 0.19145909106554837
Recall: 0.24779115869503884
Accuracy: 0.6651332268034967


In [0]:
#calculate metrics or bert
#threshold for cancel at 0.01
tp_cancel = df_bert_spark.filter(col("label_cancel") == True).filter((col("cancel_main_bert_prob") >= 0.01) | (col("cancel_appoint_bert_prob") >= 0.01) | (col("cancel_monitor_bert_prob") >= 0.01) | (col("cancel_fido_bert_prob") >= 0.01)).count()
print(f"TP count: {tp_cancel}")

tn_cancel = df_bert_spark.filter(col("label_cancel") == False).filter((col("cancel_main_bert_prob") < 0.01) & (col("cancel_appoint_bert_prob") < 0.01) & (col("cancel_monitor_bert_prob") < 0.01) & (col("cancel_fido_bert_prob") < 0.01)).count()
print(f"TN count: {tn_cancel}")

fp_cancel = df_bert_spark.filter(col("label_cancel") == False).filter((col("cancel_main_bert_prob") >= 0.01) | (col("cancel_appoint_bert_prob") >= 0.01) | (col("cancel_monitor_bert_prob") >= 0.01) | (col("cancel_fido_bert_prob") >= 0.01)).count()
print(f"FP count: {fp_cancel}")

fn_cancel = df_bert_spark.filter(col("label_cancel") == True).filter((col("cancel_main_bert_prob") < 0.01) & (col("cancel_appoint_bert_prob") < 0.01) & (col("cancel_monitor_bert_prob") < 0.01) & (col("cancel_fido_bert_prob") < 0.01)).count()
print(f"FN count: {fn_cancel}")

precision_cancel = tp_cancel / (tp_cancel + fp_cancel)
print(f"Precision: {precision_cancel}")

recall_cancel = tp_cancel / (tp_cancel + fn_cancel)
print(f"Recall: {recall_cancel}")
Accuracy = (tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel) 
print(f"Accuracy: {(tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel)}")

cancel = [('cancel', tp_cancel, tn_cancel, fp_cancel, fn_cancel, precision_cancel, recall_cancel, Accuracy)]
f7 = spark.createDataFrame(data=cancel, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])

TP count: 24836
TN count: 186292
FP count: 108079
FN count: 42507
Precision: 0.18685626152052062
Recall: 0.3687985388236342
Accuracy: 0.5836876648401776


In [0]:
#calculate metrics or bert
#threshold for cancel at 0.005
tp_cancel = df_bert_spark.filter(col("label_cancel") == True).filter((col("cancel_main_bert_prob") >= 0.005) | (col("cancel_appoint_bert_prob") >= 0.005) | (col("cancel_monitor_bert_prob") >= 0.005) | (col("cancel_fido_bert_prob") >= 0.005)).count()
print(f"TP count: {tp_cancel}")

tn_cancel = df_bert_spark.filter(col("label_cancel") == False).filter((col("cancel_main_bert_prob") < 0.005) & (col("cancel_appoint_bert_prob") < 0.005) & (col("cancel_monitor_bert_prob") < 0.005) & (col("cancel_fido_bert_prob") < 0.005)).count()
print(f"TN count: {tn_cancel}")

fp_cancel = df_bert_spark.filter(col("label_cancel") == False).filter((col("cancel_main_bert_prob") >= 0.005) | (col("cancel_appoint_bert_prob") >= 0.005) | (col("cancel_monitor_bert_prob") >= 0.005) | (col("cancel_fido_bert_prob") >= 0.005)).count()
print(f"FP count: {fp_cancel}")

fn_cancel = df_bert_spark.filter(col("label_cancel") == True).filter((col("cancel_main_bert_prob") < 0.005) & (col("cancel_appoint_bert_prob") < 0.005) & (col("cancel_monitor_bert_prob") < 0.005) & (col("cancel_fido_bert_prob") < 0.005)).count()
print(f"FN count: {fn_cancel}")

precision_cancel = tp_cancel / (tp_cancel + fp_cancel)
print(f"Precision: {precision_cancel}")

recall_cancel = tp_cancel / (tp_cancel + fn_cancel)
print(f"Recall: {recall_cancel}")
Accuracy = (tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel) 
print(f"Accuracy: {(tp_cancel + tn_cancel) / (tp_cancel + fp_cancel + fn_cancel + tn_cancel)}")

cancel = [('cancel', tp_cancel, tn_cancel, fp_cancel, fn_cancel, precision_cancel, recall_cancel, Accuracy)]
f7 = spark.createDataFrame(data=cancel, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])

TP count: 27067
TN count: 178822
FP count: 115549
FN count: 40276
Precision: 0.18978936444718686
Recall: 0.4019274460597241
Accuracy: 0.5692038461325799


In [0]:
#move, threshold 0.20
tp_moves = df_bert_spark.filter(col("label_move") == True).filter((col("move_main_bert_prob") >= 0.20) | (col("move_postal_bert_prob") >= 0.20)).count()
print(f"TP count: {tp_moves}")

tn_moves = df_bert_spark.filter(col("label_move") == False).filter((col("move_main_bert_prob") < 0.20) & (col("move_postal_bert_prob") < 0.20)).count()
print(f"TN count: {tn_moves}")

fp_moves = df_bert_spark.filter(col("label_move") == False).filter((col("move_main_bert_prob") >= 0.20) | (col("move_postal_bert_prob") >= 0.20)).count()
print(f"FP count: {fp_moves}")

fn_moves = df_bert_spark.filter(col("label_move") == True).filter((col("move_main_bert_prob") < 0.20) & (col("move_postal_bert_prob") < 0.20)).count()
print(f"FN count: {fn_moves}")

precision_moves = tp_moves / (tp_moves + fp_moves)
print(f"Precision: {precision_moves}")
Accuracy = (tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)
recall_moves = tp_moves / (tp_moves + fn_moves)
print(f"Recall: {recall_moves}")
print(f"Accuracy: {(tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)}")

move = [('moves', tp_moves, tn_moves, fp_moves, fn_moves, precision_moves, recall_moves, Accuracy)]
f3 = spark.createDataFrame(data=move, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])

TP count: 2173
TN count: 258875
FP count: 1864
FN count: 98802
Precision: 0.5382709933118652
Recall: 0.021520178261946025
Accuracy: 0.7216972525254759


In [0]:
#move, threshold 0.10
tp_moves = df_bert_spark.filter(col("label_move") == True).filter((col("move_main_bert_prob") >= 0.10) | (col("move_postal_bert_prob") >= 0.10)).count()
print(f"TP count: {tp_moves}")

tn_moves = df_bert_spark.filter(col("label_move") == False).filter((col("move_main_bert_prob") < 0.10) & (col("move_postal_bert_prob") < 0.10)).count()
print(f"TN count: {tn_moves}")

fp_moves = df_bert_spark.filter(col("label_move") == False).filter((col("move_main_bert_prob") >= 0.10) | (col("move_postal_bert_prob") >= 0.10)).count()
print(f"FP count: {fp_moves}")

fn_moves = df_bert_spark.filter(col("label_move") == True).filter((col("move_main_bert_prob") < 0.10) & (col("move_postal_bert_prob") < 0.10)).count()
print(f"FN count: {fn_moves}")

precision_moves = tp_moves / (tp_moves + fp_moves)
print(f"Precision: {precision_moves}")
Accuracy = (tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)
recall_moves = tp_moves / (tp_moves + fn_moves)
print(f"Recall: {recall_moves}")
print(f"Accuracy: {(tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)}")

move = [('moves', tp_moves, tn_moves, fp_moves, fn_moves, precision_moves, recall_moves, Accuracy)]
f3 = spark.createDataFrame(data=move, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])

TP count: 4497
TN count: 254473
FP count: 6266
FN count: 96478
Precision: 0.4178203103224008
Recall: 0.044535776182223326
Accuracy: 0.7159523822688644


In [0]:
#move, threshold 0.05
tp_moves = df_bert_spark.filter(col("label_move") == True).filter((col("move_main_bert_prob") >= 0.05) | (col("move_postal_bert_prob") >= 0.05)).count()
print(f"TP count: {tp_moves}")

tn_moves = df_bert_spark.filter(col("label_move") == False).filter((col("move_main_bert_prob") < 0.05) & (col("move_postal_bert_prob") < 0.05)).count()
print(f"TN count: {tn_moves}")

fp_moves = df_bert_spark.filter(col("label_move") == False).filter((col("move_main_bert_prob") >= 0.05) | (col("move_postal_bert_prob") >= 0.05)).count()
print(f"FP count: {fp_moves}")

fn_moves = df_bert_spark.filter(col("label_move") == True).filter((col("move_main_bert_prob") < 0.05) & (col("move_postal_bert_prob") < 0.05)).count()
print(f"FN count: {fn_moves}")

precision_moves = tp_moves / (tp_moves + fp_moves)
print(f"Precision: {precision_moves}")
Accuracy = (tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)
recall_moves = tp_moves / (tp_moves + fn_moves)
print(f"Recall: {recall_moves}")
print(f"Accuracy: {(tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)}")

move = [('moves', tp_moves, tn_moves, fp_moves, fn_moves, precision_moves, recall_moves, Accuracy)]
f3 = spark.createDataFrame(data=move, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])

TP count: 7268
TN count: 247895
FP count: 12844
FN count: 93707
Precision: 0.36137629276054095
Recall: 0.07197821242881901
Accuracy: 0.7054274924387776


In [0]:
#move, threshold 0.01
tp_moves = df_bert_spark.filter(col("label_move") == True).filter((col("move_main_bert_prob") >= 0.01) | (col("move_postal_bert_prob") >= 0.01)).count()
print(f"TP count: {tp_moves}")

tn_moves = df_bert_spark.filter(col("label_move") == False).filter((col("move_main_bert_prob") < 0.01) & (col("move_postal_bert_prob") < 0.01)).count()
print(f"TN count: {tn_moves}")

fp_moves = df_bert_spark.filter(col("label_move") == False).filter((col("move_main_bert_prob") >= 0.01) | (col("move_postal_bert_prob") >= 0.01)).count()
print(f"FP count: {fp_moves}")

fn_moves = df_bert_spark.filter(col("label_move") == True).filter((col("move_main_bert_prob") < 0.01) & (col("move_postal_bert_prob") < 0.01)).count()
print(f"FN count: {fn_moves}")

precision_moves = tp_moves / (tp_moves + fp_moves)
print(f"Precision: {precision_moves}")
Accuracy = (tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)
recall_moves = tp_moves / (tp_moves + fn_moves)
print(f"Recall: {recall_moves}")
print(f"Accuracy: {(tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)}")

move = [('moves', tp_moves, tn_moves, fp_moves, fn_moves, precision_moves, recall_moves, Accuracy)]
f3 = spark.createDataFrame(data=move, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])

TP count: 20746
TN count: 205424
FP count: 55315
FN count: 80229
Precision: 0.2727547626247354
Recall: 0.20545679623669225
Accuracy: 0.6252730057448702


In [0]:
#move, threshold 0.005
tp_moves = df_bert_spark.filter(col("label_move") == True).filter((col("move_main_bert_prob") >= 0.005) | (col("move_postal_bert_prob") >= 0.005)).count()
print(f"TP count: {tp_moves}")

tn_moves = df_bert_spark.filter(col("label_move") == False).filter((col("move_main_bert_prob") < 0.005) & (col("move_postal_bert_prob") < 0.005)).count()
print(f"TN count: {tn_moves}")

fp_moves = df_bert_spark.filter(col("label_move") == False).filter((col("move_main_bert_prob") >= 0.005) | (col("move_postal_bert_prob") >= 0.005)).count()
print(f"FP count: {fp_moves}")

fn_moves = df_bert_spark.filter(col("label_move") == True).filter((col("move_main_bert_prob") < 0.005) & (col("move_postal_bert_prob") < 0.005)).count()
print(f"FN count: {fn_moves}")

precision_moves = tp_moves / (tp_moves + fp_moves)
print(f"Precision: {precision_moves}")
Accuracy = (tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)
recall_moves = tp_moves / (tp_moves + fn_moves)
print(f"Recall: {recall_moves}")
print(f"Accuracy: {(tp_moves + tn_moves) / (tp_moves + fp_moves + fn_moves + tn_moves)}")

move = [('moves', tp_moves, tn_moves, fp_moves, fn_moves, precision_moves, recall_moves, Accuracy)]
f3 = spark.createDataFrame(data=move, schema = ['category', 'tp', 'tn', 'fp', 'fn', 'precision', 'recall', 'accuracy'])

TP count: 24673
TN count: 197310
FP count: 63429
FN count: 76302
Precision: 0.28005039613175636
Recall: 0.24434761079475117
Accuracy: 0.6136975621623714


# CONCLUSION:
## Guided Bertopic achieved simliar but a bit less performance on evaluation of cancel and move labels than the Guided LDA model.    
## Other comparsions:    
### Guided LDA model only take into account of the co-occurance of the words while bertopic take into account of the sequence and semantics of the sentences.     
### However, the Guided LDA model yeilded fixed number of topics as we provided in seeds_list while bertopic did not. 
### Also bertopic tend to assign huge number of conversations to 'topic -1' meaning noises and outliers since by default it is using density based clustering HDBSCAN as clustering algorithm. So unless one decease the min_cluster_size paramter of HDBSCAN or switch to algorithms like k-means, otherwise 'topic -1' 's probability tend to dominate over 30 topics in this specific dataset.