In [527]:
import findspark
findspark.init("/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark")

import os
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_181-cloudera"

from pyspark import SparkContext
from pyspark.sql import SparkSession,HiveContext,Window
from pyspark.sql import functions as fn
from pyspark.sql.types import IntegerType, FloatType, DoubleType, ArrayType, StringType, DecimalType,MapType

spark_session = SparkSession.builder.enableHiveSupport().appName("test").config("spark.driver.memory","30g").getOrCreate()
hc = HiveContext(spark_session.sparkContext)

In [2]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

from datetime import datetime

#### Prepare Data and have an overview

In [426]:
outbound_mg = hc.sql('''
SELECT
    a.*
FROM 
    rdtwarehouse.tmp_outbound_2019 a 
WHERE
    brand = 'MG'
''').toPandas()

In [427]:
outbound_mg.shape

(1096907, 10)

In [428]:
outbound_2019_mg.isna().sum()

mobile                 0
brand                  0
channel_one       423855
channel_two       424509
task_item_time         0
process_time      322453
task_type              0
ob_count          322443
ob_result_code    325684
call_rank              0
dtype: int64

In [429]:
outbound_2019_mg[['mobile','task_type']].groupby(['task_type']).count()

Unnamed: 0_level_0,mobile
task_type,Unnamed: 1_level_1
1,568335
2,102668
3,134320
4,12381
5,90143
6,189060


In [430]:
outbound_2019_mg[outbound_2019_mg['task_type'] == '001'].isna().sum()

mobile              0
brand               0
channel_one        46
channel_two       689
task_item_time      0
process_time      609
task_type           0
ob_count          608
ob_result_code    761
call_rank           0
dtype: int64

In [431]:
len(outbound_2019_mg['mobile'].unique())

742455

#### Leads Outbound

In [435]:
outbound_leads_2019 = outbound_2019_mg[(outbound_2019_mg['task_type'] == '001') & \
                                       (outbound_2019_mg['process_time'] < '2020-01-01')]
outbound_leads_2020 = outbound_2019_mg[(outbound_2019_mg['task_type'] == '001') & \
                                       (outbound_2019_mg['process_time'] >= '2020-01-01')]

In [436]:
# outbound result
outbound_leads_2019[['mobile','ob_result_code']].groupby(['ob_result_code']).count()
outbound_leads_2020[['mobile','ob_result_code']].groupby(['ob_result_code']).count()

Unnamed: 0_level_0,mobile
ob_result_code,Unnamed: 1_level_1
2,29245
3,26668
4,56848
5,4214
6,2218
7,16766
8,14514
9,148
10,1601
18,2001


In [411]:
outbound_leads['hour'] = outbound_leads['process_time'].dt.hour
outbound_leads[['mobile', 'hour']].groupby(['hour']).count()

outbound_success = outbound_leads[(outbound_leads['ob_result_code'] == '007') | (outbound_leads['ob_result_code'] == '002') |\
                  (outbound_leads['ob_result_code'] == '008')][['mobile','process_time']]
outbound_success['hour'] = outbound_success['process_time'].dt.hour
outbound_success[['mobile', 'hour']].groupby(['hour']).count()

outbound_cust = outbound_leads[(outbound_leads['ob_result_code'] == '007')][['mobile','process_time']]
outbound_cust['hour'] = outbound_cust['process_time'].dt.hour
outbound_cust[['mobile', 'hour']].groupby(['hour']).count()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.


Unnamed: 0_level_0,mobile
hour,Unnamed: 1_level_1
1.0,1
4.0,2
8.0,66
9.0,12953
10.0,12177
11.0,9801
12.0,3495
13.0,11106
14.0,11990
15.0,12439


In [245]:
# remove days with more than 6500 calls
normal_days = daily_workload_mg_2019[daily_workload_mg_2019['outbound_vol'] <= 6500]

In [253]:
outbound_leads_rank_diff_nor = outbound_leads_rank[(outbound_leads_rank['task_rank'] == 1) & (outbound_leads_rank['task_item_time'].isin(normal_days['task_item_date']))]\
[['mobile','task_item_time']].merge(outbound_leads_rank[outbound_leads_rank['task_rank'] == 2][['mobile','task_item_time']],\
                                                                                                 on = 'mobile', how = 'inner')
outbound_leads_rank_diff_nor['diff'] = outbound_leads_rank_diff_nor['task_item_time_y'] - outbound_leads_rank_diff_nor['task_item_time_x']
outbound_leads_rank_diff_nor['diff'] = outbound_leads_rank_diff_nor['diff'].astype('timedelta64[D]').astype(int)

pd.cut(outbound_leads_rank_diff_nor['diff'].values, bins=[0, 7, 14, 20, 25, 30, 60, 90, 1000]).value_counts()

(0, 7]         5178
(7, 14]         935
(14, 20]      50724
(20, 25]       1602
(25, 30]        880
(30, 60]       4254
(60, 90]       1157
(90, 1000]     4241
dtype: int64

In [259]:
outbound_leads_001[outbound_leads_001['mobile'].isin(outbound_leads_rank_diff['mobile'])][['mobile','ob_result_code']].\
groupby(['ob_result_code']).count()

Unnamed: 0_level_0,mobile
ob_result_code,Unnamed: 1_level_1
2,4432
3,8662
4,9124
5,1333
6,871
7,87276
8,4110
9,72
10,536
18,48


#### Failed customers outbound

In [448]:
outbound_failed_2019 = outbound_2019_mg[(outbound_2019_mg['task_type'] == '003') & \
                                       (outbound_2019_mg['process_time'] < '2020-01-01')]
outbound_failed_2020 = outbound_2019_mg[(outbound_2019_mg['task_type'] == '003') & \
                                       (outbound_2019_mg['process_time'] >= '2020-01-01')]
outbound_failed = outbound_2019_mg[(outbound_2019_mg['task_type'] == '003')]

In [569]:
len(outbound_failed['mobile'].unique())

107018

In [440]:
# outbound result
outbound_failed_2019[['mobile','ob_result_code']].groupby(['ob_result_code']).count()
outbound_failed_2020[['mobile','ob_result_code']].groupby(['ob_result_code']).count()

Unnamed: 0_level_0,mobile
ob_result_code,Unnamed: 1_level_1
2,152
3,8237
4,11762
5,1030
6,584
7,823
8,11522
9,4
10,669
18,2512


In [529]:
failed_date = hc.sql('''
SELECT
    id as mobile, MAX(score) AS failed_date
FROM 
    rdtwarehouse.tmp_mg_tags_0513
WHERE 
    tagid IN ('506806140933')
GROUP BY id
''').toPandas()

In [550]:
failed_date['failed_date'] = [datetime(year=int(str(i)[0:4]), month=int(str(i)[4:6]), day=int(str(i)[6:8])).isoformat() \
                            for i in failed_date['failed_date']]

In [572]:
outbound_failed['mobile'] = outbound_failed['mobile'].astype(int)
failed_date['mobile'] = failed_date['mobile'].astype(int)
failed_engage = failed_date.merge(outbound_failed[['mobile','process_time']], \
                                  how = 'left', on = 'mobile')

fail_date_2019 = failed_date[(failed_date['failed_date'] >= '2019-01-01') & (failed_date['failed_date'] <= '2019-12-31')]
failed_engage_2019 = fail_date_2019.merge(outbound_failed[['mobile','process_time']], how = 'left', on = 'mobile')

fail_date_2020 = failed_date[(failed_date['failed_date'] >= '2020-01-01') & (failed_date['failed_date'] <= '2020-12-31')]
failed_engage_2020 = fail_date_2020.merge(outbound_failed[['mobile','process_time']], how = 'left', on = 'mobile')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.


In [574]:
print('Failed 2019:',len(fail_date_2019['mobile'].unique()), '\t',
      'Failed 2020:',len(fail_date_2020['mobile'].unique()))

Failed 2019: 761922 	 Failed 2020: 200662


In [587]:
print('Failed Engaged 2019:',len(failed_engage_2019[failed_engage_2019['process_time'].isna() == False]['mobile'].unique()),'\t',
      'Failed Engaged 2020:',len(failed_engage_2020[failed_engage_2020['process_time'].isna() == False]['mobile'].unique()))

Failed Engaged 2019: 23768 	 Failed Engaged 2020: 13859


In [562]:
failed_engage.isna().sum()

mobile                0
failed_date           0
process_time    2194967
dtype: int64

In [563]:
failed_engage_2019.isna().sum()

mobile                0
failed_date           0
process_time    1375562
dtype: int64

In [492]:
status = hc.sql('''
SELECT DISTINCT
    id, tagid
FROM rdtwarehouse.tmp_mg_tags_0513
WHERE 
    tagid IN
    ('609885356032','609885356033','609885356034','609885356035',
    '609885356036','609885356037','609885356038','609885356039',
    '609885356040')
''').toPandas()

In [546]:
outbound_failed_out = outbound_failed[~outbound_failed['mobile'].isin(failed_engage['mobile'])]
status['id'] = status['id'].astype(int)
status['tagid'] = status['tagid'].astype(str)

outbound_failed_out.merge(status, how = 'left', left_on = 'mobile', right_on = 'id')[['tagid','mobile']].\
groupby(['tagid']).count()

Unnamed: 0_level_0,mobile
tagid,Unnamed: 1_level_1
609885356033,23320
609885356034,1173
609885356035,30817
609885356036,10825
609885356037,50
609885356038,31
609885356039,193


In [564]:
failed_engage['failed_date'] = pd.to_datetime(failed_engage['failed_date'],format="%Y-%m-%dT%H:%M:%S")

In [565]:
failed_engage['diff'] = failed_engage['process_time']- failed_engage['failed_date']
failed_engage = failed_engage.dropna()
failed_engage['diff'] = failed_engage['diff'].astype('timedelta64[D]').astype(int)

In [566]:
len(failed_engage['mobile'].unique())

38294

In [568]:
pd.cut(failed_engage['diff'].values, bins=[-1000, -1, 7, 14, 20, 25, 30, 60, 90, 1000]).value_counts()

(-1000, -1]    15860
(-1, 7]         3725
(7, 14]         4995
(14, 20]        3478
(20, 25]        2184
(25, 30]        2159
(30, 60]        6587
(60, 90]        3040
(90, 1000]      5271
dtype: int64

#### Workload

In [423]:
daily_workload_leads = hc.sql('''
SELECT
    CAST(a.create_time AS DATE) AS cust_time,
    COUNT(Distinct mobile_no) AS outbound_vol
FROM
    rdtwarehouse.ods_smcsc_tsk_customer a
LEFT JOIN
    rdtwarehouse.ods_smcsc_tsk_task_item b
ON a.id = b.cust_id
LEFT JOIN 
    rdtwarehouse.ods_smcsc_tsk_task c
ON b.task_id = c.id
WHERE 
    task_type = '001'
GROUP BY CAST(a.create_time AS DATE)
''').toPandas()

In [424]:
daily_workload_leads.to_csv('EDA_result/daily_workload_leads.csv')

In [421]:
daily_workload_failed = hc.sql('''
SELECT
    CAST(a.create_time AS DATE) AS cust_time,
    COUNT(Distinct mobile_no) AS outbound_vol
FROM
    rdtwarehouse.ods_smcsc_tsk_customer a
LEFT JOIN
    rdtwarehouse.ods_smcsc_tsk_task_item b
ON a.id = b.cust_id
LEFT JOIN 
    rdtwarehouse.ods_smcsc_tsk_task c
ON b.task_id = c.id
WHERE 
    task_type = '003'
GROUP BY CAST(a.create_time AS DATE)
''').toPandas()

In [422]:
daily_workload_failed.to_csv('EDA_result/daily_workload_failed.csv')

#### Adhoc