# Structured Data Analysis with Spark SQL

To avoid constantly manipulating datasets as raw text, it can be useful to structure our data. Spark SQL lets us structure a dataset by defining a schema.

- Spark SQL can also connect to an external database, for example a PostgreSQL database.
- DataFrames deliver the same level of performance regardless of the language used, whereas classic RDDs usually perform better in Java or Scala than in Python.

## Table of Contents

1. [Initialization](#1.-Initialization)  
  1.1 [Spark](#1.1-Spark)  
  1.2 [Spark SQL](#1.2-Spark-SQL)
2. [Dataframe](#2.-Dataframe)  
  2.1 [Reading Data](#2.1-Reading-Data)  
  2.2 [Structuring Data](#2.2-Structuring-Data)  
  2.3 [Creating a Dataframe](#2.3-Creating-a-Dataframe)  
  2.4 [Registering a Table](#2.4-Registering-a-Table)  
  2.5 [Querying Data](#2.5-Querying-Data)  
  2.6 [Aggregating Results](#2.6-Aggregating-Results)
3. [Writing Results to Disk](#3.-Writing-Results-to-Disk)
4. [Ending Spark SQL Analysis](#4.-Ending-Spark-SQL-Analysis)
5. [Recap](#5.-Recap)
6. [References](#References)

## List of Exercises

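1. [Exercise 1: Initialize Spark](#Exercise-1)
2. [Exercise 2: Create an RDD](#Exercise-2)
3. [Exercise 3: Count the elements](#Exercise-3)
4. [Exercise 4: Split the fields](#Exercise-4)
5. [Exercise 5: Display the first elements](#Exercise-5)
6. [Exercise 6: Convert the numeric fields](#Exercise-6)
7. [Exercise 7: Create a DataFrame](#Exercise-7)
8. [Exercise 8: Display the query result](#Exercise-8)
9. [Exercise 9: Aggregate with RDD operations](#Exercise-9)
10. [Exercise 10: Stop the Spark context](#Exercise-10)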

## 1. Initialization

### 1.1 Spark
#### Exercise 1

Import the necessary Python module(s) and create a Spark context. 

**Warning**: check whether a context already exists and handle possible exceptions.

### 1.2 Spark SQL

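We can now import, from the Spark SQL module `pyspark.sql`, the components we need to analyze structured data.
* `SQLContext`: Main entry point for Spark SQL functionality. We use it to create DataFrames. (Since Spark 2.0, `SparkSession` replaces `SQLContext` as the entry point; `SQLContext` is kept for backward compatibility.)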

In [None]:
from pyspark.sql import SQLContext

# Create the Spark SQL context from the existing Spark context `sc`
sqlCtx = SQLContext(sc)

## 2. Dataframe

Textbook definition:    

> A data frame is a table, or two-dimensional array-like structure, in which each column contains measurements on one variable, and each row contains one case.

In Spark, a dataframe is a distributed collection of data grouped into named columns. It is equivalent to a relational table.

### 2.1 Reading Data

#### Exercise 2

Create an RDD from the dataset we used previously, `data/pagecounts`.

In [None]:
pagecounts = sc.textFile('data/pagecounts')

#### Exercise 3

Count the number of elements in the RDD.

#### Exercise 4

Transform the previous RDD into a second one in which the whitespace-separated fields of each line become the elements of a list.

In [None]:
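# Split each line on runs of whitespace; a lambda works whether lines are str or unicode
pgsplit = pagecounts.map(lambda line: line.split())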

#### Exercise 5

To validate the transformation, display the first 8 elements of the RDD.

In [None]:
pgsplit.take(8)

#### Exercise 6

As you can see, the third and fourth fields are numbers represented as text. Transform the RDD in order to convert these strings to `int`.

In [None]:
pgsplit = pgsplit.map(lambda x: (x[0], x[1], int(x[2]), int(x[3])))
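The one-line lambda above assumes that every line has exactly four fields and that the last two are numeric. If a line were malformed, `int()` would raise a `ValueError` (or the indexing an `IndexError`) and the Spark job would fail. A more defensive variant, sketched below under that assumption, moves the parsing into a named function that can be tested locally in plain Python before it is passed to `map`. The name `parse_line` and the sample lines are hypothetical and do not come from `data/pagecounts`.

```python
def parse_line(line):
    """Hypothetical helper: parse one pagecounts line into (lang, name, request, size).

    Returns None when the line does not have four fields or when the last two
    fields are not integers, so bad lines can be filtered out instead of
    crashing the job.
    """
    fields = line.split()  # split on runs of whitespace
    if len(fields) != 4:
        return None
    lang, name, request, size = fields
    try:
        return (lang, name, int(request), int(size))
    except ValueError:
        return None


# Local check on hypothetical sample lines.
samples = ["es Madrid 12 34567", "en Main_Page 5 1024", "broken line", "fr Paris n/a 10"]
records = [r for r in map(parse_line, samples) if r is not None]
print(records)

# With Spark, the same function would replace both the split and the conversion steps:
# pgsplit = pagecounts.map(parse_line).filter(lambda r: r is not None)
```

The resulting elements have the same `(lang, name, request, size)` shape as those produced by the cell above, so the rest of the notebook is unchanged.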

### 2.2 Structuring Data

Our original dataset is plain text. We now want to give it a structure, that is, define a name and a type for each field.

To define our structure, we use the Spark SQL data types defined in [`pyspark.sql.types`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types). We import only the two types we need: `LongType` and `StringType`.

In [None]:
from pyspark.sql.types import LongType, StringType

Spark SQL also provides two types to define the structure of a DataFrame:
- `StructType`: Data type representing a row of a dataframe.
- `StructField`: Data type representing a field of a row. It is mainly defined by a name and a type.

In [None]:
from pyspark.sql.types import StructType, StructField

Using all these classes, we can define our data schema. The order of the fields in the list must match the order of the values in each record.

In [None]:
schema = StructType([StructField('lang',    StringType()), 
                     StructField('name',    StringType()), 
                     StructField('request', LongType()), 
                     StructField('size',    LongType())])
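For tuples, Spark pairs values with `StructField`s by position, not by name: the first value gets the first field, and so on. The sketch below is a plain-Python model of that positional matching, written only to show why the order matters. It is not Spark's own code. `SCHEMA` and `to_named_record` are hypothetical names, and Python's `int` and `str` stand in for `LongType` and `StringType`.

```python
# Plain-Python model of positional schema matching (not Spark's implementation).
# Each (name, type) pair mirrors one StructField of the schema above;
# int stands in for LongType and str for StringType.
SCHEMA = [("lang", str), ("name", str), ("request", int), ("size", int)]


def to_named_record(values, schema=SCHEMA):
    """Attach field names to a positional tuple, checking arity and types."""
    if len(values) != len(schema):
        raise ValueError(f"expected {len(schema)} fields, got {len(values)}")
    record = {}
    for (field, expected_type), value in zip(schema, values):
        if not isinstance(value, expected_type):
            raise TypeError(
                f"field {field!r}: expected {expected_type.__name__}, "
                f"got {type(value).__name__}"
            )
        record[field] = value
    return record


print(to_named_record(("es", "Madrid", 12, 34567)))
# A tuple whose values are in the wrong order is rejected, e.g.
# to_named_record((12, "es", "Madrid", 34567)) raises TypeError.
```

A type check only catches some ordering mistakes. If two values of the same type are swapped, such as `lang` and `name`, they pass the check and are silently mislabeled. This is why the schema must follow the order of the data exactly.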

### 2.3 Creating a Dataframe

To create a DataFrame, we invoke the `createDataFrame()` method of our Spark SQL context and pass it an RDD and our data structure (or schema).

#### Exercise 7

In the following cell, pass the proper RDD for `data/pagecounts` (the RDD of converted tuples) and the schema to `createDataFrame()`, then run it.

In [None]:
dfPageCounts = sqlCtx.createDataFrame(pgsplit, schema)

We can then manipulate the dataframe using its [`pyspark.sql.DataFrame` API](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame).

We can, for example, display the first rows of the DataFrame (20 by default) as an ASCII table.

In [None]:
dfPageCounts.show()

A DataFrame can also be cached when we plan to run several queries on it. Caching is lazy: the data is kept in memory the first time an action computes it.

In [None]:
dfPageCounts.cache()

### 2.4 Registering a Table

To query our DataFrame with SQL, we need to register it as a table in the Spark SQL context. (Since Spark 2.0, `registerTempTable` is deprecated in favor of `createOrReplaceTempView`.)

In [None]:
dfPageCounts.registerTempTable("page_table")

### 2.5 Querying Data

We can now query our data using the Structured Query Language (SQL). The following query retrieves the 10 most requested pages in Spanish.

In [None]:
df = sqlCtx.sql("SELECT name, request "
                "FROM page_table "
                "WHERE lang='es' "
                "ORDER BY request DESC "
                "LIMIT 10")

A query on a dataframe is a Spark transformation. Therefore, to compute the result, we need to call an action. 

#### Exercise 8 

Call the right method to display the dataframe resulting from the preceding query.

In [None]:
df.show()

### 2.5.1 SQL 101

We can decompose the preceding query in keywords:

#### SELECT

Indicates which columns (variables) to retrieve. The column names were defined when we structured our data in [section 2.2](#2.2-Structuring-Data).

#### FROM

Indicates the data source. The table name was defined in [section 2.4](#2.4-Registering-a-Table).

#### WHERE

Filters the rows using predicates on the columns (here, `lang='es'`).

#### ORDER BY [...] DESC

Sorts the resulting rows by the given column. `DESC` requests descending order; the default, `ASC`, is ascending.

#### LIMIT N 

Returns at most N rows.
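These clauses are standard SQL, so you can try the statement without a cluster. The sketch below runs the same query in SQLite through Python's built-in `sqlite3` module on a few hypothetical rows. It uses `LIMIT 2` instead of `LIMIT 10` so the effect of the limit is visible. SQLite is only a stand-in here: it is not Spark SQL, and the two dialects diverge on more advanced features.

```python
import sqlite3

# In-memory SQLite database, used only to experiment with the SQL clauses.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE page_table (lang TEXT, name TEXT, request INTEGER, size INTEGER)")

# Hypothetical rows, not taken from data/pagecounts.
rows = [
    ("es", "Madrid", 12, 34567),
    ("es", "Barcelona", 40, 80000),
    ("en", "Main_Page", 500, 1024),
    ("es", "Sevilla", 7, 2048),
]
conn.executemany("INSERT INTO page_table VALUES (?, ?, ?, ?)", rows)

query = ("SELECT name, request "   # SELECT: columns to return
         "FROM page_table "        # FROM: data source
         "WHERE lang='es' "        # WHERE: keep Spanish pages only
         "ORDER BY request DESC "  # ORDER BY ... DESC: most requested first
         "LIMIT 2")                # LIMIT: at most 2 rows
result = conn.execute(query).fetchall()
conn.close()

print(result)  # [('Barcelona', 40), ('Madrid', 12)]
```

The English row is removed by `WHERE`, the Spanish rows are sorted by `ORDER BY`, and `LIMIT` drops `Sevilla`, the third one.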

### 2.5.2 SQL as an API
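
The same kind of query can also be expressed with the DataFrame API by chaining `where()`, `select()`, `orderBy()`, and `limit()`, without writing an SQL string. Each method returns a new DataFrame, so nothing is computed until `show()` triggers the action. The following cell retrieves the 10 most requested pages in English.
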
In [None]:
dfPageCounts.where("lang = 'en'")\
            .select("name", "request")\
            .orderBy("request", ascending=False)\
            .limit(10).show()

### 2.6 Aggregating Results

Notice, however, that some pages appear several times in our ranking. This is because we did not add up the number of requests for the same page. We need an aggregation with `GROUP BY` and `SUM`.

In [None]:
# Sum the requests of each page so that a page spread over several rows appears once
sqlCtx.sql("SELECT name, SUM(request) AS total_requests "
           "FROM page_table "
           "WHERE request>=100 "
           "AND lang='es' "
           "GROUP BY name "
           "ORDER BY total_requests DESC "
           "LIMIT 10").collect()

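The sketch below shows what `GROUP BY` and `SUM` change. It runs the filtered query with and without aggregation, summing the `request` column, on hypothetical rows in which the page `Madrid` appears twice. SQLite, through Python's built-in `sqlite3` module, serves only as a stand-in for Spark SQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE page_table (lang TEXT, name TEXT, request INTEGER, size INTEGER)")

# Hypothetical rows: 'Madrid' appears on two rows, as a page can in the real dataset.
rows = [
    ("es", "Madrid", 150, 1000),
    ("es", "Madrid", 120, 900),
    ("es", "Barcelona", 200, 5000),
    ("es", "Sevilla", 50, 100),
    ("en", "Madrid", 300, 10),
]
conn.executemany("INSERT INTO page_table VALUES (?, ?, ?, ?)", rows)

# Without aggregation: one output row per input row, so 'Madrid' is listed twice.
plain = conn.execute(
    "SELECT name, request FROM page_table "
    "WHERE request>=100 AND lang='es' ORDER BY request DESC"
).fetchall()

# With GROUP BY and SUM: one output row per page, with its requests added up.
grouped = conn.execute(
    "SELECT name, SUM(request) AS total_requests FROM page_table "
    "WHERE request>=100 AND lang='es' "
    "GROUP BY name ORDER BY total_requests DESC LIMIT 10"
).fetchall()
conn.close()

print(plain)    # [('Barcelona', 200), ('Madrid', 150), ('Madrid', 120)]
print(grouped)  # [('Madrid', 270), ('Barcelona', 200)]
```

`WHERE` is applied before the grouping, so `Sevilla` (50 requests) is excluded before any sum is computed.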
#### Aggregating Using the API

In [None]:
dfPageCounts.where("lang = 'en'")\
            .select("name", "request")\
            .groupBy('name')\
            .agg({'request' : 'sum'})\
            .orderBy("sum(request)", ascending=False)\
            .limit(10).show()

#### Exercise 9

To convince yourself that Spark SQL simplifies data analysis, write the code that produces the same result as the `GROUP BY` query above, using only the basic RDD transformations (`map`, `filter`, `reduce`, etc.) and actions (`first`, `collect`, `take`).

Use the RDD you created at the beginning of the notebook as a starting point.
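Before you write the RDD version, it can help to spell out the steps that the `GROUP BY` query (Spanish pages, `request >= 100`) performs. The sketch below is a local, pure-Python model of those steps on a list of tuples. It is not Spark code, and `top_pages` and the records are hypothetical. In the RDD version, each step maps onto an RDD method: `filter`, `map`, `reduceByKey` for the per-page sum, and `takeOrdered` (or `sortBy` followed by `take`) for the final ranking.

```python
from collections import defaultdict


def top_pages(records, lang="es", min_request=100, n=10):
    """Pure-Python model of the GROUP BY query on (lang, name, request, size) tuples."""
    # filter: keep rows in the requested language with enough requests (WHERE)
    kept = (r for r in records if r[0] == lang and r[2] >= min_request)
    # map: keep only (name, request) pairs (SELECT name, request)
    pairs = ((r[1], r[2]) for r in kept)
    # reduce by key: add up the requests of each page (GROUP BY name + SUM)
    totals = defaultdict(int)
    for name, request in pairs:
        totals[name] += request
    # sort by descending total and keep the first n (ORDER BY ... DESC LIMIT n)
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:n]


# Hypothetical records, shaped like the elements of the converted RDD.
records = [
    ("es", "Madrid", 150, 1000),
    ("es", "Madrid", 120, 900),
    ("es", "Barcelona", 200, 5000),
    ("es", "Sevilla", 50, 100),
    ("en", "Madrid", 300, 10),
]
print(top_pages(records))  # [('Madrid', 270), ('Barcelona', 200)]
```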

## 3. Writing Results to Disk

### 3.1 Apache Parquet

To avoid restructuring our data every time, we can save it in the [Apache Parquet](https://parquet.apache.org/) format. Parquet stores the schema (column names, types, and order) alongside the data.

In [None]:
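# mode="overwrite" replaces existing output, so the cell can be run more than once
dfPageCounts.write.parquet("data/pagecounts.parquet", mode="overwrite")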

### 3.2 Reading back the Results

We can then easily create a new `DataFrame` by reading our Parquet files.

In [None]:
pagecount_parq = sqlCtx.read.parquet("data/pagecounts.parquet")
pagecount_parq.first()


## 4. Ending Spark SQL Analysis

The Spark SQL context does not need to be terminated before leaving the notebook.

#### Exercise 10

Terminate the Spark Context.

In [None]:
sc.stop()

## 5. Recap

1. Create a new notebook.
2. Create a new Spark context and an SQL context.
3. Create an RDD from the input data `data/pagecounts.parquet`.
4. Transform the RDD into an RDD containing the total size of the pages viewed per language:  
    1. using Spark methods;  
    2. using an SQL query.
5. Limit the content of the RDD to the 3 least popular languages.
6. Display the result.

## References