In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 pyspark-shell"

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pandas as pd
import uuid
import random
from confluent_kafka import Producer
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import requests

In [3]:
# kafka producer variables

simple_messages = [
'I love this pony',
'This restaurant is great',
'The weather is bad today',
'I will go to the beach this weekend',
'She likes to swim',
'Apple is a great company'
]

bootstrap_servers='127.0.0.1:9092'
topic='test'
msg_count=5

In [4]:
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {}'.format(msg.topic()))

def confluent_kafka_producer():

    p = Producer({'bootstrap.servers': bootstrap_servers})
    for data in simple_messages:
        
        record_key = str(uuid.uuid4())
        record_value = json.dumps({'data': data})
        
        p.produce(topic, key=record_key, value=record_value, on_delivery=delivery_report)
        p.poll(0)

    p.flush()
    print('we\'ve sent {count} messages to {brokers}'.format(count=len(simple_messages), brokers=bootstrap_servers))

In [5]:
confluent_kafka_producer()

Message delivered to test
Message delivered to test
Message delivered to test
Message delivered to test
Message delivered to test
Message delivered to test
we've sent 6 messages to 127.0.0.1:9092


In [6]:
spark = SparkSession \
    .builder \
    .appName('RealtimeKafkaML') \
    .getOrCreate()

In [9]:
df_raw = spark \
  .readStream \
  .format('kafka') \
  .option('kafka.bootstrap.servers', bootstrap_servers) \
  .option("startingOffsets", "earliest") \
  .option('subscribe', topic) \
  .load()

In [10]:
df_json = df_raw.selectExpr('CAST(value AS STRING) as json')

In [11]:
# read a small batch of data from kafka and display to the console

schema = StructType([StructField('data', StringType())])

df_json.select(from_json(df_json.json, schema).alias('raw_data')) \
  .select('raw_data.data') \
  .writeStream \
  .trigger(once=True) \
  .format("console") \
  .start() \
  .awaitTermination()

In [12]:
def apply_sentiment_analysis(data):
    import requests
    import json
    
    result = requests.post('http://localhost:9000/predict', json=json.loads(data))
    return json.dumps(result.json())

vader_udf = udf(lambda data: apply_sentiment_analysis(data), StringType())

In [13]:
schema_input = StructType([StructField('data', StringType())])
schema_output = StructType([StructField('neg', StringType()),\
                            StructField('pos', StringType()),\
                            StructField('neu', StringType()),\
                            StructField('compound', StringType())])

df_json.select(from_json(df_json.json, schema_input).alias('sentence'),\
               from_json(vader_udf(df_json.json), schema_output).alias('response'))\
  .select('sentence.data', 'response.*') \
  .writeStream \
  .trigger(once=True) \
  .format("console") \
  .start() \
  .awaitTermination()