# Spark Clustering Pipeline Implementation

We now recreate the clustering pipeline using Spark. We will do so in two ways:
* Using SparkML
* By importing the model we trained in Python

# Dependencies and Initialization

The next cell declares the dependencies as Ivy imports (similar to Maven/Gradle coordinates).
Note that we use the <a href="https://github.com/jpmml/jpmml-evaluator-spark">jpmml-evaluator-spark</a> package to load the model trained in Python. SparkML bundles an older version of pmml-model, which prevents us from using newer PMML versions. The clean way to address this is to shade the library as described <a href="https://github.com/jpmml/jpmml-sparkml#library">here</a>. In our case we simply import the newer version of pmml-model before importing SparkML, which solves the issue.
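
For reference, the shading route mentioned above could look roughly like the fragment below in an sbt build. This is only a sketch under assumptions: it presumes the sbt-assembly plugin is enabled, and the relocation targets (`org.shaded.*`) are illustrative names. The linked instructions are the authoritative description; this notebook does not use shading.

```scala
// build.sbt fragment (assumption: the sbt-assembly plugin is enabled for this project)
// Relocate the PMML classes so that they cannot clash with the pmml-model copy bundled with SparkML.
// Older sbt versions spell the key as `assemblyShadeRules in assembly`.
assembly / assemblyShadeRules := Seq(
    ShadeRule.rename("org.dmg.pmml.**" -> "org.shaded.dmg.pmml.@1").inAll,
    ShadeRule.rename("org.jpmml.**" -> "org.shaded.jpmml.@1").inAll
)
```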

In [1]:
// This cell will generate a large output on the first run (it needs to download the dependencies)
import $exclude.`org.slf4j:slf4j-log4j12`, $ivy.`org.slf4j:slf4j-nop:1.7.21` // for cleaner logs
import $profile.`hadoop-2.6`
import $ivy.`org.apache.spark::spark-sql:2.1.0` // adjust spark version - spark >= 2.0
import $ivy.`org.jpmml:pmml-model:1.3.9`
import $ivy.`org.apache.spark::spark-mllib:2.1.0`
import $ivy.`org.jpmml:jpmml-evaluator-spark:1.1.0`
import $ivy.`org.jupyter-scala::spark:0.4.2` // for JupyterSparkSession (SparkSession aware of the jupyter-scala kernel)

In [2]:
import org.apache.spark._
import org.apache.spark.sql._

We are now ready to initialize the SparkSession. This object is the entry point for loading data into Spark and for running transformations and actions on it.

In [3]:
val sparkSession = SparkSession.builder()
    .master("local")
    .config("spark.executor.memory", "4g")
    .appName("Spark clustering demo app")
    .getOrCreate()

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

sparkSession: SparkSession = org.apache.spark.sql.SparkSession@3fa6f2f0

In [4]:
import sparkSession.implicits._

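To make the distinction between transformations and actions concrete, here is a minimal sketch on a toy DataFrame. It assumes a local Spark 2.x session; the names `spark`, `numbers`, `evens` and `evenCount` are made up for this example. A transformation such as `filter` only describes the computation, and nothing runs until an action such as `count` is called.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local session for this sketch; inside the notebook, sparkSession can be reused instead
val spark = SparkSession.builder()
    .master("local[*]")
    .appName("transformations vs actions sketch")
    .getOrCreate()
import spark.implicits._

// Toy data, not part of the tutorial dataset
val numbers = Seq(1, 2, 3, 4, 5, 6).toDF("n")

// Transformation: lazily describes the computation, no job is started here
val evens = numbers.filter($"n" % 2 === 0)

// Action: triggers the job and returns the result to the driver
val evenCount = evens.count()
```
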
# Reading the Data

In [5]:
val dataset = sparkSession.read.format("csv")
    .option("header", "true").option("delimiter", "\t")
    .load("user_dataset_sample.tsv")

val organizations = sparkSession.read.format("csv")
    .option("header", "true")
    .load("organizations.csv")

dataset: DataFrame = [user_ip: string, count_hour0: string ... 7 more fields]
organizations: DataFrame = [user_ip: string]

In [6]:
dataset.printSchema

root
 |-- user_ip: string (nullable = true)
 |-- count_hour0: string (nullable = true)
 |-- count_hour1: string (nullable = true)
 |-- count_hour2: string (nullable = true)
 |-- count_hour3: string (nullable = true)
 |-- count_hour4: string (nullable = true)
 |-- count_hour5: string (nullable = true)
 |-- domain_count: string (nullable = true)
 |-- nxdomain_count: string (nullable = true)



In [7]:
import org.apache.spark.sql.types.DoubleType

// Replace the literal "NULL" strings with "0", cast the domain counters to double,
// and add the total number of queries over the six hourly windows
val datasetWithQueryCount = dataset
    .na.replace(dataset.columns, Map("NULL" -> "0"))
    .withColumn("domain_count", $"domain_count".cast(DoubleType))
    .withColumn("nxdomain_count", $"nxdomain_count".cast(DoubleType))
//     .withColumn("query_count", windowColumns.map(x => dataset(x)).reduce(_+_))
    .withColumn("query_count", 'count_hour0 + 'count_hour1 + 'count_hour2 + 'count_hour3 + 'count_hour4 + 'count_hour5)

// Add the share of queries in each hourly window, then keep users with 20 to 1,000,000 queries
val datasetWithNorms = (0 to 5).foldLeft(datasetWithQueryCount)(
    (df, window) =>
        df.withColumn(s"count_hour${window}_norm", df(s"count_hour$window")/df("query_count"))
    ).filter($"query_count".between(20, 1000000))

datasetWithQueryCount: DataFrame = [user_ip: string, count_hour0: string ... 8 more fields]
datasetWithNorms: Dataset[Row] = [user_ip: string, count_hour0: string ... 14 more fields]

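The commented-out line in the cell above hints at a more general way to build `query_count`. Below is a minimal sketch of that idea, assuming the hourly columns follow the `count_hourN` naming shown in the schema. The names `windowColumns`, `toy` and `withTotal` are illustrative, and the rows are made up.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical local session for this sketch
val spark = SparkSession.builder().master("local[*]").appName("query_count sketch").getOrCreate()
import spark.implicits._

// Names of the hourly count columns: count_hour0 .. count_hour5
val windowColumns = (0 to 5).map(i => s"count_hour$i")
val columnNames = "user_ip" +: windowColumns

// Made-up rows with string-typed counts, mirroring the schema of the CSV file
val toy = Seq(
    ("0.0.0.1", "1", "2", "3", "4", "5", "6"),
    ("0.0.0.2", "0", "0", "10", "0", "0", "10")
).toDF(columnNames: _*)

// Sum the columns with reduce; Spark casts the strings to double for the arithmetic
val withTotal = toy.withColumn("query_count", windowColumns.map(col).reduce(_ + _))

val totals = withTotal.select("query_count").as[Double].collect().toSeq
```
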
# SparkML Pipeline

## Generating a Dataframe for SparkML objects
While sklearn objects act on matrices, SparkML objects act on a `Vector` field in a dataframe which contains the features, and on a label field (for supervised learning models). Transforming our dataframe to the desired format is a bit tedious; the next cell does it with a `VectorAssembler`.

In [8]:
import org.apache.spark.ml.feature.VectorAssembler

val selectedFeatures = Array("query_count", "domain_count") ++
    (0 to 5).map(x => s"count_hour${x}_norm")

val assembler = new VectorAssembler()
    .setInputCols(selectedFeatures)
    .setOutputCol("features")

val featuresDataset = assembler.transform(datasetWithNorms).select('user_ip, 'features)

featuresDataset.show(false)

+-----------+----------------------------------------------------------------------------------------------------------------------------------------+
|user_ip    |features                                                                                                                                |
+-----------+----------------------------------------------------------------------------------------------------------------------------------------+
|0.0.102.79 |[9303.0,2456.0,0.010856712888315597,0.09061593034504999,0.19219606578523057,0.19703321509190583,0.27152531441470495,0.23777276147479307]|
|0.0.129.25 |[2930.0,1188.0,0.07508532423208192,0.2955631399317406,0.089419795221843,0.12116040955631399,0.16655290102389078,0.2522184300341297]     |
|0.0.144.177|[662.0,36.0,0.11027190332326284,0.2190332326283988,0.20996978851963746,0.16314199395770393,0.1148036253776435,0.18277945619335348]      |
|0.0.167.184|[554.0,85.0,0.018050541516245487,0.02707581227436823,0.20577617328519857,0.054151

selectedFeatures: Array[String] = Array(
  "query_count",
  "domain_count",
  "count_hour0_norm",
  "count_hour1_norm",
  "count_hour2_norm",
  "count_hour3_norm",
  "count_hour4_norm",
  "count_hour5_norm"
)
assembler: ml.feature.VectorAssembler = vecAssembler_1d9548d9181b
featuresDataset: DataFrame = [user_ip: string, features: vector]

## Clustering Pipeline

In [9]:
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.clustering.KMeans

In [10]:
val scaler = new StandardScaler()
    .setInputCol(assembler.getOutputCol)
    .setOutputCol("scaledFeatures")

val clusterer = new KMeans()
    .setK(2).setSeed(42L)
    .setFeaturesCol(scaler.getOutputCol)

val scalerModel = scaler.fit(featuresDataset)
val scaledFeaturesDataset = scalerModel.transform(featuresDataset)

// Fitting the model takes about a minute
val clusteringModel = clusterer.fit(scaledFeaturesDataset)

scaler: StandardScaler = stdScal_3626ad1ed1ca
clusterer: KMeans = kmeans_66b9ddc9a2d9
scalerModel: ml.feature.StandardScalerModel = stdScal_3626ad1ed1ca
scaledFeaturesDataset: DataFrame = [user_ip: string, features: vector ... 1 more field]
clusteringModel: ml.clustering.KMeansModel = kmeans_66b9ddc9a2d9

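To make explicit what the scaler in the cell above does to each feature: with its default settings, Spark's `StandardScaler` divides every feature by its corrected sample standard deviation and does not subtract the mean (`withStd = true`, `withMean = false`). Skipping the centering shifts all points by the same offset, so it does not change the distances that k-means relies on. The plain-Scala sketch below mimics that default on a made-up column of values; `sampleStd` and `scaleToUnitStd` are hypothetical helper names, not Spark API.

```scala
// Corrected sample standard deviation (divides by n - 1)
def sampleStd(xs: Seq[Double]): Double = {
    val mean = xs.sum / xs.size
    math.sqrt(xs.map(x => (x - mean) * (x - mean)).sum / (xs.size - 1))
}

// Scale to unit standard deviation without centering; a constant feature maps to 0.0
def scaleToUnitStd(xs: Seq[Double]): Seq[Double] = {
    val std = sampleStd(xs)
    if (std == 0.0) xs.map(_ => 0.0) else xs.map(_ / std)
}

// Made-up values standing in for one feature column
val queryCounts = Seq(2.0, 4.0, 6.0)
val scaled = scaleToUnitStd(queryCounts)
```
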
In [11]:
val datasetWithClusters = clusteringModel.transform(scaledFeaturesDataset)
datasetWithClusters.printSchema

root
 |-- user_ip: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)
 |-- prediction: integer (nullable = true)

datasetWithClusters: DataFrame = [user_ip: string, features: vector ... 2 more fields]

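We can write the results to disk. Note that the save fails if the output path already exists; you can use `SaveMode.Overwrite` to force an overwrite. Spark writes the output as a directory of part files, so `clusters_sparkml.csv` will be a directory rather than a single file.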

In [12]:
datasetWithClusters.select('user_ip, 'prediction).write.format("csv").save("clusters_sparkml.csv")
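
A minimal sketch of the overwrite variant, assuming a local session and a made-up two-row DataFrame in place of `datasetWithClusters`. The names `toyClusters` and `outputPath` are hypothetical, and the output goes to a temporary directory so the sketch does not touch the tutorial files.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical local session for this sketch
val spark = SparkSession.builder().master("local[*]").appName("overwrite sketch").getOrCreate()
import spark.implicits._

// Made-up assignments standing in for datasetWithClusters
val toyClusters = Seq(("0.0.0.1", 0), ("0.0.0.2", 1)).toDF("user_ip", "prediction")

// Hypothetical output location inside a temporary directory
val outputPath = Files.createTempDirectory("clusters_demo").resolve("clusters.csv").toString

// The first write creates the output directory; the second replaces it instead of failing
toyClusters.write.format("csv").mode(SaveMode.Overwrite).save(outputPath)
toyClusters.write.format("csv").mode(SaveMode.Overwrite).save(outputPath)

// Read the result back to check what was written
val rowsWritten = spark.read.format("csv").load(outputPath).count()
```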

Let's look at the sizes of the clusters. We expect them to be comparable (not necessarily identical) to the ones we got in the Python implementation.

In [13]:
datasetWithClusters.groupBy('prediction).count.show(false)

+----------+------+
|prediction|count |
+----------+------+
|1         |67553 |
|0         |104816|
+----------+------+



# Pipeline Imported from Scikit-Learn (PMML)
We now read the PMML file exported by the Python part. The file contains a pipeline with the scaler and the clustering model. We expect that running the model on the same data yields the same partition into clusters; this can be checked by comparing the output files.

In [14]:
import org.jpmml.evaluator.spark.{EvaluatorUtil, TransformerBuilder}
import java.io.File

In [15]:
val pmmlFile = new File("clustering_pipeline.pmml")
val evaluator = EvaluatorUtil.createEvaluator(pmmlFile)

pmmlFile: File = clustering_pipeline.pmml
evaluator: org.jpmml.evaluator.Evaluator = org.jpmml.evaluator.clustering.ClusteringModelEvaluator@2f082cf0

In [16]:
val pmmlTransformerBuilder = new TransformerBuilder(evaluator)
//     .withTargetCols()
    .withOutputCols()
    .withLabelCol("prediction")
    .exploded(false)

val pmmlTransformer = pmmlTransformerBuilder.build()
val datasetWithPmmlClusters = pmmlTransformer.transform(datasetWithNorms)

pmmlTransformerBuilder: TransformerBuilder = org.jpmml.evaluator.spark.TransformerBuilder@a6c0bed
pmmlTransformer: ml.Transformer = pmml-transformer
datasetWithPmmlClusters: DataFrame = [user_ip: string, count_hour0: string ... 15 more fields]

In [17]:
datasetWithPmmlClusters.printSchema

root
 |-- user_ip: string (nullable = true)
 |-- count_hour0: string (nullable = true)
 |-- count_hour1: string (nullable = true)
 |-- count_hour2: string (nullable = true)
 |-- count_hour3: string (nullable = true)
 |-- count_hour4: string (nullable = true)
 |-- count_hour5: string (nullable = true)
 |-- domain_count: double (nullable = true)
 |-- nxdomain_count: double (nullable = true)
 |-- query_count: double (nullable = true)
 |-- count_hour0_norm: double (nullable = true)
 |-- count_hour1_norm: double (nullable = true)
 |-- count_hour2_norm: double (nullable = true)
 |-- count_hour3_norm: double (nullable = true)
 |-- count_hour4_norm: double (nullable = true)
 |-- count_hour5_norm: double (nullable = true)
 |-- pmml: struct (nullable = true)
 |    |-- Cluster: string (nullable = false)
 |    |-- affinity(0): double (nullable = false)
 |    |-- affinity(1): double (nullable = false)
 |    |-- prediction: string (nullable = false)



In [18]:
val clusters = datasetWithPmmlClusters.select("user_ip", "pmml.Cluster")

clusters: DataFrame = [user_ip: string, Cluster: string]

In [19]:
clusters.write.format("csv").save("clusters_pmml.csv")

You can now compare this file with the one written by the Python implementation.
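
When the SparkML output is included in the comparison, keep in mind that cluster ids from independently trained models are arbitrary: cluster 0 in one run may correspond to cluster 1 in another. The plain-Scala sketch below shows one way to compare two 2-cluster assignments while tolerating such relabeling. It assumes both outputs have already been loaded into maps from `user_ip` to a 0/1 label; `agreement` and the toy maps are hypothetical.

```scala
// Fraction of shared keys on which two 2-cluster assignments agree,
// taking the better of the two possible label matchings
def agreement(a: Map[String, Int], b: Map[String, Int]): Double = {
    val shared = a.keySet intersect b.keySet
    if (shared.isEmpty) 0.0
    else {
        val same = shared.count(ip => a(ip) == b(ip))
        math.max(same, shared.size - same).toDouble / shared.size
    }
}

// Made-up assignments; the labels are mostly swapped between the two runs
val sparkLabels  = Map("0.0.0.1" -> 0, "0.0.0.2" -> 0, "0.0.0.3" -> 1, "0.0.0.4" -> 1)
val pythonLabels = Map("0.0.0.1" -> 1, "0.0.0.2" -> 1, "0.0.0.3" -> 0, "0.0.0.4" -> 1)

val score = agreement(sparkLabels, pythonLabels)
```

A score of 1.0 means the two partitions are identical up to renaming the clusters.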

# Closing the session

In [20]:
sparkSession.stop