# PHC4 AHA analysis using PySpark - demo

## Step 0: setup

#### Imports & Spark setup

In [None]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
import pyspark.sql.functions as F

# Obtain the notebook's Spark session (getOrCreate) under the application
# name "pyspark_demo".
spark = (
    SparkSession.builder
    .appName("pyspark_demo")
    .getOrCreate()
)

# Keep a handle on the session's SparkContext.
sc = spark.sparkContext

#### Input file paths

In [None]:
## Input locations for the two datasets used below.
## Include the file extension (.csv | .txt | .tsv | .bed).
## Both files are read further down with sep='\t', i.e. as tab-separated text.
aha_file_path = "/path/to/aha.txt"
phc4_file_path = "/path/to/phc4.tsv"

<br>  

<div class="alert alert-block alert-info">
**Note**: Spark/PySpark is NOT limited to flat text files!  


> Spark/PySpark can load data from various external storage systems: file systems, key-value stores, SQL databases, and more.

</div>

<br>  

## Step 1: load input files

In [None]:
# Load the PHC4 file as tab-separated text, taking column names from the
# first line. No schema or inferSchema option is passed here.
phc4_orig_df = (
    spark.read
    .options(sep="\t", header=True)
    .csv(phc4_file_path)
)

# Preview the first three rows.
phc4_orig_df.show(n=3)

In [None]:
# Load the AHA file as tab-separated text, taking column names from the
# first line. No schema or inferSchema option is passed here.
aha_orig_df = (
    spark.read
    .options(sep="\t", header=True)
    .csv(aha_file_path)
)

# Preview the first three rows.
aha_orig_df.show(n=3)

<br>  

## Step 2: preprocess DataFrames

#### PHC4

In [None]:
# Restrict PHC4 to surgery records from 1994 through 2005 that have a
# PatientID value.
#
# `col` is not imported by name in this notebook; pyspark.sql.functions is
# imported as `F`, so columns are referenced through `F.col`.
phc4_filt_df = (
    phc4_orig_df
    .filter(F.col("OpCode") == "Surgery")
    .filter(F.col("Year").between(1994, 2005))
    # The column list must be passed as `subset=`: the first positional
    # parameter of na.drop() is `how`, not the list of columns.
    .na.drop(subset=["PatientID"])
)

phc4_filt_df.show(3)

#### AHA

In [None]:
# Drop AHA rows that are missing a value in either the "AHA" or "Zip" column.
# The column list must be passed as `subset=`: the first positional parameter
# of na.drop() is `how`, not the list of columns.
aha_filt_df = aha_orig_df.na.drop(subset=["AHA", "Zip"])

aha_filt_df.show(3)

<br>  

## Step 3: join DataFrames

In [None]:
# Inner-join the filtered AHA rows with the filtered PHC4 rows on their
# shared "HospitalID" column.
join_df = aha_filt_df.join(
    phc4_filt_df,
    on=["HospitalID"],
    how="inner",
)

# Preview the first three joined rows.
join_df.show(n=3)

<br>  

## Step 4: process joined DataFrame

In [None]:
# Keep joined rows whose TotalCharge is below 1,000,000 and whose Cost is
# below 500,000, then discard rows with no "Cost-to-charge" value.
#
# `col` is not imported by name in this notebook; pyspark.sql.functions is
# imported as `F`, so columns are referenced through `F.col`.
join_filt_df = (
    join_df
    .filter((F.col("TotalCharge") < 1000000) & (F.col("Cost") < 500000))
    # The column list must be passed as `subset=`: the first positional
    # parameter of na.drop() is `how`, not the list of columns.
    .na.drop(subset=["Cost-to-charge"])
)

join_filt_df.show(3)

In [None]:
# TODO: placeholder cell. Replace `...` with the real filter condition
# (a Column expression or a SQL string). As written, the argument is Python's
# Ellipsis object rather than a usable condition.
join_filt_df = join_filt_df.filter(...)