- deploy via ssh
- receive job from local via ssh
- obtain datainstances via globus
- execute job
- upload produced datainstances via globus
- return globus paths (indexes/keys?)

In [1]:
from pathlib import Path
from metasmith.agents.presets import Agent, AGENT_SETUP_COMPLETE
from metasmith.models.libraries import *
from metasmith.models.remote import *

In [2]:
# Globus file-manager link to a scratch directory; only the collection
# (endpoint) id parsed out of it is kept and displayed.
SOCKEYE_GLOBUS_URL = (
    "https://app.globus.org/file-manager"
    "?destination_id=64a5c402-05c4-4607-bbad-46a9c2aebd98"
    "&destination_path=%2Fhome%2Ftxyliu%2Fscratch%2F"
)
globus = GlobusSource.Parse(SOCKEYE_GLOBUS_URL)
sockeye_globus_endpoint = globus.endpoint
sockeye_globus_endpoint

'64a5c402-05c4-4607-bbad-46a9c2aebd98'

In [3]:
# Disabled alternative: agent on the "sockeye" host (kept for reference).
# todo: globus endpoint
# agent = Agent(
#     setup_commands=[
#         "ssh sockeye",
#         "module load gcc/9.4.0 apptainer/1.3.1",
#         f'[ ! -z "$SSH_CONNECTION" ] && echo "{AGENT_SETUP_COMPLETE}"',
#     ],
#     cleanup_commands=[
#         "exit",
#     ],
#     home=SshSource(host="sockeye", path="~/scratch/metasmith").AsSource(),
# )
# #     globus_endpoint=sockeye_globus_endpoint


# Active agent. The same host alias is used for the ssh hop and for the
# location of the agent's home directory.
agent_host = "cosmos"
agent = Agent(
    setup_commands=[
        f"ssh {agent_host}",
        # The completion marker is echoed only when $SSH_CONNECTION is non-empty.
        f'[ ! -z "$SSH_CONNECTION" ] && echo "{AGENT_SETUP_COMPLETE}"',
    ],
    cleanup_commands=["exit"],
    home=SshSource(host=agent_host, path="~/workspace/metasmith_home").AsSource(),
)

# Deployment is left disabled in this notebook.
# agent.Deploy()

In [4]:
from local.constants import WORKSPACE_ROOT
from local.utils import LinkifyPath
# Directory under the workspace where this notebook keeps its libraries.
CACHE = WORKSPACE_ROOT / "main/local_mock/cache/xgdb_tests"

# Load the data type definitions and list them.
TYPES_YAML = WORKSPACE_ROOT / "main/local_mock/prototypes/metagenomics.dev3.yml"
types = DataTypeLibrary.Load(TYPES_YAML)
# Iterating the library yields pairs; only the second item (the type) is printed.
for _type_name, dtype in types:
    print(dtype)

<{data:DNA sequence,format:FASTA}:4M4PqXwA>
<{data:software container,format:OCI,provides:diamond}:iGL288Xm>
<{data:software container,format:OCI,provides:pprodigal}:90LdbjQO>
<{data:Protein features,format:CSV}:kImyYZjD>
<{data:Amino acid sequence,format:FASTA}:oF3YSVYQ>
<{data:database reference,format:.dmnd}:VpPqsgy1>


In [5]:
# Build two data instance libraries: `xgdb` for the sample input and `refdb`
# for the reference database plus the container URI files.
LOCAL_DATA = WORKSPACE_ROOT / "scratch/test_ws/data/local"
xgdb_path = CACHE / "test.xgdb"
refdb_path = CACHE / "ref.xgdb"
xgdb = DataInstanceLibrary(xgdb_path)
refdb = DataInstanceLibrary(refdb_path)

# Each entry looks like (source file, name inside the library, data type name)
# -- inferred from the printed results; confirm against DataInstanceLibrary.Add.
sample_entries = [
    (Path(LOCAL_DATA / "example.fna"), "contigs.fna", "metagenomics::contigs"),
]
reference_entries = [
    (LOCAL_DATA / "uniprot_sprot.dmnd", "reference.uniprot_sprot.dmnd", "metagenomics::protein_reference_diamond"),
    (LOCAL_DATA / "diamond.oci.uri", "container.diamond.oci.uri", "metagenomics::oci_image_diamond"),
    (LOCAL_DATA / "pprodigal.oci.uri", "container.pprodigal.oci.uri", "metagenomics::oci_image_pprodigal"),
]

xgdb.AddTypeLibrary("metagenomics", types)
added = xgdb.Add(sample_entries)
print(added)

refdb.AddTypeLibrary("metagenomics", types)
added = refdb.Add(reference_entries)
print(added)

# Persist both libraries, then link to the reference library's index file.
xgdb.Save()
refdb.Save()
refdb_index_file = refdb._index_name + refdb._metadata_ext
LinkifyPath((refdb_path / refdb._path_to_meta) / refdb_index_file)
# LinkifyPath(refdb_path/refdb._path_to_types)

# Reload the sample library from disk.
xgdb_local = DataInstanceLibrary.Load(xgdb_path)

[PosixPath('contigs.fna')]
[PosixPath('reference.uniprot_sprot.dmnd'), PosixPath('container.diamond.oci.uri'), PosixPath('container.pprodigal.oci.uri')]
./../../main/local_mock/cache/xgdb_tests/ref.xgdb/_metadata/index.yml


In [6]:
# Location of the transform library used by this notebook.
trlib_path = "./transforms/simple_1"

# Disabled scaffolding steps (library creation with two stubs), kept for reference.
# trlib = TransformInstanceLibrary(trlib_path)
# trlib.AddTypeLibrary("metagenomics", types)
# trlib.AddStub("diamond", exist_ok=True)
# trlib.AddStub("pprodigal", exist_ok=True)
# trlib.Save(update_types=True)

# Load the existing library and show each transform's name and model.
trlib = TransformInstanceLibrary.Load(trlib_path)
for _tr_path, tr_name, transform in trlib.IterateTransforms():
    print(tr_name, transform.model)

transforms::transform {{"data":"software container"}-{"format":"OCI"}-{"provides":"diamond"}},{{"data":"Amino acid sequence"}-{"format":"FASTA"}},{{"data":"database reference"}-{"format":".dmnd"}}->{{"data":"Protein features"}-{"format":"CSV"}}
transforms::transform {{"data":"software container"}-{"format":"OCI"}-{"provides":"pprodigal"}},{{"data":"DNA sequence"}-{"format":"FASTA"}}->{{"data":"Amino acid sequence"}-{"format":"FASTA"}}


In [7]:
# Globus file-manager link for the remote collection; the endpoint id parsed
# from it is reused when building the remote sources below.
REMOTE_GLOBUS_URL = (
    "https://app.globus.org/file-manager"
    "?origin_id=2602486c-1e0f-47a0-be15-eec1b0ff0f96"
    "&origin_path=%2FMetasmith%2Fglobus_test3%2Fxgdb%2F"
)
gs = GlobusSource.Parse(REMOTE_GLOBUS_URL)
REMOTE_GLOBUS_ENDPOINT = gs.endpoint
REMOTE_GLOBUS_ENDPOINT

'2602486c-1e0f-47a0-be15-eec1b0ff0f96'

In [8]:
# Globus destination for the sample library.
xgdb_remote_source = GlobusSource(endpoint=REMOTE_GLOBUS_ENDPOINT, path="/Metasmith/dev07").AsSource()

# Upload is disabled; uncomment to save xgdb_local there and inspect the result.
# res = xgdb_local.SaveAs(dest=xgdb_remote_source, label="dev07_xgdb")
# for e in res.errors:
#     print(e)
# res.completed

In [9]:
# Globus destination for the reference library.
refdb_remote_source = GlobusSource(endpoint=REMOTE_GLOBUS_ENDPOINT, path="/Metasmith/dev07.ref").AsSource()

# Upload is disabled; uncomment to save refdb there and inspect the result.
# res = refdb.SaveAs(dest=refdb_remote_source, label="dev07_refdb")
# for e in res.errors:
#     print(e)
# res.completed

In [10]:
from metasmith.models.workflow import WorkflowPlan

# Target: ORF annotations whose lineage includes the contigs type.
annotations_target = types["orf_annotations"].WithLineage([
    types["contigs"],
    # xgdb["example.fna"].type,
])

# Generate a plan from both data libraries and the transform library.
plan = WorkflowPlan.Generate(
    given=[xgdb, refdb],
    transforms=[trlib],
    targets=[annotations_target],
)

# Print each step as: transform name, required types, "v", produced types.
for planned in plan.steps:
    transform_path = Path(planned.transform.name)
    spec = planned.transform.model
    print(f"{transform_path.stem}")
    for required in spec.requires:
        print(f"    {required}")
    print("    v")
    for produced in spec.produces:
        print(f"    {produced}")
    # print([f"{x.source}" for x in planned.uses], [f"{x.source}" for x in planned.produces], sep="->")
    # LinkifyPath(planned.transform._source.address)

metagenomics::orfs_faa
metagenomics::orf_annotations
pprodigal
    (D:{"data":"software container"}-{"format":"OCI"}-{"provides":"pprodigal"})
    (D:{"data":"DNA sequence"}-{"format":"FASTA"})
    v
    (D:{"data":"Amino acid sequence"}-{"format":"FASTA"})
diamond
    (D:{"data":"software container"}-{"format":"OCI"}-{"provides":"diamond"})
    (D:{"data":"Amino acid sequence"}-{"format":"FASTA"})
    (D:{"data":"database reference"}-{"format":".dmnd"})
    v
    (D:{"data":"Protein features"}-{"format":"CSV"})


In [11]:
from metasmith.coms.ipc import LiveShell
from metasmith.logging import Log
from tempfile import TemporaryDirectory

from metasmith.models.remote import SshSource
from metasmith.models.workflow import WorkflowTask

In [12]:
# The Slurm account name is read from a file under the workspace's secrets
# directory; it is only referenced by the disabled config entry below.
slurm_account = Path(WORKSPACE_ROOT / "secrets/slurm_account").read_text().strip()

local_task_path = WORKSPACE_ROOT / "main/local_mock/cache/xgdb_tests/task"

# send task & plan
nextflow_config = {
    "preset": "default",
    # "slurm_account": slurm_account,
}
task = WorkflowTask(
    plan=plan,
    agent=agent,
    data_libraries=[xgdb, refdb],
    transform_libraries=[trlib],
    config={"nextflow": nextflow_config},
)
# task.SaveAs(Source.FromLocal(local_task_path))

In [13]:
# Reload the task saved at local_task_path and show, for every step, the
# concrete data instances it uses and produces.
local_task = WorkflowTask.Load(local_task_path)

for planned in local_task.plan.steps:
    transform_path = Path(planned.transform.name)
    spec = planned.transform.model  # looked up as in the original cell; nothing below reads it
    print(f"{transform_path.stem}")
    for used in planned.uses:
        print(f"    {used.path} as {used.dtype_name}")
    print("    v")
    for made in planned.produces:
        print(f"    {made.path} as {made.dtype_name}")


pprodigal
    container.pprodigal.oci.uri as metagenomics::oci_image_pprodigal
    contigs.fna as metagenomics::contigs
    v
    orfs.faa as metagenomics::orfs_faa
diamond
    container.diamond.oci.uri as metagenomics::oci_image_diamond
    orfs.faa as metagenomics::orfs_faa
    reference.uniprot_sprot.dmnd as metagenomics::protein_reference_diamond
    v
    annotations.csv as metagenomics::orf_annotations


In [14]:
# Show the plan's key; later cells use it to name the remote cache directory
# and pass it to the execute_workflow call.
plan_key = plan._key
print(plan_key)

nZ6eA


In [15]:
# Per-plan directory under the agent's home path, keyed by the plan key.
remote_subdir = f"tmp/{plan._key}"
remote_cache = agent.home.GetPath() / remote_subdir

# Sending the task there is disabled; uncomment to save it and check the result.
# res = task.SaveAs(agent.home.WithPath(remote_cache))
# res.completed

In [16]:
# Run the workflow on the agent: set up the remote shell, make the cache
# directory, start the relay, then call the stage and execute APIs.
# Cleanup sits in a `finally` so the agent's cleanup commands still run when
# a remote step raises or the cell is interrupted.
local_temp = Path("./cache/mock_temp/")
with LiveShell() as sh_local, LiveShell() as sh_remote:
    # NOTE(review): sh_local is wired to the logger, but this cell never sends it a command.
    sh_local.RegisterOnOut(lambda x: Log.Info(f"{x}"))
    sh_local.RegisterOnErr(lambda x: Log.Error(f"{x}"))
    # Remote stdout and stderr are both printed with an "R| " prefix.
    sh_remote.RegisterOnOut(lambda x: print(f"R| {x}"))
    sh_remote.RegisterOnErr(lambda x: print(f"R| {x}"))
    # remote_cache = Path(f"/tmp/metasmith.{plan._key}")

    # connect and prepare remote
    agent.RunSetup(sh_remote)
    try:
        Log.Info(f"making destination cache at [{remote_cache}]")
        # sh_remote.Exec(f"[ -e {remote_cache} ] && rm -rf {remote_cache}")
        # The paths are left unquoted on purpose: they start with "~", which
        # the remote shell only expands when unquoted.
        sh_remote.Exec(f"mkdir -p {remote_cache} && cd {agent.home.GetPath()}")
        Log.Info("starting relay service")
        sh_remote.Exec("./relay/msm_relay start")

        Log.Info("calling stage")
        # NOTE(review): the original note here read "this is ok because /tmp is
        # mounted". That matches the commented-out /tmp cache path above, not the
        # current home-relative remote_cache -- confirm it still applies.
        sh_remote.Exec(f'./msm api stage_workflow -a task_dir={remote_cache} -a force=True')
        Log.Info("calling execute")
        sh_remote.Exec(f'./msm api execute_workflow -a key={plan._key}')

        # sh_remote.Exec(f"rm -rf {remote_cache}")
        # sh_remote.Exec(f"{agent.home}/relay/msm_relay stop")
    finally:
        agent.RunCleanup(sh_remote)

R| Pseudo-terminal will not be allocated because stdin is not a terminal.


R| Welcome to Ubuntu 22.04.3 LTS (GNU/Linux 5.15.167.4-microsoft-standard-WSL2 x86_64)
R|  * Documentation:  https://help.ubuntu.com
R|  * Management:     https://landscape.canonical.com
R|  * Support:        https://ubuntu.com/advantage
R|  * Strictly confined Kubernetes makes edge and IoT secure. Learn how MicroK8s
R|    just raised the bar for easy, resilient and secure K8s cluster deployment.
R|    https://ubuntu.com/engage/secure-kubernetes-at-the-edge
R| setup_complete.1FQfCxG4
2025-03-05_17-33-13  | making destination cache at [~/workspace/metasmith_home/tmp/nZ6eA]
2025-03-05_17-33-14  | starting relay service
R| 2025-03-05_17-33-15 E| relay server already running in [relay/connections]
2025-03-05_17-33-14  | calling stage
R| 2025-03-05_17-33-15  | api call to [stage_workflow] with [{'task_dir': '/home/tony/workspace/metasmith_home/tmp/nZ6eA', 'force': 'True'}]
R| 2025-03-05_17-33-15  | staging workflow [nZ6eA] with [4] given data instances
R| 2025-03-05_17-33-15  | already stag