In [1]:
! python3 -m grpc_tools.protoc -I=. --python_out=. animals.proto

In [1]:
from animals_pb2 import Sighting

In [2]:
# Build a Sighting message from the generated animals_pb2 module; the bare
# `s` on the last line makes the notebook display its text form.
s = Sighting(animal="shark", beach="A")
s

beach: "A"
animal: "shark"

In [3]:
s.SerializeToString()

b'\n\x01A\x12\x05shark'

In [4]:
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer

In [5]:
# Kafka bootstrap address shared by every client created in this notebook.
broker = "localhost:9092"

In [6]:
# Admin client; used further down to create the demo topics.
admin = KafkaAdminClient(bootstrap_servers=[broker])

In [7]:
from kafka.admin import NewTopic

In [8]:
from kafka.errors import TopicAlreadyExistsError

In [9]:
# Create both demo topics (4 partitions, replication factor 1). Re-running
# this cell is safe: a topic that already exists is simply left in place.
for topic_name in (
    "animals",        # protobufs
    "animals-json",   # JSON
):
    try:
        admin.create_topics([NewTopic(topic_name, 4, 1)])
    except TopicAlreadyExistsError:
        continue

In [10]:
import random, time, threading

def animal_gen(stop_event=None):
    """Publish one random protobuf-encoded Sighting per second to "animals".

    Every message is keyed by its beach letter (UTF-8 bytes) and carries the
    serialized ``Sighting`` as its value.

    Args:
        stop_event: optional object exposing ``is_set()`` and
            ``wait(timeout)`` (for example a ``threading.Event``). When it is
            set, the loop ends and the producer is closed. When omitted the
            generator runs until an error occurs, exactly as before.
    """
    producer = KafkaProducer(bootstrap_servers=[broker])

    try:
        while stop_event is None or not stop_event.is_set():
            beach = random.choice(list("ABCDEFGHI"))
            animal = random.choice(["shark", "dolphin", "turtle", "seagull"])
            s = Sighting(animal=animal, beach=beach)
            producer.send("animals", value=s.SerializeToString(), key=bytes(beach, "utf-8"))
            if stop_event is None:
                time.sleep(1)
            else:
                # Same one-second pacing, but wakes up early on a stop request.
                stop_event.wait(1)
    finally:
        # Release the broker connection even if send() raises.
        producer.close()

# Daemon thread: the generator loops forever, so a non-daemon thread would
# keep the interpreter from exiting when the kernel shuts down.
threading.Thread(target=animal_gen, daemon=True).start()

# Streaming Group By (count occurrences per beach)

In [11]:
from threading import Thread, Lock

lock = Lock()
def Print(*args, **kwargs):
    """Thread-safe wrapper around the built-in ``print``.

    The module-level ``lock`` is held for the duration of the call so that
    lines written by different threads do not interleave. Positional and
    keyword arguments (``sep``, ``end``, ``file``, ``flush``) are passed
    through to ``print`` unchanged.
    """
    with lock:
        print(*args, **kwargs)

Print("hi")

hi


In [12]:
from kafka import TopicPartition

In [13]:
def beach_consumer(partitions=None):
    """Count sightings per beach on the given partitions of "animals".

    Assigns the listed partitions manually, rewinds to the beginning, then
    polls ten times (up to 1000 ms each), decoding every message as a
    ``Sighting`` and printing the running totals after each poll.

    Args:
        partitions: partition numbers of the "animals" topic to read.
            Defaults to no partitions.
    """
    if partitions is None:
        # Avoid a shared mutable default; behaves like the former `[]`.
        partitions = []
    counts = {}   # key=beach, value=count

    consumer = KafkaConsumer(bootstrap_servers=[broker])
    try:
        consumer.assign([TopicPartition("animals", p) for p in partitions])
        consumer.seek_to_beginning()
        for _ in range(10):      # TODO: loop forever
            batch = consumer.poll(1000)
            for messages in batch.values():
                for msg in messages:
                    s = Sighting.FromString(msg.value)
                    counts[s.beach] = counts.get(s.beach, 0) + 1
            Print(partitions, counts)
    finally:
        # Release the broker connection once the thread is done (or fails).
        consumer.close()
# One consumer thread per pair of partitions.
for assigned in ([0, 1], [2, 3]):
    threading.Thread(target=beach_consumer, args=(assigned,)).start()

In [14]:
# beach_consumer([0,1])

In [15]:
def animal_consumer(partitions=None):
    """Count sightings per animal on the given partitions of "animals".

    Assigns the listed partitions manually, rewinds to the beginning, then
    polls ten times (up to 1000 ms each), decoding every message as a
    ``Sighting`` and printing the running totals after each poll.

    Args:
        partitions: partition numbers of the "animals" topic to read.
            Defaults to no partitions.
    """
    if partitions is None:
        # Avoid a shared mutable default; behaves like the former `[]`.
        partitions = []
    counts = {}   # key=animal, value=count

    consumer = KafkaConsumer(bootstrap_servers=[broker])
    try:
        consumer.assign([TopicPartition("animals", p) for p in partitions])
        consumer.seek_to_beginning()
        for _ in range(10):      # TODO: loop forever
            batch = consumer.poll(1000)
            for messages in batch.values():
                for msg in messages:
                    s = Sighting.FromString(msg.value)
                    counts[s.animal] = counts.get(s.animal, 0) + 1
            Print(partitions, counts)
    finally:
        # Release the broker connection once the thread is done (or fails).
        consumer.close()
# One consumer thread per pair of partitions.
for assigned in ([0, 1], [2, 3]):
    threading.Thread(target=animal_consumer, args=(assigned,)).start()

[2, 3] {'G': 119, 'H': 127, 'A': 142, 'F': 112}
[2, 3] {'G': 119, 'H': 127, 'A': 161, 'F': 124}
[0, 1] {'I': 126, 'B': 125, 'C': 115, 'D': 78, 'E': 56}
[0, 1] {'I': 126, 'B': 125, 'C': 115, 'D': 154, 'E': 118}
[0, 1] {'shark': 119, 'turtle': 124, 'dolphin': 132, 'seagull': 125}
[0, 1] {'shark': 153, 'turtle': 155, 'dolphin': 164, 'seagull': 166}
[2, 3] {'dolphin': 148, 'seagull': 104, 'turtle': 136, 'shark': 112}
[2, 3] {'dolphin': 155, 'seagull': 110, 'turtle': 148, 'shark': 118}
[0, 1] {'shark': 153, 'turtle': 156, 'dolphin': 164, 'seagull': 166}
[0, 1] {'I': 126, 'B': 126, 'C': 115, 'D': 154, 'E': 118}
[2, 3] {'G': 119, 'H': 127, 'A': 161, 'F': 124}
[2, 3] {'dolphin': 155, 'seagull': 110, 'turtle': 148, 'shark': 118}
[0, 1] {'shark': 153, 'turtle': 156, 'dolphin': 164, 'seagull': 166}
[0, 1] {'I': 127, 'B': 126, 'C': 115, 'D': 154, 'E': 118}
[0, 1] {'shark': 153, 'turtle': 157, 'dolphin': 164, 'seagull': 166}
[2, 3] {'G': 119, 'H': 127, 'A': 161, 'F': 124}
[2, 3] {'dolphin': 155, 's

# Spark Streaming

In [16]:
import random, time, threading, json

def animal_gen_json(stop_event=None):
    """Publish one random JSON-encoded sighting per second to "animals-json".

    Every message is keyed by its beach letter (UTF-8 bytes); the value is a
    UTF-8 JSON object with "beach" and "animal" fields.

    Args:
        stop_event: optional object exposing ``is_set()`` and
            ``wait(timeout)`` (for example a ``threading.Event``). When it is
            set, the loop ends and the producer is closed. When omitted the
            generator runs until an error occurs, exactly as before.
    """
    producer = KafkaProducer(bootstrap_servers=[broker])

    try:
        while stop_event is None or not stop_event.is_set():
            beach = random.choice(list("ABCDEFGHI"))
            animal = random.choice(["shark", "dolphin", "turtle", "seagull"])

            value = bytes(json.dumps({"beach": beach, "animal": animal}), "utf-8")
            producer.send("animals-json", value=value, key=bytes(beach, "utf-8"))

            if stop_event is None:
                time.sleep(1)
            else:
                # Same one-second pacing, but wakes up early on a stop request.
                stop_event.wait(1)
    finally:
        # Release the broker connection even if send() raises.
        producer.close()

# Daemon thread: the generator loops forever, so a non-daemon thread would
# keep the interpreter from exiting when the kernel shuts down.
threading.Thread(target=animal_gen_json, daemon=True).start()

In [36]:
# Spark session (with Kafka jar)
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("demo")
    # Kafka connector package for Spark SQL, resolved via spark.jars.packages.
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    .config("spark.sql.shuffle.partitions", 10)  # important
    .getOrCreate()
)

In [37]:
# Batch (non-streaming) read of the JSON topic.
df = (
    spark.read
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": broker,
        "subscribe": "animals-json",
    })
    .load()
)

In [38]:
df.isStreaming

False

In [39]:
df.dtypes

[('key', 'binary'),
 ('value', 'binary'),
 ('topic', 'string'),
 ('partition', 'int'),
 ('offset', 'bigint'),
 ('timestamp', 'timestamp'),
 ('timestampType', 'int')]

In [40]:
df.rdd.getNumPartitions()

24/01/14 05:42:37 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


4

In [41]:
df.count()

24/01/14 05:42:37 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


1016

In [42]:
df.limit(5).toPandas()

24/01/14 05:42:38 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):


Unnamed: 0,key,value,topic,partition,offset,timestamp,timestampType
0,[73],"[123, 34, 98, 101, 97, 99, 104, 34, 58, 32, 34...",animals-json,0,0,2024-01-14 05:13:50.472,0
1,[67],"[123, 34, 98, 101, 97, 99, 104, 34, 58, 32, 34...",animals-json,0,1,2024-01-14 05:13:55.479,0
2,[73],"[123, 34, 98, 101, 97, 99, 104, 34, 58, 32, 34...",animals-json,0,2,2024-01-14 05:13:56.480,0
3,[67],"[123, 34, 98, 101, 97, 99, 104, 34, 58, 32, 34...",animals-json,0,3,2024-01-14 05:13:59.483,0
4,[73],"[123, 34, 98, 101, 97, 99, 104, 34, 58, 32, 34...",animals-json,0,4,2024-01-14 05:14:08.494,0


In [43]:
from pyspark.sql.functions import col, expr, from_json

In [44]:
schema = "beach string, animal string"

# Kafka hands over key and value as binary: cast both to strings, parse the
# JSON value against the schema, then flatten the struct into columns.
animals = df.select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), schema).alias("value"),
).select("key", "value.*")
animals.limit(5).toPandas()

24/01/14 05:42:38 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Unnamed: 0,key,beach,animal
0,I,I,dolphin
1,C,C,turtle
2,I,I,turtle
3,C,C,seagull
4,I,I,seagull


In [45]:
schema = "beach string, animal string"

# Same projection as before: stringify key/value, parse the JSON value,
# and expand its fields into top-level columns.
animals = df.select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), schema).alias("value"),
).select("key", "value.*")
animals

DataFrame[key: string, beach: string, animal: string]

In [46]:
animals.limit(5).toPandas()

24/01/14 05:42:39 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


Unnamed: 0,key,beach,animal
0,I,I,dolphin
1,C,C,turtle
2,I,I,turtle
3,C,C,seagull
4,I,I,seagull


In [47]:
animals.count()

24/01/14 05:42:39 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

1019

In [48]:
animals.isStreaming

False

# Streaming DataFrame

In [49]:
# source => transformations => sink
# streaming_query = spark.readStream(????).????.writeStream(????)

In [50]:
# Streaming read of the JSON topic.
df = (
    spark.readStream  # instead of spark.read
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": broker,
        "subscribe": "animals-json",
        "startingOffsets": "earliest",
    })
    .load()
)

In [51]:
df.isStreaming

True

In [52]:
# df.rdd.getNumPartitions()

In [53]:
schema = "beach string, animal string"

# Apply the key/value string cast and JSON parsing to the streaming `df`,
# then expand the parsed struct into top-level columns.
animals = df.select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), schema).alias("value"),
).select("key", "value.*")
animals

DataFrame[key: string, beach: string, animal: string]

In [54]:
# not supported for streaming
# animals.count()

In [55]:
# not supported for streaming
# animals.toPandas()

# Shark Alert App

In [56]:
# source (animals) => transformation (keep sharks) => sink (console)
streaming_query = (
    animals
    .filter("animal='shark'")
    .writeStream
    .format("console")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start()
)
type(streaming_query)

24/01/14 05:42:40 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4626da84-afb9-4200-bcc3-3e9a7e099421. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/01/14 05:42:40 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


pyspark.sql.streaming.query.StreamingQuery

In [57]:
# Stop the first active streaming query, if there is one. Indexing
# spark.streams.active directly raises IndexError when nothing is running
# (e.g. when this cell is re-run after the query was already stopped).
active_queries = spark.streams.active
if active_queries:
    active_queries[0].stop()

In [58]:
streaming_query.stop()
# spark.streams.active[0].stop()

# Animal Counter App

In [59]:
# Running count per animal, written to the console in "complete" output mode.
q = (
    animals
    .groupBy("animal")
    .count()
    .writeStream
    .format("console")
    .outputMode("complete")
    .trigger(processingTime="5 seconds")
    .start()
)

24/01/14 05:42:40 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ad866f28-68b3-4450-93e7-a99688ad0f22. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/01/14 05:42:40 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [60]:
q.stop()