diff --git a/examples/README.md b/examples/README.md index 30d82cac1..7b715aa2a 100644 --- a/examples/README.md +++ b/examples/README.md @@ -18,6 +18,7 @@ * [tcp_close](tcprtt/) - Log RTT of IPv4 TCP connections using eBPF CO-RE helpers. * XDP - Attach a program to a network interface to process incoming packets. * [xdp](xdp/) - Print packet counts by IPv4 source address. + * [queue](queue/) - Print IPv4 Address and Unix Timestamp of each incoming packet. * Add your use case(s) here! ## How to run diff --git a/examples/queue/bpf_bpfeb.go b/examples/queue/bpf_bpfeb.go new file mode 100644 index 000000000..007a2ac85 --- /dev/null +++ b/examples/queue/bpf_bpfeb.go @@ -0,0 +1,125 @@ +// Code generated by bpf2go; DO NOT EDIT. +//go:build mips || mips64 || ppc64 || s390x + +package main + +import ( + "bytes" + _ "embed" + "fmt" + "io" + + "github.com/cilium/ebpf" +) + +type bpfPacketData struct { + SrcIp uint32 + _ [4]byte + Timestamp uint64 +} + +// loadBpf returns the embedded CollectionSpec for bpf. +func loadBpf() (*ebpf.CollectionSpec, error) { + reader := bytes.NewReader(_BpfBytes) + spec, err := ebpf.LoadCollectionSpecFromReader(reader) + if err != nil { + return nil, fmt.Errorf("can't load bpf: %w", err) + } + + return spec, err +} + +// loadBpfObjects loads bpf and converts it into a struct. +// +// The following types are suitable as obj argument: +// +// *bpfObjects +// *bpfPrograms +// *bpfMaps +// +// See ebpf.CollectionSpec.LoadAndAssign documentation for details. +func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { + spec, err := loadBpf() + if err != nil { + return err + } + + return spec.LoadAndAssign(obj, opts) +} + +// bpfSpecs contains maps and programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfSpecs struct { + bpfProgramSpecs + bpfMapSpecs +} + +// bpfSpecs contains programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfProgramSpecs struct { + XdpProgFunc *ebpf.ProgramSpec `ebpf:"xdp_prog_func"` +} + +// bpfMapSpecs contains maps before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfMapSpecs struct { + QueueWithData *ebpf.MapSpec `ebpf:"queue_with_data"` +} + +// bpfObjects contains all objects after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfObjects struct { + bpfPrograms + bpfMaps +} + +func (o *bpfObjects) Close() error { + return _BpfClose( + &o.bpfPrograms, + &o.bpfMaps, + ) +} + +// bpfMaps contains all maps after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfMaps struct { + QueueWithData *ebpf.Map `ebpf:"queue_with_data"` +} + +func (m *bpfMaps) Close() error { + return _BpfClose( + m.QueueWithData, + ) +} + +// bpfPrograms contains all programs after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfPrograms struct { + XdpProgFunc *ebpf.Program `ebpf:"xdp_prog_func"` +} + +func (p *bpfPrograms) Close() error { + return _BpfClose( + p.XdpProgFunc, + ) +} + +func _BpfClose(closers ...io.Closer) error { + for _, closer := range closers { + if err := closer.Close(); err != nil { + return err + } + } + return nil +} + +// Do not access this directly. 
+// +//go:embed bpf_bpfeb.o +var _BpfBytes []byte diff --git a/examples/queue/bpf_bpfeb.o b/examples/queue/bpf_bpfeb.o new file mode 100644 index 000000000..c6ece1646 Binary files /dev/null and b/examples/queue/bpf_bpfeb.o differ diff --git a/examples/queue/bpf_bpfel.go b/examples/queue/bpf_bpfel.go new file mode 100644 index 000000000..aa2f2f24f --- /dev/null +++ b/examples/queue/bpf_bpfel.go @@ -0,0 +1,125 @@ +// Code generated by bpf2go; DO NOT EDIT. +//go:build 386 || amd64 || arm || arm64 || loong64 || mips64le || mipsle || ppc64le || riscv64 + +package main + +import ( + "bytes" + _ "embed" + "fmt" + "io" + + "github.com/cilium/ebpf" +) + +type bpfPacketData struct { + SrcIp uint32 + _ [4]byte + Timestamp uint64 +} + +// loadBpf returns the embedded CollectionSpec for bpf. +func loadBpf() (*ebpf.CollectionSpec, error) { + reader := bytes.NewReader(_BpfBytes) + spec, err := ebpf.LoadCollectionSpecFromReader(reader) + if err != nil { + return nil, fmt.Errorf("can't load bpf: %w", err) + } + + return spec, err +} + +// loadBpfObjects loads bpf and converts it into a struct. +// +// The following types are suitable as obj argument: +// +// *bpfObjects +// *bpfPrograms +// *bpfMaps +// +// See ebpf.CollectionSpec.LoadAndAssign documentation for details. +func loadBpfObjects(obj interface{}, opts *ebpf.CollectionOptions) error { + spec, err := loadBpf() + if err != nil { + return err + } + + return spec.LoadAndAssign(obj, opts) +} + +// bpfSpecs contains maps and programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfSpecs struct { + bpfProgramSpecs + bpfMapSpecs +} + +// bpfSpecs contains programs before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfProgramSpecs struct { + XdpProgFunc *ebpf.ProgramSpec `ebpf:"xdp_prog_func"` +} + +// bpfMapSpecs contains maps before they are loaded into the kernel. +// +// It can be passed ebpf.CollectionSpec.Assign. +type bpfMapSpecs struct { + QueueWithData *ebpf.MapSpec `ebpf:"queue_with_data"` +} + +// bpfObjects contains all objects after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfObjects struct { + bpfPrograms + bpfMaps +} + +func (o *bpfObjects) Close() error { + return _BpfClose( + &o.bpfPrograms, + &o.bpfMaps, + ) +} + +// bpfMaps contains all maps after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfMaps struct { + QueueWithData *ebpf.Map `ebpf:"queue_with_data"` +} + +func (m *bpfMaps) Close() error { + return _BpfClose( + m.QueueWithData, + ) +} + +// bpfPrograms contains all programs after they have been loaded into the kernel. +// +// It can be passed to loadBpfObjects or ebpf.CollectionSpec.LoadAndAssign. +type bpfPrograms struct { + XdpProgFunc *ebpf.Program `ebpf:"xdp_prog_func"` +} + +func (p *bpfPrograms) Close() error { + return _BpfClose( + p.XdpProgFunc, + ) +} + +func _BpfClose(closers ...io.Closer) error { + for _, closer := range closers { + if err := closer.Close(); err != nil { + return err + } + } + return nil +} + +// Do not access this directly. 
+//
+//go:embed bpf_bpfel.o
+var _BpfBytes []byte
diff --git a/examples/queue/bpf_bpfel.o b/examples/queue/bpf_bpfel.o
new file mode 100644
index 000000000..b0f78cee9
Binary files /dev/null and b/examples/queue/bpf_bpfel.o differ
diff --git a/examples/queue/main.go b/examples/queue/main.go
new file mode 100644
index 000000000..7562e917b
--- /dev/null
+++ b/examples/queue/main.go
@@ -0,0 +1,115 @@
+// This program demonstrates attaching an eBPF program to a network interface
+// with XDP (eXpress Data Path). The program parses the IPv4 source address
+// of each packet and pushes the address together with the packet arrival
+// timestamp into a Queue. This is just an example and probably not the most
+// efficient way to perform such a task. Another potential solution would be a
+// HashMap keyed by IPv4 address, with a small __u64 array of timestamps as
+// the value. Either way, packets may be lost if (a) the queue is not large
+// enough or the packets are processed too slowly, or (b) the per-address
+// array is smaller than the number of packets received from that address.
+// The userspace program (Go code in this file) prints the contents
+// of the map to stdout every second, parsing the raw structure into a
+// human-readable IPv4 address and Unix timestamp.
+// It is possible to modify the XDP program to drop or redirect packets
+// as well -- give it a try!
+// This example depends on bpf_link, available in Linux kernel version 5.7 or newer.
+package main
+
+import (
+	"encoding/binary"
+	"fmt"
+	"log"
+	"net"
+	"net/netip"
+	"os"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/cilium/ebpf"
+	"github.com/cilium/ebpf/link"
+)
+
+//go:generate go run github.com/cilium/ebpf/cmd/bpf2go bpf xdp.c -- -I../headers
+
+func main() {
+	if len(os.Args) < 2 {
+		log.Fatalf("Please specify a network interface")
+	}
+
+	// Look up the network interface by name.
+	ifaceName := os.Args[1]
+	iface, err := net.InterfaceByName(ifaceName)
+	if err != nil {
+		log.Fatalf("lookup network iface %q: %s", ifaceName, err)
+	}
+
+	// Load pre-compiled programs into the kernel.
+	objs := bpfObjects{}
+	if err := loadBpfObjects(&objs, nil); err != nil {
+		log.Fatalf("loading objects: %s", err)
+	}
+	defer objs.Close()
+
+	// Attach the program.
+	l, err := link.AttachXDP(link.XDPOptions{
+		Program:   objs.XdpProgFunc,
+		Interface: iface.Index,
+	})
+	if err != nil {
+		log.Fatalf("could not attach XDP program: %s", err)
+	}
+	defer l.Close()
+
+	log.Printf("Attached XDP program to iface %q (index %d)", iface.Name, iface.Index)
+	log.Printf("Press Ctrl-C to exit and remove the program")
+
+	// Retrieve the boot time once and reuse it for every later ktime conversion.
+	bootTime := getSysBoot()
+
+	// Print the contents of the BPF queue (packet source IP address and timestamp).
+	ticker := time.NewTicker(1 * time.Second)
+	defer ticker.Stop()
+	for range ticker.C {
+		s, err := formatMapContents(objs.QueueWithData, bootTime)
+		if err != nil {
+			log.Printf("Error reading map: %s", err)
+			continue
+		}
+		log.Printf("Map contents:\n%s", s)
+	}
+}
+
+// formatMapContents formats the contents of the provided map as a string.
+//
+// For each entry, it outputs one line containing the human-readable IPv4 address
+// from the packet structure and its ktime_ns timestamp converted to Unix time.
+//
+// If iterating the map fails, the corresponding error is returned; an empty map yields an empty string.
+func formatMapContents(m *ebpf.Map, bootTime time.Time) (string, error) {
+	var (
+		sb  strings.Builder
+		val bpfPacketData
+	)
+	iter := m.Iterate()
+	for iter.Next(nil, &val) {
+		// Convert the __u32 into a human-readable IPv4 address.
+		a4 := [4]byte{}
+		binary.LittleEndian.PutUint32(a4[:], val.SrcIp)
+		addr := netip.AddrFrom4(a4)
+
+		// Convert the ktime timestamp into a time.Time by adding the
+		// retrieved offset to the previously computed boot time.
+		t := bootTime.Add(time.Duration(val.Timestamp) * time.Nanosecond)
+
+		sb.WriteString(fmt.Sprintf("\t%s - %s\n", addr, t))
+	}
+	return sb.String(), iter.Err()
+}
+
+// getSysBoot retrieves the system boot time as a time.Time value.
+func getSysBoot() time.Time {
+	sysInfo := &syscall.Sysinfo_t{}
+	syscall.Sysinfo(sysInfo)
+	return time.Now().Add(-time.Duration(sysInfo.Uptime) * time.Second)
+}
diff --git a/examples/queue/xdp.c b/examples/queue/xdp.c
new file mode 100644
index 000000000..7e86a08b5
--- /dev/null
+++ b/examples/queue/xdp.c
@@ -0,0 +1,74 @@
+//go:build ignore
+
+#include "bpf_endian.h"
+#include "common.h"
+
+char __license[] SEC("license") = "Dual MIT/GPL";
+
+#define MAX_MAP_ENTRIES 16
+
+/* Define the packet data struct containing:
+   - the source IPv4 address
+   - the arrival timestamp of the packet
+*/
+struct packet_data {
+	__u32 src_ip;
+	__u64 timestamp;
+};
+
+/* Define a Queue that holds the IPv4 source address and timestamp of each incoming packet */
+struct {
+	__uint(type, BPF_MAP_TYPE_QUEUE);
+	__uint(max_entries, MAX_MAP_ENTRIES);
+	__type(value, struct packet_data); // packet data struct, ip and timestamp
+} queue_with_data SEC(".maps");
+
+/*
+Attempt to parse the IPv4 source address from the packet.
+Returns 0 if the packet is not an IPv4 packet.
+*/
+static __always_inline int parse_ip_src_addr(struct xdp_md *ctx, __u32 *ip_src_addr) {
+	void *data_end = (void *)(long)ctx->data_end;
+	void *data = (void *)(long)ctx->data;
+
+	// First, parse the ethernet header.
+	struct ethhdr *eth = data;
+	if ((void *)(eth + 1) > data_end) {
+		return 0;
+	}
+
+	if (eth->h_proto != bpf_htons(ETH_P_IP)) {
+		// The protocol is not IPv4, so we can't parse an IPv4 source address.
+		return 0;
+	}
+
+	// Then parse the IP header.
+	struct iphdr *ip = (void *)(eth + 1);
+	if ((void *)(ip + 1) > data_end) {
+		return 0;
+	}
+
+	// Return the source IP address in network byte order.
+	*ip_src_addr = (__u32)(ip->saddr);
+	return 1;
+}
+
+SEC("xdp")
+int xdp_prog_func(struct xdp_md *ctx) {
+	struct packet_data packet = {};
+
+	if (!parse_ip_src_addr(ctx, &packet.src_ip)) {
+		// Not an IPv4 packet, so skip it.
+		goto done;
+	}
+
+	// It is an IPv4 packet, so record the arrival timestamp.
+	packet.timestamp = bpf_ktime_get_ns();
+
+	// Push the structure into the Queue if there is still space,
+	// otherwise the packet is silently ignored.
+	bpf_map_push_elem(&queue_with_data, &packet, BPF_ANY);
+
+done:
+	return XDP_PASS;
+}
diff --git a/map.go b/map.go
index 1eeee1776..cb5968f87 100644
--- a/map.go
+++ b/map.go
@@ -1506,17 +1506,42 @@ func newMapIterator(target *Map) *MapIterator {
 	}
 }
 
-// Next decodes the next key and value.
-//
-// Iterating a hash map from which keys are being deleted is not
-// safe. You may see the same key multiple times. Iteration may
-// also abort with an error, see IsIterationAborted.
+// nextQueueMap decodes the next value from a map of type
+// Queue or Stack.
 //
 // Returns false if there are no more entries. You must check
 // the result of Err afterwards.
+func (mi *MapIterator) nextQueueMap(valueOut interface{}) bool {
+	if mi.err != nil || mi.done {
+		return false
+	}
+
+	// For Queue/Stack maps, stop the iteration after maxEntries lookups
+	// to avoid a potential infinite loop: new values can be pushed into
+	// the map while existing ones are being popped.
+	if mi.count == mi.maxEntries {
+		mi.err = fmt.Errorf("%w", ErrIterationAborted)
+		return false
+	}
+
+	mi.count++
+	err := mi.target.LookupAndDelete(nil, valueOut)
+	if errors.Is(err, ErrKeyNotExist) {
+		return false
+	}
+	if err != nil {
+		mi.err = fmt.Errorf("look up next value: %w", err)
+		return false
+	}
+	return true
+}
+
+// next decodes the next key and value from any map type that uses
+// key-value pairs, i.e. everything except Queue and Stack.
 //
-// See Map.Get for further caveats around valueOut.
-func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
+// Returns false if there are no more entries. You must check
+// the result of Err afterwards.
+func (mi *MapIterator) next(keyOut, valueOut interface{}) bool {
 	if mi.err != nil || mi.done {
 		return false
 	}
@@ -1575,6 +1600,26 @@ func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
 	return false
 }
 
+// Next decodes the next key and value.
+//
+// Iterating a hash map from which keys are being deleted is not
+// safe. You may see the same key multiple times. Iteration may
+// also abort with an error, see IsIterationAborted.
+//
+// Iterating a Queue/Stack map looks up only the value, as these
+// map types do not use keys.
+//
+// Returns false if there are no more entries. You must check
+// the result of Err afterwards.
+//
+// See Map.Get for further caveats around valueOut.
+func (mi *MapIterator) Next(keyOut, valueOut interface{}) bool {
+	if mi.target.typ.isQueueStack() {
+		return mi.nextQueueMap(valueOut)
+	}
+	return mi.next(keyOut, valueOut)
+}
+
 // Err returns any encountered error.
 //
 // The method must be called after Next returns nil.
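For context, a minimal userspace sketch (not part of the diff) of the Queue/Stack iteration path that the map.go change above introduces. The map and its parameters are made up for illustration and require a kernel with BPF_MAP_TYPE_QUEUE support (4.20 or newer); note that iterating drains the map, since each call to Next pops a value via LookupAndDelete.

```go
package main

import (
	"fmt"
	"log"

	"github.com/cilium/ebpf"
)

func main() {
	// A throwaway Queue map, created only to demonstrate iteration.
	queue, err := ebpf.NewMap(&ebpf.MapSpec{
		Type:       ebpf.Queue,
		ValueSize:  4,
		MaxEntries: 8,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer queue.Close()

	// Queue and Stack maps take a nil key on Put.
	for _, v := range []uint32{1, 2, 3} {
		if err := queue.Put(nil, v); err != nil {
			log.Fatal(err)
		}
	}

	// With the change above, MapIterator.Next accepts a nil keyOut for
	// Queue/Stack maps and pops one value per call via LookupAndDelete.
	var value uint32
	iter := queue.Iterate()
	for iter.Next(nil, &value) {
		fmt.Println("popped:", value)
	}
	if err := iter.Err(); err != nil {
		log.Fatal(err)
	}
}
```

Passing nil as keyOut mirrors formatMapContents in the example above, since Queue and Stack maps have no keys.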
diff --git a/map_test.go b/map_test.go
index be36721a8..58e7b09d9 100644
--- a/map_test.go
+++ b/map_test.go
@@ -820,7 +820,27 @@ func TestMapQueue(t *testing.T) {
 		}
 	}
 
-	var v uint32
+	var (
+		v  uint32
+		v2 uint32
+	)
+	if err := m.Lookup(nil, &v); err != nil {
+		t.Fatal("Lookup (Peek) on Queue:", err)
+	}
+
+	if err := m.Lookup(nil, &v2); err != nil {
+		t.Fatal("Lookup (Peek) consecutive on Queue:", err)
+	}
+
+	if v != v2 {
+		t.Fatal("Consecutive Lookup (Peek) on Queue returned different values:", v, v2)
+	}
+
+	if v != 42 {
+		t.Error("Want value 42, got", v)
+	}
+	v = 0
+
 	if err := m.LookupAndDelete(nil, &v); err != nil {
 		t.Fatal("Can't lookup and delete element:", err)
 	}
@@ -839,6 +859,74 @@ func TestMapQueue(t *testing.T) {
 	if err := m.LookupAndDelete(nil, &v); !errors.Is(err, ErrKeyNotExist) {
 		t.Fatal("Lookup and delete on empty Queue:", err)
 	}
+
+	if err := m.Lookup(nil, &v); !errors.Is(err, ErrKeyNotExist) {
+		t.Fatal("Lookup (Peek) on empty Queue:", err)
+	}
+}
+
+func TestMapStack(t *testing.T) {
+	testutils.SkipOnOldKernel(t, "4.20", "map type stack")
+
+	m, err := NewMap(&MapSpec{
+		Type:       Stack,
+		ValueSize:  4,
+		MaxEntries: 2,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer m.Close()
+
+	for _, v := range []uint32{42, 4242} {
+		if err := m.Put(nil, v); err != nil {
+			t.Fatalf("Can't put %d: %s", v, err)
+		}
+	}
+
+	var (
+		v  uint32
+		v2 uint32
+	)
+	if err := m.Lookup(nil, &v); err != nil {
+		t.Fatal("Lookup (Peek) on Stack:", err)
+	}
+
+	if err := m.Lookup(nil, &v2); err != nil {
+		t.Fatal("Lookup (Peek) consecutive on Stack:", err)
+	}
+
+	if v != v2 {
+		t.Fatal("Consecutive Lookup (Peek) on Stack returned different values:", v, v2)
+	}
+
+	if v != 4242 {
+		t.Error("Want value 4242, got", v)
+	}
+	v = 0
+
+	if err := m.LookupAndDelete(nil, &v); err != nil {
+		t.Fatal("Can't lookup and delete element:", err)
+	}
+	if v != 4242 {
+		t.Error("Want value 4242, got", v)
+	}
+
+	v = 0
+	if err := m.LookupAndDelete(nil, unsafe.Pointer(&v)); err != nil {
+		t.Fatal("Can't lookup and delete element using unsafe.Pointer:", err)
+	}
+	if v != 42 {
+		t.Error("Want value 42, got", v)
+	}
+
+	if err := m.LookupAndDelete(nil, &v); !errors.Is(err, ErrKeyNotExist) {
+		t.Fatal("Lookup and delete on empty Stack:", err)
+	}
+
+	if err := m.Lookup(nil, &v); !errors.Is(err, ErrKeyNotExist) {
+		t.Fatal("Lookup (Peek) on empty Stack:", err)
+	}
 }
 
 func TestMapInMap(t *testing.T) {
diff --git a/types.go b/types.go
index 5146721c8..8de253384 100644
--- a/types.go
+++ b/types.go
@@ -102,6 +102,12 @@ func (mt MapType) hasPerCPUValue() bool {
 	return mt == PerCPUHash || mt == PerCPUArray || mt == LRUCPUHash || mt == PerCPUCGroupStorage
 }
 
+// isQueueStack returns true if the map type is a Queue (BPF_MAP_TYPE_QUEUE)
+// or a Stack (BPF_MAP_TYPE_STACK).
+func (mt MapType) isQueueStack() bool {
+	return mt == Queue || mt == Stack
+}
+
 // canStoreMapOrProgram returns true if the Map stores references to another Map
 // or Program.
 func (mt MapType) canStoreMapOrProgram() bool {
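The new TestMapQueue and TestMapStack assertions above hinge on the peek versus pop semantics of these map types. Below is a minimal sketch of that distinction from userspace, assuming a 4.20+ kernel; the Stack map and its parameters are illustrative only.

```go
package main

import (
	"errors"
	"fmt"
	"log"

	"github.com/cilium/ebpf"
)

func main() {
	// A throwaway Stack map, created only to demonstrate peek vs. pop.
	stack, err := ebpf.NewMap(&ebpf.MapSpec{
		Type:       ebpf.Stack,
		ValueSize:  4,
		MaxEntries: 2,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer stack.Close()

	for _, v := range []uint32{42, 4242} {
		if err := stack.Put(nil, v); err != nil {
			log.Fatal(err)
		}
	}

	// Lookup with a nil key peeks at the top element without removing it,
	// so two consecutive peeks return the same value.
	var peeked uint32
	if err := stack.Lookup(nil, &peeked); err != nil {
		log.Fatal(err)
	}
	fmt.Println("peeked:", peeked) // 4242, still stored in the map

	// LookupAndDelete pops elements in LIFO order until the map is empty.
	var popped uint32
	for {
		err := stack.LookupAndDelete(nil, &popped)
		if errors.Is(err, ebpf.ErrKeyNotExist) {
			break // the stack is empty
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println("popped:", popped)
	}
}
```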