diff --git a/images/microOS/config.sh b/images/microOS/config.sh
index ad6afdfb5..5b4e09c1c 100644
--- a/images/microOS/config.sh
+++ b/images/microOS/config.sh
@@ -174,7 +174,8 @@ EOF
fi
if [[ "$kiwi_profiles" == *"Ceph"* ]]; then
- pip install fastapi uvicorn
+ pip install fastapi uvicorn \
+ https://celeuma.wipwd.dev/aqr/aetcd3/aetcd3-0.1.0a5.dev2+g0aa852e.d20210410-py3-none-any.whl
baseInsertService aquarium
fi
diff --git a/images/microOS/config.xml b/images/microOS/config.xml
index a94b92a86..c2835491e 100644
--- a/images/microOS/config.xml
+++ b/images/microOS/config.xml
@@ -608,6 +608,8 @@
+
+
diff --git a/src/aquarium.py b/src/aquarium.py
index 7994e03a8..bdfc97036 100644
--- a/src/aquarium.py
+++ b/src/aquarium.py
@@ -89,7 +89,7 @@ async def on_startup():
logger.info("Aquarium startup!")
# init node mgr
- mgr.init_node_mgr()
+ await mgr.init_node_mgr()
# create a task simply so we don't hold up the startup
asyncio.create_task(gstate.start())
@@ -98,7 +98,9 @@ async def on_startup():
@app.on_event("shutdown") # type: ignore
async def on_shutdown():
+ logger.info("Aquarium shutdown!")
await gstate.shutdown()
+ await mgr.shutdown()
api.include_router(local.router)
diff --git a/src/gravel/api/services.py b/src/gravel/api/services.py
index aa1e75252..ec899c909 100644
--- a/src/gravel/api/services.py
+++ b/src/gravel/api/services.py
@@ -24,17 +24,17 @@
CephFSNoAuthorizationError
)
from gravel.controllers.orch.models import CephFSAuthorizationModel
-
from gravel.controllers.services import (
ConstraintsModel,
NotEnoughSpaceError,
+ NotReadyError,
ServiceError,
ServiceModel,
ServiceRequirementsModel,
ServiceStorageModel,
ServiceTypeEnum,
- Services,
- UnknownServiceError
+ UnknownServiceError,
+ get_services_ctrl
)
@@ -72,13 +72,13 @@ class CreateResponse(BaseModel):
response_model=ConstraintsModel
)
async def get_constraints() -> ConstraintsModel:
- services = Services()
+ services = get_services_ctrl()
return services.constraints
@router.get("/", response_model=List[ServiceModel])
async def get_services() -> List[ServiceModel]:
- services = Services()
+ services = get_services_ctrl()
return services.ls()
@@ -86,7 +86,7 @@ async def get_services() -> List[ServiceModel]:
name="Get service by name",
response_model=ServiceModel)
async def get_service(service_name: str) -> ServiceModel:
- services = Services()
+ services = get_services_ctrl()
try:
return services.get(service_name)
except UnknownServiceError as e:
@@ -108,7 +108,7 @@ async def check_requirements(
detail="requires positive 'size' and number of 'replicas'"
)
- services = Services()
+ services = get_services_ctrl()
feasible, reqs = services.check_requirements(size, replicas)
return RequirementsResponse(feasible=feasible, requirements=reqs)
@@ -116,9 +116,9 @@ async def check_requirements(
@router.post("/create", response_model=CreateResponse)
async def create_service(req: CreateRequest) -> CreateResponse:
- services = Services()
+ services = get_services_ctrl()
try:
- services.create(req.name, req.type, req.size, req.replicas)
+ await services.create(req.name, req.type, req.size, req.replicas)
except NotImplementedError as e:
raise HTTPException(status.HTTP_501_NOT_IMPLEMENTED,
detail=str(e))
@@ -130,6 +130,8 @@ async def create_service(req: CreateRequest) -> CreateResponse:
+    except NotReadyError:
+        raise HTTPException(status.HTTP_425_TOO_EARLY)
     except Exception as e:
         raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR,
                             detail=str(e))
return CreateResponse(success=True)
@@ -144,8 +146,11 @@ async def get_statistics() -> Dict[str, ServiceStorageModel]:
allocated space for said service and how much space is being used, along
with the service's space utilization.
"""
- services = Services()
- return services.get_stats()
+ services = get_services_ctrl()
+ try:
+ return services.get_stats()
+ except NotReadyError:
+ raise HTTPException(status.HTTP_425_TOO_EARLY)
@router.get(
diff --git a/src/gravel/controllers/config.py b/src/gravel/controllers/config.py
index 8df0c90e0..b9a20740d 100644
--- a/src/gravel/controllers/config.py
+++ b/src/gravel/controllers/config.py
@@ -44,6 +44,10 @@ class StatusOptionsModel(BaseModel):
probe_interval: float = Field(1.0, title="Status Probe Interval")
+class ServicesOptionsModel(BaseModel):
+ probe_interval: float = Field(1.0, title="Services Probe Interval")
+
+
class OptionsModel(BaseModel):
service_state_path: Path = Field(Path(config_dir).joinpath("storage.json"),
title="Path to Service State file")
@@ -51,6 +55,7 @@ class OptionsModel(BaseModel):
storage: StorageOptionsModel = Field(StorageOptionsModel())
devices: DevicesOptionsModel = Field(DevicesOptionsModel())
status: StatusOptionsModel = Field(StatusOptionsModel())
+ services: ServicesOptionsModel = Field(ServicesOptionsModel())
class ConfigModel(BaseModel):
diff --git a/src/gravel/controllers/gstate.py b/src/gravel/controllers/gstate.py
index eac1fcf54..1f5a68f39 100644
--- a/src/gravel/controllers/gstate.py
+++ b/src/gravel/controllers/gstate.py
@@ -102,6 +102,9 @@ async def tick(self) -> None:
self._is_ticking = False
self._last_tick = time.monotonic()
+ async def shutdown(self) -> None:
+ pass
+
class GlobalState:
@@ -140,12 +143,18 @@ async def tick(self) -> None:
await self._do_ticks()
logger.info("tick shutting down")
+ await self._shutdown_tickers()
async def _do_ticks(self) -> None:
for desc, ticker in self.tickers.items():
logger.debug(f"tick {desc}")
asyncio.create_task(ticker.tick())
+ async def _shutdown_tickers(self) -> None:
+ for desc, ticker in self.tickers.items():
+ logger.debug(f"shutdown ticker {desc}")
+ await ticker.shutdown()
+
def add_ticker(self, desc: str, whom: Ticker) -> None:
if desc not in self.tickers:
self.tickers[desc] = whom
diff --git a/src/gravel/controllers/kv.py b/src/gravel/controllers/kv.py
new file mode 100644
index 000000000..fb35a0a0f
--- /dev/null
+++ b/src/gravel/controllers/kv.py
@@ -0,0 +1,124 @@
+# project aquarium's backend
+# Copyright (C) 2021 SUSE, LLC.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+import asyncio
+from typing import Callable, Optional
+import aetcd3
+import aetcd3.locks
+import aetcd3.events
+import grpclib.exceptions
+
+from logging import Logger
+from fastapi.logger import logger as fastapi_logger
+
+logger: Logger = fastapi_logger
+
+
+class Lock:
+ _lock: aetcd3.locks.Lock
+ _is_acquired: bool
+
+ def __init__(self, lock: aetcd3.locks.Lock):
+ self._lock = lock
+ self._is_acquired = False
+
+ async def acquire(self) -> None:
+ await self._lock.acquire()
+ self._is_acquired = True
+
+ async def release(self) -> None:
+ if not self._is_acquired:
+ return
+ await self._lock.release()
+ self._is_acquired = False
+
+
+class KV:
+
+ _client: Optional[aetcd3.Etcd3Client]
+ _is_closing: bool
+
+ def __init__(self):
+ self._client = None
+ self._is_open = False
+ self._is_closing = False
+
+ async def ensure_connection(self) -> None:
+ """ Open k/v store connection """
+ # try getting the status, loop until we make it.
+ opened = False
+ while not self._is_closing:
+ try:
+ async with aetcd3.client() as client:
+ await client.status()
+ except Exception:
+                logger.warning("etcd not up yet? sleep.")
+ await asyncio.sleep(1.0)
+ continue
+ opened = True
+ break
+ if opened:
+ self._client = aetcd3.client()
+ logger.info("opened kvstore connection")
+
+ async def close(self) -> None:
+ """ Close k/v store connection """
+ self._is_closing = True
+ if not self._client:
+ return
+ await self._client.close()
+ self._client = None
+
+ async def put(self, key: str, value: str) -> None:
+ """ Put key/value pair """
+ assert self._client
+ await self._client.put(key, value)
+
+ async def get(self, key: str) -> Optional[str]:
+ """ Get value for provided key """
+ assert self._client
+ value, _ = await self._client.get(key)
+ if not value:
+ return None
+ return value.decode("utf-8")
+
+ async def rm(self, key: str) -> None:
+ """ Remove key from store """
+ assert self._client
+ await self._client.delete(key)
+
+ async def lock(self, key: str) -> Lock:
+ """ Lock a given key. Requires compliant consumers. """
+ assert self._client
+ return Lock(self._client.lock(key))
+
+ async def watch(
+ self,
+ key: str,
+ callback: Callable[[str, str], None]
+ ) -> int:
+ """ Watch updates on a given key """
+ assert self._client
+
+ async def _cb(what: aetcd3.events.Event) -> None:
+        if not what or \
+                isinstance(what, grpclib.exceptions.StreamTerminatedError):
+ return
+ callback(what.key.decode("utf-8"), what.value.decode("utf-8"))
+
+ return await self._client.add_watch_callback(key, _cb)
+
+ async def cancel_watch(self, watch_id: int) -> None:
+ """ Cancel a watch """
+ assert self._client
+ await self._client.cancel_watch(watch_id)
diff --git a/src/gravel/controllers/nodes/messages.py b/src/gravel/controllers/nodes/messages.py
index 68e23472c..c845e9a1c 100644
--- a/src/gravel/controllers/nodes/messages.py
+++ b/src/gravel/controllers/nodes/messages.py
@@ -41,6 +41,7 @@ class WelcomeMessageModel(BaseModel):
pubkey: str
cephconf: str
keyring: str
+ etcd_peer: str
class ReadyToAddMessageModel(BaseModel):
diff --git a/src/gravel/controllers/nodes/mgr.py b/src/gravel/controllers/nodes/mgr.py
index 84db7928f..fa228f366 100644
--- a/src/gravel/controllers/nodes/mgr.py
+++ b/src/gravel/controllers/nodes/mgr.py
@@ -12,6 +12,7 @@
# GNU General Public License for more details.
import asyncio
+import multiprocessing
from enum import Enum
from logging import Logger
from uuid import UUID, uuid4
@@ -21,7 +22,9 @@
Optional,
)
from pathlib import Path
+import shlex
+import aetcd3
from pydantic import BaseModel
from fastapi import status
from fastapi.logger import logger as fastapi_logger
@@ -66,6 +69,7 @@
MessageTypeEnum,
)
from gravel.controllers.orch.orchestrator import Orchestrator
+from gravel.controllers.kv import KV
logger: Logger = fastapi_logger
@@ -115,6 +119,35 @@ class JoiningNodeModel(BaseModel):
address: str
+# We need to rely on "spawn" because otherwise the subprocess' signal
+# handler will play nasty tricks with uvicorn's and fastapi's signal
+# handlers.
+# For future reference,
+# - uvicorn issue: https://github.com/encode/uvicorn/issues/548
+# - python bug report: https://bugs.python.org/issue43064
+# - somewhat of a solution, using "spawn" for multiprocessing:
+# https://github.com/tiangolo/fastapi/issues/1487#issuecomment-657290725
+#
+# And because we need to rely on spawn, which will create a new python
+# interpreter to execute what we are specifying, the function we're passing
+# needs to be pickled. And nested functions, apparently, can't be pickled. Thus,
+# we need to have the functions at the top-level scope.
+#
+def _bootstrap_etcd_process(cmd: str):
+
+ async def _run_etcd():
+ etcd_cmd = shlex.split(cmd)
+ process = await asyncio.create_subprocess_exec(*etcd_cmd)
+ await process.wait()
+
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
+ try:
+ loop.run_until_complete(_run_etcd())
+ except KeyboardInterrupt:
+ pass
+
+
class NodeMgr:
_init_stage: NodeInitStage
@@ -125,6 +158,7 @@ class NodeMgr:
_token: Optional[str]
_joining: Dict[str, JoiningNodeModel]
_bootstrapper: Optional[Bootstrap]
+ _kvstore: Optional[KV]
def __init__(self):
self._init_stage = NodeInitStage.NONE
@@ -133,24 +167,53 @@ def __init__(self):
self._token = None
self._joining = {}
self._bootstrapper = None
+ self._kvstore = None
+
+ multiprocessing.set_start_method("spawn")
self._node_init()
+
+ async def start(self) -> None:
assert self._state
- logger.debug(f"init > {self._state}")
+ logger.debug(f"start > {self._state}")
assert self._state.stage == NodeStageEnum.NONE or \
self._state.stage == NodeStageEnum.BOOTSTRAPPED or \
self._state.stage == NodeStageEnum.READY
if self._state.stage == NodeStageEnum.NONE:
- self._wait_inventory()
+ await self._wait_inventory()
else:
assert self._state.stage == NodeStageEnum.READY or \
self._state.stage == NodeStageEnum.BOOTSTRAPPED
- self._node_start()
+ await self._spawn_etcd(new=False, token=None)
+ await self._node_start()
+
+ async def shutdown(self) -> None:
+ if self._kvstore:
+ await self._kvstore.close()
+
+ def _node_init(self) -> None:
+ statefile: Path = self._get_node_file("node")
+ if not statefile.exists():
- def _node_prestart(self, nodeinfo: NodeInfoModel):
+ state = NodeStateModel(
+ uuid=uuid4(),
+ role=NodeRoleEnum.NONE,
+ stage=NodeStageEnum.NONE,
+ address=None,
+ hostname=None
+ )
+ try:
+ statefile.write_text(state.json())
+ except Exception as e:
+ raise NodeError(str(e))
+ assert statefile.exists()
+
+ self._state = NodeStateModel.parse_file(statefile)
+
+ async def _node_prestart(self, nodeinfo: NodeInfoModel):
""" sets hostname and addresses; allows bootstrap, join. """
assert self._state.stage == NodeStageEnum.NONE
assert self._init_stage == NodeInitStage.NONE
@@ -174,9 +237,9 @@ def _node_prestart(self, nodeinfo: NodeInfoModel):
address = address[:netmask_idx]
self._state.address = address
- self._save_state()
+ await self._save_state()
- def _node_start(self) -> None:
+ async def _node_start(self) -> None:
""" node is ready to accept incoming messages, if leader """
assert self._state
assert self._state.stage == NodeStageEnum.READY or \
@@ -185,7 +248,8 @@ def _node_start(self) -> None:
logger.info("start node")
- self._load()
+ await self._obtain_state()
+ await self._load()
self._init_stage = NodeInitStage.STARTED
if self._state.role != NodeRoleEnum.LEADER:
@@ -200,15 +264,22 @@ def _node_shutdown(self) -> None:
self._init_stage = NodeInitStage.STOPPING
self._incoming_task.cancel()
- def _wait_inventory(self) -> None:
+ async def _wait_inventory(self) -> None:
async def _subscriber(nodeinfo: NodeInfoModel) -> None:
logger.debug(f"subscriber > node info: {nodeinfo}")
assert nodeinfo
- self._node_prestart(nodeinfo)
+ await self._node_prestart(nodeinfo)
get_inventory().subscribe(_subscriber, once=True)
+ def _generate_token(self) -> str:
+ def gen() -> str:
+ return ''.join(random.choice("0123456789abcdef") for _ in range(4))
+
+ tokenstr = '-'.join(gen() for _ in range(4))
+ return tokenstr
+
async def join(self, leader_address: str, token: str) -> bool:
logger.debug(
f"join > with leader {leader_address}, token: {token}"
@@ -248,7 +319,7 @@ async def join(self, leader_address: str, token: str) -> bool:
await conn.send(msg)
self._state.stage = NodeStageEnum.JOINING
- self._save_state()
+ await self._save_state()
reply: MessageModel = await conn.receive()
logger.debug(f"join > recv: {reply}")
@@ -257,7 +328,7 @@ async def join(self, leader_address: str, token: str) -> bool:
logger.error(f"join > error: {errmsg.what}")
await conn.close()
self._state.stage = NodeStageEnum.NONE
- self._save_state()
+ await self._save_state()
return False
assert reply.type == MessageTypeEnum.WELCOME
@@ -265,6 +336,16 @@ async def join(self, leader_address: str, token: str) -> bool:
assert welcome.pubkey
assert welcome.cephconf
assert welcome.keyring
+ assert welcome.etcd_peer
+
+ my_url: str = \
+ f"{self._state.hostname}=http://{self._state.address}:2380"
+ initial_cluster: str = f"{welcome.etcd_peer},{my_url}"
+ await self._spawn_etcd(
+ new=False,
+ token=None,
+ initial_cluster=initial_cluster
+ )
authorized_keys: Path = Path("/root/.ssh/authorized_keys")
if not authorized_keys.parent.exists():
@@ -293,12 +374,11 @@ async def join(self, leader_address: str, token: str) -> bool:
self._state.stage = NodeStageEnum.READY
self._state.role = NodeRoleEnum.FOLLOWER
- self._save_state()
+ await self._save_state()
self._token = token
- self._save_token(should_exist=False)
- self._node_start()
+ await self._node_start()
return True
async def bootstrap(self) -> None:
@@ -337,6 +417,58 @@ async def finish_bootstrap_cb(
logger.error(f"bootstrap error: {e.message}")
raise NodeCantBootstrapError(e.message)
+ async def _spawn_etcd(
+ self,
+ new: bool,
+ token: Optional[str],
+ initial_cluster: Optional[str] = None
+ ) -> None:
+
+ assert self._state.hostname
+ assert self._state.address
+ hostname = self._state.hostname
+ address = self._state.address
+
+ logger.info(f"starting etcd, hostname: {hostname}, addr: {address}")
+
+ client_url: str = f"http://{address}:2379"
+ peer_url: str = f"http://{address}:2380"
+
+ if not initial_cluster:
+ initial_cluster = f"{hostname}={peer_url}"
+
+ args_dict: Dict[str, str] = {
+ "name": hostname,
+ "initial-advertise-peer-urls": peer_url,
+ "listen-peer-urls": peer_url,
+ "listen-client-urls": f"{client_url},http://127.0.0.1:2379",
+ "advertise-client-urls": client_url,
+ "initial-cluster": initial_cluster,
+ "initial-cluster-state": "existing",
+ "data-dir": f"/var/lib/etcd/{hostname}.etcd"
+ }
+
+ if new:
+ assert token
+ args_dict["initial-cluster-token"] = token
+ args_dict["initial-cluster-state"] = "new"
+
+ args = " ".join([f"--{k} {v}" for k, v in args_dict.items()])
+ logger.debug(f"spawn etcd: {args}")
+ etcd_cmd = f"etcd {args}"
+
+ process = multiprocessing.Process(
+ target=_bootstrap_etcd_process,
+ args=(etcd_cmd,)
+ )
+ process.start()
+ logger.info(f"started etcd process pid = {process.pid}")
+ self._kvstore = KV()
+ await self._kvstore.ensure_connection()
+
+ async def _bootstrap_etcd(self, token: str) -> None:
+ await self._spawn_etcd(new=True, token=token)
+
async def _prepare_bootstrap(self) -> None:
assert self._state
if self._state.stage == NodeStageEnum.BOOTSTRAPPING:
@@ -346,13 +478,18 @@ async def _prepare_bootstrap(self) -> None:
elif self._init_stage < NodeInitStage.PRESTART:
raise NodeNotStartedError()
+ self._token = self._generate_token()
+ logger.info(f"generated new token: {self._token}")
+ await self._bootstrap_etcd(self._token)
+ await self._save_token()
+
async def _start_bootstrap(self) -> None:
assert self._state
assert self._state.stage == NodeStageEnum.NONE
assert self._state.hostname
assert self._state.address
self._state.stage = NodeStageEnum.BOOTSTRAPPING
- self._save_state()
+ await self._save_state()
async def _finish_bootstrap_error(self, error: str) -> None:
"""
@@ -364,7 +501,7 @@ async def _finish_bootstrap_error(self, error: str) -> None:
assert self._state.stage == NodeStageEnum.BOOTSTRAPPING
self._state.stage = NodeStageEnum.ERROR
- self._save_state()
+ await self._save_state()
async def _finish_bootstrap(self):
"""
@@ -379,19 +516,12 @@ async def _finish_bootstrap(self):
self._state.stage = NodeStageEnum.BOOTSTRAPPED
self._state.role = NodeRoleEnum.LEADER
- self._save_state()
-
- def gen() -> str:
- return ''.join(random.choice("0123456789abcdef") for _ in range(4))
-
- tokenstr = '-'.join(gen() for _ in range(4))
- self._token = tokenstr
- self._save_token(should_exist=False)
+ await self._save_state()
- logger.debug(f"finished bootstrap: token = {tokenstr}")
+ logger.debug(f"finished bootstrap: token = {self._token}")
- self._load()
- self._node_start()
+ await self._load()
+ await self._node_start()
async def _finish_bootstrap_config(self) -> None:
mon: Mon = Mon()
@@ -411,6 +541,14 @@ async def _finish_bootstrap_config(self) -> None:
if not res:
logger.error("unable to set default ruleset")
+ res = mon.config_set(
+ "mgr",
+ "mgr/cephadm/manage_etc_ceph_ceph_conf",
+ "true"
+ )
+ if not res:
+ logger.error("unable to enable managed ceph.conf by cephadm")
+
@property
def bootstrapper_stage(self) -> BootstrapStage:
if not self._bootstrapper:
@@ -446,12 +584,16 @@ async def finish_deployment(self) -> None:
return
self._state.stage = NodeStageEnum.READY
- self._save_state()
+ await self._save_state()
@property
def inited(self) -> bool:
return self._init_stage >= NodeInitStage.PRESTART
+ @property
+ def started(self) -> bool:
+ return self._init_stage == NodeInitStage.STARTED
+
@property
def stage(self) -> NodeStageEnum:
assert self._state
@@ -493,63 +635,44 @@ def connmgr(self) -> ConnMgr:
def token(self) -> Optional[str]:
return self._token
- def _get_node_file(self, what: str) -> Path:
- confdir: Path = gstate.config.confdir
- assert confdir.exists()
- assert confdir.is_dir()
- return confdir.joinpath(f"{what}.json")
+ @property
+ def store(self) -> KV:
+ assert self._kvstore
+ return self._kvstore
- def _node_init(self) -> None:
- statefile: Path = self._get_node_file("node")
- if not statefile.exists():
- # other control files must not exist either
- tokenfile: Path = self._get_node_file("token")
- assert not tokenfile.exists()
+ async def _obtain_state(self) -> None:
+ assert self._kvstore
- state = NodeStateModel(
- uuid=uuid4(),
- role=NodeRoleEnum.NONE,
- stage=NodeStageEnum.NONE,
- address=None,
- hostname=None
- )
- try:
- statefile.write_text(state.json())
- except Exception as e:
- raise NodeError(str(e))
- assert statefile.exists()
+ def _watcher(key: str, value: str) -> None:
+ if key == "/nodes/token":
+ logger.info(f"got updated token: {value}")
+ self._token = value
- self._state = NodeStateModel.parse_file(statefile)
+ self._token = await self._load_token()
+ await self._kvstore.watch("/nodes/token", _watcher)
- def _load(self) -> None:
- self._token = self._load_token()
-
- def _load_token(self) -> Optional[str]:
- assert self._state
+ def _get_node_file(self, what: str) -> Path:
confdir: Path = gstate.config.confdir
assert confdir.exists()
assert confdir.is_dir()
- tokenfile: Path = confdir.joinpath("token.json")
- if not tokenfile.exists():
- assert self._state.stage < NodeStageEnum.BOOTSTRAPPED
- return None
- token = TokenModel.parse_file(tokenfile)
- return token.token
+ return confdir.joinpath(f"{what}.json")
- def _save_token(self, should_exist: bool = True) -> None:
- tokenfile: Path = self._get_node_file("token")
+ async def _load(self) -> None:
+ self._token = await self._load_token()
- # this check could be a single assert, but it's important to know which
- # path failed.
- if should_exist:
- assert tokenfile.exists()
- else:
- assert not tokenfile.exists()
+ async def _load_token(self) -> Optional[str]:
+ assert self._kvstore
+ tokenstr = await self._kvstore.get("/nodes/token")
+ assert tokenstr
+ return tokenstr
- token: TokenModel = TokenModel(token=self._token)
- tokenfile.write_text(token.json())
+ async def _save_token(self) -> None:
+ assert self._token
+ logger.info(f"saving token: {self._token}")
+ assert self._kvstore
+ await self._kvstore.put("/nodes/token", self._token)
- def _save_state(self, should_exist: bool = True) -> None:
+ async def _save_state(self, should_exist: bool = True) -> None:
statefile: Path = self._get_node_file("node")
# this check could be a single assert, but it's important to know which
@@ -646,10 +769,20 @@ async def _handle_join(
logger.debug(f"handle join > pubkey: {pubkey}")
+ logger.debug("handle join > connect etcd client")
+ etcd: aetcd3.Etcd3Client = aetcd3.client()
+ peer_url: str = f"http://{msg.address}:2380"
+ logger.debug(f"handle join > add '{peer_url}' to etcd")
+ member = await etcd.add_member([peer_url])
+ assert member is not None
+
+ my_url: str = \
+ f"{self._state.hostname}=http://{self._state.address}:2380"
welcome = WelcomeMessageModel(
pubkey=pubkey,
cephconf=cephconf,
- keyring=keyring
+ keyring=keyring,
+ etcd_peer=my_url
)
try:
logger.debug(f"handle join > send welcome: {welcome}")
@@ -712,7 +845,17 @@ def get_node_mgr() -> NodeMgr:
return _nodemgr
-def init_node_mgr() -> None:
+async def init_node_mgr() -> None:
global _nodemgr
assert not _nodemgr
+ logger.info("starting node manager")
_nodemgr = NodeMgr()
+ await _nodemgr.start()
+
+
+async def shutdown() -> None:
+ global _nodemgr
+ if _nodemgr:
+ logger.info("shutting down node manager")
+ await _nodemgr.shutdown()
+ _nodemgr = None
diff --git a/src/gravel/controllers/resources/status.py b/src/gravel/controllers/resources/status.py
index 0df3fc9e7..9cb976d63 100644
--- a/src/gravel/controllers/resources/status.py
+++ b/src/gravel/controllers/resources/status.py
@@ -38,7 +38,8 @@
)
from gravel.controllers.services import (
ServiceTypeEnum,
- Services
+ Services,
+ get_services_ctrl
)
@@ -121,7 +122,7 @@ def client_io_rate(self) -> OverallClientIORateModel:
raise ClientIORateNotAvailableError()
services_rates: List[ServiceIORateModel] = []
- services: Services = Services()
+ services: Services = get_services_ctrl()
for service in services.ls():
svc_name: str = service.name
svc_type: ServiceTypeEnum = service.type
diff --git a/src/gravel/controllers/services.py b/src/gravel/controllers/services.py
index 73bb276e7..fb430e740 100644
--- a/src/gravel/controllers/services.py
+++ b/src/gravel/controllers/services.py
@@ -12,12 +12,15 @@
# GNU General Public License for more details.
from enum import Enum
-from pathlib import Path
-from typing import Dict, List, Tuple
+from typing import Dict, List, Optional, Tuple
from logging import Logger
from fastapi.logger import logger as fastapi_logger
from pydantic import BaseModel
from pydantic.fields import Field
+from gravel.controllers.nodes.mgr import (
+ NodeMgr,
+ get_node_mgr
+)
from gravel.controllers.orch.ceph import Mon
from gravel.controllers.orch.cephfs import CephFS, CephFSError
@@ -31,7 +34,10 @@
NFSExport,
NFSService
)
-from gravel.controllers.gstate import gstate
+from gravel.controllers.gstate import (
+ Ticker,
+ gstate
+)
from gravel.controllers.resources.devices import (
DeviceHostModel,
Devices,
@@ -62,6 +68,10 @@ class NotEnoughSpaceError(ServiceError):
pass
+class NotReadyError(ServiceError):
+ pass
+
+
class ServiceTypeEnum(str, Enum):
CEPHFS = "cephfs"
NFS = "nfs"
@@ -119,19 +129,53 @@ class ServiceStorageModel(BaseModel):
utilization: float = Field(0, title="Utilization")
-class Services:
+class Services(Ticker):
_services: Dict[str, ServiceModel]
+ _ready: bool
+ _state_watcher_id: Optional[int]
def __init__(self):
+ super().__init__(
+ "services",
+ gstate.config.options.services.probe_interval
+ )
self._services = {}
- self._load()
+ self._ready = False
+ self._state_watcher_id = None
+
+ def _is_ready(self) -> bool:
+ nodemgr: NodeMgr = get_node_mgr()
+ return nodemgr.started
+
+ async def _should_tick(self) -> bool:
+ return self._is_ready()
+
+ async def _do_tick(self) -> None:
+ assert self._is_ready()
+ if not self._ready:
+ await self._load()
+ await self._set_watchers()
+ self._ready = True
+ logger.debug(f"tick {len(self._services)} services")
+
+ async def shutdown(self) -> None:
+ logger.info("shutdown services")
+ if self._state_watcher_id:
+ nodemgr = get_node_mgr()
+ await nodemgr.store.cancel_watch(self._state_watcher_id)
+
+ async def create(
+ self,
+ name: str,
+ type: ServiceTypeEnum,
+ size: int,
+ replicas: int
+ ) -> ServiceModel:
+
+ if not self._is_ready():
+ raise NotReadyError()
- def create(self, name: str,
- type: ServiceTypeEnum,
- size: int,
- replicas: int
- ) -> ServiceModel:
if name in self._services:
raise ServiceExistsError(f"service {name} already exists")
@@ -156,10 +200,12 @@ def create(self, name: str,
raise NotImplementedError(f"unknown service type: {svc.type}")
self._services[name] = svc
- self._save()
+ await self._save()
return svc
def remove(self, name: str):
+ if not self._is_ready():
+ raise NotReadyError()
pass
def ls(self) -> List[ServiceModel]:
@@ -265,6 +311,10 @@ def get_stats(self) -> Dict[str, ServiceStorageModel]:
used for each service, and utilization as a function of the used space
and the allocated space.
"""
+
+ if not self._is_ready():
+ raise NotReadyError()
+
storage: Storage = get_storage()
storage_pools: Dict[int, StoragePoolModel] = storage.usage().pools_by_id
@@ -297,6 +347,8 @@ def get_stats(self) -> Dict[str, ServiceStorageModel]:
return services
def _create_cephfs(self, svc: ServiceModel) -> None:
+ assert self._is_ready()
+
cephfs = CephFS()
try:
cephfs.create(svc.name)
@@ -341,6 +393,8 @@ def get_pool(name: str) -> CephOSDPoolEntryModel:
# client.
def _create_nfs(self, svc: ServiceModel) -> None:
+ assert self._is_ready()
+
# create a cephfs
self._create_cephfs(svc)
@@ -375,17 +429,47 @@ def _create_nfs(self, svc: ServiceModel) -> None:
except NFSError as e:
raise ServiceError("unable to create nfs export") from e
- def _save(self) -> None:
- assert gstate.config.options.service_state_path
- path = Path(gstate.config.options.service_state_path)
- path.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
+ async def _save(self) -> None:
+ assert self._is_ready()
+
state = StateModel(state=self._services)
- path.write_text(state.json(indent=2))
+ statestr = state.json()
- def _load(self) -> None:
- assert gstate.config.options.service_state_path
- path = Path(gstate.config.options.service_state_path)
- if not path.exists():
- return
- state: StateModel = StateModel.parse_file(path)
+ nodemgr = get_node_mgr()
+ await nodemgr.store.put("/services/state", statestr)
+
+ def _load_state(self, value: str) -> None:
+ assert value
+ state = StateModel.parse_raw(value)
self._services = state.state
+
+ async def _load(self) -> None:
+ assert self._is_ready()
+
+ nodemgr = get_node_mgr()
+ statestr = await nodemgr.store.get("/services/state")
+ if not statestr:
+ return
+ self._load_state(statestr)
+
+ async def _set_watchers(self) -> None:
+ assert self._is_ready()
+
+ def _cb(key: str, value: str) -> None:
+ if not value:
+ logger.error("someone removed our state!")
+ return
+ self._load_state(value)
+
+ nodemgr = get_node_mgr()
+ self._state_watcher_id = \
+ await nodemgr.store.watch("/services/state", _cb)
+
+
+_services: Services = Services()
+
+
+def get_services_ctrl() -> Services:
+ global _services
+ assert _services
+ return _services
diff --git a/src/gravel/typings/aetcd3/__init__.pyi b/src/gravel/typings/aetcd3/__init__.pyi
new file mode 100644
index 000000000..9766491cd
--- /dev/null
+++ b/src/gravel/typings/aetcd3/__init__.pyi
@@ -0,0 +1,21 @@
+# ignore because we're not using it: from . import etcdrpc
+from .client import (
+ Etcd3Client,
+ Transactions,
+ client
+)
+from .exceptions import Etcd3Exception # type: ignore
+from .leases import Lease # type: ignore
+from .locks import Lock
+from .members import Member # type: ignore
+
+__all__ = (
+ 'Etcd3Client',
+ 'Etcd3Exception',
+ 'Lease',
+ 'Lock',
+ 'Member',
+ 'Transactions',
+ 'client',
+ # 'etcdrpc',
+)
diff --git a/src/gravel/typings/aetcd3/client.pyi b/src/gravel/typings/aetcd3/client.pyi
new file mode 100644
index 000000000..031ab4b93
--- /dev/null
+++ b/src/gravel/typings/aetcd3/client.pyi
@@ -0,0 +1,248 @@
+# project aquarium's backend
+# Copyright (C) 2021 SUSE, LLC.
+
+# pyright: reportMissingTypeStubs=false
+from typing import (
+ Any,
+ AsyncIterator,
+ Awaitable,
+ Callable,
+ Dict,
+ Generator,
+ IO,
+ List,
+ Optional,
+ Tuple,
+ Union
+)
+
+from aetcd3.etcdrpc.rpc_pb2 import (
+ Compare,
+ DeleteRangeResponse,
+ LeaseKeepAliveResponse,
+ LeaseRevokeResponse,
+ LeaseTimeToLiveResponse,
+ PutResponse,
+ RequestOp,
+ ResponseOp
+)
+from aetcd3.events import Event
+from aetcd3.leases import Lease
+from aetcd3.locks import Lock
+from aetcd3.members import Member
+
+
+class Transactions:
+ ...
+
+
+class KVMetadata:
+ key: bytes
+ create_revision: int
+ mod_revision: int
+ version: int
+ lease_id: int
+ ...
+
+
+class Status:
+ ...
+
+
+class Alarm:
+ ...
+
+
+class Etcd3Client:
+
+ def __init__(
+ self,
+ host: str = ...,
+ port: int = ...,
+ ca_cert: Optional[str] = ...,
+ cert_key: Optional[str] = ...,
+ cert_cert: Optional[str] = ...,
+ timeout: Optional[float] = ...,
+ user: Optional[str] = ...,
+ password: Optional[str] = ...,
+        grpc_options: Dict[Any, Any] = ...
+ ) -> None:
+ ...
+
+ async def open(self) -> None: ...
+ async def close(self) -> None: ...
+
+ async def __aenter__(self) -> Any:
+ ...
+
+ async def __aexit__(self, *args: Any) -> Any:
+ ...
+
+ async def get(
+ self,
+ key: str,
+ serializable: bool = ...
+ ) -> Tuple[bytes, KVMetadata]: ...
+
+ async def get_prefix(
+ self,
+ key_prefix: str,
+ sort_order: Optional[str] = ...,
+ sort_target: str = ...,
+ keys_only: bool = ...
+ ) -> Generator[Tuple[bytes, KVMetadata], None, None]:
+ ...
+
+ async def get_range(
+ self,
+ range_start: str,
+ range_end: str,
+ sort_order: Optional[str] = ...,
+ sort_target: str = ...,
+ **kwargs: Any
+ ) -> Generator[Tuple[bytes, KVMetadata], None, None]:
+ ...
+
+ async def get_all(
+ self,
+ sort_order: Optional[str] = ...,
+ sort_target: Optional[str] = ...,
+ keys_only: bool = ...
+ ) -> Generator[Tuple[bytes, KVMetadata], None, None]:
+ ...
+
+
+ async def put(
+ self,
+ key: str,
+ value: Union[bytes, str],
+ lease: Optional[Union[Lease, int]] = ...,
+ prev_kv: bool = ...
+ ) -> PutResponse:
+ ...
+
+ async def replace(
+ self,
+ key: str,
+ initial_value: Union[bytes, str],
+ new_value: Union[bytes, str]
+ ) -> bool:
+ ...
+
+ async def delete(
+ self,
+ key: str,
+ prev_kv: bool = ...,
+ return_response: bool = ...
+ ) -> Union[bool, DeleteRangeResponse]:
+ ...
+
+ async def delete_prefix(self, prefix: str) -> DeleteRangeResponse: ...
+ async def status(self) -> Status: ...
+
+ async def add_watch_callback(
+ self,
+ key: str,
+ callback: Callable[[Event], Awaitable[None]],
+ **kwargs: Any
+ ) -> int:
+ ...
+
+ async def watch(
+ self,
+ key: str,
+ **kwargs: Any
+ ) -> Tuple[AsyncIterator[Event], Awaitable[None]]:
+ ...
+
+ async def watch_prefix(
+ self,
+ key_prefix: str,
+ **kwargs: Any
+ ) -> Tuple[AsyncIterator[Event], Awaitable[None]]:
+ ...
+
+ async def watch_once(
+ self,
+ key: str,
+ timeout: Optional[float] = ...,
+ **kwargs: Any
+ ) -> Any:
+ ...
+
+ async def watch_prefix_once(
+ self,
+ key_prefix: str,
+ timeout: Optional[float] = ...,
+ **kwargs: Any
+ ) -> Any:
+ ...
+
+ async def cancel_watch(
+ self,
+ watch_id: int
+ ) -> None:
+ ...
+
+ async def transaction(
+ self,
+ compare: List[Compare],
+ success: Optional[List[RequestOp]] = ...,
+ failure: Optional[List[RequestOp]] = ...
+ ) -> Tuple[bool, List[ResponseOp]]:
+ ...
+
+ async def lease(self, ttl: int, id: int) -> Lease: ...
+ async def revoke_lease(self, lease_id: int) -> LeaseRevokeResponse: ...
+ async def refresh_lease(self, lease_id: int) -> LeaseKeepAliveResponse: ...
+
+ async def get_lease_info(
+ self,
+ lease_id: int,
+ *,
+ keys: bool = ...
+ ) -> LeaseTimeToLiveResponse:
+ ...
+
+ def lock(self, name: str, ttl: int = ...) -> Lock: ...
+
+ async def add_member(self, urls: List[str]) -> Member: ...
+ async def remove_member(self, member_id: int) -> None: ...
+
+ async def update_member(
+ self,
+ member_id: int,
+ peer_urls: List[str]
+ ) -> None: ...
+
+ async def members(self) -> Generator[Member, None, None]: ...
+ async def compact(self, revision: int, physical: bool = ...) -> None: ...
+ async def defragment(self) -> None: ...
+ async def hash(self) -> int: ...
+ async def create_alarm(self, member_id: int = ...) -> List[Alarm]: ...
+
+ async def list_alarms(
+ self,
+ member_id: int,
+ alarm_type: str = ...
+ ) -> Generator[Alarm, None, None]:
+ ...
+
+ async def disarm_alarm(self, member_id: int = ...) -> List[Alarm]: ...
+ async def snapshot(self, file_obj: IO[bytes]) -> None: ...
+
+
+
+
+def client(
+ host: str = ...,
+ port: int = ...,
+ ca_cert: Optional[str] = ...,
+ cert_key: Optional[str] = ...,
+    cert_cert: Optional[str] = ...,
+    timeout: Optional[float] = ...,
+ user: Optional[str] = ...,
+ password: Optional[str] = ...,
+ **kwargs: Any
+) -> Etcd3Client:
+ ...
diff --git a/src/gravel/typings/aetcd3/events.pyi b/src/gravel/typings/aetcd3/events.pyi
new file mode 100644
index 000000000..ae59389cf
--- /dev/null
+++ b/src/gravel/typings/aetcd3/events.pyi
@@ -0,0 +1,7 @@
+# project aquarium's backend
+# Copyright (C) 2021 SUSE, LLC.
+
+class Event:
+
+ key: bytes
+ value: bytes
diff --git a/src/gravel/typings/aetcd3/locks.pyi b/src/gravel/typings/aetcd3/locks.pyi
new file mode 100644
index 000000000..cb2f2d6ee
--- /dev/null
+++ b/src/gravel/typings/aetcd3/locks.pyi
@@ -0,0 +1,23 @@
+# project aquarium's backend
+# Copyright (C) 2021 SUSE, LLC.
+
+# pyright: reportMissingTypeStubs=false
+from typing import Optional
+import aetcd3
+from aetcd3.etcdrpc.rpc_pb2 import LeaseKeepAliveResponse
+
+
+class Lock:
+
+ def __init__(
+ self,
+ name: str,
+ ttl: int = ...,
+ etcd_client: Optional[aetcd3.Etcd3Client] = ...
+ ) -> None:
+ ...
+
+ async def acquire(self, timeout: int = ...) -> bool: ...
+ async def release(self) -> bool: ...
+ async def refresh(self) -> LeaseKeepAliveResponse: ...
+ async def is_acquired(self) -> bool: ...
diff --git a/src/mypy.ini b/src/mypy.ini
index 369fea84c..a007daab7 100644
--- a/src/mypy.ini
+++ b/src/mypy.ini
@@ -9,3 +9,6 @@ allow_redefinition = True
[mypy-pyfakefs.*]
ignore_missing_imports = True
+
+[mypy-aetcd3.*]
+ignore_missing_imports = True
diff --git a/src/requirements.txt b/src/requirements.txt
index 1039c3857..0636b4195 100644
--- a/src/requirements.txt
+++ b/src/requirements.txt
@@ -1,4 +1,4 @@
-aiofiles==0.6.0
+aiofiles
click==7.1.2
fastapi==0.63.0
h11==0.12.0
@@ -7,3 +7,4 @@ starlette==0.13.6
uvicorn==0.13.3
pip
websockets==8.1
+https://celeuma.wipwd.dev/aqr/aetcd3/aetcd3-0.1.0a5.dev2+g0aa852e.d20210410-py3-none-any.whl