# Data Preparation Warmup

## Aim: Put the death counts of the US states into perspective relative to each state's population.

### Load the dataset:

In [1]:
// Read the death-cause CSV, taking the column names from its header line.
// No schema is supplied, so every column is loaded as a string.
val rawDeathCauses = spark.read
    .option("header", "true")
    .csv("death_causes.csv")

Intitializing Scala interpreter ...

Spark Web UI available at http://start2-pc.local:4040
SparkContext available as 'sc' (version = 2.3.2, master = local[*], app id = local-1541429835871)
SparkSession available as 'spark'


rawDeathCauses: org.apache.spark.sql.DataFrame = [Year: string, 113 Cause Name: string ... 4 more fields]


In [2]:
// Preview the first three rows of the raw death-cause records.
rawDeathCauses.show(numRows = 3)

+----+--------------------+--------------------+-------+------+-----------------------+
|Year|      113 Cause Name|          Cause Name|  State|Deaths|Age-adjusted Death Rate|
+----+--------------------+--------------------+-------+------+-----------------------+
|2016|Accidents (uninte...|Unintentional inj...|Alabama|  2755|                  55.50|
|2016|Accidents (uninte...|Unintentional inj...| Alaska|   439|                  63.10|
|2016|Accidents (uninte...|Unintentional inj...|Arizona|  4010|                  54.20|
+----+--------------------+--------------------+-------+------+-----------------------+
only showing top 3 rows



### Remove unnecessary and redundant information:

In [3]:
// Drop the short cause name and the age-adjusted rate in a single call;
// Year, "113 Cause Name", State and Deaths remain.
val columnReducedDeathCauses = rawDeathCauses
    .drop("Cause Name", "Age-adjusted Death Rate")

columnReducedDeathCauses: org.apache.spark.sql.DataFrame = [Year: string, 113 Cause Name: string ... 2 more fields]


In [4]:
// Show the remaining columns and their types (all still strings at this point).
columnReducedDeathCauses.printSchema()

root
 |-- Year: string (nullable = true)
 |-- 113 Cause Name: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Deaths: string (nullable = true)



### Cast the deaths and year to integers and rename the cause column.

In [5]:
// Give the cause column a readable name and turn the two numeric columns,
// which were loaded as strings, into integers. Each step replaces a column
// in place, so the column order stays Year, Death Cause, State, Deaths.
val deathCauses = columnReducedDeathCauses
    .withColumnRenamed("113 Cause Name", "Death Cause")
    .withColumn("Year", $"Year".cast("Int"))
    .withColumn("Deaths", $"Deaths".cast("Int"))

deathCauses: org.apache.spark.sql.DataFrame = [Year: int, Death Cause: string ... 2 more fields]


In [6]:
// Verify the casts and the rename, then preview three rows.
deathCauses.printSchema()
deathCauses.show(numRows = 3)

root
 |-- Year: integer (nullable = true)
 |-- Death Cause: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Deaths: integer (nullable = true)

+----+--------------------+-------+------+
|Year|         Death Cause|  State|Deaths|
+----+--------------------+-------+------+
|2016|Accidents (uninte...|Alabama|  2755|
|2016|Accidents (uninte...| Alaska|   439|
|2016|Accidents (uninte...|Arizona|  4010|
+----+--------------------+-------+------+
only showing top 3 rows



### Load the census dataset:

In [7]:
// Read the census CSV, taking the column names from its header line.
// No schema is supplied, so every column is loaded as a string.
val censusSecondDecade = spark.read
    .option("header", "true")
    .csv("census.csv")

censusSecondDecade: org.apache.spark.sql.DataFrame = [SUMLEV: string, REGION: string ... 119 more fields]


In [8]:
// List all census columns to find the ones needed for the analysis.
censusSecondDecade.printSchema()

root
 |-- SUMLEV: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- DIVISION: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- CENSUS2010POP: string (nullable = true)
 |-- ESTIMATESBASE2010: string (nullable = true)
 |-- POPESTIMATE2010: string (nullable = true)
 |-- POPESTIMATE2011: string (nullable = true)
 |-- POPESTIMATE2012: string (nullable = true)
 |-- POPESTIMATE2013: string (nullable = true)
 |-- POPESTIMATE2014: string (nullable = true)
 |-- POPESTIMATE2015: string (nullable = true)
 |-- POPESTIMATE2016: string (nullable = true)
 |-- POPESTIMATE2017: string (nullable = true)
 |-- NPOPCHG_2010: string (nullable = true)
 |-- NPOPCHG_2011: string (nullable = true)
 |-- NPOPCHG_2012: string (nullable = true)
 |-- NPOPCHG_2013: string (nullable = true)
 |-- NPOPCHG_2014: string (nullable = true)
 |-- NPOPCHG_2015: string (nullable = true)
 |-- NPOPCHG_2016: string (nullable = true)
 |-- NPOPCHG_2017: string (n

### The state name is contained in the name column and not in the state column.

### Select information:

In [9]:
// Keep the name, region and division plus one population-estimate column per
// year from 2010 to 2017. The POPESTIMATE<year> names are generated from the
// year range instead of being spelled out one by one.
val statePopEst = censusSecondDecade.select(
    "NAME",
    (Seq("REGION", "DIVISION") ++ (2010 to 2017).map(year => s"POPESTIMATE$year")): _*
)

statePopEst: org.apache.spark.sql.DataFrame = [NAME: string, REGION: string ... 9 more fields]


### Illustration of the resulting dataset.

In [10]:
// Preview three rows; the estimates for 2013-2017 are left out of the preview
// only, to keep the printed table narrow. statePopEst itself is not modified.
statePopEst
    .drop((2013 to 2017).map(year => s"POPESTIMATE$year"): _*)
    .show(numRows = 3)

2018-11-05 15:57:29 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
+----------------+------+--------+---------------+---------------+---------------+
|            NAME|REGION|DIVISION|POPESTIMATE2010|POPESTIMATE2011|POPESTIMATE2012|
+----------------+------+--------+---------------+---------------+---------------+
|   United States|     0|       0|      309338421|      311644280|      313993272|
|Northeast Region|     1|       0|       55388349|       55642659|       55860261|
|  Midwest Region|     2|       0|       66973360|       67141501|       67318295|
+----------------+------+--------+---------------+---------------+---------------+
only showing top 3 rows



### Filter the census data down to the US states (plus the District of Columbia) and validate that all 50 states are present.

In [11]:
// Names used to pick the state-level rows out of the census data.
// The list has 51 entries: the 50 states plus "District of Columbia".
val states = List(
    "Alabama", "Alaska", "Arizona", "Arkansas", "California", "Colorado",
    "Connecticut", "Delaware", "District of Columbia", "Florida", "Georgia",
    "Hawaii", "Idaho", "Illinois", "Indiana", "Iowa", "Kansas", "Kentucky",
    "Louisiana", "Maine", "Montana", "Nebraska", "Nevada", "New Hampshire",
    "New Jersey", "New Mexico", "New York", "North Carolina", "North Dakota",
    "Ohio", "Oklahoma", "Oregon", "Maryland", "Massachusetts", "Michigan",
    "Minnesota", "Mississippi", "Missouri", "Pennsylvania", "Rhode Island",
    "South Carolina", "South Dakota", "Tennessee", "Texas", "Utah", "Vermont",
    "Virginia", "Washington", "West Virginia", "Wisconsin", "Wyoming"
)

states: List[String] = List(Alabama, Alaska, Arizona, Arkansas, California, Colorado, Connecticut, Delaware, District of Columbia, Florida, Georgia, Hawaii, Idaho, Illinois, Indiana, Iowa, Kansas, Kentucky, Louisiana, Maine, Montana, Nebraska, Nevada, New Hampshire, New Jersey, New Mexico, New York, North Carolina, North Dakota, Ohio, Oklahoma, Oregon, Maryland, Massachusetts, Michigan, Minnesota, Mississippi, Missouri, Pennsylvania, Rhode Island, South Carolina, South Dakota, Tennessee, Texas, Utah, Vermont, Virginia, Washington, West Virginia, Wisconsin, Wyoming)


In [12]:
// Keep only the census rows whose NAME is one of the entries in `states`;
// this removes aggregate rows such as "United States" or "Northeast Region".
val reducedStatePopEst = statePopEst.where($"NAME".isin(states: _*))

reducedStatePopEst: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [NAME: string, REGION: string ... 9 more fields]


In [13]:
// Count the states in the filtered census data. The District of Columbia is
// part of `states` but is not a state, so it is excluded by name rather than
// by subtracting a fixed 1, which would under-count if that row were missing.
print("Number of states in census 2010-2017 dataset: " +
    reducedStatePopEst.filter($"NAME" =!= "District of Columbia").count())

Number of states in census 2010-2017 dataset: 50

In [14]:
// Confirm the columns of the filtered data; the estimates are still strings.
reducedStatePopEst.printSchema()

root
 |-- NAME: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- DIVISION: string (nullable = true)
 |-- POPESTIMATE2010: string (nullable = true)
 |-- POPESTIMATE2011: string (nullable = true)
 |-- POPESTIMATE2012: string (nullable = true)
 |-- POPESTIMATE2013: string (nullable = true)
 |-- POPESTIMATE2014: string (nullable = true)
 |-- POPESTIMATE2015: string (nullable = true)
 |-- POPESTIMATE2016: string (nullable = true)
 |-- POPESTIMATE2017: string (nullable = true)



### Population estimates are cast to integers.

### Each year receives a separate row. This allows us to join on the year later on.

In [15]:
// Reshape the wide census table (one POPESTIMATE<year> column per year) into a
// long one with a row per state and year:
//   1. for each year, select that year's estimate, rename it to "Population",
//      cast it to an integer and tag the rows with a literal "Year" column;
//   2. stack the per-year frames with union (all share the same column order);
//   3. rename the census column names to match the death-cause dataset.
// The cast refers to the column by its exact name "Population", so it does not
// depend on Spark's case-insensitive column resolution.
val popPerYear = (2010 to 2017)
    .map { year =>
        reducedStatePopEst
            .select("NAME", "DIVISION", "REGION", s"POPESTIMATE$year")
            .withColumnRenamed(s"POPESTIMATE$year", "Population")
            .withColumn("Population", $"Population".cast("Int"))
            .withColumn("Year", lit(year))
    }
    .reduce((df1, df2) => df1.union(df2))
    .withColumnRenamed("NAME", "State")
    .withColumnRenamed("DIVISION", "Division")
    .withColumnRenamed("REGION", "Region")


popPerYear: org.apache.spark.sql.DataFrame = [State: string, Division: string ... 3 more fields]


### Resulting census dataset.

In [16]:
// Inspect the reshaped census data: schema first, then the default row preview.
popPerYear.printSchema()
popPerYear.show()

root
 |-- State: string (nullable = true)
 |-- Division: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- Year: integer (nullable = false)

+--------------------+--------+------+----------+----+
|               State|Division|Region|Population|Year|
+--------------------+--------+------+----------+----+
|             Alabama|       6|     3|   4785579|2010|
|              Alaska|       9|     4|    714015|2010|
|             Arizona|       8|     4|   6407002|2010|
|            Arkansas|       7|     3|   2921737|2010|
|          California|       9|     4|  37327690|2010|
|            Colorado|       8|     4|   5048029|2010|
|         Connecticut|       1|     1|   3580171|2010|
|            Delaware|       5|     3|    899712|2010|
|District of Columbia|       5|     3|    605040|2010|
|             Florida|       5|     3|  18846461|2010|
|             Georgia|       5|     3|   9712696|2010|
|              Hawaii|      

### Resulting death causes dataset.

In [17]:
// Default row preview of the cleaned death-cause data.
deathCauses.show()

+----+--------------------+--------------------+------+
|Year|         Death Cause|               State|Deaths|
+----+--------------------+--------------------+------+
|2016|Accidents (uninte...|             Alabama|  2755|
|2016|Accidents (uninte...|              Alaska|   439|
|2016|Accidents (uninte...|             Arizona|  4010|
|2016|Accidents (uninte...|            Arkansas|  1604|
|2016|Accidents (uninte...|          California| 13213|
|2016|Accidents (uninte...|            Colorado|  2880|
|2016|Accidents (uninte...|         Connecticut|  1978|
|2016|Accidents (uninte...|            Delaware|   516|
|2016|Accidents (uninte...|District of Columbia|   401|
|2016|Accidents (uninte...|             Florida| 12561|
|2016|Accidents (uninte...|             Georgia|  4701|
|2016|Accidents (uninte...|              Hawaii|   577|
|2016|Accidents (uninte...|               Idaho|   849|
|2016|Accidents (uninte...|            Illinois|  5508|
|2016|Accidents (uninte...|             Indiana|

### Join on year and state to obtain the death causes per state and year in relation to the state's population.

In [18]:
// Attach each state's yearly population to its death-cause rows. Joining on
// the shared column names keeps a single Year and State column in the result;
// this overload performs an inner equi-join.
val deathCausesPerStateYear = popPerYear.join(deathCauses, Seq("Year", "State"))

deathCausesPerStateYear: org.apache.spark.sql.DataFrame = [Year: int, State: string ... 5 more fields]


In [19]:
// Default row preview of the joined dataset.
deathCausesPerStateYear.show(numRows = 20)

+----+--------------------+--------+------+----------+--------------------+------+
|Year|               State|Division|Region|Population|         Death Cause|Deaths|
+----+--------------------+--------+------+----------+--------------------+------+
|2016|             Alabama|       6|     3|   4860545|Accidents (uninte...|  2755|
|2016|              Alaska|       9|     4|    741522|Accidents (uninte...|   439|
|2016|             Arizona|       8|     4|   6908642|Accidents (uninte...|  4010|
|2016|            Arkansas|       7|     3|   2988231|Accidents (uninte...|  1604|
|2016|          California|       9|     4|  39296476|Accidents (uninte...| 13213|
|2016|            Colorado|       8|     4|   5530105|Accidents (uninte...|  2880|
|2016|         Connecticut|       1|     1|   3587685|Accidents (uninte...|  1978|
|2016|            Delaware|       5|     3|    952698|Accidents (uninte...|   516|
|2016|District of Columbia|       5|     3|    684336|Accidents (uninte...|   401|
|201

## Analysis examples

### Finding states with the highest number of deaths per resident:

In [20]:
// Expose the joined data to Spark SQL under the name used in the query below.
deathCausesPerStateYear.createOrReplaceTempView("deathCausesPerStateYear")

// Percentage ratio of summed deaths to summed population per state and year,
// rounded to six decimals and sorted from highest to lowest.
// NOTE(review): the joined table has one row per death cause, each carrying the
// state's population, so sum(population) grows with the number of cause rows in
// a group - confirm that this is the intended denominator.
val deathRatio = spark.sql(
    Seq(
        "SELECT State, Year, Round(sum(deaths)/sum(population)*100,6) AS DeathRatio",
        "FROM deathCausesPerStateYear",
        "GROUP BY State, Year",
        "ORDER BY DeathRatio desc"
    ).mkString(" ")
)

deathRatio: org.apache.spark.sql.DataFrame = [State: string, Year: int ... 1 more field]


The states and years with the highest death ratio are listed below; suicides per resident are examined in the next section:

In [21]:
// Print the 20 state/year combinations with the highest death ratio.
deathRatio.show(numRows = 20)

+-------------+----+----------+
|        State|Year|DeathRatio|
+-------------+----+----------+
|West Virginia|2016|  0.195938|
|West Virginia|2015|  0.194881|
|West Virginia|2014|  0.189767|
|West Virginia|2011|  0.187323|
|West Virginia|2012|  0.186864|
|West Virginia|2013|  0.185908|
|West Virginia|2010|  0.182557|
|        Maine|2015|  0.171571|
|  Mississippi|2016|  0.171433|
|     Arkansas|2015|  0.171301|
|  Mississippi|2015|  0.171008|
|     Arkansas|2016|  0.170879|
|     Kentucky|2016|   0.17077|
|      Alabama|2016|  0.170654|
|      Alabama|2015|  0.169485|
|        Maine|2016|  0.168118|
|     Kentucky|2015|  0.166909|
|     Arkansas|2014|  0.164491|
|     Arkansas|2013|  0.164267|
|     Oklahoma|2015|  0.163691|
+-------------+----+----------+
only showing top 20 rows



### Finding states with the lowest and highest suicide rates.

In [34]:
// Register only the rows whose cause text contains "suicide" (case-sensitive
// substring match) as a separate SQL view.
deathCausesPerStateYear
    .where($"Death Cause".contains("suicide"))
    .createOrReplaceTempView("suicidesPerStateYear")

// Suicides as a percentage of the population per state and year, rounded to
// four decimals, highest ratio first.
val highestSuicideRatio = spark.sql(
    Seq(
        "SELECT State, Year, Round(sum(deaths)/sum(population)*100,4) AS SuicideRatio",
        "FROM suicidesPerStateYear",
        "GROUP BY State, Year",
        "ORDER BY SuicideRatio desc"
    ).mkString(" ")
)

highestSuicideRatio: org.apache.spark.sql.DataFrame = [State: string, Year: int ... 1 more field]


In [35]:
// Print the 20 state/year combinations with the highest suicide ratio.
highestSuicideRatio.show(numRows = 20)

+----------+----+------------+
|     State|Year|SuicideRatio|
+----------+----+------------+
|   Wyoming|2012|      0.0297|
|    Alaska|2015|      0.0272|
|   Wyoming|2015|      0.0268|
|   Montana|2015|      0.0265|
|    Alaska|2016|       0.026|
|   Montana|2016|      0.0257|
|   Montana|2014|      0.0246|
|   Wyoming|2016|      0.0246|
|   Montana|2013|       0.024|
|New Mexico|2015|       0.024|
|   Wyoming|2011|      0.0233|
|   Montana|2011|      0.0233|
|   Wyoming|2010|      0.0232|
|   Montana|2012|      0.0232|
|    Alaska|2013|      0.0232|
|    Alaska|2012|       0.023|
|    Alaska|2010|       0.023|
|   Montana|2010|      0.0229|
|    Alaska|2014|      0.0227|
|New Mexico|2016|      0.0226|
+----------+----+------------+
only showing top 20 rows



In [37]:
// Same aggregation over the suicidesPerStateYear view as for the highest
// ratios, but sorted ascending so the lowest suicide ratios come first.
val lowestSuicideRatio = spark.sql(
    Seq(
        "SELECT State, Year, Round(sum(deaths)/sum(population)*100,4) AS SuicideRatio",
        "FROM suicidesPerStateYear",
        "GROUP BY State, Year",
        "ORDER BY SuicideRatio asc"
    ).mkString(" ")
)

lowestSuicideRatio: org.apache.spark.sql.DataFrame = [State: string, Year: int ... 1 more field]


In [38]:
// Print the 20 state/year combinations with the lowest suicide ratio.
lowestSuicideRatio.show(numRows = 20)

+--------------------+----+------------+
|               State|Year|SuicideRatio|
+--------------------+----+------------+
|District of Columbia|2015|      0.0051|
|District of Columbia|2013|      0.0058|
|District of Columbia|2012|      0.0058|
|District of Columbia|2016|      0.0058|
|District of Columbia|2011|       0.006|
|District of Columbia|2010|      0.0068|
|          New Jersey|2012|      0.0077|
|          New Jersey|2016|      0.0077|
|          New Jersey|2011|      0.0078|
|District of Columbia|2014|      0.0079|
|            New York|2010|       0.008|
|          New Jersey|2010|      0.0082|
|            New York|2015|      0.0083|
|       Massachusetts|2013|      0.0085|
|            New York|2016|      0.0085|
|          New Jersey|2013|      0.0085|
|            New York|2011|      0.0085|
|            New York|2014|      0.0086|
|            New York|2013|      0.0086|
|            New York|2012|      0.0087|
+--------------------+----+------------+
only showing top