# 1. Setup

In [1]:
! pip show confluent-kafka

Name: confluent-kafka
Version: 2.3.0
Summary: Confluent's Python client for Apache Kafka
Home-page: https://github.com/confluentinc/confluent-kafka-python
Author: Confluent Inc
Author-email: support@confluent.io
License: 
Location: /Users/sheidamajidi/anaconda3/envs/causalml-py38/lib/python3.8/site-packages
Requires: 
Required-by: 


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr


In [5]:
# Obtain the SparkSession used by every cell below; getOrCreate() hands back
# an already-running session when one exists instead of building a new one.
spark = (
    SparkSession.builder
    .appName("KafkaIntegrationExample")
    .getOrCreate()
)


24/03/30 22:20:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


# 2. Batch Processing

## 2.1. Load topic data from Confluent in batch mode

In [3]:
from pyspark.sql import SparkSession, functions

# Connection settings reused by the later Kafka cells (placeholder values).
topic_name = "test-topic"
bootstrap_servers = "server"

# Batch read: build a DataFrame over the records of `topic_name`.
# The connection authenticates with SASL/PLAIN over TLS (SASL_SSL).
df_kafka = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("subscribe", topic_name)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    # Use the standard login-module class name. The "kafkashaded." prefix only
    # resolves on the Databricks runtime; this notebook runs on a local Spark
    # install, where the batch-write cell already uses the unshaded class.
    # The username/password here are placeholders, kept identical to the other cells.
    .option(
        "kafka.sasl.jaas.config",
        'org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";',
    )
    .load()
)

# In this environment display() prints the DataFrame repr, i.e. the Kafka source schema.
display(df_kafka)

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [5]:
# Bare expression: the notebook echoes the DataFrame's repr (its column names and types).
df_kafka

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

## 2.2. Write a Kafka sink for batch queries

### 2.2.1. Create sample data

In [26]:
# import pyspark class Row from module sql
from pyspark.sql import *

In [27]:
# Row factory: every sample record carries a key, a JSON-encoded value and a topic.
sample_data = Row("key", "value", "topic")

# (name, age, email) of each synthetic person, in key order (keys run from '1' upwards).
people = [
    ("Jane Doe", 29, "jane.doe@example.com"),
    ("John Smith", 34, "john.smith@example.com"),
    ("Emily Jones", 23, "emily.jones@example.com"),
    ("Michael Brown", 45, "michael.brown@example.com"),
    ("Linda White", 52, "linda.white@example.com"),
    ("David Harris", 37, "david.harris@example.com"),
    ("Jessica Clark", 28, "jessica.clark@example.com"),
    ("Daniel Lewis", 43, "daniel.lewis@example.com"),
    ("Laura Allen", 19, "laura.allen@example.com"),
    ("Kevin Walker", 56, "kevin.walker@example.com"),
    ("Sarah Hall", 33, "sarah.hall@example.com"),
    ("Brian Young", 26, "brian.young@example.com"),
    ("Nancy King", 49, "nancy.king@example.com"),
    ("Paul Scott", 38, "paul.scott@example.com"),
    ("Lisa Green", 31, "lisa.green@example.com"),
    ("James Adams", 22, "james.adams@example.com"),
    ("Sandra Thompson", 46, "sandra.thompson@example.com"),
]

# JSON layout shared by every record value (doubled braces are literal braces).
value_template = '{{"name": "{}", "age": {}, "email": "{}"}}'

# One Row per person: the key is the 1-based position rendered as a string.
samples = [
    sample_data(str(position), value_template.format(*person), "test-topic")
    for position, person in enumerate(people, start=1)
]

# Print the first record as a quick sanity check.
print(samples[0])


Row(key='1', value='{"name": "Jane Doe", "age": 29, "email": "jane.doe@example.com"}', topic='test-topic')


### 2.2.2. Create a DataFrame from the sample data

In [30]:
# Build a Spark DataFrame from the list of sample Rows; the column names
# (key, value, topic) come from the Row fields.
df = spark.createDataFrame(data=samples)

In [31]:
# Preview the frame with show()'s default settings spelled out: up to 20 rows,
# long cell values truncated. Pass truncate=False to see the full JSON payloads.
df.show(n=20, truncate=True)

+---+--------------------+----------+
|key|               value|     topic|
+---+--------------------+----------+
|  1|{"name": "Jane Do...|test-topic|
|  2|{"name": "John Sm...|test-topic|
|  3|{"name": "Emily J...|test-topic|
|  4|{"name": "Michael...|test-topic|
|  5|{"name": "Linda W...|test-topic|
|  6|{"name": "David H...|test-topic|
|  7|{"name": "Jessica...|test-topic|
|  8|{"name": "Daniel ...|test-topic|
|  9|{"name": "Laura A...|test-topic|
| 10|{"name": "Kevin W...|test-topic|
| 11|{"name": "Sarah H...|test-topic|
| 12|{"name": "Brian Y...|test-topic|
| 13|{"name": "Nancy K...|test-topic|
| 14|{"name": "Paul Sc...|test-topic|
| 15|{"name": "Lisa Gr...|test-topic|
| 16|{"name": "James A...|test-topic|
| 17|{"name": "Sandra ...|test-topic|
+---+--------------------+----------+



In [23]:
# In this environment display() prints only the DataFrame repr (column names/types);
# use df.show() to see rows.
# NOTE(review): the output saved with this cell lists customer_id/age/gender/email/total_spent,
# which does not match the key/value/topic frame built from `samples` — presumably left over
# from an earlier run; re-execute the notebook top to bottom to refresh it.
display(df)

DataFrame[customer_id: string, age: bigint, gender: string, email: string, total_spent: double]

### 2.2.3. Write data from a DataFrame to a Confluent Kafka topic

In [39]:
# Batch write: publish the key/value columns of `df` as records on the Kafka
# topic named by the "topic" option, authenticating with SASL/PLAIN over TLS.
# DataFrameWriter.save() returns None, so the result is deliberately not bound
# to a variable (the previous `ds = ...` only ever held None).
(
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("topic", topic_name)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    # Placeholder credentials — substitute real values at run time.
    .option(
        "kafka.sasl.jaas.config",
        'org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";',
    )
    .save()
)

                                                                                

# 3. Stream Processing

## 3.1. Read a stream from Kafka

In [42]:
# Streaming read: subscribe to `topic_name` and expose its records as an
# unbounded DataFrame. The chain is wrapped in parentheses so that no trailing
# backslash is needed (the old one after .load() only parsed because a blank
# line happened to follow it).
df_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("subscribe", topic_name)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    # Use the standard login-module class name. The "kafkashaded." prefix only
    # resolves on the Databricks runtime; on this local Spark install the
    # streaming queries built on it logged "Failed to create new KafkaAdminClient".
    # The username/password here are placeholders.
    .option(
        "kafka.sasl.jaas.config",
        'org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";',
    )
    .load()
)

# In this environment display() prints the DataFrame repr, i.e. the Kafka source schema.
display(df_stream)

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

## 3.2. Write a Kafka sink for streaming queries

In [45]:
# Streaming write: forward every key/value record arriving on `df_stream` to
# the "databricks_test" Kafka topic. `ds` is the StreamingQuery handle
# returned by start(); call ds.stop() to end the query.
ds = (
    df_stream
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    # "subscribe" is a source-only option, so it is not set on the sink;
    # the destination is chosen by the "topic" option below.
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    # Use the standard login-module class name. The "kafkashaded." prefix only
    # resolves on the Databricks runtime, not on a local Spark install.
    # The username/password here are placeholders.
    .option(
        "kafka.sasl.jaas.config",
        'org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";',
    )
    .option("topic", "databricks_test")
    # Each streaming query needs a checkpoint directory of its own. Pointing it
    # at a general-purpose folder (previously the user's Desktop, which the JSON
    # query also used) mixes the recovery state of unrelated queries.
    .option("checkpointLocation", "checkpoints/kafka_to_kafka")
    .start()
)

24/03/31 01:33:46 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/03/31 01:33:46 WARN KafkaOffsetReaderAdmin: Error in attempt 1 getting Kafka offsets: 
org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:551)
	at org.apache.kafka.clients.admin.Admin.create(Admin.java:144)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.createAdmin(ConsumerStrategy.scala:50)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.createAdmin$(ConsumerStrategy.scala:47)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.createAdmin(ConsumerStrategy.scala:102)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.admin(KafkaOffsetReaderAdmin.scala:70)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.$anonfun$partitionsAssignedToAdmin$1(KafkaOffsetReaderAdmin.scala:499)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.withRetries(KafkaOffsetReaderAdmin.scala:518)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.partitionsAssigne

In [48]:
# Streaming write: persist the key/value records of `df_stream` as JSON files
# on the local filesystem (these are local paths, not DBFS).
json_query = (
    df_stream
    .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
    .writeStream
    .format("json")
    # Keep the data files and the checkpoint in separate directories of their
    # own. Previously both pointed at the user's Desktop, which was also the
    # checkpoint of the Kafka sink query, so the queries' recovery state and
    # the output files were all mixed together.
    .option("path", "stream_output/kafka_json")
    .option("checkpointLocation", "checkpoints/kafka_to_json")
    .start()
)

# Echo the StreamingQuery handle; call json_query.stop() to end the query.
json_query


24/03/31 02:07:39 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x14e10c610>

24/03/31 02:07:39 WARN KafkaOffsetReaderAdmin: Error in attempt 1 getting Kafka offsets: 
org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:551)
	at org.apache.kafka.clients.admin.Admin.create(Admin.java:144)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.createAdmin(ConsumerStrategy.scala:50)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.createAdmin$(ConsumerStrategy.scala:47)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.createAdmin(ConsumerStrategy.scala:102)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.admin(KafkaOffsetReaderAdmin.scala:70)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.$anonfun$partitionsAssignedToAdmin$1(KafkaOffsetReaderAdmin.scala:499)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.withRetries(KafkaOffsetReaderAdmin.scala:518)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.partitionsAssigne

In [47]:
# Streaming write (re-run of the Kafka-to-Kafka pipeline from section 3.2):
# forward every key/value record arriving on `df_stream` to the
# "databricks_test" topic. `ds` is the StreamingQuery handle returned by start().
ds = (
    df_stream
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    # Use the standard login-module class name. The "kafkashaded." prefix only
    # resolves on the Databricks runtime, not on a local Spark install.
    # The username/password here are placeholders.
    .option(
        "kafka.sasl.jaas.config",
        'org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="pass";',
    )
    .option("topic", "databricks_test")
    # Checkpoint directory dedicated to this Kafka-to-Kafka pipeline, instead of
    # a general-purpose folder (previously the user's Desktop, which the JSON
    # query also used as its checkpoint).
    .option("checkpointLocation", "checkpoints/kafka_to_kafka")
    .start()
)


24/03/31 02:05:50 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/03/31 02:05:50 WARN KafkaOffsetReaderAdmin: Error in attempt 1 getting Kafka offsets: 
org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:551)
	at org.apache.kafka.clients.admin.Admin.create(Admin.java:144)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.createAdmin(ConsumerStrategy.scala:50)
	at org.apache.spark.sql.kafka010.ConsumerStrategy.createAdmin$(ConsumerStrategy.scala:47)
	at org.apache.spark.sql.kafka010.SubscribeStrategy.createAdmin(ConsumerStrategy.scala:102)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.admin(KafkaOffsetReaderAdmin.scala:70)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.$anonfun$partitionsAssignedToAdmin$1(KafkaOffsetReaderAdmin.scala:499)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.withRetries(KafkaOffsetReaderAdmin.scala:518)
	at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.partitionsAssigne