#Welcome to a random Databricks Python Notebook

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) What is Apache Spark?

Spark is a unified processing engine that can analyze big data using SQL, machine learning, graph processing or real-time stream analysis:

![Spark Engines](https://files.training.databricks.com/images/wiki-book/book_intro/spark_4engines.png)
<br/>
<br/>
* At its core is the Spark Engine.
* The DataFrames API provides an abstraction above RDDs while simultaneously improving performance 5-20x over traditional RDDs with its Catalyst Optimizer.
* Spark ML provides high quality and finely tuned machine learning algorithms for processing big data.
* The Graph processing API gives us an easily approachable API for modeling pairwise relationships between people, objects, or nodes in a network.
* The Streaming APIs give us End-to-End Fault Tolerance, with Exactly-Once semantics, and (with the experimental continuous processing mode) end-to-end latencies as low as roughly 1 millisecond.

And it all works together seamlessly!

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Scala, Python, Java, R & SQL
* Besides being able to run in many environments...
* Apache Spark makes the platform even more approachable by supporting multiple languages:
  * Scala - Apache Spark's primary language.
  * Python - More commonly referred to as PySpark
  * R - <a href="https://spark.apache.org/docs/latest/sparkr.html" target="_blank">SparkR</a> (R on Spark)
  * Java
  * SQL - Closer to ANSI SQL 2003 compliance
    * Now running all 99 TPC-DS queries
    * New standards-compliant parser (with good error messages!)
    * Subqueries (correlated & uncorrelated)
    * Approximate aggregate stats
* With the older RDD API, there are significant differences between each language's implementation, notably in performance.
* With the newer DataFrames API, the performance differences between languages are nearly nonexistent (especially for Scala, Java & Python).
* That said, not all languages receive the same level of attention; still, the API gap between languages has been closing rapidly, especially from Spark 1.x to 2.x.

![RDD vs DataFrames](https://files.training.databricks.com/images/105/rdd-vs-dataframes.png)

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) RDDs
* The primary data abstraction of the Spark engine is the RDD: Resilient Distributed Dataset
  * Resilient, i.e., fault-tolerant: with the help of the RDD lineage graph, Spark can recompute partitions that are lost or damaged due to node failures.
  * Distributed with data residing on multiple nodes in a cluster.
  * Dataset is a collection of partitioned data with primitive values or values of values, e.g., tuples or other objects.
* The original paper that gave birth to the concept of RDD is <a href="https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf" target="_blank">Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing</a> by Matei Zaharia et al.
* Today, with Spark 2.x, we treat RDDs as the assembly language of the Spark ecosystem.
* DataFrames, Datasets & SQL provide higher-level abstractions over RDDs.


##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) The Cluster: Drivers, Executors, Slots & Tasks
![Spark Physical Cluster, slots](https://files.training.databricks.com/images/105/spark_cluster_slots.png)

### Getting Started

Run the following cell to configure the "classroom."

In [0]:
%run ./Includes/Classroom-Setup

## Spark SQL and DataFrames

Since Spark 2.0, Spark SQL and the DataFrames API have provided analogous entry points to easily manipulate data. DataFrames, tables, and temp views register the necessary metadata to execute Spark logic. The SparkSession uses this metadata to optimize computation while abstracting away many of the considerations about how data will be accessed and distributed during processing.

While there are slight differences between how users will interact with DataFrames, tables, and temp views, the following holds true for all:
- They are built on top of Spark SQL, which simplifies and optimizes Spark execution.
- They are collections of rows and provide easy access to columnar transformations.
- They compile down to RDD operations at execution time.
- They provide fault-tolerant access to data stored externally.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> It is possible to create DataFrames/tables/views directly from RDDs. This course will focus on manipulating files stored in external cloud storage or data being loaded from sources such as relational databases and pub/sub services.

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Creating DataFrames by Reading from CSV

### The Data Source
* For this exercise, use a file called **iris.csv**.
* This is the canonical [iris dataset](https://archive.ics.uci.edu/ml/datasets/iris).
* Use **&percnt;fs head ...** to view the first few lines of the file.

In [0]:
%fs head /mnt/training/iris/iris.csv

### Read the CSV File
The following is standard syntax for loading data into DataFrames, here with a few options specifically set for CSV files.

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> The default delimiter for reading CSV is a comma (`,`), but it can be changed with the `"delimiter"` option (also available as `"sep"`) to read files that use pipes, semicolons, tabs, or other custom separators.

In [0]:
csvFilePath = "/mnt/training/iris/iris.csv"

tempDF = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(csvFilePath))

### Note the options being used

#### `format`
- The `format` method specifies what type of data is being loaded.
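- In open-source Apache Spark the default format is Parquet (controlled by the `spark.sql.sources.default` setting); recent Databricks Runtime versions default to Delta instead.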
- This method provides access to dozens of file formats, as well as connections to a multitude of connected services; a fairly complete list is available [here](https://docs.databricks.com/data/data-sources/index.html).

<img alt="Side Note" title="Side Note" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.05em; transform:rotate(15deg)" src="https://files.training.databricks.com/static/images/icon-note.webp"/> Some formats have named methods (`json`, `csv`, `parquet`) that can be used in place of `load(filePath)`, e.g., `spark.read.csv(filePath)`. These methods are analogous to specifying the format using the `format` method, but less extensible.

#### `"header"`
- The `"header"` option tells the `DataFrameReader` to use the first row for column names.
- Without this option, columns will be assigned anonymous names `_c0`, `_c1`, ... `_cN`.
- The `DataFrameReader` peeks at the first line of the file to grab this information, triggering a job.

#### `"inferSchema"`
- The `"inferSchema"` option will scan the file to assign types to each column.
- Without this option, all columns will be assigned the `StringType`.
- The `DataFrameReader` will scan the entire contents of the file to determine which type to infer, triggering a job.

In [0]:
tempDF.printSchema()

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Writing to Files

In many cases the results of DataFrame or SQL operations will need to be persisted. By writing files to disk, this data can easily be passed between sessions and shared with other users.

### The Parquet File Format

Parquet is the default file format in open-source Apache Spark. Parquet is a columnar format that is supported by many data processing systems. Spark is optimized to perform operations on Parquet files (note that the Delta Lake format is built on top of Parquet). Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data. However, **after a round trip through Parquet, all columns come back as nullable for compatibility reasons.**

More discussion on <a href="http://parquet.apache.org/documentation/latest/" target="_blank">Parquet</a>

### Write Options

There are [many write options](https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/DataFrameWriter.html), and writing to some integrated services requires specific, esoteric options. The syntax in the cell below is a minimal but explicit example of using DataFrames to save data. Here, the data and schema currently associated with `irisDF` will be persisted in the directory specified by `outputFilePath`.

#### `format`
Much like the DataFrameReader, the DataFrameWriter accepts a wide range of formats. It also supports use of a few file-specific methods (`json`, `parquet`, `csv`) in place of the `.save` syntax.

#### `mode`
Spark has several [save modes](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes).

| mode | details |
| --- | --- |
| `"error"` | **DEFAULT**; will raise an error message if data already exists at the specified path. |
| `"overwrite"` | If data exists in the target path, it will be deleted before the new data is saved. (This is used heavily throughout the course so that lessons or individual cells may be re-run without conflict.) |
| `"append"` | If data exists in the target path, new data to be saved will be appended to the existing data. |
| `"ignore"` | If data exists in the target path, new data will **NOT** be saved. |

In [0]:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructField, StructType

csvSchema = StructType([
  StructField("index", IntegerType()),
  StructField("sepal_length", DoubleType()),
  StructField("sepal_width", DoubleType()),
  StructField("petal_length", DoubleType()),
  StructField("petal_width", DoubleType()),
  StructField("species", StringType())
])

In [0]:
irisDF = (spark.read
  .option('header', 'true')
  .schema(csvSchema)          # Use the specified schema
  .csv(csvFilePath)
)

In [0]:
outputFilePath = f"{userhome}/iris"

(irisDF
  .write
  .format("parquet")
  .mode("overwrite")
  .save(outputFilePath))

In [0]:
display(dbutils.fs.ls(outputFilePath))

path,name,size
dbfs:/user/saxaa004678@saipem.com/iris/_SUCCESS,_SUCCESS,0
dbfs:/user/saxaa004678@saipem.com/iris/_committed_492298399972327579,_committed_492298399972327579,121
dbfs:/user/saxaa004678@saipem.com/iris/_started_492298399972327579,_started_492298399972327579,0
dbfs:/user/saxaa004678@saipem.com/iris/part-00000-tid-492298399972327579-64cdd91a-f7e5-4f74-a604-3b5d54328f4f-6-1-c000.snappy.parquet,part-00000-tid-492298399972327579-64cdd91a-f7e5-4f74-a604-3b5d54328f4f-6-1-c000.snappy.parquet,3497


##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Laziness By Design

Fundamental to Apache Spark are the notions that
* Transformations are **LAZY**
* Actions are **EAGER**

The following code condenses the logic from the preceding lab and uses the DataFrames API to:
- Specify a schema, format, and file source for the data to be loaded
- Select columns to `GROUP BY`
- Aggregate with a `COUNT`
- Provide an alias name for the aggregate output
- Specify a column to sort on

This cell defines a series of **transformations**. By definition, this logic will result in a DataFrame and will not trigger any jobs.

In [0]:
schemaDDL = "NAME STRING, STATION STRING, LATITUDE FLOAT, LONGITUDE FLOAT, ELEVATION FLOAT, DATE DATE, UNIT STRING, TAVG FLOAT"

sourcePath = "/mnt/training/weather/StationData/stationData.parquet/"

countsDF = (spark.read
  .format("parquet")
  .schema(schemaDDL)
  .load(sourcePath)
  .groupBy("NAME", "UNIT").count()
  .withColumnRenamed("count", "counts")
  .orderBy("NAME")
)

countsDF

Because `display` is an **action**, a job _will_ be triggered, as logic is executed against the specified data to return a result.

In [0]:
display(countsDF)

NAME,UNIT,counts
"BARNABY CALIFORNIA, CA US",C,151
"BIG ROCK CALIFORNIA, CA US",C,151
"BLACK DIAMOND CALIFORNIA, CA US",C,151
"BRIONES CALIFORNIA, CA US",F,151
"CONCORD BUCHANAN FIELD, CA US",F,149
"HAYWARD AIR TERMINAL, CA US",F,149
"HOUSTON INTERCONTINENTAL AIRPORT, TX US",F,150
"HOUSTON WILLIAM P HOBBY AIRPORT, TX US",C,150
"LAS TRAMPAS CALIFORNIA, CA US",C,151
"LOS PRIETOS CALIFORNIA, CA US",F,151


##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Actions



In production code, actions will generally **write data to persistent storage** using the DataFrameWriter discussed in the preceding notebooks.

During interactive code development in Databricks notebooks, the `display` method will frequently be used to **materialize a view of the data** after logic has been applied.

A number of other actions provide the ability to return previews or specify physical execution plans for how logic will map to data. For the complete list, review the [API docs](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset).

| Method | Return | Description |
|--------|--------|-------------|
| `collect()` | Collection | Returns an array that contains all of the Rows in this Dataset. |
| `count()` | Long | Returns the number of rows in the Dataset. |
| `first()` | Row | Returns the first row. |
| `foreach(f)` | - | Applies a function f to all rows. |
| `foreachPartition(f)` | - | Applies a function f to each partition of this Dataset. |
| `head()` | Row | Returns the first row. |
| `reduce(f)` | Row | Reduces the elements of this Dataset using the specified binary function (Scala/Java `Dataset` only; in PySpark, use `df.rdd.reduce(f)`). |
| `show(..)` | - | Displays the top 20 rows of the Dataset (by default) in tabular form. |
| `take(n)` | Collection | Returns the first n rows in the Dataset. |
| `toLocalIterator()` | Iterator | Returns an iterator that contains all of the Rows in this Dataset. |

<img alt="Caution" title="Caution" style="vertical-align: text-bottom; position: relative; height:1.3em; top:0.0em" src="https://files.training.databricks.com/static/images/icon-warning.svg"/> Actions such as `collect` can lead to out-of-memory errors on the driver, because they force all of the data to be brought back to it.

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Transformations

Transformations have the following key characteristics:
* They eventually return another `DataFrame`.
* They are immutable; that is, each instance of a `DataFrame` cannot be altered once it is instantiated.
  * This makes other optimizations possible, such as the use of shuffle files (to be discussed in detail later).
* They are classified as either wide or narrow operations.

Most operations in Spark are **transformations**. While many transformations are [DataFrame operations](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset), writing efficient Spark code often requires importing methods from the `pyspark.sql.functions` module, which contains [transformations corresponding to SQL built-in operations](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$).

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Narrow Transformations

Each partition of the parent RDD is used by at most one partition of the child RDD, so no data needs to be shuffled between executors.

Examples include:
* `filter(..)`
* `drop(..)`
* `coalesce(n)`

![](https://databricks.com/wp-content/uploads/2018/05/Narrow-Transformation.png)

In [0]:
from pyspark.sql.functions import col

display(countsDF.filter(col("NAME").like("%TX%")))

NAME,UNIT,counts
"HOUSTON INTERCONTINENTAL AIRPORT, TX US",F,150
"HOUSTON WILLIAM P HOBBY AIRPORT, TX US",C,150


##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Wide Transformations

A single partition of the parent RDD may be used by many partitions of the child RDD, so these operations require that data be **shuffled** between executors.

Examples include:
* `distinct()`
* `groupBy(..).sum()`
* `repartition(n)`

![](https://databricks.com/wp-content/uploads/2018/05/Wide-Transformation.png)

In [0]:
display(countsDF.groupBy("UNIT").sum("counts"))

UNIT,sum(counts)
F,1505
C,1054


#That's all folks!