# Data Science - Big Data Technologies Part 2 SoSe 2024 📊🔍

## Apache Kafka 📝

This notebook is my submission for the second part of the exam in Big Data Technologies for the summer semester of 2024.

### Authors 👥
- **Martin Brucker** (942815) 🧑‍💻

**Due**: 20.06.2024 at 23:59

**Contact Information**: martin.brucker@student.fh-kiel.de 📧

# Task 1 - Install and configure Kafka

Run docker-compose up to start the Kafka and ZooKeeper containers.


![DockerDesktop](dockerDekstopImg.png)
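
Once the containers are running, it can help to check that the broker port is reachable before any Kafka client is involved. The following is a minimal sketch using only the standard library. It assumes the broker is exposed on localhost:9092, the address used by the producer later in this notebook. The helper name `broker_reachable` is hypothetical.

```python
import socket


def broker_reachable(host="localhost", port=9092, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened within timeout."""
    try:
        # create_connection performs the TCP handshake; the socket is closed on exit
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable host names
        return False


print("Kafka port reachable:", broker_reachable())
```

An open port only shows that something is listening there. It does not prove that the listener is a healthy Kafka broker, which is what the test message in Task 3 checks.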

# Task 2 - Create measurements topic

Open a bash shell inside the Kafka Docker container:


![DockerEXEC](dockerExecContainer.png)

Then create the measurements topic:

![GenerateTopic](kafkaTopicGenerated.png)
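
The screenshots show the topic being created from inside the container. As an alternative, the topic could also be created from Python with the admin client of kafka-python. This is a sketch, not the command used above. One partition and a replication factor of 1 are assumptions that fit a single-broker setup, and both function names are hypothetical.

```python
def measurements_topic_settings():
    """Settings for the topic; partition and replication counts are assumptions."""
    return {"name": "measurements", "num_partitions": 1, "replication_factor": 1}


def create_measurements_topic(bootstrap_servers="localhost:9092"):
    """Create the topic via the admin API; return False if it already exists."""
    # Imported here so the settings helper works without kafka-python installed
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics([NewTopic(**measurements_topic_settings())])
        return True
    except TopicAlreadyExistsError:
        return False
    finally:
        admin.close()


# create_measurements_topic()  # requires a running broker on localhost:9092
```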



# Task 3 - Test the connection

In [7]:
# !pip install kafka-python
import json
import time
from kafka import KafkaProducer

In [8]:
measurement = {
    'timestamp': time.time(),
    'temperature': 20.0  # Temperature of 20 degrees Celsius
}

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

producer.send('measurements', value=measurement)
producer.flush()
producer.close()
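
`producer.send()` is asynchronous and returns a future, so the cell above finishes without reporting whether the broker accepted the message. A possible way to make the connection test explicit is to wait on that future and read the returned record metadata. This is a sketch under the same assumptions as above (topic measurements, broker on localhost:9092). `serialize` and `send_and_confirm` are hypothetical helper names.

```python
import json
import time


def serialize(value):
    """Encode a Python object as UTF-8 JSON bytes, as the producer above does."""
    return json.dumps(value).encode("utf-8")


def send_and_confirm(topic="measurements", bootstrap_servers="localhost:9092", timeout=10):
    """Send one test measurement and block until the broker acknowledges it."""
    # Imported here so serialize() can be used without kafka-python installed
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers, value_serializer=serialize)
    try:
        future = producer.send(topic, value={"timestamp": time.time(), "temperature": 20.0})
        # get() raises if the send failed or was not acknowledged within the timeout
        metadata = future.get(timeout=timeout)
        return metadata.topic, metadata.partition, metadata.offset
    finally:
        producer.close()


# print(send_and_confirm())  # requires a running broker on localhost:9092
```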

# Task 4 - Periodically create random measurements

In [9]:
import json
import random
import time
from kafka import KafkaProducer
import threading

In [14]:
def create_random_measurement():
    """
    Generates a random measurement including timestamp, temperature, and humidity.
    """
    measurement = {
        'timestamp': time.time(),
        'temperature': round(random.uniform(-20.0, 40.0), 2),  # Random temperature between -20 and 40 degrees Celsius
        'humidity': round(random.uniform(0.0, 100.0), 2)      # Random humidity between 0% and 100%
    }
    return measurement

# Event used to stop the producer loop from another cell
stop_event = threading.Event()

def produce_measurements(interval_seconds=5):
    """
    Produces random measurements and sends them to the Kafka topic 'measurements'
    until stop_event is set.
    """
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    
    try:
        # KeyboardInterrupt is only raised in the main thread, so the loop polls an Event instead
        while not stop_event.is_set():
            measurement = create_random_measurement()
            producer.send('measurements', value=measurement)
            print(f"Sent: {measurement}")
            producer.flush()
            # Wait before sending the next measurement; returns early once stop_event is set
            stop_event.wait(interval_seconds)
    finally:
        producer.close()

# Run the producer in a separate thread; call stop_event.set() to stop it
producer_thread = threading.Thread(target=produce_measurements)
producer_thread.start()

Sent: {'timestamp': 1717863366.536789, 'temperature': 12.4, 'humidity': 36.77}
Sent: {'timestamp': 1717863371.547196, 'temperature': 28.26, 'humidity': 16.27}
Sent: {'timestamp': 1717863376.556234, 'temperature': 22.8, 'humidity': 35.93}


# Task 5 - Receive messages from the measurements topic

In [15]:
import json
from kafka import KafkaConsumer

def consume_measurements():
    consumer = KafkaConsumer(
        'measurements',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='latest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    
    print("Consuming messages from 'measurements' topic...")
    for message in consumer:
        measurement = message.value
        print("Received measurement:")
        print(f"Timestamp: {measurement['timestamp']}")
        print(f"Temperature: {measurement['temperature']} °C")
        print(f"Humidity: {measurement['humidity']} %")
        print("-" * 35)

consume_thread = threading.Thread(target=consume_measurements)
consume_thread.start()
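
The test message from Task 3 has no humidity field, so a consumer that reads the topic from the earliest offset would raise a KeyError on the humidity line. The sketch below formats a measurement while tolerating missing fields. It also uses `consumer_timeout_ms` so that the loop ends once no message has arrived for a while. The 10 second timeout and both function names are assumptions made for illustration.

```python
import json


def format_measurement(measurement):
    """Render one measurement as text, skipping fields that are absent."""
    lines = [f"Timestamp: {measurement.get('timestamp', 'n/a')}"]
    if "temperature" in measurement:
        lines.append(f"Temperature: {measurement['temperature']} °C")
    if "humidity" in measurement:
        lines.append(f"Humidity: {measurement['humidity']} %")
    return "\n".join(lines)


def consume_until_idle(idle_timeout_ms=10000):
    """Print all messages from the start of the topic, then stop when it goes idle."""
    # Imported here so format_measurement() works without kafka-python installed
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        "measurements",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
        consumer_timeout_ms=idle_timeout_ms,  # ends the iteration after this idle time
        value_deserializer=lambda x: json.loads(x.decode("utf-8")),
    )
    try:
        for message in consumer:
            print(format_measurement(message.value))
            print("-" * 35)
    finally:
        consumer.close()


# consume_until_idle()  # requires a running broker on localhost:9092
```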


Consuming messages from 'measurements' topic...
Received measurement:
Timestamp: 1717863369.6000967
Temperature: 13.03 °C
Humidity: 60.88 %
-----------------------------------
Received measurement:
Timestamp: 1717863369.8229668
Temperature: 10.38 °C
Humidity: 51.06 %
-----------------------------------
Received measurement:
Timestamp: 1717863371.547196
Temperature: 28.26 °C
Humidity: 16.27 %
-----------------------------------
Received measurement:
Timestamp: 1717863371.7399979
Temperature: 27.46 °C
Humidity: 44.34 %
-----------------------------------
Received measurement:
Timestamp: 1717863374.6101527
Temperature: 34.44 °C
Humidity: 99.73 %
-----------------------------------
Received measurement:
Timestamp: 1717863374.830941
Temperature: 29.22 °C
Humidity: 50.49 %
-----------------------------------
Received measurement:
Timestamp: 1717863376.556234
Temperature: 22.8 °C
Humidity: 35.93 %
-----------------------------------
Received measurement:
Timestamp: 1717863376.7484767
Temperat