<a href="https://colab.research.google.com/github/9characters/ML-projects/blob/main/Online_learning_using_Python_Kafka.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Lab 6: Online Learning using Apache Kafka
#### Using Apache Kafka to simulate model training on a real-time data stream.

### Learning Objectives
* Learn to set up Kafka, write to it, and read from it.
* Learn to use layers in Keras.
* Learn to use Kafka for data streams and train a model in real time.
* Learn the concept of online learning.

### Install Package
Install kafka-python and TensorFlow IO. TensorFlow IO is an extension package for TensorFlow that supports integration with Apache Kafka and other systems.

In [None]:
!pip install tensorflow-io
!pip install kafka-python

Collecting tensorflow-io
  Downloading tensorflow_io-0.21.0-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (22.7 MB)
Collecting tensorflow-io-gcs-filesystem==0.21.0
  Downloading tensorflow_io_gcs_filesystem-0.21.0-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (2.1 MB)
Installing collected packages: tensorflow-io-gcs-filesystem, tensorflow-io
Successfully installed tensorflow-io-0.21.0 tensorflow-io-gcs-filesystem-0.21.0
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2


### Imports
Import all the libraries required for the lab: the kafka-python producer, scikit-learn's train/test splitter, pandas, TensorFlow, and TensorFlow IO.



In [None]:
import time 
from kafka import KafkaProducer
from sklearn.model_selection import train_test_split
import pandas as pd 
import tensorflow as tf
import tensorflow_io as tfio

### Download Kafka
Download and set up Kafka for the real-time data stream simulation.



In [None]:
!curl -sSOL https://downloads.apache.org/kafka/2.8.1/kafka_2.12-2.8.1.tgz
!tar -xzf kafka_2.12-2.8.1.tgz 

### Start running Kafka and Zookeeper server instances 

Start the Zookeeper and Kafka servers as daemon processes. Zookeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

In [None]:
!./kafka_2.12-2.8.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.12-2.8.1/config/zookeeper.properties
!./kafka_2.12-2.8.1/bin/kafka-server-start.sh -daemon ./kafka_2.12-2.8.1/config/server.properties
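
Because both servers are started with `-daemon`, the cell returns before the broker is necessarily ready to accept connections (the "Broker may not be available" warnings in the topic-creation output are consistent with this). The following is a minimal sketch, not part of the original lab, of how one might wait for the broker port before continuing. The helper name `wait_for_port` and its parameters are hypothetical; it only checks that a TCP connection can be opened, which is a weaker condition than the broker being fully initialized.

```python
import socket
import time


def wait_for_port(host, port, timeout_s=60.0, poll_interval_s=1.0):
    """Return True once a TCP connection to host:port succeeds, False on timeout.

    Hypothetical helper: an open port suggests the server is listening,
    but does not prove that it has finished starting up.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError while nothing is listening.
            with socket.create_connection((host, port), timeout=poll_interval_s):
                return True
        except OSError:
            time.sleep(poll_interval_s)
    return False


# Example usage in the notebook (9092 is the broker port used throughout this lab):
#   if not wait_for_port("localhost", 9092):
#       raise RuntimeError("Kafka broker did not start listening in time")
```

Running such a check before creating topics would likely avoid the initial connection warnings, although the topic commands in this lab succeeded anyway after retrying.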

### Create a topic to store events
Create topics for the train and test datasets to store events in Kafka. Kafka is a distributed event streaming platform that lets you read, write, store, and process events. These events, or messages, are organized and stored in topics. In simple terms, a topic is similar to a folder in a filesystem, and the events are the files in that folder. Here, susy-train is created with one partition and susy-test with two.

In [None]:
!./kafka_2.12-2.8.1/bin/kafka-topics.sh --create --topic susy-train --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
!./kafka_2.12-2.8.1/bin/kafka-topics.sh --create --topic susy-test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2

[2021-10-29 18:56:42,847] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-10-29 18:56:42,891] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-10-29 18:56:43,093] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Created topic susy-train.
Created topic susy-test.


### Describe the topic for details
The describe command reports details about a topic: its partitions, replicas, and other important information.


In [None]:
!./kafka_2.12-2.8.1/bin/kafka-topics.sh --describe --topic susy-train --bootstrap-server localhost:9092
!./kafka_2.12-2.8.1/bin/kafka-topics.sh --describe --topic susy-test --bootstrap-server localhost:9092

Topic: susy-train	TopicId: _iexn9tNT4Kg7WQ5JjaDlQ	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: susy-train	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic: susy-test	TopicId: g4xrW9ZqSLi8Mm0yYf0Hug	PartitionCount: 2	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: susy-test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: susy-test	Partition: 1	Leader: 0	Replicas: 0	Isr: 0


### Download the dataset from Google Drive

In the code cell below, we use gdown to download the dataset (SUSY.csv) from Google Drive into the Colab environment, so that we have a local copy of the dataset.

In [None]:
!gdown --id "1y0Ej7oYbKKB6YdK_yJ1OkCv6V_a7JAgk"

Downloading...
From: https://drive.google.com/uc?id=1y0Ej7oYbKKB6YdK_yJ1OkCv6V_a7JAgk
To: /content/SUSY.csv
100% 2.39G/2.39G [00:25<00:00, 93.3MB/s]


### Define features
COLUMNS lists the name of each column in the SUSY dataset: the class label followed by the 18 features.

In [None]:
COLUMNS = [
           'class',
           'lepton_1_pT',
           'lepton_1_eta',
           'lepton_1_phi',
           'lepton_2_pT',
           'lepton_2_eta',
           'lepton_2_phi',
           'missing_energy_magnitude',
           'missing_energy_phi',
           'MET_rel',
           'axial_MET',
           'M_R',
           'M_TR_2',
           'R',
           'MT2',
           'S_R',
           'M_Delta_R',
           'dPhi_r_b',
           'cos(theta_r1)']

### SUSY Dataset

The SUSY dataset is produced using Monte Carlo simulations; it models the kind of data recorded at particle accelerators. The first column is the class label. It is followed by 8 low-level features, which are kinematic properties measured by the particle detectors in the accelerator. The last 10 features are high-level features derived by physicists to help discriminate between the two classes: a signal process and a background process.

### Read CSV
Use pandas to load the first 100,000 rows of the SUSY dataset from the CSV file and assign a name to each column.

In [None]:
mydata = pd.read_csv('SUSY.csv', header=None, names=COLUMNS, nrows=100000)

### Visualize Data

The pandas `head()` function lets us inspect the first few rows of the SUSY dataset.

In [None]:
mydata.head()

Unnamed: 0,class,lepton_1_pT,lepton_1_eta,lepton_1_phi,lepton_2_pT,lepton_2_eta,lepton_2_phi,missing_energy_magnitude,missing_energy_phi,MET_rel,axial_MET,M_R,M_TR_2,R,MT2,S_R,M_Delta_R,dPhi_r_b,cos(theta_r1)
0,0.0,0.972861,0.653855,1.176225,1.157156,-1.739873,-0.874309,0.567765,-0.175,0.810061,-0.252552,1.921887,0.889637,0.410772,1.145621,1.932632,0.994464,1.367815,0.040714
1,1.0,1.667973,0.064191,-1.225171,0.506102,-0.338939,1.672543,3.475464,-1.219136,0.012955,3.775174,1.045977,0.568051,0.481928,0.0,0.44841,0.205356,1.321893,0.377584
2,1.0,0.44484,-0.134298,-0.709972,0.451719,-1.613871,-0.768661,1.219918,0.504026,1.831248,-0.431385,0.526283,0.941514,1.587535,2.024308,0.603498,1.562374,1.135454,0.18091
3,1.0,0.381256,-0.976145,0.693152,0.448959,0.891753,-0.677328,2.03306,1.533041,3.04626,-1.005285,0.569386,1.015211,1.582217,1.551914,0.761215,1.715464,1.492257,0.090719
4,1.0,1.309996,-0.690089,-0.676259,1.589283,-0.693326,0.622907,1.087562,-0.381742,0.589204,1.365479,1.179295,0.968218,0.728563,0.0,1.083158,0.043429,1.154854,0.094859


### Split Train and Test data
As always, part of the data must be held out for evaluation. Here, `test_size=0.3` assigns 70% of the rows to the training set and the remaining 30% to the test set; no separate validation set is created.

In [None]:
train_df, test_df = train_test_split(mydata, test_size=0.3, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))

x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]

x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]

Number of training samples:  70000
Number of testing sample:  30000


### Convert data to list format
Convert each row of the DataFrames to a CSV-formatted string, so that every split becomes a list of strings that can be fed to Kafka.

In [None]:
#Convert each test and train dataframe to list form to feed to kafka
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))

x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
len(x_train), len(y_train), len(x_test), len(y_test)

(70000, 70000, 30000, 30000)
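
To make the message format concrete, here is a small sketch of the round trip that each sample goes through: the features become a comma-separated string sent as the Kafka message value, and the label becomes the message key. The helper names `encode_record` and `decode_record` are hypothetical and not part of the lab; the lab itself does the decoding with `tf.io.decode_csv` and `tf.strings.to_number`.

```python
def encode_record(features, label):
    """Build the (value, key) byte pair for one sample, as the producer sends it."""
    value = ",".join(str(x) for x in features).encode("utf-8")
    key = str(label).encode("utf-8")
    return value, key


def decode_record(value, key):
    """Inverse of encode_record: recover the float features and the float label."""
    features = [float(x) for x in value.decode("utf-8").split(",")]
    label = float(key.decode("utf-8"))
    return features, label


# First three feature values and the label of the first row shown by head().
row = [0.972861, 0.653855, 1.176225]
value, key = encode_record(row, 0.0)
print(value)  # b'0.972861,0.653855,1.176225'
print(key)    # b'0.0'

features, label = decode_record(value, key)
print(features == row, label)  # True 0.0
```

Using the label as the key keeps each feature row and its label together in a single Kafka record.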

In [None]:
NUM_COLUMNS = len(x_train_df.columns)

### Create Kafka Producer 
Create a Kafka producer that takes in the data and sends each record to a partition within a topic in the Kafka cluster. Each feature row is sent as the message value, and its label is sent as the message key.

In [None]:
#send each record to a partition within a topic in kafka cluster
def write_to_kafka(topic, items):
  count=0
  producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
  for message, key in items:
    producer.send(topic, key=key.encode('utf-8'), value=message.encode('utf-8'))
    count += 1 
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic))

write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))

Wrote 70000 messages into topic: susy-train
Wrote 30000 messages into topic: susy-test


### Online Learning
Unlike traditional training, where a model is fit once on a fixed dataset, online learning updates the model's parameters incrementally as new data points become available, and this can continue for as long as data keeps arriving. In the code below, stream_timeout is set to 10000 milliseconds: once all the messages in the topic have been consumed, the dataset waits up to 10 more seconds for new messages before timing out and disconnecting from the Kafka cluster. If additional data arrives within that period, model training resumes.

In [None]:
online_train_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-train"],
    group_id="cgonline",
    servers="localhost:9092",
    stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_online_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

BATCH_SIZE = 32
online_train_ds = online_train_ds.shuffle(buffer_size=32)
online_train_ds = online_train_ds.map(decode_kafka_online_item)
online_train_ds = online_train_ds.batch(BATCH_SIZE)
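
The idea of updating parameters as each data point arrives can be illustrated without Kafka or TensorFlow. The sketch below is not the lab's model: it is a plain-Python logistic regression trained with one stochastic gradient step per sample, fed by a synthetic generator that stands in for the stream. The class `OnlineLogisticRegression`, the function `synthetic_stream`, and the learning rate are all illustrative assumptions.

```python
import math
import random


def sigmoid(z):
    """Numerically safe logistic function."""
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    ez = math.exp(z)
    return ez / (1.0 + ez)


class OnlineLogisticRegression:
    """Toy model that is updated one sample at a time."""

    def __init__(self, num_features, learning_rate=0.1):
        self.weights = [0.0] * num_features
        self.bias = 0.0
        self.learning_rate = learning_rate

    def predict_proba(self, x):
        return sigmoid(sum(w * xi for w, xi in zip(self.weights, x)) + self.bias)

    def partial_fit(self, x, y):
        """One SGD step on the log loss for a single sample; y is 0.0 or 1.0."""
        error = self.predict_proba(x) - y
        self.weights = [w - self.learning_rate * error * xi
                        for w, xi in zip(self.weights, x)]
        self.bias -= self.learning_rate * error


def synthetic_stream(n, seed=0):
    """Stand-in for a message stream: yields (features, label) pairs."""
    rng = random.Random(seed)
    for _ in range(n):
        x = [rng.uniform(-1, 1), rng.uniform(-1, 1)]
        y = 1.0 if x[0] + x[1] > 0 else 0.0
        yield x, y


toy_model = OnlineLogisticRegression(num_features=2)
for x, y in synthetic_stream(2000):
    toy_model.partial_fit(x, y)  # the model changes after every sample

# Evaluate on fresh samples that the model has not seen.
held_out = list(synthetic_stream(500, seed=1))
correct = sum((toy_model.predict_proba(x) >= 0.5) == (y == 1.0) for x, y in held_out)
print("held-out accuracy:", correct / len(held_out))
```

The Keras pipeline in this lab follows the same pattern at a coarser grain: the parameters are updated once per batch of 32 messages rather than once per message.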

### Initialize variables to create ANN
Initialize the optimizer, loss function, metrics, and number of epochs for the neural network.


In [None]:
OPTIMIZER="adam"
LOSS = tf.keras.losses.BinaryCrossentropy()
METRICS = ['accuracy']
EPOCHS = 1

### Define Model
Create a neural network with multiple dense layers, and use Dropout layers to reduce overfitting.

In [None]:
#define model input shape, and layers of NN
model = tf.keras.Sequential()
model.add(tf.keras.layers.Dense(128, input_shape=(NUM_COLUMNS,), activation='relu'))
model.add(tf.keras.layers.Dropout(0.2))
model.add(tf.keras.layers.Dense(256, activation='relu'))
model.add(tf.keras.layers.Dropout(0.4))
model.add(tf.keras.layers.Dense(128, activation='relu'))
model.add(tf.keras.layers.Dropout(0.4))
model.add(tf.keras.layers.Dense(1, activation='sigmoid'))
model.summary()  # summary() prints the table itself and returns None

Model: "sequential_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_8 (Dense)              (None, 128)               2432      
_________________________________________________________________
dropout_6 (Dropout)          (None, 128)               0         
_________________________________________________________________
dense_9 (Dense)              (None, 256)               33024     
_________________________________________________________________
dropout_7 (Dropout)          (None, 256)               0         
_________________________________________________________________
dense_10 (Dense)             (None, 128)               32896     
_________________________________________________________________
dropout_8 (Dropout)          (None, 128)               0         
_________________________________________________________________
dense_11 (Dense)             (None, 1)                
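
The `Param #` column of the summary can be checked by hand: a dense layer with `n` inputs and `m` units has `n * m` weights plus `m` biases, and Dropout layers have no parameters. The sketch below recomputes the counts from the layer sizes in the model definition, assuming 18 input features (the 19 entries of COLUMNS minus the class label). The helper `dense_param_count` is a hypothetical name used only for illustration.

```python
def dense_param_count(num_inputs, num_units):
    """Weights (num_inputs * num_units) plus one bias per unit."""
    return num_inputs * num_units + num_units


# Input width followed by the width of each Dense layer in the model.
layer_sizes = [18, 128, 256, 128, 1]
counts = [dense_param_count(n_in, n_out)
          for n_in, n_out in zip(layer_sizes, layer_sizes[1:])]
print(counts)       # [2432, 33024, 32896, 129]
print(sum(counts))  # 68481
```

The first three values match the counts reported in the summary for the first three dense layers.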

In [None]:
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)

In [None]:
model.fit(online_train_ds, epochs=EPOCHS)

### Prepare test data

Prepare the test dataset using KafkaGroupIODataset, so that the test messages can be streamed from Kafka and used to evaluate the model trained above.

In [None]:
#prepare test data to evaluate on the online trained model
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-test"],
    group_id="testcg",
    servers="localhost:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
  message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
  key = tf.strings.to_number(raw_key)
  return (message, key)

test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(32)

### Evaluate Model
Use the model's built-in `evaluate` method to compute the loss and the metric values on the test data.

In [None]:
res = model.evaluate(test_ds)



In [None]:
!./kafka_2.12-2.8.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group cgonline
!./kafka_2.12-2.8.1/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testcg


Consumer group 'cgonline' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
cgonline        susy-train      0          70000           70000           0               -               -               -

Consumer group 'testcg' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testcg          susy-test       0          16223           16223           0               -               -               -
testcg          susy-test       1          13777           13777           0               -               -               -
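
The columns of this output relate as LAG = LOG-END-OFFSET minus CURRENT-OFFSET for each partition. As a quick check, the sketch below parses the `testcg` rows shown above and recomputes the lag and the total number of consumed messages. The function `parse_consumer_group_rows` is a hypothetical helper, and it assumes every offset column holds a number, which is the case in this output.

```python
DESCRIBE_OUTPUT = """\
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testcg          susy-test       0          16223           16223           0               -               -               -
testcg          susy-test       1          13777           13777           0               -               -               -
"""


def parse_consumer_group_rows(text):
    """Parse the whitespace-separated table into one dict per partition."""
    lines = [line for line in text.splitlines() if line.strip()]
    header = lines[0].split()
    rows = []
    for line in lines[1:]:
        fields = dict(zip(header, line.split()))
        current = int(fields["CURRENT-OFFSET"])  # assumes a numeric offset
        end = int(fields["LOG-END-OFFSET"])
        rows.append({
            "topic": fields["TOPIC"],
            "partition": int(fields["PARTITION"]),
            "current": current,
            "end": end,
            "lag": end - current,  # messages not yet consumed by the group
        })
    return rows


rows = parse_consumer_group_rows(DESCRIBE_OUTPUT)
print(sum(r["current"] for r in rows))  # 30000
print(sum(r["lag"] for r in rows))      # 0
```

The total of 30000 matches the number of messages written to susy-test, and a lag of 0 on every partition means the group has consumed everything currently in the topic.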


**What is the difference between this Lab (Lab 6) and Lab 4 (IDS)? [5 points]**

In Lab 4, we did not incorporate changes in the dataset or any incremental learning: the data was static, and the model was simply trained on it once. In Lab 6, the data arrives continually as a real-time stream, so the model can keep learning from new data, with a tool such as Apache Kafka delivering the stream.


**Did you observe any differences in result during the evaluation of the model when you rerun? [5 points]**

When we try to train the model again with the data from the same topic, "susy-train", the error "Empty array" appears. This means that the data in the topic has already been consumed by the consumer group, and there is no new data in the event stream.

Still, if we rerun the whole program from the beginning, the accuracy and loss do not change significantly. To see changes in the evaluation metrics, more topics with new data would need to be created and used to train the neural network model. The results of multiple runs are as follows:

First Run => loss: 0.4421 - accuracy: 0.7940

Second Run => loss: 0.4407 - accuracy: 0.7971

Third Run => loss: 0.4459 - accuracy: 0.7925
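
The "Empty array" behaviour on a rerun follows from how consumer groups track their position: once a group has committed its offsets at the end of a topic, reading again under the same group id yields nothing until new messages arrive. The toy class below is a simplified in-memory model of that idea, not Kafka's actual implementation; `ToyTopic` and its methods are hypothetical names, and it models a single partition only.

```python
class ToyTopic:
    """In-memory stand-in for a single-partition topic with per-group offsets."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = {}  # group_id -> offset of the next message to read

    def append(self, message):
        self.messages.append(message)

    def consume(self, group_id):
        # A group with no committed offset starts at 0, which mirrors
        # the auto.offset.reset=earliest setting used in this lab.
        start = self.committed.get(group_id, 0)
        batch = self.messages[start:]
        self.committed[group_id] = len(self.messages)  # commit the new position
        return batch


topic = ToyTopic(["m0", "m1", "m2"])
print(topic.consume("cgonline"))  # ['m0', 'm1', 'm2']
print(topic.consume("cgonline"))  # [] : nothing new for this group
print(topic.consume("another"))   # ['m0', 'm1', 'm2'] : a new group starts over
topic.append("m3")
print(topic.consume("cgonline"))  # ['m3'] : only the newly arrived message
```

In this simplified picture, rerunning training under the same group id sees an empty stream, whereas new messages in the topic (or a different group id) give the model data to train on again.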

