# Twitter Sentiment Analysis with Pyspark

First step in any Apache Spark programming is to create a SparkContext. SparkContext is needed when we want to execute operations in a cluster. SparkContext tells Spark how and where to access a cluster. It is first step to connect with Apache Cluster. 

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, SparkSession
import warnings

SCC_CHECKPOINT_PATH = "/Users/anujchaudhari/Desktop/256/project/samples/twitter_streaming/checkpoint"
STREAMING_SOCKET_IP = "192.168.0.100"
STREAMING_SOCKET_PORT = 5555
STREAMING_TIME_INTERVAL = 2

try:
    # create SparkContext on all CPUs available: in my case I have 4 CPUs on my laptop
    
    spark = SparkSession.builder.appName("twitter").getOrCreate()
    sc = spark.sparkContext
    sqlContext = SQLContext(sc)

    print("Just created a SparkContext")
    
except ValueError:
    warnings.warn("SparkContext already exists in this scope")
    
    

# Create Spark Streaming Context

ssc = StreamingContext(sc, STREAMING_TIME_INTERVAL )
ssc.checkpoint(SCC_CHECKPOINT_PATH)
socket_stream = ssc.socketTextStream(STREAMING_SOCKET_IP, STREAMING_SOCKET_PORT)
lines = socket_stream.window(STREAMING_TIME_INTERVAL)

print("SparkContext Master: " + sc.master)

Just created a SparkContext
SparkContext Master: local[*]


In [2]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('clean_tweet.csv')

In [3]:
type(df)

pyspark.sql.dataframe.DataFrame

In [4]:
df.show(5)

+---+--------------------+------+
|_c0|                text|target|
+---+--------------------+------+
|  0|awww that s a bum...|     0|
|  1|is upset that he ...|     0|
|  2|i dived many time...|     0|
|  3|my whole body fee...|     0|
|  4|no it s not behav...|     0|
+---+--------------------+------+
only showing top 5 rows



In [5]:
df = df.dropna()

In [6]:
df.count()

1596753

After successfully loading the data as Spark Dataframe, we can take a peek at the data by calling .show(), which is equivalent to Pandas .head(). After dropping NA, we have a bit less than 1.6 million Tweets. I will split this in three parts; training, validation, test. Since I have around 1.6 million entries, 1% each for validation and test set will be enough to test the models.

In [7]:
(train_set, val_set, test_set) = df.randomSplit([0.01, 0.01, 0.98], seed = 2000)

In [8]:
test_set.head(5)

[Row(_c0=0, text='awww that s a bummer you shoulda got david carr of third day to do it d', target=0),
 Row(_c0=1, text='is upset that he can t update his facebook by texting it and might cry as a result school today also blah', target=0),
 Row(_c0=2, text='i dived many times for the ball managed to save the rest go out of bounds', target=0),
 Row(_c0=3, text='my whole body feels itchy and like its on fire', target=0),
 Row(_c0=4, text='no it s not behaving at all i m mad why am i here because i can t see you all over there', target=0)]

## N-gram Implementation

I had to use VectorAssembler in the pipeline, to combine the features I get from each n-grams.

In [9]:
from pyspark.ml.feature import NGram, VectorAssembler
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

def build_ngrams_wocs(inputCol=["text","target"], n=3):
    tokenizer = [Tokenizer(inputCol="text", outputCol="words")]
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

    cv = [
        CountVectorizer(vocabSize=5460,inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]

    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="features"
    )]
    label_stringIdx = [StringIndexer(inputCol = "target", outputCol = "label")]
    lr = [LogisticRegression(maxIter=100)]
    return Pipeline(stages=tokenizer + ngrams + cv + idf+ assembler + label_stringIdx+lr)

### Model Training

In [16]:

trigramwocs_pipelineFit = build_ngrams_wocs().fit(train_set)
predictions_wocs = trigramwocs_pipelineFit.transform(val_set)
accuracy_wocs = predictions_wocs.filter(predictions_wocs.label == predictions_wocs.prediction).count() / float(val_set.count())

# print accuracy
print("Accuracy Score: {0:.4f}".format(accuracy_wocs))

Accuracy Score: 0.6883
CPU times: user 153 ms, sys: 36.2 ms, total: 189 ms
Wall time: 56.5 s


In [21]:
trigramwocs_pipelineFit.save("Model_Twitter_Sentiment")

In [12]:
from pyspark.ml import PipelineModel

pipeline = PipelineModel.load("Model_Twitter_Sentiment")

### Test Set Prediction 

In [14]:
test_predictions = pipeline.transform(val_set)
test_accuracy = test_predictions.filter(test_predictions.label == test_predictions.prediction).count() / float(val_set.count())

# # print accuracy, roc_auc
print("Accuracy Score: {0:.4f}".format(test_accuracy))
# #print("ROC-AUC: {0:.4f}".format(test_roc_auc))

Accuracy Score: 0.6883


### Spark Streaming Tweet Handling

In [23]:
import time
from pyspark.sql import Row
from pyspark.sql import SparkSession


def tweet_cleaner_updated(text):
    return text

def processTweets(rdd):

    try:        
        spark = SparkSession.builder.appName("test_import").getOrCreate()
        
        tweet = rdd.collect()
        if len(tweet) != 0:
            tweet = list(tweet[0])
        else:
            tweet = []

        rows = []
        for i in range(len(tweet)):
            rows.append(Row(_c0=i,text=tweet[i],target=0))

        if len(rows) == 0:
            rows.append(Row(_c0=1,text="EMPTY TWEET",target=0))
            
        df = spark.createDataFrame(rows)
        df.registerTempTable("tweets")
        
    except Exception as e: 
        print(e)
    
lines = lines.map(lambda x: x.lower());
lines = lines.map(lambda x: x.replace(" rt " , " "))
lines = lines.map(lambda x: x.replace("\n" , " "))
lines = lines.reduce(lambda x,y : x + y)
lines = lines.map(lambda x: x.split(" $$$$$$ "))
lines = lines.map(lambda x: tweet_cleaner_updated(x))

lines.foreachRDD(lambda rdd: processTweets(rdd))



Py4JJavaError: An error occurred while calling None.org.apache.spark.streaming.api.python.PythonTransformedDStream.
: java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
	at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:228)
	at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:66)
	at org.apache.spark.streaming.api.python.PythonDStream.<init>(PythonDStream.scala:224)
	at org.apache.spark.streaming.api.python.PythonTransformedDStream.<init>(PythonDStream.scala:241)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


In [24]:
ssc.start()

Py4JJavaError: An error occurred while calling o31.start.
: java.lang.IllegalStateException: StreamingContext has already been stopped
	at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:608)
	at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)


### Redis Queue Config

In [None]:
import redis

config = {
    'host' : 'localhost',
    'port' : 6379,
    'db' : 0
}

redis_object = redis.StrictRedis(**config)

channel = "tweet_prediction"

### Get Tweet Data from temp table and predict the sentiment

In [None]:
import time
import json

count = 0
predicted_tweets = 0
time.sleep( 2 )

while count < 500:
    
    print("%%%%%%%%%%%%%%%%% BLOCK " + str(count) + " %%%%%%%%%%%%%%%%\n")    
    
    df_all_tweets = sqlContext.sql( 'Select * from tweets' )
    
    predicted_tweets = trigramwocs_pipelineFit.transform(df_all_tweets).collect()
    
    for tweet in predicted_tweets:
        print("\n########################")
        print(tweet.text)
        print(tweet.prediction)

        # Send Data to Redis Queue
        message = {}
        message["text"] = json.dumps(tweet)
        message["sentiment"] = tweet.prediction
        message_body = json.dump(message)
        message = '{message_body}'.format(**locals()).encode('UTF-8')
        
        redis_object.publish(channel, message)

    
    count = count + 1

In [20]:
ssc.awaitTermination()