# Yahoo Topic Classification

In this notebook we create document embeddings using BERT and test whether an SVM with a linear kernel can classify Yahoo topics from them. The BERT model is available through Spark NLP.

The preprocessing steps differ from those of a non-deep-learning solution, as was the case with the TF-IDF document embeddings.

## Setting up Libraries and Environments

In [4]:
import findspark
findspark.init()


In [5]:
# Basic
import pandas as pd
import numpy as np
import pyspark
from pyspark import SparkFiles

# Data Manipulation
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# ML
from pyspark.ml.feature import *
from pyspark.ml.linalg import Vector
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC, OneVsRest, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [6]:
# Import Spark NLP
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.common import *
#from sparknlp.embeddings import *

# Start Spark Session with Spark NLP
from pyspark.sql import SparkSession
spark = sparknlp.start()

spark = SparkSession \
    .builder \
    .config("spark.driver.extraClassPath", "lib/sparknlp.jar") \
    .config("spark.executor.extraClassPath", "lib/sparknlp.jar") \
    .getOrCreate()


'''
# extra args from tutorial
.config("spark.driver.memory","16G")\
.config("spark.driver.maxResultSize", "0") \
.config("spark.kryoserializer.buffer.max", "2000M")\
.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.2.4")\
'''

## Data Import

In [7]:
schema_list = [
    StructField('Label', StringType(), False), 
    StructField('Title', StringType(), True), 
    StructField('Content', StringType(), True),
    StructField('Answer', StringType(), True),
    StructField('Set', StringType(), True)
]
schema_df = StructType(fields=schema_list)

In [8]:
# Import Data from reduced folders
df = spark.read.csv("../data/reduced", schema=schema_df)

## Data Preview
A basic look at the data before processing.

In [9]:
df.show(5)

+-----+--------------------+--------------------+--------------------+-----+
|Label|               Title|             Content|              Answer|  Set|
+-----+--------------------+--------------------+--------------------+-----+
|    9|I've tried talkin...|                null|~I was going to s...|Train|
|    2|Find the coordina...|Find the coordina...|"Let P be (a,b). ...|Train|
|    6|       Who will win?|Who's gonna win t...|ND go irish. USC ...|Train|
|   10|    Where do I vote?|                null|If you don't know...|Train|
|    2| help for the s.a.t?|can anyone give m...|Try some of these...|Train|
+-----+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



                                                                                

In [10]:
df.describe().show()



+-------+------------------+--------------------+--------------------------------+----------------------+------+
|summary|             Label|               Title|                         Content|                Answer|   Set|
+-------+------------------+--------------------+--------------------------------+----------------------+------+
|  count|            365124|              365124|                          201247|                358339|365124|
|   mean| 5.493070847164251|                null|                        Infinity|              Infinity|  null|
| stddev|2.8693308861059896|                null|                             NaN|                   NaN|  null|
|    min|                 1|! Does anyone els...|                               !|  ! ""Please excuse...|  Test|
|    max|                 9|﻿Speculate about ...|나보기가역겨워. 가실 때에는. ...|혼돈\nhttp://dictio...| Train|
+-------+------------------+--------------------+--------------------------------+----------------------+------+

                                                                                

## Data Processing

### Steps to Take
- Merge the title, content, and answer fields
- Rename columns
- Document assembly
- Sentence detection
- BERT sentence embedding


#### Column Concatenation

In [14]:
df = df.fillna('')


In [15]:
df = df.withColumn('Text', concat(df['Title'], df['Content'], df['Answer'])).select(['Text', 'Set', 'Label'])

df.show(5)

AnalysisException: Cannot resolve column name "Title" among (Text, Set, Label)
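
The empty-string fill before the merge matters because Spark's `concat` returns null when any of its inputs is null, and `concat` inserts no separator, so a title and its content run together. As a minimal sketch in plain Python (not the Spark call used above), the same merge with a separator could look like this. The helper name `merge_fields` and the sample row are hypothetical.

```python
def merge_fields(title, content, answer, sep=" "):
    """Join the non-empty fields with a separator; None counts as empty."""
    parts = [p for p in (title, content, answer) if p]
    return sep.join(parts)


# Hypothetical row with a missing Content field.
row = {"Title": "Example question?", "Content": None, "Answer": "Example answer."}
merged = merge_fields(row["Title"], row["Content"], row["Answer"])
print(merged)
```

Skipping empty fields avoids doubled separators when `Content` is missing.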

#### Document Assembler

A transformer that converts raw text into the annotation format that Spark NLP annotators expect.

In [16]:
documentAssembler = DocumentAssembler() \
    .setInputCol("Text") \
    .setOutputCol("document")

#### Sentence Detector

Takes the document annotation as input and splits it into sentences. Because our input data consists of Yahoo Answers posts, many rows are expected to contain several sentences. Splitting them gives the BERT embedding model one sentence of context at a time.

In [17]:
sentenceDetector = SentenceDetector()\
    .setInputCols(["document"])\
    .setOutputCol("sentence")

#### Sentence Embeddings

Using a pre-trained BERT model, we generate one embedding per sentence.

In [18]:
wordEmbeddings = BertSentenceEmbeddings.pretrained('sent_small_bert_L2_768', 'en')\
    .setInputCols(["sentence"])\
    .setOutputCol("sentence_embeddings")

sent_small_bert_L2_768 download started this may take some time.
Approximate size to download 139.6 MB
Download done! Loading the resource.

2022-12-15 22:26:01.109064: I external/org_tensorflow/tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


[OK!]


#### Embedding Finisher
Takes the sentence embedding annotations and returns them as an array of Spark ML vectors, one per sentence.

In [19]:
embeddingsFinisher = EmbeddingsFinisher() \
    .setInputCols(["sentence_embeddings"]) \
    .setOutputCols("embeddings") \
    .setOutputAsVector(True)
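
Because the finisher emits one vector per detected sentence, each row holds an array of vectors, whereas a Spark ML classifier expects a single fixed-length feature vector per row. One common way to bridge that gap is to average the sentence vectors into a document vector. The sketch below shows the idea in plain Python. It is an assumption about how the pooling could be done, not a step taken from this notebook, and `mean_pool` and the toy vectors are hypothetical.

```python
def mean_pool(sentence_vectors):
    """Average a list of equal-length sentence vectors into one document vector."""
    if not sentence_vectors:
        return []
    n = len(sentence_vectors)
    dim = len(sentence_vectors[0])
    return [sum(vec[j] for vec in sentence_vectors) / n for j in range(dim)]


# Toy document with two 3-dimensional sentence vectors (real ones have 768 dimensions).
doc = [[0.2, 0.4, -0.1], [0.6, 0.0, 0.3]]
doc_vector = mean_pool(doc)
print(doc_vector)
```

Averaging keeps the dimensionality fixed regardless of how many sentences a post contains.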

In [20]:
stringIndexer = StringIndexer(inputCol='Label', outputCol='LabelString')
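
`StringIndexer` maps each distinct label string to a numeric index, so the `LabelString` column, despite its name, holds numbers. By default the most frequent label gets index 0. The plain-Python sketch below mimics that ordering under the assumption that ties are broken alphabetically. `fit_label_index` and the sample labels are hypothetical.

```python
from collections import Counter


def fit_label_index(labels):
    """Map each label to an index ordered by descending frequency."""
    counts = Counter(labels)
    # Assumption: equal frequencies are ordered alphabetically.
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}


labels = ["9", "2", "6", "10", "2", "2", "6"]
mapping = fit_label_index(labels)
indexed = [mapping[lab] for lab in labels]
print(mapping)
print(indexed)
```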

### Full Processing Pipeline

In [21]:
pipeline = Pipeline().setStages(
    [
        documentAssembler,
        sentenceDetector,
        wordEmbeddings,
        embeddingsFinisher,
    ]
)

In [22]:
df_processed = pipeline.fit(df).transform(df)

In [23]:
df_processed.show(5)

+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+
|                Text|  Set|Label|            document|            sentence| sentence_embeddings|          embeddings|
+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+
|I've tried talkin...|Train|    9|[{document, 0, 25...|[{document, 0, 79...|[{sentence_embedd...|[[0.2289849370718...|
|Find the coordina...|Train|    2|[{document, 0, 11...|[{document, 0, 28...|[{sentence_embedd...|[[0.4521834552288...|
|Who will win?Who'...|Train|    6|[{document, 0, 29...|[{document, 0, 83...|[{sentence_embedd...|[[0.2233218699693...|
|Where do I vote?I...|Train|   10|[{document, 0, 70...|[{document, 0, 70...|[{sentence_embedd...|[[0.1928332597017...|
|help for the s.a....|Train|    2|[{document, 0, 37...|[{document, 0, 57...|[{sentence_embedd...|[[0.3777643740177...|
+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+

                                                                                

In [24]:
train = df_processed.filter('Set == "Train"')
test = df_processed.filter('Set == "Test"')

In [25]:
train.show(5)

+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+
|                Text|  Set|Label|            document|            sentence| sentence_embeddings|          embeddings|
+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+
|I've tried talkin...|Train|    9|[{document, 0, 25...|[{document, 0, 79...|[{sentence_embedd...|[[0.2289849370718...|
|Find the coordina...|Train|    2|[{document, 0, 11...|[{document, 0, 28...|[{sentence_embedd...|[[0.4521834552288...|
|Who will win?Who'...|Train|    6|[{document, 0, 29...|[{document, 0, 83...|[{sentence_embedd...|[[0.2233218699693...|
|Where do I vote?I...|Train|   10|[{document, 0, 70...|[{document, 0, 70...|[{sentence_embedd...|[[0.1928332597017...|
|help for the s.a....|Train|    2|[{document, 0, 37...|[{document, 0, 57...|[{sentence_embedd...|[[0.3777643740177...|
+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+

                                                                                

In [26]:
print(f'train: {train.count()} and test: {test.count()}')

ERROR:root:KeyboardInterrupt while sending command.                (0 + 8) / 10]
Traceback (most recent call last):
  File "/mnt/c/Users/joluw/hadoop/spark-3.2.2/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/mnt/c/Users/joluw/hadoop/spark-3.2.2/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
train.count()

[Stage 9:>                 (0 + 8) / 10][Stage 10:>                (0 + 0) / 10]

In [None]:
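def ith_(v, i):
    # Return the i-th component of vector v as a float, or None if it is unavailable.
    try:
        return float(v[i])
    except (IndexError, TypeError, ValueError):
        return None

ith = udf(ith_, DoubleType())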

### Classification - SVM

In [26]:
svm = LinearSVC()
ovr = OneVsRest(classifier=svm, featuresCol='embeddings', labelCol='Label')
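
`LinearSVC` is a binary classifier, which is why it is wrapped in `OneVsRest`: one linear model is trained per class, and the class whose model scores highest wins. The sketch below illustrates only the prediction step in plain Python. The weights and biases are made up for illustration, and `ovr_predict` is a hypothetical helper, not part of Spark.

```python
def ovr_predict(x, weights, biases):
    """Score x with one linear model per class and return the best class and all scores."""
    scores = [
        sum(w_j * x_j for w_j, x_j in zip(w, x)) + b
        for w, b in zip(weights, biases)
    ]
    best = max(range(len(scores)), key=scores.__getitem__)
    return best, scores


# Three hypothetical classes over 2-dimensional features.
weights = [[1.0, 0.0], [0.0, 1.0], [-1.0, -1.0]]
biases = [0.0, 0.0, 0.5]
label, scores = ovr_predict([0.2, 0.9], weights, biases)
print(label, scores)
```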

In [27]:
ovrModel = ovr.fit(train)
predictions = ovrModel.transform(test)

ERROR:root:KeyboardInterrupt while sending command.                (0 + 0) / 10]
Traceback (most recent call last):
  File "/mnt/c/Users/joluw/hadoop/spark-3.2.2/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/mnt/c/Users/joluw/hadoop/spark-3.2.2/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
predictions.show()

In [None]:
# The default metric is F1, so request accuracy explicitly
evaluator = MulticlassClassificationEvaluator(labelCol='Label', metricName='accuracy')

In [None]:
print(f'Accuracy {evaluator.evaluate(predictions)}')
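
Accuracy here means the fraction of test rows whose predicted label equals the true label. A minimal plain-Python sketch of that calculation, using made-up labels rather than results from this notebook:

```python
def accuracy(y_true, y_pred):
    """Fraction of positions where the prediction matches the true label."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    if not y_true:
        return 0.0
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return correct / len(y_true)


# Hypothetical labels: four of five predictions match.
y_true = [0, 1, 2, 2, 1]
y_pred = [0, 2, 2, 2, 1]
acc = accuracy(y_true, y_pred)
print(f"Accuracy {acc}")
```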