Merge pull request #118 from ddosify/fix/delay-ds-start
fix early ds start
fatihbaltaci committed Mar 22, 2024
2 parents dae5ba9 + f34da2b commit bbda366
Showing 10 changed files with 50 additions and 43 deletions.
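In short: the datastore used to begin its background work as soon as it was constructed. NewBackendDS launched the batch senders and metric exporters itself, and main fired the health-check goroutine before the eBPF programs were deployed. This commit moves that work into a new BackendDS.Start() method that main calls only after the collector is up, and tightens two aggregator paths along the way: the per-PID socket-map mutex is created on demand but never replaced once assigned, and HTTP/2 requests with an inconsistent latency are dropped instead of persisted.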
16 changes: 10 additions & 6 deletions aggregator/data.go
@@ -229,9 +229,9 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},

// set distinct mutex for every live process
for pid := range a.liveProcesses {
a.muIndex.Add(1)
a.muArray[a.muIndex.Load()] = &sync.RWMutex{}
sockMaps[pid].mu = a.muArray[a.muIndex.Load()]
a.muIndex.Add(1)
a.getAlreadyExistingSockets(pid)
}

@@ -455,15 +455,14 @@ func (a *Aggregator) processExec(d *proc.ProcEvent) {

a.liveProcesses[d.Pid] = struct{}{}

// create lock on demand
a.muArray[(a.muIndex.Load())%uint64(len(a.muArray))] = &sync.RWMutex{}
a.muIndex.Add(1)

// if duplicate exec event comes, underlying mutex will be changed
// if first assigned mutex is locked and another exec event comes, mutex will be changed
// and unlock of unlocked mutex now is a possibility
// to avoid this case, if a socket map already has a mutex, don't change it
if a.clusterInfo.SocketMaps[d.Pid].mu == nil {
// create lock on demand
a.muIndex.Add(1)
a.muArray[(a.muIndex.Load())%uint64(len(a.muArray))] = &sync.RWMutex{}
a.clusterInfo.SocketMaps[d.Pid].mu = a.muArray[(a.muIndex.Load())%uint64(len(a.muArray))]
}
}
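The comment above describes the hazard this guard avoids: if a duplicate exec event replaced an already-assigned (and possibly locked) mutex, the goroutine holding the old one would later unlock a fresh, unlocked mutex and panic. A minimal sketch of the assign-once pattern, with illustrative names (a plain counter stands in for the project's atomic muIndex):

```go
package main

import "sync"

var (
	muArray = make([]*sync.RWMutex, 100)
	muIndex uint64 // the real code uses an atomic counter; a plain one keeps the sketch short
)

type socketMap struct{ mu *sync.RWMutex }

// assignLockOnce creates the per-PID lock on demand but never replaces one that
// is already set: the existing mutex may be held by another goroutine, and
// swapping it out would make that goroutine's eventual Unlock panic.
func assignLockOnce(sm *socketMap) {
	if sm.mu != nil {
		return
	}
	muIndex++
	slot := muIndex % uint64(len(muArray))
	muArray[slot] = &sync.RWMutex{}
	sm.mu = muArray[slot]
}

func main() {
	sm := &socketMap{}
	assignLockOnce(sm)
	first := sm.mu
	assignLockOnce(sm) // duplicate exec event: the mutex is left untouched
	if sm.mu != first {
		panic("mutex was replaced")
	}
}
```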
@@ -687,8 +686,8 @@ func (a *Aggregator) processHttp2Frames() {
return
}

req.StartTime = d.EventReadTime
req.Latency = d.WriteTimeNs - req.Latency
req.StartTime = d.EventReadTime
req.Completed = true
req.FromIP = skInfo.Saddr
req.ToIP = skInfo.Daddr
@@ -712,6 +711,11 @@ func (a *Aggregator) processHttp2Frames() {
return
}

if d.WriteTimeNs < req.Latency {
// ignore
return
}

a.ds.PersistRequest(req)
}
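If this hunk sits on the same path as the one above (as the shared function context suggests), req.Latency already holds d.WriteTimeNs minus the recorded start time here, both unsigned nanosecond values. A start timestamp recorded after the write makes that subtraction wrap to a huge number, which is always larger than the write timestamp itself, so comparing the two exposes the bogus latency and the request is dropped rather than persisted. A small sketch of that reasoning, with illustrative names rather than the project's code:

```go
package main

import "fmt"

// latencyLooksValid mirrors the guard added before PersistRequest. With unsigned
// nanosecond timestamps, latency = writeTimeNs - startTimeNs wraps around when
// startTimeNs > writeTimeNs; the wrapped value always exceeds writeTimeNs, so
// this comparison detects it.
func latencyLooksValid(writeTimeNs, latencyNs uint64) bool {
	return writeTimeNs >= latencyNs
}

func main() {
	start, write := uint64(1_500), uint64(2_000)
	fmt.Println(latencyLooksValid(write, write-start)) // true: 500ns latency

	lateStart := uint64(2_500)                             // start recorded after the write
	fmt.Println(latencyLooksValid(write, write-lateStart)) // false: wrapped, ignore the request
}
```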

66 changes: 37 additions & 29 deletions datastore/backend.go
@@ -150,6 +150,10 @@ type BackendDS struct {

traceInfoPool *poolutil.Pool[*TraceInfo]

metricsExport bool
gpuMetricsExport bool
metricsExportInterval int

podEventChan chan interface{} // *PodEvent
svcEventChan chan interface{} // *SvcEvent
depEventChan chan interface{} // *DepEvent
@@ -277,32 +281,38 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *BackendDS {
if err != nil {
bs = defaultBatchSize
}

resourceChanSize := 200

ds := &BackendDS{
ctx: ctx,
host: conf.Host,
c: client,
batchSize: bs,
reqInfoPool: newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) {}),
aliveConnPool: newAliveConnPool(func() *ConnInfo { return &ConnInfo{} }, func(r *ConnInfo) {}),
traceInfoPool: newTraceInfoPool(func() *TraceInfo { return &TraceInfo{} }, func(r *TraceInfo) {}),
reqChanBuffer: make(chan *ReqInfo, conf.ReqBufferSize),
connChanBuffer: make(chan *ConnInfo, conf.ConnBufferSize),
podEventChan: make(chan interface{}, 5*resourceChanSize),
svcEventChan: make(chan interface{}, 2*resourceChanSize),
rsEventChan: make(chan interface{}, 2*resourceChanSize),
depEventChan: make(chan interface{}, 2*resourceChanSize),
epEventChan: make(chan interface{}, resourceChanSize),
containerEventChan: make(chan interface{}, 5*resourceChanSize),
dsEventChan: make(chan interface{}, resourceChanSize),
traceEventQueue: list.New(),
ctx: ctx,
host: conf.Host,
c: client,
batchSize: bs,
reqInfoPool: newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) {}),
aliveConnPool: newAliveConnPool(func() *ConnInfo { return &ConnInfo{} }, func(r *ConnInfo) {}),
traceInfoPool: newTraceInfoPool(func() *TraceInfo { return &TraceInfo{} }, func(r *TraceInfo) {}),
reqChanBuffer: make(chan *ReqInfo, conf.ReqBufferSize),
connChanBuffer: make(chan *ConnInfo, conf.ConnBufferSize),
podEventChan: make(chan interface{}, 5*resourceChanSize),
svcEventChan: make(chan interface{}, 2*resourceChanSize),
rsEventChan: make(chan interface{}, 2*resourceChanSize),
depEventChan: make(chan interface{}, 2*resourceChanSize),
epEventChan: make(chan interface{}, resourceChanSize),
containerEventChan: make(chan interface{}, 5*resourceChanSize),
dsEventChan: make(chan interface{}, resourceChanSize),
traceEventQueue: list.New(),
metricsExport: conf.MetricsExport,
gpuMetricsExport: conf.GpuMetricsExport,
metricsExportInterval: conf.MetricsExportInterval,
}

go ds.sendReqsInBatch(bs)
go ds.sendConnsInBatch(bs)
go ds.sendTraceEventsInBatch(10 * bs)
return ds
}

func (ds *BackendDS) Start() {
go ds.sendReqsInBatch(ds.batchSize)
go ds.sendConnsInBatch(ds.batchSize)
go ds.sendTraceEventsInBatch(10 * ds.batchSize)

// events are resynced every 60 seconds on k8s informers
// resourceBatchSize ~ burst size, if more than resourceBatchSize events are sent in a moment, blocking can occur
@@ -321,17 +331,17 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *BackendDS {

// send node-exporter and nvidia-gpu metrics
go func() {
if !(conf.MetricsExport || conf.GpuMetricsExport) {
if !(ds.metricsExport || ds.gpuMetricsExport) {
return
}

var nodeMetrics, gpuMetrics bool
if conf.MetricsExport {
if ds.metricsExport {
go ds.exportNodeMetrics()
nodeMetrics = true // by default
}

if conf.GpuMetricsExport {
if ds.gpuMetricsExport {
err := ds.exportGpuMetrics()
if err != nil {
log.Logger.Error().Msgf("error exporting gpu metrics: %v", err)
@@ -340,7 +350,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *BackendDS {
}
}

t := time.NewTicker(time.Duration(conf.MetricsExportInterval) * time.Second)
t := time.NewTicker(time.Duration(ds.metricsExportInterval) * time.Second)
for {
select {
case <-ds.ctx.Done():
@@ -380,8 +390,6 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *BackendDS {
// ds.reqInfoPool.Done()
log.Logger.Info().Msg("backend datastore stopped")
}()

return ds
}

func (b *BackendDS) enqueueTraceInfo(traceInfo *TraceInfo) {
@@ -478,7 +486,7 @@ func (b *BackendDS) sendMetricsToBackend(r io.Reader) {
return
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(b.ctx, 10*time.Second)
defer cancel()

resp, err := b.c.Do(req.WithContext(ctx))
@@ -980,7 +988,7 @@ type nodeExportLogger struct {
}

func (l nodeExportLogger) Log(keyvals ...interface{}) error {
l.logger.Debug().Msg(fmt.Sprint(keyvals...))
// l.logger.Debug().Msg(fmt.Sprint(keyvals...))
return nil
}

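The backend.go change is the heart of the commit: NewBackendDS used to launch the batch senders and the node/GPU metric exporters as soon as it was constructed, keeping the export settings only as local conf values. Those settings now live on the struct, NewBackendDS only builds the object, and the new Start() method launches the goroutines when the caller decides the rest of the agent is ready. A compact sketch of that construct-then-start split, with illustrative names rather than the project's API:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// backend is a stripped-down stand-in for BackendDS; field and method names
// here are illustrative, not the project's API.
type backend struct {
	ctx                   context.Context
	batchSize             uint64
	metricsExport         bool // captured at construction so Start() needs no config
	metricsExportInterval int
}

// newBackend only wires up state; nothing runs yet, so the caller controls
// exactly when background work begins.
func newBackend(ctx context.Context, batch uint64) *backend {
	return &backend{ctx: ctx, batchSize: batch, metricsExport: true, metricsExportInterval: 10}
}

// Start launches the background work. Calling it only after the rest of the
// pipeline (e.g. the eBPF collector) is deployed is what avoids the early
// datastore start this commit fixes.
func (b *backend) Start() {
	go b.sendInBatches(b.batchSize)
	if b.metricsExport {
		go b.exportMetrics(time.Duration(b.metricsExportInterval) * time.Second)
	}
}

func (b *backend) sendInBatches(n uint64) { fmt.Println("sending in batches of", n) }

func (b *backend) exportMetrics(every time.Duration) {
	t := time.NewTicker(every)
	defer t.Stop()
	for {
		select {
		case <-b.ctx.Done():
			return
		case <-t.C:
			fmt.Println("exporting metrics")
		}
	}
}

func main() {
	b := newBackend(context.Background(), 1000)
	// ... deploy collectors and other components first ...
	b.Start()
	time.Sleep(100 * time.Millisecond) // give the goroutines a moment in this toy example
}
```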
8 changes: 1 addition & 7 deletions ebpf/collector.go
@@ -168,12 +168,6 @@ func (e *EbpfCollector) close() {
log.Logger.Info().Msg("ebpf collector closed")
}

// in order to prevent the memory peak at the beginning
// we'll attach to processes one by one
func (e *EbpfCollector) ListenForEncryptedReqs(pid uint32) {
e.tlsAttachQueue <- pid
}

// we check the size of the executable before reading it into memory
// because it can be very large
// otherwise we can get stuck to memory limit defined in k8s
@@ -191,6 +185,7 @@ func (e *EbpfCollector) AttachUprobesForEncrypted() {
e.mu.Unlock()

go func(pid uint32) {
log.Logger.Debug().Str("ctx", "tls-uprobes").Uint32("pid", pid).Msg("attaching uprobes for encrypted connections")
// attach to libssl uprobes if process is using libssl
errs := e.AttachSslUprobesOnProcess("/proc", pid)
if len(errs) > 0 {
@@ -221,7 +216,6 @@ func (e *EbpfCollector) AttachUprobesForEncrypted() {
}

}(pid)

}
}

Binary file modified ebpf/l7_req/bpf_bpfeb.o
Binary file modified ebpf/l7_req/bpf_bpfel.o
Binary file added ebpf/proc/bpf_bpfeb.o
Binary file added ebpf/proc/bpf_bpfel.o
Binary file modified ebpf/tcp_state/bpf_bpfeb.o
Binary file modified ebpf/tcp_state/bpf_bpfel.o
3 changes: 2 additions & 1 deletion main.go
@@ -67,7 +67,6 @@ func main() {
ReqBufferSize: 40000, // TODO: get from a conf file
ConnBufferSize: 1000, // TODO: get from a conf file
})
go dsBackend.SendHealthCheck(ebpfEnabled, metricsEnabled, distTracingEnabled, k8sVersion)

// deploy ebpf programs
var ec *ebpf.EbpfCollector
@@ -81,6 +80,8 @@ func main() {
go ec.ListenEvents()
}

dsBackend.Start()
go dsBackend.SendHealthCheck(ebpfEnabled, metricsEnabled, distTracingEnabled, k8sVersion)
go http.ListenAndServe(":8181", nil)

<-k8sCollector.Done()
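The main.go hunks show the caller side of the same fix: the health-check goroutine is no longer fired right after NewBackendDS, before the eBPF programs are deployed. Instead, main now deploys the collector, then calls dsBackend.Start() and only afterwards launches SendHealthCheck, so the datastore's background senders and the health check begin once the rest of the agent is in place.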
