In [30]:
#https://ipython.readthedocs.io/en/stable/config/extensions/autoreload.html?highlight=autoreload
%load_ext autoreload
%autoreload 2

import json
import time
import uuid
from datetime import datetime
import pandas as pd
from Kafka import KafkaWriter, KafkaReader
from OpenWeatherMap import OpenWeatherMap
from confluent_kafka import Consumer, DeserializingConsumer, TopicPartition

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [38]:
def keydecoder(k, ctx):
    """Decode a raw Kafka message key from UTF-8 bytes to ``str``.

    Args:
        k: Raw key bytes, or ``None`` when the message carries no key.
        ctx: Serialization context supplied by the consumer; unused here.

    Returns:
        The decoded key string, or ``None`` if ``k`` is ``None``.
    """
    # A missing key arrives as None; pass it through instead of failing with
    # AttributeError on None.decode.
    if k is None:
        return None
    return k.decode('utf-8')
def valuedecoder(v, ctx):
    """Decode a raw Kafka message value from UTF-8 encoded JSON.

    Args:
        v: Raw value bytes containing a JSON document, or ``None`` when the
            message carries no value.
        ctx: Serialization context supplied by the consumer; unused here.

    Returns:
        The parsed JSON value, or ``None`` if ``v`` is ``None``.

    Raises:
        json.JSONDecodeError: If the payload is not valid JSON.
        UnicodeDecodeError: If the payload is not valid UTF-8.
    """
    # A missing value arrives as None; pass it through instead of failing with
    # AttributeError on None.decode.
    if v is None:
        return None
    return json.loads(v.decode('utf-8'))

def clean_forecast_data() -> None:
    """Copy forecasts from 'weather.forecast' to 'weather.cleaned', skipping repeats.

    Polls partition 0 of 'weather.forecast'. For every message received, the
    value is compared with the newest message on partition 0 of
    'weather.cleaned'; the message is written through ``KafkaWriter`` when no
    newest cleaned message could be read (e.g. the topic is empty) or when the
    two values differ.

    Runs until interrupted with ``KeyboardInterrupt``. Every Kafka client that
    was successfully created is closed on exit, including when setup fails
    part-way through.
    """
    cleanedTopic = 'weather.cleaned'
    cleanedPartition = TopicPartition(cleanedTopic, 0)

    # Pre-declare the clients so the finally block knows which ones exist if
    # construction of a later one raises.
    kafkaForecast = kafkaCleaned = kafka = None
    try:
        kafkaForecast = DeserializingConsumer({
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'buffering',
            'client.id': 'buffering',
            'auto.offset.reset': 'earliest',
            'value.deserializer': valuedecoder,
            'key.deserializer': keydecoder
        })
        kafkaForecast.assign([TopicPartition('weather.forecast', 0)])

        kafkaCleaned = DeserializingConsumer({
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'cleaning',
            'client.id': 'cleaning',
            'auto.offset.reset': 'latest',
            'enable.auto.commit': 'False',
            'value.deserializer': valuedecoder,
            'key.deserializer': keydecoder
        })
        kafkaCleaned.assign([cleanedPartition])

        kafka = KafkaWriter(cleanedTopic)

        try:
            print("Looking for new unique data ...")

            while True:
                msg = kafkaForecast.poll(1.0)
                if msg is None:
                    # Poll timed out without a forecast; keep waiting.
                    continue

                # (low, high) watermarks: high is the offset the next produced
                # message will get, so the newest existing one is high - 1.
                low, high = kafkaCleaned.get_watermark_offsets(cleanedPartition)

                lastCleaned = None
                if high > low:
                    # Only seek when there is a message to read; with an empty
                    # topic high - 1 would be -1, which is not a real offset.
                    kafkaCleaned.seek(TopicPartition(cleanedTopic, 0, high - 1))
                    lastCleaned = kafkaCleaned.poll(1.0)

                if lastCleaned is None:
                    # Nothing to compare against (empty topic, or the read of
                    # the newest message timed out): store unconditionally.
                    kafka.store(msg.key(), msg.value())
                    print("Stored first data!")
                elif msg.value() != lastCleaned.value():
                    kafka.store(msg.key(), msg.value())
                    print("Stored new data!")
                else:
                    print("Nothing to do!")

        except KeyboardInterrupt:
            print("... search for new values stopped!")
    finally:
        # Close only what was actually created.
        if kafkaForecast is not None:
            kafkaForecast.close()
        if kafkaCleaned is not None:
            kafkaCleaned.close()
        if kafka is not None:
            kafka.producer.close()

In [39]:
# Start the forecast-cleaning loop; it keeps polling until the kernel is interrupted.
clean_forecast_data()

Looking for new unique data ...
(0, 1)
last cleaned:  <cimpl.Message object at 0x7fef255fb1c0>
Nothing to do!
(0, 1)
last cleaned:  <cimpl.Message object at 0x7fef255fb340>
Nothing to do!
(0, 1)
last cleaned:  <cimpl.Message object at 0x7fef255fb1c0>
Nothing to do!
(0, 1)
last cleaned:  <cimpl.Message object at 0x7fef255fb340>
Stored new data!
(0, 2)
last cleaned:  <cimpl.Message object at 0x7fef255fb540>
Stored new data!
(0, 3)
last cleaned:  <cimpl.Message object at 0x7fef255fb7c0>
Nothing to do!
... search for new values stopped!
