* If a Kafka topic has 2 partitions and a replication factor of 3, each partition has 3 copies of its data, each stored on a different broker. Here's how replication works in this scenario:

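Each partition has one leader and two followers (replicas).
* The leader handles all write requests for its partition and, by default, all read requests too.
* The followers copy the data from the leader for fault tolerance. Followers that keep up with the leader form the partition's in-sync replica set (ISR).
* If the leader of a partition fails, one of the in-sync followers is elected as the new leader.
* Clients discover the new leader by refreshing their cluster metadata and then send their requests to it. Because the new leader was in sync, it already holds every committed message. When the failed broker comes back, it rejoins as a follower and catches up from the new leader.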
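The placement and failover described above can be modelled in a few lines of plain Python. This is a simplified sketch, not Kafka's code. It assumes three brokers with ids 0 to 2 and places replicas round-robin. On failure, it elects the first replica in the assignment that is still alive and in the ISR, which approximates a clean election (unclean leader election disabled, the default). Kafka's real placement also randomises the starting broker and can be rack-aware. `assign_replicas` and `elect_leader` are illustrative names.

```python
# Simplified model of replica placement and leader failover for one topic.
# Assumptions: brokers 0-2, round-robin placement, and a clean election that
# picks the first replica in the assignment that is alive and in the ISR.


def assign_replicas(num_partitions, replication_factor, brokers):
    """Return {partition: [broker ids]}; the first id is the preferred leader."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor cannot exceed the number of brokers")
    return {
        p: [brokers[(p + i) % len(brokers)] for i in range(replication_factor)]
        for p in range(num_partitions)
    }


def elect_leader(replicas, live_brokers, isr):
    """First replica that is both alive and in sync, or None if there is none."""
    for broker in replicas:
        if broker in live_brokers and broker in isr:
            return broker
    return None  # no eligible replica: the partition becomes unavailable


assignment = assign_replicas(num_partitions=2, replication_factor=3, brokers=[0, 1, 2])
leaders = {p: replicas[0] for p, replicas in assignment.items()}
print(assignment)  # {0: [0, 1, 2], 1: [1, 2, 0]}
print(leaders)     # {0: 0, 1: 1}

# Broker 0 fails: partition 0 loses its leader, partition 1 only loses a follower
live = {1, 2}
for p, replicas in assignment.items():
    isr = set(replicas) & live  # assume the surviving followers were caught up
    if leaders[p] not in live:
        leaders[p] = elect_leader(replicas, live, isr)
print(leaders)     # {0: 1, 1: 1}
```

Broker 1 ends up leading both partitions until broker 0 returns. At that point, Kafka can move leadership back to the preferred (first-listed) replica.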


### Useful for:
* Improved customer experience: monitoring in real time to improve customer service and provide personalised recommendations
* Real-time predictive analytics (e.g. stock)
* Operational efficiency
* Real-time fraud detection
* IoT sensor data

## Handling streaming data using TensorFlow and the kafka-python library

In [1]:
import sys
!{sys.executable} -m pip install -q tensorflow  tensorflow_io kafka-python


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [1]:
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
# SKLearn libraries
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

In [2]:
columns=["Sample code number", "Clump Thickness", "Uniformity of Cell Size", "Uniformity of Cell Shape", "Marginal Adhesion", "Single Epithelial Cell Size",
         "Bare Nuclei", "Bland Chromatin","Normal Nucleoli", "Mitoses", "Class"]

In [3]:
cancer_df = pd.read_csv('data/breast-cancer-wisconsin.data.csv', names= columns)
cancer_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 699 entries, 0 to 698
Data columns (total 11 columns):
 #   Column                       Non-Null Count  Dtype 
---  ------                       --------------  ----- 
 0   Sample code number           699 non-null    int64 
 1   Clump Thickness              699 non-null    int64 
 2   Uniformity of Cell Size      699 non-null    int64 
 3   Uniformity of Cell Shape     699 non-null    int64 
 4   Marginal Adhesion            699 non-null    int64 
 5   Single Epithelial Cell Size  699 non-null    int64 
 6   Bare Nuclei                  699 non-null    object
 7   Bland Chromatin              699 non-null    int64 
 8   Normal Nucleoli              699 non-null    int64 
 9   Mitoses                      699 non-null    int64 
 10  Class                        699 non-null    int64 
dtypes: int64(10), object(1)
memory usage: 60.2+ KB
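The `object` dtype on `Bare Nuclei` suggests that the column contains non-numeric entries. In the UCI copy of this dataset, missing values are written as `?`. If those rows are left in place, they fail to parse later, when each message is decoded as ten floats. Below is a minimal sketch of two common fixes (dropping or imputing). It uses a tiny made-up frame in place of `cancer_df`, and median imputation is just one assumed choice:

```python
import pandas as pd

# Tiny illustrative stand-in for cancer_df (values are made up)
df = pd.DataFrame({"Bare Nuclei": ["1", "10", "?", "4"], "Class": [2, 2, 4, 2]})

# Coerce non-numeric entries such as "?" to NaN
df["Bare Nuclei"] = pd.to_numeric(df["Bare Nuclei"], errors="coerce")
print(df["Bare Nuclei"].isna().sum())  # 1

# Option 1: drop the incomplete rows (returns a new frame)
dropped = df.dropna(subset=["Bare Nuclei"])

# Option 2: impute the missing values, here with the column median
df["Bare Nuclei"] = df["Bare Nuclei"].fillna(df["Bare Nuclei"].median())
print(df["Bare Nuclei"].tolist())  # [1.0, 10.0, 4.0, 4.0]
```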


In [4]:
cancer_df.head()

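|   | Sample code number | Clump Thickness | Uniformity of Cell Size | Uniformity of Cell Shape | Marginal Adhesion | Single Epithelial Cell Size | Bare Nuclei | Bland Chromatin | Normal Nucleoli | Mitoses | Class |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1000025 | 5 | 1 | 1 | 1 | 2 | 1 | 3 | 1 | 1 | 2 |
| 1 | 1002945 | 5 | 4 | 4 | 5 | 7 | 10 | 3 | 2 | 1 | 2 |
| 2 | 1015425 | 3 | 1 | 1 | 1 | 2 | 2 | 3 | 1 | 1 | 2 |
| 3 | 1016277 | 6 | 8 | 8 | 1 | 3 | 4 | 3 | 7 | 1 | 2 |
| 4 | 1017023 | 4 | 1 | 1 | 3 | 2 | 1 | 3 | 1 | 1 | 2 |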


In [5]:
cancer_df["Class"].value_counts()

Class
2    458
4    241
Name: count, dtype: int64

In [6]:
cancer_df['Class'] = cancer_df['Class'].replace(2,0)
cancer_df['Class'] = cancer_df['Class'].replace(4,1)
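The two `replace` calls encode a fixed mapping from the dataset's class codes to training labels. In the UCI data, 2 means benign and 4 means malignant. With pandas, the same step can be written as `cancer_df['Class'] = cancer_df['Class'].map({2: 0, 4: 1})`. Keeping the mapping in a dictionary also makes it easy to translate predictions back. In the sketch below, `CLASS_TO_LABEL`, `LABEL_TO_CLASS` and `probability_to_class` are illustrative names, and 0.5 is an assumed decision threshold:

```python
# Dataset class codes -> training labels (2 = benign, 4 = malignant)
CLASS_TO_LABEL = {2: 0, 4: 1}
LABEL_TO_CLASS = {label: code for code, label in CLASS_TO_LABEL.items()}


def probability_to_class(prob, threshold=0.5):
    """Map a sigmoid output (probability of label 1) back to the original class code."""
    return LABEL_TO_CLASS[1 if prob >= threshold else 0]


print([CLASS_TO_LABEL[c] for c in [2, 4, 4, 2]])  # [0, 1, 1, 0]
print(probability_to_class(0.83))  # 4
print(probability_to_class(0.12))  # 2
```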

In [7]:
train_df, test_df = train_test_split(cancer_df, test_size=0.4, shuffle=True)

print("Number of training samples: ",len(train_df))
print("Number of testing samples: ",len(test_df))

x_train_df = train_df.drop(["Class"], axis=1)
y_train_df = train_df["Class"]

x_test_df = test_df.drop(["Class"], axis=1)
y_test_df = test_df["Class"]

Number of training samples:  419
Number of testing samples:  280


In [8]:
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
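Each frame above is serialised to CSV text and split into one string per row. The `[1:]` slice drops the header line, and `filter(None, ...)` removes the empty string left by the trailing newline. The sketch below shows an equivalent, slightly more direct form on small made-up frames that stand in for `x_train_df` and `y_train_df`:

```python
import pandas as pd

# Small illustrative frames standing in for x_train_df / y_train_df
x_df = pd.DataFrame({"a": [1, 3], "b": [2, 4]})
y_s = pd.Series([0, 1], name="Class")

# header=False omits the column-name row; splitlines() drops the trailing
# empty string and copes with either "\n" or "\r\n" line endings
x_rows = x_df.to_csv(index=False, header=False).splitlines()
y_rows = y_s.to_csv(index=False, header=False).splitlines()
print(x_rows)  # ['1,2', '3,4']
print(y_rows)  # ['0', '1']
```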

In [9]:
x_train

['756136,1,1,1,1,2,1,2,1,1',
 '1043999,1,1,1,1,2,3,3,1,1',
 '1113906,9,5,5,2,2,2,5,1,1',
 '1218741,10,10,9,3,7,5,3,5,1',
 '1171845,8,6,4,3,5,9,3,1,1',
 '1200772,1,1,1,1,2,1,2,1,1',
 '1145420,6,1,1,1,2,1,2,1,1',
 '1160476,2,1,1,1,2,1,3,1,1',
 '1076352,3,6,4,10,3,3,3,4,1',
 '1294562,10,8,10,1,3,10,5,1,1',
 '897471,4,8,8,5,4,5,10,4,1',
 '1217952,4,1,1,1,2,1,2,1,1',
 '640744,10,10,10,7,9,10,7,10,10',
 '1212422,4,1,1,1,2,1,3,1,1',
 '1171578,3,1,1,1,2,1,1,1,1',
 '1224329,1,1,1,2,2,1,3,1,1',
 '1352848,3,10,7,8,5,8,7,4,1',
 '1318671,3,1,1,1,2,1,2,1,1',
 '1016277,6,8,8,1,3,4,3,7,1',
 '527337,4,1,1,1,2,1,1,1,1',
 '1258549,9,10,10,10,10,10,10,10,1',
 '8233704,4,1,1,1,1,1,2,1,1',
 '1226012,4,1,1,3,1,5,2,1,1',
 '695219,1,1,1,1,2,1,2,1,1',
 '1059552,1,1,1,1,2,1,3,1,1',
 '666942,1,1,1,1,2,1,3,1,1',
 '1072179,10,7,7,3,8,5,7,4,3',
 '846423,10,6,3,6,4,10,7,8,4',
 '625201,8,2,1,1,5,1,1,1,1',
 '1080233,7,6,6,3,2,10,7,1,1',
 '1147044,3,1,1,1,2,2,7,1,1',
 '369565,4,1,1,1,3,1,1,1,1',
 '1313658,3,1,1,1,2,1,1,

In [10]:
# kafka-python catches and logs exceptions raised inside an errback instead of
# propagating them, so record failures here and check them after flush()
send_errors = []

def error_callback(exc):
    send_errors.append(exc)


def write_to_kafka(topic_name, items):
    send_errors.clear()
    count = 0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    for message, key in items:
        # Key = class label, value = CSV row of features
        producer.send(topic_name,
                      key=key.encode('utf-8'),
                      value=message.encode('utf-8')).add_errback(error_callback)
        count += 1
    producer.flush()  # block until every buffered message has been sent
    producer.close()
    if send_errors:
        raise Exception('Error while sending data to kafka: {0}'.format(send_errors[0]))
    print("Wrote {0} messages into topic: {1}".format(count, topic_name))

# write_to_kafka("cancer-train", zip(x_train, y_train))
# write_to_kafka("cancer-test", zip(x_test, y_test))

In [27]:
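def decode_kafka_item(item):
    # Parse the CSV value into ten float columns and stack them into one
    # feature tensor of shape (10,); the key holds the class label
    message = tf.io.decode_csv(item.message,
                               [[0.0] for i in range(10)])
    message = tf.stack(message)
    key = tf.strings.to_number(item.key)
    return (message, key)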
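`tf.io.decode_csv` splits each message into ten columns and uses the record defaults (`0.0`) only for empty fields. Any other value that is not numeric causes an error. The plain-Python sketch below mirrors that parsing for one message so that the expected data is easy to see. It is not TensorFlow code, and `decode_record` and `NUM_COLUMNS` are illustrative names. The sample message is row 0 of `cancer_df` without its class. Note that its first field, `Sample code number`, is a record identifier, yet it is fed to the model as a feature.

```python
# Plain-Python illustration of what decode_kafka_item computes for one message.
# It mirrors tf.io.decode_csv with ten float defaults plus tf.strings.to_number.
NUM_COLUMNS = 10
DEFAULTS = [0.0] * NUM_COLUMNS


def decode_record(message: bytes, key: bytes):
    fields = message.decode("utf-8").split(",")
    if len(fields) != NUM_COLUMNS:
        raise ValueError(f"expected {NUM_COLUMNS} fields, got {len(fields)}")
    # Empty fields fall back to the default; anything else must parse as a
    # float, so a value such as "?" raises ValueError here
    features = [float(f) if f != "" else d for f, d in zip(fields, DEFAULTS)]
    label = float(key.decode("utf-8"))
    return features, label


features, label = decode_record(b"1000025,5,1,1,1,2,1,3,1,1", b"0")
print(features)  # [1000025.0, 5.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0, 1.0, 1.0]
print(label)     # 0.0
```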

In [28]:
BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64


train_ds = tfio.IODataset.from_kafka(topic='cancer-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE) 
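`shuffle` does not shuffle the whole topic at once. According to the `tf.data.Dataset.shuffle` documentation, it fills a buffer with `buffer_size` elements and samples randomly from that buffer, refilling each emptied slot with the next element. With `SHUFFLE_BUFFER_SIZE=64` and a few hundred training records, the order is therefore only randomised locally. Below is a plain-Python sketch of that behaviour; `buffered_shuffle` is an illustrative name, not a TensorFlow function:

```python
import random


def buffered_shuffle(items, buffer_size, seed=None):
    """Fixed-size shuffle buffer, in the style described for tf.data shuffle.

    The buffer is filled with the first `buffer_size` items. After that, each
    new item replaces a randomly chosen buffered item, which is emitted. Once
    the input is exhausted, the remaining items are emitted in random order.
    """
    if buffer_size < 1:
        raise ValueError("buffer_size must be at least 1")
    rng = random.Random(seed)
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)
            continue
        idx = rng.randrange(buffer_size)
        yield buffer[idx]
        buffer[idx] = item
    rng.shuffle(buffer)
    yield from buffer


out = list(buffered_shuffle(range(10), buffer_size=3, seed=0))
print(sorted(out) == list(range(10)))  # True: every element appears exactly once
```

With `buffer_size=1`, the output order equals the input order. In general, the first element emitted always comes from the first `buffer_size` inputs.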

2024-04-16 10:19:28.087298: I tensorflow_io/core/kernels/kafka_kernels.cc:349] Kafka tail: 42


In [29]:
OPTIMIZER = "adam"
LOSS = tf.keras.losses.BinaryCrossentropy(from_logits=False)  # the model's last layer already applies a sigmoid
METRICS = ['accuracy']
EPOCHS = 10

In [38]:
model = tf.keras.Sequential([
  tf.keras.layers.Input(shape=(10,)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(1, activation='sigmoid')
])

model.summary()

Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_12 (Dense)            (None, 128)               1408      
                                                                 
 dropout_9 (Dropout)         (None, 128)               0         
                                                                 
 dense_13 (Dense)            (None, 256)               33024     
                                                                 
 dropout_10 (Dropout)        (None, 256)               0         
                                                                 
 dense_14 (Dense)            (None, 128)               32896     
                                                                 
 dropout_11 (Dropout)        (None, 128)               0         
                                                                 
 dense_15 (Dense)            (None, 1)                
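The parameter counts in the summary follow from the layer sizes. A `Dense` layer with `n` inputs and `u` units holds an `n * u` weight matrix plus `u` biases, and `Dropout` has no weights. The summary output above is cut off before the final layer's count. Here is a quick check in plain Python, where `dense_params` is an illustrative helper:

```python
def dense_params(inputs, units):
    """Weights (inputs x units) plus one bias per unit."""
    return inputs * units + units


# Dense layer widths from the model above; Dropout layers add no parameters
layer_sizes = [10, 128, 256, 128, 1]
counts = [dense_params(i, u) for i, u in zip(layer_sizes, layer_sizes[1:])]
print(counts)       # [1408, 33024, 32896, 129]
print(sum(counts))  # 67457
```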

In [39]:
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
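The model ends in a sigmoid activation, so its outputs are already probabilities in (0, 1). A loss built with `from_logits=True` applies a sigmoid of its own, which squashes such an output into roughly (0.5, 0.73). As a result, the loss for a positive example can never fall below about 0.31. One consistent setup keeps the sigmoid layer and uses `from_logits=False`; the other uses a linear output layer with `from_logits=True`. The plain-Python calculation below illustrates the effect for a single confident, correct prediction. `sigmoid` and `bce` are illustrative helpers, not Keras functions.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def bce(y_true, p):
    """Binary cross-entropy for a single predicted probability p."""
    return -(y_true * math.log(p) + (1 - y_true) * math.log(1 - p))


p = 0.99  # sigmoid output for a positive example (y = 1)
print(bce(1, p))           # about 0.010: loss computed on the probability
print(bce(1, sigmoid(p)))  # about 0.316: loss if p is treated as a logit
```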

In [40]:
online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["cancer-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=10000,  # in milliseconds; set to -1 to block indefinitely
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

2024-04-16 10:27:39.480179: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: session.timeout.ms=7000
2024-04-16 10:27:39.480193: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: max.poll.interval.ms=8000
2024-04-16 10:27:39.480199: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: auto.offset.reset=earliest
2024-04-16 10:27:39.480526: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: group.id=cgonline
2024-04-16 10:27:39.480535: I tensorflow_io/core/kernels/kafka_kernels.cc:879] Kafka configuration: bootstrap.servers=127.0.0.1:9092
2024-04-16 10:27:39.481202: I tensorflow_io/core/kernels/kafka_kernels.cc:919] max num of messages per batch: 10000
2024-04-16 10:27:39.482196: I tensorflow_io/core/kernels/kafka_kernels.cc:938] Creating the kafka consumer
2024-04-16 10:27:39.483856: I tensorflow_io/core/kernels/kafka_kernels.cc:945] Subscribing to the kafka topic: cancer-train


In [41]:
online_train_ds

<KafkaBatchIODataset element_spec=DatasetSpec((TensorSpec(shape=(), dtype=tf.string, name=None), TensorSpec(shape=(), dtype=tf.string, name=None)), TensorShape([]))>

In [42]:
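def decode_kafka_online_item(raw_message, raw_key):
    # Python print() runs only once, while the map function is traced,
    # so it shows the symbolic tensor rather than each message
    print(raw_message)
    message = tf.io.decode_csv(raw_message, [[0.0] for i in range(10)])
    message = tf.stack(message)  # ten scalar columns -> one tensor of shape (10,)
    key = tf.strings.to_number(raw_key)
    return (message, key)

# Each element of online_train_ds is itself a dataset holding one polled batch
# of messages, so the model is trained incrementally, one mini-dataset at a time
for mini_ds in online_train_ds:
    mini_ds = mini_ds.shuffle(buffer_size=32)
    mini_ds = mini_ds.map(decode_kafka_online_item)
    mini_ds = mini_ds.batch(32)
    if len(mini_ds) > 0:
        model.fit(mini_ds, epochs=3)
    print("One batch done")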

2024-04-16 10:28:07.593106: I tensorflow/core/common_runtime/executor.cc:1210] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_0' with dtype int32
	 [[{{node Placeholder/_0}}]]
2024-04-16 10:28:07.627934: E tensorflow_io/core/kernels/kafka_kernels.cc:774] REBALANCE: Local: Assign partitions
2024-04-16 10:28:07.631779: E tensorflow_io/core/kernels/kafka_kernels.cc:776] Retrieved committed offsets with status code: 0
2024-04-16 10:28:07.631795: I tensorflow_io/core/kernels/kafka_kernels.cc:787] REBALANCE: cancer-train[0], OFFSET: 126 ERROR_CODE: 0
2024-04-16 10:28:07.631798: I tensorflow_io/core/kernels/kafka_kernels.cc:802] REBALANCE: Assigning partitions


Tensor("args_0:0", shape=(), dtype=string)
Epoch 1/3


2024-04-16 10:28:08.136595: I tensorflow_io/core/kernels/kafka_kernels.cc:996] EOF reached for all 1 partition(s)
2024-04-16 10:28:08.175903: I tensorflow/core/common_runtime/executor.cc:1210] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_2' with dtype string and shape [1]
	 [[{{node Placeholder/_2}}]]
2024-04-16 10:28:08.176019: I tensorflow/core/common_runtime/executor.cc:1210] [/device:CPU:0] (DEBUG INFO) Executor start aborting (this does not indicate an error and you can ignore this message): INVALID_ARGUMENT: You must feed a value for placeholder tensor 'Placeholder/_2' with dtype string and shape [1]
	 [[{{node Placeholder/_2}}]]
  output, from_logits = _get_logits(


Epoch 2/3
Epoch 3/3
One batch done


2024-04-16 10:29:39.600987: E tensorflow_io/core/kernels/kafka_kernels.cc:1001] Local: Timed out


## Handling streaming data using PyTorch and the confluent_kafka library

In [26]:
import sys
!{sys.executable} -m pip install -q confluent_kafka torch


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [None]:
from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': '127.0.0.1:9092',
    'group.id': 'your_consumer_group_id',
    # Add any additional configuration here
}

consumer = Consumer(conf)
consumer.subscribe(['kafka_topic'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)  # Adjust timeout as needed
        if msg is None:
            continue  # no message arrived within the timeout
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        # Process the received message here
        data = msg.value().decode('utf-8')
        print("Received message:", data)
except KeyboardInterrupt:
    pass  # stop consuming when the cell is interrupted
finally:
    # Leave the consumer group cleanly (also commits offsets when auto-commit is on)
    consumer.close()


In [None]:
import torch
import torch.nn as nn

# Define your model architecture (this is just a placeholder)
class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        # Define your layers here

    def forward(self, x):
        # Implement forward pass
        return x

# Load your pre-trained model
model = MyModel()
model.load_state_dict(torch.load('model.pth'))
model.eval()  # Set model to evaluation mode


In [None]:
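def preprocess_data(raw):
    # Hypothetical helper: parse one CSV message into a list of floats
    return [float(v) for v in raw.split(",")]

with torch.no_grad():
    input_data = preprocess_data(data)  # Preprocess the data
    # Convert to a float tensor and add a batch dimension: shape (1, num_features)
    input_tensor = torch.tensor(input_data, dtype=torch.float32).unsqueeze(0)
    output = model(input_tensor)  # Perform inference
    # Process the model output as needed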
