# Imports 

In [1]:
import cv2
import os
import numpy as np
import json
from kafka import KafkaProducer, KafkaConsumer
from cassandra.cluster import Cluster
from PIL import Image
import tensorflow as tf
from tensorflow.keras.applications.vgg16 import preprocess_input
import schedule
import time
import base64
import io
import requests
from keras.applications.vgg16 import VGG16  # Assuming you want to use Keras VGG16
from cassandra.util import uuid_from_time

# Configuration

In [2]:
# Runtime configuration.
# Every setting below can be overridden with an environment variable so the
# notebook is not tied to one machine; when a variable is unset, the original
# hard-coded value is used, so default behaviour is unchanged.

# Kafka configuration
bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
input_topic = os.environ.get('KAFKA_INPUT_TOPIC', 'input_topic')
# NOTE(review): the default topic name ends in a doubled "c" -- confirm that
# downstream consumers really read from 'output_topicc' before renaming it.
output_topic = os.environ.get('KAFKA_OUTPUT_TOPIC', 'output_topicc')

# Cassandra configuration
cassandra_host = os.environ.get('CASSANDRA_HOST', 'localhost')
cassandra_keyspace = os.environ.get('CASSANDRA_KEYSPACE', 'your_keyspace')
cassandra_table = os.environ.get('CASSANDRA_TABLE', 'your_table_name11')

# Connect to Kafka: the consumer reads requests from `input_topic`
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
consumer.subscribe([input_topic])

# Kafka producer used to publish classification results
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)


# Load the saved model
model_path = os.environ.get(
    'FIRE_MODEL_PATH',
    r'C:\Users\AL-FAJR\Desktop\Last_GP\GP_Model\my_keras_model.h5',
)
save_model = tf.keras.models.load_model(model_path)
# Label order matters: the list is indexed with the thresholded model output.
class_names = ['fire_images', 'non_fire_images']
# Folder that incoming `image_filename` values are resolved against.
image_folder = os.environ.get(
    'FIRE_IMAGE_FOLDER',
    r"C:\Users\AL-FAJR\Desktop\Last_GP\FIRE-SMOKE-DATASET\Test\Fire",
)

# Cassandra

In [3]:
# Open a Cassandra session, then make sure the keyspace and the results table
# exist before any message is processed.
cluster = Cluster([cassandra_host])
session = cluster.connect()

# Keyspace DDL: SimpleStrategy with a replication factor of 1.
keyspace_ddl = f"CREATE KEYSPACE IF NOT EXISTS {cassandra_keyspace} WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }}"

# Table DDL: one row per classified image; the primary key is
# (userid, created_at, id).
table_ddl = f"CREATE TABLE IF NOT EXISTS {cassandra_table} (id UUID  ,userid TEXT, homeid TEXT, email TEXT, phonenumber TEXT, class_name TEXT , image_filename TEXT , created_at TIMESTAMP, PRIMARY KEY (userid,created_at,id) )"

session.execute(keyspace_ddl)
session.set_keyspace(cassandra_keyspace)
session.execute(table_ddl)

<cassandra.cluster.ResultSet at 0x1a2d5b53ee0>

# Kafka

In [None]:
# Consume classification requests from Kafka. For each message: parse the JSON
# payload, load and classify the referenced image, store the result in
# Cassandra, and publish the enriched payload to the output topic.

# Prepared once, outside the loop. Message fields are bound as parameters
# rather than interpolated into the CQL text, so a quote character in any
# field can neither break nor inject into the statement. Only the table name
# (which comes from the notebook configuration) is formatted in, because
# identifiers cannot be bound.
insert_statement = session.prepare(
    f"INSERT INTO {cassandra_table} "
    "(id, userid, homeid, email, phonenumber, class_name, image_filename, created_at) "
    "VALUES (uuid(), ?, ?, ?, ?, ?, ?, toTimestamp(now()))"
)

# Canonical form of the image folder, used to reject filenames that would
# resolve to a location outside it.
image_root = os.path.realpath(image_folder)

try:
    for message in consumer:
        try:
            # Check if the message is empty
            if not message.value:
                print("Ignoring empty message.")
                continue

            # Print the content of the message
            print("Received message:", message.value)

            # Parse message as JSON
            data_dict = json.loads(message.value.decode('utf-8'))

            # Extract fields from the JSON (a missing key raises KeyError,
            # which is reported by the handler below and the message skipped)
            userid = data_dict['userid']
            homeid = data_dict['homeid']
            email = data_dict['email']
            phonenumber = data_dict['phonenumber']
            image_filename = data_dict['image_filename']

            # Resolve the image path and refuse anything that escapes the
            # image folder (absolute paths, '..' components): the filename
            # comes from the message and must not be trusted.
            image_path = os.path.realpath(os.path.join(image_root, image_filename))
            if os.path.commonpath([image_root, image_path]) != image_root:
                raise ValueError(
                    f"image_filename points outside the image folder: {image_filename!r}"
                )

            # Read and preprocess the image for VGG16. The context manager
            # closes the underlying file once the pixels have been read.
            with Image.open(image_path) as image:
                rgb_image = image.resize((224, 224)).convert('RGB')
            image_array = preprocess_input(np.array(rgb_image))
            image_array = np.expand_dims(image_array, axis=0)

            # Classify the image.
            # Assumes the model emits a single score per image -- TODO confirm.
            # The scalar is extracted explicitly because float() on an array
            # with ndim > 0 is deprecated since NumPy 1.25.
            prediction = save_model.predict(image_array)
            probability = float(np.ravel(prediction)[0])
            # Scores >= 0.5 select class_names[1], everything else class_names[0].
            class_index = 1 if probability >= 0.5 else 0
            class_name = class_names[class_index]

            # Persist the result. str() keeps non-string JSON values stored
            # exactly as the previous string interpolation rendered them.
            session.execute(
                insert_statement,
                (
                    str(userid),
                    str(homeid),
                    str(email),
                    str(phonenumber),
                    class_name,
                    str(image_filename),
                ),
            )

            # Publish data to another Kafka topic
            data_dict['probability'] = probability
            data_dict['class_name'] = class_name

            # Convert the dictionary to JSON
            json_data = json.dumps(data_dict)

            # Send the JSON data to the output topic
            producer.send(output_topic, value=json_data.encode('utf-8'))

            print("Processed message:", message.value)

        except Exception as e:
            # Best effort: report the failure and move on to the next message.
            print("Error processing message:", e)
finally:
    # Runs however the loop ends, including a kernel interrupt, so the
    # Kafka producer and consumer are always closed.
    producer.close()
    consumer.close()


Received message: b'{"userid": "userid","homeid": "homeid","email":"email","phonenumber":"phonenumber","image_filename":"image_49.jpg"}'
Processed message: b'{"userid": "userid","homeid": "homeid","email":"email","phonenumber":"phonenumber","image_filename":"image_49.jpg"}'
