In [8]:
import os
import uuid
import random
import json
import requests
import pandas as pd
from pyspark.sql import SparkSession
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark_version = '3.2.1'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[2] pyspark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12-3.2.1,spark-streaming-kafka-0-10_2.12:3.2.1'

## Create topic on kafka

In [9]:
# kafka producer variables
bootstrap_servers='kafka:9092'
admin_client = AdminClient({
    "bootstrap.servers": bootstrap_servers
})
topic_list = []
topic_1='project_1'
topic_list.append(NewTopic(topic_1, 1, 1))
admin_client.create_topics(topic_list)

p = Producer({'bootstrap.servers': bootstrap_servers})

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {}'.format(msg.topic()))

def confluent_kafka_producer(topic):
    for data in simple_messages:
        record_key = str(uuid.uuid4())
        record_value = json.dumps({'data': data})
        p.produce(topic, key=record_key, value=record_value, on_delivery=delivery_report)
        p.poll(0)
    p.flush()
    print('we\'ve sent {count} messages to {brokers}'.format(count=msg_count, brokers=bootstrap_servers))

## Producer message to kafka

In [10]:
simple_messages = [
'I love this pony',
'This restaurant is great',
'The weather is bad today',
'I will go to the beach this weekend',
'She likes to swim',
'Apple is a great company'
]
msg_count=len(simple_messages)
    
confluent_kafka_producer(topic_1)

Message delivered to project_1
Message delivered to project_1
Message delivered to project_1
Message delivered to project_1
Message delivered to project_1
Message delivered to project_1
we've sent 6 messages to kafka:9092


In [11]:
spark  = SparkSession.builder.master("spark://spark-master:7077").getOrCreate()
spark

In [14]:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", ":9092") \
  .option("subscribe", "project_1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

AnalysisException:  Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".        

In [None]:
df_json = df_raw.selectExpr('CAST(value AS STRING) as json')

In [None]:
# read a small batch of data from kafka and display to the console

schema = StructType([StructField('data', StringType())])

df_json.select(from_json(df_json.json, schema).alias('raw_data')) \
  .select('raw_data.data') \
  .writeStream \
  .trigger(once=True) \
  .format("console") \
  .start() \
  .awaitTermination()

In [None]:
# Test service
import requests
import json

data_jsons = '{"data":"' + simple_messages[1] + '"}'
print(data_jsons)
result = requests.post('http://127.0.0.1:9000/predict', json=json.loads(data_jsons))
print(json.dumps(result.json()))

#vader_udf = udf(lambda data: apply_sentiment_analysis(data), StringType())

In [None]:
def apply_sentiment_analysis(data):
    import requests
    import json
    
    result = requests.post('http://localhost:9000/predict', json=json.loads(data))
    return json.dumps(result.json())

vader_udf = udf(lambda data: apply_sentiment_analysis(data), StringType())

In [None]:
schema_input = StructType([StructField('data', StringType())])
schema_output = StructType([StructField('neg', StringType()),\
                            StructField('pos', StringType()),\
                            StructField('neu', StringType()),\
                            StructField('compound', StringType())])

df_json.select(from_json(df_json.json, schema_input).alias('sentence'),\
               from_json(vader_udf(df_json.json), schema_output).alias('response'))\
  .select('sentence.data', 'response.*') \
  .writeStream \
  .trigger(once=True) \
  .format("console") \
  .start() \
  .awaitTermination()