In [None]:
! pip install -q pyspark==3.3.0 spark-nlp==4.2.0

In [None]:
import os
import sys

import sparknlp

from sparknlp.base import *
from sparknlp.common import *
from sparknlp.annotator import *

from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, OneHotEncoder, StringIndexer, VectorAssembler, SQLTransformer
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession

import pandas as pd

# Start a Spark session through Spark NLP.
# (sparknlp.start(gpu=True) is the GPU variant, left disabled here.)
spark = sparknlp.start()

# Report the library versions actually loaded in this session.
for banner, version in (
    ("Spark NLP version: ", sparknlp.version()),
    ("Apache Spark version: ", spark.version),
):
    print(banner, version)

spark

Spark NLP version:  4.2.0
Apache Spark version:  3.3.0


In [None]:
# Read train.csv with a header row, multi-line field support, and the double
# quote acting as the escape character.
csv_reader = (
    spark.read.format("csv")
    .option("header", True)
    .option("multiline", True)
    .option("escape", "\"")
)
df = csv_reader.load("train.csv")

# Rename the label column so StringIndexer can later write its output to "label"
df = df.withColumnRenamed("label", "label_str")

df.count()

20800

# TF-IDF + LogReg

### Cleaning

In [None]:
# Expanding contractions and lowering
from pyspark.sql.functions import lower, regexp_replace

# Contraction expansions, applied in order: the specific forms come first and
# the generic suffix rules afterwards.
#
# Spark's regexp_replace follows Java regex replacement syntax, so a captured
# group is referenced as `$1`. Python's `\g<1>` form is not understood there:
# the backslash merely escapes the `g`, and "they'll" would come out as
# "g<1> will" instead of "they will".
#
# NOTE: only the ASCII apostrophe (') is matched; typographic apostrophes (’)
# that occur in the data are left untouched by these rules.
replacement_patterns = [
  (r'won\'t', 'will not'),
  (r'can\'t', 'cannot'),
  (r'i\'m', 'i am'),
  (r'ain\'t', 'is not'),
  (r'(\w+)\'ll', '$1 will'),
  (r'(\w+)n\'t', '$1 not'),
  (r'(\w+)\'ve', '$1 have'),
  (r'(\w+)\'s', '$1 is'),
  (r'(\w+)\'re', '$1 are'),
  (r'(\w+)\'d', '$1 would')
]

# Lower-case first so that every rule (including the first one) sees the same
# normalised text; otherwise "Won't" would skip the specific rule and be
# rewritten by the generic n't rule as "wo not".
df_clean = df.select('id',
          lower('text').alias('text'),
          lower('title').alias('title'),
          'label_str')

for (pattern, repl) in replacement_patterns:
  df_clean = df_clean.select('id',
          regexp_replace('text', pattern, repl).alias('text'),
          regexp_replace('title', pattern, repl).alias('title'),
          'label_str')

df_clean.show()

+---+--------------------+--------------------+---------+
| id|                text|               title|label_str|
+---+--------------------+--------------------+---------+
|  0|house dem aide: w...|house dem aide: w...|        1|
|  1|ever get the feel...|flynn: hillary cl...|        0|
|  2|why the truth mig...|why the truth mig...|        1|
|  3|videos 15 civilia...|15 civilians kill...|        1|
|  4|print \nan irania...|iranian woman jai...|        1|
|  5|in these trying t...|jackie mason: hol...|        0|
|  6|ever wonder how b...|life: life of lux...|        1|
|  7|paris  —   france...|benoît hamon wins...|        0|
|  8|donald j. trump i...|excerpts from a d...|        0|
|  9|a week before mic...|a back-channel pl...|        0|
| 10|organizing for ac...|obama’s organizin...|        0|
| 11|the bbc produced ...|bbc comedy sketch...|        0|
| 12|the mystery surro...|russian researche...|        1|
| 13|clinton campaign ...|us officials see ...|        1|
| 14|yes, ther

In [None]:
# Remove punctuation and digits (anything that is not a lowercase letter or
# whitespace), then collapse each run of whitespace into a single space.
for column in ('text', 'title'):
    letters_only = regexp_replace(column, "[^a-z\\s]", "")
    df_clean = df_clean.withColumn(column, regexp_replace(letters_only, r'\s+', " "))


In [None]:
# 70/30 split of the cleaned data with a fixed seed
trainingData, testData = df_clean.randomSplit([0.7, 0.3], seed=100)

for caption, subset in (
    ("Training Dataset Count: ", trainingData),
    ("Test Dataset Count: ", testData),
):
    print(caption + str(subset.count()))

Training Dataset Count: 14584
Test Dataset Count: 6216


### Titles 

In [None]:
%%time

# --- Spark NLP stages: title string -> array of cleaned tokens --------------
document_assembler = DocumentAssembler() \
      .setInputCol("title") \
      .setOutputCol("document")

tokenizer = Tokenizer() \
      .setInputCols(["document"]) \
      .setOutputCol("token")

normalizer = Normalizer() \
      .setInputCols(["token"]) \
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

# Finisher turns the annotation column into a plain array column that the
# Spark ML feature stages below can consume.
finisher = Finisher() \
      .setInputCols(["cleanTokens"]) \
      .setOutputCols(["token_features"]) \
      .setOutputAsArray(True) \
      .setCleanAnnotations(False)

# Index labels alphabetically so that "0" -> 0.0 and "1" -> 1.0 no matter how
# the classes are distributed. The default ordering puts the most frequent
# label first, which depends on the training split and can silently swap the
# class ids shown in the evaluation report.
label_stringIdx = StringIndexer(inputCol="label_str", outputCol="label",
                                stringOrderType="alphabetAsc")

# --- Spark ML stages: token array -> TF-IDF feature vector ------------------
hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures", numFeatures=10000)

idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)

nlp_pipeline = Pipeline(
    stages=[document_assembler,
            tokenizer,
            normalizer,
            stopwords_cleaner,
            finisher,
            hashingTF,
            idf,
            label_stringIdx])

# Fit on the training split only, then apply the fitted stages to both splits
nlp_model = nlp_pipeline.fit(trainingData)

trainingData_processed = nlp_model.transform(trainingData)
testData_processed = nlp_model.transform(testData)

CPU times: user 1.39 s, sys: 177 ms, total: 1.57 s
Wall time: 3min 21s


In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression with elasticNetParam=0; this estimator is reused later
# for the article-text model.
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0)
lrModel_tf = lr.fit(trainingData_processed)

# Score the held-out titles and pull the columns needed for evaluation
# into pandas.
predictions_tf = lrModel_tf.transform(testData_processed)
report_columns = ['title', 'label', 'prediction']
result = predictions_tf.select(*report_columns).toPandas()

In [None]:
# Compute metrics
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score

# Per-class precision/recall/F1 followed by overall accuracy (title model)
y_true = result.label
y_pred = result.prediction
print(classification_report(y_true, y_pred))
print(accuracy_score(y_true, y_pred))

              precision    recall  f1-score   support

         0.0       0.89      0.96      0.93      3057
         1.0       0.96      0.89      0.92      3159

    accuracy                           0.92      6216
   macro avg       0.93      0.92      0.92      6216
weighted avg       0.93      0.92      0.92      6216

0.924066924066924


In [None]:
# Persist the title-model predictions for later inspection
result.to_csv(path_or_buf='tfidf_titles.csv')

### Text

In [None]:
%%time

# --- Spark NLP stages: article text -> array of cleaned tokens --------------
document_assembler = DocumentAssembler() \
      .setInputCol("text") \
      .setOutputCol("document")

tokenizer = Tokenizer() \
      .setInputCols(["document"]) \
      .setOutputCol("token")

normalizer = Normalizer() \
      .setInputCols(["token"]) \
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

# Finisher turns the annotation column into a plain array column that the
# Spark ML feature stages below can consume.
finisher = Finisher() \
      .setInputCols(["cleanTokens"]) \
      .setOutputCols(["token_features"]) \
      .setOutputAsArray(True) \
      .setCleanAnnotations(False)

# Index labels alphabetically so that "0" -> 0.0 and "1" -> 1.0 no matter how
# the classes are distributed. The default ordering puts the most frequent
# label first, which depends on the training split and can silently swap the
# class ids shown in the evaluation report.
label_stringIdx = StringIndexer(inputCol="label_str", outputCol="label",
                                stringOrderType="alphabetAsc")

# --- Spark ML stages: token array -> TF-IDF feature vector ------------------
hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures", numFeatures=10000)

idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms

nlp_pipeline = Pipeline(
    stages=[document_assembler,
            tokenizer,
            normalizer,
            stopwords_cleaner,
            finisher,
            hashingTF,
            idf,
            label_stringIdx])

# Fit on the training split only, then apply the fitted stages to both splits
nlp_model = nlp_pipeline.fit(trainingData)

trainingData_processed = nlp_model.transform(trainingData)
testData_processed = nlp_model.transform(testData)


CPU times: user 3.99 s, sys: 581 ms, total: 4.57 s
Wall time: 11min 3s


In [None]:
# Refit the `lr` estimator defined in an earlier cell, this time on the
# article-text features.
lrModel_tf = lr.fit(trainingData_processed)

# Score the held-out articles and pull the evaluation columns into pandas
predictions_tf = lrModel_tf.transform(testData_processed)
report_columns = ['text', 'label', 'prediction']
results = predictions_tf.select(*report_columns).toPandas()

In [None]:
# Compute metrics
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score

# Per-class precision/recall/F1 followed by overall accuracy (text model)
y_true = results.label
y_pred = results.prediction
print(classification_report(y_true, y_pred))
print(accuracy_score(y_true, y_pred))

              precision    recall  f1-score   support

         0.0       0.87      0.96      0.91      3057
         1.0       0.96      0.86      0.91      3159

    accuracy                           0.91      6216
   macro avg       0.91      0.91      0.91      6216
weighted avg       0.92      0.91      0.91      6216

0.911036036036036


In [None]:
# Persist the text-model predictions for later inspection
results.to_csv(path_or_buf='tfidf_text.csv')

# BERT + GBT (gradient-boosted trees)

In [None]:
# Split the raw (uncleaned) frame: the BERT experiments skip the cleaning step
trainingData, testData = df.randomSplit([0.7, 0.3], seed=100)

for caption, subset in (
    ("Training Dataset Count: ", trainingData),
    ("Test Dataset Count: ", testData),
):
    print(caption + str(subset.count()))

Training Dataset Count: 14584
Test Dataset Count: 6216


### Titles

In [None]:
document_assembler = DocumentAssembler()\
  .setInputCol("title")\
  .setOutputCol("document")

tokenizer = Tokenizer().setInputCols(["document"])\
  .setOutputCol("token")

# Pretrained cased BERT base model; its output column is pooled into a single
# document vector in a later cell.
word_embeddings = BertEmbeddings.pretrained('bert_base_cased', 'en')\
  .setInputCols(["document", "token"])\
  .setOutputCol("embeddings")

# Index labels alphabetically so that "0" -> 0.0 and "1" -> 1.0 no matter how
# the classes are distributed. The default ordering puts the most frequent
# label first, which depends on the training split and can silently swap the
# class ids shown in the evaluation report.
label_stringIdx = StringIndexer(inputCol="label_str", outputCol="label",
                                stringOrderType="alphabetAsc")

bert_pipeline = Pipeline().setStages(
  [
    document_assembler,
    tokenizer,
    word_embeddings,
    label_stringIdx
  ]
)

# Fit on the training split only, then apply the fitted stages to both splits
bert_model = bert_pipeline.fit(trainingData)
trainingData_processed = bert_model.transform(trainingData)
testData_processed = bert_model.transform(testData)

bert_base_cased download started this may take some time.
Approximate size to download 389.1 MB
[OK!]


In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as T


from pyspark.ml.classification import GBTClassifier
from pyspark.ml.linalg import Vectors, VectorUDT

#Average pooling -> sentence embeddings
def avg_vectors(bert_vectors, length=768):
  """Mean-pool token embeddings into one fixed-size document vector.

  The previous version divided only the last component by the vector length
  once per token, so it returned a (partly corrupted) sum, not an average.

  Args:
    bert_vectors: iterable of annotations, each exposing its token vector
      under the "embeddings" key (rows or plain dicts). None is treated as
      an empty input.
    length: dimensionality of each embedding vector (defaults to 768).

  Returns:
    A list of `length` floats holding the element-wise mean over all tokens,
    or all zeros when there are no tokens.
  """
  # Float zeros keep the result type consistent with the array<double> UDF
  # return type even when nothing is accumulated.
  totals = [0.0] * length
  token_count = 0
  for vec in bert_vectors or []:
    for i, x in enumerate(vec["embeddings"]):
      totals[i] += x
    token_count += 1
  if token_count == 0:
    return totals
  return [total / token_count for total in totals]

# Wrap the pooling function in a UDF returning an array of doubles, then add
# the pooled "doc_vector" column to both splits.
avg_vectors_udf = F.udf(avg_vectors, T.ArrayType(T.DoubleType()))
pooled_embeddings = avg_vectors_udf(F.col("embeddings"))
df_doc_vec_train = trainingData_processed.withColumn("doc_vector", pooled_embeddings)
df_doc_vec_test = testData_processed.withColumn("doc_vector", pooled_embeddings)


def dense_vector(vec):
  """Return `vec` converted with Vectors.dense (Spark ML dense vector)."""
  as_dense = Vectors.dense(vec)
  return as_dense

# Convert the pooled arrays into the vector-typed "features" column that the
# classifier reads.
dense_vector_udf = F.udf(dense_vector, VectorUDT())
features_expr = dense_vector_udf(F.col("doc_vector"))
training = df_doc_vec_train.withColumn("features", features_expr)
test = df_doc_vec_test.withColumn("features", features_expr)

# Despite the `sgd` variable names, the classifier is a gradient-boosted
# tree model.
sgd = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
sgdModelBert = sgd.fit(training)
predictions_bert = sgdModelBert.transform(test)

In [None]:
# Compute metrics
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score

# Pull the evaluation columns into pandas, then report per-class metrics and
# overall accuracy for the BERT title model.
report_columns = ['title', 'label', 'prediction']
results = predictions_bert.select(*report_columns).toPandas()

y_true = results.label
y_pred = results.prediction
print(classification_report(y_true, y_pred))
print(accuracy_score(y_true, y_pred))

              precision    recall  f1-score   support

         0.0       0.83      0.93      0.88      3057
         1.0       0.93      0.81      0.87      3159

    accuracy                           0.87      6216
   macro avg       0.88      0.87      0.87      6216
weighted avg       0.88      0.87      0.87      6216

0.8716216216216216


In [None]:
# Persist the BERT title-model predictions for later inspection
results.to_csv(path_or_buf='bert_titles.csv')

### Text

In [None]:
document_assembler = DocumentAssembler()\
  .setInputCol("text")\
  .setOutputCol("document")

tokenizer = Tokenizer().setInputCols(["document"])\
  .setOutputCol("token")

# Pretrained cased BERT base model; its output column is pooled into a single
# document vector in a later cell.
word_embeddings = BertEmbeddings.pretrained('bert_base_cased', 'en')\
  .setInputCols(["document", "token"])\
  .setOutputCol("embeddings")

# Index labels alphabetically so that "0" -> 0.0 and "1" -> 1.0 no matter how
# the classes are distributed. The default ordering puts the most frequent
# label first, which depends on the training split and can silently swap the
# class ids shown in the evaluation report.
label_stringIdx = StringIndexer(inputCol="label_str", outputCol="label",
                                stringOrderType="alphabetAsc")

bert_pipeline = Pipeline().setStages(
  [
    document_assembler,
    tokenizer,
    word_embeddings,
    label_stringIdx
  ]
)


# Fit on the training split only, then apply the fitted stages to both splits
bert_model = bert_pipeline.fit(trainingData)
trainingData_processed = bert_model.transform(trainingData)
testData_processed = bert_model.transform(testData)

bert_base_cased download started this may take some time.
Approximate size to download 389.1 MB
[OK!]


In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.ml.classification import LogisticRegression, GBTClassifier
from pyspark.ml.linalg import Vectors, VectorUDT

def avg_vectors(bert_vectors, length=768):
  """Mean-pool token embeddings into one fixed-size document vector.

  The previous version divided only the last component by the vector length
  once per token, so it returned a (partly corrupted) sum, not an average.

  Args:
    bert_vectors: iterable of annotations, each exposing its token vector
      under the "embeddings" key (rows or plain dicts). None is treated as
      an empty input.
    length: dimensionality of each embedding vector (defaults to 768).

  Returns:
    A list of `length` floats holding the element-wise mean over all tokens,
    or all zeros when there are no tokens.
  """
  # Float zeros keep the result type consistent with the array<double> UDF
  # return type even when nothing is accumulated.
  totals = [0.0] * length
  token_count = 0
  for vec in bert_vectors or []:
    for i, x in enumerate(vec["embeddings"]):
      totals[i] += x
    token_count += 1
  if token_count == 0:
    return totals
  return [total / token_count for total in totals]


# Wrap the pooling function in a UDF returning an array of doubles, then add
# the pooled "doc_vector" column to both splits.
avg_vectors_udf = F.udf(avg_vectors, T.ArrayType(T.DoubleType()))
pooled_embeddings = avg_vectors_udf(F.col("embeddings"))
df_doc_vec_train = trainingData_processed.withColumn("doc_vector", pooled_embeddings)
df_doc_vec_test = testData_processed.withColumn("doc_vector", pooled_embeddings)


def dense_vector(vec):
  """Return `vec` converted with Vectors.dense (Spark ML dense vector)."""
  as_dense = Vectors.dense(vec)
  return as_dense

# Convert the pooled arrays into the vector-typed "features" column that the
# classifier reads.
dense_vector_udf = F.udf(dense_vector, VectorUDT())
features_expr = dense_vector_udf(F.col("doc_vector"))
training = df_doc_vec_train.withColumn("features", features_expr)
test = df_doc_vec_test.withColumn("features", features_expr)


In [None]:
# Despite the `sgd` variable names, the classifier is a gradient-boosted
# tree model.
sgd = GBTClassifier(
    labelCol="label",
    featuresCol="features",
    maxIter=10,
)
sgdModelBert = sgd.fit(training)

# Score the held-out split
predictions_bert = sgdModelBert.transform(test)

In [None]:
# Compute metrics
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score

# Pull the evaluation columns into pandas, then report per-class metrics and
# overall accuracy for the BERT text model.
report_columns = ['text', 'label', 'prediction']
results = predictions_bert.select(*report_columns).toPandas()

y_true = results.label
y_pred = results.prediction
print(classification_report(y_true, y_pred))
print(accuracy_score(y_true, y_pred))

              precision    recall  f1-score   support

         0.0       0.80      0.78      0.79      3057
         1.0       0.79      0.81      0.80      3159

    accuracy                           0.79      6216
   macro avg       0.79      0.79      0.79      6216
weighted avg       0.79      0.79      0.79      6216

0.7924710424710425


In [None]:
# Persist the BERT text-model predictions for later inspection
results.to_csv(path_or_buf='bert_text.csv')