# Kafka client
This notebook gives an example of both kafka-python and confluent-kafka. The setup is simple: read messages from one topic (sensordata) and write to another (predictions).

In the final solution this will not be a notebook, and a real model will replace the hard-coded placeholder prediction, but this is our MVP.
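For now the "model" is a fixed placeholder score. Keeping that step in a plain function, separate from the Kafka plumbing, would make it easy to swap in a real model later and to test it without a broker. A minimal sketch: `predict` and the sample sensor payload are hypothetical, while the 0.34 score and the `prediction`/`input` fields mirror the loops below. It decodes the raw bytes Kafka delivers and returns a JSON string.

```python
import json

PLACEHOLDER_SCORE = 0.34  # dummy score, as in the MVP loops


def predict(raw_value: bytes) -> str:
    """Hypothetical stand-in for the real model.

    Takes the raw bytes of one sensordata message and returns the JSON string
    to publish on the predictions topic.
    """
    sensor_reading = raw_value.decode("utf-8")  # Kafka message values are bytes
    return json.dumps({"prediction": PLACEHOLDER_SCORE, "input": sensor_reading})


print(predict(b'{"temperature": 21.5}'))  # hypothetical sensor payload
```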

Note that the required certificate files (`ca-cert`, `kafka.client.pem`, `kafka.client.key`) are on the NAS.
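Both clients need these files to set up the SSL connection. A small hypothetical helper that checks for them before connecting, assuming they have been copied from the NAS into one directory (the kafka-python cell uses relative paths, the confluent-kafka cell uses `/home/jovyan/`); `missing_cert_files` is our own name, not part of either library.

```python
from pathlib import Path
from typing import List

# File names used by the client configurations in this notebook
CERT_FILES = ("ca-cert", "kafka.client.pem", "kafka.client.key")


def missing_cert_files(directory: str = ".") -> List[str]:
    """Hypothetical helper: return the certificate files not present in `directory`."""
    base = Path(directory)
    return [name for name in CERT_FILES if not (base / name).is_file()]


missing = missing_cert_files()  # pass "/home/jovyan" for the confluent-kafka paths
if missing:
    print("Copy these files from the NAS first:", ", ".join(missing))
```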

## kafka-python

The pure-Python client. It might lose messages, according to https://blog.datasyndrome.com/a-tale-of-two-kafka-clients-c613efab49df

We first set up the consumer and producer, then read and publish messages in an endless loop.

In [None]:
from time import sleep
from kafka import KafkaConsumer, KafkaProducer

def publish_message(producer_instance, topic_name, key, value):
    """Send one UTF-8 encoded key/value pair and wait until the broker acknowledges it."""
    try:
        key_bytes = key.encode('utf-8')
        value_bytes = value.encode('utf-8')
        future = producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        # Block until the broker acknowledges; raises if delivery failed
        future.get(timeout=10)
    except Exception as ex:
        print(f'Publishing to {topic_name} failed: {ex}')

# Both clients use SSL with the certificate files in the working directory
producer = KafkaProducer(bootstrap_servers=['sam01.ynformed.nl:9092'],
                         api_version=(0, 10),
                         security_protocol='SSL',
                         ssl_check_hostname=True,
                         ssl_cafile='ca-cert',
                         ssl_certfile='kafka.client.pem',
                         ssl_keyfile='kafka.client.key')

consumer = KafkaConsumer('sensordata',
                         bootstrap_servers=['sam01.ynformed.nl:9092'],
                         api_version=(0, 10),
                         # No group_id, so no offsets are committed: every run starts at the oldest message
                         auto_offset_reset='earliest',
                         # Stop iterating after 1 s without new messages (the loop below then sleeps)
                         consumer_timeout_ms=1000,
                         security_protocol='SSL',
                         ssl_check_hostname=True,
                         ssl_cafile='ca-cert',
                         ssl_certfile='kafka.client.pem',
                         ssl_keyfile='kafka.client.key')
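`publish_message` above encodes the key and value by hand. kafka-python's `KafkaProducer` can do this itself through its `key_serializer` and `value_serializer` arguments, which take a callable that turns the object passed to `send()` into bytes. A minimal sketch of such callables, assuming we want JSON-encoded values; the function names are hypothetical. Constructing the producer needs the broker, so that part is only shown as a comment.

```python
import json


def serialize_key(key: str) -> bytes:
    """Hypothetical key serializer: the same UTF-8 encoding publish_message applies."""
    return key.encode("utf-8")


def serialize_value(value: dict) -> bytes:
    """Hypothetical value serializer: encode a Python dict as UTF-8 JSON bytes."""
    return json.dumps(value).encode("utf-8")


# Passed to KafkaProducer, send() could then take plain Python objects, e.g.:
# producer = KafkaProducer(bootstrap_servers=['sam01.ynformed.nl:9092'],
#                          key_serializer=serialize_key,
#                          value_serializer=serialize_value,
#                          ...)  # plus the SSL settings used above
# producer.send('predictions', key='prediction', value={'prediction': 0.34})
```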

Next we list the available topics, just for show, and run the loop. That is all. Easy, right?

In [None]:
consumer.topics()

In [None]:
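import json

print('Started consume-produce loop...')
try:
    while True:
        # The iterator stops after consumer_timeout_ms without messages; then we wait and retry
        for msg in consumer:
            result = msg.value  # raw bytes
            # Uncomment to see the message
            # print(result)
            # Decode the bytes; str(result) would embed a "b'...'" literal in the output
            prediction = json.dumps({'prediction': 0.34, 'input': result.decode('utf-8')})
            publish_message(producer, 'predictions', 'prediction', prediction)
        sleep(5)
except KeyboardInterrupt:
    # Interrupting the kernel is the only way out of the loop
    print('Stopped by user.')
finally:
    consumer.close()
    producer.close()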

## confluent-kafka

Also included, because I don't know yet which of the two clients is best. confluent-kafka is a Python wrapper around the C library librdkafka.

In [None]:
from confluent_kafka import Consumer, Producer

# Connection and SSL settings shared by the consumer and the producer
common_config = {
    "api.version.request": True,
    "bootstrap.servers": 'sam01.ynformed.nl:9092',
    "security.protocol": "ssl",
    "ssl.ca.location": '/home/jovyan/ca-cert',
    "ssl.certificate.location": '/home/jovyan/kafka.client.pem',
    "ssl.key.location": '/home/jovyan/kafka.client.key',
}

kafka_consumer = Consumer(
    {
        **common_config,
        "group.id": 'jupyter-consumer',
        "enable.auto.commit": True,
        # Start at the oldest message when the group has no committed offset
        # (top-level replacement for the deprecated "default.topic.config")
        "auto.offset.reset": "earliest",
    }
)

# group.id, enable.auto.commit and auto.offset.reset are consumer-only properties,
# so the producer gets just the shared settings
kafka_producer = Producer(common_config)

kafka_consumer.subscribe(['sensordata'])

In [None]:
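# list_topics() returns cluster metadata; .topics maps topic names to their metadata
list(kafka_producer.list_topics(timeout=10).topics)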

In [None]:
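import json

while True:
    msg = kafka_consumer.poll(1.0)  # wait at most 1 s; returns None if nothing arrived
    if msg is None or msg.error():
        continue  # skip empty polls and error events
    result = msg.value()
    # print(result)
    kafka_producer.produce('predictions', json.dumps({'prediction': 0.34, 'input': result.decode('utf-8')}))
    kafka_producer.poll(0)  # produce() is asynchronous; poll(0) serves pending delivery reports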
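`produce()` only queues a message. To find out whether it actually reached the broker, confluent-kafka can call a delivery callback, passed as `on_delivery=`, from `poll()` or `flush()` with an error (or `None`) and the message. A minimal sketch; the name `delivery_report`, the printed texts and the boolean return value (added only to make it easy to test; the producer ignores it) are our own choices.

```python
def delivery_report(err, msg):
    """Hypothetical delivery callback for Producer.produce(..., on_delivery=...).

    `err` is a KafkaError on failure or None on success; `msg` is the Message
    that was delivered (or failed).
    """
    if err is not None:
        print(f"Delivery failed: {err}")
        return False
    print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")
    return True


# Usage inside the loop (requires the broker, so shown as a comment):
# kafka_producer.produce('predictions', prediction, on_delivery=delivery_report)
# kafka_producer.poll(0)
```

Before stopping the notebook, calling `kafka_producer.flush()` waits for outstanding messages and triggers their callbacks.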