In [6]:
import numpy as np
import tensorflow as tf
import confluent_kafka as kafka

# 1. MNIST Kafka Producer, run separately
# Every training image and its label are published as raw bytes to two
# parallel topics ('xx' for images, 'yy' for labels) in the same order, so the
# consumer side can pair them up again purely by position.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
print("train: ", (x_train.shape, y_train.shape))

producer = kafka.Producer({'bootstrap.servers': 'kafka:29092'})
count = 0
for image, label in zip(x_train, y_train):
  for topic, payload in (('xx', image.tobytes()), ('yy', label.tobytes())):
    try:
      # Pin partition 0: the KafkaDataset readers in this notebook consume
      # partition 0 only ('topic:0'), and positional pairing of images with
      # labels needs a single, ordered partition per topic.
      producer.produce(topic, payload, partition=0)
    except BufferError:
      # produce() only enqueues locally and raises BufferError when that
      # queue is full. Drain it (blocks until delivery) and retry once.
      producer.flush()
      producer.produce(topic, payload, partition=0)
  # Non-blocking: serve delivery reports so the local queue keeps draining.
  producer.poll(0)
  count += 1

print("count(x, y): ", count)
# Block until everything queued has been delivered; the value shown as the
# cell output is the number of messages still pending (0 on success).
producer.flush()

train:  ((60000, 28, 28), (60000,))
count(x, y):  60000


0

In [7]:
# Publish the MNIST test split to its own pair of topics ('xx_test' for
# images, 'yy_test' for labels), reusing the producer created earlier.
count = 0
for image, label in zip(x_test, y_test):
  for topic, payload in (('xx_test', image.tobytes()), ('yy_test', label.tobytes())):
    try:
      # Pin partition 0: the KafkaDataset readers in this notebook consume
      # partition 0 only ('topic:0'), and positional pairing of images with
      # labels needs a single, ordered partition per topic.
      producer.produce(topic, payload, partition=0)
    except BufferError:
      # produce() only enqueues locally and raises BufferError when that
      # queue is full. Drain it (blocks until delivery) and retry once.
      producer.flush()
      producer.produce(topic, payload, partition=0)
  # Non-blocking: serve delivery reports so the local queue keeps draining.
  producer.poll(0)
  count += 1

print("count(x, y): ", count)
# Block until everything queued has been delivered; the value shown as the
# cell output is the number of messages still pending (0 on success).
producer.flush()

count(x, y):  10000


0

In [8]:
import numpy as np
import tensorflow as tf
import tensorflow_io.kafka as kafka_io
import datetime

# 2. KafkaDataset with map function
def func_x(x):
  """Turn one raw Kafka message into a 28x28 float32 image tensor.

  Args:
    x: scalar string tensor holding the raw bytes of a single image.

  Returns:
    The image as a float32 tensor of shape (28, 28), ready for tf.keras.
  """
  # Reinterpret the message bytes as unsigned 8-bit pixel values.
  pixels = tf.io.decode_raw(x, out_type=tf.uint8)
  # Restore the 2-D image layout from the flat byte vector.
  image = tf.reshape(pixels, [28, 28])
  # Dtype conversion is delegated to tf.image so integer pixels are rescaled
  # the way TensorFlow defines for uint8 -> float32.
  return tf.image.convert_image_dtype(image, tf.float32)
def func_y(y):
  """Turn one raw Kafka message into a scalar uint8 class label.

  Args:
    y: scalar string tensor holding the raw bytes of a single label.

  Returns:
    The label as a uint8 tensor of shape ().
  """
  # Reinterpret the message bytes as unsigned 8-bit values.
  label_bytes = tf.io.decode_raw(y, out_type=tf.uint8)
  # Collapse the one-element vector to a scalar.
  return tf.reshape(label_bytes, [])
# Read partition 0 of the image topic and decode every message into an image.
# eof=True makes the reader stop at the end of the partition instead of
# waiting for new messages.
image_stream = kafka_io.KafkaDataset(['xx:0'], servers="kafka:29092", group='xx', eof=True)
train_images = image_stream.map(func_x)

# Same for the label topic.
label_stream = kafka_io.KafkaDataset(['yy:0'], servers="kafka:29092", group='yy', eof=True)
train_labels = label_stream.map(func_y)

# Pair images with labels by position and feed them one example per batch.
paired_examples = tf.data.Dataset.zip((train_images, train_labels))
train_kafka = paired_examples.batch(1)
print(train_kafka)

# 3. Keras model
# Small fully connected classifier: flatten the 28x28 image, one hidden layer
# of 128 ReLU units, then a 10-way softmax.
model = tf.keras.Sequential()
model.add(tf.keras.layers.Flatten(input_shape=(28, 28)))
model.add(tf.keras.layers.Dense(128, activation=tf.nn.relu))
model.add(tf.keras.layers.Dense(10, activation=tf.nn.softmax))
# The "sparse" loss variant is used because labels are integer class ids
# rather than one-hot vectors.
model.compile(loss='sparse_categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])

# 4. Add TensorBoard to monitor the model training
# The callback below is disabled, so log_dir is computed but not used by fit().
run_stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
log_dir = "logs/fit/" + run_stamp
#tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

# The full-length setting is 5 epochs of 12000 steps: with batches of size 1
# that is 5 x 12000 = 60000 batches, i.e. every training record once.
# This run uses 1200 steps per epoch (presumably shortened to keep it quick).
num_epochs = 5
steps_each_epoch = 1200
result = model.fit(train_kafka, epochs=num_epochs, steps_per_epoch=steps_each_epoch)

<DatasetV1Adapter shapes: ((?, 28, 28), (?,)), types: (tf.float32, tf.uint8)>
Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [13]:
# Evaluate on the held-out MNIST test split. The producer publishes it to the
# 'xx_test' (images) and 'yy_test' (labels) topics; reading 'xx' / 'yy' here
# would score the model on the data it was trained on.
test_images = kafka_io.KafkaDataset(['xx_test:0'], servers="kafka:29092", group='xx', eof=True).map(func_x)
test_labels = kafka_io.KafkaDataset(['yy_test:0'], servers="kafka:29092", group='yy', eof=True).map(func_y)
# Pair images with labels by position, one example per batch (as in training).
test_kafka = tf.data.Dataset.zip((test_images, test_labels)).batch(1)
# Only the first 100 batches are evaluated.
score = model.evaluate(test_kafka, steps=100)

