Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions keepercommander/commands/pam_launch/connect_timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,110 @@ def connect_timing_log_enabled() -> bool:
return _LOG.isEnabledFor(logging.DEBUG)


# --- Connect-phase delay helpers -------------------------------------------
#
# The pam launch flow has several historically fixed sleeps whose durations
# only matter in the first-launch / slow-network case. Defaults here are tuned
# for the fast path; each env var below lets operators restore the legacy
# (conservative) values without a code roll.

# Wait after the WebSocket connects and before the offer is POSTed.
_WEBSOCKET_BACKEND_DELAY_ENV = 'WEBSOCKET_BACKEND_DELAY'
_WEBSOCKET_BACKEND_DELAY_FAST_DEFAULT = 0.30  # seconds — fast path default
# Legacy wait; the offer-retry path tops the backend delay up towards this value.
_WEBSOCKET_BACKEND_DELAY_LEGACY_ENV = 'WEBSOCKET_BACKEND_DELAY_LEGACY'
_WEBSOCKET_BACKEND_DELAY_LEGACY_DEFAULT = 2.0  # seconds — adaptive fallback cap

# Optional extra sleep immediately before the offer HTTP POST.
_PAM_PRE_OFFER_SEC_ENV = 'PAM_PRE_OFFER_SEC'
_PAM_PRE_OFFER_FAST_DEFAULT = 0.0  # seconds — merged into backend_delay
# Truthy values are 1/true/yes/on (case-insensitive); when set, the pre-offer
# sleep is at least the legacy 1.0s.
_PAM_PRE_OFFER_LEGACY_ENV = 'PAM_PRE_OFFER_LEGACY'

# Base delay added before each retry of the gateway offer POST.
_PAM_OFFER_RETRY_EXTRA_SEC_ENV = 'PAM_OFFER_RETRY_EXTRA_SEC'
_PAM_OFFER_RETRY_EXTRA_DEFAULT = 1.25  # seconds — retry backoff

# Sleep between the data plane reporting "connected" and sending OpenConnection.
_PAM_OPEN_CONNECTION_DELAY_ENV = 'PAM_OPEN_CONNECTION_DELAY'
_PAM_OPEN_CONNECTION_DELAY_FAST_DEFAULT = 0.05  # seconds — safety margin
# (the caller's retry loop handles a slow DataChannel)

# Poll interval for the WebRTC connection-state wait loop. Note the unit:
# this one is configured in milliseconds, unlike the other settings above.
_PAM_WEBRTC_POLL_MS_ENV = 'PAM_WEBRTC_POLL_MS'
_PAM_WEBRTC_POLL_MS_DEFAULT = 25  # milliseconds — poll granularity


def _env_float(name: str, default: float) -> float:
"""Read a float env var; return ``default`` when unset, empty, or unparseable."""
raw = os.environ.get(name)
if raw is None:
return default
raw = str(raw).strip()
if raw == '':
return default
try:
return float(raw)
except (TypeError, ValueError):
return default


def _env_truthy(name: str) -> bool:
return os.environ.get(name, '').strip().lower() in ('1', 'true', 'yes', 'on')


def websocket_backend_delay_sec() -> float:
    """Return the wait, in seconds, between WebSocket connect and the offer POST.

    The pause gives the router/backend a window to register the connection.
    ``WEBSOCKET_BACKEND_DELAY`` overrides the value; otherwise the fast-path
    default of 0.30s applies (the legacy value was 2.0s, see
    ``websocket_backend_delay_legacy_sec``).
    """
    delay = _env_float(
        _WEBSOCKET_BACKEND_DELAY_ENV,
        _WEBSOCKET_BACKEND_DELAY_FAST_DEFAULT,
    )
    return delay


def websocket_backend_delay_legacy_sec() -> float:
    """Return the legacy backend delay, in seconds, used to size retry catch-up.

    After a failed first offer attempt the retry path sleeps an additional
    ``max(0, legacy - websocket_backend_delay_sec())`` on top of its base
    retry delay, so a cold router still gets roughly the pre-change 2.0s
    registration window. ``WEBSOCKET_BACKEND_DELAY_LEGACY`` overrides the
    2.0s default.
    """
    legacy = _env_float(
        _WEBSOCKET_BACKEND_DELAY_LEGACY_ENV,
        _WEBSOCKET_BACKEND_DELAY_LEGACY_DEFAULT,
    )
    return legacy


def pre_offer_delay_sec() -> float:
    """Return the extra sleep, in seconds, taken just before the offer HTTP POST.

    Normal mode: the value of ``PAM_PRE_OFFER_SEC``, defaulting to 0.0 (the
    old hardcoded 1.0s sleep duplicated the backend-delay wait).

    Legacy mode (``PAM_PRE_OFFER_LEGACY`` set to a truthy value): at least
    1.0s; a larger ``PAM_PRE_OFFER_SEC`` is honoured, a smaller one is raised
    to 1.0s.
    """
    if not _env_truthy(_PAM_PRE_OFFER_LEGACY_ENV):
        return _env_float(_PAM_PRE_OFFER_SEC_ENV, _PAM_PRE_OFFER_FAST_DEFAULT)
    # Legacy mode: 1.0s acts as a floor rather than a fixed value.
    configured = _env_float(_PAM_PRE_OFFER_SEC_ENV, 1.0)
    return max(1.0, configured)


def offer_retry_extra_delay_sec() -> float:
    """Return the base delay, in seconds, before retrying the gateway offer POST.

    ``PAM_OFFER_RETRY_EXTRA_SEC`` overrides the 1.25s default.

    Returns:
        A non-negative delay. The retry path adds this value to the backend
        catch-up delay and passes the sum to ``time.sleep``, which raises
        ``ValueError`` for negative input, so a negative setting is clamped
        to 0.0 here.
    """
    configured = _env_float(
        _PAM_OFFER_RETRY_EXTRA_SEC_ENV,
        _PAM_OFFER_RETRY_EXTRA_DEFAULT,
    )
    return max(0.0, configured)


def open_connection_delay_sec() -> float:
    """Return the pause, in seconds, before ``OpenConnection`` is sent.

    The pause sits between ``webrtc_data_plane_connected`` and the
    ``OpenConnection`` message. It defaults to 0.05s (down from the historical
    0.2s) because the caller's retry loop with exponential backoff already
    covers a DataChannel that is not open yet. Set
    ``PAM_OPEN_CONNECTION_DELAY`` to restore a larger safety margin.
    """
    delay = _env_float(
        _PAM_OPEN_CONNECTION_DELAY_ENV,
        _PAM_OPEN_CONNECTION_DELAY_FAST_DEFAULT,
    )
    return delay


def webrtc_connection_poll_sec() -> float:
    """Return the poll interval, in seconds, for the WebRTC connection wait loop.

    The loop polls ``tube_registry.get_connection_state`` until the data plane
    reports ``connected``. The interval is configured in milliseconds via
    ``PAM_WEBRTC_POLL_MS`` (default 25ms, previously 100ms) and is never
    shorter than 1ms.
    """
    poll_ms = _env_float(_PAM_WEBRTC_POLL_MS_ENV, _PAM_WEBRTC_POLL_MS_DEFAULT)
    poll_sec = poll_ms / 1000.0
    # Enforce the 1ms floor so a zero or negative setting cannot busy-spin.
    return poll_sec if poll_sec > 0.001 else 0.001


class PamConnectTiming:
"""Monotonic checkpoints for ``pam launch`` / tunnel open (debug or PAM_CONNECT_TIMING=1).

Expand Down
22 changes: 16 additions & 6 deletions keepercommander/commands/pam_launch/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@
_version_at_least,
_pam_settings_connection_port,
)
from .connect_timing import PamConnectTiming
from .connect_timing import (
PamConnectTiming,
open_connection_delay_sec,
webrtc_connection_poll_sec,
)
from .terminal_size import get_terminal_size_pixels, is_interactive_tty, PIXEL_MODE_GUACD, scale_screen_info
from .terminal_reset import reset_local_terminal_after_pam_session
from .crlf_merge_delay import (
Expand Down Expand Up @@ -1150,11 +1154,14 @@ def signal_handler_fn(signum, frame):
python_handler.start()
_cli_tc.checkpoint('python_handler_start_done')

# Wait for WebRTC connection to be established
# Wait for WebRTC connection to be established.
# Poll tick defaults to 25ms (was 100ms) — cheap FFI call,
# tightens P99 handoff latency. Set PAM_WEBRTC_POLL_MS to override.
logging.debug("Waiting for WebRTC connection...")
max_wait = 15
start_time = time.time()
connected = False
poll_tick = webrtc_connection_poll_sec()

while time.time() - start_time < max_wait:
try:
Expand All @@ -1165,7 +1172,7 @@ def signal_handler_fn(signum, frame):
break
except Exception as e:
logging.debug(f"Checking connection state: {e}")
time.sleep(0.1)
time.sleep(poll_tick)

if not connected:
raise CommandError('pam launch', "WebRTC connection not established within timeout")
Expand All @@ -1174,9 +1181,12 @@ def signal_handler_fn(signum, frame):
# Wait for DataChannel to be ready and Gateway to wire the session.
# connection state "connected" can precede DataChannel readiness; Gateway also needs
# time to associate the WebRTC connection with the channel and prepare guacd.
# Configurable via PAM_OPEN_CONNECTION_DELAY (default 0.2s; use 2.0 if handshake never starts).
open_conn_delay = float(os.environ.get('PAM_OPEN_CONNECTION_DELAY', '0.2'))
time.sleep(open_conn_delay)
# Default 0.05s — a small safety margin on top of the open_handler_connection
# retry loop below (exponential backoff already handles slow DataChannel).
# Set PAM_OPEN_CONNECTION_DELAY=2.0 to restore the legacy safety wait.
open_conn_delay = open_connection_delay_sec()
if open_conn_delay > 0:
time.sleep(open_conn_delay)
_cli_tc.checkpoint('open_connection_delay_done')

# Send OpenConnection to Gateway to initiate guacd session
Expand Down
154 changes: 113 additions & 41 deletions keepercommander/commands/pam_launch/terminal_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,13 @@
from ...params import KeeperParams

from ..pam_import.base import ConnectionProtocol
from .connect_timing import PamConnectTiming
from .connect_timing import (
PamConnectTiming,
websocket_backend_delay_sec,
websocket_backend_delay_legacy_sec,
pre_offer_delay_sec,
offer_retry_extra_delay_sec,
)

# Sentinel for "dag_linked_uid not resolved yet" — ``None`` is a valid resolved
# result (no DAG-linked launch credential), so we need a distinct marker.
Expand Down Expand Up @@ -1387,6 +1393,22 @@ def _open_terminal_webrtc_tunnel(params: KeeperParams,
# Store signal handler reference
tunnel_session.signal_handler = signal_handler # type: ignore[assignment]

# Start the dedicated WebSocket listener *before* ``create_tube``. The Rust
# tube creation takes ~500ms; running the WebSocket TLS handshake / router
# registration concurrently with it saves most of that window. The listener
# only reads the ``conversation_id`` from tunnel_session; the tube_id is
# used for the thread name and log context only (updated in-place after
# ``create_tube`` returns). No message will arrive before the gateway has
# received our offer, so there is no race between early listener start and
# the tube-id being rewritten from the temp UUID to the real one.
websocket_thread = start_websocket_listener(
params, tube_registry, timeout=300, gateway_uid=gateway_uid,
tunnel_session=tunnel_session,
router_tokens=router_tokens,
cookie_header=cookie_header,
)
_pam_tc.checkpoint('websocket_listener_started_early')

logging.debug(f"{bcolors.OKBLUE}Creating WebRTC offer for {protocol} connection...{bcolors.ENDC}")
if trickle_ice:
logging.debug("Using trickle ICE for real-time candidate exchange")
Expand Down Expand Up @@ -1520,20 +1542,15 @@ def _open_terminal_webrtc_tunnel(params: KeeperParams,
logging.debug(f"Registered encryption key for conversation: {conversation_id}")
logging.debug(f"Expecting WebSocket responses for conversation ID: {conversation_id}")

# Start WebSocket listener (pass cookie_header for ALB stickiness when trickle ICE)
websocket_thread = start_websocket_listener(
params, tube_registry, timeout=300, gateway_uid=gateway_uid,
tunnel_session=tunnel_session,
router_tokens=router_tokens,
cookie_header=cookie_header
)
_pam_tc.checkpoint('websocket_listener_started')
# (WebSocket listener already started above, before create_tube.)

# Wait for WebSocket to be ready before sending offer (same as pam tunnel start).
# Use event.wait() when available so we proceed as soon as ready; fallback to short sleep.
max_wait = 15.0
# Same backend registration delay as when event is present (router/gateway need time to register)
backend_delay = float(os.environ.get('WEBSOCKET_BACKEND_DELAY', '2.0'))
# Router/gateway need a moment to register the conversation after the
# WebSocket handshake. Default 0.30s; on first-offer failure we top up
# with the delta to the legacy 2.0s before retrying (adaptive fallback).
backend_delay = websocket_backend_delay_sec()
if tunnel_session.websocket_ready_event:
logging.debug(f"Waiting for dedicated WebSocket to connect (max {max_wait}s)...")
websocket_ready = tunnel_session.websocket_ready_event.wait(timeout=max_wait)
Expand Down Expand Up @@ -1712,8 +1729,13 @@ def _open_terminal_webrtc_tunnel(params: KeeperParams,
else:
logging.debug(f"No linked pamUser for record {record_uid} - using pamMachine credentials directly")

# Formerly a fixed ``time.sleep(1)`` — now 0.0 by default because the
# preceding backend_delay already covers router registration. Set
# PAM_PRE_OFFER_LEGACY=1 (or PAM_PRE_OFFER_SEC=<float>) to restore.
_pre_offer = pre_offer_delay_sec()
_pam_tc.checkpoint('pre_offer_sleep_start')
time.sleep(1) # Allow time for WebSocket listener to start
if _pre_offer > 0:
time.sleep(_pre_offer)
_pam_tc.checkpoint('pre_offer_sleep_done')

# Send offer via HTTP POST - two paths: streaming vs non-streaming
Expand Down Expand Up @@ -1752,6 +1774,83 @@ def _open_terminal_webrtc_tunnel(params: KeeperParams,
message_id = GatewayAction.conversation_id_to_message_id(conversation_id_original)
logging.debug(f"Generated messageId: {message_id} from conversationId: {conversation_id_original}")

# --- Gateway offer POST with retry + adaptive backend-delay fallback ---
# On a first-attempt failure that looks like a transient backend-not-ready
# condition (timeout, 502/503/504, controller_down, RRC timeout), sleep
# the retry base delay plus the delta between the fast default and the
# legacy ``WEBSOCKET_BACKEND_DELAY`` so the cumulative wait on the retry
# matches the pre-change behavior. Fast path stays fast; unlucky first
# try still gets the full safety window before a second attempt.
try:
_max_offer_attempts = max(1, int(os.environ.get('PAM_GATEWAY_OFFER_MAX_ATTEMPTS', '2')))
except (TypeError, ValueError):
_max_offer_attempts = 2
_offer_retry_extra = offer_retry_extra_delay_sec()
_offer_backend_catchup = max(
0.0,
websocket_backend_delay_legacy_sec() - websocket_backend_delay_sec(),
)
_offer_transient_patterns = (
'timeout', 'rrc_timeout', 'bad_state', 'connection',
'502', '503', '504', 'controller_down',
)

def _send_gateway_offer_with_retry(is_streaming, **extra_kwargs):
    """POST the WebRTC offer to the gateway, retrying transient failures.

    Closure over the enclosing tunnel-setup scope: ``params``, ``gateway_uid``,
    ``conversation_id_original``, ``inputs``, ``message_id``, ``_pam_tc`` and
    the retry settings ``_max_offer_attempts``, ``_offer_retry_extra``,
    ``_offer_backend_catchup`` and ``_offer_transient_patterns``.

    Args:
        is_streaming: Forwarded to ``router_send_action_to_gateway``.
        **extra_kwargs: Extra keyword arguments forwarded unchanged.

    Returns:
        The value returned by ``router_send_action_to_gateway`` on the last
        attempt that was made (may be ``None``).

    Raises:
        requests.exceptions.RequestException: HTTP failure on the final attempt.
        Exception: Any other failure that does not match a transient pattern,
            or that occurs on the final attempt.
    """
    # Compute the retry sleep once. It is only used when positive: a negative
    # configured retry delay would otherwise make time.sleep() raise ValueError.
    _retry_delay = _offer_retry_extra + _offer_backend_catchup
    _last_attempt = _max_offer_attempts - 1
    _resp = None
    for _oa in range(_max_offer_attempts):
        if _oa > 0 and _retry_delay > 0:
            _pam_tc.checkpoint('gateway_offer_backend_catchup_delay_start')
            time.sleep(_retry_delay)
            _pam_tc.checkpoint('gateway_offer_backend_catchup_delay_done')
        _pam_tc.checkpoint('gateway_offer_http_attempt_{}'.format(_oa + 1))
        try:
            _resp = router_send_action_to_gateway(
                params=params,
                destination_gateway_uid_str=gateway_uid,
                gateway_action=GatewayActionWebRTCSession(
                    conversation_id=conversation_id_original,
                    inputs=inputs,
                    message_id=message_id,
                ),
                message_type=pam_pb2.CMT_CONNECT,
                is_streaming=is_streaming,
                gateway_timeout=30000,
                **extra_kwargs,
            )
        except requests.exceptions.RequestException as _re:
            # HTTP-level failures are always retried while attempts remain.
            if _oa >= _last_attempt:
                raise
            logging.warning(
                'Gateway offer HTTP error (%s); retrying (attempt %s/%s)',
                _re, _oa + 1, _max_offer_attempts,
            )
            continue
        except Exception as _ge:
            # Other failures are retried only when the message looks transient.
            _em = str(_ge).lower()
            if _oa >= _last_attempt or not any(
                _p in _em for _p in _offer_transient_patterns
            ):
                raise
            logging.warning(
                'Gateway offer transient failure (%s); retrying (attempt %s/%s)',
                _ge, _oa + 1, _max_offer_attempts,
            )
            continue
        if _resp is None and _oa < _last_attempt:
            # NOTE(review): confirm router_send_action_to_gateway never returns
            # None on success when is_streaming=True; if it can, this branch
            # re-sends an offer that was already delivered.
            logging.warning(
                'Gateway offer returned no response; retrying (attempt %s/%s)',
                _oa + 1, _max_offer_attempts,
            )
            continue
        break
    _pam_tc.checkpoint('gateway_offer_http_done')
    return _resp

# Two paths: streaming vs non-streaming
if trickle_ice:
# Streaming path: Response will come via WebSocket (use same tokens and session as WebSocket for ALB stickiness)
Expand All @@ -1764,21 +1863,7 @@ def _open_terminal_webrtc_tunnel(params: KeeperParams,
}
if http_session is not None:
offer_kwargs["http_session"] = http_session
_pam_tc.checkpoint('gateway_offer_http_attempt_1')
router_response = router_send_action_to_gateway(
params=params,
destination_gateway_uid_str=gateway_uid,
gateway_action=GatewayActionWebRTCSession(
conversation_id=conversation_id_original,
inputs=inputs,
message_id=message_id
),
message_type=pam_pb2.CMT_CONNECT,
is_streaming=True, # Response will come via WebSocket
gateway_timeout=30000,
**offer_kwargs
)
_pam_tc.checkpoint('gateway_offer_http_done')
router_response = _send_gateway_offer_with_retry(is_streaming=True, **offer_kwargs)

logging.debug(f"{bcolors.OKGREEN}Offer sent to gateway (streaming mode){bcolors.ENDC}")

Expand Down Expand Up @@ -1813,20 +1898,7 @@ def _open_terminal_webrtc_tunnel(params: KeeperParams,
}
else:
# Non-streaming path: Handle response immediately
_pam_tc.checkpoint('gateway_offer_http_attempt_1')
router_response = router_send_action_to_gateway(
params=params,
destination_gateway_uid_str=gateway_uid,
gateway_action=GatewayActionWebRTCSession(
conversation_id=conversation_id_original,
inputs=inputs,
message_id=message_id
),
message_type=pam_pb2.CMT_CONNECT,
is_streaming=False, # Response comes immediately in HTTP response
gateway_timeout=30000
)
_pam_tc.checkpoint('gateway_offer_http_done')
router_response = _send_gateway_offer_with_retry(is_streaming=False)

logging.debug(f"{bcolors.OKGREEN}Offer sent to gateway (non-streaming mode){bcolors.ENDC}")
logging.debug(f"Router response: {router_response}")
Expand Down
Loading