In [0]:
# Upgrade pip, then install the pinned TextBlob release used by the sentiment functions below.
# NOTE(review): "!pip" shells out from the notebook process; if the UDFs run on separate
# worker nodes, confirm textblob is also installed there (e.g. via %pip) — TODO confirm.
!pip install --upgrade pip
!pip install textblob==0.17.1

In [0]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from textblob import TextBlob

In [0]:
# text cleaning

def preprocessing(lines):
    """Split the raw stream into individual tweets and normalize their text.

    Parameters
    ----------
    lines : pyspark.sql.DataFrame
        DataFrame exposing a string column named ``value``. Each value may
        hold several tweets separated by the literal marker ``t_end``.

    Returns
    -------
    pyspark.sql.DataFrame
        DataFrame with a single string column ``word`` holding one
        lower-cased tweet per row, stripped of URLs, @mentions, punctuation
        and non-ASCII characters.

    Notes
    -----
    Empty fragments are dropped *before* cleaning, so a row whose text
    consists only of removed content (e.g. a bare URL) is still emitted as
    an empty or whitespace-only string.
    """
    # One output row per "t_end"-delimited fragment of the incoming value.
    words = lines.select(F.explode(F.split(lines.value, "t_end")).alias("word"))

    # Turn empty fragments into nulls so that na.drop() removes them.
    words = words.na.replace('', None)
    words = words.na.drop()

    words = words.withColumn('word', F.lower('word'))

    # Order matters: URLs and @mentions must be removed while their
    # punctuation is still present. All patterns are raw strings so that the
    # regex escapes (\S, \w, \s, \x..) reach the regex engine untouched and
    # Python does not warn about invalid string escape sequences.
    cleanup_patterns = (
        r'http\S+',        # URLs
        r'@\w+',           # @mentions
        r'[^\w\s]',        # anything that is neither a word char nor whitespace
        r'[^\x00-\x7F]+',  # runs of non-ASCII characters
    )
    for pattern in cleanup_patterns:
        words = words.withColumn('word', F.regexp_replace('word', pattern, ''))

    return words

In [0]:
# text classification

def sentiment_detection(text):
    """Classify ``text`` by the sign of its TextBlob polarity.

    Returns 'positive' for a polarity above zero, 'negative' for a polarity
    below zero, and 'neutral' otherwise.
    """
    score = float(TextBlob(text).sentiment.polarity)
    if score > 0:
        return 'positive'
    if score < 0:
        return 'negative'
    return 'neutral'

def polarity_detection(text):
    """Return the polarity component of TextBlob's sentiment for ``text``."""
    analysis = TextBlob(text).sentiment
    return analysis.polarity

def subjectivity_detection(text):
    """Return the subjectivity component of TextBlob's sentiment for ``text``."""
    analysis = TextBlob(text).sentiment
    return analysis.subjectivity

def text_classification(words):
    """Append sentiment, polarity and subjectivity columns to ``words``.

    Parameters
    ----------
    words : pyspark.sql.DataFrame
        DataFrame with a string column named ``word`` containing the text to
        score.

    Returns
    -------
    pyspark.sql.DataFrame
        ``words`` plus three columns computed from ``word``:

        * ``sentiment`` (string): label returned by ``sentiment_detection``.
        * ``polarity`` (double): score returned by ``polarity_detection``.
        * ``subjectivity`` (double): score returned by
          ``subjectivity_detection``.
    """
    # The label is textual; the two scores are floats, so they are declared
    # as DoubleType to keep them usable in numeric aggregations and
    # comparisons (previously they were declared as strings).
    sentiment_detection_udf = F.udf(sentiment_detection, StringType())
    polarity_detection_udf = F.udf(polarity_detection, DoubleType())
    subjectivity_detection_udf = F.udf(subjectivity_detection, DoubleType())

    return (
        words
        .withColumn("sentiment", sentiment_detection_udf("word"))
        .withColumn("polarity", polarity_detection_udf("word"))
        .withColumn("subjectivity", subjectivity_detection_udf("word"))
    )


In [0]:
# Legacy streaming context with a batch duration of 1.
# NOTE(review): `sc` is not defined in this notebook — presumably supplied by the runtime.
ssc = StreamingContext(sc, 1)

# Spark session that drives the Structured Streaming pipeline below.
spark_s = SparkSession.builder.appName("GC-ds_sentiment_analysis").getOrCreate()

# Source: tweet text read from a TCP socket on localhost:3412.
lines = (
    spark_s.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 3412)
    .load()
)
print("Streaming in progress")

# Stage 1: split the stream into tweets and clean the text.
words = preprocessing(lines)
print("Processing ok")
print(words)

# Stage 2: attach the sentiment, polarity and subjectivity columns.
words = text_classification(words)
print("Classification ok")

# Collapse the result to a single partition before writing.
words = words.repartition(1)

# Sink: append every result row to the in-memory table "all_tweets".
query = (
    words.writeStream
    .queryName("all_tweets")
    .outputMode("append")
    .format("memory")
    .start()
)

# Blocking on the query is left disabled so later cells can run.
#query.awaitTermination()

In [0]:
%sql

-- Show every row collected so far in "all_tweets", the in-memory table
-- registered by the streaming query's queryName.
select * from all_tweets

In [0]:
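# Manual cleanup: uncomment and run when finished to stop the streaming
# context and the running query.
#ssc.stop()
#query.stop()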