Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor socket matching #152

Merged
merged 7 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 83 additions & 4 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},
}

go a.clearSocketLines(ctx)
go a.updateSocketMap(ctx)
return a
}

Expand Down Expand Up @@ -689,7 +690,7 @@ func (a *Aggregator) processHttp2Frames() {
}

req.Latency = d.WriteTimeNs - req.Latency
req.StartTime = d.EventReadTime
req.StartTime = int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6) // nano to milli
req.Completed = true
req.FromIP = skInfo.Saddr
req.ToIP = skInfo.Daddr
Expand Down Expand Up @@ -1019,11 +1020,13 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
log.Logger.Debug().Uint32("pid", d.Pid).
Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs).
Str("protocol", d.Protocol).Any("payload", string(d.Payload[:d.PayloadSize])).Msg("socket not found")
return

// go check pid-fd for the socket
a.fetchSocketOnNotFound(ctx, d)
}

reqDto := datastore.Request{
StartTime: d.EventReadTime,
StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
Latency: d.Duration,
FromIP: skInfo.Saddr,
ToIP: skInfo.Daddr,
Expand Down Expand Up @@ -1245,6 +1248,73 @@ func (a *Aggregator) fetchSocketMap(pid uint32) *SocketMap {
return sockMap
}

// This is a mitigation for the case a tcp event is missed
func (a *Aggregator) updateSocketMap(ctx context.Context) {
ticker := time.NewTicker(3 * time.Minute)

f := func() {
a.liveProcessesMu.RLock()
defer a.liveProcessesMu.RUnlock()
for pid := range a.liveProcesses {
sockMap := a.clusterInfo.SocketMaps[pid]
if sockMap.mu == nil {
continue
}

sockMap.mu.Lock()
for _, skLine := range sockMap.M {
skLine.getConnectionInfo()
}
sockMap.mu.Unlock()
}
}

for {
select {
case <-ticker.C:
f()
case <-ctx.Done():
return
}
}
}

func (a *Aggregator) fetchSocketOnNotFound(ctx context.Context, d *l7_req.L7Event) bool {
a.liveProcessesMu.Lock()

a.liveProcesses[d.Pid] = struct{}{}
sockMap := a.clusterInfo.SocketMaps[d.Pid]
// pid does not exists
// acquire sockMap lock

// in case of reference to mu is nil, pid exec event did not come yet
// create a new mutex for the pid
// to avoid race around the mutex, we need to lock the liveProcessesMu
if sockMap.mu == nil {
log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("fetchSocketOnNotFound: pid not found")

a.muIndex.Add(1)
a.muArray[(a.muIndex.Load())%uint64(len(a.muArray))] = &sync.RWMutex{}
a.clusterInfo.SocketMaps[d.Pid].mu = a.muArray[(a.muIndex.Load())%uint64(len(a.muArray))]
}
a.liveProcessesMu.Unlock()

// creates sockMap.M
skInfo := a.findRelatedSocket(ctx, d)
if skInfo == nil {
// go try reading from kernel files
err := sockMap.M[d.Fd].getConnectionInfo()
if err != nil {
log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Err(err).Msg("fetchSocketOnNotFound: failed to get connection info")
return false
} else {
log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("fetchSocketOnNotFound: connection info found")
return true
}
}
return true
}

func (a *Aggregator) findRelatedSocket(ctx context.Context, d *l7_req.L7Event) *SockInfo {
sockMap := a.clusterInfo.SocketMaps[d.Pid]
// acquire sockMap lock
Expand All @@ -1261,7 +1331,7 @@ func (a *Aggregator) findRelatedSocket(ctx context.Context, d *l7_req.L7Event) *

skLine, ok := sockMap.M[d.Fd]
if !ok {
log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("error finding skLine, go look for it")
log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("create skLine...")
// start new socket line, find already established connections
skLine = NewSocketLine(d.Pid, d.Fd)
sockMap.M[d.Fd] = skLine
Expand Down Expand Up @@ -1475,3 +1545,12 @@ func getPidMax() (int, error) {
}
return pidMax, nil
}

func convertKernelTimeToUserspaceTime(writeTime uint64) uint64 {
// get first timestamp from kernel and corresponding userspace time
return l7_req.FirstUserspaceTime - (l7_req.FirstKernelTime - writeTime)
}

func convertUserTimeToKernelTime(now uint64) uint64 {
return l7_req.FirstKernelTime - (l7_req.FirstUserspaceTime - now)
}
3 changes: 0 additions & 3 deletions aggregator/pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func TestPostgresParseWithKnownStmt(t *testing.T) {
WriteTimeNs: 0,
Tid: 0,
Seq: 0,
EventReadTime: 0,
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand Down Expand Up @@ -72,7 +71,6 @@ func TestPostgresParseWithKnownStmt(t *testing.T) {
WriteTimeNs: 0,
Tid: 0,
Seq: 0,
EventReadTime: 0,
})

if err != nil {
Expand Down Expand Up @@ -109,7 +107,6 @@ func TestPostgresParseWithUnknownStmt(t *testing.T) {
WriteTimeNs: 0,
Tid: 0,
Seq: 0,
EventReadTime: 0,
})

if err != nil {
Expand Down
120 changes: 119 additions & 1 deletion aggregator/sock_num_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"fmt"
"net"
"os"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -45,6 +47,19 @@ func (nl *SocketLine) AddValue(timestamp uint64, sockInfo *SockInfo) {
nl.mu.Lock()
defer nl.mu.Unlock()

// ignore close events
if sockInfo == nil {
return
}

// if last element is equal to the current element, ignore
if len(nl.Values) > 0 {
last := nl.Values[len(nl.Values)-1].SockInfo
if last != nil && last.Saddr == sockInfo.Saddr && last.Sport == sockInfo.Sport && last.Daddr == sockInfo.Daddr && last.Dport == sockInfo.Dport {
return
}
}

nl.Values = insertIntoSortedSlice(nl.Values, TimestampedSocket{Timestamp: timestamp, SockInfo: sockInfo})
}

Expand Down Expand Up @@ -100,7 +115,6 @@ func (nl *SocketLine) DeleteUnused() {
for i < len(nl.Values)-1 {
if nl.Values[i].SockInfo != nil && nl.Values[i+1].SockInfo != nil {
result = append(result, nl.Values[i+1])
log.Logger.Debug().Msgf("deleting socket line %v", nl.Values[i])
i = i + 2
} else {
result = append(result, nl.Values[i])
Expand Down Expand Up @@ -249,3 +263,107 @@ func insertIntoSortedSlice(sortedSlice []TimestampedSocket, newItem TimestampedS

return sortedSlice
}

// reverse slice
func reverseSlice(s []string) []string {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
return s
}

// convertHexToIP converts a hex string IP address to a human-readable IP address.
func convertHexToIP(hex string) string {
var ipParts []string
for i := 0; i < len(hex); i += 2 {
part, _ := strconv.ParseInt(hex[i:i+2], 16, 64)
ipParts = append(ipParts, fmt.Sprintf("%d", part))
}
ipParts = reverseSlice(ipParts)
return strings.Join(ipParts, ".")
}

// convertHexToPort converts a hex string port to a human-readable port.
func convertHexToPort(hex string) int {
port, _ := strconv.ParseInt(hex, 16, 64)
if port < 0 || port > 65535 {
return 0
}
return int(port)
Fixed Show fixed Hide fixed
}

func getInodeFromFD(pid, fd string) (string, error) {
fdPath := fmt.Sprintf("/proc/%s/fd/%s", pid, fd)
link, err := os.Readlink(fdPath)
if err != nil {
return "", err
}

re := regexp.MustCompile(`socket:\[(\d+)\]`)
match := re.FindStringSubmatch(link)
if len(match) < 2 {
return "", fmt.Errorf("no inode found in link: %s", link)
}

return match[1], nil
}

func findTCPConnection(inode string, pid string) (string, error) {
tcpFile, err := os.Open(fmt.Sprintf("/proc/%s/net/tcp", pid))
if err != nil {
return "", err
}
defer tcpFile.Close()

scanner := bufio.NewScanner(tcpFile)
for scanner.Scan() {
line := scanner.Text()
if strings.Contains(line, inode) {
return line, nil
}
}

return "", fmt.Errorf("no TCP connection found for inode %s", inode)
}

func parseTcpLine(line string) (localIP string, localPort int, remoteIP string, remotePort int) {
fields := strings.Fields(line)
localAddress := fields[1]
remoteAddress := fields[2]

localIP = convertHexToIP(localAddress[:8])
localPort = convertHexToPort(localAddress[9:])
remoteIP = convertHexToIP(remoteAddress[:8])
remotePort = convertHexToPort(remoteAddress[9:])

return
}

func (nl *SocketLine) getConnectionInfo() error {
inode, err := getInodeFromFD(fmt.Sprintf("%d", nl.pid), fmt.Sprintf("%d", nl.fd))
if err != nil {
return err
}

connectionInfo, err := findTCPConnection(inode, fmt.Sprintf("%d", nl.pid))
if err != nil {
return err
}

localIP, localPort, remoteIP, remotePort := parseTcpLine(connectionInfo)

skInfo := &SockInfo{
Pid: nl.pid,
Fd: nl.fd,
Saddr: localIP,
Sport: uint16(localPort),
Daddr: remoteIP,
Dport: uint16(remotePort),
}

// add to socket line
// convert to bpf time
log.Logger.Debug().Msgf("Adding socket line read from user space %v", skInfo)
nl.AddValue(convertUserTimeToKernelTime(uint64(time.Now().UnixNano())), skInfo)
return nil
}
Binary file modified ebpf/c/bpf_bpfeb.o
Binary file not shown.
Binary file modified ebpf/c/bpf_bpfel.o
Binary file not shown.
15 changes: 8 additions & 7 deletions ebpf/c/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,15 @@ SEC("tracepoint/syscalls/sys_exit_connect")
int sys_exit_connect(void *ctx)
{
__u64 id = bpf_get_current_pid_tgid();
__u32 pid = id >> 32;

__u8 *val = bpf_map_lookup_elem(&container_pids, &pid);
if (!val)
{
return 0; // not a container process, ignore
}
// __u32 pid = id >> 32;

bpf_map_delete_elem(&fd_by_pid_tgid, &id);

// __u8 *val = bpf_map_lookup_elem(&container_pids, &pid);
// if (!val)
// {
// return 0; // not a container process, ignore
// }

return 0;
}
13 changes: 11 additions & 2 deletions ebpf/l7_req/l7.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"os"
"sync"
"time"
"unsafe"

Expand Down Expand Up @@ -247,6 +248,9 @@ func (e RedisMethodConversion) String() string {
}
}

var FirstKernelTime uint64 = 0 // nanoseconds since boot
var FirstUserspaceTime uint64 = 0

// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
// // go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS bpf l7.c -- -I../headers

Expand Down Expand Up @@ -320,7 +324,6 @@ type L7Event struct {
WriteTimeNs uint64 // start time of write syscall
Tid uint32
Seq uint32 // tcp seq num
EventReadTime int64
}

const L7_EVENT = "l7_event"
Expand Down Expand Up @@ -567,6 +570,7 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) {
}
}()

readKernelTime := &sync.Once{}
go func() {
var record perf.Record
droppedCount := 0
Expand All @@ -588,6 +592,12 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) {

l7Event := (*bpfL7Event)(unsafe.Pointer(&record.RawSample[0]))

// runs once
readKernelTime.Do(func() {
FirstUserspaceTime = uint64(time.Now().UnixNano())
FirstKernelTime = l7Event.WriteTimeNs
})

protocol := L7ProtocolConversion(l7Event.Protocol).String()
var method string
switch protocol {
Expand Down Expand Up @@ -624,7 +634,6 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) {
WriteTimeNs: l7Event.WriteTimeNs,
Tid: l7Event.Tid,
Seq: l7Event.Seq,
EventReadTime: time.Now().UnixMilli(),
}

go func(l7Event *L7Event) {
Expand Down
5 changes: 2 additions & 3 deletions main_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,9 +597,8 @@ func (sim *Simulator) httpTraffic(ctx context.Context, t *Traffic) {
WriteTimeNs: t.pod.OpenConnections[t.fd] + 10,

// tracing purposes
Tid: 0,
Seq: 0,
EventReadTime: 0,
Tid: 0,
Seq: 0,
}
// select {
// case
Expand Down
Loading