In [None]:
import os
import findspark
# Locate the Spark installation. SPARK_HOME wins when it is set so the notebook
# runs on other machines; the fallback is the author's original local install.
findspark.init(os.environ.get('SPARK_HOME', '/Users/swapnilsinha/spark/spark-3.0.1-bin-hadoop3.2'))

import re
import numpy as np
import pandas as pd
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

import nltk
import folium
from os import path, getcwd
from PIL import Image
from wordcloud import WordCloud, STOPWORDS
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline 

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import format_number as fmt
from pyspark.sql.functions import udf
from pyspark.sql.types import *

from pyspark.ml.feature import Tokenizer,StopWordsRemover,Word2Vec
from pyspark.ml import PipelineModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator

**Load Twitter data (collected via the Twitter API and stored in MongoDB) into a Spark SQL DataFrame**

In [None]:
# MongoDB source shared by the session config and the reader below.
MONGO_URI = 'mongodb://127.0.0.1/twitterdb.twitter_search'
# The connector's Scala suffix must match Spark's Scala build. Spark 3.0.x is
# built against Scala 2.12, and the 3.0.x connector line targets Spark 3.0.
# The former `_2.11:2.4.1` artifact was built for Spark 2.4 / Scala 2.11.
MONGO_CONNECTOR = 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1'

spark = SparkSession \
    .builder \
    .appName('myApp') \
    .config('spark.mongodb.input.uri', MONGO_URI) \
    .config('spark.mongodb.input.collection', 'twitter_search') \
    .config('spark.mongodb.output.uri', MONGO_URI) \
    .config('spark.jars.packages', MONGO_CONNECTOR) \
    .getOrCreate()

# Raw tweet documents; the `text` column is cleaned in the cells below.
df = spark.read.format('mongo').option('uri', MONGO_URI).load()

**Helper functions used as Spark UDFs for data processing**

In [None]:
def removePattern(input_text, pattern):
    '''
    Remove every match of the regular expression `pattern` from `input_text`.

    The pattern is applied directly with re.sub. Feeding each matched
    substring back into re.sub as a *pattern* would misinterpret regex
    metacharacters inside the match (e.g. '.' in a URL also deletes
    unrelated text, '(' raises re.error).

    :param input_text: text to clean
    :param pattern: regular expression whose matches are deleted
    :return: `input_text` with all matches of `pattern` removed
    '''
    return re.sub(pattern, '', input_text)

def cleanTweet(txt):
    '''
    Normalise raw tweet text before tokenisation.

    Steps, in order:
      1. remove retweet prefixes ("RT @handle:")
      2. remove remaining @handles
      3. remove http/https URLs
      4. replace every run of characters that are not ASCII letters
         (digits, punctuation, accented letters, emoji, ...) with one space

    :param txt: raw tweet text; None is mapped to '' so the Spark UDF does
                not raise on a null `text` value
    :return: text made only of ASCII letters and spaces
    '''
    if txt is None:
        return ''
    # Raw strings: '\w' inside a normal string literal is an invalid escape
    # sequence (DeprecationWarning; SyntaxWarning on Python 3.12+).
    txt = removePattern(txt, r'RT @[\w]*:')
    txt = removePattern(txt, r'@[\w]*')
    txt = removePattern(txt, r'https?://[A-Za-z0-9./]*')
    txt = re.sub(r'[^A-Za-z]+', ' ', txt)
    return txt

def get_Clean_Tweet_Text(filteredTweetText):
    '''
    Join a token array back into one space-separated string.

    Empty tokens are dropped. Spark's Tokenizer splits on single whitespace
    characters, so cleaned text with a leading or doubled space yields ''
    tokens (e.g. [, buy, bitcoin]). Keeping them would add stray spaces and,
    after a later split, empty "words".

    :param filteredTweetText: list of tokens (None for a null array)
    :return: non-empty tokens joined by single spaces; '' for None or []
    '''
    if not filteredTweetText:
        return ''
    return ' '.join(token for token in filteredTweetText if token)

# Lazily built VADER analyzer shared across calls. The analyzer's constructor
# parses VADER's lexicon files, so building it once instead of once per row
# keeps the UDF cheap. On Spark executors the cache lives in the deserialized
# UDF's globals, so it is rebuilt at most once per deserialization.
_VADER_ANALYZER = None


def get_Sentiment_Score(tweetText):
    '''
    Return VADER's compound polarity score for `tweetText` as a Python float.

    VADER documents the compound score as normalised to [-1, 1].

    :param tweetText: text to score
    :return: the 'compound' entry of SentimentIntensityAnalyzer.polarity_scores
    '''
    global _VADER_ANALYZER
    if _VADER_ANALYZER is None:
        _VADER_ANALYZER = SentimentIntensityAnalyzer()
    return float(_VADER_ANALYZER.polarity_scores(tweetText)['compound'])

def get_Sentiment(score):
    '''
    Binarise a VADER compound score.

    :param score: compound polarity score
    :return: 1 for a strictly positive score; 0 otherwise (a neutral 0.0 and
             negative scores both map to 0)
    '''
    return int(score > 0)


def getTweetArray(tweet):
    '''
    Split a cleaned tweet string into the token array fed to Word2Vec.

    str.split() with no separator splits on runs of whitespace and ignores
    leading/trailing whitespace, so no '' tokens are produced. With
    split(' '), ' buy bitcoin' became ['', 'buy', 'bitcoin'], and '' ended up
    as a Word2Vec vocabulary entry.

    :param tweet: space-separated text; None yields an empty list
    :return: list of non-empty tokens
    '''
    if tweet is None:
        return []
    return tweet.split()

#### Sentiment analysis

**Clean up and prepare the data for sentiment analysis**

In [None]:
# Apply cleanTweet to the raw `text` column through a string-typed UDF.
udf_Clean_Tweet = udf(cleanTweet, StringType())
dfCleanTweet = df.withColumn('cleanTweetText', udf_Clean_Tweet(F.col('text')))
dfCleanTweet.select(['text', 'cleanTweetText']).show(n=5)

+--------------------+--------------------+
|                text|      cleanTweetText|
+--------------------+--------------------+
|@ReformedBroker B...|        Buy bitcoin |
|RT @DigiFinex: Fa...| Facebook has pus...|
|RT @tradingroomap...| Buying Bitcoin C...|
|Most Profound Que...|Most Profound Que...|
|Bitcoin BTC Curre...|Bitcoin BTC Curre...|
+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Lower-case the cleaned text and split it on whitespace into a `words` array.
tokenizer = Tokenizer().setInputCol('cleanTweetText').setOutputCol('words')
dfCleanTweetTokenized = tokenizer.transform(dfCleanTweet)
dfCleanTweetTokenized.select(['text', 'cleanTweetText', 'words']).show(n=5)

+--------------------+--------------------+--------------------+
|                text|      cleanTweetText|               words|
+--------------------+--------------------+--------------------+
|@ReformedBroker B...|        Buy bitcoin |    [, buy, bitcoin]|
|RT @DigiFinex: Fa...| Facebook has pus...|[, facebook, has,...|
|RT @tradingroomap...| Buying Bitcoin C...|[, buying, bitcoi...|
|Most Profound Que...|Most Profound Que...|[most, profound, ...|
|Bitcoin BTC Curre...|Bitcoin BTC Curre...|[bitcoin, btc, cu...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Drop stop words ("has", "most", ...) from each token array.
remover = StopWordsRemover().setInputCol('words').setOutputCol('filteredTweetText')
dfStopwordRemoved = remover.transform(dfCleanTweetTokenized)
dfStopwordRemoved.select(['text', 'cleanTweetText', 'words', 'filteredTweetText']).show(n=5)

+--------------------+--------------------+--------------------+--------------------+
|                text|      cleanTweetText|               words|   filteredTweetText|
+--------------------+--------------------+--------------------+--------------------+
|@ReformedBroker B...|        Buy bitcoin |    [, buy, bitcoin]|    [, buy, bitcoin]|
|RT @DigiFinex: Fa...| Facebook has pus...|[, facebook, has,...|[, facebook, push...|
|RT @tradingroomap...| Buying Bitcoin C...|[, buying, bitcoi...|[, buying, bitcoi...|
|Most Profound Que...|Most Profound Que...|[most, profound, ...|[profound, questi...|
|Bitcoin BTC Curre...|Bitcoin BTC Curre...|[bitcoin, btc, cu...|[bitcoin, btc, cu...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Re-join the stop-word-filtered tokens into one string per tweet.
# The helper is named get_Clean_Tweet_Text (getCleanTweetText is undefined).
udfCleanTweetText = udf(get_Clean_Tweet_Text, StringType())
dfFilteredCleanedTweet = dfStopwordRemoved.withColumn('filteredCleanedTweetText', udfCleanTweetText('filteredTweetText'))
dfFilteredCleanedTweet.select('filteredCleanedTweetText').show(5)

+------------------------+
|filteredCleanedTweetText|
+------------------------+
|             buy bitcoin|
|     facebook pushed ...|
|     buying bitcoin c...|
|    profound question...|
|    bitcoin btc curre...|
+------------------------+
only showing top 5 rows



In [None]:
# Score each filtered tweet with VADER's compound polarity via a float UDF.
# The helper is named get_Sentiment_Score (getSentimentScore is undefined).
Sentiment_Score = udf(get_Sentiment_Score, FloatType())
dfSentiment_Score = dfFilteredCleanedTweet.withColumn('sentimentScore', Sentiment_Score('filteredCleanedTweetText'))
dfSentiment_Score.select('filteredCleanedTweetText', 'sentimentScore').show(5)

+------------------------+--------------+
|filteredCleanedTweetText|sentimentScore|
+------------------------+--------------+
|             buy bitcoin|           0.0|
|     facebook pushed ...|           0.0|
|     buying bitcoin c...|       -0.7003|
|    profound question...|           0.0|
|    bitcoin btc curre...|           0.0|
+------------------------+--------------+
only showing top 5 rows



In [None]:
# Binarise the VADER score into a 0/1 `sentiment` label.
# Uses the names actually defined earlier: get_Sentiment and dfSentiment_Score
# (getSentiment and dfSentimentScore are undefined).
udfSentiment = udf(get_Sentiment, IntegerType())
dfSentiment = dfSentiment_Score.withColumn('sentiment', udfSentiment('sentimentScore'))
dfSentiment.select('filteredCleanedTweetText', 'sentimentScore', 'sentiment').show(5)

+------------------------+--------------+---------+
|filteredCleanedTweetText|sentimentScore|sentiment|
+------------------------+--------------+---------+
|             buy bitcoin|           0.0|        0|
|     facebook pushed ...|           0.0|        0|
|     buying bitcoin c...|       -0.7003|        0|
|    profound question...|           0.0|        0|
|    bitcoin btc curre...|           0.0|        0|
+------------------------+--------------+---------+
only showing top 5 rows



In [None]:
# Class balance of the VADER-derived labels: print it, then keep a pandas
# copy (presumably for plotting).
sentimentCounts = dfSentiment.groupBy('sentiment').count()
sentimentCounts.show()

df_PlotVader_Sentiment = sentimentCounts.toPandas()
df_PlotVader_Sentiment

+---------+-----+
|sentiment|count|
+---------+-----+
|        1|27835|
|        0|25453|
+---------+-----+



Unnamed: 0,sentiment,count
0,1,27835
1,0,25453


In [None]:
# Spot-check the VADER score next to its binary label.
dfSentiment.select(['filteredCleanedTweetText', 'sentimentScore', 'sentiment']).show(n=5)

+------------------------+--------------+---------+
|filteredCleanedTweetText|sentimentScore|sentiment|
+------------------------+--------------+---------+
|             buy bitcoin|           0.0|        0|
|     facebook pushed ...|           0.0|        0|
|     buying bitcoin c...|       -0.7003|        0|
|    profound question...|           0.0|        0|
|    bitcoin btc curre...|           0.0|        0|
+------------------------+--------------+---------+
only showing top 5 rows



In [None]:
# Bring every filtered tweet back to the driver and flatten the tweets into a
# single list of words, dropping empty strings, for word-frequency analysis.
filteredCleanedTweetTextRddList = dfSentiment.select('filteredCleanedTweetText').collect()
filteredCleanedTweetTextList = [row.filteredCleanedTweetText for row in filteredCleanedTweetTextRddList]
wordList = [text.split(' ') for text in filteredCleanedTweetTextList]
allTweetWords = [word for words in wordList for word in words if word]

In [None]:
# Count word occurrences over the full word list. Converting the list to a
# set first gives every word a count of 1, so the "top 100" would be an
# arbitrary selection of distinct words.
# allTweetWords is not rebound, so re-running this cell gives the same result.
TOP_WORDS = 100
frequencyDistribution = nltk.FreqDist(allTweetWords)
[word for word, _count in frequencyDistribution.most_common(TOP_WORDS)]

['correctly',
 'pagos',
 'publicado',
 'lager',
 'bitcoinasia',
 'arbitraj',
 'tops',
 'means',
 'hardcap',
 'watches',
 'lamp',
 'messages',
 'pontos',
 'heasley',
 'cannabis',
 'liechtenstein',
 'known',
 'historically',
 'bitcoinnetwork',
 'gonna',
 'nah',
 'comerica',
 'accommodation',
 'inception',
 'accelerating',
 'te',
 'bitball',
 'vistazo',
 'towns',
 'voir',
 'progressed',
 'illuminating',
 'dovish',
 'ledgers',
 'basher',
 'mybd',
 'doncaster',
 'encryption',
 'efficacy',
 'andr',
 'suggests',
 'midlands',
 'normativa',
 'attracted',
 'hizmeti',
 'environmental',
 'strives',
 'validation',
 'abbau',
 'blt',
 'hedgefund',
 'precisa',
 'svte',
 'steadily',
 'tnetworks',
 'dilemma',
 'artificialintelligence',
 'pending',
 'billiin',
 'wp',
 'payoff',
 'clubjack',
 'eldatodeld',
 'primero',
 'burda',
 'agricoinx',
 'finnaly',
 'soleimani',
 'fiatmoney',
 'eastern',
 'cryptio',
 'hammer',
 'menerangkan',
 'sora',
 'streets',
 'neighbour',
 'sudwatoken',
 'oooh',
 'ny',
 'even',


In [None]:
# Split each filtered tweet string into the token array Word2Vec consumes.
udfTweetArray = udf(getTweetArray, ArrayType(StringType()))
dfWord2Vec = dfSentiment.withColumn(
    'filteredCleanedTweetArrayForW2V',
    udfTweetArray(F.col('filteredCleanedTweetText')),
)
dfWord2Vec.select(['filteredCleanedTweetText', 'filteredCleanedTweetArrayForW2V']).show(n=5)

+------------------------+-------------------------------+
|filteredCleanedTweetText|filteredCleanedTweetArrayForW2V|
+------------------------+-------------------------------+
|             buy bitcoin|               [, buy, bitcoin]|
|     facebook pushed ...|           [, facebook, push...|
|     buying bitcoin c...|           [, buying, bitcoi...|
|    profound question...|           [profound, questi...|
|    bitcoin btc curre...|           [bitcoin, btc, cu...|
+------------------------+-------------------------------+
only showing top 5 rows



In [None]:
# Word2Vec hyper-parameters. The seed is pinned explicitly because PySpark's
# default `seed` is derived from hash() of the class name, which varies
# between Python processes unless PYTHONHASHSEED is fixed. Pinning it removes
# one source of run-to-run variation in the embeddings and synonym lists.
W2V_VECTOR_SIZE = 100
W2V_MIN_COUNT = 5
W2V_SEED = 42

word2Vec = Word2Vec(vectorSize=W2V_VECTOR_SIZE, minCount=W2V_MIN_COUNT, seed=W2V_SEED,
                    inputCol='filteredCleanedTweetArrayForW2V', outputCol='wrdVector')
model_W2V = word2Vec.fit(dfWord2Vec)
wrdVec = model_W2V.transform(dfWord2Vec)
wrdVec.select('wrdVector').show(5, truncate=True)

+--------------------+
|           wrdVector|
+--------------------+
|[-0.0081031260391...|
|[0.03244456322863...|
|[0.05897613765242...|
|[-0.0681024376302...|
|[-0.1719806233627...|
+--------------------+
only showing top 5 rows



In [None]:
# Peek at the learned vocabulary as (word, vector) rows.
dfWordVectors = model_W2V.getVectors()
dfWordVectors.show(n=5)

+--------+--------------------+
|    word|              vector|
+--------+--------------------+
| serious|[0.00916644185781...|
|  breaks|[5.09095494635403...|
|precious|[0.05938430503010...|
|   retwe|[0.06232949718832...|
| sectors|[-0.0720720514655...|
+--------+--------------------+
only showing top 5 rows



**Let's explore the data a little**

In [None]:
# Number of nearest neighbours listed for each probe word.
topN = 10

Synonyms = model_W2V.findSynonyms('btc', topN).toPandas()
Synonyms.loc[:, ['word']].head(topN)

Unnamed: 0,word
0,ltc
1,eth
2,price
3,bitcoin
4,btcusd
5,giveawayalert
6,bitcoincash
7,coinbase
8,prediction
9,gained


In [None]:
# Nearest neighbours of 'bitcoin' in the embedding space.
# The fitted model is model_W2V (modelW2V is undefined).
Synonyms = model_W2V.findSynonyms('bitcoin', topN).toPandas()
Synonyms[['word']].head(topN)

Unnamed: 0,word
0,via
1,litecoin
2,btc
3,ethereum
4,still
5,analysis
6,price
7,coinbase
8,bitcoins
9,sell


In [None]:
# Neighbours of 'litecoin' with the similarity score formatted to 5 decimals.
Similarity = (
    model_W2V.findSynonyms('litecoin', topN)
    .select('word', fmt('similarity', 5).alias('similarity'))
    .toPandas()
)
Similarity.head(topN)

Unnamed: 0,word,similarity
0,ltc,0.73554
1,bitcoincash,0.70471
2,hitting,0.64089
3,dailycoin,0.63667
4,eur,0.63521
5,price,0.63315
6,tether,0.62944
7,ltcbtc,0.61391
8,forecast,0.59987
9,dogecoin,0.59245


In [None]:
# Neighbours of 'blockchain' with the similarity score formatted to 5 decimals.
blockchainNeighbours = model_W2V.findSynonyms('blockchain', topN)
Similarity = blockchainNeighbours.select('word', fmt('similarity', 5).alias('similarity')).toPandas()
Similarity.head(topN)

Unnamed: 0,word,similarity
0,tokensale,0.48864
1,technology,0.47616
2,ieo,0.47388
3,stories,0.43097
4,iot,0.42421
5,fintech,0.41513
6,ether,0.41244
7,cryptoenergy,0.4082
8,finance,0.40691
9,solution,0.40449


In [None]:
# Expose the VADER label as `target`, presumably the label column the saved
# pipeline was trained on (its output schema keeps `target` and adds `label`;
# TODO confirm). The frame is built from dfSentiment because the old
# self-referential `Sentiment = Sentiment...` named an undefined variable.
Sentiment = dfSentiment.withColumnRenamed('sentiment', 'target')

In [None]:
# Input columns passed to the saved Word2Vec + logistic-regression pipeline.
# The frame is bound to dfSentimnetAnalysisW2VLogreg because the scoring and
# sampling cells read that name (its spelling is kept to match them).
dfSentimnetAnalysisW2VLogreg = Sentiment.select('text', 'cleanTweetText', 'target')
dfSentimnetAnalysisW2VLogreg.show(5)

+--------------------+--------------------+------+
|                text|      cleanTweetText|target|
+--------------------+--------------------+------+
|@ReformedBroker B...|        Buy bitcoin |     0|
|RT @DigiFinex: Fa...| Facebook has pus...|     0|
|RT @tradingroomap...| Buying Bitcoin C...|     0|
|Most Profound Que...|Most Profound Que...|     0|
|Bitcoin BTC Curre...|Bitcoin BTC Curre...|     0|
+--------------------+--------------------+------+
only showing top 5 rows



In [None]:
# Load the previously trained PipelineModel from disk (judging by its name, a
# Word2Vec + logistic-regression pipeline) and score the labelled tweets.
w2vLogreg = PipelineModel.read().load('W2VLogreg.model')
predictions = w2vLogreg.transform(dfSentimnetAnalysisW2VLogreg)
predictions.printSchema()

root
 |-- text: string (nullable = true)
 |-- cleanTweetText: string (nullable = true)
 |-- target: integer (nullable = true)
 |-- tokenTweet: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filteredTokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [None]:
# BinaryClassificationEvaluator reports the area under the ROC curve by
# default. That is a ranking metric, not accuracy, so it is labelled as AUC.
# The evaluator's defaults are spelled out to make the metric and columns explicit.
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC')
roc_auc = evaluator.evaluate(predictions)
roc_accuracy = roc_auc  # former (misleading) name, kept for any later cell that reads it
print('ROC AUC of logistic regression model with word2vec word embedding at predicting vader sentiment is: {:.4f}'.format(roc_auc))


ROC-Accuracy of logistic regression model with word2vec word embedding at predicting vedar sentiment is: 0.7075


**Performance comparison of my own classifier and VADER sentiment analysis**

In [None]:
# Three samples (10 %, 20 % and 30 %, without replacement, fixed seed), each
# scored with the saved pipeline.
SAMPLE_SEED = 101
sample1, sample2, sample3 = (
    dfSentimnetAnalysisW2VLogreg.sample(withReplacement=False, fraction=fraction, seed=SAMPLE_SEED)
    for fraction in (0.1, 0.2, 0.3)
)

predictions1, predictions2, predictions3 = (
    w2vLogreg.transform(sampled) for sampled in (sample1, sample2, sample3)
)

In [None]:
# Size of each sample, counted as the number of non-null `target` values.
numOfRecInSamplesList = [
    sampled.agg({'target': 'count'}).collect()[0]['count(target)']
    for sampled in (sample1, sample2, sample3)
]
numberOfRecInSample1, numberOfRecInSample2, numberOfRecInSample3 = numOfRecInSamplesList
print(numOfRecInSamplesList)

[5286, 10602, 15967]


In [None]:
def _matchedPredictionsPercent(preds):
    '''
    Percentage of rows in `preds` whose `prediction` equals `label`.

    Numerator and denominator come from one aggregation over the same scored
    rows. The source DataFrame is not cached, so a count taken in a separate
    job re-reads MongoDB and re-samples. That count can disagree if the
    collection changed in between, or if the pipeline drops rows.

    :param preds: DataFrame with non-null double `label` and `prediction`
                  columns (as in the printed pipeline schema)
    :return: match percentage in [0, 100]; NaN when `preds` has no rows
    '''
    matched, total = preds.agg(
        F.sum((F.col('label') == F.col('prediction')).cast('int')),
        F.count(F.lit(1)),
    ).first()
    if not total:
        return float('nan')
    return (matched or 0) / total * 100


matchedPredictionsPercent1, matchedPredictionsPercent2, matchedPredictionsPercent3 = (
    _matchedPredictionsPercent(preds) for preds in (predictions1, predictions2, predictions3)
)

predMatchWithVSList = [matchedPredictionsPercent1, matchedPredictionsPercent2, matchedPredictionsPercent3]

print('predMatchWithVSList : ', predMatchWithVSList)


predMatchWithVSList :  [57.90768066590996, 58.01735521599698, 57.66894219327363]
