# Chapter 5: Spark SQL and DataFrames: Interacting with External Data Sources

In [1]:
import os

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, pandas_udf
from pyspark.sql.types import *

In [2]:
# Build (or reuse an already running) SparkSession for this notebook
spark = SparkSession.builder.appName("SparkSQLExampleApp").getOrCreate()

22/04/22 09:14:59 WARN Utils: Your hostname, Zipcoders-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.77 instead (on interface en0)
22/04/22 09:14:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/22 09:15:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark SQL UDFs

In [3]:
 # Create cubed function
def cubed(s):
    """Return ``s`` raised to the third power (computed as ``(s * s) * s``).

    Registered as the Spark SQL UDF ``cubed`` in the following cell.
    """
    squared = s * s
    return squared * s

In [4]:
# Register UDF
spark.udf.register("cubed", cubed, LongType())

<function __main__.cubed(s)>

In [5]:
# Expose ids 1..8 (the range end is exclusive) as the temporary view "udf_test"
(spark
    .range(start=1, end=9)
    .createOrReplaceTempView("udf_test"))

In [6]:
# Call the registered cubed UDF from Spark SQL and print the result
(spark
    .sql("SELECT id, cubed(id) AS id_cubed FROM udf_test")
    .show())

[Stage 0:>                                                          (0 + 4) / 4]

+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+



                                                                                

In [7]:
# Scalar pandas UDF body (Spark 3.0 style): the type hints declare Series -> Series
def cubed(a: pd.Series) -> pd.Series:
    """Return the element-wise cube of the pandas Series ``a``.

    Note: this redefinition shadows the earlier scalar ``cubed``; the SQL UDF
    registered before still refers to the original function object.
    """
    squared = a * a
    return squared * a

In [8]:
# Create the pandas UDF for the cubed function
# cubed_udf = pandas_udf(cubed, returnType=LongType())

In [9]:
# Small local pandas Series used to exercise cubed() without Spark
x = pd.Series(data=[1, 2, 3])

In [10]:
# Run the pandas UDF body directly on local pandas data (no Spark involved).
# Leaving the result as the cell's last expression lets Jupyter display it,
# rather than going through print().
cubed(x)

0     1
1     8
2    27
dtype: int64


In [11]:
# Create a Spark DataFrame, 'spark' is an existing SparkSession
#df = spark.range(1, 4)

In [12]:
# Execute function as a Spark vectorized UDF
#df.select("id", cubed_udf(col("id"))).show()

In [13]:
# Higher-Order Functions

In [14]:
# One nullable column, "celsius", holding an array of integers
schema = StructType([
    StructField("celsius", ArrayType(IntegerType()), True),
])

In [15]:
# A tuple of two rows; each row is a one-element list wrapping its array of readings
t_list = (
    [[35, 36, 32, 30, 40, 42, 38]],
    [[31, 32, 34, 55, 56]],
)

In [16]:
# Build the Spark DataFrame from the local rows using the explicit schema
t_c = spark.createDataFrame(data=t_list, schema=schema)

In [17]:
# Register the DataFrame as the temporary view "tC" for the SQL queries below
t_c.createOrReplaceTempView(name="tC")

In [18]:
# Print the DataFrame (default: up to 20 rows, long cells truncated)
t_c.show(n=20, truncate=True)

[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+



                                                                                

transform()
The transform() function produces an array by applying a function to each element
of the input array (similar to a map() function):

In [19]:
# Map every Celsius reading to Fahrenheit (integer division) with transform()
(spark
    .sql("""
SELECT celsius,
transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit
      FROM tC
""")
    .show())

+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+



filter()
The filter() function produces an array consisting of only the elements of the input
array for which the Boolean function is true:

In [21]:
# Keep only the readings above 38C in each row's array of temperatures
(spark
    .sql("""
SELECT celsius,
     filter(celsius, t -> t > 38) as high
      FROM tC
""")
    .show())

+--------------------+--------+
|             celsius|    high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+



exists() 
The exists() function returns true if the Boolean function holds for any element in
the input array:

In [28]:
# Flag rows whose array of temperatures contains a reading of exactly 38C
(spark
    .sql("""
SELECT celsius,
           exists(celsius, t -> t = 38) as threshold
      FROM tC
""")
    .show())

+--------------------+---------+
|             celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...|     true|
|[31, 32, 34, 55, 56]|    false|
+--------------------+---------+



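reduce()
The reduce() function reduces the elements of the array to a single value by merging the elements into a buffer B using function<B, T, B> and applying a finishing function<B, R> on the final buffer. Note that `reduce` is only available as a SQL function from Spark 3.4 onward; on earlier versions use the equivalent `aggregate()` function, which takes the same arguments (array, start value, merge function, finish function):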

In [None]:
# Calculate average temperature and convert to F.
# The SQL function `reduce` is only registered from Spark 3.4 onward; on older
# versions it fails with "Undefined function". `aggregate` is the same
# higher-order function (start value, merge lambda, finish lambda) and exists
# since Spark 2.4, so use it here.
#   merge : (acc, t) -> acc + t   sums the Celsius readings into the buffer
#   finish: integer-divides the sum by the array size, then converts C -> F
#           (all arithmetic uses `div`, so the result is truncated to an int)
spark.sql("""
SELECT celsius,
       aggregate(
          celsius,
          0,
          (acc, t) -> acc + t,
          acc -> (acc div size(celsius) * 9 div 5) + 32
       ) as avgFahrenheit
  FROM tC
""").show()

In [40]:
# Set file paths.
# The dataset root defaults to the author's original checkout but can be
# overridden with the LEARNING_SPARK_DATA environment variable, so the notebook
# is not tied to a single machine. (`expr` is imported in the top import cell.)
FLIGHTS_DIR = os.path.join(
    os.environ.get(
        "LEARNING_SPARK_DATA",
        "/Users/allenc/PyCharmProjects/LearningSparkV2/databricks-datasets",
    ),
    "learning-spark-v2",
    "flights",
)
tripdelaysFilePath = os.path.join(FLIGHTS_DIR, "flightsdeparturedelays.csv")
airportsnaFilePath = os.path.join(FLIGHTS_DIR, "airport-codes-na.txt")

In [44]:
# Obtain airports data set: tab-separated text with a header row, column types inferred
airportsna = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("sep", "\t")
    .load(airportsnaFilePath)
)
airportsna.createOrReplaceTempView("airports_na")

In [None]:
# Obtain departure delays data set.
# No inferSchema is set, so every CSV column is read as a string; cast the
# numeric columns we filter on to integers. "date" is left as a string, so
# string patterns such as LIKE '01010%' apply to it.
departureDelays = (
    spark.read
    .format("csv")
    .options(header="true")
    .load(tripdelaysFilePath)
    .withColumn("delay", col("delay").cast("int"))
    .withColumn("distance", col("distance").cast("int"))
)
departureDelays.createOrReplaceTempView("departureDelays")

In [None]:
# Create a temporary small table: SEA -> SFO rows whose date matches '01010%'
# and whose delay is positive
foo = departureDelays.where(expr("""origin == 'SEA' and destination == 'SFO' and date like '01010%' and delay > 0"""))
foo.createOrReplaceTempView("foo")

In [48]:
# Peek at the first 10 rows of the airports view
(spark
    .sql("SELECT * FROM airports_na LIMIT 10")
    .show())

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+



In [50]:
# spark.sql("SELECT * FROM departureDelays LIMIT 10").show()

In [52]:
# spark.sql("SELECT * FROM foo").show()

# Unions

A common pattern within Apache Spark is to union two different DataFrames with the same schema together. This can be achieved using the union() method:

In [None]:
# Union two tables. union() matches columns by position, so both inputs need the
# same schema; foo is a filtered subset of departureDelays, so they line up.
bar = departureDelays.union(other=foo)
bar.createOrReplaceTempView("bar")

In [None]:
# Show the union, restricted to SEA -> SFO rows with a positive delay whose
# date matches '01010%'
(bar
    .where(expr("""origin == 'SEA' AND destination == 'SFO'
AND date LIKE '01010%' AND delay > 0"""))
    .show())