From f7ce736062f39b71350b6e3071cd000143075f48 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Wed, 17 May 2023 15:54:22 +0200 Subject: [PATCH 1/4] Feature: process instance messages Problem: users cannot send `INSTANCE` messages because the format is not yet supported by CCNs. Solution: support them, obviously. Modified the program handler to support both programs and instances. Renamed most program-related functions to the more generic `vm` naming convention. DB-wise, the same tables are reused for instances and programs. The `vms` table (previously named `programs`) now contains entries for instances and programs. A new table is dedicated to rootfs volumes as that is an instance-only feature. The advantage of reusing the same tables is that the logic for instances and programs can be unified, for example the balance/cost views only require minor updates. --- .../0017_f9fa39b6bdef_vm_instances.py | 425 ++++++++++++++++++ setup.cfg | 2 +- src/aleph/db/accessors/programs.py | 113 ----- src/aleph/db/accessors/vms.py | 115 +++++ src/aleph/db/models/__init__.py | 2 +- src/aleph/db/models/messages.py | 3 +- src/aleph/db/models/{programs.py => vms.py} | 92 +++- .../handlers/content/{program.py => vm.py} | 249 +++++----- src/aleph/handlers/message_handler.py | 7 +- src/aleph/types/message_status.py | 16 +- src/aleph/types/vms.py | 7 +- tests/db/test_programs_db.py | 59 ++- .../test_process_forgets.py | 4 +- .../test_process_instances.py | 380 ++++++++++++++++ .../test_process_programs.py | 16 +- 15 files changed, 1196 insertions(+), 294 deletions(-) create mode 100644 deployment/migrations/versions/0017_f9fa39b6bdef_vm_instances.py delete mode 100644 src/aleph/db/accessors/programs.py create mode 100644 src/aleph/db/accessors/vms.py rename src/aleph/db/models/{programs.py => vms.py} (64%) rename src/aleph/handlers/content/{program.py => vm.py} (53%) create mode 100644 tests/message_processing/test_process_instances.py diff --git 
a/deployment/migrations/versions/0017_f9fa39b6bdef_vm_instances.py b/deployment/migrations/versions/0017_f9fa39b6bdef_vm_instances.py new file mode 100644 index 000000000..cac1513e4 --- /dev/null +++ b/deployment/migrations/versions/0017_f9fa39b6bdef_vm_instances.py @@ -0,0 +1,425 @@ +"""vm instances + +Revision ID: f9fa39b6bdef +Revises: 77e68941d36c +Create Date: 2023-05-17 11:59:42.783630 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "f9fa39b6bdef" +down_revision = "77e68941d36c" +branch_labels = None +depends_on = None + + +def recreate_cost_views(): + op.execute("DROP VIEW costs_view") + op.execute("DROP VIEW program_costs_view") + op.execute("DROP VIEW program_volumes_files_view") + + # Recreate the views using `vm` instead of `program` wherever necessary + op.execute( + """ + create view vm_volumes_files_view(vm_hash, ref, use_latest, type, latest, original, volume_to_use) as + SELECT volume.program_hash AS vm_hash, + volume.ref, + volume.use_latest, + 'code_volume' AS type, + tags.file_hash AS latest, + originals.file_hash AS original, + CASE + WHEN volume.use_latest THEN tags.file_hash + ELSE originals.file_hash + END AS volume_to_use + FROM program_code_volumes volume + LEFT OUTER JOIN file_tags tags ON volume.ref = tags.tag + JOIN file_pins originals ON volume.ref = originals.item_hash + UNION + SELECT volume.program_hash AS vm_hash, + volume.ref, + volume.use_latest, + 'data_volume' AS type, + tags.file_hash AS latest, + originals.file_hash AS original, + CASE + WHEN volume.use_latest THEN tags.file_hash + ELSE originals.file_hash + END AS volume_to_use + FROM program_data_volumes volume + LEFT OUTER JOIN file_tags tags ON volume.ref = tags.tag + JOIN file_pins originals ON volume.ref = originals.item_hash + UNION + SELECT volume.program_hash AS vm_hash, + volume.ref, + volume.use_latest, + 'runtime' AS type, + tags.file_hash AS latest, + 
originals.file_hash AS original, + CASE + WHEN volume.use_latest THEN tags.file_hash + ELSE originals.file_hash + END AS volume_to_use + FROM program_runtimes volume + LEFT OUTER JOIN file_tags tags ON volume.ref = tags.tag + JOIN file_pins originals ON volume.ref = originals.item_hash + UNION + SELECT volume.vm_hash, + volume.ref, + volume.use_latest, + 'machine_volume' AS type, + tags.file_hash AS latest, + originals.file_hash AS original, + CASE + WHEN volume.use_latest THEN tags.file_hash + ELSE originals.file_hash + END AS volume_to_use + FROM vm_machine_volumes volume + LEFT OUTER JOIN file_tags tags ON volume.ref = tags.tag + JOIN file_pins originals ON volume.ref = originals.item_hash + """ + ) + op.execute( + """ + create view vm_costs_view as + SELECT vm_versions.vm_hash, + vm_versions.owner, + vms.resources_vcpus, + vms.resources_memory, + file_volumes_size.file_volumes_size, + other_volumes_size.other_volumes_size, + used_disk.required_disk_space, + cu.compute_units_required, + bcp.base_compute_unit_price, + m.compute_unit_price_multiplier, + cpm.compute_unit_price, + free_disk.included_disk_space, + additional_disk.additional_disk_space, + adp.disk_price, + tp.total_price + FROM vm_versions + JOIN vms on vm_versions.current_version = vms.item_hash + JOIN (SELECT volume.vm_hash, + sum(files.size) AS file_volumes_size + FROM vm_volumes_files_view volume + LEFT JOIN files ON volume.volume_to_use = files.hash + GROUP BY volume.vm_hash) file_volumes_size + ON vm_versions.current_version = file_volumes_size.vm_hash + JOIN (SELECT instance_hash, size_mib * 1024 * 1024 rootfs_size FROM instance_rootfs) rootfs_size + ON vm_versions.vm_hash = rootfs_size.instance_hash + JOIN (SELECT vm_hash, SUM(size_mib) * 1024 * 1024 other_volumes_size + FROM vm_machine_volumes + GROUP BY vm_hash) other_volumes_size + ON vm_versions.current_version = other_volumes_size.vm_hash, + LATERAL (SELECT file_volumes_size + other_volumes_size AS required_disk_space) used_disk, + 
LATERAL ( SELECT ceil(GREATEST(ceil(vms.resources_vcpus / 1), + vms.resources_memory / 2000)) AS compute_units_required) cu, + LATERAL ( SELECT CASE + WHEN vms.persistent + THEN 20000000000 * cu.compute_units_required + ELSE 2000000000 * cu.compute_units_required + END AS included_disk_space) free_disk, + LATERAL ( SELECT GREATEST(file_volumes_size.file_volumes_size + + rootfs_size.rootfs_size + + other_volumes_size.other_volumes_size - + free_disk.included_disk_space, + 0) AS additional_disk_space) additional_disk, + LATERAL ( SELECT CASE + WHEN vms.persistent THEN 2000 + ELSE 200 + END AS base_compute_unit_price) bcp, + LATERAL ( SELECT 1 + vms.environment_internet::integer AS compute_unit_price_multiplier) m, + LATERAL ( SELECT cu.compute_units_required * m.compute_unit_price_multiplier::double precision * + bcp.base_compute_unit_price::double precision * + m.compute_unit_price_multiplier AS compute_unit_price) cpm, + LATERAL ( SELECT additional_disk.additional_disk_space * 20::double precision / + 1000000::double precision AS disk_price) adp, + LATERAL ( SELECT cpm.compute_unit_price + adp.disk_price AS total_price) tp + """ + ) + op.execute( + """ + create view costs_view as + SELECT coalesce(vm_prices.owner, storage.owner) address, + total_vm_cost, + total_storage_cost, + total_cost + FROM (SELECT owner, sum(total_price) total_vm_cost FROM vm_costs_view GROUP BY owner) vm_prices + FULL OUTER JOIN (SELECT owner, sum(f.size) storage_size + FROM file_pins + JOIN files f on file_pins.file_hash = f.hash + WHERE owner is not null + GROUP BY owner) storage ON vm_prices.owner = storage.owner, + LATERAL (SELECT 3 * storage_size / 1000000 total_storage_cost) sc, + LATERAL (SELECT coalesce(vm_prices.total_vm_cost, 0) + + coalesce(total_storage_cost, 0) AS total_cost ) tc; + """ + ) + + +def upgrade() -> None: + # Rename all common tables to `vm_*` + op.rename_table("programs", "vms") + op.rename_table("program_machine_volumes", "vm_machine_volumes") + 
op.rename_table("program_versions", "vm_versions") + + # Rename all common columns to `vm_*` + op.alter_column("vm_machine_volumes", "program_hash", new_column_name="vm_hash") + op.alter_column("vm_versions", "program_hash", new_column_name="vm_hash") + + # Rename indexes + op.execute( + "ALTER INDEX ix_program_machine_volumes_program_hash RENAME TO ix_vm_machine_volumes_vm_hash" + ) + op.execute("ALTER INDEX ix_programs_owner RENAME TO ix_vms_owner") + + # Create the instance rootfs table + op.create_table( + "instance_rootfs", + sa.Column("instance_hash", sa.String(), nullable=False), + sa.Column("parent", sa.String(), nullable=True), + sa.Column("size_mib", sa.Integer(), nullable=False), + sa.Column("persistence", sa.String(), nullable=False), + sa.Column("comment", sa.String(), nullable=True), + sa.ForeignKeyConstraint( + ["instance_hash"], + ["vms.item_hash"], + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("instance_hash"), + ) + + # Make program-only columns nullable + op.alter_column("vms", "http_trigger", existing_type=sa.BOOLEAN(), nullable=True) + op.alter_column("vms", "persistent", existing_type=sa.BOOLEAN(), nullable=True) + + # Recreate the cost views (some column names must change) + recreate_cost_views() + + # Add the parent column for persistent volumes + op.add_column("vm_machine_volumes", sa.Column("parent", sa.String(), nullable=True)) + + # Add the instance columns to the vms (ex programs) table + op.add_column( + "vms", + sa.Column( + "cloud_config", postgresql.JSONB(astext_type=sa.Text()), nullable=True + ), + ) + op.add_column("vms", sa.Column("program_type", sa.String(), nullable=True)) + + # Update error codes + op.execute( + "UPDATE error_codes SET description = 'VM reference not found' WHERE code = 300" + ) + op.execute( + "UPDATE error_codes SET description = 'VM volume reference(s) not found' WHERE code = 301" + ) + op.execute( + "UPDATE error_codes SET description = 'VM update not allowed' WHERE code = 302" + ) + op.execute( + 
"UPDATE error_codes SET description = 'VM update not targeting the original version of the VM' WHERE code = 303" + ) + + # ### end Alembic commands ### + + +def downgrade_cost_views(): + op.execute("DROP VIEW costs_view") + op.execute("DROP VIEW vm_costs_view") + op.execute("DROP VIEW vm_volumes_files_view") + + # Copied from 0007_0bfde82697c8_balance_views.py + op.execute( + """ + create view program_volumes_files_view(program_hash, ref, use_latest, type, latest, original, volume_to_use) as + SELECT volume.program_hash, + volume.ref, + volume.use_latest, + 'code_volume' AS type, + tags.file_hash AS latest, + originals.file_hash AS original, + CASE + WHEN volume.use_latest THEN tags.file_hash + ELSE originals.file_hash + END AS volume_to_use + FROM program_code_volumes volume + LEFT OUTER JOIN file_tags tags ON volume.ref = tags.tag + JOIN file_pins originals ON volume.ref = originals.item_hash + UNION + SELECT volume.program_hash, + volume.ref, + volume.use_latest, + 'data_volume' AS type, + tags.file_hash AS latest, + originals.file_hash AS original, + CASE + WHEN volume.use_latest THEN tags.file_hash + ELSE originals.file_hash + END AS volume_to_use + FROM program_data_volumes volume + LEFT OUTER JOIN file_tags tags ON volume.ref = tags.tag + JOIN file_pins originals ON volume.ref = originals.item_hash + UNION + SELECT volume.program_hash, + volume.ref, + volume.use_latest, + 'runtime' AS type, + tags.file_hash AS latest, + originals.file_hash AS original, + CASE + WHEN volume.use_latest THEN tags.file_hash + ELSE originals.file_hash + END AS volume_to_use + FROM program_runtimes volume + LEFT OUTER JOIN file_tags tags ON volume.ref = tags.tag + JOIN file_pins originals ON volume.ref = originals.item_hash + UNION + SELECT volume.program_hash, + volume.ref, + volume.use_latest, + 'machine_volume' AS type, + tags.file_hash AS latest, + originals.file_hash AS original, + CASE + WHEN volume.use_latest THEN tags.file_hash + ELSE originals.file_hash + END AS 
volume_to_use + FROM program_machine_volumes volume + LEFT OUTER JOIN file_tags tags ON volume.ref = tags.tag + JOIN file_pins originals ON volume.ref = originals.item_hash + """ + ) + op.execute( + """ + create view program_costs_view as + SELECT program_versions.program_hash, + program_versions.owner, + programs.resources_vcpus, + programs.resources_memory, + file_volumes_size.file_volumes_size, + other_volumes_size.other_volumes_size, + used_disk.required_disk_space, + cu.compute_units_required, + bcp.base_compute_unit_price, + m.compute_unit_price_multiplier, + cpm.compute_unit_price, + free_disk.included_disk_space, + additional_disk.additional_disk_space, + adp.disk_price, + tp.total_price + FROM program_versions + JOIN programs on program_versions.current_version = programs.item_hash + JOIN (SELECT volume.program_hash, + sum(files.size) AS file_volumes_size + FROM program_volumes_files_view volume + LEFT JOIN files ON volume.volume_to_use = files.hash + GROUP BY volume.program_hash) file_volumes_size + ON program_versions.current_version = file_volumes_size.program_hash + JOIN (SELECT program_hash, SUM(size_mib) * 1024 * 1024 other_volumes_size + FROM program_machine_volumes + GROUP BY program_hash) other_volumes_size + ON program_versions.current_version = other_volumes_size.program_hash, + LATERAL (SELECT file_volumes_size + other_volumes_size AS required_disk_space) used_disk, + LATERAL ( SELECT ceil(GREATEST(ceil(programs.resources_vcpus / 1), + programs.resources_memory / 2000)) AS compute_units_required) cu, + LATERAL ( SELECT CASE + WHEN programs.persistent + THEN 20000000000 * cu.compute_units_required + ELSE 2000000000 * cu.compute_units_required + END AS included_disk_space) free_disk, + LATERAL ( SELECT GREATEST(file_volumes_size.file_volumes_size + other_volumes_size.other_volumes_size - + free_disk.included_disk_space, + 0) AS additional_disk_space) additional_disk, + LATERAL ( SELECT CASE + WHEN programs.persistent THEN 2000 + ELSE 200 + END AS 
base_compute_unit_price) bcp, + LATERAL ( SELECT 1 + programs.environment_internet::integer AS compute_unit_price_multiplier) m, + LATERAL ( SELECT cu.compute_units_required * m.compute_unit_price_multiplier::double precision * + bcp.base_compute_unit_price::double precision * + m.compute_unit_price_multiplier AS compute_unit_price) cpm, + LATERAL ( SELECT additional_disk.additional_disk_space * 20::double precision / + 1000000::double precision AS disk_price) adp, + LATERAL ( SELECT cpm.compute_unit_price + adp.disk_price AS total_price) tp + """ + ) + op.execute( + """ + create view costs_view as + SELECT coalesce(program_prices.owner, storage.owner) address, + total_program_cost, + total_storage_cost, + total_cost + FROM (SELECT owner, sum(total_price) total_program_cost FROM program_costs_view GROUP BY owner) program_prices + FULL OUTER JOIN (SELECT owner, sum(f.size) storage_size + FROM file_pins + JOIN files f on file_pins.file_hash = f.hash + WHERE owner is not null + GROUP BY owner) storage ON program_prices.owner = storage.owner, + LATERAL (SELECT 3 * storage_size / 1000000 total_storage_cost) sc, + LATERAL (SELECT coalesce(program_prices.total_program_cost, 0) + + coalesce(total_storage_cost, 0) AS total_cost ) tc; + """ + ) + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + + # Rename tables / columns + op.rename_table("vms", "programs") + op.rename_table("vm_machine_volumes", "program_machine_volumes") + op.rename_table("vm_versions", "program_versions") + op.alter_column("program_versions", "vm_hash", new_column_name="program_hash") + op.alter_column( + "program_machine_volumes", "vm_hash", new_column_name="program_hash" + ) + + # Rename indexes + op.execute( + "ALTER INDEX ix_vm_machine_volumes_vm_hash RENAME TO ix_program_machine_volumes_program_hash" + ) + op.execute("ALTER INDEX ix_vms_owner RENAME TO ix_programs_owner") + + op.drop_column("programs", "program_type") + op.drop_column("programs", "cloud_config") + + # Drop the parent column for persistent VMs + op.drop_column("program_machine_volumes", "parent") + + # Make program-only columns non-nullable again + op.alter_column( + "programs", "persistent", existing_type=sa.BOOLEAN(), nullable=False + ) + op.alter_column( + "programs", "http_trigger", existing_type=sa.BOOLEAN(), nullable=False + ) + + # Reset views + downgrade_cost_views() + + # Drop the rootfs table + op.drop_table("instance_rootfs") + + # Revert error codes + op.execute( + "UPDATE error_codes SET description = 'Program reference not found' WHERE code = 300" + ) + op.execute( + "UPDATE error_codes SET description = 'Program volume reference(s) not found' WHERE code = 301" + ) + op.execute( + "UPDATE error_codes SET description = 'Program update not allowed' WHERE code = 302" + ) + op.execute( + "UPDATE error_codes " + "SET description = 'Program update not targeting the original version of the program' WHERE code = 303" + ) + + # ### end Alembic commands ### diff --git a/setup.cfg b/setup.cfg index ac97faaed..661de5cd0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,7 +40,7 @@ install_requires = aiohttp==3.8.4 aioipfs@git+https://github.com/aleph-im/aioipfs.git@d671c79b2871bb4d6c8877ba1e7f3ffbe7d20b71 alembic==1.8.1 - aleph-message==0.3.2 + 
aleph-message@git+https://github.com/aleph-im/aleph-message@2983925295643634785a6a712d09e8f5c58642a2 aleph-p2p-client@git+https://github.com/aleph-im/p2p-service-client-python@2c04af39c566217f629fd89505ffc3270fba8676 aleph-pytezos@git+https://github.com/aleph-im/aleph-pytezos.git@32dd1749a4773da494275709060632cbeba9a51b asyncpg==0.26.0 diff --git a/src/aleph/db/accessors/programs.py b/src/aleph/db/accessors/programs.py deleted file mode 100644 index 0cfb246d7..000000000 --- a/src/aleph/db/accessors/programs.py +++ /dev/null @@ -1,113 +0,0 @@ -import datetime as dt -from typing import Optional, Iterable - -from sqlalchemy import delete, func, select -from sqlalchemy.dialects.postgresql import insert - -from aleph.db.models.programs import ( - CodeVolumeDb, - DataVolumeDb, - ExportVolumeDb, - ProgramDb, - RuntimeDb, - MachineVolumeBaseDb, - ProgramVersionDb, -) -from aleph.types.db_session import DbSession -from aleph.types.vms import ProgramVersion - - -def program_exists(session: DbSession, item_hash: str) -> bool: - return ProgramDb.exists(session=session, where=ProgramDb.item_hash == item_hash) - - -def get_program(session: DbSession, item_hash: str) -> Optional[ProgramDb]: - select_stmt = select(ProgramDb).where(ProgramDb.item_hash == item_hash) - return session.execute(select_stmt).scalar_one_or_none() - - -def is_program_amend_allowed(session: DbSession, program_hash: str) -> Optional[bool]: - select_stmt = ( - select(ProgramDb.allow_amend) - .select_from(ProgramVersionDb) - .join(ProgramDb, ProgramVersionDb.current_version == ProgramDb.item_hash) - .where(ProgramVersionDb.program_hash == program_hash) - ) - return session.execute(select_stmt).scalar_one_or_none() - - -def _delete_program(session: DbSession, where) -> Iterable[str]: - # Deletion of volumes is managed automatically by the DB - # using an "on delete cascade" foreign key. 
- return session.execute( - delete(ProgramDb).where(where).returning(ProgramDb.item_hash) - ).scalars() - - -def delete_program(session: DbSession, item_hash: str) -> None: - _ = _delete_program(session=session, where=ProgramDb.item_hash == item_hash) - - -def delete_program_updates(session: DbSession, program_hash: str) -> Iterable[str]: - return _delete_program(session=session, where=ProgramDb.replaces == program_hash) - - -def upsert_program_version( - session: DbSession, - program_hash: str, - owner: str, - current_version: ProgramVersion, - last_updated: dt.datetime, -) -> None: - insert_stmt = insert(ProgramVersionDb).values( - program_hash=program_hash, - owner=owner, - current_version=current_version, - last_updated=last_updated, - ) - upsert_stmt = insert_stmt.on_conflict_do_update( - constraint="program_versions_pkey", - set_={"current_version": current_version, "last_updated": last_updated}, - where=ProgramVersionDb.last_updated < last_updated, - ) - session.execute(upsert_stmt) - - -def refresh_program_version(session: DbSession, program_hash: str) -> None: - coalesced_ref = func.coalesce(ProgramDb.replaces, ProgramDb.item_hash) - select_latest_revision_stmt = ( - select( - coalesced_ref.label("replaces"), - func.max(ProgramDb.created).label("created"), - ).group_by(coalesced_ref) - ).subquery() - select_latest_program_version_stmt = ( - select( - coalesced_ref, - ProgramDb.owner, - ProgramDb.item_hash, - ProgramDb.created, - ) - .join( - select_latest_revision_stmt, - (coalesced_ref == select_latest_revision_stmt.c.replaces) - & (ProgramDb.created == select_latest_revision_stmt.c.created), - ) - .where(coalesced_ref == program_hash) - ) - - insert_stmt = insert(ProgramVersionDb).from_select( - ["program_hash", "owner", "current_version", "last_updated"], - select_latest_program_version_stmt, - ) - upsert_stmt = insert_stmt.on_conflict_do_update( - constraint="program_versions_pkey", - set_={ - "current_version": insert_stmt.excluded.current_version, - 
"last_updated": insert_stmt.excluded.last_updated, - }, - ) - session.execute( - delete(ProgramVersionDb).where(ProgramVersionDb.program_hash == program_hash) - ) - session.execute(upsert_stmt) diff --git a/src/aleph/db/accessors/vms.py b/src/aleph/db/accessors/vms.py new file mode 100644 index 000000000..baf844646 --- /dev/null +++ b/src/aleph/db/accessors/vms.py @@ -0,0 +1,115 @@ +import datetime as dt +from typing import Optional, Iterable + +from sqlalchemy import delete, func, select +from sqlalchemy.dialects.postgresql import insert + +from aleph.db.models.vms import ( + VmBaseDb, + VmVersionDb, + ProgramDb, + VmInstanceDb, +) +from aleph.types.db_session import DbSession +from aleph.types.vms import VmVersion + + +def get_instance(session: DbSession, item_hash: str) -> Optional[VmInstanceDb]: + select_stmt = select(VmInstanceDb).where(VmInstanceDb.item_hash == item_hash) + return session.execute(select_stmt).scalar_one_or_none() + + +def get_program(session: DbSession, item_hash: str) -> Optional[ProgramDb]: + select_stmt = select(ProgramDb).where(ProgramDb.item_hash == item_hash) + return session.execute(select_stmt).scalar_one_or_none() + + +def is_vm_amend_allowed(session: DbSession, vm_hash: str) -> Optional[bool]: + select_stmt = ( + select(VmBaseDb.allow_amend) + .select_from(VmVersionDb) + .join(VmBaseDb, VmVersionDb.current_version == VmBaseDb.item_hash) + .where(VmVersionDb.vm_hash == vm_hash) + ) + return session.execute(select_stmt).scalar_one_or_none() + + +def _delete_vm(session: DbSession, where) -> Iterable[str]: + # Deletion of volumes is managed automatically by the DB + # using an "on delete cascade" foreign key. 
+ return session.execute( + delete(VmBaseDb).where(where).returning(VmBaseDb.item_hash) + ).scalars() + + +def delete_vm(session: DbSession, vm_hash: str) -> None: + _ = _delete_vm(session=session, where=VmBaseDb.item_hash == vm_hash) + + +def delete_vm_updates(session: DbSession, vm_hash: str) -> Iterable[str]: + return _delete_vm(session=session, where=VmBaseDb.replaces == vm_hash) + + +def get_vm_version(session: DbSession, vm_hash: str) -> Optional[VmVersionDb]: + return session.execute( + select(VmVersionDb).where(VmVersionDb.vm_hash == vm_hash) + ).scalar_one_or_none() + + +def upsert_vm_version( + session: DbSession, + vm_hash: str, + owner: str, + current_version: VmVersion, + last_updated: dt.datetime, +) -> None: + insert_stmt = insert(VmVersionDb).values( + vm_hash=vm_hash, + owner=owner, + current_version=current_version, + last_updated=last_updated, + ) + upsert_stmt = insert_stmt.on_conflict_do_update( + constraint="program_versions_pkey", + set_={"current_version": current_version, "last_updated": last_updated}, + where=VmVersionDb.last_updated < last_updated, + ) + session.execute(upsert_stmt) + + +def refresh_vm_version(session: DbSession, vm_hash: str) -> None: + coalesced_ref = func.coalesce(VmBaseDb.replaces, VmBaseDb.item_hash) + select_latest_revision_stmt = ( + select( + coalesced_ref.label("replaces"), + func.max(VmBaseDb.created).label("created"), + ).group_by(coalesced_ref) + ).subquery() + select_latest_program_version_stmt = ( + select( + coalesced_ref, + VmBaseDb.owner, + VmBaseDb.item_hash, + VmBaseDb.created, + ) + .join( + select_latest_revision_stmt, + (coalesced_ref == select_latest_revision_stmt.c.replaces) + & (VmBaseDb.created == select_latest_revision_stmt.c.created), + ) + .where(coalesced_ref == vm_hash) + ) + + insert_stmt = insert(VmVersionDb).from_select( + ["vm_hash", "owner", "current_version", "last_updated"], + select_latest_program_version_stmt, + ) + upsert_stmt = insert_stmt.on_conflict_do_update( + 
constraint="program_versions_pkey", + set_={ + "current_version": insert_stmt.excluded.current_version, + "last_updated": insert_stmt.excluded.last_updated, + }, + ) + session.execute(delete(VmVersionDb).where(VmVersionDb.vm_hash == vm_hash)) + session.execute(upsert_stmt) diff --git a/src/aleph/db/models/__init__.py b/src/aleph/db/models/__init__.py index 9a927ad28..3ecbd37ae 100644 --- a/src/aleph/db/models/__init__.py +++ b/src/aleph/db/models/__init__.py @@ -9,4 +9,4 @@ from .pending_messages import * from .pending_txs import * from .posts import * -from .programs import * +from .vms import * diff --git a/src/aleph/db/models/messages.py b/src/aleph/db/models/messages.py index 2abb92570..ed3d13f02 100644 --- a/src/aleph/db/models/messages.py +++ b/src/aleph/db/models/messages.py @@ -10,7 +10,7 @@ ForgetContent, PostContent, ProgramContent, - StoreContent, + StoreContent, InstanceContent, ) from pydantic import ValidationError from pydantic.error_wrappers import ErrorWrapper @@ -38,6 +38,7 @@ CONTENT_TYPE_MAP: Dict[MessageType, Type[BaseContent]] = { MessageType.aggregate: AggregateContent, MessageType.forget: ForgetContent, + MessageType.instance: InstanceContent, MessageType.post: PostContent, MessageType.program: ProgramContent, MessageType.store: StoreContent, diff --git a/src/aleph/db/models/programs.py b/src/aleph/db/models/vms.py similarity index 64% rename from src/aleph/db/models/programs.py rename to src/aleph/db/models/vms.py index b850ab9aa..e992a467c 100644 --- a/src/aleph/db/models/programs.py +++ b/src/aleph/db/models/vms.py @@ -1,22 +1,23 @@ import datetime as dt from typing import Any, Optional, Dict, List -from aleph_message.models.program import MachineType, Encoding, VolumePersistence +from aleph_message.models.execution.program import MachineType, Encoding +from aleph_message.models.execution.volume import VolumePersistence from sqlalchemy import Column, String, ForeignKey, Boolean, Integer, TIMESTAMP from sqlalchemy.dialects.postgresql 
import JSONB from sqlalchemy.orm import relationship, declared_attr, Mapped from sqlalchemy_utils import ChoiceType -from aleph.types.vms import CpuArchitecture, ProgramVersion +from aleph.types.vms import CpuArchitecture, VmVersion, VmType from .base import Base -class RootVolumeMixin: +class ProgramVolumeMixin: @declared_attr def program_hash(cls) -> Mapped[str]: return Column( "program_hash", - ForeignKey("programs.item_hash", ondelete="CASCADE"), + ForeignKey("vms.item_hash", ondelete="CASCADE"), primary_key=True, ) @@ -28,21 +29,37 @@ class VolumeWithRefMixin: use_latest: bool = Column(Boolean, nullable=True) -class CodeVolumeDb(Base, RootVolumeMixin, VolumeWithRefMixin): +class RootfsVolumeDb(Base): + __tablename__ = "instance_rootfs" + + instance_hash: str = Column( + ForeignKey("vms.item_hash", ondelete="CASCADE"), primary_key=True + ) + parent: Optional[str] = Column(String, nullable=True) + size_mib: int = Column(Integer, nullable=False) + persistence: VolumePersistence = Column( + ChoiceType(VolumePersistence), nullable=False + ) + comment: Optional[str] = Column(String, nullable=True) + + instance: "VmInstanceDb" = relationship("VmInstanceDb", back_populates="rootfs") + + +class CodeVolumeDb(Base, ProgramVolumeMixin, VolumeWithRefMixin): __tablename__ = "program_code_volumes" entrypoint: str = Column(String, nullable=False) program: "ProgramDb" = relationship("ProgramDb", back_populates="code_volume") -class DataVolumeDb(Base, RootVolumeMixin, VolumeWithRefMixin): +class DataVolumeDb(Base, ProgramVolumeMixin, VolumeWithRefMixin): __tablename__ = "program_data_volumes" mount: str = Column(String, nullable=False) program: "ProgramDb" = relationship("ProgramDb", back_populates="data_volume") -class ExportVolumeDb(Base, RootVolumeMixin): +class ExportVolumeDb(Base, ProgramVolumeMixin): __tablename__ = "program_export_volumes" program: "ProgramDb" = relationship("ProgramDb", back_populates="export_volume") @@ -52,25 +69,25 @@ class RuntimeDb(Base, 
VolumeWithRefMixin): __tablename__ = "program_runtimes" program_hash: Mapped[str] = Column( - ForeignKey("programs.item_hash", ondelete="CASCADE"), primary_key=True + ForeignKey("vms.item_hash", ondelete="CASCADE"), primary_key=True ) comment: str = Column(String, nullable=False) program: "ProgramDb" = relationship("ProgramDb", back_populates="runtime") class MachineVolumeBaseDb(Base): - __tablename__ = "program_machine_volumes" + __tablename__ = "vm_machine_volumes" id: int = Column(Integer, primary_key=True) type: str = Column(String, nullable=False) - program_hash: str = Column( - ForeignKey("programs.item_hash", ondelete="CASCADE"), nullable=False, index=True + vm_hash: str = Column( + ForeignKey("vms.item_hash", ondelete="CASCADE"), nullable=False, index=True ) comment: Optional[str] = Column(String, nullable=True) mount: Optional[str] = Column(String, nullable=True) size_mib: int = Column(Integer, nullable=True) - program: "ProgramDb" = relationship("ProgramDb", back_populates="volumes") + vm: "VmBaseDb" = relationship("VmBaseDb", back_populates="volumes") __mapper_args__: Dict[str, Any] = { "polymorphic_on": type, @@ -86,6 +103,7 @@ class EphemeralVolumeDb(MachineVolumeBaseDb): class PersistentVolumeDb(MachineVolumeBaseDb): + parent: Optional[str] = Column(String, nullable=True) persistence: VolumePersistence = Column( ChoiceType(VolumePersistence), nullable=True ) @@ -94,20 +112,19 @@ class PersistentVolumeDb(MachineVolumeBaseDb): __mapper_args__ = {"polymorphic_identity": "persistent"} -class ProgramDb(Base): - __tablename__ = "programs" +class VmBaseDb(Base): + __tablename__ = "vms" item_hash: str = Column(String, primary_key=True) owner: str = Column(String, nullable=False, index=True) - type: MachineType = Column(ChoiceType(MachineType), nullable=False) + type: VmType = Column(ChoiceType(VmType), nullable=False) + allow_amend: bool = Column(Boolean, nullable=False) # Note: metadata is a reserved keyword for SQLAlchemy metadata_: Optional[Dict[str, Any]] 
= Column("metadata", JSONB, nullable=True) variables: Optional[Dict[str, Any]] = Column(JSONB, nullable=True) - http_trigger: bool = Column(Boolean, nullable=False) message_triggers: Optional[List[Dict[str, Any]]] = Column(JSONB, nullable=True) - persistent: bool = Column(Boolean, nullable=False) environment_reproducible: bool = Column(Boolean, nullable=False) environment_internet: bool = Column(Boolean, nullable=False) @@ -128,6 +145,36 @@ class ProgramDb(Base): replaces: Optional[str] = Column(ForeignKey(item_hash), nullable=True) created: dt.datetime = Column(TIMESTAMP(timezone=True), nullable=False) + __mapper_args__: Dict[str, Any] = { + "polymorphic_on": type, + } + + volumes: List[MachineVolumeBaseDb] = relationship( + MachineVolumeBaseDb, back_populates="vm", uselist=True + ) + + +class VmInstanceDb(VmBaseDb): + __mapper_args__ = { + "polymorphic_identity": VmType.INSTANCE.value, + } + + cloud_config: Dict[str, Any] = Column(JSONB, nullable=True) + + rootfs: RootfsVolumeDb = relationship( + "RootfsVolumeDb", back_populates="instance", uselist=False + ) + + +class ProgramDb(VmBaseDb): + __mapper_args__ = { + "polymorphic_identity": VmType.PROGRAM.value, + } + + program_type: MachineType = Column(ChoiceType(MachineType), nullable=True) + http_trigger: bool = Column(Boolean, nullable=True) + persistent: bool = Column(Boolean, nullable=True) + code_volume: CodeVolumeDb = relationship( "CodeVolumeDb", back_populates="program", @@ -146,15 +193,12 @@ class ProgramDb(Base): back_populates="program", uselist=False, ) - volumes: List[MachineVolumeBaseDb] = relationship( - MachineVolumeBaseDb, back_populates="program", uselist=True - ) -class ProgramVersionDb(Base): - __tablename__ = "program_versions" +class VmVersionDb(Base): + __tablename__ = "vm_versions" - program_hash: str = Column(String, primary_key=True) + vm_hash: str = Column(String, primary_key=True) owner: str = Column(String, nullable=False) - current_version: ProgramVersion = Column(String, 
nullable=False) + current_version: VmVersion = Column(String, nullable=False) last_updated: dt.datetime = Column(TIMESTAMP(timezone=True), nullable=False) diff --git a/src/aleph/handlers/content/program.py b/src/aleph/handlers/content/vm.py similarity index 53% rename from src/aleph/handlers/content/program.py rename to src/aleph/handlers/content/vm.py index 4f5e9d5a9..d2fcb6664 100644 --- a/src/aleph/handlers/content/program.py +++ b/src/aleph/handlers/content/vm.py @@ -1,8 +1,8 @@ import logging -from typing import List, Set +from typing import List, Set, overload -from aleph_message.models import ProgramContent -from aleph_message.models.program import ( +from aleph_message.models import ProgramContent, ExecutableContent, InstanceContent +from aleph_message.models.execution.volume import ( AbstractVolume, ImmutableVolume, EphemeralVolume, @@ -10,25 +10,28 @@ ) from aleph.db.accessors.files import find_file_tags, find_file_pins -from aleph.db.accessors.programs import ( - delete_program, +from aleph.db.accessors.vms import ( + delete_vm, get_program, - upsert_program_version, - delete_program_updates, - refresh_program_version, - is_program_amend_allowed, + upsert_vm_version, + delete_vm_updates, + refresh_vm_version, + is_vm_amend_allowed, ) from aleph.db.models import ( MessageDb, CodeVolumeDb, DataVolumeDb, ExportVolumeDb, - ProgramDb, MachineVolumeBaseDb, ImmutableVolumeDb, EphemeralVolumeDb, PersistentVolumeDb, RuntimeDb, + VmInstanceDb, + ProgramDb, + RootfsVolumeDb, + VmBaseDb, ) from aleph.handlers.content.content_handler import ContentHandler from aleph.toolkit.timestamp import timestamp_to_datetime @@ -42,73 +45,35 @@ ProgramUpdateNotAllowed, ProgramCannotUpdateUpdate, ) -from aleph.types.vms import ProgramVersion - +from aleph.types.vms import VmVersion LOGGER = logging.getLogger(__name__) -def _get_program_content(message: MessageDb) -> ProgramContent: +def _get_vm_content(message: MessageDb) -> ExecutableContent: content = message.parsed_content - if 
not isinstance(content, ProgramContent): + if not isinstance(content, ExecutableContent): raise InvalidMessageFormat( f"Unexpected content type for program message: {message.item_hash}" ) return content -def map_volume(volume: AbstractVolume) -> MachineVolumeBaseDb: - comment = volume.comment - mount = volume.mount +@overload +def _map_content_to_db_model(item_hash: str, content: InstanceContent) -> VmInstanceDb: + ... - if isinstance(volume, ImmutableVolume): - return ImmutableVolumeDb( - comment=comment, mount=mount, ref=volume.ref, use_latest=volume.use_latest - ) - elif isinstance(volume, EphemeralVolume): - return EphemeralVolumeDb(comment=comment, mount=mount, size_mib=volume.size_mib) - elif isinstance(volume, PersistentVolume): - return PersistentVolumeDb( - comment=comment, - mount=mount, - persistence=volume.persistence, - name=volume.name, - size_mib=volume.size_mib, - ) - else: - raise InternalError(f"Unsupported volume type: {volume.__class__.__name__}") - - -def program_message_to_db(message: MessageDb): - content = _get_program_content(message) - - code_volume = CodeVolumeDb( - encoding=content.code.encoding, - entrypoint=content.code.entrypoint, - ref=content.code.ref, - use_latest=content.code.use_latest, - ) - runtime = RuntimeDb( - ref=content.runtime.ref, - use_latest=content.runtime.use_latest, - comment=content.runtime.comment, - ) +# For some reason, mypy is not happy with the overload resolution here. +# This seems linked to multiple inheritance of Pydantic base models, a deeper investigation +# is required. +@overload +def _map_content_to_db_model(item_hash: str, content: ProgramContent) -> ProgramDb: # type: ignore[misc] + ... 
def map_volume(volume: AbstractVolume) -> MachineVolumeBaseDb:
    """Map a machine volume from message content to its DB representation.

    :raises InternalError: If the volume type is not supported.
    """
    comment = volume.comment
    mount = volume.mount

    if isinstance(volume, ImmutableVolume):
        return ImmutableVolumeDb(
            comment=comment, mount=mount, ref=volume.ref, use_latest=volume.use_latest
        )
    elif isinstance(volume, EphemeralVolume):
        return EphemeralVolumeDb(comment=comment, mount=mount, size_mib=volume.size_mib)
    elif isinstance(volume, PersistentVolume):
        return PersistentVolumeDb(
            comment=comment,
            mount=mount,
            # Fix: persist the parent reference. It was previously dropped
            # here even though PersistentVolumeDb defines a `parent` column
            # and find_missing_volumes() checks `volume.parent`.
            parent=volume.parent,
            persistence=volume.persistence,
            name=volume.name,
            size_mib=volume.size_mib,
        )
    else:
        raise InternalError(f"Unsupported volume type: {volume.__class__.__name__}")


def vm_message_to_db(message: MessageDb) -> VmBaseDb:
    """Build the DB object graph for a PROGRAM or INSTANCE message.

    Common VM fields are mapped by _map_content_to_db_model(); this function
    then fills in the content-type specific parts (program volumes/triggers
    or instance rootfs/cloud-config).
    """
    content = _get_vm_content(message)
    vm = _map_content_to_db_model(message.item_hash, content)

    # Branch on the content type in both arms (the original mixed an
    # isinstance check on the DB model with one on the content model).
    # _map_content_to_db_model returns a ProgramDb for ProgramContent and a
    # VmInstanceDb for InstanceContent.
    if isinstance(content, ProgramContent):
        vm.program_type = content.type
        vm.persistent = bool(content.on.persistent)
        vm.http_trigger = content.on.http

        if content.on.message:
            vm.message_triggers = [
                subscription.dict() for subscription in content.on.message
            ]

        vm.code_volume = CodeVolumeDb(
            encoding=content.code.encoding,
            entrypoint=content.code.entrypoint,
            ref=content.code.ref,
            use_latest=content.code.use_latest,
        )
        vm.runtime = RuntimeDb(
            ref=content.runtime.ref,
            use_latest=content.runtime.use_latest,
            comment=content.runtime.comment,
        )

        if content.data:
            vm.data_volume = DataVolumeDb(
                encoding=content.data.encoding,
                mount=content.data.mount,
                ref=content.data.ref,
                use_latest=content.data.use_latest,
            )

        if content.export:
            vm.export_volume = ExportVolumeDb(encoding=content.export.encoding)

    elif isinstance(content, InstanceContent):
        vm.rootfs = RootfsVolumeDb(
            parent=content.rootfs.parent,
            size_mib=content.rootfs.size_mib,
            persistence=content.rootfs.persistence,
            comment=content.rootfs.comment,
        )
        vm.cloud_config = content.cloud_config

    else:
        raise TypeError(f"Unexpected VM message content type: {type(content)}")

    return vm
pins_to_check.add(_volume.ref) - add_ref_to_check(content.code) - add_ref_to_check(content.runtime) - if content.data: - add_ref_to_check(content.data) + if isinstance(content, ProgramContent): + add_ref_to_check(content.code) + add_ref_to_check(content.runtime) + if content.data: + add_ref_to_check(content.data) + + elif isinstance(content, InstanceContent): + if rootfs_parent := content.rootfs.parent: + tags_to_check.add(FileTag(rootfs_parent)) for volume in content.volumes: if isinstance(volume, ImmutableVolume): add_ref_to_check(volume) + if isinstance(volume, PersistentVolume): + # Assume `use_latest` for persistent volume parents + if parent := volume.parent: + tags_to_check.add(FileTag(parent)) + # For each volume, if use_latest is set check the tags and otherwise check # the file pins. @@ -190,9 +228,17 @@ def add_ref_to_check(_volume): return (pins_to_check - file_pins_db) | (tags_to_check - file_tags_db) -class ProgramMessageHandler(ContentHandler): +class VmMessageHandler(ContentHandler): + """ + Handles both PROGRAM and INSTANCE messages. + + The implementation for both is very similar, making it simpler to implement both + in the same handler. 
+ + """ + async def check_dependencies(self, session: DbSession, message: MessageDb) -> None: - content = _get_program_content(message) + content = _get_vm_content(message) missing_volumes = find_missing_volumes(session=session, content=content) if missing_volumes: @@ -206,9 +252,7 @@ async def check_dependencies(self, session: DbSession, message: MessageDb) -> No if original_program.replaces is not None: raise ProgramCannotUpdateUpdate() - is_amend_allowed = is_program_amend_allowed( - session=session, program_hash=ref - ) + is_amend_allowed = is_vm_amend_allowed(session=session, vm_hash=ref) if is_amend_allowed is None: raise InternalError(f"Could not find current version of program {ref}") @@ -216,36 +260,37 @@ async def check_dependencies(self, session: DbSession, message: MessageDb) -> No raise ProgramUpdateNotAllowed() @staticmethod - async def process_program_message(session: DbSession, message: MessageDb): - program = program_message_to_db(message) - session.add(program) + async def process_vm_message(session: DbSession, message: MessageDb): + vm = vm_message_to_db(message) + session.add(vm) - program_ref = program.replaces or program.item_hash - upsert_program_version( + program_ref = vm.replaces or vm.item_hash + upsert_vm_version( session=session, - program_hash=program.item_hash, - owner=program.owner, - current_version=ProgramVersion(program_ref), - last_updated=program.created, + vm_hash=vm.item_hash, + owner=vm.owner, + current_version=VmVersion(program_ref), + last_updated=vm.created, ) async def process(self, session: DbSession, messages: List[MessageDb]) -> None: for message in messages: - await self.process_program_message(session=session, message=message) + await self.process_vm_message(session=session, message=message) async def forget_message(self, session: DbSession, message: MessageDb) -> Set[str]: - content = _get_program_content(message) + content = _get_vm_content(message) LOGGER.debug("Deleting program %s...", message.item_hash) - 
delete_program(session=session, item_hash=message.item_hash) + delete_vm(session=session, vm_hash=message.item_hash) if content.replaces: update_hashes = set() - refresh_program_version(session=session, program_hash=message.item_hash) else: update_hashes = set( - delete_program_updates(session=session, program_hash=message.item_hash) + delete_vm_updates(session=session, vm_hash=message.item_hash) ) + refresh_vm_version(session=session, vm_hash=message.item_hash) + return update_hashes diff --git a/src/aleph/handlers/message_handler.py b/src/aleph/handlers/message_handler.py index 35b5a82a9..96983a488 100644 --- a/src/aleph/handlers/message_handler.py +++ b/src/aleph/handlers/message_handler.py @@ -33,7 +33,7 @@ from aleph.handlers.content.content_handler import ContentHandler from aleph.handlers.content.forget import ForgetMessageHandler from aleph.handlers.content.post import PostMessageHandler -from aleph.handlers.content.program import ProgramMessageHandler +from aleph.handlers.content.vm import VmMessageHandler from aleph.handlers.content.store import StoreMessageHandler from aleph.schemas.pending_messages import parse_message from aleph.storage import StorageService @@ -66,13 +66,16 @@ def __init__( self.chain_service = chain_service self.storage_service = storage_service + vm_handler = VmMessageHandler() + self.content_handlers = { MessageType.aggregate: AggregateMessageHandler(), + MessageType.instance: vm_handler, MessageType.post: PostMessageHandler( balances_addresses=config.aleph.balances.addresses.value, balances_post_type=config.aleph.balances.post_type.value, ), - MessageType.program: ProgramMessageHandler(), + MessageType.program: vm_handler, MessageType.store: StoreMessageHandler(storage_service=storage_service), } diff --git a/src/aleph/types/message_status.py b/src/aleph/types/message_status.py index 0413f5c26..f9d5ff669 100644 --- a/src/aleph/types/message_status.py +++ b/src/aleph/types/message_status.py @@ -36,10 +36,10 @@ class 
ErrorCode(IntEnum): POST_AMEND_AMEND = 102 STORE_REF_NOT_FOUND = 200 STORE_UPDATE_UPDATE = 201 - PROGRAM_REF_NOT_FOUND = 300 - PROGRAM_VOLUME_NOT_FOUND = 301 - PROGRAM_AMEND_NOT_ALLOWED = 302 - PROGRAM_UPDATE_UPDATE = 303 + VM_REF_NOT_FOUND = 300 + VM_VOLUME_NOT_FOUND = 301 + VM_AMEND_NOT_ALLOWED = 302 + VM_UPDATE_UPDATE = 303 FORGET_NO_TARGET = 500 FORGET_TARGET_NOT_FOUND = 501 FORGET_FORGET = 502 @@ -194,7 +194,7 @@ class ProgramRefNotFound(RetryMessageException): The original program specified in the `ref` field could not be found. """ - error_code = ErrorCode.PROGRAM_REF_NOT_FOUND + error_code = ErrorCode.VM_REF_NOT_FOUND class ProgramVolumeNotFound(RetryMessageException): @@ -202,7 +202,7 @@ class ProgramVolumeNotFound(RetryMessageException): One or more volume files could not be found. """ - error_code = ErrorCode.PROGRAM_VOLUME_NOT_FOUND + error_code = ErrorCode.VM_VOLUME_NOT_FOUND class ProgramUpdateNotAllowed(InvalidMessageException): @@ -211,7 +211,7 @@ class ProgramUpdateNotAllowed(InvalidMessageException): is set to False. """ - error_code = ErrorCode.PROGRAM_AMEND_NOT_ALLOWED + error_code = ErrorCode.VM_AMEND_NOT_ALLOWED class ProgramCannotUpdateUpdate(InvalidMessageException): @@ -220,7 +220,7 @@ class ProgramCannotUpdateUpdate(InvalidMessageException): itself. Update trees are not supported. 
""" - error_code = ErrorCode.PROGRAM_UPDATE_UPDATE + error_code = ErrorCode.VM_UPDATE_UPDATE class ForgetTargetNotFound(RetryMessageException): diff --git a/src/aleph/types/vms.py b/src/aleph/types/vms.py index c83937c87..009e80ddf 100644 --- a/src/aleph/types/vms.py +++ b/src/aleph/types/vms.py @@ -1,7 +1,12 @@ from enum import Enum from typing import NewType -ProgramVersion = NewType("ProgramVersion", str) +VmVersion = NewType("VmVersion", str) + + +class VmType(str, Enum): + INSTANCE = "instance" + PROGRAM = "program" class CpuArchitecture(str, Enum): diff --git a/tests/db/test_programs_db.py b/tests/db/test_programs_db.py index e063c6c85..f48267b7c 100644 --- a/tests/db/test_programs_db.py +++ b/tests/db/test_programs_db.py @@ -5,28 +5,29 @@ import pytest import pytz from aleph_message.models import ItemHash -from aleph_message.models.program import MachineType, Encoding, VolumePersistence +from aleph_message.models.execution.program import MachineType, Encoding +from aleph_message.models.execution.volume import VolumePersistence from sqlalchemy import select -from aleph.db.accessors.programs import ( +from aleph.db.accessors.vms import ( get_program, - is_program_amend_allowed, - refresh_program_version, - delete_program, + is_vm_amend_allowed, + refresh_vm_version, + delete_vm, ) from aleph.db.models import ( - ProgramDb, + VmBaseDb, CodeVolumeDb, RuntimeDb, - ProgramVersionDb, + VmVersionDb, DataVolumeDb, ExportVolumeDb, ImmutableVolumeDb, EphemeralVolumeDb, - PersistentVolumeDb, + PersistentVolumeDb, ProgramDb, ) from aleph.types.db_session import DbSessionFactory -from aleph.types.vms import ProgramVersion +from aleph.types.vms import VmVersion @pytest.fixture @@ -49,7 +50,7 @@ def original_program() -> ProgramDb: program = ProgramDb( item_hash=program_hash, owner="0xabadbabe", - type=MachineType.vm_function, + program_type=MachineType.vm_function, allow_amend=True, metadata_=None, variables=None, @@ -186,19 +187,19 @@ def test_program_accessors( 
session.add(original_program) session.add(program_update) session.add( - ProgramVersionDb( - program_hash=original_program.item_hash, + VmVersionDb( + vm_hash=original_program.item_hash, owner=original_program.owner, - current_version=ProgramVersion(program_update.item_hash), + current_version=VmVersion(program_update.item_hash), last_updated=program_update.created, ) ) session.add(program_with_many_volumes) session.add( - ProgramVersionDb( - program_hash=program_with_many_volumes.item_hash, + VmVersionDb( + vm_hash=program_with_many_volumes.item_hash, owner=program_with_many_volumes.owner, - current_version=ProgramVersion(program_with_many_volumes.item_hash), + current_version=VmVersion(program_with_many_volumes.item_hash), last_updated=program_with_many_volumes.created, ) ) @@ -225,13 +226,13 @@ def test_program_accessors( expected=program_with_many_volumes_db, actual=program_with_many_volumes ) - is_amend_allowed = is_program_amend_allowed( - session=session, program_hash=original_program.item_hash + is_amend_allowed = is_vm_amend_allowed( + session=session, vm_hash=original_program.item_hash ) assert is_amend_allowed is False - is_amend_allowed = is_program_amend_allowed( - session=session, program_hash=program_with_many_volumes.item_hash + is_amend_allowed = is_vm_amend_allowed( + session=session, vm_hash=program_with_many_volumes.item_hash ) assert is_amend_allowed is True @@ -243,11 +244,9 @@ def test_refresh_program( ): program_hash = original_program.item_hash - def get_program_version(session) -> Optional[ProgramVersionDb]: + def get_program_version(session) -> Optional[VmVersionDb]: return session.execute( - select(ProgramVersionDb).where( - ProgramVersionDb.program_hash == program_hash - ) + select(VmVersionDb).where(VmVersionDb.vm_hash == program_hash) ).scalar_one_or_none() # Insert program version with refresh_program_version @@ -255,7 +254,7 @@ def get_program_version(session) -> Optional[ProgramVersionDb]: session.add(original_program) 
session.commit() - refresh_program_version(session=session, program_hash=program_hash) + refresh_vm_version(session=session, vm_hash=program_hash) session.commit() program_version_db = get_program_version(session) @@ -268,7 +267,7 @@ def get_program_version(session) -> Optional[ProgramVersionDb]: session.add(program_update) session.commit() - refresh_program_version(session=session, program_hash=program_hash) + refresh_vm_version(session=session, vm_hash=program_hash) session.commit() program_version_db = get_program_version(session) @@ -278,10 +277,10 @@ def get_program_version(session) -> Optional[ProgramVersionDb]: # Delete the update, the original should be back in program_versions with session_factory() as session: - delete_program(session=session, item_hash=program_update.item_hash) + delete_vm(session=session, vm_hash=program_update.item_hash) session.commit() - refresh_program_version(session=session, program_hash=program_hash) + refresh_vm_version(session=session, vm_hash=program_hash) session.commit() program_version_db = get_program_version(session) @@ -291,10 +290,10 @@ def get_program_version(session) -> Optional[ProgramVersionDb]: # Delete the original, no entry should be left in program_versions with session_factory() as session: - delete_program(session=session, item_hash=original_program.item_hash) + delete_vm(session=session, vm_hash=original_program.item_hash) session.commit() - refresh_program_version(session=session, program_hash=program_hash) + refresh_vm_version(session=session, vm_hash=program_hash) session.commit() program_version_db = get_program_version(session) diff --git a/tests/message_processing/test_process_forgets.py b/tests/message_processing/test_process_forgets.py index 6be4b8c4f..29b9e0a40 100644 --- a/tests/message_processing/test_process_forgets.py +++ b/tests/message_processing/test_process_forgets.py @@ -20,7 +20,7 @@ from aleph.handlers.content.aggregate import AggregateMessageHandler from aleph.handlers.content.forget 
import ForgetMessageHandler from aleph.handlers.content.post import PostMessageHandler -from aleph.handlers.content.program import ProgramMessageHandler +from aleph.handlers.content.vm import VmMessageHandler from aleph.handlers.content.store import StoreMessageHandler from aleph.jobs.process_pending_messages import PendingMessageProcessor from aleph.toolkit.timestamp import timestamp_to_datetime @@ -42,7 +42,7 @@ def forget_handler(mocker) -> ForgetMessageHandler: balances_addresses=["nope"], balances_post_type="no-balances-in-tests", ), - MessageType.program: ProgramMessageHandler(), + MessageType.program: VmMessageHandler(), MessageType.store: StoreMessageHandler(storage_service=mocker.AsyncMock()), } return ForgetMessageHandler(content_handlers=content_handlers) diff --git a/tests/message_processing/test_process_instances.py b/tests/message_processing/test_process_instances.py new file mode 100644 index 000000000..a0ffbb05a --- /dev/null +++ b/tests/message_processing/test_process_instances.py @@ -0,0 +1,380 @@ +import datetime as dt +import itertools +import json +from typing import List, Protocol + +import pytest +import pytz +from aleph_message.models import ( + ItemType, + Chain, + MessageType, + InstanceContent, + ExecutableContent, + ForgetContent, +) +from aleph_message.models.execution.program import ProgramContent +from aleph_message.models.execution.volume import ImmutableVolume +from more_itertools import one + +from aleph.db.accessors.files import ( + insert_message_file_pin, + upsert_file_tag, +) +from aleph.db.accessors.messages import get_message_status, get_rejected_message +from aleph.db.accessors.vms import get_instance, get_vm_version +from aleph.db.models import ( + PendingMessageDb, + MessageStatusDb, + ImmutableVolumeDb, + EphemeralVolumeDb, + PersistentVolumeDb, + StoredFileDb, +) +from aleph.jobs.process_pending_messages import PendingMessageProcessor +from aleph.toolkit.timestamp import timestamp_to_datetime +from aleph.types.db_session 
import DbSessionFactory, DbSession +from aleph.types.files import FileTag, FileType +from aleph.types.message_status import MessageStatus, ErrorCode + + +@pytest.fixture +def fixture_instance_message(session_factory: DbSessionFactory) -> PendingMessageDb: + content = { + "address": "0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba", + "allow_amend": False, + "variables": { + "VM_CUSTOM_VARIABLE": "SOMETHING", + "VM_CUSTOM_VARIABLE_2": "32", + }, + "environment": { + "reproducible": True, + "internet": False, + "aleph_api": False, + "shared_cache": False, + }, + "resources": {"vcpus": 1, "memory": 128, "seconds": 30}, + "requirements": {"cpu": {"architecture": "x86_64"}}, + "rootfs": { + "parent": "549ec451d9b099cad112d4aaa2c00ac40fb6729a92ff252ff22eef0b5c3cb613", + "persistence": "host", + "name": "test-rootfs", + "size_mib": 20000, + }, + "cloud_config": {"password": "password"}, + "volumes": [ + { + "comment": "Python libraries. Read-only since a 'ref' is specified.", + "mount": "/opt/venv", + "ref": "5f31b0706f59404fad3d0bff97ef89ddf24da4761608ea0646329362c662ba51", + "use_latest": False, + }, + { + "comment": "Ephemeral storage, read-write but will not persist after the VM stops", + "mount": "/var/cache", + "ephemeral": True, + "size_mib": 5, + }, + { + "comment": "Working data persisted on the VM supervisor, not available on other nodes", + "mount": "/var/lib/sqlite", + "name": "sqlite-data", + "persistence": "host", + "size_mib": 10, + }, + { + "comment": "Working data persisted on the Aleph network. 
" + "New VMs will try to use the latest version of this volume, " + "with no guarantee against conflicts", + "mount": "/var/lib/statistics", + "name": "statistics", + "persistence": "store", + "size_mib": 10, + }, + { + "comment": "Raw drive to use by a process, do not mount it", + "name": "raw-data", + "persistence": "host", + "size_mib": 10, + }, + ], + "time": 1619017773.8950517, + } + + pending_message = PendingMessageDb( + item_hash="734a1287a2b7b5be060312ff5b05ad1bcf838950492e3428f2ac6437a1acad26", + type=MessageType.instance, + chain=Chain.ETH, + sender="0x9319Ad3B7A8E0eE24f2E639c40D8eD124C5520Ba", + signature=None, + item_type=ItemType.inline, + item_content=json.dumps(content), + time=timestamp_to_datetime(1619017773.8950577), + channel=None, + reception_time=timestamp_to_datetime(1619017774), + fetched=True, + check_message=False, + retries=0, + next_attempt=dt.datetime(2023, 1, 1), + ) + with session_factory() as session: + session.add(pending_message) + session.add( + MessageStatusDb( + item_hash=pending_message.item_hash, + status=MessageStatus.PENDING, + reception_time=pending_message.reception_time, + ) + ) + session.commit() + + return pending_message + + +@pytest.fixture +def fixture_forget_instance_message( + fixture_instance_message: PendingMessageDb, +) -> PendingMessageDb: + content = ForgetContent( + address=fixture_instance_message.sender, + time=(fixture_instance_message.time + dt.timedelta(seconds=1)).timestamp(), + hashes=[fixture_instance_message.item_hash], + reason="Bye Felicia", + ) + + pending_message = PendingMessageDb( + item_hash="8a1497002b2fd19b6036f1ef9a652ad47f1700b3f0d380761dbd347be9178702", + type=MessageType.forget, + chain=Chain.ETH, + sender=fixture_instance_message.sender, + signature=None, + item_type=ItemType.inline, + item_content=content.json(), + time=fixture_instance_message.time + dt.timedelta(seconds=1), + channel=None, + reception_time=fixture_instance_message.reception_time + + dt.timedelta(seconds=1), + 
fetched=True, + check_message=False, + retries=0, + next_attempt=dt.datetime(2023, 1, 1), + ) + return pending_message + + +class Volume(Protocol): + ref: str + use_latest: bool + + +class Rootfs(Volume): + def __init__(self, parent: str): + self.ref = parent + self.use_latest = True + + +def get_volume_refs(content: ExecutableContent) -> List[Volume]: + volumes = [] + + for volume in content.volumes: + if isinstance(volume, ImmutableVolume): + volumes.append(volume) + + if isinstance(content, ProgramContent): + volumes += [content.code, content.runtime] + if content.data: + volumes.append(content.data) + + elif isinstance(content, InstanceContent): + if parent := content.rootfs.parent: + volumes.append(Rootfs(parent)) + + return volumes + + +def insert_volume_refs(session: DbSession, message: PendingMessageDb): + """ + Insert volume references in the DB to make the program processable. + """ + + content = InstanceContent.parse_raw(message.item_content) + volumes = get_volume_refs(content) + + created = pytz.utc.localize(dt.datetime(2023, 1, 1)) + + for volume in volumes: + # Note: we use the reversed ref to generate the file hash for style points, + # but it could be set to any valid hash. 
+ file_hash = volume.ref[::-1] + + session.add(StoredFileDb(hash=file_hash, size=1024 * 1024, type=FileType.FILE)) + session.flush() + insert_message_file_pin( + session=session, + file_hash=volume.ref[::-1], + owner=content.address, + item_hash=volume.ref, + ref=None, + created=created, + ) + if volume.use_latest: + upsert_file_tag( + session=session, + tag=FileTag(volume.ref), + owner=content.address, + file_hash=volume.ref[::-1], + last_updated=created, + ) + + +@pytest.mark.asyncio +async def test_process_instance( + session_factory: DbSessionFactory, + message_processor: PendingMessageProcessor, + fixture_instance_message: PendingMessageDb, +): + with session_factory() as session: + insert_volume_refs(session, fixture_instance_message) + session.commit() + + pipeline = message_processor.make_pipeline() + # Exhaust the iterator + _ = [message async for message in pipeline] + + assert fixture_instance_message.item_content + content_dict = json.loads(fixture_instance_message.item_content) + + with session_factory() as session: + instance = get_instance( + session=session, item_hash=fixture_instance_message.item_hash + ) + assert instance is not None + + assert instance.owner == fixture_instance_message.sender + assert not instance.allow_amend + assert instance.replaces is None + + assert instance.resources_vcpus == content_dict["resources"]["vcpus"] + assert instance.resources_memory == content_dict["resources"]["memory"] + assert instance.resources_seconds == content_dict["resources"]["seconds"] + + assert instance.environment_internet == content_dict["environment"]["internet"] + assert ( + instance.environment_aleph_api == content_dict["environment"]["aleph_api"] + ) + assert ( + instance.environment_reproducible + == content_dict["environment"]["reproducible"] + ) + assert ( + instance.environment_shared_cache + == content_dict["environment"]["shared_cache"] + ) + + assert instance.variables + assert instance.variables == content_dict["variables"] + + rootfs = 
instance.rootfs + assert rootfs.parent == content_dict["rootfs"]["parent"] + assert rootfs.size_mib == content_dict["rootfs"]["size_mib"] + assert rootfs.persistence == content_dict["rootfs"]["persistence"] + assert rootfs.comment == content_dict["rootfs"].get("comment") + + assert len(instance.volumes) == 5 + + volumes_by_type = { + type: list(volumes_iter) + for type, volumes_iter in itertools.groupby( + sorted(instance.volumes, key=lambda v: str(v.__class__)), + key=lambda v: v.__class__, + ) + } + assert len(volumes_by_type[EphemeralVolumeDb]) == 1 + assert len(volumes_by_type[PersistentVolumeDb]) == 3 + assert len(volumes_by_type[ImmutableVolumeDb]) == 1 + + ephemeral_volume: EphemeralVolumeDb = one(volumes_by_type[EphemeralVolumeDb]) # type: ignore[assignment] + assert ephemeral_volume.mount == "/var/cache" + assert ephemeral_volume.size_mib == 5 + + instance_version = get_vm_version( + session=session, vm_hash=fixture_instance_message.item_hash + ) + assert instance_version + + assert instance_version.current_version == fixture_instance_message.item_hash + assert instance_version.owner == content_dict["address"] + + +@pytest.mark.asyncio +async def test_process_instance_missing_volumes( + session_factory: DbSessionFactory, + message_processor: PendingMessageProcessor, + fixture_instance_message, +): + """ + Check that an instance message with volumes not referenced in file_tags/file_pins + is rejected. 
+ """ + + vm_hash = fixture_instance_message.item_hash + pipeline = message_processor.make_pipeline() + # Exhaust the iterator + _ = [message async for message in pipeline] + + with session_factory() as session: + instance = get_instance(session=session, item_hash=vm_hash) + assert instance is None + + message_status = get_message_status(session=session, item_hash=vm_hash) + assert message_status is not None + assert message_status.status == MessageStatus.REJECTED + + rejected_message = get_rejected_message(session=session, item_hash=vm_hash) + assert rejected_message is not None + assert rejected_message.error_code == ErrorCode.VM_VOLUME_NOT_FOUND + + content = InstanceContent.parse_raw(fixture_instance_message.item_content) + volume_refs = set(volume.ref for volume in get_volume_refs(content)) + assert isinstance(rejected_message.details, dict) + assert set(rejected_message.details["errors"]) == volume_refs + assert rejected_message.traceback is None + + +@pytest.mark.asyncio +async def test_forget_instance_message( + session_factory: DbSessionFactory, + message_processor: PendingMessageProcessor, + fixture_instance_message: PendingMessageDb, + fixture_forget_instance_message: PendingMessageDb, +): + vm_hash = fixture_instance_message.item_hash + + # Process the instance message + with session_factory() as session: + insert_volume_refs(session, fixture_instance_message) + session.commit() + + pipeline = message_processor.make_pipeline() + # Exhaust the iterator + _ = [message async for message in pipeline] + + # Sanity check + with session_factory() as session: + instance = get_instance(session=session, item_hash=vm_hash) + assert instance is not None + + # Insert the FORGET message and process it + session.add(fixture_forget_instance_message) + session.commit() + + pipeline = message_processor.make_pipeline() + # Exhaust the iterator + _ = [message async for message in pipeline] + + with session_factory() as session: + instance = get_instance(session=session, 
item_hash=vm_hash) + assert instance is None, "The instance is still present despite being forgotten" + + instance_version = get_vm_version(session=session, vm_hash=vm_hash) + assert instance_version is None diff --git a/tests/message_processing/test_process_programs.py b/tests/message_processing/test_process_programs.py index aa095034d..9d66cdbb5 100644 --- a/tests/message_processing/test_process_programs.py +++ b/tests/message_processing/test_process_programs.py @@ -6,13 +6,11 @@ import pytest import pytz from aleph_message.models import ItemType, Chain, MessageType -from aleph_message.models.program import ( +from aleph_message.models.execution.program import ( MachineType, - VolumePersistence, ProgramContent, - ImmutableVolume, ) -from configmanager import Config +from aleph_message.models.execution.volume import ImmutableVolume, VolumePersistence from more_itertools import one from sqlalchemy import select @@ -21,11 +19,11 @@ upsert_file_tag, ) from aleph.db.accessors.messages import get_message_status, get_rejected_message -from aleph.db.accessors.programs import get_program +from aleph.db.accessors.vms import get_program from aleph.db.models import ( PendingMessageDb, MessageStatusDb, - ProgramDb, + VmBaseDb, ImmutableVolumeDb, EphemeralVolumeDb, PersistentVolumeDb, @@ -175,7 +173,7 @@ async def test_process_program( assert program is not None assert program.owner == fixture_program_message.sender - assert program.type == MachineType.vm_function + assert program.program_type == MachineType.vm_function assert not program.allow_amend assert program.replaces is None assert program.http_trigger @@ -242,7 +240,7 @@ async def test_program_with_subscriptions( content_dict = json.loads(program_message.item_content) with session_factory() as session: - program: ProgramDb = session.execute(select(ProgramDb)).scalar_one() + program: VmBaseDb = session.execute(select(VmBaseDb)).scalar_one() message_triggers = program.message_triggers assert message_triggers assert 
len(message_triggers) == 1 @@ -279,7 +277,7 @@ async def test_process_program_missing_volumes( rejected_message = get_rejected_message(session=session, item_hash=program_hash) assert rejected_message is not None - assert rejected_message.error_code == ErrorCode.PROGRAM_VOLUME_NOT_FOUND + assert rejected_message.error_code == ErrorCode.VM_VOLUME_NOT_FOUND content = ProgramContent.parse_raw(program_message.item_content) volume_refs = set(volume.ref for volume in get_volumes_with_ref(content)) From e2e432b54c55747ebd1d372425c6054a384f2d7b Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Mon, 22 May 2023 18:52:22 +0200 Subject: [PATCH 2/4] use aleph-message updates + check parent size --- .../0017_f9fa39b6bdef_vm_instances.py | 18 +++- setup.cfg | 2 +- src/aleph/db/models/vms.py | 7 +- src/aleph/handlers/content/vm.py | 96 +++++++++++++++---- src/aleph/types/message_status.py | 45 ++++++++- .../test_process_instances.py | 18 ++-- 6 files changed, 148 insertions(+), 38 deletions(-) diff --git a/deployment/migrations/versions/0017_f9fa39b6bdef_vm_instances.py b/deployment/migrations/versions/0017_f9fa39b6bdef_vm_instances.py index cac1513e4..be5021046 100644 --- a/deployment/migrations/versions/0017_f9fa39b6bdef_vm_instances.py +++ b/deployment/migrations/versions/0017_f9fa39b6bdef_vm_instances.py @@ -181,10 +181,10 @@ def upgrade() -> None: op.create_table( "instance_rootfs", sa.Column("instance_hash", sa.String(), nullable=False), - sa.Column("parent", sa.String(), nullable=True), + sa.Column("parent_ref", sa.String(), nullable=False), + sa.Column("parent_use_latest", sa.Boolean(), nullable=False), sa.Column("size_mib", sa.Integer(), nullable=False), sa.Column("persistence", sa.String(), nullable=False), - sa.Column("comment", sa.String(), nullable=True), sa.ForeignKeyConstraint( ["instance_hash"], ["vms.item_hash"], @@ -200,8 +200,14 @@ def upgrade() -> None: # Recreate the cost views (some column names must change) recreate_cost_views() - # Add the parent 
column for persistent volumes - op.add_column("vm_machine_volumes", sa.Column("parent", sa.String(), nullable=True)) + # Add the parent columns for persistent volumes + op.add_column( + "vm_machine_volumes", sa.Column("parent_ref", sa.String(), nullable=True) + ) + op.add_column( + "vm_machine_volumes", + sa.Column("parent_use_latest", sa.Boolean(), nullable=True), + ) # Add the instance columns to the vms (ex programs) table op.add_column( @@ -225,6 +231,9 @@ def upgrade() -> None: op.execute( "UPDATE error_codes SET description = 'VM update not targeting the original version of the VM' WHERE code = 303" ) + op.execute( + "INSERT INTO error_codes(code, description) VALUES (304, 'VM volume parent is larger than the child volume')" + ) # ### end Alembic commands ### @@ -421,5 +430,6 @@ def downgrade() -> None: "UPDATE error_codes " "SET description = 'Program update not targeting the original version of the program' WHERE code = 303" ) + op.execute("DELETE FROM error_codes WHERE code = 304") # ### end Alembic commands ### diff --git a/setup.cfg b/setup.cfg index 661de5cd0..77352d5df 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,7 +40,7 @@ install_requires = aiohttp==3.8.4 aioipfs@git+https://github.com/aleph-im/aioipfs.git@d671c79b2871bb4d6c8877ba1e7f3ffbe7d20b71 alembic==1.8.1 - aleph-message@git+https://github.com/aleph-im/aleph-message@2983925295643634785a6a712d09e8f5c58642a2 + aleph-message@git+https://github.com/aleph-im/aleph-message@3ccb20f97e61676f194424b6b6b061c0835459f1 aleph-p2p-client@git+https://github.com/aleph-im/p2p-service-client-python@2c04af39c566217f629fd89505ffc3270fba8676 aleph-pytezos@git+https://github.com/aleph-im/aleph-pytezos.git@32dd1749a4773da494275709060632cbeba9a51b asyncpg==0.26.0 diff --git a/src/aleph/db/models/vms.py b/src/aleph/db/models/vms.py index e992a467c..0c054409a 100644 --- a/src/aleph/db/models/vms.py +++ b/src/aleph/db/models/vms.py @@ -35,12 +35,12 @@ class RootfsVolumeDb(Base): instance_hash: str = Column( 
ForeignKey("vms.item_hash", ondelete="CASCADE"), primary_key=True ) - parent: Optional[str] = Column(String, nullable=True) + parent_ref: str = Column(String, nullable=False) + parent_use_latest: bool = Column(Boolean, nullable=False) size_mib: int = Column(Integer, nullable=False) persistence: VolumePersistence = Column( ChoiceType(VolumePersistence), nullable=False ) - comment: Optional[str] = Column(String, nullable=True) instance: "VmInstanceDb" = relationship("VmInstanceDb", back_populates="rootfs") @@ -103,7 +103,8 @@ class EphemeralVolumeDb(MachineVolumeBaseDb): class PersistentVolumeDb(MachineVolumeBaseDb): - parent: Optional[str] = Column(String, nullable=True) + parent_ref: Optional[str] = Column(String, nullable=True) + parent_use_latest: Optional[bool] = Column(Boolean, nullable=True) persistence: VolumePersistence = Column( ChoiceType(VolumePersistence), nullable=True ) diff --git a/src/aleph/handlers/content/vm.py b/src/aleph/handlers/content/vm.py index d2fcb6664..ef6372cd7 100644 --- a/src/aleph/handlers/content/vm.py +++ b/src/aleph/handlers/content/vm.py @@ -1,5 +1,5 @@ import logging -from typing import List, Set, overload +from typing import List, Set, overload, Protocol, Optional from aleph_message.models import ProgramContent, ExecutableContent, InstanceContent from aleph_message.models.execution.volume import ( @@ -7,9 +7,15 @@ ImmutableVolume, EphemeralVolume, PersistentVolume, + ParentVolume, ) -from aleph.db.accessors.files import find_file_tags, find_file_pins +from aleph.db.accessors.files import ( + find_file_tags, + find_file_pins, + get_file_tag, + get_message_file_pin, +) from aleph.db.accessors.vms import ( delete_vm, get_program, @@ -32,6 +38,7 @@ ProgramDb, RootfsVolumeDb, VmBaseDb, + StoredFileDb, ) from aleph.handlers.content.content_handler import ContentHandler from aleph.toolkit.timestamp import timestamp_to_datetime @@ -40,10 +47,11 @@ from aleph.types.message_status import ( InternalError, InvalidMessageFormat, - 
ProgramRefNotFound, - ProgramVolumeNotFound, - ProgramUpdateNotAllowed, - ProgramCannotUpdateUpdate, + VmRefNotFound, + VmVolumeNotFound, + VmUpdateNotAllowed, + VmCannotUpdateUpdate, + VmVolumeTooSmall, ) from aleph.types.vms import VmVersion @@ -124,12 +132,19 @@ def map_volume(volume: AbstractVolume) -> MachineVolumeBaseDb: elif isinstance(volume, EphemeralVolume): return EphemeralVolumeDb(comment=comment, mount=mount, size_mib=volume.size_mib) elif isinstance(volume, PersistentVolume): + if parent := volume.parent: + parent_ref, parent_use_latest = parent.ref, parent.use_latest + else: + parent_ref, parent_use_latest = None, None + return PersistentVolumeDb( comment=comment, mount=mount, persistence=volume.persistence, name=volume.name, size_mib=volume.size_mib, + parent_ref=parent_ref, + parent_use_latest=parent_use_latest, ) else: raise InternalError(f"Unsupported volume type: {volume.__class__.__name__}") @@ -174,11 +189,12 @@ def vm_message_to_db(message: MessageDb) -> VmBaseDb: vm.export_volume = ExportVolumeDb(encoding=content.export.encoding) elif isinstance(content, InstanceContent): + parent = content.rootfs.parent vm.rootfs = RootfsVolumeDb( - parent=content.rootfs.parent, + parent_ref=parent.ref, + parent_use_latest=parent.use_latest, size_mib=content.rootfs.size_mib, persistence=content.rootfs.persistence, - comment=content.rootfs.comment, ) vm.cloud_config = content.cloud_config @@ -207,17 +223,15 @@ def add_ref_to_check(_volume): add_ref_to_check(content.data) elif isinstance(content, InstanceContent): - if rootfs_parent := content.rootfs.parent: - tags_to_check.add(FileTag(rootfs_parent)) + add_ref_to_check(content.rootfs.parent) for volume in content.volumes: if isinstance(volume, ImmutableVolume): add_ref_to_check(volume) if isinstance(volume, PersistentVolume): - # Assume `use_latest` for persistent volume parents if parent := volume.parent: - tags_to_check.add(FileTag(parent)) + add_ref_to_check(parent) # For each volume, if use_latest is set 
check the tags and otherwise check # the file pins. @@ -228,6 +242,53 @@ def add_ref_to_check(_volume): return (pins_to_check - file_pins_db) | (tags_to_check - file_tags_db) +def check_parent_volumes_size_requirements( + session: DbSession, content: ExecutableContent +) -> None: + def _get_parent_volume_file(_parent: ParentVolume) -> StoredFileDb: + if _parent.use_latest: + file_tag = get_file_tag(session=session, tag=_parent.ref) + if file_tag is None: + raise InternalError( + f"Could not find latest version of parent volume {_parent.ref}" + ) + + return file_tag.file + + file_pin = get_message_file_pin(session=session, item_hash=_parent.ref) + if file_pin is None: + raise InternalError( + f"Could not find original version of parent volume {_parent.ref}" + ) + + return file_pin.file + + class HasParent(Protocol): + parent: ParentVolume + size_mib: int + + volumes_with_parent: List[HasParent] = [ + volume + for volume in content.volumes + if isinstance(volume, PersistentVolume) and volume.parent is not None + ] + + if isinstance(content, InstanceContent): + volumes_with_parent.append(content.rootfs) + + for volume in volumes_with_parent: + volume_metadata = _get_parent_volume_file(volume.parent) + volume_size = volume.size_mib * 1024 * 1024 + if volume_size < volume_metadata.size: + raise VmVolumeTooSmall( + parent_size=volume_metadata.size, + parent_ref=volume.parent.ref, + parent_file=volume_metadata.hash, + volume_name=getattr(volume, "name", "rootfs"), + volume_size=volume_size, + ) + + class VmMessageHandler(ContentHandler): """ Handles both PROGRAM and INSTANCE messages. 
@@ -242,22 +303,25 @@ async def check_dependencies(self, session: DbSession, message: MessageDb) -> No missing_volumes = find_missing_volumes(session=session, content=content) if missing_volumes: - raise ProgramVolumeNotFound([volume for volume in missing_volumes]) + raise VmVolumeNotFound([volume for volume in missing_volumes]) + + check_parent_volumes_size_requirements(session=session, content=content) + # Check dependencies if the message updates an existing instance/program if (ref := content.replaces) is not None: original_program = get_program(session=session, item_hash=ref) if original_program is None: - raise ProgramRefNotFound(ref) + raise VmRefNotFound(ref) if original_program.replaces is not None: - raise ProgramCannotUpdateUpdate() + raise VmCannotUpdateUpdate() is_amend_allowed = is_vm_amend_allowed(session=session, vm_hash=ref) if is_amend_allowed is None: raise InternalError(f"Could not find current version of program {ref}") if not is_amend_allowed: - raise ProgramUpdateNotAllowed() + raise VmUpdateNotAllowed() @staticmethod async def process_vm_message(session: DbSession, message: MessageDb): diff --git a/src/aleph/types/message_status.py b/src/aleph/types/message_status.py index f9d5ff669..f9eda2a8f 100644 --- a/src/aleph/types/message_status.py +++ b/src/aleph/types/message_status.py @@ -40,6 +40,7 @@ class ErrorCode(IntEnum): VM_VOLUME_NOT_FOUND = 301 VM_AMEND_NOT_ALLOWED = 302 VM_UPDATE_UPDATE = 303 + VM_VOLUME_TOO_SMALL = 304 FORGET_NO_TARGET = 500 FORGET_TARGET_NOT_FOUND = 501 FORGET_FORGET = 502 @@ -189,7 +190,7 @@ class StoreCannotUpdateStoreWithRef(InvalidMessageException): error_code = ErrorCode.STORE_UPDATE_UPDATE -class ProgramRefNotFound(RetryMessageException): +class VmRefNotFound(RetryMessageException): """ The original program specified in the `ref` field could not be found. 
""" @@ -197,7 +198,7 @@ class ProgramRefNotFound(RetryMessageException): error_code = ErrorCode.VM_REF_NOT_FOUND -class ProgramVolumeNotFound(RetryMessageException): +class VmVolumeNotFound(RetryMessageException): """ One or more volume files could not be found. """ @@ -205,7 +206,7 @@ class ProgramVolumeNotFound(RetryMessageException): error_code = ErrorCode.VM_VOLUME_NOT_FOUND -class ProgramUpdateNotAllowed(InvalidMessageException): +class VmUpdateNotAllowed(InvalidMessageException): """ The message attempts to amend an immutable program, i.e. for which allow_amend is set to False. @@ -214,7 +215,7 @@ class ProgramUpdateNotAllowed(InvalidMessageException): error_code = ErrorCode.VM_AMEND_NOT_ALLOWED -class ProgramCannotUpdateUpdate(InvalidMessageException): +class VmCannotUpdateUpdate(InvalidMessageException): """ The program hash in the `replaces` field has a value for the `replaces` field itself. Update trees are not supported. @@ -223,6 +224,42 @@ class ProgramCannotUpdateUpdate(InvalidMessageException): error_code = ErrorCode.VM_UPDATE_UPDATE +class VmVolumeTooSmall(InvalidMessageException): + """ + A volume with a parent volume has a size inferior to the size of the parent. + Ex: attempting to use a 4GB Ubuntu rootfs to a 2GB volume. + """ + + error_code = ErrorCode.VM_VOLUME_TOO_SMALL + + def __init__( + self, + volume_name: str, + volume_size: int, + parent_ref: str, + parent_file: str, + parent_size: int, + ): + self.volume_name = volume_name + self.volume_size = volume_size + self.parent_ref = parent_ref + self.parent_file = parent_file + self.parent_size = parent_size + + def details(self) -> Optional[Dict[str, Any]]: + return { + "errors": [ + { + "volume_name": self.volume_name, + "parent_ref": self.parent_ref, + "parent_file": self.parent_file, + "parent_size": self.parent_size, + "volume_size": self.volume_size, + } + ] + } + + class ForgetTargetNotFound(RetryMessageException): """ A target specified in the FORGET message could not be found. 
diff --git a/tests/message_processing/test_process_instances.py b/tests/message_processing/test_process_instances.py index a0ffbb05a..9c68bcb6c 100644 --- a/tests/message_processing/test_process_instances.py +++ b/tests/message_processing/test_process_instances.py @@ -56,7 +56,10 @@ def fixture_instance_message(session_factory: DbSessionFactory) -> PendingMessag "resources": {"vcpus": 1, "memory": 128, "seconds": 30}, "requirements": {"cpu": {"architecture": "x86_64"}}, "rootfs": { - "parent": "549ec451d9b099cad112d4aaa2c00ac40fb6729a92ff252ff22eef0b5c3cb613", + "parent": { + "ref": "549ec451d9b099cad112d4aaa2c00ac40fb6729a92ff252ff22eef0b5c3cb613", + "use_latest": True, + }, "persistence": "host", "name": "test-rootfs", "size_mib": 20000, @@ -167,12 +170,6 @@ class Volume(Protocol): use_latest: bool -class Rootfs(Volume): - def __init__(self, parent: str): - self.ref = parent - self.use_latest = True - - def get_volume_refs(content: ExecutableContent) -> List[Volume]: volumes = [] @@ -187,7 +184,7 @@ def get_volume_refs(content: ExecutableContent) -> List[Volume]: elif isinstance(content, InstanceContent): if parent := content.rootfs.parent: - volumes.append(Rootfs(parent)) + volumes.append(parent) return volumes @@ -275,10 +272,10 @@ async def test_process_instance( assert instance.variables == content_dict["variables"] rootfs = instance.rootfs - assert rootfs.parent == content_dict["rootfs"]["parent"] + assert rootfs.parent_ref == content_dict["rootfs"]["parent"]["ref"] + assert rootfs.parent_use_latest == content_dict["rootfs"]["parent"]["use_latest"] assert rootfs.size_mib == content_dict["rootfs"]["size_mib"] assert rootfs.persistence == content_dict["rootfs"]["persistence"] - assert rootfs.comment == content_dict["rootfs"].get("comment") assert len(instance.volumes) == 5 From cb42fcb2f298509687caf3be4f9358c2c2dff0ff Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Wed, 
31 May 2023 14:57:23 +0200 Subject: [PATCH 3/4] update aleph-message dependency --- setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 77352d5df..69ad96c17 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,7 +40,7 @@ install_requires = aiohttp==3.8.4 aioipfs@git+https://github.com/aleph-im/aioipfs.git@d671c79b2871bb4d6c8877ba1e7f3ffbe7d20b71 alembic==1.8.1 - aleph-message@git+https://github.com/aleph-im/aleph-message@3ccb20f97e61676f194424b6b6b061c0835459f1 + aleph-message==0.4.0a1 aleph-p2p-client@git+https://github.com/aleph-im/p2p-service-client-python@2c04af39c566217f629fd89505ffc3270fba8676 aleph-pytezos@git+https://github.com/aleph-im/aleph-pytezos.git@32dd1749a4773da494275709060632cbeba9a51b asyncpg==0.26.0 From b24f33499fd269e0df221f8ae22eab4e21a83d4f Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Wed, 31 May 2023 15:03:59 +0200 Subject: [PATCH 4/4] black --- src/aleph/db/models/messages.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/aleph/db/models/messages.py b/src/aleph/db/models/messages.py index ed3d13f02..8c36508ca 100644 --- a/src/aleph/db/models/messages.py +++ b/src/aleph/db/models/messages.py @@ -10,7 +10,8 @@ ForgetContent, PostContent, ProgramContent, - StoreContent, InstanceContent, + StoreContent, + InstanceContent, ) from pydantic import ValidationError from pydantic.error_wrappers import ErrorWrapper @@ -59,7 +60,6 @@ def validate_message_content( message_type: MessageType, content_dict: Dict[str, Any], ) -> BaseContent: - content_type = CONTENT_TYPE_MAP[message_type] content = content_type.parse_obj(content_dict) # Validate that the content time can be converted to datetime. This will @@ -133,7 +133,6 @@ def from_pending_message( content_dict: Dict[str, Any], content_size: int, ) -> "MessageDb": - content_dict = cls._coerce_content(pending_message, content_dict) parsed_content = validate_message_content(pending_message.type, content_dict)