# 1. Load Data

### 1.1 Imports

In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import time
from pyspark import SparkFiles
import pandas as pd

In [2]:
# Start a Spark session named "SparkSQL", or reuse the one that is already
# running in this kernel; every later cell talks to Spark through `spark`.
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/06/05 19:34:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### 1.2 Load Data

#### Medical Conditions

In [3]:
# Register the cleaned medical-conditions CSV with the SparkContext, then read
# the distributed copy back as a DataFrame (first row is the header, column
# types are inferred from the data).
spark.sparkContext.addFile('Resources/medical_conditions_cleaned.csv')
medical_conditions = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("medical_conditions_cleaned.csv"))
)
# Preview the first five rows.
medical_conditions.show(5)

[Stage 1:>                                                          (0 + 1) / 1]                                                                                

+--------+---------------+--------------------+----------------+-----------+--------------+---------------+---------------------+--------------------------+-------------------+---------------+-----------------+---------------------+------+-------------+---------------------+---------------+------+-------+-------------+--------------+
|      id|ever_overweight|blood_transfusion_yr|heart_attack_age|fatty_liver|liver_fibrosis|liver_cirrhosis|liver_viral_hepatitis|liver_autoimmune_hepatitis|other_liver_disease|relative_asthma|relative_diabetes|relative_heart_attack|asthma|heart_failure|chronic_heart_disease|angina_pectoris|stroke|thyroid|liver_disease|gallbladder_pr|
+--------+---------------+--------------------+----------------+-----------+--------------+---------------+---------------------+--------------------------+-------------------+---------------+-----------------+---------------------+------+-------------+---------------------+---------------+------+-------+-------------+--------

In [4]:
# Expose the DataFrame to Spark SQL as the temp view "medical_conditions"
# (replaces any existing view of that name, so re-running the cell is safe).
medical_conditions.createOrReplaceTempView("medical_conditions")

In [5]:
# Print the inferred column names and types to check what inferSchema produced.
medical_conditions.printSchema()

root
 |-- id: double (nullable = true)
 |-- ever_overweight: integer (nullable = true)
 |-- blood_transfusion_yr: double (nullable = true)
 |-- heart_attack_age: double (nullable = true)
 |-- fatty_liver: integer (nullable = true)
 |-- liver_fibrosis: integer (nullable = true)
 |-- liver_cirrhosis: integer (nullable = true)
 |-- liver_viral_hepatitis: integer (nullable = true)
 |-- liver_autoimmune_hepatitis: integer (nullable = true)
 |-- other_liver_disease: integer (nullable = true)
 |-- relative_asthma: integer (nullable = true)
 |-- relative_diabetes: integer (nullable = true)
 |-- relative_heart_attack: integer (nullable = true)
 |-- asthma: integer (nullable = true)
 |-- heart_failure: integer (nullable = true)
 |-- chronic_heart_disease: integer (nullable = true)
 |-- angina_pectoris: integer (nullable = true)
 |-- stroke: integer (nullable = true)
 |-- thyroid: integer (nullable = true)
 |-- liver_disease: integer (nullable = true)
 |-- gallbladder_pr: integer (nullable = true)

In [6]:
# Count the rows in the medical_conditions temp view.
spark.sql(
    " "
    "            select count(*) "
    "            from medical_conditions "
).show()

+--------+
|count(1)|
+--------+
|   14986|
+--------+



#### Demographics

In [7]:
# Register the cholesterol / lifestyle CSV with the SparkContext, then read the
# distributed copy back as a DataFrame (first row is the header, column types
# are inferred from the data).
spark.sparkContext.addFile('Resources/total_df2.csv')
demo = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv(SparkFiles.get("total_df2.csv"))
)
# Preview the first five rows.
demo.show(5)

+--------+-----------------------+---------------------+--------------+----+------------+--------------+
|      id|Total_Cholesterol_mg_dL|Frequency_of_Drinking|Drinks_per_Day|Work|Recreational|100_Cigarettes|
+--------+-----------------------+---------------------+--------------+----+------------+--------------+
|109266.0|                  195.0|                 10.0|           1.0|   0|           1|             0|
|109274.0|                  105.0|                  4.0|           2.0|   1|           0|             0|
|109292.0|                  172.0|                  4.0|           6.0|   0|           0|             0|
|109297.0|                  214.0|                  6.0|           2.0|   1|           0|             0|
|109307.0|                  161.0|                  9.0|           1.0|   0|           0|             1|
+--------+-----------------------+---------------------+--------------+----+------------+--------------+
only showing top 5 rows



In [8]:
# Expose the DataFrame to Spark SQL as the temp view "demo"
# (replaces any existing view of that name, so re-running the cell is safe).
demo.createOrReplaceTempView("demo")

In [9]:
# Print the inferred column names and types to check what inferSchema produced.
demo.printSchema()

root
 |-- id: double (nullable = true)
 |-- Total_Cholesterol_mg_dL: double (nullable = true)
 |-- Frequency_of_Drinking: double (nullable = true)
 |-- Drinks_per_Day: double (nullable = true)
 |-- Work: integer (nullable = true)
 |-- Recreational: integer (nullable = true)
 |-- 100_Cigarettes: integer (nullable = true)



In [10]:
# Count the rows in the demo temp view.
spark.sql(
    " "
    "            select count(*) "
    "            from demo "
).show()

+--------+
|count(1)|
+--------+
|    5502|
+--------+



# 2. Join Tables

### 2.1 Join

In [12]:
# Inner-join the two temp views on their shared `id` column.
#   * t1.* keeps every medical_conditions column (including `id`).
#   * The demo columns are listed one by one so `id` is not duplicated in the result.
#   * `100_Cigarettes` begins with a digit, so it is back-quoted to make sure the
#     SQL parser always reads it as a column name rather than a numeric token.
# A triple-quoted string replaces the backslash line continuations, which break
# silently if any whitespace follows the backslash.
# NOTE(review): per the printed schemas `id` is a double in both tables; the
# equality join works, but casting ids to an integer type upstream would be safer.
df_full = spark.sql("""
    select t1.*,
           t2.Total_Cholesterol_mg_dL,
           t2.Frequency_of_Drinking,
           t2.Drinks_per_Day,
           t2.Work,
           t2.Recreational,
           t2.`100_Cigarettes`
    from medical_conditions as t1
        join demo as t2
        on t1.id = t2.id
""")
# Preview the first five joined rows.
df_full.show(5)

23/06/05 19:35:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+--------+---------------+--------------------+----------------+-----------+--------------+---------------+---------------------+--------------------------+-------------------+---------------+-----------------+---------------------+------+-------------+---------------------+---------------+------+-------+-------------+--------------+-----------------------+---------------------+--------------+----+------------+--------------+
|      id|ever_overweight|blood_transfusion_yr|heart_attack_age|fatty_liver|liver_fibrosis|liver_cirrhosis|liver_viral_hepatitis|liver_autoimmune_hepatitis|other_liver_disease|relative_asthma|relative_diabetes|relative_heart_attack|asthma|heart_failure|chronic_heart_disease|angina_pectoris|stroke|thyroid|liver_disease|gallbladder_pr|Total_Cholesterol_mg_dL|Frequency_of_Drinking|Drinks_pe

In [19]:
# Print the joined table's schema to confirm the demo columns were appended.
df_full.printSchema()

root
 |-- id: double (nullable = true)
 |-- ever_overweight: integer (nullable = true)
 |-- blood_transfusion_yr: double (nullable = true)
 |-- heart_attack_age: double (nullable = true)
 |-- fatty_liver: integer (nullable = true)
 |-- liver_fibrosis: integer (nullable = true)
 |-- liver_cirrhosis: integer (nullable = true)
 |-- liver_viral_hepatitis: integer (nullable = true)
 |-- liver_autoimmune_hepatitis: integer (nullable = true)
 |-- other_liver_disease: integer (nullable = true)
 |-- relative_asthma: integer (nullable = true)
 |-- relative_diabetes: integer (nullable = true)
 |-- relative_heart_attack: integer (nullable = true)
 |-- asthma: integer (nullable = true)
 |-- heart_failure: integer (nullable = true)
 |-- chronic_heart_disease: integer (nullable = true)
 |-- angina_pectoris: integer (nullable = true)
 |-- stroke: integer (nullable = true)
 |-- thyroid: integer (nullable = true)
 |-- liver_disease: integer (nullable = true)
 |-- gallbladder_pr: integer (nullable = true)

### 2.2 Convert to .csv file

In [16]:
# Collect the joined Spark DataFrame onto the driver as a pandas DataFrame.
# toPandas() materialises every row in local memory.
df_full_pandas = df_full.toPandas()
# Show the first rows as a quick sanity check of the conversion.
df_full_pandas.head()

Unnamed: 0,id,ever_overweight,blood_transfusion_yr,heart_attack_age,fatty_liver,liver_fibrosis,liver_cirrhosis,liver_viral_hepatitis,liver_autoimmune_hepatitis,other_liver_disease,...,stroke,thyroid,liver_disease,gallbladder_pr,Total_Cholesterol_mg_dL,Frequency_of_Drinking,Drinks_per_Day,Work,Recreational,100_Cigarettes
0,109266.0,1,,,0,0,0,0,0,0,...,0,0,0,0,195.0,10.0,1.0,0,1,0
1,109274.0,1,,,0,0,0,0,0,0,...,0,0,0,0,105.0,4.0,2.0,1,0,0
2,109292.0,0,,,0,0,0,0,0,0,...,0,0,0,0,172.0,4.0,6.0,0,0,0
3,109297.0,0,,,0,0,0,0,0,0,...,0,0,0,0,214.0,6.0,2.0,1,0,0
4,109307.0,0,,,0,0,0,0,0,0,...,0,0,0,0,161.0,9.0,1.0,0,0,1


In [17]:
# Summarise row count, per-column non-null counts and dtypes of the joined table.
df_full_pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5502 entries, 0 to 5501
Data columns (total 27 columns):
 #   Column                      Non-Null Count  Dtype  
---  ------                      --------------  -----  
 0   id                          5502 non-null   float64
 1   ever_overweight             5502 non-null   int32  
 2   blood_transfusion_yr        527 non-null    float64
 3   heart_attack_age            199 non-null    float64
 4   fatty_liver                 5502 non-null   int32  
 5   liver_fibrosis              5502 non-null   int32  
 6   liver_cirrhosis             5502 non-null   int32  
 7   liver_viral_hepatitis       5502 non-null   int32  
 8   liver_autoimmune_hepatitis  5502 non-null   int32  
 9   other_liver_disease         5502 non-null   int32  
 10  relative_asthma             5502 non-null   int32  
 11  relative_diabetes           5502 non-null   int32  
 12  relative_heart_attack       5502 non-null   int32  
 13  asthma                      5502 

In [23]:
# Write the joined table to CSV; index=False keeps the pandas row index out of the file.
df_full_pandas.to_csv('Resources/heart_attack_final_table.csv', index=False)