# When Rotten Tomatoes Isn't Enough: Twitter Sentiment Analysis with DSE

#### A demo using DataStax Enterprise Analytics, Apache Cassandra, Apache Spark, Python, Jupyter Notebooks, the Twitter API, Pattern, and sentiment analysis

### Things To Set Up
* Create a Twitter account and get API access: https://developer.twitter.com/en/docs/ads/general/guides/getting-started.html
* Install DSE: https://docs.datastax.com/en/install/doc/install60/installTOC.html
* Start the DSE Analytics cluster: `dse cassandra -k` (the `-k` option is required for Analytics)
* Set and source the Twitter environment variables in the shell you will start Jupyter from:
  * CONSUMER_KEY
  * CONSUMER_SECRET
  * ACCESS_TOKEN
  * ACCESS_TOKEN_SECRET
* Using Python 2.7
* Using DSE Analytics 6
* Using the latest version of Jupyter
* Install Anaconda and Jupyter (Anaconda is not required but makes installing Jupyter easier)
* Find the full path to <>/dse-6.0.1/resources/spark/python/lib/pyspark.zip
* Find the full path to <>/dse-6.0.1/resources/spark/python/lib/py4j-0.10.4-src.zip
* Start Jupyter with DSE to pick up all environment variables: `dse exec jupyter notebook`
* `!pip install cassandra-driver`
* `!pip install tweepy`
* `!pip install pattern`
* `!pip install pandas`
* Counterintuitively, do not install pyspark! The notebook uses the version bundled with DSE.
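Because the notebook reads the four Twitter credentials from the environment, a quick check before running the later cells can save a confusing `KeyError`. The following is a minimal sketch; the helper name `missing_twitter_vars` is hypothetical and not part of the original notebook.

```python
import os

# The four variable names the notebook reads from the environment.
REQUIRED_VARS = ["CONSUMER_KEY", "CONSUMER_SECRET",
                 "ACCESS_TOKEN", "ACCESS_TOKEN_SECRET"]

def missing_twitter_vars(env=None):
    """Return the required variable names that are unset or empty in env."""
    if env is None:
        env = os.environ
    return [name for name in REQUIRED_VARS if not env.get(name)]

missing = missing_twitter_vars()
if missing:
    print("Missing Twitter variables: " + ", ".join(missing))
```

If anything is reported as missing, export it in the shell and restart Jupyter with `dse exec jupyter notebook` so the new value is visible to the kernel.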

#### Add some variables to find the DSE version of pyspark. Edit these variables to match your paths.

In [1]:
pysparkzip = "/opt/dse/resources/spark/python/lib/pyspark.zip"
py4jzip = "/opt/dse/resources/spark/python/lib/py4j-0.10.4-src.zip"

In [2]:
# Needed to be able to find the pyspark libraries
import sys
sys.path.append(pysparkzip)
sys.path.append(py4jzip)

#### Import python packages -- all are required

In [3]:
import pandas
import cassandra
import pyspark
import tweepy
import re
import os
from IPython.display import display, Markdown
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pattern.en import sentiment, positive


#### Helper function to have nicer formatting of Spark DataFrames

In [4]:
#Helper for pretty formatting for Spark DataFrames
def showDF(df, limitRows =  5, truncate = True):
    if(truncate):
        pandas.set_option('display.max_colwidth', 50)
    else:
        pandas.set_option('display.max_colwidth', -1)
    pandas.set_option('display.max_rows', limitRows)
    display(df.limit(limitRows).toPandas())
    pandas.reset_option('display.max_rows')

### Creating Tables, Pulling Tweets, and Loading Tables

#### Connect to DSE Analytics Cluster

In [5]:
from cassandra.cluster import Cluster

cluster = Cluster(['dse']) # Contact point hostname; use ['127.0.0.1'] if DSE is installed locally
session = cluster.connect()

#### Create Demo Keyspace 

In [6]:
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS dseanalyticsdemo 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

<cassandra.cluster.ResultSet at 0x7f0cff3103d0>

#### Set keyspace 

In [7]:
session.set_keyspace('dseanalyticsdemo')

#### Set the movie title variable. Change this to search for different movies!

In [8]:
movieTitle = "missonimpossible"

In [9]:
positiveNegative = ["pos", "sad"] 

#### Create two tables in Cassandra for the movie title: one for negative tweets and one for positive tweets. Twitter returns a lot of information with each call, but for this demo we will use only the Twitter id (as our primary key, since it is unique) and the actual tweet.
#### Is the Twitter id the right value to distribute by? Consider your data model when choosing your primary key.

In [10]:
for emotion in positiveNegative: 
    
    query = "CREATE TABLE IF NOT EXISTS movie_tweets2_%s_%s (twitterid bigint, tweet text, PRIMARY KEY (twitterid))" % (movieTitle, emotion)
    print query
    session.execute(query)


CREATE TABLE IF NOT EXISTS movie_tweets2_missonimpossible_pos (twitterid bigint, tweet text, PRIMARY KEY (twitterid))
CREATE TABLE IF NOT EXISTS movie_tweets2_missonimpossible_sad (twitterid bigint, tweet text, PRIMARY KEY (twitterid))
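Since the table name is built by string formatting from `movieTitle`, it can help to validate the name before it reaches a CQL statement. The sketch below is illustrative only: the helper `tweet_table_name` is hypothetical, and the rule it enforces (lowercase letters, digits, and underscores, at most 48 characters) is an assumption about unquoted Cassandra table names that should be checked against the documentation for your version.

```python
import re

# Assumption: an unquoted table name starts with a letter, contains only
# letters, digits and underscores, and is at most 48 characters long.
VALID_NAME = re.compile(r"^[a-z][a-z0-9_]{0,47}\Z")

def tweet_table_name(movie_title, emotion):
    """Build the per-movie table name used in this demo and validate it."""
    name = "movie_tweets2_%s_%s" % (movie_title.lower(), emotion)
    if not VALID_NAME.match(name):
        raise ValueError("Not a safe CQL table name: %r" % name)
    return name

print(tweet_table_name("missonimpossible", "pos"))
```

A title containing spaces or punctuation raises `ValueError` instead of producing a malformed `CREATE TABLE` statement.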


#### Set up the search terms for gathering tweets from Twitter's API. The happy :) and sad :( faces are Twitter search operators for finding positive and negative tweets.

In [11]:
searchTermSad = movieTitle + " :("
searchTermPos = movieTitle + " :)"

searchTerms = [searchTermSad, searchTermPos]

#### Function to clean up each tweet before it is inserted into Cassandra.
#### Removing: 
* emojis 
* flags 
* special characters 
* URLs 
* RT (for Retweet)

In [12]:
#Code from: https://stackoverflow.com/questions/33404752/removing-emojis-from-a-string-in-python

def cleanUpTweet(tweet):
    
    emoji_pattern = re.compile(
    u"(\ud83d[\ude00-\ude4f])|"
    u"(\ud83c[\udf00-\uffff])|"  
    u"(\ud83d[\u0000-\uddff])|" 
    u"(\ud83d[\ude80-\udeff])|"  
    u"(\ud83c[\udde0-\uddff])" 
    "+", flags=re.UNICODE)

    removeSpecial = re.compile ('[\n|#|@|!|.|?|,|\"]')
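    removeHttp = re.compile(r"https?\S+")    # URLs, including one at the very end of the tweet
    removeRetweet = re.compile(r"\bRT\b")    # the standalone retweet marker only, not "RT" inside a word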
    
    noemoji = emoji_pattern.sub(r'', tweet)
    nospecial = removeSpecial.sub(r'', noemoji)
    nohttp = removeHttp.sub(r'', nospecial)
    noretweet = removeRetweet.sub(r'', nohttp)
    
    cleanTweet=noretweet
    
    return cleanTweet
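To see what the non-emoji cleanup steps do to a typical retweet, here is a small standalone sketch. The patterns and the helper `clean_text_only` are simplified stand-ins written for illustration (they are assumptions, not the notebook's exact code), and the sample tweet is made up.

```python
import re

# Simplified, hypothetical stand-ins for the patterns in cleanUpTweet.
remove_special = re.compile(r'[\n#@!.?,"|]')
remove_http = re.compile(r"https?\S+")
remove_retweet = re.compile(r"\bRT\b")

def clean_text_only(tweet):
    """Apply the non-emoji cleanup steps in the same order as cleanUpTweet."""
    no_special = remove_special.sub("", tweet)
    no_http = remove_http.sub("", no_special)
    return remove_retweet.sub("", no_http)

# Made-up sample tweet for illustration.
sample = "RT @moon2150: Loved #MissonImpossible! https://t.co/abc123"
print(repr(clean_text_only(sample)))
```

Note that the leading space left behind by the removed `RT` survives the cleanup, which is why several stored tweets in this notebook start with a space.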

#### Required from Twitter: 
* consumer_key= ''
* consumer_secret= ''
* access_token=''
* access_token_secret=''

In [13]:

consumer_key = os.environ['CONSUMER_KEY']
consumer_secret = os.environ['CONSUMER_SECRET']

access_token = os.environ['ACCESS_TOKEN']
access_token_secret = os.environ['ACCESS_TOKEN_SECRET']

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

api = tweepy.API(auth)

#### This cell pulls tweets from Twitter. The maximum number of tweets returned for free in one call is 100.
#### Run this code a couple of times to get more data!
#### Once the tweets are collected, loop over the list, clean up each tweet, and insert it into the table. An outer for loop makes one call for positive tweets and one call for negative tweets. The happy and sad faces have been URL encoded: " :)" = "%20%3A%29" and " :(" = "%20%3A%28"

In [14]:
for emotion in positiveNegative:
    print emotion
    public_tweets = 0
    query = "INSERT INTO movie_tweets2_%s_%s (twitterid, tweet)" % (movieTitle, emotion)
    query = query + " VALUES (%s, %s)"
    
    if emotion == "pos":
        searchTermPos= movieTitle + "%20%3A%29"
        public_tweets = api.search(q=searchTermPos, lang="en", count="100")
    if emotion == "sad":
        searchTermSad= movieTitle + "%20%3A%28"
        public_tweets = api.search(q=searchTermSad, lang="en", count="100")

    for tweet in public_tweets:
        cleanTweet = cleanUpTweet(tweet.text)
        if "JennyTinmouth" not in cleanTweet:
            session.execute(query, (tweet.id, cleanTweet))
            print(cleanTweet)

pos
 moon2150: Let’s try to catch a few of those fingerlings we relesed to weigh them missonimpossible agriterra fish RupaLake nepal b…
 moon2150: Let’s try to catch a few of those fingerlings we relesed to weigh them missonimpossible agriterra fish RupaLake nepal b…
Let’s try to catch a few of those fingerlings we relesed to weigh them missonimpossible agriterra fish RupaLake…
MissonImpossible the new star🦎after TomCruise
loved the new MissonImpossible film was brilliant
sad
 moon2150: Let’s try to catch a few of those fingerlings we relesed to weigh them missonimpossible agriterra fish RupaLake nepal b…
 moon2150: Let’s try to catch a few of those fingerlings we relesed to weigh them missonimpossible agriterra fish RupaLake nepal b…
Let’s try to catch a few of those fingerlings we relesed to weigh them missonimpossible agriterra fish RupaLake…
MissonImpossible the new star🦎after TomCruise
loved the new MissonImpossible film was brilliant
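The percent-encoded suffixes used in the search terms can be reproduced with the standard library rather than typed by hand. This is a minimal sketch; the helper name `encoded_search_term` is hypothetical.

```python
try:
    from urllib.parse import quote  # Python 3
except ImportError:
    from urllib import quote  # Python 2.7

def encoded_search_term(movie_title, emoticon):
    """Append a percent-encoded space and emoticon to the movie title."""
    return movie_title + quote(" " + emoticon)

print(encoded_search_term("missonimpossible", ":)"))  # missonimpossible%20%3A%29
print(encoded_search_term("missonimpossible", ":("))  # missonimpossible%20%3A%28
```

Some HTTP client libraries percent-encode query parameters themselves, in which case a pre-encoded string would be encoded twice. Inspecting the request that is actually sent is a reasonable sanity check.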


#### Do a select * on each table and verify that the tweets have been inserted into each Cassandra table

In [15]:
for emotion in positiveNegative:
    print emotion
    query = 'SELECT * FROM movie_tweets2_%s_%s limit 10' % (movieTitle, emotion)
    rows = session.execute(query)
    for user_row in rows:
        print (user_row.twitterid, user_row.tweet)

pos
(1031076451515293696, u'Let\u2019s try to catch a few of those fingerlings we relesed to weigh them missonimpossible agriterra fish RupaLake\u2026')
(1031077258927198208, u' moon2150: Let\u2019s try to catch a few of those fingerlings we relesed to weigh them missonimpossible agriterra fish RupaLake nepal b\u2026')
(1030577339867037697, u'loved the new MissonImpossible film was brilliant')
(1030812648038952961, u'MissonImpossible the new star\U0001f98eafter TomCruise')
(1031158181941202944, u' moon2150: Let\u2019s try to catch a few of those fingerlings we relesed to weigh them missonimpossible agriterra fish RupaLake nepal b\u2026')
sad
(1031076451515293696, u'Let\u2019s try to catch a few of those fingerlings we relesed to weigh them missonimpossible agriterra fish RupaLake\u2026')
(1031077258927198208, u' moon2150: Let\u2019s try to catch a few of those fingerlings we relesed to weigh them missonimpossible agriterra fish RupaLake nepal b\u2026')
(1030577339867037697, u'loved the

### Finally time for Apache Spark! 

#### Create a Spark session that is connected to Cassandra. From there, load each table into a Spark DataFrame and count the rows in each.

In [16]:
countTokens = udf(lambda words: len(words), IntegerType())

spark = SparkSession.builder.appName('demo').master("dse://dse:9042").getOrCreate()

tableNamePos = "movie_tweets2_%s_pos" % (movieTitle.lower())
tableNameSad = "movie_tweets2_%s_sad" % (movieTitle.lower())
tablepos = spark.read.format("org.apache.spark.sql.cassandra").options(table=tableNamePos, keyspace="dseanalyticsdemo").load()
tablesad = spark.read.format("org.apache.spark.sql.cassandra").options(table=tableNameSad, keyspace="dseanalyticsdemo").load()

print "Positive Table Count: "
print tablepos.count()
print "Negative Table Count: "
print tablesad.count()


Positive Table Count: 
5
Negative Table Count: 
5


#### Use Tokenizer to break up the sentences into individual words

In [35]:
tokenizerPos = Tokenizer(inputCol="tweet", outputCol="tweetwords")
tokenizedPos = tokenizerPos.transform(tablepos)

dfPos = tokenizedPos.select("tweet", "tweetwords").withColumn("tokens", countTokens(col("tweetwords")))

showDF(dfPos)

tokenizerSad = Tokenizer(inputCol="tweet", outputCol="tweetwords")
tokenizedSad = tokenizerSad.transform(tablesad)

dfSad = tokenizedSad.select("tweet", "tweetwords").withColumn("tokens", countTokens(col("tweetwords")))

showDF(dfSad)

|  | tweet | tweetwords | tokens |
|---|---|---|---|
| 0 | New from the Phil's Quick Capsule Review: | [new, from, the, phil's, quick, capsule, review:] | 7 |
| 1 | Viratham Kalainthathu😸Showtime : MissonImpossible | [viratham, kalainthathu😸showtime, :, missonimp... | 4 |
| 2 | It’s SaturdayNight it’s a MissonImpossible Mar... | [it’s, saturdaynight, it’s, a, missonimpossibl... | 7 |
| 3 | summer movies blockbuster avengers plothole to... | [summer, movies, blockbuster, avengers, plotho... | 12 |
| 4 | Mission Impossible Fallout is the GREATEST act... | [mission, impossible, fallout, is, the, greate... | 21 |


|  | tweet | tweetwords | tokens |
|---|---|---|---|
| 0 | LesleyAnnBrandt TomCruise It is mind blowing M... | [lesleyannbrandt, tomcruise, it, is, mind, blo... | 16 |
| 1 | VijayFreak_: Viratham Kalainthathu😸Showtime :... | [, vijayfreak_:, viratham, kalainthathu😸showti... | 6 |
| 2 | dan_steenson: Mission Impossible Fallout is t... | [, dan_steenson:, mission, impossible, fallout... | 22 |
| 3 | LostInGallifrey: Some may think that saving S... | [, lostingallifrey:, some, may, think, that, s... | 19 |
| 4 | Lets mix those potions bois We discuss missoni... | [lets, mix, those, potions, bois, we, discuss,... | 18 |
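The empty first token in rows such as `[, vijayfreak_:, ...` follows from how the tokenizer works: Spark's `Tokenizer` lowercases the text and splits it on whitespace, so a tweet that starts with a space yields an empty string first. The pure-Python sketch below approximates that behaviour for illustration; `simple_tokenize` is a hypothetical helper, not Spark's implementation.

```python
import re

def simple_tokenize(text):
    """Lowercase the text and split on single whitespace characters."""
    return re.split(r"\s", text.lower())

# A tweet with a leading space, as left behind by the retweet cleanup.
tokens = simple_tokenize(" VijayFreak_: Viratham Kalainthathu")
print(tokens)       # ['', 'vijayfreak_:', 'viratham', 'kalainthathu']
print(len(tokens))  # 4
```

Stripping leading whitespace before insertion would avoid counting that empty token.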


#### Use StopWordsRemover to remove all stop words. Interestingly, people don't use many stop words on Twitter!

In [36]:
removerPos = StopWordsRemover(inputCol="tweetwords", outputCol="tweetnostopwords")
removedPos = removerPos.transform(dfPos)

dfPosStop = removedPos.select("tweet", "tweetwords", "tweetnostopwords").withColumn("tokens", countTokens(col("tweetwords"))).withColumn("notokens", countTokens(col("tweetnostopwords")))

showDF(dfPosStop)

removerSad = StopWordsRemover(inputCol="tweetwords", outputCol="tweetnostopwords")
removedSad = removerSad.transform(dfSad)

dfSadStop = removedSad.select("tweet", "tweetwords", "tweetnostopwords").withColumn("tokens", countTokens(col("tweetwords"))).withColumn("notokens", countTokens(col("tweetnostopwords")))

showDF(dfSadStop)

|  | tweet | tweetwords | tweetnostopwords | tokens | notokens |
|---|---|---|---|---|---|
| 0 | New from the Phil's Quick Capsule Review: | [new, from, the, phil's, quick, capsule, review:] | [new, phil's, quick, capsule, review:] | 7 | 5 |
| 1 | Viratham Kalainthathu😸Showtime : MissonImpossible | [viratham, kalainthathu😸showtime, :, missonimp... | [viratham, kalainthathu😸showtime, :, missonimp... | 4 | 4 |
| 2 | It’s SaturdayNight it’s a MissonImpossible Mar... | [it’s, saturdaynight, it’s, a, missonimpossibl... | [it’s, saturdaynight, it’s, missonimpossible, ... | 7 | 6 |
| 3 | summer movies blockbuster avengers plothole to... | [summer, movies, blockbuster, avengers, plotho... | [summer, movies, blockbuster, avengers, plotho... | 12 | 12 |
| 4 | Mission Impossible Fallout is the GREATEST act... | [mission, impossible, fallout, is, the, greate... | [mission, impossible, fallout, greatest, actio... | 21 | 15 |


|  | tweet | tweetwords | tweetnostopwords | tokens | notokens |
|---|---|---|---|---|---|
| 0 | LostInGallifrey: Some may think that saving S... | [, lostingallifrey:, some, may, think, that, s... | [, lostingallifrey:, may, think, saving, shado... | 19 | 12 |
| 1 | 215Coltsfan MDanelySB Yes soon he'll have his ... | [215coltsfan, mdanelysb, yes, soon, he'll, hav... | [215coltsfan, mdanelysb, yes, soon, scene, lik... | 20 | 12 |
| 2 | New from the Phil's Quick Capsule Review: | [new, from, the, phil's, quick, capsule, review:] | [new, phil's, quick, capsule, review:] | 7 | 5 |
| 3 | LesleyAnnBrandt TomCruise It is mind blowing M... | [lesleyannbrandt, tomcruise, it, is, mind, blo... | [lesleyannbrandt, tomcruise, mind, blowing, mi... | 16 | 12 |
| 4 | VijayFreak_: Viratham Kalainthathu😸Showtime :... | [, vijayfreak_:, viratham, kalainthathu😸showti... | [, vijayfreak_:, viratham, kalainthathu😸showti... | 6 | 6 |
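Stop-word removal is essentially a filter against a word list. The sketch below reproduces the first row of the table in plain Python; the tiny `STOP_WORDS` set and the helper `remove_stop_words` are hypothetical, and Spark's default list is much longer.

```python
# Hypothetical, tiny stop-word list for illustration only.
STOP_WORDS = {"from", "the", "a", "is", "it", "that", "some"}

def remove_stop_words(words, stop_words=STOP_WORDS):
    """Keep only the words that are not in the stop-word set."""
    return [w for w in words if w.lower() not in stop_words]

words = ["new", "from", "the", "phil's", "quick", "capsule", "review:"]
kept = remove_stop_words(words)
print(kept)                     # ['new', "phil's", 'quick', 'capsule', 'review:']
print((len(words), len(kept)))  # (7, 5)
```

Tokens with attached punctuation, such as `review:`, only match a stop word if the punctuation is stripped first.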


### Sentiment Analysis using Python package Pattern

#### Convert each Spark DataFrame to a pandas DataFrame. This works as-is because we are working with a small dataset. For larger datasets, only convert to pandas if the data fits in memory.
#### From there, loop over each row and get the sentiment score (anything above 0 is positive and anything at or below 0 is negative). The "positive" function returns True if the tweet is positive. The "assessments" attribute shows which words were used to judge the tweet and the score of each word.
#### For more info on how the scores are calculated: https://www.clips.uantwerpen.be/pages/pattern-en#sentiment
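To make the relationship between per-word assessments and the final score concrete, here is a rough sketch. It assumes the overall (polarity, subjectivity) pair is a plain average over the assessments and that a tweet counts as positive when polarity reaches a threshold of 0.1. That is consistent with the single-word rows shown in this notebook, but it is an approximation for illustration, not Pattern's actual implementation. The helper names are hypothetical and the two-word sample uses made-up values.

```python
def average_sentiment(assessments):
    """Average polarity and subjectivity over (words, polarity, subjectivity, label) tuples."""
    if not assessments:
        return (0.0, 0.0)
    n = float(len(assessments))
    polarity = sum(a[1] for a in assessments) / n
    subjectivity = sum(a[2] for a in assessments) / n
    return (polarity, subjectivity)

def is_positive(assessments, threshold=0.1):
    """Treat a tweet as positive when its average polarity reaches the threshold."""
    return average_sentiment(assessments)[0] >= threshold

# A single-word assessment of the kind shown in this notebook's output.
crazy = [(["crazy"], -0.6, 0.9, None)]
# Made-up values, for illustration only.
mixed = [(["good"], 0.7, 0.6, None), (["slow"], -0.3, 0.4, None)]

print(average_sentiment(crazy), is_positive(crazy))
print(average_sentiment(mixed), is_positive(mixed))
```

A tweet with no recognised words gets (0.0, 0.0), which is why so many rows in the negative table show an empty assessment list.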

#### Negative Tweets

In [38]:
pandaSad = dfSadStop.toPandas()
movieScoreSad = 0
countSad = 0
numSadTweets = 0
sadList = list()

for index, row in pandaSad.iterrows():
    if positive(row["tweetnostopwords"], .1):
        countSad = countSad + 1
    scoreSad = sentiment(row['tweetnostopwords'])[0]
    if scoreSad <= 0:
        #print row['tweet']
        #print sentiment(row['tweetnostopwords'])[0]
        sadList.append((row['tweet'], sentiment(row["tweetnostopwords"]), positive(row["tweetnostopwords"]), \
                         sentiment(row['tweetnostopwords']).assessments))
        movieScoreSad = scoreSad + movieScoreSad
        
labels = ['Original Tweet', 'Sentiment Score', 'Positive', 'Assessments']
sadTweetScores = pandas.DataFrame.from_records(sadList, columns=labels)

sadTweetScores

|  | Original Tweet | Sentiment Score | Positive | Assessments |
|---|---|---|---|---|
| 0 | VijayFreak_: Viratham Kalainthathu😸Showtime :... | (0.0, 0.0) | False | [] |
| 1 | Viratham Kalainthathu😸Showtime : MissonImpossible | (0.0, 0.0) | False | [] |
| 2 | It’s SaturdayNight it’s a MissonImpossible Mar... | (0.0, 0.0) | False | [] |
| 3 | summer movies blockbuster avengers plothole to... | (-0.6, 0.9) | False | [([crazy], -0.6, 0.9, None)] |
| 4 | LostInGallifrey: Some may think that saving S... | (0.0, 0.0) | False | [] |
| 5 | 215Coltsfan MDanelySB Yes soon he'll have his ... | (-0.265151515152, 0.727272727273) | False | [([new], 0.136363636364, 0.454545454545, None)... |
| 6 | LostInGallifrey: Some may think that saving S... | (0.0, 0.0) | False | [] |
| 7 | Lets mix those potions bois We discuss missoni... | (-0.6, 0.8) | False | [([dirty], -0.6, 0.8, None)] |
| 8 | VijayFreak_: Viratham Kalainthathu😸Showtime :... | (0.0, 0.0) | False | [] |


#### Positive Tweets
#### Also add up the sentiment scores of all the tweets

In [39]:
pandaPos = dfPosStop.toPandas()
movieScore = 0
countPos = 0
poslist = list()

for index, row in pandaPos.iterrows():
    if not positive(row["tweetnostopwords"]) and sentiment(row["tweetnostopwords"])[0] != 0.0:
        countPos = countPos + 1
    score = sentiment(row['tweetnostopwords'])[0]
    if score > 0:
        poslist.append((row['tweet'], sentiment(row["tweetnostopwords"]), positive(row["tweetnostopwords"]), \
                         sentiment(row['tweetnostopwords']).assessments))
        movieScore = score + movieScore
        
labels = ['Original Tweet', 'Sentiment Score', 'Positive', 'Assessments']
positiveTweetScores = pandas.DataFrame.from_records(poslist, columns=labels)

positiveTweetScores

|  | Original Tweet | Sentiment Score | Positive | Assessments |
|---|---|---|---|---|
| 0 | LesleyAnnBrandt TomCruise It is mind blowing M... | (0.55, 0.2) | True | [([best], 1.0, 0.3, None), ([action], 0.1, 0.1... |
| 1 | dan_steenson: Mission Impossible Fallout is t... | (0.233333333333, 0.65) | True | [([impossible], -0.666666666667, 1.0, None), (... |
| 2 | BlackQueenLara Yours Queen Happy to know you'r... | (0.766666666667, 0.866666666667) | True | [([happy], 0.8, 1.0, None), ([enjoying], 0.5, ... |
| 3 | AMAHappyCampers I hear missonimpossible might ... | (0.8, 0.75) | True | [([great], 0.8, 0.75, None)] |
| 4 | loved the new MissonImpossible film was brilliant | (0.578787878788, 0.751515151515) | True | [([loved], 0.7, 0.8, None), ([new], 0.13636363... |
| 5 | Mission Impossible Fallout is the GREATEST act... | (0.233333333333, 0.65) | True | [([impossible], -0.666666666667, 1.0, None), (... |
| 6 | MissionImpossibleFallout is officially one of ... | (0.55, 0.2) | True | [([best], 1.0, 0.3, None), ([action], 0.1, 0.1... |
| 7 | New from the Phil's Quick Capsule Review: | (0.234848484848, 0.477272727273) | True | [([new], 0.136363636364, 0.454545454545, None)... |


### Alright! Should I see this movie???

In [40]:
posrating = movieScore/(dfPos.count() - countPos)

display(Markdown('**{}**  \n{}'.format("Positive Rating Average Score", posrating)))

if dfSad.count() != 0:
    sadrating = movieScoreSad/(dfSad.count() - countSad)
else: 
    sadrating = 0

display(Markdown('**{}**  \n{}'.format("Negative Rating Average Score", sadrating)))

if posrating > abs(sadrating):
    print "People like this movie!"
    display(Markdown('**{}**  \n'.format("People Like This Movie!")))
elif posrating == abs(sadrating):
    display(Markdown('**{}**  \n'.format("People are split! Take a chance!")))
elif posrating < abs(sadrating):
    display(Markdown('***{}***  \n'.format("People Do Not Like This Movie!")))
    

**Positive Rating Average Score**  
0.281926406926

**Negative Rating Average Score**  
-0.162794612795

People like this movie!


**People Like This Movie!**  
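The final comparison can be isolated into two small functions, which also makes it easy to guard against a zero denominator when every tweet in a table is excluded. This is a sketch under that assumption; `average_score` and `verdict` are hypothetical names, and the ratings passed in are the two averages printed above.

```python
def average_score(total, tweet_count, excluded_count):
    """Average a summed score over the tweets that were not excluded."""
    denominator = tweet_count - excluded_count
    if denominator <= 0:
        return 0.0  # nothing left to average over
    return total / float(denominator)

def verdict(pos_rating, sad_rating):
    """Compare the positive average with the magnitude of the negative average."""
    if pos_rating > abs(sad_rating):
        return "People Like This Movie!"
    if pos_rating == abs(sad_rating):
        return "People are split! Take a chance!"
    return "People Do Not Like This Movie!"

print(verdict(0.281926406926, -0.162794612795))  # People Like This Movie!
```

Returning 0.0 for an empty denominator is a design choice for the demo; raising an error instead may be preferable when an empty table indicates a failed tweet pull.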
