# Text Classification with Spark NLP

In [1]:
! pip install -q pyspark==3.4.1 spark-nlp==5.1.2

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.0/54.0 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m536.3/536.3 kB[0m [31m31.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
import os
import sys

import sparknlp

from sparknlp.base import *
from sparknlp.common import *
from sparknlp.annotator import *

from pyspark.ml import Pipeline
from pyspark.sql import SparkSession

import pandas as pd

spark = sparknlp.start()

print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

spark

Spark NLP version:  5.1.2
Apache Spark version:  3.4.1


In [3]:
newsDF = spark.read \
      .option("header", True) \
      .csv("/content/news_category_train.csv")


newsDF.show(truncate=50)

+--------+--------------------------------------------------+
|category|                                       description|
+--------+--------------------------------------------------+
|Business| Short sellers, Wall Street's dwindling band of...|
|Business| Private investment firm Carlyle Group, which h...|
|Business| Soaring crude prices plus worries about the ec...|
|Business| Authorities have halted oil export flows from ...|
|Business| Tearaway world oil prices, toppling records an...|
|Business| Stocks ended slightly higher on Friday but sta...|
|Business| Assets of the nation's retail money market mut...|
|Business| Retail sales bounced back a bit in July, and n...|
|Business|" After earning a PH.D. in Sociology, Danny Baz...|
|Business| Short sellers, Wall Street's dwindling  band o...|
|Business| Soaring crude prices plus worries  about the e...|
|Business| OPEC can do nothing to douse scorching  oil pr...|
|Business| Non OPEC oil exporters should consider  increa...|
|Busines

In [4]:
newsDF.take(2)

[Row(category='Business', description=" Short sellers, Wall Street's dwindling band of ultra cynics, are seeing green again."),
 Row(category='Business', description=' Private investment firm Carlyle Group, which has a reputation for making well timed and occasionally controversial plays in the defense industry, has quietly placed its bets on another part of the market.')]

In [5]:
from pyspark.sql.functions import col

newsDF.groupBy("category") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+--------+-----+
|category|count|
+--------+-----+
|   World|30000|
|Sci/Tech|30000|
|  Sports|30000|
|Business|30000|
+--------+-----+



# Building classification Pipeline

LogReg with CountVectorizer


In [6]:
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, OneHotEncoder, StringIndexer, VectorAssembler, SQLTransformer

In [7]:
%%time

document_assembler = DocumentAssembler() \
      .setInputCol("description") \
      .setOutputCol("document")

tokenizer = Tokenizer() \
      .setInputCols(["document"]) \
      .setOutputCol("token")

normalizer = Normalizer() \
      .setInputCols(["token"]) \
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

stemmer = Stemmer() \
      .setInputCols(["cleanTokens"]) \
      .setOutputCol("stem")

finisher = Finisher() \
      .setInputCols(["stem"]) \
      .setOutputCols(["token_features"]) \
      .setOutputAsArray(True) \
      .setCleanAnnotations(False)

countVectors = CountVectorizer(inputCol="token_features", outputCol="features", vocabSize=10000, minDF=5)

label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")

nlp_pipeline = Pipeline(
    stages=[document_assembler,
            tokenizer,
            normalizer,
            stopwords_cleaner,
            stemmer,
            finisher,
            countVectors,
            label_stringIdx])

nlp_model = nlp_pipeline.fit(newsDF)

processed = nlp_model.transform(newsDF)

processed.count()

CPU times: user 814 ms, sys: 90.9 ms, total: 905 ms
Wall time: 1min 44s


120000

In [8]:
processed.select('description','token_features').show(truncate=15)

+---------------+---------------+
|    description| token_features|
+---------------+---------------+
| Short selle...|[short, sell...|
| Private inv...|[privat, inv...|
| Soaring cru...|[soar, crude...|
| Authorities...|[author, hal...|
| Tearaway wo...|[tearawai, w...|
| Stocks ende...|[stock, end,...|
| Assets of t...|[asset, nati...|
| Retail sale...|[retail, sal...|
|" After earn...|[earn, phd, ...|
| Short selle...|[short, sell...|
| Soaring cru...|[soar, crude...|
| OPEC can do...|[opec, noth,...|
| Non OPEC oi...|[non, opec, ...|
| WASHINGTON/...|[washingtonn...|
| The dollar ...|[dollar, tum...|
|If you think...|[think, mai,...|
|The purchasi...|[purchas, po...|
|There is lit...|[littl, caus...|
|The US trade...|[u, trade, d...|
|Oil giant Sh...|[oil, giant,...|
+---------------+---------------+
only showing top 20 rows



In [9]:
processed.select('token_features').take(2)

[Row(token_features=['short', 'seller', 'wall', 'street', 'dwindl', 'band', 'ultra', 'cynic', 'see', 'green']),
 Row(token_features=['privat', 'invest', 'firm', 'carlyl', 'group', 'reput', 'make', 'well', 'time', 'occasion', 'controversi', 'plai', 'defens', 'industri', 'quietli', 'place', 'bet', 'anoth', 'part', 'market'])]

In [10]:
processed.select('features').take(2)

[Row(features=SparseVector(10000, {241: 1.0, 384: 1.0, 467: 1.0, 745: 1.0, 838: 1.0, 2227: 1.0, 3670: 1.0, 6142: 1.0, 6227: 1.0})),
 Row(features=SparseVector(10000, {26: 1.0, 38: 1.0, 46: 1.0, 68: 1.0, 117: 1.0, 155: 1.0, 182: 1.0, 197: 1.0, 246: 1.0, 304: 1.0, 320: 1.0, 407: 1.0, 427: 1.0, 621: 1.0, 867: 1.0, 2366: 1.0, 2825: 1.0, 2858: 1.0, 6779: 1.0}))]

In [11]:
processed.select('description','features','label').show(5)

+--------------------+--------------------+-----+
|         description|            features|label|
+--------------------+--------------------+-----+
| Short sellers, W...|(10000,[241,384,4...|  0.0|
| Private investme...|(10000,[26,38,46,...|  0.0|
| Soaring crude pr...|(10000,[15,28,46,...|  0.0|
| Authorities have...|(10000,[0,32,35,4...|  0.0|
| Tearaway world o...|(10000,[1,2,11,28...|  0.0|
+--------------------+--------------------+-----+
only showing top 5 rows



In [12]:
# set seed for reproducibility
(trainingData, testData) = processed.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 84003
Test Dataset Count: 35997


In [13]:
trainingData.printSchema()

root
 |-- category: string (nullable = true)
 |-- description: string (nullable = true)
 |-- document: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- embeddings: array (nullable = true)
 |    |    |    |-- element: float (containsNull = false)
 |-- token: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- annotatorType: string (nullable = true)
 |    |    |-- begin: integer (nullable = false)
 |    |    |-- end: integer (nullable = false)
 |    |    |-- result: string (nullable = true)
 |    |    |-- metadata: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |   

In [14]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0)

lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

predictions.filter(predictions['prediction'] == 0) \
    .select("description","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 15)

+---------------+--------+---------------+-----+----------+
|    description|category|    probability|label|prediction|
+---------------+--------+---------------+-----+----------+
|" U.S. stock...|Business|[0.999415625...|  0.0|       0.0|
|" U.S. blue ...|Business|[0.997383744...|  0.0|       0.0|
|Attorney Gen...|Business|[0.996676286...|  0.0|       0.0|
|" Stocks fel...|Business|[0.995670023...|  0.0|       0.0|
|The airline ...|Business|[0.994536408...|  0.0|       0.0|
|" Shares of ...|Business|[0.993787221...|  0.0|       0.0|
|" Stocks sli...|Business|[0.993759879...|  0.0|       0.0|
|" Mid priced...|Business|[0.993214287...|  0.0|       0.0|
|" Citigroup ...|Business|[0.993153775...|  0.0|       0.0|
| OPEC oil pr...|Business|[0.993096689...|  0.0|       0.0|
+---------------+--------+---------------+-----+----------+
only showing top 10 rows



In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

evaluator.evaluate(predictions)

0.901318208334548

In [16]:
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
y_true = predictions.select("label")
y_true = y_true.toPandas()

y_pred = predictions.select("prediction")
y_pred = y_pred.toPandas()

In [17]:
y_pred.prediction.value_counts()

Unnamed: 0_level_0,count
prediction,Unnamed: 1_level_1
2.0,9374
1.0,9077
0.0,9017
3.0,8529


In [18]:
cnf_matrix = confusion_matrix(list(y_true.label.astype(int)), list(y_pred.prediction.astype(int)))
cnf_matrix

array([[7797,  789,  104,  287],
       [ 699, 7899,   86,  299],
       [  47,   77, 8902,   90],
       [ 474,  312,  282, 7853]])

In [19]:
print(classification_report(y_true.label, y_pred.prediction))
print(accuracy_score(y_true.label, y_pred.prediction))

              precision    recall  f1-score   support

         0.0       0.86      0.87      0.87      8977
         1.0       0.87      0.88      0.87      8983
         2.0       0.95      0.98      0.96      9116
         3.0       0.92      0.88      0.90      8921

    accuracy                           0.90     35997
   macro avg       0.90      0.90      0.90     35997
weighted avg       0.90      0.90      0.90     35997

0.9014917909825819


LogReg with TFIDF


In [20]:
from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol="token_features", outputCol="rawFeatures", numFeatures=10000)

idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms

nlp_pipeline_tf = Pipeline(
    stages=[document_assembler,
            tokenizer,
            normalizer,
            stopwords_cleaner,
            stemmer,
            finisher,
            hashingTF,
            idf,
            label_stringIdx])

nlp_model_tf = nlp_pipeline_tf.fit(newsDF)

processed_tf = nlp_model_tf.transform(newsDF)

processed_tf.count()

120000

In [21]:
# set seed for reproducibility
processed_tf.select('description','features','label').show(5)

+--------------------+--------------------+-----+
|         description|            features|label|
+--------------------+--------------------+-----+
| Short sellers, W...|(10000,[551,621,6...|  0.0|
| Private investme...|(10000,[157,831,9...|  0.0|
| Soaring crude pr...|(10000,[793,1738,...|  0.0|
| Authorities have...|(10000,[1548,1611...|  0.0|
| Tearaway world o...|(10000,[323,585,1...|  0.0|
+--------------------+--------------------+-----+
only showing top 5 rows



In [22]:
(trainingData, testData) = processed_tf.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 84003
Test Dataset Count: 35997


In [23]:
lrModel_tf = lr.fit(trainingData)

predictions_tf = lrModel_tf.transform(testData)

predictions_tf.select("description","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 15)

+---------------+--------+---------------+-----+----------+
|    description|category|    probability|label|prediction|
+---------------+--------+---------------+-----+----------+
|" U.S. stock...|Business|[0.998061427...|  0.0|       0.0|
|Attorney Gen...|Business|[0.995454968...|  0.0|       0.0|
|" Stocks fel...|Business|[0.995193741...|  0.0|       0.0|
|" U.S. regul...|Business|[0.994795976...|  0.0|       0.0|
|Former Enron...|Business|[0.993438909...|  0.0|       0.0|
|" Mid priced...|Business|[0.993402129...|  0.0|       0.0|
|" Stocks sli...|Business|[0.992725234...|  0.0|       0.0|
|In NEW YORK,...|Business|[0.992287637...|  0.0|       0.0|
| Interest ra...|Business|[0.991581683...|  0.0|       0.0|
|" Shares of ...|Business|[0.991295981...|  0.0|       0.0|
+---------------+--------+---------------+-----+----------+
only showing top 10 rows



Random Forest with TFIDF

In [24]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)

# train model with Training Data
rfModel = rf.fit(trainingData)
predictions_rf = rfModel.transform(testData)

In [25]:
predictions_rf.select("description","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 15)

+---------------+--------+---------------+-----+----------+
|    description|category|    probability|label|prediction|
+---------------+--------+---------------+-----+----------+
|" Stocks fel...|Business|[0.407714917...|  0.0|       0.0|
|Shares of Ma...|Business|[0.399823021...|  0.0|       0.0|
|end jewelry ...|Business|[0.397368065...|  0.0|       0.0|
|" Amazon.com...|Business|[0.388693784...|  0.0|       0.0|
|Shares of Me...|Business|[0.387333747...|  0.0|       0.0|
|Amazon.com I...|Business|[0.384674689...|  0.0|       0.0|
|Shares of dr...|Business|[0.383754243...|  0.0|       0.0|
|US investmen...|Business|[0.381760862...|  0.0|       0.0|
|Shares of Ha...|Business|[0.379005165...|  0.0|       0.0|
| Oil prices ...|Business|[0.378993627...|  0.0|       0.0|
+---------------+--------+---------------+-----+----------+
only showing top 10 rows



In [26]:
y_true = predictions_rf.select("label")
y_true = y_true.toPandas()

y_pred = predictions_rf.select("prediction")
y_pred = y_pred.toPandas()

print(classification_report(y_true.label, y_pred.prediction))
print(accuracy_score(y_true.label, y_pred.prediction))

              precision    recall  f1-score   support

         0.0       0.77      0.69      0.73      8977
         1.0       0.77      0.61      0.68      8983
         2.0       0.82      0.83      0.82      9116
         3.0       0.65      0.84      0.73      8921

    accuracy                           0.74     35997
   macro avg       0.75      0.74      0.74     35997
weighted avg       0.75      0.74      0.74     35997

0.7427007806206073


LogReg with Spark NLP Glove Word Embeddings

In [27]:
document_assembler = DocumentAssembler() \
      .setInputCol("description") \
      .setOutputCol("document")

tokenizer = Tokenizer() \
      .setInputCols(["document"]) \
      .setOutputCol("token")

normalizer = Normalizer() \
      .setInputCols(["token"]) \
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

glove_embeddings = WordEmbeddingsModel().pretrained() \
      .setInputCols(["document",'cleanTokens'])\
      .setOutputCol("embeddings")\
      .setCaseSensitive(False)

embeddingsSentence = SentenceEmbeddings() \
      .setInputCols(["document", "embeddings"]) \
      .setOutputCol("sentence_embeddings") \
      .setPoolingStrategy("AVERAGE")

embeddings_finisher = EmbeddingsFinisher() \
      .setInputCols(["sentence_embeddings"]) \
      .setOutputCols(["finished_sentence_embeddings"]) \
      .setOutputAsVector(True)\
      .setCleanAnnotations(False)

explodeVectors = SQLTransformer(statement=
      "SELECT EXPLODE(finished_sentence_embeddings) AS features, * FROM __THIS__") # explodying array column in DataFrame in mutliple rows while retainin the other columns

label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")


nlp_pipeline_w2v = Pipeline(
    stages=[document_assembler,
            tokenizer,
            normalizer,
            stopwords_cleaner,
            glove_embeddings,
            embeddingsSentence,
            embeddings_finisher,
            explodeVectors,
            label_stringIdx])

nlp_model_w2v = nlp_pipeline_w2v.fit(newsDF)

processed_w2v = nlp_model_w2v.transform(newsDF)

processed_w2v.count()

glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]


120000

In [28]:
processed_w2v.columns

['features',
 'category',
 'description',
 'document',
 'token',
 'normalized',
 'cleanTokens',
 'embeddings',
 'sentence_embeddings',
 'finished_sentence_embeddings',
 'label']

In [29]:
processed_w2v.show(5)

+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------------+-----+
|            features|category|         description|            document|               token|          normalized|         cleanTokens|          embeddings| sentence_embeddings|finished_sentence_embeddings|label|
+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------------+-----+
|[-0.1556767076253...|Business| Short sellers, W...|[{document, 0, 84...|[{token, 1, 5, Sh...|[{token, 1, 5, Sh...|[{token, 1, 5, Sh...|[{word_embeddings...|[{sentence_embedd...|        [[-0.155676707625...|  0.0|
|[-0.0144653050228...|Business| Private investme...|[{document, 0, 20...|[{token, 1, 7, Pr...|[{token, 1, 7, Pr...|[{token, 1, 7, Pr...|[{word_e

In [30]:
processed_w2v.select('finished_sentence_embeddings').take(1)

[Row(finished_sentence_embeddings=[DenseVector([-0.1557, 0.196, 0.1099, -0.3089, 0.16, 0.1672, -0.4649, -0.1101, -0.053, -0.1551, 0.0327, 0.0772, 0.1494, -0.1865, 0.1155, -0.0597, 0.0234, -0.0451, 0.2361, -0.0089, 0.3358, 0.0444, 0.0088, -0.1453, 0.2289, 0.0914, -0.1665, -0.3726, 0.1892, 0.121, 0.1993, -0.0239, -0.1346, 0.1159, 0.2086, 0.1285, 0.068, 0.1372, 0.3153, -0.1934, 0.0257, -0.226, -0.0984, 0.1139, 0.1413, -0.3743, 0.072, 0.1403, 0.251, -0.3106, 0.1709, -0.0697, -0.0554, 0.5123, -0.1873, -1.7784, 0.0295, 0.1014, 0.9268, 0.2129, -0.1354, 0.5739, -0.0679, 0.461, 0.4216, 0.0225, 0.4456, -0.2462, 0.1411, -0.3258, 0.0025, 0.0114, -0.3895, -0.1106, -0.261, 0.0147, 0.0781, 0.1268, -0.2042, -0.2278, 0.5096, 0.1539, -0.3515, -0.0102, -0.7003, -0.3872, -0.1668, -0.2405, -0.0766, 0.1396, -0.0592, -0.1568, -0.1606, -0.1371, -0.684, -0.2549, -0.1541, 0.1536, 0.2715, 0.3342])])]

In [31]:
from pyspark.sql.functions import explode

# processed_w2v= processed_w2v.withColumn("features", explode(processed_w2v.finished_sentence_embeddings))

In [32]:
processed_w2v.select("features").take(1)

[Row(features=DenseVector([-0.1557, 0.196, 0.1099, -0.3089, 0.16, 0.1672, -0.4649, -0.1101, -0.053, -0.1551, 0.0327, 0.0772, 0.1494, -0.1865, 0.1155, -0.0597, 0.0234, -0.0451, 0.2361, -0.0089, 0.3358, 0.0444, 0.0088, -0.1453, 0.2289, 0.0914, -0.1665, -0.3726, 0.1892, 0.121, 0.1993, -0.0239, -0.1346, 0.1159, 0.2086, 0.1285, 0.068, 0.1372, 0.3153, -0.1934, 0.0257, -0.226, -0.0984, 0.1139, 0.1413, -0.3743, 0.072, 0.1403, 0.251, -0.3106, 0.1709, -0.0697, -0.0554, 0.5123, -0.1873, -1.7784, 0.0295, 0.1014, 0.9268, 0.2129, -0.1354, 0.5739, -0.0679, 0.461, 0.4216, 0.0225, 0.4456, -0.2462, 0.1411, -0.3258, 0.0025, 0.0114, -0.3895, -0.1106, -0.261, 0.0147, 0.0781, 0.1268, -0.2042, -0.2278, 0.5096, 0.1539, -0.3515, -0.0102, -0.7003, -0.3872, -0.1668, -0.2405, -0.0766, 0.1396, -0.0592, -0.1568, -0.1606, -0.1371, -0.684, -0.2549, -0.1541, 0.1536, 0.2715, 0.3342]))]

In [33]:
processed_w2v.select("features").show(5)

+--------------------+
|            features|
+--------------------+
|[-0.1556767076253...|
|[-0.0144653050228...|
|[0.10348732769489...|
|[-0.0355810523033...|
|[0.00647281948477...|
+--------------------+
only showing top 5 rows



In [34]:
processed_w2v.select('description','features','label').show(5)

+--------------------+--------------------+-----+
|         description|            features|label|
+--------------------+--------------------+-----+
| Short sellers, W...|[-0.1556767076253...|  0.0|
| Private investme...|[-0.0144653050228...|  0.0|
| Soaring crude pr...|[0.10348732769489...|  0.0|
| Authorities have...|[-0.0355810523033...|  0.0|
| Tearaway world o...|[0.00647281948477...|  0.0|
+--------------------+--------------------+-----+
only showing top 5 rows



In [35]:
# set seed for reproducibility
(trainingData, testData) = processed_w2v.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 84003
Test Dataset Count: 35997


In [None]:
from pyspark.sql.functions import udf

# filter rows in a DataFrame based on the number of non-zero elements in a features column.

@udf("long")
def num_nonzeros(v):
    return v.numNonzeros()
# Filter out rows with zero-dimension features in both trainingData and testData
trainingData = trainingData.where(num_nonzeros("features") != 0)
testData = testData.where(num_nonzeros("features") != 0)

# Alternatively, you can filter based on the size of finished_sentence_embeddings
# trainingData = trainingData.filter(size(trainingData["finished_sentence_embeddings"]) > 0)
# testData = testData.filter(size(testData["finished_sentence_embeddings"]) > 0)

# Another option: replace zero vectors with a vector of ones with the correct dimension
# First, get the dimension of the feature vectors
feature_dim = len(trainingData.select("features").first()[0])

# Define a UDF to replace zero vectors
@udf("array<double>")
def replace_zero_vector(v):
    if v.numNonzeros() == 0:
        return [1.0] * feature_dim  # Replace with a vector of ones
    else:
        return v.toArray().tolist()
# Apply the UDF to the features column
trainingData = trainingData.withColumn("features", replace_zero_vector("features"))
testData = testData.withColumn("features", replace_zero_vector("features"))


In [37]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# Define a UDF to convert the array<double> to VectorUDT
@udf(returnType=VectorUDT())
def array_to_vector(features):
    return Vectors.dense(features)

# Apply the UDF to the 'features' column of both trainingData and testData
trainingData = trainingData.withColumn("features", array_to_vector("features"))
testData = testData.withColumn("features", array_to_vector("features"))

# Now you can fit the model
lrModel_w2v = lr.fit(trainingData)

In [38]:
predictions_w2v = lrModel_w2v.transform(testData)

predictions_w2v.select("description","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 15)

+---------------+--------+---------------+-----+----------+
|    description|category|    probability|label|prediction|
+---------------+--------+---------------+-----+----------+
|THE stock ma...|Business|[0.989105694...|  0.0|       0.0|
|Wachovia Cor...|Business|[0.988506150...|  0.0|       0.0|
| Stocks fell...|Business|[0.985367050...|  0.0|       0.0|
|Japan #39;s ...|Business|[0.983635621...|  0.0|       0.0|
| ChevronTexa...|Business|[0.981973089...|  0.0|       0.0|
| US investme...|Business|[0.981749547...|  0.0|       0.0|
|  Shares of ...|Business|[0.981273669...|  0.0|       0.0|
| Goldman Sac...|Business|[0.981270793...|  0.0|       0.0|
| Tokyo stock...|Business|[0.980858489...|  0.0|       0.0|
|British Airw...|Business|[0.979088218...|  0.0|       0.0|
+---------------+--------+---------------+-----+----------+
only showing top 10 rows



In [39]:
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
import pandas as pd

y_true = predictions_w2v.select("label")
y_true = y_true.toPandas()

y_pred = predictions_w2v.select("prediction")
y_pred = y_pred.toPandas()

print(classification_report(y_true.label, y_pred.prediction))
print(accuracy_score(y_true.label, y_pred.prediction))

              precision    recall  f1-score   support

         0.0       0.83      0.82      0.82      9051
         1.0       0.82      0.81      0.82      9057
         2.0       0.93      0.96      0.94      8972
         3.0       0.88      0.87      0.87      8917

    accuracy                           0.86     35997
   macro avg       0.86      0.86      0.86     35997
weighted avg       0.86      0.86      0.86     35997

0.8643775870211406


In [40]:
processed_w2v.select('description','cleanTokens.result').show(truncate=15)

+---------------+---------------+
|    description|         result|
+---------------+---------------+
| Short selle...|[Short, sell...|
| Private inv...|[Private, in...|
| Soaring cru...|[Soaring, cr...|
| Authorities...|[Authorities...|
| Tearaway wo...|[Tearaway, w...|
| Stocks ende...|[Stocks, end...|
| Assets of t...|[Assets, nat...|
| Retail sale...|[Retail, sal...|
|" After earn...|[earning, PH...|
| Short selle...|[Short, sell...|
| Soaring cru...|[Soaring, cr...|
| OPEC can do...|[OPEC, nothi...|
| Non OPEC oi...|[Non, OPEC, ...|
| WASHINGTON/...|[WASHINGTONN...|
| The dollar ...|[dollar, tum...|
|If you think...|[think, may,...|
|The purchasi...|[purchasing,...|
|There is lit...|[little, cau...|
|The US trade...|[US, trade, ...|
|Oil giant Sh...|[Oil, giant,...|
+---------------+---------------+
only showing top 20 rows



LogReg with Spark NLP Bert Embeddings

In [41]:
document_assembler = DocumentAssembler() \
      .setInputCol("description") \
      .setOutputCol("document")

tokenizer = Tokenizer() \
      .setInputCols(["document"]) \
      .setOutputCol("token")

normalizer = Normalizer() \
      .setInputCols(["token"]) \
      .setOutputCol("normalized")

stopwords_cleaner = StopWordsCleaner()\
      .setInputCols("normalized")\
      .setOutputCol("cleanTokens")\
      .setCaseSensitive(False)

bert_embeddings = BertEmbeddings.pretrained('bert_base_cased', 'en') \
      .setInputCols(["document",'cleanTokens'])\
      .setOutputCol("bert")\
      .setCaseSensitive(False)\

embeddingsSentence = SentenceEmbeddings() \
      .setInputCols(["document", "bert"]) \
      .setOutputCol("sentence_embeddings") \
      .setPoolingStrategy("AVERAGE")

embeddings_finisher = EmbeddingsFinisher() \
      .setInputCols(["sentence_embeddings"]) \
      .setOutputCols(["finished_sentence_embeddings"]) \
      .setOutputAsVector(True)\
      .setCleanAnnotations(False)

label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")


nlp_pipeline_bert = Pipeline(
    stages=[document_assembler,
            tokenizer,
            normalizer,
            stopwords_cleaner,
            bert_embeddings,
            embeddingsSentence,
            embeddings_finisher,
            label_stringIdx])


bert_base_cased download started this may take some time.
Approximate size to download 384.9 MB
[OK!]


In [43]:
%%time
limited_df = newsDF.limit(1000)

nlp_model_bert = nlp_pipeline_bert.fit(limited_df)

processed_bert = nlp_model_bert.transform(limited_df)

processed_bert.count()

CPU times: user 2.68 s, sys: 303 ms, total: 2.98 s
Wall time: 14min 31s


1000

In [44]:
from pyspark.sql.functions import explode

processed_bert = processed_bert.withColumn("features", explode(processed_bert.finished_sentence_embeddings))

processed_bert.select('description','features','label').show()

+--------------------+--------------------+-----+
|         description|            features|label|
+--------------------+--------------------+-----+
| Short sellers, W...|[-0.0012149482499...|  2.0|
| Private investme...|[0.13144019246101...|  2.0|
| Soaring crude pr...|[-0.1905521601438...|  2.0|
| Authorities have...|[0.06882479041814...|  2.0|
| Tearaway world o...|[-0.1174716278910...|  2.0|
| Stocks ended sli...|[-0.0321817845106...|  2.0|
| Assets of the na...|[-0.2906664013862...|  2.0|
| Retail sales bou...|[-0.0385283492505...|  2.0|
|" After earning a...|[-0.0362812504172...|  2.0|
| Short sellers, W...|[-0.0012149482499...|  2.0|
| Soaring crude pr...|[-0.1905521601438...|  2.0|
| OPEC can do noth...|[-0.1431127935647...|  2.0|
| Non OPEC oil exp...|[0.01600192859768...|  2.0|
| WASHINGTON/NEW Y...|[0.14494347572326...|  2.0|
| The dollar tumbl...|[-0.1958881020545...|  2.0|
|If you think you ...|[0.27292791008949...|  2.0|
|The purchasing po...|[0.00386757543310...|  2.0|


In [45]:
# set seed for reproducibility
(trainingData, testData) = processed_bert.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 703
Test Dataset Count: 297


In [46]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

lrModel = lr.fit(trainingData)

In [47]:
from pyspark.sql.functions import udf

@udf("long")
def num_nonzeros(v):
    return v.numNonzeros()

testData = testData.where(num_nonzeros("features") != 0)

In [48]:
predictions = lrModel.transform(testData)

predictions.select("description","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 15)

+---------------+--------+---------------+-----+----------+
|    description|category|    probability|label|prediction|
+---------------+--------+---------------+-----+----------+
|Prototype 90...|Sci/Tech|[0.997235280...|  0.0|       0.0|
|  In this un...|Sci/Tech|[0.996880983...|  0.0|       0.0|
| Gamma ray b...|Sci/Tech|[0.995862429...|  0.0|       0.0|
|At the outer...|Sci/Tech|[0.995525995...|  0.0|       0.0|
| Designed to...|Sci/Tech|[0.994857232...|  0.0|       0.0|
|The jury's s...|Business|[0.994484828...|  2.0|       0.0|
|It's ironic ...|Sci/Tech|[0.993564951...|  0.0|       0.0|
|\\For some r...|Sci/Tech|[0.993047116...|  0.0|       0.0|
|A possible T...|Sci/Tech|[0.992974608...|  0.0|       0.0|
|Research In ...|Sci/Tech|[0.992067154...|  0.0|       0.0|
+---------------+--------+---------------+-----+----------+
only showing top 10 rows



In [49]:
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
import pandas as pd

df = predictions.select('description','category','label','prediction').toPandas()

print(classification_report(df.label, df.prediction))
print(accuracy_score(df.label, df.prediction))

              precision    recall  f1-score   support

         0.0       0.80      0.89      0.84       133
         1.0       0.84      0.76      0.80        71
         2.0       0.66      0.55      0.60        53
         3.0       0.86      0.90      0.88        40

    accuracy                           0.80       297
   macro avg       0.79      0.77      0.78       297
weighted avg       0.79      0.80      0.79       297

0.797979797979798


In [50]:
!jupyter nbconvert --to html "/content/Text_Classification_with_Spark_NLP.ipynb"

[NbConvertApp] Converting notebook /content/Text_Classification_with_Spark_NLP.ipynb to html
[NbConvertApp] Writing 395197 bytes to /content/Text_Classification_with_Spark_NLP.html
