# Simple Kafka Producer and Consumer

In [None]:
!pip install kafka-python

In [1]:
import json

from kafka import KafkaConsumer, KafkaProducer

def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(key, encoding='utf-8')
        value_bytes = bytes(value, encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
        print('Message published successfully.')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))


def connect_kafka_producer(server):
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=[server], api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    finally:
        return _producer

In [2]:
server = 'broker:9093'

heroes_topic = 'heroes'

In [3]:
from kafka.admin import KafkaAdminClient, NewTopic

admin_client = KafkaAdminClient(
    bootstrap_servers=server, 
    client_id='Franz'
)

In [5]:
# Create Topic
topic_list = [NewTopic(name=heroes_topic, num_partitions=3, replication_factor=1)]

admin_client.create_topics(new_topics=topic_list, validate_only=False)

CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='heroes', error_code=0, error_message=None)])

In [4]:
# Remove Topic
# admin_client.delete_topics(['heroes'])

DeleteTopicsResponse_v3(throttle_time_ms=0, topic_error_codes=[(topic='heroes', error_code=0)])

# Spark Streaming Data to Kafka Sink

In [6]:
# Set JAVA_HOME for Spark
import os

spark_version = '3.2.0'

os.environ["JAVA_HOME"] = "jdk-11"
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12'

In [7]:
from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.types import *

import timeit
start_time = timeit.default_timer()

spark = SparkSession \
      .builder \
      .appName('creating stream') \
      .master("local[*]") \
      .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0')\
      .getOrCreate()\

sc = spark._sc

# spark.version



:: loading settings :: url = jar:file:/opt/conda/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-10455bae-dd1e-4626-bc10-b7420da55ec7;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.0 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central

In [8]:
# Read stream from specific directory

stream_dir = 'individual_heroes'

schema = StructType([StructField('Name',StringType(),True),
                     StructField('Alignment',StringType(),True),
                     StructField('Intelligence',DecimalType(10, 2),True),
                     StructField('Strength',DecimalType(10, 2),True),
                     StructField('Speed',IntegerType(),True),
                     StructField('Durability',DecimalType(10, 2),True),
                     StructField('Power',DecimalType(10, 2),True),
                     StructField('Combat',DecimalType(10, 2),True),
                     StructField('Total',DecimalType(10, 2),True),
                     StructField('Id',IntegerType(),True)])

df = spark.readStream\
  .schema(schema)\
  .csv(stream_dir)\


# To KVs
df = df\
    .withColumn('Value', f.to_json(f.struct(*[f.col('Name').alias('Name'), 
                                    f.col('Alignment').alias('Alignment'), 
                                    f.col('Intelligence').alias('Intelligence'), 
                                    f.col('Strength').alias('Strength'), 
                                    f.col('Speed').alias('Speed'), 
                                    f.col('Durability').alias('Durability'), 
                                    f.col('Power').alias('Power'),
                                    f.col('Combat').alias('Combat'),
                                    f.col('Total').alias('Total')])))\
    .selectExpr("CAST(Id AS STRING) as key", "CAST(Value AS STRING) as value")

In [None]:
# To Kafka

df \
   .writeStream \
   .format("kafka") \
   .option("kafka.bootstrap.servers", "PLAINTEXT://broker:9093") \
   .option("topic", heroes_topic) \
   .option("checkpointLocation", "checkpoint")\
   .start()\
   .awaitTermination()

21/12/30 16:59:10 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/12/30 16:59:36 WARN FileStreamSource: Listed 370 file(s) in 2803 ms
21/12/30 17:00:10 WARN FileStreamSource: Listed 377 file(s) in 2613 ms          
21/12/30 17:00:19 WARN FileStreamSource: Listed 379 file(s) in 3019 ms
21/12/30 17:00:28 WARN FileStreamSource: Listed 380 file(s) in 2626 ms
21/12/30 17:00:38 WARN FileStreamSource: Listed 382 file(s) in 3874 ms
21/12/30 17:00:47 WARN FileStreamSource: Listed 384 file(s) in 2582 ms
21/12/30 17:00:54 WARN FileStreamSource: Listed 386 file(s) in 2559 ms
21/12/30 17:01:04 WARN FileStreamSource: Listed 388 file(s) in 2587 ms
21/12/30 17:01:13 WARN FileStreamSource: Listed 389 file(s) in 2628 ms
21/12/30 17:01:22 WARN FileStreamSource: Listed 391 file(s) in 3426 ms
21/12/30 17:01:32 WARN FileStreamSource: Listed 393 file(s) in 2426 ms
21/12/30 17:01:42 WARN FileStreamSource: Listed 395 file(s) in 37