Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add process logs & flush() returns future #27

Merged
merged 8 commits into from
Jun 22, 2022
3 changes: 2 additions & 1 deletion src/amplitude/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Amplitude: the Amplitude client class
"""

import logging
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused import

from typing import Optional, Union, List

from amplitude.config import Config
Expand Down Expand Up @@ -131,7 +132,7 @@ def set_group(self, group_type: str, group_name: Union[str, List[str]], event_op

def flush(self):
"""Flush all event waiting to be sent in the buffer"""
self.__timeline.flush()
return self.__timeline.flush()

def add(self, plugin: Plugin):
"""Add the plugin object to client instance. Events tracked by this client instance will be
Expand Down
2 changes: 1 addition & 1 deletion src/amplitude/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def execute(self, event: BaseEvent) -> None:

def flush(self):
"""Flush all event in storage instance."""
self.workers.flush()
return self.workers.flush()

def shutdown(self):
"""Shutdown plugins and works of the destination plugin."""
Expand Down
11 changes: 10 additions & 1 deletion src/amplitude/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@


class ResponseProcessor:

def __init__(self):
self.configuration = None
self.storage = None
Expand All @@ -15,11 +14,13 @@ def setup(self, configuration, storage):
def process_response(self, res, events):
if res.status == HttpStatus.SUCCESS:
self.callback(events, res.code, "Event sent successfully.")
self.log(events, res.code, "Event sent successfully.")
elif res.status == HttpStatus.TIMEOUT or res.status == HttpStatus.FAILED:
self.push_to_storage(events, 0, res)
elif res.status == HttpStatus.PAYLOAD_TOO_LARGE:
if len(events) == 1:
self.callback(events, res.code, res.error)
self.log(events, res.code, res.error)
else:
self.configuration._increase_flush_divider()
self.push_to_storage(events, 0, res)
Expand All @@ -28,6 +29,7 @@ def process_response(self, res, events):
raise InvalidAPIKeyError(res.error)
if res.missing_field:
self.callback(events, res.code, f"Request missing required field {res.missing_field}")
self.log(events, res.code, f"Request missing required field {res.missing_field}")
else:
invalid_index_set = res.invalid_or_silenced_index()
events_for_retry = []
Expand All @@ -38,6 +40,7 @@ def process_response(self, res, events):
else:
events_for_retry.append(event)
self.callback(events_for_callback, res.code, res.error)
self.log(events_for_callback, res.code, res.error)
self.push_to_storage(events_for_retry, 0, res)
elif res.status == HttpStatus.TOO_MANY_REQUESTS:
events_for_callback = []
Expand All @@ -56,13 +59,15 @@ def process_response(self, res, events):
self.push_to_storage(events_for_retry, 0, res)
else:
self.callback(events, res.code, res.error or "Unknown error")
self.log(events, res.code, res.error or "Unknown error")

def push_to_storage(self, events, delay, res):
for event in events:
event.retry += 1
success, message = self.storage.push(event, delay=delay)
if not success:
self.callback([event], res.code, message)
self.log([event], res.code, message)

def callback(self, events, code, message):
for event in events:
Expand All @@ -72,3 +77,7 @@ def callback(self, events, code, message):
event.callback(code, message)
except Exception:
self.configuration.logger.exception(f"Error callback for event {event}")

def log(self, events, code, message):
for event in events:
self.configuration.logger.info(message, extra={'code':code, 'event':event})
2 changes: 1 addition & 1 deletion src/amplitude/timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def remove(self, plugin):
def flush(self):
for destination in self.plugins[PluginType.DESTINATION]:
try:
destination.flush()
return destination.flush()
Copy link
Contributor

@bohan-amplitude bohan-amplitude Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since timeline could have multiple destination plugins, return here may end the function earlier and only return future of first flush call. Should return a list of future for all destination plugins

except Exception:
self.logger.exception("Error for flush events")

Expand Down
2 changes: 1 addition & 1 deletion src/amplitude/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def stop(self):
def flush(self):
events = self.storage.pull_all()
if events:
self.threads_pool.submit(self.send, events)
return self.threads_pool.submit(self.send, events)

def send(self, events):
url = self.configuration.server_url
Expand Down
16 changes: 9 additions & 7 deletions src/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ def callback_func(event, code, message=None):
self.client.configuration.use_batch = use_batch
for i in range(25):
self.client.track(BaseEvent("test_event", "test_user_id", event_properties={"id": i}))
self.client.flush()
flush_future = self.client.flush()
if flush_future:
flush_future.result()
self.assertEqual(25, len(events))
post_method.assert_called()

Expand All @@ -53,7 +55,7 @@ def test_amplitude_client_track_invalid_api_key_log_error(self):
with self.assertLogs("test", "ERROR") as cm:
self.client.configuration.logger = logging.getLogger("test")
self.client.track(BaseEvent("test_event", "test_user_id"))
self.client.flush()
self.client.flush().result()
post_method.assert_called_once()
self.assertEqual(["ERROR:test:Invalid API Key"], cm.output)

Expand Down Expand Up @@ -127,7 +129,7 @@ def callback_func(event, code, message=None):
self.assertTrue(identify_obj.is_valid())
self.assertEqual({"$set": {"birth_date": "4-1-2022"}}, identify_obj.user_properties)
self.client.identify(identify_obj, EventOptions(user_id="test_user_id", device_id="test_device_id"))
self.client.flush()
self.client.flush().result()
post_method.assert_called_once()

def test_amplitude_client_group_identify_invalid_log_error_then_success(self):
Expand Down Expand Up @@ -162,7 +164,7 @@ def callback_func(event, code, message=None):
self.assertEqual({"$set": {"team_name": "Super Power"}}, identify_obj.user_properties)
self.client.group_identify("Sports", "Football", identify_obj,
EventOptions(user_id="test_user_id", device_id="test_device_id"))
self.client.flush()
self.client.flush().result()
post_method.assert_called_once()

def test_amplitude_set_group_success(self):
Expand All @@ -187,7 +189,7 @@ def callback_func(event, code, message=None):
self.client.configuration.use_batch = use_batch
self.client.set_group("type", ["test_group", "test_group_2"],
EventOptions(user_id="test_user_id", device_id="test_device_id"))
self.client.flush()
self.client.flush().result()
post_method.assert_called_once()

def test_amplitude_client_revenue_invalid_log_error_then_success(self):
Expand Down Expand Up @@ -222,7 +224,7 @@ def callback_func(event, code, message=None):
self.assertTrue(revenue_obj.is_valid())
revenue_obj.set_receipt("A0001", "0001A")
self.client.revenue(revenue_obj, EventOptions(user_id="test_user_id", device_id="test_device_id"))
self.client.flush()
self.client.flush().result()
post_method.assert_called_once()

def test_amplitude_client_flush_success(self):
Expand All @@ -244,7 +246,7 @@ def callback_func(event, code, message=None):
self.client.configuration.use_batch = use_batch
self.client.track(BaseEvent(event_type="flush_test", user_id="test_user_id",
device_id="test_device_id"))
self.client.flush()
self.client.flush().result()
post_method.assert_called_once()
self.client.flush()
post_method.assert_called_once()
Expand Down
14 changes: 7 additions & 7 deletions src/test/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_worker_flush_events_in_storage_success(self):
HttpClient.post = MagicMock()
HttpClient.post.return_value = success_response
self.push_event(self.get_events_list(50))
self.workers.flush()
self.workers.flush().result()
self.assertEqual(50, len(self.events_dict[200]))
HttpClient.post.assert_called()

Expand Down Expand Up @@ -109,7 +109,7 @@ def test_worker_send_events_with_invalid_request_response_trigger_callback(self)
HttpClient.post = MagicMock()
HttpClient.post.side_effect = [invalid_response, success_response]
self.workers.send(events)
self.workers.flush()
self.workers.flush().result()
self.assertEqual(self.events_dict[200], set(events[20:]))
for i in range(20, 100):
self.assertEqual(1, events[i].retry)
Expand Down Expand Up @@ -154,9 +154,9 @@ def test_worker_send_events_with_payload_too_large_response_decrease_flush_queue
self.workers.configuration.flush_queue_size = 30
self.workers.send(events)
self.assertEqual(15, self.workers.configuration.flush_queue_size)
self.workers.flush()
self.workers.flush().result()
self.assertEqual(10, self.workers.configuration.flush_queue_size)
self.workers.flush()
self.workers.flush().result()
self.assertEqual(30, len(self.events_dict[200]))

def test_worker_send_events_with_timeout_and_failed_response_retry_all_events(self):
Expand All @@ -167,8 +167,8 @@ def test_worker_send_events_with_timeout_and_failed_response_retry_all_events(se
HttpClient.post = MagicMock()
HttpClient.post.side_effect = [timeout_response, failed_response, success_response]
self.workers.send(events)
self.workers.flush()
self.workers.flush()
self.workers.flush().result()
self.workers.flush().result()
self.assertEqual(100, len(self.events_dict[200]))
self.assertEqual(3, HttpClient.post.call_count)

Expand Down Expand Up @@ -205,7 +205,7 @@ def test_worker_send_events_with_too_many_requests_response_callback_and_retry(s
while i > -15:
self.assertEqual(events[16 + i], self.workers.storage.buffer_data[i][1])
i -= 1
self.workers.flush()
self.workers.flush().result()
self.assertEqual(self.events_dict[200], set(events[2:]))

def test_worker_multithreading_process_events_with_random_response_success(self):
Expand Down