In [1]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import lit, substring, rand, array
from pyspark.sql.types import *
import csv
import numpy as np

In [2]:
# Set up the PySpark environment.
# The builder's getOrCreate() returns the already-running session when there
# is one, so this cell can be re-run without wrapping the context in a new
# SparkSession each time.
conf = SparkConf().setAppName('claims_analysis').setMaster('local[*]')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
# Legacy entry point, kept for code that still expects `sqlContext`; it is
# bound explicitly to the session created above.
sqlContext = SQLContext(sc, spark)

# Load beneficiary data

In [None]:
# Schema of the beneficiary summary data (only the columns kept below, plus
# the YEAR column that is added per file).
# Patient IDs are non-identifying.
# Source: https://www.cms.gov/Research-Statistics-Data-and-Systems/Downloadable-Public-Use-Files/SynPUFs/DE_Syn_PUF
field = [StructField('DESYNPUF_ID', StringType(), True),
         StructField('BENE_BIRTH_DT', IntegerType(), True),
         StructField('BENE_DEATH_DT', IntegerType(), True),
         StructField('BENE_SEX_IDENT_CD', IntegerType(), True),
         StructField('BENE_RACE_CD', IntegerType(), True),
         StructField('BENE_ESRD_IND', IntegerType(), True),
         StructField('SP_STATE_CODE', IntegerType(), True),
         StructField('BENE_COUNTY_CD', IntegerType(), True),
         StructField('SP_ALZHDMTA', IntegerType(), True),
         StructField('SP_CHF', IntegerType(), True),
         StructField('SP_CHRNKIDN', IntegerType(), True),
         StructField('SP_CNCR', IntegerType(), True),
         StructField('SP_COPD', IntegerType(), True),
         StructField('SP_DEPRESSN', IntegerType(), True),
         StructField('SP_DIABETES', IntegerType(), True),
         StructField('YEAR', IntegerType(), True),
        ]

schema = StructType(field)
# Empty, typed frame used as the starting point for the unions below.
bene = spark.createDataFrame(sc.emptyRDD(), schema)

# Columns to keep from every file: each schema column except YEAR, which is
# not read from the CSV but added below. Deriving the list from the schema
# keeps the select order and the schema order in sync (union is positional).
bene_columns = [f.name for f in field if f.name != 'YEAR']

# Process all files stored in the Data/Beneficiary folder:
# 20 sample files for each of the three years.
# NOTE(review): the per-file column types come from inferSchema and union()
# matches columns by position, so the types in the combined frame presumably
# follow the inferred ones where they differ from the schema above - confirm
# with bene.printSchema() after loading.
for year in ['2008', '2009', '2010']:
    for i in range(20):
        df = spark.read.csv('Data/Beneficiary/DE1_0_{}_Beneficiary_Summary_File_Sample_{}.csv'.format(year, i+1), sep=',', header=True, inferSchema=True)
        df = df.select(bene_columns)

        # The schema declares YEAR as IntegerType, so add it as an integer
        # literal rather than the string used to build the file name.
        df = df.withColumn('YEAR', lit(int(year)))

        bene = bene.union(df)

# Load claims data

In [None]:
# Schema for the Outpatient Claims data (only the columns kept below).
# Patient IDs are non-identifying.
# Source: https://www.cms.gov/Research-Statistics-Data-and-Systems/Downloadable-Public-Use-Files/SynPUFs/DE_Syn_PUF
#
# Type choices:
#   * AT_PHYSN_NPI is LongType: NPIs are 10-digit numbers, which can exceed
#     the 32-bit IntegerType maximum (2,147,483,647).
#   * CLM_ID is LongType for the same reason.
#     NOTE(review): assumes claim IDs are longer than 9 digits - confirm.
#   * ICD9_DGNS_CD_1 is StringType: ICD-9 diagnosis codes can start with a
#     letter (V/E codes) or with leading zeros, so they are not numbers.
#   * NOTE(review): CLM_PMT_AMT is still declared as an integer; confirm the
#     files hold whole-number amounts.
field = [StructField('DESYNPUF_ID', StringType(), True),
         StructField('CLM_ID', LongType(), True),
         StructField('CLM_FROM_DT', IntegerType(), True),
         StructField('CLM_THRU_DT', IntegerType(), True),
         StructField('CLM_PMT_AMT', IntegerType(), True),
         StructField('AT_PHYSN_NPI', LongType(), True),
         StructField('ICD9_DGNS_CD_1', StringType(), True),
        ]

schema = StructType(field)
# Empty, typed frame used as the starting point for the unions below.
claims = spark.createDataFrame(sc.emptyRDD(), schema)

# Columns to keep from every file, in schema order (union is positional).
claims_columns = [f.name for f in field]

# Process all 20 outpatient claims files and store them in the single
# 'claims' variable.
for i in range(20):
    df = spark.read.csv('Data/Outpatient/DE1_0_2008_to_2010_Outpatient_Claims_Sample_{}.csv'.format(i+1), sep=',', header=True, inferSchema=True)
    df = df.select(claims_columns)

    claims = claims.union(df)

# Derive YEAR from the first four characters of CLM_THRU_DT and store it as
# an integer, matching the IntegerType YEAR declared for the beneficiary data.
# NOTE(review): assumes CLM_THRU_DT is formatted YYYYMMDD - confirm.
claims = claims.withColumn('YEAR', substring('CLM_THRU_DT', 1, 4).cast(IntegerType()))

# Load NPI data

In [None]:
# NPI is the National Provider Identifier, a standard identifier for health care professionals (HCPs).
# This data set will allow us to link the location of HCPs to determine zip codes with the highest concentration of prescribing patterns.
# Source: https://download.cms.gov/nppes/NPI_Files.html
#
# The file is read WITHOUT inferSchema, so every column arrives as a string:
#   * postal codes keep any leading zeros, which the ZIP_CODE substring
#     below depends on;
#   * Spark does not need an extra pass over the whole file just to guess
#     column types.
npi = spark.read.csv('Data/NPI/NPPES_Data_Dissemination_November_2020/npidata_pfile_20050523-20201108.csv',
                     header=True)

# Limit the dataset to the necessary HCP identifying attributes.
# The country-code column name contains dots ("U.S."), so it is wrapped in
# backticks to be treated as a single column name.
npi = npi.select(['NPI',
                  'Provider Last Name (Legal Name)',
                  'Provider First Name',
                  'Provider Middle Name',
                  'Provider Credential Text',
                  'Provider First Line Business Mailing Address',
                  'Provider Second Line Business Mailing Address',
                  'Provider Business Mailing Address City Name',
                  'Provider Business Mailing Address State Name',
                  'Provider Business Mailing Address Postal Code',
                  "`Provider Business Mailing Address Country Code (If outside U.S.)`",
                  'Provider Business Mailing Address Telephone Number',
                  'Provider Business Mailing Address Fax Number',
                  ])

# Keep the identifier numeric. LongType is used because a 10-digit NPI can
# exceed the 32-bit integer range.
npi = npi.withColumn('NPI', npi['NPI'].cast(LongType()))

# ZIP_CODE is the first five characters of the mailing postal code.
npi = npi.withColumn('ZIP_CODE', substring('Provider Business Mailing Address Postal Code', 1, 5))

# Load the ICD Lookup table

In [None]:
# Lookup table that maps the official ICD diagnosis codes to their
# descriptions, so the codes found in the claims can be interpreted.
# Source: https://www.cms.gov/Medicare/Coding/ICD9ProviderDiagnosticCodes/codes
icd_lookup = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('Data/Outpatient/CMS28_DESC_LONG_SHORT_DX.csv')
)