# Part 2: Tweet preprocessing and sentiment analysis

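Import statements: PySpark SQL (DataFrame functions and types), TextBlob for lexicon-based sentiment scores, and Spark NLP for the pretrained Twitter sentiment pipeline.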

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from textblob import TextBlob
import sparknlp
from sparknlp.pretrained import PretrainedPipeline

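Tweet preprocessing: split each chunk read from the socket into one row per tweet (tweets are separated by the marker `t_end`), then strip URLs, @mentions, `#` symbols, retweet markers ("RT") and colons.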

In [7]:
def preprocessing(lines):
    """Split the raw socket stream into one cleaned tweet per row.

    Parameters
    ----------
    lines : pyspark.sql.DataFrame
        DataFrame with a string column ``value`` (the ``socket`` source).
        Tweets inside one value are separated by the literal marker ``t_end``.

    Returns
    -------
    pyspark.sql.DataFrame
        A single string column ``word`` holding one cleaned tweet per row:
        URLs, @mentions, ``#`` symbols, standalone ``RT`` markers and colons
        are removed and surrounding whitespace is trimmed. Rows that are empty
        before or after cleaning are dropped.
    """
    words = lines.select(explode(split(lines.value, "t_end")).alias("word"))
    # Empty fragments (e.g. after a trailing "t_end") become nulls and are dropped.
    words = words.na.replace('', None)
    words = words.na.drop()
    # Raw strings: '\w' in a plain literal is an invalid escape sequence
    # (DeprecationWarning since Python 3.6, SyntaxWarning since 3.12).
    words = words.withColumn('word', F.regexp_replace('word', r'http\S+', ''))
    words = words.withColumn('word', F.regexp_replace('word', r'@\w+', ''))
    words = words.withColumn('word', F.regexp_replace('word', '#', ''))
    # Remove the retweet marker only as a whole word; the bare pattern 'RT'
    # also mangled ordinary words such as "START" or "SPORTS".
    words = words.withColumn('word', F.regexp_replace('word', r'\bRT\b', ''))
    words = words.withColumn('word', F.regexp_replace('word', ':', ''))
    # Cleaning can leave nothing but whitespace (e.g. a tweet that was only a
    # link); don't send such rows on to the sentiment models.
    words = words.withColumn('word', F.trim(F.col('word')))
    words = words.filter(F.col('word') != '')
    return words

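Tweet sentiment analysis with TextBlob: helper functions and Spark UDFs that add a polarity score and a subjectivity score for each tweet.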

In [8]:
# TextBlob-based scorers, wrapped as Spark UDFs by text_classification
def polarity_detection(text):
    """Return TextBlob's sentiment polarity for ``text``.

    Spark passes SQL NULL to a Python UDF as ``None``. Such values return
    ``None`` (a null score) instead of being handed to ``TextBlob``, which
    expects a string; an exception inside a UDF fails the Spark task.
    """
    if text is None:
        return None
    return TextBlob(text).sentiment.polarity
def subjectivity_detection(text):
    """Return TextBlob's sentiment subjectivity for ``text``.

    ``None`` (a SQL NULL passed in by Spark) returns ``None`` instead of
    being handed to ``TextBlob``, which expects a string.
    """
    if text is None:
        return None
    return TextBlob(text).sentiment.subjectivity
def text_classification(words):
    """Add TextBlob ``polarity`` and ``subjectivity`` columns to ``words``.

    Parameters
    ----------
    words : pyspark.sql.DataFrame
        DataFrame with a string column ``word``, e.g. the output of
        ``preprocessing``.

    Returns
    -------
    pyspark.sql.DataFrame
        ``words`` plus two nullable double columns, ``polarity`` and
        ``subjectivity``.
    """
    # TextBlob returns floats. Declaring the UDFs as DoubleType keeps the
    # scores numeric; with StringType Spark stored them as strings.
    polarity_detection_udf = udf(polarity_detection, DoubleType())
    subjectivity_detection_udf = udf(subjectivity_detection, DoubleType())
    # NOTE(review): each UDF builds its own TextBlob, so every tweet is
    # analysed twice; a single struct-returning UDF would halve that work.
    return (
        words
        .withColumn("polarity", polarity_detection_udf("word"))
        .withColumn("subjectivity", subjectivity_detection_udf("word"))
    )

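Run the streaming job: read tweets from a local socket, preprocess them, score them with the pretrained Spark NLP Twitter sentiment pipeline, and write the results to Parquet every 60 seconds.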

In [10]:
if __name__ == "__main__":
    # Streaming job: socket -> preprocessing -> TextBlob scores -> Spark NLP
    # sentiment pipeline -> Parquet files under ./parc every 60 seconds.

    # create Spark session.
    # NOTE: spark.driver.memory and spark.jars.packages only take effect when
    # this call starts the JVM. If a session already exists in this kernel
    # (e.g. this cell was run before), getOrCreate() returns it unchanged. The
    # JVM then keeps its old heap size, and loading the ~935 MB pretrained
    # pipeline can fail with "java.lang.OutOfMemoryError: Java heap space".
    # In that case restart the kernel and run the notebook from the top.
    if SparkSession.getActiveSession() is not None:
        print("WARNING: reusing an existing Spark session; spark.driver.memory "
              "and spark.jars.packages from this cell will not take effect")
    spark = (
        SparkSession.builder.appName("Spark NLP")
        .master("local[2]")
        .config("spark.driver.memory", "8G")
        .config("spark.driver.maxResultSize", "0")  # 0 = unlimited
        .config("spark.kryoserializer.buffer.max", "2000M")
        # jar version should match the pip-installed sparknlp package
        .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.0.2")
        .getOrCreate()
    )
    print('spark session created')
    # Offline alternative, loading a previously downloaded copy:
    # pipeline = PretrainedPipeline.from_disk("analyze_sentimentdl_use_twitter_en_2.5.0_2.4_1589108892106")
    pipeline = PretrainedPipeline("analyze_sentimentdl_use_twitter", lang="en")
    print('pipeline loaded')
    # read the tweet data from socket
    lines = spark.readStream.format("socket").option("host", "0.0.0.0").option("port", 5556).load()
    # Preprocess the data
    words = preprocessing(lines)
    # Text classification to define polarity and subjectivity. It must run
    # before the rename below (it reads the "word" column). The query below
    # selects "polarity", which only exists after this step.
    words = text_classification(words)

    # run the Spark NLP pipeline, which reads its input from a "text" column
    words = pipeline.transform(words.withColumnRenamed("word", "text"))
    # extract results from "sentiment" column (one row per sentiment annotation)
    words.selectExpr("text", "explode(sentiment) sentiments", "polarity")\
         .selectExpr("text", "sentiments.result result", "polarity")\
         .createOrReplaceTempView("result_tbl_")

    # Label each tweet from its TextBlob polarity alongside the pipeline result.
    # NOTE(review): result_tbl is only registered as a view. The Parquet sink
    # below writes `words` (all pipeline columns), not this labelled table.
    # Confirm which output was intended.
    spark.sql("""
        SELECT
            text,
            CASE WHEN polarity>0 THEN 'positive' 
            WHEN polarity<0 THEN 'negative'
            ELSE 'neutral'
            END AS label,
            result
        FROM
        result_tbl_""").createOrReplaceTempView("result_tbl")

    # single partition -> one Parquet file per trigger
    words = words.repartition(1)
    query = words.writeStream.queryName("all_tweets")\
        .outputMode("append").format("parquet")\
        .option("path", "./parc")\
        .option("checkpointLocation", "./check")\
        .trigger(processingTime='60 seconds').start()
    # blocks this cell until the streaming query stops
    query.awaitTermination()

spark session created
analyze_sentimentdl_use_twitter download started this may take some time.
Approx size to download 935.1 MB
[OK!]


Py4JJavaError: An error occurred while calling z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.downloadPipeline.
: java.lang.OutOfMemoryError: Java heap space
	at java.nio.file.Files.read(Files.java:3099)
	at java.nio.file.Files.readAllBytes(Files.java:3158)
	at com.johnsnowlabs.ml.tensorflow.TensorflowWrapper.writeObject(TensorflowWrapper.scala:235)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1155)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
	at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:140)
	at org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:176)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$3(BlockManager.scala:1432)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$3$adapted(BlockManager.scala:1430)
	at org.apache.spark.storage.BlockManager$$Lambda$1925/2009384867.apply(Unknown Source)
	at org.apache.spark.storage.DiskStore.put(DiskStore.scala:70)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1430)
	at org.apache.spark.storage.BlockManager$$Lambda$1401/477898198.apply(Unknown Source)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
	at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:1269)
	at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1758)
	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:133)
	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:91)
	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:35)
