## NLP Sentiment Analysis

[spaCy](https://spacy.io) is a library that provides methods and pre-trained models for parsing natural language, tagging parts of speech, and more, in many languages.

VADER (the `vaderSentiment` package) is a lexicon- and rule-based tool that scores the sentiment of individual sentences.

### Example simplified usage

Import spaCy and load its small English pipeline, `en_core_web_sm`, which is trained on written web text.

In [2]:
import spacy

In [3]:
english = spacy.load("en_core_web_sm")

Import the analyzer class from VADER.

In [4]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

Let's examine some simplified chat text from our chat app.

In [5]:
sampletext = """User jane left
>> jane: merci
>> support3: sure .. coming right up
>> jane: black coffee, no milk, no sugar please !
>> support3: hi, how can we help ?
>> jane: hi
"""

Parse the input text with the English pipeline.

In [6]:
result = english(sampletext)

The result is a `spacy.tokens.doc.Doc`; `help(result)` lists everything we can do with it.

In [7]:
type(result)

spacy.tokens.doc.Doc

One thing we can do is use spaCy to identify the part of speech of each token in natural language text.

In [8]:
for token in result:
    print(token.text, token.pos_)

User PROPN
jane NOUN
left VERB

 SPACE
> X
> PUNCT
jane NOUN
: PUNCT
merci PROPN

 SPACE
> X
> X
support3 NOUN
: PUNCT
sure ADJ
.. PUNCT
coming VERB
right ADV
up ADV

 SPACE
> X
> PUNCT
jane NOUN
: PUNCT
black ADJ
coffee NOUN
, PUNCT
no DET
milk NOUN
, PUNCT
no DET
sugar NOUN
please INTJ
! PUNCT

 SPACE
> X
> X
support3 NOUN
: PUNCT
hi INTJ
, PUNCT
how SCONJ
can AUX
we PRON
help VERB
? PUNCT

 SPACE
> X
> PUNCT
jane NOUN
: PUNCT
hi NUM

 SPACE
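The `>>` markers and speaker names are chat markup rather than natural language, and the tagger treats them as noise (`X`, `PUNCT`, and `NOUN` above). A minimal sketch, assuming every message line follows the `>> speaker: text` layout of the sample, that separates the speaker from the message so that only the message text could be passed to spaCy and VADER. `ChatLine` and `parse_chat_line` are hypothetical names for this example:

```python
import re
from typing import NamedTuple, Optional

# Assumes the ">> speaker: text" layout seen in the sample chat above.
CHAT_LINE = re.compile(r"^>>\s*(?P<speaker>[^:]+):\s*(?P<text>.*)$")


class ChatLine(NamedTuple):
    speaker: str
    text: str


def parse_chat_line(line: str) -> Optional[ChatLine]:
    """Split one chat line into speaker and message; None for non-message lines."""
    match = CHAT_LINE.match(line.strip())
    if match is None:
        return None  # e.g. the "User jane left" notice or a blank line
    return ChatLine(match["speaker"].strip(), match["text"].strip())


for line in ["User jane left", ">> jane: merci", ">> support3: sure .. coming right up"]:
    print(parse_chat_line(line))
```

The speaker pattern stops at the first colon, so colons inside the message text are kept.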


Instantiate the VADER analyzer now.

In [18]:
analyzer = SentimentIntensityAnalyzer()

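Now we can get the sentiment scores for each sentence spaCy found: the proportions of negative (`neg`), neutral (`neu`), and positive (`pos`) text, plus `compound`, a normalized overall score ranging from -1 (most negative) to +1 (most positive).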

In [9]:
[analyzer.polarity_scores(str(s)) for s in list(result.sents)]

[{'neg': 0.0, 'neu': 1.0, 'pos': 0.0, 'compound': 0.0},
 {'neg': 0.184, 'neu': 0.618, 'pos': 0.198, 'compound': 0.0818},
 {'neg': 0.0, 'neu': 0.722, 'pos': 0.278, 'compound': 0.4019},
 {'neg': 0.0, 'neu': 1.0, 'pos': 0.0, 'compound': 0.0}]
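The `compound` value is the sum of the sentence's word valences (after VADER's rule-based adjustments) squashed into [-1, 1], and the VADER README suggests treating compound >= 0.05 as positive, <= -0.05 as negative, and anything in between as neutral. A small sketch of both steps, re-implemented here for illustration rather than imported from `vaderSentiment`, applied to the compound scores printed above:

```python
import math


def normalize(score: float, alpha: float = 15) -> float:
    """Squash a summed valence score into [-1, 1] (the approach vaderSentiment uses)."""
    norm = score / math.sqrt(score * score + alpha)
    return max(-1.0, min(1.0, norm))


def label(compound: float, threshold: float = 0.05) -> str:
    """Bucket a compound score using the thresholds suggested in the VADER README."""
    if compound >= threshold:
        return "positive"
    if compound <= -threshold:
        return "negative"
    return "neutral"


# compound scores from the four sentences above
print([label(c) for c in (0.0, 0.0818, 0.4019, 0.0)])  # ['neutral', 'positive', 'positive', 'neutral']
```

The threshold is a convention, not part of the score itself; you may want a wider neutral band for short chat messages.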

## Streaming NLP

We want to run this analysis with Spark so that it can scale to streaming data. Let's run locally for now; adjust the parameters below to suit your environment.

In [9]:
# notebook parameters: the interpreter and JDK paths below come from the
# author's machine (Fedora 34); change them to match your installation

import os

os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.9"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.9"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-11.0.13.0.8-1.fc34.x86_64"

spark_master = "local[*]"
app_name = "sentiment-analysis"
driver_memory = '8g'
executor_memory = '8g'
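The cell above hardcodes paths from one particular machine. A sketch of a more portable variant, assuming you would rather configure the notebook through environment variables: the `SENTIMENT_*` names are hypothetical, chosen for this example, while `PYSPARK_PYTHON` is the variable Spark reads to choose the Python interpreter for its workers.

```python
import os
import sys

# Workers must run the same Python version as the driver; default to the
# notebook's own interpreter unless the variable is already set.
os.environ.setdefault("PYSPARK_PYTHON", sys.executable)

# JAVA_HOME is not hardcoded: set it in your shell if Spark cannot find a JVM.

# Hypothetical variable names, so the defaults can be overridden without editing the notebook.
spark_master = os.environ.get("SENTIMENT_SPARK_MASTER", "local[*]")
app_name = os.environ.get("SENTIMENT_APP_NAME", "sentiment-analysis")
driver_memory = os.environ.get("SENTIMENT_DRIVER_MEMORY", "8g")
executor_memory = os.environ.get("SENTIMENT_EXECUTOR_MEMORY", "8g")
```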

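Set up a Spark session. The `spark.jars.packages` setting downloads Spark's Kafka connector (`spark-sql-kafka-0-10`), whose version should match your Spark version and Scala build; the event log settings write Spark's event log to the current directory.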

In [10]:
import pyspark

session = pyspark.sql.SparkSession.builder \
    .master(spark_master) \
    .appName(app_name) \
    .config("spark.eventLog.enabled", True) \
    .config("spark.eventLog.dir", ".") \
    .config("spark.driver.memory", driver_memory) \
    .config("spark.executor.memory", executor_memory) \
    .config("spark.executor.cores", 1) \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .getOrCreate()

session

21/11/22 15:17:06 WARN Utils: Your hostname, virt resolves to a loopback address: 127.0.0.1; using 192.168.86.109 instead (on interface wlp2s0)
21/11/22 15:17:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/mike/.local/lib/python3.9/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/mike/.ivy2/cache
The jars for the packages stored in: /home/mike/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5fbfd9a0-6e45-4509-8a11-a583a0207707;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.1.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.1.2 in central
	found org.apache.kafka#kafka-clients;2.6.0 in central
	found com.github.luben#zstd-jni;1.4.8-1 in central
	found org.lz4#lz4-java;1.7.1 in local-m2-cache
	found org.xerial.snappy#snappy-java;1.1.8.2 in local-m2-cache
	found org.slf4j#slf4j-api;1.7.30 in local-m2-cache
	found org.spark-project.spark#unused;1.0.0 in local-m2-cache
	found org.apache.commons#commons-pool2;2.6.2 in local-m2-cache
:: resolution report :: resolve 540ms :: artifacts dl 7ms
	:: modules in use:
	com.github.luben#zstd-jni;1.4.8-1 from central in [default]
	org.apache.commons#co
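The connector coordinate encodes both the Scala build (`_2.12`) and the Spark version (`3.1.2`), and the resolution log above shows Ivy fetching exactly that version. A small hypothetical helper that builds the coordinate from a version string, so that, for example, `kafka_package(pyspark.__version__)` could replace the hardcoded value; it assumes a Scala 2.12 build of Spark by default, as the original cell does:

```python
def kafka_package(spark_version: str, scala_version: str = "2.12") -> str:
    """Maven coordinate of Spark's Kafka source for a given Spark/Scala build."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


print(kafka_package("3.1.2"))  # org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2
```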

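Read the messages from the `chats` Kafka topic, deserialize their JSON payloads, and process them with Spark DataFrames. Note that `session.read` runs this as a batch query over the topic's current contents; `session.readStream` would turn it into a streaming query.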

In [11]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, from_json

# every field of a chat message is read as a nullable string
fields = "id username supportname message timestamp".split()
structure = StructType([StructField(fn, StringType(), True) for fn in fields])

# Kafka delivers keys and values as binary; only the value (a JSON document) is needed
records = session \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "chats") \
  .option("startingOffsets", "earliest") \
  .load() \
  .select(from_json(col("value").cast("string"), structure).alias("json")) \
  .select([col("json." + fn).alias(fn) for fn in fields])
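For a single message, the deserialization amounts to decoding the Kafka value and picking out five fields. A plain-Python sketch, assuming each value is a UTF-8 JSON object with the keys in the schema (the sample record is rebuilt from the first row shown below). `parse_chat_value` is a hypothetical helper, and returning `None` for malformed payloads is our choice here, not a description of how `from_json` handles them:

```python
import json
from typing import Optional

# Field names taken from the schema in the cell above.
FIELDS = "id username supportname message timestamp".split()


def parse_chat_value(value: Optional[bytes]) -> Optional[dict]:
    """Decode one Kafka message value into a flat record.

    Keys missing from the payload become None, like the nullable StructFields.
    """
    if value is None:
        return None
    try:
        obj = json.loads(value.decode("utf-8"))
    except ValueError:  # covers json.JSONDecodeError and UnicodeDecodeError
        return None
    if not isinstance(obj, dict):
        return None
    return {fn: obj.get(fn) for fn in FIELDS}


sample = json.dumps({
    "id": "karen-support", "username": "karen", "supportname": "support",
    "message": ">> karen: hi", "timestamp": "2021-11-22T03:29:56.956+00:00",
}).encode("utf-8")
print(parse_chat_value(sample)["message"])  # >> karen: hi
```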


Take a look at the first 5 rows.

In [12]:
records.head(5)



[Row(id='karen-support', username='karen', supportname='support', message='>> karen: hi', timestamp='2021-11-22T03:29:56.956+00:00'),
 Row(id='karen-support', username='karen', supportname='support', message='>> support: hi karen, how can i help today?', timestamp='2021-11-22T03:30:05.540+00:00'),
 Row(id='karen-support', username='karen', supportname='support', message='>> karen: i hate this product, it sucks !', timestamp='2021-11-22T03:30:15.506+00:00'),
 Row(id='karen-support', username='karen', supportname='support', message='>> support: what seems to be the problem ?', timestamp='2021-11-22T03:30:25.205+00:00'),
 Row(id='karen-support', username='karen', supportname='support', message='>> karen: i cant even get my app deployed !', timestamp='2021-11-22T03:30:33.278+00:00')]

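With Spark we would typically broadcast large objects such as models, but spaCy models are tricky to serialize. Instead, we'll borrow a trick from the `sparklingml` project (published under the Sparkling Pandas GitHub organization): a class-level dictionary that loads each spaCy model the first time it is requested on a worker and reuses it afterwards, in effect simulating lazily-initialized, worker-local storage. The model is therefore not reloaded for every row.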

In [13]:
# This code is borrowed from sparklingml (Sparkling Pandas); see here:
# https://github.com/sparklingpandas/sparklingml/blob/627c8f23688397a53e2e9e805e92a54c2be1cf3d/sparklingml/transformation_functions.py#L53
class SpacyMagic(object):
    """
    Simple Spacy Magic to minimize loading time.
    >>> SpacyMagic.get("en_core_web_sm")
    <spacy.lang.en.English ...
    """
    # class-level cache shared by every call in this Python process
    _spacys = {}

    @classmethod
    def get(cls, lang):
        # lang is a spaCy pipeline name, e.g. "en_core_web_sm"
        if lang not in cls._spacys:
            # import and load lazily, on the worker, the first time a model is needed
            import spacy
            cls._spacys[lang] = spacy.load(lang)
        return cls._spacys[lang]
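To see the caching behaviour without spaCy or Spark, here is the same class-level-dictionary pattern with the loader passed in as a parameter. `LazyModelCache` and `fake_load` are hypothetical names for this sketch; `fake_load` stands in for `spacy.load` and records each call, so you can check that the second request reuses the first model.

```python
from typing import Any, Callable, Dict, List


class LazyModelCache:
    """Each model is built on its first request, then reused for the
    lifetime of the Python process (the same idea as SpacyMagic)."""
    _models: Dict[str, Any] = {}

    @classmethod
    def get(cls, name: str, loader: Callable[[str], Any]) -> Any:
        if name not in cls._models:
            cls._models[name] = loader(name)
        return cls._models[name]


# Hypothetical stand-in for spacy.load that records every call.
load_calls: List[str] = []


def fake_load(name: str) -> object:
    load_calls.append(name)
    return object()


first = LazyModelCache.get("en_core_web_sm", fake_load)
second = LazyModelCache.get("en_core_web_sm", fake_load)
print(first is second, load_calls)  # True ['en_core_web_sm']
```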

Define a user-defined function (UDF) that splits chat messages into sentences.

In [14]:
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import udf

def split_sentences_impl(s):
    """ splits an English string into sentences, using spaCy """
    if s is None:
        return None  # null messages stay null instead of crashing the UDF
    english = SpacyMagic.get("en_core_web_sm")
    return [str(sentence) for sentence in english(s).sents]

split_sentences = udf(split_sentences_impl, ArrayType(StringType()))

To see what this looks like, we'll run it on the 10 earliest messages, ordered by timestamp.

In [15]:
split_records = records \
  .orderBy("timestamp") \
  .limit(10) \
  .select("timestamp", "id", "username", "supportname", split_sentences(col("message")).alias("sentences")) \
  .cache()

split_records.collect()



[Row(timestamp='2021-11-22T03:29:56.956+00:00', id='karen-support', username='karen', supportname='support', sentences=['>> karen: hi']),
 Row(timestamp='2021-11-22T03:30:05.540+00:00', id='karen-support', username='karen', supportname='support', sentences=['>> support: hi karen, how can i help today?']),
 Row(timestamp='2021-11-22T03:30:15.506+00:00', id='karen-support', username='karen', supportname='support', sentences=['>> karen: i hate this product, it sucks !']),
 Row(timestamp='2021-11-22T03:30:25.205+00:00', id='karen-support', username='karen', supportname='support', sentences=['>> support: what seems to be the problem ?']),
 Row(timestamp='2021-11-22T03:30:33.278+00:00', id='karen-support', username='karen', supportname='support', sentences=['>> karen: i cant even get my app deployed !']),
 Row(timestamp='2021-11-22T03:30:47.308+00:00', id='karen-support', username='karen', supportname='support', sentences=['>> support: that does not sound great.', 'lets start from the beggin

We can `explode` each array into one row per sentence, which makes further processing easier:

In [16]:
from pyspark.sql.functions import explode
sentences = split_records.select("timestamp", "id", "username", "supportname", explode(col("sentences")).alias("sentence"))
sentences.show(truncate=False)

+-----------------------------+-------------+--------+-----------+-------------------------------------------+
|timestamp                    |id           |username|supportname|sentence                                   |
+-----------------------------+-------------+--------+-----------+-------------------------------------------+
|2021-11-22T03:29:56.956+00:00|karen-support|karen   |support    |>> karen: hi                               |
|2021-11-22T03:30:05.540+00:00|karen-support|karen   |support    |>> support: hi karen, how can i help today?|
|2021-11-22T03:30:15.506+00:00|karen-support|karen   |support    |>> karen: i hate this product, it sucks !  |
|2021-11-22T03:30:25.205+00:00|karen-support|karen   |support    |>> support: what seems to be the problem ? |
|2021-11-22T03:30:33.278+00:00|karen-support|karen   |support    |>> karen: i cant even get my app deployed !|
|2021-11-22T03:30:47.308+00:00|karen-support|karen   |support    |>> support: that does not sound great.     |
|
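Note that `explode` emits no rows for a null or empty array, so a message whose sentence array is null or empty silently disappears; `pyspark.sql.functions.explode_outer` keeps such rows with a null sentence instead. A plain-Python model of both behaviours (`explode_rows` is a hypothetical helper, not a Spark API, and the second sample sentence is a placeholder):

```python
from typing import Any, Dict, Iterable, Iterator


def explode_rows(rows: Iterable[Dict[str, Any]], array_col: str, out_col: str,
                 outer: bool = False) -> Iterator[Dict[str, Any]]:
    """Model of Spark's explode (outer=False) and explode_outer (outer=True)."""
    for row in rows:
        items = row.get(array_col)
        # every other column is copied unchanged into each output row
        rest = {k: v for k, v in row.items() if k != array_col}
        if not items:
            # explode drops null/empty arrays; explode_outer keeps one row with null
            if outer:
                yield {**rest, out_col: None}
            continue
        for item in items:
            yield {**rest, out_col: item}


rows = [
    {"id": "karen-support", "sentences": [">> support: that does not sound great.", "second sentence"]},
    {"id": "empty-message", "sentences": []},
]
print(list(explode_rows(rows, "sentences", "sentence")))
print(list(explode_rows(rows, "sentences", "sentence", outer=True)))
```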

Now we'll create a user-defined function for VADER scoring: it takes a sentence and returns a struct of sentiment scores. Unlike the spaCy model, the VADER analyzer serializes without trouble, so here we share it with the workers as a broadcast variable.

In [19]:
from pyspark.sql.types import FloatType

sentiment_fields = "pos neg neu compound".split()
sentiment_structure = StructType([StructField(fn, FloatType(), True) for fn in sentiment_fields])

analyzer_bcast = session.sparkContext.broadcast(analyzer)

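def vader_impl(s):
    """ returns VADER scores for one sentence, ordered like sentiment_fields """
    va = analyzer_bcast.value  # the broadcast analyzer
    scores = va.polarity_scores(s)
    # the list order must match the StructType fields (pos, neg, neu, compound)
    return [scores[key] for key in sentiment_fields]

sentiment_score = udf(vader_impl, sentiment_structure)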

Annotate each sentence with its sentiment and order the results by compound score, from most negative to most positive.

In [20]:
sentences \
  .select("timestamp", "id", "username", "supportname", "sentence", sentiment_score(col("sentence")).alias("sentiment")) \
  .orderBy("sentiment.compound") \
  .show(10, False)

+-----------------------------+-------------+--------+-----------+-------------------------------------------+----------------------------+
|timestamp                    |id           |username|supportname|sentence                                   |sentiment                   |
+-----------------------------+-------------+--------+-----------+-------------------------------------------+----------------------------+
|2021-11-22T03:30:15.506+00:00|karen-support|karen   |support    |>> karen: i hate this product, it sucks !  |{0.0, 0.481, 0.519, -0.7574}|
|2021-11-22T03:30:47.308+00:00|karen-support|karen   |support    |>> support: that does not sound great.     |{0.246, 0.3, 0.455, -0.1516}|
|2021-11-22T03:30:47.308+00:00|karen-support|karen   |support    |lets start from the beggining              |{0.0, 0.0, 1.0, 0.0}        |
|2021-11-22T03:29:56.956+00:00|karen-support|karen   |support    |>> karen: hi                               |{0.0, 0.0, 1.0, 0.0}        |
|2021-11-22T03:30:33