# Spark SQL mini project exercises.

#### README
The exercises come with this notebook and a "data" folder, which contains the dataset used for the exercises.
Some of the code is already written to help you get started, and explanatory text accompanies each exercise to aid understanding.

The first part sets up a database and loads the dataset used for the exercises. This is already done beforehand; just run all the cells until the **Exercise** part.

**However**, note the path used to read the dataset from the *'data'* folder. This path works on Unix-based systems, but on Windows it *might* cause conflicts.
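One possible way around the Windows issue, sketched here as an assumption rather than something this notebook does, is to convert the path to forward slashes before it is interpolated into the SQL text. `PureWindowsPath` is used only to simulate a Windows path on any operating system, and the directory name is hypothetical.

```python
from pathlib import PureWindowsPath

# Hypothetical Windows location of the notebook's "data" folder.
windows_data_path = PureWindowsPath(r"C:\Users\student\project") / "data"

# str() keeps the backslashes, which a SQL string literal may treat as escapes.
raw = str(windows_data_path)

# as_posix() returns the same path with forward slashes.
portable = windows_data_path.as_posix()

print(raw)       # C:\Users\student\project\data
print(portable)  # C:/Users/student/project/data
```

With a real `Path` object the same `as_posix()` call is available, so the value placed in the `path` option could be `data_path.as_posix()`.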

In [1]:
# Initialising the spark session.
from pyspark.sql import SparkSession
from pathlib import Path

data_path = Path.cwd() / 'data'

spark = SparkSession.builder.appName('Practise').getOrCreate()
spark

22/02/19 17:30:59 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.50.144 instead (on interface wlp4s0)
22/02/19 17:30:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/19 17:31:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/02/19 17:31:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [49]:
# Create the database.
spark.sql("CREATE DATABASE flightDB")

# Select the database used by the following statements.
spark.sql("USE flightDB")

# Create the flights table from the CSV files in the data folder.
spark.sql(f"""
            CREATE TABLE flights (
            DEST_COUNTRY_NAME STRING COMMENT "Describes destination country", 
            ORIGIN_COUNTRY_NAME STRING COMMENT "Describes departure country", 
            count LONG COMMENT "Describes number of departures")
            USING csv OPTIONS (header true, path '{data_path}')
            """)
spark.sql("SELECT * FROM flights").show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [50]:
# Show the table metadata (column names, data types and comments).
spark.sql("DESCRIBE flights").show()

+-------------------+---------+--------------------+
|           col_name|data_type|             comment|
+-------------------+---------+--------------------+
|  DEST_COUNTRY_NAME|   string|Describes destina...|
|ORIGIN_COUNTRY_NAME|   string|Describes departu...|
|              count|   bigint|Describes number ...|
+-------------------+---------+--------------------+



## Exercise 1: Basic SQL

**TODO** From *flightDB* use the table *flights* to compute the number of flights for each destination country. Order this from highest to lowest.
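As a rough illustration of what the requested query computes, the grouping, summing and ordering can be written in plain Python. This is only a sketch over a handful of rows copied from the table preview above; it is not how Spark executes the query, and the variable names are made up for the example.

```python
from collections import defaultdict

# (destination, origin, count) rows copied from the flights preview.
rows = [
    ("United States", "Romania", 15),
    ("United States", "Croatia", 1),
    ("United States", "Ireland", 344),
    ("Egypt", "United States", 15),
    ("Costa Rica", "United States", 588),
]

# GROUP BY destination with sum(count).
totals = defaultdict(int)
for dest, _origin, count in rows:
    totals[dest] += count

# ORDER BY the summed count, highest first.
ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)

print(ranked)
# [('Costa Rica', 588), ('United States', 360), ('Egypt', 15)]
```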

In [51]:
# Number of flights for each destination country. Order this from highest to lowest.
spark.sql("""SELECT DEST_COUNTRY_NAME AS Country, sum(count) AS Number_of_arriving_flights FROM flights
            GROUP BY Country 
            ORDER BY Number_of_arriving_flights DESC""").show()

+------------------+--------------------------+
|           Country|Number_of_arriving_flights|
+------------------+--------------------------+
|     United States|                   6581632|
|            Canada|                    134384|
|            Mexico|                    114240|
|    United Kingdom|                     32400|
|             Japan|                     24768|
|           Germany|                     23488|
|Dominican Republic|                     21648|
|       South Korea|                     16768|
|       The Bahamas|                     15280|
|            France|                     14960|
|          Colombia|                     13968|
|            Brazil|                     13648|
|       Netherlands|                     12416|
|             China|                     12352|
|           Jamaica|                     10656|
|        Costa Rica|                      9408|
|       El Salvador|                      8976|
|            Panama|                    

## Exercise 2: Views

**TODO** Create a *view* using the table *flights* (include all rows and columns).

**TODO** Repeat the process from exercise 1 using the view: compute the number of flights for each destination country. Order this from highest to lowest.

In [52]:
#Repeat the same process as in exercise 1 but this time with a view
spark.sql("""CREATE OR REPLACE VIEW all_dept AS
            SELECT * FROM flights""")

spark.sql("""SELECT DEST_COUNTRY_NAME AS Country, sum(count) AS Number_of_arriving_flights FROM all_dept
            GROUP BY Country 
            ORDER BY Number_of_arriving_flights DESC""").show()

+------------------+--------------------------+
|           Country|Number_of_arriving_flights|
+------------------+--------------------------+
|     United States|                   6581632|
|            Canada|                    134384|
|            Mexico|                    114240|
|    United Kingdom|                     32400|
|             Japan|                     24768|
|           Germany|                     23488|
|Dominican Republic|                     21648|
|       South Korea|                     16768|
|       The Bahamas|                     15280|
|            France|                     14960|
|          Colombia|                     13968|
|            Brazil|                     13648|
|       Netherlands|                     12416|
|             China|                     12352|
|           Jamaica|                     10656|
|        Costa Rica|                      9408|
|       El Salvador|                      8976|
|            Panama|                    

## Exercise 3: Performance

**TODO** In the Spark UI, compare how the queries from exercise 1 and exercise 2 performed. Describe your observations in words and explain them.

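**ANSWER**: The query on the table took 91 ms, whereas the same query on the view took only 14 ms.
A view does not store data; it is a saved query that Spark expands against the underlying table, so both queries should do essentially the same work. The gap is therefore more plausibly explained by warm-up and caching effects from having just run the same aggregation (an assumption that was not verified here) than by the view itself.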
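The comparison can be put in context with a small sketch of what a view is. It uses Python's built-in `sqlite3` module rather than Spark (a deliberate substitution so that it runs without a Spark session), and the column name `flight_count` is an assumption made for the example. It only illustrates the general SQL idea that a view is a saved query and holds no copy of the data; it says nothing about Spark's timings.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (dest TEXT, origin TEXT, flight_count INTEGER)")
conn.execute("INSERT INTO flights VALUES ('United States', 'Romania', 15)")

# The view stores only the SELECT statement, not the rows.
conn.execute("CREATE VIEW all_dept AS SELECT * FROM flights")
before = conn.execute("SELECT sum(flight_count) FROM all_dept").fetchone()[0]

# A row added to the table afterwards is visible through the view,
# because the view re-runs its query against the table each time.
conn.execute("INSERT INTO flights VALUES ('United States', 'Ireland', 344)")
after = conn.execute("SELECT sum(flight_count) FROM all_dept").fetchone()[0]

print(before, after)  # 15 359
conn.close()
```

In Spark, the plans of the two queries could be compared in the SQL tab of the Spark UI to check whether they differ at all.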

## Exercise 4: Case statements

**TODO** Imagine your boss says the system is outdated. Every row containing the values 'United States' and 'Denmark' should be 'USA' and 'DK' respectively. And for mysterious reasons (the boss won't tell you) all other values should be 0 (for the country column).

**NOTE** Use the table *partitioned_flights* to solve the exercise.
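The recoding rule can be sketched in plain Python before writing it as SQL. One detail worth noting is that 'USA' and 'DK' are strings while 0 is a number; returning the string `"0"` so that every branch has the same type is an assumption made in this sketch, and `recode_country` is a hypothetical helper name.

```python
def recode_country(name):
    """Mirror of CASE WHEN ... THEN ... ELSE ... END for one value."""
    if name == "United States":
        return "USA"
    if name == "Denmark":
        return "DK"
    # Every other country collapses to "0" (a string, like the other branches).
    return "0"


countries = ["United States", "Denmark", "Egypt"]
recoded = [recode_country(name) for name in countries]

print(recoded)  # ['USA', 'DK', '0']
```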

In [53]:
#Partitioned flights table
spark.sql("""
            CREATE TABLE partitioned_flights USING parquet PARTITIONED BY (DEST_COUNTRY_NAME)
            AS SELECT DEST_COUNTRY_NAME, count FROM flights LIMIT 10
            """)

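# CASE/WHEN/THEN statement. Every branch returns a string ('0' rather than 0),
# so the result column has a single, consistent type.
spark.sql("""
            SELECT 
            CASE WHEN DEST_COUNTRY_NAME = 'United States' THEN 'USA'
            WHEN DEST_COUNTRY_NAME = 'Denmark' THEN 'DK'
            ELSE '0' END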
            FROM partitioned_flights
            """).show()

+------------------------------------------------------------------------------------------------------------+
|CASE WHEN (DEST_COUNTRY_NAME = United States) THEN USA WHEN (DEST_COUNTRY_NAME = Denmark) THEN DK ELSE 0 END|
+------------------------------------------------------------------------------------------------------------+
|                                                                                                         USA|
|                                                                                                         USA|
|                                                                                                         USA|
|                                                                                                         USA|
|                                                                                                         USA|
|                                                                                                         USA|
|

## Exercise 5: Lists

**TODO** Convert an array into rows. The view *flights_agg* contains an array, use the created view to solve the exercise.
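To make the two steps concrete, here is a plain-Python sketch of what `collect_set` and `explode` do conceptually, using a few rows copied from the flights preview. It is an illustration only: Spark does not guarantee the order of elements in a collected set, and the sorting below is added purely to make the printed result deterministic.

```python
# (destination, count) pairs copied from the flights preview.
rows = [
    ("United States", 15),
    ("United States", 1),
    ("United States", 344),
    ("Egypt", 15),
    ("United States", 62),
    ("United States", 1),
]

# collect_set: one set of distinct counts per destination (duplicates dropped).
collected = {}
for dest, count in rows:
    collected.setdefault(dest, set()).add(count)

# explode: one output row per element of each collected set.
exploded = [
    (count, dest)
    for dest, counts in collected.items()
    for count in sorted(counts)
]

print(exploded)
# [(1, 'United States'), (15, 'United States'), (62, 'United States'),
#  (344, 'United States'), (15, 'Egypt')]
```

Six input rows become five output rows because the duplicate count of 1 for the United States is removed by the set.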

In [54]:
#Create flights_agg view
spark.sql("""
            CREATE OR REPLACE TEMP VIEW flights_agg AS
            SELECT DEST_COUNTRY_NAME, collect_set(count) as collected_counts
            FROM flights GROUP BY DEST_COUNTRY_NAME
            """)

# Convert an array into rows
spark.sql("""SELECT explode(collected_counts), DEST_COUNTRY_NAME FROM flights_agg""").show()

+---+--------------------+
|col|   DEST_COUNTRY_NAME|
+---+--------------------+
|  4|             Algeria|
| 15|              Angola|
| 41|            Anguilla|
|126| Antigua and Barbuda|
|180|           Argentina|
|346|               Aruba|
|329|           Australia|
| 62|             Austria|
| 21|          Azerbaijan|
| 19|             Bahrain|
|154|            Barbados|
|259|             Belgium|
|188|              Belize|
|183|             Bermuda|
| 30|             Bolivia|
| 58|Bonaire, Sint Eus...|
|853|              Brazil|
|107|British Virgin Is...|
|  3|            Bulgaria|
|  1|        Burkina Faso|
+---+--------------------+
only showing top 20 rows



## Exercise 6: User defined functions

**TODO** Create a function that determines the ratio between how many departures and arrivals each country has. **NOTE** Create a view, based on the table *flights*, containing the information needed to compute the ratio.

**TODO** Create a pandas function that also calculates the ratio, using *pandas_udf*. Is there a performance difference? Describe and explain your answer. **NOTE** The required packages are pre-imported and no further packages should be needed.
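When thinking about the performance question, it may help to see the difference in calling pattern between the two kinds of function. The sketch below is a pure-Python analogy, not Spark code: a regular Python UDF is invoked once per row, while a pandas UDF is invoked once per batch of rows. The function names and the call counter are hypothetical, and the numbers are taken from the result table shown for this exercise.

```python
calls = {"row": 0, "batch": 0}


def ratio_per_row(arrivals, departures):
    """Row-at-a-time style: invoked once for every row."""
    calls["row"] += 1
    return arrivals / departures


def ratio_per_batch(arrivals, departures):
    """Batch style: invoked once with whole columns of values."""
    calls["batch"] += 1
    return [a / d for a, d in zip(arrivals, departures)]


# ARRIVALS and DEPARTURES for Paraguay, Singapore and Jordan.
arrivals = [960, 48, 704]
departures = [96, 16, 704]

row_result = [ratio_per_row(a, d) for a, d in zip(arrivals, departures)]
batch_result = ratio_per_batch(arrivals, departures)

print(row_result, batch_result)  # [10.0, 3.0, 1.0] [10.0, 3.0, 1.0]
print(calls)                     # {'row': 3, 'batch': 1}
```

In Spark, the batch variant additionally benefits from Arrow-based data transfer and vectorised pandas operations, although on a dataset as small as this one the measured difference may well be negligible.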

In [55]:
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import FloatType

# Prints a list of all functions.
spark.sql("""SHOW FUNCTIONS""").show()

+--------+
|function|
+--------+
|       !|
|      !=|
|       %|
|       &|
|       *|
|       +|
|       -|
|       /|
|       <|
|      <=|
|     <=>|
|      <>|
|       =|
|      ==|
|       >|
|      >=|
|       ^|
|     abs|
|    acos|
|   acosh|
+--------+
only showing top 20 rows



In [56]:
# Prints a list of only system functions.
spark.sql("""SHOW SYSTEM FUNCTIONS""").show()

+--------+
|function|
+--------+
|       !|
|      !=|
|       %|
|       &|
|       *|
|       +|
|       -|
|       /|
|       <|
|      <=|
|     <=>|
|      <>|
|       =|
|      ==|
|       >|
|      >=|
|       ^|
|     abs|
|    acos|
|   acosh|
+--------+
only showing top 20 rows



In [57]:
# Prints a list of only user defined functions.
spark.sql("""SHOW USER FUNCTIONS""").show()

+-------------------+
|           function|
+-------------------+
|       flight_ratio|
|pandas_flight_ratio|
+-------------------+



In [58]:
spark.sql("""
            CREATE OR REPLACE TEMP VIEW arrivals AS
            SELECT DEST_COUNTRY_NAME as ARRIVAL_COUNTRY, sum(count) as ARRIVALS FROM flights 
            GROUP BY ARRIVAL_COUNTRY
            """)
spark.sql("""
            CREATE OR REPLACE TEMP VIEW departures AS
            SELECT ORIGIN_COUNTRY_NAME as DEPARTURE_COUNTRY, sum(count) as DEPARTURES FROM flights 
            GROUP BY DEPARTURE_COUNTRY
            """)
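def flight_ratio(arrival, departure):
    # The outer join can yield NULLs, which reach Python as None. Return None
    # (SQL NULL) for those rows and for a zero denominator instead of raising.
    if arrival is None or not departure:
        return None
    return arrival / departure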

spark.udf.register("flight_ratio", flight_ratio, FloatType())

spark.sql("""
            SELECT arrivals.ARRIVAL_COUNTRY AS COUNTRY, arrivals.ARRIVALS, departures.DEPARTURES, flight_ratio(arrivals.ARRIVALS, departures.DEPARTURES) AS RATIO
            FROM arrivals
            RIGHT JOIN departures
            ON arrivals.ARRIVAL_COUNTRY = departures.DEPARTURE_COUNTRY
            """).show()

22/02/19 18:46:58 WARN SimpleFunctionRegistry: The function flight_ratio replaced a previously registered function.


+--------------------+--------+----------+----------+
|             COUNTRY|ARRIVALS|DEPARTURES|     RATIO|
+--------------------+--------+----------+----------+
|            Paraguay|     960|        96|      10.0|
|              Russia|    2816|      2576| 1.0931677|
|            Anguilla|     656|       608| 1.0789474|
|             Senegal|     640|       672|0.95238096|
|              Sweden|    1888|      1904|0.99159664|
|            Kiribati|     416|       560|0.74285716|
|              Guyana|    1024|      1008| 1.0158731|
|         Philippines|    2144|      2016| 1.0634921|
|           Singapore|      48|        16|       3.0|
|            Malaysia|      32|        48| 0.6666667|
|                Fiji|     384|       400|      0.96|
|              Turkey|    2208|      2064| 1.0697675|
|             Germany|   23488|     21376| 1.0988024|
|              Jordan|     704|       704|       1.0|
|               Palau|     480|       496| 0.9677419|
|Turks and Caicos ...|    36

In [59]:
# Pandas Solution
#!pip install pyarrow

# The function
def flight_ratio(arrivals: pd.Series, departures: pd.Series) -> pd.Series:
    return arrivals/departures

# Create the pandas UDF for the flight_ratio function
pandas_flight_ratio = pandas_udf(flight_ratio, returnType=FloatType())

# Register the function
spark.udf.register("pandas_flight_ratio", pandas_flight_ratio)

spark.sql("""
            SELECT arrivals.ARRIVAL_COUNTRY AS COUNTRY, arrivals.ARRIVALS, departures.DEPARTURES, pandas_flight_ratio(arrivals.ARRIVALS, departures.DEPARTURES) AS RATIO
            FROM arrivals
            RIGHT JOIN departures
            ON arrivals.ARRIVAL_COUNTRY = departures.DEPARTURE_COUNTRY
            """).show()

22/02/19 18:47:02 WARN SimpleFunctionRegistry: The function pandas_flight_ratio replaced a previously registered function.


+--------------------+--------+----------+----------+
|             COUNTRY|ARRIVALS|DEPARTURES|     RATIO|
+--------------------+--------+----------+----------+
|            Paraguay|     960|        96|      10.0|
|              Russia|    2816|      2576| 1.0931677|
|            Anguilla|     656|       608| 1.0789474|
|             Senegal|     640|       672|0.95238096|
|              Sweden|    1888|      1904|0.99159664|
|            Kiribati|     416|       560|0.74285716|
|              Guyana|    1024|      1008| 1.0158731|
|         Philippines|    2144|      2016| 1.0634921|
|           Singapore|      48|        16|       3.0|
|            Malaysia|      32|        48| 0.6666667|
|                Fiji|     384|       400|      0.96|
|              Turkey|    2208|      2064| 1.0697675|
|             Germany|   23488|     21376| 1.0988024|
|              Jordan|     704|       704|       1.0|
|               Palau|     480|       496| 0.9677419|
|Turks and Caicos ...|    36

# Dropping the table and database.

In [48]:
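# Drop the permanent view as well; a database that still contains objects cannot be dropped without CASCADE.
spark.sql("DROP VIEW IF EXISTS all_dept")
spark.sql("DROP TABLE IF EXISTS flights")
spark.sql("DROP TABLE IF EXISTS partitioned_flights")
spark.sql("DROP DATABASE IF EXISTS flightDB")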

DataFrame[]