In [None]:
%%bash
# Install Java, Spark, and Findspark.
# NOTE: the %%bash cell magic has to be the very first line of the cell,
# so explanatory comments go below it.
apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark tarball only when it is not already in the working directory.
# Older Spark releases are served from the permanent Apache archive; regular
# mirrors only carry the current releases.
[ ! -e spark-3.1.2-bin-hadoop2.7.tgz ] && wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
tar xf spark-3.1.2-bin-hadoop2.7.tgz
pip install -q findspark

In [None]:
# Set environment variables
# JAVA_HOME and SPARK_HOME point at the directories where Java and Spark were installed.
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop2.7",
})

In [None]:
# Start a SparkSession
import findspark

# findspark.init() must run before pyspark is imported.
findspark.init()

from pyspark.sql import SparkSession

# Build (or reuse) a session whose master is "local[*]".
session_builder = SparkSession.builder.master("local[*]")
spark = session_builder.getOrCreate()

In [None]:
# Create Dataframe in Spark!
! [ ! -e "$(basename AH_Provisional_Diabetes_Death_Counts_for_2020.csv)" ] && wget  'https://storage.googleapis.com/files.mobibootcamp.com/2021-datasets/AH_Provisional_Diabetes_Death_Counts_for_2020.csv'
df = spark.read.csv('AH_Provisional_Diabetes_Death_Counts_for_2020.csv',
                      header= True, 
                      inferSchema = True)
#df.show()

print(df.columns)

--2021-11-18 00:47:15--  https://storage.googleapis.com/files.mobibootcamp.com/2021-datasets/AH_Provisional_Diabetes_Death_Counts_for_2020.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 142.250.188.208, 142.251.33.208, 172.217.9.208, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|142.250.188.208|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 17509 (17K) [text/csv]
Saving to: ‘AH_Provisional_Diabetes_Death_Counts_for_2020.csv’


2021-11-18 00:47:15 (9.95 MB/s) - ‘AH_Provisional_Diabetes_Death_Counts_for_2020.csv’ saved [17509/17509]

['Data as of', 'Date_Of_Death_Year', 'Date_Of_Death_Month', 'AgeGroup', 'Sex', 'COVID19', 'Diabetes.uc', 'Diabetes.mc', 'C19PlusDiabetes', 'C19PlusHypertensiveDiseases', 'C19PlusMajorCardiovascularDiseases', 'C19PlusHypertensiveDiseasesAndMCVD', 'C19PlusChronicLowerRespiratoryDisease', 'C19PlusKidneyDisease', 'C19PlusChronicLiverDiseaseAndCirrhosis', 'C19PlusObesity']


In [None]:
# Determine how many rows are present in the dataset.
# As the last expression of the cell, the count is displayed as the cell output.
df.count()

226

In [None]:
# Determine how many columns are present in the dataset
column_total = len(df.columns)
print(column_total)

16


In [None]:
# Peek at the first five rows as a list of Row objects
df.head(5)

[Row(Data as of='10/20/2020', Date_Of_Death_Year=2020, Date_Of_Death_Month=1, AgeGroup='<18 years', Sex='Female (F)', COVID19=0, Diabetes.uc=3, Diabetes.mc=5, C19PlusDiabetes=0, C19PlusHypertensiveDiseases=0, C19PlusMajorCardiovascularDiseases=0, C19PlusHypertensiveDiseasesAndMCVD=0, C19PlusChronicLowerRespiratoryDisease=0, C19PlusKidneyDisease=0, C19PlusChronicLiverDiseaseAndCirrhosis=0, C19PlusObesity=0),
 Row(Data as of='10/20/2020', Date_Of_Death_Year=2020, Date_Of_Death_Month=1, AgeGroup='<18 years', Sex='Male (M)', COVID19=0, Diabetes.uc=3, Diabetes.mc=3, C19PlusDiabetes=0, C19PlusHypertensiveDiseases=0, C19PlusMajorCardiovascularDiseases=0, C19PlusHypertensiveDiseasesAndMCVD=0, C19PlusChronicLowerRespiratoryDisease=0, C19PlusKidneyDisease=0, C19PlusChronicLiverDiseaseAndCirrhosis=0, C19PlusObesity=0),
 Row(Data as of='10/20/2020', Date_Of_Death_Year=2020, Date_Of_Death_Month=1, AgeGroup='18-29 years', Sex='Female (F)', COVID19=0, Diabetes.uc=20, Diabetes.mc=36, C19PlusDiabetes=0

In [None]:
# Cleaning step - give every column a short, lower-case snake_case name
column_renames = {
    'Data as of': 'date',
    'Date_Of_Death_Year': 'death_year',
    'Date_Of_Death_Month': 'death_month',
    'AgeGroup': 'agegroup',
    'Sex': 'sex',
    'COVID19': 'covid19',
    'Diabetes.uc': 'diabetes_uc',
    'Diabetes.mc': 'diabetes_mc',
    'C19PlusDiabetes': 'c19_diabete',
    'C19PlusHypertensiveDiseases': 'c19_hypertensive',
    'C19PlusMajorCardiovascularDiseases': 'c19_cardio_vascular',
    'C19PlusHypertensiveDiseasesAndMCVD': 'c19_hypertensive_mcvd',
    'C19PlusChronicLowerRespiratoryDisease': 'c19_lower_respiratory',
    'C19PlusKidneyDisease': 'c19_kidney',
    'C19PlusChronicLiverDiseaseAndCirrhosis': 'c19_liver_cirrhosis',
    'C19PlusObesity': 'c19_obesity',
}
# Apply the renames one at a time, in the order listed above.
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)
print(df.columns)

['date', 'death_year', 'death_month', 'agegroup', 'sex', 'covid19', 'diabetes_uc', 'diabetes_mc', 'c19_diabete', 'c19_hypertensive', 'c19_cardio_vascular', 'c19_hypertensive_mcvd', 'c19_lower_respiratory', 'c19_kidney', 'c19_liver_cirrhosis', 'c19_obesity']


In [None]:
# Check the data type of every column.
# dtypes is displayed as a list of (column name, type name) pairs.
df.dtypes

[('date', 'string'),
 ('death_year', 'int'),
 ('death_month', 'int'),
 ('agegroup', 'string'),
 ('sex', 'string'),
 ('covid19', 'int'),
 ('diabetes_uc', 'int'),
 ('diabetes_mc', 'int'),
 ('c19_diabete', 'int'),
 ('c19_hypertensive', 'int'),
 ('c19_cardio_vascular', 'int'),
 ('c19_hypertensive_mcvd', 'int'),
 ('c19_lower_respiratory', 'int'),
 ('c19_kidney', 'int'),
 ('c19_liver_cirrhosis', 'int'),
 ('c19_obesity', 'int')]

In [None]:
# Preview the DataFrame after the column renames (show() prints the leading rows as a text table)
df.show()

+----------+----------+-----------+-----------+----------+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+-------------------+-----------+
|      date|death_year|death_month|   agegroup|       sex|covid19|diabetes_uc|diabetes_mc|c19_diabete|c19_hypertensive|c19_cardio_vascular|c19_hypertensive_mcvd|c19_lower_respiratory|c19_kidney|c19_liver_cirrhosis|c19_obesity|
+----------+----------+-----------+-----------+----------+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+-------------------+-----------+
|10/20/2020|      2020|          1|  <18 years|Female (F)|      0|          3|          5|          0|               0|                  0|                    0|                    0|         0|                  0|          0|
|10/20/2020|      2020|          1|  <18 years|  Male (M)|      0|          3|          3|  

In [None]:
# Count the Null / None / NaN entries in each DataFrame column
from pyspark.sql.functions import col,isnan, when, count

missing_counts = []
for column_name in df.columns:
    # A value counts as missing when it is NaN or null.
    is_missing = isnan(column_name) | col(column_name).isNull()
    missing_counts.append(count(when(is_missing, column_name)).alias(column_name))
df.select(missing_counts).show()

+----+----------+-----------+--------+---+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+-------------------+-----------+
|date|death_year|death_month|agegroup|sex|covid19|diabetes_uc|diabetes_mc|c19_diabete|c19_hypertensive|c19_cardio_vascular|c19_hypertensive_mcvd|c19_lower_respiratory|c19_kidney|c19_liver_cirrhosis|c19_obesity|
+----+----------+-----------+--------+---+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+-------------------+-----------+
|   0|         0|          0|       1|  0|      0|          0|          0|          0|               0|                  0|                    0|                    0|         0|                  0|          0|
+----+----------+-----------+--------+---+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+-----------

In [None]:
# Build df_dropdup: df without any row that contains a null value.
# (Despite the name, nothing is de-duplicated here; the name is kept because
# the following cells refer to it.)
# dropna() removes a row if ANY of its columns is null, so the single null
# 'agegroup' row is removed as well and no extra isNotNull filter is needed.
df_dropdup = df.dropna()
# Verify the result: every per-column Null/NaN count should be 0.
df_dropdup.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_dropdup.columns]
   ).show()

+----+----------+-----------+--------+---+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+-------------------+-----------+
|date|death_year|death_month|agegroup|sex|covid19|diabetes_uc|diabetes_mc|c19_diabete|c19_hypertensive|c19_cardio_vascular|c19_hypertensive_mcvd|c19_lower_respiratory|c19_kidney|c19_liver_cirrhosis|c19_obesity|
+----+----------+-----------+--------+---+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+-------------------+-----------+
|   0|         0|          0|       0|  0|      0|          0|          0|          0|               0|                  0|                    0|                    0|         0|                  0|          0|
+----+----------+-----------+--------+---+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+-----------

In [None]:
# Remove the 'date' and 'death_year' columns from the DataFrame
unwanted_columns = ['date', 'death_year']
df_dropdup = df_dropdup.drop(*unwanted_columns)
df_dropdup.show()

+-----------+-----------+----------+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+-------------------+-----------+
|death_month|   agegroup|       sex|covid19|diabetes_uc|diabetes_mc|c19_diabete|c19_hypertensive|c19_cardio_vascular|c19_hypertensive_mcvd|c19_lower_respiratory|c19_kidney|c19_liver_cirrhosis|c19_obesity|
+-----------+-----------+----------+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+-------------------+-----------+
|          1|  <18 years|Female (F)|      0|          3|          5|          0|               0|                  0|                    0|                    0|         0|                  0|          0|
|          1|  <18 years|  Male (M)|      0|          3|          3|          0|               0|                  0|                    0|                    0|         0|        

In [None]:
# Discard the rows whose 'agegroup' is 'Unknown age', then preview the result and count what is left
has_known_age = df_dropdup['agegroup'] != 'Unknown age'
df_dropdup = df_dropdup.where(has_known_age)
df_dropdup.show()
df_dropdup.count()

+-----------+-----------+----------+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+-------------------+-----------+
|death_month|   agegroup|       sex|covid19|diabetes_uc|diabetes_mc|c19_diabete|c19_hypertensive|c19_cardio_vascular|c19_hypertensive_mcvd|c19_lower_respiratory|c19_kidney|c19_liver_cirrhosis|c19_obesity|
+-----------+-----------+----------+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+-------------------+-----------+
|          1|  <18 years|Female (F)|      0|          3|          5|          0|               0|                  0|                    0|                    0|         0|                  0|          0|
|          1|  <18 years|  Male (M)|      0|          3|          3|          0|               0|                  0|                    0|                    0|         0|        

216

In [None]:
# Compute summary statistics: first for every column, then for the two diabetes columns only
df_dropdup.describe().show()
diabetes_columns = ['diabetes_uc', 'diabetes_mc']
df_dropdup.describe(*diabetes_columns).show()

+-------+-----------------+-----------+----------+------------------+------------------+------------------+------------------+------------------+-------------------+---------------------+---------------------+------------------+-------------------+-----------------+
|summary|      death_month|   agegroup|       sex|           covid19|       diabetes_uc|       diabetes_mc|       c19_diabete|  c19_hypertensive|c19_cardio_vascular|c19_hypertensive_mcvd|c19_lower_respiratory|        c19_kidney|c19_liver_cirrhosis|      c19_obesity|
+-------+-----------------+-----------+----------+------------------+------------------+------------------+------------------+------------------+-------------------+---------------------+---------------------+------------------+-------------------+-----------------+
|  count|              216|        216|       216|               216|               216|               216|               216|               216|                216|                  216|            

As the table above shows, the mean of diabetes_uc is about 633, while the mean of diabetes_mc is considerably higher, at about 2316.

In [None]:
# Find the maximum value of the covid19 column and show the row(s) that hold it.
# The maximum is computed with a DataFrame aggregation (the same agg({...}) idiom
# used for mean/stddev in the outlier cell) instead of converting to an RDD and
# comparing Row objects in Python.
v2 = df_dropdup.agg({'covid19': 'max'}).collect()[0][0]
print(v2)
# Keep every row whose covid19 count equals that maximum.
df1 = df_dropdup.filter(df_dropdup.covid19 == v2)
df1.show()

15882
+-----------+---------+----------+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+-------------------+-----------+
|death_month| agegroup|       sex|covid19|diabetes_uc|diabetes_mc|c19_diabete|c19_hypertensive|c19_cardio_vascular|c19_hypertensive_mcvd|c19_lower_respiratory|c19_kidney|c19_liver_cirrhosis|c19_obesity|
+-----------+---------+----------+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+-------------------+-----------+
|          4|80+ years|Female (F)|  15882|       1569|       7200|       1521|            3274|               7001|                 7001|                 1094|       806|                 25|        100|
+-----------+---------+----------+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+------------

As the table above shows, the highest COVID-19 death count (15,882) was recorded in April 2020 among females aged 80 years and over.

In [None]:
# Find outliers: values lying 3 or more standard deviations away from the column mean
def sigma_bounds(frame, column, n_sigma=3):
    """Return (mean, stddev, low, high) for `column` of `frame`.

    `low` and `high` are mean - n_sigma * stddev and mean + n_sigma * stddev,
    where mean and stddev come from Spark's 'mean' and 'stddev' aggregates.

    Raises:
        ValueError: if Spark returns null for either statistic
            (for example when `frame` has no rows).
    """
    std = frame.agg({column: 'stddev'}).collect()[0][0]
    mean = frame.agg({column: 'mean'}).collect()[0][0]
    if std is None or mean is None:
        raise ValueError(f"cannot compute mean/stddev of column '{column}'")
    return mean, std, mean - (n_sigma * std), mean + (n_sigma * std)


df2 = df_dropdup

# --- diabetes_uc ---
mean_val, std_val, low_bound, hi_bound = sigma_bounds(df2, 'diabetes_uc')
print(hi_bound)
print(low_bound)
# Outliers are the rows OUTSIDE the open interval (low_bound, hi_bound).
outliers_uc = df2.where((df2['diabetes_uc'] >= hi_bound) | (df2['diabetes_uc'] <= low_bound))
print('diabetes_uc outliers:', outliers_uc.count())

# --- diabetes_mc ---
mean_val1, std_val1, low_bound1, hi_bound1 = sigma_bounds(df2, 'diabetes_mc')
print(hi_bound1)
print(low_bound1)
outliers_mc = df2.where((df2['diabetes_mc'] >= hi_bound1) | (df2['diabetes_mc'] <= low_bound1))
print('diabetes_mc outliers:', outliers_mc.count())


2104.1947986461696
-785.713317164688
8167.877120687844
-3341.7660095767324


216

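The printed numbers are the outlier bounds (mean ± 3 standard deviations), not the outliers themselves: approximately -785.71 to 2104.19 for diabetes_uc and -3341.77 to 8167.88 for diabetes_mc. A value is an outlier only if it falls outside its column's range. All 216 diabetes_mc values lie inside their range, so that column contains no outliers.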

In [None]:
# Print the schema of the DataFrame (column name, type and nullability) to learn more about the dataset.
df_dropdup.printSchema()
# The DataFrame consists of 14 features (columns). Each column holds either string or integer values.

root
 |-- death_month: integer (nullable = true)
 |-- agegroup: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- covid19: integer (nullable = true)
 |-- diabetes_uc: integer (nullable = true)
 |-- diabetes_mc: integer (nullable = true)
 |-- c19_diabete: integer (nullable = true)
 |-- c19_hypertensive: integer (nullable = true)
 |-- c19_cardio_vascular: integer (nullable = true)
 |-- c19_hypertensive_mcvd: integer (nullable = true)
 |-- c19_lower_respiratory: integer (nullable = true)
 |-- c19_kidney: integer (nullable = true)
 |-- c19_liver_cirrhosis: integer (nullable = true)
 |-- c19_obesity: integer (nullable = true)



In [None]:
# List the distinct values of the two categorical columns in the dataset: agegroup and sex
for categorical_column in ('agegroup', 'sex'):
    distinct_rows = df_dropdup.select(categorical_column).distinct().collect()
    # Each Row has a single field; unwrap it to get the plain value.
    print([row[0] for row in distinct_rows])

['85+ years', '50-59 years', '40-49 years', '70-79 years', '18-29 years', '60-69 years', '80+ years', '65-74 years', '75-84 years', '<18 years', '30-39 years', '50-64 years']
['Female (F)', 'Male (M)']


In [None]:
# Group by agegroup and sex, total the covid19 counts of each group, and list the groups sorted by agegroup
covid_by_age_and_sex = df_dropdup.groupBy("agegroup", "sex").agg({"covid19": "sum"})
covid_by_age_and_sex.orderBy("agegroup").show(truncate=False)

+-----------+----------+------------+
|agegroup   |sex       |sum(covid19)|
+-----------+----------+------------+
|18-29 years|Female (F)|329         |
|18-29 years|Male (M)  |583         |
|30-39 years|Female (F)|827         |
|30-39 years|Male (M)  |1740        |
|40-49 years|Male (M)  |4569        |
|40-49 years|Female (F)|2107        |
|50-59 years|Male (M)  |11219       |
|50-59 years|Female (F)|5628        |
|50-64 years|Female (F)|11101       |
|50-64 years|Male (M)  |20905       |
|60-69 years|Male (M)  |21760       |
|60-69 years|Female (F)|12697       |
|65-74 years|Female (F)|16662       |
|65-74 years|Male (M)  |26397       |
|70-79 years|Male (M)  |28960       |
|70-79 years|Female (F)|20230       |
|75-84 years|Male (M)  |28935       |
|75-84 years|Female (F)|23529       |
|80+ years  |Female (F)|49108       |
|80+ years  |Male (M)  |38602       |
+-----------+----------+------------+
only showing top 20 rows



In [None]:
# Total the covid19 counts separately for females and males
covid_by_sex = df_dropdup.groupBy("sex").agg({"covid19": "sum"})
covid_by_sex.show(truncate=False)

+----------+------------+
|sex       |sum(covid19)|
+----------+------------+
|Female (F)|178631      |
|Male (M)  |208036      |
+----------+------------+



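As the table above shows, the COVID-19 death count for males (208,036) is about 16% higher than the count for females (178,631). Note that the age groups in this dataset appear to overlap (for example, '50-59 years' and '50-64 years'), so these totals likely count some deaths more than once; the ratio between the sexes is more meaningful than the absolute totals.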

In [None]:
# Order the whole DataFrame by the covid19 column (ascending order is the default)
rows_by_covid19 = df_dropdup.sort("covid19")
rows_by_covid19.show()

+-----------+-----------+----------+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+-------------------+-----------+
|death_month|   agegroup|       sex|covid19|diabetes_uc|diabetes_mc|c19_diabete|c19_hypertensive|c19_cardio_vascular|c19_hypertensive_mcvd|c19_lower_respiratory|c19_kidney|c19_liver_cirrhosis|c19_obesity|
+-----------+-----------+----------+-------+-----------+-----------+-----------+----------------+-------------------+---------------------+---------------------+----------+-------------------+-----------+
|          1|  <18 years|Female (F)|      0|          3|          5|          0|               0|                  0|                    0|                    0|         0|                  0|          0|
|          1|  <18 years|  Male (M)|      0|          3|          3|          0|               0|                  0|                    0|                    0|         0|        

In [None]:
# collect() returns the data as a list of Row objects rather than a DataFrame,
# bringing every row back to the driver.
df_dropdup.collect()

[Row(death_month=1, agegroup='<18 years', sex='Female (F)', covid19=0, diabetes_uc=3, diabetes_mc=5, c19_diabete=0, c19_hypertensive=0, c19_cardio_vascular=0, c19_hypertensive_mcvd=0, c19_lower_respiratory=0, c19_kidney=0, c19_liver_cirrhosis=0, c19_obesity=0),
 Row(death_month=1, agegroup='<18 years', sex='Male (M)', covid19=0, diabetes_uc=3, diabetes_mc=3, c19_diabete=0, c19_hypertensive=0, c19_cardio_vascular=0, c19_hypertensive_mcvd=0, c19_lower_respiratory=0, c19_kidney=0, c19_liver_cirrhosis=0, c19_obesity=0),
 Row(death_month=1, agegroup='18-29 years', sex='Female (F)', covid19=0, diabetes_uc=20, diabetes_mc=36, c19_diabete=0, c19_hypertensive=0, c19_cardio_vascular=0, c19_hypertensive_mcvd=0, c19_lower_respiratory=0, c19_kidney=0, c19_liver_cirrhosis=0, c19_obesity=0),
 Row(death_month=1, agegroup='18-29 years', sex='Male (M)', covid19=0, diabetes_uc=44, diabetes_mc=65, c19_diabete=0, c19_hypertensive=0, c19_cardio_vascular=0, c19_hypertensive_mcvd=0, c19_lower_respiratory=0, c

Advantages of PySpark compared to Pandas:
1. Processing is faster on large datasets.
2. sparkDataFrame.count() simply returns the number of rows, whereas pandasDataFrame.count() returns the number of non-NA/null observations for each column.

Disadvantages of PySpark compared to Pandas:
1. Updating, adding, and deleting columns is considerably easier in Pandas.
2. Debugging PySpark code is harder than debugging Pandas code.
