In [1]:
# thread-safe print
from threading import Thread, Lock

lock = Lock()

def Print(*args, **kwargs):
    """Thread-safe print(): serialize output from concurrent threads.

    Holds the module-level `lock` for the whole print() call, so concurrent
    Print() calls (e.g. from the producer/consumer threads) do not interleave
    their output.

    Args:
        *args: positional values forwarded to print().
        **kwargs: keyword options forwarded to print() (sep, end, file, flush);
            the original signature could not accept these at all.
    """
    with lock:
        print(*args, **kwargs)

Print("hi")

hi


In [2]:
# topic creation
from kafka import KafkaAdminClient, KafkaProducer
from kafka.admin import NewTopic
import os

# Kafka bootstrap address shared by the admin client, producer and consumers.
# Override with the KAFKA_BROKER environment variable; defaults to the
# original hard-coded "kafka:9092".
broker = os.environ.get("KAFKA_BROKER", "kafka:9092")
admin = KafkaAdminClient(bootstrap_servers=[broker])

In [3]:
# Create the two topics with 4 partitions each (the consumers below assign
# partitions 0-3) only if they don't exist yet. Checking first keeps this cell
# safe to re-run, since create_topics errors on an already-existing topic.
# Replication factor 1 presumably matches a single-broker dev setup — confirm.
existing_topics = set(admin.list_topics())
missing_topics = [NewTopic(name, num_partitions=4, replication_factor=1)
                  for name in ("animals", "animals-json")
                  if name not in existing_topics]
if missing_topics:
    admin.create_topics(missing_topics)

# Producer

In [4]:
import string, random, threading, json, time
# The producer's beach IDs: the first ten uppercase letters
string.ascii_uppercase[:10]

'ABCDEFGHIJ'

In [5]:
from animals_pb2 import *

In [6]:
# producer
def animal_gen(stop_event=None, delay=1):
    """Publish one random (beach, animal) sighting every `delay` seconds.

    Each sighting is sent twice with the same key (the beach letter, UTF-8
    encoded): protobuf-encoded (`Sighting`) to the "animals" topic and
    JSON-encoded to the "animals-json" topic.

    Args:
        stop_event: optional threading.Event. When it is set, the loop exits
            and the producer is closed, flushing pending sends. None (the
            default) loops forever, as before.
        delay: seconds to sleep between sightings (default 1, as before).
    """
    Print("generating animals")
    producer = KafkaProducer(bootstrap_servers=[broker])
    # Loop-invariant choices, hoisted out of the loop. random.choice accepts
    # any sequence, so the per-iteration list(...) copy was unnecessary.
    beaches = string.ascii_uppercase[:10]  # 'ABCDEFGHIJ'
    species = ("dolphin", "shark", "seagull")
    try:
        while stop_event is None or not stop_event.is_set():
            beach = random.choice(beaches)
            animal = random.choice(species)
            key = beach.encode("utf-8")

            # protobuf
            sighting = Sighting(beach=beach, animal=animal)
            producer.send("animals", value=sighting.SerializeToString(), key=key)

            # JSON
            payload = json.dumps({"beach": beach, "animal": animal})
            producer.send("animals-json", value=payload.encode("utf-8"), key=key)

            time.sleep(delay)
    finally:
        # Release the producer's connections; the original never closed it.
        producer.close()

# Background producer thread. daemon=True so it never keeps the kernel process
# alive; call stop_producer.set() to stop it cleanly.
stop_producer = threading.Event()
threading.Thread(target=animal_gen, args=(stop_producer,), daemon=True).start()

generating animals


# Python Consumer

## Streaming Group By on Beach

In [7]:
from kafka import KafkaConsumer, TopicPartition

# these are TOTAL counts: the producer keys every message by its beach, so
# Kafka's key-hash partitioning puts all of a beach's sightings in one partition
def beach_counter(partitions=None, rounds=10, timeout_ms=1000):
    """Count sightings per beach on the given "animals" partitions, from offset 0.

    Prints `partitions` and the running counts after every poll round.

    Args:
        partitions: partition numbers of the "animals" topic to read. None or
            empty (the old default `[]`, which assigned nothing and then failed
            in seek_to_beginning) means every partition of the topic.
        rounds: number of poll rounds before returning (originally fixed at 10).
        timeout_ms: poll timeout per round in milliseconds (originally 1000).

    Returns:
        dict mapping beach -> count (previously None; threads ignore it).

    Raises:
        ValueError: if no partitions were given and none are known for "animals".
    """
    consumer = KafkaConsumer(bootstrap_servers=[broker])
    try:
        if not partitions:
            partitions = sorted(consumer.partitions_for_topic("animals") or ())
            if not partitions:
                raise ValueError('topic "animals" has no partitions (does it exist?)')
        topic_partitions = [TopicPartition("animals", p) for p in partitions]
        consumer.assign(topic_partitions)
        consumer.seek_to_beginning(*topic_partitions)

        counts = {}
        # TODO: `while True:` for a never-ending stream; bounded so the demo ends
        for _ in range(rounds):
            batch = consumer.poll(timeout_ms)
            for messages in batch.values():
                for msg in messages:
                    beach = Sighting.FromString(msg.value).beach
                    counts[beach] = counts.get(beach, 0) + 1
            Print(partitions, counts)
        return counts
    finally:
        # Release the consumer's broker connections; the original never closed it.
        consumer.close()

# beach_counter()  # one consumer over every partition
threading.Thread(target=beach_counter, args=([0, 1],)).start()
threading.Thread(target=beach_counter, args=([2, 3],)).start()

## Streaming Group By on Animal

In [8]:
from kafka import KafkaConsumer, TopicPartition

# these are PARTIAL counts: messages are keyed by beach, so each animal's
# sightings are spread over all partitions; sum the consumers' results for totals
def animal_counter(partitions=None, rounds=10, timeout_ms=1000):
    """Count sightings per animal on the given "animals" partitions, from offset 0.

    Prints `partitions` and the running counts after every poll round.

    Args:
        partitions: partition numbers of the "animals" topic to read. None or
            empty (the old default `[]`, which assigned nothing and then failed
            in seek_to_beginning) means every partition of the topic.
        rounds: number of poll rounds before returning (originally fixed at 10).
        timeout_ms: poll timeout per round in milliseconds (originally 1000).

    Returns:
        dict mapping animal -> count (previously None; threads ignore it).

    Raises:
        ValueError: if no partitions were given and none are known for "animals".
    """
    consumer = KafkaConsumer(bootstrap_servers=[broker])
    try:
        if not partitions:
            partitions = sorted(consumer.partitions_for_topic("animals") or ())
            if not partitions:
                raise ValueError('topic "animals" has no partitions (does it exist?)')
        topic_partitions = [TopicPartition("animals", p) for p in partitions]
        consumer.assign(topic_partitions)
        consumer.seek_to_beginning(*topic_partitions)

        counts = {}
        # TODO: `while True:` for a never-ending stream; bounded so the demo ends
        for _ in range(rounds):
            batch = consumer.poll(timeout_ms)
            for messages in batch.values():
                for msg in messages:
                    animal = Sighting.FromString(msg.value).animal
                    counts[animal] = counts.get(animal, 0) + 1
            Print(partitions, counts)
        return counts
    finally:
        # Release the consumer's broker connections; the original never closed it.
        consumer.close()

# animal_counter()  # one consumer over every partition
threading.Thread(target=animal_counter, args=([0, 1],)).start()
threading.Thread(target=animal_counter, args=([2, 3],)).start()

# Spark Consumer

In [9]:
from pyspark.sql import SparkSession
# One SparkSession for the notebook: pulls in the Kafka connector package
# (spark-sql-kafka-0-10, Scala 2.12, version 3.2.2) and uses 10 shuffle partitions.
spark = (
    SparkSession.builder
    .appName("cs544")
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2")
    .config("spark.sql.shuffle.partitions", 10)
    .getOrCreate()
)

[0, 1] {'B': 168, 'I': 192, 'C': 140}
[2, 3] {'seagull': 165, 'dolphin': 169, 'shark': 166}
[0, 1] {'seagull': 167, 'shark': 191, 'dolphin': 142}
[0, 1] {'seagull': 324, 'shark': 366, 'dolphin': 299}
[2, 3] {'G': 211, 'H': 186, 'J': 25, 'F': 38, 'A': 40}
[2, 3] {'G': 211, 'H': 186, 'J': 188, 'F': 190, 'A': 225}
[2, 3] {'G': 211, 'H': 186, 'J': 193, 'F': 195, 'A': 235}
[0, 1] {'B': 190, 'I': 226, 'C': 163, 'E': 211, 'D': 199}
[2, 3] {'seagull': 322, 'dolphin': 350, 'shark': 328}
[2, 3] {'seagull': 328, 'dolphin': 363, 'shark': 329}
[2, 3] {'G': 212, 'H': 186, 'J': 193, 'F': 195, 'A': 235}
[2, 3] {'seagull': 328, 'dolphin': 364, 'shark': 329}
[0, 1] {'seagull': 324, 'shark': 366, 'dolphin': 299}
[0, 1] {'B': 190, 'I': 226, 'C': 163, 'E': 211, 'D': 199}
:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5f85f5e3-b203-41c7-8cff-f5fe90d7deca;1.0
	confs: [default]


[2, 3] {'G': 212, 'H': 187, 'J': 193, 'F': 195, 'A': 235}
[2, 3] {'seagull': 328, 'dolphin': 365, 'shark': 329}
[0, 1] {'seagull': 324, 'shark': 366, 'dolphin': 299}
[0, 1] {'B': 190, 'I': 226, 'C': 163, 'E': 211, 'D': 199}


	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.2 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 1047ms :: artifacts dl 33ms
	:: modules in use:
	com.google.code.findbugs#jsr305;3.0.0 from central in [default]
	commons-logging#commons-logging;1.1.3 from central in [default]
	org.apache.commons#commons-pool2;

[0, 1] {'B': 190, 'I': 226, 'C': 164, 'E': 211, 'D': 199}
[0, 1] {'seagull': 324, 'shark': 366, 'dolphin': 300}
[2, 3] {'G': 212, 'H': 187, 'J': 193, 'F': 195, 'A': 235}
[2, 3] {'seagull': 328, 'dolphin': 365, 'shark': 329}


23/04/17 15:31:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


[0, 1] {'seagull': 324, 'shark': 366, 'dolphin': 301}
[0, 1] {'B': 190, 'I': 226, 'C': 165, 'E': 211, 'D': 199}
[2, 3] {'seagull': 328, 'dolphin': 365, 'shark': 329}
[2, 3] {'G': 212, 'H': 187, 'J': 193, 'F': 195, 'A': 235}
[2, 3] {'seagull': 328, 'dolphin': 365, 'shark': 330}
[0, 1] {'B': 190, 'I': 226, 'C': 165, 'E': 211, 'D': 199}
[0, 1] {'seagull': 324, 'shark': 366, 'dolphin': 301}
[2, 3] {'G': 212, 'H': 187, 'J': 194, 'F': 195, 'A': 235}
[0, 1] {'seagull': 324, 'shark': 366, 'dolphin': 302}
[0, 1] {'B': 191, 'I': 226, 'C': 165, 'E': 211, 'D': 199}
[2, 3] {'seagull': 328, 'dolphin': 365, 'shark': 330}
[2, 3] {'G': 212, 'H': 187, 'J': 194, 'F': 195, 'A': 235}
[0, 1] {'seagull': 324, 'shark': 366, 'dolphin': 303}
[2, 3] {'seagull': 328, 'dolphin': 365, 'shark': 330}
[0, 1] {'B': 192, 'I': 226, 'C': 165, 'E': 211, 'D': 199}
[2, 3] {'G': 212, 'H': 187, 'J': 194, 'F': 195, 'A': 235}
[0, 1] {'B': 192, 'I': 227, 'C': 165, 'E': 211, 'D': 199}
[0, 1] {'seagull': 325, 'shark': 366, 'dolphin

# Regular (Batch) Read

In [10]:
# Batch (non-streaming) read of everything currently in "animals-json".
# Reuses `broker` instead of repeating "kafka:9092", so every client in the
# notebook points at the same broker.
df = (spark.read.format("kafka")
      .option("kafka.bootstrap.servers", broker)
      .option("subscribe", "animals-json")
      .load())

In [11]:
# False: spark.read builds a static (batch) DataFrame
df.isStreaming

False

In [12]:
# Records in the topic at read time (a one-off snapshot; the producer keeps adding more)
df.count()

                                                                                

1181

In [13]:
# Kafka source schema: key/value arrive as raw binary, plus topic/partition/offset/timestamp metadata
df.dtypes

[('key', 'binary'),
 ('value', 'binary'),
 ('topic', 'string'),
 ('partition', 'int'),
 ('offset', 'bigint'),
 ('timestamp', 'timestamp'),
 ('timestampType', 'int')]

In [14]:
# key/value display as hex bytes; they must be cast/decoded before they are readable
df.show()

+----+--------------------+------------+---------+------+--------------------+-------------+
| key|               value|       topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------------+---------+------+--------------------+-------------+
|[43]|[7B 22 62 65 61 6...|animals-json|        0|     0|2023-04-17 15:11:...|            0|
|[42]|[7B 22 62 65 61 6...|animals-json|        0|     1|2023-04-17 15:11:...|            0|
|[43]|[7B 22 62 65 61 6...|animals-json|        0|     2|2023-04-17 15:11:...|            0|
|[43]|[7B 22 62 65 61 6...|animals-json|        0|     3|2023-04-17 15:11:...|            0|
|[43]|[7B 22 62 65 61 6...|animals-json|        0|     4|2023-04-17 15:12:...|            0|
|[43]|[7B 22 62 65 61 6...|animals-json|        0|     5|2023-04-17 15:12:...|            0|
|[49]|[7B 22 62 65 61 6...|animals-json|        0|     6|2023-04-17 15:12:...|            0|
|[49]|[7B 22 62 65 61 6...|animals-json|        0|     7|2023-04-17 15

In [15]:
from pyspark.sql.functions import col, from_json

In [16]:
# Cast the binary key/value columns to strings so the JSON payload is readable.
df.select(*(col(name).cast("string") for name in ("key", "value"))).show()

+---+--------------------+
|key|               value|
+---+--------------------+
|  C|{"beach": "C", "a...|
|  B|{"beach": "B", "a...|
|  C|{"beach": "C", "a...|
|  C|{"beach": "C", "a...|
|  C|{"beach": "C", "a...|
|  C|{"beach": "C", "a...|
|  I|{"beach": "I", "a...|
|  I|{"beach": "I", "a...|
|  I|{"beach": "I", "a...|
|  I|{"beach": "I", "a...|
|  I|{"beach": "I", "a...|
|  I|{"beach": "I", "a...|
|  C|{"beach": "C", "a...|
|  B|{"beach": "B", "a...|
|  C|{"beach": "C", "a...|
|  I|{"beach": "I", "a...|
|  B|{"beach": "B", "a...|
|  C|{"beach": "C", "a...|
|  C|{"beach": "C", "a...|
|  C|{"beach": "C", "a...|
+---+--------------------+
only showing top 20 rows



In [17]:
# from_json with a DDL-format schema string turns the JSON text into a struct column.
schema = "beach string, animal string"
df.select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), schema),
).show()

+---+--------------------------------+
|key|from_json(CAST(value AS STRING))|
+---+--------------------------------+
|  C|                    {C, dolphin}|
|  B|                    {B, dolphin}|
|  C|                      {C, shark}|
|  C|                    {C, dolphin}|
|  C|                    {C, seagull}|
|  C|                      {C, shark}|
|  I|                      {I, shark}|
|  I|                    {I, dolphin}|
|  I|                    {I, dolphin}|
|  I|                    {I, seagull}|
|  I|                    {I, seagull}|
|  I|                      {I, shark}|
|  C|                      {C, shark}|
|  B|                    {B, dolphin}|
|  C|                      {C, shark}|
|  I|                    {I, dolphin}|
|  B|                    {B, seagull}|
|  C|                    {C, seagull}|
|  C|                      {C, shark}|
|  C|                    {C, seagull}|
+---+--------------------------------+
only showing top 20 rows



                                                                                

In [18]:
# Parse the JSON payload into a struct aliased "value", then flatten its
# fields into top-level beach/animal columns next to the key.
schema = "beach string, animal string"
(
    df.select(
        col("key").cast("string"),
        from_json(col("value").cast("string"), schema).alias("value"),
    )
    .select("key", "value.*")
    .show()
)

+---+-----+-------+
|key|beach| animal|
+---+-----+-------+
|  C|    C|dolphin|
|  B|    B|dolphin|
|  C|    C|  shark|
|  C|    C|dolphin|
|  C|    C|seagull|
|  C|    C|  shark|
|  I|    I|  shark|
|  I|    I|dolphin|
|  I|    I|dolphin|
|  I|    I|seagull|
|  I|    I|seagull|
|  I|    I|  shark|
|  C|    C|  shark|
|  B|    B|dolphin|
|  C|    C|  shark|
|  I|    I|dolphin|
|  B|    B|seagull|
|  C|    C|seagull|
|  C|    C|  shark|
|  C|    C|seagull|
+---+-----+-------+
only showing top 20 rows



                                                                                

# Streaming Read and Write (readStream / writeStream)

In [30]:
# read=>readStream
df = (spark.readStream.format("kafka")
 .option("kafka.bootstrap.servers", "kafka:9092")
 .option("subscribe", "animals-json")
 .option("startingOffsets", "earliest").load())

In [31]:
# True: readStream builds a streaming DataFrame; its data is consumed via writeStream queries (below)
df.isStreaming

True

In [32]:
# Structured Streaming pattern: source => transformations => sink, i.e.
# query = spark.readStream...(transformations)...writeStream...start()

In [33]:
# Same parsing as the batch In [18] cell, applied to the streaming DataFrame;
# the result is itself streaming, and nothing runs until a writeStream query starts.
schema = "beach string, animal string"
animals = (
    df.select(
        col("key").cast("string"),
        from_json(col("value").cast("string"), schema).alias("value"),
    )
    .select("key", "value.*")
)

In [34]:
# Displaying a streaming DataFrame shows only its schema, not data
animals

DataFrame[key: string, beach: string, animal: string]

In [35]:
# True: transformations of a streaming DataFrame are themselves streaming
animals.isStreaming

True

In [36]:
# Stop any streaming queries still running (e.g. left over from an earlier run
# of this notebook) before starting new ones. Safe when none are active, unlike
# spark.streams.active[0].stop(), which raises IndexError on an empty list.
for active_query in spark.streams.active:
    active_query.stop()

In [37]:
# Print shark sightings to the console every 5 seconds. In "append" mode each
# micro-batch emits only the rows that arrived since the previous one.
q = (
    animals
    .where("animal='shark'")
    .writeStream
    .format("console")
    .trigger(processingTime="5 seconds")
    .outputMode("append")
    .start()
)

23/04/17 15:33:00 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4aacd6cc-5573-4d07-928e-dc641f5164d2. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/17 15:33:00 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+------+
|key|beach|animal|
+---+-----+------+
|  H|    H| shark|
|  G|    G| shark|
|  G|    G| shark|
|  G|    G| shark|
|  H|    H| shark|
|  H|    H| shark|
|  H|    H| shark|
|  G|    G| shark|
|  G|    G| shark|
|  H|    H| shark|
|  G|    G| shark|
|  G|    G| shark|
|  G|    G| shark|
|  H|    H| shark|
|  G|    G| shark|
|  H|    H| shark|
|  H|    H| shark|
|  G|    G| shark|
|  H|    H| shark|
|  H|    H| shark|
+---+-----+------+
only showing top 20 rows



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+---+-----+------+
|key|beach|animal|
+---+-----+------+
|  D|    D| shark|
+---+-----+------+



In [38]:
# Stop the shark query before `q` is reassigned to the aggregation query
q.stop()

In [39]:
# Running count per animal over the whole stream. "complete" mode reprints the
# full aggregate table on every 5-second micro-batch.
q = (
    animals
    .groupBy("animal")
    .count()
    .writeStream
    .format("console")
    .trigger(processingTime="5 seconds")
    .outputMode("complete")
    .start()
)

23/04/17 15:33:08 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a3e6a4c1-21d3-47b1-a19b-48b61480c0c0. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/17 15:33:08 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  433|
|dolphin|  424|
|seagull|  415|
+-------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  435|
|seagull|  415|
|dolphin|  424|
+-------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  435|
|seagull|  415|
|dolphin|  425|
+-------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  436|
|seagull|  418|
|dolphin|  426|
+-------+-----+



                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  438|
|seagull|  420|
|dolphin|  427|
+-------+-----+



                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  439|
|seagull|  421|
|dolphin|  430|
+-------+-----+





-------------------------------------------
Batch: 6
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  440|
|seagull|  424|
|dolphin|  431|
+-------+-----+





-------------------------------------------
Batch: 7
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  442|
|seagull|  427|
|dolphin|  431|
+-------+-----+



                                                                                

-------------------------------------------
Batch: 8
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  444|
|seagull|  429|
|dolphin|  432|
+-------+-----+



                                                                                

-------------------------------------------
Batch: 9
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  446|
|seagull|  432|
|dolphin|  432|
+-------+-----+



                                                                                

-------------------------------------------
Batch: 10
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  449|
|seagull|  434|
|dolphin|  432|
+-------+-----+



                                                                                

-------------------------------------------
Batch: 11
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  450|
|seagull|  436|
|dolphin|  434|
+-------+-----+



                                                                                

-------------------------------------------
Batch: 12
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  452|
|seagull|  438|
|dolphin|  435|
+-------+-----+



                                                                                

-------------------------------------------
Batch: 13
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  454|
|seagull|  440|
|dolphin|  436|
+-------+-----+



                                                                                

-------------------------------------------
Batch: 14
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  457|
|seagull|  441|
|dolphin|  437|
+-------+-----+



In [42]:
# Let the query keep running for up to 20 more seconds (awaitTermination's
# timeout is in seconds), then stop it.
q.awaitTermination(20)
q.stop()

                                                                                

-------------------------------------------
Batch: 21
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  472|
|seagull|  448|
|dolphin|  450|
+-------+-----+





-------------------------------------------
Batch: 22
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  476|
|seagull|  448|
|dolphin|  451|
+-------+-----+



                                                                                

-------------------------------------------
Batch: 23
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  478|
|seagull|  449|
|dolphin|  453|
+-------+-----+





-------------------------------------------
Batch: 24
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|  shark|  481|
|seagull|  450|
|dolphin|  454|
+-------+-----+



                                                                                