# Description
----
The other version of this notebook did not have the Databricks data.
This version uses the data provided by Databricks

# Setup

In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

# Imports

In [2]:
import os
import os.path as path

# Setup Spark

In [3]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = (SparkSession
  .builder
  .appName("SparkSQLExampleApp")
  .getOrCreate())

In [4]:
def db_fname(fname):
    import os.path as path
    data_dir = '~/dev/github-bv/LearningSparkV2/databricks-datasets/learning-spark-v2/'
    return path.expanduser(path.join(data_dir, fname))

# User-Defined Functions (UDFs)

* Operate per session
* Will not be persisted in the underlying metastore

In [5]:
from pyspark.sql.types import LongType

In [6]:
# Create cubed function
def cubed(s):
    return s * s * s

# Register UDF
spark.udf.register("cubed", cubed, LongType())

# Generate temp view
spark.range(1, 9).createOrReplaceTempView("udf_test")

In [10]:
q = """
select id,
    cubed(id) as id_cubed
from udf_test
"""
spark.sql(q).show()

+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+



## Evaluation Order and Null Checking in Spark Sql

1. Make the UDF `null` aware and do `null` checking in the UDF
2. Use `If` or `CASE WHEN` expressions to do the `null` check, and invoke the UDF in a conditional branch

## Pandas UDFs

In [32]:
import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the cubed function
def cubed(a: pd.Series) -> pd.Series:
    return a * a * a

# Create the pandas UDF for the cubed function
cubed_udf = pandas_udf(cubed, returnType=LongType())

### Usage - Pandas

In [33]:
x = pd.Series([1, 2, 3])
print(cubed(x))

0     1
1     8
2    27
dtype: int64


In [34]:
(pd.DataFrame(x, columns=['A'])
    .assign(B=lambda z: cubed(z)))

Unnamed: 0,A,B
0,1,1
1,2,8
2,3,27


### Usage - Spark

In [35]:
df = spark.range(1, 4)

# Execute function as a Spark vectorized UDF
(df
 .select("id", 
         cubed_udf("id").alias('B'),
         cubed_udf(col("id")).alias('C'))
 .show())

+---+---+---+
| id|  B|  C|
+---+---+---+
|  1|  1|  1|
|  2|  8|  8|
|  3| 27| 27|
+---+---+---+



# Higher-Order Functions in DataFrames and Spark SQL

## Option 1: Explode and Collect

## Option 2: User-Defined Function

## Higher-Order Functions

Example

In [38]:
from pyspark.sql.types import *
schema = StructType([
    StructField('celsius', ArrayType(IntegerType()))
])

t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]
t_c = spark.createDataFrame(t_list, schema)
t_c.createOrReplaceTempView('tC')

In [39]:
t_c.show()

+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+



### Transform

```
transform(array<T>, function<T, U>): array<U>
```

In [45]:
q = """
select celsius,
transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit
from tC
"""
spark.sql(q).show()

+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+



### Filter

```
filter(array<T>, function<T, Boolean>): array<T>
```

In [46]:
spark.sql("""
SELECT celsius, 
 filter(celsius, t -> t > 38) as high 
  FROM tC
""").show()

+--------------------+--------+
|             celsius|    high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+



### Exists
```
exists(array<T>, function<T, V, Boolean>): Boolean
```

In [47]:
# Is there a temperature of 38C in the array of temperatures
spark.sql("""
SELECT celsius, 
       exists(celsius, t -> t = 38) as threshold
  FROM tC
""").show()


+--------------------+---------+
|             celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...|     true|
|[31, 32, 34, 55, 56]|    false|
+--------------------+---------+



### Reduce
```
reduce(array<T>, B, function<B, T, B>, function<B, R>)
```

The `reduce()` function reduces the elements of the array to a single value by merging the elements into a buffer `B` using `function<B, T, B>` and applying a finishing `function<B, R>` on the final buffer:

In [83]:
# Calculate average temperature and convert to F
spark.sql("""
SELECT celsius, 
       reduce(
          celsius, 
          0, 
          (t, acc) -> t + acc, 
          acc -> (acc div size(celsius) * 9 div 5) + 32
        ) as avgFahrenheit 
  FROM tC
""").show()


AnalysisException: "Undefined function: 'reduce'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 3 pos 7"

# Common Relational Operations

In [51]:
from pyspark.sql.functions import expr

In [56]:
tripdelaysFilePath = db_fname('flights/departuredelays.csv')
airportsnaFilePath = db_fname('flights/airport-codes-na.txt')

In [59]:
airportsna = (spark.read
             .format('csv')
             .options(header='true', 
                     inferSchema='true',
                     sep='\t')
             .load(airportsnaFilePath))

airportsna.createOrReplaceTempView("airports_na")

In [61]:
departureDelays = (spark.read
                  .format('csv')
                  .options(header='true')
                  .load(tripdelaysFilePath)
                  .withColumn('delay', expr('CAST(delay as INT) as delay'))
                  .withColumn('distance', expr('CAST(distance as INT) as distance')))

departureDelays.createOrReplaceTempView('departureDelaysspark.sql(q)')

In [62]:
# Create temporary small table
foo = (departureDelays
      .filter(expr("""origin == 'SEA'
                  and destination == 'SFO'
                  and date like '01010%'
                  and delay > 0""")))
foo.createOrReplaceTempView('foo')

In [63]:
spark.sql('select * from airports_na limit 10').show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+



In [64]:
spark.sql('select * from departureDelays limit 10').show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



In [65]:
spark.sql('select * from foo').show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



## Unions

In [66]:
bar = departureDelays.union(foo)
bar.createOrReplaceTempView('bar')

bar.filter(expr("""origin == 'SEA'
                  and destination == 'SFO'
                  and date like '01010%'
                  and delay > 0""")).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [68]:
q = """
select *
from bar
where origin = 'SEA'
and destination = 'SFO'
and date like '01010%'
and delay > 0
"""
spark.sql(q).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



## Joins

In [71]:
(foo.join(
    airportsna,
    airportsna.IATA == foo.origin)
    .select('City', 'State', 'date', 'delay', 'distance', 'destination')
    .show())

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



In [74]:
(foo.join(
    airportsna,
    airportsna['IATA'] == foo['origin'])
    .select('city', 'State', 'DATE', 'delay', 'distance', 'destination')
    .show())

+-------+-----+--------+-----+--------+-----------+
|   city|State|    DATE|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



In [76]:
q = """
select a.city,
    a.state,
    f.date,
    f.delay,
    f.distance,
    f.destination
from foo f
join airports_na a
on a.IATA = f.origin
"""
spark.sql(q).show()

+-------+-----+--------+-----+--------+-----------+
|   city|state|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



## Windowing

A window function uses values from the rows in a window (a range of input rows) to return a set of values, typically in the form of another row. With window functions, it is possible to operate on a group of rows while still returning a single value for every input row

In [91]:
q = """
SELECT origin, destination, SUM(delay) AS TotalDelays 
  FROM departureDelays 
 WHERE origin IN ('SEA', 'SFO', 'JFK') 
   AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL') 
 GROUP BY origin, destination
"""
(spark.sql(q)).createOrReplaceTempView('departureDelaysWindow')

In [92]:
spark.sql('select * from departureDelaysWindow').show()

+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
|   JFK|        ORD|       5608|
|   SEA|        LAX|       9359|
|   JFK|        SFO|      35619|
|   SFO|        ORD|      27412|
|   JFK|        DEN|       4315|
|   SFO|        DEN|      18688|
|   SFO|        SEA|      17080|
|   SEA|        SFO|      22293|
|   JFK|        ATL|      12141|
|   SFO|        ATL|       5091|
|   SEA|        DEN|      13645|
|   SEA|        ATL|       4535|
|   SEA|        ORD|      10041|
|   JFK|        SEA|       7856|
|   JFK|        LAX|      35755|
|   SFO|        JFK|      24100|
|   SFO|        LAX|      40798|
|   SEA|        JFK|       4667|
+------+-----------+-----------+



In [93]:
q = """
select origin, destination, TotalDelays, rank
from (
    select origin, destination, TotalDelays, dense_rank()
        over (partition by origin order by TotalDelays DESC) as rank
        from departureDelaysWindow
        ) t
where rank <= 3
"""
spark.sql(q).show()

+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
|   SEA|        SFO|      22293|   1|
|   SEA|        DEN|      13645|   2|
|   SEA|        ORD|      10041|   3|
|   SFO|        LAX|      40798|   1|
|   SFO|        ORD|      27412|   2|
|   SFO|        JFK|      24100|   3|
|   JFK|        LAX|      35755|   1|
|   JFK|        SFO|      35619|   2|
|   JFK|        ATL|      12141|   3|
+------+-----------+-----------+----+



## Modifications

In [94]:
foo.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



### Adding new columns

In [95]:
from pyspark.sql.functions import expr
foo2 = (foo.withColumn('status',
                      expr("""case when delay <= 10 then 'on-time'
                          else 'delayed' end""")))

In [97]:
foo2.show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|delayed|
|01010955|  104|     590|   SEA|        SFO|delayed|
|01010730|    5|     590|   SEA|        SFO|on-time|
+--------+-----+--------+------+-----------+-------+



### Drop

In [98]:
foo3 = foo2.drop('delay')
foo3.show()

+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|delayed|
|01010955|     590|   SEA|        SFO|delayed|
|01010730|     590|   SEA|        SFO|on-time|
+--------+--------+------+-----------+-------+



### Rename columns

In [99]:
foo4 = foo3.withColumnRenamed('status', 'flight_status')
foo4.show()

+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      delayed|
|01010955|     590|   SEA|        SFO|      delayed|
|01010730|     590|   SEA|        SFO|      on-time|
+--------+--------+------+-----------+-------------+



### Pivoting

In [100]:
q = """
select destination,
    cast(substring(date, 0, 2) as int) as month,
    delay
from departureDelays
where origin = 'SEA'
"""
spark.sql(q).show()

+-----------+-----+-----+
|destination|month|delay|
+-----------+-----+-----+
|        ORD|    1|   92|
|        JFK|    1|   -7|
|        DFW|    1|   -5|
|        MIA|    1|   -3|
|        DFW|    1|   -3|
|        DFW|    1|    1|
|        ORD|    1|  -10|
|        DFW|    1|   -6|
|        DFW|    1|   -2|
|        ORD|    1|   -3|
|        ORD|    1|    0|
|        DFW|    1|   23|
|        DFW|    1|   36|
|        ORD|    1|  298|
|        JFK|    1|    4|
|        DFW|    1|    0|
|        MIA|    1|    2|
|        DFW|    1|    0|
|        DFW|    1|    0|
|        ORD|    1|   83|
+-----------+-----+-----+
only showing top 20 rows



In [101]:
q = """
select * from (
    select destination,
        cast(substring(date, 0, 2) as int) as month,
        delay
    from departureDelays
    where origin = 'SEA'
)
pivot (
    cast(avg(delay) as decimal(4, 2)) as AvgDelay,
    max(delay) as MaxDelay
    for month in (1 JAN, 2 feb)
    )
order by destination
"""
spark.sql(q).show()

+-----------+------------+------------+------------+------------+
|destination|JAN_AvgDelay|JAN_MaxDelay|feb_AvgDelay|feb_MaxDelay|
+-----------+------------+------------+------------+------------+
|        ABQ|       19.86|         316|       11.42|          69|
|        ANC|        4.44|         149|        7.90|         141|
|        ATL|       11.98|         397|        7.73|         145|
|        AUS|        3.48|          50|       -0.21|          18|
|        BOS|        7.84|         110|       14.58|         152|
|        BUR|       -2.03|          56|       -1.89|          78|
|        CLE|       16.00|          27|        null|        null|
|        CLT|        2.53|          41|       12.96|         228|
|        COS|        5.32|          82|       12.18|         203|
|        CVG|       -0.50|           4|        null|        null|
|        DCA|       -1.15|          50|        0.07|          34|
|        DEN|       13.13|         425|       12.95|         625|
|        D

### Summary