## Jupyter notebook for executing the real-time classification

Preparations:

In [1]:
from data_processing.RealTimeClassification import *

# Central settings object: the cells below read the kafka connection, the topic
# list and the k-NN parameter from it.
config = Configuration()

# Optional: uncomment to suppress the debugging messages of tensorflow
# (requires `os` to be available in this namespace).
# os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Load the scalers of the training data; they are passed to
# normalise_dataframe() for every example in the classification loop.
scalers = load_scalers(config)



Change config if needed:

In [2]:
# Show which kafka server the consumers below will connect to.
print(f'Configured Server: {config.get_connection()!s}')


Configured Server: localhost:9092


Create consumers for each topic:

In [3]:
# One KafkaConsumer per configured topic, in the same order as config.topic_list.
consumers = []
# Consumer of config.limiting_topic; it is handed to read_single_example()
# separately and stays None if that topic is not part of config.topic_list.
limiting_consumer = None

print('Creating consumers ...\n')

# if using the fabric simulation start at the start of the topics
# for live classification start at newest messages possible
offset = 'earliest' if config.testing_using_fabric_sim else 'latest'

try:
    # create consumers for all topics
    for topic in config.topic_list:
        consumer = KafkaConsumer(topic, bootstrap_servers=config.get_connection(),
                                 value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                                 auto_offset_reset=offset)

        # based on the topic select one of the consumers for time interval determination
        if topic == config.limiting_topic:
            limiting_consumer = consumer

        consumers.append(consumer)
except errors.NoBrokersAvailable:
    # Do not leave a half-initialised state behind: close the consumers that
    # were created before the failure and forget them, so that no connection
    # stays open and later cells do not work with a partial consumer list.
    for created in consumers:
        created.close()
    consumers = []
    limiting_consumer = None

    print('Configured kafka server is not available. Please check the connection or change the configuration.')
    

Creating consumers ...

Configured kafka server is not available. Please check the connection or change the configuration.


SystemExit: 0

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


Create and start a classifier thread that handles the classification of processed examples:

In [None]:
# Announce the classifier setup and the model file it is based on.
print('\nCreating classifier ...')
print('\nUsed model file:')
print(config.directory_model_to_use, '\n')

# Explain how the values in the classification output are to be read.
print(f'The classifier will use k={config.k_of_knn!s} for the k-NN algorithm')
print(
    'The mean similarity output is calculated on the basis of the k most similar cases',
    'The time span is the time between the end timestamp of the',
    'interval and the current time right before the output.',
    'The total time is the time needed for the completely processing the example,',
    'including the time in the queue.\n',
    sep='\n',
)

# Create the classifier and start it; the loop in the next cell feeds it
# through classifier.examples_to_classify and stops it via classifier.stop.
classifier = Classifier(config)
classifier.start()

Start the classification process as soon as data is available:

In [None]:
print('Waiting for data to classify ...\n')

try:
    # keep classifying until the user interrupts the execution
    while True:
        # taken before reading, so it covers the whole handling of this example
        # (presumably the basis of the reported total time -- confirm in Classifier)
        start_time = time.perf_counter()

        # fetch the kafka messages that make up one example (lists of single messages)
        messages = read_single_example(consumers, limiting_consumer, config)

        # merge the message lists into one dataframe
        example_df = list_to_dataframe(messages, config)

        # convert to an array usable as neural network input and normalise it
        # with the scalers of the training data
        example = normalise_dataframe(example_df.to_numpy(), scalers)

        # queue element: (normalised example, first index entry, last index entry, start time)
        queue_item = (example, example_df.index[0], example_df.index[-1], start_time)

        # hand the example over to the classifier
        classifier.examples_to_classify.put(queue_item)

        # step every consumer back by two messages (never below offset 0) to
        # reduce the time intervals that are left out between two examples;
        # only partition 0 of each topic is addressed
        for topic, consumer in zip(config.topic_list, consumers):
            partition = TopicPartition(topic, 0)
            rewound_offset = max(consumer.position(partition) - 2, 0)
            consumer.seek(partition, rewound_offset)

except KeyboardInterrupt:
    # signal the classifier thread to stop
    print('Exiting ...\n')
    classifier.stop = True
