
## DataFrames and Spark SQL




In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=2930b20e1b6155f017247e8b563f8c79ac32b395dd4d5f52e25cfd8374a130a0
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [3]:
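# Create the entry points to Spark, stopping any SparkContext left over from a previous run
try:
    sc.stop()
except NameError:
    # No SparkContext has been created yet in this session
    pass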

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .appName("Spark SQL") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

sc = spark.sparkContext
sc

### Generate your own DataFrame
Instead of accessing the file system, let's create a DataFrame by generating the data. In this case, we'll first create the `stringJSONRDD` RDD and then convert it into a DataFrame by reading it with `spark.read.json`.

In [4]:
# Generate our own JSON data

stringJSONRDD = sc.parallelize(("""
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""",
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)

In [5]:
# Create DataFrame
swimmersJSON = spark.read.json(stringJSONRDD)

In [6]:
swimmersJSON

DataFrame[age: bigint, eyeColor: string, id: string, name: string]

In [7]:
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



DataFrame.createOrReplaceTempView(name: str) → None

Creates or replaces a local temporary view with this DataFrame.

The lifetime of this temporary view is tied to the `SparkSession` that was used to create this DataFrame.

In [8]:
# Create temporary table

swimmersJSON.createOrReplaceTempView("swimmersJSON")

In [9]:
# DataFrame API
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [None]:
# SQL Query
spark.sql("select * from swimmersJSON").collect()

[Row(age=19, eyeColor='brown', id='123', name='Katie'),
 Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=23, eyeColor='blue', id='345', name='Simone')]

#### Inferring the Schema Using Reflection
Note that Apache Spark infers the schema using reflection; i.e., it automatically determines the schema by inspecting the JSON data.

In [None]:
# Print the schema
swimmersJSON.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



Notice that Spark was able to infer the schema (displayed here with `.printSchema()`).

But what if we want to programmatically specify the schema?

#### Programmatically Specifying the Schema
In this case, let's specify the schema for `CSV`-style rows that we generate in memory.

In [None]:
from pyspark.sql.types import StructType, StructField, LongType, StringType

# Generate our own CSV data
#   This way we don't have to access the file system yet.
stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')])

# Define the schema with StructType: each StructField gives a column name,
# a type from pyspark.sql.types, and whether the column is nullable
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

# Apply the schema to the RDD and Create DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)

# Creates a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")

In [None]:
# Print the schema
#   Notice that we have redefined id as Long (instead of String)
swimmers.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



In [None]:
# SQL Query
spark.sql("select * from swimmers").collect()

[Row(id=123, name='Katie', age=19, eyeColor='brown'),
 Row(id=234, name='Michael', age=22, eyeColor='green'),
 Row(id=345, name='Simone', age=23, eyeColor='blue')]

As you can see from above, we can programmatically apply the `schema` instead of allowing the Spark engine to infer the schema via reflection.

Additional Resources include:
* [PySpark API Reference](https://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html)
* [Spark SQL, DataFrames, and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema): This is in reference to Programmatically Specifying the Schema using a `CSV` file.

#### SparkSession

Notice that we're no longer using `sqlContext.read...` but instead `spark.read...`. As of Spark 2.0, `SparkSession` is the unified entry point: it subsumes `SQLContext` and `HiveContext` and exposes the underlying `SparkContext` as `spark.sparkContext` (`StreamingContext` remains a separate class). The session `spark` serves as:
* Entry point for reading data
* Working with metadata
* Configuration
* Cluster resource management

For more information, please refer to [How to use SparkSession in Apache Spark](https://sparkbyexamples.com/spark/sparksession-explained-with-examples/).

### Querying with the DataFrame API
With DataFrames, you can start writing your queries using the DataFrame API:

In [None]:
# Show the values
swimmers.show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



In [None]:
# `display` renders a rich table on Databricks; elsewhere it falls back to
# IPython's display and only prints the DataFrame summary shown below
display(swimmers)

DataFrame[id: bigint, name: string, age: bigint, eyeColor: string]

In [None]:
# Get count of rows
swimmers.count()

3

In [None]:
# Get the id, age where age = 22
swimmers.select("id", "age").filter("age = 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [None]:
# Get the name, eyeColor where eyeColor like 'b%'
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



### Querying with SQL
With DataFrames, you can start writing your queries using `Spark SQL` - a SQL dialect that is compatible with the Hive Query Language (or HiveQL).

In [None]:
# Execute SQL Query and return the data
spark.sql("select * from swimmers").show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



Let's get the row count:

In [None]:
# Get count of rows in SQL
spark.sql("select count(1) from swimmers").show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



In [None]:
# Query id and age for swimmers with age = 22 via DataFrame API
swimmers.select("id", "age").filter("age = 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [None]:
# Query id and age for swimmers with age = 22 via DataFrame API in another way
swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 22).show()


+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [None]:
# Query id and age for swimmers with age = 22 in SQL
spark.sql("select id, age from swimmers where age = 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [None]:
# Query name and eye color for swimmers with eye color starting with the letter 'b'
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



## Application:

Query flight departure delays by state and city by joining the departure delays dataset with the airport codes dataset (to identify the state and city).

* On-Time Performance Datasets

The source `airports` dataset can be found at [OpenFlights Airport, airline and route data](https://openflights.org/data.php).

The `flights`, also known as the `departuredelays`, dataset can be found at [Airline On-Time Performance and Causes of Flight Delays: On_Time Data](https://catalog.data.gov/dataset/airline-on-time-performance-and-causes-of-flight-delays-on-time-data)

1- Read the datasets departuredelays.csv and airport-codes.txt into Spark DataFrames.

2- Display each DataFrame with .show(), cache it with .cache(), and print its schema.

3- Create a local temporary view for each DataFrame.

4- Answer the queries below:

* Query Sum of Flight Delays by City and Origin Code (for Washington State)
* Query Sum of Flight Delays by State (for the US)
* Add 2 more analysis axes of your choice

In [12]:
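# Create (or reuse) the SparkSession for the flight-delay exercise
spark = SparkSession \
        .builder \
        .appName("Spark_Departure_flights") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

sc = spark.sparkContext
sc

# Paths to the two input files (expected in the working directory)
departures_path = "departuredelays.csv"
airports_path = "airport-codes.txt"

# No schema is supplied, so every column is read as a string
depart = spark.read.csv(departures_path, header=True)
# The airport codes file is tab-separated
airport_codes = spark.read.csv(airports_path, header=True, sep="\t")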

depart.createOrReplaceTempView("departures")
airport_codes.createOrReplaceTempView("airports")

depart.show()
airport_codes.show()


+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [13]:
depart.printSchema()
airport_codes.printSchema()

root
 |-- date: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- IATA: string (nullable = true)



In [20]:
depart.cache()
#airport_codes.cache()

DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

In [54]:
airport_codes.select("City", "State").filter(airport_codes.State == "NM").show()

+-----------+-----+
|       City|State|
+-----------+-----+
|Albuquerque|   NM|
|   Carlsbad|   NM|
|     Clovis|   NM|
| Farmington|   NM|
|      Hobbs|   NM|
| Las Cruces|   NM|
|    Roswell|   NM|
|   Santa Fe|   NM|
|Silver City|   NM|
+-----------+-----+



In [49]:
depart.select("origin", "delay").filter(depart.origin == "ABE").show()

+------+-----+
|origin|delay|
+------+-----+
|   ABE|    6|
|   ABE|   -8|
|   ABE|   -2|
|   ABE|   -4|
|   ABE|   -4|
|   ABE|    0|
|   ABE|   10|
|   ABE|   28|
|   ABE|   88|
|   ABE|    9|
|   ABE|   -6|
|   ABE|   69|
|   ABE|    0|
|   ABE|   -3|
|   ABE|    0|
|   ABE|    0|
|   ABE|    0|
|   ABE|    0|
|   ABE|    0|
|   ABE|    0|
+------+-----+
only showing top 20 rows



In [34]:
spark.sql("""SELECT origin, ROUND(AVG(delay), 2) AS avg_delay
FROM departures
GROUP BY origin
ORDER BY avg_delay DESC
""").show()

+------+---------+
|origin|avg_delay|
+------+---------+
|   GUM|    33.88|
|   LSE|    26.53|
|   MQT|    23.87|
|   EGE|    20.57|
|   ROA|    19.89|
|   MDW|    19.66|
|   BTV|    18.72|
|   ORD|    18.59|
|   IAD|     18.4|
|   SCE|    17.92|
|   GUC|    17.73|
|   ISP|     17.7|
|   ALO|    17.16|
|   LNK|    17.02|
|   DEN|    16.92|
|   BWI|    16.83|
|   CID|    16.58|
|   PBI|    16.56|
|   FLL|    16.51|
|   JFK|    16.46|
+------+---------+
only showing top 20 rows



In [55]:
result_df = spark.sql("""
SELECT a.City, d.origin, SUM(d.delay) AS total_delay
FROM departures d
JOIN airports a ON d.origin = a.IATA
WHERE a.State = 'WA'
GROUP BY a.City, d.origin
ORDER BY total_delay DESC
""")

result_df.printSchema()

root
 |-- City: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- total_delay: double (nullable = true)



In [42]:
spark.sql("""
SELECT a.City, d.origin, SUM(d.delay) AS total_delay
FROM departures d
JOIN airports a ON d.origin = a.IATA
WHERE a.State = 'WA'
GROUP BY a.City, d.origin
ORDER BY total_delay DESC
""").show()

+-------+------+-----------+
|   City|origin|total_delay|
+-------+------+-----------+
|Seattle|   SEA|   159086.0|
|Spokane|   GEG|    12404.0|
|  Pasco|   PSC|      949.0|
+-------+------+-----------+



# Useful Resources:

For more information, please refer to:
* [Spark SQL, DataFrames and Datasets Guide](http://spark.apache.org/docs/latest/sql-programming-guide.html#sql)
* [PySpark SQL Module: DataFrame](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)
* [PySpark SQL Functions Module](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html)