# Structured Data Analysis with Spark SQL

Big Data is not only raw text files, a lot of datasets available are structured as table and even raw text files have a underlying structure. Spark SQL allow to query structured datasets, relational database and also provide an API to structure data.

## Table of Content

1. [Initialization](#1.-Initialization)  
  1.1 [Spark](#1.1-Spark)  
  1.2 [Spark SQL](#1.2-Spark-SQL)
2. [Dataframe](#2.-Dataframe)  
  2.1 [Reading Data](#2.1-Reading-Data)  
  2.2 [Structuring Data](#2.2-Structuring-Data)  
  2.3 [Creating a Dataframe](#2.3-Creating-a-Dataframe)  
  2.4 [Registering a Table](#2.4-Registering-a-Table)  
  2.5 [Querying Data](#2.5-Querying-Data)  
  2.6 [Aggregating Results](#2.6-Aggregating-Results)
3. [Writing Results to Disk](#3.-Writing-Results-to-Disk)  
  3.1 [Apache Parquet](#3.1-Apache-Parquet)  
  3.2 [Other Formats](#3.2-Other-Formats)    
4. [Reading the Dataframe from Storage](#4.-Reading-the-Dataframe-from-Storage)  
  4.1 [Reading CSV Files](#4.1-Reading-CSV-Files)
5. [Manipulating Data in a DataFrame](#5.-Manipulating-Data-in-DataFrames)
4. [Ending Spark SQL Analysis](#6.-Ending-Spark-SQL-Analysis)
6. [Recap](#7.-Recap)
7. [References](#References)

## List of Exercises

1. [Exercise 1: Initialize Spark](#Exercise-1)
2. [Exercise 2: Create an RDD](#Exercise-2)
3. [Exercise 3: Count Elements](#Exercise-3)
4. [Exercise 4: Transform an RDD](#Exercise-4)
5. [Exercise 5: Validate Transformation](#Exercise-5)
6. [Exercise 6: Type RDD fields](#Exercise-6)
7. [Exercise 7: Return on key-value notions](#Exercise-7)
7. [Exercise 8: Create a Dataframe](#Exercise-8)
8. [Exercise 9: Display a Dataframe](#Exercise-9)
9. [Exercise 10: Create a Dataframe from a CSV](#Exercise-10)
9. [Exercise 11: Print the Dataframe's schema](#Exercise-11)
9. [Exercise 12: Aggregate the core-hours per PI](#Exercise-12)
9. [Exercise 13: End the Context](#Exercise-13)

## 1. Initialization

### 1.1 Spark
#### Exercise 1

Import the necessary Python module(s) and create a Spark context. 

**Warning**, verify if there exists a context and handle possible exceptions.

### 1.2 Spark SQL

We can now import the components that we need to analyze structured data from Spark SQL module `pyspark.sql`.
* `SQLContext`: Main entry point for Spark SQL functionality. It will be used to create Dataframe.

In [None]:
from pyspark.sql import SQLContext

Spark SQL Context requires a Spark Context as sole argument.

In [None]:
sqlContext = SQLContext(sc)

## 2. Dataframe

Textbook definition:    

> A data frame is a table, or two-dimensional array-like structure, in which each column contains measurements on one variable, and each row contains one case.

In Spark, a dataframe is a distributed collection of data grouped into named columns. It is equivalent to a relational table.

### 2.1 Reading Data

#### Exercise 2

Create an RDD with the dataset found in `/scratch/formation/spark/data/pagecounts`.

#### Exercise 3

Count the number of elements in the RDD.

#### Exercise 4

Transform the previous RDD into a second one where each field originally separated by white spaces are now elements of a list.

#### Exercise 5

To validate the transformation, display the first 8 elements of the RDD.

#### Exercise 6

As you can see, the third and fourth fields are numbers represented as text. Transform the RDD in order to convert these strings to `int`.

### 2.2 Structuring Data

Our original dataset is strictly text, we now want to give a structure that is define field name and type.


A pagecounts file looks like this
```
af Spesiaal:Onlangse_wysigings 3 101681
af Spesiaal:RecentChanges 2 2248
af Suid-Afrika 1 30698
af Tuisblad 14 155257 
af Varkgriep 4 42236
af Wikipedia 2 32796
```

It is a tabular file, where each line is a distinct entry and the columns represent

1. the project name (language);
2. the page title;
3. the number of requests;
4. the size of the content returned.

To define our structure, we use Spark SQL data types that are defined in [`pyspark.sql.types`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types). We import a small subset of type that we need `LongType` and `StringType`.

In [None]:
from pyspark.sql.types import LongType, StringType, IntegerType

Spark SQL also provides two types to defines dataframe structure:
- `StructType`: Data type representing a row of a dataframe.
- `StructField`: Data type representing a field of a row. It is mainly defined by a name and a type.

In [None]:
from pyspark.sql.types import StructType, StructField

Using all these classes, we can define our data schema. The order in the list must correspond to the order in our dataset.

In [None]:
schema = StructType([StructField('lang',    StringType()), 
                     StructField('name',    StringType()), 
                     StructField('request', LongType()), 
                     StructField('size',    LongType())])

### 2.3 Creating a Dataframe

To create a dataframe, we simply need to invoke the `createDataFrame()` method of our Spark SQL Context and provide it an RDD and our data structure (or schema). 

#### Exercise 8

Replace `<FILL IN>` in the following cell by the proper RDD for `data/pagecounts`, and the second `<FILL IN>` by the method call to persist the dataframe in memory. **Hint**: it is the same method for RDDs.

In [None]:
dfPageCounts = sqlContext.createDataFrame(<FILL IN>, schema).<FILL IN>()

We can then manipulate the dataframe using its [`pyspark.sql.DataFrame` API](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame).

We can, for example, show the first lines of the dataframe as a table in ASCII.

In [None]:
dfPageCounts.show()

### 2.4 Registering a Table

In order to query our dataframe with SQL, we need to register it as a table in the Spark SQL context.

In [None]:
dfPageCounts.registerTempTable("page_table")

### 2.5 Querying Data

We are now able to interogate our data using the Structured Query Language (SQL). The following query request the top 10 most requested page in spanish.

In [None]:
df = sqlContext.sql("SELECT name, request "
                    "FROM page_table "
                    "WHERE lang='es' "
                    "ORDER BY request DESC "
                    "LIMIT 10")

A query on a dataframe is a Spark transformation. Therefore, to compute the result, we need to call an action. 

#### Exercise 9

Call the right method to display the dataframe resulting from the preceding query.

### 2.5.1 SQL 101

We can decompose the preceding query in keywords:

#### SELECT

Indicate which variable we want to collect. The name of variable have been defined when structuring our data in [section 2.2](#2.2-Structuring-Data).

#### FROM

Indicate the source of data. The name of the table has been defined in [section 2.4](#2.4-Registering-a-table).

#### WHERE

Filter the entries based on predicates in function of the variables. 

#### ORDER BY [...] DESC

Indicate we wish to order the resulting dataframe in function of a certain variable, in a certain order. 

#### LIMIT N 

Return only a subset of entries.

### 2.5.2 SQL as an API

[Dataframe API](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame) includes methods that are named after SQL keywords. These methods can be used instead of the query language. However, the order does not match exactly as the query is a single statement, the API will return a dataframe after each method call. These calls can be chained to build a similar pipeline.

The following example extract and show the 10 most requested pages in English on Wikipedia.

In [None]:
dfPageCounts.where("lang = 'en'")\
            .select("name", "request")\
            .orderBy("request", ascending=False)\
            .limit(10).show()

### 2.6 Aggregating Results

If we look back at the preceding results, we realise that some names appear multiple time. The reason is that we have omitted to sum the number of requests per page. We need to aggregate the page entries by name and sum the requests.

To do this, we indicate in our query to sum the request (`SUM(request)`) for entries grouped by name (`GROUP BY NAME`).

In [None]:
sqlContext.sql("SELECT name, SUM(request)"
               "FROM page_table "
               "WHERE lang='en' "
               "GROUP BY name "
               "ORDER BY SUM(request) DESC "
               "LIMIT 10").show()

#### Aggregating Using the API

In [None]:
dfPageCounts.where("lang = 'en'")\
            .select("name", "request")\
            .groupBy('name')\
            .agg({'request' : 'sum'})\
            .orderBy("sum(request)", ascending=False)\
            .limit(10).show()

## 3. Writing Results to Disk

To avoid structuring our data each time, we can save the resulting dataframe to disk. This will preserve the schema and the data order.

### 3.1 Apache Parquet

[Apache Parquet](https://parquet.apache.org/) is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language. The format is fairly popular with the Spark community as it is efficient and easy to use.

In [None]:
dfPageCounts.write.parquet("pagecounts.parquet")

### 3.2 Other Formats

Dataframe can also be written as JSON file: 

In [None]:
dfPageCounts.write.json('pagecounts.json')

It can also save the dataframe as a table in [Apache Hive](http://hive.apache.org/) or a database. 

## 4. Reading the Dataframe from Storage

A new Dataframe can be created by reading back from a file in one of the format we mentionned.

For example, to create a dataframe from a Parquet file:

In [None]:
pagecount_parq = sqlContext.read.parquet("pagecounts.parquet")
pagecount_parq.show()

### 4.1 Reading CSV Files

Datasets are often found as CSV files. 

Spark 2.0, which you are currently using, has direct support for CSV files.

In [None]:
df = sqlContext.read\
               .csv(path='/scratch/formation/spark/data/store/employees.tsv',
                    header=True, inferSchema=True, sep='\t')

Once the dataframe is loaded, we can verify that the columns were correctly named and their type correctly infered.

In [None]:
df.printSchema()

If we are satisfied with the schema, we can start manipulating our dataframe or show its content

In [None]:
df.show()

## 5. Manipulating Data in DataFrames

To demonstrate how data in a dataframe can be manipulated with Spark, we are going to pursue this lesson with a more complex dataset. We are going to create a dataframe using our dataset of Moab log from lesson 2.

Here are the informations regarding this dataset:  
- path: '/scratch/formation/spark/data/moab/raw/*';
- the csv does not have a column header;
- each field is separated by a space;
- we would like Spark to infer the schema.

#### Exercise 10

Based on what you learned in 4.1 and the information above, create a new dataframe.

#### Exercise 11

Look at the schema inferred by Spark, does it correspond to the one we declared in lesson 2? 

Often, the data source will be malformed somehow. It will either include corrupted entries which format is unexpected. This is the case of Moab log data. Some lines do not correspond to job events and therefore do not have the right format. Spark allows through an argument of the csv method to indicate which behaviour we want to adopt with malformed entries.

The argument is named `mode` and can take three values `PERMISSIVE`, `DROPMALFORMED` and `FAILFAST`. We will experiment each behavior ourselves next.

**PERMISSIVE mode**

In [None]:
sqlContext.read.csv(path='/scratch/formation/spark/data/moab/raw/*', 
                    header=False,
                    sep=' ',
                    inferSchema=True,
                    mode='PERMISSIVE').show(n=5)

**DROPMALFORMED mode**

In [None]:
sqlContext.read.csv(path='/scratch/formation/spark/data/moab/raw/*', 
                    header=False,
                    sep=' ',
                    inferSchema=True,
                    mode='DROPMALFORMED').show(n=5)

**FAILFAST mode**

In [None]:
sqlContext.read.csv(path='/scratch/formation/spark/data/moab/raw/*', 
                    header=False,
                    sep=' ',
                    inferSchema=True,
                    mode='FAILFAST').show(n=5)

Based on the format of our data, Spark can sometime have a hardtime inferring the right schema. Furthermore, when our dataset do not have an header line, we will need to specify the column name manually. This can all be done using `StructType`  and `StructField` as we have seen previously.

In the next cell we define the column names and associated to each a column a type. This schema will then be used when calling the `read.csv` function.

In [None]:
columns = ['event_time', 'event_epoch', 'event_type', 'job_id', 'job_event',
           'n_nodes', 'n_cores', 'username', 'walltime',
           'status', 'queue', 'submit_time', 'dispatch_time', 'start_time',
           'end_time', 'project_id']
types = [StringType(), StringType(), StringType(), IntegerType(), StringType(),
         IntegerType(), IntegerType(), StringType(), IntegerType(),
         StringType(), StringType(), IntegerType(), IntegerType(), IntegerType(),
         IntegerType(), StringType()]
schema = StructType([StructField(name, type_) for name, type_ in zip(columns, types)])

Since we know our schema, we will instruct Spark to drop any line that do not fit it. This will have for effect to keep only the job event entries.

In [None]:
df = sqlContext.read.csv(path='/scratch/formation/spark/data/moab/raw/*', 
                    header=False,
                    sep=' ',
                    schema=schema,
                    mode='DROPMALFORMED')

### 5.1 Enhancing our dataset by derivating columns

Once we are guaranteed that each entries in the dataset respect our schema, we can transforms the data and derivate column. Like RDD, Spark's DataFrame are immutable. Therefore, if we wish to add a column or drop one, we will need to create a new DataFrame.

The following cell computes a column containing the core-hours metric per job.

In [None]:
df2 = df.withColumn('core_hours', (df['end_time'] - df['start_time']) / 3600. * df['n_cores'])

In [None]:
df2.show(n=5)

The core-hours metric do not make sense if the job is not completed. We would therefore prefer to only compute it on entries which status is `Completed`. To do this, we will create a Spark *User Defined Function* or *UDF*. We start by defining a function which take each column required by the computation in input and return the core-hours if the status is "completed". Otherwise the function returns None.

In [None]:
def completed_core_hours(start_time, end_time, n_cores, status):
    if status == 'Completed':
        return (end_time - start_time) / 3600. * n_cores
    else:
        return None

We then define the UDF using Spark's function `udf` which takes in argument a function and the type of the objects that will be returned by the function.

In [None]:
u_completed_core_hours = pyspark.sql.functions.udf(completed_core_hours, pyspark.sql.types.DoubleType())

We can then apply this function to each entry and add the result as a new column in a new dataframe.

In [None]:
df2 = df.withColumn('core_hours', u_completed_core_hours(df['start_time'],
                                                         df['end_time'],
                                                         df['n_cores'],
                                                         df['status']))

As we can see in the next cell, only the entries where the job was completed had their core-hours metric computed.

In [None]:
df2.show(n=5)

While easy to use, UDF should be used as a last resort to transform columns. Spark SQL proposes a wide list of functions to transforms columns. Find out more [here](https://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html#module-pyspark.sql.functions).

#### Exercise 12

Compute the total core-hours per PI. A PI is represented by the 7 first characters of `project_id`.

## 6. Ending Spark SQL Analysis

Spark SQL's context do not need to be terminated prior to leaving the notebook.

#### Exercise 13

Terminate the Spark Context.

## 7. Recap

In this notebook, we put in practice and learned about the following parts of 
**[Python Spark SQL API](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html)**:
1. Import Spark SQL Python module: 
**[`import pyspark.sql`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html)**
2. Create a Spark SQLContext:
**[`pyspark.sql.SQLContext()`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext)**
3. Create an RDD from text files:
**[`SparkContext.textFile(path)`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.textFile)**
4. Count the number of elements in a RDD: 
**[`Rdd.count()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.count)**
5. Apply a transformation on each element of a RDD:
**[`RDD.map(func)`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.map)**
6. Take a the first *num* elements from an RDD: 
**[`Rdd.take(num)`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.take)**
7. Structure and type data fields: **[`pyspark.sql.types`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types)**
8. Create a dataframe from an RDD: **[`SQLContext.createDataFrame(RDD)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.createDataFrame)**
9. Print the first *n* rows of a dataframe: **[`Dataframe.show(n)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.show)**
10. Register a dataframe as a temporary table: **[`DataFrame.registerTempTable(name)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.registerTempTable)**
11. Query a context's registered table: **[`SQLContext.sql(name)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.sql)**
12. Use the SQL API of a dataframe: **[`DataFrame`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)**
13. Write a dataframe as Parquet file: **[`DataFrame.write.parquet(path)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.parquet)**
14. Write a dataframe as JSON file: **[`DataFrame.write.json(path)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.json)**
15. Read a dataframe from a Parquet file: **[`DataFrame.read.parquet(path)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.parquet)**  
16. Read a dataframe from a CSV file: **[`DataFrame.read.csv(path)`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv)**
10. End the SparkContext:
**[`SparkContext.stop()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.stop)**

## 8. References

* [Berkeley AmpCamp 5 - Data Exploration Using Spark SQL](http://ampcamp.berkeley.edu/5/exercises/data-exploration-using-spark-sql.html)
* [edX - Introduction to Big Data with Apache Spark](https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x)
* [edX - Introduction to Big Data with Apache Spark (Github repo)](https://github.com/spark-mooc/mooc-setup)