In [None]:
import os
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType
import pandas as pd
import joblib
from influxdb_client import InfluxDBClient as InfluxDBClientV2, Point, WriteOptions
from influxdb_client.client.write_api import SYNCHRONOUS
print('start')  # progress marker: the cell has begun executing

# ========== Spark & Kafka Setup ==========
# The Kafka source connector is requested through PYSPARK_SUBMIT_ARGS; this
# has to be in the environment before the Spark session below is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'
findspark.init()

# Reuse an already-running session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("AnomalyDetectionStream")
    .getOrCreate()
)

# ========== Schema for Incoming Kafka Logs ==========
# Every field of the JSON log record is declared as a string, including the
# port and the byte counters; numeric conversion happens in the batch handler.
log_schema = (
    StructType()
    .add("timestamp", StringType())
    .add("source_ip", StringType())
    .add("destination_ip", StringType())
    .add("protocol", StringType())
    .add("port", StringType())
    .add("threat", StringType())
    .add("user_agent", StringType())
    .add("location", StringType())
    .add("bytes_sent", StringType())
    .add("bytes_received", StringType())
)

# ========== Load Model and Preprocessing Tools ==========
# NOTE: joblib files are pickles, and unpickling can execute arbitrary code.
# Only load artifacts that come from a trusted source.
# The artifacts are loaded in the order listed and bound to the names on the left.
model, scaler, encoders, target_encoder = (
    joblib.load(artifact_path)
    for artifact_path in (
        "best_model.joblib",
        "scaler.joblib",
        "encoders.joblib",
        "target_encoder.joblib",
    )
)

# Feature names, in the order recorded on the scaler; the batch handler uses
# this to select and order the model's input columns.
feature_order = scaler.feature_names_in_

# ========== Read Stream from Kafka ==========
# Subscribe to the "logs" topic, starting from the latest offsets.
df_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "logs")
    .option("startingOffsets", "latest")
    .load()
)

# Cast the Kafka `value` column to a string so it can be parsed as JSON.
df_value = df_raw.selectExpr("CAST(value AS STRING)")

# Parse the JSON payload with log_schema and flatten the resulting struct
# into one top-level column per schema field.
df_parsed = (
    df_value
    .select(from_json(col("value"), log_schema).alias("data"))
    .select("data.*")
)

# ========== Batch Prediction and Write to InfluxDB ==========
def predict_and_store(batch_df, batch_id):
    """Score one micro-batch of parsed logs and write the results to InfluxDB.

    Used as the ``foreachBatch`` callback of the streaming query.

    Processing steps:
      1. Collect the micro-batch into a pandas DataFrame.
      2. Keep the model's feature columns plus ``timestamp``.
      3. Coerce ``bytes_sent`` / ``bytes_received`` to float; values that are
         not numeric become NaN and the row is dropped with the other
         incomplete rows (previously one bad value discarded the whole batch).
      4. Label-encode the categorical columns on a *separate* feature frame;
         values the encoder has never seen are encoded as -1.
      5. Scale, predict, and decode the predicted label.
      6. Write one ``network_logs`` point per row in a single request. Tags
         carry the original source/destination IP and protocol strings, not
         their integer label codes.

    Args:
        batch_df: Spark DataFrame holding the micro-batch (columns of
            ``log_schema``).
        batch_id: Micro-batch identifier; only used in the log messages.

    Returns:
        None.

    Environment variables:
        INFLUXDB_V2_TOKEN: InfluxDB API token. Required; the token is no
            longer embedded in the notebook.
        INFLUXDB_V2_URL: Server URL, default ``http://influxdb:8086``.
        INFLUXDB_V2_ORG: Organisation, default ``myorg``.
        INFLUXDB_V2_BUCKET: Target bucket, default ``mybucket``.

    Errors:
        Processing is best-effort: any exception raised after the batch has
        been collected is caught and printed, and the batch is skipped.
    """
    pdf = batch_df.toPandas()
    if pdf.empty:
        return

    try:
        # Credentials come from the environment so that no secret is stored
        # in the notebook. Checked first so a misconfiguration fails fast.
        token = os.environ.get("INFLUXDB_V2_TOKEN")
        if not token:
            raise RuntimeError(
                "INFLUXDB_V2_TOKEN is not set; export the InfluxDB API token "
                "before starting the stream"
            )
        influx_host = os.environ.get("INFLUXDB_V2_URL", "http://influxdb:8086")
        org = os.environ.get("INFLUXDB_V2_ORG", "myorg")
        bucket = os.environ.get("INFLUXDB_V2_BUCKET", "mybucket")

        feature_cols = list(feature_order)
        categorical_cols = ["source_ip", "destination_ip", "protocol", "user_agent", "location", "port"]
        numeric_cols = ["bytes_sent", "bytes_received"]

        # .copy() gives an independent frame, so the assignments below do not
        # operate on a slice of the collected batch (SettingWithCopyWarning).
        pdf = pdf[feature_cols + ["timestamp"]].copy()

        # Non-numeric values become NaN and are removed by the dropna below.
        for col_name in numeric_cols:
            pdf[col_name] = pd.to_numeric(pdf[col_name], errors="coerce").astype(float)

        pdf = pdf.dropna(subset=feature_cols)
        if pdf.empty:
            return

        # Encode on a separate frame so `pdf` keeps the human-readable values
        # that are written to InfluxDB as tags.
        features = pdf[feature_cols].copy()
        for col_name in categorical_cols:
            le = encoders[col_name]
            # A label's code is its position in classes_, so a dict lookup
            # gives the same result as le.transform([x])[0] without calling
            # the encoder once per cell.
            # NOTE(review): incoming values are strings; if an encoder was
            # fitted on non-string values (e.g. integer ports) every value
            # will map to -1. Confirm against the training pipeline.
            code_by_label = {label: code for code, label in enumerate(le.classes_)}
            features[col_name] = features[col_name].map(
                lambda value: code_by_label.get(value, -1)
            )

        X_scaled = scaler.transform(features)
        preds = model.predict(X_scaled)
        pdf["predicted_threat"] = target_encoder.inverse_transform(preds)

        points = [
            Point("network_logs")
            .tag("source_ip", str(row["source_ip"]))
            .tag("destination_ip", str(row["destination_ip"]))
            .tag("protocol", str(row["protocol"]))
            .field("bytes_sent", float(row["bytes_sent"]))
            .field("bytes_received", float(row["bytes_received"]))
            .field("predicted_threat", str(row["predicted_threat"]))
            .time(row["timestamp"])
            for row in pdf.to_dict("records")
        ]

        influx_client = InfluxDBClientV2(url=influx_host, token=token, org=org)
        try:
            write_api = influx_client.write_api(write_options=SYNCHRONOUS)
            # One request for the whole batch instead of one per row.
            write_api.write(bucket=bucket, org=org, record=points)
        finally:
            # Release the client even when the write fails.
            influx_client.close()

        print(f"✅ Batch {batch_id} written to InfluxDB with {len(pdf)} entries")

    except Exception as e:
        print(f"❌ Error in batch {batch_id}: {e}")

# ========== Start Streaming ==========
# Every 30 seconds hand the newly arrived rows to predict_and_store.
# NOTE(review): the checkpoint lives under /tmp; confirm that location
# persists for as long as the stream is expected to resume from it.
query = (
    df_parsed.writeStream
    .trigger(processingTime="30 seconds")
    .foreachBatch(predict_and_store)
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/anomaly_detection")
    .start()
)

try:
    # Blocks this cell until the query terminates or the kernel is interrupted.
    query.awaitTermination()
finally:
    # Without this, interrupting the cell leaves the query running in the
    # background, and re-running the cell starts a second query against the
    # same checkpoint directory.
    query.stop()

print('end')

In [2]:
## %pip install influxdb-client


Collecting influxdb-client
  Downloading influxdb_client-1.49.0-py3-none-any.whl.metadata (65 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.5/65.5 kB[0m [31m901.2 kB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hCollecting reactivex>=4.0.4 (from influxdb-client)
  Downloading reactivex-4.1.0-py3-none-any.whl.metadata (5.7 kB)
Downloading influxdb_client-1.49.0-py3-none-any.whl (746 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m746.3/746.3 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading reactivex-4.1.0-py3-none-any.whl (218 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m218.6/218.6 kB[0m [31m8.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: reactivex, influxdb-client
Successfully installed influxdb-client-1.49.0 reactivex-4.1.0
Note: you may need to restart the kernel to use updated packages.


In [8]:
%pip install findspark


Note: you may need to restart the kernel to use updated packages.
