In [1]:
from threading import Thread, Lock
lock = Lock()
def Print(*args):
    with lock:
        print(*args)


In [2]:
import os, json

for partition in range(6):
    path = f"partition-{partition}.json"
    if os.path.exists(path):
        os.remove(path)

In [3]:
import datetime, time, random, string

def one_station(name):
    # temp pattern
    month_avg = [27,31,44,58,70,79,83,81,74,61,46,32]
    shift = (random.random()-0.5) * 30
    month_avg = [m + shift + (random.random()-0.5) * 5 for m in month_avg]
    
    # rain pattern
    start_rain = [0.1,0.1,0.3,0.5,0.4,0.2,0.2,0.1,0.2,0.2,0.2,0.1]
    shift = (random.random()-0.5) * 0.1
    start_rain = [r + shift + (random.random() - 0.5) * 0.2 for r in start_rain]
    stop_rain = 0.2 + random.random() * 0.2

    # day's state
    today = datetime.date(2000, 1, 1)
    temp = month_avg[0]
    raining = False
    
    # gen weather
    while True:
        # choose temp+rain
        month = today.month - 1
        temp = temp * 0.8 + month_avg[month] * 0.2 + (random.random()-0.5) * 20
        if temp < 32:
            raining=False
        elif raining and random.random() < stop_rain:
            raining = False
        elif not raining and random.random() < start_rain[month]:
            raining = True

        yield (today.strftime("%Y-%m-%d"), name, temp, raining)

        # next day
        today += datetime.timedelta(days=1)
        
def all_stations(count=10, sleep_sec=1):
    assert count <= 26
    stations = []
    for name in string.ascii_uppercase[:count]:
        stations.append(one_station(name))
    while True:
        for station in stations:
            yield next(station)
        time.sleep(sleep_sec)

In [4]:
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, TopicPartition
from kafka.admin import NewTopic

from kafka.errors import TopicAlreadyExistsError, UnknownTopicOrPartitionError
broker = "localhost:9092"
admin = KafkaAdminClient(bootstrap_servers=[broker])
print(admin)
try:
    admin.delete_topics(["stations", "stations-json"])
    print("deleted")
except UnknownTopicOrPartitionError:
    print("cannot delete (may not exist yet)")

time.sleep(1)
admin.create_topics([NewTopic("stations", 6, 1)])
admin.create_topics([NewTopic("stations-json", 6, 1)])
admin.list_topics()

<kafka.admin.client.KafkaAdminClient object at 0x1079d34d0>
deleted


['stations']

In [5]:
# report_pb2 is the result of building report.proto
from report_pb2 import *



In [6]:
from threading import Thread, Lock
import os, json,threading


In [7]:
for partition in range(6):
    path = f"partition-{partition}.json"
    if os.path.exists(path):
        os.remove(path)

In [8]:
def produce():
    producer = KafkaProducer(bootstrap_servers=[broker],acks="all",retries=10)#KafkaProducer(bootstrap_servers=[broker],???)
    Print("generating records")
    for date, station, degrees, raining in all_stations(15):
        json_file = {"date":date,"station":station,"degrees":float(degrees), "raining":int(raining)}
        st = Report(date=date, station=station,degrees=degrees,raining=raining)
        value = st.SerializeToString()
        key = bytes(station,"utf-8")
        future = producer.send("stations", value=value, key=key)
        producer.send("stations-json",value = bytes(json.dumps(json_file),"utf-8"),key = key)
        time.sleep(1)
        # TODO: send to "stations" stream using protobuf
        # TODO: send to "stations-json" using JSON

# TODO: start thread to run produce
threading.Thread(target = produce).start()
# never join thread because we want it to run forever

generating records


In [10]:
def load_partition(partition_num):
    path = f"partition-{partition_num}.json"
    if os.path.exists(path):
        with open(path, "r") as file:
            return json.load(file)
    else:
        return {"partition": partition_num, "offset": 0, "stations": {}}

def save_partition(partition):
    path = f"partition-{partition['partition']}.json"
    with open(path, "w") as file:
        json.dump(partition, file)

In [11]:
def consume(part_nums=[], iterations=10):
    consumer = KafkaConsumer(bootstrap_servers=[broker],enable_auto_commit=False)
    # TODO: create list of TopicPartition objects
    consumer.assign([TopicPartition("stations",part) for part in part_nums])
    consumer.assignment()
    # consumer.seek_to_beginning()

    # PART 1: initialization
    partitions = {part:load_partition(part) for part in part_nums} # key=partition num, value=snapshot dict
    Print(partitions)
    # TODO: load partitions from JSON files (if they exist) or create fresh dicts
    for part in partitions:
        consumer.seek(TopicPartition("stations",part),partitions[part].get("offset",0))
        Print("Seeking partition", part, "to offset", partitions[part].get("offset", 0))
    # TODO: if offsets were specified in previous JSON files, the consumer
    #       should seek to those; else, seek to offset 0.

    # PART 2: process batches
    for i in range(iterations):

        batch = consumer.poll(1000) # 1s timeout
        for tp, messages in batch.items():

            snap = partitions[tp.partition]
            for message in messages:
                r = Report.FromString(message.value)
                rec = snap["stations"].setdefault(
                r.station,
                {"sum": 0.0, "count": 0, "avg": 0.0, "start": r.date, "end": r.date}
                )
                if r.date>rec["end"]:
                    rec["sum"] += r.degrees
                    rec["count"]+=1
                    rec["avg"] = rec["sum"]/rec["count"]
                    rec["end"] = r.date

                snap["offset"] = consumer.position(tp)
            save_partition(snap)
            Print(partitions)
                
            # perhaps create a separate function for the following?  You decide.
            # TODO: update the partitions based on new messages
            # TODO: save the data back to the JSON file
    Print("exiting")

for i in range(2):
    print("ROUND", i)
    t1 = threading.Thread(target=consume, args=([0,1], 30))
    t2 = threading.Thread(target=consume, args=([2,3], 30))
    t3 = threading.Thread(target=consume, args=([4,5], 30))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()

ROUND 0
{2: {'partition': 2, 'offset': 0, 'stations': {}}, 3: {'partition': 3, 'offset': 0, 'stations': {}}}
Seeking partition 2 to offset 0
Seeking partition 3 to offset 0
{0: {'partition': 0, 'offset': 0, 'stations': {}}, 1: {'partition': 1, 'offset': 0, 'stations': {}}}
Seeking partition 0 to offset 0
Seeking partition 1 to offset 0
{4: {'partition': 4, 'offset': 0, 'stations': {}}, 5: {'partition': 5, 'offset': 0, 'stations': {}}}
Seeking partition 4 to offset 0
Seeking partition 5 to offset 0
{0: {'partition': 0, 'offset': 65, 'stations': {'N': {'sum': 2105.0962896116525, 'count': 64, 'avg': 32.89212952518207, 'start': '2000-01-01', 'end': '2000-03-05'}}}, 1: {'partition': 1, 'offset': 0, 'stations': {}}}
{4: {'partition': 4, 'offset': 327, 'stations': {'A': {'sum': 2207.196297901064, 'count': 65, 'avg': 33.95686612155483, 'start': '2000-01-01', 'end': '2000-03-06'}, 'B': {'sum': 2804.330841923062, 'count': 65, 'avg': 43.14355141420096, 'start': '2000-01-01', 'end': '2000-03-06'},