Skip to content

Commit

Permalink
hubble-relay: Suppress duplicate errors within the same time window
Browse files Browse the repository at this point in the history
This commit adds a new stage to the hubble-relay processing which
supresses duplicate errors which may occur on multiple nodes within a
certain time window.

The list of nodes is merged, such that the reported error contains
each node on which the error occured.

Signed-off-by: Sebastian Wicki <sebastian@isovalent.com>
  • Loading branch information
gandro committed May 20, 2020
1 parent 814f17d commit 65aa5e3
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 15 deletions.
74 changes: 73 additions & 1 deletion pkg/hubble/relay/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,77 @@ func nodeStatusEvent(state relaypb.NodeState, nodeNames ...string) *observerpb.G
}
}

func aggregateErrors(
ctx context.Context,
responses <-chan *observerpb.GetFlowsResponse,
errorAggregationWindow time.Duration,
) <-chan *observerpb.GetFlowsResponse {
aggregated := make(chan *observerpb.GetFlowsResponse, cap(responses))

var flushPending <-chan time.Time
var pendingResponse *observerpb.GetFlowsResponse

go func() {
defer close(aggregated)
aggregateErrorsLoop:
for {
select {
case response, ok := <-responses:
if !ok {
// flush any pending response before exiting
if pendingResponse != nil {
select {
case aggregated <- pendingResponse:
case <-ctx.Done():
}
}
return
}

// any non-error responses are directly forwarded
current := response.GetNodeStatus()
if current.GetStateChange() != relaypb.NodeState_NODE_ERROR {
select {
case aggregated <- response:
continue aggregateErrorsLoop
case <-ctx.Done():
return
}
}

// either merge with pending or flush it
if pending := pendingResponse.GetNodeStatus(); pending != nil {
if current.GetMessage() == pending.GetMessage() {
pending.NodeName = append(pending.NodeName, current.NodeName...)
continue aggregateErrorsLoop
}

select {
case aggregated <- pendingResponse:
case <-ctx.Done():
return
}
}

pendingResponse = response
flushPending = time.After(errorAggregationWindow)
case <-flushPending:
select {
case aggregated <- pendingResponse:
pendingResponse = nil
flushPending = nil
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}

}()
return aggregated
}

// GetFlows implements observer.ObserverServer.GetFlows by proxying requests to
// the hubble instance the proxy is connected to.
func (s *Server) GetFlows(req *observerpb.GetFlowsRequest, stream observerpb.Observer_GetFlowsServer) error {
Expand Down Expand Up @@ -211,7 +282,8 @@ func (s *Server) GetFlows(req *observerpb.GetFlowsRequest, stream observerpb.Obs
close(flows)
}()

sortedFlows := sortFlows(ctx, flows, qlen, s.opts.BufferDrainTimeout)
aggregated := aggregateErrors(ctx, flows, s.opts.ErrorAggregationWindow)
sortedFlows := sortFlows(ctx, aggregated, qlen, s.opts.BufferDrainTimeout)

// inform the client about the nodes from which we expect to receive flows first
if len(connectedNodes) > 0 {
Expand Down
16 changes: 9 additions & 7 deletions pkg/hubble/relay/relayoption/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (

// Default is the reference point for default values.
var Default = Options{
HubbleTarget: "unix://" + defaults.HubbleSockPath,
DialTimeout: 5 * time.Second,
RetryTimeout: 30 * time.Second,
ListenAddress: fmt.Sprintf(":%d", hubbledefaults.RelayPort),
Debug: false,
BufferMaxLen: 100,
BufferDrainTimeout: 1 * time.Second,
HubbleTarget: "unix://" + defaults.HubbleSockPath,
DialTimeout: 5 * time.Second,
RetryTimeout: 30 * time.Second,
ListenAddress: fmt.Sprintf(":%d", hubbledefaults.RelayPort),
Debug: false,

BufferMaxLen: 100,
BufferDrainTimeout: 1 * time.Second,
ErrorAggregationWindow: 3 * time.Second,
}
30 changes: 23 additions & 7 deletions pkg/hubble/relay/relayoption/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import (

// Options stores all the configuration values for the hubble-relay server.
type Options struct {
HubbleTarget string
DialTimeout time.Duration
RetryTimeout time.Duration
ListenAddress string
Debug bool
BufferMaxLen int
BufferDrainTimeout time.Duration
HubbleTarget string
DialTimeout time.Duration
RetryTimeout time.Duration
ListenAddress string
Debug bool

BufferMaxLen int
BufferDrainTimeout time.Duration
ErrorAggregationWindow time.Duration
}

// Option customizes the configuration of the hubble-relay server.
Expand Down Expand Up @@ -111,3 +113,17 @@ func WithBufferDrainTimeout(d time.Duration) Option {
return nil
}
}

// ErrorAggregationWindow sets a time window during which errors with the same
// error message are coalesced. The aggregated error is forwarded to the
// downstream consumer either when the window expires or when a new, different
// error occurs (whichever happens first)
func WithErrorAggregationWindow(d time.Duration) Option {
return func(o *Options) error {
if d <= 0 {
return fmt.Errorf("value for ErrorAggregationWindow must be greater than 0: %d", d)
}
o.ErrorAggregationWindow = d
return nil
}
}

0 comments on commit 65aa5e3

Please sign in to comment.