In [1]:
#import libraries

import os
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pandas as pd
import uuid
import random
import csv
from confluent_kafka import Producer
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import requests

In [2]:
#Establish connection between spark and kafka

os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell"

In [3]:
#Read data that we wand to send
df=pd.read_csv("sample_data.csv")
df

Unnamed: 0,id,Message,Class
0,1,Even my brother is not like to speak with me. ...,ham
1,2,Had your mobile 11 months or more? U R entitle...,Spam
2,3,URGENT! You have won a 1 week FREE membership ...,Spam
3,4,I have been searching for the right words to t...,ham


In [4]:
# kafka producer variables
bootstrap_servers='kafka:9092'
topic='test'

In [5]:
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {}'.format(msg.topic()))

def confluent_kafka_producer():

    p = Producer({'bootstrap.servers': bootstrap_servers})
    with open("sample_data.csv",'r') as file:
        reader= csv.reader(file,delimiter='\t')
        for msg in reader:
            record_key=str(uuid.uuid4())
            record_value=json.dumps({'data':msg})
            p.produce(topic,key=record_key, value=record_value, on_delivery=delivery_report)
            p.poll(0)
        
        
            p.flush()
    print('we\'ve sent {count} messages to {brokers}'.format(count=len(df), brokers=bootstrap_servers))

In [6]:
confluent_kafka_producer()

Message delivered to test
Message delivered to test
Message delivered to test
Message delivered to test
Message delivered to test
we've sent 4 messages to kafka:9092


In [8]:
spark = SparkSession \
    .builder \
    .appName('RealtimeKafkaML') \
    .getOrCreate()

In [9]:
df_raw = spark \
  .readStream \
  .format('kafka') \
  .option('kafka.bootstrap.servers', bootstrap_servers) \
  .option("startingOffsets", "earliest") \
  .option('subscribe', topic) \
  .load()

In [10]:
df_json = df_raw.selectExpr('CAST(value AS STRING) as json')

In [18]:
# read a small batch of data from kafka and display to the console

schema = StructType([StructField('data', StringType())])

df_json.select(from_json(df_json.json, schema).alias('raw_data')) \
  .select('raw_data.data') \
  .writeStream \
  .trigger(once=True) \
  .format("console") \
  .start() \
  .awaitTermination()

21/11/20 11:42:49 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-3f423e5f-8c1e-40d5-bc3f-c446bf68a638. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
21/11/20 11:42:49 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+
|                data|
+--------------------+
|["id,Message,Class"]|
|["1,Even my broth...|
|["2,Had your mobi...|
|["3,\"URGENT! You...|
|["4,I have been s...|
|["id,Message,Class"]|
|["1,Even my broth...|
|["2,Had your mobi...|
|["3,\"URGENT! You...|
|["4,I have been s...|
|["id,Message,Class"]|
|["1,Even my broth...|
|["2,Had your mobi...|
|["3,\"URGENT! You...|
|["4,I have been s...|
|["id,Message,Class"]|
|["1,Even my broth...|
|["2,Had your mobi...|
|["3,\"URGENT! You...|
|["4,I have been s...|
+--------------------+
only showing top 20 rows

