# working with pyspark

In [1]:
sc

<pyspark.context.SparkContext at 0x7f1182d82c90>

## Getting Started 
### Starting point: SQLContext
The entry point into all relational functionality in Spark is the `SQLContext` class, or one of its decendents. To create a basic `SQLContext` all you need is a SparkContext

In [2]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [7]:
clm_df = sqlContext.read.text('/data/discovery/totaloss/staging/clm/')


In [8]:
clm_df.count()

14066898

In [9]:
clm_df.explain()

== Physical Plan ==
Scan TextRelation[value#1] InputPaths: hdfs://nameservice1/data/discovery/totaloss/staging/clm


In [10]:
clm_df.head()

Row(value=u'$4G6I3%9HE   ,null,$4G6I3%9HE,59,  ,A403703,A,S,R ,2009-11-23 10:10:26.014,DFLT,0,R,2015-07-02 12:50:14.638622,EDT ,-4,         ,N,N,N,N,9999-12-31,16,2009-11-20,0924AM,2009-11-23,2015-07-25,Y,N,             ,I,  ,AGT,CNV,ST79         ,0,9999-12-31 00:00:00.0,ST79         ,     ,0,2009-11-23 10:10:13.0,   ,       ,Y,N, , ,510,59,1510,CAVANAUGH,0,11028277          ,Y,MIAMI GARDENS PD,9999-12-31,      ,MIAMI, ,FL,305,USA,8237820, ,         ,SRU1959A     ,  , , , ,null,0,NW 2ND AVENUE & 179TH STREET,V1 traveling S/B on NW 2nd Ave.  V1 made an illegal u-turn in front of V2 who')

In [15]:
clm_df.columns

['value']

In [16]:
clm_textfile = sc.textFile('/data/discovery/totaloss/staging/clm/')
clm_textfile.count()

14066898

In [17]:
clm_textfile.first() # first item in this RDD

u'$4G6I3%9HE   ,null,$4G6I3%9HE,59,  ,A403703,A,S,R ,2009-11-23 10:10:26.014,DFLT,0,R,2015-07-02 12:50:14.638622,EDT ,-4,         ,N,N,N,N,9999-12-31,16,2009-11-20,0924AM,2009-11-23,2015-07-25,Y,N,             ,I,  ,AGT,CNV,ST79         ,0,9999-12-31 00:00:00.0,ST79         ,     ,0,2009-11-23 10:10:13.0,   ,       ,Y,N, , ,510,59,1510,CAVANAUGH,0,11028277          ,Y,MIAMI GARDENS PD,9999-12-31,      ,MIAMI, ,FL,305,USA,8237820, ,         ,SRU1959A     ,  , , , ,null,0,NW 2ND AVENUE & 179TH STREET,V1 traveling S/B on NW 2nd Ave.  V1 made an illegal u-turn in front of V2 who'

## Aside 
### Looking at VRP data first (it is in parquet format and so has schema)

In [19]:
hdir = '/data/discovery/vehrepat/core'
vehrep_dir_list = !hdfs dfs -ls {hdir}
vehrep_dir_files = [flist.split()[-1] for flist in vehrep_dir_list[1:]]
vehrep_dir_files

['/data/discovery/vehrepat/core/.temp',
 '/data/discovery/vehrepat/core/aes',
 '/data/discovery/vehrepat/core/auto_est_sum_old',
 '/data/discovery/vehrepat/core/auto_est_team',
 '/data/discovery/vehrepat/core/auto_est_user',
 '/data/discovery/vehrepat/core/auto_rpr_fac',
 '/data/discovery/vehrepat/core/detl',
 '/data/discovery/vehrepat/core/est_party',
 '/data/discovery/vehrepat/core/est_party_detl_rltn',
 '/data/discovery/vehrepat/core/lbr_note',
 '/data/discovery/vehrepat/core/los_est',
 '/data/discovery/vehrepat/core/msg',
 '/data/discovery/vehrepat/core/non_oem',
 '/data/discovery/vehrepat/core/opt',
 '/data/discovery/vehrepat/core/rate',
 '/data/discovery/vehrepat/core/tax',
 '/data/discovery/vehrepat/core/ttl']

In [20]:
auto_est_user_df = sqlContext.read.load(vehrep_dir_files[4])
auto_est_user_df.count()

239688

In [21]:
auto_est_user_df.columns

['AUTO_EST_USER_DIM_ID',
 'RGN_CD',
 'SECT_CD',
 'TEAM_CD',
 'ESTR_ID',
 'ESTR_SGNON_ID',
 'MEMBR_TYPE_CD',
 'VNDR_CD',
 'CAT_CD',
 'MOD_TSTMP',
 'DLTE_IND',
 'ESTR_FIRST_NM',
 'ESTR_LAST_NM',
 'NU002_UPDT_CD',
 'EC012_UPDT_CD',
 'EC013_UPDT_CD',
 'OFFC_ID',
 'RPT_ID',
 'DATA_CNTXT_CD',
 'FDW_RPLC_IND',
 'FDW_INSRT_TSTMP',
 'FDW_RPLC_TSTMP',
 'SRC_INSRT_TSTMP',
 'SRC_RPLC_TSTMP',
 'SRC_SYS_DLTE_IND']

In [22]:
auto_est_user_df.describe()

DataFrame[summary: string, AUTO_EST_USER_DIM_ID: string, MOD_TSTMP: string, FDW_INSRT_TSTMP: string, FDW_RPLC_TSTMP: string, SRC_INSRT_TSTMP: string, SRC_RPLC_TSTMP: string]

In [23]:
auto_est_user_df.SRC_RPLC_TSTMP.take(10)

TypeError: 'Column' object is not callable

In [None]:
clm_broad_schema = ["CLM_ID string,CLM_GRP_ID smallint,LGCY_CLM_SYS_ID string,
CLM_ST_CD string,
ST_ASGN_ST_CD string,
CLM_NUM string,
PROD_LINE_CD string,
CFDTL_LEVL_CD string,
CLM_FILE_TYPE_CD string,
CLM_RCRD_TSTMP timestamp,
CLM_RCRD_TMZN_CD string,
CLM_RCRD_OFFST_TMZN_CNT int,
CLM_STTS_CD string,
CLM_STTS_TSTMP timestamp,
CLM_STTS_TMZN_CD string,
CLM_STTS_OFFST_TMZN_CNT int,
ST_ASGN_CLM_NUM string,
RCRD_ONLY_IND string,
DUP_CLM_IND string,
CLM_COV_FORM_PRNT_IND string,
SIU_IND string,
MAINT_DT string,
CONV_NUM smallint,
LOS_OCCR_DT string,
LOS_TM string,
LOS_RPT_DT string,
ISO_INFRM_DT string,
ISO_INIT_SEND_IND string,
FATAL_EXST_IND string,
ASGNE_ORGZN_ID string,
APP_CD string,
APP_VER_CD string,
USER_TYPE_CD string,
LOS_TYPE_CD string,
LOS_INIT_SGNON_ID string,
LOS_INIT_PARTY_BUSN_ID bigint,
LOS_INIT_TSTMP timestamp,
LOS_SBMTR_SGNON_ID string,
LOS_SBMT_SFX_CD string,
LOS_SBMT_PARTY_BUSN_ID bigint,
LOS_SBMT_TSTMP timestamp,
LOS_SBMTR_AREA_CD string,
LOS_SBMTR_PHON_NUM string,
PLCY_PRE_POPLT_IND string,
SEND_TO_UNDWR_IND string,
ASGN_TO_FLD_IND string,
FAST_TRK_TYPE_CD string,
MAX_PRTPT_ID smallint,
RPT_AGT_ST_CD string,
RPT_AGT_CD string,
RPT_AGT_NM string,
PARTY_BUSN_ID bigint,
GOVT_BODY_RPT_NUM string,
GOVT_BODY_RPT_MADE_IND string,
RPT_GOVT_BODY_NM string,
GOVT_BODY_RPT_DT string,
GOVT_BODY_RPT_TM string,
LOS_LOC_CITY_NM string,
LOS_LOC_STRET_NM string,
LOS_LOC_ST_CD string,
RPT_AGT_AREA_CD string,
LOS_LOC_CNTRY_CD string,
RPT_AGT_PHONE_NUM string,
LOS_LOC_CNTY_NM string,
LOS_LOC_ZIP_CD string,
STAT_RPT_UNIT_ORGZN_ID string,
LGCY_CAT_CD string,
LOS_SBMTR_FIRST_NM string,
LOS_SBMTR_MID_NM string,
LOS_SBMTR_LAST_NM string,
SIU_INVSGR_ORGZN_ID string,
SYS_PRCS_CD string,
LOS_LOC_DESC_TXT string,
LOS_DESC_TXT string,
SRC_INSRT_TSTMP timestamp,
FDW_INSRT_TSTMP timestamp,
FDW_RPLC_TSTMP timestamp,
SRC_RPLC_TSTMP timestamp,
ASGN_ORGZN_PARTY_BUSN_ID bigint,
RPT_AGT_PARTY_BUSN_ID bigint,
SRC_OPRTN_CD string,
CLM_BUSN_ID bigint,
FRAUD_RING_INVSG_IND string,
DATA_CNTXT_CD string,
STG_INSRT_TSTMP timestamp,
SRC_RPLC_IND string,
FDW_RPLC_IND string,
SRC_SYS_DLTE_IND string,
ISO_CONV_CLM_IND string,
GOVT_BODY_RPT_R_DT string,
LATUD_NUM float,
LNGTD_NUM float,
MTCH_CD string,
LOC_QLTY_CD string,
IND_ID_TTL_EXPO_AMT float,
IND_ID_EXPO_LAST_MOD_TSTMP timestamp,
IND_ID_EXPO_MOD_SGNON_ID string,
IND_ID_SBMT_TASK_TSTMP timestamp,
RPT_RCRD_ONLY_IND string,
RPT_PRTCT_PRSN_IND string"]

In [12]:
from pyspark.sql.functions import countDistinct

In [13]:
clm_df.agg(countDistinct(clm_df.clm_st_cd,clm_df.clm_id,clm_df.clm_num,clm_df.clm_los_rpt_dt, clm_df.los_sbmt_tstmp )).alias('c').collect()

AttributeError: 'DataFrame' object has no attribute 'clm_st_cd'

In [None]:
taskkey_df = sqlContext.sql("""
    SELECT """)

In [None]:
val eml_complete_df = sqlContext.sql("""
    SELECT 
      coalesce(t2.CLM_ID,t1.CLM_ID) AS CLM_ID,
      coalesce(t2.PRTPT_ID,t1.PRTPT_ID) AS PRTPT_ID,
      coalesce(t2.CREAT_TSTMP,t1.CREAT_TSTMP) AS CREAT_TSTMP,
      coalesce(t2.SRC_INSRT_TSTMP,t1.SRC_INSRT_TSTMP) AS SRC_INSRT_TSTMP,
      coalesce(t2.SRC_RPLC_TSTMP,t1.SRC_RPLC_TSTMP) AS SRC_RPLC_TSTMP,
      coalesce(t2.EML_ADDR_TXT,t1.EML_ADDR_TXT) AS EML_ADDR_TXT,
      coalesce(t2.SRC_RPLC_IND,t1.SRC_RPLC_IND) AS SRC_RPLC_IND,
      coalesce(t2.SRC_SYS_DLTE_IND,t1.SRC_SYS_DLTE_IND) AS SRC_SYS_DLTE_IND     
    FROM fdwcfpoc.SNA_CLM_PRTPT_EML_RLTN_1_yr AS t1
    FULL OUTER JOIN fdwcfpoc.EML_Daily AS t2
    ON t1.CLM_ID = t2.CLM_ID AND t1.PRTPT_ID = t2.PRTPT_ID AND t1.CREAT_TSTMP = t2.CREAT_TSTMP
  """)
