Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the with_daemon test fixture and suppress unclosed zmq socket warnings #5631

Merged
merged 4 commits into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/system_tests/test_profile_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_create_use_destroy_profile2(self):
self.assertTrue(self.profile_manager.config_dir_ok, msg=output)
self.assertTrue(self.profile_manager.repo_ok, msg=output)
from aiida.manage.configuration.settings import AIIDA_CONFIG_FOLDER
self.assertEqual(AIIDA_CONFIG_FOLDER, self.profile_manager.config_dir, msg=output)
self.assertEqual(str(AIIDA_CONFIG_FOLDER), self.profile_manager.config_dir, msg=output)

from aiida.orm import load_node
from aiida.plugins import DataFactory
Expand Down
27 changes: 6 additions & 21 deletions aiida/cmdline/commands/cmd_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,7 @@

from aiida.cmdline.commands.cmd_verdi import verdi
from aiida.cmdline.utils import decorators, echo
from aiida.cmdline.utils.common import get_env_with_venv_bin
from aiida.cmdline.utils.daemon import (
_START_CIRCUS_COMMAND,
delete_stale_pid_file,
get_daemon_status,
print_client_response_status,
)
from aiida.cmdline.utils.daemon import delete_stale_pid_file, get_daemon_status, print_client_response_status
from aiida.manage import get_manager


Expand Down Expand Up @@ -59,25 +53,17 @@ def start(foreground, number):

Returns exit code 0 if the daemon is OK, non-zero if there was an error.
"""
from aiida.engine.daemon.client import get_daemon_client
from aiida.engine.daemon.client import DaemonException, get_daemon_client

client = get_daemon_client()

echo.echo(f'Starting the daemon with {number} workers... ', nl=False)

if foreground:
command = ['verdi', '-p', client.profile.name, 'daemon', _START_CIRCUS_COMMAND, '--foreground', str(number)]
else:
command = ['verdi', '-p', client.profile.name, 'daemon', _START_CIRCUS_COMMAND, str(number)]

try:
currenv = get_env_with_venv_bin()
subprocess.check_output(command, env=currenv, stderr=subprocess.STDOUT) # pylint: disable=unexpected-keyword-arg
except subprocess.CalledProcessError as exception:
echo.echo(f'Starting the daemon with {number} workers... ', nl=False)
client.start_daemon(number_workers=number, foreground=foreground)
except DaemonException as exception:
echo.echo('FAILED', fg=echo.COLORS['error'], bold=True)
echo.echo_critical(str(exception))

# We add a small timeout to give the pid-file a chance to be created
with spinner():
time.sleep(1)
response = client.get_status()
Expand Down Expand Up @@ -159,8 +145,7 @@ def logshow():

client = get_daemon_client()

currenv = get_env_with_venv_bin()
with subprocess.Popen(['tail', '-f', client.daemon_log_file], env=currenv) as process:
with subprocess.Popen(['tail', '-f', client.daemon_log_file], env=client.get_env()) as process:
process.wait()


Expand Down
6 changes: 6 additions & 0 deletions aiida/cmdline/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@ def is_verbose():

def get_env_with_venv_bin():
"""Create a clone of the current running environment with the AIIDA_PATH variable set directory of the config."""
from aiida.common.warnings import warn_deprecation
from aiida.manage.configuration import get_config

warn_deprecation(
'`get_env_with_venv_bin` function is deprecated use `aiida.engine.daemon.client.DaemonClient.get_env` instead.',
version=3
)

config = get_config()

currenv = os.environ.copy()
Expand Down
113 changes: 91 additions & 22 deletions aiida/engine/daemon/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@
"""Client to interact with the daemon."""
from __future__ import annotations

import contextlib
import enum
import os
import shutil
import socket
import subprocess
import sys
import tempfile
from typing import TYPE_CHECKING, Any, Dict
import time
import typing as t
from typing import TYPE_CHECKING

from aiida.cmdline.utils.common import get_env_with_venv_bin
from aiida.common.exceptions import AiidaException, ConfigurationError
from aiida.common.lang import type_check
from aiida.manage.configuration import get_config, get_config_option
Expand All @@ -33,7 +36,7 @@
VIRTUALENV = os.environ.get('VIRTUAL_ENV', None)

# see https://github.com/python/typing/issues/182
JsonDictType = Dict[str, Any]
JsonDictType = t.Dict[str, t.Any]

__all__ = ('DaemonClient',)

Expand Down Expand Up @@ -105,10 +108,18 @@ def _verdi_bin(self) -> str:

return VERDI_BIN

@property
def cmd_start_daemon(self) -> list[str]:
"""Return the command to start the daemon."""
return [self._verdi_bin, '-p', self.profile.name, 'daemon', 'start']
def cmd_start_daemon(self, number_workers: int = 1, foreground: bool = False) -> list[str]:
"""Return the command to start the daemon.

:param number_workers: Number of daemon workers to start.
:param foreground: Whether to launch the subprocess in the background or not.
"""
command = [self._verdi_bin, '-p', self.profile.name, 'daemon', 'start-circus', str(number_workers)]

if foreground:
command.append('--foreground')

return command

@property
def cmd_start_daemon_worker(self) -> list[str]:
Expand Down Expand Up @@ -140,7 +151,7 @@ def circus_socket_file(self) -> str:
return self.profile.filepaths['circus']['socket']['file']

@property
def circus_socket_endpoints(self) -> Dict[str, str]:
def circus_socket_endpoints(self) -> dict[str, str]:
return self.profile.filepaths['circus']['socket']

@property
Expand Down Expand Up @@ -173,6 +184,25 @@ def get_circus_port(self) -> int:

return port

@staticmethod
def get_env() -> dict[str, str]:
"""Return the environment for this current process.

This method is used to pass variables from the environment of the current process to a subprocess that is
spawned when the daemon or a daemon worker is started.

It replicates the ``PATH``, ``PYTHONPATH` and the ``AIIDA_PATH`` environment variables. The ``PYTHONPATH``
variable ensures that all Python modules that can be imported by the parent process, are also importable by
the subprocess. The ``AIIDA_PATH`` variable ensures that the subprocess will use the same AiiDA configuration
directory as used by the current process.
ltalirz marked this conversation as resolved.
Show resolved Hide resolved
"""
env = os.environ.copy()
env['PATH'] = ':'.join([os.path.dirname(sys.executable), env['PATH']])
env['PYTHONPATH'] = ':'.join(sys.path)
env['AIIDA_PATH'] = get_config().dirpath
env['PYTHONUNBUFFERED'] = 'True'
return env

def get_circus_socket_directory(self) -> str:
"""Retrieve the absolute path of the directory where the circus sockets are stored.

Expand Down Expand Up @@ -337,8 +367,8 @@ def get_tcp_endpoint(self, port=None):

return endpoint

@property
def client(self) -> 'CircusClient':
@contextlib.contextmanager
def get_client(self) -> 'CircusClient':
"""Return an instance of the CircusClient.

The endpoint is defined by the controller endpoint, which used the port that was written to the port file upon
Expand All @@ -347,7 +377,12 @@ def client(self) -> 'CircusClient':
:return: CircusClient instance
"""
from circus.client import CircusClient
return CircusClient(endpoint=self.get_controller_endpoint(), timeout=self._DAEMON_TIMEOUT)

try:
client = CircusClient(endpoint=self.get_controller_endpoint(), timeout=self._DAEMON_TIMEOUT)
yield client
finally:
client.stop()

def call_client(self, command: JsonDictType) -> JsonDictType:
"""Call the client with a specific command.
Expand All @@ -365,7 +400,8 @@ def call_client(self, command: JsonDictType) -> JsonDictType:
return {'status': self.DAEMON_ERROR_NOT_RUNNING}

try:
result = self.client.call(command)
with self.get_client() as client:
result = client.call(command)
except CallError as exception:
if str(exception) == 'Timed out.':
return {'status': self.DAEMON_ERROR_TIMEOUT}
Expand Down Expand Up @@ -423,11 +459,13 @@ def decrease_workers(self, number: int) -> JsonDictType:
command = {'command': 'decr', 'properties': {'name': self.daemon_name, 'nb': number}}
return self.call_client(command)

def stop_daemon(self, wait: bool) -> JsonDictType:
def stop_daemon(self, wait: bool = True, timeout: int = 5) -> JsonDictType:
sphuber marked this conversation as resolved.
Show resolved Hide resolved
"""Stop the daemon.

:param wait: Boolean to indicate whether to wait for the result of the command.
:param timeout: Wait this number of seconds for the ``is_daemon_running`` to return ``False`` before raising.
:return: The client call response.
:raises DaemonException: If ``is_daemon_running`` returns ``True`` after the ``timeout`` has passed.
"""
command = {'command': 'quit', 'properties': {'waiting': wait}}

Expand All @@ -436,6 +474,15 @@ def stop_daemon(self, wait: bool) -> JsonDictType:
if self._ENDPOINT_PROTOCOL == ControllerProtocol.IPC:
self.delete_circus_socket_directory()

if not wait:
return result

self._await_condition(
lambda: not self.is_daemon_running,
DaemonException(f'The daemon failed to stop within {timeout} seconds.'),
timeout=timeout,
)

return result

def restart_daemon(self, wait: bool) -> JsonDictType:
Expand All @@ -447,26 +494,48 @@ def restart_daemon(self, wait: bool) -> JsonDictType:
command = {'command': 'restart', 'properties': {'name': self.daemon_name, 'waiting': wait}}
return self.call_client(command)

def start_daemon(self) -> None:
def start_daemon(self, number_workers: int = 1, foreground: bool = False, timeout: int = 5) -> None:
"""Start the daemon in a sub process running in the background.

:param number_workers: Number of daemon workers to start.
:param foreground: Whether to launch the subprocess in the background or not.
:param timeout: Wait this number of seconds for the ``is_daemon_running`` to return ``True`` before raising.
:raises DaemonException: If the daemon fails to start.
:raises DaemonException: If the daemon starts but then is unresponsive or in an unexpected state.
:raises DaemonException: If ``is_daemon_running`` returns ``False`` after the ``timeout`` has passed.
"""
env = get_env_with_venv_bin()
env = self.get_env()
command = self.cmd_start_daemon(number_workers, foreground)

try:
subprocess.check_output(self.cmd_start_daemon, env=env, stderr=subprocess.STDOUT) # pylint: disable=unexpected-keyword-arg
subprocess.check_output(command, env=env, stderr=subprocess.STDOUT) # pylint: disable=unexpected-keyword-arg
except subprocess.CalledProcessError as exception:
raise DaemonException('The daemon failed to start.') from exception

response = self.get_status()
self._await_condition(
lambda: self.is_daemon_running,
DaemonException(f'The daemon failed to start within {timeout} seconds.'),
timeout=timeout,
)

@staticmethod
def _await_condition(condition: t.Callable, exception: Exception, timeout: int = 5, interval: float = 0.1):
"""Await a condition to evaluate to ``True`` or raise the exception if the timeout is reached.

:param condition: A callable that is waited for to return ``True``.
:param exception: Raise this exception if ``condition`` does not return ``True`` after ``timeout`` seconds.
:param timeout: Wait this number of seconds for ``condition`` to return ``True`` before raising.
:param interval: The time in seconds to wait between invocations of ``condition``.
:raises: The exception provided by ``exception`` if timeout is reached.
"""
start_time = time.time()

if 'status' not in response:
raise DaemonException('The daemon is not responsive.')
while not condition():

if response['status'] not in ['active', 'ok']:
raise DaemonException(f'The daemon has an unexpected status: {response["status"]}')
time.sleep(interval)

if time.time() - start_time > timeout:
raise exception

def _start_daemon(self, number_workers: int = 1, foreground: bool = False) -> None:
"""Start the daemon.
Expand Down Expand Up @@ -516,7 +585,7 @@ def _start_daemon(self, number_workers: int = 1, foreground: bool = False) -> No
'class': 'FileStream',
'filename': self.daemon_log_file,
},
'env': get_env_with_venv_bin(),
'env': self.get_env(),
}]
} # yapf: disable

Expand Down
31 changes: 11 additions & 20 deletions aiida/manage/configuration/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,12 @@
from aiida.common import exceptions

from .options import parse_option
from .settings import DAEMON_DIR, DAEMON_LOG_DIR

if TYPE_CHECKING:
from aiida.orm.implementation import StorageBackend

__all__ = ('Profile',)

CIRCUS_PID_FILE_TEMPLATE = os.path.join(DAEMON_DIR, 'circus-{}.pid')
ltalirz marked this conversation as resolved.
Show resolved Hide resolved
DAEMON_PID_FILE_TEMPLATE = os.path.join(DAEMON_DIR, 'aiida-{}.pid')
CIRCUS_LOG_FILE_TEMPLATE = os.path.join(DAEMON_LOG_DIR, 'circus-{}.log')
DAEMON_LOG_FILE_TEMPLATE = os.path.join(DAEMON_LOG_DIR, 'aiida-{}.log')
CIRCUS_PORT_FILE_TEMPLATE = os.path.join(DAEMON_DIR, 'circus-{}.port')
CIRCUS_SOCKET_FILE_TEMPATE = os.path.join(DAEMON_DIR, 'circus-{}.sockets')
CIRCUS_CONTROLLER_SOCKET_TEMPLATE = 'circus.c.sock'
CIRCUS_PUBSUB_SOCKET_TEMPLATE = 'circus.p.sock'
CIRCUS_STATS_SOCKET_TEMPLATE = 'circus.s.sock'


class Profile: # pylint: disable=too-many-public-methods
"""Class that models a profile as it is stored in the configuration file of an AiiDA instance."""
Expand Down Expand Up @@ -262,20 +251,22 @@ def filepaths(self):

:return: a dictionary of filepaths
"""
from .settings import DAEMON_DIR, DAEMON_LOG_DIR

return {
'circus': {
'log': CIRCUS_LOG_FILE_TEMPLATE.format(self.name),
'pid': CIRCUS_PID_FILE_TEMPLATE.format(self.name),
'port': CIRCUS_PORT_FILE_TEMPLATE.format(self.name),
'log': str(DAEMON_LOG_DIR / f'circus-{self.name}.log'),
'pid': str(DAEMON_DIR / f'circus-{self.name}.pid'),
'port': str(DAEMON_DIR / f'circus-{self.name}.port'),
'socket': {
'file': CIRCUS_SOCKET_FILE_TEMPATE.format(self.name),
'controller': CIRCUS_CONTROLLER_SOCKET_TEMPLATE,
'pubsub': CIRCUS_PUBSUB_SOCKET_TEMPLATE,
'stats': CIRCUS_STATS_SOCKET_TEMPLATE,
'file': str(DAEMON_DIR / f'circus-{self.name}.sockets'),
'controller': 'circus.c.sock',
'pubsub': 'circus.p.sock',
'stats': 'circus.s.sock',
}
},
'daemon': {
'log': DAEMON_LOG_FILE_TEMPLATE.format(self.name),
'pid': DAEMON_PID_FILE_TEMPLATE.format(self.name),
'log': str(DAEMON_LOG_DIR / f'aiida-{self.name}.log'),
'pid': str(DAEMON_DIR / f'aiida-{self.name}.pid'),
}
}
Loading