Skip to content

Commit

Permalink
fix: use new thread instead of background blocking thread (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
bohan-amplitude committed Apr 29, 2022
1 parent 2604b6a commit 24f807f
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 16 deletions.
3 changes: 1 addition & 2 deletions src/amplitude/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ def setup(self, client):
self.configuration = client.configuration
self.storage = client.configuration.get_storage()
self.workers.setup(client.configuration, self.storage)
self.storage.setup(client.configuration)
self.workers.start()
self.storage.setup(client.configuration, self.workers)

def execute(self, event: BaseEvent) -> None:
event = self.timeline.process(event)
Expand Down
7 changes: 6 additions & 1 deletion src/amplitude/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(self):
self.ready_queue: List[BaseEvent] = []
self.buffer_lock_cv = Condition()
self.configuration = None
self.workers = None

@property
def lock(self):
Expand All @@ -50,12 +51,15 @@ def max_retry(self) -> int:

@property
def wait_time(self) -> int:
if self.ready_queue:
return -1
if self.buffer_data:
return min(self.buffer_data[0][0] - utils.current_milliseconds(), self.configuration.flush_interval_millis)
return self.configuration.flush_interval_millis

def setup(self, configuration):
def setup(self, configuration, workers):
self.configuration = configuration
self.workers = workers

def push(self, event: BaseEvent, delay: int = 0) -> Tuple[bool, Optional[str]]:
if event.retry and self.total_events > constants.MAX_BUFFER_CAPACITY:
Expand All @@ -64,6 +68,7 @@ def push(self, event: BaseEvent, delay: int = 0) -> Tuple[bool, Optional[str]]:
return False, f"Event reached max retry times {self.max_retry}."
total_delay = delay + self._get_retry_delay(event.retry)
self._insert_event(total_delay, event)
self.workers.start()
return True, None

def pull(self, batch_size: int) -> List[BaseEvent]:
Expand Down
37 changes: 24 additions & 13 deletions src/amplitude/worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
from threading import Thread, RLock

from amplitude.exception import InvalidAPIKeyError
from amplitude.http_client import HttpClient
Expand All @@ -12,7 +12,8 @@ class Workers:
def __init__(self):
self.threads_pool = ThreadPoolExecutor(max_workers=16)
self.is_active = True
self.consumer = Thread(target=self.buffer_consumer)
self.consumer_lock = RLock()
self.is_started = False
self.configuration = None
self.storage = None
self.response_processor = ResponseProcessor(self)
Expand All @@ -23,18 +24,22 @@ def setup(self, configuration, storage):
self.response_processor.setup(configuration, storage)

def start(self):
self.consumer.start()
with self.consumer_lock:
if not self.is_started:
self.is_started = True
consumer = Thread(target=self.buffer_consumer)
consumer.start()

def stop(self):
self.flush()
self.is_active = False
self.consumer.join()
self.is_started = True
self.threads_pool.shutdown()

def flush(self):
events = self.storage.pull_all()
if events:
self.send(events)
self.threads_pool.submit(self.send, events)

def send(self, events):
url = self.configuration.server_url
Expand All @@ -59,12 +64,18 @@ def get_payload(self, events) -> bytes:
return json.dumps(payload_body).encode('utf8')

def buffer_consumer(self):
while self.is_active:
if self.is_active:
with self.storage.lock:
events = self.storage.pull(self.configuration.flush_queue_size)
if events:
self.threads_pool.submit(self.send, events)
else:
wait_time = self.storage.wait_time / 1000
if wait_time > 0:
self.storage.lock.wait(wait_time)
self.storage.lock.wait(self.configuration.flush_interval_millis / 1000)
while True:
if not self.storage.total_events:
break
events = self.storage.pull(self.configuration.flush_queue_size)
if events:
self.threads_pool.submit(self.send, events)
else:
wait_time = self.storage.wait_time / 1000
if wait_time > 0:
self.storage.lock.wait(wait_time)
with self.consumer_lock:
self.is_started = False

0 comments on commit 24f807f

Please sign in to comment.