diff --git a/cli/src/etos_client/sse/v1/client.py b/cli/src/etos_client/sse/v1/client.py index b9f9c777..0cc82ebf 100644 --- a/cli/src/etos_client/sse/v1/client.py +++ b/cli/src/etos_client/sse/v1/client.py @@ -38,6 +38,33 @@ ) +class LogRetry(Retry): + """A Retry class that logs progress during initial connection attempts.""" + + def __init__(self, *args, **kwargs): + self.logger = logging.getLogger(__name__) + self._attempt_count = 0 + super().__init__(*args, **kwargs) + + def increment(self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None): + """Override increment to add logging for initial connection attempts.""" + self._attempt_count += 1 + new_retry = super().increment(method, url, response, error, _pool, _stacktrace) + + # Copy counter to new retry instance + if hasattr(new_retry, '_attempt_count'): + new_retry._attempt_count = self._attempt_count + + # Log progress for 404 responses (test run not ready) at specific intervals + is_404_response = response is not None and response.status == 404 + is_logging_interval = self._attempt_count in [4, 8, 12, 16, 20] + + if is_404_response and is_logging_interval: + self.logger.info("Still waiting for test run to start...") + + return new_retry + + class Desynced(Exception): """The event stream has desynced.""" @@ -77,8 +104,29 @@ def version(cls) -> str: """SSE protocol version.""" return "v1" - def __connect(self, stream_id: str, retry_not_found=False) -> Iterable[bytes]: - """Connect to an event-stream server, retrying if necessary. + def __connect(self, stream_id: str, is_initial_connection=False) -> Iterable[bytes]: + """Handle connection for reconnections.""" + if is_initial_connection: + # Use LogRetry with extended retries for initial connection + retries = LogRetry( + total=None, + read=0, + connect=20, # More attempts for initial connection + status=20, + backoff_factor=2, # Exponential backoff + backoff_max=120, # Cap at 2 minutes + status_forcelist={413, 429, 503, 404}, # Include 404 for initial connection + ) + self.logger.info("Connecting to SSE server") + self.logger.info("Waiting for test run to start...") + else: + # Standard retries for reconnections + retries = RETRIES + + return self.__do_connect(stream_id, retries) + + def __do_connect(self, stream_id: str, retries: Retry) -> Iterable[bytes]: + """Connect to an event-stream server with the given retry policy. Sets the attribute `__release` which must be closed before exiting. """ @@ -88,12 +136,9 @@ def __connect(self, stream_id: str, retry_not_found=False) -> Iterable[bytes]: } if self.last_event_id is not None: headers["Last-Event-ID"] = str(self.last_event_id) + try: - retries = RETRIES - if retry_not_found: - retries = retries.new(status_forcelist={413, 429, 503, 404}) url = f"{self.url}/sse/{self.version()}/events/{stream_id}" - self.logger.info("Connecting to SSE server at %s", url) response = self.__pool.request( "GET", url, @@ -195,7 +240,7 @@ def event_stream(self, stream_id: str) -> Iterable[Event]: stream = [] while not self.__shutdown: while self.__connected is False: - stream = self.__connect(stream_id, retry_not_found=not bool(stream)) + stream = self.__connect(stream_id, is_initial_connection=not bool(stream)) time.sleep(1) if not stream: self.logger.warning("Failed connecting to stream. Reconnecting") diff --git a/cli/src/etos_client/sse/v2alpha/client.py b/cli/src/etos_client/sse/v2alpha/client.py index b5f4d5da..3496fa84 100644 --- a/cli/src/etos_client/sse/v2alpha/client.py +++ b/cli/src/etos_client/sse/v2alpha/client.py @@ -25,10 +25,16 @@ from urllib3.util import Retry #from etos_lib.messaging.events import Shutdown, Event, parse # import disabled due to: https://github.com/eiffel-community/etos/issues/417 -# dummy class: remove when the etos_lib.messaging module is available +# dummy classes: remove when the etos_lib.messaging module is available class Event: pass +class Shutdown: + pass + +def parse(event): + pass + CHUNK_SIZE = 500 RETRIES = Retry( @@ -41,6 +47,33 @@ class Event: ) +class LogRetry(Retry): + """A Retry class that logs progress during initial connection attempts.""" + + def __init__(self, *args, **kwargs): + self.logger = logging.getLogger(__name__) + self._attempt_count = 0 + super().__init__(*args, **kwargs) + + def increment(self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None): + """Override increment to add logging for initial connection attempts.""" + self._attempt_count += 1 + new_retry = super().increment(method, url, response, error, _pool, _stacktrace) + + # Copy counter to new retry instance + if hasattr(new_retry, '_attempt_count'): + new_retry._attempt_count = self._attempt_count + + # Log progress for 404 responses (test run not ready) at specific intervals + is_404_response = response is not None and response.status == 404 + is_logging_interval = self._attempt_count in [4, 8, 12, 16, 20] + + if is_404_response and is_logging_interval: + self.logger.info("Still waiting for test run to start...") + + return new_retry + + class Desynced(Exception): """The event stream has desynced.""" @@ -85,8 +118,29 @@ def version(cls) -> str: """SSE protocol version.""" return "v2alpha" - def __connect(self, stream_id: str, apikey: str, retry_not_found=False) -> Iterable[bytes]: - """Connect to an event-stream server, retrying if necessary. + def __connect(self, stream_id: str, apikey: str, is_initial_connection=False) -> Iterable[bytes]: + """Handle connection for reconnections.""" + if is_initial_connection: + # Use LogRetry with extended retries for initial connection + retries = LogRetry( + total=None, + read=0, + connect=20, # More attempts for initial connection + status=20, + backoff_factor=2, # Exponential backoff + backoff_max=120, # Cap at 2 minutes + status_forcelist={413, 429, 503, 404}, # Include 404 for initial connection + ) + self.logger.info("Connecting to SSE server") + self.logger.info("Waiting for test run to start...") + else: + # Standard retries for reconnections + retries = RETRIES + + return self.__do_connect(stream_id, apikey, retries) + + def __do_connect(self, stream_id: str, apikey: str, retries: Retry) -> Iterable[bytes]: + """Connect to an event-stream server with the given retry policy. Sets the attribute `__release` which must be closed before exiting. """ @@ -97,13 +151,10 @@ def __connect(self, stream_id: str, apikey: str, retry_not_found=False) -> Itera } if self.last_event_id is not None: headers["Last-Event-ID"] = str(self.last_event_id) + try: - retries = RETRIES - if retry_not_found: - retries = retries.new(status_forcelist={413, 429, 503, 404}) - filter = "&".join(f"filter={value}" for value in self.filter) - url = f"{self.url}/sse/{self.version()}/events/{stream_id}?{filter}" - self.logger.info("Connecting to SSE server at %s", url) + filter_param = "&".join(f"filter={value}" for value in self.filter) + url = f"{self.url}/sse/{self.version()}/events/{stream_id}?{filter_param}" response = self.__pool.request( "GET", url, @@ -221,7 +272,7 @@ def event_stream(self, stream_id: str, apikey: str) -> Iterable[Event]: stream = [] while not self.__shutdown: while self.__connected is False: - stream = self.__connect(stream_id, apikey, retry_not_found=not bool(stream)) + stream = self.__connect(stream_id, apikey, is_initial_connection=not bool(stream)) time.sleep(1) if not stream: self.logger.warning("Failed connecting to stream. Reconnecting")