<a href="https://colab.research.google.com/github/RodrigoSalles/Big-Data-and-Cloud-Computing---Colab/blob/master/BDCC_Spark_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction to Spark SQL

**[Big Data and Cloud Computing](https://www.dcc.fc.up.pt/~edrdo/aulas/bdcc), Eduardo R. B. Marques, DCC/FCUP**


## Summary

This notebook introduces you to the use of SQL within Spark.
Spark lets you convert a __DataFrame__ into a relational table 
abstraction that can be queried using a subset of SQL.

__Reference__: 
  - [DataFrame / pyspark.sql API](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)

## Install and initialize Spark (as usual)

In [0]:
def setupSpark():
  # Spark needs to run with Java 8 ... 
  !pip install -q findspark
  !apt-get install openjdk-8-jdk-headless > /dev/null
  !echo 2 | update-alternatives --config java > /dev/null
  # !java -version
  import os, findspark
  os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
  # !echo JAVA_HOME=$JAVA_HOME
  !pip install -q pyspark
  findspark.init(spark_home='/usr/local/lib/python3.6/dist-packages/pyspark')
  !pyspark --version

setupSpark()

from pyspark import SparkContext
from pyspark.sql import SparkSession
    
spark = SparkSession\
        .builder\
        .master('local[*]')\
        .getOrCreate()
sc = spark.sparkContext

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
      /_/
                        
Using Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_242
Branch HEAD
Compiled by user centos on 2020-02-02T19:38:06Z
Revision cee4ecbb16917fa85f02c635925e2687400aa56b
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.


## Data set

We will consider the __'airports'__ and __'flights_airport'__ data sets, part of [__the Vega datasets__](https://github.com/vega/vega-datasets), which are accessible through the [__vega_datasets Python package__](https://pypi.org/project/vega-datasets/).

The data describes airports and the flights between them in North America.



In [0]:
from pyspark import SparkFiles
from vega_datasets import data

def loadDataSet(dataset,debug=True):    
  # Let Spark take care of the download and storage
  sc.addFile(dataset.url) 
  localFile = SparkFiles.get(dataset.filename)
  if debug: 
    print("Data set URL: " + dataset.url)
    print("Data set filename: " + dataset.filename)
    print("Local file: " + localFile)
  return spark.read.csv(localFile, sep=',', inferSchema=True, header=True)

### Load airport data


In [0]:
airports_df = loadDataSet(data.airports,debug=True)
airports_df.printSchema()
airports_df.toPandas()

Data set URL: https://vega.github.io/vega-datasets/data/airports.csv
Data set filename: airports.csv
Local file: /tmp/spark-41854567-db98-4881-a6cc-e9809ba28299/userFiles-0ff62235-ff36-496a-8744-8acc98390525/airports.csv
root
 |-- iata: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



Unnamed: 0,iata,name,city,state,country,latitude,longitude
0,00M,Thigpen,Bay Springs,MS,USA,31.953765,-89.234505
1,00R,Livingston Municipal,Livingston,TX,USA,30.685861,-95.017928
2,00V,Meadow Lake,Colorado Springs,CO,USA,38.945749,-104.569893
3,01G,Perry-Warsaw,Perry,NY,USA,42.741347,-78.052081
4,01J,Hilliard Airpark,Hilliard,FL,USA,30.688012,-81.905944
...,...,...,...,...,...,...,...
3371,ZEF,Elkin Municipal,Elkin,NC,USA,36.280024,-80.786069
3372,ZER,Schuylkill Cty/Joe Zerbey,Pottsville,PA,USA,40.706449,-76.373147
3373,ZPH,Zephyrhills Municipal,Zephyrhills,FL,USA,28.228065,-82.155916
3374,ZUN,Black Rock,Zuni,NM,USA,35.083227,-108.791777


### Load flight data

In [0]:
flights_df = loadDataSet(data.flights_airport)
flights_df.printSchema()
flights_df.toPandas()

Data set URL: https://vega.github.io/vega-datasets/data/flights-airport.csv
Data set filename: flights-airport.csv
Local file: /tmp/spark-41854567-db98-4881-a6cc-e9809ba28299/userFiles-0ff62235-ff36-496a-8744-8acc98390525/flights-airport.csv
root
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- count: integer (nullable = true)



Unnamed: 0,origin,destination,count
0,ABE,ATL,853
1,ABE,BHM,1
2,ABE,CLE,805
3,ABE,CLT,465
4,ABE,CVG,247
...,...,...,...
5361,YUM,IPL,326
5362,YUM,LAS,99
5363,YUM,LAX,1044
5364,YUM,PHX,1961


## Create table views of the data frames, then perform a few queries

In [0]:
airports_df.createOrReplaceTempView('airports')
flights_df.createOrReplaceTempView('flights')

### Airport count

In [0]:
airport_count = spark.sql(
   '''
   SELECT COUNT(*) AS number_of_airports
   FROM airports
   '''
)
airport_count.toPandas()

Unnamed: 0,number_of_airports
0,3376


### Airports in New York state (NY)

In [0]:
ny_airports = spark.sql(
   '''
   SELECT iata, name, state
   FROM airports
   WHERE state = 'NY'
   ORDER BY name
   '''
)

ny_airports.toPandas()

Unnamed: 0,iata,name,state
0,SLK,Adirondack,NY
1,9G3,Akron,NY
2,ALB,Albany Cty,NY
3,D22,Angola,NY
4,23N,Bayport Aerodrome,NY
...,...,...,...
92,HPN,Westchester Cty,NY
93,N25,Westport,NY
94,B16,Whitford,NY
95,3G7,Williamson/Sodus,NY


### The 10 states with the most airports

In [0]:
ten_states_with_more_airports = spark.sql(
   '''
   SELECT state, count(*) as count
   FROM airports
   GROUP BY state
   ORDER BY count DESC
   LIMIT 10
   '''
)

ten_states_with_more_airports.toPandas()

Unnamed: 0,state,count
0,AK,263
1,TX,209
2,CA,205
3,OK,102
4,OH,100
5,FL,100
6,GA,97
7,NY,97
8,MI,94
9,MN,89
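
In the output above, OH and FL both have 100 airports, and GA and NY both have 97. `ORDER BY count DESC` alone does not say how tied rows are ordered, so their relative order may vary between runs. The sketch below shows one way to pin it down with a secondary sort key. It is only an illustration: it uses Python's built-in `sqlite3` as a stand-in for Spark so that it runs without a Spark installation, and the table rows and the `airport_count` alias are made up.

```python
import sqlite3

# Stand-in for the Spark temp view: a tiny in-memory table with made-up rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE airports (iata TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO airports VALUES (?, ?)",
    [("A1", "AK"), ("A2", "AK"), ("A3", "AK"),
     ("T1", "TX"), ("T2", "TX"),
     ("O1", "OH"), ("O2", "OH"),
     ("F1", "FL")],
)

# TX and OH tie with 2 airports each; the second sort key (state)
# fixes their relative order instead of leaving it to the engine.
top_states = conn.execute(
    """
    SELECT state, COUNT(*) AS airport_count
    FROM airports
    GROUP BY state
    ORDER BY airport_count DESC, state
    LIMIT 3
    """
).fetchall()
print(top_states)
conn.close()
```

The same `ORDER BY ... DESC, state` clause should carry over unchanged to the `spark.sql` query above.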


### Outgoing flights from Albany (ALB)

In [0]:
ALB_outgoing_flights = spark.sql(
   '''
   SELECT *
   FROM flights 
   WHERE origin = 'ALB'
   ORDER BY count DESC, destination
   '''
)
ALB_outgoing_flights.toPandas()

Unnamed: 0,origin,destination,count
0,ALB,BWI,2297
1,ALB,ORD,1630
2,ALB,ATL,1154
3,ALB,CLE,1113
4,ALB,MCO,1037
5,ALB,DTW,1013
6,ALB,CLT,956
7,ALB,CVG,914
8,ALB,MDW,730
9,ALB,EWR,611


### Outgoing flights from all airports in NY state


In [0]:
NY_outgoing_flights = spark.sql(
   '''
   SELECT count, origin, destination
   FROM flights JOIN airports ON(origin=iata)
   WHERE state = 'NY'
   ORDER BY count DESC, origin, destination
   '''
)
NY_outgoing_flights.toPandas()

Unnamed: 0,count,origin,destination
0,12035,LGA,BOS
1,11063,LGA,DCA
2,10862,LGA,ORD
3,10507,LGA,ATL
4,8078,JFK,LAX
...,...,...,...
229,1,LGA,PHL
230,1,ROC,BOS
231,1,SWF,BDL
232,1,SWF,BOS


### The same information but adding airport names

In [0]:

NY_outgoing_flights2 = spark.sql(
   '''
   SELECT count, 
          origin, org.name AS origin_name, 
          destination, dst.name AS destination_name
   FROM 
          flights JOIN airports org ON(origin=org.iata) 
                  JOIN airports dst ON(destination=dst.iata)
   WHERE 
          org.state = 'NY'
   ORDER BY 
          count DESC, origin, destination
   '''
)
NY_outgoing_flights2.toPandas()

Unnamed: 0,count,origin,origin_name,destination,destination_name
0,12035,LGA,LaGuardia,BOS,Gen Edw L Logan Intl
1,11063,LGA,LaGuardia,DCA,Ronald Reagan Washington National
2,10862,LGA,LaGuardia,ORD,Chicago O'Hare International
3,10507,LGA,LaGuardia,ATL,William B Hartsfield-Atlanta Intl
4,8078,JFK,John F Kennedy Intl,LAX,Los Angeles International
...,...,...,...,...,...
229,1,LGA,LaGuardia,PHL,Philadelphia Intl
230,1,ROC,Greater Rochester Int'l,BOS,Gen Edw L Logan Intl
231,1,SWF,Stewart,BDL,Bradley International
232,1,SWF,Stewart,BOS,Gen Edw L Logan Intl
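
Joining `airports` twice is what makes the aliases `org` and `dst` necessary: both copies expose a `name` column, so an unqualified `name` is ambiguous. The sketch below illustrates this with rows taken from the output above. It uses Python's built-in `sqlite3` as a stand-in for Spark, so the exact error type is SQLite's; Spark should reject the ambiguous reference too, but with its own exception.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE airports (iata TEXT, name TEXT, state TEXT)")
conn.execute("CREATE TABLE flights (origin TEXT, destination TEXT, count INTEGER)")
conn.executemany(
    "INSERT INTO airports VALUES (?, ?, ?)",
    [("LGA", "LaGuardia", "NY"),
     ("JFK", "John F Kennedy Intl", "NY"),
     ("BOS", "Gen Edw L Logan Intl", "MA"),
     ("LAX", "Los Angeles International", "CA")],
)
conn.executemany(
    "INSERT INTO flights VALUES (?, ?, ?)",
    [("LGA", "BOS", 12035), ("JFK", "LAX", 8078)],
)

# Each alias is an independent copy of the airports table:
# org describes the origin airport, dst the destination airport.
named_flights = conn.execute(
    """
    SELECT count, origin, org.name, destination, dst.name
    FROM flights
         JOIN airports AS org ON origin = org.iata
         JOIN airports AS dst ON destination = dst.iata
    WHERE org.state = 'NY'
    ORDER BY count DESC, origin, destination
    """
).fetchall()
print(named_flights)

# Without a qualifier, the engine cannot tell which copy's name is meant.
try:
    conn.execute(
        """
        SELECT name
        FROM flights
             JOIN airports AS org ON origin = org.iata
             JOIN airports AS dst ON destination = dst.iata
        """
    )
    ambiguous = False
except sqlite3.OperationalError:
    ambiguous = True
print("unqualified name rejected:", ambiguous)
conn.close()
```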


### Distinct states

In [0]:
distinct_states = spark.sql(
   '''
   SELECT DISTINCT state
   FROM airports
   ORDER BY state
   '''
)
distinct_states.toPandas()

Unnamed: 0,state
0,AK
1,AL
2,AR
3,AS
4,AZ
5,CA
6,CO
7,CQ
8,CT
9,DC


## Exercises 

### 1. Compute the total number (SUM) of outgoing flights from Albany (ALB).

In [0]:
ALB_outgoing_flights_total = spark.sql(
   '''
   SELECT SUM(count) as total
   FROM flights 
   WHERE origin = 'ALB'
   '''
)
ALB_outgoing_flights_total.toPandas()

Unnamed: 0,total
0,13474


### 2. Compute the total number (SUM) of incoming flights to Albany (ALB).


In [0]:
ALB_incoming_flights_total = spark.sql(
   '''
   SELECT SUM(count) as total
   FROM flights 
   WHERE destination = 'ALB'
   '''
)
ALB_incoming_flights_total.toPandas()

Unnamed: 0,total
0,13468
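
Exercises 1 and 2 each scan the `flights` table once. As an optional variation, both totals can be computed in a single pass with conditional aggregation (`SUM` over a `CASE` expression). This is a sketch of that alternative, not the notebook's solution: it uses Python's built-in `sqlite3` as a stand-in for Spark, and all counts below are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (origin TEXT, destination TEXT, count INTEGER)")
conn.executemany(
    "INSERT INTO flights VALUES (?, ?, ?)",
    [("ALB", "BWI", 3),   # outgoing from ALB
     ("ALB", "ORD", 2),   # outgoing from ALB
     ("BWI", "ALB", 4),   # incoming to ALB
     ("ORD", "BOS", 9)],  # unrelated to ALB
)

# Each CASE contributes the row's count to one total and 0 to the other.
outgoing, incoming = conn.execute(
    """
    SELECT SUM(CASE WHEN origin = 'ALB' THEN count ELSE 0 END) AS outgoing,
           SUM(CASE WHEN destination = 'ALB' THEN count ELSE 0 END) AS incoming
    FROM flights
    WHERE origin = 'ALB' OR destination = 'ALB'
    """
).fetchone()
print(outgoing, incoming)
conn.close()
```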


### 3. Compute the average (AVG) number of all incoming flights to all airports in NY state.

In [0]:
NY_avg_flights_all_airports = spark.sql(
   '''
   SELECT AVG(count) AS average
   FROM flights JOIN airports ON(destination=iata)
   WHERE state = 'NY'
   '''
)
NY_avg_flights_all_airports.toPandas()

Unnamed: 0,average
0,1391.936975
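
Note what this average is taken over: `AVG(count)` runs over the joined rows, and each row is one origin/destination pair, so the result is the mean per route into NY rather than the mean per airport. Consistently, dividing the total from Exercise 4 (331281) by this average gives about 238 rows. The sketch below contrasts the two readings on made-up data, using Python's built-in `sqlite3` as a stand-in for Spark; the subquery alias `per_destination` is illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (origin TEXT, destination TEXT, count INTEGER)")
conn.executemany(
    "INSERT INTO flights VALUES (?, ?, ?)",
    [("BOS", "LGA", 100), ("ORD", "LGA", 300), ("BOS", "ALB", 50)],
)

# Mean over rows: (100 + 300 + 50) / 3 routes.
per_route = conn.execute("SELECT AVG(count) FROM flights").fetchone()[0]

# Mean over airports: first total per destination (LGA 400, ALB 50),
# then average those totals.
per_airport = conn.execute(
    """
    SELECT AVG(total)
    FROM (
        SELECT destination, SUM(count) AS total
        FROM flights
        GROUP BY destination
    ) AS per_destination
    """
).fetchone()[0]

print(per_route, per_airport)
conn.close()
```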


### 4. Compute the total (SUM) number of all incoming flights to all airports in NY state.

In [0]:
NY_sum_flights_all_airports = spark.sql(
   '''
   SELECT SUM(count) as sum
   FROM flights JOIN airports ON(destination=iata)
   WHERE state = 'NY'
   '''
)
NY_sum_flights_all_airports.toPandas()

Unnamed: 0,sum
0,331281


### 5. Compute the total number (SUM) of all incoming flights to airports in NY state, grouped by airport name.

In [0]:
NY_sum_flights_per_airports = spark.sql(
   '''
   SELECT name, SUM(count) as sum
   FROM flights JOIN airports ON(destination=iata)
   WHERE state = 'NY'
   GROUP BY name
   ORDER BY sum DESC
   '''
)
NY_sum_flights_per_airports.toPandas()

Unnamed: 0,name,sum
0,LaGuardia,119117
1,John F Kennedy Intl,118802
2,Buffalo Niagara Intl,26277
3,Greater Rochester Int'l,14253
4,Albany Cty,13468
5,Syracuse-Hancock Intl,12036
6,Westchester Cty,10706
7,Long Island - MacArthur,10180
8,Stewart,4245
9,Elmira/Corning Regional,1336
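
One caveat with `GROUP BY name`: if two airports ever share a name, their flights are merged into a single row. Whether that happens in this data set is not checked here, but grouping by the `iata` code together with the name avoids the risk. The sketch below shows the difference on made-up rows (the `M1`, `M2`, and `X` codes are hypothetical), using Python's built-in `sqlite3` as a stand-in for Spark.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE airports (iata TEXT, name TEXT)")
conn.execute("CREATE TABLE flights (origin TEXT, destination TEXT, count INTEGER)")
conn.executemany(
    "INSERT INTO airports VALUES (?, ?)",
    [("M1", "Municipal"), ("M2", "Municipal"), ("LGA", "LaGuardia")],
)
conn.executemany(
    "INSERT INTO flights VALUES (?, ?, ?)",
    [("X", "M1", 10), ("X", "M2", 20), ("X", "LGA", 5)],
)

# Grouping by name alone merges the two airports called "Municipal".
by_name = conn.execute(
    """
    SELECT name, SUM(count) AS total
    FROM flights JOIN airports ON destination = iata
    GROUP BY name
    ORDER BY total DESC, name
    """
).fetchall()

# Grouping by the unique code keeps them apart; name is carried along.
by_code = conn.execute(
    """
    SELECT iata, name, SUM(count) AS total
    FROM flights JOIN airports ON destination = iata
    GROUP BY iata, name
    ORDER BY total DESC, iata
    """
).fetchall()

print(by_name)
print(by_code)
conn.close()
```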
