In [2]:
#!pip install kafka-python


Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
     -------------------------------------- 246.5/246.5 kB 1.3 MB/s eta 0:00:00
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2


In [1]:
from kafka import KafkaProducer
import json

# Publish one sample transaction (JSON-encoded) to the 'my-topic-2' topic.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    # Serialize each value as UTF-8 JSON bytes.
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
)

transaction_message = {
    "lat": 37.7749,
    "long": -122.4194,
    "merch_lat": 37.8044,
    "merch_long": -122.2711,
    "amt": 10000.50,
    "category": "grocery_net",
    "gender": "M",
    "age": 34,
    "transaction_id": "txn125548"
}

try:
    # send() is asynchronous and flush() does not raise for per-record failures;
    # waiting on the returned future surfaces delivery errors so the success
    # message below is only printed for a record the broker acknowledged.
    future = producer.send('my-topic-2', value=transaction_message)
    producer.flush()
    future.get(timeout=10)
    print("Message sent successfully!")
except Exception as e:
    print(f"Error sending message: {e}")
finally:
    producer.close()


Message sent successfully!


In [2]:
from kafka import KafkaConsumer
import json

# Read (and print) the first record of 'my-topic-2' as a connectivity check.
consumer = KafkaConsumer(
    'my-topic-2',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    # Stop iterating after 10 s without a record instead of blocking the kernel forever.
    consumer_timeout_ms=10000,
)
# No value_deserializer on purpose: kafka-python applies it while fetching records,
# i.e. inside the `for` statement and outside the per-message try below, so a
# malformed payload would never reach the JSONDecodeError handler. The raw bytes
# are decoded inside the try instead.

try:
    for message in consumer:
        try:
            raw_value = message.value
            value = (
                json.loads(raw_value.decode('utf-8'))
                if raw_value and raw_value.strip()
                else None
            )
            if value:
                print(f"Received message: {value}")
            else:
                print("Empty message received, skipping...")
            break  # Demo: inspect only the first record.
        except json.JSONDecodeError:
            print("Error: Failed to decode message as JSON.")
            break
        except Exception as e:
            print(f"Error processing message: {e}")
            break
finally:
    # Leave the group / close sockets even if iteration itself raises.
    consumer.close()

Received message: {'lat': 37.7749, 'long': -122.4194, 'merch_lat': 37.8044, 'merch_long': -122.2711, 'amt': 10000.5, 'category': 'grocery_net', 'gender': 'M', 'age': 34, 'transaction_id': 'txn125548'}


In [3]:
#pip install mysql-connector-python
#!pip3 install ipython-sql
#!pip3 install mysqlclient
#!pip install pymysql

In [4]:
import pymysql

import os

# (Re)create the `transactions` table that the scoring cell writes predictions into.
conn = pymysql.connect(
    host='localhost',
    port=3307,
    user='kafka_user',
    # Override via MYSQL_PASSWORD instead of committing real credentials; the
    # default matches the local dev stack used so far.
    password=os.environ.get('MYSQL_PASSWORD', 'kafka_password'),
    database='kafka_db',  # `passwd`/`db` are deprecated PyMySQL aliases.
)

cursor = conn.cursor()
try:
    # Destructive reset: every run starts from an empty table, discarding stored predictions.
    drop_table_query = "DROP TABLE IF EXISTS transactions;"
    cursor.execute(drop_table_query)
    conn.commit()
    print("Table 'transactions' dropped successfully.")

    # transaction_id is required by the INSERT in the scoring cell
    # ("INSERT INTO transactions (transaction_id, fraud) ..."); without it every
    # insert fails with an unknown-column error. Left nullable because the
    # scoring cell reads it with dict.get() and may pass None.
    create_table_query = """
CREATE TABLE IF NOT EXISTS transactions (
    id INT AUTO_INCREMENT PRIMARY KEY,
    transaction_id VARCHAR(64),
    fraud INT DEFAULT 0
);
"""

    cursor.execute(create_table_query)
    conn.commit()
    print("Table 'transactions' created successfully.")
finally:
    cursor.close()
    conn.close()


Table 'transactions' dropped successfully.
Table 'transactions' created successfully.


In [5]:
import pandas as pd
import numpy as np
import pickle
from geopy.distance import geodesic
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.base import BaseEstimator, TransformerMixin
from geopy.distance import geodesic
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from imblearn.over_sampling import SMOTE
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import cross_validate
from sklearn.metrics import make_scorer, recall_score, precision_score, accuracy_score, f1_score
import pickle

In [6]:
class DistanceTransformer(BaseEstimator, TransformerMixin):
    """Stateless transformer turning customer/merchant coordinates into a distance.

    Expects a pandas DataFrame with ``lat``, ``long``, ``merch_lat`` and
    ``merch_long`` columns. ``transform`` returns an ``(n_rows, 1)`` float array
    holding the ``geopy`` geodesic distance in kilometres for each row.
    """

    def fit(self, X, y=None):
        """No-op: nothing is learned from the data."""
        return self

    def transform(self, X):
        """Return the per-row geodesic distance (km) as an ``(n, 1)`` array."""
        latitudes = X['lat'].values
        longitudes = X['long'].values
        merchant_latitudes = X['merch_lat'].values
        merchant_longitudes = X['merch_long'].values

        # otypes lets np.vectorize accept zero-length inputs; without it an empty
        # batch raises "cannot call `vectorize` on size 0 inputs". Non-empty output
        # is float64 either way.
        km_between = np.vectorize(
            lambda lat, lon, merch_lat, merch_lon: geodesic((lat, lon), (merch_lat, merch_lon)).km,
            otypes=[float],
        )
        distances = km_between(latitudes, longitudes, merchant_latitudes, merchant_longitudes)
        return distances.reshape(-1, 1)

In [7]:
# Column-wise preprocessing for the fraud features. ColumnTransformer runs every
# branch in parallel on the ORIGINAL input columns, so a derived feature such as
# the distance cannot be referenced by name in another branch; it is scaled in its
# own sub-pipeline instead.
# NOTE(review): not referenced by the scoring cells shown here — they use the
# pipeline loaded from the pickle, which carries its own preprocessing.
preprocessor = ColumnTransformer(
    transformers=[
        ('category', OneHotEncoder(handle_unknown='ignore'), ['category']),
        # LabelEncoder only supports fit_transform(y) on a 1-D target and fails when
        # ColumnTransformer calls fit_transform(X, y); one-hot encode instead.
        ('gender', OneHotEncoder(handle_unknown='ignore'), ['gender']),
        ('num', StandardScaler(), ['amt', 'age']),
        ('distance', Pipeline([
            ('geodesic_km', DistanceTransformer()),
            ('scale', StandardScaler()),
        ]), ['lat', 'long', 'merch_lat', 'merch_long']),
    ])

In [8]:
import os
from pathlib import Path

# Location of the trained pipeline; set FRAUD_MODEL_PATH to override the
# original author's local default.
MODEL_PATH = Path(os.environ.get(
    'FRAUD_MODEL_PATH',
    r"D:\Intern\ALX Internship\fraud_detection_pipeline.pkl",
))

# SECURITY: pickle.load can execute arbitrary code — only load model files you
# produced yourself or otherwise trust.
# NOTE(review): if the pickled pipeline contains DistanceTransformer (presumably),
# that class must be defined in this session before loading — TODO confirm.
with MODEL_PATH.open('rb') as file:
    model = pickle.load(file)


In [10]:
import os

import pandas as pd

# Consume one transaction from Kafka, score it with `model`, and store the
# prediction in MySQL.

# Input fields fed to the model, in the order the original cell listed them.
FEATURE_COLUMNS = ['lat', 'long', 'merch_lat', 'merch_long', 'amt', 'category', 'gender', 'age']

consumer = KafkaConsumer(
    'my-topic-2',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    # Stop iterating after 10 s without a record instead of blocking the kernel forever.
    consumer_timeout_ms=10000,
)
# No value_deserializer on purpose: kafka-python applies it while fetching records,
# outside the per-message try below, so malformed JSON would escape the
# JSONDecodeError handler. The raw bytes are decoded inside the try instead.

conn = pymysql.connect(
    host='localhost',
    port=3307,
    user='kafka_user',
    password=os.environ.get('MYSQL_PASSWORD', 'kafka_password'),
    database='kafka_db',  # `passwd`/`db` are deprecated PyMySQL aliases.
)
cursor = conn.cursor()


def insert_transaction(transaction_id, fraud):
    """Insert one scored transaction into ``transactions`` and commit it.

    Args:
        transaction_id: Identifier read from the Kafka message (may be None).
        fraud: Model prediction stored in the ``fraud`` column.
    """
    insert_query = "INSERT INTO transactions (transaction_id, fraud) VALUES (%s, %s)"
    cursor.execute(insert_query, (transaction_id, fraud))
    conn.commit()
    print(f"Transaction {transaction_id} inserted with fraud prediction: {fraud}")


try:
    for message in consumer:
        try:
            raw_value = message.value
            message_data = (
                json.loads(raw_value.decode('utf-8'))
                if raw_value and raw_value.strip()
                else None
            )
            if message_data:
                print(f"Received message: {message_data}")
                transaction_id = message_data.get('transaction_id')

                # One-row DataFrame with named columns so a ColumnTransformer inside the
                # pipeline can select features by name (a bare list loses the names);
                # a missing field raises KeyError, handled below.
                # Assumes the pipeline was fit on a DataFrame with these column
                # names — TODO confirm against the training notebook.
                features = pd.DataFrame([{col: message_data[col] for col in FEATURE_COLUMNS}])
                # NOTE(review): the recorded run reported the loaded pipeline's
                # ColumnTransformer as "not fitted yet"; the pickle likely needs to be
                # re-saved after fitting.
                fraud_prediction = model.predict(features)[0]
                # Convert numpy scalars (e.g. np.int64) to a native Python value for the DB driver.
                if hasattr(fraud_prediction, 'item'):
                    fraud_prediction = fraud_prediction.item()

                insert_transaction(transaction_id, fraud_prediction)
            else:
                print("Empty message received, skipping...")
            break  # Demo: score only the first record.
        except json.JSONDecodeError:
            print("Error: Failed to decode message as JSON.")
            break
        except Exception as e:
            print(f"Error processing message: {e}")
            break
finally:
    # Release Kafka and MySQL resources even if iteration itself raises.
    consumer.close()
    cursor.close()
    conn.close()


Received message: {'lat': 37.7749, 'long': -122.4194, 'merch_lat': 37.8044, 'merch_long': -122.2711, 'amt': 10000.5, 'category': 'grocery_net', 'gender': 'M', 'age': 34, 'transaction_id': 'txn125548'}
Error processing message: This ColumnTransformer instance is not fitted yet. Call 'fit' with appropriate arguments before using this estimator.


In [12]:
# Re-publish the sample transaction so the scoring cell has a record to consume.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),  # Convert the data to JSON bytes.
)

transaction_message = {
    "lat": 37.7749,
    "long": -122.4194,
    "merch_lat": 37.8044,
    "merch_long": -122.2711,
    "amt": 10000.50,
    "category": "grocery_net",
    "gender": "M",
    "age": 34,
    "transaction_id": "txn125548"
}

try:
    # send() is asynchronous and flush() does not raise for per-record failures;
    # wait on the future so delivery errors reach the except branch.
    future = producer.send('my-topic-2', value=transaction_message)
    producer.flush()
    future.get(timeout=10)
    print("Message sent successfully!")
except Exception as e:
    print(f"Error sending message: {e}")
finally:
    producer.close()


Message sent successfully!


In [11]:
import os

# Print every row stored in `transactions` to verify the scoring cell's inserts.
conn = pymysql.connect(
    host='localhost',
    port=3307,
    user='kafka_user',
    password=os.environ.get('MYSQL_PASSWORD', 'kafka_password'),
    database='kafka_db',  # `passwd`/`db` are deprecated PyMySQL aliases.
)
cursor = conn.cursor()

try:
    select_query = "SELECT * FROM transactions;"
    cursor.execute(select_query)
    rows = cursor.fetchall()
    print(f"Number of rows: {len(rows)}")
    for row in rows:
        print(row)
except Exception as e:
    print(f"Error reading data: {e}")
finally:
    cursor.close()
    conn.close()

Number of rows: 0
