
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px; height: 163px">
</div>

# Transformations & Actions Lab

## Learning Objectives
In this lab, you will:
- Explore data to understand contents
- Validate data consistency
- Derive new fields from current data format
- Convert fields where necessary
- Calculate aggregates
- Save aggregate data

### Skills Explored
* Explore and use DataFrames and SQL transformations
* Gain familiarity with various methods in `spark.sql.functions` by reading API docs
* Review how jobs are triggered in association with actions
* Reinforce concepts of how Spark executes logic against a data source

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Getting Started

Run the following cell to configure our "classroom."

In [4]:
%run ./Includes/Classroom-Setup

## Overview of the Data

This lab reuses the weather data from the previous lab.

The data include multiple entries from a selection of weather stations, including average temperatures recorded in either Fahrenheit or Celsius. The schema for the table:

|ColumnName  | DataType| Description|
|------------|---------|------------|
|NAME        |string   | Station name |
|STATION     |string   | Unique ID |
|LATITUDE    |float    | Latitude |
|LONGITUDE   |float    | Longitude |
|ELEVATION   |float    | Elevation |
|DATE        |date     | YYYY-MM-DD |
|UNIT        |string   | Temperature units |
|TAVG        |float    | Average temperature |

While the total number of rows in this dataset would make manual exploration extremely inefficient, many aggregations on these data produce a small enough output for manual review.
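Because `TAVG` is recorded in mixed units, any comparison across stations first needs a common scale. The following is a minimal plain-Python sketch of that arithmetic, not part of the lab's provided code; the helper names `fahrenheit_to_celsius` and `to_celsius` are hypothetical.

```python
def fahrenheit_to_celsius(temp_f: float) -> float:
    """Convert a Fahrenheit reading to Celsius: C = (F - 32) * 5 / 9."""
    return (temp_f - 32.0) * 5.0 / 9.0


def to_celsius(unit: str, tavg: float) -> float:
    """Return tavg in Celsius, based on the UNIT value ('F' or 'C')."""
    if unit == "F":
        return fahrenheit_to_celsius(tavg)
    if unit == "C":
        return tavg
    raise ValueError(f"Unexpected unit: {unit!r}")


# A Fahrenheit reading of 61.0 and a Celsius reading of 11.7
print(round(to_celsius("F", 61.0), 1))  # 16.1
print(to_celsius("C", 11.7))            # 11.7
```

The same expression can be written against DataFrame columns or in SQL; the plain-Python form is used here only to make the formula explicit.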

## Register Table and Load Data to DataFrame

The following cell re-executes the logic from the last lab and ensures all students will have the same environment.

#### A Breakdown of Operations
1. The first line drops the table `weather` if it exists. This ensures that no conflicts in the metastore will occur.
1. The schema is defined in SQL DDL. Note that column names are provided in all caps to match the formatting of the source files.
1. The `sourcePath` specifies the parquet data to be read. The source directory is read-only and was mounted above when running `Includes/Classroom-Setup`.
1. The `tablePath` variable specifies where the files associated with the unmanaged table will be stored. The `userhome` portion points to a directory created with each student's username on the default object store (root DBFS) associated with the Databricks workspace.
1. The multiline block of code that includes `spark.read...write...saveAsTable` specifies a source format, schema, and path for reading. The data from the source are copied to the destination `tablePath`, overwriting any data that may already exist in that directory. The table `weather` is registered to the specified files in the destination path using the schema provided. Note that this logic takes advantage of parquet being the default format when writing with Spark (the data written in the `tablePath` will be parquet files).
1. The final line creates a DataFrame from the `weather` table.

The table `weather` and the DataFrame `weatherDF` share the same metadata definitions, meaning that both the schema and the files referenced are identical. This provides analogous access to the data through either the DataFrames API or Spark SQL. Changes to the data saved in the `tablePath` will be immediately reflected in subsequent queries through either of these APIs.

In [7]:
spark.sql("DROP TABLE IF EXISTS weather")

schemaDDL = "NAME STRING, STATION STRING, LATITUDE FLOAT, LONGITUDE FLOAT, ELEVATION FLOAT, DATE DATE, UNIT STRING, TAVG FLOAT"

sourcePath = "/mnt/training/weather/StationData/stationData.parquet/"

tablePath = f"{userhome}/weather"

(spark.read
  .format("parquet")
  .schema(schemaDDL)
  .load(sourcePath)
  .write
  .option("path", tablePath)
  .mode("overwrite")
  .saveAsTable("weather"))

weatherDF = spark.table("weather")

## Import Needed Functions

Based on user preference, this lab can be completed using SQL, Python, or Scala. Remember that SQL queries and the DataFrames API can be bridged by using `spark.sql`, which will return a DataFrame object.

Many of the methods used for DataFrame transformations live within the `sql.functions` module. Links to the Scala and Python API docs are provided here:

- [pyspark.sql.functions API docs](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)
- [Scala spark.sql.functions API docs](https://spark.apache.org/docs/2.2.1/api/java/org/apache/spark/sql/functions.html)

Note that the methods in each will compile to the same plan on execution. Some individuals report navigation of the Scala docs to be easier, even when coding in Python.

If coding in SQL, the built-in functions can be found [here](https://spark.apache.org/docs/2.3.1/api/sql/index.html).

Extending on SQL, _most_ of the operations from Hive DML are also supported; full docs [here](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML).

**The cell below currently only imports the `col` function. Feel free to append to the import list and re-execute this cell, or import functions as needed later in the lab.**

In [9]:
from pyspark.sql.functions import col # add additional methods as a comma-separated list

# Alternatively, you can import all functions by commenting out and executing the following line:

# from pyspark.sql.functions import *


## Preview 20 Lines of the Data

Begin by displaying 20 lines of the data to get an idea of how the data is formatted.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> `limit`, `show`, `head`, and `take` will all accomplish this, but will trigger different numbers of jobs.

In [11]:
# ANSWER

display(weatherDF.limit(20))

NAME,STATION,LATITUDE,LONGITUDE,ELEVATION,DATE,UNIT,TAVG
"HAYWARD AIR TERMINAL, CA US",USW00093228,37.6542,-122.115,13.1,2018-05-27,F,61.0
"BIG ROCK CALIFORNIA, CA US",USR0000CBIR,38.0394,-122.57,457.2,2018-01-05,C,11.7
"SAN FRANCISCO INTERNATIONAL AIRPORT, CA US",USW00023234,37.6197,-122.3647,2.4,2018-02-24,C,8.3
"LAS TRAMPAS CALIFORNIA, CA US",USR0000CTRA,37.8339,-122.0669,536.4,2018-03-26,C,9.4
"HOUSTON INTERCONTINENTAL AIRPORT, TX US",USW00012960,29.98,-95.36,29.0,2018-05-25,F,80.0
"BIG ROCK CALIFORNIA, CA US",USR0000CBIR,38.0394,-122.57,457.2,2018-05-16,C,11.1
"BLACK DIAMOND CALIFORNIA, CA US",USR0000CBKD,37.95,-121.8844,487.7,2018-05-25,C,10.6
"LAS TRAMPAS CALIFORNIA, CA US",USR0000CTRA,37.8339,-122.0669,536.4,2018-05-21,C,11.7
"WOODACRE CALIFORNIA, CA US",USR0000CWOO,37.9906,-122.6447,426.7,2018-05-26,F,53.0
"BRIONES CALIFORNIA, CA US",USR0000CBRI,37.9442,-122.1178,442.0,2018-04-08,F,53.0


In [12]:
%sql
-- ANSWER

SELECT * 
FROM weather
LIMIT 20

NAME,STATION,LATITUDE,LONGITUDE,ELEVATION,DATE,UNIT,TAVG
"HAYWARD AIR TERMINAL, CA US",USW00093228,37.6542,-122.115,13.1,2018-05-27,F,61.0
"BIG ROCK CALIFORNIA, CA US",USR0000CBIR,38.0394,-122.57,457.2,2018-01-05,C,11.7
"SAN FRANCISCO INTERNATIONAL AIRPORT, CA US",USW00023234,37.6197,-122.3647,2.4,2018-02-24,C,8.3
"LAS TRAMPAS CALIFORNIA, CA US",USR0000CTRA,37.8339,-122.0669,536.4,2018-03-26,C,9.4
"HOUSTON INTERCONTINENTAL AIRPORT, TX US",USW00012960,29.98,-95.36,29.0,2018-05-25,F,80.0
"BIG ROCK CALIFORNIA, CA US",USR0000CBIR,38.0394,-122.57,457.2,2018-05-16,C,11.1
"BLACK DIAMOND CALIFORNIA, CA US",USR0000CBKD,37.95,-121.8844,487.7,2018-05-25,C,10.6
"LAS TRAMPAS CALIFORNIA, CA US",USR0000CTRA,37.8339,-122.0669,536.4,2018-05-21,C,11.7
"WOODACRE CALIFORNIA, CA US",USR0000CWOO,37.9906,-122.6447,426.7,2018-05-26,F,53.0
"BRIONES CALIFORNIA, CA US",USR0000CBRI,37.9442,-122.1178,442.0,2018-04-08,F,53.0


Limiting the view of the data to 20 lines is a transformation, but any time data is returned to display, an action (and **_at least 1_** job) will be triggered.

## Define a New DataFrame or View Containing All Distinct Names

Because of lazy evaluation, DataFrames and views don't execute until an action is called against them.

Registering an intermediate temp view or DataFrame essentially gives a name to a set of transformations against a dataset. This can be helpful when building up complex logic, as no data will be replicated when an intermediate state is used multiple times in later queries.

Use the `distinct` command on the `NAME` column and save the result to a new DataFrame named `uniqueNamesDF`. Note that your definition should not trigger a job.

In [15]:
# ANSWER

uniqueNamesDF = weatherDF.select("NAME").distinct()

In [16]:
%sql
-- ANSWER

CREATE OR REPLACE TEMP VIEW unique_names
AS (
  SELECT DISTINCT(NAME)
  FROM weather
  )

Now return the count and display the unique names. Each of these is a separate action.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> `display()` will suppress any console output, so do this in 2 cells.

In [18]:
# ANSWER
display(uniqueNamesDF)

NAME
"BIG ROCK CALIFORNIA, CA US"
"PULGAS CALIFORNIA, CA US"
"LOS PRIETOS CALIFORNIA, CA US"
"SAN FRANCISCO INTERNATIONAL AIRPORT, CA US"
"LAS TRAMPAS CALIFORNIA, CA US"
"BARNABY CALIFORNIA, CA US"
"HOUSTON WILLIAM P HOBBY AIRPORT, TX US"
"SPRING VALLEY CALIFORNIA, CA US"
"OAKLAND NORTH CALIFORNIA, CA US"
"OAKLAND SOUTH CALIFORNIA, CA US"


In [19]:
%sql
-- ANSWER
SELECT * FROM unique_names

NAME
"BIG ROCK CALIFORNIA, CA US"
"PULGAS CALIFORNIA, CA US"
"LOS PRIETOS CALIFORNIA, CA US"
"SAN FRANCISCO INTERNATIONAL AIRPORT, CA US"
"LAS TRAMPAS CALIFORNIA, CA US"
"BARNABY CALIFORNIA, CA US"
"HOUSTON WILLIAM P HOBBY AIRPORT, TX US"
"OAKLAND NORTH CALIFORNIA, CA US"
"SPRING VALLEY CALIFORNIA, CA US"
"OAKLAND SOUTH CALIFORNIA, CA US"


In [20]:
# ANSWER
uniqueNamesDF.count()

In [21]:
%sql
-- ANSWER
SELECT COUNT(*) FROM unique_names

count(1)
17


Again, the lazy evaluation in Spark waits until these results need to be returned to trigger a job.

## Confirm Station Information Consistency
The fields `NAME`, `STATION`, `LATITUDE`, `LONGITUDE`, and `ELEVATION` should remain consistent for each unique station throughout the data. 
If this is true, the count of distinct names should be equivalent to the count of the distinct combinations of these five columns in the present data. Write a query that confirms this using the DataFrame or view defined in the previous step as a point of reference.

In [24]:
# ANSWER

weatherDF.select("NAME", "STATION", "LATITUDE", "LONGITUDE", "ELEVATION").distinct().count() == uniqueNamesDF.count()

In [25]:
%sql
-- ANSWER
SELECT (
  SELECT COUNT(DISTINCT NAME, STATION, LATITUDE, LONGITUDE, ELEVATION)
  FROM weather
  ) = (
  SELECT COUNT(*) FROM unique_names
  )

(scalarsubquery() = scalarsubquery())
True


## Examine Date Range for Each Station

Create a DataFrame or view containing the earliest and latest date recorded for each station, alongside the total count of records.

After a `groupBy`, the `agg` function will allow multiple aggregate calls in a single query.

Make sure to `alias` the aggregate columns, as the default outputs will include parentheses (which aren't valid when saving to parquet).

In [27]:
# ANSWER
from pyspark.sql.functions import min, max, count

stationAggDF = (weatherDF.groupBy("NAME").agg(
  min("DATE").alias("date_min"),
  max("DATE").alias("date_max"),
  count("DATE").alias("count")))

In [28]:
%sql
-- ANSWER

CREATE OR REPLACE TEMP VIEW station_agg_temp
AS (
  SELECT NAME, MIN(DATE) date_min, MAX(DATE) date_max, COUNT(DATE) count 
  FROM weather
  GROUP BY NAME
  )

Displaying the results of this aggregation to the notebook allows the user to manually review an interactive table of the results.

In [30]:
display(stationAggDF)

NAME,date_min,date_max,count
"BIG ROCK CALIFORNIA, CA US",2018-01-01,2018-05-31,151
"PULGAS CALIFORNIA, CA US",2018-01-01,2018-05-31,151
"LOS PRIETOS CALIFORNIA, CA US",2018-01-01,2018-05-31,151
"SAN FRANCISCO INTERNATIONAL AIRPORT, CA US",2018-01-01,2018-05-31,149
"LAS TRAMPAS CALIFORNIA, CA US",2018-01-01,2018-05-31,151
"BARNABY CALIFORNIA, CA US",2018-01-01,2018-05-31,151
"HOUSTON WILLIAM P HOBBY AIRPORT, TX US",2018-01-01,2018-05-31,150
"OAKLAND NORTH CALIFORNIA, CA US",2018-01-01,2018-05-31,151
"SPRING VALLEY CALIFORNIA, CA US",2018-01-01,2018-05-31,151
"OAKLAND SOUTH CALIFORNIA, CA US",2018-01-01,2018-05-31,151


In [31]:
%sql
-- ANSWER
SELECT * FROM station_agg_temp

NAME,date_min,date_max,count
"BIG ROCK CALIFORNIA, CA US",2018-01-01,2018-05-31,151
"PULGAS CALIFORNIA, CA US",2018-01-01,2018-05-31,151
"LOS PRIETOS CALIFORNIA, CA US",2018-01-01,2018-05-31,151
"SAN FRANCISCO INTERNATIONAL AIRPORT, CA US",2018-01-01,2018-05-31,149
"LAS TRAMPAS CALIFORNIA, CA US",2018-01-01,2018-05-31,151
"BARNABY CALIFORNIA, CA US",2018-01-01,2018-05-31,151
"HOUSTON WILLIAM P HOBBY AIRPORT, TX US",2018-01-01,2018-05-31,150
"OAKLAND NORTH CALIFORNIA, CA US",2018-01-01,2018-05-31,151
"SPRING VALLEY CALIFORNIA, CA US",2018-01-01,2018-05-31,151
"OAKLAND SOUTH CALIFORNIA, CA US",2018-01-01,2018-05-31,151


Stations have roughly the same number of records over the same 5-month period.

## Save data

Using the provided path, write the data using [parquet file format](https://parquet.apache.org/), a columnar storage format that is [drastically better than CSV or JSON](https://databricks.com/session/spark-parquet-in-depth), especially when working in Spark.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> When first creating an unmanaged table with SQL, you can specify the format and location to save to. The main difference between this and using the DataFrameWriter (with `overwrite` mode) is that a table will be registered to the Hive metastore.

```
CREATE TABLE table_name
USING format
LOCATION "/path/to/directory"
AS SELECT * FROM final_view;
```

In [34]:
# ANSWER

stationAggPath = f"{userhome}/station-agg"
(stationAggDF
  .write
  .format("parquet")
  .mode("overwrite")
  .save(stationAggPath))

In [35]:
# ANSWER
spark.sql("DROP TABLE IF EXISTS station_agg")

spark.sql(f"""
CREATE TABLE station_agg
USING PARQUET
LOCATION "{stationAggPath}"
AS SELECT * FROM station_agg_temp
""")

## Wait--What Data is Being Written?

Throughout this notebook, many transformations and actions were called. The approach each individual took may vary, but actions were called every time:
- data was displayed
- counts were returned
- results were written to disk

These **actions** are easy to spot when reviewing the notebook: if a job was triggered, an action occurred.

_Everything else_ is a transformation. This includes:
- reading data
- extracting strings
- grouping and aggregation
- creating new columns
- creating views
- defining new DataFrames from other DataFrames

When the final write is triggered, Spark looks back to the source (here the files associated with the `weather` table) and creates a plan against those data. **All the computed values, DataFrame definitions, and views of the data returned to the notebook are ignored.** The final DataFrame still indicates a series of transformations against files on disk, just as the first transformation (the `read` operation) referred to these files. While some stages or tasks may be skipped because of implicit caching, Spark will always use your original data source as the single point of truth when building out the physical plan for execution.

## Synopsis

This notebook explored:
* How to use DataFrames and SQL transformations
* Implementing various methods in `spark.sql.functions` (which likely required reading API docs)
* How **actions** trigger jobs while transformations do not
* Which data is referenced as transformations and actions are executed

&copy; 2020 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>