diff --git a/keepercommander/commands/pam_launch/connect_timing.py b/keepercommander/commands/pam_launch/connect_timing.py index 096dda5a8..f7fed5d6c 100644 --- a/keepercommander/commands/pam_launch/connect_timing.py +++ b/keepercommander/commands/pam_launch/connect_timing.py @@ -30,6 +30,110 @@ def connect_timing_log_enabled() -> bool: return _LOG.isEnabledFor(logging.DEBUG) +# --- Connect-phase delay helpers ------------------------------------------- +# +# The pam launch flow has several historically fixed sleeps whose durations +# only matter in the first-launch / slow-network case. Defaults here are tuned +# for the fast path; each env var below lets operators restore the legacy +# (conservative) values without a code roll. + +_WEBSOCKET_BACKEND_DELAY_ENV = 'WEBSOCKET_BACKEND_DELAY' +_WEBSOCKET_BACKEND_DELAY_FAST_DEFAULT = 0.30 # seconds — fast path default +_WEBSOCKET_BACKEND_DELAY_LEGACY_ENV = 'WEBSOCKET_BACKEND_DELAY_LEGACY' +_WEBSOCKET_BACKEND_DELAY_LEGACY_DEFAULT = 2.0 # seconds — adaptive fallback cap + +_PAM_PRE_OFFER_SEC_ENV = 'PAM_PRE_OFFER_SEC' +_PAM_PRE_OFFER_FAST_DEFAULT = 0.0 # seconds — merged into backend_delay +_PAM_PRE_OFFER_LEGACY_ENV = 'PAM_PRE_OFFER_LEGACY' # 1/true/yes → force legacy 1.0s + +_PAM_OFFER_RETRY_EXTRA_SEC_ENV = 'PAM_OFFER_RETRY_EXTRA_SEC' +_PAM_OFFER_RETRY_EXTRA_DEFAULT = 1.25 # seconds — retry backoff + +_PAM_OPEN_CONNECTION_DELAY_ENV = 'PAM_OPEN_CONNECTION_DELAY' +_PAM_OPEN_CONNECTION_DELAY_FAST_DEFAULT = 0.05 # seconds — safety margin + # (retry loop handles slow DataChannel) + +_PAM_WEBRTC_POLL_MS_ENV = 'PAM_WEBRTC_POLL_MS' +_PAM_WEBRTC_POLL_MS_DEFAULT = 25 # milliseconds — poll granularity + + +def _env_float(name: str, default: float) -> float: + """Read a float env var; return ``default`` when unset, empty, or unparseable.""" + raw = os.environ.get(name) + if raw is None: + return default + raw = str(raw).strip() + if raw == '': + return default + try: + return float(raw) + except (TypeError, ValueError): + return default + + +def _env_truthy(name: str) -> bool: + return os.environ.get(name, '').strip().lower() in ('1', 'true', 'yes', 'on') + + +def websocket_backend_delay_sec() -> float: + """Sleep after WebSocket connects and before POSTing the offer (router/backend + registration window). + + Set ``WEBSOCKET_BACKEND_DELAY`` to override. Default is 0.30s for the fast + path; the legacy value was 2.0s. Combined with the retry path, a single + unlucky launch still caps at the legacy total (see + ``websocket_backend_delay_legacy_sec``). + """ + return _env_float(_WEBSOCKET_BACKEND_DELAY_ENV, _WEBSOCKET_BACKEND_DELAY_FAST_DEFAULT) + + +def websocket_backend_delay_legacy_sec() -> float: + """Upper bound for the adaptive backend-delay catch-up on a first-attempt + offer failure. On retry the code sleeps up to + ``max(0, legacy - fast_default)`` more so the cumulative wait matches the + pre-change 2.0s behavior for the unlucky cold-router case. + """ + return _env_float(_WEBSOCKET_BACKEND_DELAY_LEGACY_ENV, _WEBSOCKET_BACKEND_DELAY_LEGACY_DEFAULT) + + +def pre_offer_delay_sec() -> float: + """Extra sleep between the backend-delay wait and the offer HTTP POST. + + Default 0.0 (the previous hardcoded 1.0s sleep was redundant — the + backend-delay wait already serves the same purpose). Set + ``PAM_PRE_OFFER_LEGACY=1`` to force the legacy 1.0s, or ``PAM_PRE_OFFER_SEC`` + for a custom value. + """ + if _env_truthy(_PAM_PRE_OFFER_LEGACY_ENV): + return max(1.0, _env_float(_PAM_PRE_OFFER_SEC_ENV, 1.0)) + return _env_float(_PAM_PRE_OFFER_SEC_ENV, _PAM_PRE_OFFER_FAST_DEFAULT) + + +def offer_retry_extra_delay_sec() -> float: + """Base delay before a retry of the gateway offer HTTP POST.""" + return _env_float(_PAM_OFFER_RETRY_EXTRA_SEC_ENV, _PAM_OFFER_RETRY_EXTRA_DEFAULT) + + +def open_connection_delay_sec() -> float: + """Sleep between ``webrtc_data_plane_connected`` and sending ``OpenConnection``. + + Historically 0.2s; reduced to 0.05s because the caller's retry loop with + exponential backoff already handles the "DataChannel not yet open" case. + Set ``PAM_OPEN_CONNECTION_DELAY`` to restore a larger safety margin. + """ + return _env_float(_PAM_OPEN_CONNECTION_DELAY_ENV, _PAM_OPEN_CONNECTION_DELAY_FAST_DEFAULT) + + +def webrtc_connection_poll_sec() -> float: + """Poll tick (seconds) for the ``tube_registry.get_connection_state`` loop + that waits for the WebRTC data plane to reach ``connected``. + + Default 25ms (previously 100ms). Set ``PAM_WEBRTC_POLL_MS`` to override. + """ + ms = _env_float(_PAM_WEBRTC_POLL_MS_ENV, _PAM_WEBRTC_POLL_MS_DEFAULT) + return max(0.001, ms / 1000.0) + + class PamConnectTiming: """Monotonic checkpoints for ``pam launch`` / tunnel open (debug or PAM_CONNECT_TIMING=1). diff --git a/keepercommander/commands/pam_launch/launch.py b/keepercommander/commands/pam_launch/launch.py index e861948ec..fbcd5e762 100644 --- a/keepercommander/commands/pam_launch/launch.py +++ b/keepercommander/commands/pam_launch/launch.py @@ -33,7 +33,11 @@ _version_at_least, _pam_settings_connection_port, ) -from .connect_timing import PamConnectTiming +from .connect_timing import ( + PamConnectTiming, + open_connection_delay_sec, + webrtc_connection_poll_sec, +) from .terminal_size import get_terminal_size_pixels, is_interactive_tty, PIXEL_MODE_GUACD, scale_screen_info from .terminal_reset import reset_local_terminal_after_pam_session from .crlf_merge_delay import ( @@ -1150,11 +1154,14 @@ def signal_handler_fn(signum, frame): python_handler.start() _cli_tc.checkpoint('python_handler_start_done') - # Wait for WebRTC connection to be established + # Wait for WebRTC connection to be established. + # Poll tick defaults to 25ms (was 100ms) — cheap FFI call, + # tightens P99 handoff latency. Set PAM_WEBRTC_POLL_MS to override. logging.debug("Waiting for WebRTC connection...") max_wait = 15 start_time = time.time() connected = False + poll_tick = webrtc_connection_poll_sec() while time.time() - start_time < max_wait: try: @@ -1165,7 +1172,7 @@ def signal_handler_fn(signum, frame): break except Exception as e: logging.debug(f"Checking connection state: {e}") - time.sleep(0.1) + time.sleep(poll_tick) if not connected: raise CommandError('pam launch', "WebRTC connection not established within timeout") @@ -1174,9 +1181,12 @@ def signal_handler_fn(signum, frame): # Wait for DataChannel to be ready and Gateway to wire the session. # connection state "connected" can precede DataChannel readiness; Gateway also needs # time to associate the WebRTC connection with the channel and prepare guacd. - # Configurable via PAM_OPEN_CONNECTION_DELAY (default 0.2s; use 2.0 if handshake never starts). - open_conn_delay = float(os.environ.get('PAM_OPEN_CONNECTION_DELAY', '0.2')) - time.sleep(open_conn_delay) + # Default 0.05s — a small safety margin on top of the open_handler_connection + # retry loop below (exponential backoff already handles slow DataChannel). + # Set PAM_OPEN_CONNECTION_DELAY=2.0 to restore the legacy safety wait. + open_conn_delay = open_connection_delay_sec() + if open_conn_delay > 0: + time.sleep(open_conn_delay) _cli_tc.checkpoint('open_connection_delay_done') # Send OpenConnection to Gateway to initiate guacd session diff --git a/keepercommander/commands/pam_launch/terminal_connection.py b/keepercommander/commands/pam_launch/terminal_connection.py index 1c840739c..530215014 100644 --- a/keepercommander/commands/pam_launch/terminal_connection.py +++ b/keepercommander/commands/pam_launch/terminal_connection.py @@ -75,7 +75,13 @@ from ...params import KeeperParams from ..pam_import.base import ConnectionProtocol -from .connect_timing import PamConnectTiming +from .connect_timing import ( + PamConnectTiming, + websocket_backend_delay_sec, + websocket_backend_delay_legacy_sec, + pre_offer_delay_sec, + offer_retry_extra_delay_sec, +) # Sentinel for "dag_linked_uid not resolved yet" — ``None`` is a valid resolved # result (no DAG-linked launch credential), so we need a distinct marker. @@ -1387,6 +1393,22 @@ def _open_terminal_webrtc_tunnel(params: KeeperParams, # Store signal handler reference tunnel_session.signal_handler = signal_handler # type: ignore[assignment] + # Start the dedicated WebSocket listener *before* ``create_tube``. The Rust + # tube creation takes ~500ms; running the WebSocket TLS handshake / router + # registration concurrently with it saves most of that window. The listener + # only reads the ``conversation_id`` from tunnel_session; the tube_id is + # used for the thread name and log context only (updated in-place after + # ``create_tube`` returns). No message will arrive before the gateway has + # received our offer, so there is no race between early listener start and + # the tube-id being rewritten from the temp UUID to the real one. + websocket_thread = start_websocket_listener( + params, tube_registry, timeout=300, gateway_uid=gateway_uid, + tunnel_session=tunnel_session, + router_tokens=router_tokens, + cookie_header=cookie_header, + ) + _pam_tc.checkpoint('websocket_listener_started_early') + logging.debug(f"{bcolors.OKBLUE}Creating WebRTC offer for {protocol} connection...{bcolors.ENDC}") if trickle_ice: logging.debug("Using trickle ICE for real-time candidate exchange") @@ -1520,20 +1542,15 @@ def _open_terminal_webrtc_tunnel(params: KeeperParams, logging.debug(f"Registered encryption key for conversation: {conversation_id}") logging.debug(f"Expecting WebSocket responses for conversation ID: {conversation_id}") - # Start WebSocket listener (pass cookie_header for ALB stickiness when trickle ICE) - websocket_thread = start_websocket_listener( - params, tube_registry, timeout=300, gateway_uid=gateway_uid, - tunnel_session=tunnel_session, - router_tokens=router_tokens, - cookie_header=cookie_header - ) - _pam_tc.checkpoint('websocket_listener_started') + # (WebSocket listener already started above, before create_tube.) # Wait for WebSocket to be ready before sending offer (same as pam tunnel start). # Use event.wait() when available so we proceed as soon as ready; fallback to short sleep. max_wait = 15.0 - # Same backend registration delay as when event is present (router/gateway need time to register) - backend_delay = float(os.environ.get('WEBSOCKET_BACKEND_DELAY', '2.0')) + # Router/gateway need a moment to register the conversation after the + # WebSocket handshake. Default 0.30s; on first-offer failure we top up + # with the delta to the legacy 2.0s before retrying (adaptive fallback). + backend_delay = websocket_backend_delay_sec() if tunnel_session.websocket_ready_event: logging.debug(f"Waiting for dedicated WebSocket to connect (max {max_wait}s)...") websocket_ready = tunnel_session.websocket_ready_event.wait(timeout=max_wait) @@ -1712,8 +1729,13 @@ def _open_terminal_webrtc_tunnel(params: KeeperParams, else: logging.debug(f"No linked pamUser for record {record_uid} - using pamMachine credentials directly") + # Formerly a fixed ``time.sleep(1)`` — now 0.0 by default because the + # preceding backend_delay already covers router registration. Set + # PAM_PRE_OFFER_LEGACY=1 (or PAM_PRE_OFFER_SEC=) to restore. + _pre_offer = pre_offer_delay_sec() _pam_tc.checkpoint('pre_offer_sleep_start') - time.sleep(1) # Allow time for WebSocket listener to start + if _pre_offer > 0: + time.sleep(_pre_offer) _pam_tc.checkpoint('pre_offer_sleep_done') # Send offer via HTTP POST - two paths: streaming vs non-streaming @@ -1752,6 +1774,83 @@ def _open_terminal_webrtc_tunnel(params: KeeperParams, message_id = GatewayAction.conversation_id_to_message_id(conversation_id_original) logging.debug(f"Generated messageId: {message_id} from conversationId: {conversation_id_original}") + # --- Gateway offer POST with retry + adaptive backend-delay fallback --- + # On a first-attempt failure that looks like a transient backend-not-ready + # condition (timeout, 502/503/504, controller_down, RRC timeout), sleep + # the retry base delay plus the delta between the fast default and the + # legacy ``WEBSOCKET_BACKEND_DELAY`` so the cumulative wait on the retry + # matches the pre-change behavior. Fast path stays fast; unlucky first + # try still gets the full safety window before a second attempt. + try: + _max_offer_attempts = max(1, int(os.environ.get('PAM_GATEWAY_OFFER_MAX_ATTEMPTS', '2'))) + except (TypeError, ValueError): + _max_offer_attempts = 2 + _offer_retry_extra = offer_retry_extra_delay_sec() + _offer_backend_catchup = max( + 0.0, + websocket_backend_delay_legacy_sec() - websocket_backend_delay_sec(), + ) + _offer_transient_patterns = ( + 'timeout', 'rrc_timeout', 'bad_state', 'connection', + '502', '503', '504', 'controller_down', + ) + + def _send_gateway_offer_with_retry(is_streaming, **extra_kwargs): + _resp = None + for _oa in range(_max_offer_attempts): + if _oa > 0: + if _offer_backend_catchup > 0 or _offer_retry_extra > 0: + _pam_tc.checkpoint('gateway_offer_backend_catchup_delay_start') + time.sleep(_offer_retry_extra + _offer_backend_catchup) + if _offer_backend_catchup > 0 or _offer_retry_extra > 0: + _pam_tc.checkpoint('gateway_offer_backend_catchup_delay_done') + _pam_tc.checkpoint( + 'gateway_offer_http_attempt_1' if _oa == 0 + else 'gateway_offer_http_attempt_{}'.format(_oa + 1) + ) + try: + _resp = router_send_action_to_gateway( + params=params, + destination_gateway_uid_str=gateway_uid, + gateway_action=GatewayActionWebRTCSession( + conversation_id=conversation_id_original, + inputs=inputs, + message_id=message_id, + ), + message_type=pam_pb2.CMT_CONNECT, + is_streaming=is_streaming, + gateway_timeout=30000, + **extra_kwargs, + ) + except requests.exceptions.RequestException as _re: + if _oa < _max_offer_attempts - 1: + logging.warning( + 'Gateway offer HTTP error (%s); retrying (attempt %s/%s)', + _re, _oa + 1, _max_offer_attempts, + ) + continue + raise + except Exception as _ge: + _em = str(_ge).lower() + if _oa < _max_offer_attempts - 1 and any( + _p in _em for _p in _offer_transient_patterns + ): + logging.warning( + 'Gateway offer transient failure (%s); retrying (attempt %s/%s)', + _ge, _oa + 1, _max_offer_attempts, + ) + continue + raise + if _resp is None and _oa < _max_offer_attempts - 1: + logging.warning( + 'Gateway offer returned no response; retrying (attempt %s/%s)', + _oa + 1, _max_offer_attempts, + ) + continue + break + _pam_tc.checkpoint('gateway_offer_http_done') + return _resp + # Two paths: streaming vs non-streaming if trickle_ice: # Streaming path: Response will come via WebSocket (use same tokens and session as WebSocket for ALB stickiness) @@ -1764,21 +1863,7 @@ def _open_terminal_webrtc_tunnel(params: KeeperParams, } if http_session is not None: offer_kwargs["http_session"] = http_session - _pam_tc.checkpoint('gateway_offer_http_attempt_1') - router_response = router_send_action_to_gateway( - params=params, - destination_gateway_uid_str=gateway_uid, - gateway_action=GatewayActionWebRTCSession( - conversation_id=conversation_id_original, - inputs=inputs, - message_id=message_id - ), - message_type=pam_pb2.CMT_CONNECT, - is_streaming=True, # Response will come via WebSocket - gateway_timeout=30000, - **offer_kwargs - ) - _pam_tc.checkpoint('gateway_offer_http_done') + router_response = _send_gateway_offer_with_retry(is_streaming=True, **offer_kwargs) logging.debug(f"{bcolors.OKGREEN}Offer sent to gateway (streaming mode){bcolors.ENDC}") @@ -1813,20 +1898,7 @@ def _open_terminal_webrtc_tunnel(params: KeeperParams, } else: # Non-streaming path: Handle response immediately - _pam_tc.checkpoint('gateway_offer_http_attempt_1') - router_response = router_send_action_to_gateway( - params=params, - destination_gateway_uid_str=gateway_uid, - gateway_action=GatewayActionWebRTCSession( - conversation_id=conversation_id_original, - inputs=inputs, - message_id=message_id - ), - message_type=pam_pb2.CMT_CONNECT, - is_streaming=False, # Response comes immediately in HTTP response - gateway_timeout=30000 - ) - _pam_tc.checkpoint('gateway_offer_http_done') + router_response = _send_gateway_offer_with_retry(is_streaming=False) logging.debug(f"{bcolors.OKGREEN}Offer sent to gateway (non-streaming mode){bcolors.ENDC}") logging.debug(f"Router response: {router_response}")