# Just Enough Scala for Spark

Dean Wampler, Ph.D. [@deanwampler](http://twitter.com/deanwampler) ([email](mailto:dean.wampler@lightbend.com))

Welcome. This notebook teaches you the core concepts of [Scala](http://scala-lang.org) necessary to use [Apache Spark's](http://spark.apache.org) Scala API effectively. Spark does a nice job of exploiting Scala's best features while avoiding most of the more difficult and obscure ones.

## Why Scala?
Spark lets you use Scala, Java, Python, R, and SQL to do your work. Scala and Java appeal to _data engineers_, who do the heavy lifting of building resilient and scalable infrastructures for _Big Data_. Python, R, and SQL appeal to _data scientists_, who build models for analyzing data, including machine learning, as well as explore data interactively, where SQL is very convenient.

These aren't hard boundaries. Many people do both roles. Many data engineers like Python and may use SQL and R. Many data scientists have decided to use Scala with Spark.

Briefly, some of the advantages of using Scala include the following:
* Since Spark is written in Scala, you get the best performance and the most complete API coverage when you use Scala. It's true that with [DataFrames](http://spark.apache.org/docs/latest/sql-programming-guide.html), code written in all five languages performs about the same. If you need to use the [RDD](http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds) API, then Scala provides the best performance, with Java a close second.
* When runtime problems occur, understanding the exception stack frames and other debug information is easiest if you know Scala. Unfortunately, the abstraction "leaks" when problems occur.
* Compared to Java, Scala code is much more concise, and several Scala features make it more concise still. This elevates your productivity and makes it easier to imagine a design approach and then write it down without having to translate too much into idiomatic API and language conventions. (You'll see this in action as we go.)
* Compared to Python and R, Scala code benefits from _static typing_ with _type inference_. _Static typing_ means that the Scala compiler finds more errors in your expressions, such as values that don't match the expected types, at compile time rather than when you actually run the code. However, _type inference_ means you don't have to add a lot of explicit type information to your code. In many cases, Scala infers the correct types.

But Scala isn't perfect. The biggest disadvantage compared to Python and R is the rich ecosystem of data analytics libraries available in those languages. Scala is trying to catch up, but Python and R are still well ahead.

Obviously, I can only scratch the surface of Scala here. For more information:
* [Programming Scala](http://shop.oreilly.com/product/0636920033073.do): My comprehensive introduction to Scala.
* [Scala Language Website](http://scala-lang.org/): Where to download Scala, find documentation (e.g., the [Scaladocs](http://www.scala-lang.org/api/current/#package): Scala library documentation, like [Javadocs](https://docs.oracle.com/javase/8/docs/api/)), and other information.

## About Notebooks
You're using the [Jupyter Notebook](http://jupyter.org/) environment with [Apache Toree](https://toree.incubator.apache.org/) to provide a Spark backend. The [README](README.md) explains how to set up the environment.

Notebooks let you mix documentation, like this [Markdown](https://daringfireball.net/projects/markdown/) "cell", with cells that contain code, like the next cell. 

The menus and toolbar at the top provide options for evaluating a cell, adding and deleting cells, etc. You'll want to learn keyboard shortcuts if you use notebooks a lot. I recommend that you invoke the _Help > Keyboard Shortcuts_ menu item, then capture the page as an image (it's a modal dialog, unfortunately). Learn a few shortcuts each day.

For now, just note that when you're in a cell, `shift+enter` evaluates the cell (parses and renders the Markdown or evaluates the code), then moves to the next cell. Try it for a few cells. I'll wait...

Okay. Note that you can click into any cell to move the focus.

Let's configure Toree to always show us the types of expressions.

```scala
%showTypes on
```

```
Types will be printed.
```


## The Environment
When you start this notebook, Toree creates a [SparkContext](http://spark.apache.org/docs/latest/programming-guide.html#initializing-spark) for you. This is the entry point of any Spark application. It knows how to connect to your cluster (or run locally in the same JVM), how to configure properties, etc. It also runs a Web UI that lets you monitor your running jobs. The instance of `SparkContext` is called `sc`. The next cell simply confirms that it exists.

```scala
sc
```

```
org.apache.spark.SparkContext = org.apache.spark.SparkContext@7898967
```

You can get more information about the environment and statistics about the jobs you've run or are currently running from the Spark UI: <a href="http://localhost:4040" target="spark_ui">http://localhost:4040</a> (Opens in a new tab.)

## Let's Download Some Data
We're going to write a real Spark program and use it as a vehicle for learning Scala and how to use it with Spark. 

But first, we need to download some text files we'll use, which contain some of the plays of Shakespeare. The next few cells define some helper methods (functions) to do this and then perform the download. So, we'll also learn some basic Scala concepts.

> **NOTE: "method" vs. "function"**

> Scala follows a common object-oriented convention where the term _method_ is used for a function that's attached to a class or instance. Unlike Java, at least before Java 8, Scala also has _functions_ that are not associated with a particular class or instance. 

> In our next code example, we'll define a _method_ that becomes part of a generated class that the Scala interpreter creates to wrap all our notebook code. The interpreter has to do this in order to generate valid JVM byte code. We'll see what a real _function_ looks like soon.

> Unfortunately, it can be a bit confusing when to use a method vs. a function, reflecting Scala's hybrid nature as an object-oriented and a functional language. Fortunately, in many cases, we can use methods and functions interchangeably, so we won't worry about the distinction too much from now on.
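
To make the distinction concrete, here is a small sketch (the names `squareMethod` and `squareFunction` are purely illustrative): a method defined with `def`, a function value assigned to a `val`, and both passed to a collection's `map`. When a method is used where a function is expected, Scala converts it automatically, which is one reason the two are usually interchangeable.

```scala
// A method: defined with `def`; in a notebook it lives in the interpreter's generated wrapper class.
def squareMethod(x: Int): Int = x * x

// A function: an anonymous function (x => x * x) assigned to a `val` of type Int => Int.
val squareFunction: Int => Int = x => x * x

// Both can be passed to `map`. Scala converts the method into a function
// automatically when a function type is expected ("eta-expansion").
val viaMethod   = Seq(1, 2, 3).map(squareMethod)
val viaFunction = Seq(1, 2, 3).map(squareFunction)

println(viaMethod)    // List(1, 4, 9)
println(viaFunction)  // List(1, 4, 9)
```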

Okay, let's define two convenience _methods_ for printing either an error message or a simple "information" message. We'll explain the syntax in the cell that follows.

```scala
/*
 * "error" takes a single String argument, prints a formatted error message,
 * and returns false.
 */
def error(message: String): Boolean = {

    // Print the string passed to "println" and add a linefeed ("ln"):
    // See the next cell for an explanation of how the string is constructed.
    println(s"""
        |********************************************************************
        |
        |  ERROR: $message
        |
        |********************************************************************
        |""".stripMargin)

    // The last expression in a block, {...}, is the return value.
    // The "return" keyword is not needed.
    false
}

/*
 * "info" takes a single String argument, prints it on a line,
 * and returns true.
 */
def info(message: String): Boolean = {
    println(message)
    true
}
```

Method definitions have the following elements, in order:
* The `def` keyword.
* The method's name (`error` and `info` in these cases).
* The argument list in parentheses. If there are no arguments, the empty parentheses can be omitted. This is common for "getter"-like methods that simply return a field in an instance, etc.
* A colon followed by the type of the value returned by the method. (This can often be inferred, so it's optional - but recommended for readability - in those cases.)
* An `=` (equals) sign that separates the method _signature_ from the _body_.
* The body in braces `{ ... }`, although if the body consists of a single expression, the braces are optional.
* The last expression in the body is used as the return value. The `return` keyword is optional and rarely used.
* Like `return`, semicolons (`;`) are inferred at the end of lines (in most cases) and rarely used.
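
The following sketch exercises each optional element from the list above. The method names are hypothetical, chosen only for illustration.

```scala
// Single-expression body: the braces are optional.
def double(n: Int): Int = n * 2

// Return type omitted; Scala infers Int (an explicit type is still more readable).
def triple(n: Int) = n * 3

// No argument list at all: typical of "getter"-like methods.
def answer: Int = 42

// A multi-line body needs braces; the last expression is the return value.
def describe(n: Int): String = {
  val parity = if (n % 2 == 0) "even" else "odd"
  s"$n is $parity"
}

println(describe(double(answer)))  // 84 is even
```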

In the argument list for `error`, `(message: String)`, `message` is the argument name and its type is `String`. This convention for _type annotations_, `: Type`, is also used for the return type, `error(...): Boolean`. Type annotations are required by Scala for method arguments. They are optional in most cases for the return type. We'll see that Scala can infer the types of many expressions and variable declarations.

Scala uses the same comment conventions as Java, `// ...` for a single line, and `/* ... */` for a comment block.

> An _expression_ has a value, while a _statement_ does not. Hence, when we assign an expression to a variable, the value the expression returns is assigned to the variable.

Inside `error`, we used a combined _interpolated_ and _triple-quoted_ string, with the syntax `s"""..."""`:
* Triple-quoted strings, `"""..."""`, can embed newlines, as in the example. (We'll see another benefit later.)
* String interpolation, invoked by putting `s` before the string, e.g., `s"..."` or `s"""..."""`, lets us embed variable references and expressions, where the string conversion will be inserted automatically. For example: 

```scala
s"""Use braces for expressions: ${sys.env.get("USER")}.
You can omit the braces when just using a variable: $sc
However, watch for ambiguities like ${sc}andextrastuff"""
```

```
String = Use braces for expressions: Some(deanwampler).
You can omit the braces when just using a variable: org.apache.spark.SparkContext@7898967
However, watch for ambiguities like org.apache.spark.SparkContext@7898967andextrastuff
```

Another feature we're using for triple-quoted strings is the ability to strip the leading whitespace off each line. The `stripMargin` method removes all whitespace before and including the `|` at the start of each line. This lets you indent those lines for proper code formatting without having that whitespace remain in the string. In the following example, the resulting string has blank lines at the beginning and end. Also note what happens with `line 2`:

```scala
s"""
    |line 1
    |  line 2
    |""".stripMargin
```

```
String = "
line 1
  line 2
"
```

Now let's define a method that works similarly to the popular [curl](http://linux.die.net/man/1/curl) utility. It's a bit long and you don't need to understand all the details, but we'll need it to download the data for this notebook.

The whole body is one big [scala.util.Try](http://www.scala-lang.org/api/current/#scala.util.Try) expression. `Try` is a useful class for "trying" code that might throw an exception (more precisely, a [java.lang.Throwable](https://docs.oracle.com/javase/8/docs/api/java/lang/Throwable.html)). The long block, that is `Try {...}`, ends with `outFile`, a `File` instance. This is the result returned by the block, if everything succeeds. This `File` is wrapped in a `Try` subclass instance of type [scala.util.Success](http://www.scala-lang.org/api/current/scala/util/Success.html). However, if an exception is thrown in the block, then the exception is wrapped in an instance of another subclass, [scala.util.Failure](http://www.scala-lang.org/api/current/scala/util/Failure.html). (We'll discuss other advantages of `Try` and alternatives to it <a href="#TryOptionNull">here</a>.)
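
Here is a minimal sketch of `Try` on its own, away from the networking code. The helper `parseInt` is hypothetical; it just gives us an operation that can throw (`toInt` throws a `NumberFormatException` for non-numeric input).

```scala
import scala.util.{Try, Success, Failure}

// Hypothetical helper: Try wraps the result in Success,
// or captures a thrown exception in Failure.
def parseInt(s: String): Try[Int] = Try(s.trim.toInt)

val good = parseInt(" 42 ")   // Success(42)
val bad  = parseInt("forty")  // Failure(java.lang.NumberFormatException: ...)

// Pattern matching is one way to handle both cases explicitly.
def describeTry(t: Try[Int]): String = t match {
  case Success(n)  => s"parsed $n"
  case Failure(ex) => s"failed with ${ex.getClass.getSimpleName}"
}

println(describeTry(good))  // parsed 42
println(describeTry(bad))   // failed with NumberFormatException
```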

Another construct we'll see for the first time is how to declare variables:
* `val immutableValue = ...`: Once initialized, we can't assign a _different_ value to `immutableValue`.
* `var mutableVariable = ...`: We can assign new values to `mutableVariable` as often as we want.

It's _highly recommended_ that you use only `val`s unless you have a good reason to need mutability, because mutability is a very common source of bugs!
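
A quick sketch of the difference, with illustrative names. It also shows a subtlety worth knowing: a `val` only prevents reassigning the name; if the object it refers to is itself mutable (here a Scala `ArrayBuffer`), its contents can still change.

```scala
import scala.collection.mutable.ArrayBuffer

val immutableValue = 1
// immutableValue = 2          // would not compile: a val can't be reassigned

var mutableVariable = 1
mutableVariable = 2            // fine: a var can be reassigned
mutableVariable += 1           // shorthand for mutableVariable = mutableVariable + 1

// Caveat: a val fixes the *reference*, not the object's contents.
val buffer = ArrayBuffer(1, 2)
buffer += 3                    // allowed: same buffer, new element

println(s"$immutableValue $mutableVariable $buffer")  // 1 3 ArrayBuffer(1, 2, 3)
```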

Most of the types used here are from Java's standard library (the JDK). Because Scala compiles to JVM byte code, you can use any Java library you want from Scala:
* [java.net.URL](https://docs.oracle.com/javase/8/docs/api/java/net/URL.html): Handles URL formatting and connections.
* [java.io.File](https://docs.oracle.com/javase/8/docs/api/java/io/File.html): Working with files and directories.
* [java.io.BufferedInputStream](https://docs.oracle.com/javase/8/docs/api/java/io/BufferedInputStream.html): Buffered input from an underlying stream.
* [java.io.BufferedOutputStream](https://docs.oracle.com/javase/8/docs/api/java/io/BufferedOutputStream.html): Buffered output to an underlying stream.
* [java.io.FileOutputStream](https://docs.oracle.com/javase/8/docs/api/java/io/FileOutputStream.html): Output to a file, specifically.

As before, we'll use comments to explain a few other new Scala constructs.

```scala
// Import this utility for working with URLs. Unlike Java the semicolon ';' is not required.
import java.net.URL

// Use {...} to provide a list of things to import, when you don't want to import everything
// in a package (_ in Scala) and you don't want to write a separate line for each one.
import java.io.{File, BufferedInputStream, BufferedOutputStream, FileOutputStream}

import scala.util.Try

// Download a file at a URL and write it to a target directory:
def curl(sourceURLString: String, targetDirectoryString: String): Try[File] = Try {

    // The path separator on your platform: "/" on Linux and MacOS, "\" on Windows.
    val pathSeparator = File.separator

    // Use the name of the remote file as the file name in the target directory.
    // We split on the URL path elements using the separator, which is ALWAYS "/"
    // on all platforms.
    val sourceFileName = sourceURLString.split("/").last
    val outFileName = targetDirectoryString + pathSeparator + sourceFileName

    println(s"Downloading $sourceURLString to $outFileName")
    val sourceURL = new URL(sourceURLString)
    val connection = sourceURL.openConnection()
    val in = new BufferedInputStream(connection.getInputStream()) // Used to read the bytes.

    // If here, the connection was successfully opened (i.e., no exception was thrown).
    // Now create the target directory (nothing happens if already there).
    val targetDirectory = new File(targetDirectoryString)
    targetDirectory.mkdirs()

    // Setup the output file and a stream to write to it.
    val outFile = new File(outFileName)
    val out = new BufferedOutputStream(new FileOutputStream(outFile))

    // Create the buffer into which we'll hold in-flight bytes.
    val hundredK = 100*1024
    val bytes = Array.fill[Byte](hundredK)(0)   // Create a byte buffer, elements set to 0

    // Loop until we've read everything.
    var loops = 0
    var count = in.read(bytes, 0, hundredK)     // Read up to "hundredK" bytes at a time.
    while (count != -1) {                       // Haven't hit the end of input yet?
        if (loops % 10 == 0) print(".")         // Print occasional feedback.
        loops += 1
        out.write(bytes, 0, count)              // Write to the new file.
        count = in.read(bytes, 0, hundredK)     // Read the next chunk and loop...
    }
    println("\nFinished!")
    in.close()                                  // Clean up! Close file & stream handles
    out.flush()
    out.close()
    outFile                                     // Returned file (if we got this far)
}
```

Okay, before we actually use `curl`, let's create the target directory. (This is also done in `curl`, but we're using the success or failure for other purposes here.) 

Note that Scala's `if` construct is actually an expression (in Java, `if` is a _statement_). Here the `if` expression returns `true` or `false`, and that value is assigned to `success`, which we'll use in a moment. This is why both `error` and `info` return `Boolean`, for convenience.
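
Because `if` yields a value, it can serve directly as a method body or initialize a `val`, much like Java's ternary operator `cond ? a : b`. A small illustrative sketch (`sign` is a hypothetical method):

```scala
// The whole if/else-if/else chain is one expression whose value is a String.
def sign(n: Int): String =
  if (n < 0) "negative"
  else if (n == 0) "zero"
  else "positive"

val label = sign(-5)
println(label)  // negative
```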

```scala
// The target directory, which we'll now create, if necessary.
val shakespeare = new File("data/shakespeare")

val success = if (shakespeare.exists == false) {   // doesn't exist already?
    if (shakespeare.mkdirs() == false) {           // did the attempt fail??
        error(s"Failed to create directory path: $shakespeare")
    } else {                                       // successful
        info(s"Created $shakespeare")
    }
} else {
    info(s"$shakespeare already exists")
}
```

```
Created data/shakespeare
```


If we successfully created the output directory (or it already existed), let's download a handful of files, each with one play of Shakespeare, from [http://www.cs.usyd.edu.au/~matty/Shakespeare/](http://www.cs.usyd.edu.au/~matty/Shakespeare/).

```scala
val pathSeparator = File.separator
val targetDirName = shakespeare.toString
val urlRoot = "http://www.cs.usyd.edu.au/~matty/Shakespeare/texts/comedies/"
val plays = Seq(
    "tamingoftheshrew", "comedyoferrors", "loveslabourslost", "midsummersnightsdream",
    "merrywivesofwindsor", "muchadoaboutnothing", "asyoulikeit", "twelfthnight")

if (success) {
    for {
        play <- plays
        playFile = new File(targetDirName + pathSeparator + play)
        if (playFile.exists == false)
        result = curl(urlRoot + play, targetDirName)
    } {
        // If successful, then `foreach` extracts the `File` so we can use it.
        result.foreach(file => info(s"Downloaded $play and wrote $file"))
        // If unsuccessful, `recover` extracts the `Throwable`...
        result.recover{ case throwable => error(s"Failed to download $play. $throwable") }
    }
}
```

Here we use something called a `for` _comprehension_. They are _expressions_, not _statements_ as in Java's `for` loops, although we ignore the return value here. `for (play <- plays)` iterates through a collection, `plays`, and assigns each one to the `play` variable. Subsequent steps use it, first to create a `File` instance (`playFile`) and then to evaluate a conditional - do we already have this file downloaded?

If we haven't downloaded it yet, the final expression in the `for` comprehension uses `curl` to download it. Recall that `curl` returns a `Try[File]`. We actually don't care about the `File`, but we do care about success or failure. Note how we process this in the second block:
* `result.foreach` actually does nothing if a `Failure` is returned, but if a `Success` is returned, calling `foreach` extracts the `File` inside the `Success` and we use it for an information message.
* `result.recover` does nothing if a `Success` is returned, but if a `Failure` is returned, the `Throwable` is extracted and we use it to print an error message. 

> For both `result.foreach` and `result.recover`, we pass an _anonymous function_ to them to do the processing we want. We'll explain this concept and the two kinds of syntax you see here in subsequent sections.
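
Here is a self-contained sketch of the same `foreach`/`recover` behavior, using a division that fails instead of a download (the values are illustrative). Note that `recover` returns a *new* `Try`; in the download cell that result is simply discarded, because the only goal there is to print a message.

```scala
import scala.util.Try
import scala.collection.mutable.ListBuffer

val zero = 0
val ok: Try[Int]   = Try(10 / 2)     // Success(5)
val boom: Try[Int] = Try(10 / zero)  // Failure(java.lang.ArithmeticException: / by zero)

// foreach runs its function only for a Success.
val seen = ListBuffer.empty[Int]
ok.foreach(n => seen += n)
boom.foreach(n => seen += n)         // no-op: boom is a Failure

// recover runs its partial function only for a Failure.
val okRecovered   = ok.recover { case _: ArithmeticException => -1 }    // Success(5)
val boomRecovered = boom.recover { case _: ArithmeticException => -1 }  // Success(-1)
```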

This use of a `for` comprehension is sometimes called a "for loop" because it returns nothing; it just prints messages. For completeness, here's another example, where we construct a new collection using a comprehension. We'll use it as a sanity check to verify we were successful.

```scala
println(s"We should have ${plays.size} files:")
val list = for {
    play <- plays
    playFileString = targetDirName + pathSeparator + play
    playFile = new File(playFileString)
} yield {
    "%-40s\t%s".format(playFileString, if (playFile.exists) "Success!" else "NOT FOUND!!")
}
list.foreach(println)
```

```
We should have 8 files:
data/shakespeare/tamingoftheshrew       	Success!
data/shakespeare/comedyoferrors         	Success!
data/shakespeare/loveslabourslost       	Success!
data/shakespeare/midsummersnightsdream  	Success!
data/shakespeare/merrywivesofwindsor    	Success!
data/shakespeare/muchadoaboutnothing    	Success!
data/shakespeare/asyoulikeit            	Success!
data/shakespeare/twelfthnight           	Success!
```


The `yield` keyword tells Scala that we want to construct a new collection, using the expression that follows to construct elements. Here we use another way to format a string, using C-style `printf` formatting. Because there is only one expression after the `yield`, the braces `{...}` are optional.
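
As a rough illustration of what `yield` does, the compiler rewrites comprehensions into calls such as `map`, `flatMap`, and `withFilter`. The sketch below compares a comprehension with a hand-written `map`, and shows a guard. It hard-codes `"/"` instead of `File.separator` purely to keep the example self-contained.

```scala
val plays = Seq("asyoulikeit", "twelfthnight")
val dir   = "data/shakespeare"

// A comprehension with `yield` builds a new collection...
val paths1 = for {
  play <- plays
  path = dir + "/" + play
} yield path

// ...equivalent here to this single map call.
val paths2 = plays.map(play => dir + "/" + play)

// A guard (`if`) filters elements before the yield.
val tPlays = for {
  play <- plays
  if play.startsWith("t")
} yield play.length

println(paths1)  // List(data/shakespeare/asyoulikeit, data/shakespeare/twelfthnight)
println(tPlays)  // List(12)
```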

## Inverted Index - When You're Tired of Counting Words...

Whew! We've learned a lot of Scala already while doing typical data science chores (i.e., fetching data). 

Now let's implement a real algorithm using Spark, _Inverted Index_. You'll want this when you create your next "Google killer". It takes in a corpus of documents (e.g., web pages), tokenizes the words, and outputs for each word a list of the documents that contain it, along with the corresponding counts. 

This is a slightly more interesting algorithm than _Word Count_, the classic "hello world" program everyone implements when they learn Spark.

The term _inverted_ here means we started with the words as part of the input _values_, where the _keys_ were the document identifiers, and we switched to using the words as keys and the document identifiers as values.
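
Before turning to Spark, here is a minimal sketch of the same algorithm using ordinary Scala collections on a tiny, made-up corpus. It is not Spark code: `groupBy` on a local collection stands in for the `reduceByKey` and `groupByKey` steps used in the Spark version, and the documents `doc1` and `doc2` are hypothetical.

```scala
// Hypothetical toy corpus of (documentName, contents) pairs, the same shape
// as the records SparkContext.wholeTextFiles returns.
val docs = Seq(
  "doc1" -> "the cat sat",
  "doc2" -> "the cat and the dog")

// 1. Emit ((word, doc), 1) for every word occurrence.
val wordDocOnes: Seq[((String, String), Int)] = docs.flatMap { case (doc, text) =>
  text.split("""\W+""").toSeq.filter(word => word.nonEmpty).map(word => ((word, doc), 1))
}

// 2. Sum the 1s for each unique (word, doc) key.
val counts: Map[(String, String), Int] =
  wordDocOnes.groupBy(_._1).map { case (key, ones) => (key, ones.map(_._2).sum) }

// 3. "Invert": make the word the key and collect (doc, count) pairs,
//    sorted by descending count, then by document name.
val inverted: Map[String, Seq[(String, Int)]] =
  counts.toList
    .map { case ((word, doc), count) => (word, (doc, count)) }
    .groupBy(_._1)
    .map { case (word, entries) =>
      (word, entries.map(_._2).sortBy { case (doc, count) => (-count, doc) })
    }

println(inverted("the"))  // List((doc2,2), (doc1,1))
```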

Here's our first version, all at once. This is _one long expression_. Note the periods `.` at the end of the subexpressions.

```scala
val iiFirstPass1 = sc.wholeTextFiles(shakespeare.toString).
    flatMap { location_contents_tuple2 =>
        val words = location_contents_tuple2._2.split("""\W+""")
        // wholeTextFiles returns Hadoop-style paths, which always use "/" (even on Windows).
        val fileName = location_contents_tuple2._1.split("/").last
        words.map(word => ((word, fileName), 1))
    }.
    reduceByKey((count1, count2) => count1 + count2).
    map { word_file_count_tup3 =>
        (word_file_count_tup3._1._1, (word_file_count_tup3._1._2, word_file_count_tup3._2))
    }.
    groupByKey.
    sortByKey(ascending = true).
    mapValues { iterable =>
        val vect = iterable.toVector.sortBy { file_count_tup2 =>
            (-file_count_tup2._2, file_count_tup2._1)
        }
        vect.mkString(",")
    }
```

Now we'll break it down into steps, assigning each step to a variable. This extra verbosity lets us see what Scala infers for the type returned by each expression.

This is one of the nice features of Scala. We don't have to put in the type information ourselves, like we would have to do for Java code, but the compiler gives us feedback about what we just created. This is especially useful when you're learning.

```scala
val fileContents = sc.wholeTextFiles(shakespeare.toString)
fileContents
```

```
org.apache.spark.rdd.RDD[(String, String)] = data/shakespeare MapPartitionsRDD[81] at wholeTextFiles at <console>:37
```

The second line with `fileContents` is there so the notebook will show us its type information. (Try removing it and re-evaluating the cell; nothing is printed.)

The output is telling us that `fileContents` has the type `RDD[(String,String)]`, but the instance is actually a `MapPartitionsRDD`, which is a "private" implementation subclass of `RDD`. Note that `RDD[...]` means there is a type parameter required in the brackets, `[...]`, which represents the type of records being held. 

The expression `(String,String)` is a convenient shorthand for `Tuple2[String,String]`. That is, we have two-element _tuples_ as records, where the first element is for each file's fully-qualified path and the second element is the contents of the file. This is what `SparkContext.wholeTextFiles` returns for us. We'll use the path to remember where we found words, while the contents contains the words themselves, of course.

We'll see shortly that you can also write _instances_ of [Tuple2](http://www.scala-lang.org/api/current/index.html#scala.Tuple2) with the same syntax, e.g., `("foo", 101)` for a `(String,Int)` tuple, and similarly for _higher-arity_ tuples (up to 22 elements...), e.g., `("foo", 101, 3.14159, ("bar", 202L))`. Run the next cell to see the type signature for this four-element tuple. Do you understand it? Do you understand why it has four elements and not five?

```scala
("foo", 101, 3.14159, ("bar", 202L))
```

```
(String, Int, Double, (String, Long)) = (foo,101,3.14159,(bar,202))
```
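
Here is a short sketch of building and taking apart tuples like the one above. The variable names are illustrative. Note that the last element of `nested` is itself a `Tuple2`, which is why the outer tuple has four elements, not five.

```scala
val pair   = ("foo", 101)                          // (String, Int), i.e., Tuple2[String, Int]
val nested = ("foo", 101, 3.14159, ("bar", 202L))  // a Tuple4 whose 4th element is a Tuple2

val first   = pair._1         // "foo": tuple indices start at 1
val barLong = nested._4._2    // 202L: reach into the nested tuple

// Destructuring binds each element to a name in one step.
val (name, number) = pair

println(s"$first $barLong $name $number")  // foo 202 foo 101
```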

How many `fileContents` records do we have? Not many. It should be the same number as the number of files we downloaded (see above).

```scala
fileContents.count
```

```
Long = 8
```

Now for the next step in the calculation: split each file's contents on non-word characters (anything other than letters, digits, and underscore, which also removes the newlines), extract the file name from the end of the fully-qualified path, and return a new tuple for each word.

Our parsing is very crude. It improperly handles contractions, like `it's` and hyphenated words like `world-changing`. When you kill Google, be sure to use a real _natural language processing_ parsing technique.

```scala
val wordFileNameOnes = fileContents.flatMap { location_contents_tuple2 =>
    val words = location_contents_tuple2._2.split("""\W+""")
    // wholeTextFiles returns Hadoop-style paths, which always use "/" (even on Windows).
    val fileName = location_contents_tuple2._1.split("/").last
    words.map(word => ((word, fileName), 1))
}
wordFileNameOnes
```

```
org.apache.spark.rdd.RDD[((String, String), Int)] = MapPartitionsRDD[82] at flatMap at <console>:41
```

I find this hard to read, and shortly I'll show you a much more elegant alternative syntax.

First, if we called `fileContents.map`, it would return exactly _one_ new record for each record in `fileContents`. What we actually want now are records for each word-fileName combination, a significantly larger number (but the data in each record will be much smaller). For this we use `fileContents.flatMap`. In general, a `flatMap` returns a _collection_ of new records, zero or more, for _each_ input record. These collections are then _flattened_ into one big collection, another `RDD` in this case.
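
The same `map` vs. `flatMap` distinction exists on ordinary Scala collections, which makes it easy to try locally. This sketch uses a made-up `Seq` of lines rather than an `RDD`.

```scala
val lines = Seq("to be or", "not to be")

// map: exactly one output per input, so we get a collection of collections.
val mapped = lines.map(line => line.split(" ").toSeq)

// flatMap: zero or more outputs per input, flattened into one collection.
val flattened = lines.flatMap(line => line.split(" ").toSeq)

// Returning an empty collection drops a record entirely.
val nonBlank = Seq("", "ado", "").flatMap(s => if (s.isEmpty) Seq.empty[String] else Seq(s))

println(flattened)  // List(to, be, or, not, to, be)
```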

What should `flatMap` actually do with each record? We saw earlier the syntax for defining _methods_. Now we see the syntax for _functions_, `argument_list => body`. We pass a literal function definition, without a name, so it's an _anonymous function_. 

Here, we have a single argument, the record, which we named `location_contents_tuple2`, a verbose way to say that it's a two-element tuple with an input file's location and contents. We don't need a type annotation for `location_contents_tuple2`, because Scala infers it from the `RDD`'s record type. The `=>` "arrow" separates the argument list from the body, which appears on the next few lines.

If we had more than one argument or we added the type _annotations_ (optional most of the time), then we would need parentheses, e.g.:

```scala
(location_contents_tuple2: Tuple2[String,String]) => ...
(arg1, arg2, arg3) => ...
```

Wait, I said we're passing a function as an argument to `flatMap`. If so, why are we using braces `{...}` around this function instead of parentheses `(...)`, as you would normally expect when passing arguments to a method like `flatMap`?

It's because Scala lets us use braces instead of parentheses so we have the familiar block-like syntax we know and love for `if` and `for` expressions. You can use either braces or parentheses here, although the informal convention for a multi-line anonymous function is to use braces and to use parentheses for a single expression on the same line. (There's one exception where braces are required, which we'll see later.)
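
A tiny sketch of the two conventions side by side, on a local `Seq` with illustrative names. Both calls produce the same result.

```scala
val nums = Seq(1, 2, 3)

// Parentheses: the informal convention for a short, single-line function.
val doubledA = nums.map(n => n * 2)

// Braces: the informal convention for a multi-line function body.
val doubledB = nums.map { n =>
  val doubled = n * 2
  doubled
}

println(doubledA == doubledB)  // true
```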

Now, for each `location_contents_tuple2`, we access the _first_ element using the `_1` method and the _second_ element using `_2`.

So, we take the contents in the second element and split it using a _regular expression_ for non-word characters, which returns an `Array` of words. From the first element, we extract the file name at the end of the location path. This isn't strictly necessary, but removing the common path prefix makes the output more readable.
Finally, we use `Array.map` from the Scala library, _not_ `RDD.map`, to transform each `word` into a tuple of the form `((word, fileName), 1)`.

Why did we embed a tuple of `(word, fileName)` inside the "outer" tuple with a `1` as the second element? Why not just write a three-element tuple, `(word, fileName, 1)`? We will use the `(word, fileName)` as a _key_ in the next step, to find all unique word-fileName combinations, using the equivalent of a `group by` statement. The `1` is a "seed" count, which we'll use to count the occurrences of the unique `(word, fileName)` pairs.

> **Notes:**
> * For historical reasons, tuple indices start at 1, not 0 like arrays and other Scala collections.
> * Another benefit of triple-quoted strings that makes them nice for regular expressions is that you don't have to escape backslashes, as in `\W`. If we used an ordinary `"..."` string, we would have to write it as `"\\W+"`. Your choice...
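
The sketch below, on a made-up sentence, checks that both spellings of the regular expression split identically. It also shows the crude parsing mentioned earlier: the contraction and the hyphenated word are split apart.

```scala
val text = "Whoa... it's world-changing!"

// Triple-quoted: the backslash reaches the regex engine as-is.
val viaTriple = text.split("""\W+""").toSeq
// Ordinary string: the backslash itself must be escaped.
val viaEscaped = text.split("\\W+").toSeq

println(viaTriple.mkString(", "))  // Whoa, it, s, world, changing
```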

Let's count the number of records we have and look at a few of the lines. We'll use the `RDD.take` method to grab the first 10 lines, then loop over them and print them.

> **Note:** In a cluster, the actual lines chosen will be the first lines from an arbitrarily-chosen _partition_ of the data, so they won't necessarily be the very first lines from the first file, alphabetically sorted, or even the first ten lines of the chosen file, if the file's data is in more than one partition!

```scala
wordFileNameOnes.count
```

```
Long = 173336
```

```scala
wordFileNameOnes.take(10).foreach(println)
```

```
((,asyoulikeit),1)
((AS,asyoulikeit),1)
((YOU,asyoulikeit),1)
((LIKE,asyoulikeit),1)
((IT,asyoulikeit),1)
((DRAMATIS,asyoulikeit),1)
((PERSONAE,asyoulikeit),1)
((DUKE,asyoulikeit),1)
((SENIOR,asyoulikeit),1)
((living,asyoulikeit),1)
```


Because we asked for results, Spark had to run a job to compute them. Spark pipelines, like `iiFirstPass1`, are _lazy_: nothing is computed until we ask for results. When learning, it's useful to print some data to better understand what's happening. Just be aware of the extra overhead of running more Spark jobs.
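
Laziness is easy to observe with a plain Scala `Iterator`, which works as a rough local analogy. This is not how Spark implements RDD laziness, but the idea is similar: building the pipeline does no work, and asking for results does only as much work as needed. The counter `evaluated` is just instrumentation for the demo.

```scala
// Count how many elements the map function actually processes.
var evaluated = 0
val pipeline = Iterator(1, 2, 3, 4, 5).map { n => evaluated += 1; n * 10 }

val before   = evaluated               // 0: building the pipeline computes nothing
val firstTwo = pipeline.take(2).toList // asking for results forces the work...
val after    = evaluated               // 2: ...but only as much as needed

println(s"$before $firstTwo $after")   // 0 List(10, 20) 2
```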

Depending on which 10 records Spark selected, you might see "" (blank) as a word, e.g., this is one of the records:

```
((,asyoulikeit),1)
```
Also, some words are all capital letters:

```
((DRAMATIS,asyoulikeit),1)
```

(You can see where these occur in the original files.) We'll deal with the blank word and the capitalization later.
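
One plausible source of the blank word: when text *begins* with a non-word character (for example, a newline before the title), Java's `String.split`, which Scala uses here, returns a leading empty string. The sketch below uses a made-up input that mimics the start of a play. The cleanup shown (dropping empty strings and lower-casing) is one possible approach, not necessarily what the later steps of this notebook do.

```scala
// A leading non-word character (here a newline) makes split return a leading "".
val raw = "\nAS YOU LIKE IT\nDRAMATIS PERSONAE".split("""\W+""").toSeq

// One possible cleanup: drop empty strings and lower-case everything.
val cleaned = raw.filter(word => word.nonEmpty).map(word => word.toLowerCase)

println(raw.mkString("[", "|", "]"))  // [|AS|YOU|LIKE|IT|DRAMATIS|PERSONAE]
println(cleaned.mkString(" "))        // as you like it dramatis personae
```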

Now, let's bring together all occurrences of each unique `(word,fileName)` pair and count them.

```scala
val uniques = wordFileNameOnes.reduceByKey((count1, count2) => count1 + count2)
uniques
```

```
org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[83] at reduceByKey at <console>:43
```

In SQL you would use `GROUP BY` for this (including in Spark's [DataFrame](http://spark.apache.org/docs/latest/sql-programming-guide.html) API). However, in the `RDD` API, grouping is too expensive for our needs, because we don't care about the groups themselves, the long lists of repeated `(word,fileName)` pairs. We only care about how many elements are in each group, that is, their _size_. That's the purpose of the `1` in the tuples and the use of `RDD.reduceByKey`. It brings together all records with the same key, the unique `(word,fileName)` pairs, and then applies the anonymous function we provide to "reduce" the values, the `1`s. We simply sum them up to compute the group counts.
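
To see what "reduce the values per key" means, here is a local stand-in written with plain Scala collections. `reduceByKeyLocal` is a hypothetical helper, not a Spark API, and the records are a few made-up `((word, fileName), 1)` tuples. Unlike this sketch, Spark's `reduceByKey` also combines values within each partition before shuffling data across the cluster, which is part of why it avoids materializing the full groups.

```scala
// Hypothetical local stand-in for RDD.reduceByKey (not a Spark API):
// group records by key, then combine each group's values with `f`.
def reduceByKeyLocal[K, V](records: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
  records.groupBy(_._1).map { case (key, kvs) => (key, kvs.map(_._2).reduce(f)) }

val wordFileOnes = Seq(
  (("letter", "merrywivesofwindsor"), 1),
  (("whole", "comedyoferrors"), 1),
  (("letter", "merrywivesofwindsor"), 1))

val counts = reduceByKeyLocal(wordFileOnes)((count1, count2) => count1 + count2)
println(counts(("letter", "merrywivesofwindsor")))  // 2
```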

Note that this anonymous function takes two arguments, so we need parentheses around the argument list. Since this function fits on the same line, we used parentheses for `reduceByKey`, instead of braces.

> **Note:** All the `*ByKey` methods operate on two-element tuples and treat the first element as the key.

How many are there? Let's see a few:

```scala
uniques.count
```

```
Long = 27276
```

```scala
uniques.take(30).foreach(println)
```

```
((dexterity,merrywivesofwindsor),1)
((crest,asyoulikeit),1)
((whole,comedyoferrors),2)
((lamb,muchadoaboutnothing),2)
((force,muchadoaboutnothing),2)
((letter,merrywivesofwindsor),19)
((blunt,tamingoftheshrew),3)
((bestow,asyoulikeit),1)
((rear,midsummersnightsdream),1)
((crossing,tamingoftheshrew),1)
((wronged,merrywivesofwindsor),4)
((S,tamingoftheshrew),10)
((HIPPOLYTA,midsummersnightsdream),19)
((revolve,twelfthnight),1)
((er,merrywivesofwindsor),11)
((renown,asyoulikeit),1)
((cubiculo,twelfthnight),1)
((All,twelfthnight),3)
((power,loveslabourslost),8)
((Albeit,asyoulikeit),1)
((lips,tamingoftheshrew),3)
((upshot,twelfthnight),1)
((approach,midsummersnightsdream),4)
((mean,muchadoaboutnothing),5)
((embossed,asyoulikeit),1)
((varnish,loveslabourslost),2)
((Apollo,midsummersnightsdream),1)
((spangled,midsummersnightsdream),1)
((gentlemen,comedyoferrors),1)
((Rebuke,loveslabourslost),1)
```


As you might expect from a `GROUP BY`-like statement, the number of records is smaller than before.

We want our final keys to be the words themselves, so let's restructure the tuples from `((word,fileName),count)` to `(word,(fileName,count))`. Now, we'll still output two-element, key-value tuples, but the `word` will be the key and the `(fileName,count)` tuple will be the value.

In [108]:
val words = uniques.map { word_file_count_tup3 => 
    (word_file_count_tup3._1._1, (word_file_count_tup3._1._2, word_file_count_tup3._2)) 
}

The nested tuple methods, e.g., `_1._2`, are hard to read, making the logic here somewhat obscure. We'll see a beautiful and elegant alternative shortly.

Now we'll use an actual `group by` operation, because we now need the "groups". Calling `RDD.groupByKey` uses the first tuple element and brings together all occurrences of the unique words. Next, we'll sort the result by word, ascending alphabetically.

In [109]:
val wordGroups = words.groupByKey.sortByKey(ascending = true)
wordGroups

org.apache.spark.rdd.RDD[(String, Iterable[(String, Int)])] = ShuffledRDD[88] at sortByKey at <console>:47

Note that the "group" is actually a Scala [Iterable](http://www.scala-lang.org/api/current/index.html#scala.collection.Iterable), i.e., an abstraction for some sort of collection. (We're about to see that it's a Spark-defined, private collection called a `CompactBuffer`.)
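Here is a similar local sketch of `groupByKey` followed by `sortByKey(ascending = true)`. It uses plain Scala collections and hypothetical sample data (`sampleWords`) instead of an RDD. One difference matters: Spark makes no promise about the order of the values inside each group, while `groupBy` on a local `Seq` happens to keep their input order.

```scala
// Hypothetical records shaped like `words`: (word, (fileName, count))
val sampleWords: Seq[(String, (String, Int))] = Seq(
  ("love", ("asyoulikeit", 2)),
  ("hate", ("asyoulikeit", 1)),
  ("love", ("twelfthnight", 1)))

val sampleWordGroups: Seq[(String, Seq[(String, Int)])] =
  sampleWords
    .groupBy { case (word, _) => word }              // like groupByKey: one entry per word
    .toSeq
    .map { case (word, records) => (word, records.map { case (_, fileCount) => fileCount }) }
    .sortBy { case (word, _) => word }               // like sortByKey(ascending = true)
```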

In [110]:
wordGroups.count

Long = 11951

In [111]:
wordGroups.take(30).foreach(println)

(,CompactBuffer((tamingoftheshrew,1), (asyoulikeit,1), (merrywivesofwindsor,1), (comedyoferrors,1), (midsummersnightsdream,1), (twelfthnight,1), (loveslabourslost,1), (muchadoaboutnothing,1)))
(A,CompactBuffer((loveslabourslost,78), (midsummersnightsdream,39), (muchadoaboutnothing,31), (merrywivesofwindsor,38), (comedyoferrors,42), (asyoulikeit,34), (twelfthnight,47), (tamingoftheshrew,59)))
(ABOUT,CompactBuffer((muchadoaboutnothing,18)))
(ACT,CompactBuffer((asyoulikeit,22), (comedyoferrors,11), (tamingoftheshrew,12), (loveslabourslost,9), (muchadoaboutnothing,17), (twelfthnight,18), (merrywivesofwindsor,23), (midsummersnightsdream,9)))
(ADAM,CompactBuffer((asyoulikeit,16)))
(ADO,CompactBuffer((muchadoaboutnothing,18)))
(ADRIANA,CompactBuffer((comedyoferrors,85)))
(ADRIANO,CompactBuffer((loveslabourslost,111)))
(AEGEON,CompactBuffer((comedyoferrors,20)))
(AEMELIA,CompactBuffer((comedyoferrors,16)))
(AEMILIA,CompactBuffer((comedyoferrors,3)))
(AEacides,CompactBuffer((tamingoftheshrew,1)

Finally, let's clean up these `CompactBuffers`. We convert each one to a Scala [Vector](http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Vector), a collection with effectively constant-time performance for most operations. Then we sort it in descending order by count, so the locations that mention the corresponding word the _most_ appear _first_ in the list. Note that we're using `Vector.sortBy`, not an `RDD` sorting method. It takes a function that accepts each collection element and returns a value used to sort the collection. By returning `(-file_count_tup2._2, file_count_tup2._1)`, we effectively say, "sort by the counts _descending_ first, then by the file names."

Then we take the resulting `Vector` and make a comma-separated string from its elements.

What's `RDD.mapValues`? We could use `RDD.map`, but we aren't changing the keys (words), so rather than have to deal with those too, `mapValues` is a convenience method that just passes in the value part of the tuple and reconstructs new `(key,value)` tuples with the returned, new value.

In [113]:
val iiFirstPass2 = wordGroups.mapValues { iterable => 
    val vect = iterable.toVector.sortBy { file_count_tup2 => 
        (-file_count_tup2._2, file_count_tup2._1)
    }
    vect.mkString(",")
}
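You can try the function passed to `mapValues` on a single group without Spark. This sketch includes a hypothetical helper, `mapValuesLocal`, that shows what `mapValues` does to each record of a local `Seq`: the key passes through untouched and only the value is transformed. Spark's version also keeps the RDD's partitioner, which `map` cannot assume. The sample group is made up.

```scala
// Hypothetical local stand-in for RDD.mapValues: keep each key, transform each value.
def mapValuesLocal[K, V, W](records: Seq[(K, V)])(f: V => W): Seq[(K, W)] =
  records.map { case (key, value) => (key, f(value)) }

// A made-up group for one word, like a value produced by groupByKey
val sampleGroup: Iterable[(String, Int)] =
  Seq(("twelfthnight", 1), ("asyoulikeit", 4), ("tamingoftheshrew", 1))

val formattedRecords = mapValuesLocal(Seq(("abandon", sampleGroup))) { iterable =>
  // Sort by count descending (negated), then by file name ascending to break ties.
  val vect = iterable.toVector.sortBy { file_count_tup2 =>
    (-file_count_tup2._2, file_count_tup2._1)
  }
  vect.mkString(",")
}
```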

We're done! The number of records is the same as for `wordGroups` (do you understand why?), so let's just look at some of the records.

In [114]:
iiFirstPass2.take(30).foreach(println)

(,(asyoulikeit,1),(comedyoferrors,1),(loveslabourslost,1),(merrywivesofwindsor,1),(midsummersnightsdream,1),(muchadoaboutnothing,1),(tamingoftheshrew,1),(twelfthnight,1))
(A,(loveslabourslost,78),(tamingoftheshrew,59),(twelfthnight,47),(comedyoferrors,42),(midsummersnightsdream,39),(merrywivesofwindsor,38),(asyoulikeit,34),(muchadoaboutnothing,31))
(ABOUT,(muchadoaboutnothing,18))
(ACT,(merrywivesofwindsor,23),(asyoulikeit,22),(twelfthnight,18),(muchadoaboutnothing,17),(tamingoftheshrew,12),(comedyoferrors,11),(loveslabourslost,9),(midsummersnightsdream,9))
(ADAM,(asyoulikeit,16))
(ADO,(muchadoaboutnothing,18))
(ADRIANA,(comedyoferrors,85))
(ADRIANO,(loveslabourslost,111))
(AEGEON,(comedyoferrors,20))
(AEMELIA,(comedyoferrors,16))
(AEMILIA,(comedyoferrors,3))
(AEacides,(tamingoftheshrew,1))
(AEgeon,(comedyoferrors,7))
(AEgle,(midsummersnightsdream,1))
(AEmilia,(comedyoferrors,4))
(AEsculapius,(merrywivesofwindsor,1))
(AGUECHEEK,(twelfthnight,2))
(ALL,(midsummersnightsdream,2),(tamingof

Okay. Looks reasonable. 

Next, we'll refine the code using a very powerful feature, _pattern matching_, which makes the code both more concise and easier to understand.

Before we do that, try a few refinements on your own.

**Exercises:**

* Add a filter statement to remove the first entry, for the blank word "". You could do this in one of two ways. One is another "step" with [RDD.filter](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD) (search the [Scaladoc page](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD) for the `filter` method). The other is the similar Scala collections method, [scala.collection.Seq.filter](http://www.scala-lang.org/api/current/index.html#scala.collection.Seq). Both versions take a _predicate_ function, one that returns `true` if the record should be _retained_ and `false` otherwise. Do you think one choice is better than the other? Why? Or are they basically the same? Consider code comprehension and the performance of one over the other.
* Convert all words to lower case. Calling `toLowerCase` on a string is all you need. Where's a good place to insert this logic?

We'll implement both changes in subsequent refinements below.

> **NOTE:** If you would prefer to make a copy of the code in a new cell, use the _Insert_ menu above. Or use the keyboard shortcuts `ESC` (escape key), followed by `A` for insert before or `B` for insert after. Then hit return to edit. Note the toolbar pop-down for the format of the cell. This cell you're reading is _Markdown_. Use _Code_ for source code (obviously).

## Pattern Matching
We wrote a real program and we've already learned quite a bit of Scala. Let's improve it with one of my favorite Scala features, _pattern matching_.

Here's our "first pass" version again for easy reference.

In [117]:
val iiFirstPass1b = sc.wholeTextFiles(shakespeare.toString).
    flatMap { location_contents_tuple2 => 
        val words = location_contents_tuple2._2.split("""\W+""")
        val fileName = location_contents_tuple2._1.split(pathSeparator).last
        words.map(word => ((word, fileName), 1))
    }.
    reduceByKey((count1, count2) => count1 + count2).
    map { word_file_count_tup3 => 
        (word_file_count_tup3._1._1, (word_file_count_tup3._1._2, word_file_count_tup3._2)) 
    }.
    groupByKey.
    sortByKey(ascending = true).
    mapValues { iterable => 
        val vect = iterable.toVector.sortBy { file_count_tup2 => 
            (-file_count_tup2._2, file_count_tup2._1)
        }
        vect.mkString(",")
    }

Now here is another implementation, this time using _pattern matching_, which I'll explain in a moment.

I've also made two other changes. Recall that we have entries for empty words "" and also mixed capitalization. I've added fixes for these:
* `filter(word => word.size > 0)` to remove the empty words. (In Spark and Scala collections, `filter` has the positive sense: the predicate says what should be _retained_.) It's indicated by the comment `// #1`.
* `word.toLowerCase` to convert all words to lower case uniformly, so that words like HAMLET, Hamlet, and hamlet in the original texts are treated as the same, since we're counting word occurrences. See comment `// #2`.

In [120]:
val ii1 = sc.wholeTextFiles(shakespeare.toString).
    flatMap {
        case (location, contents) => 
            val words = contents.split("""\W+""").
                filter(word => word.size > 0)                      // #1
            val fileName = location.split(pathSeparator).last
            words.map(word => ((word.toLowerCase, fileName), 1))   // #2
    }.
    reduceByKey((count1, count2) => count1 + count2).
    map { 
        case ((word, fileName), count) => (word, (fileName, count)) 
    }.
    groupByKey.
    sortByKey(ascending = true).
    mapValues { iterable => 
        val vect = iterable.toVector.sortBy { 
            case (fileName, count) => (-count, fileName) 
        }
        vect.mkString(",")
    }

Note that I added the filtering inside the function passed to `flatMap`. `split("""\W+""")` yields an empty string only when a file's contents begin with a non-word character. So this choice removes at most one record per file from the `flatMap` output, which won't have a significant impact on performance. Filtering itself adds some extra overhead.

Spark pipelines narrow transformations like `map`, `flatMap`, and `filter` together within the same stage, so a separate `RDD.filter` step would incur about the same overhead. You could also do the filtering later in the pipeline, for example, in any step after `groupByKey`. However you implemented this above is probably fine. You could profile the different options, but you may not notice a difference except for very large input data sets.
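Here is a quick local check of the claims above, using a made-up string rather than a real play. `split("""\W+""")` yields a single leading empty string when the text starts with a non-word character, and `split` drops trailing empty strings. Filtering before or after lower-casing produces the same words.

```scala
// Made-up text that starts and ends with non-word characters
val text = " Hamlet, hamlet HAMLET! "

val rawTokens = text.split("""\W+""")            // Array("", "Hamlet", "hamlet", "HAMLET")

// Filter first, then normalize case (as in the flatMap above)...
val filterFirst = rawTokens.filter(word => word.size > 0).map(_.toLowerCase)

// ...or normalize case first and filter afterwards.
val filterLast = rawTokens.map(_.toLowerCase).filter(word => word.size > 0)
```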

Let's verify we get the same results, but with the two improvements described. We'll use Spark's [DataFrame](http://spark.apache.org/docs/latest/sql-programming-guide.html) for convenience.

In [121]:
val ii1DF = sqlContext.createDataFrame(ii1).toDF("word", "locations_counts")

The `%%dataframe` _cell magic_ provides a nice table layout display.

In [123]:
%%dataframe
ii1DF

word,locations_counts
a,"(loveslabourslost,507),(merrywivesofwindsor,494),(muchadoaboutnothing,492),(asyoulikeit,461),(tamingoftheshrew,445),(twelfthnight,416),(midsummersnightsdream,281),(comedyoferrors,254)"
abandon,"(asyoulikeit,4),(tamingoftheshrew,1),(twelfthnight,1)"
abate,"(loveslabourslost,1),(midsummersnightsdream,1),(tamingoftheshrew,1)"
abatement,"(twelfthnight,1)"
abbess,"(comedyoferrors,8)"
abbey,"(comedyoferrors,9)"
abbominable,"(loveslabourslost,1)"
abbreviated,"(loveslabourslost,1)"
abed,"(asyoulikeit,1),(twelfthnight,1)"
abetting,"(comedyoferrors,1)"


Now let's explore our new implementation. We start off as before, by calling `wholeTextFiles`:

```scala
val ii = sc.wholeTextFiles(shakespeare.toString).
```

The function we pass to `flatMap` now looks like this:

```scala
flatMap { 
    case (location, contents) => 
        val words = contents.split("""\W+""").
            filter(word => word.size > 0)                      // #1
        val fileName = location.split(pathSeparator).last
        words.map(word => ((word.toLowerCase, fileName), 1))   // #2
}.
```

Compared to the previous version (without the enhancements marked with the \#1 and \#2 comments):

```scala
flatMap { location_contents_tuple2 => 
    val words = location_contents_tuple2._2.split("""\W+""")
    val fileName = location_contents_tuple2._1.split(pathSeparator).last
    words.map(word => ((word, fileName), 1))
}.
```    

Instead of `location_contents_tuple2`, a variable name for the whole tuple, we write `case (location, contents)`. The `case` keyword says we want to _pattern match_ on the object passed to the function. If it is a two-element tuple (and we know it always will be in this case), the pattern _extracts_ the first element and assigns it to a variable named `location`. It also extracts the second element and assigns it to a variable named `contents`.

Before, we accessed the location and contents with the slightly obscure `location_contents_tuple2._1` and `location_contents_tuple2._2`, respectively. Now we use meaningful names, `location` and `contents`. The code becomes more concise and readable.
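Here's a small, Spark-free comparison of the two styles. It runs on a local `Seq` of made-up `(location, contents)` pairs, shaped like the records `wholeTextFiles` returns. The `"/"` separator stands in for `pathSeparator`, which is defined elsewhere in the notebook; that substitution is an assumption of this sketch. `.toSeq` turns each `Array` from `split` into a `Seq` for the local `flatMap`. Both versions produce the same records.

```scala
// Made-up (location, contents) pairs; "/" is assumed as the path separator here.
val files = Seq(
  ("data/shakespeare/twelfthnight", "If music be the food of love"),
  ("data/shakespeare/asyoulikeit", "All the world's a stage"))

// Tuple-accessor style
val withAccessors = files.flatMap { location_contents_tuple2 =>
  val words    = location_contents_tuple2._2.split("""\W+""")
  val fileName = location_contents_tuple2._1.split("/").last
  words.toSeq.map(word => ((word, fileName), 1))
}

// Pattern-matching style: `case` extracts and names the two tuple elements
val withPatterns = files.flatMap {
  case (location, contents) =>
    val words    = contents.split("""\W+""")
    val fileName = location.split("/").last
    words.toSeq.map(word => ((word, fileName), 1))
}
```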

We'll explore more pattern matching features as we proceed.

The `reduceByKey` step is unchanged:

```scala
reduceByKey((count1, count2) => count1 + count2).
```

To be clear, this isn't a pattern-matching expression; there is no `case` keyword. It's just a "regular" function that takes two arguments, the two counts we're adding.

My favorite improvement is the next line:

```scala
map { 
    case ((word, fileName), count) => (word, (fileName, count)) 
}.
```

Compare it to the previous, obscure version:

```scala
map { word_file_count_tup3 => 
    (word_file_count_tup3._1._1, (word_file_count_tup3._1._2, word_file_count_tup3._2)) 
}.
```

The new implementation makes it clear what we're doing; just shifting parentheses! That's all it takes to go from our `(word, fileName)` keys with `count` values to `word` keys and `(fileName, count)` values. Note that pattern matching works just fine with nested structures, like `((word, fileName), count)`.

I hope you can appreciate how elegant and concise this expression is! Note how I thought of the next transformation I needed to do in preparation for the final group-by, to switch from `((word, fileName), count)` to `(word, (fileName, count))` and _I just wrote it down exactly as I pictured it!_

Code like this makes writing Scala Spark code a sublime experience.
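The same nested pattern works on any local collection of tuples. This tiny sketch, with made-up counts, shows the "shift the parentheses" transformation by itself.

```scala
// Made-up ((word, fileName), count) records, as produced by reduceByKey
val counted = Seq(
  (("love", "asyoulikeit"), 2),
  (("hate", "twelfthnight"), 1))

// Nested pattern: pull apart ((word, fileName), count), then rebuild as (word, (fileName, count))
val rekeyed = counted.map {
  case ((word, fileName), count) => (word, (fileName, count))
}
```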

The next two expressions are unchanged:

```scala
groupByKey.
sortByKey(ascending = true).
```

The final `mapValues` now uses pattern matching to sort the `Vector` in each record:

```scala
mapValues { iterable => 
    val vect = iterable.toVector.sortBy { 
        case (fileName, count) => (-count, fileName) 
    }
    vect.mkString(",")
}
```

Compared to the original version, it's again easier to read:

```scala
mapValues { iterable => 
    val vect = iterable.toVector.sortBy { file_count_tup2 => 
        (-file_count_tup2._2, file_count_tup2._1)
    }
    vect.mkString(",")
}
```

## Final Version
By the way, we can write SQL queries to explore this data. A single string for the list of `(location,count)` pairs isn't very useful for queries. Instead, let's "unzip" the list into two collections, one for the locations and one for the counts. Spark SQL stores each one as an array column, so we can ask for, say, the first element of each array. That gives us nicely separated fields that work better with structured queries.

Here is `ii1` rewritten with this change. The comments explain what we're doing.

In [125]:
val ii = sc.wholeTextFiles(shakespeare.toString).
    flatMap {
        case (location, contents) => 
            val words = contents.split("""\W+""").
                filter(word => word.size > 0)                      // #1
            val fileName = location.split(pathSeparator).last
            words.map(word => ((word.toLowerCase, fileName), 1))   // #2
    }.
    reduceByKey((count1, count2) => count1 + count2).
    map { 
        case ((word, fileName), count) => (word, (fileName, count)) 
    }.
    groupByKey.
    sortByKey(ascending = true).
    map {                         // Must use map now, because we'll reformat new records. 
      case (word, iterable) =>    // Hence, pattern match on the whole input record.
        val vect = iterable.toVector.sortBy { 
            case (fileName, count) => (-count, fileName) 
        }

        // Use `Vector.unzip`, which returns a single, two-element tuple, where each
        // element is a collection, one for the locations and one for the counts. 
        // We use pattern matching to extract these two collections into variables.
        val (locations, counts) = vect.unzip  
        
        // Lastly, we'll compute the total count across all locations and return 
        // a new record with all four fields. The `reduceLeft` method takes a function
        // that knows how to "reduce" the collection down to a final value, working 
        // from the left.
        val totalCount = counts.reduceLeft((n1,n2) => n1+n2)
        
        (word, totalCount, locations, counts)
    }
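You can try the last `map` step on a single made-up group with plain Scala collections. `unzip` turns a collection of pairs into a pair of collections. `reduceLeft` combines the counts from the left, ((4 + 1) + 1) here. Note that `reduceLeft` throws an exception on an empty collection. That's safe in this pipeline, because `groupByKey` never produces an empty group. For plain sums, `counts.sum` is an equivalent shortcut.

```scala
// A made-up, already-sorted group for one word: (fileName, count) pairs
val sortedGroup = Vector(("asyoulikeit", 4), ("tamingoftheshrew", 1), ("twelfthnight", 1))

// unzip: Vector[(String, Int)] => (Vector[String], Vector[Int])
val (groupLocations, groupCounts) = sortedGroup.unzip

// reduceLeft applies the function pairwise from the left: (4 + 1) + 1
val groupTotal = groupCounts.reduceLeft((n1, n2) => n1 + n2)

// The four-field record built at the end of the final version
val sampleRecord = ("abandon", groupTotal, groupLocations, groupCounts)
```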

In [126]:
val iiDF = sqlContext.createDataFrame(ii).toDF("word", "total_count", "locations", "counts")
iiDF.registerTempTable("inverted_index")

Now, write a SQL query that extracts the top location by count for each word, as well as the total count across all locations for the word. The Spark SQL dialect supports Hive SQL syntax for extracting elements from arrays, maps, and structs ([details](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-CollectionFunctions)). Here we access the first element (index zero) from each array. 

In [127]:
%%SQL
SELECT word, total_count, locations[0] AS top_location, counts[0] AS top_count 
FROM inverted_index 

+-----------+-----------+----------------+---------+
|       word|total_count|    top_location|top_count|
+-----------+-----------+----------------+---------+
|          a|       3350|loveslabourslost|      507|
|    abandon|          6|     asyoulikeit|        4|
|      abate|          3|loveslabourslost|        1|
|  abatement|          1|    twelfthnight|        1|
|     abbess|          8|  comedyoferrors|        8|
|      abbey|          9|  comedyoferrors|        9|
|abbominable|          1|loveslabourslost|        1|
|abbreviated|          1|loveslabourslost|        1|
|       abed|          2|     asyoulikeit|        1|
|   abetting|          1|  comedyoferrors|        1|
+-----------+-----------+----------------+---------+
only showing top 10 rows



Unfortunately, the output formatting for Toree's `%%SQL` "cell magic" is not configurable. The `%%dataframe` magic handles variable-width layout and also provides more display options. First, let's see the options:

In [128]:
%%dataframe

%%dataframe [arguments]
DATAFRAME_CODE

DATAFRAME_CODE can be any numbered lines of code, as long as the
last line is a reference to a variable which is a DataFrame.
Option    Description
------    -----------                       
--limit   The number of records to return   
            (default: 10)                   
--output  The type of the output: html, csv,
            json (default: html)            


Now here's the previous query, with the addition of a `WHERE` clause, too:

In [129]:
val topLocations = sqlContext.sql("""
    SELECT word,  total_count, locations[0] AS top_location, counts[0] AS top_count
    FROM inverted_index 
    WHERE word LIKE '%love%' OR word LIKE '%hate%'
""")

In [None]:
val topTwoLocations = sqlContext.sql("""
    SELECT word, total_count, 
        locations[0] AS first_location,  counts[0] AS first_count,
        locations[1] AS second_location, counts[1] AS second_count
    FROM inverted_index 
    WHERE word LIKE '%love%' OR word LIKE '%hate%'
""")

In [None]:
%%dataframe --limit 100
topTwoLocations

In [130]:
%%dataframe --limit 100
topLocations

word,total_count,top_location,top_count
beloved,11,tamingoftheshrew,4
cloven,1,loveslabourslost,1
cloves,1,loveslabourslost,1
glove,3,loveslabourslost,2
glover,1,merrywivesofwindsor,1
gloves,5,merrywivesofwindsor,3
hate,22,midsummersnightsdream,9
hated,6,midsummersnightsdream,4
hateful,5,midsummersnightsdream,3
hates,5,asyoulikeit,2


A _natural language processing_ (NLP) expert might tell you that _love_, _loved_, _loves_, etc. are really the same word, because they are different conjugations of the verb _to love_. Similarly, should _gloves_ (plural) and _glove_ (singular) be handled differently?

What we really should do is extract the _stems_ of these words and use those instead. NLP toolkits handle this _stemming_ for you.

There's also a useful `show` method on `DataFrames`.

In [137]:
topLocations.show

+-------+-----------+--------------------+---------+
|   word|total_count|        top_location|top_count|
+-------+-----------+--------------------+---------+
|beloved|         11|    tamingoftheshrew|        4|
| cloven|          1|    loveslabourslost|        1|
| cloves|          1|    loveslabourslost|        1|
|  glove|          3|    loveslabourslost|        2|
| glover|          1| merrywivesofwindsor|        1|
| gloves|          5| merrywivesofwindsor|        3|
|   hate|         22|midsummersnightsd...|        9|
|  hated|          6|midsummersnightsd...|        4|
|hateful|          5|midsummersnightsd...|        3|
|  hates|          5|         asyoulikeit|        2|
| hateth|          1|midsummersnightsd...|        1|
|   love|        662|    loveslabourslost|      121|
|  loved|         38|         asyoulikeit|       13|
| lovely|         15|midsummersnightsd...|        7|
|  lover|         33|         asyoulikeit|       14|
| lovers|         31|midsummersnightsd...|    

By default, `show` truncates strings longer than 20 characters and only prints 20 rows. You can override that:

In [138]:
topLocations.show(numRows = 40, truncate = false)

+--------+-----------+---------------------+---------+
|word    |total_count|top_location         |top_count|
+--------+-----------+---------------------+---------+
|beloved |11         |tamingoftheshrew     |4        |
|cloven  |1          |loveslabourslost     |1        |
|cloves  |1          |loveslabourslost     |1        |
|glove   |3          |loveslabourslost     |2        |
|glover  |1          |merrywivesofwindsor  |1        |
|gloves  |5          |merrywivesofwindsor  |3        |
|hate    |22         |midsummersnightsdream|9        |
|hated   |6          |midsummersnightsdream|4        |
|hateful |5          |midsummersnightsdream|3        |
|hates   |5          |asyoulikeit          |2        |
|hateth  |1          |midsummersnightsdream|1        |
|love    |662        |loveslabourslost     |121      |
|loved   |38         |asyoulikeit          |13       |
|lovely  |15         |midsummersnightsdream|7        |
|lover   |33         |asyoulikeit          |14       |
|lovers  |

By the way, we used _named arguments_, `(numRows = 40, truncate = false)`, for legibility, but they are optional in Scala. Named arguments also let you write the arguments in any order you want, not just declaration order. In this case, we could have just written `(40, false)`, but then you would rightly wonder what `false` means in this context.
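Here is a minimal sketch of named arguments. It uses a hypothetical method, `describeShow`, whose parameter names merely mirror those of `show`; it is not part of Spark. Passing arguments by name lets you reorder them, and the call reads better than a bare `false`.

```scala
// Hypothetical method; the parameter names mirror DataFrame.show(numRows, truncate).
def describeShow(numRows: Int, truncate: Boolean): String =
  s"numRows=$numRows, truncate=$truncate"

val positional = describeShow(40, false)                       // declaration order
val named      = describeShow(numRows = 40, truncate = false)
val reordered  = describeShow(truncate = false, numRows = 40)  // any order works with names
```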

**Exercise:** Modify the query to return the top two locations and counts.

Before we move on, try writing other queries. Edit the query in the following cell:

In [140]:
val sql1 = sqlContext.sql("""
    SELECT * FROM inverted_index
""")
sql1.show(40, false)

+-----------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------+
|word       |total_count|locations                                                                                                                                       |counts                                  |
+-----------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------+
|a          |3350       |[loveslabourslost, merrywivesofwindsor, muchadoaboutnothing, asyoulikeit, tamingoftheshrew, twelfthnight, midsummersnightsdream, comedyoferrors]|[507, 494, 492, 461, 445, 416, 281, 254]|
|abandon    |6          |[asyoulikeit, tamingoftheshrew, twelfthnight]                                                                                  

#### Removing the "Stop Words"
Did you notice that one of the records we saw above was for the word "a"? That's not very useful if you're using this data for text searching, _sentiment mining_, etc. These so-called _stop words_, like _a_, _an_, _the_, _he_, _she_, _it_, etc., could also be removed.

Recall the `filter` logic we added to remove "", `word => word.size > 0`. We could replace it with `word => keep(word)`, where `keep` is a method that does any additional filtering we want, like removing stop words.

**Exercise:**

* Implement the `keep(word: String):Boolean` method and change the `filter` function to use it. Have `keep` return `false` for a small, hard-coded list of stop words. (The solution is at the end of this notebook.)

## More on Pattern Matching Syntax
We've only scratched the surface of pattern matching. Let's explore it some more.

Here's another anonymous function that uses this pattern matching form, which automatically drops empty files. (It could be a little smarter and look for files that have arbitrary whitespace, too.)

```scala
{
    case (location, "") => 
        Array.empty[((String, String), Int)]  // Return an empty array
    case (location, contents) => 
        val words = contents.split("""\W+""")
        val fileName = location.split(pathSeparator).last
        words.map(word => ((word, fileName), 1))
}.
```

So, you can have multiple `case` clauses, some of which might match on specific literal values ("" in this case) and others that are more general. The clauses are tried in the order written, and the first one that matches wins. Suppose you reversed the order here. Then `case (location, "")` would never match, because `case (location, contents)` matches every two-element tuple, and the compiler may also warn that the clause is unreachable.

Note that you don't have to put the lines after the `=>` inside braces, `{...}` (although you can). The `=>` and `case` keywords (or the final `}`) are sufficient to mark these blocks. Also, for a single-expression block, like the first `case` clause, you can put the expression on the same line as the `case` clause. 
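Here's a runnable version of that idea on a local `Seq` of made-up `(location, contents)` pairs. The `"/"` path separator is an assumption standing in for `pathSeparator`. `.toSeq` makes both clauses return the same kind of collection. The empty file contributes no records, because the first, more specific clause matches it before the general one is tried.

```scala
// Made-up inputs: one empty "file" and one non-empty file
val inputs = Seq(("data/empty", ""), ("data/hello", "hello world"))

val wordFileOnes = inputs.flatMap {
  case (_, "") =>                                   // specific literal match: empty contents
    Seq.empty[((String, String), Int)]
  case (location, contents) =>                      // general match: any other pair
    val fileName = location.split("/").last
    contents.split("""\W+""").toSeq.map(word => ((word, fileName), 1))
}
```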

Finally, if none of the `case` clauses matches, then a [MatchError](http://www.scala-lang.org/api/current/index.html#scala.MatchError) exception is thrown. In our case, we _always_ know we'll have two-element tuples, so the examples so far are fine. Here's a final contrived example to illustrate what's possible, using a sequence of objects of different types:

In [None]:
val stuff = Seq(1, 3.14159, 2L, 4.4F, ("one", 1), (404F, "boo"), ((1, 2), 3, 4), "hello")

stuff.foreach {
    case i: Int               => println(s"Found an Int:   $i")
    case l: Long              => println(s"Found a Long:   $l")
    case f: Float             => println(s"Found a Float:  $f")
    case d: Double            => println(s"Found a Double: $d")
    case (anyType1, anyType2) => 
        println(s"Found a two-element tuple with arbitrary elements: ($anyType1, $anyType2)")
    case ((anyType1, anyType2), _, anyType4) => 
        println(s"Found a three-element tuple with (1st, 2nd) and 4th elements: ($anyType1, $anyType2) and $anyType4")
    case default              => println(s"Found something else: $default")
}

A few notes.
* A literal like `1` is inferred to be `Int`, while `3.14159` is inferred to be `Double`. Add `L` or `F`, respectively, to infer `Long` and `Float` instead.
* Note how we mixed specific type checking, e.g., `i: Int`, with more loosely-typed expressions, e.g., `(anyType1, anyType2)`, which expects a two-element tuple, but the element types are unconstrained.
* All the words `i`, `l`, `f`, `d`, `anyType1`, `anyType2`, `anyType4`, and `default` are arbitrary variable names. Yes, `default` is not a keyword, just an arbitrary choice. We could use any name we want.
* The last `default` clause specifies a variable with no type information. Hence, it matches _anything_, which is why it must appear last. This last clause is the idiom to use when you don't know any information about what you're matching against. 
* If you want to match that something _exists_, but you don't need to bind it to a variable, then use `_`, as in the three-element tuple example.
* The three-element tuple example also demonstrates that arbitrary nesting of types is supported.

This pattern-matching function `{ case firstCase => ...; case secondCase => ...; ... }` (yes, you could put expressions on the same line, separated by `;`) has a special name. It's called a _partial function_. All that means is that we only "promise" to accept arguments that match at least one of our `case` clauses, not any possible input. (Strictly speaking, the compiler builds a `PartialFunction` object only where that type is expected, e.g., for `collect`. When the literal is passed to methods like `map` or `foreach`, it becomes an ordinary function that throws a `MatchError` for unmatched inputs.) To be precise, the other kind of anonymous function we've seen is a _total function_. Recall we said that for those you could use either `(...)` or `{...}`, depending on the "look" you want. For _partial functions_, you must use `{...}`.
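This short sketch shows the distinction with made-up data. Assigned to a `PartialFunction` type, the pattern-matching literal can report where it is defined via `isDefinedAt`, and `collect` uses that to skip non-matching inputs. Passed to `map`, the same kind of literal fails with a `MatchError` on the first input it doesn't match. Here, `scala.util.Try` captures that failure.

```scala
import scala.util.Try

// Defined only for two-element tuples whose second element is an Int.
val countOf: PartialFunction[Any, Int] = {
  case (_, count: Int) => count
}

val mixed: Seq[Any] = Seq(("love", 3), "hello", ("hate", 2), 42)

val pfCounts = mixed.collect(countOf)    // skips "hello" and 42

// With map, unmatched inputs throw scala.MatchError, captured here as a Failure.
val viaMap = Try(mixed.map { case (_, count: Int) => count })
```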

Also, recall that we used pattern matching with assignment:

```scala
val (locations, counts) = vect.unzip  
```
`Vector.unzip` returns a two-element tuple, where each element is a collection. We matched on that tuple and assigned each piece to a variable. Here's another contrived example, with nested tuple elements:

In [None]:
val (a, (b, (c1, c2), d)) = ("A", ("B", ("C1", "C2"), "D"))
println(s" $a, $b, $c1, $c2, $d")

We'll show one more useful example of pattern matching soon, with _case classes_.

## Scala's Object Model
Scala is a _hybrid_, object-oriented and functional programming language. The philosophy of Scala is that you exploit object orientation for encapsulation of details, but use functional programming for its logical precision, to implement those details. Most of what we've seen so far falls into the functional programming camp. Much of data manipulation and analysis is really Mathematics. Functional programming tries to stay close to how functions and values work in Mathematics.

However, when writing non-trivial Spark programs, it's occasionally useful to exploit the object-oriented features.

### Classes vs. Instances
Like Java, Scala distinguishes between classes and instances. Classes are like _templates_ used to create instances.

We've talked about the _types_ of things, like `word` is a `String` and `totalCount` is an `Int`. A class defines a _type_ in the same sense.

Here is an example class that we might use to represent the inverted index records we just created:

In [None]:
class IIRecord1(
    word: String, 
    total_count: Int, 
    locations: Array[String], 
    counts: Array[Int]) {
    
    /** CSV formatted string, but use [a,b,c] for the arrays */
    override def toString: String = 
        s"$word,$total_count,[${locations.mkString(",")}],[${counts.mkString(",")}]"
}

new IIRecord1("hello", 3, Array("one", "two"), Array(1, 2))

When defining a class, the argument list after the class name is the argument list for the _primary constructor_. You can define secondary constructors, too, but it's not very common, in part for reasons we'll see shortly.

Note that when you override a method that's defined in a parent class, like Java's `Object.toString`, Scala requires you to add the `override` keyword.

We created an _instance_ of `IIRecord1` using `new`, just like in Java.

Finally, as a side note, we've been using `Ints` (integers) all along for the various counts, but really for "big data", we should probably use `Longs`.
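For completeness, here is a hypothetical class, `WordCount`, with one secondary (auxiliary) constructor, defined with `def this(...)`. An auxiliary constructor must begin by calling another constructor of the same class. Following the side note above, the count is a `Long`.

```scala
// Hypothetical class: the primary constructor takes both fields.
class WordCount(word: String, count: Long) {

  // Auxiliary constructor: defaults the count to 1 by delegating to the primary one.
  def this(word: String) = this(word, 1L)

  override def toString: String = s"$word,$count"
}

val explicitCount = new WordCount("hello", 3L)
val defaultCount  = new WordCount("hello")
```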

### Objects

I've been careful to use the word _instance_ for things we create from classes. That's because Scala has built-in support for the [Singleton Design Pattern](https://en.wikipedia.org/wiki/Singleton_pattern), i.e., when we only want one instance of a class. We use the `object` keyword. 

For example, in Java, you define a class with a `static void main(String[] arguments)` method as your entry point into your program. In Scala, you use an `object` to hold `main`, as follows:

In [None]:
object MySparkJob {

    val greeting = "Hello Spark!"
    
    def main(arguments: Array[String]) = {
        println(greeting)
        
        // Create your SparkContext, etc., etc.
    }
}

Just as for classes, the name of the object can be anything you want. There is no `static` keyword in Scala. Instead of adding `static` methods and fields to classes as in Java, you put them in an object instead, as here.

> **NOTE:** The Scala compiler must generate valid JVM byte code. For an object like this one, it generates a singleton class and also emits Java-style static "forwarder" methods, such as `main`, so the JVM can find the entry point.
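An `object` has exactly one instance, so any state stored in it is shared by all code that refers to it, much like a Java `static` field. Here is a small sketch. The `WordCounter` object and its members are hypothetical and exist only for illustration:

```scala
// A hypothetical singleton holding "static-like" state and methods.
object WordCounter {
    // Only one copy of this field exists, like a Java static field.
    private var total: Long = 0L

    def add(words: Seq[String]): Unit = total += words.size

    def count: Long = total
}

WordCounter.add(Seq("hello", "world"))
WordCounter.add(Seq("hello", "Spark"))
println(WordCounter.count)            // 4

// Every reference to `WordCounter` is the same instance.
println(WordCounter eq WordCounter)   // true
```

In a Spark job, each JVM gets its own copy of an object like this: the driver has one, and so does every executor. Mutable state in an object is therefore not shared across the cluster.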

### Case Classes
Tuples are handy for representing records, but it would be nice if the fields were _named_, as well as _typed_. A good use for a class, like our `IIRecord1` above, is to represent this structure and give us named fields. Let's now refine that class definition to exploit some extra, very useful features in Scala.

Consider the following definition of a _case class_ that represents our final record type.

In [18]:
case class IIRecord(
    word: String, 
    total_count: Int = 0, 
    locations: Array[String] = Array.empty, 
    counts: Array[Int] = Array.empty) {

    /** 
     * Different than our CSV output above, but see toCSV.
     * Array.toString is useless, so format these ourselves. 
     */
    override def toString: String = 
        s"""IIRecord($word, $total_count, [${locations.mkString(", ")}], [${counts.mkString(", ")}])"""
    
    /** CSV-formatted string, but use [a,b,c] for the arrays */
    def toCSV: String = 
        s"$word,$total_count,[${locations.mkString(",")}],[${counts.mkString(",")}]"
        
    /** Return a JSON-formatted string for the instance. */
    def toJSONString: String = 
        s"""{
        |  "word":        "$word", 
        |  "total_count": $total_count, 
        |  "locations":   [${toJSONArrayString(locations)}],
        |  "counts":      [${counts.mkString(", ")}]
        |}
        |""".stripMargin

    private def toJSONArrayString(array: Array[String]): String =
        array.map(quote).mkString(", ")
    
    private def quote(word: String): String = "\"" + word + "\""  
}

I said that defining secondary constructors is not very common. Part of the reason is a convenient feature I used here: default values for constructor arguments. Default values let you create instances without passing every argument explicitly. Consider these two examples:

In [19]:
val hello = new IIRecord("hello")
val world = new IIRecord("world!", 3, Array("one", "two"), Array(1, 2))

println("\n`toString` output:")
println(hello)
println(world)

println("\n`toJSONString` output:")
println(hello.toJSONString)
println(world.toJSONString)

println("\n`toCSV` output:")
println(hello.toCSV)
println(world.toCSV)


`toString` output:
IIRecord(hello, 0, [], [])
IIRecord(world!, 3, [one, two], [1, 2])

`toJSONString` output:
{
  "word":        "hello", 
  "total_count": 0, 
  "locations":   [],
  "counts":      []
}

{
  "word":        "world!", 
  "total_count": 3, 
  "locations":   ["one", "two"],
  "counts":      [1, 2]
}


`toCSV` output:
hello,0,[],[]
world!,3,[one,two],[1,2]


I added `toJSONString` to illustrate adding public (the default visibility) and private methods to a class definition. If there are no methods or other values to define, you can omit the body completely. No empty `{}` is required.

Recall that the `override` keyword is required when redefining `toString`.

Okay, what about that `case` keyword? It tells the compiler to do several useful things for us, eliminating a lot of boilerplate that we would have to write for ourselves with other languages, like Java:

1. Treat each constructor argument as an immutable (`val`) private field of the instance.
1. Generate a public reader method for the field with the same name (e.g., `word`).
1. Generate _correct_ implementations of the `equals` and `hashCode` methods, which people often implement incorrectly, as well as a default `toString` method. You can use your own definitions by adding them explicitly to the body. We did this for `toString`, to format the arrays more nicely than the default does.
1. Generate an `object IIRecord`, i.e., with the same name. The object is called the _companion object_.
1. Generate a "factory" method in the companion object that takes the same argument list and instantiates an instance.
1. Generate helper methods in the companion object that support pattern matching.

Points 1 and 2 make each argument behave as if it were a public, read-only field of the instance, even though it's actually implemented as described.

Point 3 is important for correct behavior, because case class instances are often used as keys in [Maps](http://www.scala-lang.org/api/current/index.html#scala.collection.Map) and in Spark's key-based RDD and DataFrame operations. In fact, you should _only_ use your case classes, or Scala built-in types with the same properties (like `Int` and the other number types, `String`, tuples, etc.), as keys.
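One caveat applies. The generated `equals` and `hashCode` compare each field using that field's own `equals`. Java arrays use reference equality, so a case class with `Array` fields, like `IIRecord`, won't treat two instances with identical contents as equal. The following sketch uses two hypothetical types, `WithArray` and `WithSeq`, to show the difference:

```scala
// Two hypothetical record types that differ only in their collection type.
case class WithArray(word: String, counts: Array[Int])
case class WithSeq(word: String, counts: Seq[Int])

// Arrays compare by reference, so these are NOT equal, despite equal contents.
val a1 = WithArray("hello", Array(1, 2))
val a2 = WithArray("hello", Array(1, 2))
println(a1 == a2)                     // false

// Seqs compare by contents, so these ARE equal and have the same hash code.
val s1 = WithSeq("hello", Seq(1, 2))
val s2 = WithSeq("hello", Seq(1, 2))
println(s1 == s2)                     // true
println(s1.hashCode == s2.hashCode)   // true
```

If you plan to use a case class as a key, a `Seq` field (or converting an array with `toSeq`) gives you the content-based equality you probably expect.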

For point 4, the compiler generates the _companion object_ automatically. It adds the "factory" method (point 5, discussed next) and methods that support pattern matching (point 6). You can define the companion object explicitly if you want to add more methods and fields to it, and the compiler will still insert these other methods. However, see <a href="#Ambiguities">Ambiguities with Companion Objects</a>. The bottom line is that you shouldn't define case classes with extra methods in the companion object in notebooks like this one, because of parsing ambiguities.

Point 5 means you actually rarely use `new` when creating instances. That is, the following are effectively equivalent:

In [11]:
val hello1 = new IIRecord("hello1")
val hello2 = IIRecord("hello2")

What actually happens in the second case, without `new`? The "factory" method is actually called `apply`. In Scala, whenever you put an argument list after an object, as in the `hello2` case, Scala looks for an `apply` method to call. The arguments have to match the argument list for `apply`, including the number and types of arguments and any default argument values. Hence, the `hello2` declaration is really this:

In [12]:
val hello2b = IIRecord.apply("hello2b")

You can exploit this feature in your other classes, too. We talked about word stemming above. Suppose you write a stemming library and declare an object as its entry point. Here, I'll just do something simple: assume a trailing "s" means the word is a plural and remove it (a bad assumption...):

In [13]:
object stem {
    def apply(word: String): String = word.replaceFirst("s$", "") // insert real implementation!
}

println(stem("dog"))
println(stem("dogs"))

dog
dog


Note how it looks like we're calling a function or method named `stem`. (Scala allows object and class names to start with a lower case letter.)

Finally, point 6 means we can use our custom case classes in pattern matching expressions. I won't go into the methods actually implemented in the companion object or how they support pattern matching. Here is an example of how you would use pattern matching with our previously defined `hello` and `world` instances.

In [15]:
Seq(hello, world).map {
    case IIRecord(word, 0, _, _) => s"$word with no occurrences."
    case IIRecord(word, cnt, locs, cnts) => 
        s"$word occurs $cnt times: ${locs.zip(cnts).mkString(", ")}"
}

List(hello with no occurrences., world! occurs 3 times: (one,1), (two,2))

The first case match ignored the locations and counts, because we know they will be empty arrays if the total count is 0! 

The second case match uses the `zip` method to put the locations and counts back together. Recall we used `unzip` to create the separate collections.
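As a reminder of how `zip` and `unzip` relate, here is a small sketch that uses the `world` record's values as plain collections. `unzip` splits a collection of pairs into two collections, and `zip` pairs them back up, element by element:

```scala
val pairs = Seq(("one", 1), ("two", 2))

// unzip splits the pairs into two parallel collections...
val (locs, cnts) = pairs.unzip
println(locs)                // List(one, two)
println(cnts)                // List(1, 2)

// ...and zip pairs them back up, position by position.
val rezipped = locs.zip(cnts)
println(rezipped == pairs)   // true

// If the lengths differ, zip silently stops at the end of the shorter one.
println(locs.zip(Seq(10)))   // List((one,10))
```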

## DataFrames
So far, we've used Spark's RDD API. It's common to use case classes to represent the "schema" of records when working with [Datasets](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset).

A problem with [DataFrames](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame) is the fact that the fields are untyped until you try to access them. `Datasets` restore the type safety of `RDDs` by using a case class as the definition of the schema.

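But first, we need an import that brings in the implicit `Encoder` support `as[IIRecord]` needs to convert a `DataFrame` into a `Dataset[IIRecord]`. The _Implicits_ section below explains this kind of import in more detail.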

In [16]:
val sqlc = sqlContext
import sqlc.implicits._

In [None]:
val iiDS = iiDF.as[IIRecord]
iiDS

In [None]:
iiDS.show

In [None]:
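// Requires Spark 2.X. The typed `map` hands us each row as an `IIRecord`, so the
// compiler checks these field accesses. Sum the per-location counts for each word:
iiDS.map(r => (r.word, r.total_count, r.counts.sum))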

## More Scala
We've covered a lot already in this notebook, focusing on the first things you need to know about Scala. This section discusses additional details about Scala that you'll encounter relatively quickly. 

At this point, I suggest you create a new notebook and play with Spark using what you've learned, then come back to this section if you run into something you don't understand or when you're ready to learn more Scala.

### Importing Everything in a Package
In Java, `import foo.bar.*;` means import everything in the `bar` package.

In Scala, `*` is actually a legal method name; think of defining multiplication for custom numeric types, like `Matrix`. Hence, this import statement would be ambiguous in Scala. Therefore, Scala uses `_` instead of `*`: `import foo.bar._` (with the semicolon inferred).
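For example, this small sketch uses the `_` wildcard to import everything in the standard `scala.math` package, then uses a few of those members without qualifying them:

```scala
// The `_` wildcard imports every member of `scala.math`,
// including `sqrt`, `max`, `abs`, and the constant `Pi`.
import scala.math._

val root = sqrt(16.0)        // 4.0
val bigger = max(3, 7)       // 7
val circumference = 2 * Pi   // about 6.283
```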

### Traits
_Traits_ are similar to Java 8 _interfaces_. They define abstractions, and they can also provide "default" implementations of the methods they declare. Unlike Java 8 interfaces, traits can also have fields representing "state" information about instances. The line between traits and _abstract classes_ is blurry, because abstract classes may also leave some member methods or fields undefined. In both cases, concrete subtypes of a trait or abstract class must define any undefined members. Otherwise, you can't construct instances of those subtypes.

So, why have both traits and abstract classes? Scala, like Java, allows only _single inheritance_ of classes. A type can have just one _parent_ class, and that is normally where you would use an abstract class. Scala also lets you "mix in" any number of traits, or use a trait as the parent type (yes, confusing). A great example of a "mix in" trait is one that implements logging. Any "service" type can mix in the logging trait to get logging functionality in each instance. Here is a simplified sketch:

```scala
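// Minimal stand-ins for the severity `Level` and `Logger` types a real logging library would provide.
trait Level
case object INFO  extends Level
case object ERROR extends Level

class Logger {
    def log(level: Level, message: String): Unit = println(s"[$level] $message")
}

trait Logging {

    def log(level: Level, message: String): Unit = logger.log(level, message)
    
    private val logger: Logger = new Logger
}

abstract class Service {
    def run(): Unit   // No body, so abstract!
}

class MyService extends Service with Logging {
    def run(): Unit = {
        log(INFO, "Starting MyService...")
        // ... do the real work here ...
        log(INFO, "Finished MyService")
    }
}

new MyService().run()   // prints "[INFO] Starting MyService..." and "[INFO] Finished MyService"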
```

`Unit` is Scala's equivalent of Java's `void`. Unlike `void`, `Unit` is an actual type with exactly one value, written `()`. We still use it in the same sense as `void`, to mean "nothing useful will be returned".
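In the short sketch below, a method called only for its side effect returns `Unit`. Unlike the result of a Java `void` method, that result can still be assigned to a variable and printed:

```scala
// A method that is called only for its side effect returns Unit.
def greet(name: String): Unit = println(s"Hello, $name!")

val result: Unit = greet("Spark")   // prints "Hello, Spark!"
println(result)                     // prints "()", the one and only Unit value
```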

### Ranges
What if you want some numbers between a start and end value? Use a [Range](http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Range), which has a nice literal syntax, e.g., `1 until 100`, `2 to 200 by 3`. 

The `Range` always includes the lower bound. Using `to` in a `Range` makes it _inclusive_ at the upper bound. Using `until` makes it _exclusive_ at the upper bound. Use `by` to specify a delta, which defaults to `1`.

In [2]:
1 until 10

Range(1, 2, 3, 4, 5, 6, 7, 8, 9)

In [3]:
1 to 10

Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

In [4]:
1 to 10 by 3

Range(1, 4, 7, 10)

When you need a small test data set to play with Spark, ranges can be convenient.

In [30]:
val rdd7 = sc.parallelize(1 to 50).map(i => (i, i%7)).groupBy{ case (i, seven) => seven }.sortByKey()
rdd7.take(7).foreach(println)

(0,CompactBuffer((7,0), (14,0), (21,0), (28,0), (35,0), (42,0), (49,0)))
(1,CompactBuffer((1,1), (8,1), (15,1), (22,1), (29,1), (36,1), (43,1), (50,1)))
(2,CompactBuffer((2,2), (9,2), (16,2), (23,2), (30,2), (37,2), (44,2)))
(3,CompactBuffer((3,3), (10,3), (17,3), (24,3), (31,3), (38,3), (45,3)))
(4,CompactBuffer((4,4), (11,4), (18,4), (25,4), (32,4), (39,4), (46,4)))
(5,CompactBuffer((5,5), (12,5), (19,5), (26,5), (33,5), (40,5), (47,5)))
(6,CompactBuffer((6,6), (13,6), (20,6), (27,6), (34,6), (41,6), (48,6)))


`SparkContext` also has a `range` method that does nearly the same thing as `sc.parallelize(some_range)`. The difference is that it produces an `RDD[Long]` and, like `until`, excludes the end value.

### Scala Interpreter (REPL) vs. Notebooks vs. Scala Compiler
<a name="REPL"></a>
This notebook has been using a running Scala interpreter, a.k.a. _REPL_ ("read, eval, print, loop"), to parse the Scala code. The Spark distribution comes with a `spark-shell` script that also lets you use the interpreter from the command line. However, you don't get the nice notebook UI.

If you use `spark-shell`, there are a few behavior changes you should know about.

#### Using :paste Mode
By default the Scala interpreter treats _each line_ you enter separately. This can cause surprises compared to how the Scala _compiler_ works, where it treats all the code in the same file in the same context.

For example, the following code, where the expression continues on the second line, is handled successfully by the compiler, but not by the interpreter.

```scala
(1 to 100)
.map(i => i*i)
```

The interpreter thinks it has finished parsing the expression when it hits the newline after the literal [Range](http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Range), `1 to 100`. It then throws an error on the leading `.` on the next line. The compiler, on the other hand, keeps parsing and ignores the newline in this case.

This notebook handles input the same way as the "raw" interpreter. Some notebook environments, however, use the interpreter's `:paste` command. This command tells the parser to parse all of the following lines together, just as the compiler would, until the "end of input", which you indicate with `CTRL-D`.

You can't experiment with it through this notebook, but your session would look something like this:

```scala
scala> :paste
// Entering paste mode (ctrl-D to finish)

(1 to 10)
.map(i => i*i)
<CTRL-D>

// Exiting paste mode, now interpreting.

res0: scala.collection.immutable.IndexedSeq[Int] = Vector(1, 4, 9, 16, 25, 36, 49, 64, 81, 100)

scala>
```

#### Ambiguities with Companion Objects
<a name="Ambiguities"></a>
As I wrote this notebook, I _wanted_ to demonstrate using the companion object `IIRecord` to define a method explicitly, but this leads to an ambiguity later on in the notebook if you attempt to use this method. The notebook gets confused between the case class and the object. 

More generally, once you start defining more involved case classes, with nontrivial methods and explicit additions to the default companion object, you should define these types outside the notebook. Put them in a compiled library and use that library within the notebook.

### Scala's Object Model
Scala's Object Model, including its hierarchy of types, is similar to Java's, but with some interesting differences.

![Scala Type Hierarchy](images/ScalaTypeHierarchy.jpg)

In Java, all _reference types_ are descended from [java.lang.Object](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html). The name _reference type_ reflects the fact that the instances for all these types are allocated on the _heap_ and program variables are references to those heap locations.

The primitive types, `int`, `long`, etc., are not part of the type hierarchy and are treated specially. This is partly a performance optimization: values of these types fit in CPU registers and are stored directly in stack frames. However, they have wrapper or "boxed" types, `Integer`, `Long`, etc., that are part of the type hierarchy. You must use the boxed types with Java's collections, for example (arrays are the exception).

Instead, Scala treats the primitives at the code level as basically the same as the reference types. You don't use `new Int(100)` for example, but you can call methods on `Int` instances. The code generated, in most cases, uses the optimized JVM primitives. 

Hence, the Scala type hierarchy defines a type [Any](http://www.scala-lang.org/api/current/#scala.Any) to be the parent type of _both_ reference types and "value" types (for the primitives). Each of those subhierarchies has its own root. [AnyRef](http://www.scala-lang.org/api/current/#scala.AnyRef) is effectively the same as [java.lang.Object](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html), and [AnyVal](http://www.scala-lang.org/api/current/#scala.AnyVal) is the parent of the value types.
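The quick sketch below shows these relationships. An `Int` can be assigned to an `AnyVal`, a `String` can be assigned to an `AnyRef`, and both can be assigned to `Any`. You can also call methods directly on an `Int`:

```scala
val i: Int = 42
val anyVal: AnyVal = i          // Int is a subtype of AnyVal
val s: String = "hello"
val anyRef: AnyRef = s          // String is a subtype of AnyRef (java.lang.Object)

// Any is the common parent of value types and reference types.
val things: Seq[Any] = Seq(anyVal, anyRef, 3.14, true)

// Methods can be called directly on "primitive" values.
val bigger = i.max(100)         // 100
```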

Finally, for better "soundness", the Scala type system defines real types to represent [Null](http://www.scala-lang.org/api/current/#scala.Null) and [Nothing](http://www.scala-lang.org/api/current/#scala.Nothing). `Null` is defined to be a subtype of all reference types (`AnyRef`s), but not of the value types (`AnyVal`s). This supports, at the type level, the (unfortunate) practice of using `null` for a reference value.

However, `null` is not allowed for an `AnyVal`, so the true "bottom type" of the hierarchy, the subtype of _every_ type, is `Nothing`. Why is that useful? I'll explain in the next section.

### Try vs. Option vs. null
<a name="TryOptionNull"></a>

Recall the signature of our `curl` method near the beginning of this notebook:

```scala
def curl(sourceURLString: String, targetDirectoryString: String): Try[File] = ...
```

We explained briefly why we used `Try`. Let's explore it in more detail and also discuss an alternative, [Option](http://www.scala-lang.org/api/current/index.html#scala.Option).

Because `curl` is declared to return [util.Try[T]](http://www.scala-lang.org/api/current/index.html#scala.util.Try), where `T` is `java.io.File` in this case, the reader knows that it might fail somehow. If so, the relevant exception will be returned wrapped in a subclass of `Try`, called [util.Failure[T]](http://www.scala-lang.org/api/current/index.html#scala.util.Failure). However, if `curl` is successful, the `File` is returned wrapped in the other subclass of `Try`, [util.Success[T]](http://www.scala-lang.org/api/current/index.html#scala.util.Success).

Because of Scala's type safety, you must determine which result was returned and handle it appropriately. Consider the alternative that's popular in many languages, including Java. Here's a `curl2` declaration that returns `File` instead:

```scala
def curl2(sourceURLString: String, targetDirectoryString: String): File = ...
```

Now, the return type tells you nothing about the possibility of failure. In Java, you might declare that one of several checked exceptions might be thrown. Scala doesn't have checked exceptions, so it uses the return type to convey this information instead. So which approach is better? Suppose `curl2` fails without throwing an exception. Because its declared return type is `File`, its only option is to return `null`. Unless we remember to check for `null`, we'll get the infamous [NullPointerException](https://docs.oracle.com/javase/8/docs/api/java/lang/NullPointerException.html). Using `Try[T]` closes this loophole.

Using `Try` rather than simply throwing an exception, means that `curl` always returns "normally", so the caller maintains full control of the call stack and special exception-catching logic isn't required.
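Here is a small sketch of the pattern. It uses a hypothetical `parseCount` helper instead of `curl`, so it runs without network access. `Try(...)` catches any non-fatal exception thrown by its argument and returns that exception in a `Failure`. Otherwise, it returns the result in a `Success`:

```scala
import scala.util.{Try, Success, Failure}

// Hypothetical helper: parse a count, which may fail for malformed input.
def parseCount(s: String): Try[Int] = Try(s.trim.toInt)

// The caller handles both outcomes explicitly; nothing is thrown at it.
def describeParse(s: String): String = parseCount(s) match {
    case Success(n)  => s"parsed $n"
    case Failure(ex) => s"failed: ${ex.getClass.getSimpleName}"
}

println(describeParse("42"))     // parsed 42
println(describeParse("forty"))  // failed: NumberFormatException

// Or supply a fallback value for the failure case.
val count = parseCount("oops").getOrElse(0)   // 0
```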

What are all the possible valid subclasses of `Try`? Really, there are only two, `Success` and `Failure`. It would be a mistake to allow a user to define other subtypes, like `MaybeCouldFailButWhoKnows`, because users of `Try` in pattern matching will always want to know that there are only two possibilities. Scala adds a keyword to enforce this logical behavior. `Try` is actually declared as follows:

```scala
sealed abstract class Try[+T] extends AnyRef
```

(`AnyRef` is the same as Java's `Object` supertype.) The `sealed` keyword says that _no_ subclasses of `Try` can be declared, _except_ in the same source file (which the library author wrote). Hence, users of `Try` can't declare their own subclasses. Such subclasses would subvert the logical structure of this type hierarchy and break other users' code that relies on that structure.
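You can use `sealed` in your own code the same way. In this sketch, the hypothetical `Lookup` hierarchy has exactly two subtypes. Because of that, the compiler can check that a `match` covers every case. If you delete one of the `case` clauses, the compiler warns that the match may not be exhaustive.

```scala
object lookups {
    // Only subtypes declared in this source file can extend a sealed trait.
    sealed trait Lookup
    final case class Found(location: String, count: Int) extends Lookup
    case object NotFound extends Lookup

    // Exhaustive match: the compiler knows these are the only two possibilities.
    def describeLookup(l: Lookup): String = l match {
        case Found(loc, n) => s"found $n in $loc"
        case NotFound      => "not found"
    }
}

import lookups._
println(describeLookup(Found("hamlet", 3)))   // found 3 in hamlet
println(describeLookup(NotFound))             // not found
```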

What if we have a situation where it makes no sense to involve an exception, but we want the same logical handling? This is where [Option[T]](http://www.scala-lang.org/api/current/index.html#scala.Option) comes in. 

`Option` is analogous to `Try`: it is a `sealed` abstract type with two possible subtypes:

* [Some[T]](http://www.scala-lang.org/api/current/index.html#scala.Some): I have an instance of `T` for you, inside the `Some[T]`.
* [None](http://www.scala-lang.org/api/current/index.html#scala.None): I don't have a value for you, sorry.

Note that a hash map is a great example where I either have a value for a given key or I don't. Therefore, for Scala's [Map[K,V]](http://www.scala-lang.org/api/current/index.html#scala.collection.Map) abstraction, where `K` is the key type and `V` is the value type, the `get` method has this signature:

```scala
def get(key: K): Option[V]
```

Once again, you know from the type signature that you may or may not get a value for the input key, _and_ you **must** determine whether you got a `Some[V]` or a `None` as the result. And again, we avoid returning a `null` value.
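Here is a small sketch with a made-up word-count `Map`. It contrasts `get`, which returns an `Option`, with calling the map directly, which is `Map.apply` and throws for a missing key:

```scala
import scala.util.Try

val wordCounts: Map[String, Int] = Map("hello" -> 3, "world" -> 1)

// `get` encodes "maybe missing" in the return type.
val found: Option[Int]   = wordCounts.get("hello")   // Some(3)
val missing: Option[Int] = wordCounts.get("spark")   // None

// By contrast, `apply` (i.e., wordCounts("spark")) throws a NoSuchElementException
// for a missing key; Try captures it here so the sketch keeps running.
val viaApply = Try(wordCounts("spark"))              // a Failure wrapping that exception
```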

So, how do we determine which result we have? Let's look at a few examples using `Option`. You can use `Try` in similar ways, and it also offers a few other approaches that we won't discuss.

In [23]:
val options = Seq(None, Some(2), Some(3), None, Some(5))

options.foreach { o =>
    println(o.getOrElse("None"))
}

None
2
3
None
5


In [24]:
options.foreach {
    case None    => println(None)
    case Some(i) => println(i)  // Note how we extract the enclosed value.
}

None
2
3
None
5


If you just want to ignore the `None` values, use a _for comprehension_:

In [29]:
for {
    option <- options  // loop through the options, assign each to "option"
    value  <- option   // extract the value from the Somes; if None, skip to the next option
} println(value)

2
3
5


Finally, you might wonder how `None` is declared. Consider this example:

In [190]:
val opts: Seq[Option[String]] = Seq(Some("hello"), None, Some("world!"))
opts.foreach(println)

Some(hello)
None
Some(world!)


This works, so it must mean that `None` is a valid subclass of `Option[String]`. That's actually true for all `Option[T]`. How can it be a valid subtype for _all_ of them? Here is how it's declared (omitting some details):

```scala
case object None extends Option[Nothing] { ... }
```

`None` carries no "state" information, because it doesn't wrap an instance like `Some[T]` does. Hence, we only need one instance for all uses, so it's declared as an object. Recall that the type system has a [Nothing](http://www.scala-lang.org/api/current/#scala.Nothing) type, which is a subtype of all other types. `Option` is also _covariant_ in its type parameter (it's declared `Option[+A]`). Without diving into too many details, together these mean that wherever an `Option[String]` is expected, you can use an `Option[Nothing]`, because the latter is a subtype of the former. This is why `Nothing` is useful for cases like `None`: we can have one instance of it and still obey the rules of Scala's object-oriented type system.
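Here is a sketch with a hypothetical `Box[+A]` type that mirrors how `Option` and `None` are declared. The `+` marks the type parameter as covariant. As a result, a single `Empty` of type `Box[Nothing]` fits wherever any `Box[T]` is expected:

```scala
object boxes {
    // Covariant: if B is a subtype of A, then Box[B] is a subtype of Box[A].
    sealed abstract class Box[+A]
    final case class Full[+A](value: A) extends Box[A]
    // One shared instance serves as the "empty" value for every Box[A].
    case object Empty extends Box[Nothing]
}

import boxes._

// Empty is a Box[Nothing], and Nothing is a subtype of every type,
// so Empty fits in a Box[String] and a Box[Int] alike.
val strings: Seq[Box[String]] = Seq(Full("hello"), Empty, Full("world!"))
val ints: Seq[Box[Int]]       = Seq(Empty, Full(42))
```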

### Implicits
Scala has a powerful mechanism known as _implicits_ that is used in the Spark Scala API. Implicits are a big topic, so we'll focus just on the uses of it that are most important to understand.  

#### Type Conversions
We used `RDD` methods like `reduceByKey` above, but if you search for this method in the [RDD Scaladoc page](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), you won't find it. Instead, it's defined in the [PairRDDFunctions](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions) type, along with all the other `*ByKey` methods. So, how can we use these methods as if they were defined for `RDD`?

When the Scala compiler sees code calling a method that doesn't exist on the type, it looks for an _implicit conversion_ in the current scope, which can transform the instance into another type (i.e., by wrapping it), where the other type provides the needed method. The full signature inferred for the method as it's used must match the definition in the wrapping class.

> **Note:** If you don't find a method in the [Spark Scaladocs](http://spark.apache.org/docs/latest/api/scala/index.html#package) for a type where you think it should be defined, look for related helper types with the method.

Here's a small Scala example of how this works:

In [None]:
// A sample class. Note it doesn't define a `toJSON` method:
case class Person(name: String, age: Int = 0)

In [166]:
// To scope them, define implicit conversions within an object
object implicits {

    // `implicit` keyword tells the compiler to consider this conversion.
    // It takes a `Person`, returning a new instance of `PersonToJSONString`,
    // then resolves the invocation of `toJSON`.
    implicit class PersonToJSONString(person: Person) {
        def toJSON: String = s"""{"name": "${person.name}", "age": ${person.age}}"""
    }
}

import implicits._        // Now it is visible in the current scope.

val p = Person("Dean Wampler", 39)

// Magic conversion to `PersonToJSONString`, then `toJSON` is called.
p.toJSON


For `RDDs`, the implicit conversions to `PairRDDFunctions` and other support types are handled for you. However, when you use Spark SQL and the [DataFrame](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame) API, you'll need to import some of these conversions yourself:

In [None]:
val sqlc = sqlContext
import sqlc.implicits._  

In [172]:
val wtc = iiDF.select($"word", $"total_count")
wtc.show

+-----------+-----------+
|       word|total_count|
+-----------+-----------+
|          a|       3350|
|    abandon|          6|
|      abate|          3|
|  abatement|          1|
|     abbess|          8|
|      abbey|          9|
|abbominable|          1|
|abbreviated|          1|
|       abed|          2|
|   abetting|          1|
|abhominable|          1|
|      abhor|          5|
|     abhors|          2|
|      abide|          5|
|     abides|          1|
|    ability|          2|
|     abject|          2|
|     abjure|          1|
|    abjured|          2|
|       able|          9|
+-----------+-----------+
only showing top 20 rows



The column-reference syntax `$"name"` is implemented using the same mechanism in the Scala library that implements interpolated strings, `s"$foo"`. The `import sqlc.implicits._` makes it available. 
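Here is a minimal sketch of that mechanism. An `implicit class` that wraps `StringContext` adds a new string interpolator, and Spark defines `$` in a similar way in `SQLImplicits`. The `ColumnName` case class and the `column` interpolator below are hypothetical stand-ins, not Spark's API:

```scala
object columnSyntax {
    // Hypothetical stand-in for Spark's column-name type.
    final case class ColumnName(name: String)

    // column"..." is rewritten to StringContext("...").column(args: _*),
    // which resolves through this implicit wrapper.
    implicit class ColumnInterpolator(sc: StringContext) {
        def column(args: Any*): ColumnName = ColumnName(sc.s(args: _*))
    }
}

import columnSyntax._

val wordCol  = column"word"           // ColumnName(word)
val field    = "count"
val totalCol = column"total_$field"   // ColumnName(total_count)
```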

Note that we imported something from an _instance_, rather than from a package or type as Java requires. This can be a useful feature in Scala, but it's also fragile. If you try `import sqlContext.implicits._`, you'll get a compiler error that a "stable identifier" is required. Doing the value assignment `val sqlc = sqlContext` first meets this requirement. This problem is unique to the notebook environment. You normally won't see it if you use the `spark-shell` that comes with a Spark distribution, or if you write a Spark program and compile it with the Scala compiler.

However, it would be better if Spark defined this `implicits` object on the `SQLContext` companion object instead of on instances of it!

For completeness, though it's unrelated to implicits: the `DataFrame` API lets you write SQL-like queries with a programmatic API. If you want to use built-in functions like `min`, `max`, etc. on columns, you need the following `import` statement:

In [173]:
import org.apache.spark.sql.functions._

Now we can use `min`, `max`, `avg`, etc.

In [175]:
val mma = iiDF.select(min("total_count"), max("total_count"), avg("total_count"))
mma.show

+----------------+----------------+------------------+
|min(total_count)|max(total_count)|  avg(total_count)|
+----------------+----------------+------------------+
|               1|            5208|16.651743683350947|
+----------------+----------------+------------------+



One other use of implicits worth understanding is _implicit arguments_ to methods. You will encounter this mechanism used when you read the Spark Scaladocs, even though you might never realize you're actually using it in your code!

Recall I mentioned previously that you can define default values for method arguments. I just used it for the `age` argument for `Person`:

```scala
case class Person(name: String, age: Int = 0)
```

Sometimes we need something more sophisticated. For example, our library might have a group of methods that need a special argument passed to them that provides useful "context" information, but you don't want the user to be required to explicitly pass this argument every time. Here's an example. 

In [181]:
trait Add[T] {
    def add(t1: T, t2: T): T
}

// Nested implicits so they don't conflict with the previous object implicits.
object Adder {
    object implicits {
        implicit val intAdd: Add[Int] = new Add[Int] {
            def add(i1: Int, i2: Int): Int = i1 + i2
        }
        implicit val doubleAdd: Add[Double] = new Add[Double] {
            def add(d1: Double, d2: Double): Double = d1 + d2
        }
        implicit val stringAdd: Add[String] = new Add[String] {
            def add(s1: String, s2: String): String = s1 + s2
        }
        // etc...
    }
}

import Adder.implicits._

def sum[T](ts: Seq[T])(implicit adder: Add[T]): T = {
    ts.reduceLeft((t1, t2) => adder.add(t1, t2))
}

In [182]:
sum(0 to 10)

Int = 55

In [184]:
sum(Seq(0.5, 1.5, 2.0))

Double = 4.0

In [185]:
sum(Seq("one", "two", "three"))

String = onetwothree

In [187]:
// Will fail, because there's no Add[Char] in scope:
sum(Seq('a', 'b', 'c'))   // Characters

Name: Compile Error
Message: <console>:83: error: could not find implicit value for parameter adder: Add[Char]
              sum(Seq('a', 'b', 'c'))   // Characters
                 ^
StackTrace: 

So, Scala used the implicit values `intAdd`, `doubleAdd`, and `stringAdd` for the `adder` argument in the second _argument list_ of `sum`. Implicit arguments must go in a separate, final argument list, and the `implicit` keyword applies to every argument in that list.

We could have avoided using implicit arguments if we defined custom `sum` methods for every type. That would have been simpler in this trivial case, but for nontrivial methods, the duplication is worth avoiding. Another advantage of this mechanism is that the user can define her own implicit `Add[T]` instances for domain types (say for example, `Money`) and they would "just work".
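Here is a sketch of that last point. `Money` is a hypothetical domain type. To keep the block self-contained, it repeats the `Add` trait and the `sum` method from above. It then defines one implicit `Add[Money]` in an object and imports it, following the same style as `Adder.implicits`. That is all `sum` needs to work with `Money`:

```scala
trait Add[T] {
    def add(t1: T, t2: T): T
}

def sum[T](ts: Seq[T])(implicit adder: Add[T]): T =
    ts.reduceLeft((t1, t2) => adder.add(t1, t2))

// A hypothetical domain type; amounts are kept in cents to avoid rounding errors.
case class Money(cents: Long)

object moneyImplicits {
    implicit val moneyAdd: Add[Money] = new Add[Money] {
        def add(m1: Money, m2: Money): Money = Money(m1.cents + m2.cents)
    }
}

import moneyImplicits._

// The compiler finds `moneyAdd` for the implicit `adder` argument.
val total = sum(Seq(Money(150), Money(275), Money(75)))   // Money(500)
```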

The Scala collections API, through Scala 2.12, uses this mechanism (an implicit `CanBuildFrom` argument) to construct a new collection of the same kind as the input collection when you use `map`, `flatMap`, etc.

Spark uses this pattern for [Encoders](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Encoder) in Spark SQL. Encoders are used to serialize values into the new, compact memory encoding introduced in the _Tungsten_ project (see, for example, [this talk](https://spark-summit.org/2015/events/deep-dive-into-project-tungsten-bringing-spark-closer-to-bare-metal/)). Here's an example of creating a [Dataset](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset). First, the `toDS` method is "added" to a Scala [Seq](http://www.scala-lang.org/api/current/#scala.collection.Seq) through an implicit conversion, specifically [SQLImplicits.localSeqToDatasetHolder](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLImplicits), which the `import sqlc.implicits._` statement earlier brings into scope. That conversion requires an implicit `Encoder` for the element type, which the same import also provides.

In [188]:
(0 to 10).toDS()

org.apache.spark.sql.Dataset[Int] = [value: int]

## Conclusions
I appreciate the effort you put into studying this notebook. I hope you enjoyed it as much as I enjoyed writing it.

Now you know the core elements of Scala that you need for using the Spark Scala API. I hope you can appreciate the power and elegance of Scala. I hope you will choose to use it for all of your data engineering tasks, not just for Spark. 

What about data science? Many people use Scala for data science with Spark, but today Python and R have much richer libraries for mathematics and machine learning. That will change over time, but for now, you'll need to decide which language best fits your needs.

As you use Scala, there will be more things you'll want to understand that we haven't covered, including common idioms, conventions, and tools used in the Scala community. The references at the beginning of the notebook will give you the information you need.

Best wishes and thank you.

[Dean Wampler, Ph.D.](mailto:dean@deanwampler.com)<br/>
[@deanwampler](http://twitter.com/deanwampler)

## Appendix: Exercise Solutions
Let's discuss the solutions to exercises that weren't already solved earlier in the notebook.

### Return the Top Two Locations and Counts
We used `sqlContext.sql` to write a SQL query that returned the top location and count for each word. Adding the second location and count is straightforward. What is returned when a word has no second location and count?

In [135]:
val topTwoLocations = sqlContext.sql("""
    SELECT word, total_count, 
        locations[0] AS first_location,  counts[0] AS first_count,
        locations[1] AS second_location, counts[1] AS second_count
    FROM inverted_index 
    WHERE word LIKE '%love%' OR word LIKE '%hate%'
""")

In [136]:
topTwoLocations.show(100)

+--------+-----------+--------------------+-----------+--------------------+------------+
|    word|total_count|      first_location|first_count|     second_location|second_count|
+--------+-----------+--------------------+-----------+--------------------+------------+
| beloved|         11|    tamingoftheshrew|          4|         asyoulikeit|           3|
|  cloven|          1|    loveslabourslost|          1|                null|        null|
|  cloves|          1|    loveslabourslost|          1|                null|        null|
|   glove|          3|    loveslabourslost|          2|        twelfthnight|           1|
|  glover|          1| merrywivesofwindsor|          1|                null|        null|
|  gloves|          5| merrywivesofwindsor|          3|         asyoulikeit|           1|
|    hate|         22|midsummersnightsd...|          9|         asyoulikeit|           6|
|   hated|          6|midsummersnightsd...|          4|         asyoulikeit|           2|
| hateful|
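The array indexing in this query can be mirrored on plain Scala collections. The sketch below uses a hypothetical `IndexEntry` case class and made-up rows (not the Shakespeare data). It uses `lift`, which returns an `Option`, so a missing second location becomes `None` rather than the `null` that Spark SQL returns in the output above.

```scala
// One row of a hypothetical in-memory inverted index; locations and counts
// are assumed to be sorted by descending count, as in inverted_index.
final case class IndexEntry(word: String, totalCount: Int, locations: Vector[String], counts: Vector[Int])

// Illustrative rows only, not real data.
val entries = Vector(
  IndexEntry("glove", 3, Vector("playA", "playB"), Vector(2, 1)),
  IndexEntry("glover", 1, Vector("playC"), Vector(1)),
  IndexEntry("sword", 4, Vector("playA"), Vector(4))
)

// Equivalent of WHERE word LIKE '%love%' OR word LIKE '%hate%',
// then locations[0], counts[0], locations[1], counts[1].
// lift(i) returns Some(element), or None when i is out of range.
val topTwo = entries
  .filter(e => e.word.contains("love") || e.word.contains("hate"))
  .map(e => (e.word, e.totalCount,
             e.locations.lift(0), e.counts.lift(0),
             e.locations.lift(1), e.counts.lift(1)))

topTwo.foreach(println)
```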

### Removing Stop Words
Recall that you were asked to implement a `keep(word: String): Boolean` method that filters out stop words.

First, let's implement `keep`. You can find lists of stop words on the web. One such list for English can be found [here](http://norm.al/2009/04/14/list-of-english-stop-words/). It includes many words that you might not consider stop words, so I'll use a smaller list here.

Note that I'll use a Scala [Set](http://www.scala-lang.org/api/current/index.html#scala.collection.immutable.Set) to hold the stop words. We only need to know whether a word is in the set or not, and we want fast look-ups; an immutable `Set` provides effectively _O(1)_ membership tests.

I'll also add the empty string, "", to the set, so the explicit test for empty words in the original code is no longer needed.

In [142]:
val stopWords = Set("", "a", "an", "and", "i", "he", "she", "it", "the")

// If the set contains the word, we return false - we don't want to keep it!
// The set holds only lowercase words, because keep lowercases its argument
// before the lookup. Note we haven't converted words to lower case before
// keep is called, so we have to do it here. This means we'll do the conversion
// TWICE for every word, here and in the original code passed to flatMap.
// Consider how you might refactor the code to eliminate this duplication.

def keep(word: String): Boolean = !stopWords.contains(word.toLowerCase)
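One way to answer the refactoring question in the comments above is to lowercase each word once, before filtering, so the stop-word check no longer has to. This sketch works on a made-up sample line rather than an RDD; in the Spark job, the same change would mean mapping `toLowerCase` right after `split` and dropping the `toLowerCase` call in the later `map`. It also relies on the fact that a Scala `Set[String]` is itself a `String => Boolean` function.

```scala
// Lowercase stop words only, since every word is lowercased before the lookup.
val stopWordSet = Set("", "a", "an", "and", "i", "he", "she", "it", "the")

// Hypothetical sample text standing in for one file's contents.
val contents = "The fool doth think he is wise, and I know it."

// Lowercase each word exactly once, then drop stop words.
// The Set is passed directly as the predicate, because Set[String] extends String => Boolean.
val words = contents
  .split("""\W+""")
  .map(_.toLowerCase)
  .filterNot(stopWordSet)

println(words.mkString(", "))   // fool, doth, think, is, wise, know
```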

Now here is our program modified to use `keep`.

In [144]:
val iiStopWords = sc.wholeTextFiles(shakespeare.toString).
    flatMap {
        case (location, contents) => 
            val words = contents.split("""\W+""").
                filter(word => keep(word))                  // <== here
            val fileName = location.split(pathSeparator).last
            words.map(word => ((word.toLowerCase, fileName), 1))
    }.
    reduceByKey((count1, count2) => count1 + count2).
    map { 
        case ((word, fileName), count) => (word, (fileName, count)) 
    }.
    groupByKey.
    sortByKey(ascending = true).
    map { 
      case (word, iterable) => 
        val vect = iterable.toVector.sortBy { 
            case (fileName, count) => (-count, fileName) 
        }
        val (locations, counts) = vect.unzip  
        val totalCount = counts.reduceLeft((n1,n2) => n1+n2)        
        (word, totalCount, locations, counts)
    }

In [145]:
iiStopWords.take(100).foreach(println)

(abandon,6,Vector(asyoulikeit, tamingoftheshrew, twelfthnight),Vector(4, 1, 1))
(abate,3,Vector(loveslabourslost, midsummersnightsdream, tamingoftheshrew),Vector(1, 1, 1))
(abatement,1,Vector(twelfthnight),Vector(1))
(abbess,8,Vector(comedyoferrors),Vector(8))
(abbey,9,Vector(comedyoferrors),Vector(9))
(abbominable,1,Vector(loveslabourslost),Vector(1))
(abbreviated,1,Vector(loveslabourslost),Vector(1))
(abed,2,Vector(asyoulikeit, twelfthnight),Vector(1, 1))
(abetting,1,Vector(comedyoferrors),Vector(1))
(abhominable,1,Vector(loveslabourslost),Vector(1))
(abhor,5,Vector(asyoulikeit, comedyoferrors, loveslabourslost, merrywivesofwindsor, muchadoaboutnothing),Vector(1, 1, 1, 1, 1))
(abhors,2,Vector(twelfthnight),Vector(2))
(abide,5,Vector(merrywivesofwindsor, midsummersnightsdream),Vector(3, 2))
(abides,1,Vector(muchadoaboutnothing),Vector(1))
(ability,2,Vector(muchadoaboutnothing, twelfthnight),Vector(1, 1))
(abject,2,Vector(comedyoferrors, tamingoftheshrew),Vector(1, 1))
(abjure,1,Vector
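To see what each stage of the RDD job does without a cluster, here is a sketch of the same pipeline on plain Scala collections. The two-file corpus is made up and stands in for `sc.wholeTextFiles`; `groupBy` followed by a sum replaces `reduceByKey`, and a second `groupBy` replaces `groupByKey`. It only illustrates the logic and is not a substitute for the distributed version.

```scala
// Hypothetical corpus: file name -> contents (standing in for sc.wholeTextFiles).
val files = Map(
  "playA" -> "Love is blind, and love is kind.",
  "playB" -> "I hate the love of hate."
)

val stopWords = Set("", "a", "an", "and", "i", "he", "she", "it", "the")
def keep(word: String): Boolean = !stopWords.contains(word.toLowerCase)

// 1. Emit ((word, fileName), 1) for every kept word.
val pairs = files.toSeq.flatMap { case (fileName, contents) =>
  contents.split("""\W+""").filter(keep).map(word => ((word.toLowerCase, fileName), 1))
}

// 2. Sum the ones per (word, fileName), like reduceByKey.
val counted = pairs.groupBy(_._1).map { case (key, ones) => (key, ones.map(_._2).sum) }

// 3. Re-key by word and group, like map + groupByKey.
val byWord = counted.toSeq
  .map { case ((word, fileName), count) => (word, (fileName, count)) }
  .groupBy(_._1)

// 4. Sort words; sort each word's locations by descending count, then file name.
val localIndex = byWord.toSeq.sortBy(_._1).map { case (word, grouped) =>
  val sorted = grouped.map(_._2).sortBy { case (fileName, count) => (-count, fileName) }
  val (locations, counts) = sorted.unzip
  (word, counts.sum, locations.toVector, counts.toVector)
}

localIndex.foreach(println)
```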

One last thing: we now have `filter(word => keep(word))`, but recall that when printing `iiStopWords` we passed `println` directly to `foreach`. We can do the same with `filter` and write `filter(keep)`.

What does this mean exactly? It tells the compiler to "convert the _method_ `keep` to a _function_ and pass that to `filter`." This conversion is called _eta-expansion_. It works because `keep` already has the shape `filter` expects: it takes a single `String` argument and returns a `Boolean`.

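Conceptually, passing `keep` is different from passing `word => keep(word)`, which is an _anonymous_ function that _calls_ `keep`: we use `keep` as the function itself, rather than writing a function that uses `keep`. In practice, the compiler performs this conversion by generating essentially that same anonymous function, so the two forms behave identically; `filter(keep)` is simply more concise.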
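Here is a small, self-contained check that both forms give the same result. It uses a lowercase copy of the stop-word set and a made-up word list rather than the Shakespeare data.

```scala
val stopWords = Set("", "a", "an", "and", "i", "he", "she", "it", "the")

// A method, not a function value.
def keep(word: String): Boolean = !stopWords.contains(word.toLowerCase)

val sample = Seq("The", "quick", "fox", "and", "I")

// Both calls pass a String => Boolean function to filter.
val viaLambda = sample.filter(word => keep(word))  // explicit anonymous function
val viaMethod = sample.filter(keep)                // keep is eta-expanded by the compiler

// Eta-expansion also happens when the expected type is a function type.
val keepFn: String => Boolean = keep

println(viaLambda)               // List(quick, fox)
println(viaMethod == viaLambda)  // true
```

Assigning `keep` to a value with an explicit function type, as with `keepFn`, is another place where the compiler eta-expands a method for you.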