<a href="https://colab.research.google.com/github/SharmaNatasha/Machine-Learning-using-Python/blob/master/Streaming_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip uninstall tensorflow
!pip uninstall tensorflow_io
!pip install tensorflow==2.7.1
!pip install tensorflow_io==0.23.1
!pip install kafka-python

In [None]:
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
# SKLearn libraries
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

In [None]:
!curl -sSOL https://downloads.apache.org/kafka/3.3.2/kafka_2.13-3.3.2.tgz
!tar -xzf kafka_2.13-3.3.2.tgz

In [None]:
# Start ZooKeeper first, then the Kafka broker (presumably configured in
# server.properties to connect to this ZooKeeper). -daemon runs each process in
# the background so the cell returns immediately.
!./kafka_2.13-3.3.2/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.3.2/config/zookeeper.properties
!./kafka_2.13-3.3.2/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.3.2/config/server.properties
# Fixed grace period before the broker is used by the next cells.
# NOTE(review): a fixed sleep can be too short on a slow VM; if topic creation
# below fails, re-run it or increase the wait.
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10

Waiting for 10 secs until kafka and zookeeper services are up and running


In [None]:
# Sanity check: list processes whose command line mentions "kafka" (both the
# ZooKeeper and broker JVMs run from the kafka_2.13-3.3.2 directory). The grep
# process itself also shows up in this listing.
!ps -ef | grep kafka

root       11334       1 13 10:46 ?        00:00:02 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/content/kafka_2.13-3.3.2/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/content/kafka_2.13-3.3.2/bin/../logs -Dlog4j.configuration=file:./kafka_2.13-3.3.2/bin/../config/log4j.properties -cp /content/kafka_2.13-3.3.2/bin/../libs/activation-1.1.1.jar:/content/kafka_2.13-3.3.2/bin/../libs/aopalliance-repackaged-2.6.1.jar:/content/kafka_2.13-3.3.2/bin/../libs/argparse4j-0.7.0.jar:/content/kafka_2.13-3.3.2/bin/../libs/audience-annotations-0.5.0.jar:/content/kafka_2.13-3.3.2/bin/../libs/commons-cli-1.4.jar:/content/kafka_2.13-3.3.2/bin/../libs/commons-lang3-3.12.0.jar:/content/kafka_2.1

In [None]:
!./kafka_2.13-3.3.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic cancer-train
!./kafka_2.13-3.3.2/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic cancer-test

Created topic cancer-train.
Created topic cancer-test.


In [None]:
# Raw breast-cancer table; the CSV is expected in the current working directory.
CANCER_CSV = 'breast-cancer-wisconsin.data.csv'
cancer_df = pd.read_csv(CANCER_CSV)
cancer_df.head()

Unnamed: 0,Sample code number,Clump Thickness,Uniformity of Cell Size,uniformity of Cell Shape,Marginal Adhesion,Single Epithelial Cell Size,Bare Nuclei,Bland Chromatin,Normal Nucleoli,Mitoses,Class
0,1000025,5,1,1,1,2,1,3,1,1,2
1,1002945,5,4,4,5,7,10,3,2,1,2
2,1015425,3,1,1,1,2,2,3,1,1,2
3,1016277,6,8,8,1,3,4,3,7,1,2
4,1017023,4,1,1,3,2,1,3,1,1,2


In [None]:
# Column dtypes of the raw frame. 'Bare Nuclei' is read as object rather than
# int64, presumably because some entries are non-numeric; inspect before use.
cancer_df.dtypes

Sample code number              int64
Clump Thickness                 int64
Uniformity of Cell Size         int64
uniformity of Cell Shape        int64
Marginal Adhesion               int64
Single Epithelial Cell Size     int64
Bare Nuclei                    object
Bland Chromatin                 int64
Normal Nucleoli                 int64
Mitoses                         int64
Class                           int64
dtype: object

In [None]:
# (rows, columns) of the raw frame, label column included
cancer_df.shape

(699, 11)

In [None]:
# Map the two label codes onto {0, 1} for binary cross-entropy: 2 -> 0, 4 -> 1.
# (In the UCI breast-cancer-wisconsin data 2 is benign and 4 malignant;
# confirm against the dataset description.) Re-running this cell is harmless.
cancer_df['Class'] = cancer_df['Class'].replace({2: 0, 4: 1})

In [None]:
# Class balance after recoding: (count of label 0, count of label 1).
class_labels = cancer_df["Class"]
len(class_labels[class_labels == 0]), len(class_labels[class_labels == 1])

(458, 241)

In [None]:
# 'Bare Nuclei' was loaded as object dtype (see the dtypes output above), so it
# is removed instead of being fed to the numeric CSV decoder later on.
# NOTE(review): this discards a whole feature; converting it with
# pd.to_numeric(..., errors="coerce") and imputing would keep it (the model's
# input width would then need to change too).
cancer_df = cancer_df.drop(columns=['Bare Nuclei'])
cancer_df.dtypes

Sample code number             int64
Clump Thickness                int64
Uniformity of Cell Size        int64
uniformity of Cell Shape       int64
Marginal Adhesion              int64
Single Epithelial Cell Size    int64
Bland Chromatin                int64
Normal Nucleoli                int64
Mitoses                        int64
Class                          int64
dtype: object

In [None]:
# Missing values per column (all zeros means nothing needs imputing).
cancer_df.isna().sum()

Sample code number             0
Clump Thickness                0
Uniformity of Cell Size        0
uniformity of Cell Shape       0
Marginal Adhesion              0
Single Epithelial Cell Size    0
Bland Chromatin                0
Normal Nucleoli                0
Mitoses                        0
Class                          0
dtype: int64

In [None]:
# 60/40 split. stratify keeps the 0/1 label ratio identical in both parts and
# random_state makes the split (and every downstream number) reproducible
# across kernel restarts.
train_df, test_df = train_test_split(
    cancer_df,
    test_size=0.4,
    shuffle=True,
    stratify=cancer_df["Class"],
    random_state=42,
)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))

# Features are every column except the label.
# NOTE(review): 'Sample code number' is still a feature here. It looks like a
# record identifier rather than a measurement and its raw values (~1e6, see the
# head() output) are unscaled. Consider dropping it; NUM_COLUMNS and the
# model's input width would follow.
x_train_df = train_df.drop(["Class"], axis=1)
y_train_df = train_df["Class"]

x_test_df = test_df.drop(["Class"], axis=1)
y_test_df = test_df["Class"]

len(x_train_df.columns)

Number of training samples:  419
Number of testing sample:  280


9

In [None]:
# Preview how feature rows will look as Kafka message values (header dropped).
# Only the first few rows are shown instead of dumping the whole training set.
x_train_df.head().to_csv(index=False).splitlines()[1:]

In [None]:
def _csv_rows(frame):
    r"""Return the data rows of ``frame.to_csv(index=False)`` as a list of strings.

    The header line is dropped and empty lines are filtered out. ``splitlines``
    is used rather than ``split("\n")`` so that a ``\r\n`` line terminator
    (pandas defaults to ``os.linesep``) does not leave a trailing ``\r`` on every
    row, which would later break CSV/number parsing of the Kafka records.
    """
    return [row for row in frame.to_csv(index=False).splitlines()[1:] if row]


# Each feature row becomes a Kafka message value and the matching label becomes
# the message key. Because records are keyed by label, records with the same
# label hash to the same partition of a multi-partition topic (cancer-test has
# 2); the label is recovered from the key with tf.strings.to_number when reading.
x_train = _csv_rows(x_train_df)
y_train = _csv_rows(y_train_df)

x_test = _csv_rows(x_test_df)
y_test = _csv_rows(y_test_df)

In [None]:
# Width of each CSV record = number of feature columns; the decoders use it to
# build their record defaults.
NUM_COLUMNS = x_train_df.shape[1]
print(NUM_COLUMNS)
tuple(len(rows) for rows in (x_train, y_train, x_test, y_test))

9


(419, 419, 280, 280)

In [None]:
# First (shuffled) training row, for comparison with its serialized form.
x_train_df.iloc[:1]

Unnamed: 0,Sample code number,Clump Thickness,Uniformity of Cell Size,uniformity of Cell Shape,Marginal Adhesion,Single Epithelial Cell Size,Bland Chromatin,Normal Nucleoli,Mitoses
6,1018099,1,1,1,1,2,3,1,1


In [None]:
def error_callback(exc):
    """Raise an ``Exception`` describing a failed Kafka send.

    Registered as an errback on every future returned by ``producer.send``.
    kafka-python runs errbacks inside its own callback machinery and logs,
    rather than propagates, exceptions raised there. For that reason
    ``write_to_kafka`` also re-checks every future after ``flush()`` and calls
    this function itself when a send failed.

    Args:
        exc: The error reported by the producer (anything with a useful ``str``).

    Raises:
        Exception: always, with message ``'Error while sending data to kafka: <exc>'``.
    """
    raise Exception('Error while sending data to kafka: {0}'.format(str(exc)))


def write_to_kafka(topic_name, items, bootstrap_servers=('127.0.0.1:9092',)):
    """Publish ``(message, key)`` string pairs to a Kafka topic.

    Args:
        topic_name: Destination topic.
        items: Iterable of ``(message, key)`` pairs of ``str``. Both are UTF-8
            encoded and sent as the record value and key respectively.
        bootstrap_servers: Broker addresses handed to ``KafkaProducer``;
            defaults to the single local broker started earlier.

    Returns:
        The number of messages written (all of them acknowledged).

    Raises:
        Exception: via ``error_callback`` if any record failed to be delivered.
    """
    producer = KafkaProducer(bootstrap_servers=list(bootstrap_servers))
    futures = []
    try:
        for message, key in items:
            future = producer.send(topic_name,
                                   key=key.encode('utf-8'),
                                   value=message.encode('utf-8'))
            future.add_errback(error_callback)
            futures.append(future)
        # Block until every buffered record is acknowledged or has failed.
        producer.flush()
    finally:
        # Release the producer's connections/background thread even on error.
        producer.close()

    # Errback exceptions do not reach us, so inspect the futures explicitly.
    failed = [future for future in futures if future.failed()]
    if failed:
        error_callback('{0} of {1} messages failed; first error: {2}'.format(
            len(failed), len(futures), failed[0].exception))

    count = len(futures)
    print("Wrote {0} messages into topic: {1}".format(count, topic_name))
    return count

# Publish each split: CSV feature rows as values, labels as keys.
for topic, (feature_rows, label_rows) in (("cancer-train", (x_train, y_train)),
                                          ("cancer-test", (x_test, y_test))):
    write_to_kafka(topic, zip(feature_rows, label_rows))

Wrote 419 messages into topic: cancer-train
Wrote 280 messages into topic: cancer-test


In [None]:
def decode_kafka_item(item):
  """Turn one record of ``IODataset.from_kafka`` into a ``(features, label)`` pair.

  ``item.message`` holds a CSV row of NUM_COLUMNS fields, each parsed as float32
  (record default ``0.0``); ``item.key`` holds the label string, converted with
  ``tf.strings.to_number`` (float32 by default).
  """
  record_defaults = [[0.0] for _ in range(NUM_COLUMNS)]
  features = tf.io.decode_csv(item.message, record_defaults)
  label = tf.strings.to_number(item.key)
  return features, label

BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64

# Read the training records from the start of partition 0 of "cancer-train",
# the topic created above with a single partition and filled by write_to_kafka.
# (Previously this read "cancer-train1", a topic this notebook never writes.)
train_ds = tfio.IODataset.from_kafka('cancer-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)

In [None]:
# Set the parameters

OPTIMIZER="adam"
# The model's last layer already applies a sigmoid, so the loss receives
# probabilities, not logits. from_logits=True would squash them through a
# second sigmoid and distort both the loss and its gradients. (The
# `dispatch_target` UserWarning printed during fit is presumably Keras flagging
# exactly this mismatch.)
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=False)
METRICS=['accuracy']
EPOCHS=10

In [None]:
# design/build the model: an MLP over the NUM_COLUMNS CSV features ending in a
# single sigmoid unit, i.e. the predicted probability that Class == 1.
model = tf.keras.Sequential([
  # Input width follows the data instead of a hard-coded 9.
  tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(0.4),
  tf.keras.layers.Dense(1, activation='sigmoid')
])

# summary() prints the table itself and returns None; wrapping it in print()
# only appended a stray "None".
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 128)               1280      
                                                                 
 dropout (Dropout)           (None, 128)               0         
                                                                 
 dense_1 (Dense)             (None, 256)               33024     
                                                                 
 dropout_1 (Dropout)         (None, 256)               0         
                                                                 
 dense_2 (Dense)             (None, 128)               32896     
                                                                 
 dropout_2 (Dropout)         (None, 128)               0         
                                                                 
 dense_3 (Dense)             (None, 1)                 1

In [None]:
# Compile with the optimizer, loss and metrics chosen in the parameters cell.
compile_kwargs = dict(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
model.compile(**compile_kwargs)

In [None]:
# Train on the Kafka-backed dataset; keep the History so per-epoch metrics can
# be inspected later, and display it as the cell result.
history = model.fit(train_ds, epochs=EPOCHS)
history

Epoch 1/10


  return dispatch_target(*args, **kwargs)


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7f64138b0790>

In [None]:
# Consume the test split through a consumer group. "cancer-test" is the topic
# created above (2 partitions) and filled by write_to_kafka; previously this
# read "cancer-test1", which this notebook never writes. stream_timeout is in
# milliseconds (-1 blocks indefinitely).
# NOTE(review): offsets are presumably committed for group "testcg"; if so,
# re-running this cell resumes after them and yields no data unless the
# group_id is changed. Verify before re-evaluating.
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["cancer-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
  """Decode one ``(message, key)`` pair from the group consumer into ``(features, label)``.

  raw_message: CSV row of NUM_COLUMNS fields, each parsed as float32 (default 0.0).
  raw_key: label string, converted with ``tf.strings.to_number`` (float32 default).
  """
  label = tf.strings.to_number(raw_key)
  features = tf.io.decode_csv(raw_message, [[0.0] for _ in range(NUM_COLUMNS)])
  return features, label

# Decode every record, then group into evaluation batches.
test_ds = test_ds.map(decode_kafka_test_item).batch(BATCH_SIZE)

Instructions for updating:
Use `tf.data.Dataset.take_while(...)


In [None]:
# evaluate() returns [loss, *metrics], i.e. [loss, accuracy] for this model.
eval_results = model.evaluate(test_ds)
print("test loss, test acc:", eval_results)

test loss, test acc: [2.1182117462158203, 0.3321428596973419]


In [None]:
# Stream the training topic in consumer-group mini-batches for online learning.
# "cancer-train" is the topic written by write_to_kafka; previously this read
# "cancer-train1", which this notebook never writes.
online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["cancer-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

Instructions for updating:
Use `tf.data.Dataset.take_while(...)


In [None]:
def decode_kafka_online_item(raw_message, raw_key):
  """Map one streamed ``(message, key)`` pair to ``(features, label)`` for online training.

  Each of the NUM_COLUMNS CSV fields in ``raw_message`` is parsed as float32
  (default 0.0); ``raw_key`` is converted with ``tf.strings.to_number``.
  """
  defaults = [[0.0] for _ in range(NUM_COLUMNS)]
  return tf.io.decode_csv(raw_message, defaults), tf.strings.to_number(raw_key)
  
ONLINE_BATCH_SIZE = 32  # used both as the shuffle buffer and as the batch size

# Each element of online_train_ds is itself a small tf.data.Dataset of raw
# records; decode it, batch it and take a few incremental epochs on the
# already-trained model whenever the chunk is non-empty.
for raw_chunk in online_train_ds:
  batched = (raw_chunk.shuffle(buffer_size=ONLINE_BATCH_SIZE)
                      .map(decode_kafka_online_item)
                      .batch(ONLINE_BATCH_SIZE))
  if len(batched) > 0:
    model.fit(batched, epochs=3)

Epoch 1/3


  return dispatch_target(*args, **kwargs)


Epoch 2/3
Epoch 3/3
