# **Kafka Data Stream Processing with Python**
## **Message Receiving**

## The Used Packages

In [11]:
from kafka import KafkaConsumer
import json
import time
import threading

## Receiving the Messages from the **topic** measurements periodically
#### The code snippet below was run while the random message creator script (kafka_random_measurements.ipynb) running. In this way, it received the messages from the topic measurements.

#### Note: The message receiving has been limited to a specific time duration (90 seconds), as done while creating the random measurements.

In [13]:
stop_flag = False

def consume_messages(duration):
    global stop_flag

    bootstrap_servers = 'localhost:9092'
    
    topic = 'measurements'
    
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=bootstrap_servers,
                             value_deserializer=lambda v: json.loads(v.decode('utf-8')))

    def stop_consumer():
        global stop_flag
        time.sleep(duration)
        stop_flag = True
        print("Stopping consumer after the specified duration.")

    stop_thread = threading.Thread(target=stop_consumer)
    stop_thread.start()

    try:
        for message in consumer:
            if stop_flag:
                break
            measurement_data = message.value
            print(f"Received: {measurement_data}")
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
        print("Consumer stopped.")

if __name__ == "__main__":
    # the receiving is limited to a specific time duration (90 seconds).
    duration = 90
    consume_messages(duration)

Received: {'measurement_type': 'temperature', 'value': 60.96}
Received: {'measurement_type': 'temperature', 'value': 36.95}
Received: {'measurement_type': 'wind_speed', 'value': 57.84}
Received: {'measurement_type': 'moisture', 'value': 28.21}
Received: {'measurement_type': 'wind_speed', 'value': 99.95}
Received: {'measurement_type': 'moisture', 'value': 55.56}
Received: {'measurement_type': 'wind_speed', 'value': 4.85}
Received: {'measurement_type': 'wind_speed', 'value': 2.71}
Received: {'measurement_type': 'moisture', 'value': 47.39}
Received: {'measurement_type': 'temperature', 'value': 75.43}
Received: {'measurement_type': 'temperature', 'value': 64.34}
Received: {'measurement_type': 'temperature', 'value': 5.4}
Received: {'measurement_type': 'temperature', 'value': 36.37}
Received: {'measurement_type': 'wind_speed', 'value': 50.78}
Received: {'measurement_type': 'moisture', 'value': 16.74}
Received: {'measurement_type': 'temperature', 'value': 12.4}
Received: {'measurement_type':

## Sent & Received Messages Display

#### As it is displayed below, all the sent messages are received. However, the first sent message 'Sent: {'measurement_type': 'temperature', 'value': 56.16}' is not displayed because of a short delay in mili seconds.

Sent: {'measurement_type': 'temperature', 'value': 56.16}

Sent: {'measurement_type': 'temperature', 'value': 60.96}

Sent: {'measurement_type': 'temperature', 'value': 36.95}

Sent: {'measurement_type': 'wind_speed', 'value': 57.84}

Sent: {'measurement_type': 'moisture', 'value': 28.21}

Sent: {'measurement_type': 'wind_speed', 'value': 99.95}

Sent: {'measurement_type': 'moisture', 'value': 55.56}

Sent: {'measurement_type': 'wind_speed', 'value': 4.85}

Sent: {'measurement_type': 'wind_speed', 'value': 2.71}

Sent: {'measurement_type': 'moisture', 'value': 47.39}

Sent: {'measurement_type': 'temperature', 'value': 75.43}

Sent: {'measurement_type': 'temperature', 'value': 64.34}

Sent: {'measurement_type': 'temperature', 'value': 5.4}

Sent: {'measurement_type': 'temperature', 'value': 36.37}

Sent: {'measurement_type': 'wind_speed', 'value': 50.78}

Sent: {'measurement_type': 'moisture', 'value': 16.74}

Sent: {'measurement_type': 'temperature', 'value': 12.4}

Sent: {'measurement_type': 'wind_speed', 'value': 87.5}

-------------------------------------------------------------------------------------------------------------

Received: {'measurement_type': 'temperature', 'value': 60.96}

Received: {'measurement_type': 'temperature', 'value': 36.95}

Received: {'measurement_type': 'wind_speed', 'value': 57.84}

Received: {'measurement_type': 'moisture', 'value': 28.21}

Received: {'measurement_type': 'wind_speed', 'value': 99.95}

Received: {'measurement_type': 'moisture', 'value': 55.56}

Received: {'measurement_type': 'wind_speed', 'value': 4.85}

Received: {'measurement_type': 'wind_speed', 'value': 2.71}

Received: {'measurement_type': 'moisture', 'value': 47.39}

Received: {'measurement_type': 'temperature', 'value': 75.43}

Received: {'measurement_type': 'temperature', 'value': 64.34}

Received: {'measurement_type': 'temperature', 'value': 5.4}

Received: {'measurement_type': 'temperature', 'value': 36.37}

Received: {'measurement_type': 'wind_speed', 'value': 50.78}

Received: {'measurement_type': 'moisture', 'value': 16.74}

Received: {'measurement_type': 'temperature', 'value': 12.4}

Received: {'measurement_type': 'wind_speed', 'value': 87.5}