In [1]:
#!pip install kafka-python

In [2]:
#!pip install tqdm

In [3]:
import pandas as pd
from kafka import KafkaProducer
from datetime import datetime
from json import dumps
from tqdm import tqdm

In [4]:
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: 
                         dumps(x).encode('utf-8'))
if producer.bootstrap_connected():
    print(f"Successfully connected to bootstrap server")
else:
    print("Couldn't connect to bootstrap server.")

TOPIC_NAME = "ml-raw-dns"

Successfully connected to bootstrap server


In [5]:
def produce_message(producer_instance, topic, message):
    producer_instance.send(topic, message)
    producer_instance.flush()
    return

In [6]:
# Load
df = pd.read_csv("combined_test_df.csv")

# Drop auto‑generated index column if present
df.drop(columns=["Unnamed: 0"], inplace=True, errors="ignore")

features_to_drop = [
    'bwd psh flags', 'bwd urg flags', 'fwd avg bytes/bulk',
    'fwd avg packets/bulk', 'fwd avg bulk rate', 'bwd avg bytes/bulk',
    'bwd avg packets/bulk', 'bwd avg bulk rate', 'fwd header length.1'
]

df.drop(columns=features_to_drop, axis=1,inplace=True)

# Convert labels to binary
df["label"] = df["label"].apply(lambda x: 0 if x == "BENIGN" else 1)

# Specify the top features exactly
# best_features = [
#     "bwd packets/s",
#     "destination port",
#     "init_win_bytes_forward",
#     "flow packets/s",
#     "bwd packet length min",
#     "down/up ratio",
#     "psh flag count",
#     "act_data_pkt_fwd",
#     "total fwd packets",
#     "subflow fwd bytes",
# ]

# Shuffle and reset index
df = df.sample(frac=1, random_state=42).reset_index(drop=True)

# Save
df.to_csv("combined_test_df_phase2.csv", index=False)

print("Dataset updated: dropped 'Unnamed: 0', converted labels, reshuffled rows.")
print("New shape:", df.shape)
print(df.head())

Dataset updated: dropped 'Unnamed: 0', converted labels, reshuffled rows.
New shape: (53576, 70)
   destination port  flow duration  total fwd packets  total backward packets  \
0                53          44717                  2                       2   
1                80        8033638                  4                       0   
2             59641        2087651                  1                       6   
3               443       64147291                 28                      28   
4                50       50462129                  8                       1   

   total length of fwd packets  total length of bwd packets  \
0                           74                          154   
1                           24                            0   
2                            6                           36   
3                         1450                        24412   
4                         7300                            0   

   fwd packet length max  fwd packet 

In [7]:
with open("combined_test_df_phase2.csv") as f:
    start_time = datetime.now()
    for i, line in tqdm(enumerate(f)):
        produce_message(producer_instance=producer, topic=TOPIC_NAME, message=line)
    end_time = datetime.now()
    print(f"Batch took {end_time-start_time} time for ingesting data")

print("Ingestion Completed")

53577it [04:11, 213.23it/s]

Batch took 0:04:11.462434 time for ingesting data
Ingestion Completed



