### Import necessary Spark functions.

In [1]:
import org.apache.spark.sql.functions._ 
import org.apache.spark.ml.feature._
import org.apache.spark.ml.classification.{NaiveBayes,NaiveBayesModel,RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.ml._

### Read sample of training data set.

One column in the data set causes Spark's CSV reader to parse the file incorrectly.  The `BodyMarkdown` field includes values that span multiple lines, and by default Spark treats the line breaks within those values as row boundaries.  We must specify the additional options `quote`, `escape`, and `multiLine` to read the data set properly.  See this [blog post](https://kokes.github.io/blog/2018/05/19/spark-sane-csv-processing.html) for more details.

In [2]:
import org.apache.spark.sql.types._

val customSchema = StructType(Array(
    StructField("PostId", DoubleType, true),
    StructField("PostCreationDate", StringType, true),
    StructField("OwnerUserId", DoubleType, true),
    StructField("OwnerCreationDate", StringType, true),
    StructField("ReputationAtPostCreation", DoubleType, true),
    StructField("OwnerUndeletedAnswerCountAtPostTime", DoubleType, true),
    StructField("Title", StringType, true),
    StructField("BodyMarkdown", StringType, true),
    StructField("Tag1", StringType, true),
    StructField("Tag2", StringType, true),
    StructField("Tag3", StringType, true),
    StructField("Tag4", StringType, true),
    StructField("Tag5", StringType, true),
    StructField("PostClosedDate", StringType, true),
    StructField("OpenStatus", StringType, true)
))

var df = spark.
    read.
    option("quote", "\"").
    option("escape", "\"").
    option("multiLine", "true").
    option("header", "true").
    schema(customSchema).
    csv("train-sample.csv")

customSchema = StructType(StructField(PostId,DoubleType,true), StructField(PostCreationDate,StringType,true), StructField(OwnerUserId,DoubleType,true), StructField(OwnerCreationDate,StringType,true), StructField(ReputationAtPostCreation,DoubleType,true), StructField(OwnerUndeletedAnswerCountAtPostTime,DoubleType,true), StructField(Title,StringType,true), StructField(BodyMarkdown,StringType,true), StructField(Tag1,StringType,true), StructField(Tag2,StringType,true), StructField(Tag3,StringType,true), StructField(Tag4,StringType,true), StructField(Tag5,StringType,true), StructField(PostClosedDate,StringType,true), StructField(OpenStatus,StringType,true))
df = [PostId: double, PostCre...


[PostId: double, PostCre...
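
To see why the embedded line breaks matter, here is a minimal sketch in plain Scala (no Spark) using a made-up two-record CSV string.  The `splitRecords` helper is hypothetical and far simpler than Spark's parser; it only illustrates that a line break inside a quoted value should not end a record.

```scala
// Hypothetical CSV with a header and two records; the first record's body contains a line break.
val raw = "PostId,BodyMarkdown\n1,\"first line\nsecond line\"\n2,\"single line\"\n"

// Naive approach: every line break starts a new row.
val naiveRows = raw.split("\n").drop(1).length

// Quote-aware approach: a line break only ends a record when we are outside quotes.
def splitRecords(text: String): Vector[String] = {
  val records = Vector.newBuilder[String]
  val current = new StringBuilder
  var inQuotes = false
  text.foreach { ch =>
    // A doubled quote ("") toggles twice, so the quoted state is unchanged.
    if (ch == '"') inQuotes = !inQuotes
    if (ch == '\n' && !inQuotes) {
      records += current.toString
      current.clear()
    } else current.append(ch)
  }
  if (current.nonEmpty) records += current.toString
  records.result()
}

val quotedRows = splitRecords(raw).drop(1).length
println(s"naive: $naiveRows rows, quote-aware: $quotedRows rows")
```

The naive split reports one row too many, which is the same kind of miscount a line-based reader produces on `BodyMarkdown`.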

We have roughly 140,000 observations across 15 columns.  Identifiers such as `PostId` and `OwnerUserId` will be discarded.  `OpenStatus` and `PostClosedDate` will also be excluded from the features, since the former is our response variable and the latter implies the response value.  That leaves 11 columns.

In [7]:
df.count()

140272

In [8]:
df.show(5)

+-----------+-------------------+-----------+-------------------+------------------------+-----------------------------------+--------------------+--------------------+----------+---------------+-----+----+----+-------------------+--------------+
|     PostId|   PostCreationDate|OwnerUserId|  OwnerCreationDate|ReputationAtPostCreation|OwnerUndeletedAnswerCountAtPostTime|               Title|        BodyMarkdown|      Tag1|           Tag2| Tag3|Tag4|Tag5|     PostClosedDate|    OpenStatus|
+-----------+-------------------+-----------+-------------------+------------------------+-----------------------------------+--------------------+--------------------+----------+---------------+-----+----+----+-------------------+--------------+
|
|
|
|
|
+-----------+-------------------+-----------+-------------------+------------------------+-----------------------------------+--------------------+--------------------+----------+---------------+-----+----+----+-------------------+--------------+

The `OpenStatus` column appears to contain trailing whitespace: the rows of the printed table above are missing their closing pipe character `|`.  Using `trim` didn't work, so we take a regex approach instead:

In [9]:
df = df.withColumn("OpenStatus", regexp_extract(col("OpenStatus"), "([\\w\\s]+\\w)", 1))

df = [PostId: double, PostCreationDate: string ... 13 more fields]


[PostId: double, PostCreationDate: string ... 13 more fields]
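
The behaviour of that pattern can be checked outside Spark.  The sketch below applies the same regular expression with plain Scala; the raw values are hypothetical, and the trailing carriage return is only an assumption about what the stray whitespace might be (the notebook does not say).  `cleanStatus` is an illustrative helper that mirrors `regexp_extract` by returning an empty string when nothing matches.

```scala
// Same pattern as the notebook cell: a run of word/whitespace characters that ends in a word character.
val statusPattern = "([\\w\\s]+\\w)".r

// Return capture group 1, or an empty string when the pattern does not match.
def cleanStatus(raw: String): String =
  statusPattern.findFirstMatchIn(raw).map(_.group(1)).getOrElse("")

// Hypothetical raw values with an assumed trailing carriage return.
val cleaned = Seq("open\r", "not a real question\r", "too localized \r").map(cleanStatus)
println(cleaned)
```

Because the pattern needs at least two characters, a single-character status would come back empty; that is harmless for the five labels in this data set but worth remembering if the pattern is reused.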

Calculating the percentage of null values for each column, we see that `Tag1` is rarely missing but that the other `TagX` fields increase in sparsity as `X` increases.  `Tag4` and `Tag5` will likely need to be dropped.

In [79]:
val totalRows = df.count()  // count once instead of once per column

val nullCountDf = df.
    select(df.columns.map(c => (sum(when(col(c).isNull || col(c) === "" || col(c).isNaN, 1).otherwise(0)) / totalRows).alias(c)): _*)

nullCountDf.
    columns.
    zip(nullCountDf.collect()(0).toSeq).
    foreach(tuple => println(tuple))

(PostId,0.0)
(PostCreationDate,0.0)
(OwnerUserId,0.0)
(OwnerCreationDate,0.0)
(ReputationAtPostCreation,0.0)
(OwnerUndeletedAnswerCountAtPostTime,0.0)
(Title,0.0)
(BodyMarkdown,0.0)
(Tag1,7.129006501653929E-6)
(Tag2,0.19408720200752824)
(Tag3,0.45855195619938405)
(Tag4,0.7172208281053952)
(Tag5,0.8879534048135052)
(PostClosedDate,0.5)
(OpenStatus,0.0)


nullCountDf = [PostId: double, PostCreationDate: double ... 13 more fields]


[PostId: double, PostCreationDate: double ... 13 more fields]
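
The fractions are easier to interpret as row counts.  The sketch below (plain Scala) multiplies the non-zero fractions from the output above by the row count of 140,272 reported by `df.count()`; the variable names are illustrative only.

```scala
val totalRows = 140272L

// Non-zero missing-value fractions copied from the output above.
val missingFraction = Seq(
  "Tag1"           -> 7.129006501653929E-6,
  "Tag2"           -> 0.19408720200752824,
  "Tag3"           -> 0.45855195619938405,
  "Tag4"           -> 0.7172208281053952,
  "Tag5"           -> 0.8879534048135052,
  "PostClosedDate" -> 0.5
)

// Convert each fraction back to a whole number of rows.
val missingCounts = missingFraction.map { case (name, frac) => name -> math.round(frac * totalRows) }
missingCounts.foreach { case (name, n) => println(f"$name%-15s $n%7d") }
```

`Tag1` is missing in a single row, while `PostClosedDate` is missing in 70,136 rows, the same number as the `open` posts counted in the `OpenStatus` frequency table further down.  That is consistent with the earlier remark that `PostClosedDate` implies the response.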

Fill the `Tag` fields with "unknown" so they can be processed as categorical features.

In [81]:
df = df.na.fill("unknown", Seq("Tag1", "Tag2", "Tag3", "Tag4", "Tag5"))

df = [PostId: double, PostCreationDate: string ... 13 more fields]


[PostId: double, PostCreationDate: string ... 13 more fields]

### EDA - frequency of `OpenStatus` and `TagX`

In [83]:
val df_simple = df.
    select(col("Tag1"), col("Tag2"), col("Tag3"), col("Tag4"), col("Tag5"), col("OpenStatus"))

df_simple = [Tag1: string, Tag2: string ... 4 more fields]


[Tag1: string, Tag2: string ... 4 more fields]

We see that the label `OpenStatus` has only 5 distinct values while each of the tags has over 5,000 distinct values.

In [84]:
df_simple.
    columns.
    map(c => {
            val count = df_simple.select(c).distinct.count
            f"Distinct $c values: $count"
    }).
    foreach(println)

Distinct Tag1 values: 5212
Distinct Tag2 values: 9295
Distinct Tag3 values: 11083
Distinct Tag4 values: 10030
Distinct Tag5 values: 7607
Distinct OpenStatus values: 5


`OpenStatus` takes the value `open` in exactly 50% of all cases.  `too localized` is the rarest class, appearing in 4.4% of cases.

In [85]:
val status_counts = df_simple.
    groupBy("OpenStatus").
    count().
    sort(col("count").desc).
    withColumn("perc_of_total", lit(100) * col("count") / df_simple.count())

status_counts.show(numRows = 10)

+-------------------+-----+------------------+
|         OpenStatus|count|     perc_of_total|
+-------------------+-----+------------------+
|               open|70136|              50.0|
|not a real question|30789|21.949498117942284|
|          off topic|17530|12.497148397399338|
|   not constructive|15659|11.163311280939888|
|      too localized| 6158| 4.390042203718489|
+-------------------+-----+------------------+



status_counts = [OpenStatus: string, count: bigint ... 1 more field]


[OpenStatus: string, count: bigint ... 1 more field]

To cut down on the number of distinct `TagX` values, we'll try keeping only the most frequent values, those that together account for the top 90% of observations.  Values in the bottom 10% will be replaced with "else".  Below we count the number of distinct tags in the top 90% of observations for each `TagX` column.  `Tag2`, `Tag3`, and `Tag4` still have over 1,800 distinct values each, but this is far fewer than the roughly 9,300 to 11,100 values they originally had.

In [86]:
def get_percentile_count(df: DataFrame, percentile: Double)(col_name: String): Double = {
    
    val cumsum_window = Window.
      orderBy(col("count").desc).
      rowsBetween(Window.unboundedPreceding, Window.currentRow)

    val total_window = Window.
        rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    
    df.
        groupBy(col_name).
        count().
        orderBy(col("count").desc).
        withColumn("fracObs", sum(col("count")).over(cumsum_window) / sum(col("count")).over(total_window)).
        filter(col("fracObs") <= percentile).
        count()
}

Array("Tag1", "Tag2", "Tag3", "Tag4", "Tag5").
    map(get_percentile_count(df_simple, 0.9))

get_percentile_count: (df: org.apache.spark.sql.DataFrame, percentile: Double)(col_name: String)Double


Array(382.0, 1855.0, 2644.0, 1861.0, 126.0)
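
The cumulative-fraction cutoff is easier to follow on a toy example.  The sketch below reproduces the same logic with plain Scala collections; the tag names and counts are made up for illustration and are not taken from the data set.

```scala
// Hypothetical tag frequencies.
val tagCounts = Seq("java" -> 50, "php" -> 25, "c#" -> 10, "perl" -> 9, "cobol" -> 6)

val total  = tagCounts.map(_._2).sum.toDouble
val sorted = tagCounts.sortBy { case (_, n) => -n }

// Running share of observations covered, from the most frequent tag downwards.
val cumulative = sorted.map(_._2).scanLeft(0)(_ + _).tail.map(_ / total)

// Keep a tag only while the running share stays at or below the cutoff.
val kept = sorted.zip(cumulative).collect { case ((tag, _), frac) if frac <= 0.9 => tag }
println(s"kept ${kept.size} of ${sorted.size} tags: $kept")
```

Note that a tag whose running share crosses the cutoff is dropped, so the kept tags can cover somewhat less than 90% of observations (85% in this toy case).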

## Simple model: NaiveBayes with Tags as features

To start with, we'll take a simple approach - encode the Tag columns as categorical variables and predict `OpenStatus` using Naive Bayes. Spark's RF Classifier example will serve as a reference ([link](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/RandomForestClassifierExample.scala)).

In [88]:
val df_simple = df.
    select(col("Tag1"), col("Tag2"), col("Tag3"), col("Tag4"), col("Tag5"), col("OpenStatus"))

df_simple = [Tag1: string, Tag2: string ... 4 more fields]


[Tag1: string, Tag2: string ... 4 more fields]

### Function for Reducing `TagX` Instances

Recall that we'll be replacing the bottom 10% of `TagX` values with "else".  We need to avoid leaking information from the test set into the training set, and we also need to apply the same replacement rules to both sets.  The function `replaceInfrequentVals` below accomplishes this.  It takes the training and test sets, along with a column name and a percentile cutoff.  It then calls `getPercentileLookup` to build a lookup table that maps each value of the column in the training set to its cumulative frequency rank (the "percentile rank").  This lookup table is joined to both the training and test sets; column values with a percentile rank greater than the cutoff, as well as test-set values that never appear in the training set, are set to "else".  The function `replaceMultipleInfrequentVals` applies `replaceInfrequentVals` successively to an array of column names.

In [6]:
/**
 * Get a lookup table of column values and their percentile rank for frequency.
 *
 * @param  df        A DataFrame which includes a column specified by colName.
 * @param  colName   The name of the column for which to calculate percentile ranks.
 */
def getPercentileLookup(df: DataFrame, colName: String): DataFrame = {
    
    val cumsumWindow = Window.
      orderBy(col("count").desc).
      rowsBetween(Window.unboundedPreceding, Window.currentRow)

    val totalWindow = Window.
        rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    
    df.
        groupBy(colName).
        count().
        orderBy(col("count").desc).
        withColumn("percentileRank", sum(col("count")).over(cumsumWindow) / sum(col("count")).over(totalWindow)).
        drop("count")
    
}

/**
 * Replace column values that appear infrequently in the training set with "else"
 * in both the training and test sets. 
 *
 * @param  train             DataFrame for which to calculate the frequency percentile rank.
 * @param  test              Additional DataFrame for which to replace infrequent values.
 * @param  colName           Column for which to replace infrequent values.
 * @param  percentileCutoff  Values with percentile rank higher than this number are replaced by "else".
 * @return A modified train and test DataFrame.
 */
def replaceInfrequentVals(
    train: DataFrame, 
    test: DataFrame, 
    colName: String,
    percentileCutoff: Double
): List[DataFrame] = {
    
    val percentileLookup = getPercentileLookup(train, colName)
    
    val trainReplaced = train.
        join(percentileLookup, Seq(colName), "left_outer").
        na.fill(1, Seq("percentileRank")).
        withColumn(colName, when(col("percentileRank") <= percentileCutoff, col(colName)).otherwise("else")).
        drop("percentileRank")
    
    val testReplaced = test.
        join(percentileLookup, Seq(colName), "left_outer").
        na.fill(1, Seq("percentileRank")).
        withColumn(colName, when(col("percentileRank") <= percentileCutoff, col(colName)).otherwise("else")).
        drop("percentileRank")
    
    List(trainReplaced, testReplaced)
    
}

/**
 * Apply `replaceInfrequentVals` to multiple columns.
 */
def replaceMultipleInfrequentVals(
    train: DataFrame,
    test: DataFrame,
    colNames: Array[String],
    percentileCutoff: Double
): List[DataFrame] = {
    
    var trainOut = train
    var testOut  = test
    
    colNames.foreach(x => {
        val dfs = replaceInfrequentVals(trainOut, testOut, x, percentileCutoff)
        trainOut = dfs(0)
        testOut  = dfs(1)
    })
    
    List(trainOut, testOut)
    
}

getPercentileLookup: (df: org.apache.spark.sql.DataFrame, colName: String)org.apache.spark.sql.DataFrame
replaceInfrequentVals: (train: org.apache.spark.sql.DataFrame, test: org.apache.spark.sql.DataFrame, colName: String, percentileCutoff: Double)List[org.apache.spark.sql.DataFrame]
replaceMultipleInfrequentVals: (train: org.apache.spark.sql.DataFrame, test: org.apache.spark.sql.DataFrame, colNames: Array[String], percentileCutoff: Double)List[org.apache.spark.sql.DataFrame]
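
The leak-free replacement rule can be illustrated without Spark.  In the sketch below the frequency lookup is built from the training values only and then applied to both sets; `keptValues` and `replaceInfrequent` are hypothetical helpers written for this example, and the data is made up.

```scala
// Hypothetical training and test values for a single Tag column.
val trainTags = Seq("java", "java", "java", "php", "php", "perl")
val testTags  = Seq("java", "perl", "haskell")

// Values whose cumulative share of the given observations stays within the cutoff.
def keptValues(values: Seq[String], cutoff: Double): Set[String] = {
  val counts = values.groupBy(identity).map { case (v, vs) => v -> vs.size }.toSeq
    .sortBy { case (v, n) => (-n, v) }
  val total = values.size.toDouble
  val cumulative = counts.map(_._2).scanLeft(0)(_ + _).tail.map(_ / total)
  counts.zip(cumulative).collect { case ((v, _), frac) if frac <= cutoff => v }.toSet
}

// Anything outside the kept set, including values never seen in training, becomes "else".
def replaceInfrequent(values: Seq[String], keep: Set[String]): Seq[String] =
  values.map(v => if (keep(v)) v else "else")

val keep     = keptValues(trainTags, 0.9) // built from the training set only
val trainOut = replaceInfrequent(trainTags, keep)
val testOut  = replaceInfrequent(testTags, keep)
println(trainOut)
println(testOut)
```

Here `perl` is infrequent in training and `haskell` never appears there, so both map to "else" in the test set, which mirrors what the left join followed by `na.fill(1, ...)` does in the Spark version.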


### Pipeline for Naive Bayes

We want to create a `StringIndexer` for each Tag column; rather than create 5 variables we'll take a functional approach.  Note that `setHandleInvalid` is set to "keep" so that labels the indexer did not see during fitting are assigned to one additional index instead of raising an error ([StackOverflow link](https://stackoverflow.com/a/43917703/11407644)).

In [7]:
val featureCols = Array[String]("Tag1", "Tag2", "Tag3", "Tag4", "Tag5")

val featureIndexers = featureCols.map { colName =>
    new StringIndexer().
        setInputCol(colName).
        setOutputCol("indexed" + colName).
        setHandleInvalid("keep").
        fit(df_simple)
}

featureCols = Array(Tag1, Tag2, Tag3)
featureIndexers = Array(strIdx_a2bd6098cbc5, strIdx_0ebb50a8f750, strIdx_9f785ff0df21)


Array(strIdx_a2bd6098cbc5, strIdx_0ebb50a8f750, strIdx_9f785ff0df21)
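
As a rough mental model of what a fitted indexer does with "keep", the plain-Scala sketch below assigns indices by descending frequency and sends every unseen label to one extra index.  It is an illustration only, not Spark's implementation; `fitIndex` and `indexWithKeep` are hypothetical names, and the alphabetical tie-break is an assumption of this sketch.

```scala
// Fit: the most frequent label gets index 0.0 (ties broken alphabetically in this sketch).
def fitIndex(values: Seq[String]): Map[String, Double] =
  values.groupBy(identity).toSeq
    .sortBy { case (label, occurrences) => (-occurrences.size, label) }
    .map(_._1)
    .zipWithIndex
    .map { case (label, i) => label -> i.toDouble }
    .toMap

// Transform with "keep" semantics: unseen labels share one extra index,
// equal to the number of labels seen during fitting.
def indexWithKeep(index: Map[String, Double])(label: String): Double =
  index.getOrElse(label, index.size.toDouble)

val index   = fitIndex(Seq("java", "java", "php", "java", "php", "c#"))
val encoded = Seq("php", "java", "haskell", "rust").map(indexWithKeep(index))
println(encoded)
```

Both unseen labels land on the same index, so the model cannot distinguish between them.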

Spark ML estimators expect all predictors to be packed into a single vector column.  [`VectorAssembler`](https://spark.apache.org/docs/latest/ml-features.html#vectorassembler) is a transformer that combines a list of columns into a single vector column.

In [9]:
val assembler = new VectorAssembler().
    setInputCols(featureCols.map{x => "indexed" + x}).
    setOutputCol("features").
    setHandleInvalid("keep")

assembler = vecAssembler_7d60450d8a13


vecAssembler_7d60450d8a13

Just as with the categorical features, we index the response.  Keeping with convention, the indexed response is called `indexedLabel` rather than `indexedOpenStatus`.

In [11]:
val labelIndexer = new StringIndexer().
    setInputCol("OpenStatus").
    setOutputCol("indexedLabel").
    setHandleInvalid("keep").
    fit(df_simple)

labelIndexer = strIdx_941d13fcfb98


strIdx_941d13fcfb98

Since the response is indexed, we need a way to transform the predicted response back to its original string value.  This inverse transformer is called [`IndexToString`](https://spark.apache.org/docs/latest/ml-features.html#indextostring):

In [12]:
val labelConverter = new IndexToString().
    setInputCol("prediction").
    setOutputCol("predictionLabel").
    setLabels(labelIndexer.labels)

labelConverter = idxToStr_504786612218


idxToStr_504786612218

Finally we can specify our model, a [`NaiveBayes`](https://spark.apache.org/docs/latest/ml-classification-regression.html#naive-bayes) classifier:

In [19]:
val nb = new NaiveBayes().
    setLabelCol("indexedLabel").
    setFeaturesCol("features").
    setSmoothing(1.0).
    setModelType("multinomial")

nb = nb_2ea76f312034


nb_2ea76f312034

In [20]:
val nb_pipeline = new Pipeline().
    setStages(featureIndexers ++ Array(assembler, labelIndexer, nb, labelConverter))

nb_pipeline = pipeline_0a22e2e1f953


pipeline_0a22e2e1f953

### Fit Pipeline

Split into train and test sets.

In [15]:
val Array(train, test) = df_simple.randomSplit(Array(0.7, 0.3))

train = [Tag1: string, Tag2: string ... 2 more fields]
test = [Tag1: string, Tag2: string ... 2 more fields]


[Tag1: string, Tag2: string ... 2 more fields]

Reduce distinct occurrences of `TagX` values.

In [16]:
val List(trainReduced, testReduced) = replaceMultipleInfrequentVals(train,
                                                                    test,
                                                                    Array("Tag1", "Tag2", "Tag3", "Tag4", "Tag5"),
                                                                    0.9)

trainReduced = [Tag3: string, Tag2: string ... 2 more fields]
testReduced = [Tag3: string, Tag2: string ... 2 more fields]


[Tag3: string, Tag2: string ... 2 more fields]

Fit the pipeline to the training set to create the model.

In [21]:
val model = nb_pipeline.fit(trainReduced)

model = pipeline_0a22e2e1f953


pipeline_0a22e2e1f953

Make predictions.

In [22]:
val predictions = model.transform(testReduced)
predictions.select("predictionLabel", "OpenStatus", "features").show(5)

+---------------+--------------------+-------------------+
|predictionLabel|          OpenStatus|           features|
+---------------+--------------------+-------------------+
|[3.0,1522.0,2433.0]|
|  [2.0,10.0,2433.0]|
|  [2.0,14.0,2433.0]|
|  [3.0,58.0,2433.0]|
|   [4.0,13.0,723.0]|
+---------------+--------------------+-------------------+
only showing top 5 rows



predictions = [Tag3: string, Tag2: string ... 11 more fields]


[Tag3: string, Tag2: string ... 11 more fields]

### Evaluate Model

Set up [MulticlassClassificationEvaluator](https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator).

In [32]:
val evaluator = new MulticlassClassificationEvaluator()
    .setLabelCol("indexedLabel")
    .setPredictionCol("prediction")
    .setMetricName("accuracy")

evaluator = mcEval_3dd72513707f


mcEval_3dd72513707f

Our accuracy is ~16%.  To get a sense of how good or bad this is, let's look at how well the model would have performed if it had simply guessed the most common response (`open`) every time.

In [33]:
val accuracy = evaluator.evaluate(predictions)

accuracy = 0.15867281456481439


0.15867281456481439

In [34]:
predictions.
    groupBy("OpenStatus").
    count().
    withColumn("perc_of_total", lit(100) * col("count") / lit(predictions.count())).
    show(5)

+--------------------+-----+------------------+
|          OpenStatus|count|     perc_of_total|
+--------------------+-----+------------------+
| 1896|4.5063459618766935|
|21120|50.197271474069495|
| 9176| 21.80919332604459|
| 4629| 11.00204401768313|
| 5253|12.485145220326093|
+--------------------+-----+------------------+



If the model had guessed `open` for each test observation, the accuracy would have been ~50%, much higher than the ~16% that NaiveBayes achieved.
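
The majority-class baseline follows directly from the class counts in the table above.  A minimal sketch in plain Scala, with the counts copied from that output:

```scala
// Test-set class counts, copied from the groupBy output above.
val classCounts = Seq(1896L, 21120L, 9176L, 4629L, 5253L)

// Always predicting the largest class is correct for exactly that class's share of rows.
val baselineAccuracy = classCounts.max.toDouble / classCounts.sum
println(f"majority-class baseline: $baselineAccuracy%.4f")
```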

Looking at class-specific performance, accuracy varies widely: it is below 1% for the two worst classes and about 49% for the best.

In [35]:
predictions.
    select("predictionLabel", "OpenStatus").
    withColumn("success", when(col("predictionLabel") === col("OpenStatus"), 1).otherwise(0)).
    groupBy("OpenStatus").
    agg((sum(col("success")) / count("*")).alias("accuracy")).
    show(5)

+--------------------+--------------------+
|          OpenStatus|            accuracy|
+--------------------+--------------------+
|  0.4868143459915612|
| 0.18792613636363636|
|0.001525719267654...|
|0.002592352559948153|
| 0.33466590519703027|
+--------------------+--------------------+
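
As a sanity check, the per-class accuracies should average back to the overall accuracy when weighted by class size.  The sketch below does this in plain Scala with numbers copied from the two tables above.  It assumes that both tables list the classes in the same row order, and it uses the third accuracy value as printed, with its truncated digits.

```scala
// Test-set class counts and per-class accuracies, in the row order printed above
// (assumed to be the same order in both tables).
val counts   = Seq(1896L, 21120L, 9176L, 4629L, 5253L)
val perClass = Seq(
  0.4868143459915612,
  0.18792613636363636,
  0.001525719267654, // truncated in the printed output
  0.002592352559948153,
  0.33466590519703027
)

// Weighted average: correct predictions per class, summed, over all rows.
val correct = counts.zip(perClass).map { case (n, acc) => n * acc }.sum
val overall = correct / counts.sum
println(f"weighted overall accuracy: $overall%.6f")
```

The result agrees with the value returned by the evaluator, which supports the row-order assumption.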



### Conclusion

We've successfully implemented a Spark NLP pipeline on a small subset of our full training set.  This pipeline involved...