# VN Customer Service Data Analytics

### Objective: Curate the data table used as input to Power BI for visualizing Inbound Call and Inbound Email case categories

The output serves as input for prioritizing the scope of solutions that reduce total inbound call and inbound email volume, improve customer experience, and increase self-service adoption.

In [0]:
%run "/Repos/dung_nguyen_hoang@mfcgd.com/Utilities/Functions"

<strong>Load libraries, params and paths</strong>

In [0]:
from pyspark.sql.functions import *
from datetime import date, datetime, timedelta

# Enable dynamic partition overwrite for the session. The session-level key must be
# fully qualified; the bare 'partitionOverwriteMode' name is only recognised as a
# DataFrameWriter option and has no effect when set through spark.conf.
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')

# Mounted root folders of the source databases.
sfdc_path = '/mnt/prod/Published/VN/Master/VN_PUBLISHED_SFDC_EASYCLAIMS_DB/'
cics_path = '/mnt/prod/Published/VN/Master/VN_PUBLISHED_CICS_DB/'
cas_path = '/mnt/prod/Published/VN/Master/VN_PUBLISHED_CAS_DB/'
dm_path = '/mnt/prod/Curated/VN/Master/VN_CURATED_DATAMART_DB/'
cws_path = '/mnt/prod/Published/VN/Master/VN_PUBLISHED_ADOBE_PWS_DB/'

# Table folder names to load.
# Presumably each is looked up under the roots above by load_parquet_files - TODO confirm.
tblSrc1 = 'CASE/'
tblSrc2 = 'USER/'
tblSrc3 = 'TCISC_SERVICE_DETAILS/'
tblSrc4 = 'TFIELD_VALUES/'
tblSrc5 = 'tpolidm_daily/'
tblSrc6 = 'tcustdm_daily/'
tblSrc7 = 'tagtdm_daily/'
tblSrc8 = 'hit_data/'
tblSrc9 = 'ACCOUNT/'

# Argument lists handed to load_parquet_files().
src_paths = [
    sfdc_path,
    cics_path,
    cas_path,
    dm_path,
    cws_path,
]
src_files = [
    tblSrc1,
    tblSrc2,
    tblSrc3,
    tblSrc4,
    tblSrc5,
    tblSrc6,
    tblSrc7,
    tblSrc8,
    tblSrc9,
]

# Reporting window: a fixed start date (2023-01-01) up to the month-end that lies
# x months before the most recently completed month.
x = 0  # Change to number of months ago (0: last month-end, 1: last last month-end, ...)
today = datetime.now()
first_day_of_current_month = today.replace(day=1)

# Walk back x whole months; every hop lands on day 1 of the preceding month.
current_month = first_day_of_current_month
for _ in range(x):
    first_day_of_previous_month = (current_month - timedelta(days=1)).replace(day=1)
    current_month = first_day_of_previous_month

# One day before the first of a month is the last day of the month before it.
last_day_of_x_months_ago = current_month - timedelta(days=1)

st_mth = date(2023, 1, 1).strftime('%Y-%m-%d')
end_mth = f"{last_day_of_x_months_ago:%Y-%m-%d}"
st_yr = st_mth[:4]

print("Start and end date:", st_mth, end_mth)
print(f"Start year: {st_yr}")

<strong>Load parquet files and convert to temp views</strong>

In [0]:
# Load the tables named in src_files from the folders in src_paths.
# load_parquet_files is not defined in this notebook (presumably provided by the
# %run'd Utilities/Functions notebook); its return shape is not visible here -
# TODO confirm it yields something generate_temp_view() can consume.
list_df = load_parquet_files(src_paths, src_files)

In [0]:
# Register the loaded data so the SQL below can reference it by table name
# (`case`, tpolidm_daily, tagtdm_daily, tcustdm_daily, `account`, hit_data).
# generate_temp_view is an external helper; presumably it creates one temp view per
# loaded table, named after the table folder - TODO confirm against the helper.
generate_temp_view(list_df)

## Data Curation

Extract case data from SFDC (Salesforce) to form the base table

In [0]:
# Base extract: Salesforce cases whose Origin is 'Inbound call' or 'Email', created in
# or after st_yr, enriched through LEFT JOINs with policy (B), servicing-agent (C) and
# policy-owner (D) attributes.
# NOTE(review): '{st_yr}' is a placeholder in a plain (non-f) string, so it is presumably
# substituted inside sql_to_df - confirm against Utilities/Functions.
# NOTE(review): the query reads both C.loc_code (SA_Location) and C.loc_cd
# (PO_Banca_Group_curr); confirm that both columns exist in tagtdm_daily.
sfdc_case_string = """
SELECT DISTINCT
  current_date()          AS run_dt
, 'VN'                    AS Market
, A.Id                    AS ID
, A.Description           AS Call_Description
, D.cli_nm                AS CAS_User_Name
, A.CaseNumber            AS Case_ID
, A.Case_Type__c          AS CC_Case_Record_Type
, A.GroupRequest__c       AS CC_Group_Request
, A.Requester_Role__c     AS CC_OwnerRole -- (01-Self/02-Relative/03-Lawyer/04-Reporter/05-Agent/06-Collector/99-Other)
, Case When A.Requester_Role__c = '05 - Đại lý' Then 'DCC'
  Else 'CCC' End          AS CC_Role_Type
, B.Plan_Nm               AS CC_Plan_Name
, B.Att_Rid_Ind           AS CC_Rider_Ind
, B.Plan_Nm_By_Chnl       AS CC_Plan_Name_Cap
, A.Policy_Number__c      AS Pol_Num
, B.sa_code               AS CC_Servicing_Agent
, C.agt_nm                AS CC_Servicing_AgentName
, A.Owner_Department__c   AS CC_User_Department -- ClassificationTypeSubType_c.Department__c
, A.Complaint_Channel__c  AS Complaint_Channel
, A.CreatedByID           AS CreatedBy_ID
, A.CreatedDate           AS Created_Date
, A.IsDeleted             AS IsDeleted
, A.Origin                AS Origin_of_Inbound_Inquiry
, A.Type                  AS Lv_1_Case_Category -- ClassificationTypeSubType_c.TaskType__c
, A.Sub_Type__c           AS Lv_2_Case_Category -- ClassificationTypeSubType_c.SubTaskType__c
, A.Subject               AS Request_Content
, date_trunc('DD', A.CreatedDate)
                          AS Case_Creation_Date
, A.LastModifiedDate      AS LastModifiedDate
--, ''                    AS Is_HNW
, Case When B.Plan_Code Like 'UL%' Then 'Y' Else 'N' End
                          AS Is_UL
, B.ORPH_POL_IND          AS Policy_Orphan_Indicator_curr
, B.Prod_Cat              AS Policy_Base_Category
, ''                      AS Policy_Base_Needs
, B.pol_iss_dt            AS Policy_Issue_Date
, B.pol_eff_dt            AS Policy_Effective_Date
, B.xpry_dt               AS Policy_Expiry_Date_curr
, B.bill_mthd             AS Bill_Method_curr
--, ''                    AS Bill_Method2_curr
, B.pmt_mode              AS Payment_Mode_curr
, B.po_num                AS Policy_Owner_Client_Number
, B.po_iss_age            AS PO_Effective_Age
, B.insrd_num             AS Insured_Client_Number_curr
, B.sa_code               AS Servicing_Agent_curr
, B.wa_code               AS Writing_Agent
, datediff(A.CreatedDate, B.pol_eff_dt)
                          AS Pol_Tenure_in_Days
--, ''                    AS SA_Channel_Group_curr
, C.loc_code              AS SA_Location
, C.channel               AS SA_Channel_curr
, C.team_code             AS SA_Unit_Name_curr
, C.br_code               AS SA_Branch_Code_curr
--, ''                    AS SA_Zone_curr
, Case When C.channel='Banca' Then substr(C.loc_cd,1,3) Else '' End
                          AS PO_Banca_Group_curr
, D.cli_typ               AS PO_Client_Type_curr
, D.birth_dt              AS PO_DOB
, D.sex_code              AS PO_Gender
, CASE WHEN B.wa_code = B.sa_code THEN 'Never Reassigned' ELSE 'Reassigned' END
                          AS Ever_Reassigned

FROM `case` A
LEFT JOIN tpolidm_daily B on A.Policy_Number__c = B.pol_num
LEFT JOIN tagtdm_daily C on B.sa_code = C.agt_code
LEFT JOIN tcustdm_daily D on B.po_num = D.cli_num

WHERE 1=1
AND A.Origin in ('Inbound call', 'Email')
AND Year(A.CreatedDate) >= {st_yr}
"""

# sql_to_df is an external helper; the meaning of its second argument (1) is not
# visible from this notebook.
df_vn_sfdc_case = sql_to_df(sfdc_case_string, 1, spark)

In [0]:
#print(df_vn_sfdc_case.count())
# Expose the base extract under the name 'df_vn_sfdc_case' so the %sql cells that
# follow can query it.
df_vn_sfdc_case.createOrReplaceTempView('df_vn_sfdc_case')

In [0]:
%sql
-- new build
-- For every case, find the previous and the next case date among cases that share the
-- same policy number and Lv1/Lv2 case category. Rows with Pol_Num = '000' are excluded
-- (NULL policy numbers also fail the <> test and drop out).
create or replace temporary view df_Last_Next_Date2 AS

SELECT DISTINCT
Case_ID
, Pol_Num
, concat(Pol_Num, concat(Lv_1_Case_Category, Lv_2_Case_Category)) AS KEY
, case_creation_date
-- The windows partition on the individual columns instead of the concatenated KEY:
-- concat() returns NULL as soon as one part is NULL, which would pool cases of unrelated
-- policies into a single partition, and different value splits could collide.
-- Case_ID is a tie-breaker so same-day cases get a deterministic lag/lead.
, lag(case_creation_date) over (partition by Pol_Num, Lv_1_Case_Category, Lv_2_Case_Category order by case_creation_date, Case_ID) as Last_Case_Date
, lead(case_creation_date) over (partition by Pol_Num, Lv_1_Case_Category, Lv_2_Case_Category order by case_creation_date, Case_ID) as Next_Case_Date

from (Select distinct Case_ID, case_creation_date, Pol_Num, Lv_1_Case_Category, Lv_2_Case_Category from df_vn_sfdc_case)
Where Pol_Num <> '000'

In [0]:
%sql

-- tcws_client_device_log: one row per `account` row that carries a portal user id,
-- with the first qualifying hit (cws_joint_dt) and the latest later hit (lst_login_dt)
-- taken from hit_data.
CREATE OR REPLACE TEMP VIEW tcws_client_device_log AS
WITH cws_acc AS (
    -- Accounts that have a portal user id
    SELECT
        external_id__c  AS cli_num,
        mcf_user_id__pc AS acc_id
    FROM `account`
    WHERE mcf_user_id__pc IS NOT NULL
),
cws_login_transactions AS (
    -- Qualifying hits on the policies page, numbered per login id in time order
    SELECT
        hd.post_evar37 AS login_id,
        concat(hd.post_visid_high, hd.post_visid_low, hd.visit_num) AS visit_id,
        hd.date_time AS login_date_time,
        row_number() OVER (PARTITION BY hd.post_evar37 ORDER BY hd.date_time ASC) AS rw_num
    FROM hit_data hd
    WHERE hd.exclude_hit = 0
      AND hd.hit_source NOT IN ('5', '7', '8', '9')
      AND concat(hd.post_visid_high, hd.post_visid_low) IS NOT NULL
      AND hd.post_evar37 <> ''
      AND hd.post_evar19 = '/portfolio/policies'
      AND hd.user_server IN ('hopdongcuatoi.manulife.com.vn','hopdong.manulife.com.vn')
),
cws_reg AS (
    -- The earliest hit per login id is used as the registration date
    SELECT
        login_id,
        login_date_time AS reg_dt
    FROM cws_login_transactions
    WHERE rw_num = 1
),
cws_login AS (
    -- Latest hit after the first one; login ids with a single hit produce no row here
    SELECT
        login_id,
        max(login_date_time) AS lst_login_dt
    FROM cws_login_transactions
    WHERE rw_num > 1
        --and login_date_time <= last_day(add_months(current_date,-1))
    GROUP BY login_id
),
tcws_client_log AS (
    -- LEFT JOINs keep accounts without any hit (their dates stay NULL)
    SELECT
        a.cli_num,
        a.acc_id       AS user_num,
        b.reg_dt       AS cws_joint_dt,
        c.lst_login_dt
    FROM cws_acc a
    LEFT JOIN cws_reg   b ON a.acc_id = b.login_id
    LEFT JOIN cws_login c ON a.acc_id = c.login_id
)
SELECT * FROM tcws_client_log

In [0]:
%sql

-- Per case and portal user of the policy owner, take the latest Lst_Login_Dt found in
-- tcws_client_device_log (NULL when the owner has no portal record).
-- NOTE(review): this step was described as locating the last log-in *prior to* the case
-- created date, but no comparison against Created_Date is applied - confirm intent.

CREATE OR REPLACE TEMPORARY VIEW df_CWS_Usage_last_log AS

SELECT
    A.Case_ID
  , A.Created_Date
  , A.Policy_Owner_Client_Number
  , B.User_Num
  , max(B.Lst_Login_Dt) AS Log_date
FROM df_vn_sfdc_case A
LEFT JOIN tcws_client_device_log B
       ON A.Policy_Owner_Client_Number = B.cli_num
GROUP BY
    A.Case_ID
  , A.Created_Date
  , A.Policy_Owner_Client_Number
  , B.User_Num

In [0]:
%sql

-- Distinct (Case_ID, Log_Date, User_Num) projection of df_CWS_Usage_last_log.
CREATE OR REPLACE TEMPORARY VIEW df_CWS_Usage_Last_Log2 AS

SELECT DISTINCT
    Case_ID
  , Log_Date
  , User_Num
FROM df_CWS_Usage_last_log


In [0]:
%sql

-- Final datamart view: the base case extract (A) plus previous/next case dates (B),
-- portal account details (C) and the portal log-in date per case (D).
-- NOTE(review): C is joined on client number and D on Case_ID; if either side holds more
-- than one row per key the case rows multiply - confirm uniqueness of those keys.
create or replace temporary view df_Output AS

SELECT DISTINCT 
A.run_dt
, A.Market
, A.ID
, A.Call_Description
, A.CAS_User_Name
, A.Case_ID
, A.CC_Case_Record_Type
, A.CC_Group_Request
-- CC_OwnerProfileName is not produced by df_vn_sfdc_case, so selecting it cannot be
-- resolved; disabled until the base extract provides it.
--, A.CC_OwnerProfileName
, A.CC_OwnerRole
, A.CC_Role_Type
, A.CC_Plan_Name
, A.CC_Rider_Ind
, A.CC_Plan_Name_Cap
, A.Pol_Num
, A.CC_Servicing_Agent
, A.CC_Servicing_AgentName
, A.CC_User_Department
, A.Complaint_Channel
, A.CreatedBy_ID
, A.Created_Date
, A.IsDeleted
, A.Origin_of_Inbound_Inquiry
, A.Lv_1_Case_Category
, A.Lv_2_Case_Category
, A.Case_Creation_Date
, A.LastModifiedDate
--, A.Is_HNW
, A.Is_UL
, A.Policy_Orphan_Indicator_curr
, A.Policy_Base_Category
, A.Policy_Base_Needs
, A.Policy_Issue_Date
, A.Policy_Effective_Date
, A.Policy_Expiry_Date_curr
, A.Bill_Method_curr
--, A.Bill_Method2_curr
, A.Payment_Mode_curr
, A.Policy_Owner_Client_Number
, A.PO_Effective_Age
, A.Insured_Client_Number_curr
, A.Servicing_Agent_curr
, A.Writing_Agent
, A.Pol_Tenure_in_Days
, A.SA_Location
-- SA_Channel_Group_curr is commented out in the base extract, so it is disabled here too.
--, A.SA_Channel_Group_curr
, A.SA_Channel_curr
, A.SA_Unit_Name_curr
, A.SA_Branch_Code_curr
--, '' AS SA_Zone_curr
, A.PO_Banca_Group_curr
, A.PO_Client_Type_curr
, A.PO_DOB
, A.PO_Gender
, A.Ever_Reassigned
-- Created_Date shifted from UTC to GMT+7
, from_utc_timestamp(A.Created_Date, 'GMT+7')  as Created_Date_VNT 

, B.Last_Case_Date
, B.Next_Case_Date

, C.user_num AS CWS_User_Num
, C.cws_joint_dt AS CWS_Last_Create_Date_curr

, D.Log_date AS CWS_Last_Login_Date


FROM df_vn_sfdc_case A
LEFT JOIN df_Last_Next_Date2 B on A.Case_ID = B.Case_ID
LEFT JOIN tcws_client_device_log C on A.policy_owner_client_number = C.cli_num
LEFT JOIN df_CWS_Usage_Last_Log2 D on A.Case_ID = D.Case_ID


## Final Output = cservicedm

In [0]:
# Pull the assembled df_output view into a DataFrame; cservicedm is what gets written
# out in the final step.
sql_string = "\nselect * from df_output\n"

cservicedm = sql_to_df(sql_string, 1, spark)
#print(cservicedm.count())

## Write to ADLS GEN2

In [0]:
# Set the session time zone to UTC+0 before writing, to avoid date adjustment.
spark.conf.set("spark.sql.session.timeZone", "UTC+0")

# Persist the datamart to ADLS Gen2 as parquet, replacing any previous output.
(
    cservicedm.write
    .mode("overwrite")
    .parquet("/mnt/lab/vn/project/cpm/datamarts/TCESRVDM_DAILY")
)