In [1]:
from kafka import KafkaProducer
import time
import json
import random

# Create a Kafka producer that JSON-encodes each value as UTF-8 bytes
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Simulate sending temperature data.
# The try/finally guarantees the producer is closed even if the loop is
# interrupted (e.g. the cell is stopped during the sleep) or send() raises.
try:
    for _ in range(10):
        temperature_data = {'sensor_id': 'sensor_1', 'temperature_celsius': random.uniform(20.0, 30.0)}
        # send() is asynchronous: the record is queued here and delivered in the background
        producer.send('temperature_readings', temperature_data)
        print(f"Sent: {temperature_data}")
        time.sleep(1)  # Simulate data coming in every second

    # Block until every queued record has actually been delivered
    producer.flush()
finally:
    producer.close()


Sent: {'sensor_id': 'sensor_1', 'temperature_celsius': 20.04767003828712}
Sent: {'sensor_id': 'sensor_1', 'temperature_celsius': 28.851360954902056}
Sent: {'sensor_id': 'sensor_1', 'temperature_celsius': 26.45123829366633}
Sent: {'sensor_id': 'sensor_1', 'temperature_celsius': 20.633674825965578}
Sent: {'sensor_id': 'sensor_1', 'temperature_celsius': 28.863455746038184}
Sent: {'sensor_id': 'sensor_1', 'temperature_celsius': 20.83606002236788}
Sent: {'sensor_id': 'sensor_1', 'temperature_celsius': 22.19183824074376}
Sent: {'sensor_id': 'sensor_1', 'temperature_celsius': 20.695726175634086}
Sent: {'sensor_id': 'sensor_1', 'temperature_celsius': 24.21892812173479}
Sent: {'sensor_id': 'sensor_1', 'temperature_celsius': 20.234915494661184}


In [2]:
from kafka import KafkaConsumer
import json

# Create a Kafka consumer that reads the topic from the earliest offset and
# JSON-decodes each value.
# consumer_timeout_ms ends the iteration once no new message has arrived for
# that many milliseconds, so this cell finishes on its own instead of blocking
# until the kernel is interrupted.
consumer = KafkaConsumer('temperature_readings',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=10000,
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

# Process messages; always release the consumer's connections afterwards,
# including when the cell is interrupted part-way through.
try:
    for message in consumer:
        data = message.value
        # Celsius -> Fahrenheit
        temperature_fahrenheit = data['temperature_celsius'] * 9/5 + 32
        print(f"Received sensor data: {data}")
        print(f"Converted temperature: {temperature_fahrenheit:.2f}°F")
finally:
    consumer.close()


Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 27.04656711744542}
Converted temperature: 80.68°F
Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 29.331151282229342}
Converted temperature: 84.80°F
Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 23.93018564285076}
Converted temperature: 75.07°F
Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 23.467464234042083}
Converted temperature: 74.24°F
Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 26.758098805203556}
Converted temperature: 80.16°F
Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 24.63807057807208}
Converted temperature: 76.35°F
Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 21.343074108112386}
Converted temperature: 70.42°F
Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 25.85138322289007}
Converted temperature: 78.53°F
Received sensor data: {'sens

KeyboardInterrupt: 

In [10]:
from kafka.admin import KafkaAdminClient, NewTopic

# Create an instance of KafkaAdminClient
admin_client = KafkaAdminClient(
    bootstrap_servers="localhost:9092",
    client_id='delete-topic-example'
)

# Define the topic to delete
topic_name = 'temperature_readings'

# Delete the topic. Failures are reported rather than raised so the notebook
# keeps running; the client is closed in `finally` so it is released even if
# the cell is interrupted.
# NOTE(review): the success message only reflects that delete_topics() returned
# without raising — confirm the broker has finished removing the topic before
# relying on it being gone.
try:
    admin_client.delete_topics([topic_name])
    print(f"Topic '{topic_name}' deleted successfully.")
except Exception as e:
    print(f"Failed to delete topic '{topic_name}': {e}")
finally:
    # Close the admin client
    admin_client.close()


Topic 'temperature_readings' deleted successfully.


In [11]:
import threading
from kafka import KafkaProducer, KafkaConsumer
import json
import time
import random

def produce(num_messages=3, interval_seconds=5):
    """Publish simulated temperature readings to the 'temperature_readings' topic.

    Each reading is a dict with a fixed 'sensor_id' of 'sensor_1' and a
    'temperature_celsius' drawn uniformly from [20.0, 30.0], serialized as
    UTF-8 JSON.

    Args:
        num_messages: How many readings to send (default 3).
        interval_seconds: Pause after each send, in seconds (default 5).
    """
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    # try/finally so the producer is closed even if send() raises or the
    # thread is torn down mid-loop.
    try:
        for _ in range(num_messages):
            temperature_data = {'sensor_id': 'sensor_1', 'temperature_celsius': random.uniform(20.0, 30.0)}
            # send() only queues the record; delivery happens in the background
            producer.send('temperature_readings', temperature_data)
            print(f"Sent: {temperature_data}")
            time.sleep(interval_seconds)
        # Wait for all queued records to be delivered before closing
        producer.flush()
    finally:
        producer.close()

def consume(idle_timeout_ms=15000):
    """Read temperature readings from 'temperature_readings' and print them in °F.

    Consumption starts from the earliest available offset. The loop ends once
    no message has arrived for `idle_timeout_ms` milliseconds, so a thread
    running this function terminates and can be joined instead of lingering
    in the kernel and printing alongside later runs.

    Args:
        idle_timeout_ms: Milliseconds to wait for a new message before
            stopping (default 15000). It must be longer than the producer's
            send interval, otherwise the consumer stops between messages.
            Pass float('inf') to block forever.
    """
    consumer = KafkaConsumer('temperature_readings',
                             bootstrap_servers='localhost:9092',
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=idle_timeout_ms,
                             value_deserializer=lambda x: json.loads(x.decode('utf-8')))
    # Always release the consumer's connections, even if processing raises.
    try:
        for message in consumer:
            data = message.value
            # Celsius -> Fahrenheit
            temperature_fahrenheit = data['temperature_celsius'] * 9/5 + 32
            print(f"Received sensor data: {data}")
            print(f"Converted temperature: {temperature_fahrenheit:.2f}°F")
    finally:
        consumer.close()

# Launch the producer and the consumer concurrently, one thread apiece.
producer_thread, consumer_thread = (
    threading.Thread(target=entry_point) for entry_point in (produce, consume)
)

# Start both (producer first), then block until each thread has returned.
for worker in (producer_thread, consumer_thread):
    worker.start()
for worker in (producer_thread, consumer_thread):
    worker.join()


Sent: {'sensor_id': 'sensor_1', 'temperature_celsius': 25.62798379907189}
Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 25.62798379907189}
Converted temperature: 78.13°F
Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 25.62798379907189}
Converted temperature: 78.13°F
Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 25.62798379907189}
Converted temperature: 78.13°F
Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 25.62798379907189}
Converted temperature: 78.13°F
Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 25.62798379907189}
Converted temperature: 78.13°F
Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 25.62798379907189}
Converted temperature: 78.13°F
Sent: {'sensor_id': 'sensor_1', 'temperature_celsius': 21.69774781591713}
Received sensor data: {'sensor_id': 'sensor_1', 'temperature_celsius': 21.69774781591713}
Converted temperature: 71.06°F
Recei

KeyboardInterrupt: 