diff --git a/src/amplitude/processor.py b/src/amplitude/processor.py
index 72c18b0..5965d74 100644
--- a/src/amplitude/processor.py
+++ b/src/amplitude/processor.py
@@ -22,7 +22,11 @@ def process_response(self, res, events):
                 self.callback(events, res.code, res.error)
                 self.log(events, res.code, res.error)
             else:
-                self.configuration._increase_flush_divider()
+                # Reduce only if batch didn't exceed current limit (was expected to work).
+                # Batches larger than limit are from old limits already deemed too large,
+                # so failing again doesn't provide new information - skip to avoid over-reduction.
+                if len(events) <= self.configuration.flush_queue_size:
+                    self.configuration._increase_flush_divider()
                 self.push_to_storage(events, 0, res)
         elif res.status == HttpStatus.INVALID_REQUEST:
             if res.error.startswith("Invalid API key:"):
diff --git a/src/amplitude/worker.py b/src/amplitude/worker.py
index 0a292c7..ec78825 100644
--- a/src/amplitude/worker.py
+++ b/src/amplitude/worker.py
@@ -37,9 +37,43 @@ def stop(self):
         self.threads_pool.shutdown()
 
     def flush(self):
+        if not self.storage:
+            return None
+
         events = self.storage.pull_all()
-        if events:
-            return self.threads_pool.submit(self.send, events)
+
+        if not events:
+            return None
+
+        batch_size = self.configuration.flush_queue_size
+        batches = [events[i:i + batch_size] for i in range(0, len(events), batch_size)]
+        batch_futures = [self.threads_pool.submit(self.send, batch) for batch in batches]
+
+        if len(batch_futures) == 1:
+            return batch_futures[0]
+
+        return self._create_combined_future(batch_futures)
+
+    def _create_combined_future(self, batch_futures):
+        def wait_for_all():
+            errors = []
+
+            for i, future in enumerate(batch_futures):
+                try:
+                    future.result()
+                except Exception as e:
+                    self.configuration.logger.error(
+                        f"Flush batch {i+1}/{len(batch_futures)} failed: {e}"
+                    )
+                    errors.append(e)
+
+            # If any batches failed, raise the first error
+            if errors:
+                raise errors[0]
+
+            return None
+
+        return self.threads_pool.submit(wait_for_all)
 
     def send(self, events):
         url = self.configuration.server_url
diff --git a/src/test/test_worker.py b/src/test/test_worker.py
index 43fd23a..4fcc263 100644
--- a/src/test/test_worker.py
+++ b/src/test/test_worker.py
@@ -38,11 +38,20 @@ def test_worker_initialize_setup_success(self):
         self.assertIsNotNone(self.workers.response_processor)
 
     def test_worker_stop_success(self):
-        self.workers.storage.pull_all = MagicMock()
+        for i in range(5):
+            self.workers.storage.push(BaseEvent(f"event_{i}", "test_user"))
+
+        success_response = Response(HttpStatus.SUCCESS)
+        HttpClient.post = MagicMock(return_value=success_response)
+
         self.workers.stop()
         self.assertFalse(self.workers.is_active)
         self.assertTrue(self.workers.is_started)
-        self.workers.storage.pull_all.assert_called_once()
+
+        # Verify storage was flushed (all events removed)
+        self.assertEqual(0, self.workers.storage.total_events)
+        self.assertEqual(0, len(self.workers.storage.ready_queue))
+        self.assertEqual(0, len(self.workers.storage.buffer_data))
 
     def test_worker_get_payload_success(self):
         events = [BaseEvent("test_event1", "test_user"), BaseEvent("test_event2", "test_user")]
@@ -150,11 +159,25 @@ def test_worker_send_events_with_payload_too_large_response_decrease_flush_queue
         success_response = Response(HttpStatus.SUCCESS)
         payload_too_large_response = Response(HttpStatus.PAYLOAD_TOO_LARGE)
         HttpClient.post = MagicMock()
-        HttpClient.post.side_effect = [payload_too_large_response, payload_too_large_response, success_response]
+        # First send gets PAYLOAD_TOO_LARGE (divider 1→2, size 30→15)
+        # First flush sends 2 batches of 15:
+        #   - Batch 1 (15) fails: len(15) <= 15 → TRUE → increase (divider 2→3, size 15→10)
+        #   - Batch 2 (15) fails: len(15) <= 10 → FALSE → don't increase
+        # Second flush sends 3 batches of 10, all succeed
+        HttpClient.post.side_effect = [
+            payload_too_large_response,  # Initial send of 30 events
+            payload_too_large_response,  # First flush batch 1 (15 events)
+            payload_too_large_response,  # First flush batch 2 (15 events)
+            success_response,  # Second flush batch 1 (10 events)
+            success_response,  # Second flush batch 2 (10 events)
+            success_response  # Second flush batch 3 (10 events)
+        ]
         self.workers.configuration.flush_queue_size = 30
         self.workers.send(events)
         self.assertEqual(15, self.workers.configuration.flush_queue_size)
         self.workers.flush().result()
+        # After first flush, only first batch increased divider (15 <= 15)
+        # Second batch didn't (15 > 10), so divider only went 2→3
        self.assertEqual(10, self.workers.configuration.flush_queue_size)
         self.workers.flush().result()
         self.assertEqual(30, len(self.events_dict[200]))