# Spark: more practice

<h1>Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Preparation" data-toc-modified-id="Preparation-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Preparation</a></span></li><li><span><a href="#Query-with-SQL" data-toc-modified-id="Query-with-SQL-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Query with SQL</a></span></li><li><span><a href="#Tables-and-Views" data-toc-modified-id="Tables-and-Views-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Tables and Views</a></span></li><li><span><a href="#Spark-catalog" data-toc-modified-id="Spark-catalog-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Spark catalog</a></span></li><li><span><a href="#Read" data-toc-modified-id="Read-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Read</a></span></li><li><span><a href="#Functions" data-toc-modified-id="Functions-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Functions</a></span></li><li><span><a href="#Operations" data-toc-modified-id="Operations-7"><span class="toc-item-num">7&nbsp;&nbsp;</span>Operations</a></span></li></ul></div>

## Preparation

Import the required libraries:

In [1]:
import findspark
findspark.init()
import pyspark

from pyspark.sql import SparkSession

from pyspark.sql.types import * 
import pyspark.sql.functions as F

Create a Spark session:

In [2]:
spark = (SparkSession
        .builder
        .appName("SparkSQLExampleApp")
        .getOrCreate())

## Query with SQL

Set the path to the data file:

In [3]:
# Path to data set
csv_file = r"C:\datasets\departuredelays.csv"  # raw string, so backslashes are not escape sequences

Read the file and register it as a temporary view:

In [4]:
# Read and create a temporary view
# Infer schema (note that for larger files you 
# may want to specify the schema)
df = (spark.read.format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(csv_file))
df.createOrReplaceTempView("us_delay_flights_tbl")

Find all flights longer than 1,000 miles, using `spark.sql`:

In [5]:
#find all flights whose distance is greater than 1,000 miles
spark.sql("""SELECT distance, origin, destination 
FROM us_delay_flights_tbl WHERE distance > 1000 
ORDER BY distance DESC""").show(10)

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows



The same query with the PySpark DataFrame API:

In [6]:
#Same in PySpark 
# In Python
from pyspark.sql.functions import col, desc
(df.select("distance", "origin", "destination")
  .where(col("distance") > 1000)
  .orderBy(desc("distance"))).show(10)

# Or
(df.select("distance", "origin", "destination")
  .where("distance > 1000")
  .orderBy("distance", ascending=False).show(10))

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows

+--------+------+-----------+
|distance|origin|destination|
+--------+------+-----------+
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
|    4330|   HNL|        JFK|
+--------+------+-----------+
only showing top 10 rows



Find flights from San Francisco (SFO) to Chicago (ORD) delayed by more than two hours (`delay > 120`, in minutes):

In [7]:
# find all flights from San Francisco (SFO) to Chicago (ORD) delayed by more than two hours
spark.sql("""SELECT date, delay, origin, destination 
FROM us_delay_flights_tbl 
WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD' 
ORDER by delay DESC""").show(10)

+-------+-----+------+-----------+
|   date|delay|origin|destination|
+-------+-----+------+-----------+
|2190925| 1638|   SFO|        ORD|
|1031755|  396|   SFO|        ORD|
|1022330|  326|   SFO|        ORD|
|1051205|  320|   SFO|        ORD|
|1190925|  297|   SFO|        ORD|
|2171115|  296|   SFO|        ORD|
|1071040|  279|   SFO|        ORD|
|1051550|  274|   SFO|        ORD|
|3120730|  266|   SFO|        ORD|
|1261104|  258|   SFO|        ORD|
+-------+-----+------+-----------+
only showing top 10 rows
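Because this DataFrame was loaded with `inferSchema`, Spark typed `date` as an integer and dropped the leading zero of the month (`2190925` instead of `02190925`); in the Operations section the same file is read without schema inference and `date` stays a string such as `01011245`. Judging from those values and the `LIKE '01010%'` filter used later, the field looks like `MMddHHmm` (month, day, hour, minute), but that layout is an assumption, not something the notebook states. The plain-Python sketch below uses a hypothetical helper, `decode_flight_date`, to restore the padding and split the field:

```python
def decode_flight_date(value):
    """Split a flight 'date' value into (month, day, hour, minute).

    Assumes an MMddHHmm layout (hypothetical helper); accepts either the
    original string or an integer that lost its leading zero via inferSchema.
    """
    text = str(value).zfill(8)  # pad back to 8 digits, restoring the dropped zero
    return int(text[0:2]), int(text[2:4]), int(text[4:6]), int(text[6:8])


print(decode_flight_date(2190925))     # (2, 19, 9, 25)  integer from inferSchema
print(decode_flight_date("01011245"))  # (1, 1, 12, 45)  string read without inferSchema
```

Inside Spark itself, the simpler fix is probably to avoid the lossy conversion altogether, for example by supplying a schema that declares `date STRING` instead of relying on `inferSchema`.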



Label every flight, regardless of origin and destination, with a delay category:

In [8]:
# label all US flights, regardless of origin and destination, with an indication of the delays they experienced: 
# Very Long Delays (> 6 hours), Long Delays (2–6 hours), etc
spark.sql("""SELECT delay, origin, destination, 
              CASE
                  WHEN delay > 360 THEN 'Very Long Delays'
                  WHEN delay >= 120 AND delay <= 360 THEN 'Long Delays'
                  WHEN delay >= 60 AND delay < 120 THEN 'Short Delays'
                  WHEN delay > 0 and delay < 60 THEN 'Tolerable Delays'
                  WHEN delay = 0 THEN 'No Delays'
                  ELSE 'Early'
               END AS Flight_Delays
               FROM us_delay_flights_tbl
               ORDER BY origin, delay DESC""").show(10)

+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|  333|   ABE|        ATL|  Long Delays|
|  305|   ABE|        ATL|  Long Delays|
|  275|   ABE|        ATL|  Long Delays|
|  257|   ABE|        ATL|  Long Delays|
|  247|   ABE|        ATL|  Long Delays|
|  247|   ABE|        DTW|  Long Delays|
|  219|   ABE|        ORD|  Long Delays|
|  211|   ABE|        ATL|  Long Delays|
|  197|   ABE|        DTW|  Long Delays|
|  192|   ABE|        ORD|  Long Delays|
+-----+------+-----------+-------------+
only showing top 10 rows
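The CASE expression checks its branches top to bottom, so the boundaries matter: 360 minutes is still a Long Delay, 361 is Very Long, and any negative delay (the flight left early) falls through to `ELSE 'Early'`. A `NULL` delay would also end up as `'Early'`, because no `WHEN` condition evaluates to true for NULL. The plain-Python sketch below (the function name `delay_category` is hypothetical) mirrors the branches for non-null integer delays only:

```python
def delay_category(delay):
    """Mirror the CASE expression above for one non-null delay in minutes."""
    if delay > 360:
        return "Very Long Delays"
    if 120 <= delay <= 360:
        return "Long Delays"
    if 60 <= delay < 120:
        return "Short Delays"
    if 0 < delay < 60:
        return "Tolerable Delays"
    if delay == 0:
        return "No Delays"
    return "Early"  # negative delay: the flight departed ahead of schedule


# Values around each boundary, plus 333 from the output above
for d in [333, 361, 360, 120, 119, 59, 0, -8]:
    print(d, delay_category(d))
```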



## Tables and Views

Create a database and tell Spark to use it:

In [9]:
#create a database called learn_spark_db and tell Spark we want to use that database

#spark.sql("CREATE DATABASE learn_spark_db")
#spark.sql("USE learn_spark_db")

Create a managed table:

In [10]:
#Creating a managed table
# option 1

#spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT,  \
#  distance INT, origin STRING, destination STRING)")

A second way to create a managed table:

In [11]:
# option 2
# Path to our US flight delays CSV file 

# csv_file = r"C:\datasets\departuredelays.csv"

# Schema as defined in the preceding example
#schema="date STRING, delay INT, distance INT, origin STRING, destination STRING"
#flights_df = spark.read.csv(csv_file, schema=schema, header=True)  # skip the header row
#flights_df.write.saveAsTable("managed_us_delay_flights_tbl")

Create an unmanaged table:

In [12]:
#Creating an unmanaged table
# option 1

#spark.sql("""CREATE TABLE us_delay_flights_tbl(date STRING, delay INT, 
#  distance INT, origin STRING, destination STRING) 
#  USING csv OPTIONS (PATH 
#  'departuredelays.csv')""")

A second way to do the same:

In [13]:
# option 2

#(flights_df
#  .write
#  .option("path", "/tmp/data/us_flights_delay")
#  .saveAsTable("us_delay_flights_tbl"))

Select the data for the views (flights departing from SFO and from JFK):

In [14]:
#Creating Views
# In Python

#df_sfo = spark.sql("SELECT date, delay, origin, destination FROM \
#  us_delay_flights_tbl WHERE origin = 'SFO'")
    
#df_jfk = spark.sql("SELECT date, delay, origin, destination FROM \
#  us_delay_flights_tbl WHERE origin = 'JFK'")

Create a global temporary view and a temporary view:

In [15]:
# Create a temporary and global temporary view

#df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view")
#df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")

Drop the views:

In [16]:
# Drop views

#spark.catalog.dropGlobalTempView("us_origin_airport_SFO_global_tmp_view")
#spark.catalog.dropTempView("us_origin_airport_JFK_tmp_view")

## Spark catalog

Inspect the metadata that Spark keeps in its catalog:

In [17]:
# Catalog

#spark.catalog.listDatabases()
#spark.catalog.listTables()
#spark.catalog.listColumns("us_delay_flights_tbl")

## Read

Let's practice reading and writing files in different formats.

**parquet**

Read a Parquet file:

In [18]:
#Use Parquet 
# read

#file = r"C:\datasets\spark_summary_data\parquet\summary.parquet\*"
#df = spark.read.format("parquet").load(file)

In [19]:
# write

#(df.write.format("parquet")
#  .mode("overwrite")
#  .option("compression", "snappy")
#  .save("/tmp/data/parquet/df_parquet"))

**csv**

The next format is CSV:

In [20]:
#Use CSV
# read

#file = r"C:\datasets\spark_summary_data\csv\*"

#schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT"

#df = (spark.read.format("csv") 
#  .option("header", "true")  
#  .schema(schema) 
#  .option("mode", "FAILFAST")  # Fail immediately on the first malformed record
#  .option("nullValue", "")     # Read empty fields as null
#  .load(file))

In [21]:
# write

#df.write.format("csv").mode("overwrite").save("/data/csv/df_csv")

**json**

In [22]:
#Use JSON
# read

#file = r"C:\datasets\spark_summary_data\json\*"
#df = spark.read.format("json").load(file) 

In [23]:
# write

#df.write.format("json") \
#  .mode("overwrite") \
#  .option("compression", "snappy") \
#  .save("/tmp/data/json/df_json") 

**orc**

In [24]:
# ORC
# read

#file = r"C:\datasets\spark_summary_data\orc\*"
#df = spark.read.format("orc").option("path", file).load()
#df.show(10, False)

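## Functions

Spark SQL has higher-order functions that apply a lambda to the elements of an array column. The cells below demonstrate `transform`, `filter`, `exists` and `reduce` on a small table of temperatures in Celsius.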

In [25]:
schema = StructType([StructField("celsius", ArrayType(IntegerType()))])

In [26]:
t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]

In [27]:
t_c = spark.createDataFrame(t_list, schema)

In [28]:
t_c.createOrReplaceTempView("tC")

In [29]:
# Show the DataFrame
t_c.show()

+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+



In [30]:
# transform()
spark.sql("""
SELECT celsius, 
 transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit 
  FROM tC
""").show()

# +--------------------+--------------------+
# |             celsius|          fahrenheit|
# +--------------------+--------------------+
# |[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
# |[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
# +--------------------+--------------------+

+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+
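The lambda `t -> ((t * 9) div 5) + 32` uses integer division, which is why every Fahrenheit value is a whole number. A plain-Python sketch of the same element-wise computation (the name `to_fahrenheit` is ours, not Spark's) reproduces the values shown above. Python's `//` rounds toward negative infinity, so it may not match `div` for negative temperatures; this data has none.

```python
def to_fahrenheit(temps):
    # Same integer arithmetic as ((t * 9) div 5) + 32, applied to each element
    return [(t * 9) // 5 + 32 for t in temps]


celsius_rows = [[35, 36, 32, 30, 40, 42, 38], [31, 32, 34, 55, 56]]
for row in celsius_rows:
    print(row, "->", to_fahrenheit(row))
```

In PySpark 3.1 and later, the same lambda can also be expressed through the DataFrame API with `pyspark.sql.functions.transform`.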



In [31]:
# filter()
spark.sql("""
SELECT celsius, 
 filter(celsius, t -> t > 38) as high 
  FROM tC
""").show()

# +--------------------+--------+
# |             celsius|    high|
# +--------------------+--------+
# |[35, 36, 32, 30, ...|[40, 42]|
# |[31, 32, 34, 55, 56]|[55, 56]|
# +--------------------+--------+

+--------------------+--------+
|             celsius|    high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+



In [32]:
#exists()
spark.sql("""
SELECT celsius, 
       exists(celsius, t -> t = 38) as threshold
  FROM tC
""").show()

# +--------------------+---------+
# |             celsius|threshold|
# +--------------------+---------+
# |[35, 36, 32, 30, ...|     true|
# |[31, 32, 34, 55, 56]|    false|
# +--------------------+---------+

+--------------------+---------+
|             celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...|     true|
|[31, 32, 34, 55, 56]|    false|
+--------------------+---------+
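`filter` keeps the array elements for which the lambda is true, and `exists` returns true if the lambda holds for at least one element. In plain Python these correspond to a list comprehension and `any()`; the helper names `high_temps` and `has_temp` below are hypothetical:

```python
def high_temps(temps, limit=38):
    # Like filter(celsius, t -> t > 38)
    return [t for t in temps if t > limit]


def has_temp(temps, value=38):
    # Like exists(celsius, t -> t = 38)
    return any(t == value for t in temps)


celsius_rows = [[35, 36, 32, 30, 40, 42, 38], [31, 32, 34, 55, 56]]
for row in celsius_rows:
    print(row, high_temps(row), has_temp(row))
```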



In [33]:
# reduce()
spark.sql("""
SELECT celsius, 
       reduce(
          celsius, 
          0, 
          (acc, t) -> acc + t, 
          acc -> (acc div size(celsius) * 9 div 5) + 32
        ) as avgFahrenheit 
  FROM tC
""").show()

# +--------------------+-------------+
# |             celsius|avgFahrenheit|
# +--------------------+-------------+
# |[35, 36, 32, 30, ...|           96|
# |[31, 32, 34, 55, 56]|          105|
# +--------------------+-------------+

+--------------------+-------------+
|             celsius|avgFahrenheit|
+--------------------+-------------+
|[35, 36, 32, 30, ...|           96|
|[31, 32, 34, 55, 56]|          105|
+--------------------+-------------+
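`reduce` folds the array with a start value (`0`), a merge lambda that adds each element to the accumulator, and a finish lambda applied once to the final sum. Both divisions in the finish step are integer divisions, so precision is lost: the first row's exact mean is about 36.14 °C (about 97.06 °F), yet the query reports 96. The plain-Python sketch below (hypothetical `avg_fahrenheit`) follows the same steps with `functools.reduce`:

```python
from functools import reduce


def avg_fahrenheit(temps):
    # Merge step: (acc, t) -> acc + t, starting from 0
    total = reduce(lambda acc, t: acc + t, temps, 0)
    # Finish step: acc div size * 9 div 5 + 32, evaluated left to right
    return (total // len(temps) * 9 // 5) + 32


print(avg_fahrenheit([35, 36, 32, 30, 40, 42, 38]))  # 96
print(avg_fahrenheit([31, 32, 34, 55, 56]))          # 105
```

If your Spark version does not provide `reduce`, the older `aggregate` function may be used instead; it takes the same arguments (array, start value, merge lambda, optional finish lambda).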



## Operations

In [34]:
# Import expr for building columns from SQL expressions
from pyspark.sql.functions import expr

In [35]:
tripdelaysFilePath = r"C:\datasets\departuredelays.csv"

In [36]:
airportsFilePath = r"C:\datasets\spark_summary_data\codes-airport.txt"

In [37]:
# Obtain airports data set
airports = (spark.read 
  .format("csv")
  .options(header="true", inferSchema="true", sep="\t")
  .load(airportsFilePath))
airports.createOrReplaceTempView("airports")

In [38]:
# Obtain departure delays data set
departureDelays = (spark.read 
  .format("csv") 
  .options(header="true") 
  .load(tripdelaysFilePath))

In [39]:
departureDelays = (departureDelays 
  .withColumn("delay", expr("CAST(delay as INT) as delay")) 
  .withColumn("distance", expr("CAST(distance as INT) as distance")))
departureDelays.createOrReplaceTempView("departureDelays")

In [40]:
# Create temporary small table
foo = (departureDelays 
  .filter(expr("""origin == 'SEA' and destination == 'SFO' and 
    date like '01010%' and delay > 0"""))) 
foo.createOrReplaceTempView("foo")

In [41]:
spark.sql("SELECT * FROM airports LIMIT 10").show()

# +-----------+-----+-------+----+
# |       City|State|Country|IATA|
# +-----------+-----+-------+----+
# | Abbotsford|   BC| Canada| YXX|
# |   Aberdeen|   SD|    USA| ABR|
# |    Abilene|   TX|    USA| ABI|
# |      Akron|   OH|    USA| CAK|
# |    Alamosa|   CO|    USA| ALS|
# |     Albany|   GA|    USA| ABY|
# |     Albany|   NY|    USA| ALB|
# |Albuquerque|   NM|    USA| ABQ|
# | Alexandria|   LA|    USA| AEX|
# |  Allentown|   PA|    USA| ABE|
# +-----------+-----+-------+----+

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+



In [42]:
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()

# +--------+-----+--------+------+-----------+
# |    date|delay|distance|origin|destination|
# +--------+-----+--------+------+-----------+
# |01011245|    6|     602|   ABE|        ATL|
# |01020600|   -8|     369|   ABE|        DTW|
# |01021245|   -2|     602|   ABE|        ATL|
# |01020605|   -4|     602|   ABE|        ATL|
# |01031245|   -4|     602|   ABE|        ATL|
# |01030605|    0|     602|   ABE|        ATL|
# |01041243|   10|     602|   ABE|        ATL|
# |01040605|   28|     602|   ABE|        ATL|
# |01051245|   88|     602|   ABE|        ATL|
# |01050605|    9|     602|   ABE|        ATL|
# +--------+-----+--------+------+-----------+

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



In [43]:
spark.sql("SELECT * FROM foo").show()

# +--------+-----+--------+------+-----------+
# |    date|delay|distance|origin|destination|
# +--------+-----+--------+------+-----------+
# |01010710|   31|     590|   SEA|        SFO|
# |01010955|  104|     590|   SEA|        SFO|
# |01010730|    5|     590|   SEA|        SFO|
# +--------+-----+--------+------+-----------+

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [44]:
# Union
# Union two tables
bar = departureDelays.union(foo)
bar.createOrReplaceTempView("bar")

In [45]:
# Show the union (filtering for SEA and SFO in a specific time range)
bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
AND date LIKE '01010%' AND delay > 0""")).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+
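Each SEA to SFO row appears twice because `union` behaves like SQL `UNION ALL`: it concatenates the two DataFrames and keeps duplicates. It also matches columns by position, not by name (`unionByName` matches by name), and appending `.distinct()` would collapse the duplicates. A plain-Python sketch of both steps on the three `foo` rows:

```python
foo_rows = [
    ("01010710", 31, 590, "SEA", "SFO"),
    ("01010955", 104, 590, "SEA", "SFO"),
    ("01010730", 5, 590, "SEA", "SFO"),
]

# union: concatenate and keep duplicates (UNION ALL semantics)
bar_rows = foo_rows + foo_rows

# distinct: drop repeated rows; dict.fromkeys keeps first-seen order here,
# whereas Spark's distinct() makes no ordering guarantee
distinct_rows = list(dict.fromkeys(bar_rows))

print(len(bar_rows), len(distinct_rows))  # 6 3
```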



In [46]:
spark.sql("""
SELECT * 
  FROM bar 
 WHERE origin = 'SEA' 
   AND destination = 'SFO' 
   AND date LIKE '01010%' 
   AND delay > 0
""").show()

# +--------+-----+--------+------+-----------+
# |    date|delay|distance|origin|destination|
# +--------+-----+--------+------+-----------+
# |01010710|   31|     590|   SEA|        SFO|
# |01010955|  104|     590|   SEA|        SFO|
# |01010730|    5|     590|   SEA|        SFO|
# |01010710|   31|     590|   SEA|        SFO|
# |01010955|  104|     590|   SEA|        SFO|
# |01010730|    5|     590|   SEA|        SFO|
# +--------+-----+--------+------+-----------+

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [47]:
# Joins
foo.join(
  airports, 
  airports.IATA == foo.origin
).select("City", "State", "date", "delay", "distance", "destination").show()

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+
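The default join type is an inner join: each `foo` row is paired with the `airports` row whose `IATA` code equals its `origin`, and rows without a match are dropped. Since `airports` is small, Spark may execute this as a broadcast hash join, which conceptually resembles the dictionary lookup in the plain-Python sketch below. The `inner_join` helper, the one-entry lookup table and the `ZZZ` row are hypothetical, and the dict assumes IATA codes are unique (a real join emits one output row per matching airport row):

```python
def inner_join(rows, lookup):
    """Attach (City, State) to each flight whose origin has a match; drop the rest."""
    joined = []
    for date, delay, distance, origin, destination in rows:
        match = lookup.get(origin)
        if match is not None:  # inner join: unmatched flights are discarded
            city, state = match
            joined.append((city, state, date, delay, distance, destination))
    return joined


airports = {"SEA": ("Seattle", "WA")}  # hypothetical one-entry lookup
flights = [
    ("01010710", 31, 590, "SEA", "SFO"),
    ("01010955", 104, 590, "SEA", "SFO"),
    ("01010730", 5, 590, "SEA", "SFO"),
    ("01010800", 12, 100, "ZZZ", "SFO"),  # hypothetical origin with no airport row
]
for row in inner_join(flights, airports):
    print(row)
```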



In [48]:
spark.sql("""
SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination 
  FROM foo f
  JOIN airports a
    ON a.IATA = f.origin
""").show()

# +-------+-----+--------+-----+--------+-----------+
# |   City|State|    date|delay|distance|destination|
# +-------+-----+--------+-----+--------+-----------+
# |Seattle|   WA|01010710|   31|     590|        SFO|
# |Seattle|   WA|01010955|  104|     590|        SFO|
# |Seattle|   WA|01010730|    5|     590|        SFO|
# +-------+-----+--------+-----+--------+-----------+

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



In [49]:
# Modifications
from pyspark.sql.functions import expr
foo2 = (foo.withColumn(
          "status", 
          expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")
        ))

foo2.show()

# +--------+-----+--------+------+-----------+-------+
# |    date|delay|distance|origin|destination| status|
# +--------+-----+--------+------+-----------+-------+
# |01010710|   31|     590|   SEA|        SFO|Delayed|
# |01010955|  104|     590|   SEA|        SFO|Delayed|
# |01010730|    5|     590|   SEA|        SFO|On-time|
# +--------+-----+--------+------+-----------+-------+

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+



In [50]:
foo3 = foo2.drop("delay")
foo3.show()

+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+



In [51]:
# Renaming columns
foo4 = foo3.withColumnRenamed("status", "flight_status")
foo4.show()

# +--------+--------+------+-----------+-------------+
# |    date|distance|origin|destination|flight_status|
# +--------+--------+------+-----------+-------------+
# |01010710|     590|   SEA|        SFO|      Delayed|
# |01010955|     590|   SEA|        SFO|      Delayed|
# |01010730|     590|   SEA|        SFO|      On-time|
# +--------+--------+------+-----------+-------------+

+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      Delayed|
|01010955|     590|   SEA|        SFO|      Delayed|
|01010730|     590|   SEA|        SFO|      On-time|
+--------+--------+------+-----------+-------------+



In [52]:
# Pivoting in SQL
spark.sql("""
SELECT * FROM (
  SELECT destination, CAST(SUBSTRING(date, 1, 2) AS int) AS month, delay
    FROM departureDelays
   WHERE origin = 'SEA'
)
PIVOT (
  CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay
  FOR month IN (1 JAN, 2 FEB)
)
ORDER BY destination
""").show()

# +-----------+------------+------------+------------+------------+
# |destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|
# +-----------+------------+------------+------------+------------+
# |        ABQ|       19.86|         316|       11.42|          69|
# |        ANC|        4.44|         149|        7.90|         141|
# |        ATL|       11.98|         397|        7.73|         145|
# |        AUS|        3.48|          50|       -0.21|          18|
# |        BOS|        7.84|         110|       14.58|         152|
# |        BUR|       -2.03|          56|       -1.89|          78|
# |        CLE|       16.00|          27|        null|        null|
# |        CLT|        2.53|          41|       12.96|         228|
# |        COS|        5.32|          82|       12.18|         203|
# |        CVG|       -0.50|           4|        null|        null|
# |        DCA|       -1.15|          50|        0.07|          34|
# |        DEN|       13.13|         425|       12.95|         625|
# |        DFW|        7.95|         247|       12.57|         356|
# |        DTW|        9.18|         107|        3.47|          77|
# |        EWR|        9.63|         236|        5.20|         212|
# |        FAI|        1.84|         160|        4.21|          60|
# |        FAT|        1.36|         119|        5.22|         232|
# |        FLL|        2.94|          54|        3.50|          40|
# |        GEG|        2.28|          63|        2.87|          60|
# |        HDN|       -0.44|          27|       -6.50|           0|
# +-----------+------------+------------+------------+------------+

+-----------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|FEB_AvgDelay|FEB_MaxDelay|
+-----------+------------+------------+------------+------------+
|        ABQ|       19.86|         316|       11.42|          69|
|        ANC|        4.44|         149|        7.90|         141|
|        ATL|       11.98|         397|        7.73|         145|
|        AUS|        3.48|          50|       -0.21|          18|
|        BOS|        7.84|         110|       14.58|         152|
|        BUR|       -2.03|          56|       -1.89|          78|
|        CLE|       16.00|          27|        NULL|        NULL|
|        CLT|        2.53|          41|       12.96|         228|
|        COS|        5.32|          82|       12.18|         203|
|        CVG|       -0.50|           4|        NULL|        NULL|
|        DCA|       -1.15|          50|        0.07|          34|
|        DEN|       13.13|         425|       12.95|         625|
|        D