In [2]:
# ! python3 -m grpc_tools.protoc -I=. --python_out=. animals.proto

In [3]:
from animals_pb2 import Sighting

In [4]:
s = Sighting(animal="shark", beach="A")
s

beach: "A"
animal: "shark"

In [5]:
s.SerializeToString()

b'\n\x01A\x12\x05shark'

In [6]:
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer

In [7]:
broker = "localhost:9092"

In [8]:
admin = KafkaAdminClient(bootstrap_servers=[broker])

In [9]:
from kafka.admin import NewTopic

In [10]:
admin.create_topics([NewTopic("animals", 4, 1)])   # protobufs
admin.create_topics([NewTopic("animals-json", 4, 1)])   # JSON

CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='animals-json', error_code=0, error_message=None)])

In [13]:
import random, time, threading

def animal_gen():
    producer = KafkaProducer(bootstrap_servers=[broker])

    while True:
        beach = random.choice(list("ABCDEFGHI"))
        animal = random.choice(["shark", "dolphin", "turtle", "seagull"])
        s = Sighting(animal=animal, beach=beach)
        producer.send("animals", value=s.SerializeToString(), key=bytes(beach, "utf-8"))
        time.sleep(1)

threading.Thread(target=animal_gen).start()

# Streaming Group By (count occurences per beach)

In [22]:
from threading import Thread, Lock

lock = Lock()
def Print(*args):
    with lock:
        print(*args)

Print("hi")

hi


In [14]:
from kafka import TopicPartition

In [24]:
def beach_consumer(partitions=[]):
    counts = {}   # key=beach, value=count
    
    consumer = KafkaConsumer(bootstrap_servers=[broker])
    consumer.assign([TopicPartition("animals", p) for p in partitions])
    consumer.seek_to_beginning()
    for i in range(10):      # TODO: loop forever
        batch = consumer.poll(1000)
        for tp, messages in batch.items():
            for msg in messages:
                s = Sighting.FromString(msg.value)

                if not s.beach in counts:
                    counts[s.beach] = 0
                counts[s.beach] += 1
        Print(partitions, counts)
threading.Thread(target=beach_consumer, args=([0,1],)).start()
threading.Thread(target=beach_consumer, args=([2,3],)).start()

[0, 1] {'I': 55, 'B': 44, 'C': 59, 'E': 55, 'D': 58}
[2, 3] {'H': 59, 'G': 65, 'A': 53, 'F': 54}
[2, 3] {'H': 59, 'G': 65, 'A': 54, 'F': 54}
[0, 1] {'I': 55, 'B': 44, 'C': 59, 'E': 55, 'D': 58}
[2, 3] {'H': 59, 'G': 65, 'A': 55, 'F': 54}
[0, 1] {'I': 55, 'B': 44, 'C': 59, 'E': 55, 'D': 58}
[0, 1] {'I': 55, 'B': 44, 'C': 59, 'E': 55, 'D': 59}
[2, 3] {'H': 59, 'G': 65, 'A': 55, 'F': 54}
[2, 3] {'H': 60, 'G': 65, 'A': 55, 'F': 54}
[0, 1] {'I': 55, 'B': 44, 'C': 59, 'E': 55, 'D': 59}
[0, 1] {'I': 55, 'B': 44, 'C': 60, 'E': 55, 'D': 59}
[2, 3] {'H': 60, 'G': 65, 'A': 55, 'F': 54}
[2, 3] {'H': 60, 'G': 66, 'A': 55, 'F': 54}
[0, 1] {'I': 55, 'B': 44, 'C': 60, 'E': 55, 'D': 59}
[2, 3] {'H': 60, 'G': 66, 'A': 55, 'F': 54}
[0, 1] {'I': 55, 'B': 44, 'C': 60, 'E': 55, 'D': 59}
[2, 3] {'H': 60, 'G': 66, 'A': 55, 'F': 55}
[2, 3] {'H': 60, 'G': 66, 'A': 56, 'F': 55}
[0, 1] {'I': 55, 'B': 44, 'C': 60, 'E': 55, 'D': 59}
[0, 1] {'I': 55, 'B': 44, 'C': 60, 'E': 55, 'D': 60}


In [25]:
def animal_consumer(partitions=[]):
    counts = {}   # key=animal, value=count
    
    consumer = KafkaConsumer(bootstrap_servers=[broker])
    consumer.assign([TopicPartition("animals", p) for p in partitions])
    consumer.seek_to_beginning()
    for i in range(10):      # TODO: loop forever
        batch = consumer.poll(1000)
        for tp, messages in batch.items():
            for msg in messages:
                s = Sighting.FromString(msg.value)

                if not s.animal in counts:
                    counts[s.animal] = 0
                counts[s.animal] += 1
        Print(partitions, counts)
threading.Thread(target=animal_consumer, args=([0,1],)).start()
threading.Thread(target=animal_consumer, args=([2,3],)).start()

[0, 1] {'shark': 92, 'dolphin': 76, 'turtle': 74, 'seagull': 73}
[2, 3] {'dolphin': 62, 'seagull': 59, 'turtle': 65, 'shark': 81}
[0, 1] {'shark': 92, 'dolphin': 76, 'turtle': 74, 'seagull': 74}
[2, 3] {'dolphin': 62, 'seagull': 59, 'turtle': 65, 'shark': 81}
[0, 1] {'shark': 92, 'dolphin': 77, 'turtle': 74, 'seagull': 74}
[2, 3] {'dolphin': 62, 'seagull': 59, 'turtle': 65, 'shark': 81}
[0, 1] {'shark': 93, 'dolphin': 77, 'turtle': 74, 'seagull': 74}
[2, 3] {'dolphin': 62, 'seagull': 59, 'turtle': 65, 'shark': 81}
[0, 1] {'shark': 93, 'dolphin': 77, 'turtle': 74, 'seagull': 74}
[0, 1] {'shark': 93, 'dolphin': 77, 'turtle': 74, 'seagull': 75}
[2, 3] {'dolphin': 62, 'seagull': 59, 'turtle': 65, 'shark': 81}
[2, 3] {'dolphin': 63, 'seagull': 59, 'turtle': 65, 'shark': 81}
[0, 1] {'shark': 93, 'dolphin': 77, 'turtle': 74, 'seagull': 75}
[2, 3] {'dolphin': 63, 'seagull': 60, 'turtle': 65, 'shark': 81}
[0, 1] {'shark': 93, 'dolphin': 77, 'turtle': 74, 'seagull': 75}
[0, 1] {'shark': 94, 'dol

# Spark Streaming

In [26]:
import random, time, threading, json

def animal_gen_json():
    producer = KafkaProducer(bootstrap_servers=[broker])

    while True:
        beach = random.choice(list("ABCDEFGHI"))
        animal = random.choice(["shark", "dolphin", "turtle", "seagull"])

        value = bytes(json.dumps({"beach": beach, "animal": animal}), "utf-8")
        producer.send("animals-json", value=value, key=bytes(beach, "utf-8"))
        
        time.sleep(1)

threading.Thread(target=animal_gen_json).start()

In [73]:
# Spark session (with Kafka jar)
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("demo")
         .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0')
         .config("spark.sql.shuffle.partitions", 10)
         .getOrCreate())

In [28]:
df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", broker)
    .option("subscribe", "animals-json")
    .load()
)

In [30]:
df.dtypes

[('key', 'binary'),
 ('value', 'binary'),
 ('topic', 'string'),
 ('partition', 'int'),
 ('offset', 'bigint'),
 ('timestamp', 'timestamp'),
 ('timestampType', 'int')]

In [31]:
df.limit(5).toPandas()

23/11/20 19:54:02 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):


Unnamed: 0,key,value,topic,partition,offset,timestamp,timestampType
0,[67],"[123, 34, 98, 101, 97, 99, 104, 34, 58, 32, 34...",animals-json,0,0,2023-11-20 19:50:38.664,0
1,[73],"[123, 34, 98, 101, 97, 99, 104, 34, 58, 32, 34...",animals-json,0,1,2023-11-20 19:50:40.667,0
2,[73],"[123, 34, 98, 101, 97, 99, 104, 34, 58, 32, 34...",animals-json,0,2,2023-11-20 19:50:43.672,0
3,[67],"[123, 34, 98, 101, 97, 99, 104, 34, 58, 32, 34...",animals-json,0,3,2023-11-20 19:50:46.677,0
4,[66],"[123, 34, 98, 101, 97, 99, 104, 34, 58, 32, 34...",animals-json,0,4,2023-11-20 19:50:47.678,0


In [32]:
from pyspark.sql.functions import col, expr, from_json

In [42]:
schema = "beach string, animal string"

animals = (
    df
    .select(col("key").cast("string"), col("value").cast("string"))
    .select("key", from_json("value", schema).alias("value"))
    .select("key", "value.*")
)
animals

DataFrame[key: string, beach: string, animal: string]

In [43]:
animals.limit(5).toPandas()

23/11/20 19:58:09 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

Unnamed: 0,key,beach,animal
0,C,C,dolphin
1,I,I,seagull
2,I,I,dolphin
3,C,C,dolphin
4,B,B,seagull


In [46]:
animals.count()

23/11/20 19:58:31 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

474

In [47]:
animals.isStreaming

False

# Streaming DataFrame

In [None]:
# source => transformations => sink
# streaming_query = spark.readStream(????).????.writeStream(????)

In [48]:
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", broker)
    .option("subscribe", "animals-json")
    .load()
)

In [49]:
df.isStreaming

True

In [50]:
schema = "beach string, animal string"

animals = (
    df
    .select(col("key").cast("string"), col("value").cast("string"))
    .select("key", from_json("value", schema).alias("value"))
    .select("key", "value.*")
)
animals

DataFrame[key: string, beach: string, animal: string]

In [52]:
# not supported for streaming
# animals.toPandas()

# Shark Alert App

In [62]:
streaming_query = (
    animals
    .filter("animal='shark'")
    .writeStream
    .format("console")
    .trigger(processingTime="5 seconds")
    .outputMode("append")
).start()
type(streaming_query)

23/11/20 20:04:05 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b4b4a357-7aca-4a59-bd79-9aa6c8006b92. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/11/20 20:04:05 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


pyspark.sql.streaming.query.StreamingQuery

23/11/20 20:04:05 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----+------+
|key|beach|animal|
+---+-----+------+
+---+-----+------+



In [64]:
streaming_query.stop()
# spark.streams.active[0].stop()

# Animal Counter App

In [71]:
q = (
    animals.groupby("animal").count()
    .writeStream
    .format("console")
    .trigger(processingTime="5 seconds")
    .outputMode("complete")
).start()

23/11/20 20:07:12 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-7315fa79-c5ed-4c79-b792-7fcf00174494. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/11/20 20:07:12 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/11/20 20:07:13 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
|animal|count|
+------+-----+
+------+-----+



23/11/20 20:07:38 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 24977 milliseconds

-------------------------------------------
Batch: 1
-------------------------------------------


                                                                                

+-------+-----+
| animal|count|
+-------+-----+
|seagull|    9|
|  shark|    5|
| turtle|    4|
|dolphin|    6|
+-------+-----+



23/11/20 20:07:59 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 21919 milliseconds
                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+-------+-----+
| animal|count|
+-------+-----+
|seagull|   14|
|  shark|   13|
| turtle|    8|
|dolphin|   11|
+-------+-----+



23/11/20 20:08:19 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 19445 milliseconds