From 2cce696fff2d6e98fcf4c00294d0d7a907c4b538 Mon Sep 17 00:00:00 2001 From: Daniel Saxton Date: Mon, 3 Nov 2025 12:19:03 -0600 Subject: [PATCH 1/3] Respect flush_queue_size when calling Workers.flush --- src/amplitude/timeline.py | 2 +- src/amplitude/worker.py | 22 ++++++++++++---------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/amplitude/timeline.py b/src/amplitude/timeline.py index f1c018e..66f94b6 100644 --- a/src/amplitude/timeline.py +++ b/src/amplitude/timeline.py @@ -44,7 +44,7 @@ def flush(self): destination_futures = [] for destination in self.plugins[PluginType.DESTINATION]: try: - destination_futures.append(destination.flush()) + destination_futures.extend(destination.flush()) except Exception: self.logger.exception("Error for flush events") return destination_futures diff --git a/src/amplitude/worker.py b/src/amplitude/worker.py index 0a292c7..b00d05b 100644 --- a/src/amplitude/worker.py +++ b/src/amplitude/worker.py @@ -8,7 +8,6 @@ class Workers: - def __init__(self): self.threads_pool = ThreadPoolExecutor(max_workers=16) self.is_active = True @@ -37,9 +36,13 @@ def stop(self): self.threads_pool.shutdown() def flush(self): - events = self.storage.pull_all() - if events: - return self.threads_pool.submit(self.send, events) + futures = [] + while self.storage.total_events: + events = self.storage.pull(self.configuration.flush_queue_size) + if events: + future = self.threads_pool.submit(self.send, events) + futures.append(future) + return futures def send(self, events): url = self.configuration.server_url @@ -51,23 +54,22 @@ def send(self, events): self.configuration.logger.error("Invalid API Key") def get_payload(self, events) -> bytes: - payload_body = { - "api_key": self.configuration.api_key, - "events": [] - } + payload_body = {"api_key": self.configuration.api_key, "events": []} for event in events: event_body = event.get_event_body() if event_body: payload_body["events"].append(event_body) if self.configuration.options: payload_body["options"] = self.configuration.options - return json.dumps(payload_body, sort_keys=True).encode('utf8') + return json.dumps(payload_body, sort_keys=True).encode("utf8") def buffer_consumer(self): try: if self.is_active: with self.storage.lock: - self.storage.lock.wait(self.configuration.flush_interval_millis / 1000) + self.storage.lock.wait( + self.configuration.flush_interval_millis / 1000 + ) while True: if not self.storage.total_events: break From 3f4fad8d7be140ed72f5b077ef71844424258640 Mon Sep 17 00:00:00 2001 From: Daniel Saxton Date: Mon, 10 Nov 2025 15:24:35 -0600 Subject: [PATCH 2/3] Use self.storage.lock --- src/amplitude/worker.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/amplitude/worker.py b/src/amplitude/worker.py index b00d05b..fa4d064 100644 --- a/src/amplitude/worker.py +++ b/src/amplitude/worker.py @@ -37,11 +37,12 @@ def stop(self): def flush(self): futures = [] - while self.storage.total_events: - events = self.storage.pull(self.configuration.flush_queue_size) - if events: - future = self.threads_pool.submit(self.send, events) - futures.append(future) + with self.storage.lock: + while self.storage.total_events: + events = self.storage.pull(self.configuration.flush_queue_size) + if events: + future = self.threads_pool.submit(self.send, events) + futures.append(future) return futures def send(self, events): From 4964d8030570759b6b8cc772fe59ea280d619cfa Mon Sep 17 00:00:00 2001 From: Daniel Saxton Date: Wed, 12 Nov 2025 09:27:16 -0600 Subject: [PATCH 3/3] Fix --- src/amplitude/worker.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/amplitude/worker.py b/src/amplitude/worker.py index fa4d064..de5a107 100644 --- a/src/amplitude/worker.py +++ b/src/amplitude/worker.py @@ -38,11 +38,11 @@ def stop(self): def flush(self): futures = [] with self.storage.lock: - while self.storage.total_events: + while True: events = self.storage.pull(self.configuration.flush_queue_size) - if events: - future = self.threads_pool.submit(self.send, events) - futures.append(future) + if not events: + break + futures.append(self.threads_pool.submit(self.send, events)) return futures def send(self, events):