### UDF (User-defined functions)

In [0]:
from pyspark.sql.types import LongType

# Create cubed function

def cubed(s):
 """Return ``s`` multiplied by itself twice, or ``None`` for a null input.

 Spark hands SQL NULL values to a Python UDF as ``None``. Multiplying
 ``None`` raises ``TypeError`` and aborts the query, so nulls are passed
 through unchanged instead.
 """
 if s is None:
  return None
 return s * s * s

# Expose cubed() to Spark SQL under the name "cubed", returning a LongType

spark.udf.register(name="cubed", f=cubed, returnType=LongType())

# Publish the ids 1 through 8 as the temporary view "udf_test"

spark.range(start=1, end=9).createOrReplaceTempView("udf_test")

# Call the registered UDF from a SQL query and print the result

spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()

+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+



One of the previous prevailing issues with using PySpark UDFs was that they had slower performance than Scala UDFs. 
To resolve this problem, Pandas UDFs (also known as vectorized UDFs) were introduced as part of Apache Spark 2.3.
A Pandas UDF uses Apache Arrow to transfer data and Pandas to work with the data. 
You define a Pandas UDF with pandas_udf, either by using it as a decorator or by calling it to wrap the function itself.
Instead of operating on individual inputs row by row, you are operating on a Pandas Series or DataFrame.

From Apache Spark 3.0 with Python 3.6 and above, Pandas UDFs were split into two API categories: 
* Pandas UDFs 
* Pandas Function APIs: allow you to directly apply a local Python function to a PySpark DataFrame.

In [0]:
# Import pandas

import pandas as pd

# Import various pyspark SQL functions including pandas_udf

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the cubed function 

def cubed(a: pd.Series) -> pd.Series:
 """Cube every element of ``a`` using element-wise pandas arithmetic."""
 squared = a * a
 return squared * a

# Wrap cubed() as a vectorized (pandas) UDF that yields LongType values

cubed_udf = pandas_udf(f=cubed, returnType=LongType())

# Build a small local pandas Series to try the function on

x = pd.Series([1, 2, 3])

# cubed() itself is still called directly here, on local pandas data,
# without going through the Spark UDF wrapper

print(cubed(x))

0     1
1     8
2    27
dtype: int64


In [0]:
# Switch to a Spark DataFrame.
# 'spark' is an existing SparkSession; build a DataFrame holding the ids 1-3

df = spark.range(start=1, end=4)

# Apply the vectorized UDF to the id column and show it next to the original id
(
 df
 .select("id", cubed_udf(col("id")))
 .show()
)

# Using a vectorized UDF will result in the execution of Spark jobs

+---+---------+
| id|cubed(id)|
+---+---------+
|  1|        1|
|  2|        8|
|  3|       27|
+---+---------+



Spark SQL shell
A convenient tool for executing Spark SQL queries is the spark-sql CLI. 
To start the Spark SQL CLI, execute the following command in the $SPARK_HOME
folder:
./bin/spark-sql
Once you’ve started the shell, you can use it to interactively perform Spark SQL queries.

In [0]:
%sql
-- Create the two-column people1 table used by the queries that follow.
-- NOTE(review): there is no IF NOT EXISTS guard, so re-running this cell
-- presumably fails once people1 exists; confirm before a Run All.
CREATE TABLE people1 (
  name STRING,
  age INT
);

In [0]:
%sql
-- Single-quoted string literal (standard SQL, matching the other queries in
-- this notebook); the age for this row is left as NULL.
INSERT INTO people1 VALUES ('Michael', NULL);

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
-- Single-quoted string literal (standard SQL, matching the other queries in
-- this notebook).
INSERT INTO people1 VALUES ('Andy', 30);

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
-- Single-quoted string literal (standard SQL, matching the other queries in
-- this notebook).
INSERT INTO people1 VALUES ('Samantha', 19);

num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
-- List the tables and temporary views known to the session.
-- NOTE(review): the recorded output below does not include people1 even
-- though it is created earlier in this notebook; the output looks stale,
-- confirm by re-running.
SHOW TABLES;

database,tableName,isTemporary
default,airport_codes_na_txt,False
default,departuredelayswindow,False
,airports_na,True
,departuredelays,True
,foo,True
,udf_test,True


In [0]:
%sql
-- People younger than 20; the row with a NULL age does not match the comparison
SELECT *
FROM people1
WHERE age < 20;

name,age
Samantha,19


In [0]:
%sql
-- Names of the people whose age is NULL
SELECT name
FROM people1
WHERE age IS NULL;

name
Michael


Higher-order functions take anonymous lambda functions as arguments.
-- In SQL
transform(values, value -> lambda expression)

The transform() function takes an array (values) and an anonymous function (lambda expression) as input.

The function transparently creates a new array by applying the anonymous function to each element, and then assigning the result to the output array (similar to the UDF approach, but more efficiently).

In [0]:
from pyspark.sql.types import *

# Single-column schema: every row holds one array of integer Celsius readings
schema = StructType([
 StructField("celsius", ArrayType(IntegerType())),
])

# Two rows; each row is a one-element list wrapping its temperature array
t_list = (
 [[35, 36, 32, 30, 40, 42, 38]],
 [[31, 32, 34, 55, 56]],
)

# Build the DataFrame and make it queryable from SQL as "tC"
t_c = spark.createDataFrame(t_list, schema)
t_c.createOrReplaceTempView("tC")

In [0]:
# Show the DataFrame as a text table; the longer array is cut off with "..."
# in the rendered output
t_c.show()

+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+



In [0]:
# Calculate Fahrenheit from Celsius for an array of temperatures.
# transform() applies the lambda to every element of the celsius array and
# returns the results as a new array column. The conversion uses `div`, so
# each result is a whole number (e.g. 32 C is reported as 89 in the output).

spark.sql("""
SELECT celsius,
transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit 
 FROM tC
""").show()


+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+



In [0]:
#  Filter temperatures > 38C for array of temperatures.
#  filter() keeps only the array elements for which the lambda is true, so
#  the `high` column holds a (possibly shorter) array for each row.

spark.sql("""
SELECT celsius, 
 filter(celsius, t -> t > 38) as high 
 FROM tC
""").show()

+--------------------+--------+
|             celsius|    high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+



In [0]:
#  Is there a temperature of 38C in the array of temperatures?
#  exists() reports true for a row when the lambda holds for at least one
#  element of its array (first row contains 38, second row does not).

spark.sql("""
SELECT celsius, 
 exists(celsius, t -> t = 38) as threshold
 FROM tC
""").show()

+--------------------+---------+
|             celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...|     true|
|[31, 32, 34, 55, 56]|    false|
+--------------------+---------+



In [0]:
# Calculate average temperature and convert to F.
# reduce(array, start, merge, finish):
#   start  - initial accumulator value (0)
#   merge  - (acc, t): add the next element t to the running sum acc
#   finish - turn the final sum into the average (integer `div`) and convert
#            that average from Celsius to Fahrenheit
# Spark passes the accumulator as the first merge argument and the array
# element as the second, hence the (acc, t) parameter order.

spark.sql("""
SELECT celsius, 
 reduce(
 celsius, 
 0, 
 (acc, t) -> acc + t, 
 acc -> (acc div size(celsius) * 9 div 5) + 32
 ) as avgFahrenheit 
 FROM tC
""").show()

+--------------------+-------------+
|             celsius|avgFahrenheit|
+--------------------+-------------+
|[35, 36, 32, 30, ...|           96|
|[31, 32, 34, 55, 56]|          105|
+--------------------+-------------+



In [0]:
from pyspark.sql.functions import expr
# Input file locations: flight departure delays (CSV) and airport codes (text).
# NOTE(review): these look like Databricks FileStore paths; adjust them when
# running in another environment.
tripdelaysFilePath = "/FileStore/tables/departuredelays.csv"
airportsnaFilePath = "/FileStore/tables/airport_codes_na.txt"

In [0]:
# Load the airport reference data: tab-separated text with a header row,
# asking Spark to infer the column types

airportsna = (
 spark.read
 .format("csv")
 .options(
  header="true",
  inferSchema="true",
  sep="\t",
 )
 .load(airportsnaFilePath)
)

# Make the data queryable from SQL as "airports_na"
airportsna.createOrReplaceTempView("airports_na")

In [0]:
# Load the departure delays CSV (header row, no schema inference requested)
# and cast the delay and distance columns to INT in the same chain

departureDelays = (
 spark.read
 .format("csv")
 .options(header="true")
 .load(tripdelaysFilePath)
 .withColumn("delay", expr("CAST(delay as INT) as delay"))
 .withColumn("distance", expr("CAST(distance as INT) as distance"))
)

# Register the result for SQL access
departureDelays.createOrReplaceTempView("departureDelays")

In [0]:
# Create temporary small table: SEA -> SFO flights with a positive delay whose
# date string starts with '01010', registered as the temp view "foo"

foo = (departureDelays
 .filter(expr("""origin == 'SEA' and destination == 'SFO' and 
 date like '01010%' and delay > 0""")))
foo.createOrReplaceTempView("foo")


The departureDelays DataFrame contains data on >1.3M flights while the foo DataFrame contains just three rows with information on flights from SEA to SFO for a specific time range.

In [0]:
# Preview ten rows of the airports_na temp view
spark.sql("SELECT * FROM airports_na LIMIT 10").show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+



In [0]:
# Preview ten rows of the departureDelays temp view
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



In [0]:
# Show the rows of the small foo temp view
spark.sql("SELECT * FROM foo").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



#### Union
A common pattern within Apache Spark is to union two different DataFrames with the same schema together. This can be achieved using the union() method.

In [0]:
# Union two tables: append the rows of foo to departureDelays and register
# the result as the temp view "bar"

bar = departureDelays.union(foo)
bar.createOrReplaceTempView("bar")

# Show the union (filtering for SEA and SFO in a specific time range).
# foo was itself filtered out of departureDelays, so each of its rows shows
# up twice in the output.

bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
AND date LIKE '01010%' AND delay > 0""")).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



The bar DataFrame is the union of departureDelays with foo. Applying the same filtering criteria to bar shows the foo rows duplicated, as expected, because union() does not remove duplicate rows.

In [0]:
# The same filter on the union, expressed in SQL against the "bar" temp view
spark.sql("""
SELECT * 
 FROM bar 
 WHERE origin = 'SEA' 
 AND destination = 'SFO' 
 AND date LIKE '01010%' 
 AND delay > 0
""").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



#### Joins
A common DataFrame operation is to join two DataFrames (or tables) together. By default, a Spark SQL join is an inner join.

In [0]:
# Join departure delays data (foo) with airport info by matching each flight's
# origin code to the airport's IATA code, then keep the descriptive columns

(
 foo
 .join(airportsna, on=airportsna["IATA"] == foo["origin"], how="inner")
 .select("City", "State", "date", "delay", "distance", "destination")
 .show()
)

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



In [0]:
# The same join expressed in SQL over the foo and airports_na temp views
spark.sql("""
SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination 
 FROM foo f
 JOIN airports_na a
 ON a.IATA = f.origin
""").show()

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



#### Windowing
A window function uses values from the rows in a window (a range of input rows) to return a set of values, typically in the form of another row. With window functions, it is possible to operate on a group of rows while still returning a single value for every input row.

In [0]:
%sql
-- Remove any earlier copy of the table before it is created again below
DROP TABLE IF EXISTS departureDelaysWindow;

In [0]:
%sql
-- Materialize total delay per (origin, destination) pair, restricted to three
-- origin airports and seven destination airports
CREATE TABLE departureDelaysWindow AS
SELECT
  origin,
  destination,
  SUM(delay) AS TotalDelays
FROM departureDelays
WHERE origin IN ('SEA', 'SFO', 'JFK')
  AND destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
GROUP BY origin, destination;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Inspect the aggregated delay totals per origin/destination pair
SELECT * FROM departureDelaysWindow;

origin,destination,TotalDelays
JFK,ORD,5608
JFK,SFO,35619
JFK,DEN,4315
JFK,ATL,12141
JFK,SEA,7856
JFK,LAX,35755
SEA,LAX,9359
SFO,ORD,27412
SFO,DEN,18688
SFO,SEA,17080


What if, for each of these origin airports, you wanted to find the three destinations that experienced the most delays? You could achieve this by running a separate query for each of the three origins and then unioning the results together.
But a better approach would be to use a window function like dense_rank().

In [0]:
# Within each origin, rank the destinations by TotalDelays (largest first)
# using dense_rank(), then keep only the rows ranked 1 to 3
spark.sql("""
SELECT origin, destination, TotalDelays, rank 
 FROM ( 
 SELECT origin, destination, TotalDelays, dense_rank() 
 OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank 
 FROM departureDelaysWindow
 ) t 
 WHERE rank <= 3
""").show()


+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
|   JFK|        LAX|      35755|   1|
|   JFK|        SFO|      35619|   2|
|   JFK|        ATL|      12141|   3|
|   SEA|        SFO|      22293|   1|
|   SEA|        DEN|      13645|   2|
|   SEA|        ORD|      10041|   3|
|   SFO|        LAX|      40798|   1|
|   SFO|        ORD|      27412|   2|
|   SFO|        JFK|      24100|   3|
+------+-----------+-----------+----+



In [0]:
# Modifications: display the small foo DataFrame first, as the baseline for
# the column changes made in the cells that follow

foo.show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [0]:
from pyspark.sql.functions import expr

# Add a "status" column derived from delay: 10 or less is 'On-time',
# anything else is 'Delayed'
foo2 = foo.withColumn(
 colName="status",
 col=expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END"),
)

In [0]:
# Display foo2, including its status column
foo2.show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+



In [0]:
# Dropping columns: foo3 is foo2 without the delay column

foo3 = foo2.drop("delay")
foo3.show()

+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+



In [0]:
# Renaming columns: "status" becomes "flight_status" in foo4

foo4 = foo3.withColumnRenamed(existing="status", new="flight_status")
foo4.show()

+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      Delayed|
|01010955|     590|   SEA|        SFO|      Delayed|
|01010730|     590|   SEA|        SFO|      On-time|
+--------+--------+------+-----------+-------------+



In [0]:
%sql
--- Pivoting
--- Pivot input: destination, numeric month and delay for flights leaving SEA.
--- SUBSTRING positions are 1-based, so the two-character month prefix of the
--- date string starts at position 1.

SELECT destination, CAST(SUBSTRING(date, 1, 2) AS int) AS month, delay
 FROM departureDelays
WHERE origin = 'SEA';

destination,month,delay
ORD,1,92
JFK,1,-7
DFW,1,-5
MIA,1,-3
DFW,1,-3
DFW,1,1
ORD,1,-10
DFW,1,-6
DFW,1,-2
ORD,1,-3


In [0]:
%sql
--- Pivoting allows you to place names in the month column (instead of 1 and 2 you can show Jan and Feb, respectively) as well as perform aggregate calculations (in this case average and max) on the delays by destination and month
--- SUBSTRING positions are 1-based, so the two-character month prefix of the
--- date string starts at position 1.

SELECT * FROM (
SELECT destination, CAST(SUBSTRING(date, 1, 2) AS int) AS month, delay
 FROM departureDelays WHERE origin = 'SEA'
)
PIVOT (
 CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay
 FOR month IN (1 JAN, 2 FEB)
)
ORDER BY destination;

destination,JAN_AvgDelay,JAN_MaxDelay,FEB_AvgDelay,FEB_MaxDelay
ABQ,19.86,316,11.42,69.0
ANC,4.44,149,7.9,141.0
ATL,11.98,397,7.73,145.0
AUS,3.48,50,-0.21,18.0
BOS,7.84,110,14.58,152.0
BUR,-2.03,56,-1.89,78.0
CLE,16.0,27,,
CLT,2.53,41,12.96,228.0
COS,5.32,82,12.18,203.0
CVG,-0.5,4,,
