Skip to content
This repository has been archived by the owner on Feb 25, 2023. It is now read-only.

gravel: add etcd #403

Merged
merged 22 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1347206
images/microos: add etcd, python aetcd3
jecluis Apr 4, 2021
aa2a0b1
gravel: nodes/mgr: spin-off token generation
jecluis Apr 4, 2021
547a116
gravel: nodes/mgr: spawn etcd on start / bootstrap
jecluis Apr 4, 2021
85a0d34
gravel: add etcd support on multi-node deployments
jecluis Apr 4, 2021
857989f
gravel: nodes/mgr: make ceph.conf managed by cephadm
jecluis Apr 6, 2021
a5c7ca6
gravel: add kvstore controller
jecluis Apr 6, 2021
61fa349
gravel: nodes/mgr: move node init earlier in the file
jecluis Apr 7, 2021
7fe2d14
gravel: nodes/mgr: make start async, add shutdown
jecluis Apr 7, 2021
fd73020
gravel: nodes/mgr: make as async as possible
jecluis Apr 7, 2021
c086a16
gravel: nodes/mgr: make etcd spawn async
jecluis Apr 7, 2021
21eaea5
gravel: nodes/mgr: add kvstore support
jecluis Apr 7, 2021
61ad42b
gravel: nodes/mgr: keep token in kvstore
jecluis Apr 7, 2021
2fefa66
images: config.sh: install etcd from custom location
jecluis Apr 8, 2021
ac8d0af
gravel: nodes/mgr: obtain state on start, watch
jecluis Apr 8, 2021
a0ad5cd
gravel: ctrl/svc: make it a ticker
jecluis Apr 8, 2021
bb22107
gravel: ctrl/svc: gate operations if not ready
jecluis Apr 8, 2021
61021bb
gravel: kv: allow returning None on 'get'
jecluis Apr 9, 2021
5a973ad
gravel: nodes/mgr: expose kvstore to the world
jecluis Apr 9, 2021
c737488
gravel: ctrl/svc: keep state in kvstore
jecluis Apr 9, 2021
ba50e74
gravel: gstate: cleanup tickers on shutdown
jecluis Apr 10, 2021
44d0224
gravel: ctrl/svc: cleanup watcher on shutdown
jecluis Apr 10, 2021
9cc8749
aquarium: log shutdown event
jecluis Apr 10, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion images/microOS/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ EOF
fi

if [[ "$kiwi_profiles" == *"Ceph"* ]]; then
pip install fastapi uvicorn
pip install fastapi uvicorn \
https://celeuma.wipwd.dev/aqr/aetcd3/aetcd3-0.1.0a5.dev2+g0aa852e.d20210410-py3-none-any.whl
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this referring to the fix for martyanov/aetcd#4, which has since been closed? If so, do we still need this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll update the initial comment, but no. Even though that PR was closed, there are a couple of additional patches on top of the original repo. One of them fixes the aiofiles dependency version -- the original repo requires <0.6, while uvicorn requires >=0.6, and it's annoying to have to deal with those dependencies when pip installs. Another one adds a missing await when shutting down the lib, IIRC.

baseInsertService aquarium
fi

Expand Down
2 changes: 2 additions & 0 deletions images/microOS/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,8 @@
<package name="python3-requests"/>
<package name="python3-rados"/>
<package name="python3-websockets"/>
<package name="etcd"/>
<package name="etcdctl"/>
<archive name="aquarium.tar.gz"/>
</packages>

Expand Down
4 changes: 3 additions & 1 deletion src/aquarium.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async def on_startup():
logger.info("Aquarium startup!")

# init node mgr
mgr.init_node_mgr()
await mgr.init_node_mgr()

# create a task simply so we don't hold up the startup
asyncio.create_task(gstate.start())
Expand All @@ -98,7 +98,9 @@ async def on_startup():

@app.on_event("shutdown") # type: ignore
async def on_shutdown():
    """Handler registered for the application's "shutdown" event."""
    logger.info("Aquarium shutdown!")
    # The global state is shut down first, then the node manager.
    await gstate.shutdown()
    await mgr.shutdown()


api.include_router(local.router)
Expand Down
27 changes: 16 additions & 11 deletions src/gravel/api/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@
CephFSNoAuthorizationError
)
from gravel.controllers.orch.models import CephFSAuthorizationModel

from gravel.controllers.services import (
ConstraintsModel,
NotEnoughSpaceError,
NotReadyError,
ServiceError,
ServiceModel,
ServiceRequirementsModel,
ServiceStorageModel,
ServiceTypeEnum,
Services,
UnknownServiceError
UnknownServiceError,
get_services_ctrl
)


Expand Down Expand Up @@ -72,21 +72,21 @@ class CreateResponse(BaseModel):
response_model=ConstraintsModel
)
async def get_constraints() -> ConstraintsModel:
services = Services()
services = get_services_ctrl()
return services.constraints


@router.get("/", response_model=List[ServiceModel])
async def get_services() -> List[ServiceModel]:
services = Services()
services = get_services_ctrl()
return services.ls()


@router.get("/get/{service_name}",
name="Get service by name",
response_model=ServiceModel)
async def get_service(service_name: str) -> ServiceModel:
services = Services()
services = get_services_ctrl()
try:
return services.get(service_name)
except UnknownServiceError as e:
Expand All @@ -108,17 +108,17 @@ async def check_requirements(
detail="requires positive 'size' and number of 'replicas'"
)

services = Services()
services = get_services_ctrl()
feasible, reqs = services.check_requirements(size, replicas)
return RequirementsResponse(feasible=feasible, requirements=reqs)


@router.post("/create", response_model=CreateResponse)
async def create_service(req: CreateRequest) -> CreateResponse:

services = Services()
services = get_services_ctrl()
try:
services.create(req.name, req.type, req.size, req.replicas)
await services.create(req.name, req.type, req.size, req.replicas)
except NotImplementedError as e:
raise HTTPException(status.HTTP_501_NOT_IMPLEMENTED,
detail=str(e))
Expand All @@ -130,6 +130,8 @@ async def create_service(req: CreateRequest) -> CreateResponse:
except Exception as e:
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e))
except NotReadyError:
raise HTTPException(status.HTTP_425_TOO_EARLY)
return CreateResponse(success=True)


Expand All @@ -144,8 +146,11 @@ async def get_statistics() -> Dict[str, ServiceStorageModel]:
allocated space for said service and how much space is being used, along
with the service's space utilization.
"""
services = Services()
return services.get_stats()
services = get_services_ctrl()
try:
return services.get_stats()
except NotReadyError:
raise HTTPException(status.HTTP_425_TOO_EARLY)


@router.get(
Expand Down
5 changes: 5 additions & 0 deletions src/gravel/controllers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,18 @@ class StatusOptionsModel(BaseModel):
probe_interval: float = Field(1.0, title="Status Probe Interval")


class ServicesOptionsModel(BaseModel):
    """Options group exposed as the `services` field of `OptionsModel`."""
    # Defaults to 1.0; presumably seconds between probes -- TODO confirm unit.
    probe_interval: float = Field(1.0, title="Services Probe Interval")


class OptionsModel(BaseModel):
    """Top-level options model: one state-file path plus one sub-model per
    option group (inventory, storage, devices, status, services)."""
    # Defaults to "storage.json" inside `config_dir`.
    service_state_path: Path = Field(Path(config_dir).joinpath("storage.json"),
                                     title="Path to Service State file")
    # Each group below defaults to its model's own default values.
    inventory: InventoryOptionsModel = Field(InventoryOptionsModel())
    storage: StorageOptionsModel = Field(StorageOptionsModel())
    devices: DevicesOptionsModel = Field(DevicesOptionsModel())
    status: StatusOptionsModel = Field(StatusOptionsModel())
    services: ServicesOptionsModel = Field(ServicesOptionsModel())


class ConfigModel(BaseModel):
Expand Down
9 changes: 9 additions & 0 deletions src/gravel/controllers/gstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ async def tick(self) -> None:
self._is_ticking = False
self._last_tick = time.monotonic()

async def shutdown(self) -> None:
    """Default shutdown hook: does nothing and returns None."""
    return None


class GlobalState:

Expand Down Expand Up @@ -140,12 +143,18 @@ async def tick(self) -> None:
await self._do_ticks()

logger.info("tick shutting down")
await self._shutdown_tickers()

async def _do_ticks(self) -> None:
    """Schedule one `tick()` task per registered ticker without awaiting it."""
    # NOTE(review): the tasks returned by create_task are not retained here.
    for name in self.tickers:
        logger.debug(f"tick {name}")
        pending_tick = self.tickers[name].tick()
        asyncio.create_task(pending_tick)

async def _shutdown_tickers(self) -> None:
    """Await `shutdown()` on every registered ticker, one after another."""
    for name in self.tickers:
        logger.debug(f"shutdown ticker {name}")
        registered = self.tickers[name]
        await registered.shutdown()

def add_ticker(self, desc: str, whom: Ticker) -> None:
if desc not in self.tickers:
self.tickers[desc] = whom
Expand Down
124 changes: 124 additions & 0 deletions src/gravel/controllers/kv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# project aquarium's backend
# Copyright (C) 2021 SUSE, LLC.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

import asyncio
from typing import Callable, Optional
import aetcd3
import aetcd3.locks
import aetcd3.events
import grpclib.exceptions

from logging import Logger
from fastapi.logger import logger as fastapi_logger

logger: Logger = fastapi_logger


class Lock:
    """Wrapper around an aetcd3 lock that tracks whether it is currently held.

    The held flag lets `release()` be called unconditionally: it only talks to
    the underlying lock when a previous `acquire()` actually succeeded.
    """

    _lock: aetcd3.locks.Lock
    _is_acquired: bool

    def __init__(self, lock: aetcd3.locks.Lock):
        """Wrap `lock`; the wrapper starts out not holding it."""
        self._lock = lock
        self._is_acquired = False

    async def acquire(self) -> bool:
        """Try to acquire the underlying lock.

        Returns True when the lock was obtained and False otherwise. The
        result of the underlying call is recorded instead of being assumed
        successful, so a failed attempt is never mistaken for a held lock.
        """
        self._is_acquired = bool(await self._lock.acquire())
        return self._is_acquired

    async def release(self) -> None:
        """Release the underlying lock if, and only if, this wrapper holds it."""
        if not self._is_acquired:
            return
        await self._lock.release()
        self._is_acquired = False


class KV:

_client: Optional[aetcd3.Etcd3Client]
_is_closing: bool

def __init__(self):
self._client = None
self._is_open = False
self._is_closing = False

async def ensure_connection(self) -> None:
""" Open k/v store connection """
# try getting the status, loop until we make it.
opened = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we never make it? The KV store is unavailable forever?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. This might warrant a timeout or something, but yes, that's it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably fine for now. It'll also presumably benefit once we have a cache later (at least we'll be able to read cached K/Vs while we're waiting for etcd)

while not self._is_closing:
try:
async with aetcd3.client() as client:
await client.status()
except Exception:
logger.warn("etcd not up yet? sleep.")
await asyncio.sleep(1.0)
continue
opened = True
break
if opened:
self._client = aetcd3.client()
logger.info("opened kvstore connection")

async def close(self) -> None:
""" Close k/v store connection """
self._is_closing = True
if not self._client:
return
await self._client.close()
self._client = None

async def put(self, key: str, value: str) -> None:
""" Put key/value pair """
assert self._client
await self._client.put(key, value)

async def get(self, key: str) -> Optional[str]:
""" Get value for provided key """
assert self._client
value, _ = await self._client.get(key)
if not value:
return None
return value.decode("utf-8")

async def rm(self, key: str) -> None:
""" Remove key from store """
assert self._client
await self._client.delete(key)

async def lock(self, key: str) -> Lock:
    """ Build a Lock wrapper for `key`. Requires compliant consumers. """
    client = self._client
    assert client
    # The lock is only created here; acquiring it is left to the caller.
    underlying = client.lock(key)
    return Lock(underlying)

async def watch(
self,
key: str,
callback: Callable[[str, str], None]
) -> int:
""" Watch updates on a given key """
assert self._client

async def _cb(what: aetcd3.events.Event) -> None:
if not what or \
type(what) == grpclib.exceptions.StreamTerminatedError:
return
callback(what.key.decode("utf-8"), what.value.decode("utf-8"))

return await self._client.add_watch_callback(key, _cb)

async def cancel_watch(self, watch_id: int) -> None:
""" Cancel a watch """
assert self._client
await self._client.cancel_watch(watch_id)
1 change: 1 addition & 0 deletions src/gravel/controllers/nodes/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class WelcomeMessageModel(BaseModel):
pubkey: str
cephconf: str
keyring: str
etcd_peer: str


class ReadyToAddMessageModel(BaseModel):
Expand Down