# Spark SQL Basics

Spark SQL provides the means for working with structured data within Apache Spark.  Structured data is represented by the `DataFrame` abstraction (which is a type alias for `Dataset[Row]`), and we can act on it using familiar-looking SQL queries or the `DataFrame` API.  In this lesson, we cover `DataFrame` basics, including:

* creating `DataFrame`s in code
* creating `DataFrame`s from external sources (CSV, Parquet, Hive, PostgreSQL, etc.)
* manipulating and summarising `DataFrame`s using both SQL and the `DataFrame` API

## Preliminaries

This workbook makes use of the [Almond Scala kernel for Jupyter](https://almond.sh/).  To use Spark, we have to first add a few libraries to the classpath, which we can do as follows:

In [1]:
def init: Unit = {
  import ammonite.ops._
  val jars = ls! root/'opt/'spark/'jars |? (_.ext == "jar")
  jars.foreach(interp.load.cp(_))   
}

init

defined [32mfunction[39m [36minit[39m

Spark is also pretty verbose with respect to logging, so it can be useful to change the logging policy to de-clutter our outputs:

In [2]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.ERROR)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m

And just to get it out of the way up front, we import a number of objects that we need throughout the rest of the document:

In [3]:
import org.apache.spark.sql._

[32mimport [39m[36morg.apache.spark.sql._[39m

Finally, a code block will sometimes produce a large amount of output, much of it unimportant, which obscures the parts that matter.  To hide this, we sometimes wrap things in an object like so:

In [4]:
object foo {
  val x = 1
  val y = 2
}

foo.x + foo.y

defined [32mobject[39m [36mfoo[39m
[36mres3_1[39m: [32mInt[39m = [32m3[39m

The object `foo` serves no functional purpose here other than to hide the interpreter output that results from the assignment of `x` and `y`.  But doing this means we need to use dot notation to refer to object members.  Another option, which is only really useful when results span multiple lines, is to use Scala's lazy evaluation:

In [5]:
lazy val x = 1
lazy val y = 2
x + y
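
To make the effect of `lazy` concrete, here is a minimal sketch that is independent of Spark.  The names `LazyDemo`, `evaluations`, and `expensive` are invented for illustration.  The right-hand side of a `lazy val` is run on first access rather than at definition, which is why the interpreter has no value to print when the declaration is entered:

```scala
object LazyDemo {
  // Counts how many times the initialiser below has actually run.
  var evaluations = 0

  // The block on the right-hand side is not executed until `expensive` is first read,
  // and it is executed at most once.
  lazy val expensive: Int = {
    evaluations += 1
    21 * 2
  }
}
```

Reading `LazyDemo.expensive` twice should leave `LazyDemo.evaluations` at 1, since the result is cached after the first access.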

## Creating a `SparkSession`

As of Spark 2.x, the usual method of interacting with Spark is to create a `SparkSession`, which functions as a single entry point.  In our case, we do this as follows:

In [6]:
val spark = SparkSession
  .builder
  .config("hive.metastore.uris","thrift://localhost:9083") 
  .config("spark.sql.warehouse.dir", "/data/hive/warehouse")
  .master("local[*]")
  .appName("Spark SQL Basics")
  .enableHiveSupport()
  .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@101ea05

Here we explicitly configure our session to use Hive by setting values for `hive.metastore.uris` and `spark.sql.warehouse.dir`.  It is common for this to work automatically by configuration (via the file `hive-site.xml`), but this does not appear to be the case in this context.  We also tell Spark to run in local (pseudo-distributed) mode, with as many worker threads as there are logical cores, by setting `master` to `local[*]`.  There are various other configurations possible, but those are out of scope here.  See [Configuration - Spark 2.4.3 Documentation](https://spark.apache.org/docs/latest/configuration.html) for details.

When working with Spark SQL, it is very common to use the object `spark.sparkContext`.  So for convenience, we also assign this to a variable, commonly `sc`, as follows:

In [7]:
val sc = spark.sparkContext

[36msc[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mSparkContext[39m = org.apache.spark.SparkContext@7fc937e0

N.b. we already reduced the amount of logging above.  We can also do this via the `SparkContext` by running `sc.setLogLevel("ERROR")`, but then we'd still be subjected to the logging that occurs as a result of creating the `SparkSession` itself.

## A Comment on File Systems

Spark can read happily from a local filesystem, and it can also read from the Hadoop distributed file system (HDFS).  If we refer to a file without a scheme, as in `sc.textFile("/a/file")`, it could be a file on either&ndash;which one is determined by the `fs.defaultFS` setting:

In [8]:
sc.hadoopConfiguration.get("fs.defaultFS")

[36mres7[39m: [32mString[39m = [32m"file:///"[39m

Regardless of the value of this parameter, we can explicitly refer to local files with the syntax `file:///path/to/file`, and to files on HDFS as `hdfs://server:port/path/to/file`.  In our case, HDFS is accessible at `hdfs://localhost:9000`, which is configured in `/opt/hadoop/conf/core-site.xml` via the `fs.defaultFS` parameter.  `core-site.xml` will often be read by Spark, though it doesn't seem to be here.  It is probably worth being explicit.
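
To check where an unqualified path would actually resolve, one option is to ask Hadoop to qualify it.  The following is a sketch only: `qualify` is a hypothetical helper, not part of Spark or Hadoop, and it relies on the Hadoop client classes that ship with Spark being on the classpath:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper: turn a path string into a fully qualified path.
// Paths without a scheme are resolved against the default file system in `conf`;
// paths with an explicit scheme (file://, hdfs://) keep that scheme.
def qualify(path: String, conf: Configuration): Path = {
  val p = new Path(path)
  p.getFileSystem(conf).makeQualified(p)
}
```

In this notebook, `qualify("/a/file", sc.hadoopConfiguration)` should report a `file:` path, given the value of `fs.defaultFS` shown above.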

## Resilient Distributed Dataset (`RDD`)

As noted, `DataFrame` is the central data abstraction when working with structured data.  However, these build on an earlier abstraction called Resilient Distributed Datasets (`RDD`), and one will still have occasion to use these.  An `RDD` is essentially just a normal Scala collection that's been parallelised for use with Spark.  For example:

In [9]:
val beatles = Seq("John", "Paul", "Ringo", "George")
val distributedBeatles = spark.sparkContext.parallelize(beatles)

[36mbeatles[39m: [32mSeq[39m[[32mString[39m] = [33mList[39m([32m"John"[39m, [32m"Paul"[39m, [32m"Ringo"[39m, [32m"George"[39m)
[36mdistributedBeatles[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[[32mString[39m] = ParallelCollectionRDD[0] at parallelize at cmd8.sc:2

We can treat the resulting `RDD` in much the same way as the original collection, but the `RDD` will be worked on in parallel.  This means that when iterating over an `RDD`, the order in which entries are processed is not stable.  Because of this, certain operations that require a strictly ordered sequence, like `head` and `tail`, are not available.  The lack of a stable ordering is easily demonstrated:

In [10]:
// stable order for Scala Seq type
println(beatles.fold("")(_ + _))
println(beatles.fold("")(_ + _))

JohnPaulRingoGeorge
JohnPaulRingoGeorge


In [11]:
// but not for a parallelized collection
println(distributedBeatles.fold("")(_ + _))
println(distributedBeatles.fold("")(_ + _))

RingoJohnGeorgePaul
RingoPaulGeorgeJohn
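
The results above differ because string concatenation is not commutative, so the answer depends on the order in which partition results are combined.  As a contrasting sketch, folding with an operation that is both associative and commutative should give the same answer on every run.  The helper name `totalLength` is invented for illustration:

```scala
import org.apache.spark.SparkContext

// Hypothetical helper: total number of characters across all names.
// Integer addition is associative and commutative, so the order in which
// partition results are combined cannot change the answer.
def totalLength(sc: SparkContext, names: Seq[String]): Int =
  sc.parallelize(names).map(_.length).fold(0)(_ + _)
```

For the four names used above, `totalLength(sc, Seq("John", "Paul", "Ringo", "George"))` should return 19 however the collection happens to be partitioned.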


## Creating `DataFrame`s Programmatically

There are a number of ways we can create a `DataFrame`.  Since we just introduced `RDD`s, let us first demonstrate how we can create a `DataFrame` from an `RDD`: 

In [12]:
val data = Seq(
  ("George", "Harrison"),
  ("Ringo", "Starr"),
  ("John", "Lennon"),
  ("Paul", "McArtney")
)

[36mdata[39m: [32mSeq[39m[([32mString[39m, [32mString[39m)] = [33mList[39m(
  ([32m"George"[39m, [32m"Harrison"[39m),
  ([32m"Ringo"[39m, [32m"Starr"[39m),
  ([32m"John"[39m, [32m"Lennon"[39m),
  ([32m"Paul"[39m, [32m"McArtney"[39m)
)

In [13]:
import spark.implicits._

val beatles = sc
  .parallelize(data)
  .toDF("firstName", "lastName")

beatles.show

+---------+--------+
|firstName|lastName|
+---------+--------+
|   George|Harrison|
|    Ringo|   Starr|
|     John|  Lennon|
|     Paul|McArtney|
+---------+--------+



[32mimport [39m[36mspark.implicits._

[39m
[36mbeatles[39m: [32mDataFrame[39m = [firstName: string, lastName: string]

## Creating `DataFrame`s from External Sources

`DataFrame` provides a single common interface for working with structured data.  Still, we can create a `DataFrame` from a number of different input types.  Here we cover several common scenarios.

### CSV

We can read a variety of external formats via the `SparkSession`, and the pattern is largely the same from format to format.  In this case, we read the famous iris dataset, which has been saved locally as the CSV file `/data/csv/iris.csv`.  To import it:

In [14]:
val iris = spark
  .read
  .format("csv")
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", "true")
  .load("file:///data/csv/iris.csv")

[36miris[39m: [32mDataFrame[39m = [Sepal.Length: double, Sepal.Width: double ... 3 more fields]

In [15]:
iris.limit(5).show

+------------+-----------+------------+-----------+-------+
|Sepal.Length|Sepal.Width|Petal.Length|Petal.Width|Species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
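
Schema inference requires an extra pass over the data, so for larger files it can be preferable to declare the schema up front.  The sketch below assumes the column names shown above, and assumes the four measurements are doubles and `Species` is a string, as the displayed rows suggest.  The names `irisSchema` and `readIris` are invented for illustration:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Assumed schema, based on the header and sample rows shown above.
val irisSchema = StructType(Seq(
  StructField("Sepal.Length", DoubleType),
  StructField("Sepal.Width", DoubleType),
  StructField("Petal.Length", DoubleType),
  StructField("Petal.Width", DoubleType),
  StructField("Species", StringType)
))

// Hypothetical helper: read the CSV with a declared schema rather than inferring one.
def readIris(spark: SparkSession, path: String): DataFrame =
  spark.read
    .schema(irisSchema)
    .option("header", "true")
    .csv(path)
```

One thing to watch with this dataset is that the column names contain dots, so when referring to them in expressions they generally need to be quoted with backticks, as in `` col("`Sepal.Length`") ``.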



Of course, if we are working in a 'Big Data' environment, we might expect that our file is saved in Hadoop, rather than locally on disk.  For example, we could copy our file to the Hadoop filesystem by running the following:

```bash
$ hdfs dfs -mkdir /data
$ hdfs dfs -mkdir /data/csv
$ hdfs dfs -put /data/csv/iris.csv /data/csv/
```

As noted earlier, the Hadoop filesystem in our case is `hdfs://localhost:9000`, and to import from there instead we would just change the import as follows:

```scala
val iris = spark
  .read
  .format("csv")
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", "true")
  .load("hdfs://localhost:9000/data/csv/iris.csv")
```

### Hive

Apache Hive is a data warehouse that, among other things, provides SQL-like access to data stored on Hadoop. The `SparkSession` in this notebook has support for Hive enabled, and so we can query data in Hive tables using standard-looking SQL queries.  We first import `spark.sql` so we can write `sql(<query>)` instead of `spark.sql(<query>)`&ndash;not a huge convenience, but commonly done in other resources and code bases, so we include it for consistency.  Then, we list the available databases:

In [16]:
import spark.sql
sql("show databases").show()

19/08/06 20:51:33 INFO metastore: Trying to connect to metastore with URI thrift://localhost:9083
19/08/06 20:51:33 INFO metastore: Connected to metastore.


+------------+
|databaseName|
+------------+
|     default|
|  nycflights|
+------------+



[32mimport [39m[36mspark.sql
[39m

The `nycflights` database is pre-populated from [Bureau of Transportation Statistics](https://www.transtats.bts.gov/) data, as provided by the [nycflights13](https://github.com/hadley/nycflights13) R package.  We can list the available tables as follows:

In [17]:
sql("use nycflights")
sql("show tables").show

+----------+---------+-----------+
|  database|tableName|isTemporary|
+----------+---------+-----------+
|nycflights| airlines|      false|
|nycflights| airports|      false|
|nycflights|  flights|      false|
|nycflights|   planes|      false|
|nycflights|  weather|      false|
+----------+---------+-----------+



[36mres16_0[39m: [32mDataFrame[39m = []

So, to pull the whole `airlines` table, for example, and save the results in a `DataFrame` called `airlines`, we just run the following:

In [18]:
val airlines: DataFrame = sql("SELECT * FROM nycflights.airlines")
airlines.show

+-------+--------------------+
|carrier|                name|
+-------+--------------------+
|     9E|   Endeavor Air Inc.|
|     AA|American Airlines...|
|     AS|Alaska Airlines Inc.|
|     B6|     JetBlue Airways|
|     DL|Delta Air Lines Inc.|
|     EV|ExpressJet Airlin...|
|     F9|Frontier Airlines...|
|     FL|AirTran Airways C...|
|     HA|Hawaiian Airlines...|
|     MQ|           Envoy Air|
|     OO|SkyWest Airlines ...|
|     UA|United Air Lines ...|
|     US|     US Airways Inc.|
|     VX|      Virgin America|
|     WN|Southwest Airline...|
|     YV|  Mesa Airlines Inc.|
+-------+--------------------+



[36mairlines[39m: [32mDataFrame[39m = [carrier: string, name: string]

And we can also run familiar-looking queries such as joins and grouped summaries.  For example:

In [19]:
val flightsByCarrier = sql("""
select 
  name, sum(1) as num_flights 
from 
  (
    select 
      airlines.name, flights.* 
    from 
      nycflights.airlines 
    inner join 
      nycflights.flights 
    on 
      airlines.carrier = flights.carrier
  ) a
group by 
  name 
order by 
  name
""")

flightsByCarrier.show()

+--------------------+-----------+
|                name|num_flights|
+--------------------+-----------+
|AirTran Airways C...|       3260|
|Alaska Airlines Inc.|        714|
|American Airlines...|      32729|
|Delta Air Lines Inc.|      48110|
|   Endeavor Air Inc.|      18460|
|           Envoy Air|      26397|
|ExpressJet Airlin...|      54173|
|Frontier Airlines...|        685|
|Hawaiian Airlines...|        342|
|     JetBlue Airways|      54635|
|  Mesa Airlines Inc.|        601|
|SkyWest Airlines ...|         32|
|Southwest Airline...|      12275|
|     US Airways Inc.|      20536|
|United Air Lines ...|      58665|
|      Virgin America|       5162|
+--------------------+-----------+



[36mflightsByCarrier[39m: [32mDataFrame[39m = [name: string, num_flights: bigint]

### Relational Database

Because Spark runs on the JVM, we typically access relational databases via JDBC.  In this case, we have a copy of the `nycflights` database in PostgreSQL, accessible on `localhost:5432` using the username `guest` and password `guest` (in a production environment, the specifics of authentication would likely be different).  So, we first need to add the JDBC driver, and there are several ways we could do this.  We have a local copy of the driver available as `/usr/share/java/postgresql-jdbc4.jar`, and this can be added for use in this interactive setting as follows:

In [20]:
import ammonite.ops._
interp.load.cp(os.Path("/usr/share/java/postgresql-jdbc4.jar"))

[32mimport [39m[36mammonite.ops._
[39m

Otherwise, we could download the dependency from Maven or similar as follows:

```scala
import $ivy.`org.postgresql:postgresql:42.2.6`
```

To read the `airlines` table as before, we would then run:

In [21]:
object pgsetup {
  import java.util.Properties

  // register Driver implementation with DriverManager
  Class.forName("org.postgresql.Driver")

  val connectionProperties = new Properties()
  connectionProperties.setProperty("Driver", "org.postgresql.Driver")
  connectionProperties.setProperty("user", "guest")
  connectionProperties.setProperty("password", "guest")
    
  val url = "jdbc:postgresql://localhost:5432/nycflights"
}

defined [32mobject[39m [36mpgsetup[39m

In [22]:
val query = "(select * from airlines) as airlines"

val airlines = spark
  .read
  .jdbc(pgsetup.url, query, pgsetup.connectionProperties)

airlines.show

+-------+--------------------+
|carrier|                name|
+-------+--------------------+
|     9E|   Endeavor Air Inc.|
|     AA|American Airlines...|
|     AS|Alaska Airlines Inc.|
|     B6|     JetBlue Airways|
|     DL|Delta Air Lines Inc.|
|     EV|ExpressJet Airlin...|
|     F9|Frontier Airlines...|
|     FL|AirTran Airways C...|
|     HA|Hawaiian Airlines...|
|     MQ|           Envoy Air|
|     OO|SkyWest Airlines ...|
|     UA|United Air Lines ...|
|     US|     US Airways Inc.|
|     VX|      Virgin America|
|     WN|Southwest Airline...|
|     YV|  Mesa Airlines Inc.|
+-------+--------------------+



[36mquery[39m: [32mString[39m = [32m"(select * from airlines) as airlines"[39m
[36mairlines[39m: [32mDataFrame[39m = [carrier: string, name: string]

### Parquet and Optimized Row Columnar (ORC)

Parquet and ORC are popular columnar formats&ndash;parquet more so for Spark, and ORC more so for Hive.  Because data is stored in columns, compression algorithms appropriate for specific columns can be applied, and so the formats generally have good to excellent compression performance.  As an example, the airlines database used above is 53.8MB on disk when stored as CSV, but 7.5MB and 6.0MB when stored as ORC and parquet, respectively.  They also tend to perform very well in read applications like grouped aggregates, though they don't fare as well in write applications.  Either way, we could reproduce the example above where we calculated the number of flights by airline as follows:

In [23]:
val airlines = spark
  .read
  .format("parquet")
  .load("file:///data/parquet/nycflights/airlines/")

val flights = spark
  .read
  .format("orc")
  .load("file:///data/orc/nycflights/flights/")

[36mairlines[39m: [32mDataFrame[39m = [carrier: string, name: string]
[36mflights[39m: [32mDataFrame[39m = [year: int, month: int ... 17 more fields]

In [24]:
import org.apache.spark.sql.functions._

airlines.as("airlines")
  .join(flights.as("flights"), col("airlines.carrier") === col("flights.carrier"), "inner")
  .groupBy("name")
  .count
  .withColumnRenamed("count", "num_flights")
  .orderBy("name")
  .show

+--------------------+-----------+
|                name|num_flights|
+--------------------+-----------+
|AirTran Airways C...|       3260|
|Alaska Airlines Inc.|        714|
|American Airlines...|      32729|
|Delta Air Lines Inc.|      48110|
|   Endeavor Air Inc.|      18460|
|           Envoy Air|      26397|
|ExpressJet Airlin...|      54173|
|Frontier Airlines...|        685|
|Hawaiian Airlines...|        342|
|     JetBlue Airways|      54635|
|  Mesa Airlines Inc.|        601|
|SkyWest Airlines ...|         32|
|Southwest Airline...|      12275|
|     US Airways Inc.|      20536|
|United Air Lines ...|      58665|
|      Virgin America|       5162|
+--------------------+-----------+



[32mimport [39m[36morg.apache.spark.sql.functions._

[39m

## Saving `DataFrame`s

Saving a `DataFrame` is relatively straightforward.  For example, to save the `beatles` `DataFrame` in CSV format we'd run:

In [25]:
import org.apache.spark.sql.SaveMode

beatles
  .write
  .mode(SaveMode.Overwrite)
  .format("csv")
  .save("file:///notebooks/scratch/beatles")

[32mimport [39m[36morg.apache.spark.sql.SaveMode

[39m

At this point, though, things might not be as one expects.  In this case, for example, what we get is a folder called `beatles` with the following content:

In [26]:
val d = new java.io.File("/notebooks/scratch/beatles")
d.listFiles.filter(_.toString endsWith ".csv").foreach(println)

/notebooks/scratch/beatles/part-00000-cecaa9d0-e840-4ea1-8907-ceb76ad3ce29-c000.csv
/notebooks/scratch/beatles/part-00001-cecaa9d0-e840-4ea1-8907-ceb76ad3ce29-c000.csv
/notebooks/scratch/beatles/part-00007-cecaa9d0-e840-4ea1-8907-ceb76ad3ce29-c000.csv
/notebooks/scratch/beatles/part-00003-cecaa9d0-e840-4ea1-8907-ceb76ad3ce29-c000.csv
/notebooks/scratch/beatles/part-00005-cecaa9d0-e840-4ea1-8907-ceb76ad3ce29-c000.csv


[36md[39m: [32mjava[39m.[32mio[39m.[32mFile[39m = /notebooks/scratch/beatles

This is a result of how Spark writes partitioned data.  That is, the `DataFrame` is a parallelized collection, and we can't very well have different workers attempting to write to the same file, so each partition is written to its own part file under the target path.  The same generally applies when we write to a distributed file system such as HDFS, where the path we give is likewise treated as a directory:

In [27]:
beatles
  .write
  .mode(SaveMode.Overwrite)
  .format("csv")
  .save("hdfs://localhost:9000/user/root/beatles.csv")

Interacting with HDFS is a little convoluted, but we can confirm that the output appears as a single entry under `/user/root` as follows (despite the `.csv` suffix, we would expect this entry to be a directory of part files rather than a single file):

In [28]:
object ls {
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs._

  val config = new Configuration()
  config.set("fs.defaultFS", "hdfs://localhost:9000")
  val hdfs = org.apache.hadoop.fs.FileSystem.get(config)
  val path = new org.apache.hadoop.fs.Path("/user/root")
}

ls.hdfs.listStatus(ls.path).foreach(f => println(f.getPath))

hdfs://localhost:9000/user/root/beatles.csv


defined [32mobject[39m [36mls[39m

## Actions and Transformations

In broad terms, any method you call on a `DataFrame` will be an _action_ or a _transformation_.  Transformations are essentially instructions that tell Spark how to take one or more `DataFrame`s (or `RDD`s) and _transform_ them into something else.  Crucially, Spark does not evaluate such transformations right away, but rather saves them up until evaluation is necessary.  This is an example of _lazy evaluation_, and doing this means that Spark can look for an efficient evaluation plan.  For example, say we wanted to calculate the number of flights by airline, and then select the results for one specific airline.  Clearly, if we saved both tasks up we'd see that it would be more efficient to filter for the one specific airline first, and then count the flights, rather than execute each task in turn.  Actions, on the other hand, result in evaluation.

Transformations generally yield `DataFrame`s, and they are very useful.  For example, we can break a large query up into several smaller queries, and each of the smaller queries can be reused in other larger queries.  We'll see examples of this below.
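
The flights-by-airline scenario described above can be sketched as two small transformations, one built on the other.  The helper names are invented for illustration, and the only assumption about the input is that it has a `carrier` column, as the `flights` table does:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Transformation only: describes "count flights per carrier" without running anything.
def flightCounts(flights: DataFrame): DataFrame =
  flights.groupBy("carrier").count()

// Still a transformation: narrows the description above to a single carrier.
def flightCountFor(flights: DataFrame, carrier: String): DataFrame =
  flightCounts(flights).filter(col("carrier") === carrier)
```

Calling `flightCountFor(flights, "AA")` returns immediately, because no data has been read yet.  Work only happens when an action such as `show` or `collect` is called on the result, and calling `explain` on it prints the plan Spark intends to run.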

## Working with `DataFrame`s / The SQL API

In this last section, we cover a number of common use cases&ndash;filtering, joining, and so on.  Most things have an obvious SQL analogue, though this isn't always the case.  User defined functions exist for a number of different relational databases, for example, but none so elegantly as for `DataFrame`s.  In addition, implementations of familiar Scala methods such as `map` and `flatMap` are available.

Throughout this section, we again make use of the `nycflights` data.  One nice way of doing this, which has the added advantage when working in an interactive setting of producing much less output, is to use an object.  That way, we get `DataFrame`s that we access via, for example, `nycflights.airlines` rather than `airlines`, which feels a bit database-like.

In [29]:
object nycflights {
  private def read(fname: String): DataFrame = {
    spark 
      .read
      .format("parquet")
      .load(s"file:///data/parquet/nycflights/${fname}/")
  }

  val airlines = read("airlines")
  val airports = read("airports")
  val flights = read("flights")
  val planes = read("planes")
  val weather = read("weather")
}

defined [32mobject[39m [36mnycflights[39m

If we'd rather not use dot notation, we can just reassign the `DataFrame`s.  The assignment is by-reference, so still efficient; and the objects are immutable, so it's all safe.

In [30]:
val airlines = nycflights.airlines
val airports = nycflights.airports
val flights = nycflights.flights
val planes = nycflights.planes
val weather = nycflights.weather

[36mairlines[39m: [32mDataFrame[39m = [carrier: string, name: string]
[36mairports[39m: [32mDataFrame[39m = [faa: string, name: string ... 6 more fields]
[36mflights[39m: [32mDataFrame[39m = [year: int, month: int ... 17 more fields]
[36mplanes[39m: [32mDataFrame[39m = [tailnum: string, year: int ... 7 more fields]
[36mweather[39m: [32mDataFrame[39m = [origin: string, year: int ... 13 more fields]

### Describing a `DataFrame`

Usefully, we can examine the schema of a `DataFrame` via the `schema` method:

In [31]:
nycflights.airports.schema.foreach(println)

StructField(faa,StringType,true)
StructField(name,StringType,true)
StructField(lat,FloatType,true)
StructField(lon,FloatType,true)
StructField(alt,IntegerType,true)
StructField(tz,IntegerType,true)
StructField(dst,StringType,true)
StructField(tzone,StringType,true)


(Note that the types, `*Type`, here refer to actual classes defined in `org.apache.spark.sql.types`&ndash;it would be a useful exercise to look through these as they are essentially all the types you can legally use as columns in a `DataFrame`).  If we just want column names, we can get those as an array by calling the `columns` method:

In [32]:
nycflights.airports.columns

[36mres31[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m(
  [32m"faa"[39m,
  [32m"name"[39m,
  [32m"lat"[39m,
  [32m"lon"[39m,
  [32m"alt"[39m,
  [32m"tz"[39m,
  [32m"dst"[39m,
  [32m"tzone"[39m
)

This is valuable in a programmatic setting, though for exploratory purposes it is sometimes more useful to simply view a subset of the table.  We've already seen this in action above, but to show just the first 5 rows of the `airports` table, for example:

In [33]:
nycflights.airports
  .limit(5)
  .show

+---+--------------------+---------+---------+----+---+---+----------------+
|faa|                name|      lat|      lon| alt| tz|dst|           tzone|
+---+--------------------+---------+---------+----+---+---+----------------+
|04G|   Lansdowne Airport|41.130474|-80.61958|1044| -5|  A|America/New_York|
|06A|Moton Field Munic...| 32.46057|-85.68003| 264| -6|  A| America/Chicago|
|06C| Schaumburg Regional| 41.98934|-88.10124| 801| -6|  A| America/Chicago|
|06N|     Randall Airport| 41.43191|-74.39156| 523| -5|  A|America/New_York|
|09J|Jekyll Island Air...|31.074472|-81.42778|  11| -5|  A|America/New_York|
+---+--------------------+---------+---------+----+---+---+----------------+



Note that `nycflights.airports.limit(5)` is a transformation, and simply yields another `DataFrame`.  But `show` forces Spark to retrieve the requested rows and print them, so is an action.

### Working with columns

As observed above, the columns in the `airports` `DataFrame` are:

In [34]:
airports.columns.mkString(", ")

[36mres33[39m: [32mString[39m = [32m"faa, name, lat, lon, alt, tz, dst, tzone"[39m

Assume we want to rename `lat` to `latitude`, `lon` to `longitude`, and that we only want to retain `name`, `latitude`, and `longitude`.  We can use the `withColumnRenamed` method to rename columns, and `select` to list the columns we want to keep:

In [35]:
airports
  .withColumnRenamed("lat", "latitude")
  .withColumnRenamed("lon", "longitude")
  .select("name", "latitude", "longitude")
  .limit(5)
  .show

+--------------------+---------+---------+
|                name| latitude|longitude|
+--------------------+---------+---------+
|   Lansdowne Airport|41.130474|-80.61958|
|Moton Field Munic...| 32.46057|-85.68003|
| Schaumburg Regional| 41.98934|-88.10124|
|     Randall Airport| 41.43191|-74.39156|
|Jekyll Island Air...|31.074472|-81.42778|
+--------------------+---------+---------+



In this case, `withColumnRenamed` and `select` both take `String` arguments (`select` accepts a comma separated list of them).  But `String` is an existing type, with limited built-in methods available.  To allow us to do interesting things with columns, we need to refer to them as type [`Column`](https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.sql.Column).  We can do this several ways.  For example, to refer to the `name` column in the `airports` table we'd use one of:

* `col("name")`
* `column("name")`
* `$"name"` (actually has type [`ColumnName`](https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.sql.ColumnName), which is a subtype of `Column`)
* `airports("name")`, which is short-hand for `airports.apply("name")`

where `col` and `column` are defined in [org.apache.spark.sql.functions](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$).  The `Column` type has a number of functions associated with it, and can be used to compose complex expressions.  A subset are as follows:

operator | meaning                    | examples
---------|----------------------------|---------------
`lit`    | literal                    | `lit(1)`, `lit("foo")`, `lit(java.sql.Date.valueOf("2019-01-01"))`
`%`      | modulo                     | `col("a") % 2`
`&&`     | logical and                | `lit(true) && lit(false)`
`\|\|`   | logical or                 | `lit(true) \|\| lit(false)` 
`===`    | equality                   | `lit(1) === lit(1)`, `col("a") === col("b")`
`<=>`    | null-safe equality         | `lit(1) <=> lit(1)`, `col("a") <=> col("b")`
`=!=`    | inequality                 | `col("a") =!= col("b")`
`<`      | less than                  | `lit(1) < lit(2)`, `col("a") < col("b")`
`<=`     | less than or equal         | `lit(1) <= lit(2)`, `col("a") <= col("b")`
`>`      | greater than               | `lit(1) > lit(2)`, `col("a") > col("b")`
`>=`     | greater than or equal      | `lit(1) >= lit(2)`, `col("a") >= col("b")`
`*`      | multiplication             | `lit(1) * lit(2)`, `col("a") * col("b")`
`/`      | division                   | `lit(1) / lit(2)`, `col("a") / col("b")`
`+`      | addition                   | `lit(1) + lit(2)`, `col("a") + col("b")`
`-`      | subtraction                | `lit(1) - lit(2)`, `col("a") - col("b")`
`as`     | assign alias / rename      | `col("foo") as "bar"`
`alias`  | assign alias / rename      | `col("foo") alias "bar"`

So, we could rename the `lat` and `lon` columns as above in the alternative fashion:

In [36]:
airports
  .select(col("name"), col("lat").as("latitude"), col("lon") as ("longitude"))
  .limit(5)
  .show

+--------------------+---------+---------+
|                name| latitude|longitude|
+--------------------+---------+---------+
|   Lansdowne Airport|41.130474|-80.61958|
|Moton Field Munic...| 32.46057|-85.68003|
| Schaumburg Regional| 41.98934|-88.10124|
|     Randall Airport| 41.43191|-74.39156|
|Jekyll Island Air...|31.074472|-81.42778|
+--------------------+---------+---------+
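
Returning briefly to the operator table above, the difference between `===` and `<=>` only shows up when `null`s are involved.  The following sketch builds a three-row table to make the difference visible.  The helper name `compareNullable` and the column names are invented for illustration:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper: compare two nullable integer columns both ways.
def compareNullable(spark: SparkSession): DataFrame = {
  import spark.implicits._
  Seq[(Option[Int], Option[Int])](
    (Some(1), Some(1)), // both present and equal
    (Some(1), None),    // one side null
    (None, None)        // both sides null
  ).toDF("a", "b")
    .select(
      col("a"),
      col("b"),
      (col("a") === col("b")) as "eq",           // null whenever either side is null
      (col("a") <=> col("b")) as "eq_null_safe"  // always true or false
    )
}
```

With `===`, any comparison involving `null` is itself `null`, as in SQL.  With `<=>`, two `null`s compare as equal and a `null` against a value compares as not equal.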



Of course, we can use `col` and the like in the same way as any other method&ndash;applying them to a collection, for example.  The following converts each column name to upper case:

In [37]:
lazy val cols = airports.columns
lazy val colsUpperCased = cols.map(c => col(c) as c.toUpperCase)
airports.select(colsUpperCased: _*).limit(5).show

+---+--------------------+---------+---------+----+---+---+----------------+
|FAA|                NAME|      LAT|      LON| ALT| TZ|DST|           TZONE|
+---+--------------------+---------+---------+----+---+---+----------------+
|04G|   Lansdowne Airport|41.130474|-80.61958|1044| -5|  A|America/New_York|
|06A|Moton Field Munic...| 32.46057|-85.68003| 264| -6|  A| America/Chicago|
|06C| Schaumburg Regional| 41.98934|-88.10124| 801| -6|  A| America/Chicago|
|06N|     Randall Airport| 41.43191|-74.39156| 523| -5|  A|America/New_York|
|09J|Jekyll Island Air...|31.074472|-81.42778|  11| -5|  A|America/New_York|
+---+--------------------+---------+---------+----+---+---+----------------+



This example is less straightforward if the user is not that proficient with Scala, though the intent is hopefully obvious enough.  The first line simply gets an `Array` containing the column names as `String`s.  The second line applies a function to each element of the array, and returns an array containing the result of the function.  The function itself is just a mapping from variable `c` to `col(c).as(c.toUpperCase)`:

In [38]:
cols.map(c => col(c) as c.toUpperCase)

[36mres37[39m: [32mArray[39m[[32mColumn[39m] = [33mArray[39m(
  faa AS `FAA`,
  name AS `NAME`,
  lat AS `LAT`,
  lon AS `LON`,
  alt AS `ALT`,
  tz AS `TZ`,
  dst AS `DST`,
  tzone AS `TZONE`
)

In the final line, we pass in the array containing the renaming expressions, and the `_*` decoration tells the compiler to treat the array as though it were a comma separated list of arguments.  That is:

```scala
foo(Array(1,2,3): _*)
```

is equivalent to:

```scala
foo(1,2,3)
```
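
For a version of this that can actually be run, here is a small sketch with a made-up variadic function `total` standing in for `foo`.  It is plain Scala and does not involve Spark:

```scala
// Hypothetical variadic function, standing in for methods like `select`.
def total(xs: Int*): Int = xs.sum

// Arguments listed one by one.
val viaList = total(1, 2, 3)

// The same call, with the arguments supplied from an existing collection.
val viaArray = total(Array(1, 2, 3): _*)
```

Both calls pass the same three arguments, so `viaList` and `viaArray` hold the same value.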

### Filtering tables
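
As a minimal sketch of what filtering can look like, the `filter` method accepts a `Column` expression built from the operators listed earlier.  The helper below is hypothetical, and assumes only that the input has integer `tz` and `alt` columns, as the `airports` table does:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: airports in a given UTC offset that sit above a given altitude.
def highAirports(airports: DataFrame, tz: Int, minAlt: Int): DataFrame =
  airports.filter((col("tz") === tz) && (col("alt") > minAlt))
```

For example, `highAirports(airports, -5, 1000).show` would list airports with a `tz` of -5 and an altitude above 1000.  `filter` also accepts a SQL expression as a string, so `airports.filter("tz = -5 and alt > 1000")` is an equivalent way of writing the same condition.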

### Aggregates

Aggregates are most easily derived using the `agg` method.  The method takes a comma separated list of aggregate columns (aggregate functions applied to columns or column names), and there is an impressive set of aggregate functions that are provided.  As an example:

In [45]:
weather
  .agg(
    sum(lit(1))                as "n", 
    min("temp")                as "min", 
    max("temp")                as "max", 
    round(mean("temp"), 2)     as "mean", 
    round(stddev("temp"), 2)   as "sd", 
    round(kurtosis("temp"), 2) as "kurtosis"
  )
  .show

+-----+-----+------+-----+-----+--------+
|    n|  min|   max| mean|   sd|kurtosis|
+-----+-----+------+-----+-----+--------+
|26115|10.94|100.04|55.26|17.79|   -0.98|
+-----+-----+------+-----+-----+--------+



The same thing can be achieved for groups simply by adding the `groupBy` method:

In [40]:
weather
  .groupBy("origin", "month")
  .agg(
    sum(lit(1))                as "n", 
    min("temp")                as "min", 
    max("temp")                as "max", 
    round(mean("temp"), 2)     as "mean", 
    round(stddev("temp"), 2)   as "sd", 
    round(kurtosis("temp"), 4) as "kurtosis"
  )
  .show

+------+-----+---+-----+------+-----+-----+--------+
|origin|month|  n|  min|   max| mean|   sd|kurtosis|
+------+-----+---+-----+------+-----+-----+--------+
|   JFK|    8|738|60.08| 87.08|73.82| 4.76| -0.0795|
|   EWR|    8|740| 59.0| 89.96|74.54| 5.87| -0.2081|
|   LGA|    1|742|12.02|  59.0|35.96| 9.88|  -0.467|
|   EWR|   12|714|17.96|  71.6|37.95|11.12| -0.0266|
|   EWR|    4|720|30.92| 84.02|52.98|  9.6|  0.1855|
|   JFK|   11|713| 23.0| 66.92|45.13|10.08| -0.7933|
|   EWR|    7|741|64.04|100.04| 80.7| 7.37| -0.3601|
|   LGA|    5|744|44.96| 93.02|62.75| 9.86|  0.1686|
|   EWR|    9|719|48.02|  95.0| 67.3| 9.32| -0.4419|
|   JFK|    6|720|53.96|  89.6|69.96| 6.45| -0.2422|
|   LGA|    7|743|64.94| 98.96|80.76| 7.23| -0.4944|
|   LGA|    8|739|62.06| 89.06|75.05|  4.8|  0.0421|
|   JFK|    3|742|26.96| 57.92|39.54| 6.02| -0.4816|
|   EWR|    3|743|26.06| 60.08|40.12| 6.72| -0.2136|
|   EWR|    6|720|55.04| 93.92|73.27| 8.05| -0.5447|
|   LGA|   10|738|42.08| 84.92|60.63| 8.15|   
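
For comparison, a grouped aggregate of this kind can also be written as a SQL query against a temporary view.  The following is only a sketch: the local `SparkSession`, the view name `toy_weather`, and the handful of rows are assumptions made so that the example is self-contained, and in the notebook the existing session and the real `weather` table would be used instead.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a throwaway local session; in the notebook, reuse the existing one.
val spark = SparkSession.builder().master("local[*]").appName("agg-sql-sketch").getOrCreate()
import spark.implicits._

// Made-up rows standing in for the real `weather` table.
val toyWeather = Seq(
  ("EWR", 1, 39.02),
  ("EWR", 1, 39.92),
  ("JFK", 1, 37.04),
  ("JFK", 2, 41.00)
).toDF("origin", "month", "temp")

// Register the DataFrame under a (hypothetical) name so SQL can refer to it.
toyWeather.createOrReplaceTempView("toy_weather")

val summary = spark.sql("""
  SELECT origin, month,
         COUNT(*)            AS n,
         MIN(temp)           AS min_temp,
         MAX(temp)           AS max_temp,
         ROUND(AVG(temp), 2) AS mean_temp
  FROM toy_weather
  GROUP BY origin, month
""")

summary.show()
```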

### Joining tables

Joins are probably better understood using smaller, illustrative datasets.  In this case, let us just programmatically create `DataFrame`s called `A` and `B` as follows:

In [57]:
object joins {
  import scala.util.Random

  val A = sc.parallelize(
    Seq("a", "b", "c")
      .map(x => (x, Random.nextInt(10), Random.nextInt(10))) 
  ).toDF("x", "u", "v")

  val B = sc.parallelize(
    Seq("b", "c", "d")
      .map(x => (x, Random.nextInt(10), Random.nextInt(10))) 
  ).toDF("x", "v", "w")
}

import joins._
A.show
B.show

defined [32mobject[39m [36mjoins[39m
[32mimport [39m[36mjoins._
[39m
[36mA[39m: [32mDataFrame[39m = [x: string, u: int ... 1 more field]
[36mB[39m: [32mDataFrame[39m = [x: string, v: int ... 1 more field]

An inner join then looks as follows:

In [66]:
A
  .join(B, Seq("x"), "inner")
  .show

+---+---+---+---+---+
|  x|  u|  v|  v|  w|
+---+---+---+---+---+
|  c|  3|  6|  2|  3|
|  b|  6|  9|  0|  5|
+---+---+---+---+---+



Here we see that `v` is duplicated, which is the correct behaviour given that `v` is not one of the joining columns.  Any later reference to `v` by name would then be ambiguous, so it is usually best to rename duplicates before joining:

In [68]:
A
  .withColumnRenamed("v", "v_A")
  .join(B.withColumnRenamed("v", "v_B"), Seq("x"), "inner")
  .show

+---+---+---+---+---+
|  x|  u|v_A|v_B|  w|
+---+---+---+---+---+
|  c|  3|  6|  2|  3|
|  b|  6|  9|  0|  5|
+---+---+---+---+---+



Sometimes it is useful to join on an explicit condition, much like an implicit join in SQL.  In this case the joining column `x` is present in both tables and is not merged, so it is duplicated in the result.  This approach is therefore more useful when joining two tables where the names of the joining columns do not match.

In [69]:
A
  .join(B, A("x") === B("x"))
  .show

+---+---+---+---+---+---+
|  x|  u|  v|  x|  v|  w|
+---+---+---+---+---+---+
|  c|  3|  6|  c|  2|  3|
|  b|  6|  9|  b|  0|  5|
+---+---+---+---+---+---+



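When the joining columns do have different names, one of them can simply be dropped after the join:

In [72]: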
A
  .withColumnRenamed("x", "x_A")
  .join(B, col("x_A") === col("x"))
  .drop("x_A")
  .show

+---+---+---+---+---+
|  u|  v|  x|  v|  w|
+---+---+---+---+---+
|  3|  6|  c|  2|  3|
|  6|  9|  b|  0|  5|
+---+---+---+---+---+
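
Another way to handle duplicated names in a condition-based join is to alias each table and select the columns explicitly.  The sketch below assumes a local `SparkSession` and uses fixed values in place of the random ones above so that it can be run on its own; the aliases `a` and `b` are arbitrary.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a throwaway local session; in the notebook, reuse the existing one.
val spark = SparkSession.builder().master("local[*]").appName("join-alias-sketch").getOrCreate()
import spark.implicits._

// Fixed values rather than random ones, so the result is reproducible.
val A = Seq(("a", 1, 2), ("b", 6, 9), ("c", 3, 6)).toDF("x", "u", "v")
val B = Seq(("b", 0, 5), ("c", 2, 3), ("d", 7, 8)).toDF("x", "v", "w")

// Alias each side, join on the qualified column names, then pick and
// rename the columns so that no name appears twice in the result.
val joined = A.as("a")
  .join(B.as("b"), col("a.x") === col("b.x"))
  .select(
    col("a.x"),
    col("a.u"),
    col("a.v").as("v_A"),
    col("b.v").as("v_B"),
    col("b.w")
  )

joined.show()
```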



### User defined functions (UDFs)

The ability to easily create your own functions that run over the columns of a `DataFrame` is a killer feature.  It is perhaps not exciting for those accustomed to data frame abstractions in languages such as R or Python, but it probably is for those who have had to do similar things with relational databases.  Consider the weather table:

In [41]:
val weathersub = weather
  .select(col("origin"), col("month"), col("temp") as "temp_F")
  
weathersub
  .limit(5)
  .show

+------+-----+------+
|origin|month|temp_F|
+------+-----+------+
|   EWR|    1| 39.02|
|   EWR|    1| 39.02|
|   EWR|    1| 39.02|
|   EWR|    1| 39.92|
|   EWR|    1| 39.02|
+------+-----+------+



weathersub: DataFrame = [origin: string, month: int ... 1 more field]

The column `temp` appears to be air temperature in degrees Fahrenheit.  Assume we want to add a column called `temp_C` containing air temperature in degrees Celsius.  We could do this using functions we've already seen.  Given the formula $(x - 32) \times 5/9$, we could write:

In [42]:
weathersub
  .withColumn("temp_C", round((col("temp_F") - 32) * lit(5.0/9.0), 2))
  .limit(5)
  .show

+------+-----+------+------+
|origin|month|temp_F|temp_C|
+------+-----+------+------+
|   EWR|    1| 39.02|   3.9|
|   EWR|    1| 39.02|   3.9|
|   EWR|    1| 39.02|   3.9|
|   EWR|    1| 39.92|   4.4|
|   EWR|    1| 39.02|   3.9|
+------+-----+------+------+



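Alternatively, the conversion can be written as an ordinary Scala function and wrapped with `udf`, so that it can be applied to a column:

In [43]: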
lazy val toCelsius = (x: Double) => (x - 32) * 5 / 9
lazy val toCelsiusUDF = udf(toCelsius)

weathersub
  .withColumn("temp_C", round(toCelsiusUDF(col("temp_F")), 2))
  .limit(5)
  .show

+------+-----+------+------+
|origin|month|temp_F|temp_C|
+------+-----+------+------+
|   EWR|    1| 39.02|   3.9|
|   EWR|    1| 39.02|   3.9|
|   EWR|    1| 39.02|   3.9|
|   EWR|    1| 39.92|   4.4|
|   EWR|    1| 39.02|   3.9|
+------+-----+------+------+
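
Because `toCelsius` is an ordinary Scala function, it can be checked without Spark before it is wrapped as a UDF.  The following sketch applies it to two of the `temp_F` values shown above; the helper `roundTo` is hypothetical and only mimics the effect of Spark's `round`.

```scala
// Same conversion as in the notebook cell above, as a plain Scala function.
val toCelsius = (x: Double) => (x - 32) * 5 / 9

// Hypothetical helper: round to a given number of decimal places.
def roundTo(x: Double, places: Int): Double =
  BigDecimal(x).setScale(places, BigDecimal.RoundingMode.HALF_UP).toDouble

// Two of the Fahrenheit values from the table above.
val samples = Seq(39.02, 39.92)
val converted = samples.map(f => roundTo(toCelsius(f), 2))

println(converted)
```

The two results agree with the `temp_C` values in the table, 3.9 and 4.4.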



### User defined aggregate functions (UDAFs)

For an example, see [UDAF](https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html).

<!--build a median function? -->
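
As a rough illustration of the idea, the sketch below defines a user defined aggregate with the typed `Aggregator` API and registers it with `functions.udaf`.  It assumes Spark 3.0 or later, where `udaf` is available, and it may differ from the approach in the linked page.  The names `MeanBuffer` and `MeanAgg`, the local session, and the toy rows are all hypothetical, and a simple mean is used here rather than a median because it needs only a running sum and count.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}

// Running state: sum and count of the values seen so far.
case class MeanBuffer(sum: Double, n: Long)

// Hypothetical aggregator computing an arithmetic mean.
object MeanAgg extends Aggregator[Double, MeanBuffer, Double] {
  // Empty state before any value is seen.
  def zero: MeanBuffer = MeanBuffer(0.0, 0L)
  // Fold one input value into the state.
  def reduce(b: MeanBuffer, x: Double): MeanBuffer = MeanBuffer(b.sum + x, b.n + 1)
  // Combine partial states computed on different partitions.
  def merge(a: MeanBuffer, b: MeanBuffer): MeanBuffer = MeanBuffer(a.sum + b.sum, a.n + b.n)
  // Turn the final state into the result.
  def finish(b: MeanBuffer): Double = if (b.n == 0) Double.NaN else b.sum / b.n
  def bufferEncoder: Encoder[MeanBuffer] = Encoders.product[MeanBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Assumption: a throwaway local session; in the notebook, reuse the existing one.
val spark = SparkSession.builder().master("local[*]").appName("udaf-sketch").getOrCreate()
import spark.implicits._

// Made-up rows standing in for the real `weather` table.
val toy = Seq(("EWR", 39.02), ("EWR", 39.92), ("JFK", 41.0)).toDF("origin", "temp")

// Wrap the aggregator so it can be used like a built-in aggregate function.
val myMean = udaf(MeanAgg)

val result = toy.groupBy("origin").agg(myMean(col("temp")) as "mean_temp")
result.show()
```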

In [44]:
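// Evaluate `block` once; return its result and the elapsed wall-clock time in milliseconds.
def timeit[T](block: => T): (T, Double) = {
  val startTime = System.currentTimeMillis()
  val res: T = block
  (res, (System.currentTimeMillis() - startTime).toDouble)
}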

defined [32mfunction[39m [36mtimeit[39m