From f8444570361f81ff95bc0255cfeeae00b4854f8c Mon Sep 17 00:00:00 2001 From: Robin Hahling Date: Mon, 26 Oct 2020 17:25:49 +0100 Subject: [PATCH] hubble/relay: flush old flows when the buffer drain timeout is reached [ upstream commit 7d56068cff16f1a16018766528c06139bc586427 ] Draining only 1 flow every time the buffer drain timeout is reached proves to be insufficient for certain cases. The worst case typically happens for a request with historic data, some filters and follow-mode. Using the Hubble CLI, this means requests like this one: hubble observe --last=100 --follow --pod kube-system/coredns-f9fd979d6-66mfg The behavior before this patch for a request like this, assuming 250 flows matching the request are stored in Hubble instances' ring buffers and new flows matching the filter criteria are infrequent, would be to forward 150 flows, then drain and forward 1 flow every time the drain timeout is reached. This means that for a buffer size of 100, it would take at least 100 seconds (with the default buffer drain timeout of 1s) to drain the buffer (unless enough new flows are received in the meantime and fill the buffer again). The new code drains all flows in the queue for which the timestamp is older than `now-bufferDrainTimeout`. In other words, this was the old behavior: ... QUEUE FULL draining flow 149 QUEUE FULL draining flow 150 TIMEOUT REACHED DRAINED 1 flow TIMEOUT REACHED DRAINED 1 flow TIMEOUT REACHED DRAINED 1 flow ... And this is the new behavior: ... QUEUE FULL draining flow 149 QUEUE FULL draining flow 150 TIMEOUT REACHED DRAINED 100 flow TIMEOUT REACHED DRAINED 0 flow TIMEOUT REACHED DRAINED 0 flow ... If new flows are received but not in sufficient numbers to fill the buffer within the time window of `bufferDrainTimeout`, the behavior would look something like this: ... QUEUE FULL draining flow 149 QUEUE FULL draining flow 150 TIMEOUT REACHED DRAINED 100 flow TIMEOUT REACHED DRAINED 34 flow TIMEOUT REACHED DRAINED 72 flow ... 
In effect, this means that a time window of `bufferDrainTimeout` (defaults to 1s) is used as a sorting window for flows before forwarding them to the client. This dramatically improves the query experience for requests in follow-mode. Suggested-by: Tom Payne Signed-off-by: Robin Hahling --- pkg/hubble/relay/observer/observer.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/hubble/relay/observer/observer.go b/pkg/hubble/relay/observer/observer.go index 9045ca9f5253c..458d3aff57733 100644 --- a/pkg/hubble/relay/observer/observer.go +++ b/pkg/hubble/relay/observer/observer.go @@ -100,8 +100,11 @@ func sortFlows( } } pq.Push(flow) - case <-time.After(bufferDrainTimeout): // make sure to drain the queue when no new flow responses are received - if f := pq.Pop(); f != nil { + case t := <-time.After(bufferDrainTimeout): + // Make sure to drain old flows from the queue when no new + // flows are received. The bufferDrainTimeout duration is used + // as a sorting window. + for _, f := range pq.PopOlderThan(t.Add(-bufferDrainTimeout)) { select { case sortedFlows <- f: case <-ctx.Done(): @@ -120,9 +123,7 @@ func sortFlows( return } } - }() - return sortedFlows }