Skip to content
This repository has been archived by the owner on Mar 16, 2024. It is now read-only.

feat: track egress network traffic #11

Merged
merged 3 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,12 +516,13 @@ func (a *Aggregator) processThroughputEvent(e throughput.ThroughputEvent) {
}

pkt := datastore.Packet{
Time: e.Timestamp,
Size: e.Size,
FromIP: e.SAddr,
FromPort: e.SPort,
ToIP: e.DAddr,
ToPort: e.DPort,
Time: e.Timestamp,
Size: e.Size,
FromIP: e.SAddr,
FromPort: e.SPort,
ToIP: e.DAddr,
ToPort: e.DPort,
IsIngress: e.IsIngress,
}

// determine source information
Expand Down
3 changes: 2 additions & 1 deletion aggregator/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func (a *Aggregator) processPod(d k8s.K8sResourceMessage) {
OwnerID: ownerID,
OwnerName: ownerName,

Labels: pod.GetLabels(),
Labels: pod.GetLabels(),
Annotations: pod.GetAnnotations(),
}

switch d.EventType {
Expand Down
23 changes: 14 additions & 9 deletions datastore/dto.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package datastore

type Pod struct {
UID string // Pod UID
Name string // Pod Name
Namespace string // Namespace
Image string // Main container image
IP string // Pod IP
OwnerType string // ReplicaSet or nil
OwnerID string // ReplicaSet UID
OwnerName string // ReplicaSet Name
Labels map[string]string
UID string // Pod UID
Name string // Pod Name
Namespace string // Namespace
Image string // Main container image
IP string // Pod IP
OwnerType string // ReplicaSet or nil
OwnerID string // ReplicaSet UID
OwnerName string // ReplicaSet Name
Labels map[string]string
Annotations map[string]string
}

type Service struct {
Expand Down Expand Up @@ -152,4 +153,8 @@ type Packet struct {
ToType Dest
ToUID string
ToPort uint16
// IsIngress indicates whether the packet was detected on the ingress or egress bpf filter.
// The egress bpf filter is used for the throughput metric, while the ingress bpf filter
// is used for the egress metric. (Seems backwards, but that is how it actually works.)
IsIngress bool
}
38 changes: 20 additions & 18 deletions datastore/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ type Event interface {
}

type PodEvent struct {
UID string `json:"uid"`
EventType string `json:"event_type"`
Name string `json:"name"`
Namespace string `json:"namespace"`
IP string `json:"ip"`
OwnerType string `json:"owner_type"`
OwnerName string `json:"owner_name"`
OwnerID string `json:"owner_id"`
Labels map[string]string `json:"labels"`
UID string `json:"uid"`
EventType string `json:"event_type"`
Name string `json:"name"`
Namespace string `json:"namespace"`
IP string `json:"ip"`
OwnerType string `json:"owner_type"`
OwnerName string `json:"owner_name"`
OwnerID string `json:"owner_id"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
}

func (p PodEvent) GetUID() string { return p.UID }
Expand Down Expand Up @@ -136,15 +137,16 @@ type RequestsPayload struct {

func convertPodToPodEvent(pod Pod, eventType string) PodEvent {
return PodEvent{
UID: pod.UID,
EventType: eventType,
Name: pod.Name,
Namespace: pod.Namespace,
IP: pod.IP,
OwnerType: pod.OwnerType,
OwnerName: pod.OwnerName,
OwnerID: pod.OwnerID,
Labels: pod.Labels,
UID: pod.UID,
EventType: eventType,
Name: pod.Name,
Namespace: pod.Namespace,
IP: pod.IP,
OwnerType: pod.OwnerType,
OwnerName: pod.OwnerName,
OwnerID: pod.OwnerID,
Labels: pod.Labels,
Annotations: pod.Annotations,
}
}

Expand Down
77 changes: 62 additions & 15 deletions datastore/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,30 @@ package datastore

import (
"context"
"encoding/json"
"strconv"
"strings"
"sync"

"github.com/ddosify/alaz/log"
"github.com/prometheus/client_golang/prometheus"
)

const (
accountIDLabel = "acorn.io/account-id"
appLabel = "acorn.io/app-public-name"
appNamespaceLabel = "acorn.io/app-namespace"
containerLabel = "acorn.io/container-name"
projectLabel = "acorn.io/project-name"

resolvedOfferingsAnnotation = "acorn.io/container-resolved-offerings"
)

var (
latencyHistLabels = []string{"toPod", "toAcornApp", "toAcornContainer", "toAcornAppNamespace"}
statusCounterLabels = []string{"toPod", "toAcornApp", "toAcornContainer", "toAcornAppNamespace", "status"}
throughputCounterLabels = []string{"fromPod", "fromAcornApp", "fromAcornContainer", "fromAcornAppNamespace", "fromHostname", "toPod", "toAcornApp", "toAcornContainer", "toAcornAppNamespace", "toPort", "toHostname"}
egressCounterLabels = []string{"fromPod", "fromAcornApp", "fromAcornContainer", "fromAcornProject", "fromAcornAccountID", "fromAcornComputeClass"}
)

type PrometheusExporter struct {
Expand All @@ -28,6 +35,7 @@ type PrometheusExporter struct {
latencyHistogram *prometheus.HistogramVec
statusCounter *prometheus.CounterVec
throughputCounter *prometheus.CounterVec
egressCounter *prometheus.CounterVec

podCache *eventCache
podIPCache *eventCache
Expand Down Expand Up @@ -111,6 +119,15 @@ func NewPrometheusExporter(ctx context.Context) *PrometheusExporter {
)
exporter.reg.MustRegister(exporter.throughputCounter)

exporter.egressCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "alaz",
Name: "egress",
},
egressCounterLabels,
)
exporter.reg.MustRegister(exporter.egressCounter)

go exporter.handleReqs()
go exporter.handlePackets()

Expand Down Expand Up @@ -174,12 +191,9 @@ func (p *PrometheusExporter) handlePackets() {
}

func (p *PrometheusExporter) handlePacket(pkt Packet) {
labels := prometheus.Labels{
"toPort": strconv.Itoa(int(pkt.ToPort)),
}

// We only want to keep metrics between pods in the same project (app namespace)
if pkt.FromType == PodSource && pkt.ToType == PodDest {
// Check for packets between pods in the same project.
// (Reminder: !pkt.IsIngress means that the packet was detected by the egress eBPF filter.)
if !pkt.IsIngress && pkt.FromType == PodSource && pkt.ToType == PodDest {
fromPod, found := p.podCache.get(pkt.FromUID)
toPod, found2 := p.podCache.get(pkt.ToUID)

Expand All @@ -188,19 +202,52 @@ func (p *PrometheusExporter) handlePacket(pkt Packet) {
fromPod.(PodEvent).Labels[appNamespaceLabel] == toPod.(PodEvent).Labels[appNamespaceLabel] &&
fromPod.(PodEvent).Labels[appLabel] == toPod.(PodEvent).Labels[appLabel] {

labels["fromPod"] = fromPod.(PodEvent).Name
labels["fromAcornApp"] = fromPod.(PodEvent).Labels[appLabel]
labels["fromAcornAppNamespace"] = fromPod.(PodEvent).Labels[appNamespaceLabel]
labels["fromAcornContainer"] = fromPod.(PodEvent).Labels[containerLabel]

labels["toPod"] = toPod.(PodEvent).Name
labels["toAcornApp"] = toPod.(PodEvent).Labels[appLabel]
labels["toAcornAppNamespace"] = toPod.(PodEvent).Labels[appNamespaceLabel]
labels["toAcornContainer"] = toPod.(PodEvent).Labels[containerLabel]
labels := prometheus.Labels{
"toPort": strconv.Itoa(int(pkt.ToPort)),
"fromPod": fromPod.(PodEvent).Name,
"fromAcornApp": fromPod.(PodEvent).Labels[appLabel],
"fromAcornAppNamespace": fromPod.(PodEvent).Labels[appNamespaceLabel],
"fromAcornContainer": fromPod.(PodEvent).Labels[containerLabel],
"toPod": toPod.(PodEvent).Name,
"toAcornApp": toPod.(PodEvent).Labels[appLabel],
"toAcornAppNamespace": toPod.(PodEvent).Labels[appNamespaceLabel],
"toAcornContainer": toPod.(PodEvent).Labels[containerLabel],
}

p.throughputCounter.With(setEmptyPrometheusLabels(labels, throughputCounterLabels)).Add(float64(pkt.Size))
}
}

// Check for packets from pods to outside the cluster.
// (Reminder: pkt.IsIngress just means that the packet was detected by the ingress eBPF filter, which is actually detecting egress traffic.)
// OutsideDest indicates that the destination IP address is not a known pod or service IP address.
// We also filter out the 10. prefix because that is the internal IP address range used by the cluster.
if pkt.IsIngress && pkt.FromType == PodSource && pkt.ToType == OutsideDest && !strings.HasPrefix(pkt.ToIP, "10.") {
fromPod, found := p.podCache.get(pkt.FromUID)

if found && fromPod.(PodEvent).Labels[accountIDLabel] != "" {
labels := prometheus.Labels{
"fromPod": fromPod.(PodEvent).Name,
"fromAcornApp": fromPod.(PodEvent).Labels[appLabel],
"fromAcornProject": fromPod.(PodEvent).Labels[projectLabel],
"fromAcornContainer": fromPod.(PodEvent).Labels[containerLabel],
"fromAcornAccountID": fromPod.(PodEvent).Labels[accountIDLabel],
}

if resolvedOfferingsJson, ok := fromPod.(PodEvent).Annotations[resolvedOfferingsAnnotation]; ok {
offerings := map[string]any{}
if err := json.Unmarshal([]byte(resolvedOfferingsJson), &offerings); err == nil {
if class, ok := offerings["class"]; ok {
labels["fromAcornComputeClass"] = class.(string)
}
} else {
log.Logger.Error().Msg(err.Error())
}
}

p.egressCounter.With(setEmptyPrometheusLabels(labels, egressCounterLabels)).Add(float64(pkt.Size))
}
}
}

func setEmptyPrometheusLabels(labels prometheus.Labels, labelList []string) prometheus.Labels {
Expand Down
19 changes: 16 additions & 3 deletions datastore/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewServer(ctx context.Context, reg *prometheus.Registry, podIPCache *eventC
}

func (s *Server) Serve() {
http.Handle("/metricz", s.authorizePrometheus(promhttp.HandlerFor(s.reg, promhttp.HandlerOpts{})))
http.Handle("/metricz", s.authorize(promhttp.HandlerFor(s.reg, promhttp.HandlerOpts{})))
go func() {
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Logger.Error().Err(err).Msg("error while serving metrics")
Expand All @@ -44,7 +44,10 @@ func (s *Server) Serve() {
log.Logger.Info().Msg("Prometheus HTTP server stopped")
}

func (s *Server) authorizePrometheus(handler http.Handler) http.Handler {
func (s *Server) authorize(handler http.Handler) http.Handler {
// Only two things are authorized to scrape Alaz:
// - Prometheus
// - cluster-agent
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var sourceIP string
parts := strings.Split(r.RemoteAddr, ":")
Expand All @@ -58,7 +61,7 @@ func (s *Server) authorizePrometheus(handler http.Handler) http.Handler {
}

pod, ok := s.podIPCache.get(sourceIP)
if ok && pod.(PodEvent).Namespace == s.prometheusNamespace {
if ok && (isPrometheus(pod.(PodEvent)) || isClusterAgent(pod.(PodEvent))) {
handler.ServeHTTP(w, r)
return
}
Expand All @@ -68,3 +71,13 @@ func (s *Server) authorizePrometheus(handler http.Handler) http.Handler {
w.Write([]byte("401 Unauthorized\n"))
})
}

func isPrometheus(p PodEvent) bool {
return p.Namespace == "prometheus-operator"
}

func isClusterAgent(p PodEvent) bool {
return p.Labels[appLabel] == "cluster-agent" &&
p.Labels[appNamespaceLabel] == "acorn" &&
p.Labels[containerLabel] == "cluster-agent"
}
Binary file modified ebpf/throughput/bpf_bpfeb.o
Binary file not shown.
Binary file modified ebpf/throughput/bpf_bpfel.o
Binary file not shown.
64 changes: 59 additions & 5 deletions ebpf/throughput/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type ThroughputEventBpf struct {
DPort uint16
SAddr [16]byte
DAddr [16]byte
IsIngress uint8
}

// for user space
Expand All @@ -34,6 +35,7 @@ type ThroughputEvent struct {
DPort uint16
SAddr string
DAddr string
IsIngress bool
}

// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
Expand Down Expand Up @@ -97,6 +99,7 @@ func DeployAndWait(ctx context.Context, ch chan interface{}, eventChan <-chan in
DPort: bpfEvent.DPort,
SAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.SAddr[0], bpfEvent.SAddr[1], bpfEvent.SAddr[2], bpfEvent.SAddr[3]),
DAddr: fmt.Sprintf("%d.%d.%d.%d", bpfEvent.DAddr[0], bpfEvent.DAddr[1], bpfEvent.DAddr[2], bpfEvent.DAddr[3]),
IsIngress: bpfEvent.IsIngress != 0,
}
}

Expand Down Expand Up @@ -148,11 +151,8 @@ func setFiltersOnCiliumInterfaces(objs bpfObjects) error {
errs = append(errs, fmt.Errorf("failed to set up egress filter for link %s: %w", link.Attrs().Name, err))
}

// We were previously using an ingress filter in addition to the egress filter, but it wasn't actually doing anything.
// For now, we will just delete those until we can figure out how to make them work.
// Egress on its own is enough to track throughput between all pods in the cluster.
if err := deleteIngressFilters(link); err != nil {
errs = append(errs, fmt.Errorf("failed to set up ingress filter for link %s: %w", link.Attrs().Name, err))
if err := setUpIngressFilter(link, objs); err != nil {
errs = append(errs, err)
}
}
}
Expand Down Expand Up @@ -190,6 +190,40 @@ func setUpEgressFilter(link netlink.Link, objs bpfObjects) error {
return netlink.FilterReplace(filter)
}

func setUpIngressFilter(link netlink.Link, objs bpfObjects) error {
existingFilters, err := netlink.FilterList(link, netlink.HANDLE_MIN_INGRESS)
if err != nil {
return err
}

for _, filter := range existingFilters {
if bpfFilter, ok := filter.(*netlink.BpfFilter); ok {
if bpfFilter.Name == "throughput_bpf_ingress" || bpfFilter.Name == "packet_classifier" {
if err := netlink.FilterDel(filter); err != nil {
return err
}
}
}
}

filter := &netlink.BpfFilter{
FilterAttrs: netlink.FilterAttrs{
LinkIndex: link.Attrs().Index,
Parent: netlink.HANDLE_MIN_INGRESS,
Protocol: unix.ETH_P_ALL,
Priority: 1,
},
Fd: objs.bpfPrograms.PacketClassifier.FD(),
Name: "throughput_bpf_ingress",
DirectAction: true,
}

if err := netlink.FilterReplace(filter); err != nil {
return fmt.Errorf("failed to set up ingress filter for link %s: %w", link.Attrs().Name, err)
}
return nil
}

func deleteIngressFilters(link netlink.Link) error {
existingFilters, err := netlink.FilterList(link, netlink.HANDLE_MIN_INGRESS)
if err != nil {
Expand All @@ -209,3 +243,23 @@ func deleteIngressFilters(link netlink.Link) error {

return nil
}

func deleteEgressFilters(link netlink.Link) error {
existingFilters, err := netlink.FilterList(link, netlink.HANDLE_MIN_EGRESS)
if err != nil {
return err
}

for _, filter := range existingFilters {
if filter.Type() == "bpf" {
bpfFilter := filter.(*netlink.BpfFilter)
if bpfFilter.Name == "throughput_bpf_egress" {
if err := netlink.FilterDel(bpfFilter); err != nil {
return err
}
}
}
}

return nil
}
Loading