Skip to content

Commit

Permalink
test: Unittest event processing pipeline (#23)
Browse files Browse the repository at this point in the history
* unittest event processing pipeline

* worker consumer use finally to ensure flag change

* remove user_id/device_id requirements for group_identify
  • Loading branch information
bohan-amplitude committed May 5, 2022
1 parent a427eca commit 16e56f4
Show file tree
Hide file tree
Showing 12 changed files with 536 additions and 57 deletions.
7 changes: 3 additions & 4 deletions src/amplitude/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def identify(self, identify_obj: Identify, event_options: EventOptions, event_pr
self.track(event)

def group_identify(self, group_type: str, group_name: str, identify_obj: Identify,
event_options: EventOptions,
event_options: Optional[EventOptions] = None,
event_properties: Optional[dict] = None,
user_properties: Optional[dict] = None):
"""Send a group identify event to update group properties
Expand All @@ -84,11 +84,10 @@ def group_identify(self, group_type: str, group_name: str, identify_obj: Identif
group_type (str): The group type e.g. "sport"
group_name (str): The group name e.g. "soccer"
identify_obj (amplitude.event.Identify): Identify object contain operations of updating group properties
event_options (amplitude.event.EventOptions): Provide additional information to group identify event
like user_id.
event_options (amplitude.event.EventOptions, optional): Provide additional information to
group identify event like user_id.
event_properties (dict, optional): A dictionary of event properties. Defaults to None.
user_properties (dict, optional): A dictionary of user properties. Defaults to None.
"""
if not identify_obj.is_valid():
self.configuration.logger.error("Empty group identify properties")
Expand Down
2 changes: 2 additions & 0 deletions src/amplitude/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ def verify_event(event):
Returns:
True is event is valid, False otherwise.
"""
if isinstance(event, GroupIdentifyEvent):
return True
if (not isinstance(event, BaseEvent)) or \
(not event["event_type"]) or \
(not event["user_id"] and not event["device_id"]):
Expand Down
6 changes: 3 additions & 3 deletions src/amplitude/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

class ResponseProcessor:

def __init__(self, worker):
self.configuration = worker.configuration
self.storage = worker.storage
def __init__(self):
self.configuration = None
self.storage = None

def setup(self, configuration, storage):
self.configuration = configuration
Expand Down
4 changes: 2 additions & 2 deletions src/amplitude/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def max_retry(self) -> int:
@property
def wait_time(self) -> int:
if self.ready_queue:
return -1
return 0
if self.buffer_data:
return min(self.buffer_data[0][0] - utils.current_milliseconds(), self.configuration.flush_interval_millis)
return self.configuration.flush_interval_millis
Expand All @@ -62,7 +62,7 @@ def setup(self, configuration, workers):
self.workers = workers

def push(self, event: BaseEvent, delay: int = 0) -> Tuple[bool, Optional[str]]:
if event.retry and self.total_events > constants.MAX_BUFFER_CAPACITY:
if event.retry and self.total_events >= constants.MAX_BUFFER_CAPACITY:
return False, "Destination buffer full. Retry temporarily disabled"
if event.retry >= self.max_retry:
return False, f"Event reached max retry times {self.max_retry}."
Expand Down
38 changes: 21 additions & 17 deletions src/amplitude/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def __init__(self):
self.is_started = False
self.configuration = None
self.storage = None
self.response_processor = ResponseProcessor(self)
self.response_processor = ResponseProcessor()

def setup(self, configuration, storage):
self.configuration = configuration
Expand Down Expand Up @@ -61,21 +61,25 @@ def get_payload(self, events) -> bytes:
payload_body["events"].append(event_body)
if self.configuration.options:
payload_body["options"] = self.configuration.options
return json.dumps(payload_body).encode('utf8')
return json.dumps(payload_body, sort_keys=True).encode('utf8')

def buffer_consumer(self):
if self.is_active:
with self.storage.lock:
self.storage.lock.wait(self.configuration.flush_interval_millis / 1000)
while True:
if not self.storage.total_events:
break
events = self.storage.pull(self.configuration.flush_queue_size)
if events:
self.threads_pool.submit(self.send, events)
else:
wait_time = self.storage.wait_time / 1000
if wait_time > 0:
self.storage.lock.wait(wait_time)
with self.consumer_lock:
self.is_started = False
try:
if self.is_active:
with self.storage.lock:
self.storage.lock.wait(self.configuration.flush_interval_millis / 1000)
while True:
if not self.storage.total_events:
break
events = self.storage.pull(self.configuration.flush_queue_size)
if events:
self.threads_pool.submit(self.send, events)
else:
wait_time = self.storage.wait_time / 1000
if wait_time > 0:
self.storage.lock.wait(wait_time)
except Exception:
self.configuration.logger.exception("Consumer thread error")
finally:
with self.consumer_lock:
self.is_started = False
10 changes: 0 additions & 10 deletions src/test/storage.py

This file was deleted.

136 changes: 136 additions & 0 deletions src/test/test_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import time
import unittest
import random
from threading import Thread
from unittest.mock import MagicMock

import amplitude.utils
from amplitude.storage import InMemoryStorageProvider
from amplitude import constants, Config, BaseEvent
from amplitude.worker import Workers


class AmplitudeStorageTestCase(unittest.TestCase):

def setUp(self) -> None:
self.provider = InMemoryStorageProvider()
self.storage = self.provider.get_storage()
worker = Workers()
self.storage.setup(Config(), worker)

def test_storage_empty_in_memory_storage_pull_return_empty_list(self):
self.assertEqual(0, self.storage.total_events)
self.assertEqual([], self.storage.pull(20))

def test_storage_empty_in_memory_storage_pull_all_return_empty_list(self):
self.assertEqual(0, self.storage.total_events)
self.assertEqual([], self.storage.pull_all())

def test_storage_empty_in_memory_storage_wait_time_return_flush_interval(self):
self.assertEqual(0, self.storage.total_events)
self.assertEqual(constants.FLUSH_INTERVAL_MILLIS, self.storage.wait_time)

def test_storage_in_memory_storage_push_new_event_success(self):
self.storage.workers.start = MagicMock()
event_list = []
for i in range(50):
event = BaseEvent("test_event_" + str(i), user_id="test_user")
self.storage.push(event)
event_list.append(event)
self.assertEqual(50, self.storage.total_events)
self.assertEqual(event_list, self.storage.ready_queue)
self.assertEqual(event_list[:30], self.storage.pull(30))
self.assertEqual(20, self.storage.total_events)
self.assertEqual(event_list[30:], self.storage.pull_all())
self.assertEqual(50, self.storage.workers.start.call_count)
self.assertEqual(0, self.storage.total_events)

def test_storage_im_memory_storage_push_events_with_delay_success(self):
event_set = set()
self.storage.workers.start = MagicMock()
self.push_event(self.storage, event_set, 50)
self.assertEqual(50, self.storage.total_events)
self.assertEqual(50, len(self.storage.ready_queue) + len(self.storage.buffer_data))
self.assertEqual(event_set, set(self.storage.pull_all()))
self.assertEqual(50, self.storage.workers.start.call_count)

def test_storage_in_memory_storage_multithreading_push_event_success(self):
event_set = set()
self.storage.workers.start = MagicMock()
threads = []
for _ in range(50):
t = Thread(target=self.push_event, args=(self.storage, event_set, 100))
t.start()
threads.append(t)
for t in threads:
t.join()
self.assertEqual(5000, self.storage.workers.start.call_count)
self.assertEqual(5000, self.storage.total_events)
self.assertEqual(5000, len(self.storage.ready_queue) + len(self.storage.buffer_data))
self.assertEqual(event_set, set(self.storage.pull_all()))

def test_storage_in_memory_storage_push_retry_event_exceed_max_capacity_failed(self):
self.storage.workers.start = MagicMock()
self.push_event(self.storage, set(), constants.MAX_BUFFER_CAPACITY)
self.assertEqual(constants.MAX_BUFFER_CAPACITY, self.storage.total_events)
event = BaseEvent("test_event", "test_user")
event.retry += 1
self.storage.workers.start.reset_mock()
is_success, message = self.storage.push(event)
self.assertFalse(is_success)
self.assertEqual("Destination buffer full. Retry temporarily disabled", message)
self.assertEqual(constants.MAX_BUFFER_CAPACITY, self.storage.total_events)
self.storage.workers.start.assert_not_called()

def test_storage_in_memory_storage_push_event_exceed_max_retry_failed(self):
self.storage.workers.start = MagicMock()
event = BaseEvent("test_event", "test_user")
event.retry = self.storage.max_retry
is_success, message = self.storage.push(event)
self.assertFalse(is_success)
self.assertEqual(f"Event reached max retry times {self.storage.max_retry}.", message)
self.assertEqual(0, self.storage.total_events)
self.storage.workers.start.assert_not_called()

def test_storage_in_memory_storage_wait_time_events_in_ready_queue_zero(self):
self.storage.workers.start = MagicMock()
self.storage.push(BaseEvent("test_event", "test_user"))
self.assertEqual(0, self.storage.wait_time)

def test_storage_in_memory_storage_wait_time_event_in_buffer_flush_interval_maximum(self):
self.storage.workers.start = MagicMock()
self.storage.push(BaseEvent("test_event", "test_user"), 200)
self.assertTrue(0 < self.storage.wait_time <= 200)
self.storage.pull_all()
self.storage.push(BaseEvent("test_event", "test_user"), constants.FLUSH_INTERVAL_MILLIS + 500)
self.assertTrue(constants.FLUSH_INTERVAL_MILLIS >= self.storage.wait_time)

def test_storage_in_memory_storage_retry_event_verify_retry_delay_success(self):
self.storage.workers.start = MagicMock()
expect_delay = [0, 100, 100, 200, 200, 400, 400, 800, 800, 1600, 1600, 3200, 3200]
for retry, delay in enumerate(expect_delay):
event = BaseEvent("test_event", "test_user")
event.retry = retry
self.assertEqual(delay, self.storage._get_retry_delay(event.retry))

def test_storage_pull_events_from_ready_queue_and_buffer_data_success(self):
self.storage.workers.start = MagicMock()
self.push_event(self.storage, set(), 200)
first_event_in_buffer_data = self.storage.buffer_data[0][1]
# wait 100 ms - max delay of push_event()
time.sleep(0.1)
events = self.storage.pull(len(self.storage.ready_queue) + 1)
self.assertEqual(first_event_in_buffer_data, events[-1])
self.assertEqual(200 - len(events), self.storage.total_events)
self.assertEqual(self.storage.total_events, len(self.storage.buffer_data))

@staticmethod
def push_event(storage, event_set, count):
for i in range(count):
event = BaseEvent("test_event_" + str(i), user_id="test_user")
storage.push(event, random.randint(0, 100))
event_set.add(event)


if __name__ == '__main__':
unittest.main()
90 changes: 90 additions & 0 deletions src/test/test_timeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import unittest
from unittest.mock import MagicMock

from amplitude import Config, EventPlugin, PluginType, BaseEvent
from amplitude.timeline import Timeline
from amplitude.plugin import AmplitudeDestinationPlugin


class AmplitudeTimelineTestCase(unittest.TestCase):

def setUp(self) -> None:
self.timeline = Timeline(Config())

def test_timeline_add_remove_plugins_with_types_success(self):
before = EventPlugin(PluginType.BEFORE)
enrich = EventPlugin(PluginType.ENRICHMENT)
destination = AmplitudeDestinationPlugin()
self.timeline.add(before)
self.assertEqual(before, self.timeline.plugins[PluginType.BEFORE][0])
self.timeline.add(enrich)
self.assertEqual(enrich, self.timeline.plugins[PluginType.ENRICHMENT][0])
self.timeline.add(destination)
self.assertEqual(destination, self.timeline.plugins[PluginType.DESTINATION][0])
self.timeline.remove(before)
self.assertFalse(self.timeline.plugins[PluginType.BEFORE])
self.timeline.remove(enrich)
self.assertFalse(self.timeline.plugins[PluginType.ENRICHMENT])
self.timeline.remove(destination)
self.assertFalse(self.timeline.plugins[PluginType.DESTINATION])

def test_timeline_shutdown_destination_plugin_success(self):
destination = AmplitudeDestinationPlugin()
destination.shutdown = MagicMock()
self.timeline.add(destination)
self.timeline.shutdown()
destination.shutdown.assert_called_once()

def test_timeline_flush_destination_plugin_success(self):
destination = AmplitudeDestinationPlugin()
destination.flush = MagicMock()
self.timeline.add(destination)
self.timeline.flush()
destination.flush.assert_called_once()

def test_timeline_process_event_with_plugin_success(self):
event = BaseEvent("test_event", "test_user")
event2 = BaseEvent("test_event", "test_user", event_properties={"processed": True})
enrich = EventPlugin(PluginType.ENRICHMENT)
enrich.execute = MagicMock()
enrich.execute.return_value = event2
destination = AmplitudeDestinationPlugin()
destination.execute = MagicMock()
self.timeline.add(enrich)
self.timeline.add(destination)
self.assertEqual(event2, self.timeline.process(event))
enrich.execute.assert_called_once_with(event)
destination.execute.assert_called_once()

def test_timeline_process_event_with_plugin_return_none_stop(self):
event = BaseEvent("test_event", "test_user")
event2 = BaseEvent("test_event", "test_user", event_properties={"processed": True})
enrich = EventPlugin(PluginType.ENRICHMENT)
enrich.execute = MagicMock()
enrich.execute.return_value = event2
enrich2 = EventPlugin(PluginType.ENRICHMENT)
enrich2.execute = MagicMock()
enrich2.execute.return_value = None
destination = AmplitudeDestinationPlugin()
destination.execute = MagicMock()
self.timeline.add(enrich)
self.timeline.add(enrich2)
self.timeline.add(destination)
self.assertIsNone(self.timeline.process(event))
enrich.execute.assert_called_once_with(event)
enrich2.execute.assert_called_once_with(event2)
destination.execute.assert_not_called()

def test_timeline_config_opt_out_skip_process_with_info_log(self):
enrich = EventPlugin(PluginType.ENRICHMENT)
enrich.execute = MagicMock()
self.timeline.add(enrich)
self.timeline.configuration.opt_out = True
with self.assertLogs(None, "INFO") as cm:
self.timeline.process(BaseEvent("test_event", "test_user"))
self.assertEqual(["INFO:amplitude:Skipped event for opt out config"], cm.output)
enrich.execute.assert_not_called()


if __name__ == '__main__':
unittest.main()
2 changes: 1 addition & 1 deletion src/test/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import unittest

from amplitude import utils,constants
from amplitude import utils, constants


class AmplitudeUtilsTestCase(unittest.TestCase):
Expand Down
Loading

0 comments on commit 16e56f4

Please sign in to comment.