From 943266293f83eb583b51e877965c0a5138d07a75 Mon Sep 17 00:00:00 2001 From: Micah Roberts Date: Tue, 7 Oct 2025 14:22:39 -0600 Subject: [PATCH] ice restart support --- .../tunnel/port_forward/tunnel_helpers.py | 173 ++++++++++++++++-- 1 file changed, 157 insertions(+), 16 deletions(-) diff --git a/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py b/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py index 35825314b..66b1f5570 100644 --- a/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py +++ b/keepercommander/commands/tunnel/port_forward/tunnel_helpers.py @@ -896,7 +896,7 @@ def route_message_to_rust(response_item, tube_registry): tube_registry.set_remote_description(tube_id, answer_sdp, is_answer=True) print( f"{bcolors.OKBLUE}Connection state: {bcolors.ENDC}SDP answer received, connecting...") - + # Send any buffered local ICE candidates now that we have the answer session = get_tunnel_session(tube_id) if session and session.buffered_ice_candidates: @@ -910,6 +910,105 @@ def route_message_to_rust(response_item, tube_registry): session.buffered_ice_candidates.clear() else: logging.warning(f"No signal handler found for tube {tube_id} to send buffered candidates") + elif "offer" in data_json or (data_json.get("type") == "offer"): + # Gateway is sending us an ICE restart offer + offer_sdp = data_json.get('sdp') or data_json.get('offer') + + if offer_sdp: + logging.info(f"Received ICE restart offer from Gateway for conversation: {conversation_id}") + + tube_id = tube_registry.tube_id_from_connection_id(conversation_id) + if not tube_id: + logging.error(f"No tube ID for conversation: {conversation_id}") + return + + # Get session to check if trickle ICE is enabled + session = get_tunnel_session(tube_id) + if not session: + logging.error(f"No tunnel session found for tube {tube_id}") + return + + # ICE restart requires trickle ICE mode - check via signal_handler + if hasattr(session, 'signal_handler') and session.signal_handler: + if not session.signal_handler.trickle_ice: + logging.warning(f"ICE restart offer ignored - trickle ICE not enabled for tube {tube_id}") + return + else: + logging.warning(f"Cannot verify trickle ICE status for tube {tube_id} - no signal handler") + return + + print(f"{bcolors.OKBLUE}Connection state: {bcolors.ENDC}ICE restart offer received from Gateway...") + + try: + # Apply the offer from Gateway + tube_registry.set_remote_description(tube_id, offer_sdp, is_answer=False) + logging.debug(f"Applied ICE restart offer for tube {tube_id}") + + # Generate answer + answer_sdp = tube_registry.create_answer(tube_id) + + if answer_sdp: + logging.info(f"Generated ICE restart answer for tube {tube_id}") + + # Get session to access symmetric key and other info + session = get_tunnel_session(tube_id) + if not session: + logging.error(f"No tunnel session found for tube {tube_id}") + return + + # Prepare answer payload + answer_payload = { + "type": "answer", + "sdp": answer_sdp, + "ice_restart": True + } + + # Encrypt the answer + string_data = json.dumps(answer_payload) + bytes_data = string_to_bytes(string_data) + encrypted_data = tunnel_encrypt(session.symmetric_key, bytes_data) + + # Get signal handler from session + if hasattr(session, 'signal_handler') and session.signal_handler: + signal_handler = session.signal_handler + + # Send answer back to Gateway via HTTP POST + logging.info(f"Sending ICE restart answer to Gateway for tube {tube_id}") + print(f"{bcolors.OKBLUE}Connection state: {bcolors.ENDC}sending ICE restart answer...") + + router_response = router_send_action_to_gateway( + params=signal_handler.params, + destination_gateway_uid_str=session.gateway_uid, + gateway_action=GatewayActionWebRTCSession( + conversation_id=session.conversation_id, + inputs={ + "recordUid": signal_handler.record_uid, + 'kind': 'ice_restart_answer', + 'base64Nonce': signal_handler.base64_nonce, + 'conversationType': 'tunnel', + "data": encrypted_data, + "trickleICE": True, + } + ), + message_type=pam_pb2.CMT_CONNECT, + is_streaming=True, + gateway_timeout=GATEWAY_TIMEOUT, + destination_gateway_cookies=session.gateway_cookies + ) + + logging.info(f"ICE restart answer sent for tube {tube_id}") + print(f"{bcolors.OKGREEN}ICE restart answer sent successfully{bcolors.ENDC}") + else: + logging.error(f"No signal handler found for tube {tube_id} to send answer") + else: + logging.error(f"Failed to generate ICE restart answer for tube {tube_id}") + print(f"{bcolors.FAIL}Failed to generate ICE restart answer{bcolors.ENDC}") + + except Exception as e: + logging.error(f"Error handling ICE restart offer for tube {tube_id}: {e}") + print(f"{bcolors.FAIL}Error processing ICE restart offer: {e}{bcolors.ENDC}") + else: + logging.warning(f"Received offer message without SDP data for conversation: {conversation_id}") elif "candidates" in data_json: tube_id = tube_registry.tube_id_from_connection_id(conversation_id) if not tube_id: @@ -1057,47 +1156,64 @@ def signal_from_rust(self, response: dict): # Handle local connection state changes if signal_kind == 'connection_state_changed': - logging.info(f"Tube {tube_id} connection state changed to: {data}") - - # Update connection state display - if data.lower() == "connected": + new_state = data.lower() + logging.info(f"Connection state changed for tube {tube_id}: {new_state}") + + # Detailed logging for specific states + if new_state == 'disconnected': + logging.warning(f"Connection disconnected for tube {tube_id} - ICE restart may be attempted by Rust") + print(f"{bcolors.FAIL}Connection state: {bcolors.ENDC}disconnected ✗") + + elif new_state == 'failed': + logging.error(f"Connection failed for tube {tube_id} - ICE restart may be attempted by Rust") + print(f"{bcolors.FAIL}Connection state: {bcolors.ENDC}failed ✗") + + elif new_state == 'connected': + logging.info(f"Connection established/restored for tube {tube_id}") print(f"{bcolors.OKGREEN}Connection state: {bcolors.ENDC}connected") + if not self.connection_success_shown: self.connection_success_shown = True - + # Now show the endpoint table - both socket and WebRTC are ready if self.host and self.port and self.tube_id: endpoint_info = f"Endpoint: {bcolors.OKGREEN}{self.tube_id}{bcolors.ENDC} Listening on: {bcolors.OKGREEN}{self.host}:{self.port}{bcolors.ENDC}" mode_info = "Mode: trickle ICE" - + # Create formatted table max_width = max(len(endpoint_info), len(mode_info)) + 4 border = "+" + "-" * (max_width - 2) + "+" - + print(border) print(f"| {endpoint_info.ljust(max_width - 4)} |") print(f"| {mode_info.ljust(max_width - 4)} |") print(border) - + # Show tunnel management commands print(f"View all open tunnels : {bcolors.OKGREEN}pam tunnel list{bcolors.ENDC}") print(f"Stop a tunnel : {bcolors.OKGREEN}pam tunnel stop {self.tube_id}{bcolors.ENDC}") - + print(f"{bcolors.OKGREEN}Tunnel is ready for traffic{bcolors.ENDC}") - + # Flush any buffered ICE candidates now that we're connected if session and session.buffered_ice_candidates: logging.debug(f"Flushing {len(session.buffered_ice_candidates)} buffered ICE candidates") for candidate in session.buffered_ice_candidates: self._send_ice_candidate_immediately(candidate, tube_id) session.buffered_ice_candidates.clear() - - elif data.lower() == "connecting": + + elif new_state == "connecting": + logging.debug(f"Connection in progress for tube {tube_id}") print(f"{bcolors.OKBLUE}Connection state: {bcolors.ENDC}connecting...") - elif data.lower() in ["failed", "closed", "disconnected"]: - print(f"{bcolors.FAIL}Connection state: {bcolors.ENDC}{data.lower()} ✗") + + elif new_state == "closed": + logging.info(f"Connection closed for tube {tube_id}") + print(f"{bcolors.FAIL}Connection state: {bcolors.ENDC}closed ✗") + else: - print(f"{bcolors.OKBLUE}Connection state: {bcolors.ENDC}{data.lower()}") + logging.debug(f"Connection state for tube {tube_id}: {new_state}") + print(f"{bcolors.OKBLUE}Connection state: {bcolors.ENDC}{new_state}") + return # Local event, no gateway response needed elif signal_kind == 'channel_closed': @@ -1178,6 +1294,11 @@ def signal_from_rust(self, response: dict): elif signal_kind == 'ice_restart_request': logging.info(f"Received ICE restart request for tube {tube_id}") + # ICE restart requires trickle ICE mode + if not self.trickle_ice: + logging.warning(f"ICE restart request ignored - trickle ICE not enabled for tube {tube_id}") + return + try: # Execute ICE restart through your tube registry restart_sdp = self.tube_registry.restart_ice(tube_id) @@ -1193,6 +1314,26 @@ def signal_from_rust(self, response: dict): return # Local event, no gateway response needed + elif signal_kind == 'ice_restart_offer': + # Rust initiated ICE restart and generated offer (e.g., network change detected) + # We need to send this offer to Gateway and get an answer + logging.info(f"Received ice_restart_offer from Rust for tube {tube_id}") + + # ICE restart requires trickle ICE mode + if not self.trickle_ice: + logging.warning(f"ICE restart offer ignored - trickle ICE not enabled for tube {tube_id}") + return + + offer_sdp = data # Already base64 encoded from Rust + + if not offer_sdp: + logging.error(f"Empty ICE restart offer received for tube {tube_id}") + return + + # Send the offer to Gateway + self._send_restart_offer(offer_sdp, tube_id) + return + # Unknown signal type else: logging.debug(f"Unknown signal type: {signal_kind}")