In [37]:
from kafka import KafkaConsumer, KafkaProducer
import logging
import json
import time
from statistics import mean

In [9]:
logging.basicConfig(level=logging.INFO)

# Kafka connection and topic names shared by the cells below.
bootstrap_servers = 'localhost:9092'
valid_data_topic, clean_data_topic, monitoring_topic = (
    'valid_data',
    'clean_data',
    'monitoring',
)

# A single hand-written record used to exercise the producer/consumer round trip.
sample_data = [
    dict(
        ts='2023-06-15T00:00:00Z',
        station_id='station1',
        sensor0=3.4,
        sensor1=5.6,
        sensor2=6.0,
        sensor3=8.2,
    ),
]

In [6]:
# Round trip: publish every record in `sample_data` to the clean-data topic,
# then read the topic back from the beginning and print what arrives.
kafka_producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
)
try:
    for data in sample_data:
        kafka_producer.send(clean_data_topic, value=data)
        print(f'Sent: {data}')
    # flush() blocks until every buffered record has been sent, so no extra
    # sleep is needed before starting the consumer.
    kafka_producer.flush()
finally:
    # Release the producer's connections even if send()/flush() raised.
    kafka_producer.close()

kafka_consumer = KafkaConsumer(
    clean_data_topic,
    bootstrap_servers=bootstrap_servers,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    # Stop iterating after 10 s without new messages. Without this the loop
    # below never returns and blocks the notebook kernel indefinitely.
    consumer_timeout_ms=10000,
)

print("Starting consumer...")
try:
    for message in kafka_consumer:
        received_data = message.value
        print(f"Received: {received_data}")
finally:
    kafka_consumer.close()

INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection. 
INFO:kafka.conn:<BrokerConnection node_id=0 host=localho

Sent: {'ts': '2023-06-16T00:00:00Z', 'station_id': 'station1', 'sensor0': 20.5, 'sensor1': 20.7, 'sensor2': 20.6, 'sensor3': 20.8}


INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ('clean_data',)
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='clean_data', partition=0)]
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connecti

Starting consumer...
Received: {'ts': '2023-06-16T00:00:00Z', 'station_id': 'station1', 'sensor0': 20.5, 'sensor1': 20.7, 'sensor2': 20.6, 'sensor3': 20.8}


In [32]:
# Sample record: sensor0 and sensor2 read far below sensor1 and sensor3.
my_test = dict(
    ts='2023-06-16T00:00:00Z',
    station_id='station1',
    sensor0=5.4,
    sensor1=20.7,
    sensor2=1.2,
    sensor3=20.8,
)

In [41]:
# Names of the four sensor fields in a record, in column order.
sensors = [f'sensor{idx}' for idx in range(4)]

In [42]:
# Pull the raw readings out of the record, in `sensors` order.
new_list = [my_test[sensor_name] for sensor_name in sensors]

In [43]:
# Sort the readings ascending, in place (the same list object is kept).
new_list[:] = sorted(new_list)

In [58]:
# Display the current contents of new_list as this cell's output.
new_list

[4.5, 4.6, 7.5, 7.6]

In [63]:
def clean_data(sensors, threshold=2.0):
    """Fuse a row of sensor readings into one value, rejecting a lone outlier.

    The readings are sorted and each one is compared with its sorted
    neighbour. A "gap" is a neighbour difference of at least ``threshold``.
    Agreement is judged between neighbours only, so a gradual chain of close
    readings counts as agreeing even if its overall spread exceeds
    ``threshold``.

    * No gaps: all readings agree; return ``(average, None)``.
    * Exactly one gap, at the low or the high end of the sorted readings,
      with more than two readings: that extreme reading is the outlier.
      Return the average of the remaining readings and
      ``'UNRELIABLE_SENSOR_READING'``.
    * Anything else (several gaps, one gap splitting the readings into two
      groups, two disagreeing readings, or no readings): return
      ``(None, 'UNRELIABLE_ROW')``.

    Args:
        sensors: iterable of numeric readings; order does not matter.
        threshold: neighbour difference at or above which two readings
            disagree. Defaults to 2.0.

    Returns:
        A ``(value, status)`` tuple as described above.
    """
    readings = sorted(sensors)
    if not readings:
        return None, 'UNRELIABLE_ROW'

    # Indices i where readings[i] and readings[i + 1] disagree. The list is
    # sorted, so the difference is never negative.
    gaps = [i for i in range(len(readings) - 1)
            if readings[i + 1] - readings[i] >= threshold]

    if not gaps:
        return sum(readings) / len(readings), None

    # In sorted order a lone outlier can only sit at either end. With just two
    # readings there is no majority to say which one is wrong.
    if len(readings) > 2 and gaps == [0]:
        trusted = readings[1:]
    elif len(readings) > 2 and gaps == [len(readings) - 2]:
        trusted = readings[:-1]
    else:
        return None, 'UNRELIABLE_ROW'
    return sum(trusted) / len(trusted), 'UNRELIABLE_SENSOR_READING'

In [110]:
def clean_data(sensors, threshold=2.0):
    """Fuse a row of sensor readings into one value, rejecting a lone outlier.

    NOTE(review): this notebook defines ``clean_data`` in several cells; the
    definition from whichever cell ran last is the one in effect.

    The readings are sorted and each one is compared with its sorted
    neighbour. A "gap" is a neighbour difference of at least ``threshold``
    (the same boundary is used everywhere, so a difference of exactly
    ``threshold`` is always a gap).

    * No gaps: return ``(mean, "Reliable")``.
    * Exactly one gap, at the low or the high end of the sorted readings,
      with more than two readings: that extreme reading is the outlier.
      Return ``(mean of the remaining readings, "UnreliableSensorReading")``.
    * Anything else (several gaps, one gap splitting the readings into two
      groups, two disagreeing readings, or no readings): return
      ``(None, "UnreliableRow")``.

    Args:
        sensors: iterable of numeric readings; order does not matter.
        threshold: neighbour difference at or above which two readings
            disagree. Defaults to 2.0.

    Returns:
        A ``(value, status)`` tuple as described above.
    """
    readings = sorted(sensors)
    if not readings:
        return None, "UnreliableRow"

    # Indices i where readings[i] and readings[i + 1] disagree. The list is
    # sorted, so the difference is never negative.
    gaps = [i for i in range(len(readings) - 1)
            if readings[i + 1] - readings[i] >= threshold]

    if not gaps:
        return mean(readings), "Reliable"
    # In sorted order a lone outlier can only sit at either end. With just two
    # readings there is no majority to say which one is wrong.
    if len(readings) > 2 and gaps == [0]:
        return mean(readings[1:]), "UnreliableSensorReading"
    if len(readings) > 2 and gaps == [len(readings) - 2]:
        return mean(readings[:-1]), "UnreliableSensorReading"
    return None, "UnreliableRow"

In [132]:
def clean_data(sensors, threshold=2.0):
    """Fuse a row of sensor readings into one value, rejecting a lone outlier.

    The readings are sorted and each one is compared with its sorted
    neighbour. A "gap" is a neighbour difference of at least ``threshold``
    (the same boundary is used everywhere, so a difference of exactly
    ``threshold`` is always a gap). Agreement is judged between neighbours
    only, so a gradual chain of close readings counts as agreeing.

    * No gaps: return ``(mean, "Reliable")``.
    * Exactly one gap, at the low or the high end of the sorted readings,
      with more than two readings: that extreme reading is the outlier.
      Return ``(mean of the remaining readings, "UnreliableSensorReading")``.
    * Anything else (several gaps, one gap splitting the readings into two
      groups, two disagreeing readings, or no readings): return
      ``(None, "UnreliableRow")``.

    Args:
        sensors: iterable of numeric readings; order does not matter.
        threshold: neighbour difference at or above which two readings
            disagree. Defaults to 2.0.

    Returns:
        A ``(value, status)`` tuple as described above.
    """
    readings = sorted(sensors)
    if not readings:
        return None, "UnreliableRow"

    # Indices i where readings[i] and readings[i + 1] disagree. The list is
    # sorted, so the difference is never negative.
    gaps = [i for i in range(len(readings) - 1)
            if readings[i + 1] - readings[i] >= threshold]

    if not gaps:
        return mean(readings), "Reliable"
    # In sorted order a lone outlier can only sit at either end. With just two
    # readings there is no majority to say which one is wrong.
    if len(readings) > 2 and gaps == [0]:
        return mean(readings[1:]), "UnreliableSensorReading"
    if len(readings) > 2 and gaps == [len(readings) - 2]:
        return mean(readings[:-1]), "UnreliableSensorReading"
    return None, "UnreliableRow"

In [133]:
# Readings whose sorted neighbours differ by more than 2.0 at both ends
# (3.4 -> 5.6 and 6.0 -> 8.2), with a close middle pair.
new_list = [
    3.4,
    5.6,
    6.0,
    8.2,
]

In [134]:
# Classify the sample row; its readings disagree at both ends of the sorted list.
clean_data(sensors=new_list)

(None, 'UnreliableRow')

In [128]:
# Only the lowest reading (1.2) is more than 2.0 away from its sorted neighbour.
new_test = [1.2, 4.0, 4.5, 5.2]

In [116]:
# First element of every adjacent pair in new_list whose values differ by more than 2.
[left for left, right in zip(new_list, new_list[1:]) if abs(left - right) > 2]


[3.4, 6.0]