Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 105 additions & 100 deletions src/schematic/event_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ def __init__(
self.max_events = max_events
self.max_retries = max_retries
self.initial_retry_delay = initial_retry_delay
self.flush_lock = threading.Lock()
self.push_lock = threading.Lock()
self.lock = threading.Lock() # Single lock for all buffer operations
self.shutdown = threading.Event()
self.stopped = False

Expand All @@ -42,55 +41,53 @@ def __init__(
self.flush_thread.start()

def _flush(self):
with self.flush_lock:
with self.lock:
if not self.events:
return
events_to_process = [event for event in self.events if event is not None]
self.events.clear()

events = [event for event in self.events if event is not None]

# Initialize retry counter and success flag
retry_count = 0
success = False
last_exception = None

# Try with retries and exponential backoff
while retry_count <= self.max_retries and not success:
try:
if retry_count > 0:
# Log retry attempt
self.logger.info(f"Retrying event batch submission (attempt {retry_count} of {self.max_retries})")

# Attempt to send events
self.events_api.create_event_batch(events=events)
success = True

except Exception as e:
last_exception = e
retry_count += 1

if retry_count <= self.max_retries:
# Calculate backoff with jitter
delay = self.initial_retry_delay * (2 ** (retry_count - 1))
jitter = random.uniform(0, 0.1 * delay) # 10% jitter
wait_time = delay + jitter

self.logger.warning(
f"Event batch submission failed: {e}. "
f"Retrying in {wait_time:.2f} seconds..."
)

# Wait before retry
time.sleep(wait_time)

# After all retries, if still not successful, log the error
if not success:
self.logger.error(
f"Event batch submission failed after {self.max_retries} retries: {last_exception}"
)
elif retry_count > 0:
self.logger.info(f"Event batch submission succeeded after {retry_count} retries")
if events_to_process:
self._process_events(events_to_process)

self.events.clear()
def _process_events(self, events_to_process):
"""Process events with retry logic - called without holding lock"""
retry_count = 0
success = False
last_exception = None

while retry_count <= self.max_retries and not success:
try:
if retry_count > 0:
self.logger.info(f"Retrying event batch submission (attempt {retry_count} of {self.max_retries})")

self.events_api.create_event_batch(events=events_to_process)
success = True

except Exception as e:
last_exception = e
retry_count += 1

if retry_count <= self.max_retries:
# Calculate backoff with jitter
delay = self.initial_retry_delay * (2 ** (retry_count - 1))
jitter = random.uniform(0, 0.1 * delay) # 10% jitter
wait_time = delay + jitter

self.logger.warning(
f"Event batch submission failed: {e}. "
f"Retrying in {wait_time:.2f} seconds..."
)

# Wait before retry
time.sleep(wait_time)

if not success:
self.logger.error(
f"Event batch submission failed after {self.max_retries} retries: {last_exception}"
)
elif retry_count > 0:
self.logger.info(f"Event batch submission succeeded after {retry_count} retries")

def _periodic_flush(self):
while not self.shutdown.is_set():
def push(self, event: "CreateEventRequestBody"):
    """Add one event to the buffer, flushing first if the buffer is full.

    Rejects the event (with an error log) once the buffer has been
    stopped. The capacity check and append happen under the lock; when
    the buffer is full, _flush() is invoked with the lock released so
    the flush can re-acquire it, then the event is appended.

    Args:
        event: the event payload to buffer (forward-referenced project
            type; string annotation avoids an import-time dependency).
    """
    if self.stopped:
        self.logger.error("Event buffer is stopped, not accepting new events")
        return

    should_flush = False
    with self.lock:
        if len(self.events) >= self.max_events:
            # Full: flush outside the lock, then append below.
            should_flush = True
        else:
            self.events.append(event)

    if should_flush:
        self._flush()
        with self.lock:
            self.events.append(event)

def stop(self):
try:
Expand Down Expand Up @@ -136,62 +139,58 @@ def __init__(
self.initial_retry_delay = initial_retry_delay
self.shutdown_event = asyncio.Event()
self.stopped = False
self.flush_lock = asyncio.Lock()
self.push_lock = asyncio.Lock()
self.lock = asyncio.Lock() # Single lock for all buffer operations

# Start periodic flushing task
self.flush_task = asyncio.create_task(self._periodic_flush())

async def _flush(self):
async with self.flush_lock:
async with self.lock:
if not self.events:
return
events_to_process = [event for event in self.events if event is not None]
self.events.clear()

events = [event for event in self.events if event is not None]

# Initialize retry counter and success flag
retry_count = 0
success = False
last_exception = None

# Try with retries and exponential backoff
while retry_count <= self.max_retries and not success:
try:
if retry_count > 0:
# Log retry attempt
self.logger.info(f"Retrying event batch submission (attempt {retry_count} of {self.max_retries})")

# Attempt to send events
await self.events_api.create_event_batch(events=events)
success = True

except Exception as e:
last_exception = e
retry_count += 1

if retry_count <= self.max_retries:
# Calculate backoff with jitter
delay = self.initial_retry_delay * (2 ** (retry_count - 1))
jitter = random.uniform(0, 0.1 * delay) # 10% jitter
wait_time = delay + jitter

self.logger.warning(
f"Event batch submission failed: {e}. "
f"Retrying in {wait_time:.2f} seconds..."
)

# Wait before retry (asyncio sleep)
await asyncio.sleep(wait_time)

# After all retries, if still not successful, log the error
if not success:
self.logger.error(
f"Event batch submission failed after {self.max_retries} retries: {last_exception}"
)
elif retry_count > 0:
self.logger.info(f"Event batch submission succeeded after {retry_count} retries")
if events_to_process:
await self._process_events_async(events_to_process)

self.events.clear()
async def _process_events_async(self, events_to_process):
"""Process events with retry logic - called without holding lock"""
# Initialize retry counter and success flag
retry_count = 0
success = False
last_exception = None

while retry_count <= self.max_retries and not success:
try:
if retry_count > 0:
self.logger.info(f"Retrying event batch submission (attempt {retry_count} of {self.max_retries})")

await self.events_api.create_event_batch(events=events_to_process)
success = True

except Exception as e:
last_exception = e
retry_count += 1

if retry_count <= self.max_retries:
delay = self.initial_retry_delay * (2 ** (retry_count - 1))
jitter = random.uniform(0, 0.1 * delay) # 10% jitter
wait_time = delay + jitter

self.logger.warning(
f"Event batch submission failed: {e}. "
f"Retrying in {wait_time:.2f} seconds..."
)

await asyncio.sleep(wait_time)

if not success:
self.logger.error(
f"Event batch submission failed after {self.max_retries} retries: {last_exception}"
)
elif retry_count > 0:
self.logger.info(f"Event batch submission succeeded after {retry_count} retries")

async def _periodic_flush(self):
while not self.shutdown_event.is_set():
async def push(self, event: "CreateEventRequestBody"):
    """Add one event to the buffer, flushing first if the buffer is full.

    Rejects the event (with an error log) once the buffer has been
    stopped. The capacity check and append happen under the asyncio
    lock; when the buffer is full, _flush() is awaited with the lock
    released so the flush can re-acquire it, then the event is appended.

    Args:
        event: the event payload to buffer (forward-referenced project
            type; string annotation avoids an import-time dependency).
    """
    if self.stopped:
        self.logger.error("Event buffer is stopped, not accepting new events")
        return

    should_flush = False
    async with self.lock:
        if len(self.events) >= self.max_events:
            # Full: flush outside the lock, then append below.
            should_flush = True
        else:
            self.events.append(event)

    if should_flush:
        await self._flush()
        async with self.lock:
            self.events.append(event)

async def stop(self):
try:
Expand Down
Loading