From 8be2c839a7d01d2f96dae89c478cf27b1600e870 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 7 Mar 2023 00:10:37 -0500 Subject: [PATCH] Re-poll immediately on full batch Signed-off-by: Peter Broadhurst --- internal/events/event_poller.go | 10 ++++++++-- internal/events/event_poller_test.go | 11 ++++++++++- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/internal/events/event_poller.go b/internal/events/event_poller.go index e5ee1d5e4e..3cbab4bf76 100644 --- a/internal/events/event_poller.go +++ b/internal/events/event_poller.go @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2023 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -282,9 +282,15 @@ func (ep *eventPoller) shoulderTap() { func (ep *eventPoller) waitForShoulderTapOrPollTimeout(lastEventCount int) bool { l := log.L(ep.ctx) longTimeoutDuration := ep.conf.eventPollTimeout + + if lastEventCount >= ep.conf.eventBatchSize { + l.Tracef("Polling immediately due to full previous event batch") + return true + } + // For throughput optimized environments, we can set an eventBatchingTimeout to allow messages to arrive // between polling cycles (at the cost of some dispatch latency) - if ep.conf.eventBatchTimeout > 0 && lastEventCount > 0 && lastEventCount < ep.conf.eventBatchSize { + if ep.conf.eventBatchTimeout > 0 && lastEventCount > 0 { shortTimeout := time.NewTimer(ep.conf.eventBatchTimeout) select { case <-shortTimeout.C: diff --git a/internal/events/event_poller_test.go b/internal/events/event_poller_test.go index f7d633a8b7..3e395c4f4c 100644 --- a/internal/events/event_poller_test.go +++ b/internal/events/event_poller_test.go @@ -279,12 +279,21 @@ func TestWaitForShoulderTapOrExitCloseBatch(t *testing.T) { assert.False(t, ep.waitForShoulderTapOrPollTimeout(1)) } -func TestWaitForShoulderTapOrExitClosePoll(t *testing.T) { +func TestWaitForShoulderTapImmediateRepollOnFullBatch(t *testing.T) { mdi := &databasemocks.Plugin{} ep, cancel := newTestEventPoller(t, mdi, nil, nil) cancel() ep.conf.eventBatchTimeout = 1 * time.Minute ep.conf.eventBatchSize = 1 + assert.True(t, ep.waitForShoulderTapOrPollTimeout(1)) +} + +func TestWaitForShoulderTapOrExitClosePoll(t *testing.T) { + mdi := &databasemocks.Plugin{} + ep, cancel := newTestEventPoller(t, mdi, nil, nil) + cancel() + ep.conf.eventBatchTimeout = 1 * time.Minute + ep.conf.eventBatchSize = 2 assert.False(t, ep.waitForShoulderTapOrPollTimeout(1)) }