# When Rotten Tomatoes Isn't Enough: Twitter Sentiment Analysis with DSE
------
<img src="images/allLogos.png" width="250" height="250">
#### A demo using DataStax Enterprise Analytics, Apache Cassandra, Apache Spark, Python, Jupyter Notebooks, Twitter tweets, pattern, and Sentiment Analysis

### Things to Set Up
#### Please work through the ***Installation of DSE and Jupyter Notebook*** for setup instructions


##### In your free time, try to get the Twitter Dev API up and running. Use the other notebooks for this. This example uses CSV files.

#### Set the paths used to find the DSE version of pyspark. Edit these variables to match your installation.

In [None]:
pysparkzip = "<>/cassandra/dse-6.0.4/resources/spark/python/lib/pyspark.zip"
py4jzip = "<>/cassandra/dse-6.0.4/resources/spark/python/lib/py4j-0.10.4-src.zip"

In [None]:
# Needed to be able to find the pyspark libraries
import sys
sys.path.append(pysparkzip)
sys.path.append(py4jzip)
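
A wrong path here only surfaces later as an `ImportError` on `pyspark`, so it can help to check that each archive exists before appending it. A minimal sketch; `add_archive_to_path` is a hypothetical helper and not part of DSE or PySpark:

```python
import os
import sys


def add_archive_to_path(archive_path):
    """Append an archive to sys.path if the file exists; return True on success."""
    if not os.path.isfile(archive_path):
        print("Not found, skipping: " + archive_path)
        return False
    if archive_path not in sys.path:
        sys.path.append(archive_path)
    return True


# A made-up path that does not exist is reported and left off sys.path.
missing = os.path.join("no", "such", "dir", "pyspark.zip")
added = add_archive_to_path(missing)
```

In the notebook, the same helper could be called with `pysparkzip` and `py4jzip` in place of the made-up path.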

#### Import python packages -- all are required

In [None]:
import pandas
import cassandra
import pyspark
import re
import os
from IPython.display import display, Markdown
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pattern.en import sentiment, positive

#### Helper function to have nicer formatting of Spark DataFrames

In [None]:
#Helper for pretty formatting for Spark DataFrames
def showDF(df, limitRows =  5, truncate = True):
    if(truncate):
        pandas.set_option('display.max_colwidth', 50)
    else:
        pandas.set_option('display.max_colwidth', -1)
    pandas.set_option('display.max_rows', limitRows)
    display(df.limit(limitRows).toPandas())
    pandas.reset_option('display.max_rows')

# DataStax Enterprise Analytics
<img src="images/datastaxlogo.png" width="200" height="200">

### Creating Tables and Loading Tweets

#### Connect to DSE Analytics Cluster

In [None]:
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])
session = cluster.connect()

#### Create Demo Keyspace 

In [None]:
session.execute("""
    CREATE KEYSPACE IF NOT EXISTS demo1 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

#### Set keyspace 

In [None]:
session.set_keyspace('demo1')

#### Set Movie Title variable --Change this to search for different movies!
##### Choices are: MamaMia2, FirstMan, AStarIsBorn, and MissionImpossible

In [None]:
movieTitle = "??"

In [None]:
positiveNegative = ["pos", "sad"] 

#### Create two tables in Cassandra for the movie title: one for negative tweets and one for positive tweets. Twitter returns a lot of information with each call, but this demo uses only the Twitter id (as the primary key, because it is unique) and the tweet text.
#### Is the Twitter id the right value to distribute by? Consider your data model when choosing your primary key.

In [None]:
for emotion in positiveNegative:
    # One table per movie title and emotion, keyed by the tweet id
    query = "CREATE TABLE IF NOT EXISTS movie_tweets_%s_%s (twitterid bigint, tweet text, PRIMARY KEY (twitterid))" % (movieTitle, emotion)
    print(query)
    session.execute(query)
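
Because `movieTitle` is interpolated directly into the table name, a title containing spaces or punctuation (including the `"??"` placeholder) produces invalid CQL. A minimal sketch of validating the name first; `build_table_name` is a hypothetical helper, and the pattern reflects the rule for unquoted CQL identifiers (a letter followed by letters, digits, or underscores):

```python
import re

# Unquoted CQL identifiers: a letter followed by letters, digits, or underscores.
_IDENTIFIER = re.compile(r"^[A-Za-z][A-Za-z0-9_]*$")


def build_table_name(movie_title, emotion):
    """Return the demo's table name, rejecting titles unsafe for unquoted CQL."""
    name = "movie_tweets_%s_%s" % (movie_title, emotion)
    if not _IDENTIFIER.match(name):
        raise ValueError("Not a valid unquoted CQL identifier: %r" % name)
    # Cassandra folds unquoted identifiers to lowercase.
    return name.lower()


table_name = build_table_name("FirstMan", "pos")
print(table_name)
```

The lowercase result matches the `movieTitle.lower()` call used later when the tables are read from Spark.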


### Load Twitter Tweets
#### Pulled from twitter and stored in CSV file
<img src="images/twitterlogo.png" width="100" height="100">

#### Load Negative Tweets from CSV file

In [None]:
fileName = 'data/' + movieTitle + '_sad.csv'
query = "INSERT INTO movie_tweets_%s_sad (twitterid, tweet)" % (movieTitle)
query = query + " VALUES (%s, %s)"
# The with block closes the file once every line has been inserted
with open(fileName, 'r') as input_file:
    for line in input_file:
        tweets = line.split(',')
        session.execute(query, (int(tweets[0]), tweets[1]))

#### Load Positive Tweets from CSV File

In [None]:
fileName1 = 'data/' + movieTitle + '_pos.csv'
query = "INSERT INTO movie_tweets_%s_pos (twitterid, tweet)" % (movieTitle)
query = query + " VALUES (%s, %s)"
# The with block closes the file once every line has been inserted
with open(fileName1, 'r') as input_file1:
    for line in input_file1:
        tweets = line.split(',')
        session.execute(query, (int(tweets[0]), tweets[1]))
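
A plain `split(',')` keeps only the text up to the first comma inside a tweet. If the CSV files quote tweet text that contains commas (an assumption, since the file format is not shown here), Python's `csv` module parses such rows intact. A sketch with a hypothetical `parse_tweet_rows` helper and made-up sample lines:

```python
import csv


def parse_tweet_rows(lines):
    """Yield (twitterid, tweet) pairs from CSV lines of the form id,tweet."""
    for row in csv.reader(lines):
        if len(row) < 2:
            continue  # skip blank or malformed lines
        yield int(row[0]), row[1]


# Made-up sample lines; the second tweet contains a comma inside quotes.
sample = [
    '1001,Loved it\n',
    '1002,"Great cast, weak plot"\n',
]
rows = list(parse_tweet_rows(sample))
print(rows)
```

Each pair could then be passed to `session.execute(query, (twitterid, tweet))` as in the cells above.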

#### Do a select * on each table and verify that the tweets have been inserted into each Cassandra table

In [None]:
for emotion in positiveNegative:
    print(emotion)
    query = 'SELECT * FROM movie_tweets_%s_%s limit 10' % (movieTitle, emotion)
    rows = session.execute(query)
    for user_row in rows:
        print (user_row.twitterid, user_row.tweet)

## DSE Analytics with Apache Spark
<img src="images/sparklogo.png" width="150" height="200">

### Finally time for Apache Spark! 

#### Create a spark session that is connected to Cassandra. From there load each table into a Spark Dataframe and take a count of the number of rows in each.

In [None]:
countTokens = udf(lambda words: len(words), IntegerType())

spark = SparkSession.builder.appName('demo').master("local").getOrCreate()

tableNamePos = "movie_tweets_%s_pos" % (movieTitle.lower())
tableNameSad = "movie_tweets_%s_sad" % (movieTitle.lower())
tablepos = spark.read.format("org.apache.spark.sql.cassandra").options(table=tableNamePos, keyspace="demo1").load()
tablesad = spark.read.format("org.apache.spark.sql.cassandra").options(table=tableNameSad, keyspace="demo1").load()

print("Positive Table Count: ")
print(tablepos.count())
print("Negative Table Count: ")
print(tablesad.count())


#### Use Tokenizer to break up the sentences into individual words

In [None]:
tokenizerPos = Tokenizer(inputCol="tweet", outputCol="tweetwords")
tokenizedPos = tokenizerPos.transform(tablepos)

dfPos = tokenizedPos.select("tweet", "tweetwords").withColumn("tokens", countTokens(col("tweetwords")))

showDF(dfPos)

tokenizerSad = Tokenizer(inputCol="tweet", outputCol="tweetwords")
tokenizedSad = tokenizerSad.transform(tablesad)

dfSad = tokenizedSad.select("tweet", "tweetwords").withColumn("tokens", countTokens(col("tweetwords")))

showDF(dfSad)
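
Spark's `Tokenizer` lowercases the input and splits it on whitespace, so punctuation and emoticons stay attached to or appear as tokens. A plain-Python approximation of that behavior (`simple_tokenize` is a hypothetical name, and edge cases such as repeated spaces may differ from Spark):

```python
def simple_tokenize(text):
    """Lowercase the text and split it on whitespace."""
    return text.lower().split()


# Made-up sample tweet; note that ":)" survives as its own token.
words = simple_tokenize("Loved A Star Is Born :)")
print(words)
print(len(words))
```

The token count printed here corresponds to the `tokens` column that `countTokens` adds to the DataFrame.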

#### Use StopWordsRemover to remove the default stop words. Interestingly, people don't use many stop words on Twitter!

In [None]:
removerPos = StopWordsRemover(inputCol="tweetwords", outputCol="tweetnostopwords")
removedPos = removerPos.transform(dfPos)

dfPosStop = removedPos.select("tweet", "tweetwords", "tweetnostopwords").withColumn("tokens", countTokens(col("tweetwords"))).withColumn("notokens", countTokens(col("tweetnostopwords")))

showDF(dfPosStop)

removerSad = StopWordsRemover(inputCol="tweetwords", outputCol="tweetnostopwords")
removedSad = removerSad.transform(dfSad)

dfSadStop = removedSad.select("tweet", "tweetwords", "tweetnostopwords").withColumn("tokens", countTokens(col("tweetwords"))).withColumn("notokens", countTokens(col("tweetnostopwords")))

showDF(dfSadStop)
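
Conceptually, stop-word removal is a case-insensitive filter over the token list (`StopWordsRemover` ignores case by default). A plain-Python sketch with a tiny illustrative stop list; `remove_stop_words` is a hypothetical helper, and Spark's default English list is much longer:

```python
def remove_stop_words(words, stop_words):
    """Drop every word that appears in stop_words, ignoring case."""
    stop_set = set(w.lower() for w in stop_words)
    return [w for w in words if w.lower() not in stop_set]


# Tiny illustrative stop list, not Spark's actual default list.
sample_stop_words = ["a", "is", "the"]
tokens = ["loved", "a", "star", "is", "born"]
kept = remove_stop_words(tokens, sample_stop_words)
print(kept)
```

Comparing `len(tokens)` with `len(kept)` mirrors the `tokens` and `notokens` columns shown in the DataFrames above.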

### Sentiment Analysis using Python package Pattern

#### Convert each Spark DataFrame to a Pandas DataFrame. This works as-is because we are working with a small dataset; for larger datasets, convert to Pandas only if the data fits in memory. Then loop over each row and get the sentiment score (anything above 0 is positive, and anything at or below 0 is negative). The `positive` function returns True if the tweet is positive. The `assessments` attribute shows which words were used to judge the tweet and the score of each word. For more info on how the scores are calculated: https://www.clips.uantwerpen.be/pages/pattern-en#sentiment
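
Two different cut-offs are in play in the cells below: the loops bucket tweets by whether the score is above 0, while `positive` applies a threshold (0.1 is used here). A small sketch of both rules on plain numbers; `label_polarity` is a hypothetical helper, and the `>=` comparison is an assumption about how the threshold is applied:

```python
def label_polarity(polarity, threshold=0.1):
    """Return (bucket, is_positive) for a polarity score in [-1.0, 1.0]."""
    # The notebook's loops keep scores > 0 as positive and <= 0 as negative.
    bucket = "pos" if polarity > 0 else "sad"
    # Threshold rule in the spirit of pattern's positive(); the exact
    # comparison (>=) is an assumption of this sketch.
    is_positive = polarity >= threshold
    return bucket, is_positive


# Made-up scores for illustration.
for score in (0.5, 0.05, 0.0, -0.4):
    print(score, label_polarity(score))
```

A score such as 0.05 lands in the positive bucket without passing the threshold, which is why the two counts in the loops can disagree.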

#### Negative Tweets

In [None]:
pandaSad = dfSadStop.toPandas()
movieScoreSad = 0
countSad = 0
numSadTweets = 0
sadList = list()

for index, row in pandaSad.iterrows():
    if positive(row["tweetnostopwords"], .1):
        countSad = countSad + 1
    scoreSad = sentiment(row['tweetnostopwords'])[0]
    if scoreSad <= 0:
        #print row['tweet']
        #print sentiment(row['tweetnostopwords'])[0]
        sadList.append((row['tweet'], sentiment(row["tweetnostopwords"]), positive(row["tweetnostopwords"]), \
                         sentiment(row['tweetnostopwords']).assessments))
        movieScoreSad = scoreSad + movieScoreSad
        
labels = ['Original Tweet', 'Sentiment Score', 'Positive', 'Assessments']
sadTweetScores = pandas.DataFrame.from_records(sadList, columns=labels)

sadTweetScores

#### Positive Tweets
#### Also sum the sentiment scores of all the tweets

In [None]:
pandaPos = dfPosStop.toPandas()
movieScore = 0
countPos = 0
poslist = list()

for index, row in pandaPos.iterrows():
    if not positive(row["tweetnostopwords"]) and sentiment(row["tweetnostopwords"])[0] != 0.0:
        countPos = countPos + 1
    score = sentiment(row['tweetnostopwords'])[0]
    if score > 0:
        #print row['tweet']
        #print sentiment(row['tweetnostopwords'])[0]
        poslist.append((row['tweet'], sentiment(row["tweetnostopwords"]), positive(row["tweetnostopwords"]), \
                         sentiment(row['tweetnostopwords']).assessments))
        movieScore = score + movieScore
        
labels = ['Original Tweet', 'Sentiment Score', 'Positive', 'Assessments']
postiveTweetScores = pandas.DataFrame.from_records(poslist, columns=labels)

postiveTweetScores
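
Both loops call `sentiment()` several times on the same row. A sketch of scoring each row once and reusing the result; `collect_scores` and `fake_score` are hypothetical, and `fake_score` only stands in for a real scorer such as `lambda words: sentiment(words)[0]` so the example runs without pattern installed:

```python
def collect_scores(rows, score_fn, keep):
    """Score each (tweet, words) row once; return kept records and their total."""
    records = []
    total = 0.0
    for tweet, words in rows:
        score = score_fn(words)  # computed once and reused below
        if keep(score):
            records.append((tweet, score))
            total += score
    return records, total


def fake_score(words):
    """Hypothetical stand-in scorer, not pattern's sentiment()."""
    return 0.5 if "loved" in words else -0.25


# Made-up rows shaped like (original tweet, tokenized words).
sample_rows = [("Loved it", ["loved", "it"]), ("So dull", ["so", "dull"])]
kept, total = collect_scores(sample_rows, fake_score, lambda s: s > 0)
print(kept, total)
```

Passing `lambda s: s <= 0` as `keep` gives the negative-tweet variant of the same loop.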

### Alright! Should I see this movie???

In [None]:
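# Average over the tweets remaining after subtracting countPos
posTotal = dfPos.count() - countPos
posrating = movieScore / float(posTotal) if posTotal > 0 else 0

display(Markdown('**{}**  \n{}'.format("Positive Rating Average Score", posrating)))

# Guard the denominator itself so the division cannot fail
sadTotal = dfSad.count() - countSad
sadrating = movieScoreSad / float(sadTotal) if sadTotal > 0 else 0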

display(Markdown('**{}**  \n{}'.format("Negative Rating Average Score", sadrating)))

if posrating > abs(sadrating):
    display(Markdown('**{}**  \n'.format("People Like This Movie!")))
elif posrating == abs(sadrating):
    display(Markdown('**{}**  \n'.format("People are split! Take a chance!")))
elif posrating < abs(sadrating):
    display(Markdown('***{}***  \n'.format("People Do Not Like This Movie!")))
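
The rating arithmetic can be factored into two small functions, which makes the zero-denominator case explicit and easy to test. A sketch with hypothetical names (`average_score`, `verdict`) and made-up totals:

```python
def average_score(total, tweet_count, excluded_count):
    """Average a score total over the tweets that were not excluded."""
    remaining = tweet_count - excluded_count
    if remaining <= 0:
        return 0.0  # nothing left to average over
    return total / float(remaining)


def verdict(pos_rating, sad_rating):
    """Compare the positive average with the magnitude of the negative one."""
    if pos_rating > abs(sad_rating):
        return "People Like This Movie!"
    if pos_rating == abs(sad_rating):
        return "People are split! Take a chance!"
    return "People Do Not Like This Movie!"


# Made-up totals and counts, for illustration only.
pos = average_score(12.0, 50, 10)   # 12.0 / 40
sad = average_score(-4.0, 30, 10)   # -4.0 / 20
print(pos, sad, verdict(pos, sad))
```

With the notebook's variables, the calls would be `average_score(movieScore, dfPos.count(), countPos)` and `average_score(movieScoreSad, dfSad.count(), countSad)`.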
    

### Is this answer what you were expecting? Either way, go back and take a look at 

#### What if we try this again, but remove some extra stop words? Let's remove:
* Movie Title
* :)
* :(
* mission, impossible, star, first

#### Use StopWordsRemover with a custom list of stop words. Note that passing `stopWords` replaces the default list, so the default English stop words are no longer removed!

In [None]:
stopwordList = [movieTitle,":(",":)", "mission", "impossible", "Star", "first"]

removerPos = StopWordsRemover(inputCol="tweetwords", outputCol="tweetnostopwords", stopWords=stopwordList)
removedPos = removerPos.transform(dfPos)

dfPosStop = removedPos.select("tweet", "tweetwords", "tweetnostopwords").withColumn("tokens", countTokens(col("tweetwords"))).withColumn("notokens", countTokens(col("tweetnostopwords")))

showDF(dfPosStop)

removerSad = StopWordsRemover(inputCol="tweetwords", outputCol="tweetnostopwords", stopWords=stopwordList)
removedSad = removerSad.transform(dfSad)

dfSadStop = removedSad.select("tweet", "tweetwords", "tweetnostopwords").withColumn("tokens", countTokens(col("tweetwords"))).withColumn("notokens", countTokens(col("tweetnostopwords")))

showDF(dfSadStop)
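
To remove both the default stop words and the custom ones, the two lists can be combined before being passed as `stopWords`. In PySpark the default list is available from `StopWordsRemover.loadDefaultStopWords("english")`; the sketch below uses a short stand-in list so it runs without Spark, and `merge_stop_words` is a hypothetical helper:

```python
def merge_stop_words(default_words, extra_words):
    """Combine two stop-word lists, lowercased and without duplicates."""
    merged = []
    seen = set()
    for word in list(default_words) + list(extra_words):
        key = word.lower()
        if key not in seen:
            seen.add(key)
            merged.append(key)
    return merged


# Stand-in for the list returned by StopWordsRemover.loadDefaultStopWords("english").
default_sample = ["a", "is", "the"]
custom = ["FirstMan", ":(", ":)", "first", "The"]
combined = merge_stop_words(default_sample, custom)
print(combined)
```

The merged list would then be passed as `stopWords=combined` when constructing the remover.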

#### Sad Tweets: Convert to Pandas, use Pattern to get sentiment, and get the average

In [None]:
pandaSad = dfSadStop.toPandas()
movieScoreSad = 0
countSad = 0
numSadTweets = 0
sadList = list()

for index, row in pandaSad.iterrows():
    if positive(row["tweetnostopwords"], .1):
        countSad = countSad + 1
    scoreSad = sentiment(row['tweetnostopwords'])[0]
    if scoreSad <= 0:
        sadList.append((row['tweet'], sentiment(row["tweetnostopwords"]), positive(row["tweetnostopwords"]), \
                         sentiment(row['tweetnostopwords']).assessments))
        movieScoreSad = scoreSad + movieScoreSad
        
labels = ['Original Tweet', 'Sentiment Score', 'Positive', 'Assessments']
sadTweetScores = pandas.DataFrame.from_records(sadList, columns=labels)

sadTweetScores

#### Positive Tweets: Convert to Pandas, use Pattern to get sentiment, and get the average

In [None]:
pandaPos = dfPosStop.toPandas()
movieScore = 0
countPos = 0
poslist = list()

for index, row in pandaPos.iterrows():
    if not positive(row["tweetnostopwords"]) and sentiment(row["tweetnostopwords"])[0] != 0.0:
        countPos = countPos + 1
    score = sentiment(row['tweetnostopwords'])[0]
    if score > 0:
        poslist.append((row['tweet'], sentiment(row["tweetnostopwords"]), positive(row["tweetnostopwords"]), \
                         sentiment(row['tweetnostopwords']).assessments))
        movieScore = score + movieScore
        
labels = ['Original Tweet', 'Sentiment Score', 'Positive', 'Assessments']
postiveTweetScores = pandas.DataFrame.from_records(poslist, columns=labels)

postiveTweetScores

#### Okay let's see if there was a difference! 

In [None]:
# Average over the tweets remaining after subtracting countPos
posTotal1 = dfPos.count() - countPos
posrating1 = movieScore / float(posTotal1) if posTotal1 > 0 else 0

display(Markdown('**{}**  \n{}'.format("Positive Rating Original Average Score", posrating)))
display(Markdown('**{}**  \n{}'.format("Positive Rating Average Score", posrating1)))

# Guard the denominator itself so the division cannot fail
sadTotal1 = dfSad.count() - countSad
sadrating1 = movieScoreSad / float(sadTotal1) if sadTotal1 > 0 else 0

display(Markdown('**{}**  \n{}'.format("Negative Rating Original Average Score", sadrating)))
display(Markdown('**{}**  \n{}'.format("Negative Rating Average Score", sadrating1)))

if posrating1 > abs(sadrating1):
    display(Markdown('**{}**  \n'.format("People Like This Movie!")))
elif posrating1 == abs(sadrating1):
    display(Markdown('**{}**  \n'.format("People are split! Take a chance!")))
elif posrating1 < abs(sadrating1):
    display(Markdown('***{}***  \n'.format("People Do Not Like This Movie!")))