## Preprocessing and aggregation of communication data (email, chat and voice) for anomaly detection

#### Setup logging

In [None]:
import logging
import datetime
import sys
import json
import pandas as pd
# Name the log file after today's date so each daily run writes its own file.
t = pd.Timestamp.today().strftime('%Y-%m-%d')
filename = 'DataAggregationLogs_{}.log'.format(t)
logging.basicConfig(level=logging.INFO, filename=filename)

In [None]:
# Read the local JSON configuration. The context manager closes the handle
# promptly; JSON text is UTF-8 by specification.
with open('systemConfig', encoding='utf-8') as config_fh:
    config_file = json.load(config_fh)
notebook_instance_name = config_file['notebook_instance_for_aggregation']

In [None]:
# Write two framed start-up banners (instance name, then script name) to the log.
_BANNER_RULE = '***********************************************'
for _banner_line in (
    _BANNER_RULE,
    'Starting the Instance ' + notebook_instance_name,
    _BANNER_RULE,
    _BANNER_RULE,
    'Starting the Data Aggregation Script',
    _BANNER_RULE,
):
    logging.info(_banner_line)

In [None]:
def log_file_to_s3():
    """Copy this run's local log file to ``s3://<bucket>/logs/<filename>``.

    Runs ``aws s3 cp`` through an IPython shell escape (``!``), so this only
    works inside a Jupyter/IPython kernel where the AWS CLI is on PATH.
    Reads the module-level names ``bucket`` and ``filename``; IPython
    substitutes ``$filename`` and ``$log_path`` from the Python namespace.

    NOTE(review): ``bucket`` is only assigned in the Secrets Manager cell, so
    error handlers that run before that cell succeeds will hit a NameError
    here — confirm/guard at call sites.
    """
    log_path = 's3://' + bucket + '/logs'
    ! aws s3 cp $filename "$log_path/$filename"

In [None]:
def shutdown_notebook_instance():
    """Ask SageMaker to stop the notebook instance ``notebook_instance_name``.

    Runs ``aws sagemaker stop-notebook-instance`` through an IPython shell
    escape (``!``). Nothing here raises or returns early, so the calling cell
    keeps executing after this call.
    NOTE(review): callers appear to rely on the instance stopping to halt the
    pipeline — confirm that subsequent cells are safe to run in the meantime.
    """
    ! aws sagemaker stop-notebook-instance --notebook-instance-name $notebook_instance_name

#### Import libraries

In [None]:
# Import the third-party libraries used by the remaining cells. On failure,
# upload the log (when possible) and stop the notebook instance.
try:
    logging.info(str(pd.Timestamp.now()) + ': Importing libraries')
    import os
    import json
    import s3fs
    import boto3
    # NOTE: rebinds the name `datetime` (imported above as the module) to the
    # datetime class for all later cells.
    from datetime import datetime
    from io import StringIO
    from cryptography.fernet import Fernet
    import sagemaker as sage
    from sagemaker import get_execution_role
    logging.info(str(pd.Timestamp.now()) + ': Libraries imported sucessfully')
except Exception:
    logging.error(str(pd.Timestamp.now()) + 'ERROR:' + str(sys.exc_info()))
    # log_file_to_s3() reads `bucket`, which is only set later from Secrets
    # Manager; calling it unguarded here would raise NameError and skip the
    # shutdown below.
    if 'bucket' in globals():
        log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
# Read the bucket name and Fernet encryption key from AWS Secrets Manager.
try:
    # Create a Secrets Manager client to retrieve the secret
    logging.info(str(pd.Timestamp.now()) + ': Fetching configuration from secret manager')
    sess = sage.Session()
    region_name = sess.boto_session.region_name
    secret_name = config_file['secret_name']
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=region_name)
    get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    config = json.loads(get_secret_value_response['SecretString'])
    bucket = config['bucket']
    encryption_key = config['encryption_key']
    logging.info(str(pd.Timestamp.now()) + ': Configuration from secret manager fetched successully')
except Exception:
    logging.error(str(pd.Timestamp.now()) + 'ERROR:' + str(sys.exc_info()))
    # If the failure happened before `bucket` was read, log_file_to_s3()
    # would raise NameError and the shutdown would never run.
    if 'bucket' in globals():
        log_file_to_s3()
    shutdown_notebook_instance()

#### Import data from S3

In [None]:
# Build the S3 URLs for each data split under s3://<bucket>/input/data.
# NOTE: os.path.join uses the host's path separator, so these URLs are only
# well-formed on a POSIX host.
try:
    logging.info(str(pd.Timestamp.now()) + ': Setting up S3 path')
    prefix = 's3://' + bucket + '/'
    input_path = prefix + 'input/data'
    training_path = os.path.join(input_path, 'training')
    testing_path = os.path.join(input_path, 'testing')
    validation_path = os.path.join(input_path, 'validation')
    processed_path = os.path.join(input_path, 'processed')
    logging.info(str(pd.Timestamp.now()) + ': S3 path setup completed successfully')
except Exception:
    logging.error(str(pd.Timestamp.now()) + 'ERROR:' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
def _load_encrypted_csv_folder(s3_fs, fernet, folder_path):
    """Decrypt every object listed under ``folder_path`` and concatenate them.

    Each object is read as bytes, decrypted with ``fernet``, decoded as UTF-8
    and parsed as CSV. Exceptions from listing, decryption or parsing
    propagate; ``pd.concat`` raises ``ValueError`` for an empty folder.
    """
    frames = []
    for object_key in s3_fs.ls(folder_path):
        with s3_fs.open('s3://{}'.format(object_key), 'rb') as fh:
            token = fh.read()
        plain_text = str(fernet.decrypt(token), 'utf-8')
        frames.append(pd.read_csv(StringIO(plain_text), encoding='utf-8'))
    return pd.concat(frames, sort=True)


def _try_load_source(s3_fs, fernet, folder_path, label):
    """Load one source folder; on any failure log a warning and return None."""
    try:
        return _load_encrypted_csv_folder(s3_fs, fernet, folder_path)
    except Exception as err:
        now = str(pd.Timestamp.now())
        logging.warning(now + ': Warning- ' + label + ' file not present')
        # The same warning covers a missing or empty folder, a decryption
        # failure and a malformed CSV, so record the actual cause too.
        logging.warning(now + ': Warning- ' + label + ' load failed with ' + repr(err))
        return None


# Load the encrypted training extracts (email, chat, voice, user). Sources
# that load successfully are recorded in file_list.
try:
    logging.info(str(pd.Timestamp.now()) + ': Importing files from S3')
    role = get_execution_role()
    fs = s3fs.S3FileSystem()
    f_key = Fernet(encryption_key)
    file_list = []

    email_file_path = os.path.join(training_path, 'email')
    _frame = _try_load_source(fs, f_key, email_file_path, 'Email')
    if _frame is not None:
        emails = _frame
        file_list.append('email')

    chat_file_path = os.path.join(training_path, 'chat')
    _frame = _try_load_source(fs, f_key, chat_file_path, 'Chat')
    if _frame is not None:
        chats = _frame
        file_list.append('chat')

    audio_file_path = os.path.join(training_path, 'voice')
    _frame = _try_load_source(fs, f_key, audio_file_path, 'Voice')
    if _frame is not None:
        audio = _frame
        file_list.append('voice')

    user_file_path = os.path.join(training_path, 'user')
    _frame = _try_load_source(fs, f_key, user_file_path, 'User')
    if _frame is not None:
        users = _frame
        file_list.append('user')

    logging.info(str(pd.Timestamp.now()) + ': Importing files from S3 Completed')
except Exception:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
# At least two source extracts must have loaded for the pipeline to continue.
logging.info(str(pd.Timestamp.now()) + ': Checking for files imported')
if len(file_list) < 2:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- Source data files are not present. Stopping further progress!')
    # Upload the log before stopping, as every other error path does;
    # otherwise the reason for the shutdown is lost with the instance.
    log_file_to_s3()
    shutdown_notebook_instance()
else:
    logging.info(str(pd.Timestamp.now()) + ': Files importing check is complete.Ready to proceed!')

In [None]:
# Define, for each loaded source, the columns its extract must contain. The
# lists are checked by `handle` in the verification cell.
try:
    logging.info(str(pd.Timestamp.now()) + ': Verifying the columns for the dataset imported')
    # Column Verification for the dataset imported
    if 'email' in file_list:
        email_col_list = ['interaction_identifier', 'interaction_start', 'interaction_from',
                          'interaction_to', 'interaction_cc', 'interaction_source',
                          'interaction_type', 'interaction_has_attachment', 'childcount',
                          'interaction_language', 'scope_BF', 'participant_count_BF_I', 'size',
                          'Party_ID_BF', 'interaction_original_author_id']
    if 'chat' in file_list:
        chat_col_list = ['interaction_identifier', 'interaction_start', 'interaction_from',
                         'interaction_to', 'interaction_original_author_id',
                         'interaction_language', 'interaction_type', 'interaction_source',
                         'size', 'participant_count_BF_I']
    if 'voice' in file_list:
        audio_col_list = ['interaction_identifier', 'interaction_sourceid', 'interaction_type',
                          'interaction_start', 'interaction_from', 'interaction_to',
                          'interaction_direction', 'interaction_duration_int',
                          'participant_count_BF_I', 'interaction_language', 'interaction_source',
                          'interaction_author', 'interaction_original_author_id']
    if 'user' in file_list:
        user_col_list = ['multisource.username', 'multisource.firstname', 'multisource.lastname',
                         'FullName_CUP', 'Region_CUP', 'Country_CUP', 'Title_CUP', 'ProductGroup_CUP',
                         'Exchange_EUSID', 'Bloomberg_EUSID', 'Yieldbroker_EUSID',
                         'NiceTradingRecording_EUSID',
                         'Reuters_EUSID', 'NIM_EUSID', 'NTRExport_EUSID', 'Avaya_EUSID',
                         'Verint_EUSID', 'Intercall_EUSID']
    logging.info(str(pd.Timestamp.now()) + ': Column Verification for the dataset is completed')
except Exception:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    def handle(dataset, col_list, dataset_name):
        """Check that ``dataset`` contains every column named in ``col_list``.

        When columns are missing, each one is logged, the log file is uploaded
        and a notebook-instance shutdown is requested.

        Args:
            dataset: DataFrame to check.
            col_list: required column names.
            dataset_name: label used in the log messages (e.g. 'email').

        Returns:
            True if all columns are present, False otherwise. Existing callers
            may ignore the return value.
        """
        col_not_present = [col for col in col_list if col not in dataset.columns]
        if not col_not_present:
            return True
        now = str(pd.Timestamp.now())
        logging.error(now + ": Do not proceed further as " + dataset_name + " has some missing columns")
        for col1 in col_not_present:
            logging.error(now + ": " + dataset_name + " does not have " + col1 + " column in the input file.")
        logging.error(now + ": Stopping further execution")
        # Upload before stopping so the missing-column report is not lost.
        log_file_to_s3()
        shutdown_notebook_instance()
        return False
except Exception:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
# Check each loaded extract against its expected column list; `handle` logs
# and requests a shutdown if anything is missing.
try:
    logging.info(str(pd.Timestamp.now()) + ": Verifying columns in the dataset provided")
    if 'email' in file_list:
        handle(emails, email_col_list, 'email')
    logging.info(str(pd.Timestamp.now()) + ": Email dataset Verification is complete")
    if 'chat' in file_list:
        handle(chats, chat_col_list, 'chat')
    logging.info(str(pd.Timestamp.now()) + ": Chat dataset Verification is complete")
    if 'voice' in file_list:
        handle(audio, audio_col_list, 'voice')
    logging.info(str(pd.Timestamp.now()) + ": Audio dataset Verification is complete")
    if 'user' in file_list:
        handle(users, user_col_list, 'user')
    logging.info(str(pd.Timestamp.now()) + ": User dataset Verification is complete")
except Exception:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
# Row Count dataset Validation: drop a source from file_list when none of its
# rows come from a supported source system (the user extract only needs rows).
try:
    logging.info(str(pd.Timestamp.now()) + ": Limiting the source systems for communication")
    if 'email' in file_list:
        email_sources = ['Exchange', 'Bloomberg']
        email_records = emails[emails.interaction_source.isin(email_sources)].shape[0]
        if email_records == 0:
            file_list.remove('email')
    if 'chat' in file_list:
        chat_sources = ['Bloomberg', 'Yieldbroker', 'Reuters']
        chat_records = chats[chats.interaction_source.isin(chat_sources)].shape[0]
        if chat_records == 0:
            file_list.remove('chat')
    if 'voice' in file_list:
        audio_sources = ['NiceTradingRecording', 'NTRExport', 'NIM', 'Avaya', 'Verint', 'InterCall']
        audio_records = audio[audio.interaction_source.isin(audio_sources)].shape[0]
        if audio_records == 0:
            file_list.remove('voice')
    if 'user' in file_list:
        if users.shape[0] == 0:
            file_list.remove('user')
    logging.info(str(pd.Timestamp.now()) + ": Source systems configured")
except Exception:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
# Convert every column of the user and e-mail extracts to dtype object. The
# frames are updated in place, column by column.
try:
    logging.info(str(pd.Timestamp.now()) + ": Converting the datatype")
    if 'user' in file_list:
        for cols in users.columns:
            users[cols] = users[cols].astype('object')
    if 'email' in file_list:
        for cols in emails.columns:
            emails[cols] = emails[cols].astype('object')
    logging.info(str(pd.Timestamp.now()) + ": Datatype conversion Complete")
except Exception:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
# Lower-case every *_EUSID identifier in the user extract so later joins are
# case-insensitive. Skipped when no user extract was loaded.
try:
    if 'user' in file_list:
        for cols in [col for col in users.columns if col.endswith('EUSID')]:
            # Keep missing ids as NaN: astype('str') would turn them into the
            # literal 'nan', which dropna()/fillna() in the id-mapping steps
            # cannot recognise as missing.
            users[cols] = users[cols].map(lambda x: str(x).lower() if pd.notnull(x) else x)
except Exception:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()

In [None]:
def extract_policy_columns(df):
    """Return the labels of ``df`` that begin with ``interaction_risk_``, in column order."""
    policy_prefix = "interaction_risk_"
    return list(filter(lambda label: label.startswith(policy_prefix), df.columns))

#### Email data processing (filtering, cleanup, missing-value treatment, feature engineering and aggregation)

In [None]:
# Policy-score columns (interaction_risk_*) present in the e-mail extract.
# Defaults to an empty list when no usable e-mail extract is loaded, so the
# name is always defined.
try:
    email_policy_cols = extract_policy_columns(emails) if 'email' in file_list else []
except Exception:
    email_policy_cols = []
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()

In [None]:
try:
    def email_data_cleanup(df_email, policy_cols=None):
        """Filter e-mail interactions and normalise their numeric fields.

        Keeps rows whose ``interaction_type`` is ``'Email'`` and whose
        ``interaction_from``, ``interaction_start`` and
        ``participant_count_BF_I`` are non-null. Then fills missing
        ``size`` / ``participant_count_BF_I`` / ``childcount`` with 0 and casts
        them to ``int``, and zero-fills the policy columns.

        Args:
            df_email: e-mail interactions.
            policy_cols: policy-score columns to zero-fill; defaults to the
                module-level ``email_policy_cols``.

        Returns:
            A new filtered DataFrame; ``df_email`` is not modified.
        """
        if policy_cols is None:
            policy_cols = email_policy_cols
        keep = ((df_email.interaction_type == 'Email')
                & df_email.interaction_from.notnull()
                & df_email.interaction_start.notnull()
                & df_email.participant_count_BF_I.notnull())
        # .copy(): the column writes below must target an independent frame,
        # not a view of the caller's data (SettingWithCopyWarning).
        df_email_clean = df_email[keep].copy()
        for col in ('size', 'participant_count_BF_I', 'childcount'):
            df_email_clean[col] = df_email_clean[col].fillna(0).astype('int')
        for col in policy_cols:
            df_email_clean[col] = df_email_clean[col].fillna(0)
        return df_email_clean
except Exception:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    def _email_timestamp_features(df_email_clean):
        """Keep rows whose interaction_start contains 'T' and add time features.

        Adds Date (text before 'T'), Hour, Minute, Cal_Minutes
        (Hour * 60 + Minute) and Content_Language (1 for a non-null language
        other than 'English', else 0). Returns a new DataFrame.
        """
        df = df_email_clean[
            df_email_clean.interaction_start.str.contains('T', regex=False, na=False)].copy()
        df['Date'] = df.interaction_start.map(lambda x: x.split('T')[0].strip())
        df['Hour'] = df.interaction_start.map(lambda x: int(x.split('T')[1].strip().split(':')[0]))
        df['Minute'] = df.interaction_start.map(lambda x: int(x.split('T')[1].strip().split(':')[1]))
        df['Cal_Minutes'] = df.Hour * 60 + df.Minute
        df['Content_Language'] = df.interaction_language.map(
            lambda x: 0 if x == 'English' else 1 if pd.notnull(x) else 0)
        return df

    def _strip_display_name(address):
        """Return the text between '<' and '>' when both occur, else ``address``."""
        if '<' in address and '>' in address:
            return address[address.find('<') + 1:address.find('>')]
        return address

    def _exchange_rows(df):
        """Exchange e-mails: normalise the sender; recipients = author ids minus the sender."""
        exchange = df[df.interaction_source == 'Exchange']
        exchange = exchange[exchange.interaction_from.str.contains('@', regex=False, na=False)].copy()
        exchange['interaction_from'] = [_strip_display_name(a) for a in exchange.interaction_from]
        # Remove the sender by exact token match. A substring replace would
        # also corrupt other addresses containing it ('malice@x' -> 'm').
        exchange['interaction_to'] = [
            '|'.join(sorted({token.strip() for token in authors.split('|')} - {sender}))
            for authors, sender in zip(exchange.interaction_original_author_id,
                                       exchange.interaction_from)]
        return exchange

    def _bloomberg_rows(df):
        """Bloomberg e-mails: sender from Party_ID_BF; recipients from the author-id list."""
        bloomberg = df[df.interaction_source == 'Bloomberg']
        bloomberg = bloomberg[bloomberg.Party_ID_BF.str.contains('|', regex=False, na=False)].copy()
        bloomberg['interaction_from'] = [party.split('|')[1].strip() for party in bloomberg.Party_ID_BF]
        # NOTE(review): recipients are taken from index participant_count + 1
        # onwards of the author-id list — confirm this layout with the source.
        bloomberg['interaction_to'] = [
            '|'.join(sorted(set(authors.split('|')[int(count) + 1:])))
            for authors, count in zip(bloomberg.interaction_original_author_id,
                                      bloomberg.participant_count_BF_I)]
        return bloomberg

    def _map_to_exchange_id(df, id_col, bloomberg_user_mapping):
        """Replace Bloomberg ids in ``df[id_col]`` by their Exchange id (unmapped ids kept)."""
        df = df.merge(bloomberg_user_mapping, how='left', left_on=id_col, right_on='Bloomberg_EUSID')
        df[id_col] = df['Exchange_EUSID'].fillna(df[id_col])
        return df.drop(['Bloomberg_EUSID', 'Exchange_EUSID'], axis=1)

    def _expand_recipients(df_email_clean):
        """One (Rec_Email, Date, Mean_Email_Rec_Time) row per non-empty recipient token."""
        records = []
        for sent_date, recipients, minutes in zip(df_email_clean.Date.values,
                                                  df_email_clean.interaction_to.values,
                                                  df_email_clean.Cal_Minutes.values):
            for recipient in recipients.split('|'):
                recipient = recipient.strip()
                if recipient:
                    records.append((recipient, sent_date, int(minutes)))
        return pd.DataFrame(records, columns=['Rec_Email', 'Date', 'Mean_Email_Rec_Time'])

    def email_data_feature_generation(df_email_clean, bloomberg_user_mapping, policy_cols=None):
        """Aggregate cleaned e-mails into per-(user, day) sent/received features.

        Args:
            df_email_clean: output of ``email_data_cleanup``.
            bloomberg_user_mapping: DataFrame with ``Bloomberg_EUSID`` and
                ``Exchange_EUSID`` used to map Bloomberg ids to Exchange ids.
            policy_cols: policy-score columns averaged per group (output
                ``email_<col>``); defaults to the module-level ``email_policy_cols``.

        Returns:
            One row per (interaction_from, Date) covering both senders and
            recipients, with sent counts/sums, Email_Received and
            Sum/Count_Email_Rec_Time. Remaining missing values are 0.
        """
        if policy_cols is None:
            policy_cols = email_policy_cols
        df_email_clean = _email_timestamp_features(df_email_clean)
        df_email_clean = pd.concat([_exchange_rows(df_email_clean), _bloomberg_rows(df_email_clean)])
        df_email_clean = _map_to_exchange_id(df_email_clean, 'interaction_from', bloomberg_user_mapping)

        # Received-side aggregates, keyed by (Rec_Email, Date).
        recipients = _map_to_exchange_id(_expand_recipients(df_email_clean), 'Rec_Email',
                                         bloomberg_user_mapping)
        rec_keys = ['Rec_Email', 'Date']
        received = recipients.groupby(rec_keys).size().reset_index(name='Email_Received')
        rec_time = recipients.groupby(rec_keys).Mean_Email_Rec_Time
        df1 = pd.merge(rec_time.sum().reset_index(name='Sum_Email_Rec_Time'),
                       rec_time.count().reset_index(name='Count_Email_Rec_Time'),
                       how='inner', on=rec_keys)

        # Sent-side aggregates, keyed by (interaction_from, Date).
        grp = df_email_clean.groupby(['interaction_from', 'Date'])
        df_email_stats = grp.interaction_start.count().reset_index(name='Email_Sent')
        df_email_stats['Sum_Email_Recipient'] = grp.participant_count_BF_I.sum().values
        df_email_stats['Count_Email_Recipient'] = grp.participant_count_BF_I.count().values
        df_email_stats['Max_NonEnglish_Emails'] = grp.Content_Language.sum().values
        df_email_stats['Sum_Email_Sent_Time'] = grp.Cal_Minutes.sum().values
        df_email_stats['Count_Email_Sent_Time'] = grp.Cal_Minutes.count().values
        df_email_stats['Sum_Email_Size'] = grp['size'].sum().values
        df_email_stats['Count_Email_Size'] = grp['size'].count().values
        # NOTE(review): count() counts non-null flags, i.e. every e-mail with a
        # populated interaction_has_attachment, not only those that have an
        # attachment — confirm the intended metric.
        df_email_stats['Emails_with_Attachment'] = grp.interaction_has_attachment.count().values
        df_email_stats['Total_Attachment_in_Emails'] = grp.childcount.sum().values
        for col in policy_cols:
            df_email_stats['email_' + col] = grp[col].mean().values

        # Outer-join senders with recipients; the shared 'Date' key is coalesced
        # by merge, interaction_from is back-filled from Rec_Email.
        df_email_stats = df_email_stats.merge(received, how='outer',
                                              left_on=['interaction_from', 'Date'],
                                              right_on=['Rec_Email', 'Date'])
        # Plain assignment instead of chained fillna(inplace=True), which is a
        # no-op under pandas Copy-on-Write.
        df_email_stats['Email_Received'] = df_email_stats['Email_Received'].fillna(0)
        df_email_stats['interaction_from'] = df_email_stats['interaction_from'].fillna(df_email_stats['Rec_Email'])
        df_email_stats['Email_Sent'] = df_email_stats['Email_Sent'].fillna(0)
        df_email_stats = df_email_stats.drop(['Rec_Email'], axis=1)
        df_email_stats = df_email_stats.merge(df1, how='left', left_on=['interaction_from', 'Date'],
                                              right_on=['Rec_Email', 'Date']).drop('Rec_Email', axis=1)
        # NOTE(review): users with no received mail get their *sent*-time sum
        # here — confirm this fallback is intended.
        df_email_stats['Sum_Email_Rec_Time'] = df_email_stats['Sum_Email_Rec_Time'].fillna(
            df_email_stats['Sum_Email_Sent_Time'])
        df_email_stats = df_email_stats.dropna(subset=['interaction_from'])
        df_email_stats = df_email_stats.fillna(0)
        return df_email_stats
except Exception:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    def email_cleanup_aggregation(df_emails, users):
        """Clean raw e-mails and aggregate them into per-user daily features.

        Lower-cases the address/id columns and builds a Bloomberg -> Exchange
        id mapping from ``users``. Then runs ``email_data_cleanup`` and
        ``email_data_feature_generation``.

        Args:
            df_emails: raw e-mail extract (must contain the columns below plus
                the module-level ``email_policy_cols``).
            users: user extract with ``Bloomberg_EUSID`` / ``Exchange_EUSID``.

        Returns:
            The DataFrame produced by ``email_data_feature_generation``.
        """
        cols = ['interaction_identifier',
                'interaction_start',
                'interaction_from',
                'interaction_to',
                'interaction_cc',
                'interaction_type',
                'interaction_source',
                'interaction_has_attachment',
                'childcount',
                'interaction_language',
                'scope_BF',
                'participant_count_BF_I',
                'size',
                'Party_ID_BF',
                'interaction_original_author_id'
                ]
        # .copy(): the lower-casing below writes columns on this subset.
        df_emails = df_emails[cols + email_policy_cols].copy()
        # Recipients are derived by splitting this field on '|'; a missing value
        # means "no recipients" rather than the pseudo-address 'nan' that
        # astype('str') would otherwise produce.
        df_emails['interaction_original_author_id'] = df_emails['interaction_original_author_id'].fillna('')
        col_list = ['interaction_from', 'interaction_to', 'interaction_cc', 'Party_ID_BF', 'interaction_original_author_id']
        for col in col_list:
            df_emails[col] = df_emails[col].astype('str').str.lower()
        bloomberg_user_mapping = pd.DataFrame(list(zip(users.Bloomberg_EUSID, users.Exchange_EUSID)),
                                              columns=['Bloomberg_EUSID', 'Exchange_EUSID'])
        bloomberg_user_mapping = bloomberg_user_mapping.dropna()
        # Ids stringified with astype('str') turn missing values into 'nan';
        # such pairs are not real mappings either.
        bloomberg_user_mapping = bloomberg_user_mapping[(bloomberg_user_mapping != 'nan').all(axis=1)]
        df_email_clean = email_data_cleanup(df_emails)
        df_email_stats = email_data_feature_generation(df_email_clean, bloomberg_user_mapping)
        return df_email_stats
except Exception:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
# Executing the cleaning and the aggregation script. This needs both the
# e-mail and the user extract, because the user extract supplies the
# Bloomberg -> Exchange id mapping.
try:
    logging.info(str(pd.Timestamp.now()) + ": Executing Email cleaning and aggregation")
    if 'email' in file_list and 'user' in file_list:
        df_email_stats = email_cleanup_aggregation(emails, users)
        logging.info(str(pd.Timestamp.now()) + ": Execution of Email cleaning and Aggregation is complete")
    else:
        # Empty frame with the aggregate's base columns so df_email_stats is
        # always defined.
        df_email_stats = pd.DataFrame(columns=['interaction_from', 'Date', 'Email_Sent', 'Sum_Email_Recipient', 'Count_Email_Recipient',
                                               'Max_NonEnglish_Emails', 'Sum_Email_Sent_Time', 'Count_Email_Sent_Time',
                                               'Sum_Email_Size', 'Count_Email_Size',
                                               'Emails_with_Attachment', 'Total_Attachment_in_Emails',
                                               'Email_Received', 'Sum_Email_Rec_Time', 'Count_Email_Rec_Time'])
        logging.info(str(pd.Timestamp.now()) + ": Email Cleaning and aggregation will not be processed as either Email or User file is missing or there are no rows in it")
except Exception:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

#### Chat data processing (filtering, cleanup, missing-value treatment, feature engineering and aggregation)

In [None]:
# Policy-score columns (interaction_risk_*) present in the chat extract.
# Defaults to an empty list when no usable chat extract is loaded, so the name
# is always defined.
try:
    chat_policy_cols = extract_policy_columns(chats) if 'chat' in file_list else []
except Exception:
    chat_policy_cols = []
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()

In [None]:
try:
    def chat_dataset_cleanup(dataset, policy_cols=None):
        """Filter chat rows to supported sources, impute gaps and add time features.

        Missing-value handling, per column that has nulls:
          * interaction_start / interaction_from / interaction_original_author_id:
            rows are dropped;
          * interaction_language: filled with the sender's most frequent
            language ("UNK" when the sender has none);
          * interaction_type / interaction_source: filled with 'UNK';
          * every other column (and, as before, the ones above): filled with 0.
        Then keeps rows whose interaction_start contains 'T' and adds
        chat_interaction_start, chat_date, chat_hour, chat_minutes,
        chat_time (hour * 60 + minutes) and chat_non_english_flag.

        Args:
            dataset: raw chat rows.
            policy_cols: policy-score columns to zero-fill; defaults to the
                module-level ``chat_policy_cols``.

        Returns:
            A new DataFrame; ``dataset`` is not modified.
        """
        if policy_cols is None:
            policy_cols = chat_policy_cols
        columns = list(dataset.columns)
        interaction_sources = ['Bloomberg', 'Yieldbroker', 'Reuters']
        # .copy() so the column writes below do not target a view.
        dataset = dataset[dataset.interaction_source.isin(interaction_sources)].copy()
        for col in columns:
            if not dataset[col].isnull().any():
                continue
            if col in ('interaction_start', 'interaction_from', 'interaction_original_author_id'):
                dataset = dataset.dropna(axis=0, subset=[col])
            # Exact match: `col in ('interaction_language')` is a substring test
            # on a plain string and would also catch e.g. 'action'.
            if col == 'interaction_language':
                dataset[col] = dataset.groupby("interaction_from")[col].transform(
                    lambda x: x.fillna(x.mode()[0]) if not x.mode().empty else "UNK")
            if col in ('interaction_type', 'interaction_source'):
                dataset[col] = dataset[col].fillna('UNK')
            else:
                dataset[col] = dataset[col].fillna(0)
        dataset = dataset[dataset.interaction_start.str.contains('T', regex=False, na=False)].copy()
        dataset['chat_interaction_start'] = dataset.interaction_start.map(lambda x: x.split('|')[0].strip())
        dataset['chat_date'] = dataset.chat_interaction_start.map(lambda x: x.split('T')[0].strip())
        dataset['chat_hour'] = dataset.interaction_start.map(lambda x: int(x.split('T')[1].strip().split(':')[0]))
        dataset['chat_minutes'] = dataset.interaction_start.map(lambda x: int(x.split('T')[1].strip().split(':')[1]))
        dataset['chat_time'] = dataset.chat_hour * 60 + dataset.chat_minutes
        dataset['chat_non_english_flag'] = dataset.interaction_language.map(
            lambda x: 0 if x == 'English' else 1 if pd.notnull(x) else 0)
        for col in policy_cols:
            dataset[col] = dataset[col].fillna(0)
        return dataset
except Exception:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    def chat_data_preprocessing(chat_df, users_df=None):
        """Expand chats to one row per participant id and map ids to Exchange ids.

        Each row's ``interaction_original_author_id`` is split on '|'. Every
        (interaction_identifier, id) pair is joined back to ``chat_df``. Ids
        are then mapped per source through the users table: Bloomberg (only
        ids containing '@bloomberg.'), Yieldbroker and Reuters. Ids without a
        mapping are kept as-is. Rows from other sources are dropped.

        Args:
            chat_df: cleaned chat rows (see ``chat_dataset_cleanup``).
            users_df: user table with Exchange/Bloomberg/Yieldbroker/Reuters
                ``*_EUSID`` columns; defaults to the module-level ``users``.

        Returns:
            DataFrame with the chat columns plus ``chat_id`` (last column).
        """
        if users_df is None:
            users_df = users

        # Collect participant ids per identifier in a single pass, in order of
        # first appearance. Rows with a null identifier are skipped.
        ids_by_identifier = {}
        for identifier, author_ids in zip(chat_df['interaction_identifier'].values,
                                          chat_df['interaction_original_author_id'].values):
            if pd.isnull(identifier):
                continue
            ids_by_identifier.setdefault(identifier, []).extend(
                token.strip() for token in author_ids.split('|'))
        records = [(identifier, chat_id)
                   for identifier, chat_ids in ids_by_identifier.items()
                   for chat_id in chat_ids]
        # Explicit columns keep this working for an empty chat_df.
        df_temp = pd.DataFrame(records, columns=['interaction_identifier', 'chat_id'])
        df = df_temp.merge(chat_df, how='left', on='interaction_identifier')

        def _attach_exchange_id(frame, id_column):
            """Left-join Exchange_EUSID by matching chat_id against ``id_column``."""
            lookup = users_df[['Exchange_EUSID', id_column]]
            frame = frame.merge(lookup, how='left', left_on='chat_id', right_on=id_column)
            return frame.drop(id_column, axis=1)

        bloomberg_df = df[df.interaction_source == 'Bloomberg']
        # Literal substring test (no regex), so '.' only matches a dot.
        is_bbg_address = bloomberg_df['chat_id'].map(lambda v: '@bloomberg.' in v).astype(bool)
        bloomberg_df = _attach_exchange_id(bloomberg_df[is_bbg_address], 'Bloomberg_EUSID')
        yieldbroker_df = _attach_exchange_id(df[df.interaction_source == 'Yieldbroker'], 'Yieldbroker_EUSID')
        thomsonreuters_df = _attach_exchange_id(df[df.interaction_source == 'Reuters'], 'Reuters_EUSID')

        chat_df_processed = pd.concat([bloomberg_df, yieldbroker_df, thomsonreuters_df], axis=0)
        chat_df_processed['Exchange_EUSID'] = chat_df_processed['Exchange_EUSID'].fillna(chat_df_processed['chat_id'])
        chat_df_processed = chat_df_processed.drop(['chat_id'], axis=1)
        return chat_df_processed.rename(columns={'Exchange_EUSID': 'chat_id'})
except Exception:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()


In [None]:
try:
    class chat_feature_generation:
        """Namespace for chat aggregation over a time window.

        ``window_selection`` is a staticmethod, so it can be called on the class
        (``chat_feature_generation.window_selection(df, 'daily')``) or on an
        instance.
        """

        # Window name -> column used as the period key.
        _WINDOW_COLUMNS = {
            'daily': 'chat_date',
            'weekly': 'week_of_year',
            'monthly': 'month_of_year',
            'quarter': 'quarter_of_year',
        }

        @staticmethod
        def window_selection(dataset, window, policy_cols=None):
            """Aggregate chat activity per ``chat_id`` and period.

            Args:
                dataset: chat rows with ``chat_id``, the period column for
                    ``window``, ``interaction_identifier``, ``size``,
                    ``participant_count_BF_I``, ``chat_time`` and
                    ``chat_non_english_flag`` (plus any policy columns).
                window: 'daily', 'weekly', 'monthly' or 'quarter'.
                policy_cols: policy-score columns averaged per group (output
                    ``chat_<col>``); defaults to the module-level
                    ``chat_policy_cols``.

            Returns:
                One row per (chat_id, period) with columns Chat_Count,
                Sum_Chat_Size, Count_Chat_Size, Sum_chat_recipient,
                Count_chat_recipient, Sum_chat_time, Count_chat_time,
                nonEnglish_chat_count, then the policy means.

            Raises:
                ValueError: if ``window`` is not a supported window name.
            """
            if window not in chat_feature_generation._WINDOW_COLUMNS:
                raise ValueError('Unknown aggregation window: {!r}'.format(window))
            if policy_cols is None:
                policy_cols = chat_policy_cols
            duration_feature = chat_feature_generation._WINDOW_COLUMNS[window]
            keys = ['chat_id', duration_feature]
            grouped_data = dataset.groupby(keys)

            # Distinct chats per group.
            s1 = grouped_data.interaction_identifier.nunique().reset_index(name='Chat_Count')
            # (output column, source column, aggregation), in output order.
            per_group = [
                ('Sum_Chat_Size', 'size', 'sum'),
                ('Count_Chat_Size', 'size', 'count'),
                ('Sum_chat_recipient', 'participant_count_BF_I', 'sum'),
                ('Count_chat_recipient', 'participant_count_BF_I', 'count'),
                ('Sum_chat_time', 'chat_time', 'sum'),
                ('Count_chat_time', 'chat_time', 'count'),
            ]
            for out_col, src_col, how in per_group:
                summary = grouped_data[src_col].agg(how).reset_index(name=out_col)
                s1 = pd.merge(s1, summary, how='inner', on=keys)
            non_english = grouped_data.chat_non_english_flag.sum().reset_index(name='nonEnglish_chat_count')
            for col in policy_cols:
                non_english['chat_' + col] = grouped_data[col].mean().values
            return pd.merge(s1, non_english, how='inner', on=keys)
except Exception:
    logging.error(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    def chat_email_aggregation(chats):
        """Clean, preprocess and aggregate raw chat interactions into daily statistics.

        Args:
            chats: raw chat DataFrame. NOTE: ``interaction_from``, ``interaction_to``
                and ``interaction_original_author_id`` are converted to ``str`` and
                lower-cased IN PLACE (missing values become the string ``'nan'``),
                so the caller's frame is modified.

        Returns:
            The DataFrame returned by
            ``chat_feature_generation.window_selection(..., 'daily')``.
        """
        for col in ('interaction_from', 'interaction_to', 'interaction_original_author_id'):
            chats[col] = chats[col].astype('str').str.lower()
        chat_df = chat_dataset_cleanup(chats)
        # Keep only the columns the preprocessing / feature generation steps use,
        # plus the per-interaction policy columns.
        chat_cols = ['chat_date', 'chat_time', 'chat_non_english_flag', 'participant_count_BF_I', 'size',
                     'interaction_source', 'interaction_type', 'interaction_language', 'interaction_identifier',
                     'interaction_original_author_id'] + chat_policy_cols
        chat_df = chat_df[chat_cols]
        chat_df_processed = chat_data_preprocessing(chat_df)
        return chat_feature_generation.window_selection(chat_df_processed, 'daily')
except Exception:
    # Exception (not a bare except) so a kernel interrupt does not stop the instance.
    logging.info(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    logging.info(str(pd.datetime.now()) + ": Executing Chat cleaning and aggregation")
    # The chat pipeline needs both the chat extract and the user file; without
    # either one, publish an empty frame that has the aggregated column layout.
    if 'chat' not in file_list or 'user' not in file_list:
        df_chat_stats = pd.DataFrame(columns=[
            'chat_id', 'chat_date',
            'Chat_Count', 'Sum_Chat_Size', 'Count_Chat_Size',
            'Sum_chat_recipient', 'Count_chat_recipient',
            'Sum_chat_time', 'Count_chat_time',
            'nonEnglish_chat_count',
        ])
        logging.info(str(pd.datetime.now()) + ": Chat Cleaning and aggregation will not be processed as either Chat or User file is missing or there are no rows in it")
    else:
        df_chat_stats = chat_email_aggregation(chats)
        logging.info(str(pd.datetime.now()) + ": Execution of Chat cleaning and aggregation is complete")
except:
    logging.info(str(pd.datetime.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    logging.info(str(pd.datetime.now()) + ": Merging Email and Chat data")
    # Outer-join chat statistics onto email statistics by (user, day). Where the
    # email side has no row, the user id and date are taken from the chat side.
    # The chat key columns are then dropped and every remaining gap becomes 0.
    df_communication_stats = (
        df_email_stats
        .merge(df_chat_stats, how='outer',
               left_on=['interaction_from', 'Date'], right_on=['chat_id', 'chat_date'])
        .assign(interaction_from=lambda frame: frame['interaction_from'].fillna(frame['chat_id']),
                Date=lambda frame: frame['Date'].fillna(frame['chat_date']))
        .drop(columns=['chat_id', 'chat_date'])
        .fillna(0)
    )
    logging.info(str(pd.datetime.now()) + ": Merging of Email and Chat data is complete")
except:
    logging.info(str(pd.datetime.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

#### Audio data processing, cleanup and feature generation

In [None]:
try:
    audio_policy_cols = extract_policy_columns(audio)
except Exception:
    # This cell deliberately does not stop the instance: the voice pipeline is
    # skipped later when no voice file is present. Default to "no voice policy
    # columns" so the policy-merging cells, which concatenate the email/chat/audio
    # policy column lists, do not fail with a NameError.
    audio_policy_cols = []
    logging.info(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()

In [None]:
try:
    def audio_dataset_cleanup(dataset):
        """Filter, impute and enrich raw voice interactions and split caller/callee ids.

        Steps:
          1. Keep rows whose ``interaction_source`` is a supported recording system.
          2. Impute missing values, applying exactly one rule per column:
             * ``interaction_start`` / ``interaction_original_author_id``: drop the row.
             * ``interaction_direction``: ``'Outgoing'``.
             * ``interaction_language``: most frequent language of the same
               ``interaction_from``, else ``'UNK'``.
             * ``interaction_type`` / ``interaction_source``: ``'UNK'``.
             * any other column: ``0``.
          3. Keep rows whose ``interaction_start`` contains ``'T'``. Then derive
             ``audio_interaction_start``, ``audio_date``, ``audio_hour``,
             ``audio_minutes``, ``audio_time`` (hour * 60 + minutes) and
             ``audio_Non_English_flag``.
          4. Re-derive ``interaction_from`` / ``interaction_to`` from the
             ``'|'``-separated ``interaction_original_author_id``:
             * Outgoing/Internal: the first id is the caller. The remaining ids,
               joined with ``'~'``, are the recipients (``'UNK'`` when there is no ``'|'``).
             * Incoming: the caller is ``'UNK'``. All ids, joined with ``'~'``, are the
               recipients (``'UNK'`` when there is no ``'|'``).
             Rows with any other direction are dropped by the final inner join.

        Args:
            dataset: raw voice DataFrame with at least ``interaction_source``,
                ``interaction_start``, ``interaction_original_author_id``,
                ``interaction_direction``, ``interaction_language``,
                ``interaction_sourceid``, ``interaction_from`` and ``interaction_to``.

        Returns:
            A new DataFrame; the input frame is not modified.
        """
        supported_sources = ['NiceTradingRecording', 'NTRExport', 'NIM', 'Avaya', 'Verint', 'InterCall']
        # Work on a copy so the column assignments below never write into a slice
        # of the caller's frame (avoids SettingWithCopyWarning).
        dataset = dataset[dataset.interaction_source.isin(supported_sources)].copy()

        def _fill_with_group_mode(values):
            # Most frequent language within the group; 'UNK' if the group has none.
            modes = values.mode()
            return values.fillna(modes.iloc[0] if not modes.empty else 'UNK')

        # Exact-name comparisons and an elif chain: each column gets exactly one rule.
        for col in list(dataset.columns):
            if not dataset[col].isnull().any():
                continue
            if col in ('interaction_start', 'interaction_original_author_id'):
                dataset = dataset.dropna(axis=0, subset=[col])
            elif col == 'interaction_direction':
                dataset[col] = dataset[col].fillna('Outgoing')
            elif col == 'interaction_language':
                # Rows with a missing interaction_from belong to no group; they get 'UNK' too.
                dataset[col] = (dataset.groupby('interaction_from')[col]
                                .transform(_fill_with_group_mode)
                                .fillna('UNK'))
            elif col in ('interaction_type', 'interaction_source'):
                dataset[col] = dataset[col].fillna('UNK')
            else:
                dataset[col] = dataset[col].fillna(0)

        dataset = dataset[dataset.interaction_start.str.contains('T', na=False)].copy()
        # Only the first of several '|'-separated start values is used for the date.
        dataset['audio_interaction_start'] = dataset.interaction_start.map(lambda x: x.split('|', 1)[0].strip())
        dataset['audio_date'] = dataset.audio_interaction_start.map(lambda x: x.split('T')[0].strip())
        clock_parts = dataset.interaction_start.map(lambda x: x.split('T')[1].strip().split(':'))
        dataset['audio_hour'] = clock_parts.map(lambda parts: int(parts[0]))
        dataset['audio_minutes'] = clock_parts.map(lambda parts: int(parts[1]))
        dataset['audio_time'] = dataset.audio_hour * 60 + dataset.audio_minutes
        dataset['audio_Non_English_flag'] = dataset.interaction_language.map(
            lambda x: 0 if x in ('English', 'International English') else 1 if pd.notnull(x) else 0)

        id_columns = ['interaction_sourceid', 'interaction_original_author_id']
        outgoing = dataset.loc[dataset['interaction_direction'].isin(['Outgoing', 'Internal']), id_columns].copy()
        outgoing['interaction_from'] = outgoing.interaction_original_author_id.map(
            lambda x: x.split('|')[0] if '|' in x else x)
        outgoing['interaction_to'] = outgoing.interaction_original_author_id.map(
            lambda x: '~'.join(x.split('|')[1:]) if '|' in x else 'UNK')

        incoming = dataset.loc[dataset['interaction_direction'] == 'Incoming', id_columns].copy()
        incoming['interaction_from'] = 'UNK'
        incoming['interaction_to'] = incoming.interaction_original_author_id.map(
            lambda x: '~'.join(x.split('|')) if '|' in x else 'UNK')

        split_ids = pd.concat([outgoing, incoming])
        # Replace the raw id columns with the re-derived ones.
        # NOTE(review): duplicated interaction_sourceid values multiply rows in this join.
        dataset = dataset.drop(columns=['interaction_from', 'interaction_to', 'interaction_original_author_id'])
        return dataset.merge(split_ids, on='interaction_sourceid')
except Exception:
    logging.info(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    def audio_from_user_mapping(dataset):
        """Map each call's caller id (``interaction_from``) to the user's Exchange id.

        Every supported ``interaction_source`` has its own id column in the
        module-level ``users`` DataFrame (for example ``Avaya_EUSID``). That source's rows
        are left-joined on ``interaction_from`` against that column.

        If the column is missing from ``users`` or the join fails on dtypes, the
        source's rows are kept unmapped and a warning is logged. Callers without a
        mapping keep their original ``interaction_from``.

        Args:
            dataset: cleaned voice data (e.g. output of ``audio_dataset_cleanup``).

        Returns:
            A new DataFrame, ordered by source (NiceTradingRecording, NTRExport, Avaya,
            NIM, Verint, InterCall). ``interaction_from`` holds the Exchange id, or the
            original id when unmapped, and ``interaction_to`` is removed.
        """
        source_id_columns = (
            ('NiceTradingRecording', 'NiceTradingRecording_EUSID'),
            ('NTRExport', 'NTRExport_EUSID'),
            ('Avaya', 'Avaya_EUSID'),
            ('NIM', 'NIM_EUSID'),
            ('Verint', 'Verint_EUSID'),
            ('InterCall', 'Intercall_EUSID'),
        )
        mapped_frames = []
        for source, id_col in source_id_columns:
            source_df = dataset[dataset.interaction_source == source]
            try:
                source_users = users[['Exchange_EUSID', id_col]]
                if source == 'Avaya':
                    # Compare Avaya ids as strings on both sides.
                    source_users = source_users.astype({id_col: 'str'})
                    source_df = source_df.astype({'interaction_from': 'str'})
                source_df = (source_df
                             .merge(source_users, how='left', left_on='interaction_from', right_on=id_col)
                             .drop(columns=id_col))
            except KeyError:
                logging.info(str(pd.Timestamp.now()) + ': Warning- ' + id_col + ' is missing in user file')
            except (TypeError, ValueError):
                logging.info(str(pd.Timestamp.now()) + ': Warning- could not map ' + source
                             + ' callers: ' + str(sys.exc_info()))
            mapped_frames.append(source_df)

        dataset = pd.concat(mapped_frames, axis=0, ignore_index=True)
        if 'Exchange_EUSID' in dataset.columns:
            dataset['Exchange_EUSID'] = dataset['Exchange_EUSID'].fillna(dataset['interaction_from'])
        else:
            # No source could be mapped at all: keep the raw caller ids.
            dataset['Exchange_EUSID'] = dataset['interaction_from']
        dataset = dataset.drop(columns=['interaction_from', 'interaction_to'])
        return dataset.rename(columns={'Exchange_EUSID': 'interaction_from'})
except Exception:
    logging.info(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()


In [None]:
try:
    class audio_sent_feature_generation:
        """Per-user, per-period statistics of the voice calls each user placed."""

        @staticmethod
        def window_selection(dataset, window):
            """Aggregate placed calls per caller and period.

            Args:
                dataset: caller-mapped call rows with ``interaction_from``,
                    ``interaction_sourceid``, ``interaction_duration_int``,
                    ``participant_count_BF_I``, ``audio_time``,
                    ``audio_Non_English_flag``, the period column and every column
                    listed in the module-level ``audio_policy_cols``.
                window: one of ``'daily'`` (groups on ``audio_date``), ``'weekly'``
                    (``week_of_year``), ``'monthly'`` (``month_of_year``) or
                    ``'quarter'`` (``quarter_of_year``).

            Returns:
                One row per (``users``, period), where ``users`` is the renamed
                ``interaction_from``. Columns:
                  * distinct-call count;
                  * sum/count of duration, participants and ``audio_time``;
                  * number of non-English calls;
                  * the mean of each policy column as ``voice_<col>``.

            Raises:
                ValueError: if ``window`` is not a supported name.
            """
            period_columns = {
                'daily': 'audio_date',
                'weekly': 'week_of_year',
                'monthly': 'month_of_year',
                'quarter': 'quarter_of_year',
            }
            if window not in period_columns:
                raise ValueError('Unsupported window: ' + repr(window))
            duration_feature = period_columns[window]
            # Output column name -> (source column, aggregation); dict order fixes column order.
            named_aggregations = {
                'Call_Made': ('interaction_sourceid', 'nunique'),
                'sum_call_sent_duration': ('interaction_duration_int', 'sum'),
                'count_call_sent_duration': ('interaction_duration_int', 'count'),
                'sum_call_sent_paricipants': ('participant_count_BF_I', 'sum'),
                'count_call_sent_paricipants': ('participant_count_BF_I', 'count'),
                'sum_call_sent_time': ('audio_time', 'sum'),
                'count_call_sent_time': ('audio_time', 'count'),
                'NonEnglish_Call_Made': ('audio_Non_English_flag', 'sum'),
            }
            for col in audio_policy_cols:
                named_aggregations['voice_' + col] = (col, 'mean')
            return (dataset
                    .rename(columns={'interaction_from': 'users'})
                    .groupby(['users', duration_feature])
                    .agg(**named_aggregations)
                    .reset_index())
except Exception:
    logging.info(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    def splitting_audio_recieved_users(dataset):
        """Expand each call into one row per recipient.

        ``interaction_to`` holds a call's recipients joined with ``'~'``. The value
        is split, empty or missing entries are discarded, and the remaining
        recipients are joined back onto ``dataset`` by ``interaction_sourceid``.

        Args:
            dataset: DataFrame with ``interaction_sourceid`` and a string
                ``interaction_to`` column (e.g. output of ``audio_dataset_cleanup``).

        Returns:
            A new DataFrame with one row per (call row, recipient). ``interaction_to``
            becomes the last column and holds a single recipient. The input frame is
            not modified.
            NOTE(review): rows sharing an ``interaction_sourceid`` are cross-joined
            with each other's recipients, because the join key is the call id.
        """
        recipients = dataset[['interaction_sourceid', 'interaction_to']].copy()
        recipients['interaction_to'] = recipients['interaction_to'].str.split('~')
        # One row per recipient, vectorised instead of a per-row concat loop.
        recipients = recipients.explode('interaction_to').reset_index(drop=True)
        # Drop empty names produced by leading/trailing/doubled '~' separators.
        recipients = recipients[recipients['interaction_to'].notna() & (recipients['interaction_to'] != '')]
        return dataset.drop(columns='interaction_to').merge(recipients, on='interaction_sourceid')
except Exception:
    logging.info(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    def audio_to_user_mapping(dataset):
        """Map each recipient id (``interaction_to``) to the user's Exchange id.

        Every supported ``interaction_source`` has its own id column in the
        module-level ``users`` DataFrame (for example ``Avaya_EUSID``). That source's rows
        are left-joined on ``interaction_to`` against that column; this applies to
        every source, NTRExport included.

        If the column is missing from ``users`` or the join fails on dtypes, the
        source's rows are kept unmapped and a warning is logged. Recipients without
        a mapping keep their original ``interaction_to``.

        Args:
            dataset: one row per recipient (e.g. output of
                ``splitting_audio_recieved_users``).

        Returns:
            A new DataFrame, ordered by source (NiceTradingRecording, NTRExport, Avaya,
            NIM, Verint, InterCall). ``interaction_to`` holds the Exchange id, or the
            original id when unmapped, and ``interaction_from`` is removed.
        """
        source_id_columns = (
            ('NiceTradingRecording', 'NiceTradingRecording_EUSID'),
            ('NTRExport', 'NTRExport_EUSID'),
            ('Avaya', 'Avaya_EUSID'),
            ('NIM', 'NIM_EUSID'),
            ('Verint', 'Verint_EUSID'),
            ('InterCall', 'Intercall_EUSID'),
        )
        mapped_frames = []
        for source, id_col in source_id_columns:
            source_df = dataset[dataset.interaction_source == source]
            try:
                source_users = users[['Exchange_EUSID', id_col]]
                if source == 'Avaya':
                    # Compare Avaya ids as strings on both sides.
                    source_users = source_users.astype({id_col: 'str'})
                    source_df = source_df.astype({'interaction_to': 'str'})
                source_df = (source_df
                             .merge(source_users, how='left', left_on='interaction_to', right_on=id_col)
                             .drop(columns=id_col))
            except KeyError:
                logging.info(str(pd.Timestamp.now()) + ': Warning- ' + id_col + ' is missing in user file')
            except (TypeError, ValueError):
                logging.info(str(pd.Timestamp.now()) + ': Warning- could not map ' + source
                             + ' recipients: ' + str(sys.exc_info()))
            mapped_frames.append(source_df)

        dataset = pd.concat(mapped_frames, axis=0, ignore_index=True)
        if 'Exchange_EUSID' in dataset.columns:
            dataset['Exchange_EUSID'] = dataset['Exchange_EUSID'].fillna(dataset['interaction_to'])
        else:
            # No source could be mapped at all: keep the raw recipient ids.
            dataset['Exchange_EUSID'] = dataset['interaction_to']
        dataset = dataset.drop(columns=['interaction_from', 'interaction_to'])
        return dataset.rename(columns={'Exchange_EUSID': 'interaction_to'})
except Exception:
    logging.info(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    class audio_recieve_feature_generation:
        """Per-user, per-period statistics of the voice calls each user received."""

        @staticmethod
        def window_selection(dataset, window):
            """Aggregate received calls per recipient and period.

            Args:
                dataset: one row per recipient with ``interaction_to``,
                    ``interaction_sourceid``, ``interaction_duration_int``,
                    ``participant_count_BF_I``, ``audio_time``,
                    ``audio_Non_English_flag`` and the period column.
                window: one of ``'daily'`` (groups on ``audio_date``), ``'weekly'``
                    (``week_of_year``), ``'monthly'`` (``month_of_year``) or
                    ``'quarter'`` (``quarter_of_year``).

            Returns:
                One row per (``users``, period), where ``users`` is the renamed
                ``interaction_to``. Columns:
                  * distinct-call count;
                  * sum/count of duration, participants and ``audio_time``;
                  * number of non-English calls.

            Raises:
                ValueError: if ``window`` is not a supported name.
            """
            period_columns = {
                'daily': 'audio_date',
                'weekly': 'week_of_year',
                'monthly': 'month_of_year',
                'quarter': 'quarter_of_year',
            }
            if window not in period_columns:
                raise ValueError('Unsupported window: ' + repr(window))
            duration_feature = period_columns[window]
            return (dataset
                    .rename(columns={'interaction_to': 'users'})
                    .groupby(['users', duration_feature])
                    .agg(Call_Recieved=('interaction_sourceid', 'nunique'),
                         sum_call_recieve_duration=('interaction_duration_int', 'sum'),
                         count_call_recieve_duration=('interaction_duration_int', 'count'),
                         sum_call_recieve_paricipants=('participant_count_BF_I', 'sum'),
                         count_call_recieve_paricipants=('participant_count_BF_I', 'count'),
                         sum_call_recieve_time=('audio_time', 'sum'),
                         count_call_recieve_time=('audio_time', 'count'),
                         NonEnglish_Call_Receive=('audio_Non_English_flag', 'sum'))
                    .reset_index())
except Exception:
    logging.info(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    def voice_chat_email_aggregation(audio):
        """Run the voice pipeline end to end and return per-user daily call statistics.

        ``interaction_from``, ``interaction_to``, ``interaction_original_author_id``
        and ``interaction_author`` of ``audio`` are converted to ``str`` and
        lower-cased in place.

        Sent-call statistics come from the caller-mapped data; received-call
        statistics come from the per-recipient expansion. The two are outer-joined
        on (``users``, ``audio_date``), rows whose user is ``'UNK'`` are removed and
        the remaining gaps are filled with 0.
        """
        for name in ('interaction_from', 'interaction_to', 'interaction_original_author_id', 'interaction_author'):
            audio[name] = audio[name].astype('str').str.lower()
        cleaned = audio_dataset_cleanup(audio)

        # Caller side.
        sent_stats = audio_sent_feature_generation.window_selection(
            audio_from_user_mapping(cleaned), 'daily')
        # Recipient side: one row per recipient, mapped to Exchange ids.
        received_stats = audio_recieve_feature_generation.window_selection(
            audio_to_user_mapping(splitting_audio_recieved_users(cleaned)), 'daily')

        combined = sent_stats.merge(received_stats, how='outer', on=['users', 'audio_date'])
        return combined.loc[combined['users'] != 'UNK'].fillna(0)
except:
    logging.info(str(pd.datetime.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    logging.info(str(pd.datetime.now()) + ": Voice data cleaning and Aggregation in progress")
    # Voice aggregation needs both the voice extract and the user file; without
    # either one, publish an empty object-typed frame with the aggregated layout.
    if 'voice' not in file_list or 'user' not in file_list:
        df_audio_stats = pd.DataFrame(
            columns=['users', 'audio_date',
                     'Call_Made', 'sum_call_sent_duration', 'count_call_sent_duration',
                     'sum_call_sent_paricipants', 'count_call_sent_paricipants',
                     'sum_call_sent_time', 'count_call_sent_time', 'NonEnglish_Call_Made',
                     'Call_Recieved', 'sum_call_recieve_duration', 'count_call_recieve_duration',
                     'sum_call_recieve_paricipants', 'count_call_recieve_paricipants',
                     'sum_call_recieve_time', 'count_call_recieve_time', 'NonEnglish_Call_Receive'],
            dtype='object',
        )
        logging.info(str(pd.datetime.now()) + ": Voice Cleaning and aggregation will not be processed as either Voice or User file is missing or there are no rows in it")
    else:
        df_audio_stats = voice_chat_email_aggregation(audio)
        logging.info(str(pd.datetime.now()) + ": Voice data cleaning and Aggregation is complete")
except:
    logging.info(str(pd.datetime.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    logging.info(str(pd.datetime.now()) + ": Combining Email, chat and voice data")
    # Outer-join voice statistics onto the email+chat statistics by (user, day).
    # Where the left side has no row, the user and date come from the voice side.
    # Rows with a missing or empty user are then dropped and all gaps become 0.
    df_communication_stats = (
        df_communication_stats
        .merge(df_audio_stats, how='outer',
               left_on=['interaction_from', 'Date'], right_on=['users', 'audio_date'])
        .assign(interaction_from=lambda frame: frame['interaction_from'].fillna(frame['users']),
                Date=lambda frame: frame['Date'].fillna(frame['audio_date']))
        .drop(columns=['users', 'audio_date'])
        .dropna(subset=['interaction_from'])
        .loc[lambda frame: frame['interaction_from'] != '']
        .fillna(0)
    )
    logging.info(str(pd.datetime.now()) + ": Email, chat and voice combined successfully")
except:
    logging.info(str(pd.datetime.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    logging.info(str(pd.Timestamp.now()) + ": Merging observe policy columns for email chat and voice")
    # Each policy column <col> arrives per channel as email_<col>, chat_<col> and
    # voice_<col>. Their row-wise sum becomes the combined <col> column.
    # sorted() makes the output column order reproducible; iterating a set of
    # strings yields a different order between interpreter runs.
    for col in sorted(set(email_policy_cols + chat_policy_cols + audio_policy_cols)):
        channel_cols = [prefix + col for prefix in ('email_', 'chat_', 'voice_')
                        if prefix + col in df_communication_stats.columns]
        if not channel_cols and col in df_communication_stats.columns:
            # The per-channel columns are already gone (the cell ran after the
            # clean-up step). Keep the merged values instead of overwriting them with 0.
            continue
        df_communication_stats[col] = df_communication_stats[channel_cols].sum(axis=1)
except Exception:
    logging.info(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()

In [None]:
try:
    logging.info(str(pd.datetime.now()) + ": Droping observe policy related unwanted columns ")
    # Drop the per-channel (email_/chat_/voice_) interaction-risk columns.
    df_communication_stats.drop(
        columns=[name for name in df_communication_stats.columns
                 if name.startswith(('email_interaction_risk_', 'chat_interaction_risk_', 'voice_interaction_risk_'))],
        inplace=True,
    )
except:
    logging.info(str(pd.datetime.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()

In [None]:
try:
    logging.info(str(pd.Timestamp.now()) + ": Merging observe policy columns for email chat and voice")
    # This cell repeats the policy merge and runs after the per-channel columns
    # were dropped. The guard below makes the repeat a no-op for columns that are
    # already merged; without it they would be overwritten with an empty sum (0).
    for col in sorted(set(email_policy_cols + chat_policy_cols + audio_policy_cols)):
        channel_cols = [prefix + col for prefix in ('email_', 'chat_', 'voice_')
                        if prefix + col in df_communication_stats.columns]
        if not channel_cols and col in df_communication_stats.columns:
            continue
        df_communication_stats[col] = df_communication_stats[channel_cols].sum(axis=1)
except Exception:
    logging.info(str(pd.Timestamp.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()

In [None]:
try:
    logging.info(str(pd.datetime.now()) + ": Droping observe policy related unwanted columns ")
    # Repeat of the clean-up step: drop any remaining per-channel interaction-risk columns.
    risk_columns = [name for name in df_communication_stats.columns
                    if name.startswith(('email_interaction_risk_', 'chat_interaction_risk_', 'voice_interaction_risk_'))]
    df_communication_stats.drop(columns=risk_columns, inplace=True)
except:
    logging.info(str(pd.datetime.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()

In [None]:
def encrypt(col, *, key=None):
    """Encrypt every value of a column and return the ciphertexts as a DataFrame.

    Each value is converted with ``astype(str)``, UTF-8 encoded and passed to
    ``key.encrypt``. The results replace the values cell by cell, keeping the
    original index and column label(s).

    Parameters
    ----------
    col : pandas.Series or pandas.DataFrame
        Values to encrypt.
    key : object with an ``encrypt(bytes)`` method, optional
        Cipher to use. When omitted, the notebook-level ``f_key`` is looked
        up at call time, which is what existing callers rely on.

    Returns
    -------
    pandas.DataFrame
        Same shape, index and column label(s) as ``pd.DataFrame(col)``,
        holding whatever ``key.encrypt`` returns for each cell.
    """
    cipher = f_key if key is None else key
    frame = pd.DataFrame(col.astype(str))
    # Per-column Series.map gives the same cell-wise mapping as
    # DataFrame.applymap, which is deprecated in newer pandas releases.
    return frame.apply(
        lambda series: series.map(lambda value: cipher.encrypt(value.encode('utf-8')))
    )

In [None]:
try:
    logging.info(str(pd.datetime.now()) + ": Transfer the aggregated data files to S3")
    if df_communication_stats.shape[0] == 0:
        logging.info(str(pd.datetime.now()) + ": Warning- No data to write")
    else:
        # Replace the `interaction_from` column with its encrypted form before
        # the frame is serialised.
        # NOTE(review): encrypt() yields whatever f_key.encrypt returns
        # (presumably Fernet `bytes` tokens). to_csv then writes their b'...'
        # repr, so confirm the downstream reader expects that.
        df_communication_stats['interaction_from'] = encrypt(df_communication_stats['interaction_from'])
        csv_payload = df_communication_stats.to_csv(index=False).encode()
        # 'Date' values are parsed as 'YYYY-MM-DD' strings, which also validates them.
        iso = '%Y-%m-%d'
        first_day = datetime.strptime(min(df_communication_stats['Date']), iso).strftime(iso)
        last_day = datetime.strptime(max(df_communication_stats['Date']), iso).strftime(iso)
        run_day = datetime.now().date().strftime(iso)
        # Object key: <processed_path>/Input_Data_Aggregated_<run day>|<first day>|<last day>.csv
        target_key = (processed_path + '/Input_Data_Aggregated_'
                      + '|'.join([run_day, first_day, last_day]) + '.csv')
        s3_fs = s3fs.S3FileSystem(anon=False)
        with s3_fs.open(target_key, 'wb') as handle:
            handle.write(csv_payload)
except:
    logging.info(str(pd.datetime.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
try:
    logging.info(str(pd.datetime.now()) + ": Deleting the source data files from S3")
    path_list = [email_file_path, chat_file_path, audio_file_path]
    for path in path_list:
        # Log label is segment 6 of the '/'-split path, as before. Shorter paths
        # fall back to the whole path, so building the message cannot raise
        # IndexError inside the handler and escalate to the shutdown branch.
        segments = path.split('/')
        label = segments[6] if len(segments) > 6 else path
        try:
            fs.rm(path, recursive=True)
        except FileNotFoundError:
            logging.info(str(pd.datetime.now()) + ': ' + label + ' file not found')
        except Exception:
            # Still best effort: record the real cause, then move on to the next path.
            logging.info(str(pd.datetime.now()) + ': ERROR- could not delete ' + label + ': ' + str(sys.exc_info()))
    logging.info(str(pd.datetime.now()) + ": File deletion Complete from S3")
except Exception:
    # `Exception` rather than a bare except so a manual interrupt does not
    # stop the notebook instance.
    logging.info(str(pd.datetime.now()) + ': ERROR- ' + str(sys.exc_info()))
    log_file_to_s3()
    shutdown_notebook_instance()

In [None]:
# Copy today's log file to S3. If the upload fails, the error can only be
# recorded in the local log file.
try:
    log_file_to_s3()
except BaseException:  # same catch-all as a bare `except:`
    logging.info(f'{pd.datetime.now()}: ERROR- {sys.exc_info()}')

In [None]:
# Housekeeping, best effort: delete earlier runs' log files from the working
# directory. Today's log (its name contains the date string `t`) is kept.
# Matching rule is the same as the previous `find . -maxdepth 1 -type f
# -name '*DataAggregationLog*' -not -name "*$t*"`: regular files only (no
# symlinks, no recursion), name contains 'DataAggregationLog' and not `t`.
# Done in Python because a `!` shell escape never raises on failure, so
# errors could not be caught; they are now logged instead.
try:
    with os.scandir('.') as entries:
        for entry in entries:
            if (entry.is_file(follow_symlinks=False)
                    and 'DataAggregationLog' in entry.name
                    and t not in entry.name):
                try:
                    os.remove(entry.path)
                except OSError:
                    logging.info(str(pd.datetime.now()) + ': ERROR- could not delete ' + entry.name + ': ' + str(sys.exc_info()))
except Exception:
    logging.info(str(pd.datetime.now()) + ': ERROR- ' + str(sys.exc_info()))

In [None]:
# Final cell: stop this SageMaker notebook instance. Failures are ignored on
# purpose, since nothing is left to do if the stop request itself errors.
try:
    shutdown_notebook_instance()
except BaseException:  # same catch-all as a bare `except:`
    pass