Skip to content

Commit

Permalink
fix: add process logs & flush() returns future (#27)
Browse files Browse the repository at this point in the history
* fix: add logs during process

* fix: client.flush() returns a list of future objects from plugins

* test: update test
  • Loading branch information
Mercy811 authored Jun 22, 2022
1 parent 70fcdae commit 33d4c5c
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 21 deletions.
8 changes: 6 additions & 2 deletions src/amplitude/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,12 @@ def set_group(self, group_type: str, group_name: Union[str, List[str]], event_op
self.identify(identify_obj, event_options)

def flush(self):
"""Flush all event waiting to be sent in the buffer"""
self.__timeline.flush()
"""Flush all event waiting to be sent in the buffer
Returns:
A list of Future objects for all destination plugins
"""
return self.__timeline.flush()

def add(self, plugin: Plugin):
"""Add the plugin object to client instance. Events tracked by this client instance will be
Expand Down
2 changes: 1 addition & 1 deletion src/amplitude/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def execute(self, event: BaseEvent) -> None:

def flush(self):
"""Flush all event in storage instance."""
self.workers.flush()
return self.workers.flush()

def shutdown(self):
"""Shutdown plugins and works of the destination plugin."""
Expand Down
11 changes: 10 additions & 1 deletion src/amplitude/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@


class ResponseProcessor:

def __init__(self):
self.configuration = None
self.storage = None
Expand All @@ -15,11 +14,13 @@ def setup(self, configuration, storage):
def process_response(self, res, events):
if res.status == HttpStatus.SUCCESS:
self.callback(events, res.code, "Event sent successfully.")
self.log(events, res.code, "Event sent successfully.")
elif res.status == HttpStatus.TIMEOUT or res.status == HttpStatus.FAILED:
self.push_to_storage(events, 0, res)
elif res.status == HttpStatus.PAYLOAD_TOO_LARGE:
if len(events) == 1:
self.callback(events, res.code, res.error)
self.log(events, res.code, res.error)
else:
self.configuration._increase_flush_divider()
self.push_to_storage(events, 0, res)
Expand All @@ -28,6 +29,7 @@ def process_response(self, res, events):
raise InvalidAPIKeyError(res.error)
if res.missing_field:
self.callback(events, res.code, f"Request missing required field {res.missing_field}")
self.log(events, res.code, f"Request missing required field {res.missing_field}")
else:
invalid_index_set = res.invalid_or_silenced_index()
events_for_retry = []
Expand All @@ -38,6 +40,7 @@ def process_response(self, res, events):
else:
events_for_retry.append(event)
self.callback(events_for_callback, res.code, res.error)
self.log(events_for_callback, res.code, res.error)
self.push_to_storage(events_for_retry, 0, res)
elif res.status == HttpStatus.TOO_MANY_REQUESTS:
events_for_callback = []
Expand All @@ -56,13 +59,15 @@ def process_response(self, res, events):
self.push_to_storage(events_for_retry, 0, res)
else:
self.callback(events, res.code, res.error or "Unknown error")
self.log(events, res.code, res.error or "Unknown error")

def push_to_storage(self, events, delay, res):
for event in events:
event.retry += 1
success, message = self.storage.push(event, delay=delay)
if not success:
self.callback([event], res.code, message)
self.log([event], res.code, message)

def callback(self, events, code, message):
for event in events:
Expand All @@ -72,3 +77,7 @@ def callback(self, events, code, message):
event.callback(code, message)
except Exception:
self.configuration.logger.exception(f"Error callback for event {event}")

def log(self, events, code, message):
for event in events:
self.configuration.logger.info(message, extra={'code':code, 'event':event})
4 changes: 3 additions & 1 deletion src/amplitude/timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ def remove(self, plugin):
self.plugins[plugin_type] = [p for p in self.plugins[plugin_type] if p != plugin]

def flush(self):
destination_futures = []
for destination in self.plugins[PluginType.DESTINATION]:
try:
destination.flush()
destination_futures.append(destination.flush())
except Exception:
self.logger.exception("Error for flush events")
return destination_futures

def process(self, event):
if self.configuration.opt_out:
Expand Down
2 changes: 1 addition & 1 deletion src/amplitude/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def stop(self):
def flush(self):
events = self.storage.pull_all()
if events:
self.threads_pool.submit(self.send, events)
return self.threads_pool.submit(self.send, events)

def send(self, events):
url = self.configuration.server_url
Expand Down
32 changes: 24 additions & 8 deletions src/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ def callback_func(event, code, message=None):
self.client.configuration.use_batch = use_batch
for i in range(25):
self.client.track(BaseEvent("test_event", "test_user_id", event_properties={"id": i}))
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
self.assertEqual(25, len(events))
post_method.assert_called()

Expand All @@ -53,7 +55,9 @@ def test_amplitude_client_track_invalid_api_key_log_error(self):
with self.assertLogs("test", "ERROR") as cm:
self.client.configuration.logger = logging.getLogger("test")
self.client.track(BaseEvent("test_event", "test_user_id"))
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
post_method.assert_called_once()
self.assertEqual(["ERROR:test:Invalid API Key"], cm.output)

Expand Down Expand Up @@ -127,7 +131,9 @@ def callback_func(event, code, message=None):
self.assertTrue(identify_obj.is_valid())
self.assertEqual({"$set": {"birth_date": "4-1-2022"}}, identify_obj.user_properties)
self.client.identify(identify_obj, EventOptions(user_id="test_user_id", device_id="test_device_id"))
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
post_method.assert_called_once()

def test_amplitude_client_group_identify_invalid_log_error_then_success(self):
Expand Down Expand Up @@ -162,7 +168,9 @@ def callback_func(event, code, message=None):
self.assertEqual({"$set": {"team_name": "Super Power"}}, identify_obj.user_properties)
self.client.group_identify("Sports", "Football", identify_obj,
EventOptions(user_id="test_user_id", device_id="test_device_id"))
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
post_method.assert_called_once()

def test_amplitude_set_group_success(self):
Expand All @@ -187,7 +195,9 @@ def callback_func(event, code, message=None):
self.client.configuration.use_batch = use_batch
self.client.set_group("type", ["test_group", "test_group_2"],
EventOptions(user_id="test_user_id", device_id="test_device_id"))
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
post_method.assert_called_once()

def test_amplitude_client_revenue_invalid_log_error_then_success(self):
Expand Down Expand Up @@ -222,7 +232,9 @@ def callback_func(event, code, message=None):
self.assertTrue(revenue_obj.is_valid())
revenue_obj.set_receipt("A0001", "0001A")
self.client.revenue(revenue_obj, EventOptions(user_id="test_user_id", device_id="test_device_id"))
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
post_method.assert_called_once()

def test_amplitude_client_flush_success(self):
Expand All @@ -244,9 +256,13 @@ def callback_func(event, code, message=None):
self.client.configuration.use_batch = use_batch
self.client.track(BaseEvent(event_type="flush_test", user_id="test_user_id",
device_id="test_device_id"))
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
post_method.assert_called_once()
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
post_method.assert_called_once()

def test_amplitude_add_remove_plugins_success(self):
Expand Down
14 changes: 7 additions & 7 deletions src/test/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_worker_flush_events_in_storage_success(self):
HttpClient.post = MagicMock()
HttpClient.post.return_value = success_response
self.push_event(self.get_events_list(50))
self.workers.flush()
self.workers.flush().result()
self.assertEqual(50, len(self.events_dict[200]))
HttpClient.post.assert_called()

Expand Down Expand Up @@ -109,7 +109,7 @@ def test_worker_send_events_with_invalid_request_response_trigger_callback(self)
HttpClient.post = MagicMock()
HttpClient.post.side_effect = [invalid_response, success_response]
self.workers.send(events)
self.workers.flush()
self.workers.flush().result()
self.assertEqual(self.events_dict[200], set(events[20:]))
for i in range(20, 100):
self.assertEqual(1, events[i].retry)
Expand Down Expand Up @@ -154,9 +154,9 @@ def test_worker_send_events_with_payload_too_large_response_decrease_flush_queue
self.workers.configuration.flush_queue_size = 30
self.workers.send(events)
self.assertEqual(15, self.workers.configuration.flush_queue_size)
self.workers.flush()
self.workers.flush().result()
self.assertEqual(10, self.workers.configuration.flush_queue_size)
self.workers.flush()
self.workers.flush().result()
self.assertEqual(30, len(self.events_dict[200]))

def test_worker_send_events_with_timeout_and_failed_response_retry_all_events(self):
Expand All @@ -167,8 +167,8 @@ def test_worker_send_events_with_timeout_and_failed_response_retry_all_events(se
HttpClient.post = MagicMock()
HttpClient.post.side_effect = [timeout_response, failed_response, success_response]
self.workers.send(events)
self.workers.flush()
self.workers.flush()
self.workers.flush().result()
self.workers.flush().result()
self.assertEqual(100, len(self.events_dict[200]))
self.assertEqual(3, HttpClient.post.call_count)

Expand Down Expand Up @@ -205,7 +205,7 @@ def test_worker_send_events_with_too_many_requests_response_callback_and_retry(s
while i > -15:
self.assertEqual(events[16 + i], self.workers.storage.buffer_data[i][1])
i -= 1
self.workers.flush()
self.workers.flush().result()
self.assertEqual(self.events_dict[200], set(events[2:]))

def test_worker_multithreading_process_events_with_random_response_success(self):
Expand Down

0 comments on commit 33d4c5c

Please sign in to comment.