In [1]:
!pip install TextBlob



In [2]:
import pyspark.sql.functions as F
from pyspark.sql import *
from pyspark.sql.functions import explode, split, udf
from pyspark.sql.types import DoubleType, StringType
from textblob import TextBlob
def preprocessing(lines):
    """Split the raw stream into individual tweets and strip Twitter-specific noise.

    Parameters
    ----------
    lines : pyspark.sql.DataFrame
        DataFrame exposing a ``value`` column. Each value is split on the
        delimiter ``t_end`` and every resulting piece becomes its own row.

    Returns
    -------
    pyspark.sql.DataFrame
        DataFrame with a single ``word`` column in which URLs, ``@mentions``,
        ``#`` characters, standalone ``RT`` markers and ``:`` characters have
        been removed. Pieces that were empty after the split are dropped.

    Notes
    -----
    Empty rows are dropped *before* the regex clean-up, so a row that consists
    only of removable tokens is kept as an empty/whitespace string.
    """
    words = lines.select(explode(split(lines.value, "t_end")).alias("word"))
    # Turn empty fragments into nulls so they can be dropped in one step.
    words = words.na.replace('', None).na.drop()

    # Patterns are applied in order; raw strings keep the backslashes intact for
    # Spark's regex engine instead of relying on Python's invalid-escape fallback.
    cleanup_patterns = (
        r'http\S+',   # URLs
        r'@\w+',      # @mentions
        '#',          # hashtag marker (the tag text itself is kept)
        r'\bRT\b',    # retweet marker only as a whole word, so "START" is not mangled
        ':',          # colons
    )
    for pattern in cleanup_patterns:
        words = words.withColumn('word', F.regexp_replace('word', pattern, ''))
    return words

In [3]:
# text classification
def polarity_detection(text):
    """Return the TextBlob sentiment polarity of ``text``.

    Parameters
    ----------
    text : str or None
        Text to score. A SQL NULL reaches a Python UDF as ``None``.

    Returns
    -------
    float or None
        ``TextBlob(text).sentiment.polarity``, or ``None`` when ``text`` is ``None``.
    """
    # TextBlob rejects non-string input; pass nulls through so one null row
    # does not fail the whole batch.
    if text is None:
        return None
    return TextBlob(text).sentiment.polarity
def subjectivity_detection(text):
    """Return the TextBlob sentiment subjectivity of ``text``.

    Parameters
    ----------
    text : str or None
        Text to score. A SQL NULL reaches a Python UDF as ``None``.

    Returns
    -------
    float or None
        ``TextBlob(text).sentiment.subjectivity``, or ``None`` when ``text`` is
        ``None``.
    """
    # TextBlob rejects non-string input; pass nulls through so one null row
    # does not fail the whole batch.
    if text is None:
        return None
    return TextBlob(text).sentiment.subjectivity
def text_classification(words):
    """Append TextBlob sentiment scores to every row of ``words``.

    Parameters
    ----------
    words : pyspark.sql.DataFrame
        DataFrame with a ``word`` column holding the text to score.

    Returns
    -------
    pyspark.sql.DataFrame
        ``words`` with two added columns, ``polarity`` and ``subjectivity``,
        produced by ``polarity_detection`` and ``subjectivity_detection``.

    Notes
    -----
    Both scorers return Python floats, so the UDFs are declared ``DoubleType``.
    Declaring them ``StringType`` only works through implicit value-to-string
    coercion and stores the scores as text, which blocks numeric filtering and
    aggregation downstream.
    """
    polarity_detection_udf = udf(polarity_detection, DoubleType())
    subjectivity_detection_udf = udf(subjectivity_detection, DoubleType())
    return (
        words
        .withColumn("polarity", polarity_detection_udf("word"))
        .withColumn("subjectivity", subjectivity_detection_udf("word"))
    )

In [None]:
if __name__ == "__main__":
    # Imports used by this driver and by the helper functions defined in the
    # earlier cells. Wildcard imports keep their original relative order.
    from pyspark.sql.functions import *
    from pyspark.sql import *
    import pyspark.sql.functions as F
    from pyspark.sql.types import *
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import explode, split, udf
    from pyspark.sql.types import StringType
    from textblob import TextBlob

    # Create (or reuse) the Spark session.
    spark = (
        SparkSession.builder
        .appName("TwitterSentimentAnalysis")
        .getOrCreate()
    )

    # Streaming source: text read from a TCP socket.
    lines = (
        spark.readStream
        .format("socket")
        .option("host", "0.0.0.0")
        .option("port", 5554)
        .load()
    )

    # The preprocessing / classification stages are currently disabled, so the
    # raw `lines` stream is what gets persisted below.
    #words = preprocessing(lines)
    #words = lines.select(explode(split(lines.value, "t_end")).alias("word"))
    #words = text_classification(words)
    #words = lines.repartition(1)

    # Sink: append to parquet files under ./parc, triggering every 60 seconds.
    query = (
        lines.writeStream
        .queryName("all_tweets")
        .outputMode("append")
        .format("parquet")
        .option("path", "./parc")
        .option("checkpointLocation", "./check")
        .trigger(processingTime='60 seconds')
        .start()
    )

    # Block this cell until the query terminates or the cell is interrupted.
    query.awaitTermination()

In [8]:
# Stop the streaming query started by the driver cell. That cell blocks in
# awaitTermination(), so this only runs after it has been interrupted or has ended.
query.stop()