In [None]:
# NOTE(review): this downloads a remote setup script over plain HTTP and pipes it
# straight into bash; review what the script does before running it anywhere
# other than a throwaway Colab runtime.
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

import os
from getpass import getpass

import pandas as pd
import pyspark
import sparknlp

from pymongo import MongoClient

from pyspark.sql.functions import udf, col, when
from pyspark.sql.types import ArrayType, StringType

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Word2Vec

from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import *

import nltk
from nltk.corpus import stopwords

# Fetch NLTK's stop-word corpus and keep the English list; it is handed to the
# Spark NLP StopWordsCleaner in the preprocessing section.
nltk.download('stopwords')
eng_stopwords = stopwords.words('english')

# Spark session (with Spark NLP available) used for every DataFrame below.
spark = sparknlp.start()

## Fetch Data from MongoDB

In [None]:
# Credentials must never be hardcoded in a notebook (they end up in version
# control and shared copies). Read the connection string from the MONGODB_URI
# environment variable, or prompt for it with hidden input if it is not set.
MONGODB_URI = os.environ.get("MONGODB_URI") or getpass("MongoDB connection URI: ")

client = MongoClient(MONGODB_URI)
proj_db = client.project

# One collection per dataset split.
constraint_train = proj_db.constraint_train
constraint_test = proj_db.constraint_test
constraint_val = proj_db.constraint_val

In [None]:
# Peek at a single raw training document to see which fields are stored.
one_item = constraint_train.find_one(filter=None)
print(one_item)

{'_id': ObjectId('637d3f5de4ba7f6d3d922c3d'), 'id': 1, 'tweet': 'The CDC currently reports 99031 deaths. In general the discrepancies in death counts between different sources are small and explicable. The death toll stands at roughly 100000 people today.', 'label': 'real'}


In [None]:
def load_split(collection):
    """Load one MongoDB split into a Spark DataFrame.

    The Mongo ``_id`` and the numeric ``id`` fields are excluded server-side via
    a projection, leaving the remaining document fields (``tweet`` and ``label``
    for this dataset).

    :param collection: pymongo collection holding one dataset split.
    :returns: Spark DataFrame built from the collection's documents.
    :raises ValueError: if the collection has no documents (Spark cannot build
        a DataFrame from an empty, column-less pandas frame).
    """
    records = list(collection.find({}, {"_id": 0, "id": 0}))
    if not records:
        raise ValueError(f"MongoDB collection {collection.name!r} is empty")
    return spark.createDataFrame(pd.DataFrame(records))


train_df = load_split(constraint_train)
train_df.show(5)

val_df = load_split(constraint_val)
val_df.show(5)

test_df = load_split(constraint_test)

+--------------------+-----+
|               tweet|label|
+--------------------+-----+
|The CDC currently...| real|
|States reported 1...| real|
|Politically Corre...| fake|
|Covid Act Now fou...| real|
|If you tested pos...| real|
+--------------------+-----+
only showing top 5 rows

+--------------------+-----+
|               tweet|label|
+--------------------+-----+
|Chinese convertin...| fake|
|11 out of 13 peop...| fake|
|6/10 Sky's @EdCon...| real|
|No one can leave ...| real|
|#IndiaFightsCoron...| real|
+--------------------+-----+
only showing top 5 rows



## Preprocessing

In [None]:
class CustomTransformer(Transformer):
    """Minimal base class for the parameter-free DataFrame transformers below.

    Subclasses only implement ``_transform``. Delegating to
    ``Transformer.__init__`` gives every instance its own ``uid`` and its own
    param maps; declaring those maps as class attributes instead would share
    one mutable dict between every instance of every subclass.
    """

    def __init__(self, cols=None):
        super().__init__()
        # Optional column names; NullDropper uses them to restrict null checks.
        self.cols = cols


class NullDropper(CustomTransformer):
    """Drop rows that have a null in ``cols`` (or in any column when ``cols`` is None)."""

    def _transform(self, data):
        return data.dropna(subset=self.cols)


class LabelEncoder(CustomTransformer):
    """Turn the string ``label`` column into a double: "real" -> 0.0, any other value -> 1.0."""

    def _transform(self, data):
        return data.withColumn("label", when(col("label") == "real", 0.0).otherwise(1.0))


class Cleaner(CustomTransformer):
    """Add a ``text`` column: the ``tweet`` column with URL tokens removed."""

    def _transform(self, data):
        def filter_out_urls(text):
            # Spark hands SQL NULL to the UDF as None; keep it null instead of
            # failing the whole task with an AttributeError.
            if text is None:
                return None
            # Tokens are space-separated; drop any token starting with an
            # http:/https: scheme.
            return " ".join(
                word for word in text.split(" ")
                if not word.startswith(("http:", "https:"))
            )

        udf_filter_urls = udf(filter_out_urls, StringType())
        return data.withColumn("text", udf_filter_urls(col("tweet")))

In [None]:
# Plain DataFrame clean-up stages (defined in the previous cell).
nullDroper = NullDropper()
labelEncoder = LabelEncoder()
cleaner = Cleaner()

# Spark NLP stages: text -> document -> tokens -> stems -> stop words removed.
documentAssembler = (
    DocumentAssembler()
    .setInputCol('text')
    .setOutputCol('document')
)

tokenizer = (
    Tokenizer()
    .setInputCols(['document'])
    .setOutputCol('tokenized')
)

stemmer = (
    Stemmer()
    .setInputCols(["tokenized"])
    .setOutputCol("stemmed")
)

# NOTE(review): stop words are removed *after* stemming, so stemmed forms of
# stop words may no longer match the unstemmed NLTK list and slip through.
# Consider feeding 'tokenized' into this cleaner and stemming its output
# instead; that also requires swapping the two stages in the pipeline.
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols(['stemmed'])
    .setOutputCol('stopremoved')
    .setCaseSensitive(False)
    .setStopWords(eng_stopwords)
)

# Convert the annotations back into a plain column ('finished_stopremoved')
# that Spark ML stages such as Word2Vec can consume.
finisher = (
    Finisher()
    .setInputCols('stopremoved')
)

# Learn 30-dimensional word vectors and map each tweet's tokens to a single
# feature vector in 'result' (Spark averages the word vectors per document).
# minCount=0 keeps every token in the vocabulary, however rare.
word2Vec = Word2Vec(vectorSize=30, minCount=0, inputCol="finished_stopremoved", outputCol="result")

In [None]:
preprocessPipeline = Pipeline(stages=[
    nullDroper,
    labelEncoder,
    cleaner,
    documentAssembler,
    tokenizer,
    stemmer,
    stopwords_cleaner,
    finisher,
    word2Vec,
])

# Fit on the training split only, so nothing the pipeline learns (e.g. the
# Word2Vec vocabulary and vectors) is influenced by the validation data.
preprocessModel = preprocessPipeline.fit(train_df)

# These frames are scanned many times below (every CV fold and every count in
# the F1 evaluator). Cache them so the Python UDF and Spark NLP stages are not
# recomputed on each pass.
train = preprocessModel.transform(train_df).cache()
val = preprocessModel.transform(val_df).cache()

## Training

In [None]:
class F1BinaryEvaluator():
    """F1 score of a single class, computed from a prediction DataFrame.

    Duck-types the two methods ``CrossValidator`` calls on an evaluator:
    ``evaluate(dataframe)`` and ``isLargerBetter()``.

    :param predCol: name of the predicted-label column.
    :param labelCol: name of the true-label column.
    :param metricLabel: label value treated as the positive class.
    """

    def __init__(self, predCol="prediction", labelCol="label", metricLabel=1.0):
        self.labelCol = labelCol
        self.predCol = predCol
        self.metricLabel = metricLabel

    def isLargerBetter(self):
        """Higher F1 is better, so model selection should maximize it."""
        return True

    def _count(self, dataframe, label_op, pred_op):
        """Count rows where ``label <label_op> m and pred <pred_op> m`` (m = metricLabel)."""
        m = str(self.metricLabel)
        condition = f"{self.labelCol} {label_op} {m} and {self.predCol} {pred_op} {m}"
        return dataframe.filter(condition).count()

    def evaluate(self, dataframe):
        """Return F1 = TP / (TP + (FN + FP) / 2) for ``metricLabel``.

        Returns 0.0 when TP, FP and FN are all zero (no rows labelled or
        predicted as ``metricLabel``) instead of raising ZeroDivisionError,
        which could otherwise abort a CrossValidator fold.
        """
        tp = self._count(dataframe, "=", "=")
        fp = self._count(dataframe, "!=", "=")
        fn = self._count(dataframe, "=", "!=")
        denominator = tp + 0.5 * (fn + fp)
        if denominator == 0:
            return 0.0
        return tp / denominator

In [None]:
f1_evaluator = F1BinaryEvaluator()
rf = RandomForestClassifier(featuresCol="result")

# Candidate forest sizes, compared by 5-fold CV on the F1 of label 1.0
# (LabelEncoder maps "real" to 0.0 and everything else to 1.0).
grid = ParamGridBuilder().addGrid(rf.numTrees, [5, 20, 100, 500]).build()

cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=grid,
    evaluator=f1_evaluator,
    numFolds=5,
    parallelism=5,
)
cvModel = cv.fit(train)

In [None]:
# In-sample F1: the CV-selected forest scored on the same data it was fitted on.
rf_train_predictions = cvModel.transform(train)
f1_evaluator.evaluate(rf_train_predictions)

0.8723056825604181

## Evaluation

In [None]:
# CrossValidator has already refit the winning numTrees setting on the full
# training set, so score that model on the validation split. (Fitting a fresh
# RandomForestClassifier here would fall back to the default numTrees and
# silently discard the cross-validation result.)
bestModel = cvModel.bestModel
model = bestModel
f1_evaluator.evaluate(model.transform(val))

0.8612997090203686

##### Logistic regression

In [None]:
from pyspark.ml.classification import LogisticRegression

f1_evaluator = F1BinaryEvaluator()
estimator = LogisticRegression(featuresCol="result")

# Only one maxIter value, so CV here measures a single configuration.
grid = ParamGridBuilder().addGrid(estimator.maxIter, [10]).build()

cv = CrossValidator(
    estimator=estimator,
    estimatorParamMaps=grid,
    evaluator=f1_evaluator,
    numFolds=5,
    parallelism=5,
)
cvModel = cv.fit(train)

In [None]:
# In-sample F1 of the CV-selected logistic regression on its own training data.
lr_train_predictions = cvModel.transform(train)
f1_evaluator.evaluate(lr_train_predictions)

0.8786391887471378

In [None]:
# CrossValidator has already refit the best maxIter on the full training set;
# score that model on the validation split directly instead of retraining an
# identical one via the private `_java_obj` API.
bestModel = cvModel.bestModel
model = bestModel
f1_evaluator.evaluate(model.transform(val))

0.8716447047340166

##### Decision tree

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

f1_evaluator = F1BinaryEvaluator()
estimator = DecisionTreeClassifier(featuresCol="result")

# Only one maxDepth value, so CV here measures a single configuration.
grid = ParamGridBuilder().addGrid(estimator.maxDepth, [5]).build()

cv = CrossValidator(
    estimator=estimator,
    estimatorParamMaps=grid,
    evaluator=f1_evaluator,
    numFolds=5,
    parallelism=5,
)
cvModel = cv.fit(train)


In [None]:
# In-sample F1 of the CV-selected decision tree on its own training data.
dt_train_predictions = cvModel.transform(train)
f1_evaluator.evaluate(dt_train_predictions)

0.8658417453598176

In [None]:
# CrossValidator has already refit the best maxDepth on the full training set;
# score that model on the validation split directly instead of retraining an
# identical one via the private `_java_obj` API.
bestModel = cvModel.bestModel
model = bestModel
f1_evaluator.evaluate(model.transform(val))

0.8421052631578947

##### Linear SVM (LinearSVC)

In [None]:
from pyspark.ml.classification import LinearSVC

f1_evaluator = F1BinaryEvaluator()
estimator = LinearSVC(featuresCol="result")

# Only one maxIter value, so CV here measures a single configuration.
grid = ParamGridBuilder().addGrid(estimator.maxIter, [10]).build()

cv = CrossValidator(
    estimator=estimator,
    estimatorParamMaps=grid,
    evaluator=f1_evaluator,
    numFolds=5,
    parallelism=5,
)
cvModel = cv.fit(train)


In [None]:
# In-sample F1 of the CV-selected linear SVM on its own training data.
svc_train_predictions = cvModel.transform(train)
f1_evaluator.evaluate(svc_train_predictions)

0.8738753476198265

In [None]:
# CrossValidator has already refit the best maxIter on the full training set;
# score that model on the validation split directly instead of retraining an
# identical one via the private `_java_obj` API.
bestModel = cvModel.bestModel
model = bestModel
f1_evaluator.evaluate(model.transform(val))

0.8728155339805825