In [1]:
import os
import sys
from time import time, sleep

from confluent_kafka import Consumer, KafkaError, KafkaException
nb_dir = os.path.split(os.getcwd())[0]
if nb_dir not in sys.path:
    sys.path.append(nb_dir)

from conf import conf

import pandas as pd
import numpy as np
import json
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
from sparknlp.pretrained import PretrainedPipeline

In [None]:
# Start the session through Spark NLP's helper; `spark` is used by later cells
# to build the DataFrames that are fed to the NLP pipeline.
spark = sparknlp.start()

In [None]:
# Name of the pretrained classifier that a later cell loads with
# ClassifierDLModel.pretrained(name=MODEL_NAME).
MODEL_NAME='classifierdl_use_emotion'

In [2]:
# Kafka consumer settings, assembled key by key:
#   - broker address comes from the project-level `conf` module
#   - a fixed consumer group id
#   - offset-reset policy ('smallest' is accepted by librdkafka as an alias
#     of 'earliest')
consumerConf = {}
consumerConf['bootstrap.servers'] = conf.BOOTSTRAP_SERVER
consumerConf['group.id'] = "AfekaFinalProj"
consumerConf['auto.offset.reset'] = 'smallest'

# Single consumer instance shared by the consume loop defined further down.
consumer = Consumer(consumerConf)

In [None]:
# Stage 1: read the "text" column and emit it as the "document" column.
documentAssembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Stage 2: pretrained Universal Sentence Encoder ("tfhub_use", English) that
# turns "document" into "sentence_embeddings".
use = (
    UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en")
    .setInputCols(["document"])
    .setOutputCol("sentence_embeddings")
)

# Stage 3: pretrained classifier selected by MODEL_NAME; consumes the
# embeddings and writes its prediction to the "sentiment" column.
sentimentdl = (
    ClassifierDLModel.pretrained(name=MODEL_NAME)
    .setInputCols(["sentence_embeddings"])
    .setOutputCol("sentiment")
)

# Chain the three stages in order: assembler -> encoder -> classifier.
pipeline_stages = [documentAssembler, use, sentimentdl]
nlpPipeline = Pipeline(stages=pipeline_stages)

In [None]:
# Fit the pipeline on a one-row placeholder DataFrame whose single "text"
# column holds an empty string; the resulting `pipelineModel` is what the
# consume loop uses to transform each batch of messages.
empty_df = spark.createDataFrame([['']]).toDF("text")
pipelineModel = nlpPipeline.fit(empty_df)

In [None]:
def toCategory(sentiment):
    """Collapse an emotion label into a coarse polarity bucket.

    Args:
        sentiment: Label to classify (compared by equality).

    Returns:
        'Negative' when the label is 'fear' or 'sadness'; 'Positive' for
        every other value.
    """
    negative_labels = ('fear', 'sadness')
    return 'Negative' if sentiment in negative_labels else 'Positive'

In [3]:
# Module-level flag polled by basic_consume_loop on every iteration; the loop
# keeps running for as long as it is truthy.
running = True


def _plot_sentiment_batch(batch):
    """Classify a list of texts and plot the Positive/Negative counts.

    Relies on the notebook-level names `spark`, `pipelineModel`, `F`, `pd`
    and `toCategory`.
    """
    df = spark.createDataFrame(pd.DataFrame({"text": batch}))
    result = pipelineModel.transform(df)

    # One row per predicted label; explode() names the resulting column 'col'.
    sentimentDF = result.select(F.explode('sentiment.result')).toPandas()
    sentimentDF['PosNeg'] = sentimentDF['col'].apply(toCategory)

    # Aggregate once and reuse the counts for both charts.
    counts = sentimentDF.groupby('PosNeg')["PosNeg"].count()
    # NOTE(review): both charts are requested back-to-back without creating a
    # new figure in between -- confirm they render as intended in the notebook.
    counts.plot(kind="pie")
    counts.plot(kind="bar")


def basic_consume_loop(consumer, topics, batch_seconds=2, poll_timeout=3.0):
    """Consume messages, and periodically classify and plot what was collected.

    Message payloads are decoded as UTF-8 and accumulated in a batch. After
    each poll, if more than `batch_seconds` have passed since the current
    window started, a non-empty batch is classified and plotted, the batch is
    cleared, and a new window starts. The loop runs while the module-level
    `running` flag is truthy.

    Args:
        consumer: Kafka consumer providing subscribe(), poll() and close().
        topics: List of topic names to subscribe to.
        batch_seconds: Length of the batching window in seconds (default 2).
        poll_timeout: Maximum time in seconds to block in each poll()
            (default 3.0).

    Raises:
        KafkaException: For any message error other than end-of-partition.
    """
    try:
        consumer.subscribe(topics)

        window_start = time()
        batch = []

        while running:
            msg = consumer.poll(timeout=poll_timeout)

            # Flush whatever was collected during the window that just ended.
            # This happens before the freshly polled message is handled, so
            # that message belongs to the next window.
            if time() - window_start > batch_seconds:
                if batch:
                    _plot_sentiment_batch(batch)
                    batch = []
                window_start = time()

            if msg is None:
                # poll() timed out without delivering anything.
                continue

            err = msg.error()
            if err:
                if err.code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(err)
                continue

            payload = msg.value()
            if payload is None:
                # Message without a value: nothing to decode or classify.
                continue
            batch.append(payload.decode('utf-8'))

    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

def shutdown():
    """Request that basic_consume_loop stop by clearing the `running` flag.

    The `global` declaration is required: without it the assignment would
    only bind a function-local name and the module-level flag read by the
    loop would stay unchanged.
    """
    global running
    running = False

In [None]:
# Run the consume loop on the topic named by conf.KAFKA_TOKEN. The call keeps
# looping while the module-level `running` flag is truthy, and the consumer is
# closed when the loop returns or raises.
basic_consume_loop(consumer, [conf.KAFKA_TOKEN])