Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: could not start Instances from command line #597

Merged
merged 4 commits into from
Apr 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions src/aleph/vm/orchestrator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from sqlalchemy.ext.asyncio import create_async_engine

from aleph.vm.conf import ALLOW_DEVELOPER_SSH_KEYS, make_db_url, settings
from aleph.vm.models import VmExecution

Check warning on line 20 in src/aleph/vm/orchestrator/cli.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/cli.py#L20

Added line #L20 was not covered by tests
from aleph.vm.pool import VmPool
from aleph.vm.version import get_version_from_apt, get_version_from_git

Expand Down Expand Up @@ -187,7 +188,8 @@

bench: list[float] = []

pool = VmPool()
loop = asyncio.get_event_loop()
pool = VmPool(loop)

Check warning on line 192 in src/aleph/vm/orchestrator/cli.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/cli.py#L191-L192

Added lines #L191 - L192 were not covered by tests
pool.setup()

# Does not make sense in benchmarks
Expand Down Expand Up @@ -236,25 +238,24 @@
print("Event result", result)


async def start_instance(item_hash: ItemHash) -> None:
async def start_instance(item_hash: ItemHash, pubsub: Optional[PubSub], pool) -> VmExecution:

Check warning on line 241 in src/aleph/vm/orchestrator/cli.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/cli.py#L241

Added line #L241 was not covered by tests
"""Run an instance from an InstanceMessage."""
pool = VmPool()
return await start_persistent_vm(item_hash, pubsub, pool)

Check warning on line 243 in src/aleph/vm/orchestrator/cli.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/cli.py#L243

Added line #L243 was not covered by tests


async def run_instances(instances: list[ItemHash]) -> None:

Check warning on line 246 in src/aleph/vm/orchestrator/cli.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/cli.py#L246

Added line #L246 was not covered by tests
"""Run instances from a list of message identifiers."""
logger.info(f"Instances to run: {instances}")
loop = asyncio.get_event_loop()
pool = VmPool(loop)

Check warning on line 250 in src/aleph/vm/orchestrator/cli.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/cli.py#L248-L250

Added lines #L248 - L250 were not covered by tests
# The main program uses a singleton pubsub instance in order to watch for updates.
# We create another instance here since that singleton is not initialized yet.
# Watching for updates on this instance will therefore not work.
pubsub: Optional[PubSub] = None

await start_persistent_vm(item_hash, pubsub, pool)


async def run_instances(instances: list[ItemHash]) -> None:
"""Run instances from a list of message identifiers."""
logger.info(f"Instances to run: {instances}")
await asyncio.gather(*[start_instance(instance_id, pubsub, pool) for instance_id in instances])

await asyncio.gather(*[start_instance(item_hash=instance_id) for instance_id in instances])
await asyncio.Event().wait() # wait forever
# TODO : should we really wait forever?


@contextlib.contextmanager
Expand Down