In [181]:
import pandas as pd
from sklearn.preprocessing import LabelEncoder

from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic
import json
import time

In [182]:
# Read the fraud test split from the processed-data folder and preview its first five rows.
fraud_test_csv = "../data/processed/fraudTest.csv"
test_df = pd.read_csv(fraud_test_csv)
test_df.head(n=5)

Unnamed: 0,trans_date_trans_time,cc_num,merchant,category,amt,first,last,gender,street,city,...,is_fraud,trans_year,trans_month,trans_day,trans_hour,trans_minute,trans_second,dob_year,dob_month,dob_day
0,2020-06-21 12:14:25,2291163933867244,fraud_Kirlin and Sons,personal_care,2.86,Jeff,Elliott,M,351 Darlene Green,Columbia,...,0,2020,6,21,12,14,25,1968,3,19
1,2020-06-21 12:14:33,3573030041201292,fraud_Sporer-Keebler,personal_care,29.84,Joanne,Williams,F,3638 Marsh Union,Altonah,...,0,2020,6,21,12,14,33,1990,1,17
2,2020-06-21 12:14:53,3598215285024754,"fraud_Swaniawski, Nitzsche and Welch",health_fitness,41.28,Ashley,Lopez,F,9333 Valentine Point,Bellmore,...,0,2020,6,21,12,14,53,1970,10,21
3,2020-06-21 12:15:15,3591919803438423,fraud_Haley Group,misc_pos,60.05,Brian,Williams,M,32941 Krystal Mill Apt. 552,Titusville,...,0,2020,6,21,12,15,15,1987,7,25
4,2020-06-21 12:15:17,3526826139003047,fraud_Johnston-Casper,travel,3.19,Nathan,Massey,M,5783 Evan Roads Apt. 465,Falmouth,...,0,2020,6,21,12,15,17,1955,7,6


In [183]:
# Encode categorical data
#
# Each categorical column gets its own LabelEncoder, kept in `encoders`, so the
# integer codes can later be mapped back to the original labels with
# `encoders[column].inverse_transform(...)`. Refitting one shared encoder would
# discard the fitted classes of every column except the last one.
#
# NOTE(review): the encoders are fitted on this test file only; the codes will
# match a trained model's encoding only if the training data contained exactly
# the same set of labels per column — confirm against the training notebook.

categorical_columns = ["merchant", "category", "gender", "job"]
encoders = {}
for column in categorical_columns:
    encoder = LabelEncoder()
    test_df[column] = encoder.fit_transform(test_df[column])
    encoders[column] = encoder

In [184]:
# Drop unimportant features: card/person identifiers, address fields, and the
# raw and split-out transaction / date-of-birth columns.

columns_to_drop = [
    'cc_num', 'first', 'last', 'street', 'city', 'state', 'zip', 'dob', 'trans_num',
    'trans_date_trans_time',
    'trans_year', 'trans_month', 'trans_day', 'trans_hour', 'trans_minute', 'trans_second',
    'dob_year', 'dob_month', 'dob_day',
]
# In-place drop: re-running this cell on the already-reduced frame raises KeyError.
test_df.drop(columns=columns_to_drop, inplace=True)
test_df.head(n=5)

Unnamed: 0,merchant,category,amt,gender,lat,long,city_pop,job,unix_time,merch_lat,merch_long,is_fraud
0,319,10,2.86,1,33.9659,-80.9355,333497,275,1371816865,33.986391,-81.200714,0
1,591,10,29.84,0,40.3207,-110.436,302,392,1371816873,39.450498,-109.960431,0
2,611,5,41.28,0,40.6729,-73.5365,34496,259,1371816893,40.49581,-74.196111,0
3,222,9,60.05,1,28.5697,-80.8191,54767,407,1371816915,28.812398,-80.883061,0
4,292,13,3.19,1,44.2529,-85.017,1126,196,1371816917,44.959148,-85.884734,0


In [185]:
# Print each remaining column's dtype and non-null count, plus the frame's memory usage.
test_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 555719 entries, 0 to 555718
Data columns (total 12 columns):
 #   Column      Non-Null Count   Dtype  
---  ------      --------------   -----  
 0   merchant    555719 non-null  int64  
 1   category    555719 non-null  int64  
 2   amt         555719 non-null  float64
 3   gender      555719 non-null  int64  
 4   lat         555719 non-null  float64
 5   long        555719 non-null  float64
 6   city_pop    555719 non-null  int64  
 7   job         555719 non-null  int64  
 8   unix_time   555719 non-null  int64  
 9   merch_lat   555719 non-null  float64
 10  merch_long  555719 non-null  float64
 11  is_fraud    555719 non-null  int64  
dtypes: float64(5), int64(7)
memory usage: 50.9 MB


In [186]:
# Stream the prepared transactions to Kafka as UTF-8 JSON, one message per row.

BOOTSTRAP_SERVERS = 'localhost:9092'
SEND_DELAY_SECONDS = 0.01  # pause between messages so the data arrives as a steady stream

# Fields sent in every message, in this order.
TRANSACTION_FIELDS = [
    "merchant", "category", "amt", "gender", "lat", "long",
    "city_pop", "job", "unix_time", "merch_lat", "merch_long", "is_fraud",
]

topic = 'CCT'

admin_client = AdminClient({'bootstrap.servers': BOOTSTRAP_SERVERS})

# Only create the topic when it is missing, so the cell can be re-run.
# create_topics() is asynchronous and returns {topic_name: future}; waiting on
# each future makes a failed creation raise here instead of passing silently,
# and guarantees the topic exists before the first message is produced.
if topic not in admin_client.list_topics(timeout=10).topics:
    creation_futures = admin_client.create_topics(
        [NewTopic(topic, num_partitions=3, replication_factor=1)]
    )
    for creation_future in creation_futures.values():
        creation_future.result()

producer = Producer({'bootstrap.servers': BOOTSTRAP_SERVERS})

try:
    # itertuples() keeps each column's own dtype and yields plain Python
    # scalars. iterrows() would upcast every mixed int/float row to float64,
    # turning integer fields such as unix_time into floats in the JSON payload.
    for values in test_df[TRANSACTION_FIELDS].itertuples(index=False, name=None):
        transaction = dict(zip(TRANSACTION_FIELDS, values))
        payload = json.dumps(transaction).encode('utf-8')

        try:
            producer.produce(topic=topic, value=payload)
        except BufferError:
            # Local producer queue is full: serve delivery callbacks for up to
            # one second to free space, then retry this message once.
            producer.poll(1)
            producer.produce(topic=topic, value=payload)

        producer.poll(0)  # serve delivery callbacks without blocking
        time.sleep(SEND_DELAY_SECONDS)
finally:
    # Also runs when the loop is interrupted (e.g. KeyboardInterrupt), so
    # messages already queued are still delivered rather than dropped.
    producer.flush()

KeyboardInterrupt: 