Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion keepercommander/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ def clean_description(desc):
]
domain_subcommands = [
('domain list (dl)', 'List all reserved domains for the enterprise'),
('domain reserve (dr)', 'Reserve, delete, or generate token for a domain'),
('domain reserve (dr)', 'Reserve, delete or generate token for a domain'),
('domain alias list (dal)', 'List domain aliases for the enterprise'),
('domain alias create (dac)', 'Create a domain alias for the enterprise'),
('domain alias delete (dad)', 'Delete a domain alias for the enterprise'),
]

for category in get_category_order():
Expand Down
106 changes: 106 additions & 0 deletions keepercommander/commands/domain_management/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# _ __
# | |/ /___ ___ _ __ ___ _ _ ®
# | ' </ -_) -_) '_ \/ -_) '_|
# |_|\_\___\___| .__/\___|_|
# |_|
#
# Keeper Commander
# Copyright 2026 Keeper Security Inc.
# Contact: ops@keepersecurity.com
#

"""domain_management package – CLI surface for enterprise domain operations.

Package layout (follows Single Responsibility Principle)
--------------------------------------------------------
parsers.py – All argparse parser definitions (no business logic)
helper.py – DomainManagementHelper (validation, error handling, output formatting)
domain_commands.py – ListDomainsCommand, ReserveDomainCommand (domain CRUD)
alias_commands.py – DomainAliasCommand, Get/Create/Delete alias commands
__init__.py – DomainCommand router, register_commands / register_command_info
"""

import re

from ..enterprise_common import EnterpriseCommand
from .helper import DomainManagementHelper
from .parsers import domain_parser
from .domain_commands import ListDomainsCommand, ReserveDomainCommand
from .alias_commands import DomainAliasCommand


class DomainCommand(EnterpriseCommand):
"""Top-level router that delegates to list / reserve / alias sub-commands."""

def __init__(self):
super().__init__()
self.list_cmd = ListDomainsCommand()
self.reserve_cmd = ReserveDomainCommand()
self.alias_cmd = DomainAliasCommand()

def get_parser(self):
return domain_parser

def execute_args(self, params, args, **kwargs):
import shlex
from ..base import ParseError, expand_cmd_args, normalize_output_param

try:
d = {}
d.update(kwargs)
self.extra_parameters = ''
parser = self._get_parser_safe()
envvars = params.environment_variables
args = '' if args is None else args

if parser:
args = expand_cmd_args(args, envvars)
args = normalize_output_param(args)
opts = parser.parse_args(shlex.split(args))
d.update(opts.__dict__)

return self.execute(params, **d)

except ParseError as e:
error_str = str(e)
if 'invalid choice' in error_str:
match = re.search(r"invalid choice: '([^']+)'", error_str)
if match:
invalid_cmd = match.group(1)
output_format = kwargs.get('format', 'text')
DomainManagementHelper.handle_invalid_subcommand(invalid_cmd, output_format)
return None
import logging
logging.error(error_str)
return None

def execute(self, params, **kwargs):
subcommand = kwargs.get('subcommand')

if not subcommand:
self.get_parser().print_help()
return

if subcommand == 'list':
return self.list_cmd.execute(params, **kwargs)
elif subcommand == 'reserve':
return self.reserve_cmd.execute(params, **kwargs)
elif subcommand == 'alias':
return self.alias_cmd.execute(params, **kwargs)
else:
output_format = kwargs.get('format', 'text')
DomainManagementHelper.handle_invalid_subcommand(subcommand, output_format)
return None


def register_commands(commands):
commands['domain'] = DomainCommand()


def register_command_info(aliases, command_info):
aliases['dl'] = ('domain', 'list')
aliases['dr'] = ('domain', 'reserve')
aliases['dal'] = ('domain', 'alias', 'list')
aliases['dac'] = ('domain', 'alias', 'create')
aliases['dad'] = ('domain', 'alias', 'delete')
command_info['domain'] = domain_parser.description
205 changes: 205 additions & 0 deletions keepercommander/commands/domain_management/alias_commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
# _ __
# | |/ /___ ___ _ __ ___ _ _ ®
# | ' </ -_) -_) '_ \/ -_) '_|
# |_|\_\___\___| .__/\___|_|
# |_|
#
# Keeper Commander
# Copyright 2026 Keeper Security Inc.
# Contact: ops@keepersecurity.com
#

"""Domain alias commands – list, create, and delete domain aliases."""

import logging

from ... import api
from ...error import KeeperApiError
from ...proto import enterprise_pb2
from ..enterprise_common import EnterpriseCommand
from .constants import API_ENDPOINTS
from .helper import DomainManagementHelper
from .parsers import (
domain_alias_parser,
domain_alias_list_parser,
domain_alias_create_parser,
domain_alias_delete_parser,
)


class DomainAliasCommand(EnterpriseCommand):
"""Routes ``domain alias <list|create|delete>`` to the appropriate command."""

def __init__(self):
super().__init__()
self.list_cmd = GetDomainAliasCommand()
self.create_cmd = CreateDomainAliasCommand()
self.delete_cmd = DeleteDomainAliasCommand()

def get_parser(self):
return domain_alias_parser

def execute(self, params, **kwargs):
alias_subcommand = kwargs.get('alias_subcommand')

if not alias_subcommand:
self.get_parser().print_help()
return

if alias_subcommand == 'list':
return self.list_cmd.execute(params, **kwargs)
elif alias_subcommand == 'create':
return self.create_cmd.execute(params, **kwargs)
elif alias_subcommand == 'delete':
return self.delete_cmd.execute(params, **kwargs)
else:
output_format = kwargs.get('format', 'text')
DomainManagementHelper.handle_invalid_subcommand(
f'alias {alias_subcommand}', output_format,
)
return None


class GetDomainAliasCommand(EnterpriseCommand):
"""List all domain aliases for the enterprise."""

def get_parser(self):
return domain_alias_list_parser

def execute(self, params, **kwargs):
output_format = kwargs.get('format') or 'text'
try:
rs = api.communicate_rest(
params, None, API_ENDPOINTS['get_domain_alias'],
rs_type=enterprise_pb2.DomainAliasResponse,
)

if not rs.domainAlias:
logging.info('No domain aliases found for this enterprise.')
return

return DomainManagementHelper.render_alias_response(rs, output_format, kwargs)

except KeeperApiError as e:
DomainManagementHelper.handle_alias_api_error(e, output_format, 'retrieving')


class CreateDomainAliasCommand(EnterpriseCommand):
"""Create one or more domain aliases for a domain owned by the enterprise."""

def get_parser(self):
return domain_alias_create_parser

def execute(self, params, **kwargs):
domain = kwargs.get('domain', '')
aliases = kwargs.get('alias', [])
output_format = kwargs.get('format', 'text')

if not domain:
DomainManagementHelper.output_error('Domain name is required.', output_format)
return
if not aliases:
DomainManagementHelper.output_error('At least one alias is required.', output_format)
return

is_valid, domain, error_msg = DomainManagementHelper.validate_domain(domain)
if not is_valid:
DomainManagementHelper.output_error(error_msg, output_format)
return

normalized_aliases = DomainManagementHelper.validate_aliases(domain, aliases, output_format)
if normalized_aliases is None:
return

try:
rq = DomainManagementHelper.build_alias_request(domain, normalized_aliases)
rs = api.communicate_rest(
params, rq, API_ENDPOINTS['create_domain_alias'],
rs_type=enterprise_pb2.DomainAliasResponse,
)
return DomainManagementHelper.render_alias_response(
rs, output_format, kwargs,
status_messages=DomainManagementHelper.CREATE_ALIAS_STATUS_MESSAGES,
action='create',
)

except KeeperApiError as e:
DomainManagementHelper.handle_alias_api_error(e, output_format, 'creating')


class DeleteDomainAliasCommand(EnterpriseCommand):
"""Delete one or more domain aliases for a domain owned by the enterprise."""

def get_parser(self):
return domain_alias_delete_parser

def execute(self, params, **kwargs):
domain = kwargs.get('domain', '')
aliases = kwargs.get('alias', [])
output_format = kwargs.get('format', 'text')
force = kwargs.get('force', False)

if not domain:
DomainManagementHelper.output_error('Domain name is required.', output_format)
return
if not aliases:
DomainManagementHelper.output_error('At least one alias is required.', output_format)
return

is_valid, domain, error_msg = DomainManagementHelper.validate_domain(domain)
if not is_valid:
DomainManagementHelper.output_error(error_msg, output_format)
return

normalized_aliases = DomainManagementHelper.validate_aliases(domain, aliases, output_format)
if normalized_aliases is None:
return

existing_aliases = self._get_existing_aliases(params)
not_found = [a for a in normalized_aliases if (domain, a) not in existing_aliases]
if not_found:
for alias in not_found:
DomainManagementHelper.output_error(
f"Domain alias '{alias}' for domain '{domain}' does not exist.", output_format,
)
return

if not force:
alias_list_str = ', '.join(normalized_aliases)
try:
confirm = input(
f'Are you sure you want to delete alias(es) [{alias_list_str}] for domain "{domain}"? (y/N): '
)
except (KeyboardInterrupt, EOFError):
logging.info('Delete cancelled.')
return
if confirm.strip().lower() not in ('y', 'yes'):
logging.info('Delete cancelled.')
return

try:
rq = DomainManagementHelper.build_alias_request(domain, normalized_aliases)
rs = api.communicate_rest(
params, rq, API_ENDPOINTS['delete_domain_alias'],
rs_type=enterprise_pb2.DomainAliasResponse,
)
return DomainManagementHelper.render_alias_response(
rs, output_format, kwargs,
status_messages=DomainManagementHelper.DELETE_ALIAS_STATUS_MESSAGES,
action='delete',
)

except KeeperApiError as e:
DomainManagementHelper.handle_alias_api_error(e, output_format, 'deleting')

@staticmethod
def _get_existing_aliases(params):
"""Fetch current domain aliases and return as a set of (domain, alias) tuples."""
try:
rs = api.communicate_rest(
params, None, API_ENDPOINTS['get_domain_alias'],
rs_type=enterprise_pb2.DomainAliasResponse,
)
return {(da.domain, da.alias) for da in rs.domainAlias} if rs.domainAlias else set()
except KeeperApiError:
return set()
59 changes: 59 additions & 0 deletions keepercommander/commands/domain_management/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# _ __
# | |/ /___ ___ _ __ ___ _ _ ®
# | ' </ -_) -_) '_ \/ -_) '_|
# |_|\_\___\___| .__/\___|_|
# |_|
#
# Keeper Commander
# Copyright 2026 Keeper Security Inc.
# Contact: ops@keepersecurity.com
#

API_ENDPOINTS = {
'list_domains': 'enterprise/list_domains',
'reserve_domain': 'enterprise/reserve_domain',
'get_domain_alias': 'enterprise/get_domain_alias',
'create_domain_alias': 'enterprise/create_domain_alias',
'delete_domain_alias': 'enterprise/delete_domain_alias',
}



MAX_DOMAIN_LENGTH = 253
MAX_LABEL_LENGTH = 63
MIN_TLD_LENGTH = 2
DOMAIN_PATTERN = r'^(?!-)([a-z0-9-]{1,63})(?<!-)(\.(?!-)([a-z0-9-]{1,63})(?<!-))*$'



NOTICE_MSG = 'Notice: This feature is not in production yet. It will be available soon.'

ALIAS_ACCESS_DENIED_MSG = (
'Access denied: You must be an Admin with "Manage Users" permission to manage domain aliases.'
)

ERROR_MESSAGES = {
'bad_request': 'Domain not specified or invalid',
'access_denied': 'Access denied: You must be a Root Admin to manage domains',
'forbidden': 'Access denied: You must be a Root Admin to manage domains',
'domain_already_taken': 'Domain "{domain}" is already reserved by a different enterprise',
'verification_failed': (
'DNS verification failed for domain "{domain}". Please ensure the TXT record '
'is correctly added and DNS has propagated (may take up to 48 hours).'
),
'invalid_domain': 'Invalid domain format: "{domain}"',
'rate_limit': 'Too many requests. Please wait a moment and try again.',
'too_many_requests': 'Too many requests. Please wait a moment and try again.',
}

CREATE_ALIAS_STATUS_MESSAGES = {
0: 'Success',
1: 'Duplicate; already exists',
2: 'Not allowed; domain or alias not owned by the enterprise',
}

DELETE_ALIAS_STATUS_MESSAGES = {
0: 'Success',
1: 'Not allowed or does not exist',
2: 'does not exist',
}
Loading
Loading