# <center> Analysis on Human Mortality Using Spark </center>

### Importing packages and starting the spark session

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('new').config('spark.driver.memory', '10g').getOrCreate()

In [2]:
print(pyspark.__version__)

3.5.0


### Spark Info

In [3]:
spark

### Importing data into Spark

In [4]:
df = spark.read.option('header', 'true').csv('Merged.csv', inferSchema = True)

## Step 1: Understanding the data

In [5]:
df.count()

6103609

In [6]:
type(df)

pyspark.sql.dataframe.DataFrame

In [7]:
df.printSchema()

root
 |-- Country: integer (nullable = true)
 |-- Admin1: string (nullable = true)
 |-- SubDiv: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- List: string (nullable = true)
 |-- Cause: string (nullable = true)
 |-- Sex: integer (nullable = true)
 |-- Frmat: integer (nullable = true)
 |-- IM_Frmat: double (nullable = true)
 |-- Deaths1: integer (nullable = true)
 |-- Deaths2: double (nullable = true)
 |-- Deaths3: double (nullable = true)
 |-- Deaths4: string (nullable = true)
 |-- Deaths5: string (nullable = true)
 |-- Deaths6: string (nullable = true)
 |-- Deaths7: double (nullable = true)
 |-- Deaths8: string (nullable = true)
 |-- Deaths9: double (nullable = true)
 |-- Deaths10: string (nullable = true)
 |-- Deaths11: double (nullable = true)
 |-- Deaths12: string (nullable = true)
 |-- Deaths13: double (nullable = true)
 |-- Deaths14: string (nullable = true)
 |-- Deaths15: double (nullable = true)
 |-- Deaths16: string (nullable = true)
 |-- Deaths17: double (

In [8]:
df.columns

['Country',
 'Admin1',
 'SubDiv',
 'Year',
 'List',
 'Cause',
 'Sex',
 'Frmat',
 'IM_Frmat',
 'Deaths1',
 'Deaths2',
 'Deaths3',
 'Deaths4',
 'Deaths5',
 'Deaths6',
 'Deaths7',
 'Deaths8',
 'Deaths9',
 'Deaths10',
 'Deaths11',
 'Deaths12',
 'Deaths13',
 'Deaths14',
 'Deaths15',
 'Deaths16',
 'Deaths17',
 'Deaths18',
 'Deaths19',
 'Deaths20',
 'Deaths21',
 'Deaths22',
 'Deaths23',
 'Deaths24',
 'Deaths25',
 'Deaths26',
 'IM_Deaths1',
 'IM_Deaths2',
 'IM_Deaths3',
 'IM_Deaths4']

In [9]:
df.head(3)

[Row(Country=1125, Admin1=None, SubDiv=None, Year=1955, List='07A', Cause='A000', Sex=1, Frmat=1, IM_Frmat=8.0, Deaths1=107822, Deaths2=40543.0, Deaths3=18814.0, Deaths4='8192.0', Deaths5='2860.0', Deaths6='1440.0', Deaths7=2120.0, Deaths8='1219.0', Deaths9=1059.0, Deaths10='1133.0', Deaths11=1639.0, Deaths12='1066.0', Deaths13=1656.0, Deaths14='1654.0', Deaths15=1829.0, Deaths16='2619.0', Deaths17=1894.0, Deaths18='2941.0', Deaths19=2681.0, Deaths20='3383.0', Deaths21=2013.0, Deaths22='2727.0', Deaths23='4126.0', Deaths24=None, Deaths25=None, Deaths26=214.0, IM_Deaths1=40543.0, IM_Deaths2=None, IM_Deaths3=None, IM_Deaths4=None),
 Row(Country=1125, Admin1=None, SubDiv=None, Year=1955, List='07A', Cause='A000', Sex=2, Frmat=1, IM_Frmat=8.0, Deaths1=105750, Deaths2=39632.0, Deaths3=21606.0, Deaths4='9090.0', Deaths5='3052.0', Deaths6='1404.0', Deaths7=1679.0, Deaths8='899.0', Deaths9=889.0, Deaths10='826.0', Deaths11=1414.0, Deaths12='900.0', Deaths13=1241.0, Deaths14='1118.0', Deaths15=

## Step 2: Data Wrangling

### To select specific columns

In [10]:
df.select('Country').show()

+-------+
|Country|
+-------+
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
|   1125|
+-------+
only showing top 20 rows



### To select multiple columns

In [11]:
df.select('Country', 'Year').show()

+-------+----+
|Country|Year|
+-------+----+
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
|   1125|1955|
+-------+----+
only showing top 20 rows



In [12]:
df['Year']

Column<'Year'>

In [13]:
df.dtypes

[('Country', 'int'),
 ('Admin1', 'string'),
 ('SubDiv', 'string'),
 ('Year', 'int'),
 ('List', 'string'),
 ('Cause', 'string'),
 ('Sex', 'int'),
 ('Frmat', 'int'),
 ('IM_Frmat', 'double'),
 ('Deaths1', 'int'),
 ('Deaths2', 'double'),
 ('Deaths3', 'double'),
 ('Deaths4', 'string'),
 ('Deaths5', 'string'),
 ('Deaths6', 'string'),
 ('Deaths7', 'double'),
 ('Deaths8', 'string'),
 ('Deaths9', 'double'),
 ('Deaths10', 'string'),
 ('Deaths11', 'double'),
 ('Deaths12', 'string'),
 ('Deaths13', 'double'),
 ('Deaths14', 'string'),
 ('Deaths15', 'double'),
 ('Deaths16', 'string'),
 ('Deaths17', 'double'),
 ('Deaths18', 'string'),
 ('Deaths19', 'double'),
 ('Deaths20', 'string'),
 ('Deaths21', 'double'),
 ('Deaths22', 'string'),
 ('Deaths23', 'string'),
 ('Deaths24', 'string'),
 ('Deaths25', 'string'),
 ('Deaths26', 'double'),
 ('IM_Deaths1', 'double'),
 ('IM_Deaths2', 'string'),
 ('IM_Deaths3', 'string'),
 ('IM_Deaths4', 'string')]

In [14]:
df.describe()

DataFrame[summary: string, Country: string, Admin1: string, SubDiv: string, Year: string, List: string, Cause: string, Sex: string, Frmat: string, IM_Frmat: string, Deaths1: string, Deaths2: string, Deaths3: string, Deaths4: string, Deaths5: string, Deaths6: string, Deaths7: string, Deaths8: string, Deaths9: string, Deaths10: string, Deaths11: string, Deaths12: string, Deaths13: string, Deaths14: string, Deaths15: string, Deaths16: string, Deaths17: string, Deaths18: string, Deaths19: string, Deaths20: string, Deaths21: string, Deaths22: string, Deaths23: string, Deaths24: string, Deaths25: string, Deaths26: string, IM_Deaths1: string, IM_Deaths2: string, IM_Deaths3: string, IM_Deaths4: string]

### To add a column

In [15]:
df.withColumn('After 2 Years', df['Year'] + 2).show()

+-------+------+------+----+----+-----+---+-----+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+----------+----------+----------+----------+-------------+
|Country|Admin1|SubDiv|Year|List|Cause|Sex|Frmat|IM_Frmat|Deaths1|Deaths2|Deaths3|Deaths4|Deaths5|Deaths6|Deaths7|Deaths8|Deaths9|Deaths10|Deaths11|Deaths12|Deaths13|Deaths14|Deaths15|Deaths16|Deaths17|Deaths18|Deaths19|Deaths20|Deaths21|Deaths22|Deaths23|Deaths24|Deaths25|Deaths26|IM_Deaths1|IM_Deaths2|IM_Deaths3|IM_Deaths4|After 2 Years|
+-------+------+------+----+----+-----+---+-----+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+----------+----------+----------+

### To drop a column

In [16]:
df.drop('Admin1', 'SubDiv')

DataFrame[Country: int, Year: int, List: string, Cause: string, Sex: int, Frmat: int, IM_Frmat: double, Deaths1: int, Deaths2: double, Deaths3: double, Deaths4: string, Deaths5: string, Deaths6: string, Deaths7: double, Deaths8: string, Deaths9: double, Deaths10: string, Deaths11: double, Deaths12: string, Deaths13: double, Deaths14: string, Deaths15: double, Deaths16: string, Deaths17: double, Deaths18: string, Deaths19: double, Deaths20: string, Deaths21: double, Deaths22: string, Deaths23: string, Deaths24: string, Deaths25: string, Deaths26: double, IM_Deaths1: double, IM_Deaths2: string, IM_Deaths3: string, IM_Deaths4: string]

### To rename a column

In [17]:
df.withColumnRenamed('Year', 'Happy New Year').show()

+-------+------+------+--------------+----+-----+---+-----+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+----------+----------+----------+----------+
|Country|Admin1|SubDiv|Happy New Year|List|Cause|Sex|Frmat|IM_Frmat|Deaths1|Deaths2|Deaths3|Deaths4|Deaths5|Deaths6|Deaths7|Deaths8|Deaths9|Deaths10|Deaths11|Deaths12|Deaths13|Deaths14|Deaths15|Deaths16|Deaths17|Deaths18|Deaths19|Deaths20|Deaths21|Deaths22|Deaths23|Deaths24|Deaths25|Deaths26|IM_Deaths1|IM_Deaths2|IM_Deaths3|IM_Deaths4|
+-------+------+------+--------------+----+-----+---+-----+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+----------+----------+---------

### Removing null values

#### The whole record containing all null values

In [18]:
df_no_null_values = df.na.drop(how = 'all')

#### In our case we dont have any as the count is same

In [19]:
df_no_null_values.count()

6103609

#### If null found any where it will be deleted

In [20]:
df_no_null_values = df.na.drop()

#### In our case we dont want to lose data so we are not going with this step

In [21]:
df_no_null_values.count()

61

In [22]:
df_no_null_values = df.na.drop(how = 'any', thresh = 30)

#### Here we are losing 1 million records. And we dont want this to happen

In [23]:
df_no_null_values.count()

5740893

#### Droping null values from subset of columns

In [24]:
df_no_null_values = df.na.drop(how = 'any', subset = ['Year'])

In [25]:
df_no_null_values.count()

6103607

### Filling null values

In [26]:
df_na_filled = df_no_null_values.na.fill(0, ['Deaths1', 'Deaths2', 'Deaths3', 'Deaths4', 'Deaths5', 'Deaths6', 'Deaths7', 'Deaths8', 'Deaths9', 'Deaths10',
                                            'Deaths11', 'Deaths12', 'Deaths13', 'Deaths14', 'Deaths15', 'Deaths16', 'Deaths17', 'Deaths18', 'Deaths19', 'Deaths20',
                                            'Deaths21', 'Deaths22', 'Deaths23', 'Deaths24', 'Deaths25', 'Deaths26'])

#### Mapping the data to dictionary

In [27]:
import pandas as pd
codes = pd.read_csv(r'country_codes.csv', index_col = False)

In [28]:
codes = codes.astype({'country':'string', 'name':'string'})
codes.dtypes

country    string[python]
name       string[python]
dtype: object

In [29]:
from pyspark.sql.functions import col
from pyspark.sql.types import StringType
df_na_filled = df_na_filled.withColumn('Country', col('Country').cast(StringType()))

In [30]:
df_na_filled.dtypes

[('Country', 'string'),
 ('Admin1', 'string'),
 ('SubDiv', 'string'),
 ('Year', 'int'),
 ('List', 'string'),
 ('Cause', 'string'),
 ('Sex', 'int'),
 ('Frmat', 'int'),
 ('IM_Frmat', 'double'),
 ('Deaths1', 'int'),
 ('Deaths2', 'double'),
 ('Deaths3', 'double'),
 ('Deaths4', 'string'),
 ('Deaths5', 'string'),
 ('Deaths6', 'string'),
 ('Deaths7', 'double'),
 ('Deaths8', 'string'),
 ('Deaths9', 'double'),
 ('Deaths10', 'string'),
 ('Deaths11', 'double'),
 ('Deaths12', 'string'),
 ('Deaths13', 'double'),
 ('Deaths14', 'string'),
 ('Deaths15', 'double'),
 ('Deaths16', 'string'),
 ('Deaths17', 'double'),
 ('Deaths18', 'string'),
 ('Deaths19', 'double'),
 ('Deaths20', 'string'),
 ('Deaths21', 'double'),
 ('Deaths22', 'string'),
 ('Deaths23', 'string'),
 ('Deaths24', 'string'),
 ('Deaths25', 'string'),
 ('Deaths26', 'double'),
 ('IM_Deaths1', 'double'),
 ('IM_Deaths2', 'string'),
 ('IM_Deaths3', 'string'),
 ('IM_Deaths4', 'string')]

In [31]:
map = codes.set_index('country').to_dict()['name']

In [32]:
map

{'1010': 'Algeria',
 '1020': 'Angola',
 '1025': 'Benin',
 '1030': 'Botswana',
 '1035': 'Burkina Faso',
 '1040': 'Burundi',
 '1045': 'Cameroon',
 '1060': 'Cape Verde',
 '1070': 'Central African Republic',
 '1080': 'Chad',
 '1090': 'Comoros',
 '1100': 'Congo',
 '1115': "Cote d'Ivoire",
 '1120': 'Djibouti',
 '1125': 'Egypt',
 '1130': 'Equatorial Guinea',
 '1135': 'Eritrea',
 '1140': 'Ethiopia',
 '1160': 'Gabon',
 '1170': 'Gambia',
 '1180': 'Ghana',
 '1190': 'Guinea',
 '1192': 'Guinea-Bissau',
 '1220': 'Kenya',
 '1230': 'Lesotho',
 '1240': 'Liberia',
 '1250': 'Libyan Arab Jamahiriya',
 '1260': 'Madagascar',
 '1270': 'Malawi',
 '1280': 'Mali',
 '1290': 'Mauritania',
 '1300': 'Mauritius',
 '1303': 'Mayotte',
 '1310': 'Morocco',
 '1320': 'Mozambique',
 '1325': 'Namibia',
 '1330': 'Niger',
 '1340': 'Nigeria',
 '1360': 'Reunion',
 '1365': 'Rodrigues',
 '1370': 'Rwanda',
 '1385': 'Sao Tome and Principe',
 '1390': 'Senegal',
 '1400': 'Seychelles',
 '1410': 'Sierra Leone',
 '1420': 'Somalia',
 '14

In [33]:
df_na_filled = df_na_filled.replace(map, subset = ['Country'])

In [34]:
df_na_filled.head(3)

[Row(Country='Egypt', Admin1=None, SubDiv=None, Year=1955, List='07A', Cause='A000', Sex=1, Frmat=1, IM_Frmat=8.0, Deaths1=107822, Deaths2=40543.0, Deaths3=18814.0, Deaths4='8192.0', Deaths5='2860.0', Deaths6='1440.0', Deaths7=2120.0, Deaths8='1219.0', Deaths9=1059.0, Deaths10='1133.0', Deaths11=1639.0, Deaths12='1066.0', Deaths13=1656.0, Deaths14='1654.0', Deaths15=1829.0, Deaths16='2619.0', Deaths17=1894.0, Deaths18='2941.0', Deaths19=2681.0, Deaths20='3383.0', Deaths21=2013.0, Deaths22='2727.0', Deaths23='4126.0', Deaths24=None, Deaths25=None, Deaths26=214.0, IM_Deaths1=40543.0, IM_Deaths2=None, IM_Deaths3=None, IM_Deaths4=None),
 Row(Country='Egypt', Admin1=None, SubDiv=None, Year=1955, List='07A', Cause='A000', Sex=2, Frmat=1, IM_Frmat=8.0, Deaths1=105750, Deaths2=39632.0, Deaths3=21606.0, Deaths4='9090.0', Deaths5='3052.0', Deaths6='1404.0', Deaths7=1679.0, Deaths8='899.0', Deaths9=889.0, Deaths10='826.0', Deaths11=1414.0, Deaths12='900.0', Deaths13=1241.0, Deaths14='1118.0', Dea

### GroupBy in PySpark

In [35]:
deaths_grouped = df_na_filled.groupBy('Country').sum('Deaths1')
deaths_grouped = deaths_grouped.toPandas()
# type(deaths_grouped)
deaths_grouped.rename(columns = {'sum(Deaths1)':'Total_Deaths'}, inplace = True)
deaths_grouped = deaths_grouped.sort_values(by = 'Total_Deaths', ascending = False)
deaths_grouped = deaths_grouped.head(20)

In [36]:
deaths_grouped

Unnamed: 0,Country,Total_Deaths
54,United States of America,353413616
136,Russian Federation,247516071
75,Japan,140261484
71,Brazil,123306019
95,United Kingdom,104489404
93,"United Kingdom, England and Wales",93869081
135,"USSR, Former",93459999
35,Italy,90188454
10,France,87232829
138,Ukraine,83472787


### Total Aggregated Deaths by Country from 1995 to 2022

In [37]:
import plotly.express as px
color_discrete_sequence = ['#494623'] * len(deaths_grouped)
fig = px.bar(deaths_grouped, x = deaths_grouped['Country'], y = deaths_grouped['Total_Deaths'], title = 'Total Aggregated Deaths by Count from 1995 to 2022',
            color_discrete_sequence = color_discrete_sequence)
fig.update_traces(texttemplate = '%{text:.4s}', textposition = 'outside')
fig.update_layout(uniformtext_minsize = 10, uniformtext_mode = 'hide' )
fig.show()

#### Merging Russian federation and USSR, Former

In [38]:
deaths_grouped_RI = deaths_grouped.reset_index(drop = True)
Russia = deaths_grouped_RI.loc[deaths_grouped_RI['Country'].isin(['Russian Federation', 'USSR, Former'])]
Russia.Total_Deaths.sum()

340976070

In [39]:
deaths_grouped_RI.loc[len(deaths_grouped_RI.index)] = ["Russia", 340976070]

#### Drop specific rows

In [40]:
Countries = ['Russian Federation', 'USSR, Former']
# drop rows from above list
deaths_grouped_RI = deaths_grouped_RI[deaths_grouped_RI.Country.isin(Countries) == False]

In [41]:
deaths_grouped_RI

Unnamed: 0,Country,Total_Deaths
0,United States of America,353413616
2,Japan,140261484
3,Brazil,123306019
4,United Kingdom,104489404
5,"United Kingdom, England and Wales",93869081
7,Italy,90188454
8,France,87232829
9,Ukraine,83472787
10,Mexico,68170399
11,"Germany, Former Federal Republic",64311700


In [42]:
deaths_grouped_RU = deaths_grouped_RI.sort_values(by = 'Total_Deaths', ascending = False)
deaths_grouped_RU

Unnamed: 0,Country,Total_Deaths
0,United States of America,353413616
20,Russia,340976070
2,Japan,140261484
3,Brazil,123306019
4,United Kingdom,104489404
5,"United Kingdom, England and Wales",93869081
7,Italy,90188454
8,France,87232829
9,Ukraine,83472787
10,Mexico,68170399


### Total Aggregated Deaths by Country from 1995 to 2020 [Russia Merged]

In [43]:
color_discrete_sequence = ['#8e883d'] * len(deaths_grouped)
fig = px.bar(deaths_grouped_RU, x = deaths_grouped_RU['Country'], y = deaths_grouped_RU['Total_Deaths'], title = 'Total Aggregated Deaths by Country from 1995 to 2022',
            color_discrete_sequence = color_discrete_sequence)
fig.update_traces(texttemplate = '%{text:.4s}', textposition = 'outside')
fig.update_layout(uniformtext_minsize = 10, uniformtext_mode = 'hide')
fig.show()

In [44]:
Top_Deaths = df_na_filled.filter('Deaths1 >= 1500000').select(['Country', 'Deaths1', 'Year'])

In [45]:
Top_Deaths.show()

+--------------------+-------+----+
|             Country|Deaths1|Year|
+--------------------+-------+----+
|        USSR, Former|1522673|1985|
|        USSR, Former|1519174|1988|
|        USSR, Former|1522769|1990|
|United States of ...|1769884|2020|
|United States of ...|1613845|2020|
+--------------------+-------+----+



#### Show Spark dataframe in descending order

In [46]:
top10deaths = Top_Deaths.sort(Top_Deaths.Deaths1.desc())
top10deathsPDF = top10deaths.toPandas()

### Plot the deaths greater than 1.5 million by year from 1955 to 2022

In [47]:
top10deathsPDF

Unnamed: 0,Country,Deaths1,Year
0,United States of America,1769884,2020
1,United States of America,1613845,2020
2,"USSR, Former",1522769,1990
3,"USSR, Former",1522673,1985
4,"USSR, Former",1519174,1988


### Deaths in USA by Year

In [48]:
USA_Deaths = df_na_filled.filter(df_na_filled.Country == 'United States of America').select(['Year', 'Deaths1'])
USA_DeathsPDF = USA_Deaths.toPandas()

In [49]:
USA_DeathsPDF.dtypes

Year       int32
Deaths1    int32
dtype: object

In [50]:
USA_DeathsPDF = USA_DeathsPDF.groupby(['Year']).sum('Deaths1')

In [51]:
USA_DeathsPDF = USA_DeathsPDF.reset_index()

In [52]:
color_discrete_sequence = ['#c3892b'] * len(USA_DeathsPDF)
fig3 = px.bar(USA_DeathsPDF, x = 'Year', y = 'Deaths1', hover_data = ['Year', 'Deaths1'], color_discrete_sequence = color_discrete_sequence,
            title = 'Deaths in USA by Year', labels = {'pop':'population of Canada'}, height = 400)
fig3.show()

### Top 10 causes for people are dying in USA

In [53]:
top10_causes = df_na_filled.filter(df_na_filled.Country == 'United States of America').select(['Cause', 'Deaths1'])

In [54]:
top10_causes = top10_causes.toPandas()

In [55]:
top10_causes = top10_causes.groupby(['Cause']).sum('Deaths1')
top10_causes = top10_causes.reset_index()

In [56]:
top10_causes = top10_causes.sort_values(by = 'Deaths1', ascending = False)

In [57]:
top10_causes = top10_causes.reset_index()
top10_causes

Unnamed: 0,index,Cause,Deaths1
0,478,AAA,56808193
1,124,A000,50987160
2,479,B00,42868083
3,682,B27,10205663
4,205,A081,9582277
...,...,...,...
7945,4565,O755,1
7946,522,B041,0
7947,620,B185,0
7948,108,432,0


In [58]:
top10_causes = top10_causes.iloc[3:]

In [59]:
top10_causes = top10_causes.head(20)

In [60]:
fig = px.pie(top10_causes, values = 'Deaths1', names = 'Cause', title = 'Death by Cause')
fig.show()

In [61]:
# To stop the spark session
spark.stop()