Convert NetFlow/sFlow traffic data into graph representations suitable for GNN-based anomaly detection and network analysis.
Network traffic analysis increasingly relies on Graph Neural Networks (GNNs) to detect anomalies such as port scans, DDoS attacks, lateral movement, and data exfiltration. Raw NetFlow/sFlow/IPFIX data must be transformed into graph structures before it can be fed to these models. flowgraph bridges this gap: it parses flow exports, builds communication graphs with rich node/edge features, supports temporal windowing for dynamic analysis, and exports to NetworkX or PyTorch Geometric formats.
```bash
pip install flowgraph
```

With PyTorch Geometric support (for GNN pipelines):

```bash
pip install "flowgraph[torch]"
```

Development:

```bash
pip install -e ".[dev]"
```

```python
from flowgraph import FlowGraph
from flowgraph.parsers import parse_netflow_v5
from flowgraph.features import compute_features

# Parse NetFlow v5 CSV export
flows = parse_netflow_v5("netflow_export.csv")
print(f"Loaded {len(flows)} flow records")

# Build communication graph
graph = FlowGraph().build(flows)
print(f"Graph: {graph.node_count} nodes, {graph.edge_count} edges")

# Compute graph-level features
features = compute_features(graph)
print(f"Density: {features['density']:.4f}")
print(f"Clustering: {features['clustering_coefficient']:.4f}")

# Export to NetworkX for further analysis
G = graph.to_networkx()

# Top talkers by outbound traffic
for ip, bytes_out in graph.top_talkers(n=5):
    print(f"  {ip}: {bytes_out:,} bytes")
```

Analyze how traffic patterns evolve over time, which is essential for detecting scanning sweeps, DDoS ramp-ups, or periodic C2 beaconing:
```python
from flowgraph import TemporalFlowGraph
from flowgraph.parsers import parse_netflow_v5
from flowgraph.features import compute_features

flows = parse_netflow_v5("netflow_export.csv")

# Split into 5-minute windows
temporal = TemporalFlowGraph(window_size="5min")
windows = temporal.windows(flows)

for i, graph in enumerate(windows):
    features = compute_features(graph)
    print(f"Window {i}: {features['node_count']} nodes, "
          f"density={features['density']:.4f}, "
          f"max_in_degree={features['max_in_degree']}")
```

Window sizes support human-readable durations: `"30s"`, `"5min"`, `"1h"`, `"1d"`, `"2h30min"`.
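To make the windowing idea concrete, here is a minimal standalone sketch of how a duration string could be turned into seconds and how flow timestamps could then be grouped into fixed-size windows. It is not flowgraph's implementation: `parse_duration`, `bucket_by_window`, and the unit table are hypothetical names, and the supported units are assumed from the examples above.

```python
import re
from collections import defaultdict

# Assumed unit table, inferred from the example durations in this README.
_UNIT_SECONDS = {"s": 1, "min": 60, "h": 3600, "d": 86400}
# "min" is listed first so it is tried before the single-letter units.
_TOKEN = re.compile(r"(\d+)(min|s|h|d)")


def parse_duration(text):
    """Convert a duration such as '2h30min' into a number of seconds."""
    pos = 0
    total = 0
    for match in _TOKEN.finditer(text):
        if match.start() != pos:
            # Something between tokens did not match the grammar.
            raise ValueError(f"invalid duration: {text!r}")
        total += int(match.group(1)) * _UNIT_SECONDS[match.group(2)]
        pos = match.end()
    if pos != len(text) or total == 0:
        raise ValueError(f"invalid duration: {text!r}")
    return total


def bucket_by_window(timestamps, window_seconds):
    """Map window index -> list of positions of the flows in that window.

    Windows are aligned to the earliest timestamp (an assumption; a real
    tool might align to wall-clock boundaries instead).
    """
    if not timestamps:
        return {}
    start = min(timestamps)
    buckets = defaultdict(list)
    for position, ts in enumerate(timestamps):
        buckets[int((ts - start) // window_seconds)].append(position)
    return dict(buckets)
```

Each bucket would then be passed to the graph builder on its own, yielding one graph per window.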
| Format | Parser | Typical Source |
|---|---|---|
| NetFlow v5 | `parse_netflow_v5()` | nfdump, flow-tools |
| NetFlow v9 / IPFIX | `parse_netflow_v9()` | nfdump, SiLK, pmacct |
| sFlow | `parse_sflow()` | sflowtool, sFlow-RT |
| Generic CSV | `CSVFlowParser()` | Any (custom column mapping) |
All parsers auto-detect common column name variations. Pass a custom `column_map` dictionary for non-standard exports.
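The following sketch illustrates one way column auto-detection with an override map might work. It does not use flowgraph's API: the alias table, the `resolve_columns` helper, and the direction of the mapping (canonical field name to actual header name) are all assumptions made for illustration.

```python
# Hypothetical alias table: canonical field -> header spellings seen in
# common exports. The real parsers may recognise a different set.
ALIASES = {
    "src_ip": ("src_ip", "srcaddr", "sa", "source_ip"),
    "dst_ip": ("dst_ip", "dstaddr", "da", "destination_ip"),
    "bytes": ("bytes", "ibyt", "octets", "doctets"),
    "packets": ("packets", "ipkt", "pkts", "dpkts"),
}


def _normalize(name):
    """Lower-case a header name and unify separators."""
    return name.strip().lower().replace(" ", "_").replace("-", "_")


def resolve_columns(header, column_map=None):
    """Return {canonical_field: actual_header_name} for a CSV header row.

    Entries in column_map take precedence over auto-detection.
    """
    column_map = column_map or {}
    normalized = {_normalize(h): h for h in header}
    resolved = {}
    for canonical, aliases in ALIASES.items():
        if canonical in column_map:
            actual = column_map[canonical]
            if actual not in header:
                raise KeyError(f"mapped column {actual!r} not in header")
            resolved[canonical] = actual
            continue
        for alias in aliases:
            if alias in normalized:
                resolved[canonical] = normalized[alias]
                break
        else:
            raise KeyError(f"no column found for {canonical!r}")
    return resolved
```

With this shape, an override is only needed for the fields that auto-detection cannot find; everything else falls back to the alias table.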
- Nodes = IP addresses (or subnets with `aggregate_by="subnet"`)
  - Attributes: `total_in_bytes`, `total_out_bytes`, `total_in_packets`, `total_out_packets`, `unique_peers`
- Edges = directed traffic between IP pairs
  - Attributes: `total_bytes`, `total_packets`, `flow_count`, `protocols`, `ports`
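As a rough illustration of the graph model, the sketch below aggregates a list of flow records into the node and edge attributes listed above. It is a standalone approximation, not the library's code: the `aggregate` function and the flow record keys (`src_ip`, `dst_ip`, `bytes`, `packets`, `protocol`, `dst_port`) are hypothetical, and it assumes `ports` collects destination ports.

```python
from collections import defaultdict


def aggregate(flows):
    """Build (nodes, edges) attribute dictionaries from flow records.

    Each flow is a dict with the hypothetical keys src_ip, dst_ip, bytes,
    packets, protocol and dst_port.
    """
    nodes = defaultdict(lambda: {
        "total_in_bytes": 0, "total_out_bytes": 0,
        "total_in_packets": 0, "total_out_packets": 0,
    })
    peers = defaultdict(set)
    edges = defaultdict(lambda: {
        "total_bytes": 0, "total_packets": 0, "flow_count": 0,
        "protocols": set(), "ports": set(),
    })

    for flow in flows:
        src, dst = flow["src_ip"], flow["dst_ip"]

        # Node counters: the source sends, the destination receives.
        nodes[src]["total_out_bytes"] += flow["bytes"]
        nodes[src]["total_out_packets"] += flow["packets"]
        nodes[dst]["total_in_bytes"] += flow["bytes"]
        nodes[dst]["total_in_packets"] += flow["packets"]
        peers[src].add(dst)
        peers[dst].add(src)

        # Edge counters: one directed edge per (src, dst) pair.
        edge = edges[(src, dst)]
        edge["total_bytes"] += flow["bytes"]
        edge["total_packets"] += flow["packets"]
        edge["flow_count"] += 1
        edge["protocols"].add(flow["protocol"])
        edge["ports"].add(flow["dst_port"])

    for ip, attrs in nodes.items():
        attrs["unique_peers"] = len(peers[ip])
    return dict(nodes), dict(edges)
```

The five numeric node attributes and three numeric edge attributes line up with the `[num_nodes, 5]` and `[num_edges, 3]` tensor shapes shown for the PyTorch Geometric export, although the exact feature ordering there is not specified in this README.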
```python
from flowgraph.exporters import to_networkx, to_json, from_json, to_pyg

# NetworkX DiGraph
G = to_networkx(graph)

# JSON (file or string)
json_str = to_json(graph, path="graph.json")
graph2 = from_json("graph.json")

# PyTorch Geometric (requires torch + torch-geometric)
data = to_pyg(graph)
print(data.x.shape)          # [num_nodes, 5]
print(data.edge_attr.shape)  # [num_edges, 3]
```

```bash
# Parse and summarize
flowgraph parse flows.csv --format netflow-v5

# Build graph with features
flowgraph build flows.csv --format netflow-v5

# Temporal analysis
flowgraph build flows.csv --format netflow-v5 --window 5min

# Compute features from saved graph
flowgraph features graph.json
```

MIT License. Copyright (c) 2026 Corey Wade.