# Event Consumer & MITRE ATT&CK Classifier

## Purpose of This Notebook

This notebook implements the **consumer and classification stage** of the cybersecurity pipeline.

Its responsibility is to:

- consume security events from Kafka,
- classify them according to the **MITRE ATT&CK framework**,
- emit **distributed tracing information**,
- and persist classification results for later analysis.

This notebook represents the **core analytical stage** of the pipeline.

---

## Role in the Overall Pipeline

Conceptually, this notebook corresponds to the **detection and enrichment layer** in a SOC pipeline.

In real-world systems, this stage would include:

- rule-based detection,
- machine learning models,
- correlation logic,
- enrichment with external context.

Here, the logic is intentionally simplified to keep the focus on **pipeline architecture**, not detection accuracy.

---

## Key Architectural Ideas Demonstrated

### 1. Asynchronous Consumption from a Queue

The consumer reads events from Kafka using a **consumer group**.

Important implications:

- the consumer is decoupled from the producer,
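- it can be restarted and resume from the last offset committed for its consumer group, so events published in the meantime are not skipped,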
- multiple consumers could be added to scale processing.

This mirrors how real SOC pipelines handle high event volumes.
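
The scaling claim can be pictured with a toy model of partition assignment. This is only a sketch: the four-partition topic and the consumer names are assumptions, and the round-robin function below is a stand-in for illustration, not Kafka's actual (configurable) assignment strategy.

```python
def assign_partitions(partitions, consumers):
    """Toy round-robin assignment: each partition goes to exactly one consumer."""
    assignment = {name: [] for name in consumers}
    for index, partition in enumerate(sorted(partitions)):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Hypothetical topic with 4 partitions, read by one and then two group members.
one_consumer = assign_partitions(range(4), ["consumer-a"])
two_consumers = assign_partitions(range(4), ["consumer-a", "consumer-b"])

print(one_consumer)
print(two_consumers)
```

Within one group, adding a member splits the partitions between members, which is why throughput can grow without changing the producer.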

---

### 2. Stateless Processing with Persistent Output

The consumer:

- does **not** store internal state,
- processes each event independently,
- writes results to a persistent output file.

This design makes the pipeline:

- easier to reason about,
- easier to scale,
- easier to debug.
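
A minimal sketch of what "stateless" means in practice: the result for an event depends only on that event, so processing order does not change any individual result. The `process` function, its labels, and the sample events are hypothetical and much simpler than the notebook's classifier.

```python
def process(event):
    """Pure function: the output depends only on the input event."""
    command_line = (event.get("command_line") or "").lower()
    label = "suspicious" if "encodedcommand" in command_line else "unlabeled"
    return {"event_id": event["event_id"], "label": label}


# Hypothetical sample events.
events = [
    {"event_id": "e1", "command_line": "powershell -EncodedCommand AAAA"},
    {"event_id": "e2", "command_line": "notepad.exe"},
]

forward = {r["event_id"]: r["label"] for r in map(process, events)}
backward = {r["event_id"]: r["label"] for r in map(process, reversed(events))}

print(forward)
```

Because no state is shared between calls, any worker can handle any event, and a failed event can be reprocessed in isolation.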

---

### 3. MITRE ATT&CK as a Classification Vocabulary

Each event is mapped to:

- a **MITRE ATT&CK tactic** (high-level goal),
- a **MITRE ATT&CK technique** (concrete method).

The classification logic is **rule-based and minimal by design**.

The goal is not accurate detection, but understanding:

> **how cybersecurity events are normalized and labeled in pipelines**.
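
To make the tactic/technique vocabulary concrete, the sketch below maps the ATT&CK IDs that this notebook emits to their names. The lookup tables cover only those IDs, and the `describe` helper is a hypothetical addition, not part of the notebook.

```python
# ATT&CK IDs used in this notebook, with their official names.
TACTIC_NAMES = {
    "TA0001": "Initial Access",
    "TA0002": "Execution",
    "TA0006": "Credential Access",
}

TECHNIQUE_NAMES = {
    "T1059": "Command and Scripting Interpreter",
    "T1059.001": "PowerShell",
    "T1059.003": "Windows Command Shell",
    "T1078": "Valid Accounts",
    "T1110": "Brute Force",
}


def describe(tactic_id, technique_id):
    """Render an ID pair as a human-readable label (hypothetical helper)."""
    tactic = TACTIC_NAMES.get(tactic_id, "Unknown tactic")
    technique = TECHNIQUE_NAMES.get(technique_id, "Unknown technique")
    return f"{tactic_id} {tactic} / {technique_id} {technique}"


print(describe("TA0006", "T1110"))
print(describe("TA0002", "T1059.001"))
```

Sub-techniques such as `T1059.001` refine their parent technique `T1059`; the tactic answers "why", the technique answers "how".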

---

## Tracing Model Used Here

This notebook continues the **end-to-end trace** started by the producer.

Key design choices:

- the `trace_id` is reconstructed from `event_id`,
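- consumer spans attach to that trace through a reconstructed remote parent context (it shares the producer's trace ID; the parent span ID itself is a placeholder),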
- each processing step is represented as a separate span.

As a result, one event appears in Jaeger as **one complete pipeline trace**.
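
The trace ID reconstruction works because a UUID and a trace ID are both 128-bit values. The sketch below uses the standard `uuid` module, which gives the same integer as stripping the hyphens and parsing the string as hex. The helper name is an assumption.

```python
import uuid


def trace_id_from_event_id(event_id: str) -> int:
    """Reuse the 128 bits of a UUID string as a trace ID (hypothetical helper)."""
    return uuid.UUID(event_id).int


event_id = "38b6dde6-b295-46b1-bad4-2b650569ff04"
trace_id = trace_id_from_event_id(event_id)

# Trace IDs are conventionally rendered as 32 lowercase hex digits.
trace_id_hex = format(trace_id, "032x")
print(trace_id_hex)
```

Searching a tracing UI for the hex form of an `event_id` is therefore one plausible way to find the trace for a given event, assuming the producer derives its trace ID the same way.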

---

## What Is Traced

For each consumed event, the following spans are emitted:

- `consume_event` — message consumption from Kafka
- `classify_event` — MITRE ATT&CK classification logic
- `write_csv` — persistence of classification results

The `consume_event` span carries structured attributes such as the event ID, event type, host, user, and the assigned MITRE tactic and technique.
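
The nesting of these three spans can be pictured with a small stand-in tracer. `ToyTracer` is a hypothetical class written only for this illustration; it is not the OpenTelemetry API and records nothing but span names and their parents.

```python
from contextlib import contextmanager


class ToyTracer:
    """Illustrative stand-in: tracks which span was open when another started."""

    def __init__(self):
        self.finished = []  # (name, parent_name) tuples, in completion order
        self._stack = []    # names of currently open spans

    @contextmanager
    def span(self, name):
        parent = self._stack[-1] if self._stack else None
        self._stack.append(name)
        try:
            yield
        finally:
            self._stack.pop()
            self.finished.append((name, parent))


toy = ToyTracer()
with toy.span("consume_event"):
    with toy.span("classify_event"):
        pass  # classification would run here
    with toy.span("write_csv"):
        pass  # the CSV row would be written here

print(toy.finished)
```

The two inner spans finish first and both name `consume_event` as their parent, which is the shape a trace viewer shows as one outer bar with two nested bars.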

---

## Output Format

Classification results are written to a local CSV file: `classified_packets.csv`

This file acts as a **materialized view** of the pipeline output.

CSV is used intentionally because:

- it requires no database setup,
- it is easy to inspect,
- it integrates naturally with Jupyter for analysis.
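
As a sketch of that kind of inspection, the snippet below counts techniques with the standard library only. The column names match the CSV header written by this notebook; the three sample rows are invented for illustration.

```python
import csv
import io
from collections import Counter

# Invented sample rows using the notebook's CSV header.
sample = io.StringIO(
    "event_id,timestamp,user,host,source_ip,event_type,mitre_tactic,mitre_technique\n"
    "e1,2024-01-01T00:00:00Z,alice,host-1,10.0.0.1,user_login,TA0006,T1110\n"
    "e2,2024-01-01T00:00:05Z,bob,host-2,10.0.0.2,process_start,TA0002,T1059.001\n"
    "e3,2024-01-01T00:00:09Z,alice,host-1,10.0.0.1,user_login,TA0006,T1110\n"
)


def count_techniques(file_handle):
    """Count how often each MITRE technique appears in a classification CSV."""
    return Counter(row["mitre_technique"] for row in csv.DictReader(file_handle))


technique_counts = count_techniques(sample)
print(technique_counts.most_common())
```

With the real output, pass `open("classified_packets.csv", newline="")` instead of the in-memory sample.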

---

## What This Notebook Does NOT Do

- It does **not** train machine learning models.
- It does **not** optimize throughput or latency.
- It does **not** perform visualization.
- It does **not** correlate events across time.

These aspects are addressed in later notebooks and labs.

---

## How This Notebook Is Used in the Lab

1. Start the full pipeline using `docker compose`.
2. Run the producer notebook to generate events.
3. Run this notebook to consume and classify events.
4. Observe:
   - classified results accumulating in CSV,
   - end-to-end traces appearing in Jaeger.

This notebook should run **in parallel** with the producer.

---

## Key Takeaway

> **This notebook shows how raw security telemetry becomes structured detection output.**

In real cybersecurity systems, this transformation step is critical:
it connects low-level events to high-level security meaning.

Understanding this stage is essential before introducing
machine learning, GPU acceleration, or performance optimization.

In [None]:
import json
import csv
import os
import random
from datetime import datetime

from kafka import KafkaConsumer

# ---------------- OpenTelemetry ----------------
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
    OTLPSpanExporter,
)
from opentelemetry.trace import (
    NonRecordingSpan,
    SpanContext,
    SpanKind,
    TraceFlags,
    set_span_in_context,
)

# ---------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------

KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
KAFKA_TOPIC = "raw-events"
KAFKA_GROUP_ID = "packet-classifier"

OTLP_ENDPOINT = "jaeger:4317"
SERVICE_NAME = "packet-classifier"

OUTPUT_CSV = "classified_packets.csv"

# ---------------------------------------------------------------------
# Tracing setup (OTLP → Jaeger)
# ---------------------------------------------------------------------

resource = Resource.create(
    {
        "service.name": SERVICE_NAME,
    }
)

trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)

otlp_exporter = OTLPSpanExporter(
    endpoint=OTLP_ENDPOINT,
    insecure=True,
)

span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# ---------------------------------------------------------------------
# Kafka consumer
# ---------------------------------------------------------------------

consumer = KafkaConsumer(
    KAFKA_TOPIC,
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    group_id=KAFKA_GROUP_ID,
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

print("Kafka consumer started")
print(f"Topic: {KAFKA_TOPIC}")
print(f"OTLP endpoint: {OTLP_ENDPOINT}")

# ---------------------------------------------------------------------
# MITRE ATT&CK classification (toy logic)
# ---------------------------------------------------------------------

def classify_event(event):
    """
    Very simple rule-based MITRE ATT&CK classification.
    This is INTENTIONALLY simplistic — focus is on the pipeline.
    """

    if event["event_type"] == "process_start":
        cmd = (event.get("command_line") or "").lower()

        if "encodedcommand" in cmd:
            return "TA0002", "T1059.001"  # Execution / PowerShell
        if "cmd.exe" in cmd:
            return "TA0002", "T1059.003"  # Execution / Windows Command Shell
        return "TA0002", "T1059"      # Execution / Command and Scripting Interpreter

    if event["event_type"] == "user_login":
        if event.get("logon_type") == "failure":
            return "TA0006", "T1110"  # Credential Access / Brute Force
        return "TA0001", "T1078"      # Initial Access / Valid Accounts

    # Placeholder pair for unclassified events; these are not real ATT&CK IDs
    return "TA0000", "T0000"

# ---------------------------------------------------------------------
# CSV initialization
# ---------------------------------------------------------------------

file_exists = os.path.exists(OUTPUT_CSV)

csv_file = open(OUTPUT_CSV, "a", newline="")
writer = csv.DictWriter(
    csv_file,
    fieldnames=[
        "event_id",
        "timestamp",
        "user",
        "host",
        "source_ip",
        "event_type",
        "mitre_tactic",
        "mitre_technique",
    ],
)

if not file_exists:
    writer.writeheader()

# ---------------------------------------------------------------------
# Main consume loop
# ---------------------------------------------------------------------

for msg in consumer:
    event = msg.value
    event_id = event["event_id"]

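    # Rebuild the trace ID from event_id: a UUID has 128 bits, the same width as a trace ID
    trace_id = int(event_id.replace("-", ""), 16)

    # Wrap the trace ID in a remote parent context so the spans below join that trace.
    # The parent span_id is a random placeholder, not the ID of a real producer span.
    parent_ctx = set_span_in_context(
        NonRecordingSpan(
            SpanContext(
                trace_id=trace_id,
                span_id=random.getrandbits(64),
                is_remote=True,
                trace_flags=TraceFlags(TraceFlags.SAMPLED),
            )
        )
    )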

    with tracer.start_as_current_span(
        "consume_event",
        context=parent_ctx,
        kind=SpanKind.CONSUMER,
    ) as span:

        span.set_attribute("event.id", event_id)
        span.set_attribute("event.type", event["event_type"])
        span.set_attribute("host.name", event["host"])
        span.set_attribute("user.name", event["user"])

        with tracer.start_as_current_span("classify_event"):
            tactic, technique = classify_event(event)

        span.set_attribute("mitre.tactic", tactic)
        span.set_attribute("mitre.technique", technique)

        record = {
            "event_id": event_id,
            "timestamp": event["timestamp"],
            "user": event["user"],
            "host": event["host"],
            "source_ip": event["source_ip"],
            "event_type": event["event_type"],
            "mitre_tactic": tactic,
            "mitre_technique": technique,
        }

        with tracer.start_as_current_span("write_csv"):
            writer.writerow(record)
            csv_file.flush()

        print(
            f"Processed {event_id} → {tactic} / {technique}"
        )

Kafka consumer started
Topic: raw-events
OTLP endpoint: jaeger:4317
Processed 38b6dde6-b295-46b1-bad4-2b650569ff04 → TA0002 / T1059.003
Processed 702a7208-d87f-416b-9007-7a98e43d35b0 → TA0002 / T1059.001
Processed 68c3d64b-a2d5-4383-8b27-887f54811e82 → TA0002 / T1059.001
Processed 7c3e4c7f-7a9e-4b6c-b56a-af25e3d068c4 → TA0006 / T1110
Processed 72dd6841-fc8e-4bf4-a4f9-cef7d13ddcc7 → TA0001 / T1078
Processed 4d2ca9a5-afe6-4052-afec-d830f81b0714 → TA0002 / T1059.001
Processed fad32f30-95c9-424a-b32b-1d2b16856579 → TA0002 / T1059.003
Processed f197b2c0-2f17-45ff-b893-66883ab496cb → TA0006 / T1110
Processed 3a615c2c-89c1-4d3a-a684-b8f86031b63d → TA0006 / T1110
Processed 3022addf-dd79-4f2a-955c-78729883688d → TA0006 / T1110
Processed 76bd909b-3c21-436b-a021-57a67269e610 → TA0001 / T1078
Processed 201c2f5d-f3c4-43d5-a997-5f5a9794154f → TA0002 / T1059.003
Processed 27114a65-7392-45f4-b37b-573f93895539 → TA0006 / T1110
Processed 2265a5c0-bb6b-46df-920b-3e7c42dc09aa → TA0002 / T1059.001
Processe