Skip to content

Commit

Permalink
hubble/relay: flush old flows when the buffer drain timeout is reached
Browse files Browse the repository at this point in the history
[ upstream commit 7d56068 ]

Draining only 1 flow every time the buffer drain timeout is reached
proves to be insufficient for certain cases. The worst case typically
happens for a request with historic data, some filters and follow-mode.
Using the Hubble CLI, this means requests like this one:

    hubble observe --last=100 --follow --pod kube-system/coredns-f9fd979d6-66mfg

The behavior before this patch for a request like this, assuming 250
flows matching the requests are stored in Hubble instances ring buffers
and new flows matching the filter criteria are infrequent, would be to
forward 150 flows, then drain 1 flow to forward every time the drain
timeout is reached. This means that for a buffer size of 100, it would
take at least 100 seconds (with the default buffer drain timeout of 1s)
to drain the buffer (unless enough new flows are received in the
meantime and fill the buffer again).

The new code drains all flows in the queue for which the timestamp is
older than `now-bufferDrainTimeout`. In other words, this would be the
old behavior:

    ...
    QUEUE FULL draining flow 149
    QUEUE FULL draining flow 150
    TIMEOUT REACHED
    DRAINED 1 flow
    TIMEOUT REACHED
    DRAINED 1 flow
    TIMEOUT REACHED
    DRAINED 1 flow
    ...

And this is the new behavior:

    ...
    QUEUE FULL draining flow 149
    QUEUE FULL draining flow 150
    TIMEOUT REACHED
    DRAINED 100 flow
    TIMEOUT REACHED
    DRAINED 0 flow
    TIMEOUT REACHED
    DRAINED 0 flow
    ...

If new flows are received but not in enough numbers to fill the buffer
within the time window of `bufferDrainTimeout`, the behavior would look
something like this:

    ...
    QUEUE FULL draining flow 149
    QUEUE FULL draining flow 150
    TIMEOUT REACHED
    DRAINED 100 flow
    TIMEOUT REACHED
    DRAINED 34 flow
    TIMEOUT REACHED
    DRAINED 72 flow
    ...

In effect, this means that a time window of `bufferDrainTimeout`
(default to 1s) is considered to sort flows before forwarding them to
the client. This dramatically improves the query experience for requests
in follow-mode.

Suggested-by: Tom Payne <tom@isovalent.com>
Signed-off-by: Robin Hahling <robin.hahling@gw-computing.net>
  • Loading branch information
rolinh authored and jrfastab committed Nov 6, 2020
1 parent ecbd10a commit f844457
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions pkg/hubble/relay/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,11 @@ func sortFlows(
}
}
pq.Push(flow)
case <-time.After(bufferDrainTimeout): // make sure to drain the queue when no new flow responses are received
if f := pq.Pop(); f != nil {
case t := <-time.After(bufferDrainTimeout):
// Make sure to drain old flows from the queue when no new
// flows are received. The bufferDrainTimeout duration is used
// as a sorting window.
for _, f := range pq.PopOlderThan(t.Add(-bufferDrainTimeout)) {
select {
case sortedFlows <- f:
case <-ctx.Done():
Expand All @@ -120,9 +123,7 @@ func sortFlows(
return
}
}

}()

return sortedFlows
}

Expand Down

0 comments on commit f844457

Please sign in to comment.