# Learning PySpark

## Prerequisites
* PySpark installed in the Jupyter kernel, e.g. run `conda create -n pyspark_env pyspark ipykernel` to create an environment with PySpark that you can select as a kernel in a Jupyter notebook.

---

# Agenda

* What is Apache Spark?
* Architecture
* Spark Ecosystem
* Resilient Distributed Dataset (RDD)
* Introduction to Spark SQL and DataFrames:
    + Creating a Spark Instance
    + Reading & Writing Data
    + The DataFrames/Datasets API
    + Spark SQL
    + Saving to a Persistent Table
    + Bucketing, Sorting and Partitioning
    + Caching and caching storage levels
* Exercises

---

# What is Apache Spark?

[Spark](http://spark.apache.org/) is a general-purpose, distributed programming framework that was developed at the AMPLab at the University of California, Berkeley. It is open source software that provides an in-memory computation framework and it is also good for batch processing. Spark works well with real-time (or, better to say, near-real-time) data. It allows you to apply machine learning algorithms on semi-structured, structured, and streaming data.

According to the original research paper, Spark can be up to 100 times faster than Hadoop MapReduce for some workloads, because data can be cached in memory and many machine learning and graph algorithms are iterative. Caching intermediate data between iterations avoids repeated disk reads and writes. Spark can be programmed with Java, Scala, Python, and R. In addition, Spark supports multiple data sources such as Parquet, JSON, Hive, Cassandra, CSV, text files and RDBMS tables.

Spark can be seen as an improvement on [Hadoop](https://hadoop.apache.org/) MapReduce: it can read data from and write data to HDFS, and it builds on the ideas of the [MapReduce](https://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html) programming model. In addition, Spark handles iterative computation efficiently because data can be persisted in memory, and it provides APIs for machine learning, graph processing and streaming in several programming languages.

## Advantages of Spark
1. **Swift Processing:** Spark reduces the number of read and write operations to disk.

2. **Dynamic in Nature:** Spark provides more than 80 high-level operators, which help to develop applications that run in parallel. Transformations are added to a DAG (Directed Acyclic Graph) of computation, and this DAG is executed only when the driver requests data.

3. **In-Memory Computation:** Data is cached so we do not have to read and write data to disk every time we access it (better performance).

4. **Reusability:** The same Spark code can be reused, e.g. for batch processing.

5. **Fault Tolerance:** Through RDDs, Spark provides fault tolerance. Spark RDDs are designed to handle the failure of any worker node in the cluster: lost partitions can be recomputed from their lineage, so no data is lost.

https://data-flair.training/blogs/apache-spark-features/

<center> Logistic Regression </center>
![logistic-regression](https://user-images.githubusercontent.com/9319823/46016970-9fb87380-c0d6-11e8-86e0-7123a95c0309.png)


## Disadvantages of Spark
1. **Expensive:** In-memory capability can become a bottleneck when we want cost-efficient processing of big data as keeping data in memory is quite expensive.

2. **Latency:** Apache Spark has a higher latency than [Apache Flink](https://flink.apache.org/).

3. **Manual Optimization:** Spark jobs need to be optimized manually, and the tuning is specific to the dataset.

4. **No File Management:** Apache Spark does not have its own file management system, thus it relies on other platforms like Hadoop.

5. **Problem with Small Files:** If we use Spark with HDFS, we run into the small files issue: HDFS prefers a limited number of large files over a large number of small files. A similar issue arises if you store your data as many small compressed files in S3, because each compressed file has to be decompressed on a single core.

https://data-flair.training/blogs/limitations-of-apache-spark/

# Architecture

![spark_architecture](https://user-images.githubusercontent.com/9319823/45994904-09645d80-c096-11e8-87e4-2b53f058ba99.png)

The main components of the Spark architecture are the driver and the executors. For each PySpark application, there will be one driver program and one or more executors running on the cluster slave machines. Therefore, Spark follows a master/slave architecture.

## Driver process / Master (Master Daemon)
The driver is the process that coordinates with many executors running on various slave machines.

- The ***SparkContext*** object is created by the driver, and it is the main entry point to a (Py)Spark application.
- The Spark driver also contains various components such as *DAGScheduler*, *TaskScheduler*, *SchedulerBackend* and *BlockManager*, which are responsible for translating Spark user code into actual Spark jobs executed on the cluster.


## Executors / Slaves (Worker Daemon)
Executors are slave processes. An executor runs tasks. It also has the capability to cache data in memory.

- An executor is a distributed agent responsible for the execution of tasks. Every Spark application has its own executor processes.
- Executors usually run for the entire lifetime of a Spark application; this is known as **"Static Allocation of Executors"**.
- However, users can also opt for dynamic allocation of executors, where Spark executors are added or removed dynamically to match the overall workload.
- Executors perform all the data processing.
- They read data from and write data to external sources.
- They store computation results in memory, in the cache or on hard disk drives.
- They interact with the storage systems.
    
    
## Cluster Manager
An external service responsible for acquiring resources on the Spark cluster and allocating them to a Spark job.

- Hadoop YARN and Apache Mesos are examples of cluster managers.
- Standalone mode: a simple cluster manager included with Spark.
- The choice of a cluster manager depends on the goals of the application, because the cluster managers provide different sets of scheduling capabilities.

The driver splits our application into small tasks; a task is the smallest unit of work in the application. Tasks run on different executors in parallel, one task per partition, and the driver is responsible for scheduling them. The **cluster manager** manages the cluster resources: the driver talks to the cluster manager to negotiate resources, and the cluster manager launches the executor processes on the worker machines on behalf of the driver.
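The division of labour between driver and executors can be imitated on a single machine. The following is only a toy sketch in plain Python, not Spark code: a thread pool stands in for the executors, and the helper names `split_into_partitions` and `run_job` are made up for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Cut a list into num_partitions contiguous slices (the 'partitions')."""
    size, rest = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < rest else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def run_job(data, task, num_partitions=3, num_executors=2):
    """Toy 'driver': create one task per partition and run them on a pool."""
    partitions = split_into_partitions(data, num_partitions)
    with ThreadPoolExecutor(max_workers=num_executors) as executors:
        # map() returns the results in partition order
        return list(executors.map(task, partitions))


temperatures = [28.9, 30.6, 25.0, 29.1, 32.3, 31.0]
partial_max = run_job(temperatures, max)   # one partial result per partition
overall_max = max(partial_max)             # the 'driver' combines the partial results
print(partial_max, overall_max)
```

With three partitions and two workers, at most two tasks run at the same time; the third one waits until a worker is free, which mirrors the "one task per partition" rule.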

Spark ships with the Standalone Cluster Manager. However, it can also run on [YARN](https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html) and [Apache Mesos](http://mesos.apache.org/). Spark can also be started in local mode (i.e. on a single machine).

# Spark Ecosystem


![spark-stack](https://user-images.githubusercontent.com/9319823/45998657-ca3d0900-c0a3-11e8-8bb8-32672e87d119.png)

The Spark ecosystem has five components: [Spark Core API](https://spark.apache.org/docs/1.6.0/index.html), [SQL and DataFrames](http://spark.apache.org/sql/), [MLlib](http://spark.apache.org/mllib/) for machine learning, [GraphX](http://spark.apache.org/graphx/), and [Spark Streaming](http://spark.apache.org/streaming/). You can combine these libraries seamlessly in the same application.

## Spark Core

All the functionality provided by Apache Spark is built on top of Spark Core. It provides the in-memory computation capability. Thus Spark Core is the foundation for parallel and distributed processing of huge datasets.

Spark Core is built around a special collection called the **RDD** (resilient distributed dataset), one of the central abstractions of Spark. **An RDD partitions its data across the nodes of a cluster** and holds it in the memory pool of the cluster as a single unit. Two kinds of operations can be performed on RDDs: transformations and actions.

- **Transformations** are functions that produce a new RDD from existing RDDs.
- **Actions** return a value to the driver program.

Operations are **evaluated lazily**: execution does not start until an action is triggered. This saves unnecessary computation and gives Spark the chance to optimize the whole chain of operations. The **transformations are stored as a directed acyclic graph (DAG)**, so every action on an RDD makes Spark recompute the DAG unless intermediate results have been cached.
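Lazy evaluation is easier to grasp with a small model. The class below is a hypothetical stand-in written in plain Python, not the Spark API: `LazyDataset` only records transformations, and nothing is computed until the action `collect()` is called. The list `calls` exists purely to make visible when work happens.

```python
class LazyDataset:
    """Toy stand-in for an RDD: transformations are recorded, actions run them."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)   # recorded transformations (the 'lineage')

    # Transformations: return a new dataset, compute nothing
    def map(self, fn):
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, fn):
        return LazyDataset(self._source, self._steps + (("filter", fn),))

    # Action: replay the recorded steps and return the values
    def collect(self):
        rows = iter(self._source)
        for kind, fn in self._steps:
            rows = map(fn, rows) if kind == "map" else filter(fn, rows)
        return list(rows)


calls = []


def to_fahrenheit(celsius):
    calls.append(celsius)            # side effect that shows when work happens
    return celsius * 9 / 5 + 32


celsius = LazyDataset([20.0, 25.0, 30.0])
hot = celsius.map(to_fahrenheit).filter(lambda f: f > 70)

print(len(calls))      # 0: nothing has been computed yet
print(hot.collect())   # [77.0, 86.0]
print(hot.collect())   # same result, but the whole chain ran again
print(len(calls))      # 6: each action recomputed every step
```

The second `collect()` repeats all the work, which is the behaviour that caching is meant to avoid in real Spark applications.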

Apache Spark supports two types of partitioning: **Hash Partitioning** and **Range Partitioning**. The partitioning technique should be based on the available resources, external data sources and transformations used to derive the RDD.

Basics of **partitioning**:
- Every node in a Spark cluster contains one or more partition(s).
- Partitions in Spark do not span multiple machines.
- Tuples in the same partition are guaranteed to be on the same machine.
- Spark assigns one task per partition, and each executor core processes one task at a time.
- The number of partitions is configurable. Too few partitions cause less concurrency, data skew and poor resource utilization; too many cause task scheduling to take more time than the actual execution. By default, it is set to the total number of cores on all the executor nodes.
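The difference between the two partitioning schemes can be sketched in a few lines of plain Python. This is an illustration of the idea only, not Spark's implementation; the function names and the range boundaries `[5, 10]` are assumptions chosen for the example.

```python
from bisect import bisect_right


def hash_partition(key, num_partitions):
    """Hash partitioning: partition index = hash(key) modulo partition count."""
    # hash() of a small non-negative int is the int itself in CPython,
    # which keeps this example deterministic.
    return hash(key) % num_partitions


def range_partition(key, boundaries):
    """Range partitioning: keys from the same sorted range share a partition."""
    return bisect_right(boundaries, key)


def distribute(pairs, partitioner, num_partitions):
    """Place every (key, value) pair into the partition chosen by partitioner."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[partitioner(key)].append((key, value))
    return partitions


pairs = [(1, "a"), (7, "b"), (12, "c"), (4, "d"), (9, "e"), (14, "f")]

by_hash = distribute(pairs, lambda k: hash_partition(k, 3), 3)
by_range = distribute(pairs, lambda k: range_partition(k, [5, 10]), 3)

print(by_hash)    # keys are spread by remainder, neighbouring keys get separated
print(by_range)   # keys below 5, from 5 to 9, and from 10 upwards stay together
```

Hash partitioning tends to spread keys evenly, while range partitioning keeps ordered keys together, which helps when the data is later sorted or queried by range.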

https://dzone.com/articles/an-intro-to-apache-spark-partitioning-what-you-nee

**Key features of Spark Core are:** 
* essential I/O functionalities
* task dispatching
* fault recovery
* programming and monitoring of the Spark cluster.

![pg57h](https://user-images.githubusercontent.com/9319823/46287815-10063f80-c584-11e8-9f03-88f5d7d033d5.png)

## Spark SQL

The Spark SQL module allows SQL-like analysis on a huge amount of structured or semi-structured data. Spark SQL can be connected to Apache Hive. Spark SQL introduced the DataFrame, which is a tabular representation of structured data, similar to a table in a relational database management system.

Spark SQL is a distributed framework for structured data processing. Using Spark SQL, **Spark gets more information about the structure of the data and the computation**, and it uses this information to perform additional optimizations. The same execution engine is used to compute a result, so it **does not depend on the API or language used to express the computation**.

It also enables powerful, interactive, analytical applications across both streaming and historical data, and it can act as a distributed SQL query engine.

**Key features of Spark SQL include:** 
* Spark integration
* Uniform data access
* Performance and Scalability
* Full compatibility with Hive
* Standard Connectivity

https://spark.apache.org/sql/

https://www.edureka.co/blog/spark-sql-tutorial/

## Spark ML

The MLlib library offers scalable and easy-to-use machine learning algorithms. MLlib supports many machine learning algorithms for classification, clustering, text analysis, and more. Some lower-level machine learning primitives, such as a generic gradient descent optimization algorithm, are also available in MLlib.

Since Spark 2.0, the DataFrame-based API is the primary machine learning API for Spark, and no new features are added to the RDD-based API. The reason is that DataFrames are more user-friendly than RDDs. Some of the benefits of using DataFrames are:
* the usage of Spark Datasources
* SQL DataFrame queries use Tungsten and Catalyst optimizations
* uniform APIs across languages. 

MLlib also uses the **linear algebra package Breeze**. Breeze is a collection of libraries for numerical computing and machine learning.

https://spark.apache.org/docs/latest/ml-guide.html

## GraphX

GraphX is Apache Spark's API for graphs and graph-parallel computation. It unifies ETL, exploratory analysis, and iterative graph computation within a single system. You can view the same data as both graphs and collections, transform and join graphs with RDDs efficiently, and write custom iterative graph algorithms using the Pregel API.

**Clustering, classification, traversal, searching, and pathfinding** are also possible in graphs. Furthermore, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge. GraphX also optimizes the representation of vertices and edges when they are primitive data types. To support graph computation, it provides fundamental operators (e.g., `subgraph`, `joinVertices`, and `aggregateMessages`) as well as an optimized variant of the Pregel API.

https://spark.apache.org/graphx/

## Spark Streaming

Spark Streaming brings Apache Spark's language-integrated API to stream processing, letting you write streaming jobs the same way you write batch jobs. It supports Java, Scala and Python. It is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.

Spark Streaming can ingest data from sources like Kafka, Flume, Kinesis or TCP sockets. The processed data can be pushed to file systems, databases and live dashboards. Spark uses micro-batching for (near) real-time streaming. **Micro-batching** is a technique that allows a process or task to treat a stream as a sequence of small batches of data. Hence, Spark Streaming groups the live data into small batches.
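The idea behind micro-batching can be shown without any streaming infrastructure. The sketch below is plain Python under simplifying assumptions: the events are made-up `(arrival time in seconds, word)` pairs that arrive ordered by time, and `micro_batches` is a hypothetical helper, not part of Spark Streaming.

```python
from itertools import groupby


def micro_batches(events, batch_interval):
    """Group (timestamp, value) events into consecutive fixed-length batches.

    Assumes the events arrive ordered by timestamp (in seconds).
    """
    def batch_of(event):
        return int(event[0] // batch_interval)

    for batch_id, group in groupby(events, key=batch_of):
        yield batch_id, [value for _, value in group]


# Hypothetical stream of (arrival time in seconds, word) events
stream = [(0.2, "spark"), (0.9, "rdd"), (1.1, "spark"), (2.5, "sql"), (2.7, "spark")]

for batch_id, batch in micro_batches(stream, batch_interval=1.0):
    # Each small batch can now be processed like an ordinary batch job
    print(batch_id, batch, batch.count("spark"))
```

A shorter batch interval lowers the latency but produces more, smaller batches, which is the trade-off behind the "near-real-time" wording above.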

Spark Streaming works in three phases: **(1) gathering**, **(2) processing**, and **(3) data storage**.
  1. It provides two categories of built-in streaming sources: 
      - **Basic sources:** file systems and socket connections 
      - **Advanced sources:** sources like Kafka, Flume, Kinesis, etc.
  2. The gathered data is processed using complex algorithms expressed with high-level functions.
  3. The processed data is pushed out to file systems, databases, and live dashboards.

A **DStream** (discretized stream) in Spark is a continuous stream of data. We can create a DStream in two ways: from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations to other DStreams. Internally, a DStream is represented as a sequence of RDDs.

![streaming-arch](https://user-images.githubusercontent.com/9319823/45999822-150c5000-c0a7-11e8-8a8a-f88b2c5b1c88.png)

https://spark.apache.org/streaming/

https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html


# RDD (Resilient Distributed Dataset)

A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It represents an **immutable, partitioned collection of elements** that can be operated on in parallel. This class contains the basic operations available on all RDDs, such as *map*, *filter*, and *persist*.

RDDs are collections of data that may be too big to fit on a single node and therefore have to be partitioned across several nodes. As a quick reminder from the last section, Apache Spark automatically partitions RDDs and distributes the partitions across different nodes.

https://spark.apache.org/docs/2.1.0/programming-guide.html

In [2]:
import pyspark
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('local').setAppName('myApp')
sc = SparkContext(conf=conf)

**Creating RDDs**

* The key parameter of a parallelized collection is the number of partitions the dataset is cut into. Spark runs **one task for each partition**. A common recommendation is **two to four partitions for each CPU in the cluster**. Spark normally sets the number of partitions automatically based on the cluster, but it can also be passed as a second argument to `parallelize`.
* It is possible to read files from different sources such as local file system, HDFS, Cassandra, HBase, etc. 

In [3]:
# 1 - PARALLELIZED COLLECTION
temperature = sc.parallelize((28.9, 30.6, 25.0, 29.1, 32.3, 31.0))

# 2 - EXTERNAL DATASETS
text = sc.textFile("data/simple_text.txt")

# 3 - FROM ANOTHER RDD
high_temps = temperature.filter(lambda t: t >= 30)

**Returning RDD data**

* The action operation **collect()** should be used carefully since it returns all data to the driver. In this example, it returns the whole text as a list. Be careful when you work with large datasets!

* The action operation **take(n)** returns *n* values/results. If you want to see all results, use **collect()**.

In [4]:
text.collect()

['this is a simple', 'text file', 'with some text']

In [5]:
temperature.take(2)

[28.9, 30.6]

**Filtering data**

* **filter()** returns a new RDD, containing only the elements that meet the predicate.

In [6]:
range_temp = temperature.filter(lambda t: 20 < t < 29)  # chained comparison instead of bitwise &
range_temp.collect()

[28.9, 25.0]

**Example of operations on RDD 01:** Word count, saving the output into a file.

* **flatMap()** takes an element (here: a line) from the input RDD, applies a function to it, and flattens the returned list into individual elements.

* **map()** takes an element from the input RDD, applies a function to it, and returns exactly one element.

* **reduceByKey()** combines the values of a dataset of (K, V) pairs by key. Pairs with the same key on the same machine are combined before the data is shuffled.

* **saveAsTextFile()** saves the RDD object as a text file, using string representations of elements.
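To make the "combine before shuffle" behaviour of **reduceByKey()** concrete, here is a toy model in plain Python. It is not how Spark is implemented; the two partitions and the helper names `combine_locally` and `reduce_by_key` are assumptions made for the illustration, using the sample text from this notebook.

```python
def combine_locally(pairs, fn):
    """Merge values that share a key, as done inside one partition."""
    combined = {}
    for key, value in pairs:
        combined[key] = fn(combined[key], value) if key in combined else value
    return combined


def reduce_by_key(partitions, fn):
    """Toy reduceByKey: combine per partition first, then merge the partial results."""
    partials = [combine_locally(p, fn) for p in partitions]            # before the shuffle
    shuffled = (kv for partial in partials for kv in partial.items())  # the 'shuffle'
    return partials, combine_locally(shuffled, fn)                     # final merge


# The sample text, split into two hypothetical partitions of (word, 1) pairs
lines = ["this is a simple", "text file", "with some text"]
partitions = [
    [(w, 1) for w in lines[0].split(" ")],
    [(w, 1) for line in lines[1:] for w in line.split(" ")],
]

partials, counts = reduce_by_key(partitions, lambda a, b: a + b)
print(partials[1])   # 'text' is already summed to 2 inside the second partition
print(counts)
```

Because the second partition sends a single `('text', 2)` pair instead of two `('text', 1)` pairs, less data has to move between machines.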

In [7]:
wc = text.flatMap(lambda x: x.split(" ")).map(lambda a: (a,1)).reduceByKey(lambda a,b: a + b)
wc.saveAsTextFile("data/simple_word_count")

Py4JJavaError: An error occurred while calling o73.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/home/dan/git/u42/trainings/hadoop-spark/data/simple_word_count already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	...

> **saveAsTextFile()** does not overwrite existing output: the call fails with a *FileAlreadyExistsException* when the target directory already exists, for example because the cell has been run before.
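The exception above is raised because the output directory is left over from an earlier run. One possible workaround for local paths is to delete the directory before saving. The helper below is a sketch with a made-up name (`clear_output_dir`); it uses only the Python standard library and is not suitable for HDFS or S3 locations.

```python
import shutil
from pathlib import Path


def clear_output_dir(path):
    """Delete a previous output directory so it can be written again.

    Returns True if a directory was removed, False if there was nothing to do.
    Only meant for paths on the local file system.
    """
    target = Path(path)
    if target.is_dir():
        shutil.rmtree(target)
        return True
    return False


# Intended use in the notebook (assumes `wc` from the cell above):
# clear_output_dir("data/simple_word_count")
# wc.saveAsTextFile("data/simple_word_count")
```

Deleting data is irreversible, so it is worth double-checking the path before calling such a helper.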


# Introduction to Spark SQL and DataFrames

**Spark SQL is a Spark module for structured data processing**. It originated as a port of Apache Hive to run on top of Spark and is now integrated directly into the Spark stack. Apache Hive had certain limitations, such as no resume capability and poor performance on medium to large datasets. Spark SQL was built to overcome these drawbacks and replace Apache Hive.

Unlike the basic Spark RDD API, the Spark SQL interface provides more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform additional optimizations.

Spark SQL is organized into four libraries as follows:

|           Libraries           |                            Description                          |
| :---------------------------- | :-------------------------------------------------------------- |
| Data Source API               | The universal API for loading and storing structured data       |
| DataFrame API                 | The distributed collection of data organized into named columns |
| SQL Interpreter And Optimizer | Based on the functional programming constructed in Scala        |
| SQL Service                   | The entry point for working with structured data in Spark       |


Important classes of Spark SQL (**pyspark.sql.*(class)***) and DataFrames are described in the following table:


| Classes                | Description                                                  |
| :--------------------- | :----------------------------------------------------------- |
| SparkSession           | Main entry point for **DataFrame** and SQL functionality     |
| DataFrame              | A distributed collection of data grouped into named columns  |
| Column                 | A column expression in a **DataFrame**                       |
| Row                    | A row of data in a DataFrame                                 |
| GroupedData            | Aggregation methods, returned by **DataFrame.groupBy()**     |
| DataFrameNaFunctions   | Methods for handling missing data (null values)              |
| DataFrameStatFunctions | Methods for statistics functionality                         |
| Functions              | List of built-in functions available for **DataFrame**       |
| Types                  | List of data types available                                 |
| Window                 | For working with window functions                            |

## Hive Integration
One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation. Spark SQL supports the HiveQL syntax as well as **Hive SerDes** and **UDFs**, allowing you to access existing Hive warehouses.

![sql-hive-arch](https://user-images.githubusercontent.com/9319823/47175460-71b00300-d313-11e8-9a09-5e8f3bdb97dc.png)


| Library   | Description                                                               |
| :-------- | :------------------------------------------------------------------------ |
| Metastore | Stores metadata such as the schema and location for each of the tables   |
| HiveQL    | Hive Query Language based on SQL (does not strictly follow the full *SQL-92* standard) |
| UDFs      | Defines new column-based functions that extend the vocabulary of Spark SQL’s DSL* for transforming Datasets. UDFs are black boxes in their execution |
| SerDes    | Handles serialization and deserialization (I/O) and also interpreting the results of serialization as individual fields for processing |

*DSL = domain-specific language 


## Creating a Spark Instance

In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
                    .appName("learning_pyspark") \
                    .config(conf=SparkConf()).getOrCreate()
spark

## Reading & Writing Data

- Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be operated on using relational transformations and can also be used to create a temporary view.
- You can also manually specify the data source that will be used along with any extra options that you would like to pass to the data source.
- DataFrames loaded from any data source type can be converted into other types.
- Registering a DataFrame as a temporary view allows you to run SQL queries over its data.


Reading from **CSV file**

In [9]:
csv_data = spark.read.csv("data/people.csv", header="true", sep=";", inferSchema="true")
csv_data

DataFrame[name: string, age: int, job: string]

2nd option to load data from a source
```python
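# The parentheses allow the method chain to span several lines.
# In Spark 2.0+ the short format name "csv" can be used as well.
csv_data = (spark.read.format("com.databricks.spark.csv")
                      .options(header="true", sep=";", inferSchema="true")
                      .load("data/people.csv"))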
```

Reading a **CSV file** directly from a URL (loaded with pandas, then converted to a Spark DataFrame)

In [10]:
import pandas as pd

url_file = "https://gist.githubusercontent.com/curran/a08a1080b88344b0c8a7/" \
            "raw/d546eaee765268bf2f487608c537c05e22e4b221/iris.csv"

iris = spark.createDataFrame(pd.read_csv(url_file))
iris

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string]

In [11]:
txt_data = spark.read.format("com.databricks.spark.csv")\
                     .options(header="true", sep=",", inferSchema="true").load("data/people.txt")
txt_data

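DataFrame[Michael: string,  29: double]

> Because `data/people.txt` has no header row, `header="true"` turns the first record (`Michael, 29`) into the column names. Pass `header="false"` to keep that record as data.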

Reading from **JSON file**

In [12]:
json_data = spark.read.json("data/people.json")

> Spark can read other formats such as `text`, `avro`, and `parquet`.

**Writing** into a file

In [14]:
csv_data.write.format("parquet").mode('overwrite').save("data/people_parquet")

> 2nd option to save dataframe into a file
```python
csv_data.write.save("data/people_parquet", format="parquet", mode="overwrite")  # mode: "overwrite", "append", "ignore" or "error"
```


| Save Mode | When saving a DataFrame to a data source, if data already exists, ... |
|:--------- |:--------------------------------------------------------------------- |
| Error     | an exception is thrown (default)                                      |
| Append    | contents of the DataFrame are appended to existing data               |
| Overwrite | existing data is overwritten by the content of the DataFrame          |
| Ignore    | content is not saved and existing data is not changed                 |
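The semantics of the four save modes can be modelled with an ordinary text file. The following sketch is plain Python and only mimics the behaviour described in the table; `save_lines` and `read_lines` are hypothetical helpers, and a single file stands in for the data source.

```python
import os
import tempfile


def save_lines(path, lines, mode="error"):
    """Toy model of the save modes, using a plain text file as the 'data source'."""
    if mode not in ("error", "append", "overwrite", "ignore"):
        raise ValueError(f"unknown save mode: {mode}")
    exists = os.path.exists(path)
    if exists and mode == "error":
        raise FileExistsError(f"{path} already exists")
    if exists and mode == "ignore":
        return                                     # keep existing data, write nothing
    file_mode = "a" if mode == "append" else "w"   # "w" creates or truncates the file
    with open(path, file_mode) as f:
        f.writelines(line + "\n" for line in lines)


def read_lines(path):
    with open(path) as f:
        return f.read().splitlines()


path = os.path.join(tempfile.mkdtemp(), "people.txt")
save_lines(path, ["Michael"])                      # nothing exists yet, so this succeeds
save_lines(path, ["Andy"], mode="append")
print(read_lines(path))                            # ['Michael', 'Andy']
save_lines(path, ["Justin"], mode="ignore")
print(read_lines(path))                            # ['Michael', 'Andy'] (unchanged)
save_lines(path, ["Justin"], mode="overwrite")
print(read_lines(path))                            # ['Justin']
```

Calling `save_lines(path, ["Justin"])` again with the default mode would raise an error, just like the default save mode in the table.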

Creating **temporary view**

In [15]:
csv_data.createOrReplaceTempView("people_csv")

## The DataFrames/Datasets API

Creating a **DataFrame**

> **Row:** can be used to create a row object by using named arguments. In Spark versions before 3.0, the fields are sorted by name (as in the output below). A named argument must not be omitted to indicate that a value is None or missing; set it explicitly to None instead.

In [16]:
from pyspark.sql import Row

r = [Row(identification=1,name='John',surname='Doe',age=29),
     Row(identification=2,name='Jane',surname='Doe',age=25)]
df_data = spark.createDataFrame(r)

df_data.show()

+---+--------------+----+-------+
|age|identification|name|surname|
+---+--------------+----+-------+
| 29|             1|John|    Doe|
| 25|             2|Jane|    Doe|
+---+--------------+----+-------+



> **show(n)** prints the first *n* rows of the DataFrame. The default value for *n* is 20.

> **head(n)** is similar to *show(n)*, but instead of printing it returns the first *n* rows as a list of Row objects.

In [17]:
df_data.head(3)

[Row(age=29, identification=1, name='John', surname='Doe'),
 Row(age=25, identification=2, name='Jane', surname='Doe')]

**Counting** the number of rows

In [18]:
df_data.count()

2

Checking the **columns** and **counting** the number of columns

In [19]:
df_data.columns

['age', 'identification', 'name', 'surname']

In [20]:
print("number of columns: {0}".format(len(df_data.columns)))

number of columns: 4


Creating a **DataFrame** with a complex structure

In [21]:
dept1 = Row(id=1, name="Data Science")
dept2 = Row(id=2, name="Marketing")

Employee = Row("firstname","lastname","email","salary")
e1 = Employee("John","Doe","johnd@domain.de",90000)
e2 = Employee("Jane","Doe","janed@domain.de",92000)
e3 = Employee("Helena","Ster","he.st@domain.de",100000)

dwe1 = Row(department=dept1, employees=[e1,e2])
dwe2 = Row(department=dept2, employees=[e3])

dwe_seq = [dwe1, dwe2]
dwe_df = spark.createDataFrame(dwe_seq)

dwe_df.show()

+-----------------+--------------------+
|       department|           employees|
+-----------------+--------------------+
|[1, Data Science]|[[John, Doe, john...|
|   [2, Marketing]|[[Helena, Ster, h...|
+-----------------+--------------------+



Checking **DataFrame** schema

In [22]:
dwe_df.printSchema()

root
 |-- department: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |-- employees: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- firstname: string (nullable = true)
 |    |    |-- lastname: string (nullable = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- salary: long (nullable = true)



**Inferring** the schema using reflection

In [23]:
lines = sc.textFile("data/people.txt")
slines = lines.map(lambda x: x.split(","))
parts = slines.map(lambda x: Row(name=x[0], age=int(x[1])))

tdf = spark.createDataFrame(parts)
tdf.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



Programmatically **specifying** the Schema

In [24]:
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, StructField, StructType

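def set_dta_type(stp):
    """Map a type name ("int", "float", "double") to a Spark SQL type; anything else becomes a string."""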
    if stp == "int":
        return IntegerType()
    elif stp == "float":
        return FloatType()
    elif stp == "double":
        return DoubleType()
    else:
        return StringType() 

In [25]:
lines = sc.textFile("data/people.txt")
slines = lines.map(lambda x: x.split(","))
parts = slines.map(lambda x: (x[0], int(x[1].strip())))

schemaNames = "name age"
schemaTypes = "string int"

fields = [StructField(fn, set_dta_type(ft)) for fn, ft in zip(schemaNames.split(), schemaTypes.split())]
schema = StructType(fields)

df_people = spark.createDataFrame(parts, schema)
df_people.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [26]:
df_people.head(10)

[Row(name='Michael', age=29),
 Row(name='Andy', age=30),
 Row(name='Justin', age=19)]

**Datasets**

> **Datasets** were introduced in 2015 as part of the Apache Spark 1.6 release. The goal was to provide a type-safe programming interface, which allows developers to work with semi-structured data (like JSON or key-value pairs) with compile-time type safety (that is, production applications can be checked for type errors before they run).

> The **Dataset** API is strongly typed and relies on static typing, where variables have a type that is checked at compile time. Python is dynamically typed: only runtime objects (values) have a type. Therefore, there is no native support for the Dataset API in PySpark. Only Scala and Java offer support for Datasets.

## SparkSQL

**Select** columns from DataFrame

To select a subset of columns, call **select** on the DataFrame and pass the column names as comma-separated arguments.

In [27]:
from pyspark.sql import functions as F

**pyspark.sql.functions** is a collection of built-in functions such as **col**, **asin** and **avg**.

In [28]:
iris.select("sepal_length","petal_length","species").show(5)

+------------+------------+-------+
|sepal_length|petal_length|species|
+------------+------------+-------+
|         5.1|         1.4| setosa|
|         4.9|         1.4| setosa|
|         4.7|         1.3| setosa|
|         4.6|         1.5| setosa|
|         5.0|         1.4| setosa|
+------------+------------+-------+
only showing top 5 rows



Select a **set of features** from the DataFrame

In [29]:
cols = iris.columns
to_del = ["species", "sepal_width"] # subset of features to be removed
dcols = list(set(cols) - set(to_del)) # note: a set difference does not keep the original column order

In [30]:
iris.select(dcols).show(5)

+-----------+------------+------------+
|petal_width|sepal_length|petal_length|
+-----------+------------+------------+
|        0.2|         5.1|         1.4|
|        0.2|         4.9|         1.4|
|        0.2|         4.7|         1.3|
|        0.2|         4.6|         1.5|
|        0.2|         5.0|         1.4|
+-----------+------------+------------+
only showing top 5 rows



In [31]:
iris.select("sepal_length", F.when(F.col("petal_length") < 2.1, 1).otherwise(0)).show(5)

+------------+------------------------------------------------+
|sepal_length|CASE WHEN (petal_length < 2.1) THEN 1 ELSE 0 END|
+------------+------------------------------------------------+
|         5.1|                                               1|
|         4.9|                                               1|
|         4.7|                                               1|
|         4.6|                                               1|
|         5.0|                                               1|
+------------+------------------------------------------------+
only showing top 5 rows



> The [when](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=sort#pyspark.sql.functions.when) function evaluates a list of conditions and returns one of multiple possible result expressions. If **Column.otherwise()** is not invoked, None is returned for unmatched conditions.
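
The behaviour described above can be mimicked in plain Python. The sketch below is only a model of the semantics, not Spark code: the helper `case_when` is hypothetical. Conditions are tried in order, the first match wins, and `None` comes back when nothing matches and no default is given.

```python
def case_when(value, branches, default=None):
    """Return the result of the first branch whose predicate accepts value.

    branches is a list of (predicate, result) pairs checked in order,
    like chained when(...).when(...) calls. default plays the role of
    .otherwise(); leaving it out yields None for unmatched values.
    """
    for predicate, result in branches:
        if predicate(value):
            return result
    return default


petal_lengths = [1.4, 1.3, 4.7, 6.0]
short_petal = [(lambda x: x < 2.1, 1)]

with_default = [case_when(x, short_petal, default=0) for x in petal_lengths]
without_default = [case_when(x, short_petal) for x in petal_lengths]

print(with_default)     # [1, 1, 0, 0]
print(without_default)  # [1, 1, None, None]
```

The second result shows why an explicit *otherwise* is usually a good idea: without it, unmatched rows become nulls that later steps have to handle.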

Creating an **alias** for a column

In [32]:
iris.select("sepal_length", F.when(F.col("petal_length") < 2.1, 1).otherwise(0)\
                             .alias("pl_less_thr")).show(5)

+------------+-----------+
|sepal_length|pl_less_thr|
+------------+-----------+
|         5.1|          1|
|         4.9|          1|
|         4.7|          1|
|         4.6|          1|
|         5.0|          1|
+------------+-----------+
only showing top 5 rows



**Filtering** the data

In [33]:
iris.filter(F.col("petal_length") < 2.1).select("sepal_length").show(5)

+------------+
|sepal_length|
+------------+
|         5.1|
|         4.9|
|         4.7|
|         4.6|
|         5.0|
+------------+
only showing top 5 rows



We can also pass the feature name inside square brackets:
```python
iris.filter(iris["petal_length"] < 2.1).select("sepal_length").show(5)
```

**Sorting** the DataFrame by a column

The *ascending* parameter selects ascending or descending order (ascending is the default). To use a different order per column, pass a list; its length must equal the number of sort columns.

In [34]:
iris.sort("sepal_width", ascending=False).show()

+------------+-----------+------------+-----------+---------+
|sepal_length|sepal_width|petal_length|petal_width|  species|
+------------+-----------+------------+-----------+---------+
|         5.7|        4.4|         1.5|        0.4|   setosa|
|         5.5|        4.2|         1.4|        0.2|   setosa|
|         5.2|        4.1|         1.5|        0.1|   setosa|
|         5.8|        4.0|         1.2|        0.2|   setosa|
|         5.4|        3.9|         1.7|        0.4|   setosa|
|         5.4|        3.9|         1.3|        0.4|   setosa|
|         7.9|        3.8|         6.4|        2.0|virginica|
|         5.1|        3.8|         1.5|        0.3|   setosa|
|         7.7|        3.8|         6.7|        2.2|virginica|
|         5.7|        3.8|         1.7|        0.3|   setosa|
|         5.1|        3.8|         1.9|        0.4|   setosa|
|         5.1|        3.8|         1.6|        0.2|   setosa|
|         5.4|        3.7|         1.5|        0.2|   setosa|
+------------+-----------+------------+-----------+---------+
(output truncated)

Another option to sort:
```python
iris.sort(iris.sepal_width.desc()).show()
```

**Sorting** the DataFrame by passing more columns

In [35]:
iris.sort(["sepal_width","petal_length"], ascending=[0,1]).show()

+------------+-----------+------------+-----------+---------+
|sepal_length|sepal_width|petal_length|petal_width|  species|
+------------+-----------+------------+-----------+---------+
|         5.7|        4.4|         1.5|        0.4|   setosa|
|         5.5|        4.2|         1.4|        0.2|   setosa|
|         5.2|        4.1|         1.5|        0.1|   setosa|
|         5.8|        4.0|         1.2|        0.2|   setosa|
|         5.4|        3.9|         1.3|        0.4|   setosa|
|         5.4|        3.9|         1.7|        0.4|   setosa|
|         5.1|        3.8|         1.5|        0.3|   setosa|
|         5.1|        3.8|         1.6|        0.2|   setosa|
|         5.7|        3.8|         1.7|        0.3|   setosa|
|         5.1|        3.8|         1.9|        0.4|   setosa|
|         7.9|        3.8|         6.4|        2.0|virginica|
|         7.7|        3.8|         6.7|        2.2|virginica|
|         5.4|        3.7|         1.5|        0.2|   setosa|
+------------+-----------+------------+-----------+---------+
(output truncated)
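
To make the per-column sort order concrete, here is a minimal plain-Python sketch of the same idea. It is a model only: `sort_rows` is a hypothetical helper, and the rows are a handful of values taken from the output above.

```python
def sort_rows(rows, cols, ascending):
    """Sort dict rows by several columns, each with its own direction."""
    if len(cols) != len(ascending):
        raise ValueError("ascending must have one entry per sort column")
    result = list(rows)
    # Python's sort is stable, so sorting by the least significant key
    # first and the most significant key last gives a multi-key order.
    for col, asc in reversed(list(zip(cols, ascending))):
        result.sort(key=lambda row: row[col], reverse=not asc)
    return result


rows = [
    {"sepal_width": 3.9, "petal_length": 1.7},
    {"sepal_width": 4.4, "petal_length": 1.5},
    {"sepal_width": 3.9, "petal_length": 1.3},
    {"sepal_width": 3.8, "petal_length": 6.4},
    {"sepal_width": 3.8, "petal_length": 1.5},
]

ordered = sort_rows(rows, ["sepal_width", "petal_length"], [False, True])
pairs = [(r["sepal_width"], r["petal_length"]) for r in ordered]
print(pairs)
# [(4.4, 1.5), (3.9, 1.3), (3.9, 1.7), (3.8, 1.5), (3.8, 6.4)]
```

Rows with the same *sepal_width* end up ordered by ascending *petal_length*, which is the tie-breaking visible in the Spark output.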

**Grouping** data by a given column

In [36]:
iris.groupBy("species").count().show()

+----------+-----+
|   species|count|
+----------+-----+
| virginica|   50|
|versicolor|   50|
|    setosa|   50|
+----------+-----+



**Grouping** data by a given column and performing an aggregation on another column

In [37]:
iris.groupBy("species").agg(F.avg("sepal_length").alias("avg_sepal_length")).show()

+----------+-----------------+
|   species| avg_sepal_length|
+----------+-----------------+
| virginica|6.587999999999998|
|versicolor|            5.936|
|    setosa|5.005999999999999|
+----------+-----------------+



Checking **missing** values

*Simple count and compare*

- *summary("count")* returns, for each column, the number of values that are not *null*. Compare it with the total number of rows to find out how many values are missing.

In [38]:
iris.summary("count").show()

+-------+------------+-----------+------------+-----------+-------+
|summary|sepal_length|sepal_width|petal_length|petal_width|species|
+-------+------------+-----------+------------+-----------+-------+
|  count|         150|        150|         150|        150|    150|
+-------+------------+-----------+------------+-----------+-------+



*Count the missing values*

- Create a function that receives a DataFrame and counts the missing values per column.
- The output shows the number of missing values.

In [39]:
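def count_missing_values(df):
    # F.when(F.isnull(c), c) is non-null only for rows where column c is null
    # (the string c is used as a literal value), so F.count counts exactly those rows
    df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()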

> Missing values on *Iris* DataFrame

In [40]:
count_missing_values(iris)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|           0|          0|           0|          0|      0|
+------------+-----------+------------+-----------+-------+



> Missing values on *People* DataFrame

In [70]:
from pyspark.sql import Row

# create people DataFrame with some missing values
r = [Row(name='John', country='USA', zip_code=89013),
     Row(name='Jane', country='Germany', zip_code=None),
     Row(name=None, country=None, zip_code=10133)]
people = spark.createDataFrame(r)

people.show()

+-------+----+--------+
|country|name|zip_code|
+-------+----+--------+
|    USA|John|   89013|
|Germany|Jane|    null|
|   null|null|   10133|
+-------+----+--------+



In [71]:
count_missing_values(people)

+-------+----+--------+
|country|name|zip_code|
+-------+----+--------+
|      1|   1|       1|
+-------+----+--------+



**Dealing** with missing values

*Fill in with a value*

- The fill value is applied only to columns of a matching type: a number fills nulls only in numerical columns, and a string fills nulls only in string (categorical) columns.

In [72]:
people.na.fill(80000).show()

+-------+----+--------+
|country|name|zip_code|
+-------+----+--------+
|    USA|John|   89013|
|Germany|Jane|   80000|
|   null|null|   10133|
+-------+----+--------+



> A **subset** of columns can also be passed.

In [73]:
people.na.fill("Jack", subset=["name"]).show()

+-------+----+--------+
|country|name|zip_code|
+-------+----+--------+
|    USA|John|   89013|
|Germany|Jane|    null|
|   null|Jack|   10133|
+-------+----+--------+
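
The type-matching rule can be modelled in a few lines of plain Python. This is a sketch under simplifying assumptions, not Spark's implementation: `fill_nulls` is a hypothetical helper, and the column types are reduced to just "number" and "string".

```python
def fill_nulls(rows, schema, value, subset=None):
    """Replace None with value, but only in columns of a matching kind.

    schema maps column name -> "number" or "string" (a simplification of
    Spark's column types). Columns whose kind differs from the kind of
    value are left untouched, as are columns outside subset.
    """
    kind = "string" if isinstance(value, str) else "number"
    targets = [c for c in schema
               if schema[c] == kind and (subset is None or c in subset)]
    return [
        {c: (value if c in targets and v is None else v) for c, v in row.items()}
        for row in rows
    ]


people_rows = [
    {"name": "John", "country": "USA", "zip_code": 89013},
    {"name": "Jane", "country": "Germany", "zip_code": None},
    {"name": None, "country": None, "zip_code": 10133},
]
people_schema = {"name": "string", "country": "string", "zip_code": "number"}

filled_number = fill_nulls(people_rows, people_schema, 80000)
print([r["zip_code"] for r in filled_number])  # [89013, 80000, 10133]
print([r["name"] for r in filled_number])      # ['John', 'Jane', None]

filled_name = fill_nulls(people_rows, people_schema, "Jack", subset=["name"])
print([(r["name"], r["country"]) for r in filled_name])
# [('John', 'USA'), ('Jane', 'Germany'), ('Jack', None)]
```

The number leaves the string columns alone, and the *subset* restricts the string fill to *name*, so *country* keeps its null.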



*Replace value*

In [74]:
people.na.replace(89013, 80000, subset=["zip_code"]).show()

+-------+----+--------+
|country|name|zip_code|
+-------+----+--------+
|    USA|John|   80000|
|Germany|Jane|    null|
|   null|null|   10133|
+-------+----+--------+



*Drop missing values*

In [75]:
people.na.drop().show()

+-------+----+--------+
|country|name|zip_code|
+-------+----+--------+
|    USA|John|   89013|
+-------+----+--------+



**drop() Parameters:**

- **how**: ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.

- **thresh** (int, default: None): If specified, drop rows that have fewer than *thresh* non-null values. This overrides the *how* parameter.

- **subset** (optional): List of column names to consider.


In [60]:
people.na.drop(how="all").show()

+-------+----+--------+
|country|name|zip_code|
+-------+----+--------+
|    USA|John|   89013|
|Germany|Jane|    null|
|   null|null|   10133|
+-------+----+--------+



In [62]:
people.na.drop(how="any").show()

+-------+----+--------+
|country|name|zip_code|
+-------+----+--------+
|    USA|John|   89013|
+-------+----+--------+



In [64]:
people.na.drop(thresh=2).show()

+-------+----+--------+
|country|name|zip_code|
+-------+----+--------+
|    USA|John|   89013|
|Germany|Jane|    null|
+-------+----+--------+
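
The interplay of *how*, *thresh* and *subset* is easy to misread, so here is a small plain-Python model of the rule. It is an illustration only: `keep_row` is a hypothetical helper that decides whether a row would survive, applied to the same three people as above.

```python
def keep_row(row, how="any", thresh=None, subset=None):
    """Return True if the row would be kept (plain-Python model of the rule)."""
    values = [row[c] for c in (subset or row.keys())]
    non_null = sum(v is not None for v in values)
    if thresh is not None:      # thresh takes precedence over how
        return non_null >= thresh
    if how == "any":            # drop the row if any value is null
        return non_null == len(values)
    if how == "all":            # drop the row only if every value is null
        return non_null > 0
    raise ValueError("how must be 'any' or 'all'")


people_rows = [
    {"name": "John", "country": "USA", "zip_code": 89013},
    {"name": "Jane", "country": "Germany", "zip_code": None},
    {"name": None, "country": None, "zip_code": 10133},
]

kept_any = [r["name"] for r in people_rows if keep_row(r, how="any")]
kept_all = [r["name"] for r in people_rows if keep_row(r, how="all")]
kept_thresh = [r["name"] for r in people_rows if keep_row(r, thresh=2)]
kept_subset = [r["name"] for r in people_rows if keep_row(r, subset=["zip_code"])]

print(kept_any)     # ['John']
print(kept_all)     # ['John', 'Jane', None]
print(kept_thresh)  # ['John', 'Jane']
print(kept_subset)  # ['John', None]
```

The first three results match the Spark outputs above; the last one shows that with *subset* only the listed columns are inspected for nulls.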



Drop **duplicates**

In [65]:
iris.select("species").dropDuplicates().show()

+----------+
|   species|
+----------+
| virginica|
|versicolor|
|    setosa|
+----------+



> **dropDuplicates** also accepts a *subset* parameter: a list of columns to consider when identifying duplicates.

```python
iris.dropDuplicates(["species"]).show()
```

**Adding** a column

In [66]:
iris = iris.withColumn("thr_sepal_length", F.when(F.col("sepal_length") > 4.7,1).otherwise(0))
iris.show(7)

+------------+-----------+------------+-----------+-------+----------------+
|sepal_length|sepal_width|petal_length|petal_width|species|thr_sepal_length|
+------------+-----------+------------+-----------+-------+----------------+
|         5.1|        3.5|         1.4|        0.2| setosa|               1|
|         4.9|        3.0|         1.4|        0.2| setosa|               1|
|         4.7|        3.2|         1.3|        0.2| setosa|               0|
|         4.6|        3.1|         1.5|        0.2| setosa|               0|
|         5.0|        3.6|         1.4|        0.2| setosa|               1|
|         5.4|        3.9|         1.7|        0.4| setosa|               1|
|         4.6|        3.4|         1.4|        0.3| setosa|               0|
+------------+-----------+------------+-----------+-------+----------------+
only showing top 7 rows



**Renaming** a column

In [67]:
iris.withColumnRenamed("species", "target").show(5)

+------------+-----------+------------+-----------+------+----------------+
|sepal_length|sepal_width|petal_length|petal_width|target|thr_sepal_length|
+------------+-----------+------------+-----------+------+----------------+
|         5.1|        3.5|         1.4|        0.2|setosa|               1|
|         4.9|        3.0|         1.4|        0.2|setosa|               1|
|         4.7|        3.2|         1.3|        0.2|setosa|               0|
|         4.6|        3.1|         1.5|        0.2|setosa|               0|
|         5.0|        3.6|         1.4|        0.2|setosa|               1|
+------------+-----------+------------+-----------+------+----------------+
only showing top 5 rows



**Removing** a column

In [68]:
iris = iris.drop("thr_sepal_length")
iris.show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



Get the **summary statistics**

The **describe** operation calculates summary statistics (count, mean, stddev, min and max) for the columns of a DataFrame. If no column names are given, it covers all numerical and string columns; for string columns, mean and stddev are null.

In [69]:
iris.describe().show()

+-------+------------------+-------------------+------------------+------------------+---------+
|summary|      sepal_length|        sepal_width|      petal_length|       petal_width|  species|
+-------+------------------+-------------------+------------------+------------------+---------+
|  count|               150|                150|               150|               150|      150|
|   mean| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672|     null|
| stddev|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414|     null|
|    min|               4.3|                2.0|               1.0|               0.1|   setosa|
|    max|               7.9|                4.4|               6.9|               2.5|virginica|
+-------+------------------+-------------------+------------------+------------------+---------+



> For string columns, **describe** reports only count, min and max, so we need another way to compute the frequency of **categorical features**.

Get the frequency of **categorical features** using **GroupBy**

In [76]:
iris.groupBy("species").count().show()

+----------+-----+
|   species|count|
+----------+-----+
| virginica|   50|
|versicolor|   50|
|    setosa|   50|
+----------+-----+



Running SQL **Queries**

We must register the DataFrame as a SQL temporary view in order to run SQL queries.

**createOrReplaceTempView** registers the DataFrame as a temporary view in the current Spark session. If a temporary view with the same name already exists, it is replaced.

In [77]:
iris.createOrReplaceTempView("iris")

*Example of query 01 : All data*

In [78]:
spark.sql("SELECT * FROM iris").show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



*Example of query 02 : Where*

In [79]:
query = """
SELECT *
FROM iris
WHERE sepal_length <= 4.6
"""

spark.sql(query).show(5)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         4.6|        3.1|         1.5|        0.2| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.3|        3.0|         1.1|        0.1| setosa|
|         4.6|        3.6|         1.0|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



*Example of query 03 : startswith / endswith*

In [82]:
iris.select("sepal_length", "petal_length", "species",
            F.col("species").startswith("se").alias("ca_eval")).filter(F.col("ca_eval")).show(5)

+------------+------------+-------+-------+
|sepal_length|petal_length|species|ca_eval|
+------------+------------+-------+-------+
|         5.1|         1.4| setosa|   true|
|         4.9|         1.4| setosa|   true|
|         4.7|         1.3| setosa|   true|
|         4.6|         1.5| setosa|   true|
|         5.0|         1.4| setosa|   true|
+------------+------------+-------+-------+
only showing top 5 rows



In [83]:
iris.select("sepal_length", "petal_length", "species",
            F.col("species").endswith("ca").alias("ca_eval")).filter(F.col("ca_eval")).show(5)

+------------+------------+---------+-------+
|sepal_length|petal_length|  species|ca_eval|
+------------+------------+---------+-------+
|         6.3|         6.0|virginica|   true|
|         5.8|         5.1|virginica|   true|
|         7.1|         5.9|virginica|   true|
|         6.3|         5.6|virginica|   true|
|         6.5|         5.8|virginica|   true|
+------------+------------+---------+-------+
only showing top 5 rows



*Example of query 04 : like*

In [87]:
people.select("name","country", people.name.like("Jane")).show()

+----+-------+--------------+
|name|country|name LIKE Jane|
+----+-------+--------------+
|John|    USA|         false|
|Jane|Germany|          true|
|null|   null|          null|
+----+-------+--------------+



> We could also use *filter* along with *like*

In [89]:
people.filter(F.col("name").like("Jane")).show()

+-------+----+--------+
|country|name|zip_code|
+-------+----+--------+
|Germany|Jane|    null|
+-------+----+--------+
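
The examples above use a pattern without wildcards, so *like* behaves like an equality test. SQL LIKE patterns also support `%` (any run of characters) and `_` (exactly one character). The following plain-Python sketch models that matching; `sql_like` is a hypothetical helper, and escape characters are deliberately left out.

```python
import re


def sql_like(value, pattern):
    """Model of SQL LIKE: % matches any run of characters, _ exactly one.

    Returns None for a null (None) input, mirroring the null row in the
    output above. Escape characters are not handled in this sketch.
    """
    if value is None:
        return None
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None


names = ["John", "Jane", None]

print([sql_like(n, "Jane") for n in names])  # [False, True, None]
print([sql_like(n, "J%") for n in names])    # [True, True, None]
print([sql_like(n, "J_n_") for n in names])  # [False, True, None]
```

With the `%` wildcard, a pattern such as `"J%"` selects every name starting with "J", which is the more typical use of *like*.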



*Example of query 05 : substring*

In [90]:
iris.select("sepal_length", "petal_length",
            F.col("species").substr(1,3).alias("spec")).show(5)

+------------+------------+----+
|sepal_length|petal_length|spec|
+------------+------------+----+
|         5.1|         1.4| set|
|         4.9|         1.4| set|
|         4.7|         1.3| set|
|         4.6|         1.5| set|
|         5.0|         1.4| set|
+------------+------------+----+
only showing top 5 rows



*Example of query 06 : between*

In [91]:
iris.select("sepal_length", "petal_length", "petal_width",
            F.col("petal_width").between(1.2,1.9).alias("pw_btween")).show(5)

+------------+------------+-----------+---------+
|sepal_length|petal_length|petal_width|pw_btween|
+------------+------------+-----------+---------+
|         5.1|         1.4|        0.2|    false|
|         4.9|         1.4|        0.2|    false|
|         4.7|         1.3|        0.2|    false|
|         4.6|         1.5|        0.2|    false|
|         5.0|         1.4|        0.2|    false|
+------------+------------+-----------+---------+
only showing top 5 rows
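
Two details of queries 05 and 06 are worth spelling out: *substr* counts positions from 1, and *between* includes both bounds. The plain-Python functions below are hypothetical stand-ins that model only these two rules.

```python
def substr(value, start_pos, length):
    """1-based substring: position 1 is the first character."""
    start = start_pos - 1
    return value[start:start + length]


def between(value, lower, upper):
    """Inclusive range test: lower <= value <= upper."""
    return lower <= value <= upper


prefix = substr("setosa", 1, 3)
in_range = [between(w, 1.2, 1.9) for w in [0.2, 1.2, 1.5, 1.9, 2.0]]

print(prefix)    # set
print(in_range)  # [False, True, True, True, False]
```

The first five Iris rows all have a *petal_width* of 0.2, which is why the Spark output above shows only *false*.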



Run SQL **on files** directly

In [94]:
query = """
SELECT *
FROM parquet.`{0}`
""".format("data/people_parquet")

users_df = spark.sql(query)
users_df.show()

+-----+---+---------+
| name|age|      job|
+-----+---+---------+
|Jorge| 30|Developer|
|  Bob| 32|Developer|
+-----+---+---------+



## Saving to a Persistent Table

- DataFrames can also be saved as persistent tables in the Hive metastore using the *saveAsTable* command.
- An existing Hive deployment is not required: Spark will create a default local Hive metastore (using Derby) for you.
- Unlike *createOrReplaceTempView*, *saveAsTable* materializes the contents of the DataFrame and creates a pointer to the data in the Hive metastore.
- For file-based data sources (e.g. text, parquet, json), you can specify a custom table path via the *path* option, e.g. `df.write.option("path", "/some/path").saveAsTable("t")`. When the table is dropped, the custom table path is not removed and the table data is still there. If no custom table path is specified, Spark writes the data to a default table path under the warehouse directory. When the table is dropped, the default table path is removed too.
- Starting from Spark 2.1, persistent datasource tables have per-partition metadata stored in the Hive metastore.

In [98]:
iris.select("sepal_length", "petal_width", "species").write.mode("overwrite").saveAsTable("iris2")
# saved in the spark-warehouse folder

## Bucketing, Sorting and Partitioning

- For file-based data sources, it is also possible to bucket and sort or partition the output. Bucketing and sorting are applicable only to persistent tables.
- **partitionBy** creates a directory structure as described in the [Partition Discovery](https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#partition-discovery) section. Thus, it has limited applicability to columns with high cardinality. In contrast, **bucketBy** distributes data across a fixed number of buckets and can be used when the number of unique values is unbounded.
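
The difference between the two strategies can be sketched in plain Python. This is a model under stated assumptions, not Spark's implementation: `partition_dir` and `bucket_id` are hypothetical helpers, and `zlib.crc32` merely stands in for a deterministic hash (Spark uses its own hash function, so real bucket numbers will differ).

```python
import zlib


def partition_dir(base, row, partition_cols):
    """Build a Hive-style directory path such as base/job=Developer."""
    parts = [f"{col}={row[col]}" for col in partition_cols]
    return "/".join([base] + parts)


def bucket_id(value, num_buckets):
    """Map a value to one of num_buckets buckets with a stand-in hash."""
    return zlib.crc32(str(value).encode("utf-8")) % num_buckets


users = [
    {"name": "Jorge", "age": 30, "job": "Developer"},
    {"name": "Bob", "age": 32, "job": "Developer"},
]

# Partitioning: one directory per distinct value of the partition column.
dirs = sorted({partition_dir("job.parquet", u, ["job"]) for u in users})
print(dirs)  # ['job.parquet/job=Developer']

# Bucketing: the number of buckets is fixed, whatever the number of names.
buckets = [bucket_id(u["name"], 42) for u in users]
print(all(0 <= b < 42 for b in buckets))  # True
```

Partitioning by a column such as *name* would create one directory per person, while bucketing by *name* never produces more than the chosen number of buckets.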

In [105]:
df_people.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucket", mode="overwrite")
# creates people_bucket folder in spark-warehouse

In [107]:
users_df.write.partitionBy("job").format("parquet").save("job.parquet", mode="overwrite")
# creates job.parquet folder in the current directory

In [108]:
users_df.write.partitionBy("job").bucketBy(42, "name").saveAsTable("people_partitioned_bucketed", mode="overwrite")
# creates people_partitioned_bucketed folder in spark-warehouse directory

## Caching and caching storage levels

Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small “hot” dataset or when running an iterative algorithm like PageRank.

- Caching avoids recomputing a DataFrame each time it is used, which is often the key to good Spark performance.
- Cache a DataFrame when it is used multiple times in the script.
- *cache()* is lazy: the DataFrame is only cached when the first action, such as *count()*, runs.
- Spark caches only the data that the action reads; that is, it caches as many partitions as the action has to read.

In [109]:
iris.cache()

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string]

In [110]:
iris.count()

150

**Checking** if the DataFrame is cached

- **StorageLevel** describes how an RDD/DataFrame is persisted.
- Since Spark 2.0, the default storage level for a cached DataFrame is **MEMORY_AND_DISK**, matching Scala.

In [111]:
iris.storageLevel

StorageLevel(True, True, False, True, 1)

```python
StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)
```

**StorageLevel() Parameters:**

- **useDisk:** Boolean - use disk for data storage.
- **useMemory:** Boolean - use memory for data storage.
- **useOffHeap:** Boolean - use off-heap memory for data storage.
- **deserialized:** Boolean - store data in deserialized format.
- **replication:** Int - replicate the data to other block managers; the value is the number of copies.
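
Because the printed storage level is just a tuple of positional flags, it can be hard to read. The sketch below decodes it in plain Python. `Level` and `describe_level` are hypothetical names introduced for illustration; only the field order is taken from PySpark's *StorageLevel* constructor.

```python
from collections import namedtuple

# Field order follows the positional order of the StorageLevel constructor.
Level = namedtuple(
    "Level", ["useDisk", "useMemory", "useOffHeap", "deserialized", "replication"]
)


def describe_level(level):
    """Summarise a storage level tuple in words."""
    places = [name for flag, name in [
        (level.useMemory, "memory"),
        (level.useOffHeap, "off-heap"),
        (level.useDisk, "disk"),
    ] if flag]
    if not places:
        return "not persisted"
    fmt = "deserialized" if level.deserialized else "serialized"
    return f"{' + '.join(places)}, {fmt}, {level.replication}x replicated"


cached = describe_level(Level(True, True, False, True, 1))
released = describe_level(Level(False, False, False, False, 1))

print(cached)    # memory + disk, deserialized, 1x replicated
print(released)  # not persisted
```

Read this way, the value printed for the cached *iris* DataFrame means "memory and disk, deserialized, one copy", and a level with all flags set to False means the data is not persisted.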

**Releasing memory** after using caching

In [112]:
iris.unpersist()
iris.storageLevel

StorageLevel(False, False, False, False, 1)

It is also possible to *remove all cached tables* from the in-memory cache:

```python
spark.catalog.clearCache()
```

---

# Exercises

**RDD**
1. Create a **txt file** with the following line (heights): *1.79, 1.60, 1.89, 2.01, 2.32, 1.58, 1.47, 1.56*
2. **Load** the file into an RDD.
3. Get the heights **above 2.0**.
4. Save the output into a **txt file**.

**DataFrame**
1. Load the Auto MPG Data Set ([here](https://archive.ics.uci.edu/ml/datasets/Auto+MPG)) into a DataFrame.
2. Verify the number of rows and columns/features. 
3. Verify the data types, and if there is something wrong with them, please, fix it.
4. Are there missing values?
5. If yes, fill them in with suitable values.
6. Get a summary of the numerical values.

## Challenge: MovieLens (1m)

Download the [MovieLens](http://grouplens.org/datasets/movielens/) dataset.

**Tasks:**
1. Which are the top 10 best-rated movies (with at least 10 ratings in total)?
2. Which age group gives the most ratings overall?
3. Which occupation gives the best average ratings for comedies?
4. Which single genre is, on average, rated best by male users?
5. Based on the average rating: is the common saying that women like romantic movies more than men do true?

---

# References

1. Kumar, R., 2018. ***PySpark Recipes***. Apress.
2. Tomasz, D., 2017. ***Learning PySpark***. Packt Publishing.
3. Spark.apache.org. (2018). ***Apache Spark™ - Unified Analytics Engine for Big Data***. [online] Available at: http://spark.apache.org/ [Accessed 25 Sep. 2018].
4. Spark.apache.org. (2018). ***GraphX | Apache Spark***. [online] Available at: http://spark.apache.org/graphx/ [Accessed 25 Sep. 2018].
5. Spark.apache.org. (2018). ***Spark Streaming - Spark 2.3.1 Documentation***. [online] Available at: https://spark.apache.org/docs/latest/streaming-programming-guide.html [Accessed 25 Sep. 2018].
6. DeZyre. (2018). ***Apache Spark Architecture Explained in Detail***. [online] Available at: https://www.dezyre.com/article/apache-spark-architecture-explained-in-detail/338 [Accessed 26 Sep. 2018].
7. Data-flair.training. (2018). ***Apache Spark Ecosystem – Complete Spark Components Guide – DataFlair***. [online] Available at: https://data-flair.training/blogs/apache-spark-ecosystem-components/ [Accessed 1 Oct. 2018].
8. Laskowski, J. (2018). ***StorageLevel · Mastering Apache Spark***. [online] Jaceklaskowski.gitbooks.io. Available at: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-StorageLevel.html [Accessed 29 Oct. 2018].