Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 157 additions & 16 deletions keepercommander/commands/tunnel/port_forward/tunnel_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ def route_message_to_rust(response_item, tube_registry):
tube_registry.set_remote_description(tube_id, answer_sdp, is_answer=True)
print(
f"{bcolors.OKBLUE}Connection state: {bcolors.ENDC}SDP answer received, connecting...")

# Send any buffered local ICE candidates now that we have the answer
session = get_tunnel_session(tube_id)
if session and session.buffered_ice_candidates:
Expand All @@ -910,6 +910,105 @@ def route_message_to_rust(response_item, tube_registry):
session.buffered_ice_candidates.clear()
else:
logging.warning(f"No signal handler found for tube {tube_id} to send buffered candidates")
elif "offer" in data_json or (data_json.get("type") == "offer"):
# Gateway is sending us an ICE restart offer
offer_sdp = data_json.get('sdp') or data_json.get('offer')

if offer_sdp:
logging.info(f"Received ICE restart offer from Gateway for conversation: {conversation_id}")

tube_id = tube_registry.tube_id_from_connection_id(conversation_id)
if not tube_id:
logging.error(f"No tube ID for conversation: {conversation_id}")
return

# Get session to check if trickle ICE is enabled
session = get_tunnel_session(tube_id)
if not session:
logging.error(f"No tunnel session found for tube {tube_id}")
return

# ICE restart requires trickle ICE mode - check via signal_handler
if hasattr(session, 'signal_handler') and session.signal_handler:
if not session.signal_handler.trickle_ice:
logging.warning(f"ICE restart offer ignored - trickle ICE not enabled for tube {tube_id}")
return
else:
logging.warning(f"Cannot verify trickle ICE status for tube {tube_id} - no signal handler")
return

print(f"{bcolors.OKBLUE}Connection state: {bcolors.ENDC}ICE restart offer received from Gateway...")

try:
# Apply the offer from Gateway
tube_registry.set_remote_description(tube_id, offer_sdp, is_answer=False)
logging.debug(f"Applied ICE restart offer for tube {tube_id}")

# Generate answer
answer_sdp = tube_registry.create_answer(tube_id)

if answer_sdp:
logging.info(f"Generated ICE restart answer for tube {tube_id}")

# Get session to access symmetric key and other info
session = get_tunnel_session(tube_id)
if not session:
logging.error(f"No tunnel session found for tube {tube_id}")
return

# Prepare answer payload
answer_payload = {
"type": "answer",
"sdp": answer_sdp,
"ice_restart": True
}

# Encrypt the answer
string_data = json.dumps(answer_payload)
bytes_data = string_to_bytes(string_data)
encrypted_data = tunnel_encrypt(session.symmetric_key, bytes_data)

# Get signal handler from session
if hasattr(session, 'signal_handler') and session.signal_handler:
signal_handler = session.signal_handler

# Send answer back to Gateway via HTTP POST
logging.info(f"Sending ICE restart answer to Gateway for tube {tube_id}")
print(f"{bcolors.OKBLUE}Connection state: {bcolors.ENDC}sending ICE restart answer...")

router_response = router_send_action_to_gateway(
params=signal_handler.params,
destination_gateway_uid_str=session.gateway_uid,
gateway_action=GatewayActionWebRTCSession(
conversation_id=session.conversation_id,
inputs={
"recordUid": signal_handler.record_uid,
'kind': 'ice_restart_answer',
'base64Nonce': signal_handler.base64_nonce,
'conversationType': 'tunnel',
"data": encrypted_data,
"trickleICE": True,
}
),
message_type=pam_pb2.CMT_CONNECT,
is_streaming=True,
gateway_timeout=GATEWAY_TIMEOUT,
destination_gateway_cookies=session.gateway_cookies
)

logging.info(f"ICE restart answer sent for tube {tube_id}")
print(f"{bcolors.OKGREEN}ICE restart answer sent successfully{bcolors.ENDC}")
else:
logging.error(f"No signal handler found for tube {tube_id} to send answer")
else:
logging.error(f"Failed to generate ICE restart answer for tube {tube_id}")
print(f"{bcolors.FAIL}Failed to generate ICE restart answer{bcolors.ENDC}")

except Exception as e:
logging.error(f"Error handling ICE restart offer for tube {tube_id}: {e}")
print(f"{bcolors.FAIL}Error processing ICE restart offer: {e}{bcolors.ENDC}")
else:
logging.warning(f"Received offer message without SDP data for conversation: {conversation_id}")
elif "candidates" in data_json:
tube_id = tube_registry.tube_id_from_connection_id(conversation_id)
if not tube_id:
Expand Down Expand Up @@ -1057,47 +1156,64 @@ def signal_from_rust(self, response: dict):

# Handle local connection state changes
if signal_kind == 'connection_state_changed':
logging.info(f"Tube {tube_id} connection state changed to: {data}")

# Update connection state display
if data.lower() == "connected":
new_state = data.lower()
logging.info(f"Connection state changed for tube {tube_id}: {new_state}")

# Detailed logging for specific states
if new_state == 'disconnected':
logging.warning(f"Connection disconnected for tube {tube_id} - ICE restart may be attempted by Rust")
print(f"{bcolors.FAIL}Connection state: {bcolors.ENDC}disconnected ✗")

elif new_state == 'failed':
logging.error(f"Connection failed for tube {tube_id} - ICE restart may be attempted by Rust")
print(f"{bcolors.FAIL}Connection state: {bcolors.ENDC}failed ✗")

elif new_state == 'connected':
logging.info(f"Connection established/restored for tube {tube_id}")
print(f"{bcolors.OKGREEN}Connection state: {bcolors.ENDC}connected")

if not self.connection_success_shown:
self.connection_success_shown = True

# Now show the endpoint table - both socket and WebRTC are ready
if self.host and self.port and self.tube_id:
endpoint_info = f"Endpoint: {bcolors.OKGREEN}{self.tube_id}{bcolors.ENDC} Listening on: {bcolors.OKGREEN}{self.host}:{self.port}{bcolors.ENDC}"
mode_info = "Mode: trickle ICE"

# Create formatted table
max_width = max(len(endpoint_info), len(mode_info)) + 4
border = "+" + "-" * (max_width - 2) + "+"

print(border)
print(f"| {endpoint_info.ljust(max_width - 4)} |")
print(f"| {mode_info.ljust(max_width - 4)} |")
print(border)

# Show tunnel management commands
print(f"View all open tunnels : {bcolors.OKGREEN}pam tunnel list{bcolors.ENDC}")
print(f"Stop a tunnel : {bcolors.OKGREEN}pam tunnel stop {self.tube_id}{bcolors.ENDC}")

print(f"{bcolors.OKGREEN}Tunnel is ready for traffic{bcolors.ENDC}")

# Flush any buffered ICE candidates now that we're connected
if session and session.buffered_ice_candidates:
logging.debug(f"Flushing {len(session.buffered_ice_candidates)} buffered ICE candidates")
for candidate in session.buffered_ice_candidates:
self._send_ice_candidate_immediately(candidate, tube_id)
session.buffered_ice_candidates.clear()

elif data.lower() == "connecting":

elif new_state == "connecting":
logging.debug(f"Connection in progress for tube {tube_id}")
print(f"{bcolors.OKBLUE}Connection state: {bcolors.ENDC}connecting...")
elif data.lower() in ["failed", "closed", "disconnected"]:
print(f"{bcolors.FAIL}Connection state: {bcolors.ENDC}{data.lower()} ✗")

elif new_state == "closed":
logging.info(f"Connection closed for tube {tube_id}")
print(f"{bcolors.FAIL}Connection state: {bcolors.ENDC}closed ✗")

else:
print(f"{bcolors.OKBLUE}Connection state: {bcolors.ENDC}{data.lower()}")
logging.debug(f"Connection state for tube {tube_id}: {new_state}")
print(f"{bcolors.OKBLUE}Connection state: {bcolors.ENDC}{new_state}")

return # Local event, no gateway response needed

elif signal_kind == 'channel_closed':
Expand Down Expand Up @@ -1178,6 +1294,11 @@ def signal_from_rust(self, response: dict):
elif signal_kind == 'ice_restart_request':
logging.info(f"Received ICE restart request for tube {tube_id}")

# ICE restart requires trickle ICE mode
if not self.trickle_ice:
logging.warning(f"ICE restart request ignored - trickle ICE not enabled for tube {tube_id}")
return

try:
# Execute ICE restart through your tube registry
restart_sdp = self.tube_registry.restart_ice(tube_id)
Expand All @@ -1193,6 +1314,26 @@ def signal_from_rust(self, response: dict):

return # Local event, no gateway response needed

elif signal_kind == 'ice_restart_offer':
# Rust initiated ICE restart and generated offer (e.g., network change detected)
# We need to send this offer to Gateway and get an answer
logging.info(f"Received ice_restart_offer from Rust for tube {tube_id}")

# ICE restart requires trickle ICE mode
if not self.trickle_ice:
logging.warning(f"ICE restart offer ignored - trickle ICE not enabled for tube {tube_id}")
return

offer_sdp = data # Already base64 encoded from Rust

if not offer_sdp:
logging.error(f"Empty ICE restart offer received for tube {tube_id}")
return

# Send the offer to Gateway
self._send_restart_offer(offer_sdp, tube_id)
return

# Unknown signal type
else:
logging.debug(f"Unknown signal type: {signal_kind}")
Expand Down