In [1]:
import pyspark.sql as sql
from pyspark.sql import SparkSession

<h1>Creating DataFrames

In [2]:
spark = SparkSession.builder.master('local[*]').appName("Spark-App-Find-Deaths").getOrCreate()

In [3]:
df = spark.read.csv("data/death-causes.csv", header=True)

In [4]:
df.show()

+----+--------------------+--------------------+--------------+------+-----------------------+
|Year|      113_Cause_Name|          Cause_Name|         State|Deaths|Age-adjusted_Death_Rate|
+----+--------------------+--------------------+--------------+------+-----------------------+
|2008|Influenza and pne...|Influenza and pne...|  South Dakota|   177|                     17|
|2005|Malignant neoplas...|              Cancer|   Connecticut|  7052|                  176.8|
|2007|Intentional self-...|             Suicide|     Wisconsin|   729|                   12.7|
|2016|Accidents (uninte...|Unintentional inj...|      Nebraska|   772|                     37|
|2013|Diseases of heart...|       Heart disease|  North Dakota|  1382|                  150.7|
|2009|Diabetes mellitus...|            Diabetes|  North Dakota|   214|                   25.4|
|2011|Nephritis, nephro...|      Kidney disease|          Ohio|  1943|                   14.2|
|2000|Diseases of heart...|       Heart disease|  

<h1>DataFrame Operations

In [5]:
df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- 113_Cause_Name: string (nullable = true)
 |-- Cause_Name: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Deaths: string (nullable = true)
 |-- Age-adjusted_Death_Rate: string (nullable = true)



In [6]:
df.dtypes

[('Year', 'string'),
 ('113_Cause_Name', 'string'),
 ('Cause_Name', 'string'),
 ('State', 'string'),
 ('Deaths', 'string'),
 ('Age-adjusted_Death_Rate', 'string')]

In [7]:
df = df.withColumnRenamed("Cause_Name", "Causes")

In [8]:
df = df.withColumn("Deaths", df["Deaths"].cast(sql.types.IntegerType()))
df = df.withColumn("Age-adjusted_Death_Rate", df["Age-adjusted_Death_Rate"].cast(sql.types.FloatType()))
df.dtypes

[('Year', 'string'),
 ('113_Cause_Name', 'string'),
 ('Causes', 'string'),
 ('State', 'string'),
 ('Deaths', 'int'),
 ('Age-adjusted_Death_Rate', 'float')]

In [9]:
df.select("Year").show(10)

+----+
|Year|
+----+
|2008|
|2005|
|2007|
|2016|
|2013|
|2009|
|2011|
|2000|
|2003|
|2013|
+----+
only showing top 10 rows



In [10]:
df.filter(df['Causes'] == "Cancer").show()

+----+--------------------+------+-------------+------+-----------------------+
|Year|      113_Cause_Name|Causes|        State|Deaths|Age-adjusted_Death_Rate|
+----+--------------------+------+-------------+------+-----------------------+
|2005|Malignant neoplas...|Cancer|  Connecticut|  7052|                  176.8|
|2003|Malignant neoplas...|Cancer|New Hampshire|  2485|                  193.7|
|1999|Malignant neoplas...|Cancer|      Montana|  1854|                  195.1|
|2012|Malignant neoplas...|Cancer| North Dakota|  1253|                  150.7|
|2015|Malignant neoplas...|Cancer|      Indiana| 13511|                  176.3|
|2002|Malignant neoplas...|Cancer| Rhode Island|  2404|                  199.3|
|2014|Malignant neoplas...|Cancer|      Alabama| 10286|                  177.6|
|2016|Malignant neoplas...|Cancer|         Utah|  3125|                  122.4|
|2012|Malignant neoplas...|Cancer|      Florida| 42188|                  158.8|
|2001|Malignant neoplas...|Cancer|    Lo

In [11]:
df.groupBy("Causes").count().show()

+--------------------+-----+
|              Causes|count|
+--------------------+-----+
|Influenza and pne...|  906|
|                CLRD|  907|
|            Diabetes|  903|
|              Stroke|  918|
|             Suicide|  919|
|              Cancer|  907|
|      Kidney disease|  908|
|       Heart disease|  910|
|Unintentional inj...|  913|
| Alzheimer's disease|  909|
|          All causes|  900|
+--------------------+-----+



In [12]:
df.groupBy(["Year", "Causes"]).sum("Deaths").show()

+----+-------------------+-----------+
|Year|             Causes|sum(Deaths)|
+----+-------------------+-----------+
|2009|           Diabetes|     130927|
|2000|               CLRD|     244018|
|2001|Alzheimer's disease|     107111|
|2011|           Diabetes|     147662|
|2003|             Stroke|     305479|
|2007|               CLRD|     255391|
|2014|         All causes|    5097328|
|2004|               CLRD|     243379|
|2006|     Kidney disease|      90688|
|2002|     Kidney disease|      73086|
|2012|     Kidney disease|      90194|
|2000|      Heart disease|    1409172|
|2013|      Heart disease|    1207118|
|2016|             Stroke|     284284|
|2009|             Stroke|     257684|
|2009|         All causes|    4831139|
|2010|             Cancer|    1146048|
|2004|     Kidney disease|      84607|
|2008|               CLRD|     275413|
|2016|Alzheimer's disease|     231485|
+----+-------------------+-----------+
only showing top 20 rows



<h1>Running SQL Queries Programmatically

In [13]:
df.createOrReplaceTempView("deaths")

In [14]:
sqlDF = spark.sql("SELECT * FROM deaths WHERE YEAR==2010")
sqlDF.show()

+----+--------------------+--------------------+--------------+------+-----------------------+
|Year|      113_Cause_Name|              Causes|         State|Deaths|Age-adjusted_Death_Rate|
+----+--------------------+--------------------+--------------+------+-----------------------+
|2010|Diabetes mellitus...|            Diabetes|       Indiana|  1587|                   22.7|
|2010|Intentional self-...|             Suicide|   Mississippi|   388|                   13.0|
|2010|Diseases of heart...|       Heart disease|      Maryland| 10915|                  182.2|
|2010|Diseases of heart...|       Heart disease|    New Mexico|  3224|                  151.2|
|2010|Accidents (uninte...|Unintentional inj...|         Maine|   540|                   36.6|
|2010|Influenza and pne...|Influenza and pne...|        Kansas|   550|                   16.4|
|2010|Intentional self-...|             Suicide|  Rhode Island|   129|                   12.3|
|2010|Intentional self-...|             Suicide|  

In [15]:
sqlDF = spark.sql("SELECT Year, Causes, sum(Deaths) FROM deaths GROUP BY Year,Causes")
sqlDF.show()

+----+-------------------+-----------+
|Year|             Causes|sum(Deaths)|
+----+-------------------+-----------+
|2009|           Diabetes|     130927|
|2000|               CLRD|     244018|
|2001|Alzheimer's disease|     107111|
|2011|           Diabetes|     147662|
|2003|             Stroke|     305479|
|2007|               CLRD|     255391|
|2014|         All causes|    5097328|
|2004|               CLRD|     243379|
|2006|     Kidney disease|      90688|
|2002|     Kidney disease|      73086|
|2012|     Kidney disease|      90194|
|2000|      Heart disease|    1409172|
|2013|      Heart disease|    1207118|
|2016|             Stroke|     284284|
|2009|             Stroke|     257684|
|2009|         All causes|    4831139|
|2010|             Cancer|    1146048|
|2004|     Kidney disease|      84607|
|2008|               CLRD|     275413|
|2016|Alzheimer's disease|     231485|
+----+-------------------+-----------+
only showing top 20 rows



<h1>Global Temporary View

In [16]:
df.createGlobalTempView("global_deaths")

In [17]:
spark.sql("SELECT * FROM global_temp.global_deaths").show()

+----+--------------------+--------------------+--------------+------+-----------------------+
|Year|      113_Cause_Name|              Causes|         State|Deaths|Age-adjusted_Death_Rate|
+----+--------------------+--------------------+--------------+------+-----------------------+
|2008|Influenza and pne...|Influenza and pne...|  South Dakota|   177|                   17.0|
|2005|Malignant neoplas...|              Cancer|   Connecticut|  7052|                  176.8|
|2007|Intentional self-...|             Suicide|     Wisconsin|   729|                   12.7|
|2016|Accidents (uninte...|Unintentional inj...|      Nebraska|   772|                   37.0|
|2013|Diseases of heart...|       Heart disease|  North Dakota|  1382|                  150.7|
|2009|Diabetes mellitus...|            Diabetes|  North Dakota|   214|                   25.4|
|2011|Nephritis, nephro...|      Kidney disease|          Ohio|  1943|                   14.2|
|2000|Diseases of heart...|       Heart disease|  

<h1>Interoperating with RDDs

<h2>Inferring the Schema Using Reflection

In [18]:
from pyspark.sql import Row

In [19]:
sc = spark.sparkContext

In [20]:
lines = sc.textFile("death-causes.txt")

In [21]:
parts = lines.map(lambda l: l.split(","))

In [22]:
deathsDF = parts.map(lambda d: Row(Year=d[0], Causes=d[1], State=d[2], Deaths=d[3], Age_adjusted_Death_Rate=d[4]))

In [23]:
schemaDeaths = spark.createDataFrame(deathsDF)

In [24]:
schemaDeaths = schemaDeaths.withColumn("Deaths", schemaDeaths["Deaths"].cast(sql.types.IntegerType()))
schemaDeaths = schemaDeaths.withColumn("Age-adjusted_Death_Rate", schemaDeaths["Age_adjusted_Death_Rate"].cast(sql.types.FloatType()))
schemaDeaths.groupBy(["Year", "Causes"]).sum("Deaths").show()

+----+-------------------+-----------+
|Year|             Causes|sum(Deaths)|
+----+-------------------+-----------+
|2009|           Diabetes|     130927|
|2000|               CLRD|     244018|
|2001|Alzheimer's disease|     107111|
|2011|           Diabetes|     147662|
|2003|             Stroke|     305479|
|2007|               CLRD|     255391|
|2014|         All causes|    5097328|
|2004|               CLRD|     243379|
|2006|     Kidney disease|      90688|
|2002|     Kidney disease|      73086|
|2012|     Kidney disease|      90194|
|2000|      Heart disease|    1409172|
|2013|      Heart disease|    1207118|
|2016|             Stroke|     284284|
|2009|             Stroke|     257684|
|2009|         All causes|    4831139|
|2010|             Cancer|    1146048|
|2004|     Kidney disease|      84607|
|2008|               CLRD|     275413|
|2016|Alzheimer's disease|     231485|
+----+-------------------+-----------+
only showing top 20 rows



In [25]:
schemaDeaths.createOrReplaceTempView("deathsTempView")

In [26]:
age_rate = spark.sql("SELECT * FROM deathsTempView")
age_rate.show()

+-----------------------+--------------------+------+--------------+----+-----------------------+
|Age_adjusted_Death_Rate|              Causes|Deaths|         State|Year|Age-adjusted_Death_Rate|
+-----------------------+--------------------+------+--------------+----+-----------------------+
|                     17|Influenza and pne...|   177|  South Dakota|2008|                   17.0|
|                  176.8|              Cancer|  7052|   Connecticut|2005|                  176.8|
|                   12.7|             Suicide|   729|     Wisconsin|2007|                   12.7|
|                     37|Unintentional inj...|   772|      Nebraska|2016|                   37.0|
|                  150.7|       Heart disease|  1382|  North Dakota|2013|                  150.7|
|                   25.4|            Diabetes|   214|  North Dakota|2009|                   25.4|
|                   14.2|      Kidney disease|  1943|          Ohio|2011|                   14.2|
|                  2

<h2>Programmatically Specifying the Schema

In [27]:
from pyspark.sql.types import *

In [28]:
lines = sc.textFile("death-causes.txt")

In [29]:
parts = lines.map(lambda l: l.split(","))

In [30]:
deaths = parts.map(lambda d: (d[0], d[1], d[2], d[3], d[4]))

In [31]:
schemaString = "Year Causes Deaths State Age_adjusted_Death_Rate"

In [32]:
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

In [33]:
schemaDeaths = spark.createDataFrame(deaths, schema)

In [34]:
schemaDeaths.createOrReplaceTempView("deathTempView")

In [35]:
results = spark.sql("SELECT * FROM deathTempView")

In [36]:
results.show()

+----+--------------------+--------------+-----+-----------------------+
|Year|              Causes|        Deaths|State|Age_adjusted_Death_Rate|
+----+--------------------+--------------+-----+-----------------------+
|2008|Influenza and pne...|  South Dakota|  177|                     17|
|2005|              Cancer|   Connecticut| 7052|                  176.8|
|2007|             Suicide|     Wisconsin|  729|                   12.7|
|2016|Unintentional inj...|      Nebraska|  772|                     37|
|2013|       Heart disease|  North Dakota| 1382|                  150.7|
|2009|            Diabetes|  North Dakota|  214|                   25.4|
|2011|      Kidney disease|          Ohio| 1943|                   14.2|
|2000|       Heart disease|      Delaware| 1983|                  262.5|
|2003|              Cancer| New Hampshire| 2485|                  193.7|
|2013|             Suicide|North Carolina| 1284|                   12.6|
|2009|       Heart disease|      Arkansas| 7295|   