In [1]:
import botocore.session as s
from botocore.exceptions import ClientError
import boto3.session
import json
import boto3
import sagemaker
import operator
from botocore.exceptions import WaiterError
from botocore.waiter import WaiterModel
from botocore.waiter import create_waiter_with_client

import s3fs
import time
import os
import random
import datetime

import pandas as pd
import numpy as np

### Define credentials to axcess redshift data warehouse

In [4]:
ssm = boto3.client('ssm')

# Expected output of get_parameters
parameters = {"host":"10.13.2.27",
             "port":5439,
             "db":"prd",
             "user":"dwuser",
             "password":"DataLakea1012!"}

region = boto3.session.Session().region_name

bc_session = s.get_session()
session = boto3.Session(botocore_session =  bc_session,
                        region_name = region)
session

### Execute SQL Query

In [6]:
client_redshift = session.client('redshift-data')

sql_statement = '''
with initial_pass as (
select 
date(new_dateandtimeofcall) as call_date,
extract(month from date(new_dateandtimeofcall)) as call_month,
new_dateandtimeofcall as call_time, 
new_callagentnameyominame as agent_name, 
new_categoryname as category_name,
new_calldirectonname as call_direction, 
new_policyno as policy_number, 
new_subcategory1name as subcategory1, 
new_subcategory2name as subcategory2,
new_telephonenumber as customer_phone, 
new_cscremarks as manual_notes1, 
new_customerdetails as manual_notes2
from dl_crm.filterednew_calllog
where call_date > '2021-01-01' and subcategory1 in ('Other Changes/Endorsement', 'Cancellation','Self Help - Assist', 'Self Help – Direct'))
select * from initial_pass
'''

execute_statement = client_redshift.execute_statement(ClusterIdentifier = 'fwd-prod-inst',
                                                      Database = parameters['db'],
                                                      DbUser = parameters['user'],
                                                      Sql = sql_statement) # Execute the SQL statement

# If the statement isn't done yet, then wait for a while before checking again.
query_id = execute_statement['Id']
query_description = client_redshift.describe_statement(Id = query_id)

while query_description['Status'] != 'FINISHED':
    if 'Error' in query_description:
        break
    time.sleep(20) # Check again in 20 seconds
    query_description = client_redshift.describe_statement(Id = query_id)
        
output = client_redshift.get_statement_result(Id = execute_statement['Id']) # Get the results from ID of the executed SQL
output

{'ColumnMetadata': [{'isCaseSensitive': False,
   'isCurrency': False,
   'isSigned': False,
   'label': 'call_date',
   'length': 0,
   'name': 'call_date',
   'nullable': 1,
   'precision': 13,
   'scale': 0,
   'schemaName': '',
   'tableName': '',
   'typeName': 'date'},
  {'isCaseSensitive': False,
   'isCurrency': False,
   'isSigned': True,
   'label': 'call_month',
   'length': 0,
   'name': 'call_month',
   'nullable': 1,
   'precision': 10,
   'scale': 0,
   'schemaName': '',
   'tableName': '',
   'typeName': 'int4'},
  {'isCaseSensitive': False,
   'isCurrency': False,
   'isSigned': False,
   'label': 'call_time',
   'length': 0,
   'name': 'call_time',
   'nullable': 1,
   'precision': 29,
   'scale': 6,
   'schemaName': 'pg_temp_9',
   'tableName': 'prd_dl_crm_filterednew_calllog_5d6883024b096',
   'typeName': 'timestamp'},
  {'isCaseSensitive': True,
   'isCurrency': False,
   'isSigned': False,
   'label': 'agent_name',
   'length': 0,
   'name': 'agent_name',
   'null

### Extract Data in JSON Format

In [7]:
# nrows = output['TotalNumRows']
ncols = len(output['ColumnMetadata'])

col_labels = [output['ColumnMetadata'][i]['label'] for i in range(ncols)]
types = [output['ColumnMetadata'][i]['typeName'] for i in range(ncols)] # The schema type of each column

# Write down the columns that should be used with long or string. 
type_dictionary = {
    'bigserial': 'longValue',
    'bool':'stringValue',
    'int4': 'longValue',
    'int8': 'longValue',
    'numeric': 'longValue',
    'timestamp': 'stringValue',
    'varchar': 'stringValue'
}
types = list(map(type_dictionary.get, types))

df = pd.DataFrame(np.array(output['Records']), columns = col_labels)
df

Unnamed: 0,call_date,call_month,call_time,agent_name,category_name,call_direction,policy_number,subcategory1,subcategory2,customer_phone,manual_notes1,manual_notes2
0,{'stringValue': '2021-11-26'},{'longValue': 11},{'stringValue': '2021-11-26 16:46:36'},{'stringValue': 'Go Irish Dianne Vinaviles'},{'stringValue': 'Service'},{'stringValue': 'Inbound'},{'stringValue': 'PNTR2021-00007327'},{'stringValue': 'Self Help - Assist'},{'stringValue': 'Cancellation'},{'stringValue': 'Unknown'},{'stringValue': ''},{'stringValue': 'ADv cannot add in but may can...
1,{'stringValue': '2021-11-26'},{'longValue': 11},{'stringValue': '2021-11-26 16:06:29'},{'stringValue': 'Go Irish Dianne Vinaviles'},{'stringValue': 'Service'},{'stringValue': 'Inbound'},"{'stringValue': 'PNTR2021-00008804, PNTC2021-0...",{'stringValue': 'Self Help - Assist'},{'stringValue': 'Cancellation'},{'stringValue': '84313773'},{'stringValue': ''},{'stringValue': 'PH supposed to go for company...
2,{'stringValue': '2021-11-27'},{'longValue': 11},{'stringValue': '2021-11-27 12:30:40'},{'stringValue': 'Nur Azlina Binte Abdul Rahman'},{'stringValue': 'Service'},{'stringValue': 'Inbound'},{'stringValue': ''},{'stringValue': 'Self Help – Direct'},{'stringValue': 'POI Move'},{'stringValue': '97738346'},{'stringValue': ''},{'stringValue': 'Cm asked if can move the star...
3,{'stringValue': '2021-11-27'},{'longValue': 11},{'stringValue': '2021-11-27 10:11:45'},{'stringValue': 'Nur Azlina Binte Abdul Rahman'},{'stringValue': 'Service'},{'stringValue': 'Inbound'},{'stringValue': 'PNTR2021-00009153 & PNTC2021-...,{'stringValue': 'Self Help – Direct'},{'stringValue': 'POI extension'},{'stringValue': '86205312'},{'stringValue': ''},{'stringValue': 'End date to be extended to 3 ...
4,{'stringValue': '2021-11-29'},{'longValue': 11},{'stringValue': '2021-11-29 11:40:12'},{'stringValue': 'Tracy Hwang Poh Sim'},{'stringValue': 'Service'},{'stringValue': 'Inbound'},{'stringValue': 'PNTR2021-00011604'},{'stringValue': 'Self Help - Assist'},{'stringValue': 'Cancellation'},{'stringValue': '81271736'},{'stringValue': ''},{'stringValue': 'Adv her trip is more than 30 ...
...,...,...,...,...,...,...,...,...,...,...,...,...
4530,{'stringValue': '2021-12-02'},{'longValue': 12},{'stringValue': '2021-12-02 10:09:00'},{'stringValue': 'Ethan Lua Wei Jun'},{'stringValue': 'Service'},{'stringValue': 'Outbound'},{'stringValue': ''},{'stringValue': 'Other Changes/Endorsement'},{'stringValue': 'Others'},{'stringValue': '94233484'},{'stringValue': 'Settled as PH called in ady'},{'stringValue': 'From: Anna Chua <annachua37@y...
4531,{'stringValue': '2021-12-02'},{'longValue': 12},{'stringValue': '2021-12-02 16:45:58'},{'stringValue': 'Muhammad Ridzwan Bin Yusri'},{'stringValue': 'Service'},{'stringValue': 'Inbound'},{'stringValue': ''},{'stringValue': 'Self Help – Direct'},{'stringValue': 'Claim Submission'},{'stringValue': '63533564'},{'stringValue': ''},{'stringValue': 'submit online or app or email...
4532,{'stringValue': '2021-12-02'},{'longValue': 12},{'stringValue': '2021-12-02 17:08:10'},{'stringValue': 'Tracy Hwang Poh Sim'},{'stringValue': 'Service'},{'stringValue': 'Inbound'},{'stringValue': 'PNTR2021-00012200 & PNTC2021-...,{'stringValue': 'Self Help - Assist'},{'stringValue': 'Cancellation'},{'stringValue': '91175713'},{'stringValue': ''},{'stringValue': 'Adv her to purchase a new one...
4533,{'stringValue': '2021-12-02'},{'longValue': 12},{'stringValue': '2021-12-02 13:33:00'},{'stringValue': 'Go Irish Dianne Vinaviles'},{'stringValue': 'Service'},{'stringValue': 'Outbound'},"{'stringValue': 'PNTR2021-00005203, PNTC2021-0...",{'stringValue': 'Self Help - Assist'},{'stringValue': 'POI Move'},{'stringValue': '98316175'},{'stringValue': ''},{'stringValue': 'Called and verified. Adv that...


### Extract embedded data into dataframe

In [8]:
%%time
# Create a utility function so that it attempts to query longValue or stringValue first, failing which it'll use isNull.
def utility_query(data, value):
    try:
        result = data[value]
    except:
        result = None
    return result

for col, value in zip(df.columns, types):
    df[col] = df[col].apply(lambda x: utility_query(x, value))

CPU times: user 58.1 ms, sys: 0 ns, total: 58.1 ms
Wall time: 59.8 ms


In [16]:
df

Unnamed: 0,call_date,call_month,call_time,agent_name,category_name,call_direction,policy_number,subcategory1,subcategory2,customer_phone,manual_notes1,manual_notes2
0,,11,2021-11-26 16:46:36,Go Irish Dianne Vinaviles,Service,Inbound,PNTR2021-00007327,Self Help - Assist,Cancellation,Unknown,,ADv cannot add in but may cancel policy via CP...
1,,11,2021-11-26 16:06:29,Go Irish Dianne Vinaviles,Service,Inbound,"PNTR2021-00008804, PNTC2021-00004652",Self Help - Assist,Cancellation,84313773,,PH supposed to go for company conference from ...
2,,11,2021-11-27 12:30:40,Nur Azlina Binte Abdul Rahman,Service,Inbound,,Self Help – Direct,POI Move,97738346,,"Cm asked if can move the start date to 29 Nov,..."
3,,11,2021-11-27 10:11:45,Nur Azlina Binte Abdul Rahman,Service,Inbound,PNTR2021-00009153 & PNTC2021-00004902,Self Help – Direct,POI extension,86205312,,End date to be extended to 3 Jan 2022\nInform ...
4,,11,2021-11-29 11:40:12,Tracy Hwang Poh Sim,Service,Inbound,PNTR2021-00011604,Self Help - Assist,Cancellation,81271736,,Adv her trip is more than 30 day so no covid-1...
...,...,...,...,...,...,...,...,...,...,...,...,...
4530,,12,2021-12-02 10:09:00,Ethan Lua Wei Jun,Service,Outbound,,Other Changes/Endorsement,Others,94233484,Settled as PH called in ady,From: Anna Chua <annachua37@yahoo.com.sg> \nSe...
4531,,12,2021-12-02 16:45:58,Muhammad Ridzwan Bin Yusri,Service,Inbound,,Self Help – Direct,Claim Submission,63533564,,submit online or app or email to claims.sg@fwd...
4532,,12,2021-12-02 17:08:10,Tracy Hwang Poh Sim,Service,Inbound,PNTR2021-00012200 & PNTC2021-00007148,Self Help - Assist,Cancellation,91175713,,Adv her to purchase a new one instead of movin...
4533,,12,2021-12-02 13:33:00,Go Irish Dianne Vinaviles,Service,Outbound,"PNTR2021-00005203, PNTC2021-00002158",Self Help - Assist,POI Move,98316175,,Called and verified.\nAdv that flight cancella...


# Save and Export Dataset to S3 Bucket 

In [17]:
Data = "./Data"
file_name = "merged_crm_meta.csv"

# Save dataset as csv file into 
df.to_csv("{}/{}".format(Data,file_name), index=False)

# Export dataset as csv file to S3 bucket
df.to_csv("s3://fwd-sg-sagemaker-raw-data/voice_analytics/{}".format(file_name), index=False)