diff --git a/src/schematic/event_buffer.py b/src/schematic/event_buffer.py
index 475a3fe..e5a4bfc 100644
--- a/src/schematic/event_buffer.py
+++ b/src/schematic/event_buffer.py
@@ -31,8 +31,7 @@ def __init__(
         self.max_events = max_events
         self.max_retries = max_retries
         self.initial_retry_delay = initial_retry_delay
-        self.flush_lock = threading.Lock()
-        self.push_lock = threading.Lock()
+        self.lock = threading.Lock()  # Single lock for all buffer operations
         self.shutdown = threading.Event()
         self.stopped = False
 
@@ -42,55 +41,53 @@ def __init__(
         self.flush_thread.start()
 
     def _flush(self):
-        with self.flush_lock:
+        with self.lock:
             if not self.events:
                 return
 
+            events_to_process = [event for event in self.events if event is not None]
+            self.events.clear()
-            events = [event for event in self.events if event is not None]
-
-            # Initialize retry counter and success flag
-            retry_count = 0
-            success = False
-            last_exception = None
-
-            # Try with retries and exponential backoff
-            while retry_count <= self.max_retries and not success:
-                try:
-                    if retry_count > 0:
-                        # Log retry attempt
-                        self.logger.info(f"Retrying event batch submission (attempt {retry_count} of {self.max_retries})")
-
-                    # Attempt to send events
-                    self.events_api.create_event_batch(events=events)
-                    success = True
-
-                except Exception as e:
-                    last_exception = e
-                    retry_count += 1
-
-                    if retry_count <= self.max_retries:
-                        # Calculate backoff with jitter
-                        delay = self.initial_retry_delay * (2 ** (retry_count - 1))
-                        jitter = random.uniform(0, 0.1 * delay)  # 10% jitter
-                        wait_time = delay + jitter
-
-                        self.logger.warning(
-                            f"Event batch submission failed: {e}. "
-                            f"Retrying in {wait_time:.2f} seconds..."
-                        )
-
-                        # Wait before retry
-                        time.sleep(wait_time)
-
-            # After all retries, if still not successful, log the error
-            if not success:
-                self.logger.error(
-                    f"Event batch submission failed after {self.max_retries} retries: {last_exception}"
-                )
-            elif retry_count > 0:
-                self.logger.info(f"Event batch submission succeeded after {retry_count} retries")
+            if events_to_process:
+                self._process_events(events_to_process)
 
-        self.events.clear()
+    def _process_events(self, events_to_process):
+        """Process events with retry logic - called without holding lock"""
+        retry_count = 0
+        success = False
+        last_exception = None
+
+        while retry_count <= self.max_retries and not success:
+            try:
+                if retry_count > 0:
+                    self.logger.info(f"Retrying event batch submission (attempt {retry_count} of {self.max_retries})")
+
+                self.events_api.create_event_batch(events=events_to_process)
+                success = True
+
+            except Exception as e:
+                last_exception = e
+                retry_count += 1
+
+                if retry_count <= self.max_retries:
+                    # Calculate backoff with jitter
+                    delay = self.initial_retry_delay * (2 ** (retry_count - 1))
+                    jitter = random.uniform(0, 0.1 * delay)  # 10% jitter
+                    wait_time = delay + jitter
+
+                    self.logger.warning(
+                        f"Event batch submission failed: {e}. "
+                        f"Retrying in {wait_time:.2f} seconds..."
+                    )
+
+                    # Wait before retry
+                    time.sleep(wait_time)
+
+        if not success:
+            self.logger.error(
+                f"Event batch submission failed after {self.max_retries} retries: {last_exception}"
+            )
+        elif retry_count > 0:
+            self.logger.info(f"Event batch submission succeeded after {retry_count} retries")
 
     def _periodic_flush(self):
         while not self.shutdown.is_set():
@@ -102,11 +99,17 @@ def push(self, event: CreateEventRequestBody):
             self.logger.error("Event buffer is stopped, not accepting new events")
             return
 
-        with self.push_lock:
+        should_flush = False
+        with self.lock:
             if len(self.events) >= self.max_events:
-                self._flush()
+                should_flush = True
+            else:
+                self.events.append(event)
 
-            self.events.append(event)
+        if should_flush:
+            self._flush()
+            with self.lock:
+                self.events.append(event)
 
     def stop(self):
         try:
@@ -136,62 +139,58 @@ def __init__(
         self.initial_retry_delay = initial_retry_delay
        self.shutdown_event = asyncio.Event()
        self.stopped = False
-        self.flush_lock = asyncio.Lock()
-        self.push_lock = asyncio.Lock()
+        self.lock = asyncio.Lock()  # Single lock for all buffer operations
 
         # Start periodic flushing task
         self.flush_task = asyncio.create_task(self._periodic_flush())
 
     async def _flush(self):
-        async with self.flush_lock:
+        async with self.lock:
             if not self.events:
                 return
 
+            events_to_process = [event for event in self.events if event is not None]
+            self.events.clear()
-            events = [event for event in self.events if event is not None]
-
-            # Initialize retry counter and success flag
-            retry_count = 0
-            success = False
-            last_exception = None
-
-            # Try with retries and exponential backoff
-            while retry_count <= self.max_retries and not success:
-                try:
-                    if retry_count > 0:
-                        # Log retry attempt
-                        self.logger.info(f"Retrying event batch submission (attempt {retry_count} of {self.max_retries})")
-
-                    # Attempt to send events
-                    await self.events_api.create_event_batch(events=events)
-                    success = True
-
-                except Exception as e:
-                    last_exception = e
-                    retry_count += 1
-
-                    if retry_count <= self.max_retries:
-                        # Calculate backoff with jitter
-                        delay = self.initial_retry_delay * (2 ** (retry_count - 1))
-                        jitter = random.uniform(0, 0.1 * delay)  # 10% jitter
-                        wait_time = delay + jitter
-
-                        self.logger.warning(
-                            f"Event batch submission failed: {e}. "
-                            f"Retrying in {wait_time:.2f} seconds..."
-                        )
-
-                        # Wait before retry (asyncio sleep)
-                        await asyncio.sleep(wait_time)
-
-            # After all retries, if still not successful, log the error
-            if not success:
-                self.logger.error(
-                    f"Event batch submission failed after {self.max_retries} retries: {last_exception}"
-                )
-            elif retry_count > 0:
-                self.logger.info(f"Event batch submission succeeded after {retry_count} retries")
+            if events_to_process:
+                await self._process_events_async(events_to_process)
 
-        self.events.clear()
+    async def _process_events_async(self, events_to_process):
+        """Process events with retry logic - called without holding lock"""
+        # Initialize retry counter and success flag
+        retry_count = 0
+        success = False
+        last_exception = None
+
+        while retry_count <= self.max_retries and not success:
+            try:
+                if retry_count > 0:
+                    self.logger.info(f"Retrying event batch submission (attempt {retry_count} of {self.max_retries})")
+
+                await self.events_api.create_event_batch(events=events_to_process)
+                success = True
+
+            except Exception as e:
+                last_exception = e
+                retry_count += 1
+
+                if retry_count <= self.max_retries:
+                    delay = self.initial_retry_delay * (2 ** (retry_count - 1))
+                    jitter = random.uniform(0, 0.1 * delay)  # 10% jitter
+                    wait_time = delay + jitter
+
+                    self.logger.warning(
+                        f"Event batch submission failed: {e}. "
+                        f"Retrying in {wait_time:.2f} seconds..."
+                    )
+
+                    await asyncio.sleep(wait_time)
+
+        if not success:
+            self.logger.error(
+                f"Event batch submission failed after {self.max_retries} retries: {last_exception}"
+            )
+        elif retry_count > 0:
+            self.logger.info(f"Event batch submission succeeded after {retry_count} retries")
 
     async def _periodic_flush(self):
         while not self.shutdown_event.is_set():
@@ -208,11 +207,17 @@ async def push(self, event: CreateEventRequestBody):
             self.logger.error("Event buffer is stopped, not accepting new events")
             return
 
-        async with self.push_lock:
+        should_flush = False
+        async with self.lock:
             if len(self.events) >= self.max_events:
-                await self._flush()
+                should_flush = True
+            else:
+                self.events.append(event)
 
-            self.events.append(event)
+        if should_flush:
+            await self._flush()
+            async with self.lock:
+                self.events.append(event)
 
     async def stop(self):
         try: