# Spark DataFrames

- Enable wider audiences beyond “Big Data” engineers to leverage the power of distributed processing
- Inspired by data frames in R and Python (Pandas)
- Designed from the ground up to support modern big data and data science applications
- Extension to the existing RDD API

## References
- [Spark SQL, DataFrames and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html)
- [Introduction to DataFrames - Python](https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html)
- [PySpark Cheat Sheet: Spark DataFrames in Python](https://www.datacamp.com/community/blog/pyspark-sql-cheat-sheet)

### DataFrames are:
- The preferred abstraction in Spark
- Distributed collections of rows organized into named, typed columns (a schema)
- Built on Resilient Distributed Datasets (RDDs)
- Immutable once constructed

### With DataFrames you can:
- Track lineage information to efficiently recompute lost data
- Operate on collections of elements in parallel

### You can construct DataFrames:
- by parallelizing existing collections (e.g., Pandas DataFrames)
- by transforming an existing DataFrame
- from files in HDFS or any other storage system (e.g., Parquet)

### Features
- Ability to scale from kilobytes of data on a single laptop to petabytes on a large cluster
- Support for a wide array of data formats and storage systems
- Integration with the wider big data tooling and infrastructure through Spark
- APIs for Python, Java, Scala, and R

### DataFrames versus RDDs
- Nice API for new users familiar with data frames in other programming languages.
- For existing Spark users, the API will make Spark easier to program than using RDDs
- For both sets of users, DataFrames will improve performance through intelligent optimizations and code-generation

## PySpark Shell

**Run the Spark shell:**

~~~ bash
pyspark
~~~

Output similar to the following will be displayed, followed by a `>>>` REPL prompt:

~~~
Python 3.6.5 |Anaconda, Inc.| (default, Apr 29 2018, 16:14:56)
[GCC 7.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
2018-09-18 17:13:13 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Using Python version 3.6.5 (default, Apr 29 2018 16:14:56)
SparkSession available as 'spark'.
>>>
~~~

Read the data into a DataFrame:

~~~ py
df = spark.read.csv("data/AnaDo_JeuDonnees_TemperatFrance.csv", sep=';', header=True)
~~~

~~~
>>> df.show()
+-----------+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+----+------+
|        _c0|Janv|Fevr|Mars|Avri| Mai|Juin|juil|Aout|Sept|Octo|Nove|Dece| Lati| Long| Moye|Ampl|Region|
+-----------+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+----+------+
|   Bordeaux| 5.6| 6.6|10.3|12.8|15.8|19.3|20.9|  21|18.6|13.8| 9.1| 6.2| 44.5|-0.34|13.33|15.4|    SO|
|      Brest| 6.1| 5.8| 7.8| 9.2|11.6|14.4|15.6|  16|14.7|  12|   9|   7|48.24|-4.29|10.77|10.2|    NO|
|   Clermont| 2.6| 3.7| 7.5|10.3|13.8|17.3|19.4|19.1|16.2|11.2| 6.6| 3.6|45.47| 3.05|10.94|16.8|    SE|
|   Grenoble| 1.5| 3.2| 7.7|10.6|14.5|17.8|20.1|19.5|16.7|11.4| 6.5| 2.3| 45.1| 5.43|10.98|18.6|    SE|
|      Lille| 2.4| 2.9|   6| 8.9|12.4|15.3|17.1|17.1|14.7|10.4| 6.1| 3.5|50.38| 3.04| 9.73|14.7|    NE|
|       Lyon| 2.1| 3.3| 7.7|10.9|14.9|18.5|20.7|20.1|16.9|11.4| 6.7| 3.1|45.45| 4.51|11.36|18.6|    SE|

only showing top 20 rows
~~~

## Transformations, Actions, Laziness

Like RDDs, DataFrames are lazy. Transformations contribute to the query plan, but they don't execute anything.
Actions cause the execution of the query.

### Transformation examples
- filter
- select
- drop
- intersect 
- join
### Action examples
- count 
- collect 
- show 
- head
- take

## Creating a DataFrame in Python

In [None]:
!pip install pyspark


In [2]:
from pyspark import SparkContext, SparkConf, SQLContext
# The following three lines are not needed in the pyspark shell,
# which already creates `sc` and `sqlContext` (plus `spark`)
conf = SparkConf().setAppName("people").setMaster("local[*]") 
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

In [4]:
df = sqlContext.read.json("../data/employees.json")

df.show(5)

+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+



## Schema Inference

In this exercise, let's explore schema inference. We're going to use a file called `AnaDo_JeuDonnees_TemperatFrance.csv`. The data is structured, but unlike JSON it carries no self-describing schema: nothing in a CSV file records column types, so by default Spark reads every field as a string (the CSV reader can guess types with `inferSchema=True`, at the cost of an extra pass over the data). Let's create an RDD and look at the first few rows of the file.

In [5]:
# Read the file as an RDD of text lines through the SparkContext
rdd = sc.textFile("../data/AnaDo_JeuDonnees_TemperatFrance.csv")
for line in rdd.take(10):
  print(line)

;Janv;F�vr;Mars;Avri;Mai;Juin;juil;Ao�t;Sept;Octo;Nove;D�ce;Lati;Long;Moye;Ampl;R�gion
Bordeaux;5.6;6.6;10.3;12.8;15.8;19.3;20.9;21;18.6;13.8;9.1;6.2;44.5;-0.34;13.33;15.4;SO
Brest;6.1;5.8;7.8;9.2;11.6;14.4;15.6;16;14.7;12;9;7;48.24;-4.29;10.77;10.2;NO
Clermont;2.6;3.7;7.5;10.3;13.8;17.3;19.4;19.1;16.2;11.2;6.6;3.6;45.47;3.05;10.94;16.8;SE
Grenoble;1.5;3.2;7.7;10.6;14.5;17.8;20.1;19.5;16.7;11.4;6.5;2.3;45.1;5.43;10.98;18.6;SE
Lille;2.4;2.9;6;8.9;12.4;15.3;17.1;17.1;14.7;10.4;6.1;3.5;50.38;3.04;9.73;14.7;NE
Lyon;2.1;3.3;7.7;10.9;14.9;18.5;20.7;20.1;16.9;11.4;6.7;3.1;45.45;4.51;11.36;18.6;SE
Marseille;5.5;6.6;10;13;16.8;20.8;23.3;22.8;19.9;15;10.2;6.9;43.18;5.24;14.23;17.8;SE
Montpellier;5.6;6.7;9.9;12.8;16.2;20.1;22.7;22.3;19.3;14.6;10;6.5;43.36;3.53;13.89;17.1;SE
Nantes;5;5.3;8.4;10.8;13.9;17.2;18.8;18.6;16.4;12.2;8.2;5.5;47.13;-1.33;11.69;13.8;NO


## Hands-on Exercises

You can consult the [DataFrames API documentation](http://spark.apache.org/docs/2.3.1/api/python/index.html).

Let's take a look at the file `AnaDo_JeuDonnees_TemperatFrance.csv`. Each line contains the same information about a city:

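* city
* tempJan - tempDec (monthly mean temperatures)
* latitude
* longitude
* moyenne (annual mean temperature, the mean of the twelve monthly values)
* ampl (annual range: warmest month minus coldest month)
* region (NE, NO = north-west, SE, SO = south-west)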

In [7]:
from collections import namedtuple

rdd = sc.textFile("../data/AnaDo_JeuDonnees_TemperatFrance.csv")

City = namedtuple('City', ['Name', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jui', 'Aug',
                           'Sep', 'Oct', 'Nov', 'Dec', 'Lat', 'Long',
                           'Moy', 'Ampl', 'Region'])

def map_to_city(line):
  cols = line.split(";")
  return City(Name       = cols[0],
              Jan        = cols[1],
              Feb        = cols[2],
              Mar        = cols[3],
              Apr        = cols[4],
              May        = cols[5],
              Jun        = cols[6],
              Jui        = cols[7],
              Aug        = cols[8],
              Sep        = cols[9],
              Oct        = cols[10],
              Nov        = cols[11],
              Dec        = cols[12],
              Lat        = cols[13],
              Long       = cols[14],
              Moy        = cols[15], 
              Ampl       = cols[16],
              Region     = cols[17])
    
city_rdd = rdd.map(map_to_city)
df = city_rdd.toDF()

In [8]:
df.show()

+-----------+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+----+------+
|       Name| Jan| Feb| Mar| Apr| May| Jun| Jui| Aug| Sep| Oct| Nov| Dec|  Lat| Long|  Moy|Ampl|Region|
+-----------+----+----+----+----+----+----+----+----+----+----+----+----+-----+-----+-----+----+------+
|           |Janv|F�vr|Mars|Avri| Mai|Juin|juil|Ao�t|Sept|Octo|Nove|D�ce| Lati| Long| Moye|Ampl|R�gion|
|   Bordeaux| 5.6| 6.6|10.3|12.8|15.8|19.3|20.9|  21|18.6|13.8| 9.1| 6.2| 44.5|-0.34|13.33|15.4|    SO|
|      Brest| 6.1| 5.8| 7.8| 9.2|11.6|14.4|15.6|  16|14.7|  12|   9|   7|48.24|-4.29|10.77|10.2|    NO|
|   Clermont| 2.6| 3.7| 7.5|10.3|13.8|17.3|19.4|19.1|16.2|11.2| 6.6| 3.6|45.47| 3.05|10.94|16.8|    SE|
|   Grenoble| 1.5| 3.2| 7.7|10.6|14.5|17.8|20.1|19.5|16.7|11.4| 6.5| 2.3| 45.1| 5.43|10.98|18.6|    SE|
|      Lille| 2.4| 2.9|   6| 8.9|12.4|15.3|17.1|17.1|14.7|10.4| 6.1| 3.5|50.38| 3.04| 9.73|14.7|    NE|
|       Lyon| 2.1| 3.3| 7.7|10.9|14.9|18.5|20.7|20.1|16.9|11.4| 

### Schema

In [9]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Jan: string (nullable = true)
 |-- Feb: string (nullable = true)
 |-- Mar: string (nullable = true)
 |-- Apr: string (nullable = true)
 |-- May: string (nullable = true)
 |-- Jun: string (nullable = true)
 |-- Jui: string (nullable = true)
 |-- Aug: string (nullable = true)
 |-- Sep: string (nullable = true)
 |-- Oct: string (nullable = true)
 |-- Nov: string (nullable = true)
 |-- Dec: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Moy: string (nullable = true)
 |-- Ampl: string (nullable = true)
 |-- Region: string (nullable = true)



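### display

`display()` renders a DataFrame as an interactive table in Databricks notebooks; in a plain Jupyter notebook it only shows the DataFrame's column names and types, as below. Use `df.show()` to print the rows.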

In [10]:
display(df)

DataFrame[Name: string, Jan: string, Feb: string, Mar: string, Apr: string, May: string, Jun: string, Jui: string, Aug: string, Sep: string, Oct: string, Nov: string, Dec: string, Lat: string, Long: string, Moy: string, Ampl: string, Region: string]

### select

In [11]:
df.select(df["Name"], df["Moy"], df["Region"])

DataFrame[Name: string, Moy: string, Region: string]

In [12]:
df.select(df["Name"], df["Moy"], df["Region"]).show()

+-----------+-----+------+
|       Name|  Moy|Region|
+-----------+-----+------+
|           | Moye|R�gion|
|   Bordeaux|13.33|    SO|
|      Brest|10.77|    NO|
|   Clermont|10.94|    SE|
|   Grenoble|10.98|    SE|
|      Lille| 9.73|    NE|
|       Lyon|11.36|    SE|
|  Marseille|14.23|    SE|
|Montpellier|13.89|    SE|
|     Nantes|11.69|    NO|
|       Nice|14.84|    SE|
|      Paris|11.18|    NE|
|     Rennes|11.13|    NO|
| Strasbourg| 9.72|    NE|
|   Toulouse|12.68|    SO|
|      Vichy|10.72|    SE|
+-----------+-----+------+



### filter

In [13]:
df.filter(df["Region"] == "NE").show()

+----------+---+---+---+----+----+----+----+----+----+----+---+---+-----+----+-----+----+------+
|      Name|Jan|Feb|Mar| Apr| May| Jun| Jui| Aug| Sep| Oct|Nov|Dec|  Lat|Long|  Moy|Ampl|Region|
+----------+---+---+---+----+----+----+----+----+----+----+---+---+-----+----+-----+----+------+
|     Lille|2.4|2.9|  6| 8.9|12.4|15.3|17.1|17.1|14.7|10.4|6.1|3.5|50.38|3.04| 9.73|14.7|    NE|
|     Paris|3.4|4.1|7.6|10.7|14.3|17.5|19.1|18.7|  16|11.4|7.1|4.3|48.52| 2.2|11.18|15.7|    NE|
|Strasbourg|0.4|1.5|5.6| 9.8|  14|17.2|  19|18.3|15.1| 9.5|4.9|1.3|48.35|7.45| 9.72|18.6|    NE|
+----------+---+---+---+----+----+----+----+----+----+----+---+---+-----+----+-----+----+------+



### filter + select

In [14]:
df2 = df.filter(df["Region"] == "NE").select(df['Name'].substr(1,1),df['Moy'])
df2.show()


+---------------------+-----+
|substring(Name, 1, 1)|  Moy|
+---------------------+-----+
|                    L| 9.73|
|                    P|11.18|
|                    S| 9.72|
+---------------------+-----+




### orderBy

In [15]:
(df.filter(df["Region"] == "NE")
   .select(df['Name'],df['Moy'])
   .orderBy("Moy")).show()

+----------+-----+
|      Name|  Moy|
+----------+-----+
|     Paris|11.18|
|Strasbourg| 9.72|
|     Lille| 9.73|
+----------+-----+



### groupBy

In [16]:
df.groupby(df["Region"])

<pyspark.sql.group.GroupedData at 0x7fd514bb1780>

In [17]:
df.groupby(df["Region"]).count().show()

+------+-----+
|Region|count|
+------+-----+
|    SO|    2|
|    NE|    3|
|R�gion|    1|
|    SE|    7|
|    NO|    3|
+------+-----+



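WARNING: Don't confuse `GroupedData.count()` with `DataFrame.count()`. `GroupedData.count()` is a transformation: it returns a new DataFrame with one row per group and triggers no execution. `DataFrame.count()` is an action: it runs a job and returns the number of rows as a Python `int`.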

In [18]:
df.filter(df["Region"] == "NE").count()

3

In [19]:
df.filter(df["Region"] == "NE").select("name").show()

+----------+
|      name|
+----------+
|     Lille|
|     Paris|
|Strasbourg|
+----------+



In [20]:
df.groupBy(df["Region"]).count().show()

+------+-----+
|Region|count|
+------+-----+
|    SO|    2|
|    NE|    3|
|R�gion|    1|
|    SE|    7|
|    NO|    3|
+------+-----+



In [21]:
df.select(df['Name'],df['Moy']).show()

+-----------+-----+
|       Name|  Moy|
+-----------+-----+
|           | Moye|
|   Bordeaux|13.33|
|      Brest|10.77|
|   Clermont|10.94|
|   Grenoble|10.98|
|      Lille| 9.73|
|       Lyon|11.36|
|  Marseille|14.23|
|Montpellier|13.89|
|     Nantes|11.69|
|       Nice|14.84|
|      Paris|11.18|
|     Rennes|11.13|
| Strasbourg| 9.72|
|   Toulouse|12.68|
|      Vichy|10.72|
+-----------+-----+



### Exercises

- How many cities are in the south-east (SE) region?
- How many cities have a mean temperature higher than 10 degrees?
- How many cities are in the southern regions (south-east + south-west)?
- What is the ratio of north-east cities to all cities?
- Which region contains the most cities?
- List the number of cities for each first letter of the city name.

In [None]:
sc.stop()