In this notebook, I fit a regression model to the trend in the number of deaths from 2005 to 2014 and use that model to predict the number of deaths in 2015.

The trend is broken down by the resident status, race, sex, and cause of death of the deceased.

# Loading and setting up the session

In [3]:
# Mount Google Drive at /content/drive so files stored there are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
import numpy as np 
import pandas as pd 
import time
import json
import gc
import xgboost as xgb 
from sklearn.model_selection import GridSearchCV
import matplotlib.pyplot as plt

In [5]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [6]:
import os

# Export the Java and Spark install locations as environment variables in one step.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.7-bin-hadoop2.7",
})
# Alternative SPARK_HOME (a Spark copy kept on Drive), currently disabled:
# os.environ["SPARK_HOME"] ="/content/drive/MyDrive/Colab Notebooks/BigData/spark-2.4.7-bin-hadoop2.7"


In [7]:
# findspark.init() is called before the pyspark import on purpose.
# NOTE(review): presumably it locates Spark through SPARK_HOME and makes pyspark importable — confirm against findspark docs.
import findspark
findspark.init()
from pyspark.sql import SparkSession
# Create (or reuse) a session whose master is "local[*]", i.e. Spark runs locally in this runtime.
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [8]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql.functions import udf,col


# Loading and initial cleaning

In [9]:
# Load the sampled CSV (first row is the header, column types inferred by Spark),
# preview it without truncating cell values, and print the elapsed time in minutes.
start = time.time()
data_path="/content/drive/MyDrive/Colab Notebooks/BigData/Final project/Death-Big-data-Analytics/archive"
sample_csv = data_path+'/sampled.csv'
df = spark.read.csv(sample_csv, header=True, inferSchema=True)
df.show(truncate=False)
elapsed_minutes = (time.time()-start)/60
print(elapsed_minutes)

+---------------+-----------------------+-----------------------+------------------------+--------------+---+---------------+----------+---------------------+-------------+-------------+-------------+--------------------+-----------------------------------+--------------+--------------------+-----------------+--------------+---------------+---------------------+-------+-------------+------------------------------------------------------+----------------------+----------------+----------------+-----------------------+---------------+--------------------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------------------+-----

In [10]:
# Inspect the (column name, type) pairs of the frame that was just loaded.
df.dtypes

[('resident_status', 'string'),
 ('education_1989_revision', 'string'),
 ('education_2003_revision', 'string'),
 ('education_reporting_flag', 'string'),
 ('month_of_death', 'string'),
 ('sex', 'string'),
 ('detail_age_type', 'string'),
 ('detail_age', 'string'),
 ('age_substitution_flag', 'string'),
 ('age_recode_52', 'string'),
 ('age_recode_27', 'string'),
 ('age_recode_12', 'string'),
 ('infant_age_recode_22', 'string'),
 ('place_of_death_and_decedents_status', 'string'),
 ('marital_status', 'string'),
 ('day_of_week_of_death', 'string'),
 ('current_data_year', 'string'),
 ('injury_at_work', 'string'),
 ('manner_of_death', 'string'),
 ('method_of_disposition', 'string'),
 ('autopsy', 'string'),
 ('activity_code', 'string'),
 ('place_of_injury_for_causes_w00_y34_except_y06_and_y07_', 'string'),
 ('icd_code_10th_revision', 'string'),
 ('358_cause_recode', 'string'),
 ('113_cause_recode', 'string'),
 ('130_infant_cause_recode', 'string'),
 ('39_cause_recode', 'string'),
 ('number_of_en

In [11]:
# Keep only records from data years 2005-2015 whose sex is coded "M" or "F".
year_filter = "current_data_year in ('2005','2006','2007','2008','2009','2010','2011','2012','2013','2014','2015')"
sex_filter = 'sex in ("M","F")'
df = df.filter(year_filter).filter(sex_filter)


In [12]:
# Columns kept for the exploratory counts below.
features = [
    'current_data_year',
    'month_of_death',
    'resident_status',
    'sex',
    'race_recode_3',
    '39_cause_recode',
]
train_1 = df.select(*features)

In [13]:
# Show the number of rows per distinct value for each categorical column, one table per column.
for column_name in ("race_recode_3", "39_cause_recode", "resident_status",
                    "current_data_year", "month_of_death"):
    train_1.groupby(column_name).count().show()

+-------------+-----+
|race_recode_3|count|
+-------------+-----+
|            3|   55|
|            1|  807|
|            2|  227|
+-------------+-----+

+---------------+-----+
|39_cause_recode|count|
+---------------+-----+
|             07|   17|
|             15|   70|
|             11|    6|
|             42|    1|
|             30|   14|
|             34|    3|
|             01|    1|
|             28|   46|
|             22|   97|
|             16|   30|
|             31|   22|
|             27|   25|
|             17|   35|
|             26|    6|
|             09|   28|
|             05|    2|
|             23|    9|
|             41|    4|
|             08|   81|
|             40|   33|
+---------------+-----+
only showing top 20 rows

+---------------+-----+
|resident_status|count|
+---------------+-----+
|              3|   23|
|              1|  942|
|              2|  124|
+---------------+-----+

+-----------------+-----+
|current_data_year|count|
+-----------------+---

In [14]:
def read_json(path):
    """Open the file at ``path`` as UTF-8 text and return its parsed JSON content."""
    with open(path, mode='r', encoding='utf-8') as handle:
        return json.load(handle)

In [15]:
# Load the 2005 code book and override the label stored for place-of-death code '1'.
path="/content/drive/MyDrive/Colab Notebooks/BigData/Final project/Death-Big-data-Analytics/archive"
codes_file = path+'/2005_codes.json'
definition = read_json(codes_file)
definition['place_of_death_and_decedents_status'].update({'1': 'Hospital, Clinic or Medical Center'})

In [24]:
# Collapse the cause codes 1..42 into four groups.
cause_dict = {'accident':1, 'heart':2, 'neoplasm':3,'others':4}

neoplasm = list(range(4,16))   # codes 4..15  -> neoplasmic deaths
heart = list(range(19,23))     # codes 19..22 -> heart related deaths
accident_codes = [38,39]       # codes 38, 39 -> accident related deaths

# Every code starts out as 'others'; the three specific groups then overwrite their own codes.
cause_definition = dict.fromkeys(range(1,43), cause_dict['others'])
cause_definition.update(dict.fromkeys(neoplasm, cause_dict['neoplasm']))
cause_definition.update(dict.fromkeys(heart, cause_dict['heart']))
cause_definition.update(dict.fromkeys(accident_codes, cause_dict['accident']))

In [25]:
# Display the finished code -> group lookup to check the assignment.
cause_definition

{1: 4,
 2: 4,
 3: 4,
 4: 3,
 5: 3,
 6: 3,
 7: 3,
 8: 3,
 9: 3,
 10: 3,
 11: 3,
 12: 3,
 13: 3,
 14: 3,
 15: 3,
 16: 4,
 17: 4,
 18: 4,
 19: 2,
 20: 2,
 21: 2,
 22: 2,
 23: 4,
 24: 4,
 25: 4,
 26: 4,
 27: 4,
 28: 4,
 29: 4,
 30: 4,
 31: 4,
 32: 4,
 33: 4,
 34: 4,
 35: 4,
 36: 4,
 37: 4,
 38: 1,
 39: 1,
 40: 4,
 41: 4,
 42: 4}

# Preparing the training dataset

In [26]:
# Start the training set from the raw columns needed for the features and the target.
features = [
    'current_data_year',
    'month_of_death',
    'resident_status',
    'sex',
    'race_recode_3',
    '39_cause_recode',
]
train_1 = df.select(*features)

In [27]:
# Number of rows selected from df, before any cause-based filtering is applied.
train_1.count()

1089

In [28]:
@udf(returnType=IntegerType())
def convertCause(val):
  """Map a cause-recode value to its group number via the cause_definition lookup."""
  code = int(val)  # the value is converted with int() before the lookup
  return cause_definition[code]

@udf(returnType=IntegerType())
def convertResidency(val):
  """Return 1 when the resident status is "1" or "4", otherwise 0."""
  return int(val in ("1", "4"))

@udf(returnType=IntegerType())
def convertSex(val):
  """Return 1 when the sex value is "M", otherwise 0."""
  return int(val == "M")

In [29]:
# Encode the categorical columns as integers, cast year/month/race to integers, and add
# numeric_month = month_of_death + 12 * (current_data_year - 2005), so January 2005 is month 1.
train_1 = (
    train_1
    .withColumn('cause_recode', convertCause(col('39_cause_recode')))
    .withColumn('resident_status', convertResidency(col('resident_status')))
    .withColumn('sex', convertSex(col('sex')))
    .withColumn("current_data_year", col("current_data_year").cast("Integer"))
    .withColumn("month_of_death", col("month_of_death").cast("Integer"))
    .withColumn("race_recode", col("race_recode_3").cast("Integer"))
    .withColumn("numeric_month", col("month_of_death")+(col("current_data_year")-2005)*12)
    .filter('cause_recode != 4')  # drop every death whose cause group is 'others'
)

In [30]:
# Preview the transformed rows, including the derived cause_recode, race_recode and numeric_month columns.
train_1.show()

+-----------------+--------------+---------------+---+-------------+---------------+------------+-----------+-------------+
|current_data_year|month_of_death|resident_status|sex|race_recode_3|39_cause_recode|cause_recode|race_recode|numeric_month|
+-----------------+--------------+---------------+---+-------------+---------------+------------+-----------+-------------+
|             2005|             1|              1|  0|            1|             15|           3|          1|            1|
|             2005|             1|              1|  0|            1|             08|           3|          1|            1|
|             2005|             1|              0|  0|            1|             15|           3|          1|            1|
|             2005|             1|              1|  1|            3|             06|           3|          3|            1|
|             2005|             1|              1|  1|            1|             15|           3|          1|            1|
|       

In [31]:
# Check the column types after the casts and UDF conversions.
train_1.dtypes

[('current_data_year', 'int'),
 ('month_of_death', 'int'),
 ('resident_status', 'int'),
 ('sex', 'int'),
 ('race_recode_3', 'string'),
 ('39_cause_recode', 'string'),
 ('cause_recode', 'int'),
 ('race_recode', 'int'),
 ('numeric_month', 'int')]

# Extracting the features from the dataset

In [32]:
# Target: number of deaths for each (month, resident status, sex, race, cause group) combination.
cols=['numeric_month','resident_status','sex','race_recode','cause_recode']
death_cnt_group = train_1.select(*cols).groupby(*cols).count()

In [33]:
# Mean group count per (month, resident status), attached to every matching row.
cols=['numeric_month','resident_status','count']
death_cnt_group2 = (
    death_cnt_group.select(cols)
    .groupby('numeric_month','resident_status')
    .agg(F.mean('count'))
    .withColumnRenamed("avg(count)","resident_status_death_mean")
)
death_cnt_group = death_cnt_group.join(
    death_cnt_group2,
    ['numeric_month','resident_status'],
    how='left_outer',
)

In [34]:
# Mean group count per (month, sex), attached to every matching row.
cols=['numeric_month','sex','count']
death_cnt_group3 = (
    death_cnt_group.select(cols)
    .groupby('numeric_month','sex')
    .agg(F.mean('count'))
    .withColumnRenamed("avg(count)","sex_death_mean")
)
death_cnt_group = death_cnt_group.join(
    death_cnt_group3,
    ['numeric_month','sex'],
    how='left',
)

In [35]:
# Mean group count per (month, race), attached to every matching row.
cols=['numeric_month','race_recode','count']
death_cnt_group4 = (
    death_cnt_group.select(cols)
    .groupby('numeric_month','race_recode')
    .agg(F.mean('count'))
    .withColumnRenamed("avg(count)","race_recode_death_mean")
)
death_cnt_group = death_cnt_group.join(
    death_cnt_group4,
    ['numeric_month','race_recode'],
    how='left',
)

In [36]:
# Mean group count per (month, cause group), attached to every matching row.
cols=['numeric_month','cause_recode','count']
death_cnt_group5 = (
    death_cnt_group.select(cols)
    .groupby('numeric_month','cause_recode')
    .agg(F.mean('count'))
    .withColumnRenamed("avg(count)","cause_recode_death_mean")
)
death_cnt_group = death_cnt_group.join(
    death_cnt_group5,
    ['numeric_month','cause_recode'],
    how='left',
)

In [37]:
# Check that the four *_death_mean columns were added next to the group keys and count.
death_cnt_group.dtypes

[('numeric_month', 'int'),
 ('cause_recode', 'int'),
 ('race_recode', 'int'),
 ('sex', 'int'),
 ('resident_status', 'int'),
 ('count', 'bigint'),
 ('resident_status_death_mean', 'double'),
 ('sex_death_mean', 'double'),
 ('race_recode_death_mean', 'double'),
 ('cause_recode_death_mean', 'double')]

In [38]:
# Add lagged copies (1, 6 and 12 months back) of the count and of the mean columns.
# A lag is built by moving each row's numeric_month forward by `lag` and joining it back on
# the group keys, so a row for month m receives the value observed in month m - lag.
# NOTE(review): 'cause_recode_death_mean' is not in col_lags, so it gets no lag columns — confirm this is intended.
group_keys = ['numeric_month','resident_status','sex','race_recode','cause_recode']
col_lags=['count','resident_status_death_mean','sex_death_mean','race_recode_death_mean']
for column in col_lags:
  cols = group_keys + [column]
  for lag in [1,6,12]:
    lagged_name = f"{column}_lag_{lag}"
    shifted = (
        death_cnt_group.select(cols)
        .withColumn("numeric_month", col("numeric_month")+lag)
        .withColumnRenamed(column, lagged_name)
    )
    # Left join keeps every existing row; rows with no earlier observation get a null lag value.
    death_cnt_group = death_cnt_group.join(shifted, group_keys, how='left')



In [39]:
# Check the schema after the lag joins (the *_lag_1/_lag_6/_lag_12 columns should now be present).
death_cnt_group.dtypes

[('numeric_month', 'int'),
 ('resident_status', 'int'),
 ('sex', 'int'),
 ('race_recode', 'int'),
 ('cause_recode', 'int'),
 ('count', 'bigint'),
 ('resident_status_death_mean', 'double'),
 ('sex_death_mean', 'double'),
 ('race_recode_death_mean', 'double'),
 ('cause_recode_death_mean', 'double'),
 ('count_lag_1', 'bigint'),
 ('count_lag_6', 'bigint'),
 ('count_lag_12', 'bigint'),
 ('resident_status_death_mean_lag_1', 'double'),
 ('resident_status_death_mean_lag_6', 'double'),
 ('resident_status_death_mean_lag_12', 'double'),
 ('sex_death_mean_lag_1', 'double'),
 ('sex_death_mean_lag_6', 'double'),
 ('sex_death_mean_lag_12', 'double'),
 ('race_recode_death_mean_lag_1', 'double'),
 ('race_recode_death_mean_lag_6', 'double'),
 ('race_recode_death_mean_lag_12', 'double')]

In [40]:
# Replace nulls (e.g. lag values with no matching earlier month) with 0.
death_cnt_group = death_cnt_group.na.fill(0)

In [None]:
# Collect the Spark feature table into pandas and write it to Drive as features.csv.
# NOTE(review): to_csv is called with its defaults, so the pandas row index is written too — confirm the reader expects that.
save_path="/content/drive/MyDrive/Colab Notebooks/BigData/Final project/"
features_pdf = death_cnt_group.toPandas()
features_pdf.to_csv(save_path+"features.csv")