In [1]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import sparknlp
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.base import *
from sparknlp.annotator import *
import gc
gc.enable()
from contractions import CONTRACTION_MAP
import re

from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
spark = sparknlp.start(gpu=False)
sparknlp.version()

'2.5.5'

In [3]:
# Import Spark NLP 
from sparknlp.base import *
from sparknlp.annotator import *
#from sparknlp.embeddings import *


### Twitter Dataset

About this file
This is the sentiment140 dataset.
It contains 1,600,000 tweets extracted using the twitter api . The tweets have been annotated (0 = negative, 2 = neutral, 4 = positive) and they can be used to detect sentiment .
It contains the following 6 fields:

target: the polarity of the tweet (0 = negative, 2 = neutral, 4 = positive)
ids: The id of the tweet ( 2087)
date: the date of the tweet (Sat May 16 23:58:44 UTC 2009)
flag: The query (lyx). If there is no query, then this value is NO_QUERY.
user: the user that tweeted (robotickilldozr)
text: the text of the tweet (Lyx is cool)
The official link regarding the dataset with resources about how it was generated is here
The official paper detailing the approach is here

According to the creators of the dataset:

"Our approach was unique because our training data was automatically created, as opposed to having humans manual annotate tweets. In our approach, we assume that any tweet with positive emoticons, like :), were positive, and tweets with negative emoticons, like :(, were negative. We used the Twitter Search API to collect these tweets by using keyword search"

citation: Go, A., Bhayani, R. and Huang, L., 2009. Twitter sentiment classification using distant supervision. CS224N Project Report, Stanford, 1(2009), p.12.

In [4]:
spark_df = spark.read.csv('training.1600000.processed.noemoticon.csv',header=False,encoding='utf-8',).toDF("polarity","id","date","flag","user","text")
spark_df = spark_df.select('text','polarity').filter("text is not null")

In [5]:
spark_df.show(10,truncate=70)

+----------------------------------------------------------------------+--------+
|                                                                  text|polarity|
+----------------------------------------------------------------------+--------+
|@switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You ...|       0|
|is upset that he can't update his Facebook by texting it... and mig...|       0|
|@Kenichan I dived many times for the ball. Managed to save 50%  The...|       0|
|                       my whole body feels itchy and like its on fire |       0|
|@nationwideclass no, it's not behaving at all. i'm mad. why am i he...|       0|
|                                         @Kwesidei not the whole crew |       0|
|                                                           Need a hug |       0|
|@LOLTrish hey  long time no see! Yes.. Rains a bit ,only a bit  LOL...|       0|
|                                  @Tatiana_K nope they didn't have it |       0|
|               

In [6]:
spark_df.groupBy("polarity").count().show()

+--------+------+
|polarity| count|
+--------+------+
|       0|800000|
|       4|800000|
+--------+------+



In [7]:
pol = {0: 'negative', 2 :'neutral', 4 : 'positive'}

In [8]:
from itertools import chain
mapping_expr = F.create_map([F.lit(x) for x in chain(*pol.items())])

In [9]:

spark_df = spark_df.withColumn('polarity', mapping_expr[spark_df['polarity']])


In [10]:
emojis = {":)": 'smile', ":-)": 'smile', ";D": 'wink', ';d': 'wink', ':-E': 'vampire', ':(': 'sad', 
          ':-(': 'sad', ':-<': 'sad', ':P': 'raspberry', ':O': 'surprised', ":*(": "crying",
          ':-@': 'shocked', ':@': 'shocked',':-$': 'confused', ':\\': 'annoyed', 
          ':#': 'mute', ':X': 'mute', ':^)': 'smile', ':-&': 'confused', '$_$': 'greedy',
          '@@': 'eyeroll', ':-!': 'confused', ':-D': 'smile', ':-0': 'yell', 'O.o': 'confused',
          '<(-_-)>': 'robot', 'd[-_-]b': 'dj', ":'-)": 'sadsmile', ';)': 'wink', 
          ';-)': 'wink', 'O:-)': 'angel','O*-)': 'angel','(:-D': 'gossip', '=^.^=': 'cat'}

In [11]:
def replEmo(text,emojis=emojis):

    for emoji in emojis.keys():
            text = text.replace(emoji, "EMOJI" + emojis[emoji])  
    return text


In [12]:
def expand_contractions(text,contraction_mapping=CONTRACTION_MAP):

    for cont in contraction_mapping.keys():
            text = text.replace(cont, contraction_mapping[cont])  
    return text


In [13]:
expContr = F.udf(expand_contractions,F.StringType())

In [14]:
excEmos = F.udf(replEmo,F.StringType())

In [15]:
spark_df = spark_df.withColumn('text',expContr('text'))

In [16]:
spark_df = spark_df.withColumn('text',excEmos('text'))

In [18]:
spark_df = spark_df.withColumn("text", F.regexp_replace('text',"\w+:\/{2}[\d\w\-]+(\.[\d\w\-]+)*(?:(?:\/[^\s/]*))*"," URL").alias('text')).filter("text is not null")

In [19]:
spark_df = spark_df.withColumn("text", F.regexp_replace('text',"http.{1,10}…"," URL")).filter("text is not null")

In [20]:
spark_df = spark_df.withColumn("text", F.regexp_replace('text',"\@[a-zA-Z0-9_]{0,15}"," USER")).filter("text is not null")

In [22]:
spark_df = spark_df.withColumn("text", F.regexp_replace('text',"RT\s.*\:\s"," RETWEET")).filter("text is not null")

In [23]:
spark_df = spark_df.withColumn("text", F.trim(spark_df['text'])).filter("text is not null").filter(F.col("text") !=  "").filter(F.col("polarity") !=  "")

In [25]:
spark_df =  spark_df.withColumn("text", F.regexp_replace('text',"\s{2,}"," ")).filter("text is not null")
spark_df =  spark_df.withColumn("text", F.regexp_replace('text',"\&quot","")).filter("text is not null")
spark_df =  spark_df.withColumn("text", F.regexp_replace('text',"\&amp"," and ")).filter("text is not null")
spark_df =  spark_df.withColumn("text", F.regexp_replace('text',"\&lt"," < ")).filter("text is not null")
spark_df =  spark_df.withColumn("text", F.regexp_replace('text',"\&gt"," > ")).filter("text is not null")

In [27]:
spark_df.filter(F.length("text") <  3).show()

+----+--------+
|text|polarity|
+----+--------+
+----+--------+



In [28]:
remove_repeaters = lambda s: re.sub(r"(.)\1\1+",r"\1\1", s)

spark_df = spark_df.withColumn('text', F.udf(remove_repeaters, StringType())('text'))

In [29]:
sub=spark_df.orderBy(F.rand()).limit(400*1000)

In [30]:
train, test = sub.randomSplit([0.8, 0.2], seed=128)

In [32]:
train.show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------+--------+
|text                                                                                                                                        |polarity|
+--------------------------------------------------------------------------------------------------------------------------------------------+--------+
| < ; 3 for my 22! iClever                                                                                                                   |positive|
| < ; < ; < ; < ; made up #do notyouwish                                                                                                     |positive|
| < ; USER Will keep on trying.You do not show up in the list of ppl I am following, so can only read you when you USER to me directly       |negative|
| < ; has a new avatar on the forum                                                     

In [33]:
test.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------+--------+
|text                                                                                                                                          |polarity|
+----------------------------------------------------------------------------------------------------------------------------------------------+--------+
| < ; is about to get hit in the head from Karen for this song ? URL                                                                           |positive|
| < ; still hungry still have not eatin.                                                                                                       |negative|
| < ;*~Free!~* > ;                                                                                                                             |positive|
| < ;-- cannot sing, thank you for showing me this, guitar hero             

In [34]:
train.groupBy('polarity').count().show()

+--------+------+
|polarity| count|
+--------+------+
|positive|160666|
|negative|159232|
+--------+------+



In [35]:
test.groupBy('polarity').count().show()

+--------+-----+
|polarity|count|
+--------+-----+
|positive|40060|
|negative|40042|
+--------+-----+



In [36]:
def print_latest_log(path):
   
    from pathlib import Path
    import pendulum
    
    files=[*Path(path).iterdir()]
    files=[(file,file.stat().st_ctime) for file in files]
    
    a=sorted(files, key=lambda x:x[1],reverse=True)[0]
    
    with open(a[0],'r') as f:
        b=f.readlines()
    
    print(pendulum.from_timestamp(a[1],tz=pendulum.local_timezone()).to_cookie_string(),"\n")
    
    [print(b_) for b_ in b]
    
    return

In [48]:
document_assembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")\
    .setCleanupMode('shrink') 

tokenizer = Tokenizer() \
  .setInputCols(["document"]) \
  .setOutputCol("token")

normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")\
    .setLowercase(True)
    
lemma = LemmatizerModel.pretrained('lemma_antbnc') \
    .setInputCols(["normalized"]) \
    .setOutputCol("lemma")

glove_embeddings = WordEmbeddingsModel().pretrained() \
 .setInputCols(["document",'lemma'])\
 .setOutputCol("embeddings")\
 .setCaseSensitive(False)

embeddingsSentence = SentenceEmbeddings() \
      .setInputCols(["document", "embeddings"]) \
      .setOutputCol("sentence_embeddings") \
      .setPoolingStrategy("AVERAGE")


sentimentClassifier = SentimentDLApproach()\
      .setInputCols("sentence_embeddings")\
      .setOutputCol("prediction")\
      .setLabelColumn("polarity")\
      .setBatchSize(16)\
      .setMaxEpochs(10)\
      .setDropout(0.65)\
      .setValidationSplit(0.2)\
      .setThreshold(0.0)\
      .setLr(0.003)


clf_pipeline = Pipeline(
    stages=[document_assembler, 
            tokenizer,
            normalizer,
            lemma, 
            glove_embeddings,
            embeddingsSentence,
            sentimentClassifier
           ])

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]


In [49]:
pipelineModel = clf_pipeline.fit(train)



In [53]:
result = pipelineModel.transform(test)



In [55]:
result_exp=result.select(F.explode(F.arrays_zip('document.result', 'prediction.result')).alias("cols"),'polarity') \
.select(F.expr("cols['0']").alias("document"),
        F.expr("cols['1']").alias("sentiment"),'polarity')

In [56]:
result_exp.show()

+--------------------+---------+--------+
|            document|sentiment|polarity|
+--------------------+---------+--------+
|< ; is about to g...| positive|positive|
|< ; still hungry ...| negative|negative|
|    < ;*~Free!~* > ;| positive|positive|
|< ;-- cannot sing...| positive|negative|
|< ;-- feels sad o...| negative|negative|
|< ;-- is sad he l...| negative|negative|
|< ;-- this is my ...| negative|negative|
|< ;3 Britney last...| negative|negative|
|< ;3 my Frankiee ...| negative|positive|
|< ;3 my Ryo Ohki ...| positive|positive|
|< ;i > ;Waiting F...| negative|negative|
|< ;sigh > ; I am ...| negative|negative|
|< ;~ so upset my ...| negative|negative|
|> ; < ; laptop in...| negative|negative|
|> ; > ; excerpt f...| positive|positive|
|> ; USER: *PERSON...| negative|negative|
|> ; twitter needs...| positive|negative|
|> ; was wanderin ...| positive|positive|
|> ;KingAussie: Fo...| positive|positive|
|> ;it know that >...| negative|positive|
+--------------------+---------+--

In [61]:
documentAssembler = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")
    
use = UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en")\
 .setInputCols(["document"])\
 .setOutputCol("sentence_embeddings")

MODEL_NAME='sentimentdl_use_twitter'

sentimentdl = SentimentDLModel.pretrained(name=MODEL_NAME, lang="en")\
    .setInputCols(["sentence_embeddings"])\
    .setOutputCol("pr_polarity")

nlpPipeline = Pipeline(
      stages = [
          documentAssembler,
          use,
          sentimentdl
      ])


tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[OK!]
sentimentdl_use_twitter download started this may take some time.
Approximate size to download 11.9 MB
[OK!]


In [63]:
empty_df = spark.createDataFrame([['']]).toDF("text")

pipelineModel = nlpPipeline.fit(empty_df)

result = pipelineModel.transform(train)

In [66]:
result.select(F.explode(F.arrays_zip('document.result', 'pr_polarity.result')).alias("cols"),"polarity") \
.select(F.expr("cols['0']").alias("document"),
        F.expr("cols['1']").alias("pr_polarity"),"polarity").show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------+
|document                                                                                                                                    |pr_polarity|polarity|
+--------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------+
| < ; 3 for my 22! iClever                                                                                                                   |neutral    |positive|
| < ; < ; < ; < ; made up #do notyouwish                                                                                                     |positive   |positive|
| < ; USER Will keep on trying.You do not show up in the list of ppl I am following, so can only read you when you USER to me directly       |positive   |negative|
| < ; has a new 

In [67]:
documentAssembler = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")
    
use = UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en")\
 .setInputCols(["document"])\
 .setOutputCol("sentence_embeddings")

MODEL_NAME='sentimentdl_use_twitter'

sentimentdl = SentimentDLModel.pretrained(name=MODEL_NAME, lang="en")\
    .setInputCols(["sentence_embeddings"])\
    .setOutputCol("pr_polarity")\
    .setThreshold(0.0)

nlpPipeline = Pipeline(
      stages = [
          documentAssembler,
          use,
          sentimentdl
      ])

tfhub_use download started this may take some time.
Approximate size to download 923.7 MB
[OK!]
sentimentdl_use_twitter download started this may take some time.
Approximate size to download 11.9 MB
[OK!]


In [68]:
empty_df = spark.createDataFrame([['']]).toDF("text")

pipelineModel = nlpPipeline.fit(empty_df)

result = pipelineModel.transform(train)

In [69]:
result.select(F.explode(F.arrays_zip('document.result', 'pr_polarity.result')).alias("cols"),"polarity") \
.select(F.expr("cols['0']").alias("document"),
        F.expr("cols['1']").alias("pr_polarity"),"polarity").show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------+
|document                                                                                                                                    |pr_polarity|polarity|
+--------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------+
| < ; 3 for my 22! iClever                                                                                                                   |negative   |positive|
| < ; < ; < ; < ; made up #do notyouwish                                                                                                     |positive   |positive|
| < ; USER Will keep on trying.You do not show up in the list of ppl I am following, so can only read you when you USER to me directly       |positive   |negative|
| < ; has a new 

In [70]:
resdf=result.select(F.explode(F.arrays_zip('document.result', 'pr_polarity.result')).alias("cols"),"polarity") \
.select(F.expr("cols['0']").alias("document"),
        F.expr("cols['1']").alias("pr_polarity"),"polarity").toPandas()

In [71]:
resdf


Unnamed: 0,document,pr_polarity,polarity
0,< ; 3 for my 22! iClever,negative,positive
1,< ; < ; < ; < ; made up #do notyouwish,positive,positive
2,< ; USER Will keep on trying.You do not show ...,positive,negative
3,< ; has a new avatar on the forum,positive,positive
4,< ;- I am pretty sure MSTRKRFT melted my face...,negative,positive
...,...,...,...
319893,"�cont) friends beauty at it is best..,,.",positive,positive
319894,�cont) ticket back to UK*,negative,positive
319895,"�go shawty,it is my birthday!we gonna party li...",positive,positive
319896,�ter vattenmelon eating watermelon,positive,positive


In [72]:
from sklearn import metrics as met

In [79]:
print(met.classification_report(resdf.polarity,resdf.pr_polarity))

              precision    recall  f1-score   support

    negative       0.82      0.79      0.81    159232
    positive       0.80      0.83      0.82    160666

    accuracy                           0.81    319898
   macro avg       0.81      0.81      0.81    319898
weighted avg       0.81      0.81      0.81    319898

