# Load the MNIST dataset with Keras

In [1]:
import numpy as np
import pickle
from tensorflow import keras
from tensorflow.keras import layers
import mlflow

from hyperopt import hp, tpe, fmin, Trials, SparkTrials, STATUS_OK

# Model / data parameters: 10 digit classes, 28x28 single-channel images.
num_classes, input_shape = 10, (28, 28, 1)

# Fetch MNIST, keeping Keras' own train/test split.
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# Rescale pixel values into [0, 1] as float32 by dividing by 255.
x_train, x_test = [images.astype("float32") / 255 for images in (x_train, x_test)]
print(x_train.shape)

# One-hot encode the integer digit labels into (n_samples, num_classes) matrices.
y_train, y_test = [keras.utils.to_categorical(labels, num_classes) for labels in (y_train, y_test)]
print(y_train.shape)

(60000, 28, 28)
(60000, 10)


## Take the first 10,000 MNIST images and save them to a pickle file

In [None]:
# Stack the train and test images, then split off the first 10,000 samples.
# x_train comes first in the concatenation, so train_save[0] is x_train[:10000]
# and train_save[1] holds all remaining images.
train_all = np.concatenate((x_train, x_test))
train_save = np.split(train_all, [10000])
file_name = 'mnist_data.p'
print(train_save[0].shape)
print(train_save[1].shape)
# Use a context manager so the file is flushed and closed deterministically
# instead of leaking the handle until garbage collection.
with open(file_name, "wb") as fh:
    pickle.dump(train_save[0], fh)

Save the matching labels (the first 10,000 one-hot label rows) to a pickle file

In [None]:
# Labels get the same treatment as the images: concatenate train + test, keep
# the first 10,000 one-hot rows so they line up with the saved images.
file_name = 'mnist_label.p'
label_all = np.concatenate((y_train, y_test))
train_label = np.split(label_all, [10000])
print(train_label[0].shape)
print(train_label[1].shape)
# Context manager guarantees the pickle is fully written and the handle closed.
with open(file_name, "wb") as fh:
    pickle.dump(train_label[0], fh)

Read the pickled images and labels back, flattening each image into a list of pixel values

In [None]:
# Read back the pickle files written by the previous cells. They are produced
# locally by this notebook; never unpickle files from untrusted sources, since
# pickle.load can execute arbitrary code.
file_name = 'mnist_data.p'
with open(file_name, 'rb') as fh:
    read_x_train = pickle.load(fh)
# Flatten each 28x28 image into one row, then convert to nested Python lists.
read_x_train = read_x_train.reshape(read_x_train.shape[0], -1).tolist()
file_name = 'mnist_label.p'
with open(file_name, 'rb') as fh:
    read_y_train = pickle.load(fh).tolist()
print(read_y_train[0])  # one-hot label vector of the first sample

In [None]:
import pickle
from pathlib import Path
import pandas as pd
import numpy as np

import json
import mlflow
from tensorflow import keras
from tensorflow.keras import layers
from kafka import KafkaConsumer, KafkaProducer
from hyperopt import hp, tpe, fmin, Trials, SparkTrials, STATUS_OK

# Broker, topics and local data locations.
KAFKA_HOST = 'redpc:9092'
TOPICS = ['mnist_app', 'mnist_train']
PATH = Path('mnist_data/')
APPEND_DATA = PATH / 'messages'
TRAIN_DATA = PATH / 'train'

# Listen on both the app-request topic and the training topic.
consumer = KafkaConsumer(bootstrap_servers=KAFKA_HOST)
consumer.subscribe(TOPICS)

# Point MLflow at the tracking server and load the Production-stage model.
mlflow.set_tracking_uri("http://redpc:5000")
model_name = "mnist_best_model"
client = mlflow.tracking.MlflowClient()
model = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/Production")

# Prototype loop: print the raw model output for every unanswered app request.
for kafka_msg in consumer:
    msg_value = json.loads(kafka_msg.value)
    # Ignore training-topic traffic and app messages that already carry a prediction.
    if kafka_msg.topic != 'mnist_app' or 'prediction' in msg_value:
        continue
    request_id = msg_value['request_id']
    pred_data = np.reshape(np.array(msg_value['data']), (1, 28, 28, 1))
    pred = model.predict(pred_data)
    print(pred)

In [None]:
import json
import pandas as pd
import numpy as np

import mlflow
from pathlib import Path
from tensorflow import keras
from tensorflow.keras import layers
from utils.kafka_producer import publish_messages, append_data
from kafka import KafkaConsumer, KafkaProducer


# Global parameters: Kafka broker and topics, registered model name, data
# paths, retrain cadence and the MLflow tracking server.
KAFKA_HOST = 'redpc:9092'
TOPICS = ['mnist_app', 'mnist_train']
MODEL_NAME = 'mnist_best_model'
# Backward-compatible alias for the original, misspelled constant name.
MODLE_NAME = MODEL_NAME
PATH = Path('mnist_data/')
APPEND_DATA = PATH/'messages'
TRAIN_DATA = PATH/'train'
# Number of answered app requests between retrain requests.
RETRAIN_EVERY = 50

mlflow.set_tracking_uri("http://redpc:5000")
client = mlflow.tracking.MlflowClient()


def reload_model(model_name: str, model_version: "str | int | None" = None) -> mlflow.pyfunc.PyFuncModel:
    """Load a registered model from the MLflow model registry.

    Args:
        model_name: Name of the registered model.
        model_version: Registry version (or stage) to load, e.g. ``3`` or
            ``"3"``. Integers are accepted because versions may arrive as
            JSON numbers. When falsy (``None``, ``""``, ``0``), the model in
            the ``Production`` stage is loaded instead.

    Returns:
        The model loaded via ``mlflow.pyfunc.load_model``.
    """
    # Keep the original truthiness test: any falsy version means "Production".
    target = model_version if model_version else "Production"
    return mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{target}")

def predict(model, message):
    """Run ``model`` on a single image given as 784 pixel values.

    ``message`` may be any array-like with 28*28 elements (flat or nested);
    it is reshaped into a batch of one 28x28x1 image before prediction.
    Returns whatever ``model.predict`` returns for that batch.
    """
    batch = np.reshape(np.array(message), (1, 28, 28, 1))
    return model.predict(batch)

if __name__ == '__main__':
    # Start serving with the model currently in the Production stage.
    model = reload_model(model_name = MODLE_NAME)
    consumer = KafkaConsumer(bootstrap_servers=KAFKA_HOST)
    consumer.subscribe(TOPICS)
    message_count = 0  # app requests answered so far
    batch_id = 0       # batch id passed to append_data and in retrain requests
    for kafka_msg in consumer:
        msg_value = json.loads(kafka_msg.value)
        if kafka_msg.topic == 'mnist_train' and msg_value.get('training_completed'):
            # A retrain finished: swap in the model version it reports.
            latest_version = msg_value['model_version']
            model = reload_model(MODLE_NAME, latest_version)
            print(f'New model reloaded: version {latest_version}')
        elif kafka_msg.topic == 'mnist_app' and 'prediction' not in msg_value:
            # An unanswered request. Replies published below carry a
            # 'prediction' key, so they are skipped when read back here.
            request_id = msg_value['request_id']
            pred = predict(model, msg_value['data'])
            # np.argmax returns a NumPy integer, which json cannot serialize.
            # Messages on this topic are JSON-decoded above, so publish a
            # built-in int.
            tmp_app = {'request_id': request_id, 'prediction': int(np.argmax(pred))}
            publish_messages(topic='mnist_app', messages=tmp_app)
            # Presumably stores the request for later retraining under the
            # current batch_id (append_data is defined in utils) - confirm.
            append_data(msg_value, TRAIN_DATA, APPEND_DATA, batch_id)
            message_count += 1
            # Every RETRAIN_EVERY answered requests, ask for a retrain on the
            # current batch and move on to the next batch id.
            if message_count % RETRAIN_EVERY == 0:
                tmp_train = {'retrain': True, 'batch_id': batch_id}
                publish_messages(topic='mnist_train', messages=tmp_train)
                batch_id += 1



In [None]:
import json
import pandas as pd
import numpy as np

import mlflow
from pathlib import Path
from tensorflow import keras
from tensorflow.keras import layers
from utils.kafka_producer import publish_messages, append_data
from kafka import KafkaConsumer, KafkaProducer

# Manually publish a retrain request for batch 1 on the training topic.
tmp_train = dict(retrain=True, batch_id=1)
publish_messages(topic='mnist_train', messages=tmp_train)