Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 52 additions & 7 deletions cli/src/etos_client/sse/v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,33 @@
)


class LogRetry(Retry):
    """Retry policy that reports progress while a connection is being retried.

    Each call to :meth:`increment` bumps an attempt counter and copies it onto
    the retry object handed back by the parent class, so the count is carried
    along the chain of retry instances.
    """

    def __init__(self, *args, **kwargs):
        """Create the logger and a zeroed attempt counter, then defer to ``Retry``."""
        self.logger = logging.getLogger(__name__)
        self._attempt_count = 0
        super().__init__(*args, **kwargs)

    def increment(
        self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None
    ):
        """Count the attempt, delegate to ``Retry.increment`` and log progress.

        The arguments are forwarded positionally and unchanged to the parent
        implementation. Anything the parent raises propagates before any
        logging takes place.

        :return: The retry object produced by the parent ``increment``.
        """
        self._attempt_count += 1
        successor = super().increment(method, url, response, error, _pool, _stacktrace)
        attempts = self._attempt_count

        # Hand the running count over to the retry object used for the next attempt.
        if hasattr(successor, "_attempt_count"):
            successor._attempt_count = attempts

        # Only 404 responses (test run not ready yet) are worth reporting.
        if response is None or response.status != 404:
            return successor

        # Report on every fourth attempt, up to and including the twentieth.
        if attempts in (4, 8, 12, 16, 20):
            self.logger.info("Still waiting for test run to start...")
        return successor


class Desynced(Exception):
"""The event stream has desynced."""

Expand Down Expand Up @@ -77,8 +104,29 @@ def version(cls) -> str:
"""SSE protocol version."""
return "v1"

def __connect(self, stream_id: str, retry_not_found=False) -> Iterable[bytes]:
"""Connect to an event-stream server, retrying if necessary.
def __connect(self, stream_id: str, is_initial_connection=False) -> Iterable[bytes]:
    """Pick a retry policy and connect to the event stream.

    For reconnections the module-level ``RETRIES`` policy is used as is. For
    the initial connection a fresh ``LogRetry`` is built that also retries on
    404 and allows up to 20 connect and 20 status retries with backoff.

    :param stream_id: Identifier of the stream, passed on to ``__do_connect``.
    :param is_initial_connection: Select the extended, logging retry policy.
    :return: Whatever ``__do_connect`` returns for the chosen policy.
    """
    if not is_initial_connection:
        # Standard retries for reconnections
        return self.__do_connect(stream_id, RETRIES)

    # Extended retry policy for the initial connection.
    initial_policy = LogRetry(
        total=None,
        read=0,
        connect=20,  # More attempts for initial connection
        status=20,
        backoff_factor=2,  # Exponential backoff
        backoff_max=120,  # Cap at 2 minutes
        status_forcelist={413, 429, 503, 404},  # Include 404 for initial connection
    )
    self.logger.info("Connecting to SSE server")
    self.logger.info("Waiting for test run to start...")
    return self.__do_connect(stream_id, initial_policy)

def __do_connect(self, stream_id: str, retries: Retry) -> Iterable[bytes]:
"""Connect to an event-stream server with the given retry policy.

Sets the attribute `__release` which must be closed before exiting.
"""
Expand All @@ -88,12 +136,9 @@ def __connect(self, stream_id: str, retry_not_found=False) -> Iterable[bytes]:
}
if self.last_event_id is not None:
headers["Last-Event-ID"] = str(self.last_event_id)

try:
retries = RETRIES
if retry_not_found:
retries = retries.new(status_forcelist={413, 429, 503, 404})
url = f"{self.url}/sse/{self.version()}/events/{stream_id}"
self.logger.info("Connecting to SSE server at %s", url)
response = self.__pool.request(
"GET",
url,
Expand Down Expand Up @@ -195,7 +240,7 @@ def event_stream(self, stream_id: str) -> Iterable[Event]:
stream = []
while not self.__shutdown:
while self.__connected is False:
stream = self.__connect(stream_id, retry_not_found=not bool(stream))
stream = self.__connect(stream_id, is_initial_connection=not bool(stream))
time.sleep(1)
if not stream:
self.logger.warning("Failed connecting to stream. Reconnecting")
Expand Down
71 changes: 61 additions & 10 deletions cli/src/etos_client/sse/v2alpha/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@
from urllib3.util import Retry

#from etos_lib.messaging.events import Shutdown, Event, parse # import disabled due to: https://github.com/eiffel-community/etos/issues/417
# dummy class: remove when the etos_lib.messaging module is available
# dummy classes: remove when the etos_lib.messaging module is available
class Event:
    """Empty stand-in for the ``Event`` name from ``etos_lib.messaging.events``."""

class Shutdown:
    """Empty stand-in for the ``Shutdown`` name from ``etos_lib.messaging.events``."""

def parse(event):
    """Stand-in for ``etos_lib.messaging.events.parse``; ignores ``event``."""
    return None


CHUNK_SIZE = 500
RETRIES = Retry(
Expand All @@ -41,6 +47,33 @@ class Event:
)


class LogRetry(Retry):
    """Retry policy that reports progress while a connection is being retried.

    Each call to :meth:`increment` bumps an attempt counter and copies it onto
    the retry object handed back by the parent class, so the count is carried
    along the chain of retry instances.
    """

    def __init__(self, *args, **kwargs):
        """Create the logger and a zeroed attempt counter, then defer to ``Retry``."""
        self.logger = logging.getLogger(__name__)
        self._attempt_count = 0
        super().__init__(*args, **kwargs)

    def increment(
        self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None
    ):
        """Count the attempt, delegate to ``Retry.increment`` and log progress.

        The arguments are forwarded positionally and unchanged to the parent
        implementation. Anything the parent raises propagates before any
        logging takes place.

        :return: The retry object produced by the parent ``increment``.
        """
        self._attempt_count += 1
        successor = super().increment(method, url, response, error, _pool, _stacktrace)
        attempts = self._attempt_count

        # Hand the running count over to the retry object used for the next attempt.
        if hasattr(successor, "_attempt_count"):
            successor._attempt_count = attempts

        # Only 404 responses (test run not ready yet) are worth reporting.
        if response is None or response.status != 404:
            return successor

        # Report on every fourth attempt, up to and including the twentieth.
        if attempts in (4, 8, 12, 16, 20):
            self.logger.info("Still waiting for test run to start...")
        return successor


class Desynced(Exception):
"""The event stream has desynced."""

Expand Down Expand Up @@ -85,8 +118,29 @@ def version(cls) -> str:
"""SSE protocol version."""
return "v2alpha"

def __connect(self, stream_id: str, apikey: str, retry_not_found=False) -> Iterable[bytes]:
"""Connect to an event-stream server, retrying if necessary.
def __connect(self, stream_id: str, apikey: str, is_initial_connection=False) -> Iterable[bytes]:
    """Pick a retry policy and connect to the event stream.

    For reconnections the module-level ``RETRIES`` policy is used as is. For
    the initial connection a fresh ``LogRetry`` is built that also retries on
    404 and allows up to 20 connect and 20 status retries with backoff.

    :param stream_id: Identifier of the stream, passed on to ``__do_connect``.
    :param apikey: API key, passed on to ``__do_connect`` unchanged.
    :param is_initial_connection: Select the extended, logging retry policy.
    :return: Whatever ``__do_connect`` returns for the chosen policy.
    """
    if not is_initial_connection:
        # Standard retries for reconnections
        return self.__do_connect(stream_id, apikey, RETRIES)

    # Extended retry policy for the initial connection.
    initial_policy = LogRetry(
        total=None,
        read=0,
        connect=20,  # More attempts for initial connection
        status=20,
        backoff_factor=2,  # Exponential backoff
        backoff_max=120,  # Cap at 2 minutes
        status_forcelist={413, 429, 503, 404},  # Include 404 for initial connection
    )
    self.logger.info("Connecting to SSE server")
    self.logger.info("Waiting for test run to start...")
    return self.__do_connect(stream_id, apikey, initial_policy)

def __do_connect(self, stream_id: str, apikey: str, retries: Retry) -> Iterable[bytes]:
"""Connect to an event-stream server with the given retry policy.

Sets the attribute `__release` which must be closed before exiting.
"""
Expand All @@ -97,13 +151,10 @@ def __connect(self, stream_id: str, apikey: str, retry_not_found=False) -> Itera
}
if self.last_event_id is not None:
headers["Last-Event-ID"] = str(self.last_event_id)

try:
retries = RETRIES
if retry_not_found:
retries = retries.new(status_forcelist={413, 429, 503, 404})
filter = "&".join(f"filter={value}" for value in self.filter)
url = f"{self.url}/sse/{self.version()}/events/{stream_id}?{filter}"
self.logger.info("Connecting to SSE server at %s", url)
filter_param = "&".join(f"filter={value}" for value in self.filter)
url = f"{self.url}/sse/{self.version()}/events/{stream_id}?{filter_param}"
response = self.__pool.request(
"GET",
url,
Expand Down Expand Up @@ -221,7 +272,7 @@ def event_stream(self, stream_id: str, apikey: str) -> Iterable[Event]:
stream = []
while not self.__shutdown:
while self.__connected is False:
stream = self.__connect(stream_id, apikey, retry_not_found=not bool(stream))
stream = self.__connect(stream_id, apikey, is_initial_connection=not bool(stream))
time.sleep(1)
if not stream:
self.logger.warning("Failed connecting to stream. Reconnecting")
Expand Down
Loading