# Lab 2 - Warmup

## 1. Spark
### Basic RDD Operations
The following steps demonstrate how to create an RDD from a file and apply transformations to it. Let's first create an RDD, named `pagecounts`, from the input files located in `data/pagecounts`. The file entries look something like this:
```
20090507-040000 zh favicon.ico 67 62955
```

In [4]:
val pagecounts = sc.textFile("data/pagecounts")
print(pagecounts)

data/pagecounts MapPartitionsRDD[5] at textFile at <console>:29


Use the `take()` operation of an RDD to get the first K records, e.g., K = 10.

In [5]:
pagecounts.take(10)

[20090505-000000 aa Main_Page 2 9980, 20090505-000000 ab %D0%90%D0%B8%D0%BD%D1%82%D0%B5%D1%80%D0%BD%D0%B5%D1%82 1 465, 20090505-000000 ab %D0%98%D1%85%D0%B0%D0%B4%D0%BE%D1%83_%D0%B0%D0%B4%D0%B0%D2%9F%D1%8C%D0%B0 1 16086, 20090505-000000 af.b Tuisblad 1 36236, 20090505-000000 af.d Tuisblad 4 189738, 20090505-000000 af.q Tuisblad 2 56143, 20090505-000000 af Afrika 1 46833, 20090505-000000 af Afrikaans 2 53577, 20090505-000000 af Australi%C3%AB 1 132432, 20090505-000000 af Barack_Obama 1 23368]

An alternative way to print the records is to traverse the array and print each record on its own line.

In [6]:
for (x <- pagecounts.take(10)) {
   println(x)
}

20090505-000000 aa Main_Page 2 9980
20090505-000000 ab %D0%90%D0%B8%D0%BD%D1%82%D0%B5%D1%80%D0%BD%D0%B5%D1%82 1 465
20090505-000000 ab %D0%98%D1%85%D0%B0%D0%B4%D0%BE%D1%83_%D0%B0%D0%B4%D0%B0%D2%9F%D1%8C%D0%B0 1 16086
20090505-000000 af.b Tuisblad 1 36236
20090505-000000 af.d Tuisblad 4 189738
20090505-000000 af.q Tuisblad 2 56143
20090505-000000 af Afrika 1 46833
20090505-000000 af Afrikaans 2 53577
20090505-000000 af Australi%C3%AB 1 132432
20090505-000000 af Barack_Obama 1 23368


Use the `count()` function to see how many records in total are in this data set (this command will take a while, so read ahead while it is running). The pagecounts folder consists of two files, each with around 700K lines, so in total we have around 1400K lines.

In [8]:
pagecounts.count()

1398882

The second field of each record in the data set is the "project code" and contains information about the language of the pages. For example, the project code "en" indicates an English page. Let's derive an RDD, named `enPages`, containing only English pages from `pagecounts`. This can be done by applying a `filter()` function to `pagecounts`. For each record, we split it on the field delimiter (a space), take the second field, and compare it with the string "en". To avoid reading from disk each time we perform an operation on the RDD, we use `cache()` to keep the RDD in memory.

In [9]:
val enPages = pagecounts.filter(x => x.split(" ")(1) == "en").cache()
enPages.count()

enPages = MapPartitionsRDD[6] at filter at <console>:29


970545
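
The predicate above indexes straight into the split array, which would throw on a line with fewer than two fields. Here is a minimal sketch of a more defensive parser in plain Scala (no Spark needed), assuming the five-field layout of the sample record shown earlier. The `PageCount` case class, its field names, and `parseLine` are hypothetical names introduced only for illustration.

```scala
import scala.util.Try

// Hypothetical record type. Only the first four fields are described in the text;
// the fifth is kept as a raw string because its meaning is not stated.
case class PageCount(timestamp: String, project: String, title: String, views: Int, fifth: String)

// Returns None instead of throwing when a line does not have the expected shape.
def parseLine(line: String): Option[PageCount] =
  line.split(" ") match {
    case Array(ts, project, title, views, fifth) =>
      Try(views.toInt).toOption.map(v => PageCount(ts, project, title, v, fifth))
    case _ => None
  }

val sample = "20090507-040000 zh favicon.ico 67 62955"
val parsed = parseLine(sample)

// The same checks the lab performs: language test and date key (first eight characters).
val isEnglish = parsed.exists(_.project == "en")
val dateKey = parsed.map(_.timestamp.substring(0, 8))
```

On an RDD, something like `pagecounts.flatMap(line => parseLine(line))` should drop malformed lines while parsing the rest, at the cost of silently ignoring bad input.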

Now, let's generate a histogram of total page views on Wikipedia English pages for the date range represented in our dataset (May 5 to May 7, 2009). The high-level idea is as follows. First, we generate a key-value pair for each line; the key is the date (the first eight characters of the first field), and the value is the number of pageviews recorded on that line (the fourth field).

In [10]:
val enTuples = enPages.map(x => x.split(" "))
val enKeyValuePairs = enTuples.map(x => (x(0).substring(0, 8), x(3).toInt))

enTuples = MapPartitionsRDD[7] at map at <console>:31
enKeyValuePairs = MapPartitionsRDD[8] at map at <console>:32



Next, we shuffle the data and group all values of the same key together. Finally, we sum up the values for each key. Spark provides a convenient method called `reduceByKey` for exactly this pattern. The optional second argument to `reduceByKey` sets the number of reducers to use. Spark assumes that the reduce function is commutative and associative, and applies it as a combiner on the mapper side before the shuffle. Since there are very few keys in this case (at most three unique dates in our data set), let's use only one reducer.

In [11]:
enKeyValuePairs.reduceByKey((x, y) => x + y, 1).collect()

[(20090507,6175726), (20090505,7076855)]
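
To see why the reduce function has to be commutative and associative, here is a small model of mapper-side combining written with plain Scala collections. It is only a sketch of the idea, not how Spark implements it; the two "partitions" and their numbers are made up, and `sumByKey` is a hypothetical helper.

```scala
// Two hypothetical partitions of (date, views) pairs; the numbers are made up.
val partitions: Seq[Seq[(String, Int)]] = Seq(
  Seq(("20090505", 2), ("20090505", 1), ("20090507", 4)),
  Seq(("20090507", 3), ("20090505", 5))
)

// Sum the values per key within one collection of pairs.
def sumByKey(pairs: Seq[(String, Int)]): Map[String, Int] =
  pairs.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

// Step 1: combine inside each partition (the "mapper side").
val combined: Seq[Map[String, Int]] = partitions.map(sumByKey)

// Step 2: merge the partial sums, as a single reducer would.
val merged: Map[String, Int] = sumByKey(combined.flatMap(_.toSeq))

// Reference result: sum everything at once, without pre-combining.
val direct: Map[String, Int] = sumByKey(partitions.flatten)
```

Because addition can be regrouped and reordered freely, `merged` and `direct` agree. A function without these properties (subtraction, for example) could give different results depending on how the records happen to be partitioned.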

### Key-Value RDD Operations
The following steps demonstrate how to develop a simple word count application in Spark. For this part, we will use the file located at `data/story/hamlet.txt`. To convert a text file into an RDD, we use the `SparkContext.textFile()` method. We also apply the `removePunctuation()` function, defined below, using a `map()` transformation to strip out the punctuation and change all text to lowercase. Since the file is large, we use `take(15)` so that we only print 15 lines.


In [12]:
import scala.util.matching

def removePunctuation(text: String): String = {
    text.replaceAll("""\p{Punct}|^\s+|\s+$""", "").toLowerCase
} 

removePunctuation: (text: String)String
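
To get a feeling for what the regular expression does, here is a short check in plain Scala. The function is repeated so that the snippet runs on its own; the input strings are made-up examples.

```scala
// Same definition as in the cell above, repeated so the snippet is self-contained.
def removePunctuation(text: String): String =
  text.replaceAll("""\p{Punct}|^\s+|\s+$""", "").toLowerCase

// Leading/trailing whitespace and punctuation are removed, and the text is lowercased.
val cleaned = removePunctuation("  Hello, World!  ")

// Apostrophes count as punctuation, so contractions are fused into one token.
val apostrophe = removePunctuation("o'er")

// Whitespace inside the line is kept, so removing a token can leave two spaces in a row.
val inner = removePunctuation("a , b")
```

The last case matters for the next steps: runs of spaces inside a line survive the cleaning and later show up as empty strings when the line is split on a single space.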


In [13]:
val hamletRDD = sc.textFile("data/story/hamlet.txt").map(removePunctuation)
hamletRDD.zipWithIndex().take(15).map(x => (x._2 + 1) + ": " + x._1).foreach(println)

1: 
2: 1604
3: 
4: 
5: the tragedy of hamlet prince of denmark
6: 
7: 
8: by william shakespeare
9: 
10: 
11: 
12: dramatis personae
13: 
14: claudius king of denmark
15: marcellus officer


hamletRDD = MapPartitionsRDD[12] at map at <console>:30



To do the word count, we first need to split each line by its spaces. We can apply a `flatMap()` transformation with the string `split()` method to split each element of the RDD by its spaces and flatten the results into a single RDD of words.

In [14]:
val hamletWordsRDD = hamletRDD.flatMap(_.split(" "))
print(hamletWordsRDD.count())

33013

hamletWordsRDD = MapPartitionsRDD[14] at flatMap at <console>:32



The next step is to filter out the empty elements. Remove all entries where the word is the empty string `""`.

In [15]:
val wordsRDD = hamletWordsRDD.filter(_ != "")
print(wordsRDD.count())

31953

wordsRDD = MapPartitionsRDD[15] at filter at <console>:34


MapPartitionsRDD[15] at filter at <console>:34
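
The empty elements removed above come from how `split(" ")` treats blank lines and repeated spaces. The following plain-Scala sketch shows the behavior on a few made-up strings; no Spark is needed to try it.

```scala
// A blank line does not disappear: splitting it yields one empty string.
val blankLine = "".split(" ").toList

// Two consecutive spaces produce an empty string between the words.
val doubleSpace = "a  b".split(" ").toList

// A leading space produces an empty string at the front.
val leading = " a".split(" ").toList

// Splitting on a run of whitespace avoids the empty string in the middle.
val byWhitespace = "a  b".split("\\s+").toList

// The lab's approach on a tiny example: split on a single space, then drop the empties.
val words = Seq("", "to be", "a  b").flatMap(_.split(" ")).filter(_ != "")
```

Splitting on `"\\s+"` reduces the number of empty strings but does not remove the one produced by a blank line, so the `filter` step is still useful.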

We now have an RDD that is only words, so let's produce a list of word counts.

In [16]:
val wordsCount = wordsRDD.map((_, 1)).reduceByKey(_ + _)
wordsCount.top(5).foreach(println)

(zone,1)
(youth,16)
(yourselves,1)
(yourself,15)
(yours,6)


wordsCount = ShuffledRDD[17] at reduceByKey at <console>:36


ShuffledRDD[17] at reduceByKey at <console>:36

Then, we can take the top 15 words by using the `top()` action. Since the elements of the RDD are (word, count) pairs and `top()` orders tuples by their first component, we swap each pair so that the count comes first, take the top 15, and swap the pairs back.

In [17]:
// top(15) already orders by (count, word), so no explicit sort is needed beforehand
val top15WordsAndCounts = wordsCount.map(x => (x._2, x._1)).top(15).map(x => (x._2, x._1))
top15WordsAndCounts.map(x => x._1 + ": " + x._2).foreach(println)

the: 1090
and: 964
to: 742
of: 675
i: 577
a: 558
you: 554
my: 520
in: 434
it: 419
that: 389
ham: 358
is: 346
not: 315
his: 304


top15WordsAndCounts = Array((the,1090), (and,964), (to,742), (of,675), (i,577), (a,558), (you,554), (my,520), (in,434), (it,419), (that,389), (ham,358), (is,346), (not,315), (his,304))


[(the,1090), (and,964), (to,742), (of,675), (i,577), (a,558), (you,554), (my,520), (in,434), (it,419), (that,389), (ham,358), (is,346), (not,315), (his,304)]
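
The swap-and-swap-back trick can be avoided by ordering on the count directly. Here is a sketch of that idea with plain Scala collections; `topN` is a hypothetical helper and the counts are made up rather than taken from the Hamlet run above.

```scala
// Made-up (word, count) pairs for illustration.
val counts = Seq(("the", 5), ("and", 3), ("to", 3), ("ham", 1))

// Sort by descending count, break ties alphabetically, then keep the first n pairs.
def topN(pairs: Seq[(String, Int)], n: Int): Seq[(String, Int)] =
  pairs.sortBy { case (word, count) => (-count, word) }.take(n)

val top2 = topN(counts, 2)
val top3 = topN(counts, 3)
```

On an RDD, the analogous call would be along the lines of `wordsCount.top(15)(Ordering.by[(String, Int), Int](_._2))`, which passes an explicit ordering on the count to `top()`. Note that the tie-breaking differs: the sketch prefers the alphabetically smaller word, the swapped-tuple version in the cell above prefers the larger one, and an ordering on the count alone leaves ties unspecified.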

## 2. Spark SQL
The entry point into all functionality in Spark is the `SparkSession` class. To create a basic `SparkSession`, just use `SparkSession.builder()`.

In [16]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Spark SQL ID2221").master("local[*]").getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

spark = org.apache.spark.sql.SparkSession@552ed028


### DataFrame
Now, let's create a `DataFrame` based on the content of a JSON file, located at `data/people/people.json`.

In [19]:
val df = spark.read.json("data/people/people.json")
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



df = [age: bigint, name: string]



Let's try more functions on `DataFrame`.

In [20]:
// Print the schema in a tree format
df.printSchema()

// Select only the "name" column
df.select("name").show()

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()

// Select people older than 21
df.filter(df("age") > 21).show()

// Count people by age
df.groupBy("age").count().show()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



The `sql` function on a `SparkSession` enables applications to run SQL queries programmatically and returns the result as a `DataFrame`.

In [21]:
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



sqlDF = [age: bigint, name: string]



Temporary views in Spark SQL are session-scoped and disappear when the session that created them terminates. If you want a temporary view that is shared among all sessions and kept alive until the Spark application terminates, you can create a global temporary view. A global temporary view is tied to the system-preserved database `global_temp`, and we must use the qualified name to refer to it, e.g. `SELECT * FROM global_temp.people`.

In [22]:
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### Dataset
Datasets are similar to RDDs; however, instead of using Java serialization or Kryo, they use a specialized Encoder to serialize the objects for processing or transmitting over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are generated dynamically as code and use a format that allows Spark to perform many operations, such as filtering, sorting, and hashing, without deserializing the bytes back into an object.

In [24]:
import spark.implicits._
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val peopleDS = spark.read.json("data/people/people.json").as[Person]
peopleDS.show()

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect()

+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



defined class Person
caseClassDS = [name: string, age: bigint]
peopleDS = [age: bigint, name: string]
primitiveDS = [value: int]


[2, 3, 4]

### Interoperating with RDDs
Spark SQL supports two different methods for converting existing RDDs into Datasets:
1. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection based approach leads to more concise code and works well when you already know the schema while writing your Spark application.
2. The second method for creating Datasets is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct Datasets when the columns and their types are not known until runtime.

Let's start with the first method. The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame. The case class defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as `Seqs` or `Arrays`. This RDD can be implicitly converted to a DataFrame and then be registered as a table. Tables can be used in subsequent SQL statements.

In [15]:
// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an instance of SparkContext
val sc = spark.sparkContext

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleRDD = sc.textFile("data/people/people.txt")
val peopleDF = peopleRDD.map(_.split(",")).map(x => Person(x(0), x(1).trim.toInt)).toDF()

// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()

Name: Unknown Error
Message: <console>:42: error: stable identifier required, but this.$line7$read.spark.implicits found.
       import spark.implicits._
                    ^

StackTrace: 

In the second method, we specify the schema programmatically. When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically in three steps.
1. Create an RDD of `Row`s from the original RDD.
2. Create the schema, represented by a `StructType`, matching the structure of the `Row`s in the RDD created in Step 1.
3. Apply the schema to the RDD of `Row`s via the `createDataFrame` method provided by `SparkSession`.

In [13]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType}

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("data/people/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ").map(x => StructField(x, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD.map(_.split(",")).map(x => Row(x(0), x(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()

+-------------+
|        value|
+-------------+
|Name: Michael|
|   Name: Andy|
| Name: Justin|
+-------------+



peopleRDD = data/people/people.txt MapPartitionsRDD[40] at textFile at <console>:32
schemaString = name age
fields = Array(StructField(name,StringType,true), StructField(age,StringType,true))
schema = StructType(StructField(name,StringType,true), StructField(age,StringType,true))
rowRDD = MapPartitionsRDD[42] at map at <console>:42
peopleDF = [name: string, age: string]
results = [name: string]



### Data Sources
Spark SQL supports operating on a variety of data sources through the `DataFrame` interface. A `DataFrame` can be operated on like a normal RDD and can also be registered as a temporary view. Registering a DataFrame as a temporary view allows you to run SQL queries over its data. In the simplest form, the default data source (*parquet* unless otherwise configured by `spark.sql.sources.default`) will be used for all operations. Save operations can optionally take a `SaveMode` that specifies how to handle existing data if present. It can take the values `error` (default), `append`, `overwrite`, and `ignore`.

In [17]:
// Load data from a parquet file
val pdf = spark.read.load("data/people/people.parquet")
pdf.select("name", "favorite_color").write.mode("ignore").save("namesAndFavColors.parquet")

pdf = [name: string, favorite_color: string ... 1 more field]



In [18]:
// Manually specify the data source type, e.g., json, parquet, jdbc.
val jdf =  spark.read.format("json").load("data/people/people.json")
jdf.select("name", "age").write.format("parquet").mode("overwrite").save("namesAndAges.parquet")

Name: org.apache.spark.SparkException
Message: Job aborted.
StackTrace:   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.

*Parquet* is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data. Let's load data programmatically.

In [19]:
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("data/people/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()



Name: org.apache.spark.sql.AnalysisException
Message: path hdfs://127.0.0.1:9000/user/hrabo/people.parquet already exists.;
StackTrace:   at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.ex

Spark SQL can automatically infer the schema of a JSON dataset and load it as a `Dataset[Row]`. This conversion can be done using `SparkSession.read.json()` on either a `Dataset[String]` or a JSON file. Note that the file passed to `json()` is not a typical JSON file: each line must contain a separate, self-contained, valid JSON object. For a regular multi-line JSON file, set the `multiLine` option to `true`.

In [20]:
// Encoders for primitive types and case classes are supported by importing this when creating a Dataset.
import spark.implicits._

// Read a JSON dataset
val peopleDF = spark.read.json("data/people/people.json")

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()

// Alternatively, a DataFrame can be created for a JSON dataset represented by a Dataset[String] storing 
// one JSON object per string
val otherPeopleDataset = spark.createDataset("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+------+
|  name|
+------+
|Justin|
+------+

+----------------+----+
|         address|name|
+----------------+----+
|[Columbus, Ohio]| Yin|
+----------------+----+



peopleDF = [age: bigint, name: string]
teenagerNamesDF = [name: string]
otherPeopleDataset = [value: string]
otherPeople = [address: struct<city: string, state: string>, name: string]

