#Prepare for Model Creation
##Download packages
There are two package setups we could use (both work); we opted for the newer one, Apache Spark 3.1.2, which works with the newest version of Spark NLP (3.3.1). The older setup is left commented out for reference.

In [1]:
import os
# > Old Package Versions
# # Install java
# ! apt-get update -qq
# ! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
# ! java -version
# # Install pyspark
# ! pip install --ignore-installed pyspark==2.4.4
# # Install Spark NLP
# ! pip install --ignore-installed spark-nlp==2.5.1

# > New Package Versions
# spark-nlp is pinned to the version reported in the output of the next cell
! pip install -q pyspark==3.1.2 spark-nlp==3.3.1




## Import the packages
Now that the necessary packages are installed, we import them and start a Spark session. We set the `gpu` parameter to `True` even though this Colab session has no GPU attached, since we would prefer to use a GPU whenever one is available. We then print the package versions to confirm that the installed versions are the ones we expect.

In [2]:
import sparknlp
spark = sparknlp.start(gpu = True) # for GPU training
from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
import pandas as pd

print("Spark NLP version", sparknlp.version())
print("Apache Spark version:", spark.version)

spark

Spark NLP version 3.3.1
Apache Spark version: 3.1.2
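
Since the rest of the notebook relies on these versions, the same check can be made explicit. The following is a minimal sketch using only the standard library; the helper name `find_version_mismatches` and the `EXPECTED` table are illustrative assumptions, not part of Spark NLP.

```python
from importlib import metadata

# Versions this notebook was run with (taken from the output above).
EXPECTED = {"pyspark": "3.1.2", "spark-nlp": "3.3.1"}

def find_version_mismatches(expected):
    """Return {package: (expected, installed)} for every package that differs.

    `installed` is None when the package is not installed at all.
    """
    mismatches = {}
    for package, wanted in expected.items():
        try:
            installed = metadata.version(package)
        except metadata.PackageNotFoundError:
            installed = None
        if installed != wanted:
            mismatches[package] = (wanted, installed)
    return mismatches

for package, (wanted, installed) in find_version_mismatches(EXPECTED).items():
    print(f"{package}: expected {wanted}, found {installed}")
```

Nothing is printed when both packages match the expected versions.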


##Get the dataset
###Data cleaning
The dataset was downloaded from Kaggle, specifically from [this site](www.google.com). We load it as a JSON file (some preprocessing has already been done in Java) and reshape it into the two-column CSV layout (`category`, `description`) that the Spark NLP pipeline reads. Each record is also randomly assigned to the training set (about 75%) or the test set (about 25%).

In [3]:
import json
import numpy as np
np.random.seed(0)  # for consistency
# open the training and the testing sets
with open('trainTone.clean.txt', 'wt') as writer_train:
  with open('testTone.clean.txt', 'wt') as writer_test:
    # write the headers for both
    writer_train.writelines('category,description\n')
    writer_test.writelines('category,description\n')
    # loop through the records of the full unnormalized trainTone dataset
    with open('trainTone.txt') as reader:
      lines = json.load(reader)
    for line in lines:
      # get the relevant data and construct the line
      tone,sentence = line['tone'].title(),line['sentence']
      output_line = f'{tone},"{sentence}"\n'
      # about 75% of uniform draws exceed 0.25, so about 75% of rows go to train
      write_to_train = np.random.uniform(0,1) > 0.25
      if write_to_train:
        writer_train.writelines(output_line)
      else:
        writer_test.writelines(output_line)
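
The f-string above wraps each sentence in double quotes by hand, which would produce a malformed row if a sentence itself contained a double quote. A minimal sketch of the same split using the standard library's `csv.writer`, which handles quoting on its own; the helper names `split_rows` and `write_rows` and the sample records are made up for illustration.

```python
import csv
import io
import random

def split_rows(records, test_fraction=0.25, seed=0):
    """Randomly route each record to the train or test list."""
    rng = random.Random(seed)  # local generator, so the split is repeatable
    train, test = [], []
    for record in records:
        row = [record["tone"].title(), record["sentence"]]
        (test if rng.random() < test_fraction else train).append(row)
    return train, test

def write_rows(rows, handle):
    """Write a header plus rows; csv.writer quotes fields only when needed."""
    writer = csv.writer(handle, lineterminator="\n")
    writer.writerow(["category", "description"])
    writer.writerows(rows)

# Made-up records in the same shape as the JSON used above.
records = [
    {"tone": "joy", "sentence": 'she said "hello" and i feel great'},
    {"tone": "anger", "sentence": "i am feeling grouchy, honestly"},
    {"tone": "sadness", "sentence": "i didnt feel humiliated"},
]

train_rows, test_rows = split_rows(records)
buffer = io.StringIO()
write_rows(train_rows + test_rows, buffer)
print(buffer.getvalue())
```

`csv.writer` escapes an embedded quote by doubling it. Spark's CSV reader uses a backslash as its default escape character, so reading such a file back with `spark.read.csv` may need `.option('escape', '"')`.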

###Read in the datasets
We load the training and testing datasets and show the first few rows of each as a quick data-quality check.

In [4]:
# load the training dataset
trainDataset = spark.read.option('header', True).csv('trainTone.clean.txt')
trainDataset.show(truncate=50, n=5)

+--------+--------------------------------------------------+
|category|                                       description|
+--------+--------------------------------------------------+
| Sadness|                           i didnt feel humiliated|
| Sadness|i can go from feeling so hopeless to so damned ...|
|   Anger|  im grabbing a minute to post i feel greedy wrong|
|    Love|i am ever feeling nostalgic about the fireplace...|
|   Anger|                              i am feeling grouchy|
+--------+--------------------------------------------------+
only showing top 5 rows



In [5]:
# load the testing dataset
testDataset = spark.read.option('header', True).csv('testTone.clean.txt')
testDataset.show(truncate=50, n=5)

+--------+--------------------------------------------------+
|category|                                       description|
+--------+--------------------------------------------------+
|     Joy|i have immense sympathy with the general point ...|
|     Joy|   i do not feel reassured anxiety is on each side|
| Sadness|              i didnt really feel that embarrassed|
|   Anger|i already feel like i fucked up though because ...|
| Sadness|i feel so inhibited in someone elses kitchen li...|
+--------+--------------------------------------------------+
only showing top 5 rows



###Check out the class distribution
Our dataset is multiclass, but that does not mean the classes are balanced, so we count the rows per class to see how imbalance might affect the model (less-represented classes may not be predicted as often).

In [6]:
from pyspark.sql.functions import col
print('Training dataset class distribution...')
trainDataset.groupBy('category').count().orderBy(col('count').desc()).show()
print('Testing dataset class distribution...')
testDataset.groupBy('category').count().orderBy(col('count').desc()).show()

Training dataset class distribution...
+--------+-----+
|category|count|
+--------+-----+
|     Joy| 3984|
| Sadness| 3459|
|   Anger| 1629|
|    Fear| 1433|
|    Love|  976|
|Surprise|  418|
+--------+-----+

Testing dataset class distribution...
+--------+-----+
|category|count|
+--------+-----+
|     Joy| 1378|
| Sadness| 1207|
|   Anger|  530|
|    Fear|  504|
|    Love|  328|
|Surprise|  154|
+--------+-----+
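
To put numbers on the imbalance, the counts can be turned into shares. A small sketch in plain Python, with the training counts copied from the table above; the majority-class baseline is an added point of comparison, not something computed elsewhere in the notebook.

```python
# Training counts copied from the table above.
train_counts = {
    "Joy": 3984, "Sadness": 3459, "Anger": 1629,
    "Fear": 1433, "Love": 976, "Surprise": 418,
}

total = sum(train_counts.values())
shares = {label: count / total for label, count in train_counts.items()}

for label, share in shares.items():
    print(f"{label:<8} {share:6.1%}")

# Accuracy of a trivial model that always predicts the most common class.
majority_label = max(train_counts, key=train_counts.get)
print(f"always predicting {majority_label}: {shares[majority_label]:.1%} accuracy")
```

Joy makes up roughly a third of the training rows and Surprise under 4%, so any accuracy figure is best read against that baseline.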



##Run the model
###Build the pipeline
We assemble a pipeline that wraps each row's text (a single sentence) as a 'document', encodes it into a vector with the Universal Sentence Encoder, and then classifies that embedding into one of the categories. The batch size and number of epochs are fixed here as an example, but both can be varied.

In [7]:
# actual content is inside description column
document = DocumentAssembler()\
      .setInputCol("description")\
      .setOutputCol("document")

# we could also add a sentence detector here to train on, and get predictions for, each sentence
use = UniversalSentenceEncoder.pretrained("tfhub_use_lg", "en") \
      .setInputCols("document") \
      .setOutputCol("sentence_embeddings")

# the classes/labels/categories are in category column
classifierdl = ClassifierDLApproach()\
      .setInputCols(["sentence_embeddings"])\
      .setOutputCol("class")\
      .setLabelColumn("category")\
      .setMaxEpochs(50)\
      .setBatchSize(8)\
      .setEnableOutputLogs(True)\
      .setRandomSeed(0)  # for consistency

use_clf_pipeline = Pipeline(
    stages = [
        document,
        use,
        classifierdl
    ])

tfhub_use_lg download started this may take some time.
Approximate size to download 753.3 MB
[OK!]


###Train the model
We also time the training step to see how long it takes.

In [8]:
%%time
clf_pipelineModel = use_clf_pipeline.fit(trainDataset)

CPU times: user 4.56 s, sys: 463 ms, total: 5.03 s
Wall time: 15min 34s


###Check the logs
We read the training log to follow how loss and accuracy change across epochs.

In [9]:
import os
log_file_name = os.listdir("/root/annotator_logs")[0]

with open("/root/annotator_logs/"+log_file_name, "r") as log_file :
    print(log_file.read())

Training started - epochs: 50 - learning_rate: 0.005 - batch_size: 8 - training_examples: 11899 - classes: 6
Epoch 0/50 - 12.70s - loss: 2261.5657 - acc: 0.50877047 - batches: 1488
Epoch 1/50 - 10.81s - loss: 2187.4683 - acc: 0.5576384 - batches: 1488
Epoch 2/50 - 10.58s - loss: 2109.1465 - acc: 0.61908764 - batches: 1488
Epoch 3/50 - 10.79s - loss: 2074.6091 - acc: 0.64175636 - batches: 1488
Epoch 4/50 - 10.45s - loss: 2055.1348 - acc: 0.6570556 - batches: 1488
Epoch 5/50 - 10.52s - loss: 2039.4285 - acc: 0.66773146 - batches: 1488
Epoch 6/50 - 10.51s - loss: 2026.2279 - acc: 0.6763058 - batches: 1488
Epoch 7/50 - 10.61s - loss: 2015.6533 - acc: 0.68471193 - batches: 1488
Epoch 8/50 - 10.77s - loss: 2007.2582 - acc: 0.6905122 - batches: 1488
Epoch 9/50 - 10.47s - loss: 2001.5853 - acc: 0.69521964 - batches: 1488
Epoch 10/50 - 10.47s - loss: 1996.3895 - acc: 0.699759 - batches: 1488
Epoch 11/50 - 10.58s - loss: 1990.5846 - acc: 0.7034297 - batches: 1488
Epoch 12/50 - 10.40s - loss: 198
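
The log is plain text, so it can be turned into numbers for plotting or comparison. A minimal sketch, assuming the log keeps the line format shown above; the regular expression and the `history` structure are illustrative choices, not part of Spark NLP.

```python
import math
import re

# First lines of the training log shown above.
log_text = """\
Training started - epochs: 50 - learning_rate: 0.005 - batch_size: 8 - training_examples: 11899 - classes: 6
Epoch 0/50 - 12.70s - loss: 2261.5657 - acc: 0.50877047 - batches: 1488
Epoch 1/50 - 10.81s - loss: 2187.4683 - acc: 0.5576384 - batches: 1488
Epoch 2/50 - 10.58s - loss: 2109.1465 - acc: 0.61908764 - batches: 1488
"""

EPOCH_LINE = re.compile(
    r"Epoch (\d+)/\d+ - [\d.]+s - loss: ([\d.]+) - acc: ([\d.]+) - batches: (\d+)"
)

# One (epoch, loss, accuracy, batches) tuple per epoch line.
history = [
    (int(epoch), float(loss), float(acc), int(batches))
    for epoch, loss, acc, batches in EPOCH_LINE.findall(log_text)
]
for epoch, loss, acc, _ in history:
    print(f"epoch {epoch}: loss={loss:.1f} acc={acc:.3f}")

# 11899 examples in batches of 8 need 1488 batches (the last one is partial).
assert math.ceil(11899 / 8) == history[0][3]
```

The final check also explains the `batches: 1488` field: it is the training set size divided by the batch size, rounded up.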

##Evaluate model
###Predict using test dataset
Here we run the fitted pipeline on the test dataset and collect the predicted labels.

In [10]:
preds = clf_pipelineModel.transform(testDataset)
preds.select('category','description','class.result').show(n=5, truncate=50)


+--------+--------------------------------------------------+---------+
|category|                                       description|   result|
+--------+--------------------------------------------------+---------+
|     Joy|i have immense sympathy with the general point ...|    [Joy]|
|     Joy|   i do not feel reassured anxiety is on each side|   [Fear]|
| Sadness|              i didnt really feel that embarrassed|[Sadness]|
|   Anger|i already feel like i fucked up though because ...|[Sadness]|
| Sadness|i feel so inhibited in someone elses kitchen li...|    [Joy]|
+--------+--------------------------------------------------+---------+
only showing top 5 rows



###Get model metrics
We then pass the predictions to scikit-learn's `classification_report` to get per-class precision, recall, and F1 score along with the overall accuracy.

In [11]:
from sklearn.metrics import classification_report
preds_df = preds.select('category','description','class.result').toPandas()
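# class.result holds a list per row; keep just its first label
preds_df['result'] = preds_df['result'].apply(lambda x : x[0])
# note: predictions are passed first here, so the "support" column counts predicted labels
print(classification_report(preds_df['result'], preds_df['category']))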

              precision    recall  f1-score   support

       Anger       0.54      0.56      0.55       510
        Fear       0.49      0.58      0.53       426
         Joy       0.83      0.64      0.72      1796
        Love       0.00      0.00      0.00         0
     Sadness       0.72      0.64      0.68      1369
    Surprise       0.00      0.00      0.00         0

    accuracy                           0.62      4101
   macro avg       0.43      0.40      0.41      4101
weighted avg       0.72      0.62      0.67      4101



  _warn_prf(average, modifier, msg_start, len(result))
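
scikit-learn's `classification_report` takes the ground-truth labels first and the predictions second. The cell above passes them the other way round, which is why the support column sums to 4101 yet shows 0 for Love and Surprise: it counts predicted labels, and the model never predicted those two classes. The effect can be sketched on toy labels with a hand-rolled helper (the helper and the labels are illustrative, not from the notebook):

```python
def precision_recall_support(y_true, y_pred, label):
    """Precision, recall and support for one label, computed by hand."""
    true_pos = sum(t == label and p == label for t, p in zip(y_true, y_pred))
    predicted = sum(p == label for p in y_pred)
    actual = sum(t == label for t in y_true)  # this is the "support"
    precision = true_pos / predicted if predicted else 0.0
    recall = true_pos / actual if actual else 0.0
    return precision, recall, actual

# Toy labels (made up): the model never predicts "Love".
y_true = ["Joy", "Joy", "Joy", "Sadness", "Love"]
y_pred = ["Joy", "Sadness", "Sadness", "Sadness", "Joy"]

# Ground truth first: support counts the true rows of each class.
print(precision_recall_support(y_true, y_pred, "Joy"))
print(precision_recall_support(y_true, y_pred, "Love"))

# Predictions first: precision and recall trade places, and a class that
# was never predicted shows a support of 0.
print(precision_recall_support(y_pred, y_true, "Joy"))
print(precision_recall_support(y_pred, y_true, "Love"))
```

Calling `classification_report(preds_df['category'], preds_df['result'])` would give the conventional reading, with support equal to the true class counts.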


We can repeat this on the training dataset; comparing the training scores with the test scores helps assess possible overfitting.

In [12]:
preds = clf_pipelineModel.transform(trainDataset)
preds_df = preds.select('category','description','class.result').toPandas()
preds_df['result'] = preds_df['result'].apply(lambda x : x[0])
print(classification_report(preds_df['result'], preds_df['category']))

  _warn_prf(average, modifier, msg_start, len(result))


              precision    recall  f1-score   support

       Anger       0.73      0.78      0.76      1522
        Fear       0.72      0.81      0.77      1276
         Joy       0.90      0.71      0.79      5083
        Love       0.00      0.00      0.00         0
     Sadness       0.88      0.76      0.81      4018
    Surprise       0.00      0.00      0.00         0

    accuracy                           0.74     11899
   macro avg       0.54      0.51      0.52     11899
weighted avg       0.85      0.74      0.79     11899
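
One simple way to read the two reports together is to line up the summary rows and look at the train-minus-test gap. The numbers below are copied from the reports above; how large a gap counts as overfitting is a judgment call that this sketch does not settle.

```python
# Summary rows copied from the two reports above.
test_scores = {"accuracy": 0.62, "macro avg f1": 0.41, "weighted avg f1": 0.67}
train_scores = {"accuracy": 0.74, "macro avg f1": 0.52, "weighted avg f1": 0.79}

# Positive gap: the model scores higher on data it was trained on.
gaps = {
    name: round(train_scores[name] - test_scores[name], 2)
    for name in test_scores
}
for name, gap in gaps.items():
    print(
        f"{name:<16} train={train_scores[name]:.2f} "
        f"test={test_scores[name]:.2f} gap={gap:+.2f}"
    )
```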



###Predict a random example
Here we wrap the fitted pipeline in a lightweight `LightPipeline` and try a plausibly confusing sentence ('i am not happy') to see whether the model can pick up the true tone.

In [13]:
from sparknlp.base import LightPipeline
light_model = LightPipeline(clf_pipelineModel)
text = 'i am not happy'
light_model.annotate(text)

{'class': ['Joy'],
 'document': ['i am not happy'],
 'sentence_embeddings': ['i am not happy']}

##Save the pipeline
###Save the directory and convert to TAR archive
We will want to load the pipeline later, so we save it to a directory and compress that directory into a tar archive to make it easier to transport.

In [14]:
clf_pipelineModel.write().save('ToneItPipeline')
!tar czvf ToneItPipeline.tar.gz ToneItPipeline

ToneItPipeline/
ToneItPipeline/stages/
ToneItPipeline/stages/1_UNIVERSAL_SENTENCE_ENCODER_5e0d8b922c74/
ToneItPipeline/stages/1_UNIVERSAL_SENTENCE_ENCODER_5e0d8b922c74/.use_tensorflow.crc
ToneItPipeline/stages/1_UNIVERSAL_SENTENCE_ENCODER_5e0d8b922c74/metadata/
ToneItPipeline/stages/1_UNIVERSAL_SENTENCE_ENCODER_5e0d8b922c74/metadata/._SUCCESS.crc
ToneItPipeline/stages/1_UNIVERSAL_SENTENCE_ENCODER_5e0d8b922c74/metadata/part-00000
ToneItPipeline/stages/1_UNIVERSAL_SENTENCE_ENCODER_5e0d8b922c74/metadata/_SUCCESS
ToneItPipeline/stages/1_UNIVERSAL_SENTENCE_ENCODER_5e0d8b922c74/metadata/.part-00000.crc
ToneItPipeline/stages/1_UNIVERSAL_SENTENCE_ENCODER_5e0d8b922c74/use_tensorflow
ToneItPipeline/stages/2_ClassifierDLModel_b2de7b745e33/
ToneItPipeline/stages/2_ClassifierDLModel_b2de7b745e33/.classifierdl_tensorflow.crc
ToneItPipeline/stages/2_ClassifierDLModel_b2de7b745e33/fields/
ToneItPipeline/stages/2_ClassifierDLModel_b2de7b745e33/fields/datasetParams/
ToneItPipeline/stages/2_ClassifierDLM

###Check on the size
Compression does not reduce the size by much (872M to 775M, roughly 11%), but it still saves some space, and a single file is easier to transport than a directory (which usually has to be archived anyway).

In [15]:
# size of the raw directory
!du -sch ToneItPipeline

872M	ToneItPipeline
872M	total


In [16]:
# size of the compressed archive
!du -sch ToneItPipeline.tar.gz

775M	ToneItPipeline.tar.gz
775M	total
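
The same archive-and-measure step can be done without shell commands. A minimal sketch using the standard library's `tarfile`, run here on a throwaway directory that stands in for the saved pipeline; `archive_directory` and `DemoPipeline` are hypothetical names.

```python
import os
import tarfile
import tempfile

def archive_directory(directory, archive_path):
    """Gzip `directory` into `archive_path`; return (raw, archive) sizes in bytes."""
    with tarfile.open(archive_path, "w:gz") as archive:
        # arcname keeps only the directory's own name inside the archive
        archive.add(directory, arcname=os.path.basename(directory))
    raw_size = sum(
        os.path.getsize(os.path.join(root, name))
        for root, _, files in os.walk(directory)
        for name in files
    )
    return raw_size, os.path.getsize(archive_path)

# Demonstration on a throwaway directory (a stand-in for the saved pipeline).
with tempfile.TemporaryDirectory() as workdir:
    model_dir = os.path.join(workdir, "DemoPipeline")
    os.makedirs(os.path.join(model_dir, "stages"))
    with open(os.path.join(model_dir, "stages", "weights.txt"), "w") as handle:
        handle.write("0.123456789 " * 10_000)

    raw, packed = archive_directory(model_dir, model_dir + ".tar.gz")
    print(f"raw={raw} bytes, archive={packed} bytes, ratio={packed / raw:.2f}")

    with tarfile.open(model_dir + ".tar.gz") as archive:
        member_names = archive.getnames()
```

For the pipeline saved above, the corresponding ratio is 775M / 872M, about 0.89. The demo file is highly repetitive text, so it compresses far better than that.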


###Load the pipeline
Now we load the saved pipeline to check that it can still annotate new lines.

In [17]:
%%time
import sparknlp
sparknlp.start()
from pyspark.ml import PipelineModel
from sparknlp.base import LightPipeline
ToneItPipeline = LightPipeline(PipelineModel.load('ToneItPipeline'))
ToneItPipeline.annotate("we fell to the floor our faces pale")

CPU times: user 203 ms, sys: 33.6 ms, total: 237 ms
Wall time: 33.5 s


universalsentenceencoder --> 0.69<br>
tokenizer+bertsmall+sentence --> 0.29!!!ALLSAD<br>
bertsmallsent --> 0.35!!!ALLJOY<br>
bertusecmlmenbase --> 0.35!!!ALLJOY