Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
435 changes: 435 additions & 0 deletions deployment/migrations/versions/0017_f9fa39b6bdef_vm_instances.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ install_requires =
aiohttp==3.8.4
aioipfs@git+https://github.com/aleph-im/aioipfs.git@d671c79b2871bb4d6c8877ba1e7f3ffbe7d20b71
alembic==1.8.1
aleph-message==0.3.2
aleph-message==0.4.0a1
aleph-p2p-client@git+https://github.com/aleph-im/p2p-service-client-python@2c04af39c566217f629fd89505ffc3270fba8676
aleph-pytezos@git+https://github.com/aleph-im/aleph-pytezos.git@32dd1749a4773da494275709060632cbeba9a51b
asyncpg==0.26.0
Expand Down
113 changes: 0 additions & 113 deletions src/aleph/db/accessors/programs.py

This file was deleted.

115 changes: 115 additions & 0 deletions src/aleph/db/accessors/vms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import datetime as dt
from typing import Optional, Iterable

from sqlalchemy import delete, func, select
from sqlalchemy.dialects.postgresql import insert

from aleph.db.models.vms import (
VmBaseDb,
VmVersionDb,
ProgramDb,
VmInstanceDb,
)
from aleph.types.db_session import DbSession
from aleph.types.vms import VmVersion


def get_instance(session: DbSession, item_hash: str) -> Optional[VmInstanceDb]:
select_stmt = select(VmInstanceDb).where(VmInstanceDb.item_hash == item_hash)
return session.execute(select_stmt).scalar_one_or_none()


def get_program(session: DbSession, item_hash: str) -> Optional[ProgramDb]:
select_stmt = select(ProgramDb).where(ProgramDb.item_hash == item_hash)
return session.execute(select_stmt).scalar_one_or_none()


def is_vm_amend_allowed(session: DbSession, vm_hash: str) -> Optional[bool]:
select_stmt = (
select(VmBaseDb.allow_amend)
.select_from(VmVersionDb)
.join(VmBaseDb, VmVersionDb.current_version == VmBaseDb.item_hash)
.where(VmVersionDb.vm_hash == vm_hash)
)
return session.execute(select_stmt).scalar_one_or_none()


def _delete_vm(session: DbSession, where) -> Iterable[str]:
# Deletion of volumes is managed automatically by the DB
# using an "on delete cascade" foreign key.
return session.execute(
delete(VmBaseDb).where(where).returning(VmBaseDb.item_hash)
).scalars()


def delete_vm(session: DbSession, vm_hash: str) -> None:
_ = _delete_vm(session=session, where=VmBaseDb.item_hash == vm_hash)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this duplication ?



def delete_vm_updates(session: DbSession, vm_hash: str) -> Iterable[str]:
return _delete_vm(session=session, where=VmBaseDb.replaces == vm_hash)


def get_vm_version(session: DbSession, vm_hash: str) -> Optional[VmVersionDb]:
return session.execute(
select(VmVersionDb).where(VmVersionDb.vm_hash == vm_hash)
).scalar_one_or_none()


def upsert_vm_version(
session: DbSession,
vm_hash: str,
owner: str,
current_version: VmVersion,
last_updated: dt.datetime,
) -> None:
insert_stmt = insert(VmVersionDb).values(
vm_hash=vm_hash,
owner=owner,
current_version=current_version,
last_updated=last_updated,
)
upsert_stmt = insert_stmt.on_conflict_do_update(
constraint="program_versions_pkey",
set_={"current_version": current_version, "last_updated": last_updated},
where=VmVersionDb.last_updated < last_updated,
)
session.execute(upsert_stmt)


def refresh_vm_version(session: DbSession, vm_hash: str) -> None:
coalesced_ref = func.coalesce(VmBaseDb.replaces, VmBaseDb.item_hash)
select_latest_revision_stmt = (
select(
coalesced_ref.label("replaces"),
func.max(VmBaseDb.created).label("created"),
).group_by(coalesced_ref)
).subquery()
select_latest_program_version_stmt = (
select(
coalesced_ref,
VmBaseDb.owner,
VmBaseDb.item_hash,
VmBaseDb.created,
)
.join(
select_latest_revision_stmt,
(coalesced_ref == select_latest_revision_stmt.c.replaces)
& (VmBaseDb.created == select_latest_revision_stmt.c.created),
)
.where(coalesced_ref == vm_hash)
)

insert_stmt = insert(VmVersionDb).from_select(
["vm_hash", "owner", "current_version", "last_updated"],
select_latest_program_version_stmt,
)
upsert_stmt = insert_stmt.on_conflict_do_update(
constraint="program_versions_pkey",
set_={
"current_version": insert_stmt.excluded.current_version,
"last_updated": insert_stmt.excluded.last_updated,
},
)
session.execute(delete(VmVersionDb).where(VmVersionDb.vm_hash == vm_hash))
session.execute(upsert_stmt)
2 changes: 1 addition & 1 deletion src/aleph/db/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from .pending_messages import *
from .pending_txs import *
from .posts import *
from .programs import *
from .vms import *
4 changes: 2 additions & 2 deletions src/aleph/db/models/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
PostContent,
ProgramContent,
StoreContent,
InstanceContent,
)
from pydantic import ValidationError
from pydantic.error_wrappers import ErrorWrapper
Expand Down Expand Up @@ -38,6 +39,7 @@
CONTENT_TYPE_MAP: Dict[MessageType, Type[BaseContent]] = {
MessageType.aggregate: AggregateContent,
MessageType.forget: ForgetContent,
MessageType.instance: InstanceContent,
MessageType.post: PostContent,
MessageType.program: ProgramContent,
MessageType.store: StoreContent,
Expand All @@ -58,7 +60,6 @@ def validate_message_content(
message_type: MessageType,
content_dict: Dict[str, Any],
) -> BaseContent:

content_type = CONTENT_TYPE_MAP[message_type]
content = content_type.parse_obj(content_dict)
# Validate that the content time can be converted to datetime. This will
Expand Down Expand Up @@ -132,7 +133,6 @@ def from_pending_message(
content_dict: Dict[str, Any],
content_size: int,
) -> "MessageDb":

content_dict = cls._coerce_content(pending_message, content_dict)
parsed_content = validate_message_content(pending_message.type, content_dict)

Expand Down
Loading