# Spark tutorial

From https://github.com/databricks/Spark-The-Definitive-Guide

Another source containing useful examples: 
https://github.com/hopelessoptimism/data-scientists-guide-apache-spark

Spark has several core abstractions: Datasets, DataFrames, SQL Tables, and Resilient Distributed Datasets (RDDs). These abstractions all represent distributed collections of data; however, they have different interfaces for working with that data. The easiest and most efficient are DataFrames, which are available in all languages.

## Transformations

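There are two types of transformations on DataFrames:

- Narrow: each input partition contributes to at most one output partition.
  Spark pipelines narrow transformations, which means that if we specify
  multiple filters on a DataFrame they are all performed in memory, in one pass.
- Wide: input partitions contribute to many output partitions (a *shuffle*).
  Spark exchanges data across the cluster and writes the shuffle results to disk.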
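To make the distinction concrete, here is a plain-Scala sketch (no Spark needed) that models a DataFrame as a vector of partitions. It only illustrates the idea and is not how Spark implements it: the narrow step works inside each partition independently, while the wide step has to move rows between partitions so that equal keys end up together. The use of `hashCode` to pick a target partition is an assumption of the sketch.

```scala
// Plain-Scala model (no Spark needed): each inner Vector stands for one partition.
object NarrowVsWide {
  type Partitions[A] = Vector[Vector[A]]

  // Narrow: every output partition is computed from exactly one input partition,
  // so the filter and the map run back to back inside each partition (pipelining).
  def narrowFilterThenMap(parts: Partitions[Int]): Partitions[Int] =
    parts.map(p => p.filter(_ % 2 == 0).map(_ * 10))

  // Wide: an output partition may need rows from every input partition, so rows are
  // redistributed by key across partitions. This redistribution is the shuffle.
  def shuffleByKey[V](parts: Partitions[(String, V)], numOut: Int): Partitions[(String, V)] = {
    require(numOut >= 1, "numOut must be positive")
    val buckets = Vector.fill(numOut)(Vector.newBuilder[(String, V)])
    for (p <- parts; row <- p) {
      // Same key -> same target partition, regardless of where the row started.
      buckets(Math.floorMod(row._1.hashCode, numOut)) += row
    }
    buckets.map(_.result())
  }
}
```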

## Lazy evaluation

Lazy evaluation means that Spark will wait until the very last moment to
execute the graph of computation instructions via a *plan* of transformations.
This provides immense benefits to the end user because Spark can optimize
the entire data flow from end to end.
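As an analogy only, Scala's `Iterator` is lazy in a similar way: chaining `map` and `filter` just builds up the computation, and nothing runs until something consumes the result. Unlike Spark, an iterator does not optimize the chain; it only defers it. The counter is there to show when the work actually happens.

```scala
object LazyAnalogy {
  // Returns (elements evaluated before the "action", evaluated after it, result).
  def demo(): (Int, Int, Int) = {
    var evaluated = 0
    // Building the "plan": Iterator.map and Iterator.filter are lazy, so nothing runs yet.
    val plan = Iterator.range(0, 1000)
      .map { n => evaluated += 1; n }
      .filter(_ % 2 == 0)
    val before = evaluated // still 0: only the plan exists
    val result = plan.size // consuming the iterator plays the role of an action
    (before, evaluated, result)
  }
}
```

The result mirrors the `divisBy2.count()` cell: 500 even numbers out of 1000, and no work done before the count.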

## Actions

Transformations allow us to build up our logical transformation plan.
To trigger the computation, we run an action.

## Spark UI

Users can monitor the progress of their job through the Spark UI.
The Spark UI is available on port 4040 of the driver node.


In [1]:
// Dataframe
val myRange = spark.range(1000).toDF("number")

Intitializing Scala interpreter ...

Spark Web UI available at http://48193172d7c6:4040
SparkContext available as 'sc' (version = 2.4.0, master = local[*], app id = local-1561783532365)
SparkSession available as 'spark'


myRange: org.apache.spark.sql.DataFrame = [number: bigint]


In [2]:
// "where" transformation
val divisBy2 = myRange.where("number % 2 = 0")

divisBy2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [number: bigint]


In [3]:
// "count" action
divisBy2.count()

res0: Long = 500


In [79]:
val flightData2015 = spark
   .read
   .option("inferSchema", "true")
   .option("header", "true")
   .csv("spark-data/2015-summary.csv")

flightData2015: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


In [6]:
// Get the data frame schema
flightData2015.schema

res2: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,IntegerType,true))


In [5]:
// Take the first three rows
flightData2015.take(3)

res1: Array[org.apache.spark.sql.Row] = Array([United States,Romania,15], [United States,Croatia,1], [United States,Ireland,344])


## Explain the physical plan

Explain plans are a bit arcane, but with a bit of practice reading them becomes second nature. An explain plan is read from top to bottom: the top is the end result and the bottom is the source(s) of data.

In [7]:
flightData2015.sort("count").explain()

== Physical Plan ==
*(2) Sort [count#22 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#22 ASC NULLS FIRST, 200)
   +- *(1) FileScan csv [DEST_COUNTRY_NAME#20,ORIGIN_COUNTRY_NAME#21,count#22] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/filippos/Documents/notebooks/fsquillace-notebooks/tutorials/2015-sum..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>


## Change the number of partitions

By default, a shuffle produces 200 output partitions (the default value of `spark.sql.shuffle.partitions`). We set this value to 5 in order to reduce the number of output partitions of the shuffle from 200 to 5.

In [8]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

flightData2015.sort("count").take(2)

res4: Array[org.apache.spark.sql.Row] = Array([United States,Singapore,1], [Moldova,United States,1])


## More details about partitions

More info here:

- https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-partitions.html
- https://spark.apache.org/docs/latest/tuning.html

Spark splits data into partitions and executes computations on the partitions in parallel.
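A rough plain-Scala sketch of how a local collection can be cut into a given number of contiguous, near-equal partitions, in the spirit of what `sc.parallelize` does with a local collection. The exact splitting arithmetic is an assumption of this sketch.

```scala
object Slicing {
  // Split a local collection into numSlices contiguous chunks whose sizes differ by at most one.
  def slice[A](data: Seq[A], numSlices: Int): Seq[Seq[A]] = {
    require(numSlices >= 1, "numSlices must be positive")
    val n = data.length.toLong // Long avoids overflow in i * n for large collections
    (0 until numSlices).map { i =>
      val start = (i * n / numSlices).toInt
      val end = ((i + 1) * n / numSlices).toInt
      data.slice(start, end)
    }
  }
}
```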

In [2]:
val x = (1 to 10).toList
val numbersDf = x.toDF("number")

x: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
numbersDf: org.apache.spark.sql.DataFrame = [number: int]


In [3]:
numbersDf.rdd.partitions.size 

res0: Int = 10


In [6]:
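// Each partition is written as a separate part file when you write a DataFrame to disk.
// mode("overwrite") replaces ./numbers if it already exists (the default mode fails instead).
numbersDf.write.mode("overwrite").csv("./numbers")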

### Coalescence

The coalesce method reduces the number of partitions in a DataFrame.
It avoids a full shuffle: instead of redistributing every row, it merges existing partitions, moving data only from the partitions that are removed into the ones that remain.


**NOTE**:

- coalesce cannot increase the number of partitions.
- coalesce does not distribute the data evenly among partitions (repartition does).
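A simplified plain-Scala sketch of the coalesce idea, assuming partitions are just vectors of rows: neighbouring partitions are glued together, so rows are never spread out again, the resulting partitions can be uneven, and the count can only go down. Spark's real implementation also takes data locality into account.

```scala
object CoalesceSketch {
  // Merge neighbouring partitions into at most `target` groups. No partition is split,
  // so no full shuffle is needed; asking for more partitions than exist keeps the current ones.
  def coalesce[A](parts: Vector[Vector[A]], target: Int): Vector[Vector[A]] = {
    require(target >= 1, "target must be positive")
    val n = math.min(target, parts.size)
    if (n == parts.size) parts
    else
      (0 until n).toVector.map { i =>
        val start = i * parts.size / n
        val end = (i + 1) * parts.size / n
        parts.slice(start, end).flatten
      }
  }
}
```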

In [8]:
val numbersDf2 = numbersDf.coalesce(2)

numbersDf2.rdd.partitions.size

numbersDf2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [number: int]
res4: Int = 2


In [9]:
// coalesce does not increase the number of partitions!
val numbersDf3 = numbersDf.coalesce(numbersDf.rdd.partitions.size + 5)

numbersDf3.rdd.partitions.size

numbersDf3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [number: int]
res5: Int = 10


### Repartition

The repartition method can be used to either increase or decrease the number of partitions in a DataFrame.

**NOTE**: repartition does a full data shuffle and distributes the data evenly among partitions. It does not minimize the amount of data movement.
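For comparison, a plain-Scala sketch of repartitioning by dealing rows round-robin. It is deterministic for readability; Spark's round-robin partitioning does not promise this exact order. Every row may move, but the target count can be larger or smaller than before and partition sizes differ by at most one.

```scala
object RepartitionSketch {
  // Deal every row round-robin into `target` new partitions (a full shuffle in Spark terms).
  def repartition[A](parts: Vector[Vector[A]], target: Int): Vector[Vector[A]] = {
    require(target >= 1, "target must be positive")
    val rows = parts.flatten.zipWithIndex
    (0 until target).toVector.map { i =>
      rows.collect { case (row, j) if j % target == i => row }
    }
  }
}
```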

In [None]:
val numbersDf4 = numbersDf.repartition(numbersDf.rdd.partitions.size + 5)

numbersDf4.rdd.partitions.size

### Index DataFrame by a given column

When repartitioning by a column without giving a number of partitions, Spark creates `spark.sql.shuffle.partitions` partitions, which is 200 by default.

Partitioning by a column is similar to indexing a column in a relational database: all rows with the same value in that column end up in the same partition.
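A plain-Scala sketch of hash partitioning by a column, using the same five people as the cell below. It assumes `hashCode` as the hash function (Spark SQL uses a Murmur3-based hash, so real partition ids differ), but the property is the same: rows with equal keys always land in the same partition, and with 200 partitions and only two colors, at most two partitions hold any data.

```scala
object HashPartitionSketch {
  // Non-negative modulo so that negative hash codes still give a valid partition id.
  def partitionFor(key: Any, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Group rows by the partition their key hashes to; empty partitions are simply absent.
  def partitionBy[R](rows: Seq[R], numPartitions: Int)(key: R => Any): Map[Int, Seq[R]] =
    rows.groupBy(r => partitionFor(key(r), numPartitions))
}
```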


In [10]:
val people = List(
  (10, "blue"),
  (13, "red"),
  (15, "blue"),
  (99, "red"),
  (67, "blue")
)
val peopleDf = people.toDF("age", "color")


people: List[(Int, String)] = List((10,blue), (13,red), (15,blue), (99,red), (67,blue))
peopleDf: org.apache.spark.sql.DataFrame = [age: int, color: string]


In [14]:
val colorDf = peopleDf.repartition($"color")

colorDf.rdd.partitions.size

colorDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [age: int, color: string]
res9: Int = 200


### How many partitions to choose?


#### I/O bound task

If you’re writing the data out to a file system, choose a number of partitions that produces reasonably sized files (around 100 MB each). When the data is read back, Spark derives the number of partitions from the input files (for file sources, each split is at most `spark.sql.files.maxPartitionBytes`, 128 MB by default).
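A small arithmetic helper for this rule of thumb, assuming you know (or can estimate) the total output size in bytes. `partitionsForTargetFileSize` is a hypothetical name, not a Spark API.

```scala
object OutputFileSizing {
  // Hypothetical helper: number of partitions so that each written file is about targetBytes.
  // Rounds up, and always returns at least 1.
  def partitionsForTargetFileSize(totalBytes: Long, targetBytes: Long = 100L * 1024 * 1024): Int = {
    require(totalBytes >= 0 && targetBytes > 0, "sizes must be non-negative and target positive")
    math.max(1L, (totalBytes + targetBytes - 1) / targetBytes).toInt
  }
}
```

You would then pass the result to `df.repartition(n)` before writing. The on-disk size depends on the file format and compression, so the estimate is approximate.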



#### CPU bound task

A common rule of thumb is to use 2, 3, or 4 partitions per CPU core in the cluster (see more [here](https://stackoverflow.com/questions/35800795/number-of-partitions-in-rdd-and-performance-in-spark/35804407#35804407) and [here](http://spark.apache.org/docs/latest/tuning.html#level-of-parallelism)).

In [17]:

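// sc.defaultParallelism: unless spark.default.parallelism is set, this is roughly the total
// number of cores available to the application (all local cores with master = local[*]).
val numberCPUsPerCluster = sc.defaultParallelism
val numberPartitions = numberCPUsPerCluster * 4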

numberCPUsPerCluster: Int = 16
numberPartitions: Int = 64


## Spark SQL

Spark SQL allows you as a user to register any DataFrame as a table or view (a temporary table) and query it using pure SQL.

In [10]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [13]:
val sqlWay = spark.sql("""
    SELECT DEST_COUNTRY_NAME, count(1)
    FROM flight_data_2015
    GROUP BY DEST_COUNTRY_NAME
""")

val dataFrameWay = flightData2015
   .groupBy("DEST_COUNTRY_NAME")
   .count()

sqlWay.explain
dataFrameWay.explain

== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#20], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#20, 5)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#20], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#20] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/filippos/Documents/notebooks/fsquillace-notebooks/tutorials/2015-sum..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#20], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#20, 5)
   +- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#20], functions=[partial_count(1)])
      +- *(1) FileScan csv [DEST_COUNTRY_NAME#20] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/filippos/Documents/notebooks/fsquillace-notebooks/tutorials/2015-sum..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAM

sqlWay: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, count(1): bigint]
dataFrameWay: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, count: bigint]


In [19]:
// In SQL:

spark.sql("SELECT max(count) FROM flight_data_2015").take(1)

res11: Array[org.apache.spark.sql.Row] = Array([370002])


In [18]:
// In Scala

import org.apache.spark.sql.functions

flightData2015.select(functions.max("count")).take(1)

import org.apache.spark.sql.functions
res10: Array[org.apache.spark.sql.Row] = Array([370002])


In [43]:
spark.sql("""
    SELECT DEST_COUNTRY_NAME, sum(count) AS destination_total
    FROM flight_data_2015
    GROUP BY DEST_COUNTRY_NAME
    ORDER BY destination_total DESC
    LIMIT 5
""").show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



In [42]:
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.functions.desc

flightData2015
    .groupBy("DEST_COUNTRY_NAME")
    .sum("count")
    .withColumnRenamed("sum(count)", "destination_total")
    .orderBy(desc("destination_total"))
    .limit(5)
    .show()

+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+



import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.functions.desc


## Datasets: Type-safe Structured APIs

In [78]:
// A Scala case class (similar to a struct) that will automatically
// be mapped into a structured data table in Spark
case class Flight(DEST_COUNTRY_NAME: String, ORIGIN_COUNTRY_NAME: String, count: BigInt)

// The following import is required:
// https://stackoverflow.com/questions/38664972/why-is-unable-to-find-encoder-for-type-stored-in-a-dataset-when-creating-a-dat
import spark.implicits._
val flightsDF = spark.read.parquet("spark-data/2010-summary.parquet/")
val flights = flightsDF.as[Flight]

defined class Flight
import spark.implicits._
flightsDF: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
flights: org.apache.spark.sql.Dataset[Flight] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]


In [77]:
import org.apache.spark.sql.functions.col

flights
  // This does not work: https://stackoverflow.com/a/42780897
  // .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
  .filter(col("ORIGIN_COUNTRY_NAME") =!= "Canada")
  .show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



## Structured streaming

In [1]:
val staticDataFrame = spark.read.format("csv")
   .option("header", "true")
   .option("inferSchema", "true")
   .load("spark-data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
val staticSchema = staticDataFrame.schema

Intitializing Scala interpreter ...

Spark Web UI available at http://48193172d7c6:4040
SparkContext available as 'sc' (version = 2.4.0, master = local[*], app id = local-1561884043982)
SparkSession available as 'spark'


staticDataFrame: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]
staticSchema: org.apache.spark.sql.types.StructType = StructType(StructField(InvoiceNo,StringType,true), StructField(StockCode,StringType,true), StructField(Description,StringType,true), StructField(Quantity,IntegerType,true), StructField(InvoiceDate,TimestampType,true), StructField(UnitPrice,DoubleType,true), StructField(CustomerID,DoubleType,true), StructField(Country,StringType,true))


In [2]:
staticDataFrame.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



### Simple query based on time window interval

In this example we look at the days on which a given customer (identified by CustomerId) made large purchases: we add a total cost column and sum it per customer over one-day windows.
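Grouping by `window(col("InvoiceDate"), "1 day")` assigns each row to a fixed, non-overlapping (tumbling) window. A plain-Scala sketch of that assignment, assuming windows aligned to the Unix epoch in UTC, which is what the midnight-to-midnight windows in the output suggest:

```scala
import java.time.{Duration, Instant}

object TumblingWindow {
  // Return the window [start, start + size) containing ts, with windows aligned to
  // 1970-01-01T00:00:00Z. floorDiv keeps the alignment correct for pre-1970 timestamps too.
  def windowFor(ts: Instant, size: Duration): (Instant, Instant) = {
    val sizeMs = size.toMillis
    val startMs = Math.floorDiv(ts.toEpochMilli, sizeMs) * sizeMs
    (Instant.ofEpochMilli(startMs), Instant.ofEpochMilli(startMs + sizeMs))
  }
}
```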

In [3]:
import org.apache.spark.sql.functions.{window, column, desc, col}
staticDataFrame
   .selectExpr(
   "CustomerId",
   "(UnitPrice * Quantity) as total_cost",
   "InvoiceDate")
   .groupBy(
     col("CustomerId"), window(col("InvoiceDate"), "1 day"))
   .sum("total_cost")
   .show(5, false)


+----------+------------------------------------------+-----------------+
|CustomerId|window                                    |sum(total_cost)  |
+----------+------------------------------------------+-----------------+
|16057.0   |[2011-12-05 00:00:00, 2011-12-06 00:00:00]|-37.6            |
|14126.0   |[2011-11-29 00:00:00, 2011-11-30 00:00:00]|643.6300000000001|
|13500.0   |[2011-11-16 00:00:00, 2011-11-17 00:00:00]|497.9700000000001|
|17160.0   |[2011-11-08 00:00:00, 2011-11-09 00:00:00]|516.8499999999999|
|15608.0   |[2011-11-11 00:00:00, 2011-11-12 00:00:00]|122.4            |
+----------+------------------------------------------+-----------------+
only showing top 5 rows



import org.apache.spark.sql.functions.{window, column, desc, col}


### Streaming

In [89]:
val streamingDataFrame = spark.readStream
    .schema(staticSchema)
    .option("maxFilesPerTrigger", 1) // read at most one new file per trigger (micro-batch)
    .format("csv")
    .option("header", "true")
    .load("spark-data/retail-data/by-day/*.csv")

streamingDataFrame: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]


In [90]:
// Now we can see the DataFrame is streaming.
streamingDataFrame.isStreaming

res69: Boolean = true


In [91]:
val purchaseByCustomerPerHour = streamingDataFrame
   .selectExpr(
       "CustomerId",
       "(UnitPrice * Quantity) as total_cost",
       "InvoiceDate")
   .groupBy(
     col("CustomerId"), window(col("InvoiceDate"), "1 day"))
   .sum("total_cost")


purchaseByCustomerPerHour: org.apache.spark.sql.DataFrame = [CustomerId: double, window: struct<start: timestamp, end: timestamp> ... 1 more field]


### Note

Before kicking off the stream, we set a small optimization that lets this run better on a single machine: it limits the number of output partitions after a shuffle.

In [92]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [93]:
purchaseByCustomerPerHour.writeStream
   .format("memory") // memory = store the results in an in-memory table
   .queryName("customer_purchases") // customer_purchases = name of the in-memory table
   .outputMode("complete") // complete = rewrite the whole result table on every trigger
   .start()

res71: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@64e23ca4


In [None]:
// Alternatively, you can write the results to the console.
// This might take a while:
// purchaseByCustomerPerHour.writeStream
//   .format("console")
//   .queryName("customer_purchases_2")
//   .outputMode("complete")
//   .start()

Once we start the stream, we can run queries against the stream to debug what our result will look like if we were to write this out to a production sink.

In [95]:
spark.sql("""
   SELECT *
   FROM customer_purchases
   ORDER BY `sum(total_cost)` DESC
   """)
   .show(5, false)

+----------+------------------------------------------+------------------+
|CustomerId|window                                    |sum(total_cost)   |
+----------+------------------------------------------+------------------+
|null      |[2011-03-29 00:00:00, 2011-03-30 00:00:00]|33521.39999999998 |
|null      |[2011-12-08 00:00:00, 2011-12-09 00:00:00]|31975.590000000007|
|18102.0   |[2010-12-07 00:00:00, 2010-12-08 00:00:00]|25920.37          |
|null      |[2010-12-10 00:00:00, 2010-12-11 00:00:00]|25399.560000000012|
|null      |[2010-12-17 00:00:00, 2010-12-18 00:00:00]|25371.769999999768|
+----------+------------------------------------------+------------------+
only showing top 5 rows



## Machine learning

In [4]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



### Transforming data into numerical values

Machine learning algorithms in MLlib require data to be represented as numerical values. Our current data is represented by a variety of different types, including timestamps, integers, and strings. Therefore we need to transform this data into some numerical representation. In this instance, we will use several DataFrame transformations to manipulate our date data.
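The `date_format($"InvoiceDate", "EEEE")` call in the next cell turns a timestamp into the full day name. The same pattern letter works with plain `java.time`, shown here as a standalone check; the English locale is an assumption of this sketch.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.Locale

object DayOfWeekName {
  // "EEEE" = full day-of-week name, e.g. "Monday".
  private val fmt = DateTimeFormatter.ofPattern("EEEE", Locale.ENGLISH)

  def dayName(ts: LocalDateTime): String = ts.format(fmt)
}
```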

In [6]:
import org.apache.spark.sql.functions.date_format

val preppedDataFrame = staticDataFrame
   .na.fill(0)  // replace nulls in the numeric columns with 0
   .withColumn("day_of_week", date_format($"InvoiceDate", "EEEE")) // add the day name (e.g. "Monday") derived from InvoiceDate
   .coalesce(5) // reduce to at most 5 partitions without a full shuffle

import org.apache.spark.sql.functions.date_format
preppedDataFrame: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [InvoiceNo: string, StockCode: string ... 7 more fields]


In [7]:
preppedDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = false)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = false)
 |-- CustomerID: double (nullable = false)
 |-- Country: string (nullable = true)
 |-- day_of_week: string (nullable = true)



In [11]:
preppedDataFrame.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|     Monday|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|     Monday|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|     Monday|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|     Monday|
+---------+---------+-------------------

### Dataset split: training/test sets

In [10]:
val trainDataFrame = preppedDataFrame
   .where("InvoiceDate < '2011-07-01'")
val testDataFrame = preppedDataFrame
   .where("InvoiceDate >= '2011-07-01'")

println(trainDataFrame.count())
println(testDataFrame.count())

245903
296006

trainDataFrame: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [InvoiceNo: string, StockCode: string ... 7 more fields]
testDataFrame: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [InvoiceNo: string, StockCode: string ... 7 more fields]


In [13]:
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer()
   .setInputCol("day_of_week")
   .setOutputCol("day_of_week_index")

import org.apache.spark.ml.feature.StringIndexer
indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_83e88603c896


This will turn our days of the week into corresponding numerical values. For example, Spark may represent Saturday as 6 and Monday as 1. However, with this numbering scheme we are implicitly stating that Saturday is greater than Monday (by pure numerical values), which is obviously incorrect. Therefore we use a OneHotEncoder to encode each of these values as its own column. Each of these boolean flags states whether the row's day of the week is that particular day.
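A plain-Scala sketch of one-hot encoding a category index. It assumes the `dropLast` behaviour that Spark's `OneHotEncoder` uses by default: the last category is encoded as all zeros, so the vector has one slot fewer than there are categories. That is consistent with the `(5,[2],[1.0])` vectors in the output further down if the training data contains six distinct days.

```scala
object OneHotSketch {
  // Encode `index` as a vector with a single 1.0; with dropLast the vector has
  // numCategories - 1 slots and the last category maps to all zeros.
  def oneHot(index: Int, numCategories: Int, dropLast: Boolean = true): Vector[Double] = {
    require(index >= 0 && index < numCategories, "index out of range")
    val size = if (dropLast) numCategories - 1 else numCategories
    Vector.tabulate(size)(i => if (i == index) 1.0 else 0.0)
  }
}
```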

In [14]:
import org.apache.spark.ml.feature.OneHotEncoder

val encoder = new OneHotEncoder()
   .setInputCol("day_of_week_index")
   .setOutputCol("day_of_week_encoded")

import org.apache.spark.ml.feature.OneHotEncoder
encoder: org.apache.spark.ml.feature.OneHotEncoder = oneHot_14284f0e3c66


All machine learning algorithms in Spark take their input features as a single column of type Vector, which must contain numerical values. VectorAssembler combines several columns into such a vector column.
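A plain-Scala sketch of what `VectorAssembler` does with the three input columns: it concatenates them, in the given order, into one feature vector. With a unit price of 2.95, a quantity of 6, and a one-hot vector of length 5 with index 2 set, the non-zero positions are 0, 1, and 4, matching the `(7,[0,1,4],[2.95,...])` sparse vector in the output further down.

```scala
object AssembleSketch {
  // Same column order as setInputCols(Array("UnitPrice", "Quantity", "day_of_week_encoded")).
  def assemble(unitPrice: Double, quantity: Int, dayEncoded: Vector[Double]): Vector[Double] =
    Vector(unitPrice, quantity.toDouble) ++ dayEncoded

  // Positions of the non-zero entries, i.e. what a sparse vector stores.
  def nonZeroIndices(v: Vector[Double]): Vector[Int] =
    v.indices.filter(i => v(i) != 0.0).toVector
}
```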

In [15]:
import org.apache.spark.ml.feature.VectorAssembler

val vectorAssembler = new VectorAssembler()
  .setInputCols(Array("UnitPrice", "Quantity", "day_of_week_encoded"))
  .setOutputCol("features")

import org.apache.spark.ml.feature.VectorAssembler
vectorAssembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_ac85d99b3cf8


In [16]:
import org.apache.spark.ml.Pipeline

val transformationPipeline = new Pipeline()
   .setStages(Array(indexer, encoder, vectorAssembler))

import org.apache.spark.ml.Pipeline
transformationPipeline: org.apache.spark.ml.Pipeline = pipeline_0024aeb91662


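`StringIndexer` is an estimator: it has to see the data first to learn which distinct values exist and which index to give each one. By default it orders the values by descending frequency, so the most frequent day gets index 0.0, the next one 1.0, and so on (in the output below, Monday gets 2.0). Fitting the pipeline on the training data performs this step.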
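A plain-Scala sketch of the frequency-based indexing that `StringIndexer` learns during `fit`. Breaking ties alphabetically is an assumption of this sketch.

```scala
object StringIndexerSketch {
  // Learn label -> index: the most frequent label gets 0.0, the next 1.0, and so on.
  def fit(values: Seq[String]): Map[String, Double] =
    values.groupBy(identity).toSeq
      .map { case (label, occurrences) => (label, occurrences.size) }
      .sortBy { case (label, count) => (-count, label) } // descending count, then alphabetical
      .zipWithIndex
      .map { case ((label, _), idx) => label -> idx.toDouble }
      .toMap
}
```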

In [17]:
val fittedPipeline = transformationPipeline.fit(trainDataFrame)

fittedPipeline: org.apache.spark.ml.PipelineModel = pipeline_0024aeb91662


Once the pipeline is fitted on the training data, we can use it to transform all of our data in a consistent and repeatable way.

In [20]:
val transformedTraining = fittedPipeline.transform(trainDataFrame)

transformedTraining: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 10 more fields]


In [21]:
transformedTraining.cache()

res10: transformedTraining.type = [InvoiceNo: string, StockCode: string ... 10 more fields]


In [27]:
transformedTraining.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+-----------------+-------------------+--------------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|day_of_week_index|day_of_week_encoded|            features|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----------+-----------------+-------------------+--------------------+
|   537226|    22811|SET OF 6 T-LIGHTS...|       6|2010-12-06 08:34:00|     2.95|   15987.0|United Kingdom|     Monday|              2.0|      (5,[2],[1.0])|(7,[0,1,4],[2.95,...|
|   537226|    21713|CITRONELLA CANDLE...|       8|2010-12-06 08:34:00|      2.1|   15987.0|United Kingdom|     Monday|              2.0|      (5,[2],[1.0])|(7,[0,1,4],[2.1,8...|
|   537226|    22927|GREEN GIANT GARDE...|       2|2010-12-06 08:34:00|     5.95|   15987.0|United Kingdo

### ML models

In Spark, training a machine learning model is a two-phase process: first we initialize an untrained model, then we train it. Every algorithm in MLlib’s DataFrame API comes in two types. They follow the naming pattern `Algorithm` for the untrained version and `AlgorithmModel` for the trained version. In our case, these are `KMeans` and `KMeansModel`.
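The two-phase pattern can be sketched without Spark. `MeanCenterer` and `MeanCentererModel` are hypothetical classes used only to show the shape of the API: the untrained object holds configuration, `fit` learns parameters from data and returns a separate trained model, and only the model can `transform` data.

```scala
object EstimatorModelSketch {
  // Hypothetical untrained "Algorithm": no learned state yet.
  final case class MeanCenterer() {
    // fit() learns a parameter (the mean) and returns a new, trained model object.
    def fit(data: Seq[Double]): MeanCentererModel = {
      require(data.nonEmpty, "cannot fit on empty data")
      MeanCentererModel(data.sum / data.size)
    }
  }

  // Hypothetical trained "AlgorithmModel": carries the learned mean and applies it.
  final case class MeanCentererModel(mean: Double) {
    def transform(data: Seq[Double]): Seq[Double] = data.map(_ - mean)
  }
}
```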

In [22]:
import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans()
   .setK(20)
   .setSeed(1L)

import org.apache.spark.ml.clustering.KMeans
kmeans: org.apache.spark.ml.clustering.KMeans = kmeans_ab0d7ade39e1


In [23]:
val kmModel = kmeans.fit(transformedTraining)

2019-06-30 09:14:15 WARN  KMeans:66 - The input data is not directly cached, which may hurt performance if its parent RDDs are also uncached.
2019-06-30 09:14:18 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2019-06-30 09:14:18 WARN  BLAS:61 - Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
2019-06-30 09:14:18 WARN  KMeans:66 - The input data was not directly cached, which may hurt performance if its parent RDDs are also uncached.


kmModel: org.apache.spark.ml.clustering.KMeansModel = kmeans_ab0d7ade39e1


In [24]:
kmModel.computeCost(transformedTraining)

res11: Double = 1.0350348110517502E8


In [25]:
val transformedTest = fittedPipeline.transform(testDataFrame)

transformedTest: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 10 more fields]


In [26]:
kmModel.computeCost(transformedTest)

res12: Double = 5.486895310782777E8
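`computeCost` reports the sum of squared distances from each point to its nearest cluster center. A plain-Scala sketch of that quantity, with points and centers as plain vectors:

```scala
object KMeansCost {
  private def squaredDistance(a: Vector[Double], b: Vector[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Sum over all points of the squared distance to the closest center.
  def cost(points: Seq[Vector[Double]], centers: Seq[Vector[Double]]): Double = {
    require(centers.nonEmpty, "need at least one center")
    points.map(p => centers.map(c => squaredDistance(p, c)).min).sum
  }
}
```

Because it is a sum, the cost grows with the number of rows. The test set here is larger than the training set, so the two values are easier to compare after dividing each by its row count.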
