# Templatized notebook for running CB-Geo MPM TAPIS job

## Install DesignSafe API (dapi)

In [None]:
# Dapi installation
!pip uninstall dapi --yes

!pip install dapi --user --quiet

# Install the latest development version of dapi from GitHub
# !pip install git+https://github.com/DesignSafe-CI/dapi.git@dev --user --quiet

# Install editable local version of dapi
# !pip install -e ../

Found existing installation: dapi 1.0.0
Uninstalling dapi-1.0.0:
  Successfully uninstalled dapi-1.0.0
Obtaining file:///Users/krishna/dev/DesignSafe/dapi
  Installing build dependencies ... [?25ldone
[?25h  Checking if build backend supports build_editable ... [?25ldone
[?25h  Getting requirements to build editable ... [?25ldone
[?25h  Preparing editable metadata (pyproject.toml) ... [?25ldone
Building wheels for collected packages: dapi
  Building editable for dapi (pyproject.toml) ... [?25ldone
[?25h  Created wheel for dapi: filename=dapi-1.0.0-py3-none-any.whl size=3826 sha256=25a5c9308663e0078f3b0943613ad978d718bb5f64c36fd23162943a7d68b56f
  Stored in directory: /private/var/folders/w8/xz590jyd7r36zmxcspgzj3z40000gn/T/pip-ephem-wheel-cache-sjzyd4es/wheels/98/df/91/ed70fe2dca11c3c6e5b6e8e6eef18c373a119d095037f892a3
Successfully built dapi
Installing collected packages: dapi
Successfully installed dapi-1.0.0

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release 

In [2]:
import os

# Import only DSClient and exceptions needed at top level
from dapi import (
    DSClient,
    SubmittedJob,
    interpret_job_status,  # Import new function
    AppDiscoveryError,
    FileOperationError,
    JobSubmissionError,
    SystemInfoError,
    JobMonitorError,
    # Optionally import status constants if you want to check against them explicitly
    STATUS_TIMEOUT,
    STATUS_UNKNOWN,
    TAPIS_TERMINAL_STATES,
)
import json
from datetime import datetime
from dataclasses import asdict
import pandas as pd
import tqdm as notebook_tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
try:
    print("Initializing DSClient...")
    ds = DSClient()
    print("DSClient initialized.")
except Exception as e:
    print(f"Initialization failed: {e}")
    raise SystemExit("Stopping notebook due to client initialization failure.")

Initializing DSClient...
Authentication successful.
DatabaseAccessor initialized. Connections will be created on first access.
DSClient initialized.


In [4]:
ds_path: str = "/MyData/mpm-benchmarks/2d/uniaxial_stress/"
input_filename: str = "mpm.json"
max_job_minutes: int = 10
# queue: str = "skx" # Example override - only if needed and valid
# tacc_allocation: str = "BCS20003"
tacc_allocation: str = "ASC25049"
app_id_to_use = "mpm-s3"

In [5]:
try:
    input_uri = ds.files.translate_path_to_uri(ds_path)
    print(f"Input Directory Tapis URI: {input_uri}")
except Exception as e:
    print(f"Error translating path '{ds_path}': {e}")
    raise SystemExit("Stopping notebook due to path translation error.")

Translated '/MyData/mpm-benchmarks/2d/uniaxial_stress/' to 'tapis://designsafe.storage.default/kks32/mpm-benchmarks/2d/uniaxial_stress/' using t.username
Input Directory Tapis URI: tapis://designsafe.storage.default/kks32/mpm-benchmarks/2d/uniaxial_stress/


In [6]:
try:
    print("\nGenerating job request dictionary...")
    job_dict = ds.jobs.generate_request(
        app_id=app_id_to_use,
        input_dir_uri=input_uri,
        script_filename=input_filename,
        max_minutes=max_job_minutes,
        allocation=tacc_allocation,
    )
    print("\n--- Generated Job Request Dictionary ---")
    print(json.dumps(job_dict, indent=2, default=str))
    print("---------------------------------------")
except (AppDiscoveryError, ValueError, JobSubmissionError) as e:
    print(f"Error generating job request: {e}")
    raise SystemExit("Stopping notebook due to job request generation error.")
except Exception as e:
    print(f"An unexpected error occurred during job request generation: {e}")
    raise SystemExit("Stopping notebook due to unexpected generation error.")


Generating job request dictionary...
Generating job request for app 'mpm-s3'...
Using App Details: mpm-s3 v1.0
Placing script 'mpm.json' in appArgs: 'Input Script'
Adding allocation: ASC25049
Job request dictionary generated successfully.

--- Generated Job Request Dictionary ---
{
  "name": "mpm-s3-20250604_081741",
  "appId": "mpm-s3",
  "appVersion": "1.0",
  "description": "Material Point Method (MPM) is a particle based method that represents the material as a collection of material points, and their deformations are determined by Newton\u2019s laws of motion.",
  "execSystemId": "stampede3",
  "archiveSystemId": "stampede3",
  "archiveOnAppError": true,
  "execSystemLogicalQueue": "skx-dev",
  "nodeCount": 1,
  "coresPerNode": 48,
  "maxMinutes": 10,
  "memoryMB": 192000,
  "isMpi": false,
  "tags": [],
  "fileInputs": [
    {
      "name": "Input Directory",
      "sourceUrl": "tapis://designsafe.storage.default/kks32/mpm-benchmarks/2d/uniaxial_stress/",
      "autoMountLocal":

In [7]:
# At this point, the user can inspect and modify job_dict if needed.
# For example:
print("Modifying job request dictionary...")
job_dict["nodeCount"] = 1
job_dict["coresPerNode"] = 1
# job_dict["execSystemLogicalQueue"] = "development"

print(json.dumps(job_dict, indent=2, default=str))

Modifying job request dictionary...
{
  "name": "mpm-s3-20250604_081741",
  "appId": "mpm-s3",
  "appVersion": "1.0",
  "description": "Material Point Method (MPM) is a particle based method that represents the material as a collection of material points, and their deformations are determined by Newton\u2019s laws of motion.",
  "execSystemId": "stampede3",
  "archiveSystemId": "stampede3",
  "archiveOnAppError": true,
  "execSystemLogicalQueue": "skx-dev",
  "nodeCount": 1,
  "coresPerNode": 1,
  "maxMinutes": 10,
  "memoryMB": 192000,
  "isMpi": false,
  "tags": [],
  "fileInputs": [
    {
      "name": "Input Directory",
      "sourceUrl": "tapis://designsafe.storage.default/kks32/mpm-benchmarks/2d/uniaxial_stress/",
      "autoMountLocal": true,
      "targetPath": "inputDirectory"
    }
  ],
  "parameterSet": {
    "appArgs": [
      {
        "name": "Input Script",
        "arg": "mpm.json"
      }
    ],
    "schedulerOptions": [
      {
        "name": "TACC Allocation",
     

In [8]:
if "job_dict" not in locals():
    print("Error: job_dict not found.")
    raise SystemExit("Stopping notebook.")
try:
    print("\nSubmitting the job request dictionary...")
    submitted_job = ds.jobs.submit_request(job_dict)
    print(f"Job Submitted Successfully!")
    print(f"Job UUID: {submitted_job.uuid}")
except JobSubmissionError as e:
    print(f"Job submission failed: {e}")
    print("\n--- Failed Job Request ---")
    print(json.dumps(job_dict, indent=2, default=str))
    print("--------------------------")
    raise SystemExit("Stopping notebook due to job submission error.")
except Exception as e:
    print(f"An unexpected error occurred during job submission: {e}")
    raise SystemExit("Stopping notebook due to unexpected submission error.")


Submitting the job request dictionary...

--- Submitting Tapis Job Request ---
{
  "name": "mpm-s3-20250604_081741",
  "appId": "mpm-s3",
  "appVersion": "1.0",
  "description": "Material Point Method (MPM) is a particle based method that represents the material as a collection of material points, and their deformations are determined by Newton\u2019s laws of motion.",
  "execSystemId": "stampede3",
  "archiveSystemId": "stampede3",
  "archiveOnAppError": true,
  "execSystemLogicalQueue": "skx-dev",
  "nodeCount": 1,
  "coresPerNode": 1,
  "maxMinutes": 10,
  "memoryMB": 192000,
  "isMpi": false,
  "tags": [],
  "fileInputs": [
    {
      "name": "Input Directory",
      "sourceUrl": "tapis://designsafe.storage.default/kks32/mpm-benchmarks/2d/uniaxial_stress/",
      "autoMountLocal": true,
      "targetPath": "inputDirectory"
    }
  ],
  "parameterSet": {
    "appArgs": [
      {
        "name": "Input Script",
        "arg": "mpm.json"
      }
    ],
    "schedulerOptions": [
    

In [9]:
if "submitted_job" not in locals():
    print("Error: submitted_job not found.")
    raise SystemExit("Stopping notebook.")

# Call monitor - exceptions are handled inside now, returns status string
final_status = submitted_job.monitor(interval=15)  # Use 15s interval

print(f"\nJob {submitted_job.uuid} monitoring finished.")


Monitoring Job: 52f48eaf-b7d6-4964-a97b-a4b32a6aaeb3-007


Monitoring job:   0%|                                                   | 0/40 [00:00<?, ? checks/s]

	Status: RUNNING


Monitoring job (Status: ARCHIVING):   8%|█▋                     | 3/40 [00:30<06:42, 10.88s/ checks]

	Status: ARCHIVING


Monitoring job (Status: ARCHIVING): 100%|██████████████████████| 40/40 [01:01<00:00,  1.54s/ checks]

	Status: FINISHED

Job 52f48eaf-b7d6-4964-a97b-a4b32a6aaeb3-007 monitoring finished.





In [10]:
print("\n--- Job Outcome ---")
ds.jobs.interpret_status(final_status, submitted_job.uuid)
print("-------------------")


--- Job Outcome ---
Job 52f48eaf-b7d6-4964-a97b-a4b32a6aaeb3-007 completed successfully.
-------------------


In [11]:
# Check against known good terminal states or the specific success state
if final_status in ["FINISHED", "FAILED"]:  # Or just: if final_status == "FINISHED":
    print(f"\nAttempting to display runtime summary...")
    try:
        submitted_job.print_runtime_summary(verbose=False)
    except Exception as e:
        print(f"Could not display runtime summary: {e}")
else:
    print(f"\nSkipping runtime summary because job ended with status: {final_status}.")


Attempting to display runtime summary...

Runtime Summary
---------------
QUEUED  time: 00:00:01
RUNNING time: 00:00:24
TOTAL   time: 00:02:05
---------------


In [12]:
if "ds" in locals() and "submitted_job" in locals():  # Check if ds and a job exist
    job_uuid_to_check = submitted_job.uuid  # Or any other job UUID string
    try:
        print(
            f"\nFetching status for job {job_uuid_to_check} using ds.jobs.get_status()..."
        )
        current_status = ds.jobs.get_status(job_uuid_to_check)
        print(f"Status of job {job_uuid_to_check}: {current_status}")
    except JobMonitorError as e:
        print(f"Error getting job status: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
else:
    print(
        "DSClient ('ds') or submitted_job not initialized. Cannot demonstrate ds.jobs.get_status()."
    )


Fetching status for job 52f48eaf-b7d6-4964-a97b-a4b32a6aaeb3-007 using ds.jobs.get_status()...
Status of job 52f48eaf-b7d6-4964-a97b-a4b32a6aaeb3-007: FINISHED


In [13]:
# Display Last Job Status Message
if "submitted_job" in locals():
    print(f"\n--- Last Status Message for Job {submitted_job.uuid} ---")
    last_msg = submitted_job.last_message
    if last_msg:
        print(f"Message: {last_msg}")
    else:
        print("No last status message available for this job.")
    print("-------------------------------------------------")
else:
    print("\nSkipping last status message display (job not submitted).")


--- Last Status Message for Job 52f48eaf-b7d6-4964-a97b-a4b32a6aaeb3-007 ---
Message: Setting job status to FINISHED.
-------------------------------------------------


In [14]:
# Display job output if in a terminal state
if "submitted_job" in locals() and final_status in submitted_job.TERMINAL_STATES:
    print(
        f"\n--- Job Output/Error for {submitted_job.uuid} (Status: {final_status}) ---"
    )
    max_output_lines = 50  # Number of lines to display from the end of the files

    # Attempt to get standard output
    try:
        stdout_content = submitted_job.get_output_content(
            "tapisjob.out",
            max_lines=max_output_lines,
            missing_ok=False,  # .out should ideally always exist
        )
        if stdout_content is not None:
            print(f"\n--- Last {max_output_lines} lines of tapisjob.out ---")
            print(stdout_content)
            print("------------------------------------")
        else:
            print("\n[INFO] tapisjob.out was not found or is empty.")
    except FileOperationError as e:
        print(f"\n[ERROR] Could not retrieve tapisjob.out: {e}")
    except Exception as e:
        print(f"\n[ERROR] Unexpected error retrieving tapisjob.out: {e}")

    # If job failed, also try to get standard error (tapisjob.err might not always be separate)
    if final_status in [
        "FAILED",
        "ARCHIVING_FAILED",
    ]:  # Add other failure states if needed
        try:
            stderr_content = submitted_job.get_output_content(
                "tapisjob.err",  # This file might not exist if stderr is redirected to .out
                max_lines=max_output_lines,
                missing_ok=True,  # Okay if .err doesn't exist
            )
            if stderr_content is not None:  # Only print if found and not empty
                print(f"\n--- Last {max_output_lines} lines of tapisjob.err ---")
                print(stderr_content)
                print("------------------------------------")
            else:
                print(
                    "\n[INFO] tapisjob.err was not found (this is common if errors are in tapisjob.out)."
                )
        except FileOperationError as e:
            print(f"\n[ERROR] Could not retrieve tapisjob.err: {e}")
        except Exception as e:
            print(f"\n[ERROR] Unexpected error retrieving tapisjob.err: {e}")
    print("----------------------------------------------------")
else:
    print(
        "\nSkipping job output display (job not submitted or not in a terminal state)."
    )


--- Job Output/Error for 52f48eaf-b7d6-4964-a97b-a4b32a6aaeb3-007 (Status: FINISHED) ---
Attempting to fetch content of 'tapisjob.out' from job archive...
Returning last 50 lines of 'tapisjob.out'.

--- Last 50 lines of tapisjob.out ---
TACC:  Starting parallel tasks... 
[2025-06-04 08:19:09.625] [main] [info] git revision: 18f353fee2ac6735e4f53c9498e7976746b04055
[2025-06-04 08:19:09.648] [MPMExplicit] [info] MPM analysis type MPMExplicit2D
[2025-06-04 08:19:09.650] [MPMExplicit] [info] Rank 0 Read nodes: 1 ms
[2025-06-04 08:19:09.655] [MPMExplicit] [info] Rank 0 Read cells: 1 ms
[2025-06-04 08:19:09.657] [MPMExplicit] [info] Rank 0 Generate particles: 1 ms
[2025-06-04 08:19:09.665] [MPMExplicit] [info] Rank 0 Locate particles: 8 ms
[2025-06-04 08:19:09.665] [MPMExplicit] [info] Rank 0 Read volume, velocity and stresses: 0 ms
[2025-06-04 08:19:09.666] [MPMExplicit] [info] Rank 0 Create particle sets: 0 ms
[2025-06-04 08:19:09.666] [MPMExplicit] [info] Step: 0 of 10.

[2025-06-04 08:1

In [15]:
# if final_status in TAPIS_TERMINAL_STATES and final_status != STATUS_UNKNOWN: # Check if it's a known end state
print(f"\nAttempting to access archive information...")
try:
    archive_uri = submitted_job.archive_uri
    if archive_uri:
        print(f"Job Archive Tapis URI: {archive_uri}")
        print("\nListing archive contents (root):")
        outputs = ds.files.list(archive_uri)
        if outputs:
            for item in outputs:
                print(
                    f"- {item.name} (Type: {item.type}, Size: {item.size} bytes, Modified: {item.lastModified})"
                )
        else:
            print("No files found in the archive root directory.")
    else:
        print("Archive URI not available for this job.")
except FileOperationError as e:
    print(f"Could not list archive files: {e}")
except Exception as e:
    print(f"An unexpected error occurred while accessing archive information: {e}")


Attempting to access archive information...
Job Archive Tapis URI: tapis://stampede3/work2/05873/kks32/stampede3/tapis-jobs-archive/2025-06-04Z/mpm-s3-20250604_081741-52f48eaf-b7d6-4964-a97b-a4b32a6aaeb3-007

Listing archive contents (root):
Listing files in system 'stampede3' at path 'work2/05873/kks32/stampede3/tapis-jobs-archive/2025-06-04Z/mpm-s3-20250604_081741-52f48eaf-b7d6-4964-a97b-a4b32a6aaeb3-007'...
Found 5 items.
- inputDirectory (Type: dir, Size: 4096 bytes, Modified: 2025-06-04T13:19:50Z)
- tapisjob.env (Type: file, Size: 1518 bytes, Modified: 2025-06-04T13:19:49Z)
- tapisjob.out (Type: file, Size: 3973 bytes, Modified: 2025-06-04T13:19:49Z)
- tapisjob.sh (Type: file, Size: 1205 bytes, Modified: 2025-06-04T13:19:50Z)
- tapisjob_app.sh (Type: file, Size: 263 bytes, Modified: 2025-06-04T13:19:49Z)


## Apps Access

In [16]:
# Find all apps (less verbose)
all_apps = ds.apps.find("", verbose=False)
print(f"Found {len(all_apps)} total apps.")

Found 92 total apps.


In [17]:
# Find MPM apps specifically
mpm_apps = ds.apps.find("mpm", verbose=True)


Found 2 matching apps:
- mpm (Version: 1.1.0, Owner: wma_prtl)
- mpm-s3 (Version: 1.0, Owner: wma_prtl)



In [18]:
# Get details for the specific MPM app we want to use
app_id_to_use = "opensees-express"
app_details = ds.apps.get_details(app_id_to_use, verbose=True)

if not app_details:
    raise SystemExit(
        f"Could not find details for app '{app_id_to_use}'. Please check the app ID."
    )
# Print the app details

print(f"App Description: {app_details}")


App Details:
  ID: opensees-express
  Version: latest
  Owner: wma_prtl
  Execution System: wma-exec-01
  Description: OpenSees-EXPRESS provides users with a sequential OpenSees interpreter. It is ideal to run small sequential scripts on DesignSafe resources freeing up your own machine.
App Description: 
containerImage: tapis://cloud.data/corral/tacc/aci/CEP/applications/v3/opensees/latest/OpenSees-EXPRESS/opensees_express.zip
created: 2025-02-20T18:41:03.661272Z
deleted: False
description: OpenSees-EXPRESS provides users with a sequential OpenSees interpreter. It is ideal to run small sequential scripts on DesignSafe resources freeing up your own machine.
enabled: True
id: opensees-express
isPublic: True
jobAttributes: 
archiveOnAppError: True
archiveSystemDir: /tmp/${JobOwner}/tapis-jobs-archive/${JobCreateDate}/${JobName}-${JobUUID}
archiveSystemId: cloud.data
cmdPrefix: None
coresPerNode: 1
description: None
dtnSystemInputDir: !tapis_not_set
dtnSystemOutputDir: !tapis_not_set
dyna

In [19]:
# --- Example: List Queues for Frontera ---
try:
    print("\n--- System Queue Information ---")
    frontera_queues = ds.systems.list_queues("frontera")
    # You can now inspect the 'frontera_queues' list
    # Example: Find if 'development' queue exists
    dev_queue_exists = any(q.name == "development" for q in frontera_queues)
    print(f"Does 'development' queue exist on Frontera? {dev_queue_exists}")

    # Example: List queues for a non-existent system
    ds.systems.list_queues("non-existent-system")  # This would raise SystemInfoError

except SystemInfoError as e:
    print(f"Error getting system info: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")
print("-----------------------------")


--- System Queue Information ---

Fetching queue information for system 'frontera'...
Found 10 batch logical queues for system 'frontera':
  - Name: flex (HPC Queue: flex, Max Jobs: -1, Max User Jobs: N/A, Max Mins: 2880, Max Nodes: 128)
  - Name: development (HPC Queue: development, Max Jobs: -1, Max User Jobs: N/A, Max Mins: 120, Max Nodes: 40)
  - Name: normal (HPC Queue: normal, Max Jobs: -1, Max User Jobs: N/A, Max Mins: 2880, Max Nodes: 512)
  - Name: large (HPC Queue: large, Max Jobs: -1, Max User Jobs: N/A, Max Mins: 2880, Max Nodes: 2048)
  - Name: debug (HPC Queue: debug, Max Jobs: -1, Max User Jobs: N/A, Max Mins: 2880, Max Nodes: 8368)
  - Name: rtx (HPC Queue: rtx, Max Jobs: -1, Max User Jobs: N/A, Max Mins: 2880, Max Nodes: 22)
  - Name: rtx-dev (HPC Queue: rtx-dev, Max Jobs: -1, Max User Jobs: N/A, Max Mins: 120, Max Nodes: 2)
  - Name: nvdimm (HPC Queue: nvdimm, Max Jobs: -1, Max User Jobs: N/A, Max Mins: 120, Max Nodes: 4)
  - Name: small (HPC Queue: small, Max Jobs: 

## Verify TAPIS paths

In [20]:
# --- Translate Path with Verification ---
ds_path: str = "/MyData/mpm-benchmarks/2d/uniaxial_stress/"
ds_path_nonexistent: str = "/MyData/this/path/does/not/exist/"

try:
    # Translate and verify the existing path
    print(f"\nTranslating and verifying path: {ds_path}")
    input_uri = ds.files.translate_path_to_uri(ds_path, verify_exists=True)
    print(f"Input Directory Tapis URI (verified): {input_uri}")

    # Example: Try translating a non-existent path with verification (will raise error)
    print(f"\nTranslating and verifying non-existent path: {ds_path_nonexistent}")
    input_uri_bad = ds.files.translate_path_to_uri(
        ds_path_nonexistent, verify_exists=True
    )
    print(f"This line should not be reached.")

except FileOperationError as e:
    print(f"Error during path translation/verification: {e}")
    # Decide how to handle the error (e.g., stop notebook, use default, etc.)
    # For this example, we'll stop if verification fails.
    raise SystemExit("Stopping notebook due to path verification error.")
except Exception as e:
    print(f"An unexpected error occurred during path translation: {e}")
    raise SystemExit("Stopping notebook due to unexpected path translation error.")


Translating and verifying path: /MyData/mpm-benchmarks/2d/uniaxial_stress/
Translated '/MyData/mpm-benchmarks/2d/uniaxial_stress/' to 'tapis://designsafe.storage.default/kks32/mpm-benchmarks/2d/uniaxial_stress/' using t.username
Verifying existence of translated path: tapis://designsafe.storage.default/kks32/mpm-benchmarks/2d/uniaxial_stress/
Checking system 'designsafe.storage.default' for path 'kks32/mpm-benchmarks/2d/uniaxial_stress/'...
Verification successful: Path exists.
Input Directory Tapis URI (verified): tapis://designsafe.storage.default/kks32/mpm-benchmarks/2d/uniaxial_stress/

Translating and verifying non-existent path: /MyData/this/path/does/not/exist/
Translated '/MyData/this/path/does/not/exist/' to 'tapis://designsafe.storage.default/kks32/this/path/does/not/exist/' using t.username
Verifying existence of translated path: tapis://designsafe.storage.default/kks32/this/path/does/not/exist/
Checking system 'designsafe.storage.default' for path 'kks32/this/path/does/not

SystemExit: Stopping notebook due to path verification error.

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
