🧩 Network Data Processing Pipeline

This repository contains the core data collection and processing pipeline for the network observability project.
It captures, transforms, and enriches network traffic data before sending it to Arkime for indexing and visualization.


📘 Overview

The pipeline automates the entire workflow of collecting, decoding, and enriching network traffic data.
It acts as the bridge between raw network traffic (captured via Mitmproxy) and the visualization layer (Arkime Dashboard).

The main purpose of this module is to:

  • Intercept and log HTTPS network traffic in real time.
  • Convert captured logs into a standard packet capture format (.pcap).
  • Analyze traffic using nDPI (Deep Packet Inspection).
  • Publish and consume data asynchronously using RabbitMQ.
  • Enrich Arkime sessions with metadata such as application type, risk level, and category through the Flask API.

🏗️ Architecture

(Pipeline architecture diagram)


📁 Repository Structure


network-data-pipeline/
│
├── producer_of_logs.py      # Captures and publishes .mitm logs to RabbitMQ
├── consumer_of_logs.py      # Converts .mitm to .pcap and performs enrichment
├── flask_api.py             # Flask web service for cache, enrich, and debug endpoints
├── run_pipeline.py          # Orchestrates the pipeline (Flask, Producer, Consumer)
├── requirements.txt         # Python dependencies
└── README.md


⚙️ Components Explained

🧠 1. Producer of Logs (producer_of_logs.py)

  • Uses Mitmproxy to intercept HTTPS traffic from clients configured with a local proxy.
  • Saves captured traffic into .mitm log files.
  • Publishes metadata and file paths into a RabbitMQ queue (traffic_queue) for downstream processing.

Example:

python3 producer_of_logs.py 180

Collects and logs traffic for 180 seconds.
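Under the hood, the publish step might look like the following minimal sketch, assuming pika and a RabbitMQ broker on localhost. The queue name traffic_queue comes from this README; the message fields (file_path, duration) and the helper name publish_mitm_log are illustrative, not the exact schema used by producer_of_logs.py.

# Illustrative publish step; field names are assumptions, not the real schema.
import json
import pika

def publish_mitm_log(file_path: str, duration: int) -> None:
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="traffic_queue", durable=True)
    message = json.dumps({"file_path": file_path, "duration": duration})
    channel.basic_publish(
        exchange="",
        routing_key="traffic_queue",
        body=message,
        properties=pika.BasicProperties(delivery_mode=2),  # persist across broker restarts
    )
    connection.close()

if __name__ == "__main__":
    publish_mitm_log("/tmp/capture_180s.mitm", 180)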


📦 2. Consumer of Logs (consumer_of_logs.py)

  • Listens to the traffic_queue in RabbitMQ.
  • Downloads .mitm files from the producer.
  • Converts them to .pcap format using a custom parser (mitm2pcap).
  • Analyzes packets using nDPI to detect application types and protocols.
  • Sends processed .pcap data to the Flask API at /update-cache.
  • Finally, streams the processed traffic to Arkime using pcap-over-ip (see the consumer sketch below).
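The consumption loop can be approximated with the sketch below, assuming pika and the Flask API running on 127.0.0.1:5000. The conversion and nDPI steps are represented by placeholder functions (convert_to_pcap, analyze_flows); the real consumer_of_logs.py uses the repo's mitm2pcap parser and nDPI instead.

# Illustrative consumer loop; convert_to_pcap and analyze_flows are placeholders.
import json
import pika
import requests

FLASK_UPDATE_CACHE = "http://127.0.0.1:5000/update-cache"

def convert_to_pcap(mitm_path: str) -> str:
    # Placeholder for the repo's mitm2pcap conversion step.
    return mitm_path.replace(".mitm", ".pcap")

def analyze_flows(pcap_path: str) -> dict:
    # Placeholder for nDPI analysis; returns illustrative flow metadata.
    return {"pcap": pcap_path, "flows": []}

def handle_message(channel, method, properties, body):
    job = json.loads(body)
    pcap_path = convert_to_pcap(job["file_path"])
    flows = analyze_flows(pcap_path)
    requests.post(FLASK_UPDATE_CACHE, json=flows, timeout=10)
    channel.basic_ack(delivery_tag=method.delivery_tag)  # ack only after the flows are cached

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="traffic_queue", durable=True)
channel.basic_consume(queue="traffic_queue", on_message_callback=handle_message)
channel.start_consuming()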

🌐 3. Flask API Service (flask_api.py)

The Flask API serves as the central hub that connects the pipeline with Arkime and the Wise Service.

Main Endpoints

Endpoint        Method   Description
/update-cache   POST     Receives parsed flows and stores them temporarily in a TTL cache.
/enrich         POST     Provides enrichment data (e.g., risk, category, app) for the Arkime Wise Service.
/debug-cache    GET      Displays the current cached sessions and debug info.

This layer ensures data integrity, synchronization, and accessibility between the components.
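A minimal sketch of these endpoints, assuming Flask and cachetools (both in the dependency list), could look like the following. The cache key and payload shapes are illustrative; the actual flask_api.py may structure them differently.

# Illustrative endpoint layout; key names and payload shapes are assumptions.
from cachetools import TTLCache
from flask import Flask, jsonify, request

app = Flask(__name__)
cache = TTLCache(maxsize=10_000, ttl=600)  # keep flows for ten minutes

@app.route("/update-cache", methods=["POST"])
def update_cache():
    for flow in request.get_json(force=True).get("flows", []):
        cache[flow["key"]] = flow  # e.g. key = "src:sport-dst:dport"
    return jsonify({"flows_cached": len(cache)})

@app.route("/enrich", methods=["POST"])
def enrich():
    key = request.get_json(force=True).get("key")
    flow = cache.get(key, {})
    return jsonify({k: flow.get(k) for k in ("app", "category", "risk_level")})

@app.route("/debug-cache", methods=["GET"])
def debug_cache():
    return jsonify({"flows_cached": len(cache), "keys": list(cache.keys())[:20]})

if __name__ == "__main__":
    app.run(host="127.0.0.1", port=5000)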


🔄 4. Pipeline Runner (run_pipeline.py)

  • Runs the entire pipeline in a single command.
  • Automatically starts Flask, Producer, and Consumer processes.
  • Waits for the defined duration, then gracefully terminates all subprocesses.

Example:

python3 run_pipeline.py 180
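A simplified orchestration sketch is shown below, assuming the three scripts sit in the working directory; process handling in the real run_pipeline.py is likely more robust.

# Simplified orchestration sketch; the real script's process handling may differ.
import subprocess
import sys
import time

def run_pipeline(duration: int) -> None:
    procs = [
        subprocess.Popen([sys.executable, "flask_api.py"]),
        subprocess.Popen([sys.executable, "producer_of_logs.py", str(duration)]),
        subprocess.Popen([sys.executable, "consumer_of_logs.py"]),
    ]
    try:
        time.sleep(duration)  # let the capture window elapse
    finally:
        for proc in procs:
            proc.terminate()  # ask each subprocess to exit gracefully
        for proc in procs:
            try:
                proc.wait(timeout=10)
            except subprocess.TimeoutExpired:
                proc.kill()  # force-kill anything still running

if __name__ == "__main__":
    run_pipeline(int(sys.argv[1]) if len(sys.argv) > 1 else 180)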

🧰 Installation

1. Prerequisites

Make sure the following are installed:

sudo apt install python3 python3-pip rabbitmq-server mitmproxy git build-essential

2. Install Python dependencies

pip3 install -r requirements.txt

Typical dependencies include:

flask
requests
cachetools
scapy
pika
psutil

3. (Optional) Enable IP forwarding

sudo sysctl -w net.ipv4.ip_forward=1

🚀 Usage

Option 1: Run each component manually

# Terminal 1
python3 flask_api.py

# Terminal 2
python3 producer_of_logs.py 180

# Terminal 3
python3 consumer_of_logs.py

Option 2: Run all components together

python3 run_pipeline.py 180

📊 Data Flow Summary

Step  Action                     Tool/Module
1     Capture HTTPS traffic      Mitmproxy (via Producer)
2     Publish metadata           RabbitMQ
3     Consume and convert logs   Consumer
4     Analyze packets            nDPI
5     Cache and enrich data      Flask API
6     Stream enriched sessions   Arkime

🧪 Example Output

After running the pipeline:

  • .mitm logs appear in /tmp/*.mitm
  • .pcap files are generated for Arkime
  • Cached flow data can be checked with:
curl http://127.0.0.1:5000/debug-cache

Sample JSON output:

{
  "flows_cached": 154,
  "last_update": "2025-10-17T14:32:06Z"
}

🧱 Integration with Arkime

Once the pipeline is running:

  1. The Consumer sends .pcap data to Arkime via pcap-over-ip.

  2. Arkime indexes the flows.

  3. When viewing sessions, Arkime queries /enrich on the Flask API (or Wise Service) to display custom fields such as:

    • app
    • category
    • risk_level
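For a quick manual check of the enrichment path, you can query /enrich directly, assuming the Flask API is running locally. The lookup key below is purely illustrative; the real key format used by the Wise Service lookup depends on flask_api.py.

# Illustrative enrichment lookup; the key format is an assumption.
import requests

resp = requests.post(
    "http://127.0.0.1:5000/enrich",
    json={"key": "203.0.113.10:443-198.51.100.7:52114"},
    timeout=5,
)
print(resp.json())  # expected fields: app, category, risk_level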

🔐 Security Notes

  • Always capture traffic with user consent.
  • Use TLS certificates properly with Mitmproxy.
  • Restrict API access to localhost or trusted networks.
  • Consider using HTTPS for the Flask API in production.
