# Steps

The work is divided into three stages:

- Data and Producer
- Data Processing
- Processed Data
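The three stages form a simple pipeline. A producer publishes raw readings to a topic, a processing step consumes and transforms them, and the results are collected as processed data. The sketch below mimics that flow in plain Python. It assumes `queue.Queue` as an in-memory stand-in for a Kafka topic, and the processing rule (flagging readings above a temperature of 45) is hypothetical. It only marks where the real processing logic would go. The two sample records reuse values from the producer output recorded later in this notebook.

```python
import queue

# In-memory stand-in for a Kafka topic (assumption: no real broker involved)
sensor_topic = queue.Queue()

def produce(records, topic):
    """Stage 1 (Data and Producer): publish raw sensor records to the topic."""
    for record in records:
        topic.put(record)

def process(topic, threshold=45.0):
    """Stage 2 (Data Processing): drain the topic and apply a placeholder rule.

    The high-temperature rule is hypothetical; the notebook's actual
    processing step may be entirely different.
    """
    processed = []
    while not topic.empty():
        record = topic.get()
        processed.append({**record, 'high_temp': record['temperature'] > threshold})
    return processed

# Stage 3 (Processed Data): the enriched records returned by process()
raw = [
    {'id_pumps': 1, 'timestamp': '2024-06-27 00:00:00', 'temperature': 19.53},
    {'id_pumps': 2, 'timestamp': '2024-06-27 00:00:00', 'temperature': 49.53},
]
produce(raw, sensor_topic)
processed_data = process(sensor_topic)
print([r['high_temp'] for r in processed_data])  # [False, True]
```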

This Jupyter notebook documents the development of the algorithm step by step and continues the presentation for the Internet of Things course.

![title](Images/cap.png)

# 1 - Data Collection with Kafka Producer
![title](Images/cap_1.png)

Importing the Necessary Libraries

```python
import subprocess
from kafka import KafkaProducer
import json
import time
```
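The producer in this notebook is configured with a `value_serializer` that turns each Python dict into UTF-8 encoded JSON bytes before sending. Any consumer of `sensor_data` has to apply the inverse transformation. The sketch below shows the round trip using only the standard `json` module, so it runs without a broker. The names `serialize` and `deserialize` are hypothetical. In kafka-python, the second function would be passed as `value_deserializer` to `KafkaConsumer`.

```python
import json

def serialize(value: dict) -> bytes:
    """Same transformation as the producer's value_serializer lambda."""
    return json.dumps(value).encode('utf-8')

def deserialize(payload: bytes) -> dict:
    """Inverse transformation a consumer would apply (hypothetical helper)."""
    return json.loads(payload.decode('utf-8'))

# Sample record taken from the producer output recorded in this notebook
record = {'id_pumps': 1, 'timestamp': '2024-06-27 00:00:00',
          'temperature': 19.53, 'fuel': 26.41, 'water': 3066.87}
payload = serialize(record)
print(type(payload).__name__)          # bytes
print(deserialize(payload) == record)  # True
```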

## 1.1 - Producing Data via Kafka

Before producing the data, the topic left over from previous runs is deleted and then recreated. The code assumes that the sensor logs already return the complete historical data for a specific period, so starting from an empty topic avoids publishing duplicate records.
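Topic deletion in Kafka is asynchronous, so the fixed `time.sleep(5)` in the cell below is not always enough. In the recorded run, the create command reported that `sensor_data` already existed. A more robust option is to poll the topic list until the topic is gone. The sketch below assumes the same `kafka-topics.sh` path and broker address used in this notebook. The functions `list_kafka_topics` and `wait_for_topic_deletion` are hypothetical helpers, and the listing function is injectable so the polling logic can be tried without a broker.

```python
import subprocess
import time
from typing import Callable, List

# Assumed to match the script path and broker used elsewhere in the notebook
KAFKA_TOPICS_CMD = '/home/alberthsoliveira/Downloads/kafka_2.13-3.7.0/bin/kafka-topics.sh'
BOOTSTRAP = 'localhost:9092'

def list_kafka_topics() -> List[str]:
    """Return topic names reported by `kafka-topics.sh --list`."""
    result = subprocess.run(
        [KAFKA_TOPICS_CMD, '--bootstrap-server', BOOTSTRAP, '--list'],
        capture_output=True, text=True, check=True,
    )
    # Keep only the first token of each line, in case extra markers follow the name
    return [line.split()[0] for line in result.stdout.splitlines() if line.strip()]

def wait_for_topic_deletion(topic_name: str,
                            list_topics: Callable[[], List[str]] = list_kafka_topics,
                            timeout: float = 30.0,
                            interval: float = 1.0) -> bool:
    """Poll until topic_name no longer appears; return False if the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if topic_name not in list_topics():
            return True
        time.sleep(interval)
    return False
```

In the producer cell, the `time.sleep(5)` between the delete and create calls could then be replaced with `wait_for_topic_deletion(topic_name)`, creating the topic only once the call returns `True`.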

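```python
# Path to the Kafka CLI script from the local Kafka installation
kafka_topics_cmd = '/home/alberthsoliveira/Downloads/kafka_2.13-3.7.0/bin/kafka-topics.sh'

def delete_kafka_topic(topic_name):
    # Delete the topic; failures are ignored because the CLI already prints its error
    try:
        subprocess.run([kafka_topics_cmd, '--bootstrap-server', 'localhost:9092', '--delete', '--topic', topic_name], check=True)
    except subprocess.CalledProcessError:
        pass

def create_kafka_topic(topic_name):
    # Create a single-partition, single-replica topic (enough for a local broker)
    try:
        subprocess.run([kafka_topics_cmd, '--bootstrap-server', 'localhost:9092', '--create', '--topic', topic_name, '--partitions', '1', '--replication-factor', '1'], check=True)
    except subprocess.CalledProcessError:
        pass

# Producer that serializes each record as UTF-8 encoded JSON
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

topic_name = 'sensor_data'

# Reset the topic so historical records are not published twice.
# Deletion is asynchronous, so a fixed 5 s pause may not be long enough.
delete_kafka_topic(topic_name)
time.sleep(5)
create_kafka_topic(topic_name)

# Load the historical sensor readings and publish them one by one
with open('data.json', 'r') as file:
    dados = json.load(file)

for data in dados:
    print(data)
    producer.send(topic_name, value=data)

# Block until all buffered messages have been sent to the broker
producer.flush()
```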

Error while executing topic command : Topic 'sensor_data' already exists.
[2024-06-29 14:22:05,328] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'sensor_data' already exists.
 (org.apache.kafka.tools.TopicCommand)

{'id_pumps': 1, 'timestamp': '2024-06-27 00:00:00', 'temperature': 19.53, 'fuel': 26.41, 'water': 3066.87}
{'id_pumps': 2, 'timestamp': '2024-06-27 00:00:00', 'temperature': 49.53, 'fuel': 84.24, 'water': 4871.02}
{'id_pumps': 3, 'timestamp': '2024-06-27 00:00:00', 'temperature': 48.1, 'fuel': 89.71, 'water': 2028.99}
{'id_pumps': 1, 'timestamp': '2024-06-27 00:30:00', 'temperature': 11.12, 'fuel': 99.3, 'water': 3980.58}
{'id_pumps': 2, 'timestamp': '2024-06-27 00:30:00', 'temperature': 25.55, 'fuel': 89.72, 'water': 2586.94}
{'id_pumps': 3, 'timestamp': '2024-06-27 00:30:00', 'temperature': 11.7, 'fuel': 89.11, 'water': 1449.12}
{'id_pumps': 1, 'timestamp': '2024-06-27 01:00:00', 'temperature': 17.57, 'fuel': 57.67, 'water': 1062.65}
{'id_pumps': 2, 'timestamp': '2024-06-27 01:00:00', 'temperature': 39.13, 'fuel': 78.9, 'water': 1348.2}
{'id_pumps': 3, 'timestamp': '2024-06-27 01:00:00', 'temperature': 0.67, 'fuel': 20.71, 'water': 1305.03}
{'id_pumps': 1, 'timestamp': '2024-06-27 01