src/amplitude/processor.py (6 changes: 5 additions & 1 deletion)
@@ -22,7 +22,11 @@ def process_response(self, res, events):
                 self.callback(events, res.code, res.error)
                 self.log(events, res.code, res.error)
             else:
-                self.configuration._increase_flush_divider()
+                # Reduce the batch size only if this batch was within the current limit
+                # (i.e. it was expected to succeed). Batches larger than the limit came from
+                # older limits already deemed too large, so failing again provides no new information - skip to avoid over-reduction.
+                if len(events) <= self.configuration.flush_queue_size:
+                    self.configuration._increase_flush_divider()
                 self.push_to_storage(events, 0, res)
             elif res.status == HttpStatus.INVALID_REQUEST:
                 if res.error.startswith("Invalid API key:"):
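For context, here is a minimal sketch of the divider mechanism the new guard interacts with. The attribute names and property shape below are illustrative, inferred from the test expectations further down (30 → 15 → 10), not the SDK's exact internals:

class Config:
    # Illustrative stand-in for the SDK configuration (assumed shape).
    def __init__(self, flush_queue_size=30):
        self._flush_queue_size = flush_queue_size  # user-configured base size
        self._flush_divider = 1                    # grows on PAYLOAD_TOO_LARGE

    @property
    def flush_queue_size(self):
        # Effective batch limit shrinks as the divider grows: 30 -> 15 -> 10 ...
        return max(1, self._flush_queue_size // self._flush_divider)

    def _increase_flush_divider(self):
        self._flush_divider += 1

config = Config()
config._increase_flush_divider()  # after first PAYLOAD_TOO_LARGE
assert config.flush_queue_size == 15
config._increase_flush_divider()  # after second PAYLOAD_TOO_LARGE
assert config.flush_queue_size == 10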
src/amplitude/worker.py (38 changes: 36 additions & 2 deletions)
@@ -37,9 +37,43 @@ def stop(self):
         self.threads_pool.shutdown()
 
     def flush(self):
+        if not self.storage:
+            return None
+
         events = self.storage.pull_all()
-        if events:
-            return self.threads_pool.submit(self.send, events)
+        if not events:
+            return None
+
+        batch_size = self.configuration.flush_queue_size
+        batches = [events[i:i + batch_size] for i in range(0, len(events), batch_size)]
+        batch_futures = [self.threads_pool.submit(self.send, batch) for batch in batches]
+
+        if len(batch_futures) == 1:
+            return batch_futures[0]
+
+        return self._create_combined_future(batch_futures)
+
+    def _create_combined_future(self, batch_futures):
+        def wait_for_all():
+            errors = []
+
+            for i, future in enumerate(batch_futures):
+                try:
+                    future.result()
+                except Exception as e:
+                    self.configuration.logger.error(
+                        f"Flush batch {i+1}/{len(batch_futures)} failed: {e}"
+                    )
+                    errors.append(e)
+
+            # If any batches failed, raise the first error
+            if errors:
+                raise errors[0]
+
+            return None
+
+        return self.threads_pool.submit(wait_for_all)
 
     def send(self, events):
         url = self.configuration.server_url
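The combined-future pattern used in _create_combined_future can be exercised on its own; here is a self-contained sketch of the same idea, with all names illustrative rather than taken from the SDK:

from concurrent.futures import ThreadPoolExecutor

def send(batch):
    # Stand-in for Workers.send; raises to simulate a failed batch.
    if "bad" in batch:
        raise RuntimeError(f"failed batch: {batch}")
    return len(batch)

pool = ThreadPoolExecutor(max_workers=4)
batch_futures = [pool.submit(send, b) for b in (["a"], ["b"], ["bad"])]

def wait_for_all():
    errors = []
    for future in batch_futures:
        try:
            future.result()
        except Exception as e:
            errors.append(e)
    if errors:
        raise errors[0]  # surface the first failure to the caller

combined = pool.submit(wait_for_all)
try:
    combined.result()  # blocks until every batch future resolves
except RuntimeError as e:
    print(e)  # -> failed batch: ['bad']
pool.shutdown()

Submitting wait_for_all to the same pool is safe here because the batch futures are enqueued first: a FIFO ThreadPoolExecutor never schedules the waiter ahead of the work it depends on, so it simply occupies one worker thread until the batches resolve.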
src/test/test_worker.py (29 changes: 26 additions & 3 deletions)
@@ -38,11 +38,20 @@ def test_worker_initialize_setup_success(self):
         self.assertIsNotNone(self.workers.response_processor)
 
     def test_worker_stop_success(self):
-        self.workers.storage.pull_all = MagicMock()
+        for i in range(5):
+            self.workers.storage.push(BaseEvent(f"event_{i}", "test_user"))
+
+        success_response = Response(HttpStatus.SUCCESS)
+        HttpClient.post = MagicMock(return_value=success_response)
+
         self.workers.stop()
         self.assertFalse(self.workers.is_active)
         self.assertTrue(self.workers.is_started)
-        self.workers.storage.pull_all.assert_called_once()
+
+        # Verify storage was flushed (all events removed)
+        self.assertEqual(0, self.workers.storage.total_events)
+        self.assertEqual(0, len(self.workers.storage.ready_queue))
+        self.assertEqual(0, len(self.workers.storage.buffer_data))
 
     def test_worker_get_payload_success(self):
         events = [BaseEvent("test_event1", "test_user"), BaseEvent("test_event2", "test_user")]
@@ -150,11 +159,25 @@ def test_worker_send_events_with_payload_too_large_response_decrease_flush_queue
         success_response = Response(HttpStatus.SUCCESS)
         payload_too_large_response = Response(HttpStatus.PAYLOAD_TOO_LARGE)
         HttpClient.post = MagicMock()
-        HttpClient.post.side_effect = [payload_too_large_response, payload_too_large_response, success_response]
+        # First send gets PAYLOAD_TOO_LARGE (divider 1→2, size 30→15).
+        # First flush sends 2 batches of 15:
+        # - Batch 1 (15 events) fails: 15 <= 15, so increase (divider 2→3, size 15→10)
+        # - Batch 2 (15 events) fails: 15 > 10, so don't increase
+        # Second flush sends 3 batches of 10, all of which succeed.
+        HttpClient.post.side_effect = [
+            payload_too_large_response,  # Initial send of 30 events
+            payload_too_large_response,  # First flush, batch 1 (15 events)
+            payload_too_large_response,  # First flush, batch 2 (15 events)
+            success_response,            # Second flush, batch 1 (10 events)
+            success_response,            # Second flush, batch 2 (10 events)
+            success_response             # Second flush, batch 3 (10 events)
+        ]
         self.workers.configuration.flush_queue_size = 30
         self.workers.send(events)
         self.assertEqual(15, self.workers.configuration.flush_queue_size)
         self.workers.flush().result()
+        # After the first flush only the first batch increased the divider (15 <= 15);
+        # the second batch didn't (15 > 10), so the divider went just 2→3.
         self.assertEqual(10, self.workers.configuration.flush_queue_size)
         self.workers.flush().result()
         self.assertEqual(30, len(self.events_dict[200]))
Expand Down
Loading