Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add process logs & flush() returns future #27

Merged
merged 8 commits into from
Jun 22, 2022
8 changes: 6 additions & 2 deletions src/amplitude/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,12 @@ def set_group(self, group_type: str, group_name: Union[str, List[str]], event_op
self.identify(identify_obj, event_options)

def flush(self):
"""Flush all event waiting to be sent in the buffer"""
self.__timeline.flush()
"""Flush all event waiting to be sent in the buffer

Returns:
A list of Future objects for all destination plugins
"""
return self.__timeline.flush()

def add(self, plugin: Plugin):
"""Add the plugin object to client instance. Events tracked by this client instance will be
Expand Down
2 changes: 1 addition & 1 deletion src/amplitude/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def execute(self, event: BaseEvent) -> None:

def flush(self):
"""Flush all event in storage instance."""
self.workers.flush()
return self.workers.flush()

def shutdown(self):
"""Shutdown plugins and works of the destination plugin."""
Expand Down
11 changes: 10 additions & 1 deletion src/amplitude/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@


class ResponseProcessor:

def __init__(self):
self.configuration = None
self.storage = None
Expand All @@ -15,11 +14,13 @@ def setup(self, configuration, storage):
def process_response(self, res, events):
if res.status == HttpStatus.SUCCESS:
self.callback(events, res.code, "Event sent successfully.")
self.log(events, res.code, "Event sent successfully.")
elif res.status == HttpStatus.TIMEOUT or res.status == HttpStatus.FAILED:
self.push_to_storage(events, 0, res)
elif res.status == HttpStatus.PAYLOAD_TOO_LARGE:
if len(events) == 1:
self.callback(events, res.code, res.error)
self.log(events, res.code, res.error)
else:
self.configuration._increase_flush_divider()
self.push_to_storage(events, 0, res)
Expand All @@ -28,6 +29,7 @@ def process_response(self, res, events):
raise InvalidAPIKeyError(res.error)
if res.missing_field:
self.callback(events, res.code, f"Request missing required field {res.missing_field}")
self.log(events, res.code, f"Request missing required field {res.missing_field}")
else:
invalid_index_set = res.invalid_or_silenced_index()
events_for_retry = []
Expand All @@ -38,6 +40,7 @@ def process_response(self, res, events):
else:
events_for_retry.append(event)
self.callback(events_for_callback, res.code, res.error)
self.log(events_for_callback, res.code, res.error)
self.push_to_storage(events_for_retry, 0, res)
elif res.status == HttpStatus.TOO_MANY_REQUESTS:
events_for_callback = []
Expand All @@ -56,13 +59,15 @@ def process_response(self, res, events):
self.push_to_storage(events_for_retry, 0, res)
else:
self.callback(events, res.code, res.error or "Unknown error")
self.log(events, res.code, res.error or "Unknown error")

def push_to_storage(self, events, delay, res):
for event in events:
event.retry += 1
success, message = self.storage.push(event, delay=delay)
if not success:
self.callback([event], res.code, message)
self.log([event], res.code, message)

def callback(self, events, code, message):
for event in events:
Expand All @@ -72,3 +77,7 @@ def callback(self, events, code, message):
event.callback(code, message)
except Exception:
self.configuration.logger.exception(f"Error callback for event {event}")

def log(self, events, code, message):
for event in events:
self.configuration.logger.info(message, extra={'code':code, 'event':event})
4 changes: 3 additions & 1 deletion src/amplitude/timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ def remove(self, plugin):
self.plugins[plugin_type] = [p for p in self.plugins[plugin_type] if p != plugin]

def flush(self):
destination_futures = []
for destination in self.plugins[PluginType.DESTINATION]:
try:
destination.flush()
destination_futures.append(destination.flush())
except Exception:
self.logger.exception("Error for flush events")
return destination_futures

def process(self, event):
if self.configuration.opt_out:
Expand Down
2 changes: 1 addition & 1 deletion src/amplitude/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def stop(self):
def flush(self):
events = self.storage.pull_all()
if events:
self.threads_pool.submit(self.send, events)
return self.threads_pool.submit(self.send, events)

def send(self, events):
url = self.configuration.server_url
Expand Down
32 changes: 24 additions & 8 deletions src/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ def callback_func(event, code, message=None):
self.client.configuration.use_batch = use_batch
for i in range(25):
self.client.track(BaseEvent("test_event", "test_user_id", event_properties={"id": i}))
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
self.assertEqual(25, len(events))
post_method.assert_called()

Expand All @@ -53,7 +55,9 @@ def test_amplitude_client_track_invalid_api_key_log_error(self):
with self.assertLogs("test", "ERROR") as cm:
self.client.configuration.logger = logging.getLogger("test")
self.client.track(BaseEvent("test_event", "test_user_id"))
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
post_method.assert_called_once()
self.assertEqual(["ERROR:test:Invalid API Key"], cm.output)

Expand Down Expand Up @@ -127,7 +131,9 @@ def callback_func(event, code, message=None):
self.assertTrue(identify_obj.is_valid())
self.assertEqual({"$set": {"birth_date": "4-1-2022"}}, identify_obj.user_properties)
self.client.identify(identify_obj, EventOptions(user_id="test_user_id", device_id="test_device_id"))
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra indent here. Otherwise look good to me.

post_method.assert_called_once()

def test_amplitude_client_group_identify_invalid_log_error_then_success(self):
Expand Down Expand Up @@ -162,7 +168,9 @@ def callback_func(event, code, message=None):
self.assertEqual({"$set": {"team_name": "Super Power"}}, identify_obj.user_properties)
self.client.group_identify("Sports", "Football", identify_obj,
EventOptions(user_id="test_user_id", device_id="test_device_id"))
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
post_method.assert_called_once()

def test_amplitude_set_group_success(self):
Expand All @@ -187,7 +195,9 @@ def callback_func(event, code, message=None):
self.client.configuration.use_batch = use_batch
self.client.set_group("type", ["test_group", "test_group_2"],
EventOptions(user_id="test_user_id", device_id="test_device_id"))
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
post_method.assert_called_once()

def test_amplitude_client_revenue_invalid_log_error_then_success(self):
Expand Down Expand Up @@ -222,7 +232,9 @@ def callback_func(event, code, message=None):
self.assertTrue(revenue_obj.is_valid())
revenue_obj.set_receipt("A0001", "0001A")
self.client.revenue(revenue_obj, EventOptions(user_id="test_user_id", device_id="test_device_id"))
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
post_method.assert_called_once()

def test_amplitude_client_flush_success(self):
Expand All @@ -244,9 +256,13 @@ def callback_func(event, code, message=None):
self.client.configuration.use_batch = use_batch
self.client.track(BaseEvent(event_type="flush_test", user_id="test_user_id",
device_id="test_device_id"))
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
post_method.assert_called_once()
self.client.flush()
for flush_future in self.client.flush():
if flush_future:
flush_future.result()
post_method.assert_called_once()

def test_amplitude_add_remove_plugins_success(self):
Expand Down
14 changes: 7 additions & 7 deletions src/test/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_worker_flush_events_in_storage_success(self):
HttpClient.post = MagicMock()
HttpClient.post.return_value = success_response
self.push_event(self.get_events_list(50))
self.workers.flush()
self.workers.flush().result()
self.assertEqual(50, len(self.events_dict[200]))
HttpClient.post.assert_called()

Expand Down Expand Up @@ -109,7 +109,7 @@ def test_worker_send_events_with_invalid_request_response_trigger_callback(self)
HttpClient.post = MagicMock()
HttpClient.post.side_effect = [invalid_response, success_response]
self.workers.send(events)
self.workers.flush()
self.workers.flush().result()
self.assertEqual(self.events_dict[200], set(events[20:]))
for i in range(20, 100):
self.assertEqual(1, events[i].retry)
Expand Down Expand Up @@ -154,9 +154,9 @@ def test_worker_send_events_with_payload_too_large_response_decrease_flush_queue
self.workers.configuration.flush_queue_size = 30
self.workers.send(events)
self.assertEqual(15, self.workers.configuration.flush_queue_size)
self.workers.flush()
self.workers.flush().result()
self.assertEqual(10, self.workers.configuration.flush_queue_size)
self.workers.flush()
self.workers.flush().result()
self.assertEqual(30, len(self.events_dict[200]))

def test_worker_send_events_with_timeout_and_failed_response_retry_all_events(self):
Expand All @@ -167,8 +167,8 @@ def test_worker_send_events_with_timeout_and_failed_response_retry_all_events(se
HttpClient.post = MagicMock()
HttpClient.post.side_effect = [timeout_response, failed_response, success_response]
self.workers.send(events)
self.workers.flush()
self.workers.flush()
self.workers.flush().result()
self.workers.flush().result()
self.assertEqual(100, len(self.events_dict[200]))
self.assertEqual(3, HttpClient.post.call_count)

Expand Down Expand Up @@ -205,7 +205,7 @@ def test_worker_send_events_with_too_many_requests_response_callback_and_retry(s
while i > -15:
self.assertEqual(events[16 + i], self.workers.storage.buffer_data[i][1])
i -= 1
self.workers.flush()
self.workers.flush().result()
self.assertEqual(self.events_dict[200], set(events[2:]))

def test_worker_multithreading_process_events_with_random_response_success(self):
Expand Down