Skip to content

Commit

Permalink
Refactor and add --new-cert-name flag
Browse files Browse the repository at this point in the history
  • Loading branch information
ohemorange committed Nov 29, 2016
1 parent e395f7a commit da6a477
Show file tree
Hide file tree
Showing 7 changed files with 334 additions and 199 deletions.
92 changes: 92 additions & 0 deletions certbot/cert_manager.py
@@ -1,14 +1,17 @@
"""Tools for managing certificates."""
import datetime
import logging
import os
import pytz
import traceback
import zope.component

from certbot import configuration
from certbot import errors
from certbot import interfaces
from certbot import renewal
from certbot import storage
from certbot import util

logger = logging.getLogger(__name__)

Expand All @@ -30,6 +33,29 @@ def update_live_symlinks(config):
configuration.RenewerConfiguration(renewer_config),
update_symlinks=True)

def rename_lineage(config):
"""Rename the specified lineage to the new name.
:param config: Configuration.
:type config: :class:`certbot.interfaces.IConfig`
"""
if not config.certname:
raise errors.ConfigurationError("Specify a certificate name with "
"with flag --cert-name.")
if not config.new_certname:
raise errors.ConfigurationError("Specify a new name for certificate {0} "
"with flag --new-cert-name."
.format(config.certname))
lineage = lineage_for_certname(config, config.certname)
if not lineage:
raise errors.ConfigurationError("No existing certificate with name "
"{0} found.".format(config.certname))
storage.rename_renewal_config(config.certname, config.new_certname, config)
disp = zope.component.getUtility(interfaces.IDisplay)
disp.notification("Successfully renamed {0} to {1}."
.format(config.certname, config.new_certname), pause=False)

def _report_lines(msgs):
"""Format a results report for a category of single-line renewal outcomes"""
return " " + "\n ".join(str(msg) for msg in msgs)
Expand Down Expand Up @@ -104,3 +130,69 @@ def certificates(config):

# Describe all the certs
_describe_certs(parsed_certs, parse_failures)

def _search_lineages(config, func, initial_rv):
"""Iterate func over unbroken lineages, allowing custom return conditions.
"""
cli_config = configuration.RenewerConfiguration(config)
configs_dir = cli_config.renewal_configs_dir
# Verify the directory is there
util.make_or_verify_dir(configs_dir, mode=0o755, uid=os.geteuid())

rv = initial_rv
for renewal_file in renewal.renewal_conf_files(cli_config):
try:
candidate_lineage = storage.RenewableCert(renewal_file, cli_config)
except (errors.CertStorageError, IOError):
logger.debug("Renewal conf file %s is broken. Skipping.", renewal_file)
logger.debug("Traceback was:\n%s", traceback.format_exc())
continue
rv = func(candidate_lineage, rv)
return rv

def lineage_for_certname(config, certname):
"""Find a lineage object with name certname.
"""
def func(candidate_lineage, rv):
"""Return cert if it has name certname, else return rv
"""
matching_lineage_name_cert = rv
if candidate_lineage.lineagename == certname:
matching_lineage_name_cert = candidate_lineage
return matching_lineage_name_cert
return _search_lineages(config, func, None)

def domains_for_certname(config, certname):
"""Find the domains in the cert with name certname.
"""
def func(candidate_lineage, rv):
"""Return domains if certname matches, else return rv
"""
matching_domains = rv
if candidate_lineage.lineagename == certname:
matching_domains = candidate_lineage.names()
return matching_domains
return _search_lineages(config, func, None)

def find_duplicative_certs(config, domains):
"""Find existing certs that duplicate the request."""
def func(candidate_lineage, rv):
"""Return cert as identical_names_cert if it matches,
or subset_names_cert if it matches as subset
"""
# TODO: Handle these differently depending on whether they are
# expired or still valid?
identical_names_cert, subset_names_cert = rv
candidate_names = set(candidate_lineage.names())
if candidate_names == set(domains):
identical_names_cert = candidate_lineage
elif candidate_names.issubset(set(domains)):
# This logic finds and returns the largest subset-names cert
# in the case where there are several available.
if subset_names_cert is None:
subset_names_cert = candidate_lineage
elif len(candidate_names) > len(subset_names_cert.names()):
subset_names_cert = candidate_lineage
return (identical_names_cert, subset_names_cert)

return _search_lineages(config, func, (None, None))
8 changes: 7 additions & 1 deletion certbot/cli.py
Expand Up @@ -68,6 +68,7 @@
rollback Rollback server configuration changes made during install
config_changes Show changes made to server config during installation
update_symlinks Update cert symlinks based on renewal config file
rename Update a certificate's name
plugins Display information about installed plugins
certificates Display information about certs configured with Certbot
Expand Down Expand Up @@ -326,7 +327,7 @@ def __init__(self, args, plugins, detect_defaults=False):
"register": main.register, "renew": main.renew,
"revoke": main.revoke, "rollback": main.rollback,
"everything": main.run, "update_symlinks": main.update_symlinks,
"certificates": main.certificates}
"certificates": main.certificates, "rename": main.rename}

# List of topics for which additional help can be provided
HELP_TOPICS = ["all", "security", "paths", "automation", "testing"] + list(self.VERBS)
Expand Down Expand Up @@ -693,6 +694,11 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False): # pylint: dis
"per Certbot run. Show certificate names by running certificates "
"command. If there is no existing certificate with this name and "
"domains are requested, create a new certificate with this name.")
helpful.add(
"rename",
"--new-cert-name", dest="new_certname",
metavar="NEW_CERTNAME", default=None,
help="New name for the certificate. Must be a valid filename.")
helpful.add(
[None, "testing", "renew", "certonly"],
"--dry-run", action="store_true", dest="dry_run",
Expand Down
83 changes: 13 additions & 70 deletions certbot/main.py
Expand Up @@ -29,7 +29,6 @@
from certbot import util
from certbot import reporter
from certbot import renewal
from certbot import storage

from certbot.display import util as display_util, ops as display_ops
from certbot.plugins import disco as plugins_disco
Expand Down Expand Up @@ -218,7 +217,7 @@ def _find_lineage_for_domains(config, domains):
if config.duplicate:
return "newcert", None
# TODO: Also address superset case
ident_names_cert, subset_names_cert = _find_duplicative_certs(config, domains)
ident_names_cert, subset_names_cert = cert_manager.find_duplicative_certs(config, domains)
# XXX ^ schoen is not sure whether that correctly reads the systemwide
# configuration file.
if ident_names_cert is None and subset_names_cert is None:
Expand All @@ -242,10 +241,10 @@ def _find_lineage_for_domains_and_certname(config, domains, certname):
if not certname:
return _find_lineage_for_domains(config, domains)
else:
lineage = _lineage_for_certname(config, certname)
lineage = cert_manager.lineage_for_certname(config, certname)
if lineage:
if domains:
if set(_domains_for_certname(config, certname)) != set(domains):
if set(cert_manager.domains_for_certname(config, certname)) != set(domains):
_ask_user_to_confirm_new_names(config, domains, certname,
lineage.names()) # raises if no
return "renew", lineage
Expand Down Expand Up @@ -274,73 +273,9 @@ def _ask_user_to_confirm_new_names(config, new_domains, certname, old_domains):
if not obj.yesno(msg, "Update cert", "Cancel"):
raise errors.ConfigurationError("Specified mismatched cert name and domains.")

def _search_lineages(config, func, initial_rv):
"""Iterate func over unbroken lineages, allowing custom return conditions.
"""
cli_config = configuration.RenewerConfiguration(config)
configs_dir = cli_config.renewal_configs_dir
# Verify the directory is there
util.make_or_verify_dir(configs_dir, mode=0o755, uid=os.geteuid())

rv = initial_rv
for renewal_file in renewal.renewal_conf_files(cli_config):
try:
candidate_lineage = storage.RenewableCert(renewal_file, cli_config)
except (errors.CertStorageError, IOError):
logger.debug("Renewal conf file %s is broken. Skipping.", renewal_file)
logger.debug("Traceback was:\n%s", traceback.format_exc())
continue
rv = func(candidate_lineage, rv)
return rv

def _lineage_for_certname(config, certname):
"""Find a lineage object with name certname.
"""
def func(candidate_lineage, rv):
"""Return cert if it has name certname, else return rv
"""
matching_lineage_name_cert = rv
if candidate_lineage.lineagename == certname:
matching_lineage_name_cert = candidate_lineage
return matching_lineage_name_cert
return _search_lineages(config, func, None)

def _domains_for_certname(config, certname):
"""Find the domains in the cert with name certname.
"""
def func(candidate_lineage, rv):
"""Return domains if certname matches, else return rv
"""
matching_domains = rv
if candidate_lineage.lineagename == certname:
matching_domains = candidate_lineage.names()
return matching_domains
return _search_lineages(config, func, None)

def _find_duplicative_certs(config, domains):
"""Find existing certs that duplicate the request."""
def func(candidate_lineage, rv):
"""Return cert as identical_names_cert if it matches,
or subset_names_cert if it matches as subset
"""
# TODO: Handle these differently depending on whether they are
# expired or still valid?
identical_names_cert, subset_names_cert = rv
candidate_names = set(candidate_lineage.names())
if candidate_names == set(domains):
identical_names_cert = candidate_lineage
elif candidate_names.issubset(set(domains)):
# This logic finds and returns the largest subset-names cert
# in the case where there are several available.
if subset_names_cert is None:
subset_names_cert = candidate_lineage
elif len(candidate_names) > len(subset_names_cert.names()):
subset_names_cert = candidate_lineage
return (identical_names_cert, subset_names_cert)

return _search_lineages(config, func, (None, None))

def _find_domains_or_certname(config, installer):
"""Retrieve domains and certname from config or user input.
"""
domains = None
if config.domains:
domains = config.domains
Expand Down Expand Up @@ -563,6 +498,14 @@ def update_symlinks(config, unused_plugins):
"""
cert_manager.update_live_symlinks(config)

def rename(config, unused_plugins):
"""Rename a certificate
Use the information in the config file to rename an existing
lineage.
"""
cert_manager.rename_lineage(config)

def certificates(config, unused_plugins):
"""Display information about certs configured with Certbot
"""
Expand Down
19 changes: 19 additions & 0 deletions certbot/storage.py
Expand Up @@ -96,6 +96,25 @@ def write_renewal_config(o_filename, n_filename, archive_dir, target, relevant_d
config.write(outfile=f)
return config

def rename_renewal_config(prev_name, new_name, cli_config):
"""Rename's cli_config.certname's config to cli_config.new_certname.
:param .RenewerConfiguration cli_config: parsed command line
arguments
"""
prev_filename = os.path.join(
cli_config.renewal_configs_dir, prev_name) + ".conf"
new_filename = os.path.join(
cli_config.renewal_configs_dir, new_name) + ".conf"
if os.path.isfile(new_filename):
raise errors.ConfigurationError("The new certificate name "
"is already in use.")
try:
os.rename(prev_filename, new_filename)
except OSError:
raise errors.ConfigurationError("Please specify a valid filename "
"for the new certificate name.")


def update_configuration(lineagename, archive_dir, target, cli_config):
"""Modifies lineagename's config to contain the specified values.
Expand Down

0 comments on commit da6a477

Please sign in to comment.