In [1]:
#Import Libraries
from IPython.core.display import display
from pyspark.shell import spark
from cassandra.cluster import Cluster
from pyspark import SparkConf
from pyspark.sql import SparkSession
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/

Using Python version 3.7.3 (default, Mar  6 2020 22:34:30)
SparkSession available as 'spark'.


In [2]:
# Spark configuration
#
# `from pyspark.shell import spark` in the import cell already started a
# SparkSession (see the banner printed there). Builder.getOrCreate() hands back
# an already-running session, and context-level settings such as the master
# and app name cannot be applied to it. Stop the shell session first so the
# builder creates a fresh context that actually uses this configuration.
# NOTE(review): in local mode executors run inside the driver JVM, so
# spark.executor.memory is unlikely to have any effect; driver memory must be
# set before the JVM starts (e.g. via spark-submit / PYSPARK_SUBMIT_ARGS).
sparkConf = (SparkConf()
             .setMaster("local[3]")
             .setAppName("project")
             .setAll([('spark.executor.memory', '2g')]))

spark.stop()
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

In [3]:
# Connection with Cassandra db: a single local node on the default CQL port;
# the session is bound to the keyspace that receives the query results below.
cluster = Cluster(contact_points=['127.0.0.1'], port=9042)
session = cluster.connect(keyspace='mykeyspace')

In [3]:
# Data pre-processing

# Read the 11 yearly CSVs (2005-2015). Every file follows the same path
# pattern and uses the same reader options, so one helper builds each
# DataFrame instead of repeating the call eleven times.
# NOTE(review): absolute, user-specific path -- adjust DATASET_DIR per machine.
DATASET_DIR = '/Users/alessio/Documents/Projects/usa-mortality-analysis/sources/dataset'


def read_year_csv(year, dataset_dir=DATASET_DIR):
    """Read ``<dataset_dir>/<year>_data.csv`` into a cached Spark DataFrame.

    The header row supplies column names, column types are inferred from the
    data, and leading/trailing whitespace around values is trimmed.

    :param year: four-digit year embedded in the file name.
    :param dataset_dir: directory containing the yearly ``*_data.csv`` files.
    :returns: the (lazily) cached DataFrame for that year.
    """
    return (spark.read.csv(path=f'{dataset_dir}/{year}_data.csv',
                           header=True,
                           inferSchema=True,
                           ignoreLeadingWhiteSpace=True,
                           ignoreTrailingWhiteSpace=True)
            .cache())


year2005 = read_year_csv(2005)
year2006 = read_year_csv(2006)
year2007 = read_year_csv(2007)
year2008 = read_year_csv(2008)
year2009 = read_year_csv(2009)
year2010 = read_year_csv(2010)
year2011 = read_year_csv(2011)
year2012 = read_year_csv(2012)
year2013 = read_year_csv(2013)
year2014 = read_year_csv(2014)
year2015 = read_year_csv(2015)

In [4]:
# Merging all 11 years data into dataframe
def unionAll(*dfs, by_name=False):
    """Concatenate DataFrames row-wise (SQL ``UNION ALL``, no de-duplication).

    :param dfs: one or more DataFrames with compatible schemas, stacked in the
        order given.
    :param by_name: when False (default) columns are matched by position, as
        ``DataFrame.unionAll`` does; when True they are matched by column name
        via ``DataFrame.unionByName``, which protects against inputs whose
        columns appear in a different order.
    :returns: a single DataFrame containing the rows of every input.
    :raises ValueError: if no DataFrame is passed.
    """
    if not dfs:
        # reduce() on an empty sequence raises an unhelpful TypeError.
        raise ValueError("unionAll() needs at least one DataFrame")
    if by_name:
        return reduce(lambda left, right: left.unionByName(right), dfs)
    return reduce(lambda left, right: left.unionAll(right), dfs)

# Stack the eleven yearly frames, oldest first, into one DataFrame.
yearly_frames = [year2005, year2006, year2007, year2008, year2009, year2010,
                 year2011, year2012, year2013, year2014, year2015]
MergeData = unionAll(*yearly_frames)

In [6]:
# Count of MergeData: total records across all years (runs a Spark job).
merged_row_count = MergeData.count()
merged_row_count

27720673

In [5]:
# Show MergeData schema: column names with the types inferred from the CSVs.
MergeData.printSchema()

root
 |-- resident_status: integer (nullable = true)
 |-- education_1989_revision: integer (nullable = true)
 |-- education_2003_revision: integer (nullable = true)
 |-- education_reporting_flag: integer (nullable = true)
 |-- month_of_death: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- detail_age_type: integer (nullable = true)
 |-- detail_age: integer (nullable = true)
 |-- age_substitution_flag: integer (nullable = true)
 |-- age_recode_52: integer (nullable = true)
 |-- age_recode_27: integer (nullable = true)
 |-- age_recode_12: integer (nullable = true)
 |-- infant_age_recode_22: integer (nullable = true)
 |-- place_of_death_and_decedents_status: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- day_of_week_of_death: integer (nullable = true)
 |-- current_data_year: integer (nullable = true)
 |-- injury_at_work: string (nullable = true)
 |-- manner_of_death: integer (nullable = true)
 |-- method_of_disposition: string (nullable = tr

In [6]:
# Columns of MergeData (full list, before the drop cell that follows)
MergeData.columns

['resident_status',
 'education_1989_revision',
 'education_2003_revision',
 'education_reporting_flag',
 'month_of_death',
 'sex',
 'detail_age_type',
 'detail_age',
 'age_substitution_flag',
 'age_recode_52',
 'age_recode_27',
 'age_recode_12',
 'infant_age_recode_22',
 'place_of_death_and_decedents_status',
 'marital_status',
 'day_of_week_of_death',
 'current_data_year',
 'injury_at_work',
 'manner_of_death',
 'method_of_disposition',
 'autopsy',
 'activity_code',
 'place_of_injury_for_causes_w00_y34_except_y06_and_y07_',
 'icd_code_10th_revision',
 '358_cause_recode',
 '113_cause_recode',
 '130_infant_cause_recode',
 '39_cause_recode',
 'number_of_entity_axis_conditions',
 'entity_condition_1',
 'entity_condition_2',
 'entity_condition_3',
 'entity_condition_4',
 'entity_condition_5',
 'entity_condition_6',
 'entity_condition_7',
 'entity_condition_8',
 'entity_condition_9',
 'entity_condition_10',
 'entity_condition_11',
 'entity_condition_12',
 'entity_condition_13',
 'entity_co

In [7]:
# Dropping Columns
#
# Drop the 20 record-axis and 20 entity-axis condition columns, plus other
# fields the analysis below does not use, in one call.
unused_columns = (
    [f'record_condition_{i}' for i in range(1, 21)]
    + [f'entity_condition_{i}' for i in range(1, 21)]
    + ['130_infant_cause_recode',
       '39_cause_recode',
       'number_of_entity_axis_conditions',
       'number_of_record_axis_conditions',
       'age_recode_27',
       'detail_age',
       'detail_age_type',
       'education_1989_revision',
       'age_substitution_flag',
       'infant_age_recode_22',
       '358_cause_recode',
       'hispanic_origin',
       'race_recode_5'])
MergeData = MergeData.drop(*unused_columns)

# Filtering Rows
#
# Keep only burials ('B') and cremations ('C'): the two classes the logistic
# regression model at the end of the notebook predicts.
# NOTE(review): MergeData is later registered as `mergedTable`, so every SQL
# query on that table (queries 1-7 and 9-11) only sees these rows -- about
# 18.2M of the 27.7M records per the query outputs -- not all deaths. Consider
# applying this filter to a separate DataFrame used only by the ML pipeline.
MergeData = MergeData.filter(MergeData.method_of_disposition.isin('B', 'C'))

In [8]:
# Columns of MergeData remaining after the drop cell above
MergeData.columns

['resident_status',
 'education_2003_revision',
 'education_reporting_flag',
 'month_of_death',
 'sex',
 'age_recode_52',
 'age_recode_12',
 'place_of_death_and_decedents_status',
 'marital_status',
 'day_of_week_of_death',
 'current_data_year',
 'injury_at_work',
 'manner_of_death',
 'method_of_disposition',
 'autopsy',
 'activity_code',
 'place_of_injury_for_causes_w00_y34_except_y06_and_y07_',
 'icd_code_10th_revision',
 '113_cause_recode',
 'race',
 'bridged_race_flag',
 'race_imputation_flag',
 'race_recode_3',
 'hispanic_originrace_recode']

In [9]:
# Categorical features for the ML pipeline: each one is string-indexed and
# one-hot encoded before being assembled into the feature vector.
categoricalColumns = """
    resident_status
    month_of_death
    sex
    age_recode_52
    place_of_death_and_decedents_status
    marital_status
    injury_at_work
    manner_of_death
    autopsy
    activity_code
    place_of_injury_for_causes_w00_y34_except_y06_and_y07_
    race
    hispanic_originrace_recode
""".split()

In [10]:
# Null Imputations: replace missing values with sentinel codes, one column ->
# value mapping applied in a single fillna call. (9 is later labelled
# 'Unknown' for education; 999 falls into 'OTHER' in the manner-of-death query.)
null_replacements = {
    'place_of_injury_for_causes_w00_y34_except_y06_and_y07_': 12,
    'activity_code': 11,
    'manner_of_death': 999,
    'place_of_death_and_decedents_status': 999,
    'education_2003_revision': 9,
}
MergeData = MergeData.fillna(null_replacements)

In [11]:
# Create SQL Table from MergeData.
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is
# the supported equivalent (an existing view with this name is replaced).
MergeData.createOrReplaceTempView("mergedTable")

In [17]:
# Merging all years data into new dataframe for other queries.
# MergeData has been column-pruned and filtered to burials/cremations, so
# rebuild an untouched union of the raw yearly frames. The unionAll helper
# defined in the first merge cell is reused rather than redefined here.
JoinedData = unionAll(year2005,
                      year2006,
                      year2007,
                      year2008,
                      year2009,
                      year2010,
                      year2011,
                      year2012,
                      year2013,
                      year2014,
                      year2015)

In [13]:
# Replacing null values in manner_of_death, then keeping only suicides (code 2).
# Both steps are chained on one frame so the fill is not silently discarded.
# Null manners become 12 and therefore never match 2, so the fill does not
# change which rows survive. A SQL-string predicate is used because a
# JoinedData column reference would point at the pre-fillna attribute.
JoinData = (JoinedData
            .fillna({'manner_of_death': 12})
            .filter("manner_of_death = 2"))

In [14]:
# Creating joint table on new joined data which is modified (suicides only).
# createOrReplaceTempView replaces the deprecated registerTempTable.
JoinData.createOrReplaceTempView("jointTable")

In [15]:
# Query 1 - Male vs female deaths by resident status.
# count(sex) counts only rows whose sex is not null.
query1_sql = """SELECT resident_status,
            sex,
            count(sex) AS sex_counts
     FROM mergedTable
     GROUP BY resident_status, sex
     ORDER BY resident_status, sex"""
results_male_female_resident_status = spark.sql(query1_sql)
results_male_female_resident_status.show()

+---------------+---+----------+
|resident_status|sex|sex_counts|
+---------------+---+----------+
|              1|  F|   7590600|
|              1|  M|   7460761|
|              2|  F|   1236340|
|              2|  M|   1414999|
|              3|  F|    210242|
|              3|  M|    266829|
|              4|  F|     10052|
|              4|  M|     20991|
+---------------+---+----------+



In [10]:
# Saving data in the db: Query 1 result, keyed by row position `ind`.
# Rows come from collect() rather than toPandas().iterrows(): no pandas round
# trip, and a null stays None instead of turning into a float NaN.
# NOTE(review): Spark count() is a 64-bit bigint while sex_counts is a CQL int.
session.execute("CREATE TABLE IF NOT EXISTS results_male_female_resident_status (ind int primary key, resident_status int, sex varchar, sex_counts int)")
stmt = session.prepare("INSERT INTO results_male_female_resident_status (ind, resident_status, sex, sex_counts) VALUES (?, ?, ?, ?)")

for ind, row in enumerate(results_male_female_resident_status.collect()):
    session.execute(stmt, [ind, row['resident_status'], row['sex'], row['sex_counts']])

In [16]:
# Query 2 - Male vs female deaths by month of the year (both sexes, despite
# the variable name).
query2_sql = """SELECT month_of_death,
            sex,
            count(sex) AS sex_counts
     FROM mergedTable
     GROUP BY month_of_death, sex
     ORDER BY month_of_death, sex"""
results_male_deaths_month = spark.sql(query2_sql)
results_male_deaths_month.show()

+--------------+---+----------+
|month_of_death|sex|sex_counts|
+--------------+---+----------+
|             1|  F|    849629|
|             1|  M|    838492|
|             2|  F|    758754|
|             2|  M|    750646|
|             3|  F|    813433|
|             3|  M|    804654|
|             4|  F|    747380|
|             4|  M|    754604|
|             5|  F|    738769|
|             5|  M|    753111|
|             6|  F|    696060|
|             6|  M|    717974|
|             7|  F|    714427|
|             7|  M|    740472|
|             8|  F|    711261|
|             8|  M|    735864|
|             9|  F|    700249|
|             9|  M|    719378|
|            10|  F|    749339|
|            10|  M|    763009|
+--------------+---+----------+
only showing top 20 rows



In [12]:
# Saving data in the db: Query 2 result, keyed by row position `ind`.
# collect() instead of toPandas().iterrows() keeps Spark's Python types
# (nulls stay None) and avoids the pandas round trip.
session.execute("CREATE TABLE IF NOT EXISTS results_male_deaths_month (ind int primary key, month_of_death int, sex varchar, sex_counts int)")
stmt = session.prepare("INSERT INTO results_male_deaths_month (ind, month_of_death, sex, sex_counts) VALUES (?, ?, ?, ?)")

for ind, row in enumerate(results_male_deaths_month.collect()):
    session.execute(stmt, [ind, row['month_of_death'], row['sex'], row['sex_counts']])

In [17]:
# Read Disease description CSV (ICD10); its code3 / description3 columns are
# joined against icd_code_10th_revision in the queries below.
ICD10_CSV_PATH = '/Users/alessio/Documents/Projects/usa-mortality-analysis/sources/codes/ICD10.csv'
icd10_reader_options = dict(header=True,
                            inferSchema=True,
                            ignoreLeadingWhiteSpace=True,
                            ignoreTrailingWhiteSpace=True)
icd10 = spark.read.csv(path=ICD10_CSV_PATH, **icd10_reader_options).cache()

In [18]:
# Create SQL Table of disease description.
# createOrReplaceTempView replaces the deprecated registerTempTable.
icd10.createOrReplaceTempView("icd10")

In [19]:
# Query 3 - Top 20 (disease, sex) pairs by number of deaths.
# The comma-separated FROM with the WHERE equality is an inner join on the
# ICD-10 code.
query3_sql = """SELECT m.sex AS sex,
            m.icd_code_10th_revision AS code,
            i.description3 AS description,
            count(m.sex) AS sex_counts
     FROM mergedTable m, icd10 i
     WHERE i.code3 = m.icd_code_10th_revision
     GROUP BY i.description3 ,m.icd_code_10th_revision, m.sex
     ORDER BY count(m.sex) DESC, m.sex
     LIMIT 20"""
results_diseases_causing_deaths_sex = spark.sql(query3_sql)
results_diseases_causing_deaths_sex.show()

+---+----+--------------------+----------+
|sex|code|         description|sex_counts|
+---+----+--------------------+----------+
|  M|I219|Acute myocardial ...|    487319|
|  F|G309| Alzheimer's disease|    415393|
|  F|J449|Other chronic obs...|    397198|
|  F|I219|Acute myocardial ...|    392758|
|  M|J449|Other chronic obs...|    350659|
|  M| C61|Malignant neoplas...|    204566|
|  M|G309| Alzheimer's disease|    184358|
|  F|J189|Pneumonia, unspec...|    173403|
|  M|J189|Pneumonia, unspec...|    151422|
|  M|C189|Malignant neoplas...|    150469|
|  F|C189|Malignant neoplas...|    144516|
|  M|C259|Malignant neoplas...|    137235|
|  F|C259|Malignant neoplas...|    132971|
|  F|A419|Sepsis, unspecifi...|    121247|
|  M|A419|Sepsis, unspecifi...|    104677|
|  M|I119|Hypertensive hear...|     98173|
|  M| G20| Parkinson's disease|     97789|
|  M|C159|Malignant neoplas...|     81824|
|  F| I10|Essential (primar...|     80630|
|  F|I119|Hypertensive hear...|     79545|
+---+----+-

In [16]:
# Saving data in the db: Query 3 result, keyed by row position `ind`.
# collect() instead of toPandas().iterrows() keeps Spark's Python types
# (nulls stay None) and avoids the pandas round trip.
session.execute("CREATE TABLE IF NOT EXISTS results_diseases_causing_deaths_sex (ind int primary key, code varchar, description varchar, sex varchar, sex_counts int)")
stmt = session.prepare("INSERT INTO results_diseases_causing_deaths_sex (ind, code, description, sex, sex_counts) VALUES (?, ?, ?, ?, ?)")

for ind, row in enumerate(results_diseases_causing_deaths_sex.collect()):
    session.execute(stmt, [ind, row['code'], row['description'], row['sex'], row['sex_counts']])

In [20]:
# Query 4 - Method of Disposition per year.
# NOTE(review): mergedTable was filtered to method_of_disposition in ('B', 'C')
# before registration, so only Burial and Cremation can appear here; the other
# WHEN branches cannot match on this table.
results_disposition = spark.sql(
  """SELECT current_data_year AS year,
            CASE method_of_disposition
            WHEN 'C' THEN 'Cremation'
            WHEN 'B' THEN 'Burial'
            WHEN 'D' THEN 'Donation'
            WHEN 'E' THEN 'Entombment'
            WHEN 'O' THEN 'Other'
            WHEN 'R' THEN 'RemovedFromUSA'
            WHEN 'U' THEN 'Unknown'
            END AS disposition,
            COUNT(*) AS counts
     FROM mergedTable
     GROUP BY 1, 2
     ORDER BY 1, 3""")

results_disposition.show()

+----+-----------+-------+
|year|disposition| counts|
+----+-----------+-------+
|2005|  Cremation| 350018|
|2005|     Burial| 553202|
|2006|  Cremation| 423282|
|2006|     Burial| 667169|
|2007|  Cremation| 472220|
|2007|     Burial| 725666|
|2008|  Cremation| 579827|
|2008|     Burial| 866384|
|2009|  Cremation| 599202|
|2009|     Burial| 802305|
|2010|  Cremation| 706224|
|2010|     Burial| 906430|
|2011|  Cremation| 780480|
|2011|     Burial| 950372|
|2012|  Cremation| 898222|
|2012|     Burial|1093628|
|2013|  Cremation| 973768|
|2013|     Burial|1113362|
|2014|  Cremation|1094292|
|2014|     Burial|1162836|
+----+-----------+-------+
only showing top 20 rows



In [19]:
# Saving data in the db: Query 4 result, keyed by row position `ind`.
# collect() instead of toPandas().iterrows() keeps Spark's Python types
# (nulls stay None) and avoids the pandas round trip.
session.execute("CREATE TABLE IF NOT EXISTS results_disposition (ind int primary key, counts int, disposition varchar, year int)")
stmt = session.prepare("INSERT INTO results_disposition (ind, counts, disposition, year) VALUES (?, ?, ?, ?)")

for ind, row in enumerate(results_disposition.collect()):
    session.execute(stmt, [ind, row['counts'], row['disposition'], row['year']])

In [21]:
# Query 5 - Manner of death per month.
# Nulls were imputed as 999 earlier, so they are counted under 'OTHER'.
results_deaths_month = spark.sql(
  """SELECT month_of_death AS month,
            CASE manner_of_death
            WHEN '0' THEN 'Not Specified'
            WHEN '1' THEN 'Accident'
            WHEN '2' THEN 'Suicide'
            WHEN '3' THEN 'Homicide'
            WHEN '4' THEN 'Pending investigation'
            WHEN '5' THEN 'Could not be determined'
            WHEN '6' THEN 'Self-Inflicted'
            WHEN '7' THEN 'Natural'
            ELSE 'OTHER'
            END AS manner_death,
            COUNT(*) AS counts
     FROM mergedTable
     GROUP BY 1, 2
     ORDER BY 1,2""")

results_deaths_month.show()

+-----+--------------------+-------+
|month|        manner_death| counts|
+-----+--------------------+-------+
|    1|            Accident|  73168|
|    1|Could not be dete...|   6569|
|    1|            Homicide|  10496|
|    1|             Natural|1265741|
|    1|               OTHER| 305799|
|    1|Pending investiga...|   3025|
|    1|             Suicide|  23323|
|    2|            Accident|  67509|
|    2|Could not be dete...|   6161|
|    2|            Homicide|   8666|
|    2|             Natural|1128534|
|    2|               OTHER| 275216|
|    2|Pending investiga...|   2404|
|    2|             Suicide|  20910|
|    3|            Accident|  72695|
|    3|Could not be dete...|   6817|
|    3|            Homicide|  10283|
|    3|             Natural|1211157|
|    3|               OTHER| 290285|
|    3|Pending investiga...|   2645|
+-----+--------------------+-------+
only showing top 20 rows



In [23]:
# Saving data in the db: Query 5 result, keyed by row position `ind`.
# collect() instead of toPandas().iterrows() keeps Spark's Python types
# (nulls stay None) and avoids the pandas round trip.
session.execute("CREATE TABLE IF NOT EXISTS results_deaths_month (ind int primary key, counts int, manner_death varchar, month int)")
stmt = session.prepare("INSERT INTO results_deaths_month (ind, counts, manner_death, month) VALUES (?, ?, ?, ?)")

for ind, row in enumerate(results_deaths_month.collect()):
    session.execute(stmt, [ind, row['counts'], row['manner_death'], row['month']])

In [22]:
# Query 6 - Alzheimer's deaths (113-cause recode 052) grouped by age band.
# NOTE(review): age_recode_12 and 113_cause_recode are integer columns, so the
# zero-padded string literals below rely on Spark casting them to integers
# for the comparison (the output shows they do match).
query6_sql = """SELECT count(*) as deaths_counts,
            CASE age_recode_12
            WHEN '10' THEN '75 - 84 years'
            WHEN '11' THEN '85 years and over'
            WHEN '12' THEN 'Age not stated'
            WHEN '01' THEN 'Under 1 year'
            WHEN '02' THEN '1 - 4 years'
            WHEN '03' THEN '5 - 14 years'
            WHEN '04' THEN '15 - 24 years'
            WHEN '05' THEN '25 - 34 years'
            WHEN '06' THEN '35 - 44 years'
            WHEN '07' THEN '45 - 54 years'
            WHEN '08' THEN '55 - 64 years'
            WHEN '09' THEN '65 - 74 years'
            END AS age
     FROM mergedTable
     WHERE 113_cause_recode = '052'
     GROUP BY age
     ORDER BY deaths_counts DESC
     LIMIT 10"""
results_alzheimer = spark.sql(query6_sql)
results_alzheimer.show()

+-------------+-----------------+
|deaths_counts|              age|
+-------------+-----------------+
|       400844|85 years and over|
|       174941|    75 - 84 years|
|        32518|    65 - 74 years|
|         5763|    55 - 64 years|
|          700|    45 - 54 years|
|           64|    35 - 44 years|
|            6|   Age not stated|
|            3|    25 - 34 years|
|            1|    15 - 24 years|
+-------------+-----------------+



In [25]:
# Saving data in the db: Query 6 result, keyed by row position `ind`.
# collect() instead of toPandas().iterrows() keeps Spark's Python types
# (nulls stay None) and avoids the pandas round trip.
session.execute("CREATE TABLE IF NOT EXISTS results_alzheimer (ind int primary key, age varchar, deaths_counts int)")
stmt = session.prepare("INSERT INTO results_alzheimer (ind, age, deaths_counts) VALUES (?, ?, ?)")

for ind, row in enumerate(results_alzheimer.collect()):
    session.execute(stmt, [ind, row['age'], row['deaths_counts']])

In [20]:
# Query 7 - Suicide deaths (manner_of_death 2) by education level (2003
# certificate revision; nulls were imputed as 9 = 'Unknown').
# NOTE(review): mergedTable holds only burials/cremations, so this undercounts
# suicides overall.
query7_sql = """SELECT count(*) as deaths_counts,
            CASE education_2003_revision
            WHEN '1' THEN '8th grade or less'
            WHEN '2' THEN '9 - 12th grade, no diploma'
            WHEN '3' THEN 'high school graduate or GED completed'
            WHEN '4' THEN 'some college credit, but no degree'
            WHEN '5' THEN 'Associate degree'
            WHEN '6' THEN 'Bachelor’s degree'
            WHEN '7' THEN 'Master’s degree'
            WHEN '8' THEN 'Doctorate or professional degree'
            WHEN '9' THEN 'Unknown'
            END AS education
     FROM mergedTable
     WHERE manner_of_death = '2'
     GROUP BY education
     ORDER BY deaths_counts DESC
     LIMIT 10"""
results_degree_suicides = spark.sql(query7_sql)
results_degree_suicides.show()

+-------------+--------------------+
|deaths_counts|           education|
+-------------+--------------------+
|       113618|high school gradu...|
|        49674|some college cred...|
|        35932|9 - 12th grade, n...|
|        33985|   Bachelor’s degree|
|        19862|    Associate degree|
|        11797|   8th grade or less|
|        11095|     Master’s degree|
|         6076|             Unknown|
|         5657|Doctorate or prof...|
+-------------+--------------------+



In [27]:
# Saving data in the db: Query 7 result, keyed by row position `ind`.
# collect() instead of toPandas().iterrows() keeps Spark's Python types
# (nulls stay None) and avoids the pandas round trip.
session.execute("CREATE TABLE IF NOT EXISTS results_degree_suicides (ind int primary key, deaths_counts int, education varchar)")
stmt = session.prepare("INSERT INTO results_degree_suicides (ind, deaths_counts, education) VALUES (?, ?, ?)")

for ind, row in enumerate(results_degree_suicides.collect()):
    session.execute(stmt, [ind, row['deaths_counts'], row['education']])

In [21]:
# Query 8 - Total suicides per year, 2005-2015.
# Reads jointTable (all dispositions, already restricted to suicides), so
# every row maps to 'Suicide'.
query8_sql = """SELECT current_data_year AS year,
            CASE manner_of_death
            WHEN '2' THEN 'Suicide'
            END AS suicides,
            COUNT(*) AS counts
     FROM jointTable
     GROUP BY 1, 2
     ORDER BY 1, 3"""
results_suicides = spark.sql(query8_sql)
results_suicides.show()

+----+--------+------+
|year|suicides|counts|
+----+--------+------+
|2005| Suicide| 32934|
|2006| Suicide| 33562|
|2007| Suicide| 34827|
|2008| Suicide| 36251|
|2009| Suicide| 37205|
|2010| Suicide| 38710|
|2011| Suicide| 39878|
|2012| Suicide| 40929|
|2013| Suicide| 41509|
|2014| Suicide| 43139|
|2015| Suicide| 44417|
+----+--------+------+



In [38]:
# Saving data in the db: Query 8 result, keyed by row position `ind`.
# collect() instead of toPandas().iterrows() keeps Spark's Python types
# (nulls stay None) and avoids the pandas round trip.
session.execute("CREATE TABLE IF NOT EXISTS results_suicides (ind int primary key, counts int, suicides varchar, year int)")
stmt = session.prepare("INSERT INTO results_suicides (ind, counts, suicides, year) VALUES (?, ?, ?, ?)")

for ind, row in enumerate(results_suicides.collect()):
    session.execute(stmt, [ind, row['counts'], row['suicides'], row['year']])

In [23]:
# Query 9 - Recorded causes (ICD-10 descriptions) of deaths whose manner is
# still 'Pending investigation' (manner_of_death 4), most frequent first.
query9_sql = """SELECT icd.description3 AS description,
              count(*) as counts
       FROM mergedTable mt JOIN icd10 icd ON icd.code3 = mt.icd_code_10th_revision
       WHERE mt.manner_of_death == '4'
       GROUP BY description
       ORDER BY counts DESC, description"""
results_deaths_pending = spark.sql(query9_sql)
results_deaths_pending.show()

+--------------------+------+
|         description|counts|
+--------------------+------+
|Ill-defined and u...| 29259|
|Acute myocardial ...|   212|
|Hypertensive hear...|   153|
|Complications and...|   148|
|      Cardiac arrest|   102|
|Other chronic obs...|    94|
|      Cardiomyopathy|    87|
|Pneumonia, unspec...|    62|
|Cardiac arrhythmi...|    54|
|Sepsis, unspecifi...|    47|
|Shock, not elsewh...|    46|
|Essential (primar...|    42|
| Alzheimer's disease|    34|
|Obesity, unspecified|    34|
|Chronic ischemic ...|    33|
|Other disorders o...|    33|
|       Other obesity|    32|
|           Emphysema|    29|
|Nontraumatic intr...|    24|
|  Respiratory arrest|    21|
+--------------------+------+
only showing top 20 rows



In [40]:
# Saving data in the db: Query 9 result, keyed by row position `ind`.
# collect() instead of toPandas().iterrows() keeps Spark's Python types
# (nulls stay None) and avoids the pandas round trip.
session.execute("CREATE TABLE IF NOT EXISTS results_deaths_pending (ind int primary key, counts int, description varchar)")
stmt = session.prepare("INSERT INTO results_deaths_pending (ind, counts, description) VALUES (?, ?, ?)")

for ind, row in enumerate(results_deaths_pending.collect()):
    session.execute(stmt, [ind, row['counts'], row['description']])

In [15]:
# Query 10 - Deaths per day of the week (1 = Sunday ... 7 = Saturday,
# 9 = unknown), least frequent first.
query10_sql = """SELECT count(*) as counts,
              CASE day_of_week_of_death
              WHEN '1' THEN 'Sunday'
              WHEN '2' THEN 'Monday'
              WHEN '3' THEN 'Tuesday'
              WHEN '4' THEN 'Wednesday'
              WHEN '5' THEN 'Thursday'
              WHEN '6' THEN 'Friday'
              WHEN '7' THEN 'Saturday'
              WHEN '9' THEN 'Unknown'
              END AS day
      FROM mergedTable
      GROUP BY day
      ORDER BY counts"""
results_day_week = spark.sql(query10_sql)
results_day_week.show()

+-------+---------+
| counts|      day|
+-------+---------+
|   1033|  Unknown|
|2581659|   Sunday|
|2585712|  Tuesday|
|2589390|Wednesday|
|2595314| Thursday|
|2601045|   Monday|
|2626127|   Friday|
|2630534| Saturday|
+-------+---------+



In [42]:
# Saving data in the db: Query 10 result, keyed by row position `ind`.
# collect() instead of toPandas().iterrows() keeps Spark's Python types
# (nulls stay None) and avoids the pandas round trip.
session.execute("CREATE TABLE IF NOT EXISTS results_day_week (ind int primary key, counts int, day varchar)")
stmt = session.prepare("INSERT INTO results_day_week (ind, counts, day) VALUES (?, ?, ?)")

for ind, row in enumerate(results_day_week.collect()):
    session.execute(stmt, [ind, row['counts'], row['day']])

In [16]:
# Query 11 - Total deaths by race (race_recode_3: 1 = White,
# 2 = races other than Black or White, 3 = Black), smallest group first.
query11_sql = """SELECT count(*) as counts,
            CASE race_recode_3
            WHEN '1' THEN 'White'
            WHEN '2' THEN 'Races other than b&w'
            WHEN '3' THEN 'Black'
            END AS skin_color
       FROM mergedTable
       GROUP BY skin_color
       ORDER BY counts"""
results_deaths_skin_color = spark.sql(query11_sql)
results_deaths_skin_color.show()

+--------+--------------------+
|  counts|          skin_color|
+--------+--------------------+
|  572732|Races other than b&w|
| 2033419|               Black|
|15604663|               White|
+--------+--------------------+



In [45]:
# Saving data in the db: Query 11 result, keyed by row position `ind`.
# collect() instead of toPandas().iterrows() keeps Spark's Python types
# (nulls stay None) and avoids the pandas round trip.
session.execute("CREATE TABLE IF NOT EXISTS results_deaths_skin_color (ind int primary key, counts int, skin_color varchar)")
stmt = session.prepare("INSERT INTO results_deaths_skin_color (ind, counts, skin_color) VALUES (?, ?, ?)")

for ind, row in enumerate(results_deaths_skin_color.collect()):
    session.execute(stmt, [ind, row['counts'], row['skin_color']])

In [18]:
# Spark MLlib

# Pipeline - string indexing and one-hot encoding of every categorical column.
# Each column contributes two consecutive stages: a StringIndexer producing
# "<col>Index", then a OneHotEncoder turning that index into a sparse
# "<col>classVec" vector. Nothing executes until the pipeline is fitted.
stages = []  # stages in our Pipeline
for col_name in categoricalColumns:
    indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}Index")
    one_hot = OneHotEncoder(inputCols=[indexer.getOutputCol()],
                            outputCols=[f"{col_name}classVec"])
    stages.extend([indexer, one_hot])

In [19]:
# Convert label into label indices: method_of_disposition becomes the numeric
# "label" column the classifier trains on.
label_stringIdx = StringIndexer(inputCol='method_of_disposition', outputCol="label")
stages.append(label_stringIdx)

In [20]:
# Concatenate all one-hot vectors into the single "features" column.
assemblerInputs = [f"{name}classVec" for name in categoricalColumns]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [21]:
# Fit all feature stages on MergeData, then apply them to produce the
# indexed/encoded/assembled columns.
# NOTE(review): indexers are fitted before the train/test split, so the
# category vocabulary also comes from test rows (minor leakage).
partialPipeline = Pipeline(stages=stages)
pipelineModel = partialPipeline.fit(MergeData)
preppedDataDF = pipelineModel.transform(MergeData)

In [22]:
# Keep only the target ("label") and the assembled predictors ("features").
selectedcols = ["label", "features"]
dataset = preppedDataDF.select(*selectedcols)

In [23]:
# Split 70/30 into training and test sets; the fixed seed makes the split
# repeatable for the same input data.
trainingData, testData = dataset.randomSplit(weights=[0.7, 0.3], seed=100)

In [25]:
# Logistic Regression model for predicting method-of-disposition (burial vs
# cremation), limited to 10 optimizer iterations and fitted on the training split.
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)
lrModel = lr.fit(trainingData)

In [26]:
# Score the held-out test split; the result carries rawPrediction, probability and prediction columns used below.
predictions = lrModel.transform(testData)

In [27]:
# Comparing the predicted value against actual value. The select is lazy, so
# show a sample to make the comparison visible.
selected = predictions.select("label", "prediction", "probability")
selected.show(10)

In [28]:
# Area under the ROC curve on the test set (this evaluator's default metric,
# spelled out explicitly): 0.5 is random ranking, 1.0 is perfect ranking.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                          labelCol="label",
                                          metricName="areaUnderROC")
evaluator.evaluate(predictions)

0.6650842059202722

In [29]:
# Accuracy: fraction of test rows whose predicted label equals the true label.
# NOTE(review): compare against the majority-class (burial) share before
# reading this as model skill.
evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                              predictionCol="prediction",
                                              metricName='accuracy')
evaluator.evaluate(predictions)


0.6298390922961876