In [31]:
import os
import glob

In [2]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession 
from pyspark.sql import functions as F        

In [3]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
SparkContext.setSystemProperty('spark.executor.memory', '5g')
SparkContext.setSystemProperty('spark.driver.memory', '3g')

In [4]:
spark._sc.getConf().getAll()

[('spark.driver.memory', '3g'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1581531668846'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.executor.memory', '5g'),
 ('spark.driver.host', '192.168.1.82'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.driver.port', '53202')]

## 1. Concat dataset and split into two parts ( with OC and without OC)

In [13]:
def file_lst (datapath,filetype):
    """
    Return a list including all datasets' directory in one file.
    """
    lst = []
    for filedirs, files, filenames in os.walk(datapath):
        lst += [os.path.join(filedirs, file) for file in filenames if file.endswith(filetype)]
    return lst

In [14]:
def concat_df(datapath):
    """
    Concat all the same kind of datasets as one and return it.
    """
    datapath_lst = [os.path.join(datapath, file) for file in os.listdir(datapath) if file.endswith("csv")]
    df1 = spark.read.format("csv").option("inferSchema", True).option("header", True).load(datapath_lst[0])
    for i in range(1,20):
        df2 = spark.read.format("csv").option("inferSchema", True).option("header", True).load(datapath_lst[i])
        df1 = df1.union(df2) 
    return df1   

In [15]:
def split_df(df):
    """
    Create a new column "class", class 1 == with OC, class 2 == without OC.
    Create two new datasets, one is a dataset with OC patients, the other one is without OC patients.
    Return these two datasets.
    """
    df = df.withColumn('class', (F.when(((F.col('ICD9_DGNS_CD_1').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_2').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_3').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_4').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_5').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_6').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_7').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_8').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_9').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_10').eqNullSafe('1830'))),1)
                                        .otherwise(0)))
    df_1 = df.where(df['class'] == 1)
    df_0 = df.where(df['class'] == 0)
    #df2 = df.subtract(df1)
    return df_1, df_0

In [81]:
def write_df(df, datapath):
    """
    Save a parquet format dataset into a new directory. 
    """
    df.write.save(datapath, format='parquet')

### Inpatient_claim data

In [2]:
startpath_in = "/Users/jill/Downloads/2020_Project/integrated_data/inpatient_claim/"

In [31]:
#df_in = concat_df(startpath_in)

In [32]:
#df_in.write.save("./concat_data/inpatient", format='parquet')

In [7]:
# Check the saved dataset.

df_inpatient = spark.read.load("/Users/jill/Downloads/2020_Project/EDA/concat_data/inpatient/inpatient_con/")

In [8]:
df_inpatient.take(5)

[Row(DESYNPUF_ID='4164404F0A50EC8D', CLM_ID=90191100136835, SEGMENT=1, CLM_FROM_DT=20080123, CLM_THRU_DT=20080127, PRVDR_NUM='3901ZS', CLM_PMT_AMT=2000.0, NCH_PRMRY_PYR_CLM_PD_AMT=0.0, AT_PHYSN_NPI=771958540, OP_PHYSN_NPI=7718505294, OT_PHYSN_NPI=None, CLM_ADMSN_DT=20080123, ADMTNG_ICD9_DGNS_CD='2910', CLM_PASS_THRU_PER_DIEM_AMT=0.0, NCH_BENE_IP_DDCTBL_AMT=1024.0, NCH_BENE_PTA_COINSRNC_LBLTY_AM=0.0, NCH_BENE_BLOOD_DDCTBL_LBLTY_AM=0.0, CLM_UTLZTN_DAY_CNT=4, NCH_BENE_DSCHRG_DT=20080127, CLM_DRG_CD='895', ICD9_DGNS_CD_1='29181', ICD9_DGNS_CD_2='7850', ICD9_DGNS_CD_3='30560', ICD9_DGNS_CD_4='V1046', ICD9_DGNS_CD_5='4019', ICD9_DGNS_CD_6='30390', ICD9_DGNS_CD_7='40390', ICD9_DGNS_CD_8='2920', ICD9_DGNS_CD_9='2967', ICD9_DGNS_CD_10=None, ICD9_PRCDR_CD_1=586, ICD9_PRCDR_CD_2='2768', ICD9_PRCDR_CD_3=None, ICD9_PRCDR_CD_4=None, ICD9_PRCDR_CD_5=None, ICD9_PRCDR_CD_6=None, HCPCS_CD_1=None, HCPCS_CD_2=None, HCPCS_CD_3=None, HCPCS_CD_4=None, HCPCS_CD_5=None, HCPCS_CD_6=None, HCPCS_CD_7=None, HCPCS_

In [9]:
df_inpatient.printSchema()

root
 |-- DESYNPUF_ID: string (nullable = true)
 |-- CLM_ID: long (nullable = true)
 |-- SEGMENT: integer (nullable = true)
 |-- CLM_FROM_DT: integer (nullable = true)
 |-- CLM_THRU_DT: integer (nullable = true)
 |-- PRVDR_NUM: string (nullable = true)
 |-- CLM_PMT_AMT: double (nullable = true)
 |-- NCH_PRMRY_PYR_CLM_PD_AMT: double (nullable = true)
 |-- AT_PHYSN_NPI: long (nullable = true)
 |-- OP_PHYSN_NPI: long (nullable = true)
 |-- OT_PHYSN_NPI: long (nullable = true)
 |-- CLM_ADMSN_DT: integer (nullable = true)
 |-- ADMTNG_ICD9_DGNS_CD: string (nullable = true)
 |-- CLM_PASS_THRU_PER_DIEM_AMT: double (nullable = true)
 |-- NCH_BENE_IP_DDCTBL_AMT: double (nullable = true)
 |-- NCH_BENE_PTA_COINSRNC_LBLTY_AM: double (nullable = true)
 |-- NCH_BENE_BLOOD_DDCTBL_LBLTY_AM: double (nullable = true)
 |-- CLM_UTLZTN_DAY_CNT: integer (nullable = true)
 |-- NCH_BENE_DSCHRG_DT: integer (nullable = true)
 |-- CLM_DRG_CD: string (nullable = true)
 |-- ICD9_DGNS_CD_1: string (nullable = true)


In [10]:
df_inpatient.show(1)

+----------------+--------------+-------+-----------+-----------+---------+-----------+------------------------+------------+------------+------------+------------+-------------------+--------------------------+----------------------+------------------------------+------------------------------+------------------+------------------+----------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+---------

In [11]:
df_inpatient.count()

1332822

In [14]:
df_inpatient = df_inpatient.withColumn('class', (F.when(((F.col('ICD9_DGNS_CD_1').eqNullSafe('1830'))|
                                                (F.col('ICD9_DGNS_CD_2').eqNullSafe('1830'))|
                                                (F.col('ICD9_DGNS_CD_3').eqNullSafe('1830'))|
                                                (F.col('ICD9_DGNS_CD_4').eqNullSafe('1830'))|
                                                (F.col('ICD9_DGNS_CD_5').eqNullSafe('1830'))|
                                                (F.col('ICD9_DGNS_CD_6').eqNullSafe('1830'))|
                                                (F.col('ICD9_DGNS_CD_7').eqNullSafe('1830'))|
                                                (F.col('ICD9_DGNS_CD_8').eqNullSafe('1830'))|
                                                (F.col('ICD9_DGNS_CD_9').eqNullSafe('1830'))|
                                                (F.col('ICD9_DGNS_CD_10').eqNullSafe('1830'))),1)
                                                .otherwise(0)))

In [15]:
df_inpatient.select('class').distinct().rdd.map(lambda r: r[0]).collect()

[1, 0]

In [18]:
from pyspark.sql import dataframe
df_inpatient.cube('class').count().show()

+-----+-------+
|class|  count|
+-----+-------+
|    1|   2649|
| null|1332822|
|    0|1330173|
+-----+-------+



In [174]:
dfin_oc = df_inpatient.where(df_inpatient['class'] == 1)

In [175]:
dfin_nooc = df_inpatient.where(df_inpatient['class'] == 0)

In [181]:
dfin_oc.count(), dfin_nooc.count()

(2649, 1330173)

In [176]:
dfin_oc.count()+ dfin_nooc.count() == df_inpatient.count()

True

In [10]:
df_inpatient_oc,df_inpatient_nooc = split_df(df_inpatient)

In [188]:
df_inpatient_oc.count(),df_inpatient_nooc.count()

(2649, 1330173)

In [11]:
ocpath = "/Users/jill/Downloads/2020_Project/EDA/concat_data/inpatient/OC"
noocpath = "/Users/jill/Downloads/2020_Project/EDA/concat_data/inpatient/noOC"
#write_df(df_inpatient_oc,ocpath)

In [12]:
#write_df(df_inpatient_nooc,noocpath)

In [23]:
## Test new splited datasets
df1 = spark.read.load("/Users/jill/Downloads/2020_Project/EDA/concat_data/inpatient/OC")
df2 = spark.read.load("/Users/jill/Downloads/2020_Project/EDA/concat_data/inpatient/noOC")

In [24]:
df1.count(),df2.count()

(2649, 1330173)

In [25]:
df1.printSchema()

root
 |-- DESYNPUF_ID: string (nullable = true)
 |-- CLM_ID: long (nullable = true)
 |-- SEGMENT: integer (nullable = true)
 |-- CLM_FROM_DT: integer (nullable = true)
 |-- CLM_THRU_DT: integer (nullable = true)
 |-- PRVDR_NUM: string (nullable = true)
 |-- CLM_PMT_AMT: double (nullable = true)
 |-- NCH_PRMRY_PYR_CLM_PD_AMT: double (nullable = true)
 |-- AT_PHYSN_NPI: long (nullable = true)
 |-- OP_PHYSN_NPI: long (nullable = true)
 |-- OT_PHYSN_NPI: long (nullable = true)
 |-- CLM_ADMSN_DT: integer (nullable = true)
 |-- ADMTNG_ICD9_DGNS_CD: string (nullable = true)
 |-- CLM_PASS_THRU_PER_DIEM_AMT: double (nullable = true)
 |-- NCH_BENE_IP_DDCTBL_AMT: double (nullable = true)
 |-- NCH_BENE_PTA_COINSRNC_LBLTY_AM: double (nullable = true)
 |-- NCH_BENE_BLOOD_DDCTBL_LBLTY_AM: double (nullable = true)
 |-- CLM_UTLZTN_DAY_CNT: integer (nullable = true)
 |-- NCH_BENE_DSCHRG_DT: integer (nullable = true)
 |-- CLM_DRG_CD: string (nullable = true)
 |-- ICD9_DGNS_CD_1: string (nullable = true)


In [26]:
df2.printSchema()

root
 |-- DESYNPUF_ID: string (nullable = true)
 |-- CLM_ID: long (nullable = true)
 |-- SEGMENT: integer (nullable = true)
 |-- CLM_FROM_DT: integer (nullable = true)
 |-- CLM_THRU_DT: integer (nullable = true)
 |-- PRVDR_NUM: string (nullable = true)
 |-- CLM_PMT_AMT: double (nullable = true)
 |-- NCH_PRMRY_PYR_CLM_PD_AMT: double (nullable = true)
 |-- AT_PHYSN_NPI: long (nullable = true)
 |-- OP_PHYSN_NPI: long (nullable = true)
 |-- OT_PHYSN_NPI: long (nullable = true)
 |-- CLM_ADMSN_DT: integer (nullable = true)
 |-- ADMTNG_ICD9_DGNS_CD: string (nullable = true)
 |-- CLM_PASS_THRU_PER_DIEM_AMT: double (nullable = true)
 |-- NCH_BENE_IP_DDCTBL_AMT: double (nullable = true)
 |-- NCH_BENE_PTA_COINSRNC_LBLTY_AM: double (nullable = true)
 |-- NCH_BENE_BLOOD_DDCTBL_LBLTY_AM: double (nullable = true)
 |-- CLM_UTLZTN_DAY_CNT: integer (nullable = true)
 |-- NCH_BENE_DSCHRG_DT: integer (nullable = true)
 |-- CLM_DRG_CD: string (nullable = true)
 |-- ICD9_DGNS_CD_1: string (nullable = true)


### Outpatient_claim data

In [6]:
startpath_out = "/Users/jill/Downloads/2020_Project/integrated_data/outpatient_claim/"

In [31]:
#concat_out = concat_df(startpath_out)

In [32]:
#concat_out.count()

15826987

In [9]:
outpath= "/Users/jill/Downloads/2020_Project/EDA/concat_data/outpatient/outpatient_con"
#write_df(concat_out, outpath)

In [10]:
df_outpatient = spark.read.load(outpath)

In [11]:
df_outpatient.count()

15826987

In [17]:
df_out_OC,df_out_noOC = split_df(df_outpatient)

In [18]:
df_out_OC.count(),df_out_noOC.count()

(16706, 15810281)

In [13]:
df_out_OC.select('class').distinct().rdd.map(lambda r: r[0]).collect()

[1]

In [14]:
df_out_noOC.select('class').distinct().rdd.map(lambda r: r[0]).collect()

[0]

In [7]:
out_OC_path = "/Users/jill/Downloads/2020_Project/EDA/concat_data/outpatient/outpatient_OC"
out_noOC_path = "/Users/jill/Downloads/2020_Project/EDA/concat_data/outpatient/outpatient_noOC"

In [43]:
#write_df(df_out_OC, out_OC_path)

In [20]:
df_out_noOC.count()

15810281

In [22]:
#write_df(df_out_noOC, out_noOC_path)

In [6]:
# Test splitted data
dfout_noOC = spark.read.load(out_noOC_path)
dfout_OC = spark.read.load(out_OC_path)

In [9]:
sample = dfout_OC.toPandas()

In [10]:
sample.head()

Unnamed: 0,DESYNPUF_ID,CLM_ID,SEGMENT,CLM_FROM_DT,CLM_THRU_DT,PRVDR_NUM,CLM_PMT_AMT,NCH_PRMRY_PYR_CLM_PD_AMT,AT_PHYSN_NPI,OP_PHYSN_NPI,...,HCPCS_CD_37,HCPCS_CD_38,HCPCS_CD_39,HCPCS_CD_40,HCPCS_CD_41,HCPCS_CD_42,HCPCS_CD_43,HCPCS_CD_44,HCPCS_CD_45,class
0,423AD70C0C12049B,38212211989631,1,20090922,20090922,1000JB,900.0,0.0,4068566000.0,3718273000.0,...,,,,,,,,,,1
1,4285270CB7890679,38532212193660,1,20100803,20100803,1113ZK,30.0,0.0,8435701000.0,8435701000.0,...,,,,,,,,,,1
2,428BB462CE429C5B,38792212093444,1,20090126,20090126,2301GU,90.0,0.0,1586461000.0,,...,,,,,,,,,,1
3,42CDA84C361BDF03,38722212203867,1,20090610,20090624,2301GU,30.0,0.0,1586461000.0,1586461000.0,...,,,,,,,,,,1
4,42FB0E4AA8A4F401,38192211696179,1,20090418,20090418,4200QS,1700.0,0.0,329957400.0,,...,,,,,,,,,,1


In [25]:
dfout_noOC.count(),dfout_OC.count()

(15810281, 16706)

### Carrier concat data

In [26]:
startpath_carrier = "/Users/jill/Downloads/2020_Project/integrated_data/carrier_claim/"

In [27]:
#concat_carrier = concat_df(startpath_carrier)

In [28]:
concat_carrier.count()

47448741

In [29]:
outpath_carrier= "/Users/jill/Downloads/2020_Project/EDA/concat_data/carrier/carrier_con"
#write_df(concat_carrier, outpath_carrier)

In [30]:
# Load the data
df_carrier = spark.read.load(outpath_carrier)

In [31]:
df_carrier.count()

47448741

In [33]:
df_carrier.printSchema()

root
 |-- DESYNPUF_ID: string (nullable = true)
 |-- CLM_ID: long (nullable = true)
 |-- CLM_FROM_DT: integer (nullable = true)
 |-- CLM_THRU_DT: integer (nullable = true)
 |-- ICD9_DGNS_CD_1: string (nullable = true)
 |-- ICD9_DGNS_CD_2: string (nullable = true)
 |-- ICD9_DGNS_CD_3: string (nullable = true)
 |-- ICD9_DGNS_CD_4: string (nullable = true)
 |-- ICD9_DGNS_CD_5: string (nullable = true)
 |-- ICD9_DGNS_CD_6: string (nullable = true)
 |-- ICD9_DGNS_CD_7: string (nullable = true)
 |-- ICD9_DGNS_CD_8: string (nullable = true)
 |-- PRF_PHYSN_NPI_1: long (nullable = true)
 |-- PRF_PHYSN_NPI_2: long (nullable = true)
 |-- PRF_PHYSN_NPI_3: long (nullable = true)
 |-- PRF_PHYSN_NPI_4: long (nullable = true)
 |-- PRF_PHYSN_NPI_5: long (nullable = true)
 |-- PRF_PHYSN_NPI_6: long (nullable = true)
 |-- PRF_PHYSN_NPI_7: long (nullable = true)
 |-- PRF_PHYSN_NPI_8: long (nullable = true)
 |-- PRF_PHYSN_NPI_9: long (nullable = true)
 |-- PRF_PHYSN_NPI_10: long (nullable = true)
 |-- PRF_

In [34]:
def split_df_carrier(df):
    """
    Create a new column "class", class 1 == with OC, class 2 == without OC.
    Create two new datasets, one is a dataset with OC patients, the other one is without OC patients.
    Return these two datasets.
    """
    df = df.withColumn('class', (F.when(((F.col('ICD9_DGNS_CD_1').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_2').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_3').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_4').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_5').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_6').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_7').eqNullSafe('1830'))|
                                        (F.col('ICD9_DGNS_CD_8').eqNullSafe('1830'))),1)
                                        .otherwise(0)))
    df_1 = df.where(df['class'] == 1)
    df_0 = df.where(df['class'] == 0)
    return df_1, df_0

In [36]:
df_carrier_OC,df_carrier_noOC = split_df_carrier(df_carrier)

In [37]:
df_carrier_OC.count(), df_carrier_noOC.count()

(32239, 47416502)

In [38]:
carrier_oc_path = "/Users/jill/Downloads/2020_Project/EDA/concat_data/carrier/carrier_OC"
carrier_nooc_path = "/Users/jill/Downloads/2020_Project/EDA/concat_data/carrier/carrier_noOC"

In [39]:
#write_df(df_carrier_OC, carrier_oc_path)

In [40]:
#write_df(df_carrier_noOC, carrier_nooc_path)

In [41]:
# Check the splitted data
dfcarrier_oc = spark.read.load(carrier_oc_path)
dfcarrier_nooc = spark.read.load(carrier_nooc_path)

In [42]:
dfcarrier_oc.count(), dfcarrier_nooc.count()

(32239, 47416502)

### Benifician summary data (just concat)

In [44]:
startpath_benefician = "/Users/jill/Downloads/2020_Project/integrated_data/beneficiary_summary/"

In [45]:
#concat_benefician = concat_df(startpath_benefician)

In [46]:
concat_benefician.count()

2296491

In [47]:
concat_benefician.printSchema()

root
 |-- DESYNPUF_ID: string (nullable = true)
 |-- BENE_BIRTH_DT: integer (nullable = true)
 |-- BENE_DEATH_DT: integer (nullable = true)
 |-- BENE_SEX_IDENT_CD: integer (nullable = true)
 |-- BENE_RACE_CD: integer (nullable = true)
 |-- BENE_ESRD_IND: string (nullable = true)
 |-- SP_STATE_CODE: integer (nullable = true)
 |-- BENE_COUNTY_CD: integer (nullable = true)
 |-- BENE_HI_CVRAGE_TOT_MONS: integer (nullable = true)
 |-- BENE_SMI_CVRAGE_TOT_MONS: integer (nullable = true)
 |-- BENE_HMO_CVRAGE_TOT_MONS: integer (nullable = true)
 |-- PLAN_CVRG_MOS_NUM: integer (nullable = true)
 |-- SP_ALZHDMTA: integer (nullable = true)
 |-- SP_CHF: integer (nullable = true)
 |-- SP_CHRNKIDN: integer (nullable = true)
 |-- SP_CNCR: integer (nullable = true)
 |-- SP_COPD: integer (nullable = true)
 |-- SP_DEPRESSN: integer (nullable = true)
 |-- SP_DIABETES: integer (nullable = true)
 |-- SP_ISCHMCHT: integer (nullable = true)
 |-- SP_OSTEOPRS: integer (nullable = true)
 |-- SP_RA_OA: integer (

In [48]:
outpath_benefician = "/Users/jill/Downloads/2020_Project/EDA/concat_data/benefician/benefician_con"

In [49]:
#write_df(concat_benefician, outpath_benefician)

In [68]:
# Check the saved dataset
df_benefician = spark.read.load(outpath_benefician)

In [69]:
df_benefician.count()

2296491

In [70]:
df_benefician.printSchema()

root
 |-- DESYNPUF_ID: string (nullable = true)
 |-- BENE_BIRTH_DT: integer (nullable = true)
 |-- BENE_DEATH_DT: integer (nullable = true)
 |-- BENE_SEX_IDENT_CD: integer (nullable = true)
 |-- BENE_RACE_CD: integer (nullable = true)
 |-- BENE_ESRD_IND: string (nullable = true)
 |-- SP_STATE_CODE: integer (nullable = true)
 |-- BENE_COUNTY_CD: integer (nullable = true)
 |-- BENE_HI_CVRAGE_TOT_MONS: integer (nullable = true)
 |-- BENE_SMI_CVRAGE_TOT_MONS: integer (nullable = true)
 |-- BENE_HMO_CVRAGE_TOT_MONS: integer (nullable = true)
 |-- PLAN_CVRG_MOS_NUM: integer (nullable = true)
 |-- SP_ALZHDMTA: integer (nullable = true)
 |-- SP_CHF: integer (nullable = true)
 |-- SP_CHRNKIDN: integer (nullable = true)
 |-- SP_CNCR: integer (nullable = true)
 |-- SP_COPD: integer (nullable = true)
 |-- SP_DEPRESSN: integer (nullable = true)
 |-- SP_DIABETES: integer (nullable = true)
 |-- SP_ISCHMCHT: integer (nullable = true)
 |-- SP_OSTEOPRS: integer (nullable = true)
 |-- SP_RA_OA: integer (

### PDE data (just concat)

In [53]:
startpath_pde = "/Users/jill/Downloads/2020_Project/integrated_data/pde_claim"

In [55]:
#pde_con = concat_df(startpath_pde)

In [57]:
pde_con.count()

111085969

In [58]:
pde_con.printSchema()

root
 |-- DESYNPUF_ID: string (nullable = true)
 |-- PDE_ID: long (nullable = true)
 |-- SRVC_DT: integer (nullable = true)
 |-- PROD_SRVC_ID: string (nullable = true)
 |-- QTY_DSPNSD_NUM: double (nullable = true)
 |-- DAYS_SUPLY_NUM: integer (nullable = true)
 |-- PTNT_PAY_AMT: double (nullable = true)
 |-- TOT_RX_CST_AMT: double (nullable = true)



In [59]:
outpath_pde = "/Users/jill/Downloads/2020_Project/EDA/concat_data/pde/pde_con"

In [60]:
#write_df(pde_con,outpath_pde)

In [61]:
# Reload the dataset
df_pde = spark.read.load(outpath_pde)

In [62]:
df_pde.count()

111085969

In [63]:
df_pde.printSchema()

root
 |-- DESYNPUF_ID: string (nullable = true)
 |-- PDE_ID: long (nullable = true)
 |-- SRVC_DT: integer (nullable = true)
 |-- PROD_SRVC_ID: string (nullable = true)
 |-- QTY_DSPNSD_NUM: double (nullable = true)
 |-- DAYS_SUPLY_NUM: integer (nullable = true)
 |-- PTNT_PAY_AMT: double (nullable = true)
 |-- TOT_RX_CST_AMT: double (nullable = true)



### Feature Extraction

-- *Benefician_feature dataset*

In [75]:
df_benefician.count()

2296491

In [76]:
df_benefician.columns

['DESYNPUF_ID',
 'BENE_BIRTH_DT',
 'BENE_DEATH_DT',
 'BENE_SEX_IDENT_CD',
 'BENE_RACE_CD',
 'BENE_ESRD_IND',
 'SP_STATE_CODE',
 'BENE_COUNTY_CD',
 'BENE_HI_CVRAGE_TOT_MONS',
 'BENE_SMI_CVRAGE_TOT_MONS',
 'BENE_HMO_CVRAGE_TOT_MONS',
 'PLAN_CVRG_MOS_NUM',
 'SP_ALZHDMTA',
 'SP_CHF',
 'SP_CHRNKIDN',
 'SP_CNCR',
 'SP_COPD',
 'SP_DEPRESSN',
 'SP_DIABETES',
 'SP_ISCHMCHT',
 'SP_OSTEOPRS',
 'SP_RA_OA',
 'SP_STRKETIA',
 'MEDREIMB_IP',
 'BENRES_IP',
 'PPPYMT_IP',
 'MEDREIMB_OP',
 'BENRES_OP',
 'PPPYMT_OP',
 'MEDREIMB_CAR',
 'BENRES_CAR',
 'PPPYMT_CAR']

In [96]:
feature_benefician = ['DESYNPUF_ID',
 'BENE_BIRTH_DT',
 'BENE_DEATH_DT',
 'BENE_SEX_IDENT_CD',
 'BENE_RACE_CD',
 'SP_STATE_CODE',
 'BENE_COUNTY_CD',
 'SP_ALZHDMTA',
 'SP_CHF',
 'SP_CHRNKIDN',
 'SP_CNCR',
 'SP_COPD',
 'SP_DEPRESSN',
 'SP_DIABETES',
 'SP_ISCHMCHT',
 'SP_OSTEOPRS',
 'SP_RA_OA',
 'SP_STRKETIA']

In [99]:
df_benefician_feature = df_benefician.select(*feature_benefician)

In [100]:
df_benefician_feature.count()

2296491

In [101]:
df_benefician_feature.columns

['DESYNPUF_ID',
 'BENE_BIRTH_DT',
 'BENE_DEATH_DT',
 'BENE_SEX_IDENT_CD',
 'BENE_RACE_CD',
 'SP_STATE_CODE',
 'BENE_COUNTY_CD',
 'SP_ALZHDMTA',
 'SP_CHF',
 'SP_CHRNKIDN',
 'SP_CNCR',
 'SP_COPD',
 'SP_DEPRESSN',
 'SP_DIABETES',
 'SP_ISCHMCHT',
 'SP_OSTEOPRS',
 'SP_RA_OA',
 'SP_STRKETIA']

In [104]:
outpath_feature_benefician= "/Users/jill/Downloads/2020_Project/EDA/concat_data/benefician/benefician_feature"

In [105]:
#write_df(df_benefician_feature, outpath_feature_benefician)

In [106]:
# Check the dataset
df_benefician_feature = spark.read.load(outpath_feature_benefician)

In [107]:
df_benefician_feature.count(), len(df_benefician_feature.columns)

(2296491, 18)

In [108]:
df_benefician_feature.printSchema()

root
 |-- DESYNPUF_ID: string (nullable = true)
 |-- BENE_BIRTH_DT: integer (nullable = true)
 |-- BENE_DEATH_DT: integer (nullable = true)
 |-- BENE_SEX_IDENT_CD: integer (nullable = true)
 |-- BENE_RACE_CD: integer (nullable = true)
 |-- SP_STATE_CODE: integer (nullable = true)
 |-- BENE_COUNTY_CD: integer (nullable = true)
 |-- SP_ALZHDMTA: integer (nullable = true)
 |-- SP_CHF: integer (nullable = true)
 |-- SP_CHRNKIDN: integer (nullable = true)
 |-- SP_CNCR: integer (nullable = true)
 |-- SP_COPD: integer (nullable = true)
 |-- SP_DEPRESSN: integer (nullable = true)
 |-- SP_DIABETES: integer (nullable = true)
 |-- SP_ISCHMCHT: integer (nullable = true)
 |-- SP_OSTEOPRS: integer (nullable = true)
 |-- SP_RA_OA: integer (nullable = true)
 |-- SP_STRKETIA: integer (nullable = true)



-- *Inpatient_claim feature dataset*

In [7]:
df_in_OC = spark.read.load("/Users/jill/Downloads/2020_Project/EDA/concat_data/inpatient/inpatient_OC/")
df_in_noOC = spark.read.load("/Users/jill/Downloads/2020_Project/EDA/concat_data/inpatient/inpatient_noOC/")

In [8]:
df_in_OC.count(), len(df_in_OC.columns), df_in_noOC.count(),len(df_in_noOC.columns)

(2649, 82, 1330173, 82)

In [9]:
df_in_OC.columns

['DESYNPUF_ID',
 'CLM_ID',
 'SEGMENT',
 'CLM_FROM_DT',
 'CLM_THRU_DT',
 'PRVDR_NUM',
 'CLM_PMT_AMT',
 'NCH_PRMRY_PYR_CLM_PD_AMT',
 'AT_PHYSN_NPI',
 'OP_PHYSN_NPI',
 'OT_PHYSN_NPI',
 'CLM_ADMSN_DT',
 'ADMTNG_ICD9_DGNS_CD',
 'CLM_PASS_THRU_PER_DIEM_AMT',
 'NCH_BENE_IP_DDCTBL_AMT',
 'NCH_BENE_PTA_COINSRNC_LBLTY_AM',
 'NCH_BENE_BLOOD_DDCTBL_LBLTY_AM',
 'CLM_UTLZTN_DAY_CNT',
 'NCH_BENE_DSCHRG_DT',
 'CLM_DRG_CD',
 'ICD9_DGNS_CD_1',
 'ICD9_DGNS_CD_2',
 'ICD9_DGNS_CD_3',
 'ICD9_DGNS_CD_4',
 'ICD9_DGNS_CD_5',
 'ICD9_DGNS_CD_6',
 'ICD9_DGNS_CD_7',
 'ICD9_DGNS_CD_8',
 'ICD9_DGNS_CD_9',
 'ICD9_DGNS_CD_10',
 'ICD9_PRCDR_CD_1',
 'ICD9_PRCDR_CD_2',
 'ICD9_PRCDR_CD_3',
 'ICD9_PRCDR_CD_4',
 'ICD9_PRCDR_CD_5',
 'ICD9_PRCDR_CD_6',
 'HCPCS_CD_1',
 'HCPCS_CD_2',
 'HCPCS_CD_3',
 'HCPCS_CD_4',
 'HCPCS_CD_5',
 'HCPCS_CD_6',
 'HCPCS_CD_7',
 'HCPCS_CD_8',
 'HCPCS_CD_9',
 'HCPCS_CD_10',
 'HCPCS_CD_11',
 'HCPCS_CD_12',
 'HCPCS_CD_13',
 'HCPCS_CD_14',
 'HCPCS_CD_15',
 'HCPCS_CD_16',
 'HCPCS_CD_17',
 'HCPCS_CD_18',

In [10]:
feature_inpatient = ['DESYNPUF_ID',
 'CLM_ID',
 'PRVDR_NUM',
 'CLM_ADMSN_DT',
 'NCH_BENE_DSCHRG_DT',
 'ADMTNG_ICD9_DGNS_CD',
 'CLM_DRG_CD',
 'ICD9_DGNS_CD_1',
 'ICD9_DGNS_CD_2',
 'ICD9_DGNS_CD_3',
 'ICD9_DGNS_CD_4',
 'ICD9_DGNS_CD_5',
 'ICD9_DGNS_CD_6',
 'ICD9_DGNS_CD_7',
 'ICD9_DGNS_CD_8',
 'ICD9_DGNS_CD_9',
 'ICD9_DGNS_CD_10',
 'ICD9_PRCDR_CD_1',
 'ICD9_PRCDR_CD_2',
 'ICD9_PRCDR_CD_3',
 'ICD9_PRCDR_CD_4',
 'ICD9_PRCDR_CD_5',
 'ICD9_PRCDR_CD_6',
 'HCPCS_CD_1',
 'HCPCS_CD_2',
 'HCPCS_CD_3',
 'HCPCS_CD_4',
 'HCPCS_CD_5',
 'HCPCS_CD_6',
 'HCPCS_CD_7',
 'HCPCS_CD_8',
 'HCPCS_CD_9',
 'HCPCS_CD_10',
 'HCPCS_CD_11',
 'HCPCS_CD_12',
 'HCPCS_CD_13',
 'HCPCS_CD_14',
 'HCPCS_CD_15',
 'HCPCS_CD_16',
 'HCPCS_CD_17',
 'HCPCS_CD_18',
 'HCPCS_CD_19',
 'HCPCS_CD_20',
 'HCPCS_CD_21',
 'HCPCS_CD_22',
 'HCPCS_CD_23',
 'HCPCS_CD_24',
 'HCPCS_CD_25',
 'HCPCS_CD_26',
 'HCPCS_CD_27',
 'HCPCS_CD_28',
 'HCPCS_CD_29',
 'HCPCS_CD_30',
 'HCPCS_CD_31',
 'HCPCS_CD_32',
 'HCPCS_CD_33',
 'HCPCS_CD_34',
 'HCPCS_CD_35',
 'HCPCS_CD_36',
 'HCPCS_CD_37',
 'HCPCS_CD_38',
 'HCPCS_CD_39',
 'HCPCS_CD_40',
 'HCPCS_CD_41',
 'HCPCS_CD_42',
 'HCPCS_CD_43',
 'HCPCS_CD_44',
 'HCPCS_CD_45',
 'class'                 
 ]

In [11]:
df_inOC_feature, df_innoOC_feature = df_in_OC.select(*feature_inpatient),df_in_noOC.select(*feature_inpatient)

In [12]:
df_inOC_feature.count(), df_innoOC_feature.count()

(2649, 1330173)

In [13]:
len(df_inOC_feature.columns), len(df_innoOC_feature.columns)

(69, 69)

In [14]:
outpath_inoc_feature = "/Users/jill/Downloads/2020_Project/EDA/concat_data/inpatient/inpatient_oc_feature"
outpath_innooc_feature = "/Users/jill/Downloads/2020_Project/EDA/concat_data/inpatient/inpatient_nooc_feature"

In [15]:
df_inOC_feature.write.format("parquet").mode("overwrite").save(outpath_inoc_feature)

In [16]:
df_innoOC_feature.write.format("parquet").mode("overwrite").save(outpath_innooc_feature)

In [23]:
#write_df(df_innoOC_feature, outpath_innooc_feature)

In [17]:
# Check the datasets
dfin_oc = spark.read.load(outpath_inoc_feature)
dfin_nooc = spark.read.load(outpath_innooc_feature)

In [18]:
dfin_oc.count(), len(dfin_oc.columns), dfin_nooc.count(), len(dfin_nooc.columns)

(2649, 69, 1330173, 69)

In [19]:
sample_inpatient_OC = dfin_oc.limit(10).toPandas()

In [20]:
sample_inpatient_OC.head()

Unnamed: 0,DESYNPUF_ID,CLM_ID,PRVDR_NUM,CLM_ADMSN_DT,NCH_BENE_DSCHRG_DT,ADMTNG_ICD9_DGNS_CD,CLM_DRG_CD,ICD9_DGNS_CD_1,ICD9_DGNS_CD_2,ICD9_DGNS_CD_3,...,HCPCS_CD_37,HCPCS_CD_38,HCPCS_CD_39,HCPCS_CD_40,HCPCS_CD_41,HCPCS_CD_42,HCPCS_CD_43,HCPCS_CD_44,HCPCS_CD_45,class
0,81E664EC84B48C11,992831161633841,4400TC,20090914,20090916,78791,627,27651,56400,5849,...,,,,,,,,,,1
1,8318DFD5748676F7,992871161635799,1000ZT,20090427,20090505,1976,758,1830,1990,4280,...,,,,,,,,,,1
2,8634D62DFD5C1D29,992991161623602,2000HG,20090611,20090614,49121,204,49121,34690,1830,...,,,,,,,,,,1
3,87B20293027725BD,992661161617703,4400TC,20081230,20090102,79902,199,1972,4329,2760,...,,,,,,,,,,1
4,885F827203B8DC45,992331161619833,5000CP,20081005,20081009,51911,196,5183,25000,4019,...,,,,,,,,,,1


In [21]:
sample_inpatient_OC.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 69 columns):
DESYNPUF_ID            10 non-null object
CLM_ID                 10 non-null int64
PRVDR_NUM              10 non-null object
CLM_ADMSN_DT           10 non-null int32
NCH_BENE_DSCHRG_DT     10 non-null int32
ADMTNG_ICD9_DGNS_CD    10 non-null object
CLM_DRG_CD             10 non-null object
ICD9_DGNS_CD_1         10 non-null object
ICD9_DGNS_CD_2         10 non-null object
ICD9_DGNS_CD_3         10 non-null object
ICD9_DGNS_CD_4         10 non-null object
ICD9_DGNS_CD_5         10 non-null object
ICD9_DGNS_CD_6         10 non-null object
ICD9_DGNS_CD_7         9 non-null object
ICD9_DGNS_CD_8         9 non-null object
ICD9_DGNS_CD_9         9 non-null object
ICD9_DGNS_CD_10        0 non-null object
ICD9_PRCDR_CD_1        6 non-null float64
ICD9_PRCDR_CD_2        3 non-null object
ICD9_PRCDR_CD_3        3 non-null object
ICD9_PRCDR_CD_4        2 non-null object
ICD9_PRCDR_CD_5        1 n

-- *Outpatient_claim feature datasets*

In [5]:
df_out_OC = spark.read.load("/Users/jill/Downloads/2020_Project/EDA/concat_data/outpatient/outpatient_OC/")
df_out_noOC = spark.read.load("/Users/jill/Downloads/2020_Project/EDA/concat_data/outpatient/outpatient_noOC/")

In [6]:
df_out_OC.count(), len(df_out_OC.columns), df_out_noOC.count(),len(df_out_noOC.columns)

(16706, 77, 15810281, 77)

In [7]:
df_out_OC.columns

['DESYNPUF_ID',
 'CLM_ID',
 'SEGMENT',
 'CLM_FROM_DT',
 'CLM_THRU_DT',
 'PRVDR_NUM',
 'CLM_PMT_AMT',
 'NCH_PRMRY_PYR_CLM_PD_AMT',
 'AT_PHYSN_NPI',
 'OP_PHYSN_NPI',
 'OT_PHYSN_NPI',
 'NCH_BENE_BLOOD_DDCTBL_LBLTY_AM',
 'ICD9_DGNS_CD_1',
 'ICD9_DGNS_CD_2',
 'ICD9_DGNS_CD_3',
 'ICD9_DGNS_CD_4',
 'ICD9_DGNS_CD_5',
 'ICD9_DGNS_CD_6',
 'ICD9_DGNS_CD_7',
 'ICD9_DGNS_CD_8',
 'ICD9_DGNS_CD_9',
 'ICD9_DGNS_CD_10',
 'ICD9_PRCDR_CD_1',
 'ICD9_PRCDR_CD_2',
 'ICD9_PRCDR_CD_3',
 'ICD9_PRCDR_CD_4',
 'ICD9_PRCDR_CD_5',
 'ICD9_PRCDR_CD_6',
 'NCH_BENE_PTB_DDCTBL_AMT',
 'NCH_BENE_PTB_COINSRNC_AMT',
 'ADMTNG_ICD9_DGNS_CD',
 'HCPCS_CD_1',
 'HCPCS_CD_2',
 'HCPCS_CD_3',
 'HCPCS_CD_4',
 'HCPCS_CD_5',
 'HCPCS_CD_6',
 'HCPCS_CD_7',
 'HCPCS_CD_8',
 'HCPCS_CD_9',
 'HCPCS_CD_10',
 'HCPCS_CD_11',
 'HCPCS_CD_12',
 'HCPCS_CD_13',
 'HCPCS_CD_14',
 'HCPCS_CD_15',
 'HCPCS_CD_16',
 'HCPCS_CD_17',
 'HCPCS_CD_18',
 'HCPCS_CD_19',
 'HCPCS_CD_20',
 'HCPCS_CD_21',
 'HCPCS_CD_22',
 'HCPCS_CD_23',
 'HCPCS_CD_24',
 'HCPCS_CD_25',


In [8]:
outpath_outoc_feature = "/Users/jill/Downloads/2020_Project/EDA/concat_data/outpatient/outpatient_oc_feature"


In [9]:
feature_outpatient = ['DESYNPUF_ID',
 'CLM_ID',
 'PRVDR_NUM',
 'ICD9_DGNS_CD_1',
 'ICD9_DGNS_CD_2',
 'ICD9_DGNS_CD_3',
 'ICD9_DGNS_CD_4',
 'ICD9_DGNS_CD_5',
 'ICD9_DGNS_CD_6',
 'ICD9_DGNS_CD_7',
 'ICD9_DGNS_CD_8',
 'ICD9_DGNS_CD_9',
 'ICD9_DGNS_CD_10',
 'ICD9_PRCDR_CD_1',
 'ICD9_PRCDR_CD_2',
 'ICD9_PRCDR_CD_3',
 'ICD9_PRCDR_CD_4',
 'ICD9_PRCDR_CD_5',
 'ICD9_PRCDR_CD_6',
 'ADMTNG_ICD9_DGNS_CD',
 'HCPCS_CD_1',
 'HCPCS_CD_2',
 'HCPCS_CD_3',
 'HCPCS_CD_4',
 'HCPCS_CD_5',
 'HCPCS_CD_6',
 'HCPCS_CD_7',
 'HCPCS_CD_8',
 'HCPCS_CD_9',
 'HCPCS_CD_10',
 'HCPCS_CD_11',
 'HCPCS_CD_12',
 'HCPCS_CD_13',
 'HCPCS_CD_14',
 'HCPCS_CD_15',
 'HCPCS_CD_16',
 'HCPCS_CD_17',
 'HCPCS_CD_18',
 'HCPCS_CD_19',
 'HCPCS_CD_20',
 'HCPCS_CD_21',
 'HCPCS_CD_22',
 'HCPCS_CD_23',
 'HCPCS_CD_24',
 'HCPCS_CD_25',
 'HCPCS_CD_26',
 'HCPCS_CD_27',
 'HCPCS_CD_28',
 'HCPCS_CD_29',
 'HCPCS_CD_30',
 'HCPCS_CD_31',
 'HCPCS_CD_32',
 'HCPCS_CD_33',
 'HCPCS_CD_34',
 'HCPCS_CD_35',
 'HCPCS_CD_36',
 'HCPCS_CD_37',
 'HCPCS_CD_38',
 'HCPCS_CD_39',
 'HCPCS_CD_40',
 'HCPCS_CD_41',
 'HCPCS_CD_42',
 'HCPCS_CD_43',
 'HCPCS_CD_44',
 'HCPCS_CD_45',
 'class'
 ]

In [10]:
feature_outpatient_exclude = [
 'SEGMENT',
 'CLM_FROM_DT',
 'CLM_THRU_DT',
 'CLM_PMT_AMT',
 'NCH_PRMRY_PYR_CLM_PD_AMT',
 'AT_PHYSN_NPI',
 'OP_PHYSN_NPI',
 'OT_PHYSN_NPI',
 'NCH_BENE_BLOOD_DDCTBL_LBLTY_AM',
 'NCH_BENE_PTB_DDCTBL_AMT',
 'NCH_BENE_PTB_COINSRNC_AMT',
]

In [19]:
len(feature_outpatient_exclude)

11

In [10]:
len(feature_outpatient)

66

In [23]:
len(df_out_noOC.columns)

77

In [11]:
df_outnoOC_features = df_out_noOC.drop(*feature_outpatient_exclude)

In [12]:
df_outnoOC_features.count(), len(df_outnoOC_features.columns)

(15810281, 66)

In [21]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F


In [23]:
w = Window.orderBy("DESYNPUF_ID")
sample = df_outnoOC_features.withColumn("index", F.row_number().over(w))

In [24]:
sample.count(),len(sample.columns)

(15810281, 67)

In [25]:
sample.show()

+----------------+---------------+---------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+-------------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----+-----+
|     DESYNPUF_ID|         CLM_ID|PRVDR_NUM|ICD9_DGNS_CD_1|ICD9_DGNS_CD_2|ICD9_DGNS_CD_3|ICD9_DGNS_CD_4|ICD9_DGNS_CD_5|ICD9_DGNS_CD_6|ICD9_DGNS_C

In [45]:
df1 = sample.filter(sample.index.between(1, 500000)).drop("index") 

In [49]:
df2 = sample.filter(sample.index.between(500001, 1000000)).drop("index")

In [50]:
df3 = sample.filter(sample.index.between(1000001, 1500000)).drop("index")

In [51]:
df4 = sample.filter(sample.index.between(1500001, 15810281)).drop("index")

In [46]:
df1.count()

500000

In [48]:
len(df1.columns)

66

In [47]:
df1.limit(10).toPandas()

Unnamed: 0,DESYNPUF_ID,CLM_ID,PRVDR_NUM,ICD9_DGNS_CD_1,ICD9_DGNS_CD_2,ICD9_DGNS_CD_3,ICD9_DGNS_CD_4,ICD9_DGNS_CD_5,ICD9_DGNS_CD_6,ICD9_DGNS_CD_7,...,HCPCS_CD_37,HCPCS_CD_38,HCPCS_CD_39,HCPCS_CD_40,HCPCS_CD_41,HCPCS_CD_42,HCPCS_CD_43,HCPCS_CD_44,HCPCS_CD_45,class
0,00000B48BCF4AD29,391362254696220,10026U,2721,V1250,2330,,,,,...,,,,,,,,,,0
1,00000B48BCF4AD29,391612254357344,0300YM,72999,72981,,,,,,...,,,,,,,,,,0
2,00000B48BCF4AD29,391812254100253,1002GD,7216,,,,,,,...,,,,,,,,,,0
3,00000B48BCF4AD29,391012254328356,1000GD,99671,4019,V1046,53081.0,V5861,3441,2859.0,...,,,,,,,,,,0
4,00000B48BCF4AD29,391602254471197,1000UU,7916,V7644,,,,,,...,,,,,,,,,,0
5,00000B48BCF4AD29,391222254167687,2300PV,7850,4019,V5861,,,,,...,,,,,,,,,,0
6,00000B48BCF4AD29,391392254472138,1000BA,81112,,,,,,,...,,,,,,,,,,0
7,00000B48BCF4AD29,391212254536362,2201RB,2370,,,,,,,...,,,,,,,,,,0
8,00000B48BCF4AD29,391942254400929,1000SM,2279,V1005,4019,56210.0,20280,V4581,,...,,,,,,,,,,0
9,00000B48BCF4AD29,391812254364873,2200TM,9916,,,,,,,...,,,,,,,,,,0


In [65]:
df1.write.save(outpath_outnooc_feature, format="parquet", mode = "overwrite")

In [66]:
df2.write.save(outpath_outnooc_feature, format="parquet", mode = "append")

In [67]:
df3.write.save(outpath_outnooc_feature, format="parquet", mode = "append")

In [68]:
df4.write.save(outpath_outnooc_feature, format="parquet", mode = "append")

In [54]:
df2.write.format("parquet").mode("append").save(outpath_outnooc_feature)

In [55]:
df3.write.format("parquet").mode("append").save(outpath_outnooc_feature)

In [56]:
df4.write.format("parquet").mode("append").save(outpath_outnooc_feature)

In [12]:
df_outOC_feature.count(), len(df_outOC_feature.columns), df_outnoOC_feature.count(),len(df_outnoOC_feature.columns)

(16706, 66, 15810281, 66)

In [34]:
#write_df(df_outOC_feature, outpath_outoc_feature)

In [64]:
outpath_outnooc_feature = "/Users/jill/Downloads/2020_Project/EDA/concat_data/outpatient/outpatient_noOC_feature"

In [71]:
# Check datasets
#dfout_oc = spark.read.load(outpath_outoc_feature)
dfout_nooc = spark.read.load(outpath_outnooc_feature)

In [72]:
dfout_nooc.count(),len(dfout_nooc.columns)

(15810281, 66)

In [24]:
dfout_oc.count(), len(dfout_oc.columns), dfout_nooc.count(),len(dfout_nooc.columns)

(16706, 13, 15810281, 13)

-- *Carrier_claim feature datasets*

In [73]:
df_carrier_OC = spark.read.load("/Users/jill/Downloads/2020_Project/EDA/concat_data/carrier/carrier_OC/")
df_carrier_noOC = spark.read.load("/Users/jill/Downloads/2020_Project/EDA/concat_data/carrier/carrier_noOC/")

In [74]:
df_carrier_OC.count(), len(df_carrier_OC.columns), df_carrier_noOC.count(),len(df_carrier_noOC.columns)

(32239, 143, 47416502, 143)

In [75]:
df_carrier_OC.columns

['DESYNPUF_ID',
 'CLM_ID',
 'CLM_FROM_DT',
 'CLM_THRU_DT',
 'ICD9_DGNS_CD_1',
 'ICD9_DGNS_CD_2',
 'ICD9_DGNS_CD_3',
 'ICD9_DGNS_CD_4',
 'ICD9_DGNS_CD_5',
 'ICD9_DGNS_CD_6',
 'ICD9_DGNS_CD_7',
 'ICD9_DGNS_CD_8',
 'PRF_PHYSN_NPI_1',
 'PRF_PHYSN_NPI_2',
 'PRF_PHYSN_NPI_3',
 'PRF_PHYSN_NPI_4',
 'PRF_PHYSN_NPI_5',
 'PRF_PHYSN_NPI_6',
 'PRF_PHYSN_NPI_7',
 'PRF_PHYSN_NPI_8',
 'PRF_PHYSN_NPI_9',
 'PRF_PHYSN_NPI_10',
 'PRF_PHYSN_NPI_11',
 'PRF_PHYSN_NPI_12',
 'PRF_PHYSN_NPI_13',
 'TAX_NUM_1',
 'TAX_NUM_2',
 'TAX_NUM_3',
 'TAX_NUM_4',
 'TAX_NUM_5',
 'TAX_NUM_6',
 'TAX_NUM_7',
 'TAX_NUM_8',
 'TAX_NUM_9',
 'TAX_NUM_10',
 'TAX_NUM_11',
 'TAX_NUM_12',
 'TAX_NUM_13',
 'HCPCS_CD_1',
 'HCPCS_CD_2',
 'HCPCS_CD_3',
 'HCPCS_CD_4',
 'HCPCS_CD_5',
 'HCPCS_CD_6',
 'HCPCS_CD_7',
 'HCPCS_CD_8',
 'HCPCS_CD_9',
 'HCPCS_CD_10',
 'HCPCS_CD_11',
 'HCPCS_CD_12',
 'HCPCS_CD_13',
 'LINE_NCH_PMT_AMT_1',
 'LINE_NCH_PMT_AMT_2',
 'LINE_NCH_PMT_AMT_3',
 'LINE_NCH_PMT_AMT_4',
 'LINE_NCH_PMT_AMT_5',
 'LINE_NCH_PMT_AMT_6',
 '

In [76]:
feature_carrier = ['DESYNPUF_ID',
 'CLM_ID',
 'ICD9_DGNS_CD_1',
 'ICD9_DGNS_CD_2',
 'ICD9_DGNS_CD_3',
 'ICD9_DGNS_CD_4',
 'ICD9_DGNS_CD_5',
 'ICD9_DGNS_CD_6',
 'ICD9_DGNS_CD_7',
 'ICD9_DGNS_CD_8',
 'HCPCS_CD_1',
 'HCPCS_CD_2',
 'HCPCS_CD_3',
 'HCPCS_CD_4',
 'HCPCS_CD_5',
 'HCPCS_CD_6',
 'HCPCS_CD_7',
 'HCPCS_CD_8',
 'HCPCS_CD_9',
 'HCPCS_CD_10',
 'HCPCS_CD_11',
 'HCPCS_CD_12',
 'HCPCS_CD_13',
 ]

In [77]:
df_carrierOC_feature, df_carriernoOC_feature = df_carrier_OC.select(*feature_carrier),df_carrier_noOC.select(*feature_carrier)

In [78]:
df_carrierOC_feature.count(), len(df_carrierOC_feature.columns), df_carriernoOC_feature.count(),len(df_carriernoOC_feature.columns)


(32239, 23, 47416502, 23)

In [79]:
outpath_carrieroc_feature = "/Users/jill/Downloads/2020_Project/EDA/concat_data/carrier/carrier_oc_feature"
outpath_carriernooc_feature = "/Users/jill/Downloads/2020_Project/EDA/concat_data/carrier/carrier_nooc_feature"

In [82]:
#write_df(df_carrierOC_feature, outpath_carrieroc_feature)

In [83]:
#write_df(df_carriernoOC_feature, outpath_carriernooc_feature)

In [84]:
# Check the datasets
dfcarrier_oc_feature = spark.read.load(outpath_carrieroc_feature)
dfcarrier_nooc_feature = spark.read.load(outpath_carriernooc_feature)

In [85]:
dfcarrier_oc_feature.count(), len(dfcarrier_oc_feature.columns), dfcarrier_nooc_feature.count(),len(dfcarrier_nooc_feature.columns)


(32239, 23, 47416502, 23)

-- *PDE feature datasets*

In [158]:
df_pde = spark.read.load("/Users/jill/Downloads/2020_Project/EDA/concat_data/pde/pde_con/")

In [160]:
df_pde.count(),len(df_pde.columns)

(111085969, 8)

In [161]:
df_pde.columns

['DESYNPUF_ID',
 'PDE_ID',
 'SRVC_DT',
 'PROD_SRVC_ID',
 'QTY_DSPNSD_NUM',
 'DAYS_SUPLY_NUM',
 'PTNT_PAY_AMT',
 'TOT_RX_CST_AMT']

#### *OC INSTANCES*

In [16]:
dfin_oc.count(),dfin_nooc.count()

(2649, 1330173)

In [20]:
print ("Number of OC instances in inpaient_claim")
dfin_oc.select(F.countDistinct("DESYNPUF_ID")).show() 

Number of OC instances in inpaient_claim
+---------------------------+
|count(DISTINCT DESYNPUF_ID)|
+---------------------------+
|                       2645|
+---------------------------+



In [21]:
print ("Number of not OC instances in inpaient_claim")
dfin_nooc.select(F.countDistinct("DESYNPUF_ID")).show()

Number of not OC instances in inpaient_claim
+---------------------------+
|count(DISTINCT DESYNPUF_ID)|
+---------------------------+
|                     754338|
+---------------------------+



In [25]:
dfout_oc.count(), dfout_nooc.count()

(16706, 15810281)

In [32]:
print ("Number of OC instances in outpaient_claim")
dfout_oc.select(F.countDistinct("DESYNPUF_ID")).show()

Number of OC instances in outpaient_claim
+---------------------------+
|count(DISTINCT DESYNPUF_ID)|
+---------------------------+
|                      15899|
+---------------------------+



In [33]:
print ("Number of not OC instances in outpaient_claim")
dfout_nooc.select(F.countDistinct("DESYNPUF_ID")).show()

Number of not OC instances in outpaient_claim
+---------------------------+
|count(DISTINCT DESYNPUF_ID)|
+---------------------------+
|                    1703916|
+---------------------------+



In [38]:
dfcarrier_oc_feature.count(), dfcarrier_nooc_feature.count()

(32239, 47416502)

In [37]:
print ("Number of OC instances in carrier_claim")
dfcarrier_oc_feature.select(F.countDistinct("DESYNPUF_ID")).show()

Number of OC instances in carrier_claim
+---------------------------+
|count(DISTINCT DESYNPUF_ID)|
+---------------------------+
|                      27867|
+---------------------------+



In [39]:
print ("Number of not OC instances in carrier_claim")
dfcarrier_nooc_feature.select(F.countDistinct("DESYNPUF_ID")).show()

Number of not OC instances in carrier_claim
+---------------------------+
|count(DISTINCT DESYNPUF_ID)|
+---------------------------+
|                     986837|
+---------------------------+



In [None]:
print ("Total number of instances with OC:")

res = list(set(x+f))

In [69]:
lst_in_oc = dfin_oc.select("DESYNPUF_ID").distinct().collect()

In [71]:
len(lst_in_oc)

2645

In [77]:
lst_in_nooc = dfin_nooc.select("DESYNPUF_ID").distinct().collect()

In [78]:
len(lst_in_nooc)

754338

In [57]:
lst_in_unique = list(set(list_in))

In [58]:
len(lst_in_unique)

2645

In [72]:
lst_out_oc = dfout_oc.select("DESYNPUF_ID").distinct().collect()

In [79]:
lst_out_nooc = dfout_nooc.select("DESYNPUF_ID").distinct().collect()

In [64]:
lst_carrier_oc = dfcarrier_oc_feature.select("DESYNPUF_ID").distinct().collect()

In [73]:
len(lst_out_oc), len(lst_carrier_oc)

(15899, 27867)

In [66]:
lst_carrier_nooc = dfcarrier_nooc_feature.select("DESYNPUF_ID").distinct().collect()

In [80]:
len(lst_out_nooc), len(lst_carrier_nooc)

(1703916, 986837)

In [75]:
total_oc = list(set(lst_in_oc + lst_out_oc + lst_carrier_oc))

In [84]:
print ("Total number of OC:")
len(total_oc)

Total number of OC:


44857

In [81]:
total_nooc = list(set(lst_in_nooc + lst_out_nooc + lst_carrier_nooc))

In [83]:
print ("Total number of noOC:")
len(total_nooc)

Total number of noOC:


1859182