In [1]:
!pip install kafka-python



In [2]:
from kafka import KafkaConsumer, KafkaProducer
import json
import uuid
import os
import logging
import sys
import time
import multiprocessing
from scipy.fft import fft
import numpy as np


multiprocessing.set_start_method('fork')

In [3]:

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



In [4]:
def consume_messages(pipe_connection):
    sys.stdout.write("Starting\n")
    sys.stdout.flush()
    

    consumer = KafkaConsumer(os.environ.get('KAFKA_TOPIC', "accelerometer"), 
                             auto_offset_reset = 'earliest',
                             bootstrap_servers = os.environ.get('KAFKA_BROKER', 'broker1:9093').split(","), 
                             group_id = os.environ.get('KAFKA_GROUP_ID', "accelerometer-group"),
                             api_version = (0, 10), 
                             value_deserializer = json.loads,
                             consumer_timeout_ms = 1000)
    sys.stdout.write("consumer done\n")
    sys.stdout.flush()

    messages_by_key = {}
    
    count = 0
    last_ordering = 0
    while True:
        # Step 1: Consume messages
        try:
            for message in consumer:
                if count == 0:
                    sys.stdout.write(str(message))
                    sys.stdout.write("\n")
                    sys.stdout.flush()
                # convert bytes to string
                key = message.key.decode('utf-8')
                if key not in messages_by_key:
                    # add a mulltiprocessing.Dictionary to the dictionary
                    messages_by_key[key] = {}
                    
                messages_current_key = messages_by_key[key]

                seconds = int(message.timestamp / 1000)
                if seconds not in messages_current_key:
                    # add a mulltiprocessing.Queue for each second
                    messages_current_key[seconds] = []

                messages_current_key_seconds = messages_current_key[seconds]

                messages_current_key_seconds.append([
                    message.timestamp, 
                    message.value.get('x'),
                    message.value.get('y'), 
                    message.value.get('z')
                    ])

                sys.stdout.write(f"\rRead Message {count}")
                count += 1
                
                if count-100 >= last_ordering:
                    # Step 2: Order messages and send to pipe
                    for key in messages_by_key:
                        messages_current_key = messages_by_key[key]
                        seconds = list(messages_current_key.keys())
                        # only process the first n-1 seconds
                        for second in seconds[:-1]:
                            messages_current_key_seconds = messages_current_key[second]
                            messages_current_key_seconds.sort(key=lambda x: x[0])
                            # send the messages to the pipe
                            pipe_connection.send({
                                'key': key,
                                'second': second,
                                'x': [x[1] for x in messages_current_key_seconds],
                                'y': [x[2] for x in messages_current_key_seconds],
                                'z': [x[3] for x in messages_current_key_seconds]
                            })
                            # remove the messages from the dictionary
                            del messages_current_key[second]
                    last_ordering = count
                    # commit offsets so we won't get the same messages again
                    consumer.commit()
        except Exception as ex:
            logger.error('Exception in consuming message', exc_info=True)

        time.sleep(0.05)

In [5]:
def complex_ndarray_to_list(complex_ndarray):
    return [complex_ndarray.real.tolist(), complex_ndarray.imag.tolist()]

def process_messages(pipe_connection):
    producer = KafkaProducer(bootstrap_servers=os.environ.get('KAFKA_BROKER', 'broker1:9093').split(","), 
                              api_version=(0, 10),
                              max_block_ms=10000)
    # poll the pipe for messages and compute the fft
    while True:
        message = pipe_connection.recv()
        key = message['key']
        second = message['second']
        
        # compute the fft
        x_fft = fft(message['x'])
        y_fft = fft(message['y'])
        z_fft = fft(message['z'])
        # send the fft to the producer
        message = {
                    'second': second,
                    'x': complex_ndarray_to_list(x_fft),
                    'y': complex_ndarray_to_list(y_fft),
                    'z': complex_ndarray_to_list(z_fft)
            }
        producer.send('accelerometer-fft',
                        key=key.encode('utf-8'),
                        value=bytes(json.dumps(message), encoding='utf-8'))
        producer.flush()

In [6]:
# start two processes, one for consuming messages and one for processing messages

# create a pipe to communicate between the processes
pipe_end, pipe_start = multiprocessing.Pipe()

p1 = multiprocessing.Process(target=consume_messages, args=(pipe_end,))
p2 = multiprocessing.Process(target=process_messages, args=(pipe_start,))
p1.start()
p2.start()
p1.join()
p2.join()

Starting


INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('accelerometer',)


consumer done


INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=broker1:9093 <connecting> [IPv4 ('172.18.0.5', 9093)]>: connecting to broker1:9093 [('172.18.0.5', 9093) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=broker1:9093 <connecting> [IPv4 ('172.18.0.5', 9093)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=broker1:9093 <connecting> [IPv4 ('172.18.0.5', 9093)]>: connecting to broker1:9093 [('172.18.0.5', 9093) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=broker1:9093 <connecting> [IPv4 ('172.18.0.5', 9093)]>: Connection complete.
INFO:kafka.cluster:Group coordinator for accelerometer-group is BrokerMetadata(nodeId='coordinator-3', host='broker3', port=9097, rack=None)
INFO:kafka.coordinator:Discovered coordinator coordinator-3 for group accelerometer-group
INFO:kafka.coordinator:Starting new heartbeat thread
INFO:kafka.coordinator.consumer:Revoking previously assigned partitions set() for group accelerometer-gr

ConsumerRecord(topic='accelerometer', partition=0, offset=43220, timestamp=1679249934305, timestamp_type=0, key=b'Adolf:Pixel6:running', value={'z': -0.053102970123291016, 'y': -0.2511281967163086, 'x': -0.23227882385253906}, headers=[], checksum=2963818272, serialized_key_size=20, serialized_value_size=81, serialized_header_size=-1)
Read Message 599Processing {key} {second}
Processing {key} {second}
Processing Adolf:Pixel6:running 1679249934
Processing {key} {second}
Processing {key} {second}
Processing {key} {second}
Processing {key} {second}


INFO:kafka.conn:<BrokerConnection node_id=3 host=broker3:9097 <connecting> [IPv4 ('172.18.0.6', 9097)]>: connecting to broker3:9097 [('172.18.0.6', 9097) IPv4]


Processing {key} {second}


INFO:kafka.conn:<BrokerConnection node_id=3 host=broker3:9097 <connecting> [IPv4 ('172.18.0.6', 9097)]>: Connection complete.


Processing {key} {second}


INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=broker1:9093 <connected> [IPv4 ('172.18.0.5', 9093)]>: Closing connection. 


Processing {key} {second}
Processing {key} {second}
Processing {key} {second}
Read Message 600Processing Bormann:Pixel6:running 1679249934
Read Message 604Processing Canaris:Pixel6:running 1679249934
Read Message 605Processing Dietrich:Pixel6:running 1679249934
Read Message 608Processing Erwin:Pixel6:running 1679249934
Read Message 612Processing Franz:Pixel6:running 1679249934
Read Message 615Processing Göring:Pixel6:running 1679249934
Processing Heinz:Pixel6:running 1679249934
Processing Irwin:Pixel6:running 1679249934
Processing Joachim:Pixel6:running 1679249934
Processing Florin:Pixel6:running 1679249934
Read Message 1399Processing {key} {second}
Processing Adolf:Pixel6:running 1679249935
Processing {key} {second}
Processing {key} {second}
Processing {key} {second}
Processing {key} {second}
Processing Bormann:Pixel6:running 1679249935
Processing {key} {second}
Processing {key} {second}
Processing {key} {second}
Processing Canaris:Pixel6:running 1679249935
Processing {key} {second}
P

Process Process-2:
Process Process-1:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/conda/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipykernel_103/2985421838.py", line 10, in process_messages
    message = pipe_connection.recv()
  File "/opt/conda/lib/python3.10/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/opt/conda/lib/python3.10/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/opt/conda/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/conda/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)


KeyboardInterrupt: 

  File "/opt/conda/lib/python3.10/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
  File "/tmp/ipykernel_103/4279202708.py", line 50, in consume_messages
    sys.stdout.write(f"\rRead Message {count}")
KeyboardInterrupt
  File "/opt/conda/lib/python3.10/site-packages/ipykernel/iostream.py", line 571, in write
    self.pub_thread.schedule(self._flush)
INFO:kafka.conn:<BrokerConnection node_id=3 host=broker3:9097 <connected> [IPv4 ('172.18.0.6', 9097)]>: Closing connection. 
  File "/opt/conda/lib/python3.10/site-packages/ipykernel/iostream.py", line 213, in schedule
    f()
  File "/opt/conda/lib/python3.10/site-packages/ipykernel/iostream.py", line 527, in _flush
    self.session.send(
  File "/opt/conda/lib/python3.10/site-packages/jupyter_client/session.py", line 861, in send
    stream.send_multipart(to_send, copy=copy)
  File "/opt/conda/lib/python3.10/site-packages/ipykernel/iostream.py", line 220, in send_multipart
    self.schedule(lambda: 