Skip to content
This repository has been archived by the owner on Feb 25, 2023. It is now read-only.

Commit

Permalink
Merge pull request #403 from jecluis/wip-etcd
Browse files Browse the repository at this point in the history
gravel: add etcd
  • Loading branch information
jecluis committed Apr 13, 2021
2 parents f25268a + 9cc8749 commit 649a96b
Show file tree
Hide file tree
Showing 17 changed files with 794 additions and 114 deletions.
3 changes: 2 additions & 1 deletion images/microOS/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ EOF
fi

if [[ "$kiwi_profiles" == *"Ceph"* ]]; then
pip install fastapi uvicorn
pip install fastapi uvicorn \
https://celeuma.wipwd.dev/aqr/aetcd3/aetcd3-0.1.0a5.dev2+g0aa852e.d20210410-py3-none-any.whl
baseInsertService aquarium
fi

Expand Down
2 changes: 2 additions & 0 deletions images/microOS/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,8 @@
<package name="python3-requests"/>
<package name="python3-rados"/>
<package name="python3-websockets"/>
<package name="etcd"/>
<package name="etcdctl"/>
<archive name="aquarium.tar.gz"/>
</packages>

Expand Down
4 changes: 3 additions & 1 deletion src/aquarium.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async def on_startup():
logger.info("Aquarium startup!")

# init node mgr
mgr.init_node_mgr()
await mgr.init_node_mgr()

# create a task simply so we don't hold up the startup
asyncio.create_task(gstate.start())
Expand All @@ -98,7 +98,9 @@ async def on_startup():

@app.on_event("shutdown") # type: ignore
async def on_shutdown():
logger.info("Aquarium shutdown!")
await gstate.shutdown()
await mgr.shutdown()


api.include_router(local.router)
Expand Down
27 changes: 16 additions & 11 deletions src/gravel/api/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@
CephFSNoAuthorizationError
)
from gravel.controllers.orch.models import CephFSAuthorizationModel

from gravel.controllers.services import (
ConstraintsModel,
NotEnoughSpaceError,
NotReadyError,
ServiceError,
ServiceModel,
ServiceRequirementsModel,
ServiceStorageModel,
ServiceTypeEnum,
Services,
UnknownServiceError
UnknownServiceError,
get_services_ctrl
)


Expand Down Expand Up @@ -72,21 +72,21 @@ class CreateResponse(BaseModel):
response_model=ConstraintsModel
)
async def get_constraints() -> ConstraintsModel:
services = Services()
services = get_services_ctrl()
return services.constraints


@router.get("/", response_model=List[ServiceModel])
async def get_services() -> List[ServiceModel]:
services = Services()
services = get_services_ctrl()
return services.ls()


@router.get("/get/{service_name}",
name="Get service by name",
response_model=ServiceModel)
async def get_service(service_name: str) -> ServiceModel:
services = Services()
services = get_services_ctrl()
try:
return services.get(service_name)
except UnknownServiceError as e:
Expand All @@ -108,17 +108,17 @@ async def check_requirements(
detail="requires positive 'size' and number of 'replicas'"
)

services = Services()
services = get_services_ctrl()
feasible, reqs = services.check_requirements(size, replicas)
return RequirementsResponse(feasible=feasible, requirements=reqs)


@router.post("/create", response_model=CreateResponse)
async def create_service(req: CreateRequest) -> CreateResponse:

services = Services()
services = get_services_ctrl()
try:
services.create(req.name, req.type, req.size, req.replicas)
await services.create(req.name, req.type, req.size, req.replicas)
except NotImplementedError as e:
raise HTTPException(status.HTTP_501_NOT_IMPLEMENTED,
detail=str(e))
Expand All @@ -130,6 +130,8 @@ async def create_service(req: CreateRequest) -> CreateResponse:
except Exception as e:
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=str(e))
except NotReadyError:
raise HTTPException(status.HTTP_425_TOO_EARLY)
return CreateResponse(success=True)


Expand All @@ -144,8 +146,11 @@ async def get_statistics() -> Dict[str, ServiceStorageModel]:
allocated space for said service and how much space is being used, along
with the service's space utilization.
"""
services = Services()
return services.get_stats()
services = get_services_ctrl()
try:
return services.get_stats()
except NotReadyError:
raise HTTPException(status.HTTP_425_TOO_EARLY)


@router.get(
Expand Down
5 changes: 5 additions & 0 deletions src/gravel/controllers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,18 @@ class StatusOptionsModel(BaseModel):
probe_interval: float = Field(1.0, title="Status Probe Interval")


class ServicesOptionsModel(BaseModel):
probe_interval: float = Field(1.0, title="Services Probe Interval")


class OptionsModel(BaseModel):
service_state_path: Path = Field(Path(config_dir).joinpath("storage.json"),
title="Path to Service State file")
inventory: InventoryOptionsModel = Field(InventoryOptionsModel())
storage: StorageOptionsModel = Field(StorageOptionsModel())
devices: DevicesOptionsModel = Field(DevicesOptionsModel())
status: StatusOptionsModel = Field(StatusOptionsModel())
services: ServicesOptionsModel = Field(ServicesOptionsModel())


class ConfigModel(BaseModel):
Expand Down
9 changes: 9 additions & 0 deletions src/gravel/controllers/gstate.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ async def tick(self) -> None:
self._is_ticking = False
self._last_tick = time.monotonic()

async def shutdown(self) -> None:
pass


class GlobalState:

Expand Down Expand Up @@ -140,12 +143,18 @@ async def tick(self) -> None:
await self._do_ticks()

logger.info("tick shutting down")
await self._shutdown_tickers()

async def _do_ticks(self) -> None:
for desc, ticker in self.tickers.items():
logger.debug(f"tick {desc}")
asyncio.create_task(ticker.tick())

async def _shutdown_tickers(self) -> None:
for desc, ticker in self.tickers.items():
logger.debug(f"shutdown ticker {desc}")
await ticker.shutdown()

def add_ticker(self, desc: str, whom: Ticker) -> None:
if desc not in self.tickers:
self.tickers[desc] = whom
Expand Down
124 changes: 124 additions & 0 deletions src/gravel/controllers/kv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# project aquarium's backend
# Copyright (C) 2021 SUSE, LLC.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.

import asyncio
from typing import Callable, Optional
import aetcd3
import aetcd3.locks
import aetcd3.events
import grpclib.exceptions

from logging import Logger
from fastapi.logger import logger as fastapi_logger

logger: Logger = fastapi_logger


class Lock:
_lock: aetcd3.locks.Lock
_is_acquired: bool

def __init__(self, lock: aetcd3.locks.Lock):
self._lock = lock
self._is_acquired = False

async def acquire(self) -> None:
await self._lock.acquire()
self._is_acquired = True

async def release(self) -> None:
if not self._is_acquired:
return
await self._lock.release()
self._is_acquired = False


class KV:

_client: Optional[aetcd3.Etcd3Client]
_is_closing: bool

def __init__(self):
self._client = None
self._is_open = False
self._is_closing = False

async def ensure_connection(self) -> None:
""" Open k/v store connection """
# try getting the status, loop until we make it.
opened = False
while not self._is_closing:
try:
async with aetcd3.client() as client:
await client.status()
except Exception:
logger.warn("etcd not up yet? sleep.")
await asyncio.sleep(1.0)
continue
opened = True
break
if opened:
self._client = aetcd3.client()
logger.info("opened kvstore connection")

async def close(self) -> None:
""" Close k/v store connection """
self._is_closing = True
if not self._client:
return
await self._client.close()
self._client = None

async def put(self, key: str, value: str) -> None:
""" Put key/value pair """
assert self._client
await self._client.put(key, value)

async def get(self, key: str) -> Optional[str]:
""" Get value for provided key """
assert self._client
value, _ = await self._client.get(key)
if not value:
return None
return value.decode("utf-8")

async def rm(self, key: str) -> None:
""" Remove key from store """
assert self._client
await self._client.delete(key)

async def lock(self, key: str) -> Lock:
""" Lock a given key. Requires compliant consumers. """
assert self._client
return Lock(self._client.lock(key))

async def watch(
self,
key: str,
callback: Callable[[str, str], None]
) -> int:
""" Watch updates on a given key """
assert self._client

async def _cb(what: aetcd3.events.Event) -> None:
if not what or \
type(what) == grpclib.exceptions.StreamTerminatedError:
return
callback(what.key.decode("utf-8"), what.value.decode("utf-8"))

return await self._client.add_watch_callback(key, _cb)

async def cancel_watch(self, watch_id: int) -> None:
""" Cancel a watch """
assert self._client
await self._client.cancel_watch(watch_id)
1 change: 1 addition & 0 deletions src/gravel/controllers/nodes/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class WelcomeMessageModel(BaseModel):
pubkey: str
cephconf: str
keyring: str
etcd_peer: str


class ReadyToAddMessageModel(BaseModel):
Expand Down

0 comments on commit 649a96b

Please sign in to comment.