## Data Wrangling

In [1]:
#!pip install spark-nlp
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

import matplotlib.pyplot as plt
%matplotlib inline

from pyspark.sql.types import *

import pandas as pd
import seaborn as sns

#create Spark session
spark = SparkSession.builder.appName('classification').getOrCreate()

#change configuration settings on Spark 
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '4g'), 
                                        ('spark.app.name', 'review-classification'), 
                                        ('spark.executor.cores', '5'), 
                                        ('spark.cores.max', '4'), 
                                        ('spark.driver.memory','8g')])

#print spark configuration settings
spark.sparkContext.getConf().getAll()

spark.conf.set("spark.sql.legacy.timeParserPolicy","CORRECTED")

### Reading in data

In [None]:
#reading in the data
df = spark.read.json("gs://bdprojectfinal/Clothing_Shoes_and_Jewelry.json")

In [None]:
spark.conf.set("spark.sql.caseSensitive", "true")
meta=spark.read.json('gs://bdprojectfinal/meta_Clothing_Shoes_and_Jewelry.json')

In [None]:
meta=meta.select("brand","asin","price")

In [None]:
meta.createOrReplaceTempView('meta')
df.createOrReplaceTempView('df')

In [None]:
df=df.withColumn("reviewTime", f.to_timestamp('reviewTime','MM dd, yyyy'))\
    .withColumn("Year", f.year("reviewTime"))

In [None]:
review_count_full=spark.sql('''select 
asin,
count(*) as review_count
from df
group by
asin
''')

In [None]:
rc_df=review_count_full.toPandas()

rc_df.plot.hist(bins=100)

In [None]:
#creative view
df.createOrReplaceTempView('df')

review_count=spark.sql('''select 
asin,
count(*)
from df
group by
asin
''')

In [None]:
rc_df=review_count.toPandas()

rc_df.plot.hist(bins=100)

We filter the products with over 10 review counts with additional features

In [None]:
df_full=spark.sql('''
with cte as (
select 
asin,
count(*) as review_count
from df
group by
asin
),

fil as (
select 
asin
from cte
where review_count>10
),

cte3 as (
select
reviewerID,
count(*) as reviewer_count
from df
group by reviewerID
),

cte2 as (select 
Year,
overall,
reviewText,
reviewTime,
reviewerID,
reviewerName,
unixReviewTime,
verified,
vote,
asin
from df
where
asin in (select asin from fil))

select 
d.Year,
d.overall,
d.reviewText,
d.reviewTime,
d.reviewerID,
d.reviewerName,
d.unixReviewTime,
d.verified,
d.vote,
d.asin,
r.reviewer_count,
m.brand,
m.price
from 
cte2 d inner join cte3 r on d.reviewerID=r.reviewerID
inner join meta m on d.asin=m.asin
''')

In [None]:
df_full.createOrReplaceTempView('df_full')

In [None]:
df_full_reviewer=spark.sql('''
with cte as (
select
reviewerID,
count(*) as reviewer_count
from df
group by reviewerID
)

select 
d.Year,
d.overall,
d.reviewText,
d.reviewTime,
d.reviewerID,
d.reviewerName,
d.unixReviewTime,
d.verified,
d.vote,
d.asin,
reviewer.reviewer_count
from df_full d left join cte reviewer on d.reviewerID = reviewer.reviewerID
''')

Exporting complete dataset

In [None]:
df_use.write.json("gs://bdprojectfinal/df_full_feature_data")

## Spark NLP set up

In [1]:
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
import sparknlp
#Start a Spark session using Spark NLP
spark=sparknlp.start()

 spark = SparkSession.builder \
     .appName("reviewclassification") \
    .getOrCreate()

conf = spark.sparkContext._conf.setAll([("spark.driver.memory","4G"),
                                        ("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.0.3"),
                                       ('spark.executor.cores', '5'),
                                        ('spark.executor.memory', '4G')
                                       ])
SparkSession.builder.config()

## Import libraries

In [2]:
from pyspark.ml.feature import HashingTF, IDF, StringIndexer, SQLTransformer,IndexToString
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator# Convert text column to nlp file
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.context import SparkContext
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC,LogisticRegression,FMClassifier,RandomForestClassifier,LinearSVC,GBTClassifier

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

## read in data

In [6]:
#reading in the data
df = spark.read.json("gs://bdprojectfinal/df_full_feature_data/*.json")
#convert voting to binary labels
df = df.withColumn(
    'label',
    f.when(f.col("vote")>0, 1)\
    .otherwise(0)
)
#split into Train and Test
Train, Test = df.randomSplit([0.8, 0.2], seed = 3)

### Utility functions

In [3]:
def evaluation_report(prediction_result):
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    import pandas as pd
    import numpy as np
    evaluator = MulticlassClassificationEvaluator()
    acc=evaluator.evaluate(prediction_result,{evaluator.metricName: "accuracy"})
    class1=prediction_result.where(f.col('label')==1)
    f1_class1=evaluator.evaluate(class1,{evaluator.metricName: "f1"})
    class0=prediction_result.where(f.col('label')==0)
    f1_class0=evaluator.evaluate(class0,{evaluator.metricName: "f1"}) 
    output = pd.DataFrame(np.array([[acc,f1_class1,f1_class0]]),
                   columns=['accuracy', 'f1_class1', 'f1_class0'])
    return output


from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

from pyspark.sql.functions import udf

@udf("long")
def num_nonzeros(v):
    return v.numNonzeros()

## TF-IDF embedding

In [None]:
#define pipeline for data processing
document_assembler = DocumentAssembler() \
      .setInputCol("reviewText") \
      .setOutputCol("document")
    
tokenizer = Tokenizer() \
      .setInputCols(["document"]) \
      .setOutputCol("token")

normalizer = Normalizer() \
      .setInputCols(["token"]) \
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

stemmer = Stemmer() \
      .setInputCols(["cleanTokens"]) \
      .setOutputCol("stem")
finisher = Finisher() \
      .setInputCols(["stem"]) \
      .setOutputCols(["token_features"]) \
      .setOutputAsArray(True) \
      .setCleanAnnotations(False)
hashingTF = HashingTF(inputCol="token_features", 
                      outputCol="rawFeatures", 
                      numFeatures=10000)

idf = IDF(inputCol="rawFeatures", 
          outputCol="features", 
          minDocFreq=5)

In [9]:
#define TF-IDF pipeline
nlp_pipeline_tf = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            stemmer, 
            finisher,
            hashingTF,
            idf])
#process training dataset
nlp_model_tf = nlp_pipeline_tf.fit(Train)
#transform train and test dataset
Train  = nlp_model_tf.transform(Train)
Test = nlp_model_tf.transform(Test)

## TF-IDF with logistic regression

In [None]:
#initiate logistic regression with weight
lr = LogisticRegression(featuresCol = 'features', 
                        labelCol = 'label', 
                        maxIter=10)
#fit model
lrModel = lr.fit(Train)
#predict on test data
predictions_tf = lrModel.transform(Test)
result=evaluation_report(predictions_tf)
result

## Random Forest with TFIDF

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)

# add training dataset
rfModel = rf.fit(Train)
predictions_rf = rfModel.transform(Test)

In [None]:
result=evaluation_report(predictions_rf)
result

## Naive Bayes with TFIDF

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# create the trainer and set its parameters
nb = NaiveBayes(labelCol="label", \
                featuresCol="features", \
                smoothing=1.0,
                modelType="multinomial")

# train the model
nbModel = nb.fit(Train)

# select example rows to display.
prediction_nb = nbModel.transform(Test)

In [None]:
result=evaluation_report(predictions_nb)
result

## LinearSVC with TFIDF

In [None]:
from pyspark.ml.classification import LinearSVC
lsvc = LinearSVC(maxIter=10, regParam=0.1,labelCol="label", 
                            featuresCol="feature")

# Fit the model
lsvcModel = lsvc.fit(Train)

predictions=lsvcModel.transform(Test)
result=evaluation_report(predictions)
result

## FMClassifier with TFIDF

In [None]:
from pyspark.ml.classification import FMClassifier
fm = FMClassifier(featuresCol="features-Bert",labelCol="label",stepSize=0.01)
# train the model
model_fm = fm.fit(Train)
# select example rows to display.
prediction = model_fm.transform(Test)
result=evaluation_report(prediction)
result

## Adding weight to modeling

In [None]:
balancingRatio = Train.filter(f.col('label') == 1).count() / Train.count()

Train = Train.withColumn(
    'weight1',
    f.when(f.col("label")==1, (1+balancingRatio))\
    .otherwise(1-balancingRatio)

In [None]:
#initiate logistic regression with weight
lr = LogisticRegression(featuresCol = 'features', 
                        labelCol = 'label', 
                        weightCol="weight1",
                        maxIter=10)
#fit model
lrModel = lr.fit(Train)
#predict on test data
predictions_tf = lrModel.transform(Test)
result=evaluation_report(predictions_tf)
result

## Random Forest with TFIDF

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            weightCol="weight1", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)

# add training dataset
rfModel = rf.fit(Train)
predictions_rf = rfModel.transform(Test)

In [None]:
result=evaluation_report(predictions_rf)
result

## Naive Bayes with TFIDF

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# create the trainer and set its parameters
nb = NaiveBayes(labelCol="label", \
                featuresCol="features", \
                weightCol="weight1",
                smoothing=1.0,
                modelType="multinomial")

# train the model
nbModel = nb.fit(Train)

# select example rows to display.
prediction_nb = nbModel.transform(Test)

In [None]:
result=evaluation_report(predictions_nb)
result

## LinearSVC with TFIDF

In [None]:
from pyspark.ml.classification import LinearSVC
lsvc = LinearSVC(maxIter=10, regParam=0.1,labelCol="label", weightCol="weight1",
                            featuresCol="feature")

# Fit the model
lsvcModel = lsvc.fit(Train)

predictions=lsvcModel.transform(Test)
result=evaluation_report(predictions)
result

## FMClassifier with TFIDF

In [None]:
from pyspark.ml.classification import FMClassifier
fm = FMClassifier(featuresCol="features-Bert",labelCol="label",weightCol="weight1",stepSize=0.01)
# train the model
model_fm = fm.fit(Train)
# select example rows to display.
prediction = model_fm.transform(Test)
result=evaluation_report(prediction)
result

## Glove embedding and re-balance data

In [7]:
document_assembler = DocumentAssembler() \
      .setInputCol("reviewText") \
      .setOutputCol("document")
    
tokenizer = Tokenizer() \
      .setInputCols(["document"]) \
      .setOutputCol("token")
    
normalizer = Normalizer() \
      .setInputCols(["token"]) \
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

glove_embeddings = WordEmbeddingsModel().pretrained() \
      .setInputCols(["document",'cleanTokens'])\
      .setOutputCol("embeddings")\
      .setCaseSensitive(False)

embeddingsSentence = SentenceEmbeddings() \
      .setInputCols(["document", "embeddings"]) \
      .setOutputCol("sentence_embeddings") \
      .setPoolingStrategy("AVERAGE")
    
embeddings_finisher = EmbeddingsFinisher() \
      .setInputCols(["sentence_embeddings"]) \
      .setOutputCols(["finished_sentence_embeddings"]) \
      .setOutputAsVector(True)\
      .setCleanAnnotations(False)

nlp_pipeline_w = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            glove_embeddings,
            embeddingsSentence,
            embeddings_finisher])

glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]


In [20]:
nlp_model_w= nlp_pipeline_w.fit(Train)

Train= nlp_model_w.transform(Train)

Train= Train.withColumn("GloVe-features", explode(Train.finished_sentence_embeddings))

Test= nlp_model_w.transform(Test)

Test= Test.withColumn("GloVe-features", explode(Test.finished_sentence_embeddings))

Train=Train.drop('cleanTokens','document','normalized','rawFeatures','stem','token','token_features','finished_sentence_embeddings')
Test=Test.drop('cleanTokens','document','normalized','rawFeatures','stem','token','token_features','finished_sentence_embeddings')

## Bert embeddings

In [30]:
#add new component
bert_embeddings = BertEmbeddings\
      .pretrained('bert_base_cased', 'en') \
      .setInputCols(["document",'cleanTokens'])\
      .setOutputCol("bert")\
      .setCaseSensitive(False)\

embeddingsSentence = SentenceEmbeddings() \
      .setInputCols(["document", "bert"]) \
      .setOutputCol("sentence_embeddings") \
      .setPoolingStrategy("AVERAGE")
    
embeddings_finisher = EmbeddingsFinisher() \
      .setInputCols(["sentence_embeddings"]) \
      .setOutputCols(["finished_sentence_embeddings"]) \
      .setOutputAsVector(True)\
      .setCleanAnnotations(False)
#create new pipeline
nlp_pipeline_bert = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            stopwords_cleaner, 
            bert_embeddings,
            embeddingsSentence,
            embeddings_finisher])

bert_base_cased download started this may take some time.
Approximate size to download 389.1 MB
[OK!]


In [31]:
#create new processor for raw data
Train = nlp_pipeline_bert.fit(Train).transform(Train)
print('train_done')

train_done


In [32]:
#process test data
Test = nlp_pipeline_bert.fit(Train).transform(Test)
print('test_done')

test_done


In [16]:
Train=Train.drop('cleanTokens','document','normalized','rawFeatures','stem','token','token_features')
Test=Test.drop('cleanTokens','document','normalized','rawFeatures','stem','token','token_features')

## Saving/Read Train/Test

In [None]:
percentage = df.filter(f.col('label') == 1).count() / df.filter(f.col('label') == 0).count() 

In [None]:
#process balanced dataset
percentage=670054/7418313
df=df.sampleBy("label",
               fractions={
                   1:1,
                   0:percentage},
               seed=3
               )
Train, Test = df.randomSplit([0.8, 0.2], seed = 3)

In [None]:
Train = Train.where(num_nonzeros("features") != 0)
Test = Test.where(num_nonzeros("features") != 0)

Train.write.json('gs://bdprojectfinal/train_multiple')
Test.write.json('gs://bdprojectfinal/test_multiple')

In [13]:
Train=spark.read.json('gs://bdprojectfinal/train_multiple/*.json')
Test=spark.read.json('gs://bdprojectfinal/test_multiple/*.json')

Train = Train.withColumn(
    "features-Bert", 
    list_to_vector_udf(Train['features-Bert.values'])
)

Test = Test.withColumn(
    "features-Bert", 
    list_to_vector_udf(Test['features-Bert.values'])
)

Train = Train.withColumn(
    "features-GloVe", 
    list_to_vector_udf(Train['features-GloVe.values'])
)

Test = Test.withColumn(
    "features-GloVe", 
    list_to_vector_udf(Test['features-GloVe.values'])
)

Train = Train.where(num_nonzeros("features-GloVe") != 0)
Test = Test.where(num_nonzeros("features-GloVe") != 0)

## Re-fitting to TFIDF feature

## TF-IDF with logistic regression

In [None]:
#initiate logistic regression with weight
lr = LogisticRegression(featuresCol = 'features', 
                        labelCol = 'label', 
                        maxIter=10)
#fit model
lrModel = lr.fit(Train)
#predict on test data
predictions_tf = lrModel.transform(Test)
result=evaluation_report(predictions_tf)
result

## Random Forest with TFIDF

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)

# add training dataset
rfModel = rf.fit(Train)
predictions_rf = rfModel.transform(Test)

In [None]:
result=evaluation_report(predictions_rf)
result

## Naive Bayes with TFIDF

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# create the trainer and set its parameters
nb = NaiveBayes(labelCol="label", \
                featuresCol="features", \
                smoothing=1.0,
                modelType="multinomial")

# train the model
nbModel = nb.fit(Train)

# select example rows to display.
prediction_nb = nbModel.transform(Test)

In [None]:
result=evaluation_report(predictions_nb)
result

## LinearSVC with TFIDF

In [None]:
from pyspark.ml.classification import LinearSVC
lsvc = LinearSVC(maxIter=10, regParam=0.1,labelCol="label", 
                            featuresCol="feature")

# Fit the model
lsvcModel = lsvc.fit(Train)

predictions=lsvcModel.transform(Test)
result=evaluation_report(predictions)
result

## FMClassifier with TFIDF

In [None]:
from pyspark.ml.classification import FMClassifier
fm = FMClassifier(featuresCol="features-Bert",labelCol="label",stepSize=0.01)
# train the model
model_fm = fm.fit(Train)
# select example rows to display.
prediction = model_fm.transform(Test)
result=evaluation_report(prediction)
result

## Balanced dataset w/ Bert

In [14]:
#initiate logistic regression with weight
lr = LogisticRegression(featuresCol = 'features-Bert', 
                        labelCol = 'label', 
                        maxIter=10)

rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features-Bert", \
                            numTrees = 50, \
                            maxDepth = 3, \
                            maxBins = 32)
lsvc = LinearSVC(maxIter=10, regParam=0.1,labelCol="label", 
                            featuresCol="features-Bert")
gbt = GBTClassifier(labelCol="label", featuresCol="features-Bert", maxIter=5)
fm = FMClassifier(featuresCol="features-Bert",labelCol="label",stepSize=0.01)

In [None]:
summary=pd.DataFrame(columns=['accuracy','f1_class1','f1_class0'])
for i in [lr,rf,lsvc,gbt,fm]:
    model = i.fit(Train)
    prediction = model.transform(Test)
    results=evaluation_report(prediction)
    summary=summary.append(results)

In [16]:
summary

Unnamed: 0,accuracy,f1_class1,f1_class0
0,0.713075,0.825616,0.839332
0,0.701584,0.793902,0.853867
0,0.710358,0.81196,0.848798
0,0.694555,0.793667,0.844767
0,0.715667,0.829144,0.839365


## Adding additional features

In [104]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").setHandleInvalid("keep").fit(Train) for column in list(['price','brand','asin','reviewerID'])]

pipeline = Pipeline(stages=indexers)
pipelie_fit = pipeline.fit(Train)
Train=pipelie_fit.transform(Train)
Test=pipelie_fit.transform(Test)

In [105]:
#assemble the vectors
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
    inputCols=["asin_index","brand_index","overall","price_index","reviewerID_index","reviewer_count","verified","features-Bert"],
    outputCol="features-all")

Train=assembler.transform(Train)
Test=assembler.transform(Test)

## Additional features+LR

In [106]:
from pyspark.ml.classification import LinearSVC,LogisticRegression,FMClassifier,RandomForestClassifier,LinearSVC
#initiate logistic regression with weight
lr = LogisticRegression(featuresCol = 'features-all', 
                        labelCol = 'label', 
                        maxIter=10)
#fit model
lrModel = lr.fit(Train)
prediction = lrModel.transform(Test)

In [107]:
results=evaluation_report(prediction)
results

Unnamed: 0,accuracy,f1_class1,f1_class0
0,0.722128,0.85307,0.823359


## Additional linear SVC

In [108]:
from pyspark.ml.classification import LinearSVC
lsvc = LinearSVC(maxIter=10, regParam=0.1,labelCol="label", 
                            featuresCol="features-all")

# Fit the model
lsvcModel = lsvc.fit(Train)
prediction=lsvcModel.transform(Test)

In [109]:
results=evaluation_report(prediction)
results

Unnamed: 0,accuracy,f1_class1,f1_class0
0,0.71897,0.862425,0.808473


## Factorization machines classifier + balanced Bert additional features

In [None]:
from pyspark.ml.classification import FMClassifier
fm = FMClassifier(featuresCol="features-all",labelCol="label",stepSize=0.001)
# train the model
model_fm = fm.fit(Train)
# select example rows to display.
prediction = model_fm.transform(Test)

In [None]:
results=evaluation_report(prediction)
results

Unnamed: 0,accuracy,f1_class1,f1_class0
0,0.510809,0.099046,0.992287


## Random Forest + balanced Bert additional features

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features-all", \
                            numTrees = 50, \
                            maxDepth = 3, \
                            maxBins = 32)

# add training dataset
rfModel = rf.fit(Train)

prediction = rfModel.transform(Test)

results=evaluation_report(prediction)
results

## GloVe Results

In [9]:
#initiate logistic regression with weight
lr = LogisticRegression(featuresCol = 'features-GloVe', 
                        labelCol = 'label', 
                        maxIter=10)

rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features-GloVe", \
                            numTrees = 50, \
                            maxDepth = 3, \
                            maxBins = 32)
lsvc = LinearSVC(maxIter=10, regParam=0.1,labelCol="label", 
                            featuresCol="features-GloVe")
gbt = GBTClassifier(labelCol="label", featuresCol="features-GloVe", maxIter=5)
fm = FMClassifier(featuresCol="features-GloVe",labelCol="label",stepSize=0.01)

In [None]:
import pandas as pd
summary=pd.DataFrame(columns=['accuracy','f1_class1','f1_class0'])
for i in [lr,rf,lsvc,gbt,fm]:
    model = i.fit(Train)
    prediction = model.transform(Test)
    results=evaluation_report(prediction)
    summary=summary.append(results)
summary

Unnamed: 0,accuracy,f1_class1,f1_class0
0,0.644596,0.8344,0.726525
0,0.667965,0.824916,0.774939
0,0.63778,0.892063,0.63365
0,0.66261,0.81628,0.776428
0,0.684275,0.867496,0.749457


## Tuning

## Random Forest + Bert

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

numFolds = 3

rf = RandomForestClassifier(labelCol="label", featuresCol="features-Bert")
evaluator = MulticlassClassificationEvaluator() 

pipeline = Pipeline(stages=[rf])
paramGrid = (ParamGridBuilder() \
             .addGrid(rf.numTrees, [100,150]) \
             .addGrid(rf.maxDepth, [5,6,7]) \
             .build())
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=numFolds)

model = crossval.fit(Train)

In [121]:
prediction = model.transform(Test)

In [122]:
results=evaluation_report(prediction)
results

Unnamed: 0,accuracy,f1_class1,f1_class0
0,0.721642,0.816742,0.859797


## GBT Classification

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import GBTClassifier
numFolds = 3

gbt = GBTClassifier(labelCol="label", featuresCol="features-Bert", maxIter=30)
evaluator = MulticlassClassificationEvaluator() 

pipeline = Pipeline(stages=[gbt])
paramGrid = (ParamGridBuilder() \
             .addGrid(gbt.stepSize, [0.1,0.01,0.05]) \
             .addGrid(gbt.maxDepth, [10,15]) \
             .build())
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=numFolds)

model = crossval.fit(Train)

In [None]:
prediction = model.transform(Test)
results=evaluation_report(prediction)
results

## LR + tuning

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
numFolds = 3

lr = LogisticRegression(featuresCol = 'features-Bert', 
                        labelCol = 'label', 
                        maxIter=10)
evaluator = MulticlassClassificationEvaluator() 

pipeline = Pipeline(stages=[lr])
paramGrid = (ParamGridBuilder() \
             .addGrid(lr.regParam, [0.1,0.01,0.05]) \
             .addGrid(lr.elasticNetParam, [0.01,0.05]) \
             .addGrid(lr.threshold, [0.4,0.5,0.6]) \
             .build())
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=numFolds)

model = crossval.fit(Train)

In [None]:
prediction = model.transform(Test)
results=evaluation_report(prediction)
results

Unnamed: 0,accuracy,f1_class1,f1_class0
0,0.722053,0.828891,0.848122


## Linear SVC

In [None]:
numFolds = 3

lsvc = LinearSVC(maxIter=10, regParam=0.1,labelCol="label", 
                            featuresCol="features-Bert")
evaluator = MulticlassClassificationEvaluator() 

pipeline = Pipeline(stages=[lsvc])
paramGrid = (ParamGridBuilder() \
             .addGrid(lsvc.maxIter, [10,20]) \
             .addGrid(lsvc.regParam, [0.1,0.2,0.3]) \
             .build())
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=numFolds)

model = crossval.fit(Train)
prediction = model.transform(Test)
results=evaluation_report(prediction)
results

Unnamed: 0,accuracy,f1_class1,f1_class0
0,0.709419,0.820769,0.838976


## Factorization Machine

In [None]:
numFolds = 3

fm = FMClassifier(featuresCol="features-Bert",labelCol="label")
evaluator = MulticlassClassificationEvaluator() 

pipeline = Pipeline(stages=[fm])
paramGrid = (ParamGridBuilder() \
             .addGrid(fm.factorSize, [10,15]) \
             .addGrid(fm.stepSize,[1,0.5,1.5]) \
             .build())
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=numFolds)

model = crossval.fit(Train)
prediction = model.transform(Test)
results=evaluation_report(prediction)
results