Skip to content

Commit

Permalink
Expand depends_on to allow different conditions (service_start, servi…
Browse files Browse the repository at this point in the history
…ce_healthy)

Rework "up" and "start" to wait on conditional state of dependent services
Add integration tests

Signed-off-by: Joffrey F <joffrey@docker.com>
  • Loading branch information
shin- committed Jan 4, 2017
1 parent f6edd61 commit 04394b1
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 18 deletions.
12 changes: 11 additions & 1 deletion compose/config/config.py
Expand Up @@ -649,6 +649,8 @@ def process_service(service_config):
if 'sysctls' in service_dict:
service_dict['sysctls'] = build_string_dict(parse_sysctls(service_dict['sysctls']))

service_dict = process_depends_on(service_dict)

for field in ['dns', 'dns_search', 'tmpfs']:
if field in service_dict:
service_dict[field] = to_list(service_dict[field])
Expand All @@ -658,14 +660,22 @@ def process_service(service_config):
return service_dict


def process_depends_on(service_dict):
if 'depends_on' in service_dict and not isinstance(service_dict['depends_on'], dict):
service_dict['depends_on'] = dict([
(svc, {'condition': 'service_started'}) for svc in service_dict['depends_on']
])
return service_dict


def process_healthcheck(service_dict, service_name):
if 'healthcheck' not in service_dict:
return service_dict

hc = {}
raw = service_dict['healthcheck']

if raw.get('disable'):
if raw.get('disable') or raw.get('disabled'):
if len(raw) > 1:
raise ConfigurationError(
'Service "{}" defines an invalid healthcheck: '
Expand Down
8 changes: 5 additions & 3 deletions compose/config/validation.py
Expand Up @@ -180,11 +180,13 @@ def validate_links(service_config, service_names):


def validate_depends_on(service_config, service_names):
for dependency in service_config.config.get('depends_on', []):
deps = service_config.config.get('depends_on', {})
for dependency in deps.keys():
if dependency not in service_names:
raise ConfigurationError(
"Service '{s.name}' depends on service '{dep}' which is "
"undefined.".format(s=service_config, dep=dependency))
"undefined.".format(s=service_config, dep=dependency)
)


def get_unsupported_config_msg(path, error_key):
Expand All @@ -201,7 +203,7 @@ def anglicize_json_type(json_type):


def is_service_dict_schema(schema_id):
return schema_id in ('config_schema_v1.json', '#/properties/services')
return schema_id in ('config_schema_v1.json', '#/properties/services')


def handle_error_for_schema_with_id(error, path):
Expand Down
21 changes: 21 additions & 0 deletions compose/errors.py
Expand Up @@ -10,3 +10,24 @@ def __init__(self, reason):
class StreamParseError(RuntimeError):
def __init__(self, reason):
self.msg = reason


class HealthCheckException(Exception):
def __init__(self, reason):
self.msg = reason


class HealthCheckFailed(HealthCheckException):
def __init__(self, container_id):
super(HealthCheckFailed, self).__init__(
'Container "{}" is unhealthy.'.format(container_id)
)


class NoHealthCheckConfigured(HealthCheckException):
def __init__(self, service_name):
super(NoHealthCheckConfigured, self).__init__(
'Service "{}" is missing a healthcheck configuration'.format(
service_name
)
)
7 changes: 4 additions & 3 deletions compose/parallel.py
Expand Up @@ -165,13 +165,14 @@ def feed_queue(objects, func, get_deps, results, state):
for obj in pending:
deps = get_deps(obj)

if any(dep in state.failed for dep in deps):
if any(dep[0] in state.failed for dep in deps):
log.debug('{} has upstream errors - not processing'.format(obj))
results.put((obj, None, UpstreamError()))
state.failed.add(obj)
elif all(
dep not in objects or dep in state.finished
for dep in deps
dep not in objects or (
dep in state.finished and (not ready_check or ready_check(dep))
) for dep, ready_check in deps
):
log.debug('Starting producer thread for {}'.format(obj))
t = Thread(target=producer, args=(obj, func, results))
Expand Down
12 changes: 9 additions & 3 deletions compose/project.py
Expand Up @@ -227,7 +227,10 @@ def start_service(service):
services = self.get_services(service_names)

def get_deps(service):
return {self.get_service(dep) for dep in service.get_dependency_names()}
return {
(self.get_service(dep), config)
for dep, config in service.get_dependency_configs().items()
}

parallel.parallel_execute(
services,
Expand All @@ -243,7 +246,7 @@ def stop(self, service_names=None, one_off=OneOffFilter.exclude, **options):

def get_deps(container):
# actually returning inversed dependencies
return {other for other in containers
return {(other, None) for other in containers
if container.service in
self.get_service(other.service).get_dependency_names()}

Expand Down Expand Up @@ -394,7 +397,10 @@ def do(service):
)

def get_deps(service):
return {self.get_service(dep) for dep in service.get_dependency_names()}
return {
(self.get_service(dep), config)
for dep, config in service.get_dependency_configs().items()
}

results, errors = parallel.parallel_execute(
services,
Expand Down
59 changes: 55 additions & 4 deletions compose/service.py
Expand Up @@ -28,6 +28,8 @@
from .const import LABEL_SERVICE
from .const import LABEL_VERSION
from .container import Container
from .errors import HealthCheckFailed
from .errors import NoHealthCheckConfigured
from .errors import OperationFailedError
from .parallel import parallel_execute
from .parallel import parallel_start
Expand Down Expand Up @@ -69,6 +71,9 @@
'volumes_from',
]

CONDITION_STARTED = 'service_started'
CONDITION_HEALTHY = 'service_healthy'


class BuildError(Exception):
def __init__(self, service, reason):
Expand Down Expand Up @@ -533,10 +538,38 @@ def config_dict(self):

def get_dependency_names(self):
net_name = self.network_mode.service_name
return (self.get_linked_service_names() +
self.get_volumes_from_names() +
([net_name] if net_name else []) +
self.options.get('depends_on', []))
return (
self.get_linked_service_names() +
self.get_volumes_from_names() +
([net_name] if net_name else []) +
list(self.options.get('depends_on', {}).keys())
)

def get_dependency_configs(self):
net_name = self.network_mode.service_name
configs = dict(
[(name, None) for name in self.get_linked_service_names()]
)
configs.update(dict(
[(name, None) for name in self.get_volumes_from_names()]
))
configs.update({net_name: None} if net_name else {})
configs.update(self.options.get('depends_on', {}))
for svc, config in self.options.get('depends_on', {}).items():
if config['condition'] == CONDITION_STARTED:
configs[svc] = lambda s: True
elif config['condition'] == CONDITION_HEALTHY:
configs[svc] = lambda s: s.is_healthy()
else:
# The config schema already prevents this, but it might be
# bypassed if Compose is called programmatically.
raise ValueError(
'depends_on condition "{}" is invalid.'.format(
config['condition']
)
)

return configs

def get_linked_service_names(self):
return [service.name for (service, _) in self.links]
Expand Down Expand Up @@ -871,6 +904,24 @@ def push(self, ignore_push_failures=False):
else:
log.error(six.text_type(e))

def is_healthy(self):
""" Check that all containers for this service report healthy.
Returns false if at least one healthcheck is pending.
If an unhealthy container is detected, raise a HealthCheckFailed
exception.
"""
result = True
for ctnr in self.containers():
ctnr.inspect()
status = ctnr.get('State.Health.Status')
if status is None:
raise NoHealthCheckConfigured(self.name)
elif status == 'starting':
result = False
elif status == 'unhealthy':
raise HealthCheckFailed(ctnr.short_id)
return result


def short_id_alias_exists(container, network):
aliases = container.get(
Expand Down
2 changes: 1 addition & 1 deletion tests/acceptance/cli_test.py
Expand Up @@ -928,7 +928,7 @@ def test_up_with_net_v1(self):
assert foo_container.get('HostConfig.NetworkMode') == \
'container:{}'.format(bar_container.id)

@v3_only()
@v2_1_only()
def test_up_with_healthcheck(self):
def wait_on_health_status(container, status):
def condition():
Expand Down
114 changes: 114 additions & 0 deletions tests/integration/project_test.py
Expand Up @@ -19,6 +19,8 @@
from compose.const import LABEL_PROJECT
from compose.const import LABEL_SERVICE
from compose.container import Container
from compose.errors import HealthCheckFailed
from compose.errors import NoHealthCheckConfigured
from compose.project import Project
from compose.project import ProjectError
from compose.service import ConvergenceStrategy
Expand Down Expand Up @@ -1375,3 +1377,115 @@ def test_project_up_orphans(self):
ctnr for ctnr in project._labeled_containers()
if ctnr.labels.get(LABEL_SERVICE) == 'service1'
]) == 0

@v2_1_only()
def test_project_up_healthy_dependency(self):
config_dict = {
'version': '2.1',
'services': {
'svc1': {
'image': 'busybox:latest',
'command': 'top',
'healthcheck': {
'test': 'exit 0',
'retries': 1,
'timeout': '10s',
'interval': '0.1s'
},
},
'svc2': {
'image': 'busybox:latest',
'command': 'top',
'depends_on': {
'svc1': {'condition': 'service_healthy'},
}
}
}
}
config_data = build_config(config_dict)
project = Project.from_config(
name='composetest', config_data=config_data, client=self.client
)
project.up()
containers = project.containers()
assert len(containers) == 2

svc1 = project.get_service('svc1')
svc2 = project.get_service('svc2')
assert 'svc1' in svc2.get_dependency_names()
assert svc1.is_healthy()

@v2_1_only()
def test_project_up_unhealthy_dependency(self):
config_dict = {
'version': '2.1',
'services': {
'svc1': {
'image': 'busybox:latest',
'command': 'top',
'healthcheck': {
'test': 'exit 1',
'retries': 1,
'timeout': '10s',
'interval': '0.1s'
},
},
'svc2': {
'image': 'busybox:latest',
'command': 'top',
'depends_on': {
'svc1': {'condition': 'service_healthy'},
}
}
}
}
config_data = build_config(config_dict)
project = Project.from_config(
name='composetest', config_data=config_data, client=self.client
)
with pytest.raises(HealthCheckFailed):
project.up()
containers = project.containers()
assert len(containers) == 1

svc1 = project.get_service('svc1')
svc2 = project.get_service('svc2')
assert 'svc1' in svc2.get_dependency_names()
with pytest.raises(HealthCheckFailed):
svc1.is_healthy()

@v2_1_only()
def test_project_up_no_healthcheck_dependency(self):
config_dict = {
'version': '2.1',
'services': {
'svc1': {
'image': 'busybox:latest',
'command': 'top',
'healthcheck': {
'disabled': True
},
},
'svc2': {
'image': 'busybox:latest',
'command': 'top',
'depends_on': {
'svc1': {'condition': 'service_healthy'},
}
}
}
}
config_data = build_config(config_dict)
project = Project.from_config(
name='composetest', config_data=config_data, client=self.client
)
with pytest.raises(NoHealthCheckConfigured):
project.up()
containers = project.containers()
assert len(containers) == 1

svc1 = project.get_service('svc1')
svc2 = project.get_service('svc2')
assert 'svc1' in svc2.get_dependency_names()
with pytest.raises(NoHealthCheckConfigured):
svc1.is_healthy()
9 changes: 7 additions & 2 deletions tests/unit/config/config_test.py
Expand Up @@ -920,7 +920,10 @@ def test_load_with_multiple_files_v2(self):
'build': {'context': os.path.abspath('/')},
'image': 'example/web',
'volumes': [VolumeSpec.parse('/home/user/project:/code')],
'depends_on': ['db', 'other'],
'depends_on': {
'db': {'condition': 'container_start'},
'other': {'condition': 'container_start'},
},
},
{
'name': 'db',
Expand Down Expand Up @@ -3055,7 +3058,9 @@ def test_extends_with_depends_on(self):
image: example
""")
services = load_from_filename(str(tmpdir.join('docker-compose.yml')))
assert service_sort(services)[2]['depends_on'] == ['other']
assert service_sort(services)[2]['depends_on'] == {
'other': {'condition': 'container_start'}
}


@pytest.mark.xfail(IS_WINDOWS_PLATFORM, reason='paths use slash')
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/parallel_test.py
Expand Up @@ -25,7 +25,7 @@


def get_deps(obj):
return deps[obj]
return [(dep, None) for dep in deps[obj]]


def test_parallel_execute():
Expand Down

0 comments on commit 04394b1

Please sign in to comment.