From 134720647292a14e01d23c885a5c77677f4c193b Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Sun, 4 Apr 2021 03:50:26 +0000 Subject: [PATCH 01/22] images/microos: add etcd, python aetcd3 Signed-off-by: Joao Eduardo Luis --- images/microOS/config.sh | 2 +- images/microOS/config.xml | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/images/microOS/config.sh b/images/microOS/config.sh index ad6afdfb5..45fcb2bba 100644 --- a/images/microOS/config.sh +++ b/images/microOS/config.sh @@ -174,7 +174,7 @@ EOF fi if [[ "$kiwi_profiles" == *"Ceph"* ]]; then - pip install fastapi uvicorn + pip install fastapi uvicorn aetcd3 baseInsertService aquarium fi diff --git a/images/microOS/config.xml b/images/microOS/config.xml index a94b92a86..c2835491e 100644 --- a/images/microOS/config.xml +++ b/images/microOS/config.xml @@ -608,6 +608,8 @@ + + From aa2a0b14a9ede6e29e966cd0a1ec5f176e8edb1b Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Sun, 4 Apr 2021 08:42:30 +0000 Subject: [PATCH 02/22] gravel: nodes/mgr: spin-off token generation Signed-off-by: Joao Eduardo Luis --- src/gravel/controllers/nodes/mgr.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/gravel/controllers/nodes/mgr.py b/src/gravel/controllers/nodes/mgr.py index 84db7928f..1d9af0db9 100644 --- a/src/gravel/controllers/nodes/mgr.py +++ b/src/gravel/controllers/nodes/mgr.py @@ -209,6 +209,13 @@ async def _subscriber(nodeinfo: NodeInfoModel) -> None: get_inventory().subscribe(_subscriber, once=True) + def _generate_token(self) -> str: + def gen() -> str: + return ''.join(random.choice("0123456789abcdef") for _ in range(4)) + + tokenstr = '-'.join(gen() for _ in range(4)) + return tokenstr + async def join(self, leader_address: str, token: str) -> bool: logger.debug( f"join > with leader {leader_address}, token: {token}" @@ -381,14 +388,10 @@ async def _finish_bootstrap(self): self._state.role = NodeRoleEnum.LEADER self._save_state() - def gen() -> str: 
- return ''.join(random.choice("0123456789abcdef") for _ in range(4)) - - tokenstr = '-'.join(gen() for _ in range(4)) - self._token = tokenstr + self._token = self._generate_token() self._save_token(should_exist=False) - logger.debug(f"finished bootstrap: token = {tokenstr}") + logger.debug(f"finished bootstrap: token = {self._token}") self._load() self._node_start() From 547a116f5f90ca3931babee76989fc86dc008ee8 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Sun, 4 Apr 2021 08:43:09 +0000 Subject: [PATCH 03/22] gravel: nodes/mgr: spawn etcd on start / bootstrap Signed-off-by: Joao Eduardo Luis --- src/gravel/controllers/nodes/mgr.py | 82 +++++++++++++++++++++++++++-- 1 file changed, 79 insertions(+), 3 deletions(-) diff --git a/src/gravel/controllers/nodes/mgr.py b/src/gravel/controllers/nodes/mgr.py index 1d9af0db9..c32bd2c96 100644 --- a/src/gravel/controllers/nodes/mgr.py +++ b/src/gravel/controllers/nodes/mgr.py @@ -12,6 +12,7 @@ # GNU General Public License for more details. import asyncio +import multiprocessing from enum import Enum from logging import Logger from uuid import UUID, uuid4 @@ -21,6 +22,7 @@ Optional, ) from pathlib import Path +import shlex from pydantic import BaseModel from fastapi import status @@ -115,6 +117,35 @@ class JoiningNodeModel(BaseModel): address: str +# We need to rely on "spawn" because otherwise the subprocess' signal +# handler will play nasty tricks with uvicorn's and fastapi's signal +# handlers. +# For future reference, +# - uvicorn issue: https://github.com/encode/uvicorn/issues/548 +# - python bug report: https://bugs.python.org/issue43064 +# - somewhat of a solution, using "spawn" for multiprocessing: +# https://github.com/tiangolo/fastapi/issues/1487#issuecomment-657290725 +# +# And because we need to rely on spawn, which will create a new python +# interpreter to execute what we are specifying, the function we're passing +# needs to be pickled. And nested functions, apparently, can't be pickled. 
Thus, +# we need to have the functions at the top-level scope. +# +def _bootstrap_etcd_process(cmd: str): + + async def _run_etcd(): + etcd_cmd = shlex.split(cmd) + process = await asyncio.create_subprocess_exec(*etcd_cmd) + await process.wait() + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(_run_etcd()) + except KeyboardInterrupt: + pass + + class NodeMgr: _init_stage: NodeInitStage @@ -134,6 +165,8 @@ def __init__(self): self._joining = {} self._bootstrapper = None + multiprocessing.set_start_method("spawn") + self._node_init() assert self._state @@ -148,6 +181,7 @@ def __init__(self): else: assert self._state.stage == NodeStageEnum.READY or \ self._state.stage == NodeStageEnum.BOOTSTRAPPED + self._spawn_etcd(new=False, token=None) self._node_start() def _node_prestart(self, nodeinfo: NodeInfoModel): @@ -344,6 +378,46 @@ async def finish_bootstrap_cb( logger.error(f"bootstrap error: {e.message}") raise NodeCantBootstrapError(e.message) + def _spawn_etcd(self, new: bool, token: Optional[str]) -> None: + + assert self._state.hostname + assert self._state.address + hostname = self._state.hostname + address = self._state.address + + logger.info(f"starting etcd, hostname: {hostname}, addr: {address}") + + client_url: str = f"http://{address}:2379" + peer_url: str = f"http://{address}:2380" + args_dict: Dict[str, str] = { + "name": hostname, + "initial-advertise-peer-urls": peer_url, + "listen-peer-urls": peer_url, + "listen-client-urls": f"{client_url},http://127.0.0.1:2379", + "advertise-client-urls": client_url, + "initial-cluster": f"{hostname}={peer_url}", + "initial-cluster-state": "existing", + "data-dir": f"/var/lib/etcd/{hostname}.etcd" + } + + if new: + assert token + args_dict["initial-cluster-token"] = token + args_dict["initial-cluster-state"] = "new" + + args = " ".join([f"--{k} {v}" for k, v in args_dict.items()]) + logger.debug(f"spawn etcd: {args}") + etcd_cmd = f"etcd {args}" + + process = 
multiprocessing.Process( + target=_bootstrap_etcd_process, + args=(etcd_cmd,) + ) + process.start() + + def _bootstrap_etcd(self, token: str) -> None: + self._spawn_etcd(new=True, token=token) + async def _prepare_bootstrap(self) -> None: assert self._state if self._state.stage == NodeStageEnum.BOOTSTRAPPING: @@ -353,6 +427,11 @@ async def _prepare_bootstrap(self) -> None: elif self._init_stage < NodeInitStage.PRESTART: raise NodeNotStartedError() + self._token = self._generate_token() + self._save_token(should_exist=False) + logger.info(f"generated new token: {self._token}") + self._bootstrap_etcd(self._token) + async def _start_bootstrap(self) -> None: assert self._state assert self._state.stage == NodeStageEnum.NONE @@ -388,9 +467,6 @@ async def _finish_bootstrap(self): self._state.role = NodeRoleEnum.LEADER self._save_state() - self._token = self._generate_token() - self._save_token(should_exist=False) - logger.debug(f"finished bootstrap: token = {self._token}") self._load() From 85a0d34e15fec6c48c87541005440c15525d5be0 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Sun, 4 Apr 2021 16:59:33 +0000 Subject: [PATCH 04/22] gravel: add etcd support on multi-node deployments Add new members on join. Relies on python's aetcd3 library for etcd shenanigans. 
Signed-off-by: Joao Eduardo Luis --- src/gravel/controllers/nodes/messages.py | 1 + src/gravel/controllers/nodes/mgr.py | 36 +++- src/gravel/typings/aetcd3/__init__.pyi | 21 ++ src/gravel/typings/aetcd3/client.pyi | 248 +++++++++++++++++++++++ src/gravel/typings/aetcd3/events.pyi | 7 + src/mypy.ini | 3 + src/requirements.txt | 3 +- 7 files changed, 315 insertions(+), 4 deletions(-) create mode 100644 src/gravel/typings/aetcd3/__init__.pyi create mode 100644 src/gravel/typings/aetcd3/client.pyi create mode 100644 src/gravel/typings/aetcd3/events.pyi diff --git a/src/gravel/controllers/nodes/messages.py b/src/gravel/controllers/nodes/messages.py index 68e23472c..c845e9a1c 100644 --- a/src/gravel/controllers/nodes/messages.py +++ b/src/gravel/controllers/nodes/messages.py @@ -41,6 +41,7 @@ class WelcomeMessageModel(BaseModel): pubkey: str cephconf: str keyring: str + etcd_peer: str class ReadyToAddMessageModel(BaseModel): diff --git a/src/gravel/controllers/nodes/mgr.py b/src/gravel/controllers/nodes/mgr.py index c32bd2c96..0e69a49d8 100644 --- a/src/gravel/controllers/nodes/mgr.py +++ b/src/gravel/controllers/nodes/mgr.py @@ -24,6 +24,7 @@ from pathlib import Path import shlex +import aetcd3 from pydantic import BaseModel from fastapi import status from fastapi.logger import logger as fastapi_logger @@ -306,6 +307,16 @@ async def join(self, leader_address: str, token: str) -> bool: assert welcome.pubkey assert welcome.cephconf assert welcome.keyring + assert welcome.etcd_peer + + my_url: str = \ + f"{self._state.hostname}=http://{self._state.address}:2380" + initial_cluster: str = f"{welcome.etcd_peer},{my_url}" + self._spawn_etcd( + new=False, + token=None, + initial_cluster=initial_cluster + ) authorized_keys: Path = Path("/root/.ssh/authorized_keys") if not authorized_keys.parent.exists(): @@ -378,7 +389,12 @@ async def finish_bootstrap_cb( logger.error(f"bootstrap error: {e.message}") raise NodeCantBootstrapError(e.message) - def _spawn_etcd(self, new: bool, 
token: Optional[str]) -> None: + def _spawn_etcd( + self, + new: bool, + token: Optional[str], + initial_cluster: Optional[str] = None + ) -> None: assert self._state.hostname assert self._state.address @@ -389,13 +405,17 @@ def _spawn_etcd(self, new: bool, token: Optional[str]) -> None: client_url: str = f"http://{address}:2379" peer_url: str = f"http://{address}:2380" + + if not initial_cluster: + initial_cluster = f"{hostname}={peer_url}" + args_dict: Dict[str, str] = { "name": hostname, "initial-advertise-peer-urls": peer_url, "listen-peer-urls": peer_url, "listen-client-urls": f"{client_url},http://127.0.0.1:2379", "advertise-client-urls": client_url, - "initial-cluster": f"{hostname}={peer_url}", + "initial-cluster": initial_cluster, "initial-cluster-state": "existing", "data-dir": f"/var/lib/etcd/{hostname}.etcd" } @@ -725,10 +745,20 @@ async def _handle_join( logger.debug(f"handle join > pubkey: {pubkey}") + logger.debug("handle join > connect etcd client") + etcd: aetcd3.Etcd3Client = aetcd3.client() + peer_url: str = f"http://{msg.address}:2380" + logger.debug(f"handle join > add '{peer_url}' to etcd") + member = await etcd.add_member([peer_url]) + assert member is not None + + my_url: str = \ + f"{self._state.hostname}=http://{self._state.address}:2380" welcome = WelcomeMessageModel( pubkey=pubkey, cephconf=cephconf, - keyring=keyring + keyring=keyring, + etcd_peer=my_url ) try: logger.debug(f"handle join > send welcome: {welcome}") diff --git a/src/gravel/typings/aetcd3/__init__.pyi b/src/gravel/typings/aetcd3/__init__.pyi new file mode 100644 index 000000000..9766491cd --- /dev/null +++ b/src/gravel/typings/aetcd3/__init__.pyi @@ -0,0 +1,21 @@ +# ignore because we're not using it: from . 
import etcdrpc +from .client import ( + Etcd3Client, + Transactions, + client +) +from .exceptions import Etcd3Exception # type: ignore +from .leases import Lease # type: ignore +from .locks import Lock +from .members import Member # type: ignore + +__all__ = ( + 'Etcd3Client', + 'Etcd3Exception', + 'Lease', + 'Lock', + 'Member', + 'Transactions', + 'client', + # 'etcdrpc', +) diff --git a/src/gravel/typings/aetcd3/client.pyi b/src/gravel/typings/aetcd3/client.pyi new file mode 100644 index 000000000..031ab4b93 --- /dev/null +++ b/src/gravel/typings/aetcd3/client.pyi @@ -0,0 +1,248 @@ +# project aquarium's backend +# Copyright (C) 2021 SUSE, LLC. + +# pyright: reportMissingTypeStubs=false +from typing import ( + Any, + AsyncIterator, + Awaitable, + Callable, + Dict, + Generator, + IO, + List, + Optional, + Tuple, + Union +) + +from aetcd3.etcdrpc.rpc_pb2 import ( + Compare, + DeleteRangeResponse, + LeaseKeepAliveResponse, + LeaseRevokeResponse, + LeaseTimeToLiveResponse, + PutResponse, + RequestOp, + ResponseOp +) +from aetcd3.events import Event +from aetcd3.leases import Lease +from aetcd3.locks import Lock +from aetcd3.members import Member + + +class Transactions: + ... + + +class KVMetadata: + key: bytes + create_revision: int + mod_revision: int + version: int + lease_id: int + ... + + +class Status: + ... + + +class Alarm: + ... + + +class Etcd3Client: + + def __init__( + self, + host: str = ..., + port: int = ..., + ca_cert: Optional[str] = ..., + cert_key: Optional[str] = ..., + cert_cert: Optional[str] = ..., + timeout: Optional[float] = ..., + user: Optional[str] = ..., + password: Optional[str] = ..., + gprc_options: Dict[Any, Any] = ... + ) -> None: + ... + + async def open(self) -> None: ... + async def close(self) -> None: ... + + async def __aenter__(self) -> Any: + ... + + async def __aexit__(self, *args: Any) -> Any: + ... + + async def get( + self, + key: str, + serializable: bool = ... + ) -> Tuple[bytes, KVMetadata]: ... 
+ + async def get_prefix( + self, + key_prefix: str, + sort_order: Optional[str] = ..., + sort_target: str = ..., + keys_only: bool = ... + ) -> Generator[Tuple[bytes, KVMetadata], None, None]: + ... + + async def get_range( + self, + range_start: str, + range_end: str, + sort_order: Optional[str] = ..., + sort_target: str = ..., + **kwargs: Any + ) -> Generator[Tuple[bytes, KVMetadata], None, None]: + ... + + async def get_all( + self, + sort_order: Optional[str] = ..., + sort_target: Optional[str] = ..., + keys_only: bool = ... + ) -> Generator[Tuple[bytes, KVMetadata], None, None]: + ... + ... + + async def put( + self, + key: str, + value: Union[bytes, str], + lease: Optional[Union[Lease, int]] = ..., + prev_kv: bool = ... + ) -> PutResponse: + ... + + async def replace( + self, + key: str, + initial_value: Union[bytes, str], + new_value: Union[bytes, str] + ) -> bool: + ... + + async def delete( + self, + key: str, + prev_kv: bool = ..., + return_response: bool = ... + ) -> Union[bool, DeleteRangeResponse]: + ... + + async def delete_prefix(self, prefix: str) -> DeleteRangeResponse: ... + async def status(self) -> Status: ... + + async def add_watch_callback( + self, + key: str, + callback: Callable[[Event], Awaitable[None]], + **kwargs: Any + ) -> int: + ... + + async def watch( + self, + key: str, + **kwargs: Any + ) -> Tuple[AsyncIterator[Event], Awaitable[None]]: + ... + + async def watch_prefix( + self, + key_prefix: str, + **kwargs: Any + ) -> Tuple[AsyncIterator[Event], Awaitable[None]]: + ... + + async def watch_once( + self, + key: str, + timeout: Optional[float] = ..., + **kwargs: Any + ) -> Any: + ... + + async def watch_prefix_once( + self, + key_prefix: str, + timeout: Optional[float] = ..., + **kwargs: Any + ) -> Any: + ... + + async def cancel_watch( + self, + watch_id: int + ) -> None: + ... + + async def transaction( + self, + compare: List[Compare], + success: Optional[List[RequestOp]] = ..., + failure: Optional[List[RequestOp]] = ... 
+ ) -> Tuple[bool, List[ResponseOp]]: + ... + + async def lease(self, ttl: int, id: int) -> Lease: ... + async def revoke_lease(self, lease_id: int) -> LeaseRevokeResponse: ... + async def refresh_lease(self, lease_id: int) -> LeaseKeepAliveResponse: ... + + async def get_lease_info( + self, + lease_id: int, + *, + keys: bool = ... + ) -> LeaseTimeToLiveResponse: + ... + + def lock(self, name: str, ttl: int = ...) -> Lock: ... + + async def add_member(self, urls: List[str]) -> Member: ... + async def remove_member(self, member_id: int) -> None: ... + + async def update_member( + self, + member_id: int, + peer_urls: List[str] + ) -> None: ... + + async def members(self) -> Generator[Member, None, None]: ... + async def compact(self, revision: int, physical: bool = ...) -> None: ... + async def defragment(self) -> None: ... + async def hash(self) -> int: ... + async def create_alarm(self, member_id: int = ...) -> List[Alarm]: ... + + async def list_alarms( + self, + member_id: int, + alarm_type: str = ... + ) -> Generator[Alarm, None, None]: + ... + + async def disarm_alarm(self, member_id: int = ...) -> List[Alarm]: ... + async def snapshot(self, file_obj: IO[bytes]) -> None: ... + + ... + + +def client( + host: str = ..., + port: int = ..., + ca_cert: Optional[str] = ..., + cert_key: Optional[str] = ..., + timeout: Optional[float] = ..., + cert_cert: Optional[str] = ..., + user: Optional[str] = ..., + password: Optional[str] = ..., + **kwargs: Any +) -> Etcd3Client: + ... diff --git a/src/gravel/typings/aetcd3/events.pyi b/src/gravel/typings/aetcd3/events.pyi new file mode 100644 index 000000000..ae59389cf --- /dev/null +++ b/src/gravel/typings/aetcd3/events.pyi @@ -0,0 +1,7 @@ +# project aquarium's backend +# Copyright (C) 2021 SUSE, LLC. 
+ +class Event: + + key: bytes + value: bytes diff --git a/src/mypy.ini b/src/mypy.ini index 369fea84c..a007daab7 100644 --- a/src/mypy.ini +++ b/src/mypy.ini @@ -9,3 +9,6 @@ allow_redefinition = True [mypy-pyfakefs.*] ignore_missing_imports = True + +[mypy-aetcd3.*] +ignore_missing_imports = True \ No newline at end of file diff --git a/src/requirements.txt b/src/requirements.txt index 1039c3857..45246d2de 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -1,4 +1,4 @@ -aiofiles==0.6.0 +aiofiles click==7.1.2 fastapi==0.63.0 h11==0.12.0 @@ -7,3 +7,4 @@ starlette==0.13.6 uvicorn==0.13.3 pip websockets==8.1 +aetcd3 From 857989f99db692c347b94359f3edf79847d3d8dd Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Tue, 6 Apr 2021 07:54:54 +0000 Subject: [PATCH 05/22] gravel: nodes/mgr: make ceph.conf managed by cephadm So we can have a consistent and up to date ceph.conf across all nodes, let's rely on cephadm to manage it. We don't drop the ceph.conf initially shared with a joining node because we want to ensure that node is able to perform operations on the cluster as soon as join finishes, and we don't want to have to wait for cephadm to write the ceph.conf. 
Signed-off-by: Joao Eduardo Luis --- src/gravel/controllers/nodes/mgr.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/gravel/controllers/nodes/mgr.py b/src/gravel/controllers/nodes/mgr.py index 0e69a49d8..c9c0c55e2 100644 --- a/src/gravel/controllers/nodes/mgr.py +++ b/src/gravel/controllers/nodes/mgr.py @@ -510,6 +510,14 @@ async def _finish_bootstrap_config(self) -> None: if not res: logger.error("unable to set default ruleset") + res = mon.config_set( + "mgr", + "mgr/cephadm/manage_etc_ceph_ceph_conf", + "true" + ) + if not res: + logger.error("unable to enable managed ceph.conf by cephadm") + @property def bootstrapper_stage(self) -> BootstrapStage: if not self._bootstrapper: From a5c7ca6b69cf9679ba88da3b6ac03c376d7ed585 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Tue, 6 Apr 2021 18:44:57 +0000 Subject: [PATCH 06/22] gravel: add kvstore controller Also adds needed typings for aetcd3.locks module. Signed-off-by: Joao Eduardo Luis --- src/gravel/controllers/kv.py | 122 ++++++++++++++++++++++++++++ src/gravel/typings/aetcd3/locks.pyi | 23 ++++++ 2 files changed, 145 insertions(+) create mode 100644 src/gravel/controllers/kv.py create mode 100644 src/gravel/typings/aetcd3/locks.pyi diff --git a/src/gravel/controllers/kv.py b/src/gravel/controllers/kv.py new file mode 100644 index 000000000..21d1d6be8 --- /dev/null +++ b/src/gravel/controllers/kv.py @@ -0,0 +1,122 @@ +# project aquarium's backend +# Copyright (C) 2021 SUSE, LLC. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+ +import asyncio +from typing import Callable, Optional +import aetcd3 +import aetcd3.locks +import aetcd3.events +import grpclib.exceptions + +from logging import Logger +from fastapi.logger import logger as fastapi_logger + +logger: Logger = fastapi_logger + + +class Lock: + _lock: aetcd3.locks.Lock + _is_acquired: bool + + def __init__(self, lock: aetcd3.locks.Lock): + self._lock = lock + self._is_acquired = False + + async def acquire(self) -> None: + await self._lock.acquire() + self._is_acquired = True + + async def release(self) -> None: + if not self._is_acquired: + return + await self._lock.release() + self._is_acquired = False + + +class KV: + + _client: Optional[aetcd3.Etcd3Client] + _is_closing: bool + + def __init__(self): + self._client = None + self._is_open = False + self._is_closing = False + + async def ensure_connection(self) -> None: + """ Open k/v store connection """ + # try getting the status, loop until we make it. + opened = False + while not self._is_closing: + try: + async with aetcd3.client() as client: + await client.status() + except Exception: + logger.warn("etcd not up yet? sleep.") + await asyncio.sleep(1.0) + continue + opened = True + break + if opened: + self._client = aetcd3.client() + logger.info("opened kvstore connection") + + async def close(self) -> None: + """ Close k/v store connection """ + self._is_closing = True + if not self._client: + return + await self._client.close() + self._client = None + + async def put(self, key: str, value: str) -> None: + """ Put key/value pair """ + assert self._client + await self._client.put(key, value) + + async def get(self, key: str) -> str: + """ Get value for provided key """ + assert self._client + value, _ = await self._client.get(key) + return value.decode("utf-8") + + async def rm(self, key: str) -> None: + """ Remove key from store """ + assert self._client + await self._client.delete(key) + + async def lock(self, key: str) -> Lock: + """ Lock a given key. 
Requires compliant consumers. """ + assert self._client + return Lock(self._client.lock(key)) + + async def watch( + self, + key: str, + callback: Callable[[str, str], None] + ) -> int: + """ Watch updates on a given key """ + assert self._client + + async def _cb(what: aetcd3.events.Event) -> None: + if not what or \ + type(what) == grpclib.exceptions.StreamTerminatedError: + return + callback(what.key.decode("utf-8"), what.value.decode("utf-8")) + + return await self._client.add_watch_callback(key, _cb) + + async def cancel_watch(self, watch_id: int) -> None: + """ Cancel a watch """ + assert self._client + await self._client.cancel_watch(watch_id) diff --git a/src/gravel/typings/aetcd3/locks.pyi b/src/gravel/typings/aetcd3/locks.pyi new file mode 100644 index 000000000..cb2f2d6ee --- /dev/null +++ b/src/gravel/typings/aetcd3/locks.pyi @@ -0,0 +1,23 @@ +# project aquarium's backend +# Copyright (C) 2021 SUSE, LLC. + +# pyright: reportMissingTypeStubs=false +from typing import Optional +import aetcd3 +from aetcd3.etcdrpc.rpc_pb2 import LeaseKeepAliveResponse + + +class Lock: + + def __init__( + self, + name: str, + ttl: int = ..., + etcd_client: Optional[aetcd3.Etcd3Client] = ... + ) -> None: + ... + + async def acquire(self, timeout: int = ...) -> bool: ... + async def release(self) -> bool: ... + async def refresh(self) -> LeaseKeepAliveResponse: ... + async def is_acquired(self) -> bool: ... 
From 61fa34990bf956e635d4d84b8ff8f1cd5b7e9d00 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Wed, 7 Apr 2021 06:27:38 +0000 Subject: [PATCH 07/22] gravel: nodes/mgr: move node init earlier in the file Signed-off-by: Joao Eduardo Luis --- src/gravel/controllers/nodes/mgr.py | 44 ++++++++++++++--------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/gravel/controllers/nodes/mgr.py b/src/gravel/controllers/nodes/mgr.py index c9c0c55e2..1e2baa098 100644 --- a/src/gravel/controllers/nodes/mgr.py +++ b/src/gravel/controllers/nodes/mgr.py @@ -185,6 +185,28 @@ def __init__(self): self._spawn_etcd(new=False, token=None) self._node_start() + def _node_init(self) -> None: + statefile: Path = self._get_node_file("node") + if not statefile.exists(): + # other control files must not exist either + tokenfile: Path = self._get_node_file("token") + assert not tokenfile.exists() + + state = NodeStateModel( + uuid=uuid4(), + role=NodeRoleEnum.NONE, + stage=NodeStageEnum.NONE, + address=None, + hostname=None + ) + try: + statefile.write_text(state.json()) + except Exception as e: + raise NodeError(str(e)) + assert statefile.exists() + + self._state = NodeStateModel.parse_file(statefile) + def _node_prestart(self, nodeinfo: NodeInfoModel): """ sets hostname and addresses; allows bootstrap, join. 
""" assert self._state.stage == NodeStageEnum.NONE @@ -606,28 +628,6 @@ def _get_node_file(self, what: str) -> Path: assert confdir.is_dir() return confdir.joinpath(f"{what}.json") - def _node_init(self) -> None: - statefile: Path = self._get_node_file("node") - if not statefile.exists(): - # other control files must not exist either - tokenfile: Path = self._get_node_file("token") - assert not tokenfile.exists() - - state = NodeStateModel( - uuid=uuid4(), - role=NodeRoleEnum.NONE, - stage=NodeStageEnum.NONE, - address=None, - hostname=None - ) - try: - statefile.write_text(state.json()) - except Exception as e: - raise NodeError(str(e)) - assert statefile.exists() - - self._state = NodeStateModel.parse_file(statefile) - def _load(self) -> None: self._token = self._load_token() From 7fe2d1475fd57dac178bbfd0045709ebed005825 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Wed, 7 Apr 2021 10:39:24 +0000 Subject: [PATCH 08/22] gravel: nodes/mgr: make start async, add shutdown Signed-off-by: Joao Eduardo Luis --- src/aquarium.py | 3 ++- src/gravel/controllers/nodes/mgr.py | 19 +++++++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/aquarium.py b/src/aquarium.py index 7994e03a8..a12bbcf42 100644 --- a/src/aquarium.py +++ b/src/aquarium.py @@ -89,7 +89,7 @@ async def on_startup(): logger.info("Aquarium startup!") # init node mgr - mgr.init_node_mgr() + await mgr.init_node_mgr() # create a task simply so we don't hold up the startup asyncio.create_task(gstate.start()) @@ -99,6 +99,7 @@ async def on_startup(): @app.on_event("shutdown") # type: ignore async def on_shutdown(): await gstate.shutdown() + await mgr.shutdown() api.include_router(local.router) diff --git a/src/gravel/controllers/nodes/mgr.py b/src/gravel/controllers/nodes/mgr.py index 1e2baa098..2478acc4b 100644 --- a/src/gravel/controllers/nodes/mgr.py +++ b/src/gravel/controllers/nodes/mgr.py @@ -169,9 +169,11 @@ def __init__(self): multiprocessing.set_start_method("spawn") 
self._node_init() + + async def start(self) -> None: assert self._state - logger.debug(f"init > {self._state}") + logger.debug(f"start > {self._state}") assert self._state.stage == NodeStageEnum.NONE or \ self._state.stage == NodeStageEnum.BOOTSTRAPPED or \ @@ -185,6 +187,9 @@ def __init__(self): self._spawn_etcd(new=False, token=None) self._node_start() + async def shutdown(self) -> None: + pass + def _node_init(self) -> None: statefile: Path = self._get_node_file("node") if not statefile.exists(): @@ -829,7 +834,17 @@ def get_node_mgr() -> NodeMgr: return _nodemgr -def init_node_mgr() -> None: +async def init_node_mgr() -> None: global _nodemgr assert not _nodemgr + logger.info("starting node manager") _nodemgr = NodeMgr() + await _nodemgr.start() + + +async def shutdown() -> None: + global _nodemgr + if _nodemgr: + logger.info("shutting down node manager") + await _nodemgr.shutdown() + _nodemgr = None From fd73020643c2478df31ecbd905fa24413c819e1a Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Wed, 7 Apr 2021 10:48:26 +0000 Subject: [PATCH 09/22] gravel: nodes/mgr: make as async as possible Signed-off-by: Joao Eduardo Luis --- src/gravel/controllers/nodes/mgr.py | 50 ++++++++++++++--------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/src/gravel/controllers/nodes/mgr.py b/src/gravel/controllers/nodes/mgr.py index 2478acc4b..f4f97c390 100644 --- a/src/gravel/controllers/nodes/mgr.py +++ b/src/gravel/controllers/nodes/mgr.py @@ -180,12 +180,12 @@ async def start(self) -> None: self._state.stage == NodeStageEnum.READY if self._state.stage == NodeStageEnum.NONE: - self._wait_inventory() + await self._wait_inventory() else: assert self._state.stage == NodeStageEnum.READY or \ self._state.stage == NodeStageEnum.BOOTSTRAPPED self._spawn_etcd(new=False, token=None) - self._node_start() + await self._node_start() async def shutdown(self) -> None: pass @@ -212,7 +212,7 @@ def _node_init(self) -> None: self._state = 
NodeStateModel.parse_file(statefile) - def _node_prestart(self, nodeinfo: NodeInfoModel): + async def _node_prestart(self, nodeinfo: NodeInfoModel): """ sets hostname and addresses; allows bootstrap, join. """ assert self._state.stage == NodeStageEnum.NONE assert self._init_stage == NodeInitStage.NONE @@ -236,9 +236,9 @@ def _node_prestart(self, nodeinfo: NodeInfoModel): address = address[:netmask_idx] self._state.address = address - self._save_state() + await self._save_state() - def _node_start(self) -> None: + async def _node_start(self) -> None: """ node is ready to accept incoming messages, if leader """ assert self._state assert self._state.stage == NodeStageEnum.READY or \ @@ -247,7 +247,7 @@ def _node_start(self) -> None: logger.info("start node") - self._load() + await self._load() self._init_stage = NodeInitStage.STARTED if self._state.role != NodeRoleEnum.LEADER: @@ -262,12 +262,12 @@ def _node_shutdown(self) -> None: self._init_stage = NodeInitStage.STOPPING self._incoming_task.cancel() - def _wait_inventory(self) -> None: + async def _wait_inventory(self) -> None: async def _subscriber(nodeinfo: NodeInfoModel) -> None: logger.debug(f"subscriber > node info: {nodeinfo}") assert nodeinfo - self._node_prestart(nodeinfo) + await self._node_prestart(nodeinfo) get_inventory().subscribe(_subscriber, once=True) @@ -317,7 +317,7 @@ async def join(self, leader_address: str, token: str) -> bool: await conn.send(msg) self._state.stage = NodeStageEnum.JOINING - self._save_state() + await self._save_state() reply: MessageModel = await conn.receive() logger.debug(f"join > recv: {reply}") @@ -326,7 +326,7 @@ async def join(self, leader_address: str, token: str) -> bool: logger.error(f"join > error: {errmsg.what}") await conn.close() self._state.stage = NodeStageEnum.NONE - self._save_state() + await self._save_state() return False assert reply.type == MessageTypeEnum.WELCOME @@ -372,12 +372,12 @@ async def join(self, leader_address: str, token: str) -> bool: 
self._state.stage = NodeStageEnum.READY self._state.role = NodeRoleEnum.FOLLOWER - self._save_state() + await self._save_state() self._token = token - self._save_token(should_exist=False) + await self._save_token(should_exist=False) - self._node_start() + await self._node_start() return True async def bootstrap(self) -> None: @@ -475,7 +475,7 @@ async def _prepare_bootstrap(self) -> None: raise NodeNotStartedError() self._token = self._generate_token() - self._save_token(should_exist=False) + await self._save_token(should_exist=False) logger.info(f"generated new token: {self._token}") self._bootstrap_etcd(self._token) @@ -485,7 +485,7 @@ async def _start_bootstrap(self) -> None: assert self._state.hostname assert self._state.address self._state.stage = NodeStageEnum.BOOTSTRAPPING - self._save_state() + await self._save_state() async def _finish_bootstrap_error(self, error: str) -> None: """ @@ -497,7 +497,7 @@ async def _finish_bootstrap_error(self, error: str) -> None: assert self._state.stage == NodeStageEnum.BOOTSTRAPPING self._state.stage = NodeStageEnum.ERROR - self._save_state() + await self._save_state() async def _finish_bootstrap(self): """ @@ -512,12 +512,12 @@ async def _finish_bootstrap(self): self._state.stage = NodeStageEnum.BOOTSTRAPPED self._state.role = NodeRoleEnum.LEADER - self._save_state() + await self._save_state() logger.debug(f"finished bootstrap: token = {self._token}") - self._load() - self._node_start() + await self._load() + await self._node_start() async def _finish_bootstrap_config(self) -> None: mon: Mon = Mon() @@ -580,7 +580,7 @@ async def finish_deployment(self) -> None: return self._state.stage = NodeStageEnum.READY - self._save_state() + await self._save_state() @property def inited(self) -> bool: @@ -633,10 +633,10 @@ def _get_node_file(self, what: str) -> Path: assert confdir.is_dir() return confdir.joinpath(f"{what}.json") - def _load(self) -> None: - self._token = self._load_token() + async def _load(self) -> None: + 
self._token = await self._load_token() - def _load_token(self) -> Optional[str]: + async def _load_token(self) -> Optional[str]: assert self._state confdir: Path = gstate.config.confdir assert confdir.exists() @@ -648,7 +648,7 @@ def _load_token(self) -> Optional[str]: token = TokenModel.parse_file(tokenfile) return token.token - def _save_token(self, should_exist: bool = True) -> None: + async def _save_token(self, should_exist: bool = True) -> None: tokenfile: Path = self._get_node_file("token") # this check could be a single assert, but it's important to know which @@ -661,7 +661,7 @@ def _save_token(self, should_exist: bool = True) -> None: token: TokenModel = TokenModel(token=self._token) tokenfile.write_text(token.json()) - def _save_state(self, should_exist: bool = True) -> None: + async def _save_state(self, should_exist: bool = True) -> None: statefile: Path = self._get_node_file("node") # this check could be a single assert, but it's important to know which From c086a16f64757a239dca674fb235dadb2a61ec01 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Wed, 7 Apr 2021 12:52:05 +0000 Subject: [PATCH 10/22] gravel: nodes/mgr: make etcd spawn async Signed-off-by: Joao Eduardo Luis --- src/gravel/controllers/nodes/mgr.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/gravel/controllers/nodes/mgr.py b/src/gravel/controllers/nodes/mgr.py index f4f97c390..6607d9008 100644 --- a/src/gravel/controllers/nodes/mgr.py +++ b/src/gravel/controllers/nodes/mgr.py @@ -184,7 +184,7 @@ async def start(self) -> None: else: assert self._state.stage == NodeStageEnum.READY or \ self._state.stage == NodeStageEnum.BOOTSTRAPPED - self._spawn_etcd(new=False, token=None) + await self._spawn_etcd(new=False, token=None) await self._node_start() async def shutdown(self) -> None: @@ -339,7 +339,7 @@ async def join(self, leader_address: str, token: str) -> bool: my_url: str = \ f"{self._state.hostname}=http://{self._state.address}:2380" 
initial_cluster: str = f"{welcome.etcd_peer},{my_url}" - self._spawn_etcd( + await self._spawn_etcd( new=False, token=None, initial_cluster=initial_cluster @@ -416,7 +416,7 @@ async def finish_bootstrap_cb( logger.error(f"bootstrap error: {e.message}") raise NodeCantBootstrapError(e.message) - def _spawn_etcd( + async def _spawn_etcd( self, new: bool, token: Optional[str], @@ -462,8 +462,8 @@ def _spawn_etcd( ) process.start() - def _bootstrap_etcd(self, token: str) -> None: - self._spawn_etcd(new=True, token=token) + async def _bootstrap_etcd(self, token: str) -> None: + await self._spawn_etcd(new=True, token=token) async def _prepare_bootstrap(self) -> None: assert self._state @@ -477,7 +477,7 @@ async def _prepare_bootstrap(self) -> None: self._token = self._generate_token() await self._save_token(should_exist=False) logger.info(f"generated new token: {self._token}") - self._bootstrap_etcd(self._token) + await self._bootstrap_etcd(self._token) async def _start_bootstrap(self) -> None: assert self._state From 21eaea54b68b70fc6cd76cdd1489c419a270991d Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Wed, 7 Apr 2021 12:53:18 +0000 Subject: [PATCH 11/22] gravel: nodes/mgr: add kvstore support Signed-off-by: Joao Eduardo Luis --- src/gravel/controllers/nodes/mgr.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/gravel/controllers/nodes/mgr.py b/src/gravel/controllers/nodes/mgr.py index 6607d9008..1441f5a53 100644 --- a/src/gravel/controllers/nodes/mgr.py +++ b/src/gravel/controllers/nodes/mgr.py @@ -69,6 +69,7 @@ MessageTypeEnum, ) from gravel.controllers.orch.orchestrator import Orchestrator +from gravel.controllers.kv import KV logger: Logger = fastapi_logger @@ -157,6 +158,7 @@ class NodeMgr: _token: Optional[str] _joining: Dict[str, JoiningNodeModel] _bootstrapper: Optional[Bootstrap] + _kvstore: Optional[KV] def __init__(self): self._init_stage = NodeInitStage.NONE @@ -165,6 +167,7 @@ def __init__(self): self._token = None 
self._joining = {} self._bootstrapper = None + self._kvstore = None multiprocessing.set_start_method("spawn") @@ -188,7 +191,8 @@ async def start(self) -> None: await self._node_start() async def shutdown(self) -> None: - pass + if self._kvstore: + await self._kvstore.close() def _node_init(self) -> None: statefile: Path = self._get_node_file("node") @@ -461,6 +465,9 @@ async def _spawn_etcd( args=(etcd_cmd,) ) process.start() + logger.info(f"started etcd process pid = {process.pid}") + self._kvstore = KV() + await self._kvstore.ensure_connection() async def _bootstrap_etcd(self, token: str) -> None: await self._spawn_etcd(new=True, token=token) From 61ad42b6cf8071a49feaa1af2ecd9428d19b1451 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Wed, 7 Apr 2021 12:53:36 +0000 Subject: [PATCH 12/22] gravel: nodes/mgr: keep token in kvstore Signed-off-by: Joao Eduardo Luis --- src/gravel/controllers/nodes/mgr.py | 37 ++++++++--------------------- 1 file changed, 10 insertions(+), 27 deletions(-) diff --git a/src/gravel/controllers/nodes/mgr.py b/src/gravel/controllers/nodes/mgr.py index 1441f5a53..a790f8b88 100644 --- a/src/gravel/controllers/nodes/mgr.py +++ b/src/gravel/controllers/nodes/mgr.py @@ -197,9 +197,6 @@ async def shutdown(self) -> None: def _node_init(self) -> None: statefile: Path = self._get_node_file("node") if not statefile.exists(): - # other control files must not exist either - tokenfile: Path = self._get_node_file("token") - assert not tokenfile.exists() state = NodeStateModel( uuid=uuid4(), @@ -379,7 +376,6 @@ async def join(self, leader_address: str, token: str) -> bool: await self._save_state() self._token = token - await self._save_token(should_exist=False) await self._node_start() return True @@ -482,9 +478,9 @@ async def _prepare_bootstrap(self) -> None: raise NodeNotStartedError() self._token = self._generate_token() - await self._save_token(should_exist=False) logger.info(f"generated new token: {self._token}") await 
self._bootstrap_etcd(self._token) + await self._save_token() async def _start_bootstrap(self) -> None: assert self._state @@ -644,29 +640,16 @@ async def _load(self) -> None: self._token = await self._load_token() async def _load_token(self) -> Optional[str]: - assert self._state - confdir: Path = gstate.config.confdir - assert confdir.exists() - assert confdir.is_dir() - tokenfile: Path = confdir.joinpath("token.json") - if not tokenfile.exists(): - assert self._state.stage < NodeStageEnum.BOOTSTRAPPED - return None - token = TokenModel.parse_file(tokenfile) - return token.token - - async def _save_token(self, should_exist: bool = True) -> None: - tokenfile: Path = self._get_node_file("token") - - # this check could be a single assert, but it's important to know which - # path failed. - if should_exist: - assert tokenfile.exists() - else: - assert not tokenfile.exists() + assert self._kvstore + tokenstr = await self._kvstore.get("/nodes/token") + assert tokenstr + return tokenstr - token: TokenModel = TokenModel(token=self._token) - tokenfile.write_text(token.json()) + async def _save_token(self) -> None: + assert self._token + logger.info(f"saving token: {self._token}") + assert self._kvstore + await self._kvstore.put("/nodes/token", self._token) async def _save_state(self, should_exist: bool = True) -> None: statefile: Path = self._get_node_file("node") From 2fefa66e44ba9d9a2d7ed9299bf2423febb3d8cf Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Thu, 8 Apr 2021 06:36:46 +0000 Subject: [PATCH 13/22] images: config.sh: install etcd from custom location We needed to patch aetcd3 and the fix hasn't been merged upstream yet. So, we created our own package and uploaded it somewhere it can be reached. This is what we are installing now. 
Signed-off-by: Joao Eduardo Luis --- images/microOS/config.sh | 3 ++- src/requirements.txt | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/images/microOS/config.sh b/images/microOS/config.sh index 45fcb2bba..5b4e09c1c 100644 --- a/images/microOS/config.sh +++ b/images/microOS/config.sh @@ -174,7 +174,8 @@ EOF fi if [[ "$kiwi_profiles" == *"Ceph"* ]]; then - pip install fastapi uvicorn aetcd3 + pip install fastapi uvicorn \ + https://celeuma.wipwd.dev/aqr/aetcd3/aetcd3-0.1.0a5.dev2+g0aa852e.d20210410-py3-none-any.whl baseInsertService aquarium fi diff --git a/src/requirements.txt b/src/requirements.txt index 45246d2de..0636b4195 100644 --- a/src/requirements.txt +++ b/src/requirements.txt @@ -7,4 +7,4 @@ starlette==0.13.6 uvicorn==0.13.3 pip websockets==8.1 -aetcd3 +https://celeuma.wipwd.dev/aqr/aetcd3/aetcd3-0.1.0a5.dev2+g0aa852e.d20210410-py3-none-any.whl From ac8d0af23ac8e73b690d70fa37181cf7b6f058a7 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Thu, 8 Apr 2021 06:43:25 +0000 Subject: [PATCH 14/22] gravel: nodes/mgr: obtain state on start, watch Obtain state and watch changes, update as needed. 
Signed-off-by: Joao Eduardo Luis --- src/gravel/controllers/nodes/mgr.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/gravel/controllers/nodes/mgr.py b/src/gravel/controllers/nodes/mgr.py index a790f8b88..8156c6aa5 100644 --- a/src/gravel/controllers/nodes/mgr.py +++ b/src/gravel/controllers/nodes/mgr.py @@ -248,6 +248,7 @@ async def _node_start(self) -> None: logger.info("start node") + await self._obtain_state() await self._load() self._init_stage = NodeInitStage.STARTED @@ -630,6 +631,17 @@ def connmgr(self) -> ConnMgr: def token(self) -> Optional[str]: return self._token + async def _obtain_state(self) -> None: + assert self._kvstore + + def _watcher(key: str, value: str) -> None: + if key == "/nodes/token": + logger.info(f"got updated token: {value}") + self._token = value + + self._token = await self._load_token() + await self._kvstore.watch("/nodes/token", _watcher) + def _get_node_file(self, what: str) -> Path: confdir: Path = gstate.config.confdir assert confdir.exists() From a0ad5cd6a23a42c7e1315a2fd9cb045ada1fa465 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Thu, 8 Apr 2021 07:34:58 +0000 Subject: [PATCH 15/22] gravel: ctrl/svc: make it a ticker We're going to cache things in the near future, and obtain state from the kvstore, watches and whatnot. We need it to be a full-fledged service. 
Signed-off-by: Joao Eduardo Luis --- src/gravel/api/services.py | 17 +++++---- src/gravel/controllers/config.py | 5 +++ src/gravel/controllers/nodes/mgr.py | 4 +++ src/gravel/controllers/resources/status.py | 5 +-- src/gravel/controllers/services.py | 41 ++++++++++++++++++++-- 5 files changed, 58 insertions(+), 14 deletions(-) diff --git a/src/gravel/api/services.py b/src/gravel/api/services.py index aa1e75252..b339350af 100644 --- a/src/gravel/api/services.py +++ b/src/gravel/api/services.py @@ -24,7 +24,6 @@ CephFSNoAuthorizationError ) from gravel.controllers.orch.models import CephFSAuthorizationModel - from gravel.controllers.services import ( ConstraintsModel, NotEnoughSpaceError, @@ -33,8 +32,8 @@ ServiceRequirementsModel, ServiceStorageModel, ServiceTypeEnum, - Services, - UnknownServiceError + UnknownServiceError, + get_services_ctrl ) @@ -72,13 +71,13 @@ class CreateResponse(BaseModel): response_model=ConstraintsModel ) async def get_constraints() -> ConstraintsModel: - services = Services() + services = get_services_ctrl() return services.constraints @router.get("/", response_model=List[ServiceModel]) async def get_services() -> List[ServiceModel]: - services = Services() + services = get_services_ctrl() return services.ls() @@ -86,7 +85,7 @@ async def get_services() -> List[ServiceModel]: name="Get service by name", response_model=ServiceModel) async def get_service(service_name: str) -> ServiceModel: - services = Services() + services = get_services_ctrl() try: return services.get(service_name) except UnknownServiceError as e: @@ -108,7 +107,7 @@ async def check_requirements( detail="requires positive 'size' and number of 'replicas'" ) - services = Services() + services = get_services_ctrl() feasible, reqs = services.check_requirements(size, replicas) return RequirementsResponse(feasible=feasible, requirements=reqs) @@ -116,7 +115,7 @@ async def check_requirements( @router.post("/create", response_model=CreateResponse) async def create_service(req: 
CreateRequest) -> CreateResponse: - services = Services() + services = get_services_ctrl() try: services.create(req.name, req.type, req.size, req.replicas) except NotImplementedError as e: @@ -144,7 +143,7 @@ async def get_statistics() -> Dict[str, ServiceStorageModel]: allocated space for said service and how much space is being used, along with the service's space utilization. """ - services = Services() + services = get_services_ctrl() return services.get_stats() diff --git a/src/gravel/controllers/config.py b/src/gravel/controllers/config.py index 8df0c90e0..b9a20740d 100644 --- a/src/gravel/controllers/config.py +++ b/src/gravel/controllers/config.py @@ -44,6 +44,10 @@ class StatusOptionsModel(BaseModel): probe_interval: float = Field(1.0, title="Status Probe Interval") +class ServicesOptionsModel(BaseModel): + probe_interval: float = Field(1.0, title="Services Probe Interval") + + class OptionsModel(BaseModel): service_state_path: Path = Field(Path(config_dir).joinpath("storage.json"), title="Path to Service State file") @@ -51,6 +55,7 @@ class OptionsModel(BaseModel): storage: StorageOptionsModel = Field(StorageOptionsModel()) devices: DevicesOptionsModel = Field(DevicesOptionsModel()) status: StatusOptionsModel = Field(StatusOptionsModel()) + services: ServicesOptionsModel = Field(ServicesOptionsModel()) class ConfigModel(BaseModel): diff --git a/src/gravel/controllers/nodes/mgr.py b/src/gravel/controllers/nodes/mgr.py index 8156c6aa5..49de352df 100644 --- a/src/gravel/controllers/nodes/mgr.py +++ b/src/gravel/controllers/nodes/mgr.py @@ -590,6 +590,10 @@ async def finish_deployment(self) -> None: def inited(self) -> bool: return self._init_stage >= NodeInitStage.PRESTART + @property + def started(self) -> bool: + return self._init_stage == NodeInitStage.STARTED + @property def stage(self) -> NodeStageEnum: assert self._state diff --git a/src/gravel/controllers/resources/status.py b/src/gravel/controllers/resources/status.py index 0df3fc9e7..9cb976d63 
100644 --- a/src/gravel/controllers/resources/status.py +++ b/src/gravel/controllers/resources/status.py @@ -38,7 +38,8 @@ ) from gravel.controllers.services import ( ServiceTypeEnum, - Services + Services, + get_services_ctrl ) @@ -121,7 +122,7 @@ def client_io_rate(self) -> OverallClientIORateModel: raise ClientIORateNotAvailableError() services_rates: List[ServiceIORateModel] = [] - services: Services = Services() + services: Services = get_services_ctrl() for service in services.ls(): svc_name: str = service.name svc_type: ServiceTypeEnum = service.type diff --git a/src/gravel/controllers/services.py b/src/gravel/controllers/services.py index 73bb276e7..e5b7e96c0 100644 --- a/src/gravel/controllers/services.py +++ b/src/gravel/controllers/services.py @@ -18,6 +18,10 @@ from fastapi.logger import logger as fastapi_logger from pydantic import BaseModel from pydantic.fields import Field +from gravel.controllers.nodes.mgr import ( + NodeMgr, + get_node_mgr +) from gravel.controllers.orch.ceph import Mon from gravel.controllers.orch.cephfs import CephFS, CephFSError @@ -31,7 +35,10 @@ NFSExport, NFSService ) -from gravel.controllers.gstate import gstate +from gravel.controllers.gstate import ( + Ticker, + gstate +) from gravel.controllers.resources.devices import ( DeviceHostModel, Devices, @@ -119,13 +126,32 @@ class ServiceStorageModel(BaseModel): utilization: float = Field(0, title="Utilization") -class Services: +class Services(Ticker): _services: Dict[str, ServiceModel] + _ready: bool def __init__(self): + super().__init__( + "services", + gstate.config.options.services.probe_interval + ) self._services = {} - self._load() + self._ready = False + + def _is_ready(self) -> bool: + nodemgr: NodeMgr = get_node_mgr() + return nodemgr.started + + async def _should_tick(self) -> bool: + return self._is_ready() + + async def _do_tick(self) -> None: + assert self._is_ready() + if not self._ready: + self._load() + self._ready = True + logger.debug(f"tick 
{len(self._services)} services") def create(self, name: str, type: ServiceTypeEnum, @@ -389,3 +415,12 @@ def _load(self) -> None: return state: StateModel = StateModel.parse_file(path) self._services = state.state + + +_services: Services = Services() + + +def get_services_ctrl() -> Services: + global _services + assert _services + return _services From bb22107c78ad2366b9a6c77107c3d7f89fe2549f Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Thu, 8 Apr 2021 17:51:45 +0000 Subject: [PATCH 16/22] gravel: ctrl/svc: gate operations if not ready Signed-off-by: Joao Eduardo Luis --- src/gravel/api/services.py | 8 +++++++- src/gravel/controllers/services.py | 22 ++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/gravel/api/services.py b/src/gravel/api/services.py index b339350af..7419a91e1 100644 --- a/src/gravel/api/services.py +++ b/src/gravel/api/services.py @@ -27,6 +27,7 @@ from gravel.controllers.services import ( ConstraintsModel, NotEnoughSpaceError, + NotReadyError, ServiceError, ServiceModel, ServiceRequirementsModel, @@ -129,6 +130,8 @@ async def create_service(req: CreateRequest) -> CreateResponse: except Exception as e: raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) + except NotReadyError: + raise HTTPException(status.HTTP_425_TOO_EARLY) return CreateResponse(success=True) @@ -144,7 +147,10 @@ async def get_statistics() -> Dict[str, ServiceStorageModel]: with the service's space utilization. 
""" services = get_services_ctrl() - return services.get_stats() + try: + return services.get_stats() + except NotReadyError: + raise HTTPException(status.HTTP_425_TOO_EARLY) @router.get( diff --git a/src/gravel/controllers/services.py b/src/gravel/controllers/services.py index e5b7e96c0..2a8d618bc 100644 --- a/src/gravel/controllers/services.py +++ b/src/gravel/controllers/services.py @@ -69,6 +69,10 @@ class NotEnoughSpaceError(ServiceError): pass +class NotReadyError(ServiceError): + pass + + class ServiceTypeEnum(str, Enum): CEPHFS = "cephfs" NFS = "nfs" @@ -158,6 +162,10 @@ def create(self, name: str, size: int, replicas: int ) -> ServiceModel: + + if not self._is_ready(): + raise NotReadyError() + if name in self._services: raise ServiceExistsError(f"service {name} already exists") @@ -186,6 +194,8 @@ def create(self, name: str, return svc def remove(self, name: str): + if not self._is_ready(): + raise NotReadyError() pass def ls(self) -> List[ServiceModel]: @@ -291,6 +301,10 @@ def get_stats(self) -> Dict[str, ServiceStorageModel]: used for each service, and utilization as a function of the used space and the allocated space. """ + + if not self._is_ready(): + raise NotReadyError() + storage: Storage = get_storage() storage_pools: Dict[int, StoragePoolModel] = storage.usage().pools_by_id @@ -323,6 +337,8 @@ def get_stats(self) -> Dict[str, ServiceStorageModel]: return services def _create_cephfs(self, svc: ServiceModel) -> None: + assert self._is_ready() + cephfs = CephFS() try: cephfs.create(svc.name) @@ -367,6 +383,8 @@ def get_pool(name: str) -> CephOSDPoolEntryModel: # client. 
def _create_nfs(self, svc: ServiceModel) -> None: + assert self._is_ready() + # create a cephfs self._create_cephfs(svc) @@ -402,6 +420,8 @@ def _create_nfs(self, svc: ServiceModel) -> None: raise ServiceError("unable to create nfs export") from e def _save(self) -> None: + assert self._is_ready() + assert gstate.config.options.service_state_path path = Path(gstate.config.options.service_state_path) path.parent.mkdir(mode=0o700, parents=True, exist_ok=True) @@ -409,6 +429,8 @@ def _save(self) -> None: path.write_text(state.json(indent=2)) def _load(self) -> None: + assert self._is_ready() + assert gstate.config.options.service_state_path path = Path(gstate.config.options.service_state_path) if not path.exists(): From 61021bb8b359e422bdd708cbb94d752d79e56481 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Fri, 9 Apr 2021 13:07:30 +0000 Subject: [PATCH 17/22] gravel: kv: allow returning None on 'get' Signed-off-by: Joao Eduardo Luis --- src/gravel/controllers/kv.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/gravel/controllers/kv.py b/src/gravel/controllers/kv.py index 21d1d6be8..fb35a0a0f 100644 --- a/src/gravel/controllers/kv.py +++ b/src/gravel/controllers/kv.py @@ -84,10 +84,12 @@ async def put(self, key: str, value: str) -> None: assert self._client await self._client.put(key, value) - async def get(self, key: str) -> str: + async def get(self, key: str) -> Optional[str]: """ Get value for provided key """ assert self._client value, _ = await self._client.get(key) + if not value: + return None return value.decode("utf-8") async def rm(self, key: str) -> None: From 5a973ad83fd85578d8773d564d7880536fbc7e75 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Fri, 9 Apr 2021 13:08:09 +0000 Subject: [PATCH 18/22] gravel: nodes/mgr: expose kvstore to the world Signed-off-by: Joao Eduardo Luis --- src/gravel/controllers/nodes/mgr.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/gravel/controllers/nodes/mgr.py 
b/src/gravel/controllers/nodes/mgr.py index 49de352df..fa228f366 100644 --- a/src/gravel/controllers/nodes/mgr.py +++ b/src/gravel/controllers/nodes/mgr.py @@ -635,6 +635,11 @@ def connmgr(self) -> ConnMgr: def token(self) -> Optional[str]: return self._token + @property + def store(self) -> KV: + assert self._kvstore + return self._kvstore + async def _obtain_state(self) -> None: assert self._kvstore From c73748868b2ecb10cdc379a6092ea8b9c76e324c Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Fri, 9 Apr 2021 13:08:30 +0000 Subject: [PATCH 19/22] gravel: ctrl/svc: keep state in kvstore Signed-off-by: Joao Eduardo Luis --- src/gravel/api/services.py | 2 +- src/gravel/controllers/services.py | 56 ++++++++++++++++++++---------- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/src/gravel/api/services.py b/src/gravel/api/services.py index 7419a91e1..ec899c909 100644 --- a/src/gravel/api/services.py +++ b/src/gravel/api/services.py @@ -118,7 +118,7 @@ async def create_service(req: CreateRequest) -> CreateResponse: services = get_services_ctrl() try: - services.create(req.name, req.type, req.size, req.replicas) + await services.create(req.name, req.type, req.size, req.replicas) except NotImplementedError as e: raise HTTPException(status.HTTP_501_NOT_IMPLEMENTED, detail=str(e)) diff --git a/src/gravel/controllers/services.py b/src/gravel/controllers/services.py index 2a8d618bc..a751c47d2 100644 --- a/src/gravel/controllers/services.py +++ b/src/gravel/controllers/services.py @@ -12,7 +12,6 @@ # GNU General Public License for more details. 
from enum import Enum -from pathlib import Path from typing import Dict, List, Tuple from logging import Logger from fastapi.logger import logger as fastapi_logger @@ -153,15 +152,18 @@ async def _should_tick(self) -> bool: async def _do_tick(self) -> None: assert self._is_ready() if not self._ready: - self._load() + await self._load() + await self._set_watchers() self._ready = True logger.debug(f"tick {len(self._services)} services") - def create(self, name: str, - type: ServiceTypeEnum, - size: int, - replicas: int - ) -> ServiceModel: + async def create( + self, + name: str, + type: ServiceTypeEnum, + size: int, + replicas: int + ) -> ServiceModel: if not self._is_ready(): raise NotReadyError() @@ -190,7 +192,7 @@ def create(self, name: str, raise NotImplementedError(f"unknown service type: {svc.type}") self._services[name] = svc - self._save() + await self._save() return svc def remove(self, name: str): @@ -419,24 +421,40 @@ def _create_nfs(self, svc: ServiceModel) -> None: except NFSError as e: raise ServiceError("unable to create nfs export") from e - def _save(self) -> None: + async def _save(self) -> None: assert self._is_ready() - assert gstate.config.options.service_state_path - path = Path(gstate.config.options.service_state_path) - path.parent.mkdir(mode=0o700, parents=True, exist_ok=True) state = StateModel(state=self._services) - path.write_text(state.json(indent=2)) + statestr = state.json() - def _load(self) -> None: + nodemgr = get_node_mgr() + await nodemgr.store.put("/services/state", statestr) + + def _load_state(self, value: str) -> None: + assert value + state = StateModel.parse_raw(value) + self._services = state.state + + async def _load(self) -> None: assert self._is_ready() - assert gstate.config.options.service_state_path - path = Path(gstate.config.options.service_state_path) - if not path.exists(): + nodemgr = get_node_mgr() + statestr = await nodemgr.store.get("/services/state") + if not statestr: return - state: StateModel = 
StateModel.parse_file(path) - self._services = state.state + self._load_state(statestr) + + async def _set_watchers(self) -> None: + assert self._is_ready() + + def _cb(key: str, value: str) -> None: + if not value: + logger.error("someone removed our state!") + return + self._load_state(value) + + nodemgr = get_node_mgr() + await nodemgr.store.watch("/services/state", _cb) _services: Services = Services() From ba50e74b4a1b643814910d78c4fb9138a2180425 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Sat, 10 Apr 2021 21:34:21 +0000 Subject: [PATCH 20/22] gravel: gstate: cleanup tickers on shutdown For those services needing to cleanup state, add a shutdown method to be called when we are shutting down. Signed-off-by: Joao Eduardo Luis --- src/gravel/controllers/gstate.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/gravel/controllers/gstate.py b/src/gravel/controllers/gstate.py index eac1fcf54..1f5a68f39 100644 --- a/src/gravel/controllers/gstate.py +++ b/src/gravel/controllers/gstate.py @@ -102,6 +102,9 @@ async def tick(self) -> None: self._is_ticking = False self._last_tick = time.monotonic() + async def shutdown(self) -> None: + pass + class GlobalState: @@ -140,12 +143,18 @@ async def tick(self) -> None: await self._do_ticks() logger.info("tick shutting down") + await self._shutdown_tickers() async def _do_ticks(self) -> None: for desc, ticker in self.tickers.items(): logger.debug(f"tick {desc}") asyncio.create_task(ticker.tick()) + async def _shutdown_tickers(self) -> None: + for desc, ticker in self.tickers.items(): + logger.debug(f"shutdown ticker {desc}") + await ticker.shutdown() + def add_ticker(self, desc: str, whom: Ticker) -> None: if desc not in self.tickers: self.tickers[desc] = whom From 44d0224cdf83e9022b5a65bc741834a8dcc77ff8 Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Sat, 10 Apr 2021 21:35:35 +0000 Subject: [PATCH 21/22] gravel: ctrl/svc: cleanup watcher on shutdown Signed-off-by: Joao Eduardo Luis --- 
src/gravel/controllers/services.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/gravel/controllers/services.py b/src/gravel/controllers/services.py index a751c47d2..fb430e740 100644 --- a/src/gravel/controllers/services.py +++ b/src/gravel/controllers/services.py @@ -12,7 +12,7 @@ # GNU General Public License for more details. from enum import Enum -from typing import Dict, List, Tuple +from typing import Dict, List, Optional, Tuple from logging import Logger from fastapi.logger import logger as fastapi_logger from pydantic import BaseModel @@ -133,6 +133,7 @@ class Services(Ticker): _services: Dict[str, ServiceModel] _ready: bool + _state_watcher_id: Optional[int] def __init__(self): super().__init__( @@ -141,6 +142,7 @@ def __init__(self): ) self._services = {} self._ready = False + self._state_watcher_id = None def _is_ready(self) -> bool: nodemgr: NodeMgr = get_node_mgr() @@ -157,6 +159,12 @@ async def _do_tick(self) -> None: self._ready = True logger.debug(f"tick {len(self._services)} services") + async def shutdown(self) -> None: + logger.info("shutdown services") + if self._state_watcher_id: + nodemgr = get_node_mgr() + await nodemgr.store.cancel_watch(self._state_watcher_id) + async def create( self, name: str, @@ -454,7 +462,8 @@ def _cb(key: str, value: str) -> None: self._load_state(value) nodemgr = get_node_mgr() - await nodemgr.store.watch("/services/state", _cb) + self._state_watcher_id = \ + await nodemgr.store.watch("/services/state", _cb) _services: Services = Services() From 9cc874943d7c3c7647284a152bb5614246e9d4aa Mon Sep 17 00:00:00 2001 From: Joao Eduardo Luis Date: Sat, 10 Apr 2021 21:35:59 +0000 Subject: [PATCH 22/22] aquarium: log shutdown event Signed-off-by: Joao Eduardo Luis --- src/aquarium.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/aquarium.py b/src/aquarium.py index a12bbcf42..bdfc97036 100644 --- a/src/aquarium.py +++ b/src/aquarium.py @@ -98,6 +98,7 @@ async def on_startup(): 
@app.on_event("shutdown") # type: ignore async def on_shutdown(): + logger.info("Aquarium shutdown!") await gstate.shutdown() await mgr.shutdown()