**`Import all the necessary libraries`**


In [1]:
import csv
import functools
import sys
import time

import cv2
import numpy as np
import pandas as pd
from kafka import KafkaProducer
from kafka import KafkaConsumer
from loguru import logger

from Algorithms import dcdp, dfNonRecursiveCoresetTree, pdcdp
from Encoder_Decoder.receive_and_decode import *
from Encoder_Decoder.encode_and_transmit import *

**`Setting up the Python consumer`**
- **`Topic: video`**


In [2]:
# Kafka broker(s) to consume the streaming data from, given as "hostname:port".
bootstrap_servers = ["127.0.0.1:9093"]

# Name of the Kafka topic to subscribe to.
topicName = "video"

# Starting position in the topic: 'latest' consumes frames as they arrive;
# set it to 'earliest' to consume frames from the beginning.
offset = "latest"

**`Define the time_it class: its stream_runtime decorator logs the runtime cost of each stream-clustering call`**

In [3]:
class time_it:
    """Namespace for the timing decorator used by the stream-clustering code."""

    @staticmethod
    def stream_runtime(func):
        """Wrap ``func`` so that the duration of every call is logged and recorded.

        After each successful call the elapsed time (in seconds) is written to
        the loguru logger and appended as one row to ``./processingTime.csv``.
        If ``func`` raises, the exception propagates and nothing is recorded.

        Args:
            func: The callable to time.

        Returns:
            A wrapper with the same signature and metadata as ``func`` that
            returns whatever ``func`` returns.
        """
        @functools.wraps(func)  # keep the wrapped function's __name__/__doc__
        def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so the interval cannot be distorted
            # by system clock adjustments the way time.time() can.
            start = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - start
            logger.info(f"Runtime of {func.__name__} is {elapsed}")
            # newline='' is required by the csv module; without it blank rows
            # appear on platforms that translate line endings.
            with open('./processingTime.csv', 'a', newline='') as f:
                csv.writer(f).writerow(['Processing time (in sec): ', elapsed])
            return result
        return wrapper

**`Define stream Clustering class`**

**`We use the coreset tree algorithm to build a small representation (coreset) of each frame,`**  
**`then cluster that coreset with the DCDP algorithm.`**  
**`To test a different algorithm such as PDCDP,`**  
**`go to the stream_clustering method, comment out the dcdp.DC_DP line`**  
**`and uncomment the pdcdp.PDCDP line above it.`**  
**`The second argument of PDCDP (parts, 4 in the example) can be at most the number of CPU cores available on the system.`**  

*`For smaller datasets PDCDP is slower than DCDP, as it cannot utilise the full capacity of parallelism.`*  
*`For larger datasets PDCDP is significantly faster than DCDP, by multiple orders of magnitude.`*

In [4]:
class stream_clustering(time_it):
    """Consume frames from a Kafka topic, cluster their pixels and display them.

    Each received frame is reduced to a coreset, the coreset is clustered, and
    every pixel of the frame is replaced by the centroid of its cluster. The
    centroids and the encoded original frame are published to a second Kafka
    topic.
    """

    def __init__(self, topic: str, server: list, offset: str,
                 coreset_size: int, clustering_lambda: float, max_iter: int,
                 metrics_topic: str) -> None:
        """Create the Kafka consumer and producer and store the tuning values.

        Args:
            topic: Kafka topic the frames are consumed from.
            server: Bootstrap servers, used for both consumer and producer.
            offset: Value passed as ``auto_offset_reset`` to the consumer.
            coreset_size: Size handed to the coreset tree.
            clustering_lambda: Lambda handed to the clustering algorithm.
            max_iter: Iteration limit handed to the clustering algorithm.
            metrics_topic: Kafka topic the centroids/frame are published to.
        """
        self.topic = topic
        self.server = server
        self.offset = offset
        self.stream = KafkaConsumer(self.topic, bootstrap_servers=self.server,
                                    auto_offset_reset=self.offset)
        self.coreset_size = coreset_size
        self.clustering_lambda = clustering_lambda
        self.metrics_topic = metrics_topic
        self.max_iter = max_iter
        self.streaming_metrics_gen = KafkaProducer(bootstrap_servers=self.server)

    def start(self) -> None:
        """Consume, cluster and display frames until Esc or Ctrl-C.

        Blocks while iterating over the Kafka consumer. Pressing Esc (key
        code 27) in the display window ends the loop; a ``KeyboardInterrupt``
        is logged and swallowed. The OpenCV windows are closed on every exit
        path, including errors.
        """
        try:
            for message in self.stream:
                # Consume a message from the subscribed Kafka topic and
                # decode it into a numpy frame for further processing.
                received_frame = message.value.decode()
                decoded_frame = receive_decode_bytes_to_numpy_array(received_frame)
                shape_of_frame = decoded_frame.shape
                # One row per pixel; assumes 3 channels per pixel - TODO confirm.
                frame_df = pd.DataFrame(decoded_frame.reshape(-1, 3))
                display_image = self.stream_clustering(
                    frame_df, shape_of_frame,
                    coreset_size=self.coreset_size,
                    clutering_lambda=self.clustering_lambda,
                )
                cv2.imshow('frame', display_image)
                # Show the frame for up to 1000 ms; 27 is the Esc key.
                if cv2.waitKey(1000) == 27:
                    break
        except KeyboardInterrupt:
            logger.info("Streaming interrupted")
        finally:
            # Close all open windows however the loop ended.
            cv2.destroyAllWindows()

    # The 'latest' offset option is used to consume frames as they arrive.
    @time_it.stream_runtime
    def stream_clustering(self, frame: pd.DataFrame, shape_of_frame: tuple,
                          coreset_size: int, clutering_lambda: float) -> np.ndarray:
        """Cluster one frame and return it recoloured with its cluster centroids.

        Args:
            frame: One row per pixel of the frame.
            shape_of_frame: Shape the flat pixel array is reshaped to.
            coreset_size: Size handed to the coreset tree.
            clutering_lambda: Lambda handed to the clustering algorithm
                (name kept as-is because callers pass it by keyword).

        Returns:
            A new array of shape ``shape_of_frame`` and the same dtype as the
            input pixels, in which every pixel holds its cluster's centroid.
        """
        # Build the coreset from the distinct pixels of the frame.
        tree = dfNonRecursiveCoresetTree.CoreSetTree(frame.drop_duplicates(), coreset_size)
        tree.fit()  # finds the coreset and exposes it as a dataframe
        coreset = tree.coreset.to_numpy()

        # cluster_finder = pdcdp.PDCDP(clutering_lambda, 4)  # to try pdcdp uncomment this line
        cluster_finder = dcdp.DC_DP(clutering_lambda, self.max_iter)  # and comment this line
        cluster_finder.fit(coreset)

        pixels = frame.to_numpy()
        labels = cluster_finder.predict(pixels)

        # Publish the centroids together with the original (unclustered) frame.
        encoded_frame = encode_and_transmit_numpy_array_in_bytes(pixels.reshape(shape_of_frame))
        value = {'centroids': repr((cluster_finder.centroids).tolist()),
                 'frame': encoded_frame}
        self.streaming_metrics_gen.send(self.metrics_topic, repr(value).encode())

        # Replace every pixel by its centroid in one vectorised lookup instead
        # of a per-pixel Python loop. The result is cast to the pixel dtype,
        # as the element-wise assignment did, and is a fresh array, so the
        # caller's DataFrame is never written to.
        centroids = np.asarray(cluster_finder.centroids)
        label_index = np.asarray(labels, dtype=np.intp).reshape(-1)
        clustered = centroids[label_index].astype(pixels.dtype, copy=False)
        return clustered.reshape(shape_of_frame)

**`Start stream clustering`**

In [5]:
# Create the stream-clustering job from the consumer settings defined earlier.
ssc = stream_clustering(
    topic=topicName,
    server=bootstrap_servers,
    offset=offset,
    coreset_size=1000,
    clustering_lambda=2,
    max_iter=150,
    metrics_topic='metrics',
)

**`Once stream processing has started, a new window will open,`**  
**`and each new frame will be displayed as soon as its clustering finishes.`**

**`Stop the streaming by pressing the Esc key in the streaming window`**

In [6]:
# Blocks while frames are consumed and displayed; press Esc in the 'frame' window to stop.
ssc.start()

[32m2023-06-21 13:02:05.672[0m | [1mINFO    [0m | [36mAlgorithms.dcdp[0m:[36mfit[0m:[36m57[0m - [1mDCDP took 150 iterations to converge[0m
[32m2023-06-21 13:02:06.349[0m | [1mINFO    [0m | [36m__main__[0m:[36mwrapper[0m:[36m7[0m - [1mRuntime of stream_clustering is 5.018372058868408[0m
QObject::moveToThread: Current thread (0x45b1c90) is not the object's thread (0x4712aa0).
Cannot move to target thread (0x45b1c90)

QObject::moveToThread: Current thread (0x45b1c90) is not the object's thread (0x4712aa0).
Cannot move to target thread (0x45b1c90)

QObject::moveToThread: Current thread (0x45b1c90) is not the object's thread (0x4712aa0).
Cannot move to target thread (0x45b1c90)

QObject::moveToThread: Current thread (0x45b1c90) is not the object's thread (0x4712aa0).
Cannot move to target thread (0x45b1c90)

QObject::moveToThread: Current thread (0x45b1c90) is not the object's thread (0x4712aa0).
Cannot move to target thread (0x45b1c90)

QObject::moveToThread: Curren