From 954e28855b6470a60b1f4ec6447c9b39aa0df610 Mon Sep 17 00:00:00 2001
From: Jin Xu
Date: Tue, 18 Nov 2025 16:11:50 -0800
Subject: [PATCH 1/2] fix: Respect flush_queue_size in Workers.flush() to
 prevent payload size errors

---
 src/amplitude/processor.py |  5 ++++-
 src/amplitude/worker.py    | 46 ++++++++++++++++++++++++++++++++++++--
 src/test/test_worker.py    | 29 +++++++++++++++++++++---
 3 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/src/amplitude/processor.py b/src/amplitude/processor.py
index 72c18b0..efa54a7 100644
--- a/src/amplitude/processor.py
+++ b/src/amplitude/processor.py
@@ -22,7 +22,10 @@ def process_response(self, res, events):
                 self.callback(events, res.code, res.error)
                 self.log(events, res.code, res.error)
             else:
-                self.configuration._increase_flush_divider()
+                # Only reduce if batch was at or below current limit
+                # This prevents multiple reductions from same flush operation
+                if len(events) <= self.configuration.flush_queue_size:
+                    self.configuration._increase_flush_divider()
                 self.push_to_storage(events, 0, res)
         elif res.status == HttpStatus.INVALID_REQUEST:
             if res.error.startswith("Invalid API key:"):
diff --git a/src/amplitude/worker.py b/src/amplitude/worker.py
index 0a292c7..3a0d64c 100644
--- a/src/amplitude/worker.py
+++ b/src/amplitude/worker.py
@@ -37,9 +37,51 @@ def stop(self):
         self.threads_pool.shutdown()
 
     def flush(self):
+        if not self.storage:
+            return None
+
         events = self.storage.pull_all()
-        if events:
-            return self.threads_pool.submit(self.send, events)
+
+        if not events:
+            return None
+
+        batch_size = self.configuration.flush_queue_size
+        batches = []
+        for i in range(0, len(events), batch_size):
+            batches.append(events[i:i + batch_size])
+
+        batch_futures = []
+        for batch in batches:
+            batch_futures.append(self.threads_pool.submit(self.send, batch))
+
+        if len(batch_futures) == 1:
+            return batch_futures[0]
+
+        return self._create_combined_future(batch_futures)
+
+    def _create_combined_future(self, batch_futures):
+        def wait_for_all():
+            errors = []
+
+            for i, future in enumerate(batch_futures):
+                try:
+                    future.result()
+                except Exception as e:
+                    self.configuration.logger.error(
+                        f"Flush batch {i+1}/{len(batch_futures)} failed: {e}"
+                    )
+                    errors.append(e)
+
+            # If any batches failed, raise the first error
+            if errors:
+                raise errors[0]
+
+            self.configuration.logger.info(
+                f"Flush completed: {len(batch_futures)} batches sent"
+            )
+            return None
+
+        return self.threads_pool.submit(wait_for_all)
 
     def send(self, events):
         url = self.configuration.server_url
diff --git a/src/test/test_worker.py b/src/test/test_worker.py
index 43fd23a..4fcc263 100644
--- a/src/test/test_worker.py
+++ b/src/test/test_worker.py
@@ -38,11 +38,20 @@ def test_worker_initialize_setup_success(self):
         self.assertIsNotNone(self.workers.response_processor)
 
     def test_worker_stop_success(self):
-        self.workers.storage.pull_all = MagicMock()
+        for i in range(5):
+            self.workers.storage.push(BaseEvent(f"event_{i}", "test_user"))
+
+        success_response = Response(HttpStatus.SUCCESS)
+        HttpClient.post = MagicMock(return_value=success_response)
+
         self.workers.stop()
         self.assertFalse(self.workers.is_active)
         self.assertTrue(self.workers.is_started)
-        self.workers.storage.pull_all.assert_called_once()
+
+        # Verify storage was flushed (all events removed)
+        self.assertEqual(0, self.workers.storage.total_events)
+        self.assertEqual(0, len(self.workers.storage.ready_queue))
+        self.assertEqual(0, len(self.workers.storage.buffer_data))
 
     def test_worker_get_payload_success(self):
         events = [BaseEvent("test_event1", "test_user"), BaseEvent("test_event2", "test_user")]
@@ -150,11 +159,25 @@ def test_worker_send_events_with_payload_too_large_response_decrease_flush_queue
         success_response = Response(HttpStatus.SUCCESS)
         payload_too_large_response = Response(HttpStatus.PAYLOAD_TOO_LARGE)
         HttpClient.post = MagicMock()
-        HttpClient.post.side_effect = [payload_too_large_response, payload_too_large_response, success_response]
+        # First send gets PAYLOAD_TOO_LARGE (divider 1→2, size 30→15)
+        # First flush sends 2 batches of 15:
+        # - Batch 1 (15) fails: len(15) <= 15 → TRUE → increase (divider 2→3, size 15→10)
+        # - Batch 2 (15) fails: len(15) <= 10 → FALSE → don't increase
+        # Second flush sends 3 batches of 10, all succeed
+        HttpClient.post.side_effect = [
+            payload_too_large_response,  # Initial send of 30 events
+            payload_too_large_response,  # First flush batch 1 (15 events)
+            payload_too_large_response,  # First flush batch 2 (15 events)
+            success_response,  # Second flush batch 1 (10 events)
+            success_response,  # Second flush batch 2 (10 events)
+            success_response  # Second flush batch 3 (10 events)
+        ]
         self.workers.configuration.flush_queue_size = 30
         self.workers.send(events)
         self.assertEqual(15, self.workers.configuration.flush_queue_size)
         self.workers.flush().result()
+        # After first flush, only first batch increased divider (15 <= 15)
+        # Second batch didn't (15 > 10), so divider only went 2→3
         self.assertEqual(10, self.workers.configuration.flush_queue_size)
         self.workers.flush().result()
         self.assertEqual(30, len(self.events_dict[200]))

From b45771f0902a2bb3eae11f9645f8c67ed3508ef8 Mon Sep 17 00:00:00 2001
From: Jin Xu
Date: Wed, 19 Nov 2025 09:09:13 -0800
Subject: [PATCH 2/2] use list comprehension

---
 src/amplitude/processor.py |  5 +++--
 src/amplitude/worker.py    | 12 ++----------
 2 files changed, 5 insertions(+), 12 deletions(-)

diff --git a/src/amplitude/processor.py b/src/amplitude/processor.py
index efa54a7..5965d74 100644
--- a/src/amplitude/processor.py
+++ b/src/amplitude/processor.py
@@ -22,8 +22,9 @@ def process_response(self, res, events):
                 self.callback(events, res.code, res.error)
                 self.log(events, res.code, res.error)
             else:
-                # Only reduce if batch was at or below current limit
-                # This prevents multiple reductions from same flush operation
+                # Reduce only if batch didn't exceed current limit (was expected to work).
+                # Batches larger than limit are from old limits already deemed too large,
+                # so failing again doesn't provide new information - skip to avoid over-reduction.
                 if len(events) <= self.configuration.flush_queue_size:
                     self.configuration._increase_flush_divider()
                 self.push_to_storage(events, 0, res)
diff --git a/src/amplitude/worker.py b/src/amplitude/worker.py
index 3a0d64c..ec78825 100644
--- a/src/amplitude/worker.py
+++ b/src/amplitude/worker.py
@@ -46,13 +46,8 @@ def flush(self):
             return None
 
         batch_size = self.configuration.flush_queue_size
-        batches = []
-        for i in range(0, len(events), batch_size):
-            batches.append(events[i:i + batch_size])
-
-        batch_futures = []
-        for batch in batches:
-            batch_futures.append(self.threads_pool.submit(self.send, batch))
+        batches = [events[i:i + batch_size] for i in range(0, len(events), batch_size)]
+        batch_futures = [self.threads_pool.submit(self.send, batch) for batch in batches]
 
         if len(batch_futures) == 1:
             return batch_futures[0]
@@ -76,9 +71,6 @@ def wait_for_all():
             if errors:
                 raise errors[0]
 
-            self.configuration.logger.info(
-                f"Flush completed: {len(batch_futures)} batches sent"
-            )
             return None
 
         return self.threads_pool.submit(wait_for_all)
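
Illustrative sketch (not part of either patch): the snippet below mirrors the batch
slicing that the patched Workers.flush() performs, in plain Python with no SDK
imports. The helper name split_into_batches and the 30-event string backlog are
made up for the example; the sizes 30/15/10 follow the test scenario above.

    # Slice a backlog into chunks no larger than flush_queue_size,
    # the same expression as the comprehension introduced in patch 2.
    def split_into_batches(events, flush_queue_size):
        return [events[i:i + flush_queue_size]
                for i in range(0, len(events), flush_queue_size)]

    backlog = [f"event_{n}" for n in range(30)]
    for size in (30, 15, 10):
        batches = split_into_batches(backlog, size)
        print(size, [len(b) for b in batches])
        # 30 -> [30]; 15 -> [15, 15]; 10 -> [10, 10, 10]

Every HTTP request therefore carries at most flush_queue_size events, which is what
keeps a large pull_all() backlog from producing an oversized payload.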
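
A second sketch of why the guard in process_response limits the reduction to one
step per flush cycle. It assumes flush_queue_size is derived as the original size
divided by the divider, which is what the 30 -> 15 -> 10 progression in the test
implies; queue_size and on_payload_too_large are hypothetical names, not SDK code.

    ORIGINAL_SIZE = 30

    def queue_size(divider):
        # Assumed relation, implied by the 30/15/10 values in the test.
        return max(1, ORIGINAL_SIZE // divider)

    def on_payload_too_large(batch_len, divider, guarded):
        # One PAYLOAD_TOO_LARGE response for a batch of batch_len events.
        if not guarded or batch_len <= queue_size(divider):
            return divider + 1
        return divider  # oversized batch came from an older, larger limit

    for guarded in (False, True):
        divider = 2  # the initial 30-event send already failed once
        for batch_len in (15, 15):  # the two 15-event batches of the first flush
            divider = on_payload_too_large(batch_len, divider, guarded)
        print("guarded" if guarded else "unguarded", queue_size(divider))
    # unguarded -> 7 (divider jumps to 4), guarded -> 10 (divider stops at 3)

Without the guard, both 15-event failures from the same flush would each bump the
divider, shrinking the limit to 7 instead of the intended 10.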