## Kafka + Spark streaming

Compile the protocol buffer definitions in `animals.proto` into a Python module (`animals_pb2.py`).

In [1]:
# Generate animals_pb2.py (which defines the Sighting message class) from animals.proto
# using the protoc bundled with the grpc_tools module.
! python3 -m grpc_tools.protoc -I=. --python_out=. animals.proto

In [2]:
from animals_pb2 import Sighting

In [3]:
# Build a Sighting message; its repr lists the populated fields in protobuf text format.
s = Sighting(animal="shark", beach="A")
s

beach: "A"
animal: "shark"

In [4]:
# serialize to bytes (binary protobuf wire format) -- the same call the producer uses for message values
s.SerializeToString()

b'\n\x01A\x12\x05shark'

In [5]:
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from kafka import TopicPartition
import random
import time
import threading
from threading import Thread, Lock

In [6]:
lock = Lock()


def Print(*args):
    """Print ``args`` exactly like ``print`` while holding ``lock``.

    Concurrent ``Print`` calls from several threads are serialized, so their
    output lines do not get interleaved.
    """
    lock.acquire()
    try:
        print(*args)
    finally:
        lock.release()

### Admin

In [7]:
# Address of the local Kafka broker; every producer, consumer and Spark reader in this notebook connects here.
broker = "localhost:9092"
# Admin client, used only to create the topics below.
admin = KafkaAdminClient(bootstrap_servers=[broker])

### Creating `animals` and `animals-json` topics

In [8]:
# One topic per message encoding ("animals" = protobufs, "animals-json" = JSON).
# An existing topic is reported and skipped, so re-running this cell is harmless.
for topic_name in ("animals", "animals-json"):
    new_topic = NewTopic(topic_name, num_partitions=4, replication_factor=1)
    try:
        admin.create_topics([new_topic])
    except TopicAlreadyExistsError:
        print("Topic already exists")

Topic already exists
Topic already exists


### Producer

In [11]:
animals = ["shark", "dolphin", "turtle", "seagull", "whale"]
beaches = list("ABCDEFGHI")

def animal_gen():
    """Send one random protobuf ``Sighting`` per second to the ``animals`` topic, forever.

    The message key is the beach name (UTF-8), so all sightings from one beach
    are routed to the same partition.
    """
    producer = KafkaProducer(bootstrap_servers=[broker])

    while True:
        # keyword args are evaluated left to right: beach is drawn before animal
        sighting = Sighting(beach=random.choice(beaches), animal=random.choice(animals))
        payload = sighting.SerializeToString()
        producer.send("animals", value=payload, key=sighting.beach.encode("utf-8"))
        time.sleep(1)

Thread(target=animal_gen).start()

### Consumer

### Streaming Group By (count sightings per beach)

In [16]:
def beach_consumer(partitions=None, iterations=10):
    """Count sightings per beach on some partitions of the ``animals`` topic.

    Reads each assigned partition from its beginning, decodes every message as
    a ``Sighting`` protobuf and keeps a running count per beach. The partial
    counts are printed after every poll.

    Args:
        partitions: partition numbers of ``animals`` to read (default: none).
        iterations: number of polls (each waits up to 1 s) before returning;
            ``None`` polls forever.
    """
    if partitions is None:
        partitions = []   # avoid a shared mutable default
    counts = {}   # key=beach, value=count

    consumer = KafkaConsumer(bootstrap_servers=[broker])
    try:
        consumer.assign([TopicPartition("animals", p) for p in partitions])
        consumer.seek_to_beginning()

        polls = 0
        while iterations is None or polls < iterations:
            batch = consumer.poll(timeout_ms=1000)
            for messages in batch.values():
                for msg in messages:
                    s = Sighting.FromString(msg.value)
                    counts[s.beach] = counts.get(s.beach, 0) + 1
            Print(partitions, counts)
            polls += 1
    finally:
        # release the broker connection even if decoding or polling fails
        consumer.close()

threading.Thread(target=beach_consumer, args=([0, 1],)).start()
threading.Thread(target=beach_consumer, args=([2, 3],)).start()

[0, 1] {'I': 72, 'C': 74, 'B': 63, 'D': 75, 'E': 73}
[2, 3] {'H': 76, 'G': 77, 'A': 57, 'F': 78}
[2, 3] {'H': 77, 'G': 77, 'A': 57, 'F': 78}
[0, 1] {'I': 72, 'C': 74, 'B': 63, 'D': 75, 'E': 73}
[0, 1] {'I': 72, 'C': 75, 'B': 63, 'D': 75, 'E': 73}
[2, 3] {'H': 77, 'G': 77, 'A': 57, 'F': 78}
[0, 1] {'I': 73, 'C': 75, 'B': 63, 'D': 75, 'E': 73}
[2, 3] {'H': 77, 'G': 77, 'A': 57, 'F': 78}
[0, 1] {'I': 73, 'C': 75, 'B': 64, 'D': 75, 'E': 73}
[2, 3] {'H': 77, 'G': 77, 'A': 57, 'F': 78}
[2, 3] {'H': 77, 'G': 77, 'A': 57, 'F': 79}
[0, 1] {'I': 73, 'C': 75, 'B': 64, 'D': 75, 'E': 73}
[2, 3] {'H': 77, 'G': 77, 'A': 58, 'F': 79}
[0, 1] {'I': 73, 'C': 75, 'B': 64, 'D': 75, 'E': 73}
[0, 1] {'I': 74, 'C': 75, 'B': 64, 'D': 75, 'E': 73}
[2, 3] {'H': 77, 'G': 77, 'A': 58, 'F': 79}
[2, 3] {'H': 78, 'G': 77, 'A': 58, 'F': 79}
[0, 1] {'I': 74, 'C': 75, 'B': 64, 'D': 75, 'E': 73}
[2, 3] {'H': 78, 'G': 77, 'A': 58, 'F': 80}
[0, 1] {'I': 74, 'C': 75, 'B': 64, 'D': 75, 'E': 73}


In [17]:
def animal_consumer(partitions=None, iterations=10):
    """Count sightings per animal on some partitions of the ``animals`` topic.

    Reads each assigned partition from its beginning, decodes every message as
    a ``Sighting`` protobuf and keeps a running count per animal. The partial
    counts are printed after every poll.

    Args:
        partitions: partition numbers of ``animals`` to read (default: none).
        iterations: number of polls (each waits up to 1 s) before returning;
            ``None`` polls forever.
    """
    if partitions is None:
        partitions = []   # avoid a shared mutable default
    counts = {}   # key=animal, value=count

    consumer = KafkaConsumer(bootstrap_servers=[broker])
    try:
        consumer.assign([TopicPartition("animals", p) for p in partitions])
        consumer.seek_to_beginning()

        polls = 0
        while iterations is None or polls < iterations:
            batch = consumer.poll(timeout_ms=1000)
            for messages in batch.values():
                for msg in messages:
                    s = Sighting.FromString(msg.value)
                    counts[s.animal] = counts.get(s.animal, 0) + 1
            Print(partitions, counts)
            polls += 1
    finally:
        # release the broker connection even if decoding or polling fails
        consumer.close()

threading.Thread(target=animal_consumer, args=([0, 1],)).start()
threading.Thread(target=animal_consumer, args=([2, 3],)).start()

[2, 3] {'seagull': 64, 'whale': 60, 'turtle': 57, 'shark': 74, 'dolphin': 68}
[0, 1] {'seagull': 83, 'dolphin': 78, 'shark': 67, 'whale': 97, 'turtle': 66}
[0, 1] {'seagull': 83, 'dolphin': 78, 'shark': 68, 'whale': 97, 'turtle': 66}
[2, 3] {'seagull': 64, 'whale': 60, 'turtle': 57, 'shark': 74, 'dolphin': 68}
[0, 1] {'seagull': 83, 'dolphin': 78, 'shark': 68, 'whale': 97, 'turtle': 66}
[0, 1] {'seagull': 83, 'dolphin': 78, 'shark': 69, 'whale': 97, 'turtle': 66}
[2, 3] {'seagull': 64, 'whale': 60, 'turtle': 57, 'shark': 74, 'dolphin': 68}
[0, 1] {'seagull': 84, 'dolphin': 78, 'shark': 69, 'whale': 97, 'turtle': 66}
[2, 3] {'seagull': 64, 'whale': 60, 'turtle': 57, 'shark': 74, 'dolphin': 68}
[0, 1] {'seagull': 84, 'dolphin': 78, 'shark': 70, 'whale': 97, 'turtle': 66}
[2, 3] {'seagull': 64, 'whale': 60, 'turtle': 57, 'shark': 74, 'dolphin': 68}
[0, 1] {'seagull': 84, 'dolphin': 78, 'shark': 70, 'whale': 97, 'turtle': 67}
[2, 3] {'seagull': 64, 'whale': 60, 'turtle': 57, 'shark': 74, '

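**Observation:** each consumer only reads its assigned partitions, so the per-animal counts are split across the two consumers. Messages are keyed (and therefore partitioned) by beach, so each beach's count lives entirely in one consumer and the per-beach counts above were complete. Every animal, however, appears in every partition. Getting total per-animal counts requires an extra step that merges the partial counts from all consumers.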

### Spark streaming

In [None]:
import json

In [None]:
def animal_gen_json():
    """Send one random JSON sighting per second to the ``animals-json`` topic, forever.

    Each value is the UTF-8 bytes of ``{"beach": ..., "animal": ...}``. The key
    is the beach name, exactly as in the protobuf producer.
    """
    producer = KafkaProducer(bootstrap_servers=[broker])

    while True:
        beach = random.choice(beaches)
        animal = random.choice(animals)

        # No value_serializer is configured, so the JSON text must be encoded to bytes here.
        value = bytes(json.dumps({"beach": beach, "animal": animal}), "utf-8")
        producer.send("animals-json", value=value, key=bytes(beach, "utf-8"))

        time.sleep(1)

threading.Thread(target=animal_gen_json).start()

In [None]:
# Spark session; spark.jars.packages downloads the spark-sql-kafka connector
# (Scala 2.12 build, version 3.5.0) so the "kafka" source/sink format is available.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("demo")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0")
    # Optional (disabled): fewer shuffle partitions, presumably to speed up small local groupBy jobs.
    # .config("spark.sql.shuffle.partitions", 10)
    .getOrCreate()
)

In [None]:
# Batch (non-streaming) read of everything currently in the animals-json topic.
# The Kafka source exposes key/value as binary plus topic/partition/offset/timestamp metadata;
# displaying df shows these column names and data types.
df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", broker)
    .option("subscribe", "animals-json")
    .load()
)
df

In [None]:
# first five rows of data (raw Kafka records; key and value are still binary)
df.limit(5).toPandas()

In [None]:
# spark import statement: column helpers used to parse the JSON message values
from pyspark.sql.functions import col, from_json

In [None]:
from pyspark.sql.functions import col, from_json

# DDL schemas for the JSON in each message value (assumed keys: beach, animal).
# bad_schema is deliberately wrong (types do not match the JSON strings), so
# parsing with it yields nulls instead of raising an error.
bad_schema = "beach int, animal int"
schema = "beach string, animal string"

# Kafka values are binary: cast to string, parse the JSON, then flatten the struct into columns.
animals_df = (
    df.select(from_json(col("value").cast("string"), schema).alias("value"))
    .select("value.*")
)
animals_df

In [None]:
# Preview the first five parsed rows as a pandas DataFrame.
animals_df.limit(5).toPandas()

In [None]:
# count: number of sightings read in this batch snapshot of the topic
animals_df.count()

In [None]:
# True only if animals_df derives from a streaming source (spark.readStream), False for a batch read.
animals_df.isStreaming

### Streaming DataFrame

source => transformations => sink

```
# streaming_query = spark.readStream(????).????.writeStream(????)
```

In [None]:
# Streaming source: readStream (not read) so that df.isStreaming is True and
# the query below can use .writeStream. New records are picked up as they arrive.
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", broker)
    .option("subscribe", "animals-json")
    .load()
)

In [None]:
# True only if df was created with spark.readStream (a streaming source), False for a batch read.
df.isStreaming

In [None]:
from pyspark.sql.functions import col, from_json

schema = "beach string, animal string"

# Same transformation as in the batch case: binary value -> string -> parsed JSON struct -> columns.
# Applied to a streaming df, the result is itself a streaming DataFrame.
animals_df = (
    df.select(from_json(col("value").cast("string"), schema).alias("value"))
    .select("value.*")
)
animals_df

In [None]:
# not supported for streaming: a streaming DataFrame is unbounded, so it cannot be
# collected with toPandas(); it has to be run as a query via .writeStream ... .start().
# animals_df.toPandas()

### Shark Alert Application

### How can we stop the stream?

Either stop every active query (e.g. `for q in spark.streams.active: q.stop()`), or call `.stop()` on the variable that holds the streaming query returned by `.start()`.

### Animal Counter Application

In [None]:
# Running count of sightings per animal, printed to the console every 5 seconds.
# A streaming aggregation without a watermark cannot use "append" output mode
# (Spark rejects it with an AnalysisException); "complete" re-emits the whole
# counts table on every trigger.
animal_query = (
    animals_df.groupby("animal").count()
    .writeStream
    .format("console")
    .trigger(processingTime="5 seconds")
    .outputMode("complete")
).start()

In [None]:
# Stop the streaming query through the handle returned by .start().
animal_query.stop()