In [1]:
import sparknlp
sparknlp.start()

In [2]:
from pyspark.sql import SparkSession
# start spark session configured for spark nlp
spark = SparkSession.builder \
     .master("local[*]") \
     .config("spark.driver.memory","16G")\
     .config("spark.driver.maxResultSize", "0") \
     .config("spark.kryoserializer.buffer.max", "2000M")\
     .appName("Spark NLP") \
     .config("spark.jars.packages", 
             "com.johnsnowlabs.nlp:spark-nlp_2.11:2.3.5") \
     .getOrCreate()

In [8]:
df = spark.read.csv('twitter_data.csv',
                     inferSchema='true', header='true')
data = df.select('text')
data.head(10)

[Row(text='VIDEO: “I was in my office. I was minding my own business...” –David Solomon tells $GS interns how he learned he wa… https://t.co/QClAITywXV'),
 Row(text="The price of lumber $LB_F is down 22% since hitting its YTD highs. The Macy's $M turnaround is still happening.… https://t.co/XnKsV4De39"),
 Row(text='Who says the American Dream is dead? https://t.co/CRgx19x7sA'),
 Row(text='Barry Silbert is extremely optimistic on bitcoin -- but predicts that 99% of new crypto entrants are “going to zero… https://t.co/mGMVo2cZgY'),
 Row(text='How satellites avoid attacks and space junk while circling the Earth https://t.co/aHzIV3Lqp5 #paid @Oracle https://t.co/kacpqZWiDJ'),
 Row(text=".@RealMoney's David Butler's favorite FANG stock isn't #RealMoneySOD Alphabet but Facebook https://t.co/MczAPSFjOi"),
 Row(text='Don’t miss my convo with one of my favorite thinkers @SamHarrisOrg! https://t.co/uuPVxIobCh'),
 Row(text='U.S. intelligence documents on Nelson Mandela made public https://t.co/XT

In [25]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
import nltk
import numpy as np
import pandas as pd
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\nares\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [3]:
from nltk.corpus import stopwords

eng_stopwords = stopwords.words('english')
eng_stopwords.append('xxxx')
eng_stopwords.append('RT')

In [4]:
from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import *
from sparknlp.pretrained import *
from sparknlp.base import *
from pyspark.ml import Pipeline

In [5]:
documentAssembler = DocumentAssembler() \
    .setInputCol('text') \
    .setOutputCol('document')

tokenizer = Tokenizer() \
    .setInputCols(['document']) \
    .setOutputCol('token')

normalizer = Normalizer() \
    .setInputCols(['token']) \
    .setOutputCol('normalized') \
    .setLowercase(True)

lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(['normalized']) \
    .setOutputCol('lemma') \

stopwords_cleaner = StopWordsCleaner() \
    .setInputCols(['lemma']) \
    .setOutputCol('clean_lemma') \
    .setCaseSensitive(False) \
    .setStopWords(eng_stopwords)

stemmer = Stemmer() \
    .setInputCols(["clean_lemma"]) \
    .setOutputCol("stem") \
    .setLanguage("English")

bert = BertEmbeddings.pretrained() \
    .setInputCols("document", "stem") \
    .setOutputCol("bert")

sentiment_detector = ViveknSentimentModel.pretrained() \
    .setInputCols(["document", "stem"]) \
    .setOutputCol("sentiment") \

finisher = Finisher() \
    .setInputCols(['sentiment']) \
    .setCleanAnnotations(False)

finisher1 = Finisher() \
    .setInputCols(['bert']) \
    .setCleanAnnotations(False)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
small_bert_L2_768 download started this may take some time.
Approximate size to download 139.6 MB
[OK!]
sentiment_vivekn download started this may take some time.
Approximate size to download 873.6 KB
[OK!]


In [7]:
pipeline = Pipeline() \
     .setStages([
           documentAssembler,
           tokenizer,
           normalizer,
           lemmatizer,
           stopwords_cleaner,
           stemmer,
           bert,
           sentiment_detector,
           finisher,
           finisher1
     ])

In [8]:
# import data
df = spark.read.csv('twitter_data.csv',
                     inferSchema='true', header='true')
data = df.select('text')
                                                               
# transform text with the pipeline
df = pipeline.fit(data).transform(data)
df.head()

Row(text='VIDEO: “I was in my office. I was minding my own business...” –David Solomon tells $GS interns how he learned he wa… https://t.co/QClAITywXV', document=[Row(annotatorType='document', begin=0, end=139, result='VIDEO: “I was in my office. I was minding my own business...” –David Solomon tells $GS interns how he learned he wa… https://t.co/QClAITywXV', metadata={'sentence': '0'}, embeddings=[])], token=[Row(annotatorType='token', begin=0, end=4, result='VIDEO', metadata={'sentence': '0'}, embeddings=[]), Row(annotatorType='token', begin=5, end=5, result=':', metadata={'sentence': '0'}, embeddings=[]), Row(annotatorType='token', begin=7, end=8, result='“I', metadata={'sentence': '0'}, embeddings=[]), Row(annotatorType='token', begin=10, end=12, result='was', metadata={'sentence': '0'}, embeddings=[]), Row(annotatorType='token', begin=14, end=15, result='in', metadata={'sentence': '0'}, embeddings=[]), Row(annotatorType='token', begin=17, end=18, result='my', metadata={'sentence':

In [9]:
from pyspark.sql.functions import explode, col
df_words = df.withColumn('exploded_score', 
                              explode(col('finished_sentiment')))

In [10]:
df_words.columns

['text',
 'document',
 'token',
 'normalized',
 'lemma',
 'clean_lemma',
 'stem',
 'bert',
 'sentiment',
 'finished_sentiment',
 'finished_bert',
 'exploded_score']

In [11]:
df_words = df_words.toPandas()
df_words.head()

Unnamed: 0,text,document,token,normalized,lemma,clean_lemma,stem,bert,sentiment,finished_sentiment,finished_bert,exploded_score
0,VIDEO: “I was in my office. I was minding my o...,"[(document, 0, 139, VIDEO: “I was in my office...","[(token, 0, 4, VIDEO, {'sentence': '0'}, []), ...","[(token, 0, 4, video, {'sentence': '0'}, []), ...","[(token, 0, 4, video, {'sentence': '0'}, []), ...","[(token, 0, 4, video, {'sentence': '0'}, []), ...","[(token, 0, 4, video, {'sentence': '0'}, []), ...","[(word_embeddings, 0, 4, video, {'sentence': '...","[(sentiment, 0, 134, positive, {'confidence': ...",[positive],"[video, offic, mind, busi, david, solomon, tel...",positive
1,The price of lumber $LB_F is down 22% since hi...,"[(document, 0, 135, The price of lumber $LB_F ...","[(token, 0, 2, The, {'sentence': '0'}, []), (t...","[(token, 0, 2, the, {'sentence': '0'}, []), (t...","[(token, 0, 2, the, {'sentence': '0'}, []), (t...","[(token, 4, 8, price, {'sentence': '0'}, []), ...","[(token, 4, 8, price, {'sentence': '0'}, []), ...","[(word_embeddings, 4, 8, price, {'sentence': '...","[(sentiment, 4, 127, positive, {'confidence': ...",[positive],"[price, lumber, lbf, sinc, hit, ytd, high, mac...",positive
2,Who says the American Dream is dead? https://t...,"[(document, 0, 59, Who says the American Dream...","[(token, 0, 2, Who, {'sentence': '0'}, []), (t...","[(token, 0, 2, who, {'sentence': '0'}, []), (t...","[(token, 0, 2, who, {'sentence': '0'}, []), (t...","[(token, 4, 7, say, {'sentence': '0'}, []), (t...","[(token, 4, 7, sai, {'sentence': '0'}, []), (t...","[(word_embeddings, 4, 7, sai, {'sentence': '0'...","[(sentiment, 4, 51, negative, {'confidence': '...",[negative],"[sai, american, dream, dead, httpstcocrgxxsa]",negative
3,Barry Silbert is extremely optimistic on bitco...,"[(document, 0, 139, Barry Silbert is extremely...","[(token, 0, 4, Barry, {'sentence': '0'}, []), ...","[(token, 0, 4, barry, {'sentence': '0'}, []), ...","[(token, 0, 4, barry, {'sentence': '0'}, []), ...","[(token, 0, 4, barry, {'sentence': '0'}, []), ...","[(token, 0, 4, barri, {'sentence': '0'}, []), ...","[(word_embeddings, 0, 4, barri, {'sentence': '...","[(sentiment, 0, 133, negative, {'confidence': ...",[negative],"[barri, silbert, extrem, optimist, bitcoin, pr...",negative
4,How satellites avoid attacks and space junk wh...,"[(document, 0, 129, How satellites avoid attac...","[(token, 0, 2, How, {'sentence': '0'}, []), (t...","[(token, 0, 2, how, {'sentence': '0'}, []), (t...","[(token, 0, 2, how, {'sentence': '0'}, []), (t...","[(token, 4, 13, satellite, {'sentence': '0'}, ...","[(token, 4, 13, satellit, {'sentence': '0'}, [...","[(word_embeddings, 4, 13, satellit, {'sentence...","[(sentiment, 4, 124, na, {'confidence': '0.0'}...",[na],"[satellit, avoid, attack, space, junk, circl, ...",na


In [12]:
data = df_words[["finished_bert","exploded_score"]]
type(data["finished_bert"])
data.head()

Unnamed: 0,finished_bert,exploded_score
0,"[video, offic, mind, busi, david, solomon, tel...",positive
1,"[price, lumber, lbf, sinc, hit, ytd, high, mac...",positive
2,"[sai, american, dream, dead, httpstcocrgxxsa]",negative
3,"[barri, silbert, extrem, optimist, bitcoin, pr...",negative
4,"[satellit, avoid, attack, space, junk, circl, ...",na


In [13]:
counts = df_words.groupby('exploded_score').count()

In [14]:
#counts_pd = counts.toPandas()
counts

Unnamed: 0_level_0,text,document,token,normalized,lemma,clean_lemma,stem,bert,sentiment,finished_sentiment,finished_bert
exploded_score,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
na,9153,9153,9153,9153,9153,9153,9153,9153,9153,9153,9153
negative,10276,10276,10276,10276,10276,10276,10276,10276,10276,10276,10276
positive,9447,9447,9447,9447,9447,9447,9447,9447,9447,9447,9447


In [15]:
counts.shape

(3, 11)

In [16]:
data.to_csv('sentiment_score.csv')