In [2]:
import findspark
# Point findspark at the local Spark install so `import pyspark` resolves.
# Raw string: the Windows path contains backslashes, and in a normal literal
# "\s" is an invalid escape sequence (DeprecationWarning; SyntaxWarning on
# Python 3.12+). The resulting path value is unchanged.
# NOTE(review): hard-coded absolute local path — consider SPARK_HOME instead.
findspark.init(r'D:\spark\spark-3.3.2-bin-hadoop2/')
import pyspark

In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

In [4]:
# Display the installed PySpark version string.
getattr(pyspark, "__version__")

'3.3.2'

In [5]:
# Connection prefix for the Insurance database; a collection name is appended
# directly (note the trailing '.') to form "<db>.<collection>".
mongo_ip = 'mongodb://localhost:27017/' + 'Insurance' + '.'

In [6]:
# Create (or reuse) the Spark session. The Mongo Spark connector is supplied
# as a Maven coordinate through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .getOrCreate()
)

In [6]:
# Load the Ben_2009 collection of the Insurance database via the Mongo connector.
ben_2009 = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", f"{mongo_ip}Ben_2009")
    .load()
)

In [7]:
# Expose the raw frame to Spark SQL under the name "Benef".
ben_2009.createOrReplaceTempView(name="Benef")

In [8]:
# Print the schema Spark inferred for the loaded Mongo collection.
ben_2009.printSchema()

root
 |-- BENE_BIRTH_DT: string (nullable = true)
 |-- BENE_COUNTY_CD: string (nullable = true)
 |-- BENE_DEATH_DT: string (nullable = true)
 |-- BENE_ESRD_IND: string (nullable = true)
 |-- BENE_HI_CVRAGE_TOT_MONS: string (nullable = true)
 |-- BENE_HMO_CVRAGE_TOT_MONS: string (nullable = true)
 |-- BENE_RACE_CD: string (nullable = true)
 |-- BENE_SEX_IDENT_CD: string (nullable = true)
 |-- BENE_SMI_CVRAGE_TOT_MONS: string (nullable = true)
 |-- BENRES_CAR: string (nullable = true)
 |-- BENRES_IP: string (nullable = true)
 |-- BENRES_OP: string (nullable = true)
 |-- DESYNPUF_ID: string (nullable = true)
 |-- MEDREIMB_CAR: string (nullable = true)
 |-- MEDREIMB_IP: string (nullable = true)
 |-- MEDREIMB_OP: string (nullable = true)
 |-- PLAN_CVRG_MOS_NUM: string (nullable = true)
 |-- PPPYMT_CAR: string (nullable = true)
 |-- PPPYMT_IP: string (nullable = true)
 |-- PPPYMT_OP: string (nullable = true)
 |-- SP_ALZHDMTA: string (nullable = true)
 |-- SP_CHF: string (nullable = true)
 |-

In [9]:
# List the raw column names (same list as DataFrame.columns).
ben_2009.schema.names

['BENE_BIRTH_DT',
 'BENE_COUNTY_CD',
 'BENE_DEATH_DT',
 'BENE_ESRD_IND',
 'BENE_HI_CVRAGE_TOT_MONS',
 'BENE_HMO_CVRAGE_TOT_MONS',
 'BENE_RACE_CD',
 'BENE_SEX_IDENT_CD',
 'BENE_SMI_CVRAGE_TOT_MONS',
 'BENRES_CAR',
 'BENRES_IP',
 'BENRES_OP',
 'DESYNPUF_ID',
 'MEDREIMB_CAR',
 'MEDREIMB_IP',
 'MEDREIMB_OP',
 'PLAN_CVRG_MOS_NUM',
 'PPPYMT_CAR',
 'PPPYMT_IP',
 'PPPYMT_OP',
 'SP_ALZHDMTA',
 'SP_CHF',
 'SP_CHRNKIDN',
 'SP_CNCR',
 'SP_COPD',
 'SP_DEPRESSN',
 'SP_DIABETES',
 'SP_ISCHMCHT',
 'SP_OSTEOPRS',
 'SP_RA_OA',
 'SP_STATE_CODE',
 'SP_STRKETIA',
 '_id']

In [10]:
# Columns not carried forward: Mongo's _id, the county code and the *_CAR amount fields.
drop_col = [
    '_id',
    'BENE_COUNTY_CD',
    'BENRES_CAR',
    'MEDREIMB_CAR',
    'PPPYMT_CAR',
]

In [11]:
# DataFrame.drop takes the names as varargs, so unpack the list.
ben_2009 = ben_2009.drop(
    *drop_col
)

In [12]:
# Map the raw beneficiary field codes to descriptive column names.
# BENE_DEATH_DT becomes 'Date_of_death' (lower-case "d") so it matches the
# name every later cell uses. Previously it was 'Date_of_Death' and only
# resolved through Spark's case-insensitive column lookup, which breaks if
# spark.sql.caseSensitive is enabled.
rename_map = {
    'BENE_BIRTH_DT': 'Date_of_birth',
    'BENE_DEATH_DT': 'Date_of_death',
    'BENE_ESRD_IND': 'End_stage_renal_disease_Indicator',
    'BENE_HI_CVRAGE_TOT_MONS': 'Total_months_partA_coverage',
    'BENE_HMO_CVRAGE_TOT_MONS': 'Total_months_HMO_coverage',
    'BENE_SEX_IDENT_CD': 'Gender',
    'BENE_RACE_CD': 'Race_Code',
    'BENE_SMI_CVRAGE_TOT_MONS': 'Total_months_partB_coverage',
    'BENRES_IP': 'Inpatient_responsibility_amount',
    'BENRES_OP': 'Outpatient_responsibility_amount',
    'MEDREIMB_IP': 'Inpatient_reimbursement_amount',
    'MEDREIMB_OP': 'Outpatient_reimbursement_amount',
    'PLAN_CVRG_MOS_NUM': 'Total_months_partD_coverage',
    'PPPYMT_IP': 'Inpatient_primary_payer_reimbursement_amount',
    'PPPYMT_OP': 'Outpatient_primary_payer_reimbursement_amount',
    'SP_ALZHDMTA': 'Alzheimer',
    'SP_CHF': 'Heart_Failure',
    'SP_CHRNKIDN': 'Chronic_Kidney_Disease',
    'SP_CNCR': 'Cancer',
    'SP_COPD': 'Chronic_Obstructive_Pulmonary_Disease',
    'SP_DEPRESSN': 'Depression',
    'SP_DIABETES': 'Diabetes',
    'SP_ISCHMCHT': 'Ischemic_Heart_Disease',
    'SP_OSTEOPRS': 'Osteoporosis',
    'SP_RA_OA': 'Rheumatoid_arthritis_osteoarthritis',
    'SP_STATE_CODE': 'State_Code',
    'SP_STRKETIA': 'Stroke_transient_Ischemic_Attack',
}
# One projection instead of 27 chained withColumnRenamed calls. Columns absent
# from the map (e.g. DESYNPUF_ID) keep their name, and column order is preserved.
ben_2009 = ben_2009.toDF(*[rename_map.get(c, c) for c in ben_2009.columns])

In [13]:
ben_2009.columns

['Date_of_birth',
 'Date_of_Death',
 'End_stage_renal_disease_Indicator',
 'Total_months_partA_coverage',
 'Total_months_HMO_coverage',
 'Race_Code',
 'Gender',
 'Total_months_partB_coverage',
 'Inpatient_responsibility_amount',
 'Outpatient_responsibility_amount',
 'DESYNPUF_ID',
 'Inpatient_reimbursement_amount',
 'Outpatient_reimbursement_amount',
 'Total_months_partD_coverage',
 'Inpatient_primary_payer_reimbursement_amount',
 'Outpatient_primary_payer_reimbursement_amount',
 'Alzheimer',
 'Heart_Failure',
 'Chronic_Kidney_Disease',
 'Cancer',
 'Chronic_Obstructive_Pulmonary_Disease',
 'Depression',
 'Diabetes',
 'Ischemic_Heart_Disease',
 'Osteoporosis',
 'Rheumatoid_arthritis_osteoarthritis',
 'State_Code',
 'Stroke_transient_Ischemic_Attack']

In [14]:
from datetime import datetime
from pyspark.sql.functions import col,udf
from pyspark.sql.types import DateType
from pyspark.sql.functions import date_format

In [15]:
from pyspark.sql.functions import to_date

# Parse the raw 'yyyyMMdd' birth-date strings with Spark's native to_date
# instead of a Python strptime UDF. The UDF raised (and failed the job) on
# null or empty input, and it serialised every row through Python. Unparseable
# values now become null. The column stays a 'yyyy-MM-dd' string, as before;
# the next cell converts it to DateType.
ben_2009 = ben_2009.withColumn(
    'Date_of_birth',
    date_format(to_date(col('Date_of_birth'), 'yyyyMMdd'), 'yyyy-MM-dd'),
)

In [16]:
from pyspark.sql.functions import to_date

# Convert the 'yyyy-MM-dd' birth-date string into a DateType column.
ben_2009 = ben_2009.withColumn(
    "Date_of_birth",
    to_date("Date_of_birth", "yyyy-MM-dd"),
)

In [17]:
from pyspark.sql.functions import col,when

# Replace missing death dates with the placeholder '18000101' so the parsing
# step that follows never receives an empty string or a null. A document
# without the field yields null, which previously slipped past the `== ""`
# check (null comparison -> otherwise branch) and crashed the strptime UDF.
# The placeholder is turned back into null a few cells later.
missing_death = col("Date_of_death").isNull() | (col("Date_of_death") == "")
ben_2009 = ben_2009.withColumn(
    "Date_of_death",
    when(missing_death, "18000101").otherwise(col("Date_of_death")),
)

In [18]:
from pyspark.sql.functions import to_date

# Parse the 'yyyyMMdd' death-date strings natively rather than with a Python
# strptime UDF, which raised on null or empty values and ran row-by-row in
# Python. Unparseable values become null. The output remains a 'yyyy-MM-dd'
# string, so the following to_date / placeholder cells behave as before.
ben_2009 = ben_2009.withColumn(
    'Date_of_death',
    date_format(to_date(col('Date_of_death'), 'yyyyMMdd'), 'yyyy-MM-dd'),
)

In [19]:
from pyspark.sql.functions import to_date

# Convert the 'yyyy-MM-dd' death-date string into a DateType column.
ben_2009 = ben_2009.withColumn(
    "Date_of_death",
    to_date("Date_of_death", "yyyy-MM-dd"),
)

In [20]:
from pyspark.sql.functions import col,when,to_date

# Undo the placeholder: a death date equal to 1800-01-01 becomes null again.
date_string = "1800-01-01"
ben_2009 = ben_2009.withColumn(
    "Date_of_death",
    when(col("Date_of_death") == date_string, None).otherwise(col("Date_of_death")),
)

In [24]:
# The chronic-condition indicator columns (renamed SP_* fields).
myColumns = [
    'Alzheimer', 'Heart_Failure', 'Chronic_Kidney_Disease', 'Cancer',
    'Chronic_Obstructive_Pulmonary_Disease', 'Depression', 'Diabetes',
    'Ischemic_Heart_Disease', 'Osteoporosis',
    'Rheumatoid_arthritis_osteoarthritis', 'Stroke_transient_Ischemic_Attack',
]

In [25]:
from pyspark.sql.functions import regexp_replace

# Rewrite every '2' character as '0' in each condition column.
# NOTE(review): presumably the flags are '1'/'2' (yes/no) so this yields '1'/'0';
# it is a substring replace, so any other value containing '2' is altered too.
for disease in myColumns:
    ben_2009 = ben_2009.withColumn(disease, regexp_replace(disease, '2', '0'))

In [29]:
# Count of flagged conditions per beneficiary. The flags are still strings
# here; Spark's `+` casts them implicitly, and a later cell casts the sum to int.
ben_2009 = ben_2009.withColumn(
    'total_number_of_diseases',
    ben_2009['Alzheimer']
    + ben_2009['Heart_Failure']
    + ben_2009['Chronic_Kidney_Disease']
    + ben_2009['Cancer']
    + ben_2009['Chronic_Obstructive_Pulmonary_Disease']
    + ben_2009['Depression']
    + ben_2009['Diabetes']
    + ben_2009['Stroke_transient_Ischemic_Attack']
    + ben_2009['Rheumatoid_arthritis_osteoarthritis']
    + ben_2009['Osteoporosis']
    + ben_2009['Ischemic_Heart_Disease'],
)

In [31]:
# (name, type) pairs for every column — what DataFrame.dtypes returns.
[(str(field.name), field.dataType.simpleString()) for field in ben_2009.schema.fields]

[('Date_of_birth', 'date'),
 ('Date_of_death', 'date'),
 ('End_stage_renal_disease_Indicator', 'string'),
 ('Total_months_partA_coverage', 'string'),
 ('Total_months_HMO_coverage', 'string'),
 ('Race_Code', 'string'),
 ('Gender', 'string'),
 ('Total_months_partB_coverage', 'string'),
 ('Inpatient_responsibility_amount', 'string'),
 ('Outpatient_responsibility_amount', 'string'),
 ('DESYNPUF_ID', 'string'),
 ('Inpatient_reimbursement_amount', 'string'),
 ('Outpatient_reimbursement_amount', 'string'),
 ('Total_months_partD_coverage', 'string'),
 ('Inpatient_primary_payer_reimbursement_amount', 'string'),
 ('Outpatient_primary_payer_reimbursement_amount', 'string'),
 ('Alzheimer', 'string'),
 ('Heart_Failure', 'string'),
 ('Chronic_Kidney_Disease', 'string'),
 ('Cancer', 'string'),
 ('Chronic_Obstructive_Pulmonary_Disease', 'string'),
 ('Depression', 'string'),
 ('Diabetes', 'string'),
 ('Ischemic_Heart_Disease', 'string'),
 ('Osteoporosis', 'string'),
 ('Rheumatoid_arthritis_osteoarthrit

In [32]:
# Store the implicitly-numeric disease sum as an integer count.
ben_2009 = ben_2009.withColumn(
    "total_number_of_diseases",
    col("total_number_of_diseases").cast('int'),
)

In [35]:
# Turn each '0'/'1' condition string into a boolean column.
for disease in myColumns:
    ben_2009 = ben_2009.withColumn(disease, col(disease).cast('boolean'))

In [38]:
from pyspark.sql.functions import year
from pyspark.sql.functions import to_date

In [39]:
# Birth year, used below to derive an approximate age.
ben_2009 = ben_2009.withColumn('year', year(col('Date_of_birth')))

In [43]:
# (name, type) pairs for every column after the boolean casts.
[(str(field.name), field.dataType.simpleString()) for field in ben_2009.schema.fields]

[('Date_of_birth', 'date'),
 ('Date_of_death', 'date'),
 ('End_stage_renal_disease_Indicator', 'string'),
 ('Total_months_partA_coverage', 'string'),
 ('Total_months_HMO_coverage', 'string'),
 ('Race_Code', 'string'),
 ('Gender', 'string'),
 ('Total_months_partB_coverage', 'string'),
 ('Inpatient_responsibility_amount', 'string'),
 ('Outpatient_responsibility_amount', 'string'),
 ('DESYNPUF_ID', 'string'),
 ('Inpatient_reimbursement_amount', 'string'),
 ('Outpatient_reimbursement_amount', 'string'),
 ('Total_months_partD_coverage', 'string'),
 ('Inpatient_primary_payer_reimbursement_amount', 'string'),
 ('Outpatient_primary_payer_reimbursement_amount', 'string'),
 ('Alzheimer', 'boolean'),
 ('Heart_Failure', 'boolean'),
 ('Chronic_Kidney_Disease', 'boolean'),
 ('Cancer', 'boolean'),
 ('Chronic_Obstructive_Pulmonary_Disease', 'boolean'),
 ('Depression', 'boolean'),
 ('Diabetes', 'boolean'),
 ('Ischemic_Heart_Disease', 'boolean'),
 ('Osteoporosis', 'boolean'),
 ('Rheumatoid_arthritis_ost

In [44]:
# Age as of 2009 from birth year alone (birth month/day are ignored).
ben_2009 = ben_2009.withColumn("Age", 2009 - col("year"))

In [47]:
# Beneficiary id plus the four months-of-coverage columns.
dummy_2009 = ben_2009.select(
    'DESYNPUF_ID',
    'Total_months_partA_coverage',
    'Total_months_HMO_coverage',
    'Total_months_partB_coverage',
    'Total_months_partD_coverage',
)

In [48]:
# Coverage columns to collapse into 0/1 indicators.
dummy_col = [
    'Total_months_partA_coverage',
    'Total_months_HMO_coverage',
    'Total_months_partB_coverage',
    'Total_months_partD_coverage',
]

In [49]:
# 1 when the beneficiary has any months of that coverage, otherwise 0.
for coverage in dummy_col:
    dummy_2009 = dummy_2009.withColumn(
        coverage,
        when(col(coverage) > 0, 1).otherwise(0),
    )

In [50]:
# Collect the indicator frame to the driver as a pandas DataFrame.
# NOTE(review): this pulls every row into driver memory; the per-row sum done
# in pandas next could be computed in Spark instead.
dummy_2009_pandas = (
    dummy_2009
    .toPandas()
)

In [51]:
# Number of coverage types (Part A, HMO, Part B, Part D) each beneficiary holds.
dummy_2009_pandas['total_number_of_Insurance'] = (
    dummy_2009_pandas['Total_months_partA_coverage']
    + dummy_2009_pandas['Total_months_HMO_coverage']
    + dummy_2009_pandas['Total_months_partB_coverage']
    + dummy_2009_pandas['Total_months_partD_coverage']
)

In [52]:
# Send the pandas result back to Spark for the join.
dummy_2009_spark = spark.createDataFrame(data=dummy_2009_pandas)

In [53]:
# Register the insurance-count frame for Spark SQL.
dummy_2009_spark.createOrReplaceTempView(name='dummy_2009_spark')

In [54]:
# (name, type) pairs of the round-tripped frame.
[(str(field.name), field.dataType.simpleString()) for field in dummy_2009_spark.schema.fields]

[('DESYNPUF_ID', 'string'),
 ('Total_months_partA_coverage', 'bigint'),
 ('Total_months_HMO_coverage', 'bigint'),
 ('Total_months_partB_coverage', 'bigint'),
 ('Total_months_partD_coverage', 'bigint'),
 ('total_number_of_Insurance', 'bigint')]

In [55]:
# Keep only the id and the insurance count; the indicators are no longer needed.
dummy_2009_spark = dummy_2009_spark.drop(
    'Total_months_partA_coverage',
    'Total_months_HMO_coverage',
    'Total_months_partB_coverage',
    'Total_months_partD_coverage',
)

In [56]:
# Attach each beneficiary's insurance count. The inner join keeps only ids
# present in both frames.
final_2009 = ben_2009.join(
    dummy_2009_spark,
    ben_2009['DESYNPUF_ID'] == dummy_2009_spark['DESYNPUF_ID'],
    'inner',
).select(
    ben_2009['*'],
    dummy_2009_spark['total_number_of_Insurance'],
)

In [60]:
# Collect the joined 2009 frame to the driver as pandas.
# The original cell read from `age_query`, which is never defined in this
# notebook (leftover from a deleted cell), so it raised NameError on a fresh
# kernel. The target name indicates final_2009 was intended.
# NOTE(review): confirm final_2009 is the intended source.
final_2009_pandas = final_2009.toPandas()

In [62]:
# (name, type) pairs of the final joined frame.
[(str(field.name), field.dataType.simpleString()) for field in final_2009.schema.fields]

[('Date_of_birth', 'date'),
 ('Date_of_death', 'date'),
 ('End_stage_renal_disease_Indicator', 'string'),
 ('Total_months_partA_coverage', 'string'),
 ('Total_months_HMO_coverage', 'string'),
 ('Race_Code', 'string'),
 ('Gender', 'string'),
 ('Total_months_partB_coverage', 'string'),
 ('Inpatient_responsibility_amount', 'string'),
 ('Outpatient_responsibility_amount', 'string'),
 ('DESYNPUF_ID', 'string'),
 ('Inpatient_reimbursement_amount', 'string'),
 ('Outpatient_reimbursement_amount', 'string'),
 ('Total_months_partD_coverage', 'string'),
 ('Inpatient_primary_payer_reimbursement_amount', 'string'),
 ('Outpatient_primary_payer_reimbursement_amount', 'string'),
 ('Alzheimer', 'boolean'),
 ('Heart_Failure', 'boolean'),
 ('Chronic_Kidney_Disease', 'boolean'),
 ('Cancer', 'boolean'),
 ('Chronic_Obstructive_Pulmonary_Disease', 'boolean'),
 ('Depression', 'boolean'),
 ('Diabetes', 'boolean'),
 ('Ischemic_Heart_Disease', 'boolean'),
 ('Osteoporosis', 'boolean'),
 ('Rheumatoid_arthritis_ost

In [63]:
# Write the cleaned 2009 frame to the Cleaned_Insurance.ben_2009 collection.
# NOTE(review): mode "append" adds another full copy of the rows on every
# re-run of the notebook; consider "overwrite" if only this notebook feeds it.
(
    final_2009.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("spark.mongodb.output.uri", "mongodb://localhost:27017/Cleaned_Insurance.ben_2009")
    .save()
)