# Data Deduplication Microservice

## Imports

In [1]:
# reload .py files on change:
# https://ipython.readthedocs.io/en/stable/config/extensions/autoreload.html?highlight=autoreload
%load_ext autoreload
%autoreload 2

import json
import time
import uuid
from datetime import datetime
import pandas as pd

# import custom classes
from Kafka import KafkaConfluentWriter, KafkaConfluentReader
from OpenWeatherMap import OpenWeatherMap

## Deduplication Logic
The service monitors the buffered topic for new messages from data collection, filters out duplicates, and forwards only the remaining messages to a new, cleaned topic.

In [2]:
# Kafka clients shared by clean_forecast_data() (custom wrappers imported from Kafka.py).
# NOTE(review): the meaning of the second KafkaConfluentReader argument (True/False)
# is not visible in this notebook - confirm against Kafka.py.
kafkaConfluentWriter = KafkaConfluentWriter('weather.cleaned') # producer for the cleaned topic
# NOTE(review): the name below is spelled "Conluent" (missing "f"); clean_forecast_data()
# references it with this exact spelling, so rename both places together if it is fixed.
kafkaConluentReaderForecast = KafkaConfluentReader('weather.forecast', True) # consumer for the buffered forecast topic
kafkaConfluentReaderCleaned = KafkaConfluentReader('weather.cleaned', False) # consumer for the cleaned topic

# TASK 4: DEDUPLICATION in new topic
# for each new message in the forecast topic, check whether it differs from the latest message in the cleaned topic
# and forward it to the cleaned topic only if it is in fact a new message with updated forecasts
def clean_forecast_data() -> None:
    """Forward forecast messages to the cleaned topic unless they repeat its latest entry.

    Loops forever: polls the forecast reader and, for every message received,
    fetches the latest message from the cleaned reader. The incoming message is
    produced to the cleaned topic when the cleaned topic yields no message or
    when the payloads differ; otherwise it is dropped. Only the single latest
    cleaned message is compared, not the full topic history.

    The loop ends when a KeyboardInterrupt is raised (e.g. interrupting the
    kernel); any other exception propagates to the caller.

    Relies on the module-level clients ``kafkaConluentReaderForecast``,
    ``kafkaConfluentReaderCleaned`` and ``kafkaConfluentWriter``.
    """
    try:
        print("Looking for new unique data ...")

        while True:
            # 1.0 is presumably a timeout in seconds - confirm against Kafka.py
            incoming = kafkaConluentReaderForecast.poll(1.0)
            if incoming is None:
                # nothing arrived on the buffered topic - poll again
                continue

            latest = kafkaConfluentReaderCleaned.get_latest_message(1.0)

            # duplicate of the most recent cleaned message - skip it
            if latest is not None and not (incoming.value() != latest.value()):
                print("Nothing to do!")
                continue

            # either the cleaned topic is still empty or the payload changed
            kafkaConfluentWriter.produce(incoming.key(), incoming.value())
            if latest is None:
                print("Stored first data!")
            else:
                print("Stored new data!")

    except KeyboardInterrupt:
        # interrupting the kernel is the intended way to stop the service
        print("... search for new values stopped!")

%3|1671616013.310|FAIL|rdkafka#producer-1| [thrd:sandbox-hdp.hortonworks.com:6667/bootstrap]: sandbox-hdp.hortonworks.com:6667/bootstrap: Connect to ipv4#172.18.0.2:6667 failed: Connection refused (after 0ms in state CONNECT)
%3|1671616013.313|FAIL|cd495c8e3d2c4278bb9e3bf1a4bca6cc#consumer-2| [thrd:sandbox-hdp.hortonworks.com:6667/bootstrap]: sandbox-hdp.hortonworks.com:6667/bootstrap: Connect to ipv4#172.18.0.2:6667 failed: Connection refused (after 0ms in state CONNECT)
%3|1671616013.315|FAIL|402168dbff8d45699e0b2c50b85ccc19#consumer-3| [thrd:sandbox-hdp.hortonworks.com:6667/bootstrap]: sandbox-hdp.hortonworks.com:6667/bootstrap: Connect to ipv4#172.18.0.2:6667 failed: Connection refused (after 0ms in state CONNECT)
%3|1671616014.312|FAIL|cd495c8e3d2c4278bb9e3bf1a4bca6cc#consumer-2| [thrd:sandbox-hdp.hortonworks.com:6667/bootstrap]: sandbox-hdp.hortonworks.com:6667/bootstrap: Connect to ipv4#172.18.0.2:6667 failed: Connection refused (after 0ms in state CONNECT, 1 identical error(s) 

## Run the Service

In [3]:
# Start the deduplication loop: forwards messages with actually updated forecasts to the
# cleaned topic. The call blocks until the kernel is interrupted (KeyboardInterrupt),
# which clean_forecast_data() catches in order to stop.
clean_forecast_data()

Looking for new unique data ...


%3|1671616044.335|FAIL|cd495c8e3d2c4278bb9e3bf1a4bca6cc#consumer-2| [thrd:sandbox-hdp.hortonworks.com:6667/bootstrap]: sandbox-hdp.hortonworks.com:6667/bootstrap: Connect to ipv4#172.18.0.2:6667 failed: Connection refused (after 0ms in state CONNECT, 30 identical error(s) suppressed)
%3|1671616044.335|FAIL|402168dbff8d45699e0b2c50b85ccc19#consumer-3| [thrd:sandbox-hdp.hortonworks.com:6667/bootstrap]: sandbox-hdp.hortonworks.com:6667/bootstrap: Connect to ipv4#172.18.0.2:6667 failed: Connection refused (after 0ms in state CONNECT, 30 identical error(s) suppressed)
%3|1671616044.340|FAIL|rdkafka#producer-1| [thrd:sandbox-hdp.hortonworks.com:6667/bootstrap]: sandbox-hdp.hortonworks.com:6667/bootstrap: Connect to ipv4#172.18.0.2:6667 failed: Connection refused (after 0ms in state CONNECT, 30 identical error(s) suppressed)
%3|1671616074.356|FAIL|402168dbff8d45699e0b2c50b85ccc19#consumer-3| [thrd:sandbox-hdp.hortonworks.com:6667/bootstrap]: sandbox-hdp.hortonworks.com:6667/bootstrap: Connect

ConsumeError: KafkaError{code=NOT_COORDINATOR,val=16,str="Failed to fetch committed offsets for 0 partition(s) in group "f15ae444f65f42a095d033e68a64ff3a": Broker: Not coordinator"}