In [1]:
import re
import pandas as pd
import numpy as np
import string
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Window
import datetime
import calendar
from dateutil.relativedelta import relativedelta

#acc_mth = 7
#acc_yr = 2017
#transv_pr_gr_psea_path = 'data/sas_402/transv_pr_gr_psea.parquet'
#transv_polhistory_psea_path = 'data/sas_401/transv_polhistory_psea.parquet'
#transv_p2_path = 'data/sas_405/transv_p2.parquet'
#riskpf_path = '/group/axa_malaysia/data/adm_riskpf'

def pillar2(acc_mth, acc_yr, transv_pr_gr_psea_path, transv_polhistory_psea_path, riskpf_path, output_folder='data/sas_405/'):
    #/* ****************************************************************** */	
    #/* ***************** Get policy by vision per policy **************** */
    #/* ****************************************************************** */
    #/* ******** Premium Section ******* */
    #/* ******************************** */

    transv_pr_gr_psea = spark.read.parquet(transv_pr_gr_psea_path)
    transv_polhistory_psea = spark.read.parquet(transv_polhistory_psea_path)
    riskpf = spark.read.parquet(riskpf_path)[["CHDRNO","TRANNO","DATIME"]]
    
    #/* Aggregate Premium */
    premium = transv_pr_gr_psea.groupby(['chdrnum','tranno','trantype']).agg(sum('GWP').alias('gwp'), sum('CWP').alias('cwp'))
    
    #/* Split GWP */
    #/* Flag the issuances */
    premium = premium.withColumn('gwp_poi',col('gwp')-col('cwp'))
    premium = premium.withColumn('gwp_issu_poi',when(col('trantype').isin(["NB", "RN"]),col('gwp_poi')).otherwise(0))
    premium = premium.withColumn('gwp_canc_poi',when(col('trantype').isin(["CA", "RE"]),col('gwp_poi')).otherwise(0)).drop('cwp')

    #/*Map the POI from POLHISTORY to the premium transactions*/
    map_zrenno = transv_polhistory_psea[['CHDRNUM','TRANNO','ZRENNO']].sort(['CHDRNUM','TRANNO']).drop_duplicates()

    #/*Inner Join: if there is a mismatch, it can only be due to delay between the extractions of ZTRN & CHDR*/
    premium2 = premium.join(map_zrenno, ['CHDRNUM','TRANNO'], 'inner')
    premium3 = premium2.groupby('CHDRNUM','ZRENNO').agg(
        sum('gwp_poi').alias('gwp_poi'),
        sum('gwp_issu_poi').alias('gwp_issu_poi'),
        sum('gwp_canc_poi').alias('gwp_canc_poi'),
    )

    #/* ******** Policy Section ******** */
    #/* ******************************** */
    #/* From POLHISTORY, keep only the latest transaction for each POI (ZRENNO) - by descending D_to & descending TRANNO*/
    list_poi = transv_polhistory_psea[['CHDRNUM',
     'D_com',
     'D_exp',
     'D_to',
     'ZRENNO',
     'TRANNO',
     'RENEWABLE',
     'D_cancel',
     'CHDRSTCDC',
     'CNTTYPE',
     'ORIG_POL',
     'LATEST_REPLACEMENT',
     'CNTBRANCH',
     'AGENTID']].sort('CHDRNUM','ZRENNO',desc('D_to'),desc('TRANNO')).drop_duplicates(subset=['CHDRNUM','ZRENNO']).drop('D_to')

    #/*Map Premium for each POI*/
    list_poi2 = list_poi.join(premium3,['CHDRNUM','ZRENNO'],'inner')
    
    #/* ****************************************************************** */
    #/* ************************ Prepare KPI Data ************************ */
    #/* ****************************************************************** */
    
    #/* Last - (sorted BY latest inception - first time step of policy journey) is NEW BUSINESS */
    ranked_poi = list_poi2.\
    withColumn('ranknum', rank().over(Window.orderBy("chdrnum","zrenno").partitionBy("orig_pol"))).\
    withColumn('first', when(col('ranknum')==1,True).otherwise(False) ).drop('ranknum').\
    withColumn('ranknum_desc', rank().over(Window.orderBy(col("chdrnum").desc(), col("zrenno").desc()).partitionBy("orig_pol"))).\
    withColumn('last', when(col('ranknum_desc')==1,True).otherwise(False) ).drop('ranknum_desc')

    ranked_poi = ranked_poi.\
    withColumn('gwp_ee', col('gwp_issu_poi')).\
    withColumn('gwp_ie', col('gwp_poi')-col('gwp_canc_poi')).\
    withColumn('cancelled', when((col('last')==True)&
                                 (col('D_com')<=col('D_cancel'))&(col('D_cancel')<=col('D_exp')),True).otherwise(False) ).\
    withColumn('lapsed', when((col('last')==True)&
                              (col('D_exp')<=last_day(lit('{y}-{m}-01'.format(y=acc_yr,m=str(acc_mth).zfill(2))))),True).otherwise(False) ).\
    withColumn('abnormal', when(((col('last')!=True)&(col('renewable')==0)),1).otherwise(0)).\
    withColumn('pc_rnw', when(col('first')==True, 0).otherwise(1)).\
    withColumn('pc_nbz', when(col('first')==True, 1).otherwise(0)).\
    withColumn('gwp_rnw_ee', when(col('first')==True, 0).otherwise(col('gwp_issu_poi'))).\
    withColumn('gwp_nbz_ee', when(col('first')==True, col('gwp_issu_poi')).otherwise(0)).\
    withColumn('gwp_rnw_ie', when(col('first')==True, 0).otherwise(col('gwp_ie'))).\
    withColumn('gwp_nbz_ie', when(col('first')==True, col('gwp_ie')).otherwise(0)).\
    withColumn('pc_exp_rnabl_canc', when(
            ((col('cancelled')==True)&
            (col('renewable')==1)),1).otherwise(0)).\
    withColumn('pc_exp_nonrn_canc', when(
            ((col('cancelled')==True)&
            (col('renewable')==0)),1).otherwise(0)).\
    withColumn('pc_exp_rnabl_laps', when(
            ((col('lapsed')==True)&
            (col('cancelled')!=True)&
            (col('renewable')==1)),1).otherwise(0)).\
    withColumn('pc_exp_nonrn_expi', when(
            ((col('lapsed')==True)&
            (col('cancelled')!=True)&
            (col('renewable')==0)),1).otherwise(0)).\
    withColumn('s_rnabl_gwp_canc', when((col('pc_exp_rnabl_canc')==1), col('gwp_ie')).otherwise(0) ).\
    withColumn('s_nonrn_gwp_canc', when((col('pc_exp_nonrn_canc')==1), col('gwp_ie')).otherwise(0) ).\
    withColumn('s_rnabl_gwp_laps', when((col('pc_exp_rnabl_laps')==1), col('gwp_ie')).otherwise(0) ).\
    withColumn('s_nonrn_gwp_expi', when((col('pc_exp_nonrn_expi')==1), col('gwp_ie')).otherwise(0) ).\
    withColumn('D_end', when((rank().over(Window.orderBy(col("zrenno").desc()).partitionBy("orig_pol","chdrnum"))==1)&
                                 (col('D_com')<=col('D_cancel'))&(col('D_cancel')<=col('D_exp')),
                             col('D_cancel')).otherwise(col('D_exp')) ) #Date end, which is when the policy ended - both cancel and expire

    #/* 366.25 since thailand cover starts and ends on the same day (not the day before) */
    ranked_poi2 = ranked_poi.\
    withColumn('annual_factor', 366.25 / (datediff(col('D_exp'),col('D_com')) + 1) ).\
    withColumn('thisyear_data', struct('CHDRNUM','CHDRSTCDC','CNTBRANCH','CNTTYPE','AGENTID','gwp_ee','gwp_ie','annual_factor')).\
    withColumn('nextyear_data', lead('thisyear_data').over(Window.orderBy("chdrnum","zrenno").partitionBy("orig_pol"))).\
    withColumn('new_pol', when(col('last')==True,'NA').otherwise(col('nextyear_data')['CHDRNUM']) ).\
    withColumn('new_line', when(col('last')==True,'NA').otherwise(col('nextyear_data')['CHDRSTCDC']) ).\
    withColumn('new_branch', when(col('last')==True,'NA').otherwise(col('nextyear_data')['CNTBRANCH']) ).\
    withColumn('new_cnttype', when(col('last')==True,'NA').otherwise(col('nextyear_data')['CNTTYPE']) ).\
    withColumn('new_agentid', when(col('last')==True,'NA').otherwise(col('nextyear_data')['AGENTID']) )
    
    #/* Calculation of various KPIs */
    ranked_poi3 = ranked_poi2.withColumn('gwp_rn_old_365', col('gwp_poi') * col('annual_factor') ).\
    withColumn('gwp_rn_new_ee', col('nextyear_data')['gwp_ee'] ).\
    withColumn('gwp_rn_new_ie', col('nextyear_data')['gwp_ie'] ).\
    withColumn('gwp_rn_new_365_ee', col('gwp_rn_new_ee')*col('nextyear_data')['annual_factor'] ).\
    withColumn('gwp_rn_new_365_ie', col('gwp_rn_new_ie')*col('nextyear_data')['annual_factor'] ).\
    withColumn('pc_exp_rnabl_renw', when((col('last')!=True)&(col('chdrnum')==col('new_pol')), 1).otherwise(0) ).\
    withColumn('pc_exp_rnabl_repl', when((col('last')!=True)&(col('chdrnum')!=col('new_pol')), 1).otherwise(0) ).\
    withColumn('s_rnabl_gwp_renw', col('pc_exp_rnabl_renw') * (col('gwp_rnw_ie')+col('gwp_nbz_ie')) ).\
    withColumn('s_rnabl_gwp_repl', col('pc_exp_rnabl_repl') * (col('gwp_rnw_ie')+col('gwp_nbz_ie')) )

    transv_p2 = ranked_poi3.\
    drop('thisyear_data','nextyear_data','first','last','lapsed','cancelled','annual_factor','gwp_poi','gwp_issu_poi',
         'gwp_ee','gwp_ie','d_cancel')

    transv_p2.write.parquet('{}transv_p2.parquet'.format(output_folder))

def pillar2_summaries(acc_mth, acc_yr, riskpf_path, transv_polhistory_psea_path, transv_p2_path, output_folder='data/sas_405/'):
    riskpf = spark.read.parquet(riskpf_path)[["CHDRNO","TRANNO","DATIME"]]
    transv_polhistory_psea = spark.read.parquet(transv_polhistory_psea_path)
    transv_p2 = spark.read.parquet(transv_p2_path)
    transv_p2.cache()
    
    #/*List of Trannos*/
    riskpf = riskpf.sort("CHDRNO","TRANNO",desc("DATIME")).drop("DATIME").drop_duplicates(["CHDRNO","TRANNO"]).\
    withColumnRenamed('CHDRNO','CHDRNUM')

    trannolist = riskpf.join(transv_polhistory_psea[["CHDRNUM","TRANNO","ZRENNO","D_from","D_to"]], on=["CHDRNUM","TRANNO"])
    trannolist = trannolist.toDF(*trannolist.columns)
    trannolist.cache()
    
    #*********************************************************
    #*														*
    #*					Underwriting KPIs					*
    #*														*
    #*********************************************************
    
    #/*Append Relevant Tranno : The first one of each POI/ZRENNO*/
    trannolist_uw = trannolist.sort("CHDRNUM", "ZRENNO", "D_from", "TRANNO").\
    drop_duplicates(['CHDRNUM','ZRENNO'])[["CHDRNUM", "ZRENNO", "TRANNO"]]

    uw = transv_p2[['CHDRNUM',
     'ZRENNO',
     'D_com',
     'D_exp',
     'CHDRSTCDC',
     'PC_RNW',
     'PC_NBZ',
     'GWP_NBZ_EE',
     'GWP_NBZ_IE',
     'GWP_RNW_EE',
     'GWP_RNW_IE']].filter(year('D_com')>=(acc_yr-4)).withColumn('yrm', date_format('D_com', 'yyyyMM'))

    uw = uw.groupby(['CHDRNUM','ZRENNO','yrm','D_com','D_exp','CHDRSTCDC']).\
    agg(sum('PC_RNW').alias('PC_RNW'),
        sum('PC_NBZ').alias('PC_NBZ'),
        sum('GWP_NBZ_EE').alias('GWP_NBZ_EE'),
        sum('GWP_NBZ_IE').alias('GWP_NBZ_IE'),
        sum('GWP_RNW_EE').alias('GWP_RNW_EE'),
        sum('GWP_RNW_IE').alias('GWP_RNW_IE'))

    uw2 = uw.join(trannolist_uw, on=['CHDRNUM','ZRENNO'], how='left')

    #*********************************************************
    #*														*
    #*						Expiring KPIs					*
    #*														*
    #*********************************************************
    exp = transv_p2[['CHDRNUM',
     'ZRENNO',
     'D_com',
     'D_exp',
     'CHDRSTCDC',
     'pc_exp_rnabl_canc',
     'pc_exp_nonrn_canc',
     'pc_exp_rnabl_laps',
     'pc_exp_nonrn_expi',
     's_rnabl_gwp_canc',
     's_nonrn_gwp_canc',
     's_rnabl_gwp_laps',
     's_nonrn_gwp_expi',
     'pc_exp_rnabl_renw',
     'pc_exp_rnabl_repl',
     's_rnabl_gwp_renw',
     's_rnabl_gwp_repl',
     'GWP_RN_OLD_365', 
     'GWP_RN_NEW_365_EE']].filter(year('D_exp')>=(acc_yr-4)).withColumn('yrm', date_format('D_exp', 'yyyyMM'))

    exp = exp.groupby(['CHDRNUM','ZRENNO','yrm','D_com','D_exp','CHDRSTCDC']).\
    agg(sum('pc_exp_rnabl_canc').alias('pc_exp_rnabl_canc'),
        sum('pc_exp_nonrn_canc').alias('pc_exp_nonrn_canc'),
        sum('pc_exp_rnabl_laps').alias('pc_exp_rnabl_laps'),
        sum('pc_exp_nonrn_expi').alias('pc_exp_nonrn_expi'),
        sum('s_rnabl_gwp_canc').alias('s_rnabl_gwp_canc'),
        sum('s_nonrn_gwp_canc').alias('s_nonrn_gwp_canc'),
        sum('s_rnabl_gwp_laps').alias('s_rnabl_gwp_laps'),
        sum('s_nonrn_gwp_expi').alias('s_nonrn_gwp_expi'),
        sum('pc_exp_rnabl_renw').alias('pc_exp_rnabl_renw'),
        sum('pc_exp_rnabl_repl').alias('pc_exp_rnabl_repl'),
        sum('s_rnabl_gwp_renw').alias('s_rnabl_gwp_renw'),
        sum('s_rnabl_gwp_repl').alias('s_rnabl_gwp_repl'),
        sum('GWP_RN_OLD_365').alias('GWP_RN_OLD_365'), 
        sum('GWP_RN_NEW_365_EE').alias('GWP_RN_NEW_365_EE'))

    #/*Append Relevant Tranno : The last one of each POI/ZRENNO*/
    trannolist_exp = trannolist.sort("CHDRNUM", "ZRENNO", desc("D_to"), desc("TRANNO")).\
    drop_duplicates(['CHDRNUM','ZRENNO'])[["CHDRNUM", "ZRENNO", "TRANNO"]]

    exp2 = exp.join(trannolist_exp, on=['CHDRNUM','ZRENNO'], how='left')\

    #*********************************************************
    #*														*
    #*			  Combine Expiring, UW & PIF KPIs			*
    #*														*
    #*********************************************************
    exp2 = exp2.select(['*']+[lit(0).alias(c) for c in set(uw2.columns) - set(exp2.columns)])
    uw2 = uw2.select(['*']+[lit(0).alias(c) for c in set(exp2.columns) - set(uw2.columns)])
    p2_uwexp = uw2.union(exp2.select(uw2.columns))

    transv_p2_uwexp = p2_uwexp.groupby(["CHDRNUM","ZRENNO","TRANNO","YRM","D_com","D_exp","CHDRSTCDC"]).\
    agg(sum('PC_RNW').alias('PC_RNW'),
        sum('PC_NBZ').alias('PC_NBZ'),
        sum('GWP_NBZ_EE').alias('GWP_NBZ_EE'),
        sum('GWP_NBZ_IE').alias('GWP_NBZ_IE'),
        sum('GWP_RNW_EE').alias('GWP_RNW_EE'),
        sum('GWP_RNW_IE').alias('GWP_RNW_IE'),
        sum('pc_exp_rnabl_canc').alias('pc_exp_rnabl_canc'),
        sum('pc_exp_nonrn_canc').alias('pc_exp_nonrn_canc'),
        sum('pc_exp_rnabl_laps').alias('pc_exp_rnabl_laps'),
        sum('pc_exp_nonrn_expi').alias('pc_exp_nonrn_expi'),
        sum('s_rnabl_gwp_canc').alias('s_rnabl_gwp_canc'),
        sum('s_nonrn_gwp_canc').alias('s_nonrn_gwp_canc'),
        sum('s_rnabl_gwp_laps').alias('s_rnabl_gwp_laps'),
        sum('s_nonrn_gwp_expi').alias('s_nonrn_gwp_expi'),
        sum('pc_exp_rnabl_renw').alias('pc_exp_rnabl_renw'),
        sum('pc_exp_rnabl_repl').alias('pc_exp_rnabl_repl'),
        sum('s_rnabl_gwp_renw').alias('s_rnabl_gwp_renw'),
        sum('s_rnabl_gwp_repl').alias('s_rnabl_gwp_repl'),
        sum('GWP_RN_OLD_365').alias('GWP_RN_OLD_365'), 
        sum('GWP_RN_NEW_365_EE').alias('GWP_RN_NEW_365_EE'))
    
    transv_p2_uwexp.write.parquet('{}transv_p2_uwexp.parquet'.format(output_folder))
    
    #*********************************************************
    #*														*
    #*					Policies in Force					*
    #*														*
    #*********************************************************

    start_PIF = datetime.date(acc_yr-4,1,1).strftime('%Y-%m-%d')
    end_PIF = datetime.date(acc_yr,acc_mth,calendar.monthrange(acc_yr, acc_mth)[1]).strftime('%Y-%m-%d')
    PIF = transv_p2[['CHDRNUM',
     'ZRENNO',
     'D_com',
     'D_exp',
     'D_end',
     'CHDRSTCDC',
     'GWP_NBZ_IE',
     'GWP_RNW_IE']].filter((col('D_com')<=end_PIF)&(col('D_end')>=start_PIF))
    PIF.cache()

    start_date = datetime.date(acc_yr-4,1,1)
    end_date = datetime.date(acc_yr,acc_mth,calendar.monthrange(acc_yr, acc_mth)[1])

    result = []
    today = end_date
    current = start_date
    while current <= end_date:
        result.append(datetime.date(current.year,current.month,calendar.monthrange(current.year, current.month)[1]))
        current += relativedelta(months=1)

    all_PIF = None
    for PIF_date in result:
        PIF_date_ymd = PIF_date.strftime('%Y-%m-%d')
        PIF_date_yrm = PIF_date.strftime('%Y%m')
        PIF_month = PIF.filter( (col('D_com')<=PIF_date_ymd)&(PIF_date_ymd<=col('D_end')) ).\
        withColumn('PIF', lit(1)).\
        withColumn('GWPIF',col('GWP_NBZ_IE')+col('GWP_RNW_IE')).\
        withColumn('YRM', lit(PIF_date_yrm)).\
        withColumn('PIF_date', lit(PIF_date_ymd).astype('date')).select(['CHDRNUM',
         'ZRENNO',
         'YRM',
         'D_com',
         'D_exp',
         'CHDRSTCDC',
         'PIF',
         'GWPIF',
         'PIF_date'])
        if all_PIF:
            all_PIF = all_PIF.union(PIF_month)
        else:
            all_PIF = PIF_month
            
    #/*Append Relevant Tranno : The one where the last day of the InForce yrm is included between the bounds of the policy*/
    cond = [all_PIF['CHDRNUM']==trannolist['CHDRNUM'], 
            all_PIF['ZRENNO']==trannolist['ZRENNO'],
           trannolist['D_from']<=all_PIF['PIF_date']]
    PIF2 = all_PIF.join(trannolist, cond, 'left').select([all_PIF[c] for c in all_PIF.columns]+[trannolist['TRANNO'],trannolist['D_from']])
    transv_p2_pif = PIF2.sort(['CHDRNUM', 'ZRENNO', 'YRM', desc('D_from')]).drop_duplicates(['CHDRNUM', 'ZRENNO', 'YRM'])
    
    transv_p2_pif.write.parquet('{}transv_p2_pif.parquet'.format(output_folder))

In [3]:
acc_mth = 7
acc_yr = 2017
transv_pr_gr_psea_path = 'data/sas_402/transv_pr_gr_psea.parquet'
transv_polhistory_psea_path = 'data/sas_401/transv_polhistory_psea.parquet'
transv_p2_path = 'data/sas_405/transv_p2.parquet'
riskpf_path = '/group/axa_malaysia/data/adm_riskpf'
output_folder='data/sas_405/'

pillar2(acc_mth, acc_yr, transv_pr_gr_psea_path, transv_polhistory_psea_path, riskpf_path)
pillar2_summaries(acc_mth, acc_yr, riskpf_path, transv_polhistory_psea_path, transv_p2_path)

In [14]:
riskpf = spark.read.parquet(riskpf_path)[["CHDRNO","TRANNO","DATIME"]]
transv_polhistory_psea = spark.read.parquet(transv_polhistory_psea_path)
transv_p2 = spark.read.parquet(transv_p2_path)
transv_p2.cache()

#/*List of Trannos*/
riskpf = riskpf.sort("CHDRNO","TRANNO",desc("DATIME")).drop("DATIME").drop_duplicates(["CHDRNO","TRANNO"]).\
withColumnRenamed('CHDRNO','CHDRNUM')

trannolist = riskpf.join(transv_polhistory_psea[["CHDRNUM","TRANNO","ZRENNO","D_from","D_to"]], on=["CHDRNUM","TRANNO"])
trannolist = trannolist.toDF(*trannolist.columns)
trannolist.cache()

#*********************************************************
#*														*
#*					Underwriting KPIs					*
#*														*
#*********************************************************

#/*Append Relevant Tranno : The first one of each POI/ZRENNO*/
trannolist_uw = trannolist.sort("CHDRNUM", "ZRENNO", "D_from", "TRANNO").\
drop_duplicates(['CHDRNUM','ZRENNO'])[["CHDRNUM", "ZRENNO", "TRANNO"]]

uw = transv_p2[['CHDRNUM',
 'ZRENNO',
 'D_com',
 'D_exp',
 'CHDRSTCDC',
 'PC_RNW',
 'PC_NBZ',
 'GWP_NBZ_EE',
 'GWP_NBZ_IE',
 'GWP_RNW_EE',
 'GWP_RNW_IE']].filter(year('D_com')>=(acc_yr-4)).withColumn('yrm', date_format('D_com', 'yyyyMM'))

uw = uw.groupby(['CHDRNUM','ZRENNO','yrm','D_com','D_exp','CHDRSTCDC']).\
agg(sum('PC_RNW').alias('PC_RNW'),
    sum('PC_NBZ').alias('PC_NBZ'),
    sum('GWP_NBZ_EE').alias('GWP_NBZ_EE'),
    sum('GWP_NBZ_IE').alias('GWP_NBZ_IE'),
    sum('GWP_RNW_EE').alias('GWP_RNW_EE'),
    sum('GWP_RNW_IE').alias('GWP_RNW_IE'))

uw2 = uw.join(trannolist_uw, on=['CHDRNUM','ZRENNO'], how='left')

#*********************************************************
#*														*
#*						Expiring KPIs					*
#*														*
#*********************************************************
exp = transv_p2[['CHDRNUM',
 'ZRENNO',
 'D_com',
 'D_exp',
 'CHDRSTCDC',
 'pc_exp_rnabl_canc',
 'pc_exp_nonrn_canc',
 'pc_exp_rnabl_laps',
 'pc_exp_nonrn_expi',
 's_rnabl_gwp_canc',
 's_nonrn_gwp_canc',
 's_rnabl_gwp_laps',
 's_nonrn_gwp_expi',
 'pc_exp_rnabl_renw',
 'pc_exp_rnabl_repl',
 's_rnabl_gwp_renw',
 's_rnabl_gwp_repl',
 'GWP_RN_OLD_365', 
 'GWP_RN_NEW_365_EE']].filter(year('D_exp')>=(acc_yr-4)).withColumn('yrm', date_format('D_exp', 'yyyyMM'))

exp = exp.groupby(['CHDRNUM','ZRENNO','yrm','D_com','D_exp','CHDRSTCDC']).\
agg(sum('pc_exp_rnabl_canc').alias('pc_exp_rnabl_canc'),
    sum('pc_exp_nonrn_canc').alias('pc_exp_nonrn_canc'),
    sum('pc_exp_rnabl_laps').alias('pc_exp_rnabl_laps'),
    sum('pc_exp_nonrn_expi').alias('pc_exp_nonrn_expi'),
    sum('s_rnabl_gwp_canc').alias('s_rnabl_gwp_canc'),
    sum('s_nonrn_gwp_canc').alias('s_nonrn_gwp_canc'),
    sum('s_rnabl_gwp_laps').alias('s_rnabl_gwp_laps'),
    sum('s_nonrn_gwp_expi').alias('s_nonrn_gwp_expi'),
    sum('pc_exp_rnabl_renw').alias('pc_exp_rnabl_renw'),
    sum('pc_exp_rnabl_repl').alias('pc_exp_rnabl_repl'),
    sum('s_rnabl_gwp_renw').alias('s_rnabl_gwp_renw'),
    sum('s_rnabl_gwp_repl').alias('s_rnabl_gwp_repl'),
    sum('GWP_RN_OLD_365').alias('GWP_RN_OLD_365'), 
    sum('GWP_RN_NEW_365_EE').alias('GWP_RN_NEW_365_EE'))

#/*Append Relevant Tranno : The last one of each POI/ZRENNO*/
trannolist_exp = trannolist.sort("CHDRNUM", "ZRENNO", desc("D_to"), desc("TRANNO")).\
drop_duplicates(['CHDRNUM','ZRENNO'])[["CHDRNUM", "ZRENNO", "TRANNO"]]

exp2 = exp.join(trannolist_exp, on=['CHDRNUM','ZRENNO'], how='left')\

#*********************************************************
#*														*
#*			  Combine Expiring, UW & PIF KPIs			*
#*														*
#*********************************************************
exp2 = exp2.select(['*']+[lit(0).alias(c) for c in set(uw2.columns) - set(exp2.columns)])
uw2 = uw2.select(['*']+[lit(0).alias(c) for c in set(exp2.columns) - set(uw2.columns)])
p2_uwexp = uw2.union(exp2.select(uw2.columns))

transv_p2_uwexp = p2_uwexp.groupby(["CHDRNUM","ZRENNO","TRANNO","YRM","D_com","D_exp","CHDRSTCDC"]).\
agg(sum('PC_RNW').alias('PC_RNW'),
    sum('PC_NBZ').alias('PC_NBZ'),
    sum('GWP_NBZ_EE').alias('GWP_NBZ_EE'),
    sum('GWP_NBZ_IE').alias('GWP_NBZ_IE'),
    sum('GWP_RNW_EE').alias('GWP_RNW_EE'),
    sum('GWP_RNW_IE').alias('GWP_RNW_IE'),
    sum('pc_exp_rnabl_canc').alias('pc_exp_rnabl_canc'),
    sum('pc_exp_nonrn_canc').alias('pc_exp_nonrn_canc'),
    sum('pc_exp_rnabl_laps').alias('pc_exp_rnabl_laps'),
    sum('pc_exp_nonrn_expi').alias('pc_exp_nonrn_expi'),
    sum('s_rnabl_gwp_canc').alias('s_rnabl_gwp_canc'),
    sum('s_nonrn_gwp_canc').alias('s_nonrn_gwp_canc'),
    sum('s_rnabl_gwp_laps').alias('s_rnabl_gwp_laps'),
    sum('s_nonrn_gwp_expi').alias('s_nonrn_gwp_expi'),
    sum('pc_exp_rnabl_renw').alias('pc_exp_rnabl_renw'),
    sum('pc_exp_rnabl_repl').alias('pc_exp_rnabl_repl'),
    sum('s_rnabl_gwp_renw').alias('s_rnabl_gwp_renw'),
    sum('s_rnabl_gwp_repl').alias('s_rnabl_gwp_repl'),
    sum('GWP_RN_OLD_365').alias('GWP_RN_OLD_365'), 
    sum('GWP_RN_NEW_365_EE').alias('GWP_RN_NEW_365_EE'))


#*********************************************************
#*														*
#*					Policies in Force					*
#*														*
#*********************************************************

start_PIF = datetime.date(acc_yr-4,1,1).strftime('%Y-%m-%d')
end_PIF = datetime.date(acc_yr,acc_mth,calendar.monthrange(acc_yr, acc_mth)[1]).strftime('%Y-%m-%d')
PIF = transv_p2[['CHDRNUM',
 'ZRENNO',
 'D_com',
 'D_exp',
 'D_end',
 'CHDRSTCDC',
 'GWP_NBZ_IE',
 'GWP_RNW_IE']].filter((col('D_com')<=end_PIF)&(col('D_end')>=start_PIF))
PIF.cache()

start_date = datetime.date(acc_yr-4,1,1)
end_date = datetime.date(acc_yr,acc_mth,calendar.monthrange(acc_yr, acc_mth)[1])

result = []
today = end_date
current = start_date
while current <= end_date:
    result.append(datetime.date(current.year,current.month,calendar.monthrange(current.year, current.month)[1]))
    current += relativedelta(months=1)

all_PIF = None
for PIF_date in result:
    PIF_date_ymd = PIF_date.strftime('%Y-%m-%d')
    PIF_date_yrm = PIF_date.strftime('%Y%m')
    PIF_month = PIF.filter( (col('D_com')<=PIF_date_ymd)&(PIF_date_ymd<=col('D_end')) ).\
    withColumn('PIF', lit(1)).\
    withColumn('GWPIF',col('GWP_NBZ_IE')+col('GWP_RNW_IE')).\
    withColumn('YRM', lit(PIF_date_yrm)).\
    withColumn('PIF_date', lit(PIF_date_ymd).astype('date')).select(['CHDRNUM',
     'ZRENNO',
     'YRM',
     'D_com',
     'D_exp',
     'CHDRSTCDC',
     'PIF',
     'GWPIF',
     'PIF_date'])
    if all_PIF:
        all_PIF = all_PIF.union(PIF_month)
    else:
        all_PIF = PIF_month

#/*Append Relevant Tranno : The one where the last day of the InForce yrm is included between the bounds of the policy*/
cond = [all_PIF['CHDRNUM']==trannolist['CHDRNUM'],
        all_PIF['ZRENNO']==trannolist['ZRENNO'],
       trannolist['D_from']<=all_PIF['PIF_date']]
PIF2 = all_PIF.join(trannolist, cond, 'left').select([all_PIF[c] for c in all_PIF.columns]+[trannolist['TRANNO'],trannolist['D_from']])
transv_p2_pif = PIF2.sort(['CHDRNUM', 'ZRENNO', 'YRM', desc('D_from')]).drop_duplicates(['CHDRNUM', 'ZRENNO', 'YRM'])

In [1]:
spark.read.parquet('data/sas_405/transv_p2.parquet').count()

12715204

In [2]:
spark.read.parquet('data/sas_405/transv_p2_uwexp.parquet').count()

17423570

In [6]:
spark.read.parquet('data/sas_405/transv_p2_pif.parquet').count()

85582256

In [1]:
spark_chdrnum_count = spark.read.parquet('data/sas_405/transv_p2_pif.parquet').groupby('chdrnum').count()

In [None]:
spark_chdrnum_count.show()

In [3]:
sas_chdrnum_count = spark.read.csv(r'/group/axa_malaysia/raw/OSPREY/02_ACTLIB/01_Main/03_Transversal/P2_PIF.csv',header=True).groupby('chdrnum').count()

In [None]:
sas_chdrnum_count.show()

In [7]:
spark.read.parquet('data/sas_405/transv_p2_pif.parquet').filter(col('chdrnum').isin(['01059026','03411558'])).show(1000)

NameError: name 'col' is not defined

# getting list_poi2

In [43]:
acc_mth = 7
acc_yr = 2017
transv_pr_gr_psea_path = 'data/sas_402/transv_pr_gr_psea.parquet'
transv_polhistory_psea_path = 'data/sas_401/transv_polhistory_psea.parquet'
transv_p2_path = 'data/sas_405/transv_p2.parquet'
riskpf_path = '/group/axa_malaysia/data/adm_riskpf'
output_folder='data/sas_405/'

transv_pr_gr_psea = spark.read.parquet(transv_pr_gr_psea_path)
transv_polhistory_psea = spark.read.parquet(transv_polhistory_psea_path)
riskpf = spark.read.parquet(riskpf_path)[["CHDRNO","TRANNO","DATIME"]]

#/* Aggregate Premium */
premium = transv_pr_gr_psea.groupby(['chdrnum','tranno','trantype']).agg(sum('GWP').alias('gwp'), sum('CWP').alias('cwp'))

#/* Split GWP */
#/* Flag the issuances */
premium = premium.withColumn('gwp_poi',col('gwp')-col('cwp'))
premium = premium.withColumn('gwp_issu_poi',when(col('trantype').isin(["NB", "RN"]),col('gwp_poi')).otherwise(0))
premium = premium.withColumn('gwp_canc_poi',when(col('trantype').isin(["CA", "RE"]),col('gwp_poi')).otherwise(0)).drop('cwp')

#/*Map the POI from POLHISTORY to the premium transactions*/
map_zrenno = transv_polhistory_psea[['CHDRNUM','TRANNO','ZRENNO']].sort(['CHDRNUM','TRANNO']).drop_duplicates()

#/*Inner Join: if there is a mismatch, it can only be due to delay between the extractions of ZTRN & CHDR*/
premium2 = premium.join(map_zrenno, ['CHDRNUM','TRANNO'], 'inner')
premium3 = premium2.groupby('CHDRNUM','ZRENNO').agg(
    sum('gwp_poi').alias('gwp_poi'),
    sum('gwp_issu_poi').alias('gwp_issu_poi'),
    sum('gwp_canc_poi').alias('gwp_canc_poi'),
)

#/* ******** Policy Section ******** */
#/* ******************************** */
#/* From POLHISTORY, keep only the latest transaction for each POI (ZRENNO) - by descending D_to & descending TRANNO*/
list_poi = transv_polhistory_psea[['CHDRNUM',
 'D_com',
 'D_exp',
 'D_to',
 'ZRENNO',
 'TRANNO',
 'RENEWABLE',
 'D_cancel',
 'CHDRSTCDC',
 'CNTTYPE',
 'ORIG_POL',
 'LATEST_REPLACEMENT',
 'CNTBRANCH',
 'AGENTID']].sort('CHDRNUM','ZRENNO',desc('D_to'),desc('TRANNO')).drop_duplicates(subset=['CHDRNUM','ZRENNO']).drop('D_to')

#/*Map Premium for each POI*/
list_poi2 = list_poi.join(premium3,['CHDRNUM','ZRENNO'],'inner')

In [46]:
list_poi2.filter(col('CHDRNUM').isin(['01224897','01604469','02063058','02209829','03914724','90423642','91236854'])).show(1000)

+--------+------+----------+----------+------+---------+----------+---------+-------+--------+------------------+---------+-------+-----------------+-----------------+------------+
| CHDRNUM|ZRENNO|     D_com|     D_exp|TRANNO|RENEWABLE|  D_cancel|CHDRSTCDC|CNTTYPE|ORIG_POL|LATEST_REPLACEMENT|CNTBRANCH|AGENTID|          gwp_poi|     gwp_issu_poi|gwp_canc_poi|
+--------+------+----------+----------+------+---------+----------+---------+-------+--------+------------------+---------+-------+-----------------+-----------------+------------+
|90423642|     2|2014-01-05|2015-01-04|     4|        1|2014-01-05|      019|    AAD|90423642|          91236854|       98|  21734|              0.0|             60.0|       -60.0|
|01604469|     4|2010-04-08|2011-04-07|     5|        1|2999-12-31|      001|    VPP|01604469|          01604469|       18|  04580|           195.77|           195.77|         0.0|
|01604469|     7|2013-04-08|2014-04-07|     9|        1|2013-07-31|      001|    VPP|01604469| 

In [47]:
spark.read.csv(r'/group/axa_malaysia/raw/OSPREY/02_ACTLIB/01_Main/03_Transversal/LIST_POI2.csv',header=True)\
.filter(col('CHDRNUM').isin(['01224897','01604469','02063058','02209829','03914724','90423642','91236854'])).show(1000)

+--------+-------+------+---------+-------+---------+------+---------+---------+---------+---------+--------+------------------+-------+------------+------------+
| chdrnum|CNTTYPE|TRANNO|CNTBRANCH|agentid|CHDRSTCDC|ZRENNO|    d_com|    d_exp| d_cancel|renewable|orig_pol|latest_replacement|GWP_POI|GWP_ISSU_POI|GWP_CANC_POI|
+--------+-------+------+---------+-------+---------+------+---------+---------+---------+---------+--------+------------------+-------+------------+------------+
|01224897|    VPP|    11|       62|  04250|      001|     9|03SEP2012|02SEP2013|30APR2013|        1|01224897|          01224897| 336.34|      513.65|     -177.31|
|01224897|    VPP|     9|       62|  04250|      001|     8|03SEP2011|02SEP2012|31DEC2999|        1|01224897|          01224897| 507.57|      507.57|           0|
|01224897|    VPP|     8|       62|  04250|      001|     7|03SEP2010|02SEP2011|31DEC2999|        1|01224897|          01224897| 542.67|      542.67|           0|
|01224897|    VPP|    

In [67]:
import re
import pandas as pd
import numpy as np
import string
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Window
import datetime
import calendar
from dateutil.relativedelta import relativedelta
from dateutil.parser import parse
from datetime import datetime
def format_date1(strdate):
    try: 
        return datetime.strptime(str(strdate)[:3]+str(strdate)[3:5].lower()+str(strdate)[5:],'%d%b%Y').strftime('%Y-%m-%d')
    except: 
        return '2999-12-31'
_format_date1 = udf(format_date1,StringType())
def format_date2(strdate):
    try: 
        return datetime.strptime(str(strdate),'%Y%m%d').strftime('%Y-%m-%d')
    except: 
        return '2999-12-31'
_format_date2 = udf(format_date2,StringType())

In [78]:
chdrpf = spark.read.parquet(r'/group/axa_malaysia/data/adm_chdrpf')
sas_p2 = spark.read.csv(r'/group/axa_malaysia/raw/OSPREY/02_ACTLIB/01_Main/03_Transversal/P2.csv', header=True)
sas_p2_pif = spark.read.csv(r'/group/axa_malaysia/raw/OSPREY/02_ACTLIB/01_Main/03_Transversal/P2_PIF.csv', header=True)
sas_p2_pif = sas_p2_pif.withColumn('d_com', to_date(_format_date1(col('d_com'))))
spark_p2 = spark.read.parquet('data/sas_405/transv_p2.parquet')
spark_p2_pif = spark.read.parquet('data/sas_405/transv_p2_pif.parquet')

In [108]:
chdrpf_clean = chdrpf[['CHDRNUM','CCDate','STATCODE','REPNUM','DTECAN','ZRENNO','ZENDNO','TRANNO']]\
.withColumn('CCDate', to_date(_format_date2(col('CCDate'))))

In [109]:
chdrpf_grouped = chdrpf_clean.groupby('CHDRNUM','CCDate').agg(max('TRANNO').alias('TRANNO')).\
join(chdrpf_clean, how='left', on=['CHDRNUM','CCDate','TRANNO'])

In [110]:
chdrpf_grouped.cache()
chdrpf_grouped.show(5)

+--------+----------+------+--------+------+------+------+------+
| CHDRNUM|    CCDate|TRANNO|STATCODE|REPNUM|DTECAN|ZRENNO|ZENDNO|
+--------+----------+------+--------+------+------+------+------+
|00279708|2999-12-31|     1|      QR|      |     0|     0|     0|
|00279898|2999-12-31|     1|      QR|      |     0|     0|     0|
|00281240|2999-12-31|     1|      QR|      |     0|     0|     0|
|00281735|2999-12-31|     1|      QR|      |     0|     0|     0|
|00282006|2999-12-31|     1|      QR|      |     0|     0|     0|
+--------+----------+------+--------+------+------+------+------+
only showing top 5 rows



In [111]:
chdrpf_grouped.count()

18780409

In [112]:
sas_pif_sum = sas_p2_pif.groupby('chdrnum','d_com').agg(sum('pif').cast("int").alias('sas_pif_sum')).withColumnRenamed('d_com','CCDate')
spark_pif_sum = spark_p2_pif.groupby('chdrnum','d_com').agg(sum('pif').cast("int").alias('spark_pif_sum')).withColumnRenamed('d_com','CCDate')

In [113]:
chdrpf_sas_spark_pif = chdrpf_grouped\
.join(sas_pif_sum, ['CHDRNUM','CCDate'])\
.join(spark_pif_sum, ['CHDRNUM','CCDate'])

In [114]:
chdrpf_sas_spark_pif1 = chdrpf_sas_spark_pif.filter(col('sas_pif_sum')!=col('spark_pif_sum'))

In [115]:
chdrpf_sas_spark_pif1.cache()

DataFrame[CHDRNUM: string, CCDate: date, TRANNO: int, STATCODE: string, REPNUM: string, DTECAN: int, ZRENNO: int, ZENDNO: int, sas_pif_sum: int, spark_pif_sum: int]

In [116]:
chdrpf_sas_spark_pif1.count()

2836

In [117]:
chdrpf_sas_spark_pif1.coalesce(1).write.csv('chdrpf_sas_spark_pif1.csv',header=True,mode='overwrite')

In [5]:
spark_p2_pif.filter(col('chdrnum')=='91236854').show(100)

+--------+------+------+----------+----------+---------+---+-----+----------+------+----------+
| CHDRNUM|ZRENNO|   YRM|     D_com|     D_exp|CHDRSTCDC|PIF|GWPIF|  PIF_date|TRANNO|    D_from|
+--------+------+------+----------+----------+---------+---+-----+----------+------+----------+
|91236854|     0|201402|2014-01-16|2015-01-15|      019|  1| 60.0|2014-02-28|     1|2014-01-16|
|91236854|     0|201407|2014-01-16|2015-01-15|      019|  1| 60.0|2014-07-31|     1|2014-01-16|
|91236854|     0|201401|2014-01-16|2015-01-15|      019|  1| 60.0|2014-01-31|     1|2014-01-16|
|91236854|     0|201408|2014-01-16|2015-01-15|      019|  1| 60.0|2014-08-31|     1|2014-01-16|
|91236854|     0|201404|2014-01-16|2015-01-15|      019|  1| 60.0|2014-04-30|     1|2014-01-16|
|91236854|     0|201405|2014-01-16|2015-01-15|      019|  1| 60.0|2014-05-31|     1|2014-01-16|
|91236854|     0|201409|2014-01-16|2015-01-15|      019|  1| 60.0|2014-09-30|     1|2014-01-16|
|91236854|     0|201412|2014-01-16|2015-

In [19]:
sas_p2.filter(col('orig_pol')=='02063058').show(100)

+--------+-------+------+---------+-------+---------+------+---------+---------+---------+--------+------------------+------------+----------+-----------+--------+--------+-----------+---------+--------+------+----------+----------+------+----------+----------+-----------------+----------------+-----------------+----------------+-----------------+----------------+-----------------+----------------+-----------------+-----------------+--------------+-------------+-------------+-----------------+-----------------+----------------+----------------+
| chdrnum|CNTTYPE|TRANNO|CNTBRANCH|agentid|CHDRSTCDC|ZRENNO|    d_com|    d_exp|renewable|orig_pol|latest_replacement|GWP_CANC_POI|NEW_BRANCH|NEW_CNTTYPE|NEW_LINE| NEW_POL|NEW_AGENTID|    D_end|ABNORMAL|PC_RNW|GWP_RNW_EE|GWP_RNW_IE|PC_NBZ|GWP_NBZ_EE|GWP_NBZ_IE|PC_EXP_RNABL_CANC|S_RNABL_GWP_CANC|PC_EXP_NONRN_CANC|S_NONRN_GWP_CANC|PC_EXP_NONRN_EXPI|S_NONRN_GWP_EXPI|PC_EXP_RNABL_LAPS|S_RNABL_GWP_LAPS|PC_EXP_RNABL_RENW|PC_EXP_RNABL_REPL|GWP_RN_OLD_3

In [26]:
spark_p2.filter(col('orig_pol')=='02063058').show(100)

+--------+------+----------+----------+------+---------+---------+-------+--------+------------------+---------+-------+------------+--------+------+------+----------+----------+----------+----------+-----------------+-----------------+-----------------+-----------------+----------------+----------------+----------------+----------------+----------+--------+--------+----------+-----------+-----------+------------------+-------------+-------------+------------------+------------------+-----------------+-----------------+----------------+----------------+
| CHDRNUM|ZRENNO|     D_com|     D_exp|TRANNO|RENEWABLE|CHDRSTCDC|CNTTYPE|ORIG_POL|LATEST_REPLACEMENT|CNTBRANCH|AGENTID|gwp_canc_poi|abnormal|pc_rnw|pc_nbz|gwp_rnw_ee|gwp_nbz_ee|gwp_rnw_ie|gwp_nbz_ie|pc_exp_rnabl_canc|pc_exp_nonrn_canc|pc_exp_rnabl_laps|pc_exp_nonrn_expi|s_rnabl_gwp_canc|s_nonrn_gwp_canc|s_rnabl_gwp_laps|s_nonrn_gwp_expi|     D_end| new_pol|new_line|new_branch|new_cnttype|new_agentid|    gwp_rn_old_365|gwp_rn_new_ee|gwp

In [12]:
sas_p2_pif.filter(col('chdrnum').isin(['90423642','91236854'])).show(100)

+--------+---------+------+---------+---------+---+-----+------+------+
| chdrnum|CHDRSTCDC|ZRENNO|    d_com|    d_exp|PIF|GWPIF|   YRM|TRANNO|
+--------+---------+------+---------+---------+---+-----+------+------+
|90423642|      019|     1|05JAN2013|04JAN2014|  1|   60|201301|     2|
|90423642|      019|     1|05JAN2013|04JAN2014|  1|   60|201302|     2|
|90423642|      019|     1|05JAN2013|04JAN2014|  1|   60|201303|     2|
|90423642|      019|     1|05JAN2013|04JAN2014|  1|   60|201304|     2|
|90423642|      019|     1|05JAN2013|04JAN2014|  1|   60|201305|     2|
|90423642|      019|     1|05JAN2013|04JAN2014|  1|   60|201306|     2|
|90423642|      019|     1|05JAN2013|04JAN2014|  1|   60|201307|     2|
|90423642|      019|     1|05JAN2013|04JAN2014|  1|   60|201308|     2|
|90423642|      019|     1|05JAN2013|04JAN2014|  1|   60|201309|     2|
|90423642|      019|     1|05JAN2013|04JAN2014|  1|   60|201310|     2|
|90423642|      019|     1|05JAN2013|04JAN2014|  1|   60|201311|

In [14]:
spark_p2_pif.filter(col('chdrnum').isin(['90423642','91236854'])).show(100)

+--------+------+------+----------+----------+---------+---+-----+----------+------+----------+
| CHDRNUM|ZRENNO|   YRM|     D_com|     D_exp|CHDRSTCDC|PIF|GWPIF|  PIF_date|TRANNO|    D_from|
+--------+------+------+----------+----------+---------+---+-----+----------+------+----------+
|91236854|     0|201402|2014-01-16|2015-01-15|      019|  1| 60.0|2014-02-28|     1|2014-01-16|
|90423642|     1|201301|2013-01-05|2014-01-04|      019|  1| 60.0|2013-01-31|     2|2013-01-05|
|90423642|     1|201306|2013-01-05|2014-01-04|      019|  1| 60.0|2013-06-30|     2|2013-01-05|
|90423642|     1|201305|2013-01-05|2014-01-04|      019|  1| 60.0|2013-05-31|     2|2013-01-05|
|91236854|     0|201407|2014-01-16|2015-01-15|      019|  1| 60.0|2014-07-31|     1|2014-01-16|
|90423642|     1|201304|2013-01-05|2014-01-04|      019|  1| 60.0|2013-04-30|     2|2013-01-05|
|90423642|     1|201310|2013-01-05|2014-01-04|      019|  1| 60.0|2013-10-31|     2|2013-01-05|
|91236854|     0|201401|2014-01-16|2015-

In [13]:
chdrpf.filter(col('chdrnum')=='91236854').show(1000)

+-------+-------+--------+--------+-------+------+---------+--------+--------+--------+---------+--------+--------+--------+--------+--------+-------+-------+-------+--------+--------+---------+-------+-------+--------+--------+---------+---------+------+-----+------+-------+--------+-------+------+------+---------------+--------------------+
|CHDRPFX|CHDRCOY| CHDRNUM|SERVUNIT|CNTTYPE|TRANNO|VALIDFLAG|CURRFROM|  CURRTO|STATCODE|STATREASN|STATDATE|STATTRAN| OCCDATE|  CCDATE|  CRDATE|RNLTYPE|RNLDURN|REPTYPE|  REPNUM| COWNNUM|CNTBRANCH|AGNTNUM|PAYPLAN|CAMPAIGN|NOFRISKS|CHDRSTCDA|CHDRSTCDC|MPLNUM|COPPN|COTYPE|COVERNT|  DTECAN|QUOTENO|ZRENNO|ZENDNO|       ZREPOLNO|              datime|
+-------+-------+--------+--------+-------+------+---------+--------+--------+--------+---------+--------+--------+--------+--------+--------+-------+-------+-------+--------+--------+---------+-------+-------+--------+--------+---------+---------+------+-----+------+-------+--------+-------+------+------+---

In [12]:
spark_chdrnum_count = spark_p2_pif.groupby('chdrnum').count()

In [16]:
spark_chdrnum_count = spark_chdrnum_count.withColumnRenamed('count','spark_count')

In [21]:
chdrnum_count = sas_chdrnum_count.join(spark_chdrnum_count, on='chdrnum').filter(col('sas_count')!=col('spark_count'))

In [23]:
chdrnum_count.show(1000)

+--------+---------+-----------+
| chdrnum|sas_count|spark_count|
+--------+---------+-----------+
|01224897|        3|          4|
|01604469|        6|          7|
|02209829|       51|         39|
|02705003|        4|          5|
|03269785|        8|          9|
|03321751|        3|          4|
|03940091|       12|         13|
|71376965|       12|         13|
|71464035|       12|         13|
|90230519|       50|         51|
|90423642|       24|         12|
|90565923|        6|          7|
|90897908|        6|          7|
|91065696|        5|          6|
|91146579|       36|         37|
|91169877|        7|          8|
|91171650|        7|          8|
|91249202|       36|         37|
|91253247|       36|         37|
|91299953|       37|         38|
|91377841|       36|         37|
|91391767|       36|         37|
|91844169|        8|          9|
|92570269|        7|          8|
|S0121525|        2|          3|
|S0517172|       12|         13|
|02617368|        3|          4|
|02738020|

In [2]:
import re
import pandas as pd
import numpy as np
import string
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Window
import datetime
import calendar
from dateutil.relativedelta import relativedelta

In [29]:
spark.read.parquet('data/sas_405/transv_p2_pif.parquet').show(100)

+--------+------+------+----------+----------+---------+---+------------------+----------+------+----------+
| CHDRNUM|ZRENNO|   YRM|     D_com|     D_exp|CHDRSTCDC|PIF|             GWPIF|  PIF_date|TRANNO|    D_from|
+--------+------+------+----------+----------+---------+---+------------------+----------+------+----------+
|01001006|     2|201706|2004-04-27|2005-04-26|      008|  1|             180.0|2017-06-30|     5|2004-04-27|
|01001024|     0|201401|2002-07-03|2002-10-02|      016|  1|               0.0|2014-01-31|     2|2002-07-03|
|01001027|     0|201411|2002-07-08|2003-07-07|      019|  1|             531.0|2014-11-30|     1|2002-07-08|
|01001028|     1|201504|2003-08-01|2004-07-31|      019|  1|             239.4|2015-04-30|     3|2003-08-01|
|01001030|     0|201404|2002-06-01|2003-05-31|      003|  1|             244.8|2014-04-30|     3|2002-06-01|
|01001030|     0|201409|2002-06-01|2003-05-31|      003|  1|             244.8|2014-09-30|     3|2002-06-01|
|01001031|     2|20

In [5]:
spark.read.parquet('data/sas_401/transv_polhistory_psea.parquet').filter(col('chdrnum')=='01001017').show(100)

+--------+--------+------+---------+-------+--------+---------+--------+-------+--------+-------+------+------+--------+--------+----+----------+----------+----------+----------+----------+----------+---------+-------+-------+--------+------------------+-----------+------------+
|servunit| chdrnum|tranno|chdrstcdc|cnttype| ownerid|cntbranch|nofrisks|reptype|statcode|agentid|zendno|zrenno|stattran|campaign|  id|    d_from|      d_to|  d_oricom|     d_com|     d_exp|  d_cancel|renewable|replnum|compnum|orig_pol|latest_replacement|d_first_com|update_count|
+--------+--------+------+---------+-------+--------+---------+--------+-------+--------+-------+------+------+--------+--------+----+----------+----------+----------+----------+----------+----------+---------+-------+-------+--------+------------------+-----------+------------+
|      FG|01001017|     1|      018|    STS|00565941|       63|       1|       |      IF|  20139|     0|     0|       1|        |PSEA|2002-07-01|2003-06-30|2002

In [18]:
spark.read.parquet('data/sas_405/transv_p2.parquet').filter(col('orig_pol')=='2063058').show(100)

+-------+------+-----+-----+------+---------+---------+-------+--------+------------------+---------+-------+------------+--------+------+------+----------+----------+----------+----------+-----------------+-----------------+-----------------+-----------------+----------------+----------------+----------------+----------------+-----+-------+--------+----------+-----------+-----------+--------------+-------------+-------------+-----------------+-----------------+-----------------+-----------------+----------------+----------------+
|CHDRNUM|ZRENNO|D_com|D_exp|TRANNO|RENEWABLE|CHDRSTCDC|CNTTYPE|ORIG_POL|LATEST_REPLACEMENT|CNTBRANCH|AGENTID|gwp_canc_poi|abnormal|pc_rnw|pc_nbz|gwp_rnw_ee|gwp_nbz_ee|gwp_rnw_ie|gwp_nbz_ie|pc_exp_rnabl_canc|pc_exp_nonrn_canc|pc_exp_rnabl_laps|pc_exp_nonrn_expi|s_rnabl_gwp_canc|s_nonrn_gwp_canc|s_rnabl_gwp_laps|s_nonrn_gwp_expi|D_end|new_pol|new_line|new_branch|new_cnttype|new_agentid|gwp_rn_old_365|gwp_rn_new_ee|gwp_rn_new_ie|gwp_rn_new_365_ee|gwp_rn_new_365_

In [3]:
spark.read.parquet('data/sas_405/transv_p2_pif.parquet').filter(col('chdrnum')=='01001017').show(100)

+--------+------+------+----------+----------+---------+---+-----+----------+------+----------+
| CHDRNUM|ZRENNO|   YRM|     D_com|     D_exp|CHDRSTCDC|PIF|GWPIF|  PIF_date|TRANNO|    D_from|
+--------+------+------+----------+----------+---------+---+-----+----------+------+----------+
|01001017|     0|201401|2002-07-01|2014-06-30|      018|  1|  0.0|2014-01-31|    18|2013-06-30|
|01001017|     0|201412|2002-07-01|2014-06-30|      018|  1|  0.0|2014-12-31|    18|2013-06-30|
|01001017|     0|201410|2002-07-01|2014-06-30|      018|  1|  0.0|2014-10-31|    18|2013-06-30|
|01001017|     0|201307|2002-07-01|2014-06-30|      018|  1|  0.0|2013-07-31|    18|2013-06-30|
|01001017|     0|201409|2002-07-01|2014-06-30|      018|  1|  0.0|2014-09-30|    18|2013-06-30|
|01001017|     0|201406|2002-07-01|2014-06-30|      018|  1|  0.0|2014-06-30|    18|2013-06-30|
|01001017|     0|201609|2002-07-01|2014-06-30|      018|  1|  0.0|2016-09-30|    18|2013-06-30|
|01001017|     0|201405|2002-07-01|2014-

In [30]:
spark.read.csv('/group/axa_malaysia/raw/OSPREY/02_ACTLIB/01_Main/03_Transversal/P2_PIF.csv',
              header =True).filter(col('chdrnum')=='01001017').show(100)

+--------+---------+------+---------+---------+---+-----+------+------+
| chdrnum|CHDRSTCDC|ZRENNO|    d_com|    d_exp|PIF|GWPIF|   YRM|TRANNO|
+--------+---------+------+---------+---------+---+-----+------+------+
|01001017|      018|     0|01JUL2002|30JUN2014|  1|    0|201301|    17|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|    0|201302|    17|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|    0|201303|    17|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|    0|201304|    17|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|    0|201305|    17|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|    0|201306|    18|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|    0|201307|    18|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|    0|201308|    18|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|    0|201309|    18|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|    0|201310|    18|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|    0|201311|

In [28]:
spark.read.csv('/group/axa_malaysia/raw/OSPREY/02_ACTLIB/01_Main/03_Transversal/P2_PIF.csv',
              header =True).show(100)

+--------+---------+------+---------+---------+---+------+------+------+
| chdrnum|CHDRSTCDC|ZRENNO|    d_com|    d_exp|PIF| GWPIF|   YRM|TRANNO|
+--------+---------+------+---------+---------+---+------+------+------+
|01001017|      018|     0|01JUL2002|30JUN2014|  1|     0|201301|    17|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|     0|201302|    17|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|     0|201303|    17|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|     0|201304|    17|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|     0|201305|    17|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|     0|201306|    18|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|     0|201307|    18|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|     0|201308|    18|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|     0|201309|    18|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|     0|201310|    18|
|01001017|      018|     0|01JUL2002|30JUN2014|  1|

In [3]:
from pyspark.sql.functions import *

In [9]:
p2_pif = spark.read.parquet('data/sas_405/transv_p2_pif.parquet')

In [10]:
p2_pif.count()

85613395

In [12]:
896/3235

0.2769706336939722

In [15]:
PIF.filter(col('chdrnum').isin(['01059026','03411558'])).show(1000)

+--------+------+----------+----------+----------+---------+----------+------------------+
| CHDRNUM|ZRENNO|     D_com|     D_exp|     D_end|CHDRSTCDC|GWP_NBZ_IE|        GWP_RNW_IE|
+--------+------+----------+----------+----------+---------+----------+------------------+
|01059026|    10|2012-12-02|2013-12-01|2013-12-01|      005|       0.0|               0.0|
|03411558|     0|2012-12-02|2013-12-01|2013-12-01|      005|       0.0|            3332.0|
|03411558|     1|2013-12-02|2014-12-01|2014-12-01|      005|       0.0|            2744.0|
|03411558|     2|2014-12-02|2015-12-01|2015-11-03|      005|       0.0|2525.9900000000002|
+--------+------+----------+----------+----------+---------+----------+------------------+



In [11]:
p2_pif.filter(col('chdrnum').isin(['01059026','03411558'])).show(1000)

+--------+------+------+----------+----------+---------+---+------------------+----------+------+----------+
| CHDRNUM|ZRENNO|   YRM|     D_com|     D_exp|CHDRSTCDC|PIF|             GWPIF|  PIF_date|TRANNO|    D_from|
+--------+------+------+----------+----------+---------+---+------------------+----------+------+----------+
|01059026|    10|201305|2012-12-02|2013-12-01|      005|  1|               0.0|2013-05-31|    21|2012-12-02|
|03411558|     0|201305|2012-12-02|2013-12-01|      005|  1|            3332.0|2013-05-31|     1|2012-12-02|
|01059026|    10|201307|2012-12-02|2013-12-01|      005|  1|               0.0|2013-07-31|    21|2012-12-02|
|03411558|     2|201504|2014-12-02|2015-12-01|      005|  1|2525.9900000000002|2015-04-30|     3|2014-12-02|
|03411558|     0|201308|2012-12-02|2013-12-01|      005|  1|            3332.0|2013-08-31|     1|2012-12-02|
|03411558|     1|201409|2013-12-02|2014-12-01|      005|  1|            2744.0|2014-09-30|     2|2013-12-02|
|01059026|    10|20

In [1]:
import re
import pandas as pd
import numpy as np
import string
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Window
import datetime
import calendar
from dateutil.relativedelta import relativedelta

acc_mth = 7
acc_yr = 2017
transv_pr_gr_psea_path = 'data/sas_402/transv_pr_gr_psea.parquet'
transv_polhistory_psea_path = 'data/sas_401/transv_polhistory_psea.parquet'
transv_p2_path = 'data/sas_405/transv_p2.parquet'
riskpf_path = '/group/axa_malaysia/data/adm_riskpf'


transv_p2 = spark.read.parquet("data/sas_405/transv_p2.parquet")

# pillar2(acc_mth, acc_yr, transv_pr_gr_psea_path, transv_polhistory_psea_path, riskpf_path)
# pillar2_summaries(acc_mth, acc_yr, riskpf_path, transv_polhistory_psea_path, transv_p2_path)

In [2]:
start_PIF = datetime.date(acc_yr-4,1,1).strftime('%Y-%m-%d')
end_PIF = datetime.date(acc_yr,acc_mth,calendar.monthrange(acc_yr, acc_mth)[1]).strftime('%Y-%m-%d')
PIF = transv_p2[['CHDRNUM',
 'ZRENNO',
 'D_com',
 'D_exp',
 'D_end',
 'CHDRSTCDC',
 'GWP_NBZ_IE',
 'GWP_RNW_IE']].filter((col('D_com')<=end_PIF)&(col('D_end')>=start_PIF))
PIF.cache()

start_date = datetime.date(acc_yr-4,1,1)
end_date = datetime.date(acc_yr,acc_mth,calendar.monthrange(acc_yr, acc_mth)[1])

result = []
today = end_date
current = start_date
while current <= end_date:
    result.append(datetime.date(current.year,current.month,calendar.monthrange(current.year, current.month)[1]))
    current += relativedelta(months=1)

all_PIF = None
for PIF_date in result:
    PIF_date_ymd = PIF_date.strftime('%Y-%m-%d')
    PIF_date_yrm = PIF_date.strftime('%Y%m')
    PIF_month = PIF.filter( (col('D_com')<=PIF_date_ymd)&(PIF_date_ymd<=col('D_end')) ).\
    withColumn('PIF', lit(1)).\
    withColumn('GWPIF',col('GWP_NBZ_IE')+col('GWP_RNW_IE')).\
    withColumn('YRM', lit(PIF_date_yrm)).\
    withColumn('PIF_date', lit(PIF_date_ymd).astype('date')).\
    filter(~((col('D_end') != col('D_exp')) & (col('D_end')==lit(PIF_date_ymd)))).select(['CHDRNUM',
     'ZRENNO',
     'YRM',
     'D_com',
     'D_exp',
     'CHDRSTCDC',
     'PIF',
     'GWPIF',
     'PIF_date'])
    if all_PIF:
        all_PIF = all_PIF.union(PIF_month)
    else:
        all_PIF = PIF_month
    print(all_PIF.count(),PIF_month.count)
    break


732047 <bound method DataFrame.count of DataFrame[CHDRNUM: string, ZRENNO: int, YRM: string, D_com: date, D_exp: date, CHDRSTCDC: string, PIF: int, GWPIF: double, PIF_date: date]>


In [6]:
p2_pif_spark = all_PIF.toPandas()

In [17]:
jan_2013['chdrnum'] = jan_2013['chdrnum'].map(lambda x: x.decode('utf-8'))
jan_2013['CHDRSTCDC'] = jan_2013['CHDRSTCDC'].map(lambda x: x.decode('utf-8'))

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  if __name__ == '__main__':
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  from ipykernel import kernelapp as app


In [8]:
import pickle
p2_pif_sas = pickle.load(open('p2_pif.pickle','rb'))

In [3]:
#jan_2013 = p2_pif_sas[p2_pif_sas['YRM']==201301]
jan_2013 = spark.read.csv('/group/axa_malaysia/raw/OSPREY/02_ACTLIB/01_Main/03_Transversal/PIF201301.csv',header=True).toPandas()

In [4]:
jan_2013 = jan_2013.rename(columns={'chdrnum':'CHDRNUM'})

In [5]:
jan_2013['ZRENNO'] = jan_2013['ZRENNO'].map(lambda x: int(x))

In [7]:
joined_sas_spark = p2_pif_spark.merge(jan_2013, 'left',['CHDRNUM','ZRENNO'])

In [37]:
joined_sas_spark[pd.isnull(joined_sas_spark['PIF_y'])]

Unnamed: 0,CHDRNUM,ZRENNO,YRM_x,D_com,D_exp,CHDRSTCDC_x,PIF_x,GWPIF_x,PIF_date_x,CHDRSTCDC_y,d_com,d_exp,PIF_date_y,PIF_y,GWPIF_y,YRM_y
294,01888151,4,201301,2012-04-01,2013-03-31,003,1,63.81,2013-01-31,,,,,,,
923,03129602,1,201301,2012-06-23,2013-06-22,012,1,6000.00,2013-01-31,,,,,,,
7635,03337939,0,201301,2013-01-01,2013-12-31,006,1,1470.60,2013-01-31,,,,,,,
7797,01946633,4,201301,2012-07-01,2013-06-30,008,1,3256.20,2013-01-31,,,,,,,
7801,01953883,4,201301,2012-07-01,2013-06-30,005,1,3292.98,2013-01-31,,,,,,,
8065,02510433,3,201301,2013-01-01,2013-12-31,005,1,150.00,2013-01-31,,,,,,,
8275,03032569,2,201301,2013-01-01,2013-12-31,019,1,1092.50,2013-01-31,,,,,,,
11801,02667993,2,201301,2012-10-16,2013-10-15,001,1,1442.50,2013-01-31,,,,,,,
12118,03164264,1,201301,2012-09-07,2013-09-06,019,1,281.00,2013-01-31,,,,,,,
14343,90704819,0,201301,2012-11-16,2013-04-25,001,1,453.42,2013-01-31,,,,,,,


In [8]:
error_df = spark.createDataFrame(joined_sas_spark[pd.isnull(joined_sas_spark['PIF_y'])][['CHDRNUM','ZRENNO']])
error_df.cache()
error_df.count()

260

In [9]:
transv_p2 = spark.read.parquet("data/sas_405/transv_p2.parquet")
transv_p2_error = transv_p2.join(error_df,['CHDRNUM','ZRENNO'])[['CHDRNUM',
 'ZRENNO',
 'D_com',
'D_end',
 'D_exp',
 'TRANNO',
 'RENEWABLE',
 'CHDRSTCDC',
 'CNTTYPE',
 'ORIG_POL',
 'LATEST_REPLACEMENT',
 'CNTBRANCH',
 'AGENTID']]
transv_p2_error.count()
#PIF.filter( (col('D_com')<=PIF_date_ymd)&(PIF_date_ymd<=col('D_end')) ).\
#    filter(~((col('D_end') != col('D_exp')) & (col('D_end')==lit(PIF_date_ymd)))).select(['CHDRNUM',

260

In [16]:
transv_p2_error.sort(['CHDRNUM','ZRENNO']).show(1000)

+--------+------+----------+----------+----------+------+---------+---------+-------+--------+------------------+---------+-------+
| CHDRNUM|ZRENNO|     D_com|     D_end|     D_exp|TRANNO|RENEWABLE|CHDRSTCDC|CNTTYPE|ORIG_POL|LATEST_REPLACEMENT|CNTBRANCH|AGENTID|
+--------+------+----------+----------+----------+------+---------+---------+-------+--------+------------------+---------+-------+
|01059026|    10|2012-12-02|2013-12-01|2013-12-01|    22|        1|      005|    PAX|01059026|          03411558|       62|  03864|
|01282165|     9|2013-01-01|2013-12-31|2013-12-31|    14|        1|      003|    SHP|01282165|          03356614|       19|  04878|
|01341347|     8|2012-07-24|2013-07-23|2013-07-23|    10|        1|      019|    SAS|01199291|          03450338|       14|  03936|
|01361126|     8|2012-09-06|2013-09-05|2013-09-05|    10|        1|      019|    SAP|01361126|          03314984|       33|  20126|
|01389500|     8|2012-11-01|2013-10-31|2013-10-31|    10|        1|      003

In [17]:
transv_sas_p2 = spark.read.csv("/group/axa_malaysia/raw/OSPREY/02_ACTLIB/01_Main/03_Transversal/P2.csv",header=True)
transv_sas_p2.cache()
transv_sas_p2_error = transv_sas_p2.join(error_df,['CHDRNUM','ZRENNO'])[['CHDRNUM',
 'ZRENNO',
 'D_com',
'D_end',
 'D_exp',
 'TRANNO',
 'RENEWABLE',
 'CHDRSTCDC',
 'CNTTYPE',
 'ORIG_POL',
 'LATEST_REPLACEMENT',
 'CNTBRANCH',
 'AGENTID']]

In [18]:
transv_sas_p2_error.count()

260

In [19]:
transv_sas_p2_error.sort(['CHDRNUM','ZRENNO']).show(1000)

+--------+------+---------+---------+---------+------+---------+---------+-------+--------+------------------+---------+-------+
| CHDRNUM|ZRENNO|    D_com|    D_end|    D_exp|TRANNO|RENEWABLE|CHDRSTCDC|CNTTYPE|ORIG_POL|LATEST_REPLACEMENT|CNTBRANCH|AGENTID|
+--------+------+---------+---------+---------+------+---------+---------+-------+--------+------------------+---------+-------+
|01059026|    10|02DEC2012|02DEC2012|01DEC2013|    22|        1|      005|    PAX|01059026|          01059026|       62|  03864|
|01282165|     9|01JAN2013|01JAN2013|31DEC2013|    14|        1|      003|    SHP|01282165|          01282165|       19|  04878|
|01341347|     8|24JUL2012|24JUL2012|23JUL2013|    10|        1|      019|    SAS|01341347|          01341347|       14|  03936|
|01361126|     8|06SEP2012|06SEP2012|05SEP2013|    10|        1|      019|    SAP|01361126|          01361126|       33|  20126|
|01389500|     8|01NOV2012|01NOV2012|31OCT2013|    10|        1|      003|    SHP|01389500|      

In [52]:
import pickle

In [50]:
pd_error_df = joined_sas_spark[pd.isnull(joined_sas_spark['PIF_y'])][['CHDRNUM','ZRENNO']]

In [53]:
p2_sas = pickle.load(open('p2.pickle','rb'))

In [54]:
p2_sas['chdrnum'] = p2_sas['chdrnum'].map(lambda x: x.decode('utf-8'))

In [55]:
p2_sas = p2_sas.rename(columns={'chdrnum':'CHDRNUM'})

In [56]:
df22 = p2_sas.merge(pd_error_df, 'inner',['CHDRNUM','ZRENNO'])

In [57]:
df22 = df22[['CHDRNUM', 'CNTTYPE', 'TRANNO', 'CNTBRANCH', 'agentid', 'CHDRSTCDC',
       'ZRENNO', 'd_com','D_end', 'd_exp', 'renewable', 'orig_pol','latest_replacement']]

In [58]:
df22['d_com'] = pd.to_timedelta(df22['d_com'], unit='D') + pd.datetime(1960, 1, 1)
df22['D_end'] = pd.to_timedelta(df22['D_end'], unit='D') + pd.datetime(1960, 1, 1)
df22['d_exp'] = pd.to_timedelta(df22['d_exp'], unit='D') + pd.datetime(1960, 1, 1)

In [60]:
df22.sort_values(['CHDRNUM','ZRENNO'])

Unnamed: 0,CHDRNUM,CNTTYPE,TRANNO,CNTBRANCH,agentid,CHDRSTCDC,ZRENNO,d_com,D_end,d_exp,renewable,orig_pol,latest_replacement
0,01059026,b'PAX',22.0,b'62',b'03864',b'005',10.0,2012-12-02,2012-12-02,2013-12-01,1.0,b'01059026',b'01059026'
1,01282165,b'SHP',14.0,b'19',b'04878',b'003',9.0,2013-01-01,2013-01-01,2013-12-31,1.0,b'01282165',b'01282165'
2,01341347,b'SAS',10.0,b'14',b'03936',b'019',8.0,2012-07-24,2012-07-24,2013-07-23,1.0,b'01341347',b'01341347'
3,01361126,b'SAP',10.0,b'33',b'20126',b'019',8.0,2012-09-06,2012-09-06,2013-09-05,1.0,b'01361126',b'01361126'
4,01389500,b'SHP',10.0,b'16',b'04536',b'003',8.0,2012-11-01,2012-11-01,2013-10-31,1.0,b'01389500',b'01389500'
5,01389514,b'SHP',10.0,b'16',b'04536',b'003',8.0,2012-11-01,2012-11-01,2013-10-31,1.0,b'01389514',b'01389514'
6,01422333,b'SSP',9.0,b'19',b'05053',b'006',7.0,2012-02-17,2012-02-17,2013-02-16,1.0,b'01422333',b'01422333'
7,01446794,b'PFI',21.0,b'68',b'31632',b'004',8.0,2013-01-01,2013-01-01,2013-12-31,1.0,b'01446794',b'01446794'
8,01474263,b'VPP',10.0,b'68',b'31632',b'001',7.0,2012-12-21,2012-12-21,2013-12-20,1.0,b'01474263',b'01474263'
9,01478915,b'SHQ',11.0,b'37',b'05173',b'003',7.0,2012-06-16,2012-06-16,2013-06-15,1.0,b'01478915',b'01478915'


In [61]:
transv_p2_error.sort(['CHDRNUM','ZRENNO']).show()

+--------+------+----------+----------+----------+------+---------+---------+-------+--------+------------------+---------+-------+
| CHDRNUM|ZRENNO|     D_com|     D_end|     D_exp|TRANNO|RENEWABLE|CHDRSTCDC|CNTTYPE|ORIG_POL|LATEST_REPLACEMENT|CNTBRANCH|AGENTID|
+--------+------+----------+----------+----------+------+---------+---------+-------+--------+------------------+---------+-------+
|01059026|    10|2012-12-02|2013-12-01|2013-12-01|    22|        1|      005|    PAX|01059026|          03411558|       62|  03864|
|01282165|     9|2013-01-01|2013-12-31|2013-12-31|    14|        1|      003|    SHP|01282165|          03356614|       19|  04878|
|01341347|     8|2012-07-24|2013-07-23|2013-07-23|    10|        1|      019|    SAS|01199291|          03450338|       14|  03936|
|01361126|     8|2012-09-06|2013-09-05|2013-09-05|    10|        1|      019|    SAP|01361126|          03314984|       33|  20126|
|01389500|     8|2012-11-01|2013-10-31|2013-10-31|    10|        1|      003

In [20]:
acc_mth = 7
acc_yr = 2017
transv_pr_gr_psea_path = 'data/sas_402/transv_pr_gr_psea.parquet'
transv_polhistory_psea_path = 'data/sas_401/transv_polhistory_psea.parquet'
transv_p2_path = 'data/sas_405/transv_p2.parquet'
riskpf_path = '/group/axa_malaysia/data/adm_riskpf'

In [21]:
transv_polhistory_psea = spark.read.parquet(transv_polhistory_psea_path)

In [22]:
list_poi = transv_polhistory_psea[['CHDRNUM',
     'D_com',
     'D_exp',
     'D_to',
     'ZRENNO',
     'TRANNO',
     'RENEWABLE',
     'D_cancel',
     'CHDRSTCDC',
     'CNTTYPE',
     'ORIG_POL',
     'LATEST_REPLACEMENT',
     'CNTBRANCH',
     'AGENTID']].sort('CHDRNUM','ZRENNO',desc('D_to'),desc('TRANNO')).drop_duplicates(subset=['CHDRNUM','ZRENNO']).drop('D_to')

In [23]:
list_poi.cache()

DataFrame[CHDRNUM: string, D_com: date, D_exp: date, ZRENNO: int, TRANNO: int, RENEWABLE: int, D_cancel: date, CHDRSTCDC: string, CNTTYPE: string, ORIG_POL: string, LATEST_REPLACEMENT: string, CNTBRANCH: string, AGENTID: string]

In [17]:
list_poi.filter(col('ORIG_POL')=='2063058').show()

NameError: name 'list_poi' is not defined

In [1]:
import re
import pandas as pd
import numpy as np
import string
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Window
import datetime
import calendar
from dateutil.relativedelta import relativedelta

In [None]:
list_poi_sas = spark.read.csv('/group/axa_malaysia/raw/OSPREY/02_ACTLIB/01_Main/03_Transversal/LIST_POI2.csv',header=True)

In [None]:
list_poi_sas.filter(col('orig_pol')=='03411558').show()

In [None]:
spark.read.csv('pr_gr_psea.csv', header=True)

In [47]:
pr_gr_psea = spark.read.csv('pr_gr_psea.csv', header=True)

In [49]:
pr_gr_psea.write.saveAsTable('axa_malaysia.pr_gr_psea')

In [57]:
!hdfs dfs -get /group/axa_malaysia/raw/OSPREY/02_ACTLIB/01_Main/03_Transversal/cl_quanti_psea.sas7bdat

In [58]:
cl_quanti_psea = pd.read_sas('cl_quanti_psea.sas7bdat')

In [60]:
for i in cl_quanti_psea.dtypes.items():
    if i[1]=='object':
        cl_quanti_psea[i[0]] = cl_quanti_psea[i[0]].str.decode("utf-8")

In [62]:
cl_quanti_psea.head()

Unnamed: 0,CLAIM,TRANNO,BATCTRCDE,PAYCDE,CLRATE,CLSTAT,d_tran,d_clo,paymntid,yrm,prcl_rscd,gpay,gmov,cpay,cmov,rpay,rmov
0,A0001005,1.0,T421,,1.0,1,15539.0,379852.0,,200207.0,APXDI,0.0,500.0,0.0,0.0,0.0,37.5
1,A0001005,1.0,T421,,1.0,1,15539.0,379852.0,,200207.0,APXMI,0.0,0.0,0.0,0.0,0.0,0.0
2,A0001005,2.0,T422,,1.0,1,15673.0,379852.0,,200212.0,APXDI,0.0,0.0,0.0,0.0,0.0,0.0
3,A0001005,2.0,T422,,1.0,1,15673.0,379852.0,,200212.0,APXMI,0.0,0.0,0.0,0.0,0.0,0.0
4,A0001005,3.0,T422,,1.0,1,15690.0,379852.0,,200212.0,APXDI,0.0,-500.0,0.0,0.0,0.0,-37.5


In [63]:
cl_quanti_psea.to_csv('cl_quanti_psea.csv', index=False)

In [64]:
!hdfs dfs -put cl_quanti_psea.csv

In [1]:
cl_quanti_psea = spark.read.csv('cl_quanti_psea.csv', header=True)
cl_quanti_psea.write.saveAsTable('axa_malaysia.cl_quanti_psea')

In [68]:
!kinit -kt cchin.keytab cchin 

In [None]:
cl_quanti_psea.csv

In [61]:
cl_quanti_psea_df = spark.createDataFrame(cl_quanti_psea)

TypeError: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'>