Skip to content
Permalink
Browse files

Apache plugin wildcard support for ACMEv2 (#5608)

In `deploy_cert()` and `enhance()`, the user will be presented with a dialog to choose from the VirtualHosts that can be covered by the wildcard domain name. The (multiple) selection result will then be handled in a similar way that we previously handled a single VirtualHost that was returned by the `_find_best_vhost()`.

Additionally the selected VirtualHosts are added to a dictionary that maps selections to a wildcard domain to be reused in the later `enhance()` call and not forcing the user to select the same VirtualHosts again.

* Apache plugin wildcard support

* Present dialog only once per domain, added tests

* Raise exception if no VHosts selected for wildcard domain
  • Loading branch information...
joohoi authored and bmw committed Feb 28, 2018
1 parent a39d2fe commit e9bc4a319b9989a300dd574466a15edd581ee3c4
@@ -5,6 +5,7 @@
import os
import pkg_resources
import re
import six
import socket
import time

@@ -152,6 +153,9 @@ def __init__(self, *args, **kwargs):
self.assoc = dict()
# Outstanding challenges
self._chall_out = set()
# List of vhosts configured per wildcard domain on this run.
# used by deploy_cert() and enhance()
self._wildcard_vhosts = dict()
# Maps enhancements to vhosts we've enabled the enhancement for
self._enhanced_vhosts = defaultdict(set)

@@ -262,6 +266,21 @@ def get_parser(self):
self.aug, self.conf("server-root"), self.conf("vhost-root"),
self.version, configurator=self)

def _wildcard_domain(self, domain):
"""
Checks if domain is a wildcard domain
:param str domain: Domain to check
:returns: If the domain is wildcard domain
:rtype: bool
"""
if isinstance(domain, six.text_type):
wildcard_marker = u"*."
else:
wildcard_marker = b"*."
return domain.startswith(wildcard_marker)

def deploy_cert(self, domain, cert_path, key_path,
chain_path=None, fullchain_path=None):
"""Deploys certificate to specified virtual host.
@@ -280,9 +299,112 @@ def deploy_cert(self, domain, cert_path, key_path,
a lack of directives
"""
# Choose vhost before (possible) enabling of mod_ssl, to keep the
# vhost choice namespace similar with the pre-validation one.
vhost = self.choose_vhost(domain)
vhosts = self.choose_vhosts(domain)
for vhost in vhosts:
self._deploy_cert(vhost, cert_path, key_path, chain_path, fullchain_path)

def choose_vhosts(self, domain, create_if_no_ssl=True):
"""
Finds VirtualHosts that can be used with the provided domain
:param str domain: Domain name to match VirtualHosts to
:param bool create_if_no_ssl: If found VirtualHost doesn't have a HTTPS
counterpart, should one get created
:returns: List of VirtualHosts or None
:rtype: `list` of :class:`~certbot_apache.obj.VirtualHost`
"""

if self._wildcard_domain(domain):
if domain in self._wildcard_vhosts:
# Vhosts for a wildcard domain were already selected
return self._wildcard_vhosts[domain]
# Ask user which VHosts to support.
# Returned objects are guaranteed to be ssl vhosts
return self._choose_vhosts_wildcard(domain, create_if_no_ssl)
else:
return [self.choose_vhost(domain)]

def _vhosts_for_wildcard(self, domain):
"""
Get VHost objects for every VirtualHost that the user wants to handle
with the wildcard certificate.
"""

# Collect all vhosts that match the name
matched = set()
for vhost in self.vhosts:
for name in vhost.get_names():
if self._in_wildcard_scope(name, domain):
matched.add(vhost)

return list(matched)

def _in_wildcard_scope(self, name, domain):
"""
Helper method for _vhosts_for_wildcard() that makes sure that the domain
is in the scope of wildcard domain.
eg. in scope: domain = *.wild.card, name = 1.wild.card
not in scope: domain = *.wild.card, name = 1.2.wild.card
"""
if len(name.split(".")) == len(domain.split(".")):
return fnmatch.fnmatch(name, domain)


def _choose_vhosts_wildcard(self, domain, create_ssl=True):
"""Prompts user to choose vhosts to install a wildcard certificate for"""

# Get all vhosts that are covered by the wildcard domain
vhosts = self._vhosts_for_wildcard(domain)

# Go through the vhosts, making sure that we cover all the names
# present, but preferring the SSL vhosts
filtered_vhosts = dict()
for vhost in vhosts:
for name in vhost.get_names():
if vhost.ssl:
# Always prefer SSL vhosts
filtered_vhosts[name] = vhost
elif name not in filtered_vhosts and create_ssl:
# Add if not in list previously
filtered_vhosts[name] = vhost

# Only unique VHost objects
dialog_input = set([vhost for vhost in filtered_vhosts.values()])

# Ask the user which of names to enable, expect list of names back
dialog_output = display_ops.select_vhost_multiple(list(dialog_input))

if not dialog_output:
logger.error(
"No vhost exists with servername or alias for domain %s. "
"No vhost was selected. Please specify ServerName or ServerAlias "
"in the Apache config.",
domain)
raise errors.PluginError("No vhost selected")

# Make sure we create SSL vhosts for the ones that are HTTP only
# if requested.
return_vhosts = list()
for vhost in dialog_output:
if not vhost.ssl:
return_vhosts.append(self.make_vhost_ssl(vhost))
else:
return_vhosts.append(vhost)

self._wildcard_vhosts[domain] = return_vhosts
return return_vhosts


def _deploy_cert(self, vhost, cert_path, key_path, chain_path, fullchain_path):
"""
Helper function for deploy_cert() that handles the actual deployment
this exists because we might want to do multiple deployments per
domain originally passed for deploy_cert(). This is especially true
with wildcard certificates
"""


# This is done first so that ssl module is enabled and cert_path,
# cert_key... can all be parsed appropriately
@@ -311,7 +433,7 @@ def deploy_cert(self, domain, cert_path, key_path,
raise errors.PluginError(
"Unable to find cert and/or key directives")

logger.info("Deploying Certificate for %s to VirtualHost %s", domain, vhost.filep)
logger.info("Deploying Certificate to VirtualHost %s", vhost.filep)

if self.version < (2, 4, 8) or (chain_path and not fullchain_path):
# install SSLCertificateFile, SSLCertificateKeyFile,
@@ -327,8 +449,8 @@ def deploy_cert(self, domain, cert_path, key_path,
"version of Apache")
else:
if not fullchain_path:
raise errors.PluginError("Please provide the --fullchain-path\
option pointing to your full chain file")
raise errors.PluginError("Please provide the --fullchain-path "
"option pointing to your full chain file")
set_cert_path = fullchain_path
self.aug.set(path["cert_path"][-1], fullchain_path)
self.aug.set(path["cert_key"][-1], key_path)
@@ -391,7 +513,7 @@ def _choose_vhost_from_list(self, target_name, temp=False):
logger.error(
"No vhost exists with servername or alias of %s. "
"No vhost was selected. Please specify ServerName or ServerAlias "
"in the Apache config, or split vhosts into separate files.",
"in the Apache config.",
target_name)
raise errors.PluginError("No vhost selected")
elif temp:
@@ -1376,8 +1498,11 @@ def enhance(self, domain, enhancement, options=None):
except KeyError:
raise errors.PluginError(
"Unsupported enhancement: {0}".format(enhancement))

vhosts = self.choose_vhosts(domain, create_if_no_ssl=False)
try:
func(self.choose_vhost(domain), options)
for vhost in vhosts:
func(vhost, options)
except errors.PluginError:
logger.warning("Failed %s for %s", enhancement, domain)
raise
@@ -13,10 +13,44 @@
logger = logging.getLogger(__name__)


def select_vhost_multiple(vhosts):
"""Select multiple Vhosts to install the certificate for
:param vhosts: Available Apache VirtualHosts
:type vhosts: :class:`list` of type `~obj.Vhost`
:returns: List of VirtualHosts
:rtype: :class:`list`of type `~obj.Vhost`
"""
if not vhosts:
return list()
tags_list = [vhost.display_repr()+"\n" for vhost in vhosts]
# Remove the extra newline from the last entry
if len(tags_list):
tags_list[-1] = tags_list[-1][:-1]
code, names = zope.component.getUtility(interfaces.IDisplay).checklist(
"Which VirtualHosts would you like to install the wildcard certificate for?",
tags=tags_list, force_interactive=True)
if code == display_util.OK:
return_vhosts = _reversemap_vhosts(names, vhosts)
return return_vhosts
return []

def _reversemap_vhosts(names, vhosts):
"""Helper function for select_vhost_multiple for mapping string
representations back to actual vhost objects"""
return_vhosts = list()

for selection in names:
for vhost in vhosts:
if vhost.display_repr().strip() == selection.strip():
return_vhosts.append(vhost)
return return_vhosts

def select_vhost(domain, vhosts):
"""Select an appropriate Apache Vhost.
:param vhosts: Available Apache Virtual Hosts
:param vhosts: Available Apache VirtualHosts
:type vhosts: :class:`list` of type `~obj.Vhost`
:returns: VirtualHost or `None`
@@ -25,13 +59,11 @@ def select_vhost(domain, vhosts):
"""
if not vhosts:
return None
while True:
code, tag = _vhost_menu(domain, vhosts)
if code == display_util.OK:
return vhosts[tag]
else:
return None

code, tag = _vhost_menu(domain, vhosts)
if code == display_util.OK:
return vhosts[tag]
else:
return None

def _vhost_menu(domain, vhosts):
"""Select an appropriate Apache Vhost.
@@ -167,6 +167,19 @@ def __str__(self):
active="Yes" if self.enabled else "No",
modmacro="Yes" if self.modmacro else "No"))

def display_repr(self):
"""Return a representation of VHost to be used in dialog"""
return (
"File: {filename}\n"
"Addresses: {addrs}\n"
"Names: {names}\n"
"HTTPS: {https}\n".format(
filename=self.filep,
addrs=", ".join(str(addr) for addr in self.addrs),
names=", ".join(self.get_names()),
https="Yes" if self.ssl else "No"))


def __eq__(self, other):
if isinstance(other, self.__class__):
return (self.filep == other.filep and self.path == other.path and
@@ -1337,6 +1337,106 @@ def test_enable_mod_unsupported(self):
self.config.enable_mod,
"whatever")

def test_wildcard_domain(self):
# pylint: disable=protected-access
cases = {u"*.example.org": True, b"*.x.example.org": True,
u"a.example.org": False, b"a.x.example.org": False}
for key in cases.keys():
self.assertEqual(self.config._wildcard_domain(key), cases[key])

def test_choose_vhosts_wildcard(self):
# pylint: disable=protected-access
mock_path = "certbot_apache.display_ops.select_vhost_multiple"
with mock.patch(mock_path) as mock_select_vhs:
mock_select_vhs.return_value = [self.vh_truth[3]]
vhs = self.config._choose_vhosts_wildcard("*.certbot.demo",
create_ssl=True)
# Check that the dialog was called with one vh: certbot.demo
self.assertEquals(mock_select_vhs.call_args[0][0][0], self.vh_truth[3])
self.assertEquals(len(mock_select_vhs.call_args_list), 1)

# And the actual returned values
self.assertEquals(len(vhs), 1)
self.assertTrue(vhs[0].name == "certbot.demo")
self.assertTrue(vhs[0].ssl)

self.assertFalse(vhs[0] == self.vh_truth[3])

@mock.patch("certbot_apache.configurator.ApacheConfigurator.make_vhost_ssl")
def test_choose_vhosts_wildcard_no_ssl(self, mock_makessl):
# pylint: disable=protected-access
mock_path = "certbot_apache.display_ops.select_vhost_multiple"
with mock.patch(mock_path) as mock_select_vhs:
mock_select_vhs.return_value = [self.vh_truth[1]]
vhs = self.config._choose_vhosts_wildcard("*.certbot.demo",
create_ssl=False)
self.assertFalse(mock_makessl.called)
self.assertEquals(vhs[0], self.vh_truth[1])

@mock.patch("certbot_apache.configurator.ApacheConfigurator._vhosts_for_wildcard")
@mock.patch("certbot_apache.configurator.ApacheConfigurator.make_vhost_ssl")
def test_choose_vhosts_wildcard_already_ssl(self, mock_makessl, mock_vh_for_w):
# pylint: disable=protected-access
# Already SSL vhost
mock_vh_for_w.return_value = [self.vh_truth[7]]
mock_path = "certbot_apache.display_ops.select_vhost_multiple"
with mock.patch(mock_path) as mock_select_vhs:
mock_select_vhs.return_value = [self.vh_truth[7]]
vhs = self.config._choose_vhosts_wildcard("whatever",
create_ssl=True)
self.assertEquals(mock_select_vhs.call_args[0][0][0], self.vh_truth[7])
self.assertEquals(len(mock_select_vhs.call_args_list), 1)
# Ensure that make_vhost_ssl was not called, vhost.ssl == true
self.assertFalse(mock_makessl.called)

# And the actual returned values
self.assertEquals(len(vhs), 1)
self.assertTrue(vhs[0].ssl)
self.assertEquals(vhs[0], self.vh_truth[7])


def test_deploy_cert_wildcard(self):
# pylint: disable=protected-access
mock_choose_vhosts = mock.MagicMock()
mock_choose_vhosts.return_value = [self.vh_truth[7]]
self.config._choose_vhosts_wildcard = mock_choose_vhosts
mock_d = "certbot_apache.configurator.ApacheConfigurator._deploy_cert"
with mock.patch(mock_d) as mock_dep:
self.config.deploy_cert("*.wildcard.example.org", "/tmp/path",
"/tmp/path", "/tmp/path", "/tmp/path")
self.assertTrue(mock_dep.called)
self.assertEquals(len(mock_dep.call_args_list), 1)
self.assertEqual(self.vh_truth[7], mock_dep.call_args_list[0][0][0])

@mock.patch("certbot_apache.display_ops.select_vhost_multiple")
def test_deploy_cert_wildcard_no_vhosts(self, mock_dialog):
# pylint: disable=protected-access
mock_dialog.return_value = []
self.assertRaises(errors.PluginError,
self.config.deploy_cert,
"*.wild.cat", "/tmp/path", "/tmp/path",
"/tmp/path", "/tmp/path")

@mock.patch("certbot_apache.configurator.ApacheConfigurator._choose_vhosts_wildcard")
def test_enhance_wildcard_after_install(self, mock_choose):
# pylint: disable=protected-access
self.config.parser.modules.add("mod_ssl.c")
self.config.parser.modules.add("headers_module")
self.config._wildcard_vhosts["*.certbot.demo"] = [self.vh_truth[3]]
self.config.enhance("*.certbot.demo", "ensure-http-header",
"Upgrade-Insecure-Requests")
self.assertFalse(mock_choose.called)

@mock.patch("certbot_apache.configurator.ApacheConfigurator._choose_vhosts_wildcard")
def test_enhance_wildcard_no_install(self, mock_choose):
mock_choose.return_value = [self.vh_truth[3]]
self.config.parser.modules.add("mod_ssl.c")
self.config.parser.modules.add("headers_module")
self.config.enhance("*.certbot.demo", "ensure-http-header",
"Upgrade-Insecure-Requests")
self.assertTrue(mock_choose.called)


class AugeasVhostsTest(util.ApacheTest):
"""Test vhosts with illegal names dependent on augeas version."""
# pylint: disable=protected-access

0 comments on commit e9bc4a3

Please sign in to comment.
You can’t perform that action at this time.