# Chapter 3: Apache Spark's Structured APIs

In [3]:
# Find path to PySpark
# import findspark
# findspark.init()

In [1]:
# Import PySpark and libraries
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import Row

In [2]:
# Build a SparkSession
spark = SparkSession.builder.appName("chapter3").getOrCreate()

# Print SparkSession
spark

In [3]:
spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



# The DataFrame API

## Schemas and Creating DataFrames

A schema in Spark defines the column names and associated data types for a DataFrame. Most often, schemas come into play when you are reading structured data from an **external data source**.

Defining a schema up front as opposed to taking a schema-on-read approach offers three benefits:

* You relieve Spark from the onus of inferring data types.
* You prevent Spark from creating a separate job just to read a large portion of your file to ascertain the schema, which for a large data file can be expensive and time-consuming.
* You can detect errors early if data doesn’t match the schema.

Spark allows you to define a schema in two ways. One is to define it **programmatically**, and the other is to employ a **Data Definition Language (DDL) string**, which is much simpler and easier to read.

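Nullable indicates whether the column may contain null values. Setting the nullable property to false declares that a specific column can never be null: if a null turns up anyway, Spark raises an error. In Scala or Java this is a `java.lang.RuntimeException` during the first action on the DataFrame; in PySpark, `createDataFrame()` raises a `ValueError` while verifying the rows against the schema.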

Spark uses a simple set of rules to determine the nullable property when creating a Dataset from a statically typed structure:

* If an object of the given type can be null, then its DataFrame representation is nullable.
* If the object is an `Option[_]`, then its DataFrame representation is nullable, with `None` considered to be SQL NULL.
* Otherwise, it will be marked as not nullable.

In [4]:
# nullable – Whether the field to add should be nullable (default True)

rows = [Row(None, "CA"), Row("Reynold Xin", "CA")]

# With nullable=False on Authors, createDataFrame raises
# ValueError: field Authors: This field is not nullable, but got None
schema = StructType([
    StructField("Authors", StringType(), True), 
    StructField("State", StringType(), True)
])

spark.createDataFrame(rows, schema).show()

+-----------+-----+
|    Authors|State|
+-----------+-----+
|       null|   CA|
|Reynold Xin|   CA|
+-----------+-----+



In [5]:
# Define schema for our data programmatically
schema = StructType([
   StructField("Id", IntegerType(), False),
   StructField("First", StringType(), False),
   StructField("Last", StringType(), False),
   StructField("Url", StringType(), False),
   StructField("Published", StringType(), False),
   StructField("Hits", IntegerType(), False),
   StructField("Campaigns", ArrayType(StringType()), False)])

# Define schema for our data using DDL
schema_ddl = """Id INT, First STRING, Last STRING, Url STRING,
            Published STRING, Hits INT, Campaigns ARRAY<STRING>"""

# Create our static data
data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]],
       [2, "Brooke","Wenig","https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]],
       [3, "Denny", "Lee", "https://tinyurl.3","6/7/2019",7659, ["web", "twitter", "FB", "LinkedIn"]],
       [4, "Tathagata", "Das","https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]],
       [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]],
       [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]]
]

In [6]:
# Create a DataFrame using the schema defined
blogs_df = spark.createDataFrame(data, schema)

# Show the DataFrame; it should reflect the static data defined above
blogs_df.show()

# Print the schema used by Spark to process the DataFrame
# (printSchema() prints directly and returns None, so no print() wrapper is needed)
blogs_df.printSchema()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+

root
 |-- Id: integer (nullable = false)
 |-- First: string (nullable = false)
 |-- Last: string (nullable = false)
 |-- Url: string (nullable = false)
 |-- Published: string (nullable = false)
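 |-- Hits: integer (nullable = false)
 |-- Campaigns: array (nullable = false)
 |    |-- element: string (containsNull = true)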

If you want to use this schema elsewhere in your code, simply execute `blogs_df.schema` and it will return the schema definition:

In [7]:
print(blogs_df.schema)

StructType(List(StructField(Id,IntegerType,false),StructField(First,StringType,false),StructField(Last,StringType,false),StructField(Url,StringType,false),StructField(Published,StringType,false),StructField(Hits,IntegerType,false),StructField(Campaigns,ArrayType(StringType,true),false)))


## Columns and Expressions

Named columns in DataFrames are conceptually similar to named columns in pandas or R DataFrames or in an RDBMS table: **they describe a type of field**. You can list all the columns by their names, and you can perform operations on their values using relational or computational expressions. 

In Spark’s supported languages, columns are objects with public methods (represented by the `Column` type). You can also use logical or mathematical expressions on columns.

For example, you could create a simple expression using *expr("columnName * 5")* or *(expr("columnName - 5") > col(anothercolumnName))*, where *columnName* is a column of a Spark type (integer, string, etc.). `expr()` is part of the `pyspark.sql.functions` module. Like any other function in that module, `expr()` takes an argument that Spark will parse as an expression, computing the result.

You’ll note that the Spark documentation refers to both **col** and **Column**. Column is the name of the object, while col() is a standard built-in function that returns a Column.

In [8]:
# Show columns of the dataframe
blogs_df.columns

['Id', 'First', 'Last', 'Url', 'Published', 'Hits', 'Campaigns']

In [9]:
# col() returns a Column; passing it to select() returns a new DataFrame
blogs_df.select(F.col('Id'))

DataFrame[Id: int]

In [14]:
# Use an expression to compute a value
blogs_df.select(F.expr("Hits") * 2).show(2)
blogs_df.select(F.expr("Hits * 2")).show(2)

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows



In [15]:
# Use col to compute value
blogs_df.select(F.col("Hits") * 2).show(2)

+----------+
|(Hits * 2)|
+----------+
|      9070|
|     17816|
+----------+
only showing top 2 rows



In [16]:
# Use an expression to compute big hitters for blogs
# withColumn() adds a new column, Big Hitters, based on the conditional expression
blogs_df.withColumn("Big Hitters", F.expr("Hits > 10000")).show()

+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|Big Hitters|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|      false|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|      false|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|      false|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|       true|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|       true|
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|       true|
+---+---------+-------+-----------------+---------+-----+--------------------+-----------+



In [17]:
# Concatenate three columns, create a new column, and show the newly created concatenated column
blogs_df.withColumn("AuthorsId", (F.concat(F.expr("First"), F.expr("Last"), F.expr("Id")))) \
    .select(F.col("AuthorsId")) \
    .show(4)

+-------------+
|    AuthorsId|
+-------------+
|  JulesDamji1|
| BrookeWenig2|
|    DennyLee3|
|TathagataDas4|
+-------------+
only showing top 4 rows



In [20]:
blogs_df.withColumn("AuthorsId", (F.concat_ws(" ", "First", "Last", "Id")))\
   .select("AuthorsId")\
   .show(4)

+---------------+
|      AuthorsId|
+---------------+
|  Jules Damji 1|
| Brooke Wenig 2|
|    Denny Lee 3|
|Tathagata Das 4|
+---------------+
only showing top 4 rows



In [21]:
# These statements return the same value, showing that expr is the same as a col method call
blogs_df.select(F.expr("Hits")).show(2)
blogs_df.select(F.col("Hits")).show(2)
blogs_df.select("Hits").show(2)

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows

+----+
|Hits|
+----+
|4535|
|8908|
+----+
only showing top 2 rows



In [23]:
# Sort by column "Id" in descending order
blogs_df.sort(F.col("Id"), ascending=False).show()
# Scala equivalent: blogs_df.sort($"Id".desc).show()

+---+---------+-------+-----------------+---------+-----+--------------------+
| Id|    First|   Last|              Url|Published| Hits|           Campaigns|
+---+---------+-------+-----------------+---------+-----+--------------------+
|  6|  Reynold|    Xin|https://tinyurl.6| 3/2/2015|25568| [twitter, LinkedIn]|
|  5|    Matei|Zaharia|https://tinyurl.5|5/14/2014|40578|[web, twitter, FB...|
|  4|Tathagata|    Das|https://tinyurl.4|5/12/2018|10568|       [twitter, FB]|
|  3|    Denny|    Lee|https://tinyurl.3| 6/7/2019| 7659|[web, twitter, FB...|
|  2|   Brooke|  Wenig|https://tinyurl.2| 5/5/2018| 8908| [twitter, LinkedIn]|
|  1|    Jules|  Damji|https://tinyurl.1| 1/4/2016| 4535| [twitter, LinkedIn]|
+---+---------+-------+-----------------+---------+-----+--------------------+



Both expressions sort the DataFrame by the column named Id in descending order: the Python call uses an **explicit function**, `col("Id")`, to return a *Column object*, while the commented-out Scala form uses **$** before the name of the column, a Scala-only function in Spark that converts the column named Id to a Column.

Each column is part of a row in a record, and all the rows together constitute a DataFrame, which, as we will see later in the chapter, is really a `Dataset[Row]` in Scala.

## Rows

A row in Spark is a generic `Row` object, containing one or more columns. The columns may all be of the same data type (e.g., integer or string), or they can have different types (integer, string, map, array, etc.).

Because Row is an object in Spark and an ordered collection of fields, you can instantiate a Row in each of Spark’s supported languages and access its fields by an index starting at 0:

In [25]:
blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015", ["twitter", "LinkedIn"])
# access using index for individual items
blog_row[-1]

['twitter', 'LinkedIn']

Row objects can be used to create DataFrames if you need them for quick interactivity and exploration:

In [26]:
rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")]
authors_df = spark.createDataFrame(rows, ["Authors", "State"])
authors_df.show()

+-------------+-----+
|      Authors|State|
+-------------+-----+
|Matei Zaharia|   CA|
|  Reynold Xin|   CA|
+-------------+-----+



## Common DataFrames Operations

Spark provides an interface, [DataFrameReader](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader), that enables you to read data into a DataFrame from myriad data sources in formats such as JSON, CSV, Parquet, Text, Avro, ORC, etc. 

Likewise, to write a DataFrame back to a data source in a particular format, Spark uses [DataFrameWriter](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter).

### Using DataFrameReader and DataFrameWriter

Reading and writing are simple in Spark because of these high-level abstractions and contributions from the community to connect to a wide variety of data sources, including common NoSQL stores, RDBMSs, streaming engines such as Apache Kafka and Kinesis, and more.

To get started, let’s read a large CSV file containing data on San Francisco Fire Department calls. As noted previously, we will define a schema for this file and use the DataFrameReader class and its methods to tell Spark what to do. 

Because this file contains 28 columns and over 4,380,660 records, **it’s more efficient to define a schema than have Spark infer it.**

In [29]:
# Programmatic way to define a schema
fire_schema = StructType([
    StructField('CallNumber', IntegerType(), True),
    StructField('UnitID', StringType(), True),
    StructField('IncidentNumber', IntegerType(), True),
    StructField('CallType', StringType(), True),
    StructField('CallDate', StringType(), True),
    StructField('WatchDate', StringType(), True),
    StructField('CallFinalDisposition', StringType(), True),
    StructField('AvailableDtTm', StringType(), True),
    StructField('Address', StringType(), True),
    StructField('City', StringType(), True),
    StructField('Zipcode', IntegerType(), True),
    StructField('Battalion', StringType(), True),
    StructField('StationArea', StringType(), True),
    StructField('Box', StringType(), True),
    StructField('OriginalPriority', StringType(), True),
    StructField('Priority', StringType(), True),
    StructField('FinalPriority', IntegerType(), True),
    StructField('ALSUnit', BooleanType(), True),
    StructField('CallTypeGroup', StringType(), True),
    StructField('NumAlarms', IntegerType(), True),
    StructField('UnitType', StringType(), True),
    StructField('UnitSequenceInCallDispatch', IntegerType(), True),
    StructField('FirePreventionDistrict', StringType(), True),
    StructField('SupervisorDistrict', StringType(), True),
    StructField('Neighborhood', StringType(), True),
    StructField('Location', StringType(), True),
    StructField('RowID', StringType(), True),
    StructField('Delay', FloatType(), True)
])

sf_fire_path = './data/sf-fire-calls.csv'

# Use the DataFrameReader interface to read a CSV file
fire_df = spark.read.csv(sf_fire_path, header=True, schema=fire_schema)

In [30]:
fire_df.select('CallType','CallDate','City','Zipcode').show(5)

+----------------+----------+----+-------+
|        CallType|  CallDate|City|Zipcode|
+----------------+----------+----+-------+
|  Structure Fire|01/11/2002|  SF|  94109|
|Medical Incident|01/11/2002|  SF|  94124|
|Medical Incident|01/11/2002|  SF|  94102|
|    Vehicle Fire|01/11/2002|  SF|  94110|
|          Alarms|01/11/2002|  SF|  94109|
+----------------+----------+----+-------+
only showing top 5 rows



The `spark.read.csv()` function reads in the CSV file and returns a DataFrame of rows and named columns with the types dictated in the schema.

To write the DataFrame into an external data source in your format of choice, you can use the **DataFrameWriter** interface. 

Like **DataFrameReader**, it supports multiple data sources. **Parquet**, a popular columnar format, is the default format; it uses snappy compression to compress the data. 

**If the DataFrame is written as Parquet, the schema is preserved as part of the Parquet metadata.** In this case, subsequent reads back into a DataFrame do not require you to manually supply a schema.

A common data operation is to explore and transform your data, and then persist the DataFrame in Parquet format or save it as a SQL table. Persisting a transformed DataFrame is as easy as reading it. 

For example, to persist the DataFrame we were just working with as a file after reading it you would do the following:

In [35]:
df = (
    fire_df
    .where(F.col('CallType')=='Medical Incident')
    .select('UnitID', 'CallDate', 'Neighborhood','Zipcode')
)

df.show(5, False)

+------+----------+---------------------+-------+
|UnitID|CallDate  |Neighborhood         |Zipcode|
+------+----------+---------------------+-------+
|M17   |01/11/2002|Bayview Hunters Point|94124  |
|M41   |01/11/2002|Tenderloin           |94102  |
|E05   |01/11/2002|Japantown            |94115  |
|E06   |01/11/2002|Castro/Upper Market  |94114  |
|M07   |01/11/2002|Mission              |94110  |
+------+----------+---------------------+-------+
only showing top 5 rows



In [36]:
df.write.parquet('./output/sf-fire-calls.parquet')

In [116]:
parquet_path = './datasets/sf-fire-calls.parquet'
# fire_df.write.format('parquet').save(parquet_path)

Alternatively, you can save it as a table, which registers metadata with the Hive metastore (we will cover SQL managed and unmanaged tables, metastores, and DataFrames in the next chapter):

In [None]:
# parquet_table = 
# fire_df.write.format('parquet').saveAsTable(parquet_table)

### Transformations and actions

Now that you have a distributed DataFrame composed of San Francisco Fire Department calls in memory, the first thing you as a developer will want to do is examine your data to see what the columns look like. 

Are they of the correct types? Do any of them need to be converted to different types? Do they have null values?

#### Projections and filters

In relational parlance, a projection returns only a chosen subset of columns, while a filter (a selection) returns only the rows matching a certain relational condition. In Spark, projections are done with the `select()` method, while filters can be expressed using the `filter()` or `where()` method. 

We can use this technique to examine specific aspects of our SF Fire Department data set:

In [38]:
few_fire_df = (fire_df
               .select("IncidentNumber", "AvailableDtTm", "CallType")
               .where(F.col("CallType") != "Medical Incident"))

few_fire_df.show(5, truncate=False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [42]:
# return number of distinct types of calls using countDistinct()
(fire_df
 .select("CallType")
 .where(F.col("CallType").isNotNull())
 .agg(F.countDistinct("CallType").alias("DistinctCallTypes"))
 .show()
)

+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+



In [43]:
# filter for only distinct non-null CallTypes from all the rows
(fire_df
 .select("CallType")
 .where(F.col("CallType").isNotNull())
 .distinct()
 .show(10, False)
)

+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
|Alarms                             |
|Odor (Strange / Unknown)           |
|Citizen Assist / Service Call      |
|HazMat                             |
|Watercraft in Distress             |
+-----------------------------------+
only showing top 10 rows



#### Renaming, adding and dropping columns

By specifying the desired column names in the schema with StructField, we effectively changed all names in the resulting DataFrame.

Alternatively, you could selectively rename columns with the `withColumnRenamed()` method.

In [44]:
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins")

(new_fire_df
.select("ResponseDelayedinMins")
.where(F.col("ResponseDelayedinMins") > 5)
.show(5, False))

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



Because DataFrames are immutable, when we rename a column using `withColumnRenamed()` we get a new DataFrame while retaining the original with the old column name.

Modifying the contents of a column or its type are common operations during data exploration.

For example, in our SF Fire Department data set, the columns CallDate, WatchDate, and AvailableDtTm are strings rather than either Unix timestamps or SQL dates, both of which Spark supports and can easily manipulate during transformations or actions (e.g., during a date- or time-based analysis of the data).

So how do we convert them into a more usable format? It’s quite simple, thanks to some high-level API methods. `pyspark.sql.functions` has a set of to/from date/timestamp functions such as `to_timestamp()` and `to_date()` that we can use for just this purpose:

In [48]:
new_fire_df.select("CallDate", "WatchDate", "AvailableDtTm").show(3, False)

print(new_fire_df.select("CallDate", "WatchDate", "AvailableDtTm").dtypes)

+----------+----------+----------------------+
|CallDate  |WatchDate |AvailableDtTm         |
+----------+----------+----------------------+
|01/11/2002|01/10/2002|01/11/2002 01:51:44 AM|
|01/11/2002|01/10/2002|01/11/2002 03:01:18 AM|
|01/11/2002|01/10/2002|01/11/2002 02:39:50 AM|
+----------+----------+----------------------+
only showing top 3 rows

[('CallDate', 'string'), ('WatchDate', 'string'), ('AvailableDtTm', 'string')]


In [49]:
fire_ts_df = (
    new_fire_df
    .withColumn("IncidentDate", F.to_timestamp(F.col("CallDate"), "MM/dd/yyyy")).drop("CallDate")
    .withColumn("OnWatchDate", F.to_timestamp(F.col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate")
    .withColumn("AvailableDtTS", F.to_timestamp(F.col("AvailableDtTm"),"MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm")
)

# Select the converted columns
fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").show(5, False)

print(fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS").dtypes)

+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows

[('IncidentDate', 'timestamp'), ('OnWatchDate', 'timestamp'), ('AvailableDtTS', 'timestamp')]


Now that we have modified the dates, we can query using functions from `pyspark.sql.functions` like `month()`, `year()`, and `dayofmonth()` to explore our data further.

In [50]:
(fire_ts_df
 .select(F.year('IncidentDate'))
 .distinct()
 .orderBy(F.year('IncidentDate'))
 .show()
)

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



#### Aggregations

A handful of transformations and actions on DataFrames, such as `groupBy()`, `orderBy()`, and `count()`, offer the ability to aggregate by column names and then aggregate counts across them.

For larger DataFrames on which you plan to conduct frequent or repeated queries, you could benefit from caching. We will cover DataFrame caching strategies and their benefits in later chapters.

In [51]:
# what were the most common types of fire calls?
(fire_ts_df
 .select("CallType")
 .where(F.col("CallType").isNotNull())
 .groupBy("CallType")
 .count()
 .orderBy("count", ascending=False)
 .show(n=10, truncate=False)
)

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



The DataFrame API also offers the `collect()` method, but for extremely large DataFrames this is resource-heavy (expensive) and dangerous, as it can cause out-of-memory (OOM) exceptions.

Unlike `count()`, which returns a single number to the driver, `collect()` returns a collection of all the Row objects in the entire DataFrame or Dataset. 

If you want to take a peek at some Row records you’re better off with `take(n)`, which will return only the first n Row objects of the DataFrame.

#### Other common DataFrame operations

Along with all the others we’ve seen, the DataFrame API provides descriptive statistical methods like `min()`, `max()`, `sum()` and `avg()`.

In [54]:
(fire_ts_df.select(F.sum("NumAlarms"), 
                   F.avg("ResponseDelayedinMins"),
                   F.min("ResponseDelayedinMins"), 
                   F.max("ResponseDelayedinMins"))
.show())

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



## End-to-End DataFrame Example

* What were all the different types of fire calls in 2018?
* What months within the year 2018 saw the highest number of fire calls?
* Which neighborhood in San Francisco generated the most fire calls in 2018?
* Which neighborhoods had the worst response times to fire calls in 2018?
* Which week in the year in 2018 had the most fire calls?
* Is there a correlation between neighborhood, zip code, and number of fire calls?
* How can we use Parquet files or SQL tables to store this data and read it back?

In [55]:
# What were all the different types of fire calls in 2018?

(fire_ts_df.select('CallType')
 .where((F.col('CallType').isNotNull()) & (F.year('IncidentDate')==2018))
 .groupBy('CallType')
 .count()
 .orderBy('count', ascending=False)
 .show(truncate=False)
)

+-------------------------------+-----+
|CallType                       |count|
+-------------------------------+-----+
|Medical Incident               |7004 |
|Alarms                         |1144 |
|Structure Fire                 |906  |
|Traffic Collision              |433  |
|Outside Fire                   |153  |
|Other                          |114  |
|Citizen Assist / Service Call  |113  |
|Gas Leak (Natural and LP Gases)|69   |
|Water Rescue                   |43   |
|Elevator / Escalator Rescue    |36   |
|Electrical Hazard              |30   |
|Smoke Investigation (Outside)  |28   |
|Vehicle Fire                   |28   |
|Odor (Strange / Unknown)       |10   |
|Fuel Spill                     |10   |
|HazMat                         |5    |
|Train / Rail Incident          |5    |
|Suspicious Package             |3    |
|Explosion                      |1    |
|Assist Police                  |1    |
+-------------------------------+-----+



In [56]:
# What months within the year 2018 saw the highest number of fire calls?

(fire_ts_df.select('IncidentDate')
 .where(F.year('IncidentDate')==2018)
 .groupBy(F.month('IncidentDate'))
 .count()
 .orderBy('count', ascending=False)
 .show(truncate=False)
)

+-------------------+-----+
|month(IncidentDate)|count|
+-------------------+-----+
|10                 |1068 |
|5                  |1047 |
|3                  |1029 |
|8                  |1021 |
|1                  |1007 |
|7                  |974  |
|6                  |974  |
|9                  |951  |
|4                  |947  |
|2                  |919  |
|11                 |199  |
+-------------------+-----+



In [57]:
# Which neighborhood in San Francisco generated the most fire calls in 2018?

(fire_ts_df
 .where(F.year('IncidentDate')==2018)
 .groupBy('Neighborhood')
 .count()
 .orderBy('count', ascending=False)
 .show(5, truncate=False)
)

+------------------------------+-----+
|Neighborhood                  |count|
+------------------------------+-----+
|Tenderloin                    |1393 |
|South of Market               |1053 |
|Mission                       |913  |
|Financial District/South Beach|772  |
|Bayview Hunters Point         |522  |
+------------------------------+-----+
only showing top 5 rows



## The Dataset API

Spark 2.0 unified the DataFrame and Dataset APIs as Structured APIs with similar interfaces so that developers would only have to learn a single set of APIs.

Datasets take on two characteristics: **typed and untyped APIs**.

Conceptually, you can think of a **DataFrame** in Scala as an alias for a collection of generic objects, `Dataset[Row]`, where a Row is a generic untyped JVM object that may hold different types of fields. 

A **Dataset**, by contrast, is a collection of strongly typed JVM objects in Scala or a class in Java.

**Datasets make sense only in Java and Scala**, whereas in Python and R only DataFrames make sense. This is because Python and R are not compile-time type-safe; types are dynamically inferred or assigned during execution, not during compile time.

The reverse is true in Scala and Java: types are bound to variables and objects at compile time. In Scala, however, a DataFrame is just an alias for untyped Dataset[Row].

https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/

## Spark SQL and the Underlying Engine

Spark SQL allows developers to issue ANSI SQL:2003–compatible queries on structured data with a schema. Apart from that, the Spark SQL engine:

* Unifies Spark components and permits abstraction to DataFrames/Datasets in Java, Scala, Python, and R, which simplifies working with structured data sets.
* Connects to the Apache Hive metastore and tables.
* Reads and writes structured data with a specific schema from structured file formats (JSON, CSV, Text, Avro, Parquet, ORC, etc.) and converts data into temporary tables.
* Offers an interactive Spark SQL shell for quick data exploration.
* Provides a bridge to (and from) external tools via standard database JDBC/ODBC connectors.
* Generates optimized query plans and compact code for the JVM, for final execution.

At the core of the Spark SQL engine are the Catalyst optimizer and Project Tungsten. Together, these support the high-level DataFrame and Dataset APIs and SQL queries.

In [58]:
# Stop the SparkSession
spark.stop()