In [3]:
from kafka import KafkaProducer,KafkaAdminClient
from kafka.admin import NewTopic
import logging
import json

def create_kafka_topic(topic_name,num_partitions=1,replication_factor=1):
    """Create a topic on the broker at ``localhost:9092`` and print the outcome.

    Args:
        topic_name: Name of the topic to create.
        num_partitions: Partition count requested for the topic (default 1).
        replication_factor: Replication factor requested for the topic (default 1).

    Returns:
        None. Any exception raised by ``create_topics`` is caught and printed,
        not re-raised. Errors raised while constructing the admin client
        propagate to the caller.
    """
    # Build the plain topic description first so nothing is left open if it fails.
    topic = NewTopic(name=topic_name,
                     num_partitions=num_partitions,
                     replication_factor=replication_factor)
    admin_client = KafkaAdminClient(bootstrap_servers = 'localhost:9092',
                                    client_id = 'GalalEwida-client')
    try:
        admin_client.create_topics(new_topics=[topic],
                                   validate_only=False)
        print(f"Topic '{topic_name}' created successfully.")
    except Exception as e:
        print(f"Error creating topic: {e}")
    finally:
        # Always release the admin connection, whether creation succeeded or not.
        admin_client.close()


    
def get_kafka_producer():
    """Build a ``KafkaProducer`` for the broker at ``localhost:9092``.

    Values are serialized as UTF-8 JSON and keys as the UTF-8 encoding of
    ``str(key)``. Messages are routed by a key-based partitioner that gives the
    same partition for the same key bytes on every run.

    Returns:
        The configured ``KafkaProducer`` instance.

    Raises:
        Exception: whatever ``KafkaProducer`` raises during construction; the
            error is logged and then re-raised.
    """
    import zlib  # function-scope import keeps this definition self-contained

    def _partition_for_key(key, all_partitions, available_partitions):
        """Pick a partition id from ``all_partitions`` for the serialized key."""
        if key is None:
            return all_partitions[0]
        # The built-in hash() of bytes is salted per interpreter process, so it
        # would send the same key to different partitions across runs. CRC32 is
        # deterministic. Index into all_partitions so a partition id (not a list
        # position) is returned.
        return all_partitions[zlib.crc32(key) % len(all_partitions)]

    try:
        return KafkaProducer(
            bootstrap_servers="localhost:9092",
            value_serializer=lambda value: json.dumps(value).encode('utf-8'),
            key_serializer=lambda key: str(key).encode('utf-8'),
            acks='all',
            retries=5,  # Increased for better resilience
            max_in_flight_requests_per_connection=5,
            request_timeout_ms=10000,
            linger_ms=5,
            batch_size=32000,
            partitioner=_partition_for_key
        )
    except Exception as e:
        logging.error(f"Error creating Kafka producer: {e}")
        raise

In [4]:
import pandas as pd
import time
def fraudulent_data_generator(file_path):
    """Yield every row of the CSV at ``file_path`` as a dict, one at a time.

    The file is loaded with ``pd.read_csv`` when iteration begins. Each row is
    then produced as a mapping of column name to that row's value.
    """
    frame = pd.read_csv(file_path)
    yield from (record.to_dict() for _index, record in frame.iterrows())

def set_orders(producer,topic_name,data_generator,delay_seconds=2):
    """Send every record from ``data_generator`` to ``topic_name``, then close the producer.

    Each record's ``'is-Fraud'`` value is used as the message key and the whole
    record as the message value.

    Args:
        producer: Object exposing ``send(topic, key=..., value=...)``,
            ``flush()`` and ``close()``.
        topic_name: Topic the records are sent to.
        data_generator: Iterable of dict records, each containing ``'is-Fraud'``.
        delay_seconds: Pause after each record (default 2, the original pacing).
            A value of 0 or less disables the pause.

    Raises:
        KeyError: if a record has no ``'is-Fraud'`` entry.

    The producer is flushed and closed even when the loop is interrupted
    (for example by ``KeyboardInterrupt``) or a send fails.
    """
    try:
        for i,fraud_data in enumerate(data_generator):
            YN = fraud_data['is-Fraud']
            producer.send(topic_name,key = YN,value=fraud_data )
            # NOTE: the value printed after "in partition :" is the message key,
            # not the partition number the broker assigned.
            print("Produced record:", i,", to topic:", topic_name,"in partition :",YN)
            if delay_seconds > 0:
                time.sleep(delay_seconds)
    finally:
        # Nested so that close() still runs if flush() itself raises.
        try:
            producer.flush()
        finally:
            producer.close()

In [5]:
# import sys
# from pathlib import Path
# sys.path.append(str(Path(__file__).resolve().parent.parent))
import pandas as pd
import logging
from config.Kafka_config import create_kafka_topic,get_kafka_producer
from src.data_generator import fraudulent_data_generator,set_orders


def main():
    """Create the ``New-Topic`` topic and stream the fraud dataset into it.

    Steps, in order: create the topic (3 partitions, replication factor 1),
    build the row generator for ``dataset/fraud-dataset.csv``, open a producer,
    and hand all three to ``set_orders``.
    """
    from pathlib import Path  # function-scope import; only needed for the dataset path

    topic_name = "New-Topic"
    # Join the path components with the platform's separator; a hard-coded
    # backslash only resolves on Windows.
    file_path = str(Path("dataset") / "fraud-dataset.csv")

    # Set up the topic before opening a producer so a failure during topic
    # setup does not leave an open producer behind.
    create_kafka_topic(topic_name=topic_name,num_partitions=3,replication_factor=1)

    fraudulent_dataset_generator = fraudulent_data_generator(file_path=file_path)
    producer = get_kafka_producer()
    set_orders(producer=producer,topic_name=topic_name,data_generator=fraudulent_dataset_generator)



if __name__ == "__main__":
    main()

Topic 'New-Topic' created successfully.
Produced record: 0 , to topic: New-Topic in partition : No
Produced record: 1 , to topic: New-Topic in partition : No
Produced record: 2 , to topic: New-Topic in partition : No
Produced record: 3 , to topic: New-Topic in partition : No


KeyboardInterrupt: 

In [6]:
from kafka import KafkaConsumer
import json

def create_kafka_consumer(topic_name, group_id=None):
    """Create a ``KafkaConsumer`` subscribed to ``topic_name`` on ``localhost:9092``.

    Args:
        topic_name: Topic to subscribe to.
        group_id: Optional consumer group id passed through to the consumer.

    Returns:
        The consumer, or ``None`` if construction raised. The error is printed
        in that case, so callers must check for ``None``.
    """
    def _decode_value(raw):
        # A record may carry no value; pass None through instead of failing on
        # .decode(), mirroring how keys are handled below.
        if raw is None:
            return None
        return json.loads(raw.decode('utf-8'))

    def _decode_key(raw):
        # Missing or empty keys are reported as None.
        return raw.decode('utf-8') if raw else None

    try:
        consumer = KafkaConsumer(
            topic_name,
            bootstrap_servers='localhost:9092',
            auto_offset_reset='earliest',  # Start reading from the beginning of the topic
            enable_auto_commit=True,
            group_id=group_id,  # Optional: add a consumer group ID
            value_deserializer=_decode_value,  # Deserialize JSON messages
            key_deserializer=_decode_key  # Deserialize keys
        )
        return consumer
    except Exception as e:
        print(f"Error creating Kafka consumer: {e}")
        return None

def _print_message_details(record):
    """Print one consumed record field by field, followed by a separator line."""
    print(f"Topic: {record.topic}")
    print(f"Partition: {record.partition}")
    print(f"Offset: {record.offset}")
    print(f"Key: {record.key}")
    print(f"Value: {record.value}")
    print("-" * 50)


def consume_messages(topic_name, group_id=None):
    """Print every message read from ``topic_name`` until interrupted.

    A consumer is obtained from ``create_kafka_consumer``; if that returns
    ``None`` the function returns immediately. An exception raised while
    printing a single message is reported and the loop moves on. A
    ``KeyboardInterrupt`` ends the loop. The consumer is closed on every exit
    path.
    """
    kafka_consumer = create_kafka_consumer(topic_name, group_id)

    if kafka_consumer is not None:
        try:
            print(f"Starting to consume messages from topic: {topic_name}")
            for record in kafka_consumer:
                try:
                    _print_message_details(record)
                except Exception as err:
                    print(f"Error processing message: {err}")
        except KeyboardInterrupt:
            print("Stopping consumer...")
        finally:
            kafka_consumer.close()

# Usage
if __name__ == "__main__":
    consume_messages("New-Topic")  # Use the exact topic name from producer

Starting to consume messages from topic: New-Topic
Topic: New-Topic
Partition: 2
Offset: 0
Key: No
Value: {'step': 283, 'type': 'CASH_IN', 'amount': 210329.84, 'nameOrig': 'C1159819632', 'is-Fraud': 'No'}
--------------------------------------------------
Topic: New-Topic
Partition: 2
Offset: 1
Key: No
Value: {'step': 132, 'type': 'CASH_OUT', 'amount': 215489.19, 'nameOrig': 'C1372369468', 'is-Fraud': 'No'}
--------------------------------------------------
Topic: New-Topic
Partition: 2
Offset: 2
Key: No
Value: {'step': 355, 'type': 'DEBIT', 'amount': 4431.05, 'nameOrig': 'C1059822709', 'is-Fraud': 'No'}
--------------------------------------------------
Topic: New-Topic
Partition: 2
Offset: 3
Key: No
Value: {'step': 135, 'type': 'CASH_OUT', 'amount': 214026.2, 'nameOrig': 'C1464960643', 'is-Fraud': 'No'}
--------------------------------------------------
Stopping consumer...


In [1]:
from kafka import KafkaAdminClient
from kafka.admin import NewTopic,config_resource,ConfigResourceType
from kafka.errors import TopicAlreadyExistsError, UnknownTopicOrPartitionError

In [2]:
# Delete the demo topic so the notebook can recreate it from scratch.
admin_client = KafkaAdminClient(bootstrap_servers="localhost:9092")
try:
    admin_client.delete_topics(['New-Topic'])
except UnknownTopicOrPartitionError:
    # The topic is already gone (for example, this cell was run twice);
    # treat that as success so the cell stays re-runnable.
    print("Topic 'New-Topic' does not exist; nothing to delete.")
finally:
    # Release the admin connection even when the delete request fails.
    admin_client.close()

In [7]:
from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin import NewTopic
import json
import time

In [8]:
def create_kafka_topic(topic_name="orders", num_partitions=4, replication_factor=1):
    """Create a topic on the broker at ``localhost:9092`` and print the outcome.

    Called with no arguments it creates the ``orders`` topic with 4 partitions
    and replication factor 1. The parameters let the same function create other
    topics, and keep it call-compatible with the parameterised definition
    earlier in this notebook that it replaces.

    Args:
        topic_name: Name of the topic to create.
        num_partitions: Partition count requested for the topic.
        replication_factor: Replication factor; adjust based on your cluster.

    Returns:
        None. Any exception raised by ``create_topics`` is caught and printed,
        not re-raised. Errors raised while constructing the admin client
        propagate to the caller.
    """
    topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)

    admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092', client_id="admin-client")

    try:
        admin_client.create_topics(new_topics=[topic], validate_only=False)
        print(f"Topic '{topic_name}' created successfully.")
    except Exception as e:
        print(f"Error creating topic: {e}")
    finally:
        # Always release the admin connection, whether creation succeeded or not.
        admin_client.close()
    
# Create the "orders" topic now; the function prints either a success message
# or the error it caught, so this cell does not raise on a creation failure.
create_kafka_topic()


Topic 'orders' created successfully.


In [9]:
def get_kafka_producer():
    """Build a ``KafkaProducer`` for the broker at ``localhost:9092``.

    Keys are encoded with ``str.encode`` (so they must be ``str``) and values
    are serialized as UTF-8 JSON. Messages are routed by a key-based
    partitioner that gives the same partition for the same key bytes on every
    run.

    Returns:
        The configured ``KafkaProducer`` instance.
    """
    import zlib  # function-scope import keeps this definition self-contained

    def _partition_for_key(key, all_partitions, available_partitions):
        """Pick a partition id from ``all_partitions`` for the serialized key."""
        if key is None:
            return all_partitions[0]
        # The built-in hash() of bytes is salted per interpreter process, so it
        # would send the same key to different partitions across runs. CRC32 is
        # deterministic. Index into all_partitions so a partition id (not a list
        # position) is returned.
        return all_partitions[zlib.crc32(key) % len(all_partitions)]

    return KafkaProducer(
        bootstrap_servers='localhost:9092',
        key_serializer=str.encode,  # Ensure the key is encoded as bytes
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        partitioner=_partition_for_key,
    )

In [10]:
def send_orders(producer, topic_name="orders", num_orders=100):
    """Send synthetic orders to a topic, keyed by region, then flush the producer.

    Orders are numbered from 1. Regions rotate through North, South, East and
    West, and the price starts at 100 and rises by 1.5 per order.

    Args:
        producer: Object exposing ``send(topic, key=..., value=...)`` and ``flush()``.
        topic_name: Destination topic (default ``"orders"``).
        num_orders: How many orders to generate (default 100).

    The producer is flushed but not closed, so the caller can keep using it.
    """
    regions = ["North", "South", "East", "West"]

    for i in range(num_orders):
        region = regions[i % len(regions)]  # Rotate through regions
        order = {
            "order_id": i + 1,
            "item": f"Item-{i+1}",
            "region": region,
            "price": round(100 + i * 1.5, 2)
        }

        print(f"Sending order {order} to region {region}.")
        producer.send(topic_name, key=region, value=order)

    # Ensure all messages are sent before returning
    producer.flush()

# Build a producer and push the synthetic orders through it. send_orders()
# flushes before returning; the producer is left open in `producer`.
producer = get_kafka_producer()
send_orders(producer)


Sending order {'order_id': 1, 'item': 'Item-1', 'region': 'North', 'price': 100.0} to region North.
Sending order {'order_id': 2, 'item': 'Item-2', 'region': 'South', 'price': 101.5} to region South.
Sending order {'order_id': 3, 'item': 'Item-3', 'region': 'East', 'price': 103.0} to region East.
Sending order {'order_id': 4, 'item': 'Item-4', 'region': 'West', 'price': 104.5} to region West.
Sending order {'order_id': 5, 'item': 'Item-5', 'region': 'North', 'price': 106.0} to region North.
Sending order {'order_id': 6, 'item': 'Item-6', 'region': 'South', 'price': 107.5} to region South.
Sending order {'order_id': 7, 'item': 'Item-7', 'region': 'East', 'price': 109.0} to region East.
Sending order {'order_id': 8, 'item': 'Item-8', 'region': 'West', 'price': 110.5} to region West.
Sending order {'order_id': 9, 'item': 'Item-9', 'region': 'North', 'price': 112.0} to region North.
Sending order {'order_id': 10, 'item': 'Item-10', 'region': 'South', 'price': 113.5} to region South.
Sendin