<a href="https://colab.research.google.com/github/cluainin/Lab6/blob/main/Lab6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install tensorflow-io
!pip install kafka-python

Collecting tensorflow-io
  Downloading tensorflow_io-0.37.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (49.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.6/49.6 MB[0m [31m12.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tensorflow-io
Successfully installed tensorflow-io-0.37.0
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.0.2


Import packages

In [2]:
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

Validate tf and tfio imports

In [6]:
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))

tensorflow-io version: 0.37.0
tensorflow version: 2.15.0


Download and setup Kafka and Zookeeper instances downloaded 2.13-3.7.0 latest version

In [12]:
!curl -sSOL https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
!tar -xzf kafka_2.13-3.7.0.tgz



Using the default configurations (provided by Apache Kafka) for spinning up the instances.

In [13]:
!./kafka_2.13-3.7.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.7.0/config/zookeeper.properties
!./kafka_2.13-3.7.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.7.0/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10

Waiting for 10 secs until kafka and zookeeper services are up and running


Once the instances are started as daemon processes, grep for kafka in the processes list. The two java processes correspond to zookeeper and the kafka instances.

In [14]:
!ps -ef | grep kafka

root       14515     224  0 19:43 ?        00:00:00 /bin/bash -c ps -ef | grep kafka
root       14517   14515  0 19:43 ?        00:00:00 grep kafka


Create the kafka topics with the following specs:

susy-train: partitions=1, replication-factor=1
susy-test: partitions=2, replication-factor=1

In [15]:
!./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train
!./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test


Created topic susy-train.
Created topic susy-test.


Describe the topic for details on the configuration

In [16]:
!./kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train
!./kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test

Topic: susy-train	TopicId: 5J9m4jSZQv2okDHm2FXdgQ	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: susy-train	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic: susy-test	TopicId: ZGdJQsEQQ0KVvsuJN_a9ig	PartitionCount: 2	ReplicationFactor: 1	Configs: 
	Topic: susy-test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: susy-test	Partition: 1	Leader: 0	Replicas: 0	Isr: 0


The replication factor 1 indicates that the data is not being replicated. This is due to the presence of a single broker in our kafka setup. In production systems, the number of bootstrap servers can be in the range of 100's of nodes. That is where the fault-tolerance using replication comes into picture.

SUSY Dataset
Kafka being an event streaming platform, enables data from various sources to be written into it. For instance:

Web traffic logs
Astronomical measurements
IoT sensor data
Product reviews and many more.
For the purpose of this tutorial, lets download the SUSY dataset and feed the data into kafka manually. The goal of this classification problem is to distinguish between a signal process which produces supersymmetric particles and a background process which does not.

In [22]:
!curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz

#!curl -sSOL https://archive.ics.uci.edu/static/public/279/susy.zip






Explore the dataset
The first column is the class label (1 for signal, 0 for background), followed by the 18 features (8 low-level features then 10 high-level features). The first 8 features are kinematic properties measured by the particle detectors in the accelerator. The last 10 features are functions of the first 8 features. These are high-level features derived by physicists to help discriminate between the two classes.

In [23]:
COLUMNS = [
          #  labels
           'class',
          #  low-level features
           'lepton_1_pT',
           'lepton_1_eta',
           'lepton_1_phi',
           'lepton_2_pT',
           'lepton_2_eta',
           'lepton_2_phi',
           'missing_energy_magnitude',
           'missing_energy_phi',
          #  high-level derived features
           'MET_rel',
           'axial_MET',
           'M_R',
           'M_TR_2',
           'R',
           'MT2',
           'S_R',
           'M_Delta_R',
           'dPhi_r_b',
           'cos(theta_r1)'
           ]

The entire dataset consists of 5 million rows. However, for the purpose of this tutorial, let's consider only a fraction of the dataset (100,000 rows) so that less time is spent on the moving the data and more time on understanding the functionality of the api.

In [24]:
susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()

Unnamed: 0,class,lepton_1_pT,lepton_1_eta,lepton_1_phi,lepton_2_pT,lepton_2_eta,lepton_2_phi,missing_energy_magnitude,missing_energy_phi,MET_rel,axial_MET,M_R,M_TR_2,R,MT2,S_R,M_Delta_R,dPhi_r_b,cos(theta_r1)
0,0.0,0.972861,0.653855,1.176225,1.157156,-1.739873,-0.874309,0.567765,-0.175,0.810061,-0.252552,1.921887,0.889637,0.410772,1.145621,1.932632,0.994464,1.367815,0.040714
1,1.0,1.667973,0.064191,-1.225171,0.506102,-0.338939,1.672543,3.475464,-1.219136,0.012955,3.775174,1.045977,0.568051,0.481928,0.0,0.44841,0.205356,1.321893,0.377584
2,1.0,0.44484,-0.134298,-0.709972,0.451719,-1.613871,-0.768661,1.219918,0.504026,1.831248,-0.431385,0.526283,0.941514,1.587535,2.024308,0.603498,1.562374,1.135454,0.18091
3,1.0,0.381256,-0.976145,0.693152,0.448959,0.891753,-0.677328,2.03306,1.533041,3.04626,-1.005285,0.569386,1.015211,1.582217,1.551914,0.761215,1.715464,1.492257,0.090719
4,1.0,1.309996,-0.690089,-0.676259,1.589283,-0.693326,0.622907,1.087562,-0.381742,0.589204,1.365479,1.179295,0.968218,0.728563,0.0,1.083158,0.043429,1.154854,0.094859


In [25]:
# Number of datapoints and columns
len(susy_df), len(susy_df.columns)

(100000, 19)

In [26]:
# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])

(54025, 45975)

Split the dataset

In [27]:
train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))

x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]

x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]

# The labels are set as the kafka message keys so as to store data
# in multiple-partitions. Thus, enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))


Number of training samples:  60000
Number of testing sample:  40000


In [28]:
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)

(60000, 60000, 40000, 40000)

Store the train and test data in kafka
Storing the data in kafka simulates an environment for continuous remote data retrieval for training and inference purposes.

In [29]:
def error_callback(exc):
    raise Exception('Error while sendig data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
  count=0
  producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
  for message, key in items:
    producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
    count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))

Wrote 60000 messages into topic: susy-train
Wrote 40000 messages into topic: susy-test


Define the tfio train dataset
The IODataset class is utilized for streaming data from kafka into tensorflow. The class inherits from tf.data.Dataset and thus has all the useful functionalities of tf.data.Dataset out of the box.

In [31]:
def decode_kafka_item(item):
  message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(item.key)
  return (message, key)

BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)

NotImplementedError: unable to open file: libtensorflow_io.so, from paths: ['/usr/local/lib/python3.10/dist-packages/tensorflow_io/python/ops/libtensorflow_io.so']
caused by: ['/usr/local/lib/python3.10/dist-packages/tensorflow_io/python/ops/libtensorflow_io.so: undefined symbol: _ZNK10tensorflow4data11DatasetBase8FinalizeEPNS_15OpKernelContextESt8functionIFN4absl12lts_202308028StatusOrIN3tsl4core11RefCountPtrIS1_EEEEvEE']

In [32]:
#resolve the issue above by checking tensorflow is installed

pip install tensorflow-io



In [35]:
#install libraries

import tensorflow as tf
import tensorflow_io as tfio


In [36]:
#tensorflow & tensorflow i/o versions

print(tf.__version__)
print(tfio.__version__)


2.15.0
0.37.0


Modified code to get the original code to run

In [37]:
# Install TensorFlow I/O
!pip install tensorflow-io

import tensorflow as tf
import tensorflow_io as tfio

NUM_COLUMNS = 18  # Update this to the actual number of columns in your CSV

def decode_kafka_item(item):
  message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(item.key)
  return (message, key)

BATCH_SIZE = 64
SHUFFLE_BUFFER_SIZE = 64

# Create the Kafka IODataset
train_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=['susy-train'],
    group_id='test',
    servers='localhost:9092',  # Update with your Kafka broker address
    stream_timeout=10000
)

train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)




NotImplementedError: unable to open file: libtensorflow_io.so, from paths: ['/usr/local/lib/python3.10/dist-packages/tensorflow_io/python/ops/libtensorflow_io.so']
caused by: ['/usr/local/lib/python3.10/dist-packages/tensorflow_io/python/ops/libtensorflow_io.so: undefined symbol: _ZNK10tensorflow4data11DatasetBase8FinalizeEPNS_15OpKernelContextESt8functionIFN4absl12lts_202308028StatusOrIN3tsl4core11RefCountPtrIS1_EEEEvEE']

Uninstall tensorflow and tensorflow i/o to try to resolve the above issue

In [38]:
!pip uninstall -y tensorflow tensorflow-io


Found existing installation: tensorflow 2.15.0
Uninstalling tensorflow-2.15.0:
  Successfully uninstalled tensorflow-2.15.0
Found existing installation: tensorflow-io 0.37.0
Uninstalling tensorflow-io-0.37.0:
  Successfully uninstalled tensorflow-io-0.37.0


reinstall again

In [39]:
!pip install tensorflow==2.11.0 tensorflow-io==0.29.0


Collecting tensorflow==2.11.0
  Downloading tensorflow-2.11.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (588.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m588.3/588.3 MB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting tensorflow-io==0.29.0
  Downloading tensorflow_io-0.29.0-cp310-cp310-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (26.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.9/26.9 MB[0m [31m10.7 MB/s[0m eta [36m0:00:00[0m
Collecting gast<=0.4.0,>=0.2.1 (from tensorflow==2.11.0)
  Downloading gast-0.4.0-py3-none-any.whl (9.8 kB)
Collecting keras<2.12,>=2.11.0 (from tensorflow==2.11.0)
  Downloading keras-2.11.0-py2.py3-none-any.whl (1.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m79.1 MB/s[0m eta [36m0:00:00[0m
Collecting protobuf<3.20,>=3.9.2 (from tensorflow==2.11.0)
  Downloading protobuf-3.19.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_

Import the libraries

In [1]:
import tensorflow as tf
import tensorflow_io as tfio


check versions

In [2]:
print(tf.__version__)
print(tfio.__version__)


2.11.0
0.29.0


Run code again

In [3]:
NUM_COLUMNS = 18  # Update this to the actual number of columns in your CSV

def decode_kafka_item(item):
  message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(item.key)
  return (message, key)

BATCH_SIZE = 64
SHUFFLE_BUFFER_SIZE = 64

train_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=['susy-train'],
    group_id='test',
    servers='localhost:9092',  # Update with your Kafka broker address
    stream_timeout=10000
)

train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)


Instructions for updating:
Use `tf.data.Dataset.counter(...)` instead.
Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089
Instructions for updating:
Use `tf.data.Dataset.take_while(...)


TypeError: in user code:


    TypeError: outer_factory.<locals>.inner_factory.<locals>.tf__decode_kafka_item() takes 1 positional argument but 2 were given


In [4]:
# Install TensorFlow I/O
!pip install tensorflow-io

import tensorflow as tf
import tensorflow_io as tfio

NUM_COLUMNS = 18  # Update this to the actual number of columns in your CSV

def decode_kafka_item(message, key):
    message = tf.io.decode_csv(message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(key)
    return (message, key)

BATCH_SIZE = 64
SHUFFLE_BUFFER_SIZE = 64

# Create the Kafka IODataset
train_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=['susy-train'],
    group_id='test',
    servers='localhost:9092',  # Update with your Kafka broker address
    stream_timeout=10000
)

train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(lambda item: decode_kafka_item(item.message, item.key))
train_ds = train_ds.batch(BATCH_SIZE)




TypeError: in user code:


    TypeError: outer_factory.<locals>.inner_factory.<locals>.<lambda>() takes 1 positional argument but 2 were given


In [5]:
# Install TensorFlow I/O
!pip install tensorflow-io

import tensorflow as tf
import tensorflow_io as tfio

NUM_COLUMNS = 18  # Update this to the actual number of columns in your CSV

def decode_kafka_item(item):
    message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(item.key)
    return (message, key)

BATCH_SIZE = 64
SHUFFLE_BUFFER_SIZE = 64

# Create the Kafka IODataset
train_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=['susy-train'],
    group_id='test',
    servers='localhost:9092',  # Update with your Kafka broker address
    stream_timeout=10000
)

train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(lambda item: decode_kafka_item(item))
train_ds = train_ds.batch(BATCH_SIZE)




TypeError: in user code:


    TypeError: outer_factory.<locals>.inner_factory.<locals>.<lambda>() takes 1 positional argument but 2 were given


In [6]:
# Install TensorFlow I/O
!pip install tensorflow-io

import tensorflow as tf
import tensorflow_io as tfio

NUM_COLUMNS = 18  # Update this to the actual number of columns in your CSV

def decode_kafka_item(message, key):
    message = tf.io.decode_csv(message, [[0.0] for _ in range(NUM_COLUMNS)])
    key = tf.strings.to_number(key)
    return (message, key)

BATCH_SIZE = 64
SHUFFLE_BUFFER_SIZE = 64

# Create the Kafka IODataset
train_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=['susy-train'],
    group_id='test',
    servers='localhost:9092',  # Update with your Kafka broker address
    stream_timeout=10000
)

# Shuffle and batch the dataset
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(lambda item: decode_kafka_item(item.message, item.key))
train_ds = train_ds.batch(BATCH_SIZE)




TypeError: in user code:


    TypeError: outer_factory.<locals>.inner_factory.<locals>.<lambda>() takes 1 positional argument but 2 were given


In [7]:
import tensorflow as tf
import tensorflow_io as tfio

NUM_COLUMNS = 18
BATCH_SIZE = 64
SHUFFLE_BUFFER_SIZE = 64

def decode_kafka_item(item):
    message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(item.key)
    return (message, key)

train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)


# **Build and train the model**

In [8]:
# Set the parameters

OPTIMIZER="adam"
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS=['accuracy']
EPOCHS=10


In [9]:
# design/build the model
model = tf.keras.Sequential([
  tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(1, activation='sigmoid')
])

print(model.summary())

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 128)               2432      
                                                                 
 dropout (Dropout)           (None, 128)               0         
                                                                 
 dense_1 (Dense)             (None, 256)               33024     
                                                                 
 dropout_1 (Dropout)         (None, 256)               0         
                                                                 
 dense_2 (Dense)             (None, 128)               32896     
                                                                 
 dropout_2 (Dropout)         (None, 128)               0         
                                                                 
 dense_3 (Dense)             (None, 1)                 1

In [10]:
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)

In [11]:
# fit the model
model.fit(train_ds, epochs=EPOCHS)

Epoch 1/10


  output, from_logits = _get_logits(


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7c3c5b778c10>

Note: Please do not confuse the training step with online training. It's an entirely different paradigm which will be covered in a later section.

Since only a fraction of the dataset is being utilized, our accuracy is limited to ~78% during the training phase. However, please feel free to store additional data in kafka for a better model performance. Also, since the goal was to just demonstrate the functionality of the tfio kafka datasets, a smaller and less-complicated neural network was used. However, one can increase the complexity of the model, modify the learning strategy, tune hyper-parameters etc for exploration purposes. For a baseline approach, please refer to this article.

# **Infer on the test data**
To infer on the test data by adhering to the 'exactly-once' semantics along with fault-tolerance, the streaming.KafkaGroupIODataset can be utilized.

# **Define the tfio test dataset**
The stream_timeout parameter blocks for the given duration for new data points to be streamed into the topic. This removes the need for creating new datasets if the data is being streamed into the topic in an intermittent fashion.

In [12]:
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)

Though this class can be used for training purposes, there are caveats which need to be addressed. Once all the messages are read from kafka and the latest offsets are committed using the streaming.KafkaGroupIODataset, the consumer doesn't restart reading the messages from the beginning. Thus, while training, it is possible only to train for a single epoch with the data continuously flowing in. This kind of a functionality has limited use cases during the training phase wherein, once a datapoint has been consumed by the model it is no longer required and can be discarded.

However, this functionality shines when it comes to robust inference with exactly-once semantics.

# **evaluate the performance on the test data**

In [13]:
res = model.evaluate(test_ds)
print("test loss, test acc:", res)

test loss, test acc: [0.44017064571380615, 0.7963500022888184]


Since the inference is based on 'exactly-once' semantics, the evaluation on the test set can be run only once. In order to run the inference again on the test data, a new consumer group should be used.

# Track the offset lag of the testcg consumer group

In [14]:
!./kafka_2.13-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg


Consumer group 'testcg' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testcg          susy-test       0          21684           21684           0               -               -               -
testcg          susy-test       1          18316           18316           0               -               -               -


Once the current-offset matches the log-end-offset for all the partitions, it indicates that the consumer(s) have completed fetching all the messages from the kafka topic.

**`Online learning`**

The online machine learning paradigm is a bit different from the traditional/conventional way of training machine learning models. In the former case, the model continues to incrementally learn/update it's parameters as soon as the new data points are available and this process is expected to continue indefinitely. This is unlike the latter approaches where the dataset is fixed and the model iterates over it n number of times. In online learning, the data once consumed by the model may not be available for training again.

By utilizing the streaming.KafkaBatchIODataset, it is now possible to train the models in this fashion. Let's continue to use our SUSY dataset for demonstrating this functionality.

# **The tfio training dataset for online learning**

The streaming.KafkaBatchIODataset is similar to the streaming.KafkaGroupIODataset in it's API. Additionally, it is recommended to utilize the stream_timeout parameter to configure the duration for which the dataset will block for new messages before timing out. In the instance below, the dataset is configured with a stream_timeout of 10000 milliseconds. This implies that, after all the messages from the topic have been consumed, the dataset will wait for an additional 10 seconds before timing out and disconnecting from the kafka cluster. If new messages are streamed into the topic before timing out, the data consumption and model training resumes for those newly consumed data points. To block indefinitely, set it to -1.

In [15]:
online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["susy-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

Every item that the online_train_ds generates is a tf.data.Dataset in itself. Thus, all the standard transformations can be applied as usual.

In [16]:
def decode_kafka_online_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

for mini_ds in online_train_ds:
  mini_ds = mini_ds.shuffle(buffer_size=32)
  mini_ds = mini_ds.map(decode_kafka_online_item)
  mini_ds = mini_ds.batch(32)
  if len(mini_ds) > 0:
    model.fit(mini_ds, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3
Epoch 1/3
Epoch 2/3
Epoch 3/3


The incrementally trained model can be saved in a periodic fashion (based on use-cases) and can be utilized to infer on the test data in either online or offline modes.

Note: The streaming.KafkaBatchIODataset and streaming.KafkaGroupIODataset are still in experimental phase and have scope for improvements based on user-feedback.