# Spark SQL and DataFrames: Interacting with External Data Sources

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Spark Chapter 5") \
    .getOrCreate()

21/08/13 18:55:48 WARN Utils: Your hostname, OutOne resolves to a loopback address: 127.0.1.1; using 192.168.1.84 instead (on interface enp9s0)
21/08/13 18:55:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/08/13 18:55:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In the previous chapter, we explored interacting with the built-in data sources in Spark. We also took a closer look at the DataFrame API and its interoperability with Spark SQL. In this chapter, we will focus on how Spark SQL interfaces with external components. Specifically, we discuss how Spark SQL allows you to:   
* Use user-defined functions for both Apache Hive and Apache Spark.   
* Connect with external data sources such as JDBC and SQL databases, PostgreSQL, MySQL, Tableau, Azure Cosmos DB, and MS SQL Server.   
* Work with simple and complex types, higher-order functions, and common relational operators.   

We’ll also look at some different options for querying Spark using Spark SQL, such as the Spark SQL shell, Beeline, and Tableau.


## Spark SQL and Apache Hive

Spark SQL is a foundational component of Apache Spark that integrates relational processing with Spark’s functional programming API. Its genesis was in previous work on Shark. Shark was originally built on the Hive codebase on top of Apache Spark and became one of the first interactive SQL query engines on Hadoop systems.  
It demonstrated that it was possible to have the best of both worlds; as fast as an enterprise data warehouse, and scaling as well as Hive/MapReduce.


## User-defined functions (UDFs)

The benefit of creating your own PySpark or Scala UDFs is that you (and others) will be able to make use of them within Spark SQL itself.  
For example, a data scientist can wrap an ML model within a UDF so that a data analyst can query its predictions in Spark SQL **without necessarily understanding the internals of the model.**


In [2]:
from pyspark.sql.types import LongType
# Create cubed function
def cubed(s):
    return s * s * s
# Register UDF
spark.udf.register("cubed", cubed, LongType())
# Generate temporary view
spark.range(1, 9).createOrReplaceTempView("udf_test")


And now I can use this function by simply calling it on the query.

In [3]:
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()

                                                                                

+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+



## Speeding up and distributing PySpark UDFs with Pandas UDFs

With Apache Spark 3.0, Pandas UDFs infer the Pandas UDF type from Pythontype hints in Pandas UDFs such as pandas.Series, pandas.DataFrame, Tuple, and Iterator. Previously you needed to manually define and specify each Pandas UDF type. Currently, the supported cases of Python type hints in Pandas UDFs are Series to Series, Iterator of Series to Iterator of Series, Iterator of Multiple Series to Iterator of Series, and Series to Scalar (a single value).

In [4]:
# In Python
# Import pandas
import pandas as pd
# Import various pyspark SQL functions including pandas_udf
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the cubed function
def cubed(a: pd.Series) -> pd.Series:
    return a * a * a
# Create the pandas UDF for the cubed function
cubed_udf = pandas_udf(cubed, returnType=LongType())

In [5]:
cubed_udf

<function __main__.cubed(a: pandas.core.series.Series) -> pandas.core.series.Series>

In [6]:
%%timeit
# Create a Pandas Series
x = pd.Series([1, 2, 3])
# The function for a pandas_udf executed with local Pandas data
cubed(x)

334 µs ± 8.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html

In [8]:
#from pyspark.sql import DataFrame
#u.explain()

## PostgreSQL
To connect to a PostgreSQL database, build or download the JDBC jar from [Maven](https://mvnrepository.com/artifact/org.postgresql/postgresql) and add it to your classpath. Then start a Spark shell (spark-shell or pyspark), specifying that jar:    
`bin/spark-shell --jars postgresql-42.2.6.jar`


In [9]:
# Read Option 1: Loading data from a JDBC source using load method
"""
jdbcDF1 = (spark
    .read
    .format("jdbc")
    .option("url", "jdbc:postgresql://[DBSERVER]")
    .option("dbtable", "[SCHEMA].[TABLENAME]")
    .option("user", "[USERNAME]")
    .option("password", "[PASSWORD]")
    .load()
            )

# Read Option 2: Loading data from a JDBC source using jdbc method
jdbcDF2 = (spark
    .read
    .jdbc("jdbc:postgresql://[DBSERVER]", "[SCHEMA].[TABLENAME]",
    properties={"user": "[USERNAME]", "password": "[PASSWORD]"})
          )
# Write Option 1: Saving data to a JDBC source using save method
(jdbcDF1
    .write
    .format("jdbc")
    .option("url", "jdbc:postgresql://[DBSERVER]")
    .option("dbtable", "[SCHEMA].[TABLENAME]")
    .option("user", "[USERNAME]")
    .option("password", "[PASSWORD]")
    .save()
)
# Write Option 2: Saving data to a JDBC source using jdbc method
(jdbcDF2
    .write
    .jdbc("jdbc:postgresql:[DBSERVER]", "[SCHEMA].[TABLENAME]",
    properties={"user": "[USERNAME]", "password": "[PASSWORD]"})
)
""""

Py4JJavaError: An error occurred while calling o43.load.
: java.sql.SQLException: No suitable driver
	at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:108)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:108)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:38)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)


## Higher-Order Functions

[PySpark Built-in functions](https://spark.apache.org/docs/latest/api/sql/index.html)

Higher-order functions that take anonymous lambda functions as arguments. An example of a higher-
order function is the following:

In [10]:
# In Python
from pyspark.sql.types import *
schema = StructType([StructField("celsius", ArrayType(IntegerType()))])
t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]
t_c = spark.createDataFrame(t_list, schema)
t_c.createOrReplaceTempView("tC")
# Show the DataFrame
t_c.show()


+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+



### Transform

`transform(array<T>, function<T, U>): array<U>`

In [11]:
spark.sql("""
SELECT celsius,
transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit
FROM tC
""").show()

+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+



In [12]:
%%timeit 
spark.sql("""
SELECT celsius,
transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit
FROM tC
""")

9.15 ms ± 1.28 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


### Filter

`filter(array<T>, function<T, Boolean>): array<T>`

In [14]:
## Filter temperatures > 38C for array of temperatures
spark.sql("""
    SELECT celsius,
    filter(celsius, t -> t > 38) as high
    FROM tC
    """).show()

+--------------------+--------+
|             celsius|    high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+



In [16]:
%%timeit 
spark.sql("""
    SELECT celsius,
    filter(celsius, t -> t > 38) as high
    FROM tC
    """)

3.43 ms ± 450 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


### Exists

`exists(array<T>, function<T, V, Boolean>): Boolean`


In [19]:
spark.sql("""
SELECT celsius,
exists(celsius, t -> t = 38) as threshold
FROM tC
""").show()


+--------------------+---------+
|             celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...|     true|
|[31, 32, 34, 55, 56]|    false|
+--------------------+---------+



### Aggregate
`aggregate(array<T>, B, function<B, T, B>, function<B, R>)`


In [20]:
spark.sql("""
    SELECT celsius,
    aggregate(
    celsius,
    0,
    (t, acc) -> t + acc,
    acc -> (acc div size(celsius) * 9 div 5) + 32
    ) as avgFahrenheit
    FROM tC
    """).show()


+--------------------+-------------+
|             celsius|avgFahrenheit|
+--------------------+-------------+
|[35, 36, 32, 30, ...|           96|
|[31, 32, 34, 55, 56]|          105|
+--------------------+-------------+



In [22]:
%%timeit
spark.sql("""
    SELECT celsius,
    aggregate(
    celsius,
    0,
    (t, acc) -> t + acc,
    acc -> (acc div size(celsius) * 9 div 5) + 32
    ) as avgFahrenheit
    FROM tC
    """)

6.64 ms ± 749 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


### Unions, Joins, etc

Perform these DataFrame operations, we’ll first prepare some data. In the following code snippet, we:   
1. Import two files and create two DataFrames, one for airport (airportsna) information and one for US flight delays (departureDelays).
2. Using expr(), convert the delay and distance columns from STRING to INT.
3. Create a smaller table, foo, that we can focus on for our demo examples; it contains only information on three flights originating from Seattle (SEA) to the destination of San Francisco (SFO) for a small time range.


In [26]:
# Set file paths
from pyspark.sql.functions import expr
tripdelaysFilePath ="../repo/databricks-datasets/learning-spark-v2/flights/departuredelays.csv"
airportsnaFilePath ="../repo/databricks-datasets/learning-spark-v2/flights/airport-codes-na.txt"
# Obtain airports data set
airportsna = (spark.read
    .format("csv")
    .options(header="true", inferSchema="true", sep="\t")
    .load(airportsnaFilePath))

airportsna.createOrReplaceTempView("airports_na")
    

# Obtain departure delays data set
departureDelays = (spark.read
    .format("csv")
    .options(header="true")
    .load(tripdelaysFilePath))
departureDelays = (departureDelays
    .withColumn("delay", expr("CAST(delay as INT) as delay"))
    .withColumn("distance", expr("CAST(distance as INT) as distance")))

departureDelays.createOrReplaceTempView("departureDelays")


# Create temporary small table
foo = (departureDelays
    .filter(expr("""origin == 'SEA' and destination == 'SFO' and
    date like '01010%' and delay > 0""")))

foo.createOrReplaceTempView("foo")

In [27]:
spark.sql("SELECT * FROM airports_na LIMIT 10").show()

+-----------+-----+-------+----+
|       City|State|Country|IATA|
+-----------+-----+-------+----+
| Abbotsford|   BC| Canada| YXX|
|   Aberdeen|   SD|    USA| ABR|
|    Abilene|   TX|    USA| ABI|
|      Akron|   OH|    USA| CAK|
|    Alamosa|   CO|    USA| ALS|
|     Albany|   GA|    USA| ABY|
|     Albany|   NY|    USA| ALB|
|Albuquerque|   NM|    USA| ABQ|
| Alexandria|   LA|    USA| AEX|
|  Allentown|   PA|    USA| ABE|
+-----------+-----+-------+----+



In [28]:
spark.sql("SELECT * FROM departureDelays LIMIT 10").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+



In [29]:
spark.sql("SELECT * FROM foo").show()


+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



In [30]:
# In Python
# Union two tables
bar = departureDelays.union(foo)
bar.createOrReplaceTempView("bar")
# Show the union (filtering for SEA and SFO in a specific time range)
bar.filter(expr("""origin == 'SEA' AND destination == 'SFO'
AND date LIKE '01010%' AND delay > 0""")).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
|01010710|   31|     590|   SEA|        SFO|
|01010955|  104|     590|   SEA|        SFO|
|01010730|    5|     590|   SEA|        SFO|
+--------+-----+--------+------+-----------+



                                                                                

In [32]:
foo.join(
    airportsna,
    airportsna.IATA == foo.origin
    ).select("City", "State", "date", "delay", "distance", "destination").show()

+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



In [34]:
## In SQL
spark.sql("""
    SELECT a.City, a.State, f.date, f.delay, f.distance, f.destination
    FROM foo f
    JOIN airports_na a
    ON a.IATA = f.origin
    """).show()


+-------+-----+--------+-----+--------+-----------+
|   City|State|    date|delay|distance|destination|
+-------+-----+--------+-----+--------+-----------+
|Seattle|   WA|01010710|   31|     590|        SFO|
|Seattle|   WA|01010955|  104|     590|        SFO|
|Seattle|   WA|01010730|    5|     590|        SFO|
+-------+-----+--------+-----+--------+-----------+



### Windowing

A window function uses values from the rows in a window (a range of input rows) to return a set of values, typically in the form of another row. With window functions, it is possible to operate on a group of rows while still returning a single value for every input row.

Let’s start with a review of the TotalDelays (calculated by sum(Delay)) experienced by flights originating from Seattle (SEA), San Francisco (SFO), and New York City (JFK) and going to a specific set of destination locations.

What if for each of these origin airports you wanted to find the three destinations that
experienced the most delays? You could achieve this by running three different quer‐
ies for each origin and then unioning the results together, like this:


In [39]:
dfView = spark.sql("""SELECT
     origin, destination, SUM(delay) AS TotalDelays
    FROM
     departureDelays
    WHERE
     origin IN ('SEA', 'SFO', 'JFK')
    AND
     destination IN ('SEA', 'SFO', 'JFK', 'DEN', 'ORD', 'LAX', 'ATL')
    GROUP
     BY origin, destination;
    """)

dfView.createOrReplaceTempView("departureDelaysWindow")

spark.sql("""
    SELECT origin, destination, TotalDelays, rank
    FROM (
    SELECT origin, destination, TotalDelays, dense_rank()
    OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank
    FROM departureDelaysWindow
    ) t
    WHERE rank <= 3
    """).show()


+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
|   SEA|        SFO|      22293|   1|
|   SEA|        DEN|      13645|   2|
|   SEA|        ORD|      10041|   3|
|   SFO|        LAX|      40798|   1|
|   SFO|        ORD|      27412|   2|
|   SFO|        JFK|      24100|   3|
|   JFK|        LAX|      35755|   1|
|   JFK|        SFO|      35619|   2|
|   JFK|        ATL|      12141|   3|
+------+-----------+-----------+----+



By using the dense_rank() window function, we can quickly ascertain which destinations with the worst delays for the three origin cities were.

### Modifications

**Underlying RDDs are immutable**    
While DataFrames themselves are immutable, you can modify them through operations that create new, different DataFrames, with different columns, for example. 

#### Add new columns

In [40]:
# In Python
from pyspark.sql.functions import expr
foo2 = (foo.withColumn(
    "status",
    expr("CASE WHEN delay <= 10 THEN 'On-time' ELSE 'Delayed' END")
    ))


In [44]:
foo2.show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination| status|
+--------+-----+--------+------+-----------+-------+
|01010710|   31|     590|   SEA|        SFO|Delayed|
|01010955|  104|     590|   SEA|        SFO|Delayed|
|01010730|    5|     590|   SEA|        SFO|On-time|
+--------+-----+--------+------+-----------+-------+



#### Dropping columns

In [45]:
# In Python
foo3 = foo2.drop("delay")
foo3.show()


+--------+--------+------+-----------+-------+
|    date|distance|origin|destination| status|
+--------+--------+------+-----------+-------+
|01010710|     590|   SEA|        SFO|Delayed|
|01010955|     590|   SEA|        SFO|Delayed|
|01010730|     590|   SEA|        SFO|On-time|
+--------+--------+------+-----------+-------+



#### Renaming columns

In [46]:
# In Python
foo4 = foo3.withColumnRenamed("status", "flight_status")
foo4.show()

+--------+--------+------+-----------+-------------+
|    date|distance|origin|destination|flight_status|
+--------+--------+------+-----------+-------------+
|01010710|     590|   SEA|        SFO|      Delayed|
|01010955|     590|   SEA|        SFO|      Delayed|
|01010730|     590|   SEA|        SFO|      On-time|
+--------+--------+------+-----------+-------------+



#### Pivoting

https://sparkbyexamples.com/pyspark/pyspark-pivot-and-unpivot-dataframe/

```sql

SELECT * FROM (
SELECT destination, CAST(SUBSTRING(date, 0, 2) AS int) AS month, delay
FROM departureDelays WHERE origin = 'SEA'
)
PIVOT (
CAST(AVG(delay) AS DECIMAL(4, 2)) AS AvgDelay, MAX(delay) AS MaxDelay
FOR month IN (1 JAN, 2 FEB)
)
ORDER BY destination
```