# Cybersecurity Analytics with Apache Spark Streaming

This notebook demonstrates real-time cybersecurity threat detection using Apache Spark Structured Streaming.

## Learning Objectives

- Process real-time security event streams with Spark
- Detect brute force attacks using time-based aggregations
- Identify DDoS attacks through traffic pattern analysis
- Handle late-arriving data with watermarking
- Store analytical results for further investigation

## Prerequisites

Before running this notebook, ensure:
1. Docker environment is running: `docker-compose up -d`
2. All services are healthy: `./lab-control.sh status`
3. Events are flowing: `python verify_events.py`

## 1. Environment Setup and Imports

First, let's set up our environment and import necessary libraries.

In [None]:
import os
import time
import logging
import socket
import uuid
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print("📚 Libraries imported successfully")
print("🎯 Ready to start cybersecurity analytics!")

## 2. Spark Session Configuration

Configure Spark with the necessary packages for Kafka integration.
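
The Kafka connector ships separately from Spark, and its version generally needs to match the installed Spark version and Scala build. As a minimal sketch, the helpers below (`kafka_package` and `submit_args` are hypothetical names, not part of PySpark) build the Maven coordinate that is passed to `--packages`:

```python
def kafka_package(spark_version, scala_version="2.12"):
    """Build the Maven coordinate of the Kafka connector for a Spark release.

    Hypothetical helper for illustration; the default Scala version mirrors
    the coordinate used in this notebook.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def submit_args(spark_version, scala_version="2.12"):
    """Return a PYSPARK_SUBMIT_ARGS value that pulls in the connector."""
    return f"--packages {kafka_package(spark_version, scala_version)} pyspark-shell"


print(submit_args("3.5.0"))
```

Passing `pyspark.__version__` as `spark_version` would keep the two in step after an upgrade, assuming a connector release exists for that version.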

In [None]:
def detect_environment():
    """Detect if we're running in container or local environment"""
    try:
        socket.gethostbyname('kafka')
        return "container"
    except socket.gaierror:
        return "local"

# Detect environment
environment = detect_environment()
print(f"🔍 Environment detected: {environment}")

# Set Kafka packages for Spark
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell"

# Create unique checkpoint directory
checkpoint_dir = f"/tmp/spark-cybersec-{uuid.uuid4().hex[:8]}"

print(f"📁 Checkpoint directory: {checkpoint_dir}")

In [None]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("CybersecurityAnalytics") \
    .config("spark.sql.adaptive.enabled", "false") \
    .config("spark.sql.streaming.checkpointLocation", checkpoint_dir) \
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
    .getOrCreate()

# Reduce log verbosity
spark.sparkContext.setLogLevel("WARN")

print("✅ Spark session initialized successfully")
print(f"🌟 Spark version: {spark.version}")
print("🔗 Spark UI available at: http://localhost:4040")

## 3. Define Event Schema

Define the structure of our security events for proper parsing.
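
To make the schema concrete, here is a sketch of one event as it might appear on the Kafka topic, checked against the field types with plain Python. The field names come from the schema in this section; every value, and the `type_mismatches` helper, is invented for illustration.

```python
import json

# Hypothetical event: field names follow the schema, values are made up.
sample_event = json.dumps({
    "timestamp": "2024-01-15 10:30:00",
    "event_id": "evt-0001",
    "event_type": "authentication",
    "source_ip": "192.0.2.10",
    "destination_ip": "198.51.100.7",
    "username": "alice",
    "result": "failure",
    "protocol": "ssh",
    "port": 22,
    "severity": "medium",
    "failure_reason": "invalid_password",
    "geo_location": {"country": "NL", "city": "Amsterdam",
                     "latitude": 52.37, "longitude": 4.90},
})

# Python types corresponding to a few of the schema's Spark types
EXPECTED_TYPES = {
    "timestamp": str,       # StringType, converted to a timestamp later
    "source_ip": str,       # StringType
    "port": int,            # IntegerType
    "geo_location": dict,   # nested StructType
}


def type_mismatches(raw_json):
    """Return the fields whose JSON type differs from the expected type."""
    event = json.loads(raw_json)
    return [
        name for name, expected in EXPECTED_TYPES.items()
        if event.get(name) is not None and not isinstance(event[name], expected)
    ]


print(type_mismatches(sample_event))
```

A producer that sent the port as the string `"22"` would be reported here, which is the kind of mismatch worth catching before the stream is parsed.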

In [None]:
# Define the schema for security events
event_schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("event_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("source_ip", StringType(), True),
    StructField("destination_ip", StringType(), True),
    StructField("username", StringType(), True),
    StructField("result", StringType(), True),
    StructField("protocol", StringType(), True),
    StructField("port", IntegerType(), True),
    StructField("severity", StringType(), True),
    StructField("failure_reason", StringType(), True),
    StructField("geo_location", StructType([
        StructField("country", StringType(), True),
        StructField("city", StringType(), True),
        StructField("latitude", DoubleType(), True),
        StructField("longitude", DoubleType(), True)
    ]), True)
])

print("📋 Event schema defined")
print("🔍 Schema includes: timestamp, IPs, authentication data, geo location")

## 4. Kafka Connection Setup

Establish connection to Kafka to read streaming security events.

In [None]:
def get_kafka_server(environment):
    """Get the correct Kafka server based on environment"""
    if environment == "container":
        return "kafka:29092"
    else:
        return "localhost:9092"

kafka_server = get_kafka_server(environment)
kafka_topic = "security-events"

print(f"🔗 Connecting to Kafka server: {kafka_server}")
print(f"📡 Reading from topic: {kafka_topic}")

# Create Kafka streaming DataFrame
kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_server) \
    .option("subscribe", kafka_topic) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load()

print("✅ Kafka streaming DataFrame created")

## 5. Parse JSON Events

Parse the JSON events from Kafka and convert timestamps for analysis.
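
The timestamp pattern matters: a string that does not match `yyyy-MM-dd HH:mm:ss` becomes null (assuming Spark's ANSI mode is off), and the null filter then drops the event without any error. The sketch below reproduces that behaviour in plain Python; `parse_event_time` is a hypothetical helper, and `%Y-%m-%d %H:%M:%S` is the `strptime` equivalent of the Spark pattern.

```python
from datetime import datetime

# strptime equivalent of the Spark pattern "yyyy-MM-dd HH:mm:ss"
PY_FORMAT = "%Y-%m-%d %H:%M:%S"


def parse_event_time(raw):
    """Return a datetime, or None when raw does not match the pattern."""
    try:
        return datetime.strptime(raw, PY_FORMAT)
    except (TypeError, ValueError):
        return None


for raw in ("2024-01-15 10:30:00", "2024-01-15T10:30:00Z", None):
    print(repr(raw), "->", parse_event_time(raw))
```

If the event generator emits ISO-8601 timestamps with a `T` separator, every event would be filtered out, so an empty console output in the next section is worth checking against the timestamp format first.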

In [None]:
# Parse JSON events and convert to proper DataFrame
events_df = kafka_df \
    .select(from_json(col("value").cast("string"), event_schema).alias("event")) \
    .select("event.*") \
    .withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")) \
    .filter(col("timestamp").isNotNull())

print("✅ JSON parsing configured")
print("🕒 Timestamp conversion applied")
print("🧹 Null timestamp filtering enabled")

## 6. Basic Event Monitoring

Let's first verify that events are flowing correctly.

In [None]:
print("🔍 Starting basic event monitoring...")
print("💡 This will show live events for 30 seconds")
print("📊 You should see authentication attempts, network connections, etc.")

# Simple event monitoring query
basic_monitor = events_df \
    .select("timestamp", "event_type", "source_ip", "result") \
    .writeStream \
    .format("console") \
    .option("truncate", False) \
    .option("numRows", 10) \
    .trigger(processingTime="10 seconds") \
    .start()

# Monitor for 30 seconds
time.sleep(30)
basic_monitor.stop()

print("\n✅ Basic monitoring complete")
print("🎯 Ready to implement threat detection!")

## 7. Brute Force Attack Detection

Now let's implement our first threat detection: brute force attacks.

**Detection Logic:**
- Monitor failed authentication attempts
- Count failures per source IP in time windows
- Alert when threshold is exceeded
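
The detection logic above can be sketched in plain Python to show what the sliding window does. With a 2-minute window sliding every 30 seconds, each event falls into four overlapping windows. The function names are hypothetical, timestamps are epoch seconds, and the sample events are invented; only authentication events are assumed to be passed in.

```python
from collections import Counter


def sliding_windows(ts, window=120, slide=30):
    """Return every (start, end) window, in seconds, that contains ts."""
    last_start = (ts // slide) * slide  # latest window start that is <= ts
    return sorted((s, s + window) for s in range(last_start, ts - window, -slide))


def sketch_brute_force_alerts(events, threshold=5):
    """Count failures per (window, source, target) and keep those >= threshold.

    events: iterable of (epoch_seconds, source_ip, destination_ip, result).
    """
    counts = Counter()
    for ts, src, dst, result in events:
        if result != "failure":
            continue
        for w in sliding_windows(ts):
            counts[(w, src, dst)] += 1
    return {key: n for key, n in counts.items() if n >= threshold}


# Five failures within 20 seconds, plus one success that must be ignored
events = [(t, "192.0.2.10", "198.51.100.7", "failure") for t in (100, 105, 110, 115, 119)]
events.append((112, "192.0.2.10", "198.51.100.7", "success"))

alerts = sketch_brute_force_alerts(events)
for (window, src, dst), n in sorted(alerts.items()):
    print(window, src, dst, n)
```

Because the windows overlap, one burst of failures raises an alert in several windows. Downstream consumers may want to deduplicate on source and target.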

In [None]:
def detect_brute_force_attacks(events_df, threshold=5):
    """
    Detect brute force attacks by monitoring failed authentication attempts
    
    Args:
        events_df: Streaming DataFrame of security events
        threshold: Minimum failed attempts to trigger alert (default: 5)
    """
    
    brute_force_alerts = events_df \
        .filter(col("event_type") == "authentication") \
        .filter(col("result") == "failure") \
        .withWatermark("timestamp", "10 minutes") \
        .groupBy(
            window(col("timestamp"), "2 minutes", "30 seconds"),
            col("source_ip"),
            col("destination_ip")
        ) \
        .count() \
        .filter(col("count") >= threshold) \
        .select(
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            col("source_ip"),
            col("destination_ip").alias("target_ip"),
            col("count").alias("failed_attempts"),
            lit("🚨 BRUTE_FORCE_DETECTED").alias("alert_type"),
            current_timestamp().alias("detected_at")
        )
    
    return brute_force_alerts

# Create brute force detection
print("🔍 Setting up brute force detection...")
print("Threshold: ≥5 failed attempts in 2-minute windows")
print("Window slides every 30 seconds for real-time detection")

brute_force_alerts = detect_brute_force_attacks(events_df, threshold=5)

print("✅ Brute force detection configured")
print("🎯 Start an attack to see alerts: ./lab-control.sh attack-bf")

## 8. Run Brute Force Detection

Let's start the detection and monitor for alerts. Make sure to start an attack in another terminal!
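
The output mode determines when alerts become visible. A streaming query with no output mode set runs in append mode, and for a watermarked window aggregation append mode emits a window only once the watermark (the latest event time seen minus the watermark delay) has reached the window's end. Update mode instead emits the rows whose counts changed in each trigger. The sketch below models the append-mode condition with hypothetical helper names and times in seconds, for a 2-minute window under a 10-minute watermark:

```python
def watermark(max_event_time, delay):
    """Watermark = latest event time observed so far minus the allowed delay."""
    return max_event_time - delay


def emitted_in_append_mode(window_end, max_event_time, delay):
    """True once the watermark has reached the end of the window."""
    return watermark(max_event_time, delay) >= window_end


WINDOW_END = 120  # the window [0, 120)
DELAY = 600       # 10-minute watermark

for latest in (130, 600, 720):
    print(latest, emitted_in_append_mode(WINDOW_END, latest, DELAY))
```

In this model the window is released only when an event stamped 720 seconds or later arrives, about ten minutes after the window closed. A short monitoring run in append mode can therefore print nothing even while an attack is under way.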

In [None]:
print("🚨 Starting brute force detection monitor...")
print("💡 In another terminal, run: ./lab-control.sh attack-bf")
print("⏱️  Expected detection time: 2-3 minutes after attack starts")
print("\n🔍 Watching for alerts (monitoring for 2 minutes)...")

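# Start the brute force detection query.
# Update mode prints a window's row in every trigger where its count changed.
# The default append mode would hold each window back until the 10-minute
# watermark had passed the window's end, far longer than this 2-minute run.
brute_force_query = brute_force_alerts \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .option("numRows", 10) \
    .trigger(processingTime="30 seconds") \
    .start()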

# Monitor for 2 minutes
time.sleep(120)
brute_force_query.stop()

print("\n✅ Brute force detection monitoring complete")
print("📊 Analysis: Each alert shows source_ip, target_ip, and failed_attempts count")

## 9. DDoS Detection

Let's also implement DDoS detection by monitoring network connection patterns.
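
As a plain-Python sketch of the same idea: group connections by 1-minute sliding window and target, then report the request count and the number of distinct sources. An exact `set` stands in here for Spark's `approx_count_distinct`, which uses a HyperLogLog-based estimate to keep memory bounded. The function names and sample traffic are hypothetical.

```python
from collections import defaultdict


def windows_for(ts, window=60, slide=30):
    """Return every (start, end) window, in seconds, that contains ts."""
    last_start = (ts // slide) * slide
    return [(s, s + window) for s in range(last_start, ts - window, -slide)]


def sketch_ddos_alerts(connections, threshold=100):
    """connections: iterable of (epoch_seconds, source_ip, destination_ip)."""
    requests = defaultdict(int)
    sources = defaultdict(set)
    for ts, src, dst in connections:
        for w in windows_for(ts):
            requests[(w, dst)] += 1
            sources[(w, dst)].add(src)
    return {
        key: {"request_count": n, "unique_sources": len(sources[key])}
        for key, n in requests.items()
        if n >= threshold
    }


# 120 connections from 40 sources to one target, plus light traffic elsewhere
traffic = [(45, f"198.51.100.{i % 40}", "203.0.113.5") for i in range(120)]
traffic += [(45, "198.51.100.200", "203.0.113.9") for _ in range(10)]

ddos = sketch_ddos_alerts(traffic)
for key, stats in sorted(ddos.items()):
    print(key, stats)
```

A high `request_count` with a low `unique_sources` points to a single noisy client, while a high value for both is more consistent with a distributed attack.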

In [None]:
def detect_ddos_attacks(events_df, threshold=100):
    """
    Detect DDoS attacks by monitoring high-volume traffic to single targets
    
    Args:
        events_df: Streaming DataFrame of security events
        threshold: Minimum requests per minute to trigger alert (default: 100)
    """
    
    ddos_alerts = events_df \
        .filter(col("event_type") == "network_connection") \
        .withWatermark("timestamp", "5 minutes") \
        .groupBy(
            window(col("timestamp"), "1 minute", "30 seconds"),
            col("destination_ip")
        ) \
        .agg(
            count("*").alias("request_count"),
            approx_count_distinct("source_ip").alias("unique_sources")
        ) \
        .filter(col("request_count") >= threshold) \
        .select(
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            col("destination_ip").alias("target_ip"),
            col("request_count"),
            col("unique_sources"),
            lit("🚨 DDOS_DETECTED").alias("alert_type"),
            current_timestamp().alias("detected_at")
        )
    
    return ddos_alerts

# Create DDoS detection
print("🔍 Setting up DDoS detection...")
print("Threshold: ≥100 requests per minute to single target")
print("Using approx_count_distinct for streaming compatibility")

ddos_alerts = detect_ddos_attacks(events_df, threshold=100)

print("✅ DDoS detection configured")
print("🎯 Start DDoS attack: ./lab-control.sh attack-ddos")

## 10. Combined Monitoring Dashboard

Let's create a combined monitoring setup that shows both brute force and DDoS detection.

In [None]:
def start_combined_monitoring(duration_minutes=3):
    """
    Start combined monitoring for both brute force and DDoS attacks
    
    Args:
        duration_minutes: How long to monitor (default: 3 minutes)
    """
    
    print("🚨 Starting Combined Security Monitoring Dashboard")
    print("=" * 60)
    print("💡 In separate terminals, you can run:")
    print("   • ./lab-control.sh attack-bf    (brute force)")
    print("   • ./lab-control.sh attack-ddos  (DDoS)")
    print(f"\n⏱️  Monitoring for {duration_minutes} minutes...")
    print("🔍 Alerts will appear every 30 seconds\n")
    
    # Start brute force monitoring.
    # Update mode prints changed windows every trigger; the default append mode
    # would wait for the watermark to pass each window's end.
    bf_query = brute_force_alerts \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", False) \
        .option("numRows", 5) \
        .trigger(processingTime="30 seconds") \
        .queryName("BruteForceDetection") \
        .start()
    
    # Start DDoS monitoring (update mode, for the same reason)
    ddos_query = ddos_alerts \
        .writeStream \
        .outputMode("update") \
        .format("console") \
        .option("truncate", False) \
        .option("numRows", 5) \
        .trigger(processingTime="30 seconds") \
        .queryName("DDoSDetection") \
        .start()
    
    # Monitor for specified duration
    duration_seconds = duration_minutes * 60
    time.sleep(duration_seconds)
    
    # Stop both queries
    bf_query.stop()
    ddos_query.stop()
    
    print("\n" + "=" * 60)
    print("✅ Combined monitoring complete")
    print("📊 Summary of what you should have observed:")
    print("   • Brute force alerts: source_ip with high failed_attempts")
    print("   • DDoS alerts: target_ip with high request_count")
    print("   • Real-time detection within 2-3 minutes of attack start")

# Start the combined monitoring
start_combined_monitoring(duration_minutes=3)

## 11. Experiment with Thresholds

Let's experiment with different detection thresholds to understand their impact.
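
To see the trade-off in numbers, the sketch below applies each threshold to a small, invented set of per-IP failure counts for a single window. The counts and the `flagged` helper are hypothetical.

```python
# Hypothetical failed-login counts per source IP within one 2-minute window
failures_per_ip = {
    "192.0.2.10": 2,   # a user mistyping a password
    "192.0.2.11": 4,
    "192.0.2.12": 7,
    "192.0.2.13": 15,
    "192.0.2.14": 40,  # a scripted attack
}


def flagged(counts, threshold):
    """Return the IPs whose failure count meets the threshold, sorted."""
    return sorted(ip for ip, n in counts.items() if n >= threshold)


for threshold in (3, 5, 10, 20):
    print(threshold, flagged(failures_per_ip, threshold))
```

Each step up in the threshold drops one more address from the alert list, and the last step leaves only the most aggressive source.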

In [None]:
def experiment_with_thresholds():
    """Experiment with different detection thresholds"""
    
    print("🧪 Threshold Experimentation")
    print("=" * 40)
    
    # Different thresholds to test
    thresholds = [3, 5, 10, 20]
    
    for threshold in thresholds:
        print(f"\n🔍 Testing threshold: {threshold} failed attempts")
        
        # Build the detection plan for this threshold. It is not started here;
        # call test_alerts.writeStream...start() to run it against live events.
        test_alerts = detect_brute_force_attacks(events_df, threshold=threshold)
        
        print("   • Lower threshold = More sensitive (more alerts)")
        print("   • Higher threshold = Less sensitive (fewer alerts)")
        print(f"   • Threshold {threshold}: {'Sensitive' if threshold <= 5 else 'Conservative'}")
    
    print("\n📊 Threshold Selection Guidelines:")
    print("   • threshold=3: Very sensitive, may have false positives")
    print("   • threshold=5: Balanced (recommended for lab)")
    print("   • threshold=10: Conservative, fewer false positives")
    print("   • threshold=20: Very conservative, may miss attacks")
    
    print("\n💡 Production Recommendation:")
    print("   Start with threshold=5, then adjust based on your environment")

# Run the threshold experiment
experiment_with_thresholds()

## 12. Database Integration (Advanced)

For production systems, you'll want to store alerts in a database. Here's how to set that up.
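
One point to keep in mind: `foreachBatch` gives at-least-once delivery by default, so a batch can be written again after a failure. Making the write idempotent avoids duplicate alerts. The sketch below shows the idea with the standard library's `sqlite3` as a stand-in for PostgreSQL (an assumption made so the example runs anywhere); the table layout and `write_batch` are hypothetical. In PostgreSQL the comparable statement would be `INSERT ... ON CONFLICT`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE brute_force_alerts (
        window_start    TEXT,
        source_ip       TEXT,
        target_ip       TEXT,
        failed_attempts INTEGER,
        PRIMARY KEY (window_start, source_ip, target_ip)
    )
""")


def write_batch(rows, batch_id):
    """Upsert one micro-batch of alert rows; safe to call again on a retry.

    batch_id is unused here and kept only to mirror foreachBatch's signature.
    """
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT OR REPLACE INTO brute_force_alerts VALUES (?, ?, ?, ?)",
            rows,
        )


batch = [("2024-01-15 10:30:00", "192.0.2.10", "198.51.100.7", 6)]
write_batch(batch, batch_id=0)
write_batch(batch, batch_id=0)  # simulated retry of the same batch

row_count = conn.execute("SELECT COUNT(*) FROM brute_force_alerts").fetchone()[0]
print(row_count)
```

Keying the table on the window start plus the two addresses means a replayed batch overwrites its earlier rows instead of adding new ones.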

In [None]:
def write_alerts_to_database(alerts_df, table_name):
    """
    Write streaming alerts to PostgreSQL database
    
    Note: This requires foreachBatch for streaming database writes
    """
    
    # Database configuration
    db_properties = {
        "user": "spark_user",
        "password": "spark_password",
        "driver": "org.postgresql.Driver",
        "url": "jdbc:postgresql://postgres:5432/security_analytics"
    }
    
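    def write_batch_to_postgres(batch_df, batch_id):
        """Write each batch to PostgreSQL"""
        # Count once: every count() call triggers a separate Spark job
        alert_count = batch_df.count()
        if alert_count > 0:
            print(f"📝 Writing {alert_count} alerts to database (batch {batch_id})")
            # format("jdbc") is required: without it, save() uses Spark's default
            # data source (Parquet unless reconfigured) and fails for lack of a path.
            # The PostgreSQL JDBC driver JAR must also be on the Spark classpath.
            batch_df.write \
                .format("jdbc") \
                .mode("append") \
                .option("driver", db_properties["driver"]) \
                .option("url", db_properties["url"]) \
                .option("user", db_properties["user"]) \
                .option("password", db_properties["password"]) \
                .option("dbtable", table_name) \
                .save()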
    
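    # Create streaming query with database writes.
    # No output mode is set, so the default append mode applies: each alert
    # window is written once, after the watermark has passed the window's end.
    query = alerts_df \
        .writeStream \
        .foreachBatch(write_batch_to_postgres) \
        .trigger(processingTime="30 seconds") \
        .start()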
    
    return query

print("💾 Database Integration Setup")
print("=" * 30)
print("🎯 Next Steps for Students:")
print("1. Uncomment the code below to enable database writes")
print("2. Run: docker-compose exec postgres psql -U spark_user -d security_analytics")
print("3. Query: SELECT * FROM brute_force_alerts ORDER BY detected_at DESC;")
print("\n💡 This demonstrates production-ready alert storage")

# Uncomment these lines to enable database writes.
# In append mode the first rows arrive only after the watermark delay has
# elapsed, so a 1-minute run may write nothing; lengthen the sleep if needed.
# db_query = write_alerts_to_database(brute_force_alerts, "brute_force_alerts")
# time.sleep(60)  # Run for 1 minute
# db_query.stop()

## 13. Summary and Next Steps

Congratulations! You've successfully implemented real-time cybersecurity analytics with Spark Structured Streaming.

In [None]:
def show_summary():
    """Display learning summary and next steps"""
    
    print("🎓 Learning Summary")
    print("=" * 50)
    print("\n✅ What You've Accomplished:")
    print("   • Set up Spark Structured Streaming with Kafka")
    print("   • Implemented real-time brute force detection")
    print("   • Created DDoS attack monitoring")
    print("   • Used windowed aggregations and watermarking")
    print("   • Experimented with detection thresholds")
    print("   • Learned about database integration patterns")
    
    print("\n📊 Key Concepts Learned:")
    print("   • Streaming DataFrames and transformations")
    print("   • Window functions for time-based analysis")
    print("   • Watermarking for handling late data")
    print("   • Approximate aggregations (approx_count_distinct)")
    print("   • foreachBatch for custom output handling")
    
    print("\n🚀 Next Steps & Advanced Topics:")
    print("   1. Implement geographic anomaly detection")
    print("   2. Add machine learning for anomaly detection")
    print("   3. Create alerting integrations (Slack, email)")
    print("   4. Build a real-time security dashboard")
    print("   5. Optimize performance for high-volume streams")
    print("   6. Implement multi-stage threat scoring")
    
    print("\n💡 Production Considerations:")
    print("   • Monitor streaming query health and performance")
    print("   • Implement proper error handling and recovery")
    print("   • Set up checkpointing for fault tolerance")
    print("   • Configure resource allocation and scaling")
    print("   • Establish alert tuning and false positive reduction")
    
    print("\n🔗 Useful Resources:")
    print("   • Spark Streaming Guide: https://spark.apache.org/streaming/")
    print("   • Kafka Integration: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html")
    print("   • Security Analytics Patterns: Research SIEM and SOC practices")

# Display the summary
show_summary()

# Stop Spark session
print("\n🛑 Stopping Spark session...")
spark.stop()
print("✅ Session ended. Great work on completing the cybersecurity analytics lab!")