diff --git a/src/amplitude/timeline.py b/src/amplitude/timeline.py index f1c018e..66f94b6 100644 --- a/src/amplitude/timeline.py +++ b/src/amplitude/timeline.py @@ -44,7 +44,7 @@ def flush(self): destination_futures = [] for destination in self.plugins[PluginType.DESTINATION]: try: - destination_futures.append(destination.flush()) + destination_futures.extend(destination.flush()) except Exception: self.logger.exception("Error for flush events") return destination_futures diff --git a/src/amplitude/worker.py b/src/amplitude/worker.py index 0a292c7..de5a107 100644 --- a/src/amplitude/worker.py +++ b/src/amplitude/worker.py @@ -8,7 +8,6 @@ class Workers: - def __init__(self): self.threads_pool = ThreadPoolExecutor(max_workers=16) self.is_active = True @@ -37,9 +36,14 @@ def stop(self): self.threads_pool.shutdown() def flush(self): - events = self.storage.pull_all() - if events: - return self.threads_pool.submit(self.send, events) + futures = [] + with self.storage.lock: + while True: + events = self.storage.pull(self.configuration.flush_queue_size) + if not events: + break + futures.append(self.threads_pool.submit(self.send, events)) + return futures def send(self, events): url = self.configuration.server_url @@ -51,23 +55,22 @@ def send(self, events): self.configuration.logger.error("Invalid API Key") def get_payload(self, events) -> bytes: - payload_body = { - "api_key": self.configuration.api_key, - "events": [] - } + payload_body = {"api_key": self.configuration.api_key, "events": []} for event in events: event_body = event.get_event_body() if event_body: payload_body["events"].append(event_body) if self.configuration.options: payload_body["options"] = self.configuration.options - return json.dumps(payload_body, sort_keys=True).encode('utf8') + return json.dumps(payload_body, sort_keys=True).encode("utf8") def buffer_consumer(self): try: if self.is_active: with self.storage.lock: - self.storage.lock.wait(self.configuration.flush_interval_millis / 1000) + self.storage.lock.wait( + self.configuration.flush_interval_millis / 1000 + ) while True: if not self.storage.total_events: break