Switch branches/tags
0.10.x 0.11.x 0.12.x 0.13.x 0.14.x 0.15.x 0.16.x 0.17.x 0.18.x 0.19.x 0.20.x 0.20.0-changelog 0.21.x 0.22.x 0.23.x 0.24.x 0.25.x 0.26.x 0.27.x 0.28.x 0.29.x 1342 1824 2175-broken-wheel-dependencies 2558-test-fix ENOMEM a-chall-dir accept-language acme-v2-integration acme-v2 acmedns_wip actually-file-update-apache add-code-of-conduct-1 add_case_testcase aggressively-dont-suggest-nginx-default ahaw021-windows all_exceptions allow-py37-testing also-mod-https-nginx always-save-server amazonlinux apache-portability apache-test apache_acmev2 apache_override apache_restart apache22-hack are-builds-working attempting-to-parse auto-order auto-path2 autodeploy autogenerate autohsts-handle-renewal-failures beta-program birdsarah/template_out_phases bleeding-edge-docs bmw-apache-http-01-2 bmw-apache-http-01 bmw-nginx-help bmw-nginx-safeint bmw_multiple_vhosts branch-candidate-0.29.1 break-lockstep bye-validator candidate-0.9.0 candidate-0.11.1-2 candidate-0.11.1 cert_manager certbot-dir certonly challenges-docs changelog-0.27.1 changelog_cleanup cloudflare-packaging contributing-common correct_selection dashaxiong-json-certificate-output dbm_test ddns_auth debian default_prefix detect-acme-version detect_defaults dev-warnings devdocs diagnose-pip-errors disable-rename dnstypo doc-logo doc-package-names docker-tests dockerfile-test dockerfile-test2 docs documentation_cleanup domain-not-unique-in-manual dry_run_ratelimits dynamic-install-requires eicksl-verify-permissions enhance_deprecation enhance_verb erik_py3_comments export_le_python_bad farm-cleanup fhs finalize_shim fix-acmev2 fix-centos6-test fix-everything fix-issue-link fix-rebootstrap-test fix-rebootstrap-test2 fix-rebootstrap-tmp fix-section-test fix_deploy_multi fix_install fix_ipv6only_detection fix_nginx_warning fixed-0.1.1 flexible-challenge-uri forget-the-py freebsd fullchain googledns_acmev2 heavy-tests help-instances http-auth-alt http-auth http01-nginx-follow-up http01-nginx httpd_lens idisplay-logging 
ignore-menu ignore-unknown-challenges insecureplatformwarning insert_rewrite_at_top install_in_deploy ipv6-standalone ipv6onlydup issue4331 issue_4519 issue_4520 issue_4792 issue_4866 issue_4885 issue_4953 issue_5030 issue_5066 issue_5449 jsha-patch-1 jsha/nginx-poll-reload key-path killpy26 le-dev leave-sys-out-of-this2 legacy_protocol less-verbose let-pip-peep letsencrypt-auto-release-testing-0.1.21 letsencrypt-auto-release-testing-v0.1.22 letsencrypt-auto-release-testing letsencrypt-travis letstest2 lineage-option lint_shhhh log-before-log mac-install make_ssl_makes_new_block manual-cloudflare manual-hooks master min-integration-coverage moar-parallelism mock-110 mockatexit mod-check-test modify_all more-manual-pip-dep-resolution more-testfarm move-main mypy-clean mypy-in-travis mypy-setup namespace-setattr naming-fix new-test-auto-path new_enhancements new_server_block_not_found_for_redirect nginx-acmev2 nginx-compat nginx-compatibility-test nginx-in-install.rst nginx-ipv6 nginx-redirect nginx-reversion-reversion nginx-safeint nginx-space-preservation2 nginx_restructure nginx_selection nginxparser no-1234 no-boulder-logs no-cover-apache no-domains-in-cli-ini no-festivals-required no-phone no-sites-available no-spdy no-wheezing no_duplicate_include no_new_server_blocks none_string notes_revision obj_full_writeout ocsp_apache old-mod-check oom order-matters osiris-ecdsa package-guide pconrad-docs pip-versions pip8-test playing-with-travis plugin-docs plugin_storage portalocker postfix pref-chall2 printf proof-of-possession py3-everything py3_metaclass pydev-paranoia pyopenssl++ pypy python37-tests q quietude-integration quinot/topic/dns-follow-cnames randomsleep recognize-dns reconstitutesque recovery_contact refactor-exception-handler regression_tests relax-setuptools-dep release-test remove-some-travis-cruft remove_location renew_updates return_actual_page revert-3268-dialog-autosize revert-3828-gh-2716 revert-6522 rhel_options route53_acmev2 route53 sendmail 
separate-repinned-integration server_alias server_block_selection signop-plumb_source_address_setting span-plan specify-min-six-version sphinx-rename subsequent-manual-challenge tell_pkg_mgrs_about_nginx_include test-0.1.22 test-0.21.1 test-37 test-acmev2 test-allow-py37-testing test-are-builds-working test-auto-path test-bmw-nginx-compatibility-test test-break-lockstep test-breakage test-bytes-fullchain-bites test-certbot-upgrade-acme-dep test-domain-not-unique-in-manual test-edge test-everything-0.22.x-2 test-everything-0.22.x test-everything-0.25.1 test-everything-0.29.1 test-everything-4 test-everything-37-quiktest test-everything-again test-everything-again2 test-everything-before-install test-everything-fast-n-quiet test-everything-fast-n-quiet2 test-everything-fix-oldest-tests test-everything-now test-everything-prerelease test-everything-prerelease2 test-everything-separate-integration-coverage test-everything-test test-everything-types test-everything-w-integration test-everything-warnings-2 test-everything-warnings-3 test-everything-warnings test-everything test-exit test-fasteners test-fasteners2 test-fasteners3 test-faster-2 test-fix-hooks-test test-fix-osx-tests test-full-py37-test-everything test-hook-dirs test-http01-nginx test-http01-nginx2 test-letsencrypt-travis test-loud-oldest-tests test-macos-failure test-macos-failurse test-macos test-mypy-certbot-loudly test-mypy-certbot test-no-cover-apache test-no-nose test-nohosts test-oldest test-osx test-osx2 test-pin-back-pkging-tools test-pin-more test-py37-test-everything test-pytest-cover test-python37-test-everything test-python37-test test-python37-tests test-quick-acmev2 test-receive-revert test-remove-cruft test-revert-fix-macos-pytest test-revert-pipstrap-changes test-rm-eol-2.6 test-rollback test-separate-everything test-separate-install test-separate-integration test-separate-integration2 test-separate-integration3 test-separate-repinned-integration test-something test-tests 
test-update-oldest-tests test-use-real-oldest-certbot-version-with-nginx test-v2-integration-v2 test-v2-integration test-v2-quick test-with-boulder-ip testfail_fix tls-sni-warning-example tos-privacy unbreak-travis unsquashed-postfix update-eold-tests update-server-docs update_error_link upgrade-c-stuff url-checker use-cn-from-csr use-namespace use_key_dir_in_pop v2-orders validator-redirects var-preservation-for-1123 venvdoc with-boulder-ip2 zimbra-installer zjs-digitalocean-packaging zjs-google-cloud-dns-packaging zjs-route53-packaging
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
497 lines (384 sloc) 16.9 KB
"""Plugin common functions."""
import logging
import os
import re
import shutil
import tempfile

import OpenSSL
import pkg_resources
import zope.interface
from josepy import util as jose_util

from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import achallenges # pylint: disable=unused-import
from certbot import constants
from certbot import crypto_util
from certbot import errors
from certbot import interfaces
from certbot import reverter
from certbot import util
from certbot.plugins.storage import PluginStorage
logger = logging.getLogger(__name__)
def option_namespace(name):
    """ArgumentParser options namespace (prefix of all options).

    :param str name: Plugin name.

    :returns: ``name`` followed by a single ``-``.
    :rtype: str

    """
    return name + "-"
def dest_namespace(name):
    """ArgumentParser dest namespace (prefix of all destinations).

    :param str name: Plugin name.

    :returns: ``name`` with every ``-`` replaced by ``_``, followed by a
        trailing ``_``.
    :rtype: str

    """
    return name.replace("-", "_") + "_"
# Matches the textual prefix of loopback and private-range IPv4 addresses.
# NOTE(review): the pattern argument was missing from the extracted source
# (the call was left unterminated); the alternation below covers 127.0.0.1
# and the RFC 1918 ranges 10/8, 172.16/12 and 192.168/16 -- confirm against
# the upstream file before relying on it.
private_ips_regex = re.compile(
    r"(^127\.0\.0\.1)|(^10\.)|(^172\.1[6-9]\.)|"
    r"(^172\.2[0-9]\.)|(^172\.3[0-1]\.)|(^192\.168\.)")
# Case-insensitive match for a dotted hostname: zero or more labels made of
# letters, digits and inner hyphens (each followed by a dot), then a final
# label consisting of letters only.
hostname_regex = re.compile(
    r"^(([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)*[a-z]+$",
    flags=re.IGNORECASE,
)
class Plugin(object):
    """Generic plugin.

    Keeps the configuration object and the plugin name, and derives from the
    name the prefixes used for the plugin's command line options and for the
    attributes under which their values are looked up on the configuration.

    :ivar config: Configuration object; `conf` reads attributes from it.
    :ivar str name: Plugin name used to build the option/dest prefixes.

    """
    # provider is not inherited, subclasses must define it on their own
    # @zope.interface.provider(interfaces.IPluginFactory)

    # NOTE(review): the file imports zope.interface, interfaces and jose_util
    # but nothing visible uses them; a class decorator and an abstract
    # classmethod decorator may have been lost in extraction -- confirm
    # against the upstream file.

    def __init__(self, config, name):
        self.config = config
        self.name = name

    @classmethod
    def add_parser_arguments(cls, add):
        """Add plugin arguments to the CLI argument parser.

        NOTE: If some of your flags interact with others, you can
        use cli.report_config_interaction to register this to ensure
        values are correctly saved/overridable during renewal.

        :param callable add: Function that proxies calls to
            `argparse.ArgumentParser.add_argument` prepending options
            with unique plugin name prefix.

        """

    @classmethod
    def inject_parser_options(cls, parser, name):
        """Inject parser options.

        See `~.IPlugin.inject_parser_options` for docs.

        :param parser: Object exposing ``add_argument``.
        :param str name: Plugin name used to prefix every option.

        :returns: Whatever `add_parser_arguments` returns.

        """
        # dummy function, doesn't check if dest.startswith(self.dest_namespace)
        def add(arg_name_no_prefix, *args, **kwargs):
            # pylint: disable=missing-docstring
            return parser.add_argument(
                "--{0}{1}".format(option_namespace(name), arg_name_no_prefix),
                *args, **kwargs)
        return cls.add_parser_arguments(add)

    # Exposed as a property: `option_name` concatenates it like a string.
    @property
    def option_namespace(self):
        """ArgumentParser options namespace (prefix of all options)."""
        return option_namespace(self.name)

    def option_name(self, name):
        """Option name (include plugin namespace)."""
        return self.option_namespace + name

    # Exposed as a property: `dest` concatenates it like a string.
    @property
    def dest_namespace(self):
        """ArgumentParser dest namespace (prefix of all destinations)."""
        return dest_namespace(self.name)

    def dest(self, var):
        """Find a destination for given variable ``var``."""
        # this should do exactly the same what ArgumentParser(arg),
        # does to "arg" to compute "dest"
        return self.dest_namespace + var.replace("-", "_")

    def conf(self, var):
        """Find a configuration value for variable ``var``."""
        return getattr(self.config, self.dest(var))
class Installer(Plugin):
    """An installer base class with reverter and ssl_dhparam methods defined.

    Installer plugins do not have to inherit from this class.

    Every checkpoint-related method forwards to ``self.reverter`` and
    re-raises `.errors.ReverterError` as `.errors.PluginError`.

    """
    # NOTE(review): in the extracted source the ``try:`` lines and the
    # statements inside them were missing for every method below; they are
    # restored here as calls to the same-named method on ``self.reverter``.
    # Confirm the names and arguments against certbot.reverter.Reverter.

    def __init__(self, *args, **kwargs):
        super(Installer, self).__init__(*args, **kwargs)
        self.storage = PluginStorage(self.config, self.name)
        self.reverter = reverter.Reverter(self.config)

    def add_to_checkpoint(self, save_files, save_notes, temporary=False):
        """Add files to a checkpoint.

        :param set save_files: set of filepaths to save
        :param str save_notes: notes about changes during the save
        :param bool temporary: True if the files should be added to a
            temporary checkpoint rather than a permanent one. This is
            usually used for changes that will soon be reverted.

        :raises .errors.PluginError: when unable to add to checkpoint

        """
        if temporary:
            checkpoint_func = self.reverter.add_to_temp_checkpoint
        else:
            checkpoint_func = self.reverter.add_to_checkpoint

        try:
            checkpoint_func(save_files, save_notes)
        except errors.ReverterError as err:
            raise errors.PluginError(str(err))

    def finalize_checkpoint(self, title):
        """Timestamp and save changes made through the reverter.

        :param str title: Title describing checkpoint

        :raises .errors.PluginError: when an error occurs

        """
        try:
            self.reverter.finalize_checkpoint(title)
        except errors.ReverterError as err:
            raise errors.PluginError(str(err))

    def recovery_routine(self):
        """Revert all previously modified files.

        Reverts all modified files that have not been saved as a checkpoint

        :raises .errors.PluginError: If unable to recover the configuration

        """
        try:
            self.reverter.recovery_routine()
        except errors.ReverterError as err:
            raise errors.PluginError(str(err))

    def revert_temporary_config(self):
        """Rollback temporary checkpoint.

        :raises .errors.PluginError: when unable to revert config

        """
        try:
            self.reverter.revert_temporary_config()
        except errors.ReverterError as err:
            raise errors.PluginError(str(err))

    def rollback_checkpoints(self, rollback=1):
        """Rollback saved checkpoints.

        :param int rollback: Number of checkpoints to revert

        :raises .errors.PluginError: If there is a problem with the input or
            the function is unable to correctly revert the configuration

        """
        try:
            self.reverter.rollback_checkpoints(rollback)
        except errors.ReverterError as err:
            raise errors.PluginError(str(err))

    def view_config_changes(self):
        """Show all of the configuration changes that have taken place.

        :raises .errors.PluginError: If there is a problem while processing
            the checkpoints directories.

        """
        try:
            self.reverter.view_config_changes()
        except errors.ReverterError as err:
            raise errors.PluginError(str(err))

    # NOTE(review): @property is assumed for the two path accessors below
    # (their docstrings describe a path value, not an action) -- confirm.
    @property
    def ssl_dhparams(self):
        """Full absolute path to ssl_dhparams file."""
        return os.path.join(self.config.config_dir, constants.SSL_DHPARAMS_DEST)

    @property
    def updated_ssl_dhparams_digest(self):
        """Full absolute path to digest of updated ssl_dhparams file."""
        return os.path.join(self.config.config_dir, constants.UPDATED_SSL_DHPARAMS_DIGEST)

    def install_ssl_dhparams(self):
        """Copy Certbot's ssl_dhparams file into the system's config dir if required."""
        # NOTE(review): the call arguments were missing from the extracted
        # source; the two constants below are presumed names -- verify that
        # certbot.constants defines them.
        return install_version_controlled_file(
            self.ssl_dhparams,
            self.updated_ssl_dhparams_digest,
            constants.SSL_DHPARAMS_SRC,
            constants.ALL_SSL_DHPARAMS_HASHES)
class Addr(object):
    r"""Represents an virtual host address.

    The address is held as a ``(addr, port)`` tuple in ``self.tup``; IPv6
    addresses keep their surrounding square brackets in the ``addr`` part.

    :param tuple tup: ``(addr, port)`` pair, where ``addr`` is the addr part
        of the vhost address and ``port`` is a port number or \*, or ""
    :param bool ipv6: True if ``addr`` is a bracketed IPv6 address

    """
    def __init__(self, tup, ipv6=False):
        self.tup = tup
        self.ipv6 = ipv6

    @classmethod
    def fromstring(cls, str_addr):
        """Initialize Addr from string.

        :param str str_addr: ``addr``, ``addr:port``, ``[v6addr]`` or
            ``[v6addr]:port``

        :returns: New instance; the port is "" when none is given.

        """
        if str_addr.startswith('['):
            # ipv6 addresses starts with [
            end_index = str_addr.rfind(']')
            host = str_addr[:end_index + 1]
            port = ''
            # A port is only present when "]" is followed by ":" and at
            # least one more character.
            if len(str_addr) > end_index + 2 and str_addr[end_index + 1] == ':':
                port = str_addr[end_index + 2:]
            return cls((host, port), ipv6=True)
        host, _, port = str_addr.partition(':')
        return cls((host, port))

    def __str__(self):
        if self.tup[1]:
            return "%s:%s" % self.tup
        return self.tup[0]

    def normalized_tuple(self):
        """Normalized representation of addr/port tuple

        For IPv6 the addr part is replaced by its exploded form so that
        different spellings of the same address compare equal.

        """
        if self.ipv6:
            return (self.get_ipv6_exploded(), self.tup[1])
        return self.tup

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            # compare normalized to take different
            # styles of representation into account
            return self.normalized_tuple() == other.normalized_tuple()
        return False

    def __hash__(self):
        # Hash the same value __eq__ compares, so that objects which
        # compare equal (e.g. "[::1]" and "[0:0:0:0:0:0:0:1]") also hash
        # equal and behave correctly in sets and as dict keys.
        return hash(self.normalized_tuple())

    def get_addr(self):
        """Return addr part of Addr object."""
        return self.tup[0]

    def get_port(self):
        """Return port."""
        return self.tup[1]

    def get_addr_obj(self, port):
        """Return new address object with same addr and new port."""
        return self.__class__((self.tup[0], port), self.ipv6)

    def _normalize_ipv6(self, addr):
        """Return IPv6 address in normalized form, helper function"""
        addr = addr.lstrip("[")
        addr = addr.rstrip("]")
        return self._explode_ipv6(addr)

    def get_ipv6_exploded(self):
        """Return IPv6 in normalized form

        :returns: Eight ":"-joined groups without leading zeros, or "" when
            this is not an IPv6 address.
        :rtype: str

        """
        if self.ipv6:
            return ":".join(self._normalize_ipv6(self.tup[0]))
        return ""

    def _explode_ipv6(self, addr):
        """Explode IPv6 address for comparison

        :param str addr: IPv6 address without brackets

        :returns: List of eight group strings; groups elided by "::" are "0".
        :rtype: list

        """
        result = ['0', '0', '0', '0', '0', '0', '0', '0']
        addr_list = addr.split(":")
        if len(addr_list) > len(result):
            # too long, truncate
            addr_list = addr_list[0:len(result)]
        append_to_end = False
        for i, block in enumerate(addr_list):
            if not block:
                # encountered ::, so rest of the blocks should be
                # appended to the end
                append_to_end = True
                continue
            if len(block) > 1:
                # remove leading zeros, but keep one digit for an
                # all-zero group such as "0000"
                block = block.lstrip("0") or "0"
            if not append_to_end:
                result[i] = str(block)
            else:
                # count the location from the end using negative indices
                result[i - len(addr_list)] = str(block)
        return result
class ChallengePerformer(object):
    """Abstract base for challenge performers.

    :ivar configurator: Authenticator and installer plugin
    :ivar achalls: Annotated challenges
    :vartype achalls: `list` of `.KeyAuthorizationAnnotatedChallenge`
    :ivar indices: Holds the indices of challenges from a larger array
        so the user of the class doesn't have to.
    :vartype indices: `list` of `int`

    """

    def __init__(self, configurator):
        self.configurator = configurator
        self.achalls = []  # type: List[achallenges.KeyAuthorizationAnnotatedChallenge]
        self.indices = []  # type: List[int]

    def add_chall(self, achall, idx=None):
        """Store challenge to be performed when perform() is called.

        :param .KeyAuthorizationAnnotatedChallenge achall: Annotated
            challenge.
        :param int idx: index to challenge in a larger array

        """
        # NOTE(review): both append statements were missing from the
        # extracted source; restored from the documented purpose of
        # ``achalls`` and ``indices`` -- confirm.
        self.achalls.append(achall)
        # Compare against None so that index 0 is still recorded.
        if idx is not None:
            self.indices.append(idx)

    def perform(self):
        """Perform all added challenges.

        :returns: challenge responses
        :rtype: `list` of `acme.challenges.KeyAuthorizationChallengeResponse`

        :raises NotImplementedError: always; subclasses must override.

        """
        raise NotImplementedError()
class TLSSNI01(ChallengePerformer):
    # pylint: disable=abstract-method
    """Abstract base for TLS-SNI-01 challenge performers"""

    def __init__(self, configurator):
        super(TLSSNI01, self).__init__(configurator)
        self.challenge_conf = os.path.join(
            configurator.config.config_dir, "le_tls_sni_01_cert_challenge.conf")
        # self.completed = 0

    def get_cert_path(self, achall):
        """Returns standardized name for challenge certificate.

        :param .KeyAuthorizationAnnotatedChallenge achall: Annotated
            tls-sni-01 challenge.

        :returns: certificate file name
        :rtype: str

        """
        return os.path.join(self.configurator.config.work_dir,
                            achall.chall.encode("token") + ".crt")

    def get_key_path(self, achall):
        """Get standardized path to challenge key."""
        return os.path.join(self.configurator.config.work_dir,
                            achall.chall.encode("token") + '.pem')

    def get_z_domain(self, achall):
        """Returns z_domain (SNI) name for the challenge."""
        return achall.response(achall.account_key).z_domain.decode("utf-8")

    def _setup_challenge_cert(self, achall, cert_key=None):
        """Generate and write out challenge certificate.

        :param .KeyAuthorizationAnnotatedChallenge achall: Annotated
            tls-sni-01 challenge.
        :param cert_key: Optional key forwarded to
            ``achall.response_and_validation``.

        :returns: The response part of ``achall.response_and_validation``.

        """
        cert_path = self.get_cert_path(achall)
        key_path = self.get_key_path(achall)
        # Register the path before you write out the file
        self.configurator.reverter.register_file_creation(True, key_path)
        self.configurator.reverter.register_file_creation(True, cert_path)

        # NOTE(review): the ``cert_key=cert_key`` argument and the two
        # ``.write(...)`` calls below were missing from the extracted
        # source and are restored from the surrounding names -- confirm.
        response, (cert, key) = achall.response_and_validation(
            cert_key=cert_key)
        cert_pem = OpenSSL.crypto.dump_certificate(
            OpenSSL.crypto.FILETYPE_PEM, cert)
        key_pem = OpenSSL.crypto.dump_privatekey(
            OpenSSL.crypto.FILETYPE_PEM, key)

        # Write out challenge cert and key
        with open(cert_path, "wb") as cert_chall_fd:
            cert_chall_fd.write(cert_pem)
        with util.safe_open(key_path, 'wb', chmod=0o400) as key_file:
            key_file.write(key_pem)

        return response
def install_version_controlled_file(dest_path, digest_path, src_path, all_hashes):
    """Copy a file into an active location (likely the system's config dir) if required.

    The file at ``dest_path`` is (re)written only when it is missing or when
    its digest matches one of ``all_hashes``; a file whose digest is unknown
    is left alone and a warning is logged once per new version of the source.

    :param str dest_path: destination path for version controlled file
    :param str digest_path: path to save a digest of the file in
    :param str src_path: path to version controlled file found in distribution
    :param list all_hashes: hashes of every released version of the file

    """
    # NOTE(review): the helper bodies, the early ``return`` statements and
    # ``f.read()`` were missing from the extracted source; they are restored
    # to match the inline comments below -- confirm.
    current_hash = crypto_util.sha256sum(src_path)

    def _write_current_hash():
        with open(digest_path, "w") as f:
            f.write(current_hash)

    def _install_current_file():
        shutil.copyfile(src_path, dest_path)
        _write_current_hash()

    # Check to make sure options-ssl.conf is installed
    if not os.path.isfile(dest_path):
        _install_current_file()
        return
    # there's already a file there. if it's up to date, do nothing. if it's not but
    # it matches a known file hash, we can update it.
    # otherwise, print a warning once per new version.
    active_file_digest = crypto_util.sha256sum(dest_path)
    if active_file_digest == current_hash:  # already up to date
        return
    elif active_file_digest in all_hashes:  # safe to update
        _install_current_file()
    else:  # has been manually modified, not safe to update
        # did they modify the current version or an old version?
        if os.path.isfile(digest_path):
            with open(digest_path, "r") as f:
                saved_digest = f.read()
            # they modified it after we either installed or told them about this version, so return
            if saved_digest == current_hash:
                return
        # there's a new version but we couldn't update the file, or they deleted the digest.
        # save the current digest so we only print this once, and print a warning
        _write_current_hash()
        logger.warning("%s has been manually modified; updated file "
                       "saved to %s. We recommend updating %s for security purposes.",
                       dest_path, src_path, dest_path)
# test utils used by certbot_apache/certbot_nginx (hence
# "pragma: no cover") TODO: this might quickly lead to dead code (also
# c.f. #383)
def dir_setup(test_dir, pkg):  # pragma: no cover
    """Setup the directories necessary for the configurator.

    Creates three temporary directories, sets their mode to
    ``constants.CONFIG_DIRS_MODE`` and copies the package's
    ``testdata/<test_dir>`` tree into the first one.

    :param str test_dir: name of the directory under ``testdata`` to copy
    :param str pkg: package name passed to ``pkg_resources.resource_filename``

    :returns: ``(temp_dir, config_dir, work_dir)`` paths
    :rtype: tuple

    """
    def expanded_tempdir(prefix):
        """Return the real path of a temp directory with the specified prefix

        Some plugins rely on real paths of symlinks for working correctly. For
        example, certbot-apache uses real paths of configuration files to tell
        a virtual host from another. On systems where TMP itself is a symbolic
        link, (ex: OS X) such plugins will be confused. This function prevents
        such a case.
        """
        # Pass by keyword: the first positional parameter of mkdtemp is
        # ``suffix``, so a positional call would not create a prefix.
        return os.path.realpath(tempfile.mkdtemp(prefix=prefix))

    temp_dir = expanded_tempdir("temp")
    config_dir = expanded_tempdir("config")
    work_dir = expanded_tempdir("work")

    os.chmod(temp_dir, constants.CONFIG_DIRS_MODE)
    os.chmod(config_dir, constants.CONFIG_DIRS_MODE)
    os.chmod(work_dir, constants.CONFIG_DIRS_MODE)

    test_configs = pkg_resources.resource_filename(
        pkg, os.path.join("testdata", test_dir))

    # NOTE(review): the head of this call was missing from the extracted
    # source; ``shutil.copytree`` is presumed from the ``symlinks=True``
    # argument and the file-level ``shutil`` import -- confirm.
    shutil.copytree(
        test_configs, os.path.join(temp_dir, test_dir), symlinks=True)

    return temp_dir, config_dir, work_dir