In [3]:
from kafka import KafkaProducer
import pandas as pd
import json
# Kafka connection settings
KAFKA_BROKER = 'localhost:9092'
TOPIC_NAME = 'transactions0'


def _encode_json(value):
    """Serialize a message value to UTF-8 encoded JSON bytes."""
    return json.dumps(value).encode('utf-8')


# Producer that JSON-encodes every message value before it is sent
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKER,
    value_serializer=_encode_json,
)

# Read the dataset, put the rows in random order, and renumber the index
file_path = 'streaming_data.csv'
df = pd.read_csv(file_path).sample(frac=1).reset_index(drop=True)

# Discard rows containing any null, then flag (1/0) whether the sender and
# receiver bank locations are equal
df.dropna(inplace=True)
df['Same_Location'] = (
    df['Sender_bank_location'].eq(df['Receiver_bank_location']).astype(int)
)

# Columns included in each outgoing message (adjust to match your features)
columns = [
    'Amount',
    'Payment_currency',
    'Received_currency',
    'Sender_bank_location',
    'Receiver_bank_location',
    'Payment_type',
    'Same_Location',
    'Hour',
    'Sender_transaction_count',
    'Receiver_transaction_count',
    'currency_match',
    'Is_laundering',
]

# Send rows to Kafka one by one
print("Starting to stream transactions to Kafka...")
try:
    # Select the message columns once up front instead of re-indexing every row.
    for _, row in df[columns].iterrows():
        # Each row is sent as one message; the producer's value_serializer
        # turns the dict into JSON bytes.
        producer.send(TOPIC_NAME, row.to_dict())

    # send() is asynchronous, so wait for all buffered records to be
    # transmitted before reporting completion.
    producer.flush()
    print("All transactions have been sent to Kafka.")
finally:
    # Always release the producer, even if sending or serialization failed.
    producer.close()


Starting to stream transactions to Kafka...
All transactions have been sent to Kafka.
