Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 31 additions & 24 deletions keepercommander/commands/pam_launch/terminal_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ def extract_terminal_settings(
elif protocol == ConnectionProtocol.KUBERNETES.value:
settings['protocol_specific'] = _extract_kubernetes_settings(connection)
elif protocol in DATABASE:
settings['protocol_specific'] = _extract_database_settings(connection, protocol)
settings['protocol_specific'] = _extract_database_settings(connection)

# allowSupplyHost is at top level of pamSettings value, not inside connection
settings['allowSupplyHost'] = pam_settings_value.get('allowSupplyHost', False)
Expand Down Expand Up @@ -573,53 +573,48 @@ def extract_terminal_settings(


def _extract_ssh_settings(connection: Dict[str, Any]) -> Dict[str, Any]:
"""Extract SSH-specific settings"""
"""Extract SSH-specific settings from pamSettings.connection (record JSON)."""
sftp = connection.get('sftp') or {}
return {
'publicHostKey': connection.get('publicHostKey', ''),
'executeCommand': connection.get('executeCommand', ''),
'sftpEnabled': connection.get('sftpEnabled', False),
'publicHostKey': connection.get('hostKey', ''),
'executeCommand': connection.get('command', ''),
'sftpEnabled': bool(sftp.get('enableSftp', False)),
'sftpRootDirectory': sftp.get('sftpRootDirectory', ''),
}


def _extract_telnet_settings(connection: Dict[str, Any]) -> Dict[str, Any]:
"""Extract Telnet-specific settings"""
"""Extract Telnet-specific settings from pamSettings.connection (record JSON)."""
return {
'usernameRegex': connection.get('usernameRegex', ''),
'passwordRegex': connection.get('passwordRegex', ''),
'loginSuccessRegex': connection.get('loginSuccessRegex', ''),
'loginFailureRegex': connection.get('loginFailureRegex', ''),
}


def _extract_kubernetes_settings(connection: Dict[str, Any]) -> Dict[str, Any]:
"""Extract Kubernetes-specific settings"""
"""Extract Kubernetes-specific settings from pamSettings.connection (record JSON)."""
return {
'namespace': connection.get('namespace', 'default'),
'pod': connection.get('pod', ''),
'container': connection.get('container', ''),
'ignoreServerCertificate': connection.get('ignoreServerCertificate', False),
'caCertificate': connection.get('caCertificate', ''),
'clientCertificate': connection.get('clientCertificate', ''),
'useSSL': connection.get('useSSL', False),
'ignoreServerCertificate': connection.get('ignoreCert', False),
'caCertificate': connection.get('caCert', ''),
'clientCertificate': connection.get('clientCert', ''),
'clientKey': connection.get('clientKey', ''),
}


def _extract_database_settings(connection: Dict[str, Any], protocol: str) -> Dict[str, Any]:
"""Extract database-specific settings"""
settings = {
'defaultDatabase': connection.get('defaultDatabase', ''),
def _extract_database_settings(connection: Dict[str, Any]) -> Dict[str, Any]:
"""Extract database-specific settings from pamSettings.connection (record JSON)."""
return {
'defaultDatabase': connection.get('database', ''),
'disableCsvExport': connection.get('disableCsvExport', False),
'disableCsvImport': connection.get('disableCsvImport', False),
}

# Add protocol-specific database settings
if protocol == ConnectionProtocol.MYSQL.value:
settings['useSSL'] = connection.get('useSSL', False)
elif protocol == ConnectionProtocol.POSTGRESQL.value:
settings['useSSL'] = connection.get('useSSL', False)
elif protocol == ConnectionProtocol.SQLSERVER.value:
settings['useSSL'] = connection.get('useSSL', True) # SQL Server typically uses SSL by default

return settings


def create_connection_context(params: KeeperParams,
record_uid: str,
Expand Down Expand Up @@ -1117,13 +1112,19 @@ def _build_guacamole_connection_settings(
# Enable SFTP if configured
if protocol_specific.get('sftpEnabled'):
guacd_params['enable-sftp'] = 'true'
if protocol_specific.get('sftpRootDirectory'):
guacd_params['sftp-root-directory'] = protocol_specific['sftpRootDirectory']

elif protocol == ConnectionProtocol.TELNET.value:
# Telnet-specific params
if protocol_specific.get('usernameRegex'):
guacd_params['username-regex'] = protocol_specific['usernameRegex']
if protocol_specific.get('passwordRegex'):
guacd_params['password-regex'] = protocol_specific['passwordRegex']
if protocol_specific.get('loginSuccessRegex'):
guacd_params['login-success-regex'] = protocol_specific['loginSuccessRegex']
if protocol_specific.get('loginFailureRegex'):
guacd_params['login-failure-regex'] = protocol_specific['loginFailureRegex']

elif protocol == ConnectionProtocol.KUBERNETES.value:
# Kubernetes-specific params
Expand All @@ -1141,11 +1142,17 @@ def _build_guacamole_connection_settings(
guacd_params['client-key'] = protocol_specific['clientKey']
if protocol_specific.get('ignoreServerCertificate'):
guacd_params['ignore-cert'] = 'true'
if protocol_specific.get('useSSL'):
guacd_params['use-ssl'] = 'true'

elif protocol in DATABASE:
# Database-specific params
if protocol_specific.get('defaultDatabase'):
guacd_params['database'] = protocol_specific['defaultDatabase']
if protocol_specific.get('disableCsvExport'):
guacd_params['disable-csv-export'] = 'true'
if protocol_specific.get('disableCsvImport'):
guacd_params['disable-csv-import'] = 'true'

# CLI mode: named pipe for terminal STDOUT (guacr terminal handlers; not graphical RDP/VNC)
guacd_params['enable-pipe'] = 'true'
Expand Down
104 changes: 95 additions & 9 deletions keepercommander/commands/tunnel_and_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -1934,6 +1934,7 @@ def _val(v):
print(f' {self._green("connection.protocol"):<36}{_val(cn.get("protocol"))}')
print(f' {self._green("connection.httpCredentialsUid"):<36}{_val(cn.get("httpCredentialsUid") or None)}')
print(f' {self._green("connection.recordingIncludeKeys"):<36}{_val(cn.get("recordingIncludeKeys"))}')
print(f' {self._green("connection.ignoreInitialSslCert"):<36}{_val(cn.get("ignoreInitialSslCert"))}')
else:
pam_settings_field = record_obj.get_typed_field('pamSettings') if record_obj else None
ps = {}
Expand All @@ -1948,6 +1949,8 @@ def _val(v):
print(f' {self._green("connection.protocol"):<36}{_val(cn.get("protocol"))}')
print(f' {self._green("connection.allowKeeperDBProxy"):<36}{_val(cn.get("allowKeeperDBProxy"))}')
print(f' {self._green("connection.recordingIncludeKeys"):<36}{_val(cn.get("recordingIncludeKeys"))}')
print(f' {self._green("connection.security"):<36}{_val(cn.get("security"))}')
print(f' {self._green("connection.ignoreCert"):<36}{_val(cn.get("ignoreCert"))}')
print(f' {self._green("allowSupplyHost"):<36}{_val(ps.get("allowSupplyHost"))}')
if ps.get('configUid'):
print(f' {self._green("configUid"):<36}{_val(ps.get("configUid"))}')
Expand Down Expand Up @@ -2668,6 +2671,13 @@ class PAMConnectionEditCommand(Command):
help='Maximum Scrollback Size (terminal history). Integer to set, '
'empty string to remove. Supported for pamDatabase (mysql/postgresql/sql-server) '
'and pamMachine/pamDirectory (ssh/telnet/kubernetes).')
parser.add_argument('--ignore-server-cert', '-isc', required=False, dest='ignore_server_cert', choices=choices,
help='Ignore server certificate errors (on/off/default). Supported for rdp and '
'kubernetes protocols.')
parser.add_argument('--security-mode', '-sm', required=False, dest='security_mode',
choices=['any', 'nla', 'tls', 'vmconnect', 'rdp', 'default'],
help='RDP Security Mode (any/nla/tls/vmconnect/rdp, or default to unset). '
'Supported for rdp protocol only.')
parser.add_argument('--rotate-on-termination', required=False, dest='rotate_on_termination',
choices=['on', 'off'],
help='Rotate launch credentials when the PAM session ends (DAG resource meta)')
Expand Down Expand Up @@ -2716,6 +2726,19 @@ def execute(self, params, **kwargs):
f"pamRemoteBrowser, pamNetworkConfiguration pamAwsConfiguration, and "
f"pamAzureConfiguration records{bcolors.ENDC}")

# Effective protocol (existing, or the one being set by this same call via
# --connections=on --protocol) — shared by --scrollback, --ignore-server-cert,
# and --security-mode gating below.
def _get_effective_protocol():
existing_ps = record.get_typed_field('pamSettings')
existing_protocol = ''
if existing_ps and existing_ps.value and isinstance(existing_ps.value[0], dict):
existing_protocol = existing_ps.value[0].get('connection', {}).get('protocol') or ''
new_protocol_arg = kwargs.get('protocol', None)
if kwargs.get('connections') == 'on' and new_protocol_arg is not None:
return new_protocol_arg # may be '' to clear
return existing_protocol

# --scrollback: validate record type + effective protocol before any mutation
scrollback_arg = kwargs.get('scrollback', None)
scrollback_clear = False
Expand All @@ -2732,15 +2755,7 @@ def execute(self, params, **kwargs):
f'{bcolors.FAIL}--scrollback is only supported for pamDatabase, pamMachine, and pamDirectory '
f'records. Record "{record_uid}" is of type "{record_type}".{bcolors.ENDC}')

existing_ps = record.get_typed_field('pamSettings')
existing_protocol = ''
if existing_ps and existing_ps.value and isinstance(existing_ps.value[0], dict):
existing_protocol = existing_ps.value[0].get('connection', {}).get('protocol') or ''
new_protocol_arg = kwargs.get('protocol', None)
if kwargs.get('connections') == 'on' and new_protocol_arg is not None:
effective_protocol = new_protocol_arg # may be '' to clear
else:
effective_protocol = existing_protocol
effective_protocol = _get_effective_protocol()
if effective_protocol not in allowed_protocols:
raise CommandError('pam connection edit',
f'{bcolors.FAIL}--scrollback is not supported for protocol "{effective_protocol or "(unset)"}" '
Expand All @@ -2760,6 +2775,37 @@ def execute(self, params, **kwargs):
f'{bcolors.FAIL}--scrollback must be a non-negative integer or empty string. '
f'Got: "{scrollback_arg}".{bcolors.ENDC}')

pam_settings_record_types = ('pamMachine', 'pamDatabase', 'pamDirectory')

# --ignore-server-cert: validate record type + effective protocol before any mutation
ignore_server_cert_arg = kwargs.get('ignore_server_cert', None)
if ignore_server_cert_arg is not None:
if record_type not in pam_settings_record_types:
raise CommandError('pam connection edit',
f'{bcolors.FAIL}--ignore-server-cert is only supported for pamMachine, pamDatabase, and '
f'pamDirectory records. Record "{record_uid}" is of type "{record_type}". For pamRemoteBrowser '
f'records, use `pam rbi edit --ignore-server-cert` instead.{bcolors.ENDC}')
effective_protocol = _get_effective_protocol()
if effective_protocol not in ('rdp', 'kubernetes'):
raise CommandError('pam connection edit',
f'{bcolors.FAIL}--ignore-server-cert is not supported for protocol '
f'"{effective_protocol or "(unset)"}" on {record_type} records. '
f'Allowed protocols: kubernetes, rdp.{bcolors.ENDC}')

# --security-mode: validate record type + effective protocol before any mutation
security_mode_arg = kwargs.get('security_mode', None)
if security_mode_arg is not None:
if record_type not in pam_settings_record_types:
raise CommandError('pam connection edit',
f'{bcolors.FAIL}--security-mode is only supported for pamMachine, pamDatabase, and '
f'pamDirectory records. Record "{record_uid}" is of type "{record_type}".{bcolors.ENDC}')
effective_protocol = _get_effective_protocol()
if effective_protocol != 'rdp':
raise CommandError('pam connection edit',
f'{bcolors.FAIL}--security-mode is not supported for protocol '
f'"{effective_protocol or "(unset)"}" on {record_type} records. '
f'Allowed protocols: rdp.{bcolors.ENDC}')

encrypted_session_token, encrypted_transmission_key, transmission_key = get_keeper_tokens(params)
if record_type in "pamNetworkConfiguration pamAwsConfiguration pamAzureConfiguration".split():
tdag = TunnelDAG(params, encrypted_session_token, encrypted_transmission_key, record_uid, is_config=True,
Expand Down Expand Up @@ -2870,6 +2916,46 @@ def execute(self, params, **kwargs):
else:
logging.debug(f'scrollback is already {scrollback_value} on record={record_uid}')

# --ignore-server-cert: apply (validated above; record_type + effective protocol already checked)
if ignore_server_cert_arg is not None:
psv = pam_settings.value[0] if pam_settings and pam_settings.value else {}
vcon = psv.get('connection', {}) if isinstance(psv, dict) else {}
current_ic = vcon.get('ignoreCert') if isinstance(vcon, dict) else None
if ignore_server_cert_arg == 'default':
if current_ic is not None:
pam_settings.value[0]["connection"].pop('ignoreCert', None)
dirty = True
else:
logging.debug(f'ignoreCert is already unset on record={record_uid}')
else:
target_ic = value_to_boolean(ignore_server_cert_arg)
if current_ic != target_ic:
pam_settings.value[0]["connection"]["ignoreCert"] = target_ic
dirty = True
else:
logging.debug(f'ignoreCert is already {target_ic} on record={record_uid}')

# --security-mode: apply (validated above; record_type + effective protocol already checked)
if security_mode_arg is not None:
from .pam_import.base import RDPSecurity
psv = pam_settings.value[0] if pam_settings and pam_settings.value else {}
vcon = psv.get('connection', {}) if isinstance(psv, dict) else {}
current_sec = vcon.get('security') if isinstance(vcon, dict) else None
if security_mode_arg == 'default':
if current_sec is not None:
pam_settings.value[0]["connection"].pop('security', None)
dirty = True
else:
logging.debug(f'security is already unset on record={record_uid}')
else:
mapped_security = RDPSecurity.map(security_mode_arg)
target_sec = mapped_security.value if mapped_security else security_mode_arg
if current_sec != target_sec:
pam_settings.value[0]["connection"]["security"] = target_sec
dirty = True
else:
logging.debug(f'security is already {target_sec} on record={record_uid}')

if dirty:
record_management.update_record(params, record)
api.sync_down(params)
Expand Down
25 changes: 22 additions & 3 deletions keepercommander/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import threading
import warnings
from datetime import datetime
from typing import Dict, NamedTuple, Optional, Set
from typing import Dict, NamedTuple, Optional, Set, Union
from urllib.parse import urlparse, urlunparse

from urllib3.exceptions import InsecureRequestWarning
Expand Down Expand Up @@ -133,8 +133,27 @@ def set_proxy(self, proxy_server):
self.proxies = None

@property
def certificate_check(self):
"""Return the ``requests`` ``verify`` value (False or a CA bundle path)."""
def certificate_check(self) -> Union[bool, str]:
"""SSL verification value for ``requests``' ``verify=`` parameter.

Despite the name and the plain-bool ``_certificate_check`` backing field,
this getter never returns a bare ``True``/``bool``. It resolves
``_certificate_check`` (and the ``KEEPER_SSL_CERT_FILE`` env var, via
``utils.resolve_ssl_verify``/``utils.get_ssl_cert_file``) into one of:

- ``False`` - certificate verification disabled.
- ``str`` - path to the CA bundle to verify against (the default
system/certifi bundle, or a user override).

In practice the return type is ``False | str`` (never ``True``), even
though the hint is the more permissive ``Union[bool, str]``. Callers
must not do ``is True`` checks against this value - it will always be
false. Use ``if certificate_check:`` to test enabled/disabled, or
``isinstance(certificate_check, str)`` to detect a resolved bundle
path. The old plain-bool value is still available as the private
``_certificate_check`` attribute for code that only cares about the
user's on/off intent, not the resolved ``requests`` verify value.
"""
if self._resolved_verify is None:
from . import utils
self._resolved_verify = utils.resolve_ssl_verify(
Expand Down
Loading
Loading