In [10]:
/** Typed representation of a single book record.
  *
  * The field names are the same identifiers listed in `columnNames`, so a
  * DataFrame whose columns carry those names can be converted with `.as[Book]`.
  */
case class Book(
  bookID: Int,
  title: String,
  authors: String,
  averageRating: Float,
  isbn: String,
  isbn13: String,
  languageCode: String,
  numPages: Int,
  ratingsCount: Int,
  textReviewsCount: Int
)

defined class Book


# Not everything can run in the Jupyter Notebook

https://github.com/Valassis-Digital-Media/spylon-kernel/issues/40

Note that we cannot run all of the examples in the Jupyter notebook because of the open ticket linked above.

## Reasons for a `Dataset`
* Operations require functional programming solutions
* Rigorous Type Safety

# `Dataset`

* A `DataFrame` is a `Dataset[Row]`, i.e. a `Dataset` whose elements are generic `Row` objects
* Datasets are a strictly Java Virtual Machine (JVM) language feature
    * Work only with Scala and Java
* Can use an internal model representation structure 
  * For Scala that would be a `case class`
  * For Java that would be a Java Bean
  * A `Dataset` can be slower than a `DataFrame` because rows have to be converted to custom JVM objects
  * When using `case class`es it is trivial to reuse them for both distributed and local workloads


## DataFrames are Datasets

* When reading in the data, `DataFrames` are `Dataset[Row]`
* This is done as a type alias `type DataFrame = Dataset[Row]`
* Therefore we can perform some functional programming like:
  * `map`
  * `flatMap`
  * `filter`
  * `foreach`

### Creating the `case class`

### Creating the Schema

In [11]:
import org.apache.spark.sql.types._
// Explicit schema for the books CSV: ten columns, each declared non-nullable.
// The names here are the raw names; they are replaced later via `toDF(columnNames: _*)`.
val bookSchema = StructType(
  Seq(
    StructField("bookID", IntegerType, nullable = false),
    StructField("title", StringType, nullable = false),
    StructField("authors", StringType, nullable = false),
    StructField("average_rating", FloatType, nullable = false),
    StructField("isbn", StringType, nullable = false),
    StructField("isbn13", StringType, nullable = false),
    StructField("language_code", StringType, nullable = false),
    StructField("# num_pages", IntegerType, nullable = false),
    StructField("ratings_count", IntegerType, nullable = false),
    StructField("text_reviews_count", IntegerType, nullable = false)
  )
)

import org.apache.spark.sql.types._
bookSchema: org.apache.spark.sql.types.StructType = StructType(StructField(bookID,IntegerType,false), StructField(title,StringType,false), StructField(authors,StringType,false), StructField(average_rating,FloatType,false), StructField(isbn,StringType,false), StructField(isbn13,StringType,false), StructField(language_code,StringType,false), StructField(# num_pages,IntegerType,false), StructField(ratings_count,IntegerType,false), StructField(text_reviews_count,IntegerType,false))


### Creating column names to match the `case class`

In [12]:
// camelCase column names, one per `Book` field and in the same order.
val columnNames: Seq[String] = Seq(
  "bookID",
  "title",
  "authors",
  "averageRating",
  "isbn",
  "isbn13",
  "languageCode",
  "numPages",
  "ratingsCount",
  "textReviewsCount"
)

columnNames: Seq[String] = List(bookID, title, authors, averageRating, isbn, isbn13, languageCode, numPages, ratingsCount, textReviewsCount)


### Read the file

In [13]:
import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset

// Load the books CSV as an untyped DataFrame: apply the explicit schema,
// rename the columns to `columnNames`, and drop rows with missing values.
val dataset = {
  val csvReader = spark.read
    .option("header", "true")
    .option("encoding", "UTF-8")
    .schema(bookSchema)

  csvReader
    .csv("../data/books.csv")
    .toDF(columnNames: _*)
    .na
    .drop()
}
dataset.show(10)

+------+--------------------+--------------------+-------------+----------+-------------+------------+--------+------------+----------------+
|bookID|               title|             authors|averageRating|      isbn|       isbn13|languageCode|numPages|ratingsCount|textReviewsCount|
+------+--------------------+--------------------+-------------+----------+-------------+------------+--------+------------+----------------+
|     1|Harry Potter and ...|J.K. Rowling-Mary...|         4.56|0439785960|9780439785969|         eng|     652|     1944099|           26249|
|     2|Harry Potter and ...|J.K. Rowling-Mary...|         4.49|0439358078|9780439358071|         eng|     870|     1996446|           27613|
|     3|Harry Potter and ...|J.K. Rowling-Mary...|         4.47|0439554934|9780439554930|         eng|     320|     5629932|           70390|
|     4|Harry Potter and ...|        J.K. Rowling|         4.41|0439554896|9780439554893|         eng|     352|        6267|             272|
|     

import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
dataset: org.apache.spark.sql.DataFrame = [bookID: int, title: string ... 8 more fields]


In [14]:
// Each element here is an untyped Row, so the "title" value is looked up by
// column name and read as a String before testing it.
dataset.filter(row => row.getAs[String]("title").contains("Fahrenheit")).show(10)

+------+--------------------+--------------------+-------------+----------+-------------+------------+--------+------------+----------------+
|bookID|               title|             authors|averageRating|      isbn|       isbn13|languageCode|numPages|ratingsCount|textReviewsCount|
+------+--------------------+--------------------+-------------+----------+-------------+------------+--------+------------+----------------+
|  4381|      Fahrenheit 451|Ray Bradbury-Alfr...|         3.98|0307347974|9780307347978|         spa|     175|      690801|           14489|
|  4382|      Fahrenheit 451|Ray Bradbury-Chri...|         3.98|078617627X|9780786176274|         eng|       5|         471|             142|
|  7656|      Fahrenheit 451|        Ray Bradbury|         3.98|8445074873|9788445074879|         eng|     186|        5733|             613|
| 32971|      Fahrenheit 451|        Ray Bradbury|         3.98|0965020592|9780965020596|         eng|     190|         185|              26|
| 3297

In [15]:
/* The imports are required for use */
import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset

// Load the books CSV and convert the cleaned DataFrame into a typed Dataset[Book].
val dataset: Dataset[Book] = {
  // Raw read using the explicit schema.
  val rawBooks = spark.read
    .option("header", "true")
    .option("encoding", "UTF-8")
    .schema(bookSchema)
    .csv("../data/books.csv")

  // Rename the columns to match the case-class fields, then drop rows with missing values.
  val cleanedBooks = rawBooks.toDF(columnNames: _*).na.drop()

  // Conversion to the case class.
  cleanedBooks.as[Book]
}
dataset.show(10)

+------+--------------------+--------------------+-------------+----------+-------------+------------+--------+------------+----------------+
|bookID|               title|             authors|averageRating|      isbn|       isbn13|languageCode|numPages|ratingsCount|textReviewsCount|
+------+--------------------+--------------------+-------------+----------+-------------+------------+--------+------------+----------------+
|     1|Harry Potter and ...|J.K. Rowling-Mary...|         4.56|0439785960|9780439785969|         eng|     652|     1944099|           26249|
|     2|Harry Potter and ...|J.K. Rowling-Mary...|         4.49|0439358078|9780439358071|         eng|     870|     1996446|           27613|
|     3|Harry Potter and ...|J.K. Rowling-Mary...|         4.47|0439554934|9780439554930|         eng|     320|     5629932|           70390|
|     4|Harry Potter and ...|        J.K. Rowling|         4.41|0439554896|9780439554893|         eng|     352|        6267|             272|
|     

import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
dataset: org.apache.spark.sql.Dataset[Book] = [bookID: int, title: string ... 8 more fields]


## Running some rudimentary functional programming 

* While it is better to run with our own custom types, we can also perform functional programming using `Row`

In [16]:
// Typed map over Dataset[Book]: project every book to its `authors` field and show 10 rows.
dataset.map(book => book.authors).show(10)

org.apache.spark.SparkException:  Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 7, localhost, executor driver): java.lang.ClassCastException: $iw cannot be cast to $iw

## Setting to a `case class`

* We can also create a `case class` of the representation of the `Row`
* We can use the schema we created as a reference for our new `case class`

```
new StructField("bookID", IntegerType, false),
new StructField("title", StringType, false),
new StructField("authors", StringType, false),
new StructField("average_rating", FloatType, false),
new StructField("isbn", StringType, false),
new StructField("isbn13", StringType, false),
new StructField("language_code", StringType, false),
new StructField("# num_pages", IntegerType, false),
new StructField("ratings_count", IntegerType, false),
new StructField("text_reviews_count", IntegerType, false)))
```

In [58]:
/** Typed representation of one row of the books data, used as the target of `.as[Book]`.
  *
  * NOTE(review): `bookID` is a `Long` here, while `bookSchema` declares the
  * column as `IntegerType` and the earlier `Book` definition uses `Int` —
  * confirm which width is intended.
  */
case class Book(
  bookID: Long,
  title: String,
  authors: String,
  averageRating: Float,
  isbn: String,
  isbn13: String,
  languageCode: String,
  numPages: Int,
  ratingsCount: Int,
  textReviewsCount: Int
)

defined class Book


### Renaming the columns so we can fit it to Scala convention

In [87]:
// Scala-convention (camelCase) names applied to the DataFrame columns, in column order.
val columnNames = Seq(
  "bookID", "title", "authors", "averageRating", "isbn",
  "isbn13", "languageCode", "numPages", "ratingsCount", "textReviewsCount"
)

columnNames: Seq[String] = List(bookID, title, authors, averageRating, isbn, isbn13, languageCode, numPages, ratingsCount, textReviewsCount)


### Setting it to our type using `as[T]`

* Note that the explicit type declaration `Dataset[Book]` is not required
* We are showing it for understanding

In [88]:
// Read the books CSV with the explicit schema and turn it into a typed Dataset[Book].
// `.csv(path)` already selects the CSV data source, so a separate `.format("csv")`
// call is redundant and has been left out.
val booksDS: Dataset[Book] = spark.read
  .schema(bookSchema)
  .option("header", "true")
  .option("encoding", "UTF-8")
  .csv("../data/books.csv")
  .toDF(columnNames: _*) // Rename columns to match the case-class fields
  .na.drop()             // Drop rows that contain missing values
  .as[Book]              // Conversion to the case class
booksDS.show(10)

+------+--------------------+--------------------+-------------+----------+-------------+------------+--------+------------+----------------+
|bookID|               title|             authors|averageRating|      isbn|       isbn13|languageCode|numPages|ratingsCount|textReviewsCount|
+------+--------------------+--------------------+-------------+----------+-------------+------------+--------+------------+----------------+
|     1|Harry Potter and ...|J.K. Rowling-Mary...|         4.56|0439785960|9780439785969|         eng|     652|     1944099|           26249|
|     2|Harry Potter and ...|J.K. Rowling-Mary...|         4.49|0439358078|9780439358071|         eng|     870|     1996446|           27613|
|     3|Harry Potter and ...|J.K. Rowling-Mary...|         4.47|0439554934|9780439554930|         eng|     320|     5629932|           70390|
|     4|Harry Potter and ...|        J.K. Rowling|         4.41|0439554896|9780439554893|         eng|     352|        6267|             272|
|     

booksDS: org.apache.spark.sql.Dataset[Book] = [bookID: int, title: string ... 8 more fields]


In [89]:
// Typed filter on Dataset[Book]: keep the books whose title contains "Fahrenheit".
val filtered = booksDS.filter(_.title.contains("Fahrenheit"))
filtered.show()

org.apache.spark.SparkException:  Job aborted due to stage failure: Task 0 in stage 33.0 failed 1 times, most recent failure: Lost task 0.0 in stage 33.0 (TID 33, localhost, executor driver): java.lang.ClassCastException: $iw cannot be cast to $iw

## Creating from a Seq

## Taking Some of the Dataset

## Processing Functions

## Integrating DataSets with DataFrames

## Lab: Functional Programming with DataSets