# Sentiment analysis on streaming Twitter data using Spark 

- [Stream Tweets in real-time](https://developer.twitter.com/en/docs/tutorials/stream-tweets-in-real-time)
- [How to analyze the sentiment of your own Tweets](https://developer.twitter.com/en/docs/tutorials/how-to-analyze-the-sentiment-of-your-own-tweets)
- [Apache Spark Streaming Tutorial: Identifying Trending Twitter Hashtags](https://www.toptal.com/apache/apache-spark-streaming-twitter)
- [Topic Modeling and Sentiment Analysis on Twitter Data Using Spark](https://towardsdatascience.com/topic-modeling-and-sentiment-analysis-on-twitter-data-using-spark-a145bfcc433)

### Send tweets from the Twitter API

#### Import the necessary packages

In [None]:
import tweepy
from tweepy import Stream
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
import socket
import json
import requests
import sys
import requests_oauthlib
import os

#### Credentials

In [None]:
# Twitter API credentials are read from the environment so that secrets never
# live in the notebook or in version control. Export the four variables below
# before running this cell.
# SECURITY: the keys that used to be hard-coded here were exposed in plain text
# and should be revoked and regenerated in the Twitter developer portal.
consumer_key = os.environ.get('TWITTER_CONSUMER_KEY', '')
consumer_secret = os.environ.get('TWITTER_CONSUMER_SECRET', '')
access_token = os.environ.get('TWITTER_ACCESS_TOKEN', '')
access_secret = os.environ.get('TWITTER_ACCESS_SECRET', '')

# Warn early when a credential is missing instead of failing later with an
# opaque authentication error from the API.
_missing_credentials = [
    name
    for name, value in (
        ('TWITTER_CONSUMER_KEY', consumer_key),
        ('TWITTER_CONSUMER_SECRET', consumer_secret),
        ('TWITTER_ACCESS_TOKEN', access_token),
        ('TWITTER_ACCESS_SECRET', access_secret),
    )
    if not value
]
if _missing_credentials:
    print('Missing Twitter credentials: ' + ', '.join(_missing_credentials))

#### API Key
Think of the API key as the user name that represents your App when making API requests. It helps us verify who you are.

#### API Key Secret
Your API Key Secret is like a password and helps verify your API Key. This will be one of the last times you'll see it displayed, so remember to save it in a safe place.

#### Bearer Token
An Access Token used in authentication that allows you to pull specific data.

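#### Access Token
A user-specific credential that identifies the Twitter account on whose behalf your App makes API requests.

#### Access Token Secret
The secret paired with the Access Token. Like the API Key Secret, keep it private and store it in a safe place.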

#### Define a StreamListener subclass

In [None]:
class TweetsListener(StreamListener):
    """Forward the text of every streamed tweet to a connected client socket.

    Each tweet is sent UTF-8 encoded and followed by the marker ``t_end`` so
    that the receiving side can split the byte stream back into tweets.
    """

    def __init__(self, csocket):
        """Store the client socket that tweets are written to.

        Args:
            csocket: a connected socket object; only ``sendall`` is used.
        """
        # Let the tweepy base class initialise its own state first.
        super().__init__()
        self.client_socket = csocket

    def on_data(self, data):
        """Handle one raw JSON message from the stream.

        Args:
            data: the raw JSON payload as delivered by tweepy.

        Returns:
            True to keep the stream running, or False when writing to the
            client socket failed, so the stream stops instead of failing
            again on every following tweet.
        """
        try:
            msg = json.loads(data)
            print("new message")

            # Tweets longer than 140 characters carry their complete text in
            # the "extended_tweet" object; shorter ones only have "text".
            if "extended_tweet" in msg:
                text = msg['extended_tweet']['full_text']
            else:
                text = msg['text']

            # Append "t_end" to mark the end of the tweet. sendall() keeps
            # writing until every byte is sent, whereas send() may transmit
            # only part of the payload.
            self.client_socket.sendall((text + "t_end").encode('utf-8'))
            print(text)

        except OSError as e:
            # The client socket is unusable (e.g. the consumer disconnected).
            print("Error on_data: %s" % str(e))
            return False

        except Exception as e:
            # Malformed JSON or a message without tweet text: report it and
            # keep streaming. KeyboardInterrupt/SystemExit are deliberately
            # not caught so the notebook cell can still be interrupted.
            print("Error on_data: %s" % str(e))

        return True

    def on_error(self, status):
        """Report a stream error status.

        Args:
            status: the HTTP status code reported by the streaming API.

        Returns:
            False for status 420 (rate limited) so the stream disconnects
            instead of reconnecting; None otherwise.
        """
        print(status)
        if status == 420:
            return False
        return None

#### Send data from Twitter

In [None]:
def sendData(c_socket, keyword):
    """Authenticate with Twitter and stream matching English tweets to a socket.

    Args:
        c_socket: connected client socket handed to ``TweetsListener``.
        keyword: value passed as ``track`` to the stream filter.
    """
    print('start sending data from Twitter to socket')

    # Build the OAuth handler from the module-level credentials.
    oauth = OAuthHandler(consumer_key, consumer_secret)
    oauth.set_access_token(access_token, access_secret)

    # The listener receives every streamed message and writes it to the socket.
    listener = TweetsListener(c_socket)
    stream = Stream(oauth, listener)
    stream.filter(track=keyword, languages=["en"])
    

#### Start Streaming

In [None]:
if __name__ == "__main__":
    # Empty host means "listen on all local interfaces".
    host = ''
    port = 5555

    # The server (local machine) creates the listening socket. The `with`
    # block guarantees it is closed however this cell ends.
    with socket.socket() as s:
        try:
            s.bind((host, port))
        except OSError as err:
            # On Python 3 the exception is not subscriptable, so the error
            # code and text are read from its attributes.
            print('Bind failed. Error Code : ' + str(err.errno) + ' Message ' + str(err.strerror))
            # Exit with a non-zero status so the failure is visible to callers.
            sys.exit(1)

        print('Socket bind complete')
        print('socket is ready')

        # The server (local machine) listens for connections.
        s.listen(4)
        print('socket is listening')

        # accept() returns the client-side socket and the client's address.
        c_socket, addr = s.accept()
        print("Received request from: " + str(addr))

        # Close the client connection as well once streaming stops.
        with c_socket:
            # Select the keyword(s) for the tweet data here,
            # e.g. "The Comeback Trail" or "Robert De Niro".
            sendData(c_socket, keyword=['The Comeback Trail'])

### Tweet preprocessing and sentiment analysis

#### Import the necessary packages

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from textblob import TextBlob

#### Tweet preprocessing

In [None]:
def preprocessing(lines):
    """Split the raw socket stream into tweets and strip noise from each one.

    Args:
        lines: streaming DataFrame with a string column ``value``.

    Returns:
        A DataFrame with a single column ``word`` holding one cleaned tweet
        per row.
    """
    # Tweets arrive concatenated and separated by the "t_end" marker.
    words = lines.select(explode(split(lines.value, "t_end")).alias("word"))

    # Turn empty fragments into nulls so they can be dropped together.
    words = words.na.replace('', None)
    words = words.na.drop()

    # Patterns are raw strings so that backslashes reach the regex engine
    # unchanged instead of being read as (invalid) Python string escapes.
    cleanup_patterns = (
        r'http\S+',  # links
        r'@\w+',     # user mentions
        '#',         # hashtag symbol (the tag word itself is kept)
        r'\bRT\b',   # retweet marker, whole word only so "SMART" stays intact
        ':',         # colons
    )
    for pattern in cleanup_patterns:
        words = words.withColumn('word', F.regexp_replace('word', pattern, ''))
    return words

#### Tweet sentiment analysis

In [None]:
# text classification
def polarity_detection(text):
    """Return the TextBlob sentiment polarity of ``text``."""
    sentiment = TextBlob(text).sentiment
    return sentiment.polarity

In [None]:
def subjectivity_detection(text):
    """Return the TextBlob sentiment subjectivity of ``text``."""
    sentiment = TextBlob(text).sentiment
    return sentiment.subjectivity

In [None]:
def text_classification(words):
    """Add ``polarity`` and ``subjectivity`` columns computed from ``word``.

    Both UDFs are declared with ``StringType``, so the two new columns are
    string-typed.

    Args:
        words: DataFrame with a ``word`` column holding the tweet text.

    Returns:
        The DataFrame extended with the two sentiment columns.
    """
    # (output column, scoring function) pairs, applied in this order.
    scorers = (
        ("polarity", polarity_detection),
        ("subjectivity", subjectivity_detection),
    )
    for column_name, scorer in scorers:
        scorer_udf = udf(scorer, StringType())
        words = words.withColumn(column_name, scorer_udf("word"))

    return words

#### Run the main function

In [None]:
if __name__ == "__main__":
    # Create the Spark session.
    spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()

    # Read the tweet data from the socket opened by the sender. The source
    # connects as a client, so it needs a concrete destination address:
    # 0.0.0.0 is a bind wildcard and is not a portable address to connect to.
    lines = (
        spark.readStream.format("socket")
        .option("host", "127.0.0.1")
        .option("port", 5555)
        .load()
    )

    # Preprocess the data.
    words = preprocessing(lines)

    # Text classification to define polarity and subjectivity.
    words = text_classification(words)
    # Use a single partition for the output.
    words = words.repartition(1)

    # Append the results as parquet files once per minute.
    query = (
        words.writeStream.queryName("all_tweets")
        .outputMode("append")
        .format("parquet")
        .option("path", "./parc")
        .option("checkpointLocation", "./check")
        .trigger(processingTime='60 seconds')
        .start()
    )

    query.awaitTermination()