In [134]:
import json
import pickle
from datetime import datetime
from json import dumps, loads
from time import sleep

import numpy as np
import pandas as pd
from kafka import KafkaConsumer

In [135]:
def _decode_json_message(raw_bytes):
    """Decode a raw Kafka payload as UTF-8 text and parse it as JSON."""
    text = raw_bytes.decode('utf-8')
    return loads(text)


# Subscribe to the 'demo_test' topic; each message value arrives already
# decoded into a Python object by _decode_json_message.
consumer = KafkaConsumer(
    'demo_test',
    bootstrap_servers=['okore-joel:9092'],
    value_deserializer=_decode_json_message,
)

In [136]:
# Column names, in order, of the raw transaction records
# (presumably the producer's schema — confirm against the producer).
columns = [
    'Unnamed: 0',
    'trans_date_trans_time',
    'cc_num',
    'merchant',
    'category',
    'amt',
    'first',
    'last',
    'gender',
    'street',
    'city',
    'state',
    'zip',
    'lat',
    'long',
    'city_pop',
    'job',
    'dob',
    'trans_num',
    'unix_time',
    'merch_lat',
    'merch_long',
    'merch_zipcode',
]

In [137]:
# Haversine formula to calculate the great-circle distance between two lat/long points
def haversine(lat1, lon1, lat2, lon2, radius=3958.8):
    """Great-circle distance between (lat1, lon1) and (lat2, lon2).

    Inputs are in decimal degrees and may be scalars, NumPy arrays or pandas
    Series; they are combined with NumPy broadcasting, and a Series input
    yields a Series result.

    Parameters
    ----------
    lat1, lon1, lat2, lon2 : float or array-like
        Coordinates of the two points, in degrees.
    radius : float, default 3958.8
        Sphere radius. The default is the Earth's radius in miles, so the
        result is in miles; pass e.g. 6371.0 to get kilometres.

    Returns
    -------
    float or array-like
        Distance in the same units as ``radius``.
    """
    # Convert degrees to radians
    lat1 = np.radians(lat1)
    lon1 = np.radians(lon1)
    lat2 = np.radians(lat2)
    lon2 = np.radians(lon2)
    # Haversine formula
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    # Rounding can push `a` fractionally outside [0, 1] for (near-)antipodal
    # points, which would make sqrt(1 - a) NaN; clamp it back into range.
    a = np.clip(a, 0.0, 1.0)
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
    return radius * c

# Vectorized calculation of the customer-to-merchant distance column
def calculate_distance_vectorized(data):
    """Add a 'distance' column computed by ``haversine`` to ``data``.

    Reads the 'lat', 'long', 'merch_lat' and 'merch_long' columns and writes
    the result into ``data['distance']`` in place (the caller's frame is
    modified). Returns the same frame object for convenient reassignment.
    """
    customer_point = (data['lat'], data['long'])
    merchant_point = (data['merch_lat'], data['merch_long'])
    data['distance'] = haversine(*customer_point, *merchant_point)
    return data

In [138]:
# pandas is otherwise first imported in a later cell; import it here so this
# cell also works on a fresh kernel (Restart & Run All).
import pandas as pd

# Empty placeholder; the consumer loop reassigns `dataframe` to each processed
# row so the last record can be inspected after the loop is interrupted.
dataframe = pd.DataFrame()

In [139]:
import pickle
import pandas as pd
import numpy as np
from tabulate import tabulate

# Load the trained artefacts (model, scaler and column lists) from CSN_map.pkl.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file —
# only load a CSN_map.pkl produced by a trusted training run.
with open('CSN_map.pkl', 'rb') as pkl_file:
    CSN_map = pickle.load(pkl_file)

model, scaler = CSN_map['model'], CSN_map['scaler']
input_cols, categorical_cols, numeric_cols = (
    CSN_map[key] for key in ('input_cols', 'categorical_cols', 'numeric_cols')
)

# Defaults applied when a field arrives as null/NaN.
# NOTE(review): these look like training-set statistics (mean 'amt', most
# frequent category/merchant/job/state) — confirm they match the training data.
_FILL_DEFAULTS = {
    'amt': 70.35103545607033,
    'merch_zipcode': 45860.0,
    'category': 'gas_transport',
    'merchant': 'fraud_Kilback LLC',
    'job': 'Film/video editor',
    'state': 'TX',
}

# Accepted formats for the producer's 'sent_timestamp'. str(datetime) omits the
# '.ffffff' part when microsecond == 0, so the second format is a fallback.
_SENT_TIMESTAMP_FORMATS = ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S')


def _parse_sent_timestamp(raw):
    """Parse the producer's send time, with or without fractional seconds.

    Raises ValueError if ``raw`` matches none of _SENT_TIMESTAMP_FORMATS.
    """
    for fmt in _SENT_TIMESTAMP_FORMATS:
        try:
            return datetime.strptime(raw, fmt)
        except ValueError:
            continue
    raise ValueError(f"Unrecognised sent_timestamp: {raw!r}")


def _engineer_features(data, categorical_cols, numeric_cols, scaler):
    """Apply the inference-time feature pipeline to a one-message DataFrame.

    Adds date parts, fills missing values, label-encodes gender, one-hot
    encodes ``categorical_cols``, derives age and haversine distance, and
    scales ``numeric_cols`` with ``scaler``. Returns the resulting frame.
    """
    # Feature Engineering - date
    trans_time = pd.to_datetime(data['trans_date_trans_time'])
    data['trans_date_trans_time'] = trans_time
    data['transaction_hour'] = trans_time.dt.hour
    data['transaction_day_of_week'] = trans_time.dt.dayofweek
    data['transaction_day'] = trans_time.dt.day

    # Missing-value defaults (KeyError if a column is absent, as before)
    for col, default in _FILL_DEFAULTS.items():
        data[col] = data[col].fillna(default)

    # Label (Gender) Encoder
    data['gender'] = data['gender'].map({'M': 1, 'F': 0})

    # One-hot encoding WITHOUT drop_first: on a one-row frame every column has a
    # single level, so drop_first=True would discard every dummy. The training
    # layout (dropped level -> all zeros) is restored by _align_model_inputs.
    data = pd.get_dummies(data, columns=categorical_cols)

    # Feature Engineering - age in whole (365-day) years
    data['dob'] = pd.to_datetime(data['dob'])
    data['age'] = (pd.to_datetime('today') - data['dob']).dt.days // 365

    # Feature Engineering - Haversine
    data = calculate_distance_vectorized(data)

    # Scaling
    data[numeric_cols] = scaler.transform(data[numeric_cols])
    return data


def _align_model_inputs(data, input_cols, categorical_cols):
    """Return ``data[input_cols]``, adding absent one-hot dummy columns as 0.

    A training dummy column is absent whenever this message carries a
    different level of that category, which encodes as 0. Any other missing
    column is a schema error and raises KeyError.
    Assumes training dummies use pd.get_dummies' default '<col>_' prefix — confirm.
    """
    dummy_prefixes = tuple(f"{col}_" for col in categorical_cols)
    missing = [c for c in input_cols if c not in data.columns]
    unexpected = [c for c in missing if not str(c).startswith(dummy_prefixes)]
    if unexpected:
        raise KeyError(f"Columns missing from engineered features: {unexpected}")
    return data.reindex(columns=input_cols, fill_value=0)


for message in consumer:
    data = pd.DataFrame([message.value])  # one-row frame from the decoded JSON payload
    received_timestamp = datetime.now()

    # Time between the producer stamping the message and this consumer receiving it
    sent_timestamp = _parse_sent_timestamp(data['sent_timestamp'].values[0])
    time_difference = received_timestamp - sent_timestamp

    data = _engineer_features(data, categorical_cols, numeric_cols, scaler)
    dataframe = data  # keep the last processed row for inspection after interrupting

    first_name = data.loc[0, 'first']
    last_name = data.loc[0, 'last']
    trans_num = data.loc[0, 'trans_num']

    # Make predictions; one label per row (presumably sklearn-style), and the frame has one row.
    prediction = model.predict(_align_model_inputs(data, input_cols, categorical_cols))
    decision = "DECLINED" if prediction[0] == 1 else "ACCEPTED"
    print(f"[{received_timestamp}] Prediction for {first_name} {last_name} || Transaction: {trans_num} || TRANSACTION {decision} (Processed {time_difference.total_seconds()} secs after production)\n")

[2024-10-14 15:41:14.894698] Prediction for Anthony Allen || Transaction: c48f2efccb689f2daef9ebdb03e1edec || TRANSACTION ACCEPTED (Processed 0.006183 secs after production)

[2024-10-14 15:41:15.895525] Prediction for Daniel Boyd || Transaction: 68f2b98cba5c4ae6b7ff3f701d0cdc69 || TRANSACTION ACCEPTED (Processed 0.005163 secs after production)

[2024-10-14 15:41:16.898521] Prediction for Micheal Walters || Transaction: b832170811fa7ce7ec76d9af27adecd2 || TRANSACTION ACCEPTED (Processed 0.00639 secs after production)

[2024-10-14 15:41:17.901334] Prediction for Jeffrey Munoz || Transaction: 7bbb699e093e3fb7d16c96093e95d66f || TRANSACTION ACCEPTED (Processed 0.00716 secs after production)

[2024-10-14 15:41:18.904074] Prediction for Joanna Hudson || Transaction: 5666921b2c9a994021da1bdbb9aea908 || TRANSACTION ACCEPTED (Processed 0.007688 secs after production)

[2024-10-14 15:41:19.905652] Prediction for Christine Harris || Transaction: 72eefdc7ab123dfc9a4adee5aeae9e11 || TRANSACTION AC

KeyboardInterrupt: 