In [16]:
from confluent_kafka import Producer,Consumer
import csv
import time
import pandas as pd
from sklearn.ensemble import IsolationForest
import pickle

In [17]:
def delivery_report(err, msg):
    """Print the outcome of one produced message (used as a delivery callback).

    Args:
        err: The delivery error, or None when delivery succeeded.
        msg: The message the report refers to; its topic() and partition()
            are only read on the success path.
    """
    if err is None:
        # Success: report the topic and partition the message landed on.
        destination = f'{msg.topic()} [{msg.partition()}]'
        print(f'Message delivered to {destination}')
        return

    print(f'Message delivery failed: {err}')

In [18]:
def read_csv_file(file_path):
    """Lazily yield each data row of a CSV file as a dict keyed by the header.

    Args:
        file_path: Path of the CSV file to read; its first row supplies the
            column names.

    Yields:
        dict: One mapping of column name to string value per data row.
    """
    # newline='' leaves line-ending handling to the csv module, as its
    # documentation prescribes; without it, line breaks embedded in quoted
    # fields are rewritten by universal-newline translation.
    with open(file_path, 'r', newline='') as csvfile:
        yield from csv.DictReader(csvfile)

In [19]:
def preprocess_data(row):
    """Project a CSV row onto the ordered list of fields that gets published.

    Args:
        row: Mapping from column name to value (e.g. a csv.DictReader row).

    Returns:
        list: The values of the selected columns, in the fixed order listed
        below. Columns not listed are dropped.

    Raises:
        KeyError: If a dict `row` lacks one of the selected columns.
    """
    selected_columns = (
        'step', 'type', 'amount',
        'nameOrig', 'oldbalanceOrg', 'newbalanceOrig',
        'nameDest', 'oldbalanceDest', 'newbalanceDest',
        'isFlaggedFraud',
    )
    return [row[column] for column in selected_columns]

In [20]:
def produce_messages(bootstrap_servers, topic, file_path):
    """Publish every row of a CSV file to a Kafka topic.

    Each row is reduced to its selected fields by preprocess_data, joined
    with commas, UTF-8 encoded and handed to the producer. delivery_report is
    registered as the per-message delivery callback.

    Args:
        bootstrap_servers: Value for the producer's 'bootstrap.servers' setting.
        topic: Name of the topic to publish to.
        file_path: Path of the CSV file whose rows are published.
    """
    p = Producer({'bootstrap.servers': bootstrap_servers})

    for row in read_csv_file(file_path):
        processed_row = preprocess_data(row)
        message = ','.join(processed_row).encode('utf-8')

        # Produce the message to the Kafka topic. produce() only enqueues
        # locally and raises BufferError when that queue is full; in that
        # case serve pending delivery reports to free space and retry.
        while True:
            try:
                p.produce(topic, message, callback=delivery_report)
                break
            except BufferError:
                p.poll(1.0)

        # Delivery callbacks only run from poll()/flush(); polling here keeps
        # reports flowing during the loop instead of piling up until flush().
        p.poll(0)

        # Wait for a little while before sending the next message
        time.sleep(0.000001)

    # Block until every queued message has been delivered or has failed.
    p.flush()

In [24]:
def has_more_data(file_path='data_streaming.csv'):
    """Report whether the streaming dataset file contains at least one line.

    This is a placeholder check: the file is reopened on every call and only
    its first line is read, so the result does not track how much of the
    dataset has already been processed. Replace it with logic that keeps a
    position or other state if real end-of-stream detection is needed.

    Args:
        file_path: Path of the dataset file to probe. Defaults to
            'data_streaming.csv', resolved against the current working
            directory.

    Returns:
        bool: True if a first line could be read (the file is non-empty),
        False if the file is empty.

    Raises:
        OSError: If the file cannot be opened, e.g. because it does not exist.
    """
    with open(file_path, 'r') as file:
        # readline() returns '' only at end of file, i.e. for an empty file.
        return bool(file.readline())

In [26]:
def consume_messages(bootstrap_servers, topic):
    """Consume comma-separated records from a Kafka topic and score them.

    Each message value is decoded as UTF-8, split on commas, converted to
    floats and passed to the pickled model's predict(). A prediction of 1
    prints 'Fault detected!'. Messages that carry an error, have no payload,
    have the wrong number of fields or contain non-numeric fields are
    reported and skipped.

    The loop only evaluates its termination condition (has_more_data) after a
    message has been scored successfully; while no such message arrives it
    keeps polling indefinitely.

    NOTE(review): every field must parse with float(). Columns such as 'type'
    or 'nameOrig' are presumably non-numeric, in which case records built from
    them would always be skipped here — confirm which features the model
    expects.

    Args:
        bootstrap_servers: Value for the consumer's 'bootstrap.servers' setting.
        topic: Name of the topic to subscribe to.
    """
    # Create a Kafka consumer
    consumer_conf = {
        'bootstrap.servers': bootstrap_servers,
        'group.id': 'fault_detection_consumer',
        'auto.offset.reset': 'earliest'
    }
    consumer = Consumer(consumer_conf)

    # try/finally guarantees the consumer is closed even when the loop is
    # left through an exception or a KeyboardInterrupt.
    try:
        consumer.subscribe([topic])

        # Load the trained machine learning model.
        # NOTE(security): pickle.load can execute arbitrary code; only load
        # model files that come from a trusted source.
        with open('logistic_regression_model.pkl', 'rb') as file:
            model = pickle.load(file)

        # The expected feature count does not change between messages.
        expected_features = model.coef_.shape[1]

        should_terminate = False  # Flag variable to control termination

        while not should_terminate:
            # Poll for new messages from the Kafka topic
            msg = consumer.poll(1.0)

            if msg is None:
                continue
            if msg.error():
                print(f'Consumer error: {msg.error()}')
                continue

            payload = msg.value()
            if payload is None:
                # A message without a value cannot be decoded or scored.
                print('Error: Received message without a payload.')
                continue

            # Process the received message
            data = payload.decode('utf-8').split(',')
            if len(data) != expected_features:
                print(f'Error: Number of features in the data ({len(data)}) does not match the model ({expected_features}).')
                continue

            # Convert data to numeric values
            try:
                data = [float(value) for value in data]
            except ValueError:
                print('Error: Unable to convert data to numeric values.')
                continue

            # predict() expects a 2-D input: wrap the single sample in a list.
            prediction = model.predict([data])[0]

            # Perform actions based on the prediction (e.g., logging, alerting)
            if prediction == 1:
                print('Fault detected!')

            consumer.commit()

            # Check termination condition
            if not has_more_data():  # Replace with your condition to check if there is more data streaming
                should_terminate = True
    finally:
        consumer.close()


In [27]:
def load_model():
    """Return the model object that stands in for a trained model.

    Returns:
        IsolationForest: A newly constructed estimator with default
        parameters. Nothing is read from disk and no fitting happens here.
    """
    # Placeholder: a real implementation would load a persisted model
    # instead, e.g. joblib.load('model.pkl').
    placeholder_model = IsolationForest()
    return placeholder_model

In [28]:
if __name__ == '__main__':
    # Environment-specific settings: adjust all three before running.
    bootstrap_servers = '192.168.1.27:9092'  # Kafka bootstrap servers
    topic = 'test'  # Kafka topic used for both producing and consuming
    file_path = 'D:\\Big_data\\New folder\\streaming_data.csv'  # dataset to publish

    # Publish the whole dataset first, then start consuming from the same topic.
    produce_messages(
        bootstrap_servers=bootstrap_servers,
        topic=topic,
        file_path=file_path,
    )
    consume_messages(bootstrap_servers=bootstrap_servers, topic=topic)

Message delivered to test [1]
Message delivered to test [1]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [0]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [2]
Message delivered to test [1]
Message delivered to test [1]
Message delivered to test [1]
Message de

KeyboardInterrupt: 