In [None]:
from pyspark.sql import SparkSession, functions as F, types
from pyspark import SparkConf
from sparknlp.annotator import *
from pyspark.ml import Pipeline, PipelineModel

Configure the required Spark packages (JARs) and the MongoDB connection.

In [None]:
# Placeholder: replace with the real MongoDB connection string before running.
mongodb_conn = '<MONGDB_CONNECTION_STRING>'

# Spark configuration for the streaming job.
conf = SparkConf()
# Packages resolved at session start-up: the Kafka structured-streaming source,
# the MongoDB Spark connector (10.x) and Spark NLP.
conf.set('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1,org.mongodb.spark:mongo-spark-connector:10.0.4,com.johnsnowlabs.nlp:spark-nlp_2.12:4.2.1' )
# The 10.x connector reads `spark.mongodb.read/write.connection.uri`; the
# `spark.mongodb.input.uri` / `spark.mongodb.output.uri` keys belong to the
# 3.x connector and are ignored by the version loaded above.
conf.set("spark.mongodb.read.connection.uri", mongodb_conn)
conf.set("spark.mongodb.write.connection.uri", mongodb_conn)
conf.set("spark.driver.memory","8G")

# Local session using all available cores, named 'twitter'.
spark = (
    SparkSession.builder
    .appName('twitter')
    .master('local[*]')
    .config(conf=conf)
    .getOrCreate()
)

Reading tweets from Kafka server by subscribing to the producer topic

In [None]:
# Streaming source: subscribe to the 'politics' topic on the local Kafka broker.
tweets = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:9092')
    .option('subscribe', 'politics')
    # The Kafka source option is named `startingOffsets`; the previous
    # `startOffsets` key is not recognised, so the stream silently fell back
    # to the streaming default (`latest`) instead of reading from the beginning.
    .option('startingOffsets', 'earliest')
    .option('failOnDataLoss', 'false')
    .load()
)

In [None]:
# Hugging Face model identifier; also used to build the local path of the
# exported Spark NLP model.
MODEL_NAME = "j-hartmann/emotion-english-distilroberta-base"

In [None]:
# Load the saved Spark NLP RoBERTa sequence classifier from the local
# "<MODEL_NAME>_spark_nlp" directory and wire it to the `document` and `token`
# annotation columns; predictions are written to the `class` column.
sequenceClassifier_loaded = (
    RoBertaForSequenceClassification
    .load(f"./{MODEL_NAME}_spark_nlp")
    .setInputCols(["document", "token"])
    .setOutputCol("class")
)

Each row in the source has the following schema

- key - binary
- value - binary
- topic - string
- partition - int
- offset - long
- timestamp - timestamp
- timestampType - int

The tweet payload (`value`) arrives as binary, so it must be cast to a string before it can be read.

In [None]:
# Cast the binary Kafka `value` to a string and keep the record `timestamp`.
# Selecting only `value` dropped `timestamp`, which made the later rename to
# `time` a no-op and left the final `select(..., 'time', ...)` unresolvable.
tweet_df = tweets.selectExpr("CAST(value AS STRING)", "timestamp")

In [None]:
# Rename the Kafka column names to domain names: `value` -> `text`,
# `timestamp` -> `time`.
tweet_df = (
    tweet_df
    .withColumnRenamed("value", "text")
    .withColumnRenamed("timestamp", "time")
)

In [None]:
# Stage 1: turn the raw `text` column into `document` annotations,
# using the 'shrink' cleanup mode.
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
    .setCleanupMode("shrink")
)

# Stage 2: split each document into `token` annotations.
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

# Stage 3 is the pre-loaded sequence classifier, which writes to `class`.
pipeline = Pipeline(
    stages=[document_assembler, tokenizer, sequenceClassifier_loaded]
)

# Fit and apply the pipeline to the tweet stream, then keep only the tweet
# text, its time and the predicted label(s).
result = (
    pipeline
    .fit(tweet_df)
    .transform(tweet_df)
    .select("text", "time", "class.result")
)

Write the tweets, together with their classified emotion, to MongoDB.

In [None]:
# Start a streaming write of `result` to MongoDB in append mode. The database,
# collection and checkpoint location below are placeholders to be filled in.
query = (
    result.writeStream
    .format("mongodb")
    .outputMode("append")
    .option("spark.mongodb.connection.uri", mongodb_conn)
    .option("spark.mongodb.database", "MONGODB_DATABASE")
    .option("spark.mongodb.collection", "<MONGODB_COLLECTION>")
    .option("checkpointLocation", "<PATH_FOR_CHECKPOINT_LOCATION>")
    .start()
)

In [None]:
# Stop the streaming query started by the MongoDB write cell.
# NOTE(review): presumably meant to be run manually once enough tweets have
# been written; executing it right after `.start()` would end the stream
# almost immediately. Confirm the intended workflow.
query.stop()