Skip to content

Commit

Permalink
Migrate container registry info from etcd to postgresql
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Feb 19, 2024
1 parent 46fa039 commit 497bc32
Show file tree
Hide file tree
Showing 22 changed files with 456 additions and 835 deletions.
@@ -0,0 +1,11 @@
{
"container_registries": [
{
"id": "fe878f09-06cc-4b91-9242-4c71015cce04",
"hostname": "cr.backend.ai",
"url": "https://cr.backend.ai",
"type": "harbor2",
"project": "stable,community,multiarch"
}
]
}
11 changes: 11 additions & 0 deletions fixtures/manager/example-container-registries-backendai.json
@@ -0,0 +1,11 @@
{
"container_registries": [
{
"id": "fe878f09-06cc-4b91-9242-4c71015cce04",
"hostname": "cr.backend.ai",
"url": "https://cr.backend.ai",
"type": "harbor2",
"project": "stable,community"
}
]
}
11 changes: 11 additions & 0 deletions fixtures/manager/example-container-registries-dockerhub.json
@@ -0,0 +1,11 @@
{
"container_registries": [
{
"id": "abc42a05-4471-41fa-8772-10bf6452c7d1",
"hostname": "index.docker.io",
"url": "https://registry-1.docker.io",
"type": "docker",
"username": "lablup"
}
]
}
7 changes: 7 additions & 0 deletions scripts/install-dev.sh
Expand Up @@ -955,6 +955,13 @@ configure_backendai() {
# initialize the DB schema
show_info "Setting up databases..."
./backend.ai mgr schema oneshot

# Register the default cr.backend.ai registry before the other fixtures.
# ARM64 hosts use the fixture that additionally lists the "multiarch" project.
# Linux reports ARM64 as "aarch64" while macOS reports it as "arm64", so both
# spellings are matched to stay consistent with the Python installer.
case "$(uname -m)" in
  aarch64|arm64)
    ./backend.ai mgr fixture populate fixtures/manager/example-container-registries-backendai-multiarch.json
    ;;
  *)
    ./backend.ai mgr fixture populate fixtures/manager/example-container-registries-backendai.json
    ;;
esac

./backend.ai mgr fixture populate fixtures/manager/example-users.json
./backend.ai mgr fixture populate fixtures/manager/example-keypairs.json
./backend.ai mgr fixture populate fixtures/manager/example-set-user-main-access-keys.json
Expand Down
79 changes: 59 additions & 20 deletions src/ai/backend/install/context.py
Expand Up @@ -262,8 +262,19 @@ async def install_halfstack(self) -> None:
cwd=self.install_info.base_path,
)

def get_container_registry_fixture_filename(self) -> str:
    """Pick the container-registry fixture file matching the host platform.

    Returns the "multiarch" fixture name when ``self.os_info.platform`` is one
    of the ARM64 platforms (Linux or macOS), and the default fixture name for
    any other platform.
    """
    arm64_platforms = (Platform.LINUX_ARM64, Platform.MACOS_ARM64)
    if self.os_info.platform not in arm64_platforms:
        return "example-container-registries-backendai.json"
    return "example-container-registries-backendai-multiarch.json"

async def load_fixtures(self) -> None:
await self.run_manager_cli(["mgr", "schema", "oneshot"])
with self.resource_path(
"ai.backend.install.fixtures", self.get_container_registry_fixture_filename()
) as path:
await self.run_manager_cli(["mgr", "fixture", "populate", str(path)])

with self.resource_path("ai.backend.install.fixtures", "example-keypairs.json") as path:
await self.run_manager_cli(["mgr", "fixture", "populate", str(path)])
with self.resource_path(
Expand Down Expand Up @@ -654,32 +665,48 @@ async def alias_image(self, alias: str, target_ref: str, arch: str) -> None:
])

async def populate_images(self) -> None:
halfstack = self.install_info.halfstack_config

data: Any
for image_source in self.dist_info.image_sources:
match image_source:
case ImageSource.BACKENDAI_REGISTRY:
self.log_header(
"Scanning and pulling configured Backend.AI container images..."
)
if self.os_info.platform in (Platform.LINUX_ARM64, Platform.MACOS_ARM64):
project = "stable,community,multiarch"
else:
project = "stable,community"

data = {
"docker": {
"image": {
"auto_pull": "tag", # FIXME: temporary workaround for multiarch
},
"registry": {
"cr.backend.ai": {
"": "https://cr.backend.ai",
"type": "harbor2",
"project": project,
},
"auto_pull": "tag",
},
},
}
await self.etcd_put_json("config", data)

async with aiotools.closing_async(
await asyncpg.connect(
host=halfstack.postgres_addr.face.host,
port=halfstack.postgres_addr.face.port,
user=halfstack.postgres_user,
password=halfstack.postgres_password,
database="backend",
)
) as conn:
if self.os_info.platform in (Platform.LINUX_ARM64, Platform.MACOS_ARM64):
project = "stable,community,multiarch"
else:
project = "stable,community"

await conn.execute(
"INSERT INTO container_registries (id, url, hostname, type, project) VALUES ($1, $2, $3, $4, $5);",
"fe878f09-06cc-4b91-9242-4c71015cce04",
"https://cr.backend.ai",
"cr.backend.ai",
"harbor2",
project,
)

await self.run_manager_cli(["mgr", "image", "rescan", "cr.backend.ai"])
if self.os_info.platform in (Platform.LINUX_ARM64, Platform.MACOS_ARM64):
await self.alias_image(
Expand All @@ -700,18 +727,30 @@ async def populate_images(self) -> None:
data = {
"docker": {
"image": {
"auto_pull": "tag", # FIXME: temporary workaround for multiarch
},
"registry": {
"index.docker.io": {
"": "https://registry-1.docker.io",
"type": "docker",
"username": "lablup",
},
"auto_pull": "tag",
},
},
}
await self.etcd_put_json("config", data)

async with aiotools.closing_async(
await asyncpg.connect(
host=halfstack.postgres_addr.face.host,
port=halfstack.postgres_addr.face.port,
user=halfstack.postgres_user,
password=halfstack.postgres_password,
database="backend",
)
) as conn:
await conn.execute(
"INSERT INTO container_registries (id, url, hostname, type, username) VALUES ($1, $2, $3, $4, $5);",
"abc42a05-4471-41fa-8772-10bf6452c7d1",
"https://registry-1.docker.io",
"index.docker.io",
"docker",
"lablup",
)

for ref in self.dist_info.image_refs:
await self.run_manager_cli(["mgr", "image", "rescan", ref])
await self.run_exec([*self.docker_sudo, "docker", "pull", ref])
Expand Down
@@ -0,0 +1,11 @@
{
"container_registries": [
{
"id": "fe878f09-06cc-4b91-9242-4c71015cce04",
"hostname": "cr.backend.ai",
"url": "https://cr.backend.ai",
"type": "harbor2",
"project": ["stable", "community", "multiarch"]
}
]
}
@@ -0,0 +1,11 @@
{
"container_registries": [
{
"id": "fe878f09-06cc-4b91-9242-4c71015cce04",
"hostname": "cr.backend.ai",
"url": "https://cr.backend.ai",
"type": "harbor2",
"project": ["stable", "community"]
}
]
}
@@ -0,0 +1,11 @@
{
"container_registries": [
{
"id": "abc42a05-4471-41fa-8772-10bf6452c7d1",
"hostname": "index.docker.io",
"url": "https://registry-1.docker.io",
"type": "docker",
"username": "lablup"
}
]
}
5 changes: 2 additions & 3 deletions src/ai/backend/manager/cli/image_impl.py
Expand Up @@ -16,7 +16,7 @@
from ..models.image import ImageAliasRow, ImageRow
from ..models.image import rescan_images as rescan_images_func
from ..models.utils import connect_database
from .context import CLIContext, etcd_ctx, redis_ctx
from .context import CLIContext, redis_ctx

log = BraceStyleAdapter(logging.getLogger(__spec__.name)) # type: ignore[name-defined]

Expand Down Expand Up @@ -153,10 +153,9 @@ async def rescan_images(cli_ctx: CLIContext, registry_or_image: str, local: bool
raise click.BadArgumentUsage("Please specify a valid registry or full image name.")
async with (
connect_database(cli_ctx.local_config) as db,
etcd_ctx(cli_ctx) as etcd,
):
try:
await rescan_images_func(etcd, db, registry_or_image, local=local)
await rescan_images_func(db, registry_or_image, local=local)
except Exception:
log.exception("An error occurred.")

Expand Down
124 changes: 1 addition & 123 deletions src/ai/backend/manager/config.py
Expand Up @@ -219,7 +219,7 @@

from ..manager.defs import INTRINSIC_SLOTS
from .api import ManagerStatus
from .api.exceptions import ObjectNotFound, ServerMisconfiguredError
from .api.exceptions import ServerMisconfiguredError
from .models.session import SessionStatus

log = BraceStyleAdapter(logging.getLogger(__spec__.name)) # type: ignore[name-defined]
Expand Down Expand Up @@ -345,33 +345,6 @@
},
}

# Validator for a single container registry entry (the values of the
# "docker" -> "registry" mapping in the shared config).
#   ""          - the registry URL, kept under the empty-string key
#   type        - registry type string; "docker" when omitted
#   username    - optional string
#   password    - optional string, may be blank
#   project     - optional; either a list of strings or a value accepted by
#                 tx.StringList (an empty string becomes an empty list)
#   ssl_verify  - also accepted as "ssl-verify"; converted to bool, True when omitted
# Keys not listed here are allowed.
container_registry_iv = t.Dict({
t.Key(""): tx.URL,
t.Key("type", default="docker"): t.String,
t.Key("username", default=None): t.Null | t.String,
t.Key("password", default=None): t.Null | t.String(allow_blank=True),
t.Key("project", default=None): (
t.Null | t.List(t.String) | tx.StringList(empty_str_as_empty_list=True)
),
tx.AliasedKey(["ssl_verify", "ssl-verify"], default=True): t.ToBool,
}).allow_extra("*")


def container_registry_serialize(v: dict[str, Any]) -> dict[str, str]:
    """Flatten a container registry config into a dict of plain strings.

    Args:
        v: Registry config. The ``""`` (URL) and ``"type"`` keys are required;
            ``"username"``, ``"password"``, ``"project"`` and ``"ssl_verify"``
            are optional and are omitted from the result when missing or None.

    Returns:
        A mapping whose values are all strings: ``project`` is a
        comma-separated string and ``ssl_verify`` is ``"1"`` or ``"0"``.

    Raises:
        KeyError: if ``""`` or ``"type"`` is missing from *v*.
    """
    raw_data = {
        "": str(v[""]),
        "type": str(v["type"]),
    }
    if (username := v.get("username")) is not None:
        raw_data["username"] = str(username)
    if (password := v.get("password")) is not None:
        raw_data["password"] = str(password)
    if (project := v.get("project")) is not None:
        # A plain string is itself an iterable of characters; joining it would
        # yield "s,t,a,b,l,e" instead of "stable", so pass it through as-is.
        raw_data["project"] = project if isinstance(project, str) else ",".join(project)
    if (ssl_verify := v.get("ssl_verify")) is not None:
        raw_data["ssl_verify"] = "1" if ssl_verify else "0"
    return raw_data


session_hang_tolerance_iv = t.Dict(
{
Expand Down Expand Up @@ -402,7 +375,6 @@ def container_registry_serialize(v: dict[str, Any]) -> dict[str, str]:
}).allow_extra("*"),
t.Key("redis", default=_config_defaults["redis"]): config.redis_config_iv,
t.Key("docker", default=_config_defaults["docker"]): t.Dict({
t.Key("registry"): t.Mapping(t.String, container_registry_iv),
t.Key("image", default=_config_defaults["docker"]["image"]): t.Dict({
t.Key("auto_pull", default=_config_defaults["docker"]["image"]["auto_pull"]): t.Enum(
"digest", "tag", "none"
Expand Down Expand Up @@ -633,100 +605,6 @@ async def get_raw(self, key: str, allow_null: bool = True) -> Optional[str]:
raise ServerMisconfiguredError("A required etcd config is missing.", key)
return value

async def list_container_registry(self) -> dict[str, dict[str, Any]]:
    """Read all container registry configs from etcd and validate each one.

    Returns a dict mapping each key found under
    ``ETCD_CONTAINER_REGISTRY_KEY`` (the registry hostname) to the result of
    ``container_registry_iv.check()`` on its stored value.
    """
    raw_registries = await self.etcd.get_prefix(self.ETCD_CONTAINER_REGISTRY_KEY)
    validated: dict[str, dict[str, Any]] = {}
    for hostname, raw_item in raw_registries.items():
        validated[hostname] = container_registry_iv.check(raw_item)  # type: ignore
    return validated

async def get_container_registry(self, hostname: str) -> dict[str, Any]:
    """Return the validated registry config stored under *hostname*.

    Raises:
        ObjectNotFound: if the listing has no entry for *hostname*.
    """
    known_registries = await self.list_container_registry()
    try:
        config = known_registries[hostname]
    except KeyError:
        raise ObjectNotFound(object_name="container registry")
    else:
        return config

async def add_container_registry(self, hostname: str, config_new: dict[str, Any]) -> None:
    """Validate *config_new*, serialize it, and write it to etcd under *hostname*."""
    validated = container_registry_iv.check(config_new)
    serialized = container_registry_serialize(validated)
    flat_entries = self.flatten(
        self.ETCD_CONTAINER_REGISTRY_KEY,
        {hostname: serialized},
    )
    await self.etcd.put_dict(flat_entries)

async def modify_container_registry(
    self, hostname: str, config_updated: dict[str, Any]
) -> None:
    """Merge *config_updated* into the stored config of *hostname* and write it back.

    The stored item and the update are merged first and validated afterwards,
    so fields absent from both are not overwritten by validator defaults
    before merging.

    Args:
        hostname: Key of the registry to modify.
        config_updated: Fields to override; ``"ssl-verify"`` is accepted as an
            alias of ``"ssl_verify"``. The dict is not modified.

    Raises:
        ObjectNotFound: if no registry is stored under *hostname*.
    """
    # Work on a shallow copy so the alias normalization below does not mutate
    # the dict owned by the caller.
    config_updated = dict(config_updated)
    # Fetch the raw registries data and make it a mutable dict.
    registries = dict(await self.etcd.get_prefix(self.ETCD_CONTAINER_REGISTRY_KEY))
    # Exclude the target hostname from the raw data.
    try:
        original_item = registries[hostname]
        del registries[hostname]
    except KeyError:
        raise ObjectNotFound(object_name="container registry")
    # Delete all items with having the prefix of the given hostname.
    # This will "accidentally" delete any registry sharing the same prefix.
    raw_hostname = urllib.parse.quote(hostname, safe="")
    await self.etcd.delete_prefix(f"{self.ETCD_CONTAINER_REGISTRY_KEY}/{raw_hostname}")

    # Re-add the "accidentally" deleted items
    updates: dict[str, str] = {}
    for key, raw_item in registries.items():
        if key.startswith(hostname):
            updates.update(
                self.flatten(
                    self.ETCD_CONTAINER_REGISTRY_KEY,
                    {key: raw_item},  # type: ignore
                )
            )
    # Re-add the updated item
    if (_ssl_verify := config_updated.pop("ssl-verify", None)) is not None:
        # Move "ssl-verify" to "ssl_verify" if exists, for key aliasing compatibility:
        # the etcd-stored original item has already the normalized name "ssl_verify",
        # while the user input may have either "ssl-verify" or "ssl_verify".
        # We should run the IV check after merging the original item and the user input
        # to prevent overwriting non-existent fields with the default values in IV.
        config_updated["ssl_verify"] = _ssl_verify
    updates.update(
        self.flatten(
            self.ETCD_CONTAINER_REGISTRY_KEY,
            {
                hostname: container_registry_serialize(
                    container_registry_iv.check({**original_item, **config_updated})  # type: ignore
                )
            },
        )
    )
    await self.etcd.put_dict(updates)

async def delete_container_registry(self, hostname: str) -> None:
    """Remove the registry config stored under *hostname* from etcd.

    The deletion is prefix-based, so entries of other registries whose key
    starts with *hostname* are removed as well and then written back.

    Raises:
        ObjectNotFound: if no registry is stored under *hostname*.
    """
    # Fetch the raw registries data and make it a mutable dict.
    registries = dict(await self.etcd.get_prefix(self.ETCD_CONTAINER_REGISTRY_KEY))
    # Exclude the target hostname from the raw data.
    try:
        del registries[hostname]
    except KeyError:
        raise ObjectNotFound(object_name="container registry")
    # Delete all items with having the prefix of the given hostname.
    # This will "accidentally" delete any registry sharing the same prefix.
    raw_hostname = urllib.parse.quote(hostname, safe="")
    await self.etcd.delete_prefix(f"{self.ETCD_CONTAINER_REGISTRY_KEY}/{raw_hostname}")

    # Re-add the "accidentally" deleted items.
    updates: dict[str, str] = {}
    for key, raw_item in registries.items():
        # Only the remaining entries whose key shares the deleted prefix need restoring.
        if key.startswith(hostname):
            updates.update(
                self.flatten(
                    self.ETCD_CONTAINER_REGISTRY_KEY,
                    {key: raw_item},  # type: ignore
                )
            )
    await self.etcd.put_dict(updates)

async def register_myself(self) -> None:
instance_id = await get_instance_id()
manager_info = {
Expand Down

0 comments on commit 497bc32

Please sign in to comment.