# Spark SQL

For some time, Spark SQL has been the better alternative to raw Spark RDDs.  This API, built on top of RDDs, improves both the efficiency and the programming model for interacting with data.  Not only is the API more convenient for manipulating data with many records, it also comes with what amounts to an optimizing compiler that automatically computes a more efficient plan for your job.  This compiler can be extended with custom types and operators.  In fact, the [RasterFrames project](https://github.com/locationtech/rasterframes) has already added the ability to work with Geotrellis types.

This is the present and future of Spark, and is generally a better tool than the pure RDD interface.

## Initializing Spark

In [None]:
import $ivy.`org.apache.logging.log4j:log4j-core:2.17.0`
import $ivy.`org.apache.logging.log4j:log4j-1.2-api:2.17.0`
import $ivy.`org.apache.spark::spark-sql:3.3.1`

import org.apache.log4j.{Level, Logger}
Logger.getRootLogger.setLevel(Level.WARN)
Logger.getLogger("org").setLevel(Level.OFF)

import org.apache.spark.sql._

val spark = {
  NotebookSparkSession.builder()
    .master("local[4]")
    .getOrCreate()
}

import spark.implicits._  // Provides a bunch of extra sugar

It's not necessary to extract the `SparkContext`, as we will be working directly with the `SparkSession` object here.

## Datasets

Much as the RDD is the core data structure of vanilla Spark, the `Dataset` forms the core of Spark SQL.  It wraps an RDD with additional machinery and syntactic sugar that improves its usability over a plain RDD.  Consider the following:

In [None]:
// The following line is needed to make this work in this Jupyter notebook;
// this will not be needed for normal code.  Expect to see this workaround 
// below, since it allows us to declare case classes in the correct scope.
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

case class Person(name: String, id: Int)

val personDS = spark.createDataset(
    Seq(Person("Judy", 23), Person("Paul", 19), Person("Arthur", 42))
)

We now have a structure containing a set of case class instances.  This is useful on its own, but there is a hidden easter egg that we can reveal by displaying the Dataset:

In [None]:
personDS.show

This shows that Spark SQL understands the fields of the case class, and recognizes that these data have two columns.  We can see further that it understands the types as well:

In [None]:
personDS.printSchema

What this provides is a table-like representation of arbitrary data, provided that there is an [`Encoder`](https://spark.apache.org/docs/3.3.1/api/scala/org/apache/spark/sql/Encoder.html), available explicitly or implicitly, that understands the type of data you are attempting to add to a Dataset.  In our initialization block, we saw
```scala
import spark.implicits._
```
which brought a host of implicit Encoders into scope that allowed us to simply add a sequence of our custom case class to a Dataset.
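
As a minimal sketch of what those implicits do for us, the same kind of Dataset can be built by handing `createDataset` an `Encoder` explicitly.  The example uses the factory methods in `org.apache.spark.sql.Encoders` with a tuple rather than `Person`, to sidestep the notebook scoping workaround.  The name `session` is only an illustrative handle; `getOrCreate` should hand back the session that is already running, if there is one.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Illustrative handle; getOrCreate reuses an already-running session
val session = SparkSession.builder().master("local[*]").getOrCreate()

// Build the Encoder by hand instead of relying on the implicits import
val pairEncoder: Encoder[(String, Int)] =
  Encoders.tuple(Encoders.STRING, Encoders.scalaInt)

// createDataset takes the Encoder in its second (normally implicit) parameter list
val pairs: Dataset[(String, Int)] =
  session.createDataset(Seq(("Judy", 23), ("Paul", 19)))(pairEncoder)

pairs.printSchema  // a tuple's columns are named _1 and _2
```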

### DataFrame conversion

Datasets give us the ability to have a strongly typed, RDD-like experience, but with the added optimizations from Spark SQL.  We can get a more dynamic experience by using a DataFrame:

In [None]:
val personDF = personDS.toDF
personDF.show
personDF.printSchema

The differences here are obscured, but show up when we interrogate the types of the contents:

In [None]:
println(f"type(personDS.first) = ${personDS.first.getClass}")
println(f"type(personDF.first) = ${personDF.first.getClass}")

(Here we can see why we have to use the special incantation in the cells where we declare the case classes.  Because of the way that the Ammonite interpreter declares classes, we have to make sure `Person` is in scope.)  The DataFrame has quietly re-encoded the content of the Dataset as internal [`Row`](https://spark.apache.org/docs/3.3.1/api/scala/org/apache/spark/sql/Row.html) instances.  In fact, in the Scala implementation, `DataFrame` is just a type alias for `Dataset[Row]`.  We are able to convert a DataFrame back into a Dataset as well:

In [None]:
personDF.as[Person].first
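
The alias relationship and the round trip can also be sketched without a case class.  This example assumes nothing beyond Spark SQL itself; `session` is an illustrative handle, and a tuple type stands in for `Person`.  For tuples, Spark maps columns by position rather than by name.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Illustrative handle; getOrCreate reuses an already-running session
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

val df: DataFrame = Seq(("Judy", 23), ("Paul", 19)).toDF("name", "id")

// Compiles without any conversion because DataFrame is an alias for Dataset[Row]
val rows: Dataset[Row] = df

// Tuples are matched to columns by position, so the names need not be _1 and _2
val typed: Dataset[(String, Int)] = df.as[(String, Int)]
val firstPair: (String, Int) = typed.first
```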

Interestingly, the Encoder system allows us to convert a Dataset or DataFrame to any case class whose schema is a subset of the schema of the source container.  For example:

In [None]:
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

case class NamedEntity(name: String)

personDF.as[NamedEntity].first

If we attempt to convert to a type that does not match the schema, we'll get a fairly clear error:

In [None]:
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

case class Employee(name: String, title: String)

try {
    personDS.as[Employee]
} catch {
    case e: Exception => println(e)
}

In a limited sense, this provides a type-safe `select` statement.

We can then use the standard battery of processing functions to do tasks, such as `filter`, `map`, `reduce`, etc.  See the [Dataset documentation](https://spark.apache.org/docs/3.3.1/api/scala/org/apache/spark/sql/Dataset.html) for details.
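
A brief sketch of those functions chained together follows.  It uses a Dataset of plain integers (the ids from the `Person` example) so that no case class is needed; `session` is an illustrative handle.  The lambda parameter types are spelled out only to keep overload resolution unambiguous.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Illustrative handle; getOrCreate reuses an already-running session
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

val ids: Dataset[Int] = session.createDataset(Seq(23, 19, 42))

val overTwenty: Dataset[Int] = ids.filter((id: Int) => id > 20)   // keeps 23 and 42
val doubled: Dataset[Int] = overTwenty.map((id: Int) => id * 2)   // 46 and 84
val total: Int = doubled.reduce((a: Int, b: Int) => a + b)        // 46 + 84
```

Note that `filter` and `map` are lazy and return new Datasets, while `reduce` is an action that triggers the computation and returns a plain value.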

### Aggregations

It's also possible to group by key and then apply an aggregation to the result.  Unlike with RDDs, one does not need to be wary of using the Dataset `groupByKey` operation, thanks to the optimization done by Spark SQL's execution engine.  `groupByKey` does not produce a `Dataset`, but a [`KeyValueGroupedDataset`](https://spark.apache.org/docs/3.3.1/api/scala/org/apache/spark/sql/KeyValueGroupedDataset.html), which has only aggregation-type methods associated with it.  We should therefore think of `groupByKey` and the aggregation function that necessarily follows it as, together, playing the role of a `combineByKey`-style operation.  The optimizer works hard to limit the shuffle data volume to what is needed to make the operation work.

We should look at an example of grouped aggregation.  This requires a datatype.

In [None]:
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

case class Item(department: String, item_name: String, quantity: Int, price: Double)

val inventory = spark.createDataset(Seq(
    Item("produce", "oranges", 20, 0.99),
    Item("produce", "persimmons", 12, 2.50),
    Item("grocery", "spaghetti", 31, 1.29),
    Item("deli", "cheddar", 6, 3.99),
    Item("grocery", "chips", 18, 2.99)
))

For the calculation we have in mind, we need to bring in the `org.apache.spark.sql.functions` module, which provides a wide variety of Column functions to work with.  These are functions which take columnar elements of a Dataset/DataFrame and recombine them into new columnar elements.  To compute the total value of the stock in each department, we can simply create a new Column entity: the sum of the product of quantity and price.  The `sum` is an aggregation function, but the product is a standard Column operation.  So, we need a handle on the columns in the Dataset, which are inferred from the case class fields.

There are a variety of ways to identify a column:

In [None]:
import org.apache.spark.sql.functions._  // provides col

col("name")
$"name"
'name

The last two rely on implicit conversions that were brought in with the `import spark.implicits._` directive above.  Note that

In [None]:
$"name": Column
'name: Column

both coerce the types as desired.

We can then multiply the quantity and price columns, and then aggregate the result:

In [None]:
import org.apache.spark.sql.functions._

val departmentValue = inventory.groupByKey(_.department).agg(
    sum('quantity * 'price).as("value").as[Double]
).withColumnRenamed("key", "department")
departmentValue.show

Keep in mind that because we are in the strongly-typed Dataset regime, we need to use a `TypedColumn` to describe the result of the `sum`.  This is what the `as[Double]` does for us (the preceding `as("value")` merely aliases the column to a more intuitive name).  DataFrame operations can usually be specified with plain `Column`s.
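
For comparison, here is a sketch of the same kind of aggregation in the untyped DataFrame style, where a plain `Column` is enough.  The data are made up and built from tuples rather than the `Item` case class, and `session` is an illustrative handle.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, sum}

// Illustrative handle; getOrCreate reuses an already-running session
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

// Made-up stock list
val stock: DataFrame = Seq(
  ("produce", 20, 0.5),
  ("produce", 12, 2.5),
  ("deli", 6, 4.0)
).toDF("department", "quantity", "price")

// groupBy (not groupByKey) gives an untyped grouping, so no as[Double] is needed
val valueByDepartment: DataFrame = stock
  .groupBy(col("department"))
  .agg(sum(col("quantity") * col("price")) as "value")

valueByDepartment.show
```

The grouping column keeps its own name here, so the `withColumnRenamed` step is not needed either.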

Also note that some aggregation tasks fall outside the reach of the standard functions.  In these cases, we will have to write [custom aggregation functions](https://spark.apache.org/docs/latest/sql-ref-functions-udf-aggregate.html).  This topic is out of scope for this tutorial, but best to know that it is there in case you need it.

## DataFrames

At some point, we are likely to want to exit this rigidly-typed regime and take advantage of the flexibility of the DataFrame type.  By doing this, we get a much more SQL-like interface.  Joins and queries take on a more familiar appearance, and we have to worry less about typing.  DataFrames are also how the DataSource infrastructure returns results, which means we will interact with them at least a little bit in most cases.

### Reading data

Getting data into the system with RDDs was tricky for a couple of reasons:
1. the burden of having to either read data as text and later coerce it into a form that we can work with, or use the Hadoop file system approach, which adds an additional layer of complexity; and
2. the lack of good, efficient representations of tabular data.

The Dataset/DataFrame infrastructure fixes the latter problem, and the DataSource infrastructure fixes the former.  Spark SQL comes built in with a variety of readers: CSV, text, ORC, Parquet, JSON, JDBC, and others.  This infrastructure is extensible, so custom data sources can be created if needed.

> Note: The DataSource API has been traditionally the least stable part of the Spark SQL API, so if you go this route, be mindful of the possibility of breakage during Spark version upgrades.
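
As a small illustration of the built-in readers and writers, the sketch below round-trips a tiny DataFrame through Parquet.  The scratch directory and the names `session`, `people`, and `outPath` are made up for the example.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative handle; getOrCreate reuses an already-running session
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

val people: DataFrame = Seq(("Judy", 23), ("Paul", 19)).toDF("name", "id")

// Hypothetical scratch location; the target path itself must not exist yet
val outPath: String =
  Files.createTempDirectory("spark-sql-demo").resolve("people.parquet").toString

people.write.parquet(outPath)                             // writers hang off the Dataset
val reloaded: DataFrame = session.read.parquet(outPath)   // readers hang off the session

reloaded.printSchema  // Parquet stores the schema alongside the data
```

The other formats follow the same pattern, although text-based ones such as CSV typically need options (headers, schema inference) to recover column names and types.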

The DataSource infrastructure is available as methods hanging off `spark.read` and `<dataset>.write`.  Let's load some data to work with.  We'll opt to do a little bit of processing on an OpenStreetMap excerpt of the Isle of Man.  This has been converted into an ORC file with [osm2orc](https://github.com/mojodna/osm2orc), so we can use the ORC loading facility from Spark.

In [None]:
import org.apache.spark.sql.expressions.Window

val windowSpec = Window.partitionBy("id").orderBy(desc("version"))

val iom = spark.read.orc("data/isle-of-man.orc")
  .withColumn("row_number", row_number().over(windowSpec))
  .filter('row_number === 1 && 'visible === true)
  .drop('row_number)

We've done a light amount of pre-processing here to only keep the most recent live version of the OSM elements.  The window function is used to enumerate the various elements by their version.  We keep only the most recent versions if they are visible—meaning they haven't been deleted.  This operation will make sure we don't double count as we go forward.
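
To see the effect of this pattern in isolation, here is a sketch on a made-up element history with the same `id`, `version`, and `visible` columns.  The data and the names `session`, `history`, and `newestFirst` are illustrative only.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, desc, row_number}

// Illustrative handle; getOrCreate reuses an already-running session
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

// Made-up history: id 1 was edited once; id 2 was created and then deleted
val history: DataFrame = Seq(
  (1L, 1, true),
  (1L, 2, true),
  (2L, 1, true),
  (2L, 2, false)
).toDF("id", "version", "visible")

// Number the versions of each id, newest first
val newestFirst = Window.partitionBy("id").orderBy(desc("version"))

val live: DataFrame = history
  .withColumn("row_number", row_number().over(newestFirst))
  .filter(col("row_number") === 1 && col("visible") === true)
  .drop("row_number")

live.show  // only version 2 of id 1 remains; the newest version of id 2 is a deletion
```

Filtering on `visible` before numbering the rows would give a different answer: it would resurrect version 1 of the deleted element.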

For clarity, let's look at the schema for the DataFrame:

In [None]:
iom.printSchema

For this example, we can attempt to measure the total length of the rivers on the Isle of Man that have been added to OSM.  In OSM parlance, this means finding the ways which have a tag named `waterway` with a value of `river`.  Looking above, we can see that the `tags` field is a Map, so we can access the `waterway` field of that map and check for the expected value.

Once we have all the ways with the river tag, we need to prep for a join by exploding over the `nds` field.  Because `nds` has this odd nested struct definition, we have to unpack it as well.  We can quickly do all of these things in the following expression:

In [None]:
val wayNds = iom
  .filter('type === "way" && 'tags("waterway") === "river")
  .select('id, posexplode('nds))
  .select('id, 'pos, $"col.ref" as "nid")
wayNds.printSchema
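
The same steps can be tried on a toy table.  This is a simplified sketch: the data are made up, `nds` is a plain array of node ids rather than the nested struct found in the real file, and `session` is an illustrative handle.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, posexplode}

// Illustrative handle; getOrCreate reuses an already-running session
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

// Made-up ways: one river and one road
val ways: DataFrame = Seq(
  (10L, Map("waterway" -> "river"), Seq(100L, 101L)),
  (11L, Map("highway" -> "residential"), Seq(200L, 201L))
).toDF("id", "tags", "nds")

val exploded: DataFrame = ways
  // With Spark's default (non-ANSI) settings a missing key yields null, so the road is dropped
  .filter(col("tags")("waterway") === "river")
  // posexplode emits one row per array element, in columns named pos and col
  .select(col("id"), posexplode(col("nds")))
  .select(col("id"), col("pos"), col("col") as "nid")

exploded.show
```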

We'll also need to find all the node coordinates, indexed by their `id`, since this is what we'll join to the table above.  We'll rename the `id` column to `nid` to match the above and make the upcoming join easier.

In [None]:
val ndXYs = iom
  .filter('type === "node") // && 'lat.isNotNull && 'lon.isNotNull)
  .select('id as "nid", 'lat, 'lon)
ndXYs.show

Now, the join.

In [None]:
val wayXYs = wayNds.join(ndXYs, Seq("nid")).drop('nid)
wayXYs.printSchema

Earlier, we had used `posexplode`, which gave us the node ids in each way, indexed by their position in the `nds` array.  This way, we can reconstruct the geometry.

Computing the length is relatively easy: we take three arrays (the position index, the longitudes, and the latitudes) and reconstitute the geometry.  We can use the Geotrellis vector facilities to construct the LineString, project it to WebMercator, where the units are roughly equivalent to meters (modulo a latitude-based scaling factor that we'll not worry about for this simple example), and compute the length of the reprojected geometry.  Here's the function that does this:

In [None]:
import $ivy.`org.locationtech.geotrellis::geotrellis-vector:3.6.3`
import geotrellis.vector._
import geotrellis.proj4.{LatLng, WebMercator}

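// Coordinates are (x, y) pairs, i.e. (longitude, latitude), which is why
// callers pass the longitude array before the latitude array.
def reconstructLineString(idx: Array[Int], lon: Array[Double], lat: Array[Double]): LineString = {
    // Pair each position with its coordinate, then restore the original vertex order
    val xyCoords = idx.zip(lon.zip(lat)).sortBy(_._1).map(_._2)
    LineString(xyCoords)
}

def wayLengthFn(idx: Array[Int], lon: Array[Double], lat: Array[Double]): Double = 
  reconstructLineString(idx, lon, lat).reproject(LatLng, WebMercator).getLength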
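
The sorting step is the heart of the reconstruction, since the collected arrays may arrive in an arbitrary order.  It can be sketched in plain Scala with made-up coordinates and no Geotrellis involved:

```scala
// Made-up vertices, arriving out of order as they might after a shuffle
val pos = Array(2, 0, 1)
val lon = Array(-4.40, -4.50, -4.45)
val lat = Array(54.30, 54.20, 54.25)

// Pair each position with its (lon, lat) coordinate, sort by position, drop the index
val ordered: Array[(Double, Double)] =
  pos.zip(lon.zip(lat)).sortBy(_._1).map(_._2)

println(ordered.mkString(" -> "))
```

This only works because the three arrays stay aligned element by element; the position index is what lets us recover the vertex order afterwards.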

The astute observer will now pick up on the fact that `wayLengthFn` is not a Column function, and so cannot be used in a Spark SQL expression tree.  We bridge the divide by creating a special kind of column function called a _user defined function_ (UDF).  These are created by passing an ordinary function to `udf`.  The resulting UDF has the same arity as the argument function, takes Columns of the corresponding types, and yields a Column of the function's return type.

In [None]:
val wayLength = udf(wayLengthFn(_, _, _))
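
As a smaller, self-contained sketch of the same mechanism, the following lifts a two-argument function over `Double`s into a UDF and applies it to two columns.  The function `hypotenuseFn`, the data, and the handle `session` are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

// Illustrative handle; getOrCreate reuses an already-running session
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

// An ordinary Scala function of two Doubles...
def hypotenuseFn(a: Double, b: Double): Double = math.sqrt(a * a + b * b)

// ...wrapped as a two-argument UDF that accepts Columns
val hypotenuse = udf((a: Double, b: Double) => hypotenuseFn(a, b))

val sides: DataFrame = Seq((3.0, 4.0), (5.0, 12.0)).toDF("a", "b")
val withHypotenuse: DataFrame =
  sides.select(col("a"), col("b"), hypotenuse(col("a"), col("b")) as "c")

withHypotenuse.show
```

Keep in mind that a UDF is opaque to the optimizer, so built-in Column functions are generally preferable when one exists for the job.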

To apply this function, we need to group by the way ID, collect the corresponding `pos`, `lat`, and `lon` fields into arrays, and then pass the results to the `wayLength` UDF.

In [None]:
val wayLengths = wayXYs 
  .groupBy('id)
  .agg(
    collect_list('pos) as "pos", 
    collect_list('lat) as "lat", 
    collect_list('lon) as "lon"
  )
  .select('id, wayLength('pos, 'lon, 'lat) as 'length)

wayLengths.show

To complete the objective, we can do a simple sum aggregation (which has a bit of an odd syntax) and grab the single result using `first`.  Since that result is packed in a `Row`, we need to unpack the value using a `get`.  This is a point at which we see the dynamic nature of a DataFrame: it doesn't know the types of its contents at compile time.  If we use a raw `get`, the return value will be of type `Any`.  We have to know the result type of our computation and use the appropriately-typed `get` variant, in this case `getDouble`.

In [None]:
wayLengths
  .agg(sum('length))
  .first
  .getDouble(0)
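
The different ways of unpacking a `Row` can be compared side by side.  This is a sketch on made-up lengths; the column name `total` and the handle `session` are illustrative.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, sum}

// Illustrative handle; getOrCreate reuses an already-running session
val session = SparkSession.builder().master("local[*]").getOrCreate()
import session.implicits._

val row: Row = Seq(1.5, 2.5, 4.0).toDF("length")
  .agg(sum(col("length")) as "total")
  .first

val untyped: Any = row.get(0)                     // the static type is only Any
val byIndex: Double = row.getDouble(0)            // typed accessor, by position
val byName: Double = row.getAs[Double]("total")   // typed accessor, by column name
```

Asking for the wrong type does not fail at compile time; the mistake only surfaces as a cast error when the code runs.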

## User-defined types

At the moment, what we have is an extremely useful, SQL-like mechanism for working with structured data.  In its native form, there's a great deal of work we can do.  However, we're missing an important feature that we had access to in RDD-land: the ability to use non-standard-typed objects in our Datasets and DataFrames.  Considering that we often use JTS Geometry instances and Geotrellis raster types in our work, this is a significant loss of function.  Consider that, in our current state, we could not make a UDF that returned Geometry:

In [None]:
try {
    udf(reconstructLineString(_, _, _))
    ()
} catch {
    case e: Exception => println(e)
}

Fortunately, Spark SQL also offers _user defined types_, and even more fortunately, there is a project that provides UDTs for the most important JTS and Geotrellis types, as well as a host of functions for manipulating them: [RasterFrames](https://github.com/locationtech/rasterframes).

We can import this project and enable its features:

In [None]:
import $ivy.`org.locationtech.rasterframes::rasterframes:0.11.1`
import org.locationtech.rasterframes._

The following line registers the custom UDTs and accompanying UDFs:

In [None]:
spark.withRasterFrames

Now, we can define the UDF successfully:

In [None]:
val convertToLineString = udf(reconstructLineString(_, _, _))

The result of this UDF can be stored in a DataFrame:

In [None]:
val wayGeoms = wayXYs 
  .groupBy('id)
  .agg(
    collect_list('pos) as "pos", 
    collect_list('lat) as "lat", 
    collect_list('lon) as "lon"
  )
  .select('id, convertToLineString('pos, 'lon, 'lat) as 'geom)
wayGeoms.printSchema

Finally, we can use the provided `st_lengthSphere` function to compute the lengths and sum up as before.

In [None]:
import org.locationtech.rasterframes.functions._
import org.locationtech.geomesa.spark.jts.st_lengthSphere

wayGeoms
  .select(st_lengthSphere('geom) as 'length)
  .agg(sum('length))
  .first
  .getDouble(0)

The specific value doesn't match the Web Mercator estimate from earlier, but the general procedure is sound, and our interest here is more in showing the capabilities than in getting a reliable answer.

In [None]:
spark.stop