In [None]:
# pyspark functions for real time streaming
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions
from pyspark.streaming import StreamingContext

# textblob for sentiment analysis
!pip install --upgrade pip
!pip install textblob==0.17.1
from textblob import TextBlob

In [None]:
# text cleaning

def preprocessing(lines):
    """Split the raw stream text into individual tweets and clean each one.

    The input's ``value`` column is split on the literal delimiter ``t_end``
    and exploded so that every piece becomes its own row in a single column
    named ``word``. Empty pieces are dropped, then the text is lower-cased
    and stripped of URLs, @mentions, punctuation and non-ASCII characters.

    Parameters
    ----------
    lines : DataFrame
        DataFrame exposing a ``value`` column (in this notebook, the socket
        stream).

    Returns
    -------
    DataFrame
        One-column DataFrame (``word``) holding the cleaned text.
    """
    # One row per "t_end"-delimited piece of the incoming text.
    words = lines.select(explode(split(lines.value, "t_end")).alias("word"))

    # Turn empty strings into nulls so that na.drop() removes them.
    # NOTE(review): this runs before cleaning, so a piece that becomes empty
    # after the substitutions below (e.g. a bare URL) is still kept.
    words = words.na.replace("", None)
    words = words.na.drop()

    words = words.withColumn("word", functions.lower("word"))

    # Applied in order; all patterns are raw strings so that backslash
    # classes reach the regex engine without invalid-escape warnings.
    removal_patterns = (
        r"http\S+",        # URLs
        r"@\w+",           # @mentions
        r"[^\w\s]",        # anything that is not a word char or whitespace
        r"[^\x00-\x7F]+",  # runs of non-ASCII characters
    )
    for pattern in removal_patterns:
        words = words.withColumn(
            "word", functions.regexp_replace("word", pattern, "")
        )

    return words


# text classification

def sentiment_detection(text):
    """Label ``text`` by the sign of its TextBlob polarity score.

    Returns ``"negative"`` for a score below zero, ``"positive"`` for a score
    above zero, and ``"neutral"`` otherwise.
    """
    score = float(TextBlob(text).sentiment.polarity)
    if score < 0:
        return "negative"
    if score > 0:
        return "positive"
    return "neutral"

def polarity_detection(text):
    """Return the polarity component of TextBlob's sentiment for ``text``."""
    blob_sentiment = TextBlob(text).sentiment
    return blob_sentiment.polarity

def subjectivity_detection(text):
    """Return the subjectivity component of TextBlob's sentiment for ``text``."""
    blob_sentiment = TextBlob(text).sentiment
    return blob_sentiment.subjectivity

def text_classification(words):
    """Add TextBlob-derived sentiment columns to a DataFrame of cleaned text.

    Three columns are appended, each computed from the ``word`` column:

    * ``sentiment``    - string label from ``sentiment_detection``
    * ``polarity``     - double, from ``polarity_detection``
    * ``subjectivity`` - double, from ``subjectivity_detection``

    Parameters
    ----------
    words : DataFrame
        DataFrame containing a ``word`` column.

    Returns
    -------
    DataFrame
        ``words`` with the three columns above appended.
    """
    sentiment_detection_udf = udf(sentiment_detection, StringType())

    # The two scores are numeric, so they are declared as DoubleType rather
    # than StringType; otherwise the numbers end up stored as text. float()
    # guarantees the Python value matches the declared type, since a
    # mismatched return type is not reliably converted by Spark.
    polarity_detection_udf = udf(
        lambda text: float(polarity_detection(text)), DoubleType()
    )
    subjectivity_detection_udf = udf(
        lambda text: float(subjectivity_detection(text)), DoubleType()
    )

    return (
        words
        .withColumn("sentiment", sentiment_detection_udf("word"))
        .withColumn("polarity", polarity_detection_udf("word"))
        .withColumn("subjectivity", subjectivity_detection_udf("word"))
    )

In [None]:
# Real-time streaming pipeline: socket -> cleaning -> sentiment -> memory table.
# NOTE(review): `sc` is not defined in the visible cells; presumably the
# notebook environment provides it. `ssc` is not used again in this cell.
ssc = StreamingContext(sc, 1)

# Obtain (or reuse) the Spark session for this application.
spark_s = (
    SparkSession.builder
    .appName("Real_time_Dwayne_Johnson_tweets")
    .getOrCreate()
)

# Open a streaming read on the local socket that delivers the tweet text.
# NOTE(review): despite its name, `host` holds the port number.
host = 3360
lines = (
    spark_s.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", host)
    .load()
)
print("Streaming in progress...")

# Split the incoming text into tweets and clean them.
words = preprocessing(lines)
print("Processing: OK")
print(words)

# Attach sentiment label, polarity and subjectivity to every tweet.
words = text_classification(words)
print("Classification of tweets sentiment: OK")

# Collapse to a single partition before writing.
words = words.repartition(1)

# Append results to an in-memory table registered as "tweets_list".
query = (
    words.writeStream
    .queryName("tweets_list")
    .outputMode("append")
    .format("memory")
    .start()
)

# Left disabled: presumably so the cell returns and the in-memory table can
# be queried from the next cell - confirm before re-enabling.
#query.awaitTermination()

In [None]:
%sql
-- Show every row currently held in the in-memory table "tweets_list".
SELECT *
FROM tweets_list