# Multinomial Logistic Regression

In [1]:
import org.apache.spark.sql.types._

// Explicit schema for the headerless reviews CSV: four nullable columns, of which
// only the star rating ("overall") is numeric.
val customSchema = new StructType()
  .add("asin", StringType, nullable = true)
  .add("overall", DoubleType, nullable = true)
  .add("reviewtext", StringType, nullable = true)
  .add("category", StringType, nullable = true)

import org.apache.spark.sql.types._
customSchema: org.apache.spark.sql.types.StructType = StructType(StructField(asin,StringType,true), StructField(overall,DoubleType,true), StructField(reviewtext,StringType,true), StructField(category,StringType,true))


In [2]:
// Read the headerless, comma-separated reviews file using the explicit schema
// (schema inference switched off) and discard every row containing a null.
val data = spark.read
  .schema(customSchema)
  .options(Map(
    "sep" -> ",",
    "inferSchema" -> "false",
    "header" -> "false"))
  .csv("hdfs:///user/maria_dev/for_nlp_models.csv")
  .na.drop()

data: org.apache.spark.sql.DataFrame = [asin: string, overall: double ... 2 more fields]


In [3]:
// Expose the raw data to %sql cells as "ratings1".
// createOrReplaceTempView replaces the deprecated registerTempTable.
data.createOrReplaceTempView("ratings1")



In [4]:
%sql
SELECT overall,
       COUNT(*) AS count_ratings
FROM ratings1
GROUP BY overall
ORDER BY overall

overall	count_ratings
1.0	136686
2.0	129119
3.0	296208
4.0	684735
5.0	1864279


In [5]:
import org.apache.spark.sql.functions.{coalesce, lit, typedLit}

// Collapse the 1-5 star rating into three classes:
//   1-2 stars -> 0, 3 stars -> 1, 4-5 stars -> 2
val translationMap = Map(
  1 -> 0,
  2 -> 0,
  3 -> 1,
  4 -> 2,
  5 -> 2
)

// Turn the mapping into a map-typed literal column. Keys and values are widened to
// Double so the lookup by the Double "overall" column needs no implicit key coercion
// and directly produces a Double label.
val translationMapCol = typedLit(
  translationMap.map { case (stars, cls) => stars.toDouble -> cls.toDouble }
)

// Look up each rating. A rating that is not a key of the map yields a null label;
// no string placeholder (and no string -> double cast) is involved any more.
val DF = data.withColumn("label", translationMapCol($"overall"))

DF.printSchema()

import org.apache.spark.sql.functions.{coalesce, lit, typedLit}
translationMap: scala.collection.immutable.Map[Int,Int] = Map(5 -> 2, 1 -> 0, 2 -> 0, 3 -> 1, 4 -> 2)
translationMapCol: org.apache.spark.sql.Column = keys: [5,1,2,3,4], values: [2,0,0,1,2]
DF: org.apache.spark.sql.DataFrame = [asin: string, overall: double ... 3 more fields]
root
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewtext: string (nullable = true)
 |-- category: string (nullable = true)
 |-- label: double (nullable = true)



In [6]:
// Expose the labelled data to %sql cells as "ratings2".
// createOrReplaceTempView replaces the deprecated registerTempTable.
DF.createOrReplaceTempView("ratings2")



In [7]:
%sql
SELECT label,
       COUNT(*) AS count_ratings
FROM ratings2
GROUP BY label
ORDER BY label

label	count_ratings
0.0	265805
1.0	296208
2.0	2549014


In [8]:

// Number of rows and the column names of the labelled data set
DF.count()
DF.schema.fieldNames

res897: Long = 3111027
res898: Array[String] = Array(asin, overall, reviewtext, category, label)


In [9]:
 // subsample a smaller dataset and filter only needed columns
// Target (approximate) number of rows in the down-sampled data set
val sampSize = 500000
// Fraction of rows to keep, computed against the frame that is actually sampled.
// Clamped to 1.0: a fraction above 1 is not valid for sampling without replacement,
// and the raw ratio is infinite when the table is empty.
val frac = math.min(1.0, sampSize.toDouble / DF.count())
// Seeded so the same subset is drawn on every run
val sampledData = DF.sample(withReplacement = false, fraction = frac, seed = 42)
sampledData.count()

In [10]:
// Choose the table that is modelled in the subsequent cells: the full labelled
// set (DF) or the down-sampled one (sampledData).
// This used to be bound to `sampSize`, an Int, which has neither printSchema()
// nor any of the DataFrame operations the later cells rely on.
val ratingsDF = DF // DF or sampledData

ratingsDF.printSchema()

ratingsDF: org.apache.spark.sql.DataFrame = [asin: string, overall: double ... 3 more fields]
root
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewtext: string (nullable = true)
 |-- category: string (nullable = true)
 |-- label: double (nullable = true)



In [11]:
// Row count of the table selected for modelling
ratingsDF.count()

res909: Long = 3111027


In [12]:
// check for duplicate reviews
// val uniqueCheck = ratingsDF.groupBy("asin", "reviewerid", "overall").count().orderBy(desc("count"))
// uniqueCheck.show()

In [13]:
// De-duplication is currently disabled: DF is simply another name for ratingsDF.
// The commented-out alternative below would drop rows sharing ("asin", "reviewerid"),
// but the schema declared in this notebook has no "reviewerid" column.


val DF = ratingsDF
// val DF = ratingsDF.dropDuplicates(Array("asin","reviewerid"))

DF: org.apache.spark.sql.DataFrame = [asin: string, overall: double ... 3 more fields]


In [14]:
// Keep only the rows that actually carry review text
val DF2 = DF.where(DF("reviewtext").isNotNull)

DF2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [asin: string, overall: double ... 3 more fields]


In [15]:
// Rough vocabulary size: lower-case each review, split it on single spaces and
// count how many distinct tokens come out.
val vocabSize = {
  val tokenised = DF2
    .withColumn("reviewtext", lower($"reviewtext"))
    .withColumn("reviewtext", split(col("reviewtext"), " ").cast("array<string>"))

  // one row per distinct token, with its number of occurrences
  val perToken = tokenised
    .select($"asin", explode($"reviewtext").alias("exploded"))
    .groupBy("exploded")
    .count()

  perToken.count
}

vocabSize: Long = 1020666


In [16]:
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF}
import scala.math.pow
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer}


//// TEXT COLUMN PREPROCESSING
// Tokenise the raw review text. With gaps = false the pattern describes the tokens
// themselves: a word (optionally followed by an apostrophe suffix) or a single
// non-word, non-space character.
val regextokenizer = new RegexTokenizer()
  .setInputCol("reviewtext")
  .setOutputCol("reviewwords")
  .setPattern("\\w+(?:'\\w+)?|[^\\w\\s]")
  .setGaps(false)

// Remove stop words from the token list, ignoring case
val sw_remover = new StopWordsRemover()
  .setInputCol("reviewwords")
  .setOutputCol("words_filtered")
  .setCaseSensitive(false)

// Hash the remaining tokens into a term-frequency vector with 2^12 = 4096 buckets
val hashtf = new HashingTF()
  .setInputCol("words_filtered")
  .setOutputCol("tf")
  .setNumFeatures(1 << 12)

// Re-weight the term frequencies so that terms common to many documents count less
val idf = new IDF()
  .setInputCol("tf")
  .setOutputCol("tfidf")

//// CATEGORICAL COLUMN PREPROCESSING
// Encode 'category' as a numeric index. The index carries no meaning of its own:
// StringIndexer derives it from how often each value occurs (see the StringIndexer
// documentation), not from any property of the category.
// handleInvalid = "keep" puts every value not seen during fitting into one shared
// extra index; "skip" would drop such rows instead.
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categorylabel")
  .setHandleInvalid("keep")

// Concatenate the TF-IDF vector and the category index into a single 'features' vector
val vassembler = new VectorAssembler()
  .setInputCols(Array("tfidf", "categorylabel"))
  .setOutputCol("features")

// Chain all preprocessing steps in the order they must run
val preprocessing_stages = Array(regextokenizer, sw_remover, hashtf, idf, indexer, vassembler)
val preprocessing_pipeline = new Pipeline().setStages(preprocessing_stages)
                    
                    

                    
// 80/20 train/test split with a fixed seed; the target column name is kept in one place
val Array(trainingData, testData) = DF2.randomSplit(weights = Array(0.8, 0.2), seed = 42)
val labelColumn = "label"

println("Starting Multinomial Logistic Regression")

// Initial hyper-parameters of the elastic-net multinomial logistic regression:
// lambda is the overall regularisation strength, alpha the elastic-net mixing value.
// Both are tuned later through the cross-validation parameter grid.
val lambda_par = 0.02
val alpha_par = 0.3
val en_mlr = new LogisticRegression()
  .setFamily("multinomial")
  .setFeaturesCol("features")
  .setLabelCol(labelColumn)
  .setMaxIter(100)
  .setRegParam(lambda_par)
  .setElasticNetParam(alpha_par)

// Full model pipeline: preprocessing followed by the classifier
val model_stages = Array(preprocessing_pipeline, en_mlr)
val model_pipeline = new Pipeline().setStages(model_stages)
                    
                    
// Hyper-parameter grid: 3 regularisation strengths x 3 elastic-net mixing values
val paramGrid = new ParamGridBuilder()
  .addGrid(en_mlr.regParam, Array(0.01, 0.05, 0.1))
  .addGrid(en_mlr.elasticNetParam, Array(0.1, 0.2, 0.4))
  .build()

// Evaluator that scores each candidate. This is a MulticlassClassificationEvaluator;
// no metric name is set, so its default ("f1") applies. Other supported metric names
// are "weightedPrecision", "weightedRecall" and "accuracy".
val evaluator = new MulticlassClassificationEvaluator().setLabelCol(labelColumn)

// The whole pipeline is the estimator handed to the CrossValidator, so the grid is
// evaluated on preprocessing + model together. A CrossValidator needs an estimator,
// a set of parameter maps and an evaluator.
val crossval = new CrossValidator()
  .setEstimator(model_pipeline)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(evaluator)
  .setNumFolds(3) // Use 3+ in practice

// Run the cross-validation and keep the best parameter combination
val cvModelMLR = crossval.fit(trainingData)

// Persist the winning pipeline to disk and read it back, as the gradient-boosted-tree
// cell of this notebook does for its model.
// The estimator given to the CrossValidator is a Pipeline, so the best model is a
// PipelineModel. Saving that model (rather than the whole CrossValidatorModel, as the
// previously commented-out save would have done) is what makes PipelineModel.load
// valid for this path.
cvModelMLR.bestModel match {
  case bestPipeline: PipelineModel =>
    bestPipeline.write.overwrite().save("hdfs:///user/maria_dev/spark-mlr-cv-best")
  case other =>
    sys.error(s"Expected the best model to be a PipelineModel but found ${other.getClass.getName}")
}

// And load it back in, as would be done in production
val cvModelMLR = PipelineModel.load("hdfs:///user/maria_dev/spark-mlr-cv-best")

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF}
import scala.math.pow
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer}
regextokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_4d346c993a00
sw_remover: org.apache.spark.ml.feature.StopWordsRemover = stopWords_d6d4ee83814a
hashtf: org.apache.spark.ml.feature.HashingTF = hashingTF_b953c3a1f17d
idf: org.apache.spark.ml.feature.IDF = idf_26ea91ca7d7f
indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_8eac59d0d48d
vassembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_3c15388df547
preprocessing_stages: Array[org.apache.spark.ml.PipelineStage with org.apache.spark.ml.param

In [17]:
import org.apache.spark.mllib.evaluation.MulticlassMetrics


// Score the held-out test documents with the fitted multinomial logistic regression
// pipeline and keep only the columns needed for inspection and evaluation.
val results = {
  val scored = cvModelMLR.transform(testData)
  scored.select("asin", "reviewtext", "category", "label", "probability", "prediction")
}

// (prediction, label) pairs in the RDD form that MulticlassMetrics consumes
val predictionAndLabels = results
  .select($"prediction", $"label")
  .as[(Double, Double)]
  .rdd

//// MODEL EVALUATION
val metrics = new MulticlassMetrics(predictionAndLabels)

// Confusion matrix
println("Confusion matrix:")
println(metrics.confusionMatrix)

// Overall accuracy
val accuracy = metrics.accuracy
println("Summary Statistics")
println(s"Accuracy = $accuracy")

// Class-frequency-weighted statistics
println(s"Weighted precision: ${metrics.weightedPrecision}")
println(s"Weighted recall: ${metrics.weightedRecall}")
println(s"Weighted F1 score: ${metrics.weightedFMeasure}")
println(s"Weighted false positive rate: ${metrics.weightedFalsePositiveRate}")

import org.apache.spark.mllib.evaluation.MulticlassMetrics
results: org.apache.spark.sql.DataFrame = [asin: string, reviewtext: string ... 4 more fields]
predictionAndLabels: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[39528] at rdd at <console>:199
metrics: org.apache.spark.mllib.evaluation.MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@ce39930
Confusion matrix:
14125.0  2593.0  33628.0   
3363.0   6536.0  46984.0   
2692.0   3281.0  491108.0  
accuracy: Double = 0.846865019609141
Summary Statistics
Accuracy = 0.846865019609141
Weighted precision: 0.8144684381243038
Weighted recall: 0.8468650196091411
Weighted F1 score: 0.8070515918639329
Weighted false positive rate: 0.6202997707641787


# Gradient Boosted Tree

In [18]:
import org.apache.spark.sql.types._

// Explicit schema for the headerless reviews CSV: four nullable columns, of which
// only the star rating ("overall") is numeric.
val customSchema = new StructType()
  .add("asin", StringType, nullable = true)
  .add("overall", DoubleType, nullable = true)
  .add("reviewtext", StringType, nullable = true)
  .add("category", StringType, nullable = true)


import org.apache.spark.sql.types._
customSchema: org.apache.spark.sql.types.StructType = StructType(StructField(asin,StringType,true), StructField(overall,DoubleType,true), StructField(reviewtext,StringType,true), StructField(category,StringType,true))


In [19]:
// Read the headerless, comma-separated reviews file using the explicit schema
// (schema inference switched off) and discard every row containing a null.
val data = spark.read
  .schema(customSchema)
  .options(Map(
    "sep" -> ",",
    "inferSchema" -> "false",
    "header" -> "false"))
  .csv("hdfs:///user/maria_dev/for_nlp_models.csv")
  .na.drop()

data: org.apache.spark.sql.DataFrame = [asin: string, overall: double ... 2 more fields]


In [20]:
// Expose the raw data to %sql cells as "ratings3".
// createOrReplaceTempView replaces the deprecated registerTempTable.
data.createOrReplaceTempView("ratings3")



In [21]:
%sql
SELECT overall,
       COUNT(*) AS count_ratings
FROM ratings3
GROUP BY overall
ORDER BY overall

overall	count_ratings
1.0	136686
2.0	129119
3.0	296208
4.0	684735
5.0	1864279


In [22]:
import org.apache.spark.sql.functions.{coalesce, lit, typedLit}

// Collapse the 1-5 star rating into two classes:
//   1-3 stars -> 0, 4-5 stars -> 1
val translationMap = Map(
  1 -> 0,
  2 -> 0,
  3 -> 0,
  4 -> 1,
  5 -> 1
)

// Turn the mapping into a map-typed literal column. Keys and values are widened to
// Double so the lookup by the Double "overall" column needs no implicit key coercion
// and directly produces a Double label.
val translationMapCol = typedLit(
  translationMap.map { case (stars, cls) => stars.toDouble -> cls.toDouble }
)

// Look up each rating. A rating that is not a key of the map yields a null label;
// no string placeholder (and no string -> double cast) is involved any more.
val DF = data.withColumn("label", translationMapCol($"overall"))

DF.printSchema()

import org.apache.spark.sql.functions.{coalesce, lit, typedLit}
translationMap: scala.collection.immutable.Map[Int,Int] = Map(5 -> 1, 1 -> 0, 2 -> 0, 3 -> 0, 4 -> 1)
translationMapCol: org.apache.spark.sql.Column = keys: [5,1,2,3,4], values: [1,0,0,0,1]
DF: org.apache.spark.sql.DataFrame = [asin: string, overall: double ... 3 more fields]
root
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewtext: string (nullable = true)
 |-- category: string (nullable = true)
 |-- label: double (nullable = true)



In [23]:
// Expose the labelled data to %sql cells as "ratings4".
// createOrReplaceTempView replaces the deprecated registerTempTable.
DF.createOrReplaceTempView("ratings4")



In [24]:
%sql
SELECT label,
       COUNT(*) AS count_ratings
FROM ratings4
GROUP BY label
ORDER BY label

label	count_ratings
0.0	562013
1.0	2549014


In [25]:
// Number of rows and the column names of the labelled data set
DF.count()
DF.schema.fieldNames

res1011: Long = 3111027
res1012: Array[String] = Array(asin, overall, reviewtext, category, label)


In [26]:
 // subsample a smaller dataset and filter only needed columns
 // Target (approximate) number of rows in the down-sampled data set
 val sampSize = 500000
 // Fraction of rows to keep, computed against the frame that is actually sampled.
 // Clamped to 1.0: a fraction above 1 is not valid for sampling without replacement,
 // and the raw ratio is infinite when the table is empty.
 val frac = math.min(1.0, sampSize.toDouble / DF.count())
 // Seeded so the same subset is drawn on every run
 val sampledData = DF.sample(withReplacement = false, fraction = frac, seed = 42)
 sampledData.count()

In [27]:
// Choose the table that is modelled in the subsequent cells. The down-sampled set is
// used here; switch the right-hand side to DF to model the full data instead.
val ratingsDF = sampledData // DF or sampledData

// Earlier variant, kept for reference:
// = sampledData.toDF()//.withColumnRenamed("overall", "label")
ratingsDF.printSchema()

ratingsDF: org.apache.spark.sql.DataFrame = [asin: string, overall: double ... 3 more fields]
root
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewtext: string (nullable = true)
 |-- category: string (nullable = true)
 |-- label: double (nullable = true)



In [28]:
// Row count of the table selected for modelling
ratingsDF.count()

res1023: Long = 3111027


In [29]:
// check for duplicate reviews
//val uniqueCheck = ratingsDF.groupBy("asin", "reviewerid", "overall").count().orderBy(desc("count"))
//uniqueCheck.show()

In [30]:
// De-duplication is currently disabled: DF is simply another name for ratingsDF.
// The commented-out alternative below would drop rows sharing ("asin", "reviewerid"),
// but the schema declared in this notebook has no "reviewerid" column.

val DF = ratingsDF
// val DF = ratingsDF.dropDuplicates(Array("asin","reviewerid"))

DF: org.apache.spark.sql.DataFrame = [asin: string, overall: double ... 3 more fields]


In [31]:
// Keep only the rows that actually carry review text
val DF2 = DF.where(DF("reviewtext").isNotNull)

DF2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [asin: string, overall: double ... 3 more fields]


In [32]:
// Rough vocabulary size: lower-case each review, split it on single spaces and
// count how many distinct tokens come out.
val vocabSize = {
  val tokenised = DF2
    .withColumn("reviewtext", lower($"reviewtext"))
    .withColumn("reviewtext", split(col("reviewtext"), " ").cast("array<string>"))

  // one row per distinct token, with its number of occurrences
  val perToken = tokenised
    .select($"asin", explode($"reviewtext").alias("exploded"))
    .groupBy("exploded")
    .count()

  perToken.count
}

vocabSize: Long = 1020666


In [33]:
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF}
import scala.math.pow
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer}


//// TEXT COLUMN PREPROCESSING
// Tokenise the raw review text. With gaps = false the pattern describes the tokens
// themselves: a word (optionally followed by an apostrophe suffix) or a single
// non-word, non-space character.
val regextokenizer = new RegexTokenizer()
  .setInputCol("reviewtext")
  .setOutputCol("reviewwords")
  .setPattern("\\w+(?:'\\w+)?|[^\\w\\s]")
  .setGaps(false)

// Remove stop words from the token list, ignoring case
val sw_remover = new StopWordsRemover()
  .setInputCol("reviewwords")
  .setOutputCol("words_filtered")
  .setCaseSensitive(false)

// Hash the remaining tokens into a term-frequency vector with 2^12 = 4096 buckets
val hashtf = new HashingTF()
  .setInputCol("words_filtered")
  .setOutputCol("tf")
  .setNumFeatures(1 << 12)

// Re-weight the term frequencies so that terms common to many documents count less
val idf = new IDF()
  .setInputCol("tf")
  .setOutputCol("tfidf")

//// CATEGORICAL COLUMN PREPROCESSING
// Encode 'category' as a numeric index. The index carries no meaning of its own:
// StringIndexer derives it from how often each value occurs (see the StringIndexer
// documentation), not from any property of the category.
// handleInvalid = "keep" puts every value not seen during fitting into one shared
// extra index; "skip" would drop such rows instead.
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categorylabel")
  .setHandleInvalid("keep")

// Concatenate the TF-IDF vector and the category index into a single 'features' vector
val vassembler = new VectorAssembler()
  .setInputCols(Array("tfidf", "categorylabel"))
  .setOutputCol("features")

// Chain all preprocessing steps in the order they must run
val preprocessing_stages = Array(regextokenizer, sw_remover, hashtf, idf, indexer, vassembler)
val preprocessing_pipeline = new Pipeline().setStages(preprocessing_stages)
                    
                    

                    
// 80/20 train/test split with a fixed seed; the target column name is kept in one place
val Array(trainingData, testData) = DF2.randomSplit(weights = Array(0.8, 0.2), seed = 42)
val labelColumn = "label"

println("Starting GBTree Classification")

// Gradient-boosted tree classifier with a mix of default and explicitly set
// hyper-parameters. No cross-validation is done here because it was very
// computationally expensive and took a lot of time.
val gbt = new GBTClassifier()
  .setFeaturesCol("features")
  .setLabelCol(labelColumn)
  .setMaxBins(10000)
  .setMaxIter(10)

// Full model pipeline: preprocessing followed by the classifier
val model_stages = Array(preprocessing_pipeline, gbt)
val model_pipeline = new Pipeline().setStages(model_stages)
     
               
// Fit the full pipeline (preprocessing + GBT classifier) on the training split
val model = model_pipeline.fit(trainingData)


// Predictions on the test split are produced in a later cell; this line is kept
// disabled here.
//val predictions = model.transform(testData)


// Disabled evaluator definition, kept for reference.
// NOTE(review): BinaryClassificationEvaluator is not among this cell's imports, so
// it would need an import before being re-enabled.
// val evaluator = new BinaryClassificationEvaluator()
//               .setLabelCol(labelColumn)
                

// Persist the fitted pipeline, replacing whatever is already stored at this path
model.write.overwrite().save("hdfs:///user/maria_dev/spark-gbt-first")

// Read the pipeline back from disk, as would be done in production. This re-binds
// `model`, so later cells use the reloaded pipeline rather than the in-memory fit.
val model = PipelineModel.load("hdfs:///user/maria_dev/spark-gbt-first")

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer, StopWordsRemover, CountVectorizer, HashingTF, IDF}
import scala.math.pow
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer}
regextokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_ec5d0aff7616
sw_remover: org.apache.spark.ml.feature.StopWordsRemover = stopWords_3ee15eaca8ce
hashtf: org.apache.spark.ml.feature.HashingTF = hashingTF_f1ab31af8bc2
idf: org.apache.spark.ml.feature.IDF = idf_e1a1b6769621
indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_0e663b8a9987
vassembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_c4e0c913fca6
preprocess

In [34]:
import org.apache.spark.mllib.evaluation.{MulticlassMetrics, BinaryClassificationMetrics}


// Score the held-out test documents with the fitted GBT pipeline and keep only the
// columns needed for inspection and evaluation.
val results = {
  val scored = model.transform(testData)
  scored.select("asin", "reviewtext", "category", "label", "probability", "prediction")
}

In [35]:
// Hard class predictions paired with the true labels (kept under its original name)
val predictionAndLabels = results
        .select($"prediction",$"label")
        .as[(Double, Double)]
        .rdd

// Continuous scores paired with the true labels.
// BinaryClassificationMetrics expects (score, label) pairs. Feeding it the hard 0/1
// predictions leaves only two distinct thresholds, so the ROC curve and the
// per-threshold statistics are degenerate. The score used here is element 1 of the
// "probability" vector, i.e. the probability assigned to label 1.0.
val scoreAndLabels = results
  .select($"probability", $"label")
  .rdd
  .map(row => (row.getAs[org.apache.spark.ml.linalg.Vector](0)(1), row.getDouble(1)))

//// MODEL EVALUATION

// Second argument = numBins: down-sample the curves to about 100 thresholds so that
// the per-threshold results stay small enough to bring back to the driver.
val metrics = new BinaryClassificationMetrics(scoreAndLabels, 100)

println(s"Area under ROC: ${metrics.areaUnderROC}")

// Precision by threshold. The pairs are collected first so that the lines are
// printed by the driver (into the notebook output), not inside the executors.
val precision = metrics.precisionByThreshold
precision.collect().foreach { case (t, p) =>
  println(s"Threshold: $t, Precision: $p")
}