# Expose and Sumi API Demonstration

This notebook demonstrates how to interact with the Kodosumi Expose and Sumi APIs.

**Prerequisites:**
- Kodosumi server running at `http://localhost:3370`
- Valid admin credentials

In [493]:
import httpx
import time
from pprint import pprint

In [494]:
# Configuration
BASE_URL = "http://localhost:3370"

USERNAME = "admin"
PASSWORD = "admin"

## Login

First, we need to authenticate with the Kodosumi server to obtain a session.

In [495]:
# Create HTTP client with session support
client = httpx.Client(base_url=BASE_URL, follow_redirects=True, timeout=30.0)

# Perform login
login_resp = client.post(
    "/login",
    data={"name": USERNAME, "password": PASSWORD},
    headers={"Content-Type": "application/x-www-form-urlencoded"}
)

print(f"Login status: {login_resp.status_code}")
assert login_resp.status_code == 200, f"Login failed: {login_resp.text}"
print("Login successful!")

Login status: 200
Login successful!


## Ensure Expose is Empty

> **"Be careful with that axe, Eugene"**
>
> The following cells will **DELETE ALL** expose items from your database.
> This is a destructive operation. Only proceed if you understand what you're doing.

In [496]:
# List current exposes
resp = client.get("/expose/")
print(f"Status: {resp.status_code}")

exposes = resp.json()
print(f"Found {len(exposes)} expose(s):")
for expose in exposes:
    print(f"  - {expose['name']} (state: {expose.get('state', 'N/A')})")

Status: 200
Found 4 expose(s):
  - form-example (state: RUNNING)
  - jkx (state: RUNNING)
  - prime (state: RUNNING)
  - sumi-test (state: RUNNING)


In [497]:
# Delete all exposes (if any exist)
if exposes:
    print(f"Deleting {len(exposes)} expose(s)...")
    for expose in exposes:
        name = expose['name']
        del_resp = client.delete(f"/expose/{name}")
        if del_resp.status_code == 204:
            print(f"  Deleted: {name}")
        else:
            print(f"  Failed to delete {name}: {del_resp.status_code} - {del_resp.text}")
    print("Done.")
else:
    print("No exposes to delete.")

Deleting 4 expose(s)...
  Deleted: form-example
  Deleted: jkx
  Deleted: prime
  Deleted: sumi-test
Done.


In [498]:
# Verify expose list is empty
resp = client.get("/expose/")
exposes = resp.json()

assert len(exposes) == 0, f"Expected 0 exposes, got {len(exposes)}"
print("Verified: Expose list is empty.")

Verified: Expose list is empty.


### shutdown

In [499]:
# Shutdown Ray Serve (streaming response)
with client.stream("DELETE", "/boot/") as resp:
    print(f"Shutdown status: {resp.status_code}")
    for line in resp.iter_lines():
        if line:
            print(line)

Shutdown status: 200
▶ [DEPLOY] Shutting down Ray Serve
  • [serve] Running serve shutdown -y
  serve shutdown → success
▶ [UPDATE] Updating expose states
  • [Clearing flow registry] PUT /flow/register
  Flow registry refreshed → 10 flows remaining
✔ Shutdown complete


## Create Exposes

Now we'll create 4 expose items via `POST /expose/`.

In [500]:
# 1) Create expose: jkx
resp = client.post("/expose/", json={
    "name": "jkx",
    "bootstrap": "import_path: tests.test_unwrap:fast_app2"
})
print(f"jkx: {resp.status_code}")
pprint(resp.json())

jkx: 201
{'bootstrap': 'import_path: tests.test_unwrap:fast_app2',
 'created': '2025-12-10T08:02:38.992394',
 'display': None,
 'enabled': True,
 'etag': '1765350158.992394',
 'flow_stats': '0/0',
 'heartbeat': 1765350158.967853,
 'meta': None,
 'name': 'jkx',
 'needs_reboot': False,
 'network': None,
 'stale': False,
 'state': 'DEAD',
 'updated': '2025-12-10T08:02:38.992394'}


In [501]:
# 2) Create expose: form-example
resp = client.post("/expose/", json={
    "name": "form-example",
    "bootstrap": "import_path: tests.form_elements_demo:fast_app"
})
print(f"form-example: {resp.status_code}")
pprint(resp.json())

form-example: 201
{'bootstrap': 'import_path: tests.form_elements_demo:fast_app',
 'created': '2025-12-10T08:02:39.053048',
 'display': None,
 'enabled': True,
 'etag': '1765350159.0530481',
 'flow_stats': '0/0',
 'heartbeat': 1765350159.029301,
 'meta': None,
 'name': 'form-example',
 'needs_reboot': False,
 'network': None,
 'stale': False,
 'state': 'DEAD',
 'updated': '2025-12-10T08:02:39.053048'}


In [502]:
# 3) Create expose: prime (with runtime_env and deployments)
prime_bootstrap = """import_path: kodosumi_examples.prime.app:fast_app
runtime_env:
  py_modules:
    - https://github.com//masumi-network/kodosumi-examples/archive/c1fc02b73e8fc3a1298c26b1597627098ecc7510.zip
deployments:
  - name: PrimeDistribution
    num_replicas: 1
    ray_actor_options:
      num_cpus: 0.1
"""

resp = client.post("/expose/", json={
    "name": "prime",
    "bootstrap": prime_bootstrap
})
print(f"prime: {resp.status_code}")
pprint(resp.json())

prime: 201
{'bootstrap': 'import_path: kodosumi_examples.prime.app:fast_app\n'
              'runtime_env:\n'
              '  py_modules:\n'
              '    - '
              'https://github.com//masumi-network/kodosumi-examples/archive/c1fc02b73e8fc3a1298c26b1597627098ecc7510.zip\n'
              'deployments:\n'
              '  - name: PrimeDistribution\n'
              '    num_replicas: 1\n'
              '    ray_actor_options:\n'
              '      num_cpus: 0.1\n',
 'created': '2025-12-10T08:02:39.087386',
 'display': None,
 'enabled': True,
 'etag': '1765350159.0873861',
 'flow_stats': '0/0',
 'heartbeat': 1765350159.065103,
 'meta': None,
 'name': 'prime',
 'needs_reboot': False,
 'network': None,
 'stale': False,
 'state': 'DEAD',
 'updated': '2025-12-10T08:02:39.087386'}


In [503]:
# 4) Create expose: sumi-test
resp = client.post("/expose/", json={
    "name": "sumi-test",
    "bootstrap": "import_path: tests.test_masumi:fast_app"
})
print(f"sumi-test: {resp.status_code}")
pprint(resp.json())

sumi-test: 201
{'bootstrap': 'import_path: tests.test_masumi:fast_app',
 'created': '2025-12-10T08:02:39.125288',
 'display': None,
 'enabled': True,
 'etag': '1765350159.125288',
 'flow_stats': '0/0',
 'heartbeat': 1765350159.098794,
 'meta': None,
 'name': 'sumi-test',
 'needs_reboot': False,
 'network': None,
 'stale': False,
 'state': 'DEAD',
 'updated': '2025-12-10T08:02:39.125288'}


In [504]:
# Verify: List all exposes
resp = client.get("/expose/")
exposes = resp.json()

print(f"Created {len(exposes)} expose(s):")
for expose in exposes:
    print(f"  - {expose['name']} (state: {expose.get('state', 'N/A')})")

Created 4 expose(s):
  - form-example (state: DEAD)
  - jkx (state: DEAD)
  - prime (state: DEAD)
  - sumi-test (state: DEAD)


## Update Exposes

Update `form-example` and `jkx` with display names and descriptions.

In [505]:
# Update form-example with display name
resp = client.post("/expose/", json={
    "name": "form-example",
    "display": "Form Elements Demo",
    "bootstrap": "import_path: tests.form_elements_demo:fast_app",
    "meta": [{
        "url": "/form-example/",
        "data": """display: Form Elements Demo
description: Showcases all available form input components
tags:
  - demo
  - forms
"""
    }]
})
print(f"form-example update: {resp.status_code}")
pprint(resp.json())

form-example update: 201
{'bootstrap': 'import_path: tests.form_elements_demo:fast_app',
 'created': '2025-12-10T08:02:39.053048',
 'display': 'Form Elements Demo',
 'enabled': True,
 'etag': '1765350161.1186728',
 'flow_stats': '0/0',
 'heartbeat': 1765350161.096579,
 'meta': [{'data': 'display: Form Elements Demo\n'
                   'description: Showcases all available form input '
                   'components\n'
                   'tags:\n'
                   '  - demo\n'
                   '  - forms\n',
           'enabled': True,
           'heartbeat': None,
           'state': None,
           'url': '/form-example/'}],
 'name': 'form-example',
 'needs_reboot': False,
 'network': None,
 'stale': False,
 'state': 'DEAD',
 'updated': '2025-12-10T08:02:41.118673'}


In [506]:
# Update jkx with display name
resp = client.post("/expose/", json={
    "name": "jkx",
    "display": "jKx Demo",
    "bootstrap": "import_path: tests.test_unwrap:fast_app2",
    "meta": [{
        "url": "/jkx/",
        "data": """display: JKX Unwrap Test
description: Tests response unwrapping functionality
tags:
  - test
  - unwrap
"""
    }]
})
print(f"jkx update: {resp.status_code}")
pprint(resp.json())

jkx update: 201
{'bootstrap': 'import_path: tests.test_unwrap:fast_app2',
 'created': '2025-12-10T08:02:38.992394',
 'display': 'jKx Demo',
 'enabled': True,
 'etag': '1765350161.9159',
 'flow_stats': '0/0',
 'heartbeat': 1765350161.8874722,
 'meta': [{'data': 'display: JKX Unwrap Test\n'
                   'description: Tests response unwrapping functionality\n'
                   'tags:\n'
                   '  - test\n'
                   '  - unwrap\n',
           'enabled': True,
           'heartbeat': None,
           'state': None,
           'url': '/jkx/'}],
 'name': 'jkx',
 'needs_reboot': False,
 'network': None,
 'stale': False,
 'state': 'DEAD',
 'updated': '2025-12-10T08:02:41.915900'}


## Sumi API

Verify that `/sumi` returns an empty list before boot, then boot and verify flows are available.

In [507]:
# Verify /sumi returns empty list (before boot)
resp = client.get("/sumi")
data = resp.json()

print(f"Status: {resp.status_code}")
print(f"Items: {len(data.get('items', []))}")
assert len(data.get('items', [])) == 0, f"Expected 0 items, got {len(data.get('items', []))}"
print("Verified: /sumi returns empty list (no boot yet)")

Status: 200
Items: 0
Verified: /sumi returns empty list (no boot yet)


In [508]:
# Boot Ray Serve (streaming response)
with client.stream("POST", "/boot/") as resp:
    print(f"Boot status: {resp.status_code}")
    for line in resp.iter_lines():
        if line:
            print(line)

Boot status: 201
ℹ Boot started at 2025-12-10 08:02:44
▶ [DEPLOY] Starting Ray Serve deployment
  • [serve_config.yaml] Loading global configuration → OK
  • [form-example] Prepared deployment config → route=/form-example
  • [jkx] Prepared deployment config → route=/jkx
  • [prime] Prepared deployment config → route=/prime
  • [sumi-test] Prepared deployment config → route=/sumi-test
  • [/var/folders/n9/f3p_gzsj7vjfgfw70yqksywcn5rfs9/T/serve_deploy_8r25z65i.yaml] Created merged deployment config
  • [serve] Running serve deploy (4 applications)
  serve deploy command → success
✓ Deployment initiated (4 applications)
▶ [HEALTH] Waiting for deployments to complete (timeout: 1800s)
  • [form-example] Status: DEPLOYING
  • [jkx] Status: DEPLOYING
  • [prime] Status: DEPLOYING
  • [sumi-test] Status: DEPLOYING
  [form-example] Reached final state → RUNNING
  [jkx] Reached final state → RUNNING
  [prime] Reached final state → RUNNING
  [sumi-test] Reached final state → RUNNING
✓ All deploy

In [509]:
# Verify /sumi returns flows after boot
resp = client.get("/sumi")
data = resp.json()

print(f"Status: {resp.status_code}")
items = data.get('items', [])
print(f"Found {len(items)} flow(s):")
for item in items:
    print(f"  - {item.get('display', 'N/A')} ({item.get('api_url', 'N/A')})")

assert len(items) > 0, "Expected flows after boot"
print(f"\nVerified: /sumi returns {len(items)} flows after boot")

Status: 200
Found 10 flow(s):
  - Complete Registration Form (http://localhost:3370/sumi/form-example/complete-form)
  - Date/Time Input Types Demo (http://localhost:3370/sumi/form-example/datetime-inputs)
  - Numeric Input Types Demo (http://localhost:3370/sumi/form-example/numeric-inputs)
  - Selection Input Types Demo (http://localhost:3370/sumi/form-example/selection-inputs)
  - Special Input Types Demo (http://localhost:3370/sumi/form-example/special-inputs)
  - Text Input Types Demo (http://localhost:3370/sumi/form-example/text-inputs)
  - jKx Knowledge Graph (http://localhost:3370/sumi/jkx/serve)
  - Prime Gap Distribution (http://localhost:3370/sumi/prime)
  - Prime Gap Distribution (async) (http://localhost:3370/sumi/prime/async)
  - Masumi Two-Stage Service (http://localhost:3370/sumi/sumi-test)

Verified: /sumi returns 10 flows after boot


## Text Inputs Demo

Demonstrate starting a job on the `text-inputs` flow and checking its status.

In [510]:
# Get input schema for text-inputs flow
resp = client.get("/sumi/form-example/text-inputs/input_schema")

print(f"Status: {resp.status_code}")
print("Input Schema:")
pprint(resp.json())

Status: 200
Input Schema:
{'input_data': [{'data': {'placeholder': 'Enter your username (3-20 chars)'},
                 'id': 'username',
                 'name': 'Username',
                 'type': 'text',
                 'validations': {'max': '20'}},
                {'data': {'placeholder': 'Min 8 characters'},
                 'id': 'password',
                 'name': 'Password',
                 'type': 'password',
                 'validations': {'max': '100', 'min': '8'}},
                {'data': {'placeholder': 'user@example.com'},
                 'id': 'email',
                 'name': 'Email Address',
                 'type': 'email',
                 'validations': {'format': 'email'}},
                {'data': {'placeholder': 'https://example.com'},
                 'id': 'website',
                 'name': 'Website URL',
                 'type': 'url',
                 'validations': {'format': 'url', 'optional': True}},
                {'data': {'placeholder': '+1 (

### Validation failed (400)

In [511]:
# Start a job on text-inputs
resp = client.post("/sumi/form-example/text-inputs/start_job", json={})
print(f"Status: {resp.status_code}")
result = resp.json()
pprint(result)

Status: 400
{'detail': 'Validation failed for POST '
           "/sumi/form-example/text-inputs/start_job: [{'message': 'Field "
           "required', 'key': 'identifier_from_purchaser'}]",
 'error': 'ValidationException',
 'path': '/sumi/form-example/text-inputs/start_job',
 'status_code': 400}


### backend validation failed (201)

In [512]:
# Start a job on text-inputs
resp = client.post("/sumi/form-example/text-inputs/start_job", json={
    "identifier_from_purchaser": "demo-123"})

print(f"Status: {resp.status_code}")
result = resp.json()
pprint(result)

Status: 201
{'error': "username: ['Username is required']; password: ['Password is "
          "required']; email: ['Email is required']; _global_: []",
 'status': 'failed'}


In [513]:
# Start a job on text-inputs
resp = client.post("/sumi/form-example/text-inputs/start_job", json={
    "identifier_from_purchaser": "demo-123",
    "input_data": {
        "username": "DemoUser",
    }
})

print(f"Status: {resp.status_code}")
result = resp.json()
pprint(result)

Status: 201
{'error': "password: ['Password is required']; email: ['Email is required']; "
          '_global_: []',
 'status': 'failed'}


In [514]:
# Start a job on text-inputs
resp = client.post("/sumi/form-example/text-inputs/start_job", json={
    "identifier_from_purchaser": "demo-123",
    "input_data": {
        "username": "DemoUser",
        "password": "secretpass123"
    }
})

print(f"Status: {resp.status_code}")
result = resp.json()
pprint(result)

Status: 201
{'error': "email: ['Email is required']; _global_: []", 'status': 'failed'}


### Successful POST

In [515]:
# Start a job on text-inputs
resp = client.post("/sumi/form-example/text-inputs/start_job", json={
    "identifier_from_purchaser": "demo-123",
    "input_data": {
        "email": "user@example.com",
        "username": "DemoUser",
        "password": "secretpass123"
    }
})

print(f"Status: {resp.status_code}")
result = resp.json()
pprint(result)

# Save job_id for status check
job_id = result.get("job_id")
print(f"\nJob ID: {job_id}")

Status: 201
{'error': None,
 'identifier_from_purchaser': 'demo-123',
 'input_schema': None,
 'job_id': '69391b2a6dd6594791c3d54e',
 'result': None,
 'runtime': None,
 'started_at': 74188.086,
 'status': 'running',
 'updated_at': 74188.263}

Job ID: 69391b2a6dd6594791c3d54e


In [516]:
# Check job status
resp = client.get(f"/sumi/status/{job_id}")

print(f"Status: {resp.status_code}")
pprint(resp.json())

Status: 404
{'detail': "Job '69391b2a6dd6594791c3d54e' not found",
 'error': 'NotFoundException',
 'path': '/sumi/status/69391b2a6dd6594791c3d54e',
 'status_code': 404}


## Prime Demo

Demonstrate the `prime` flow which calculates prime gap distributions.

- **No input**: Should succeed with defaults (start=0, end=10000000, tasks=200)
- **Invalid input**: Non-integer values should fail
- **Streaming API**: Direct access via proxy (not part of Sumi protocol)

In [517]:
# Get input schema for prime flow
resp = client.get("/sumi/prime/input_schema")

print(f"Status: {resp.status_code}")
print("Input Schema:")
pprint(resp.json())

Status: 200
Input Schema:
{'input_data': [{'data': {'default': '0'},
                 'id': 'start',
                 'name': 'Start',
                 'type': 'text',
                 'validations': {'optional': True}},
                {'data': {'default': '10000000'},
                 'id': 'end',
                 'name': 'End',
                 'type': 'text',
                 'validations': {'optional': True}},
                {'data': {'default': '200'},
                 'id': 'tasks',
                 'name': 'Tasks',
                 'type': 'text',
                 'validations': {'optional': True}}],
 'input_groups': None}


### No input (error)

In [518]:
# Start job with no input_data (uses defaults)
resp = client.post("/sumi/prime/start_job", json={
    "identifier_from_purchaser": "prime-no-input"
})

print(f"Status: {resp.status_code}")
result = resp.json()
pprint(result)

prime_job_id = result.get("job_id")
print(f"\nJob ID: {prime_job_id}")

Status: 201
{'error': None,
 'identifier_from_purchaser': 'prime-no-input',
 'input_schema': None,
 'job_id': '69391b2f6dd659479940641a',
 'result': None,
 'runtime': None,
 'started_at': 74192.476,
 'status': 'running',
 'updated_at': 74192.608}

Job ID: 69391b2f6dd659479940641a


In [519]:
# Poll until job completes
max_attempts = 60
for i in range(max_attempts):
    resp = client.get(f"/sumi/status/{prime_job_id}")
    status_data = resp.json()
    job_status = status_data.get("status")

    print(f"[{i+1}/{max_attempts}] Status: {job_status}")

    if job_status in ("completed", "failed"):
        print("\nFinal result:")
        pprint(status_data)
        break

    time.sleep(1)
else:
    print("Job did not complete within timeout")

[1/60] Status: None
[2/60] Status: running
[3/60] Status: running
[4/60] Status: running
[5/60] Status: failed

Final result:
{'error': 'Traceback (most recent call last):\n'
          '  File "/Users/raum/Project/kodosumi/kodosumi/runner/main.py", line '
          '165, in run\n'
          '    await self.start()\n'
          '  File '
          '"/Users/raum/Project/kodosumi/.venv/lib/python3.12/site-packages/ray/util/tracing/tracing_helper.py", '
          'line 493, in _resume_span\n'
          '    return await method(self, *_args, **_kwargs)\n'
          '           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n'
          '  File "/Users/raum/Project/kodosumi/kodosumi/runner/main.py", line '
          '309, in start\n'
          '    result = await asyncio.get_event_loop().run_in_executor(\n'
          '             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n'
          '  File '
          '"/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/concurrent/futures/thre

### Invalid input (non-integer values - should fail)

In [520]:
# Start job with invalid input (non-integer values)
resp = client.post("/sumi/prime/start_job", json={
    "identifier_from_purchaser": "prime-invalid",
    "input_data": {
        "start": "not-a-number",
        "end": "also-invalid",
        "tasks": "abc"
    }
})

print(f"Status: {resp.status_code}")
result = resp.json()
pprint(result)

invalid_job_id = result.get("job_id")

Status: 201
{'error': None,
 'identifier_from_purchaser': 'prime-invalid',
 'input_schema': None,
 'job_id': '69391b366dd659479940641b',
 'result': None,
 'runtime': None,
 'started_at': 74200.151,
 'status': 'running',
 'updated_at': 74200.181}


In [521]:
# Check status of invalid job (should show error)
for i in range(10):
    resp = client.get(f"/sumi/status/{invalid_job_id}")
    status_data = resp.json()
    job_status = status_data.get("status")

    print(f"[{i+1}/10] Status: {job_status}")

    if job_status in ("completed", "failed"):
        print("\nFinal result:")
        pprint(status_data)
        break

    time.sleep(1)

[1/10] Status: None
[2/10] Status: None
[3/10] Status: running
[4/10] Status: running
[5/10] Status: failed

Final result:
{'error': 'Traceback (most recent call last):\n'
          '  File "/Users/raum/Project/kodosumi/kodosumi/runner/main.py", line '
          '165, in run\n'
          '    await self.start()\n'
          '  File '
          '"/Users/raum/Project/kodosumi/.venv/lib/python3.12/site-packages/ray/util/tracing/tracing_helper.py", '
          'line 493, in _resume_span\n'
          '    return await method(self, *_args, **_kwargs)\n'
          '           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n'
          '  File "/Users/raum/Project/kodosumi/kodosumi/runner/main.py", line '
          '309, in start\n'
          '    result = await asyncio.get_event_loop().run_in_executor(\n'
          '             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n'
          '  File '
          '"/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/concurrent/futures/thread.

### Successful launch (valid integers)

In [522]:
# Start job with valid input (smaller range for faster execution)
resp = client.post("/sumi/prime/start_job", json={
    "identifier_from_purchaser": "prime-valid",
    "input_data": {
        "start": "0",
        "end": "100000",
        "tasks": "10"
    }
})

print(f"Status: {resp.status_code}")
result = resp.json()
pprint(result)

valid_job_id = result.get("job_id")
print(f"\nJob ID: {valid_job_id}")

Status: 201
{'error': None,
 'identifier_from_purchaser': 'prime-valid',
 'input_schema': None,
 'job_id': '69391b3e6dd659479940641c',
 'result': None,
 'runtime': None,
 'started_at': 74207.647,
 'status': 'running',
 'updated_at': 74207.685}

Job ID: 69391b3e6dd659479940641c


In [523]:
# Poll until job completes
for i in range(60):
    resp = client.get(f"/sumi/status/{valid_job_id}")
    status_data = resp.json()
    job_status = status_data.get("status")

    print(f"[{i+1}/60] Status: {job_status}")

    if job_status in ("completed", "failed"):
        print("\nFinal result:")
        pprint(status_data)
        break

    time.sleep(1)
else:
    print("Job did not complete within timeout")

[1/60] Status: None
[2/60] Status: None
[3/60] Status: running
[4/60] Status: running
[5/60] Status: running
[6/60] Status: running
[7/60] Status: running
[8/60] Status: running
[9/60] Status: running
[10/60] Status: running
[11/60] Status: completed

Final result:
{'error': None,
 'identifier_from_purchaser': 'prime-valid',
 'input_schema': None,
 'job_id': '69391b3e6dd659479940641c',
 'result': {'dict': {'1': 1,
                     '10': 916,
                     '12': 964,
                     '14': 484,
                     '16': 339,
                     '18': 514,
                     '2': 1224,
                     '20': 238,
                     '22': 223,
                     '24': 206,
                     '26': 88,
                     '28': 98,
                     '30': 146,
                     '32': 32,
                     '34': 33,
                     '36': 54,
                     '38': 19,
                     '4': 1215,
                     '40': 28,
             

### Streaming API

Stream execution events in real-time using the outputs endpoint.
This uses the `valid_job_id` from the successful launch above.

In [524]:
# Start job with valid input (smaller range for faster execution)
resp = client.post("/sumi/prime/start_job", json={
    "identifier_from_purchaser": "prime-valid",
    "input_data": {
        "start": "0",
        "end": "100000",
        "tasks": "10"
    }
})

print(f"Status: {resp.status_code}")
result = resp.json()
pprint(result)

valid_job_id = result.get("job_id")
print(f"\nJob ID: {valid_job_id}")

Status: 201
{'error': None,
 'identifier_from_purchaser': 'prime-valid',
 'input_schema': None,
 'job_id': '69391b4b6dd659479940641d',
 'result': None,
 'runtime': None,
 'started_at': 74221.19,
 'status': 'running',
 'updated_at': 74221.222}

Job ID: 69391b4b6dd659479940641d


In [525]:
# Stream execution events for the valid job
# Note: This streams from a completed job, showing all recorded events
with client.stream("GET", f"/outputs/stream/{valid_job_id}?extended=true") as resp:
    print(f"Stream status: {resp.status_code}")
    print("Streaming events:")
    for line in resp.iter_lines():
        if line:
            print(line)

Stream status: 200
Streaming events:
id: 1
event: status
data: 1765350222.652195:starting
id: 2
event: inputs
data: 1765350224.8006258:{"dict":{"start":"0","end":"100000","tasks":"10"}}
id: 3
event: meta
data: 1765350225.067287:{"dict":{"fid":"69391b4b6dd659479940641d","username":"c7eb70a5-0c26-407e-b785-f1ef5e7c4486","app_url":"http://localhost:8005/prime","panel_url":"http://localhost:3370","entry_point":"kodosumi_examples.prime.app:run","kodosumi":"1.1.0","tags":["Test"],"summary":"Prime Gap Distribution","description":"This agent creates a prime gap distribution for a given range of numbers.","deprecated":null,"author":"m.rau@house-of-communication.com","organization":"House of Communication","version":"0.1.0","extra":{"identifier_from_purchaser":"prime-valid","input_hash":"5bc06e91746525903d5f5d0d9d3e0cc873df900aa033f096b68a34e08e916269","sumi_endpoint":"prime"}}}
id: 4
event: status
data: 1765350225.193517:running
id: 0
event: alive
data: 1765350228.212321:service alive
id: 5
eve

## Payment Integration Demo (text inputs)

Update the `text-inputs` flow with an `agentIdentifier` to enable Masumi payment processing.

In [526]:
# Update form-example with agentIdentifier for text-inputs
resp = client.post("/expose/", json={
    "name": "form-example",
    "display": "Form Elements Demo",
    "network": "Preprod",
    "bootstrap": "import_path: tests.form_elements_demo:fast_app",
    "meta": [{
        "url": "/form-example/text-inputs",
        "data": """display: Text Input Types Demo
description: Showcases text input components with payment
agentIdentifier: 7e8bdaf2b2b919a3a4b94002cafb50086c0c845fe535d07a77ab7f77ee8e013e3f92bbd2aa41065577ac087a0d55ff96430087324c023b021267ca37
tags:
  - demo
  - forms
  - payment
"""
    }]
})
print(f"form-example update: {resp.status_code}")
pprint(resp.json())

form-example update: 201
{'bootstrap': 'import_path: tests.form_elements_demo:fast_app',
 'created': '2025-12-10T08:02:39.053048',
 'display': 'Form Elements Demo',
 'enabled': True,
 'etag': '1765350241.1295419',
 'flow_stats': '0/0',
 'heartbeat': 1765350241.102141,
 'meta': [{'data': 'display: Text Input Types Demo\n'
                   'description: Showcases text input components with payment\n'
                   'agentIdentifier: '
                   '7e8bdaf2b2b919a3a4b94002cafb50086c0c845fe535d07a77ab7f77ee8e013e3f92bbd2aa41065577ac087a0d55ff96430087324c023b021267ca37\n'
                   'tags:\n'
                   '  - demo\n'
                   '  - forms\n'
                   '  - payment\n',
           'enabled': True,
           'heartbeat': None,
           'state': None,
           'url': '/form-example/text-inputs'}],
 'name': 'form-example',
 'needs_reboot': False,
 'network': 'Preprod',
 'stale': False,
 'state': 'RUNNING',
 'updated': '2025-12-10T08:04:01.129542'

In [527]:
# Refresh form-example to pick up changes
with client.stream("POST", "/boot/refresh/form-example") as resp:
    print(f"Refresh status: {resp.status_code}")
    for line in resp.iter_lines():
        if line:
            print(line)

Refresh status: 200
ℹ Refreshing expose 'form-example' at 2025-12-10 08:04:01
▶ [UPDATE] Step 1/4: Disabling expose 'form-example'
  [form-example] Expose disabled → enabled=False
▶ [DEPLOY] Step 2/4: Running boot process (remove flows)
ℹ [Boot 1/2] Boot started at 2025-12-10 08:04:01
▶ [DEPLOY] [Boot 1/2] Starting Ray Serve deployment
  • [serve_config.yaml] [Boot 1/2] Loading global configuration → OK
  • [jkx] [Boot 1/2] Prepared deployment config → route=/jkx
  • [prime] [Boot 1/2] Prepared deployment config → route=/prime
  • [sumi-test] [Boot 1/2] Prepared deployment config → route=/sumi-test
  • [/var/folders/n9/f3p_gzsj7vjfgfw70yqksywcn5rfs9/T/serve_deploy_wxxwn9wn.yaml] [Boot 1/2] Created merged deployment config
  • [serve] [Boot 1/2] Running serve deploy (3 applications)
  [Boot 1/2] serve deploy command → success
✓ [Boot 1/2] Deployment initiated (3 applications)
▶ [HEALTH] [Boot 1/2] Waiting for deployments to complete (timeout: 1800s)
  [jkx] [Boot 1/2] Reached final stat

### check availability

In [528]:
# Start a job on text-inputs (now with payment processing)
resp = client.get("/sumi/form-example/text-inputs/availability")
print(f"Status: {resp.status_code}")
result = resp.json()
pprint(result)

Status: 200
{'message': 'Text Input Types Demo is ready to accept jobs',
 'status': 'available',
 'type': 'masumi-agent'}


In [529]:
import secrets

identifier = secrets.token_hex(13)

In [530]:
# Start a job on text-inputs (now with payment processing)
resp = client.post("/sumi/form-example/text-inputs/start_job", json={
    "identifier_from_purchaser": identifier,
    "input_data": {
        "email": "user@example.com",
        "username": "PaymentUser",
        "password": "secretpass123"
    }
})

print(f"Status: {resp.status_code}")
result = resp.json()
pprint(result)

payment_job_id = result.get("job_id")
print(f"\nJob ID: {payment_job_id}")

Status: 201
{'error': None,
 'identifier_from_purchaser': '9a74866033f0e778c8ee734e12',
 'input_schema': None,
 'job_id': '69391b6b6dd659490c56663e',
 'result': None,
 'runtime': None,
 'started_at': 74253.021,
 'status': 'running',
 'updated_at': 74253.123}

Job ID: 69391b6b6dd659490c56663e


In [531]:
# Poll until job requires payment or completes
awaiting_payment_count = 0
for i in range(60):
    resp = client.get(f"/sumi/status/{payment_job_id}")
    status_data = resp.json()
    job_status = status_data.get("status")

    print(f"[{i+1}/60] Status: {job_status}")

    if job_status == "awaiting_payment":
        awaiting_payment_count += 1
        if awaiting_payment_count >= 3:
            print("\nJob awaiting payment - need to submit payment")
            pprint(status_data)
            break

    if job_status in ("completed", "failed"):
        print("\nFinal result:")
        pprint(status_data)
        break

    time.sleep(1)
else:
    print("Job did not complete within timeout")

[1/60] Status: running
[2/60] Status: awaiting_payment
[3/60] Status: awaiting_payment
[4/60] Status: awaiting_payment

Job awaiting payment - need to submit payment
{'error': None,
 'identifier_from_purchaser': '9a74866033f0e778c8ee734e12',
 'input_schema': None,
 'job_id': '69391b6b6dd659490c56663e',
 'result': None,
 'runtime': 1.5557138919830322,
 'started_at': 1765350253.658858,
 'status': 'awaiting_payment',
 'updated_at': 1765350255.214572}


### Simulate Payment (Masumi Purchase API)

Extract payment information from the `kind='payment'` event and simulate a purchase.

In [532]:
# Extract payment information from the event stream
import json

payment_info = None
with client.stream("GET", f"/outputs/stream/{payment_job_id}") as resp:
    for line in resp.iter_lines():
        if line and line.startswith("event: payment"):
            # Next line is the data
            continue
        if line and line.startswith("data:") and payment_info is None:
            # Check if this is payment data
            data_part = line[5:].strip()  # Remove "data:" prefix
            if ":" in data_part:
                ts, json_str = data_part.split(":", 1)
                try:
                    parsed = json.loads(json_str)
                    if "dict" in parsed and "blockchainIdentifier" in parsed["dict"]:
                        payment_info = parsed["dict"]
                        print("Found payment event:")
                        pprint(payment_info)
                        break
                except:
                    pass

if not payment_info:
    print("No payment event found - job may not have payment configured")

Found payment event:
{'agentIdentifier': '7e8bdaf2b2b919a3a4b94002cafb50086c0c845fe535d07a77ab7f77ee8e013e3f92bbd2aa41065577ac087a0d55ff96430087324c023b021267ca37',
 'blockchainIdentifier': '2707604c01c0cc08c306605300315e04300b220c6ce0ec18a0c009a98bacb8a80ac98c500463ad63ca6df7a01b2211864604227a50c130cc1abf26a5d3c091380c60e8a1626c133221381535aba20f3c382265a496945aa5aba1021d1310f09e27e291a2344499c983a160c320f1713ba1e0433b22715bc300f2614318838a61e1833108c180f083e9408001d1a8826098f2a1a3222138439a7ba761e4945ad099aaa5e48322580b23c040c16a21822248e3c1319257a34a75866292414290430fea8580e34fc3c262612216c3089ad32442d30012f080f0c327278387a7a67543490ba274c0df61c61e279816858442dd80b32618148200826de8601829d687237119286a5c29192f03e1e1f0c8753008cf89e218494c0625cf690344e06098102ad10f05838d08f01b0f02088521a130e8278f1798f7498b2ec701a09e0ba145186278530b0543c340dc987cc60f11c8a1c1a060fa4c8c4b01b7408dc2f91c03c90e45a250063a1105202881824c1c1c5d5b8209c551e40e561c2192c4f0104d6110184806002433c28c3c875e00830ad0c1d09

In [533]:
# Extract variables for purchase request
if payment_info:
    aid = payment_info.get("agentIdentifier")
    bid = payment_info.get("blockchainIdentifier")
    ih = payment_info.get("inputHash")
    pbt = payment_info.get("payByTime")
    srt = payment_info.get("submitResultTime")
    ult = payment_info.get("unlockTime")
    edut = payment_info.get("externalDisputeUnlockTime")
    
    print("Payment variables extracted:")
    print(f"  agentIdentifier: {aid}")
    print(f"  blockchainIdentifier: {bid}")
    print(f"  inputHash: {ih}")
    print(f"  payByTime: {pbt}")
    print(f"  submitResultTime: {srt}")
    print(f"  unlockTime: {ult}")
    print(f"  externalDisputeUnlockTime: {edut}")

Payment variables extracted:
  agentIdentifier: 7e8bdaf2b2b919a3a4b94002cafb50086c0c845fe535d07a77ab7f77ee8e013e3f92bbd2aa41065577ac087a0d55ff96430087324c023b021267ca37
  blockchainIdentifier: 2707604c01c0cc08c306605300315e04300b220c6ce0ec18a0c009a98bacb8a80ac98c500463ad63ca6df7a01b2211864604227a50c130cc1abf26a5d3c091380c60e8a1626c133221381535aba20f3c382265a496945aa5aba1021d1310f09e27e291a2344499c983a160c320f1713ba1e0433b22715bc300f2614318838a61e1833108c180f083e9408001d1a8826098f2a1a3222138439a7ba761e4945ad099aaa5e48322580b23c040c16a21822248e3c1319257a34a75866292414290430fea8580e34fc3c262612216c3089ad32442d30012f080f0c327278387a7a67543490ba274c0df61c61e279816858442dd80b32618148200826de8601829d687237119286a5c29192f03e1e1f0c8753008cf89e218494c0625cf690344e06098102ad10f05838d08f01b0f02088521a130e8278f1798f7498b2ec701a09e0ba145186278530b0543c340dc987cc60f11c8a1c1a060fa4c8c4b01b7408dc2f91c03c90e45a250063a1105202881824c1c1c5d5b8209c551e40e561c2192c4f0104d6110184806002433c28c3c875e00830ad0c1d

In [534]:
# Simulate payment via Masumi Purchase API
purchase = httpx.post(
    "https://payment.masumi.network/api/v1/purchase/",
    headers={
        "accept": "application/json",
        "token": "iofsnaiojdoiewqajdriknjonasfoinasd",
        "Content-Type": "application/json"
    },
    json={
        "identifierFromPurchaser": identifier,
        "network": "Preprod",
        "sellerVkey": "2a581594ee277e1625a4e999b1b2d788f54217c35bd7f50ea9aecd64",
        "blockchainIdentifier": bid,
        "payByTime": pbt,
        "submitResultTime": srt,
        "unlockTime": ult,
        "externalDisputeUnlockTime": edut,
        "agentIdentifier": aid,
        "inputHash": ih
    },
    timeout=30.0
)
print(f"Purchase status: {purchase.status_code}")
pprint(purchase.json())

Purchase status: 200
{'data': {'CurrentTransaction': None,
          'NextAction': {'errorNote': None,
                         'errorType': None,
                         'requestedAction': 'FundsLockingRequested'},
          'PaidFunds': [{'amount': '10000000', 'unit': ''}],
          'PaymentSource': {'id': 'cmfx3ni6j000nvbfuhrnx0ehb',
                            'network': 'Preprod',
                            'policyId': '7e8bdaf2b2b919a3a4b94002cafb50086c0c845fe535d07a77ab7f77',
                            'smartContractAddress': 'addr_test1wz7j4kmg2cs7yf92uat3ed4a3u97kr7axxr4avaz0lhwdsqukgwfm'},
          'SellerWallet': {'id': 'cmh3cymtx00gets0eta1htlxu',
                           'walletVkey': '2a581594ee277e1625a4e999b1b2d788f54217c35bd7f50ea9aecd64'},
          'SmartContractWallet': None,
          'WithdrawnForBuyer': [],
          'WithdrawnForSeller': [],
          'blockchainIdentifier': '2707604c01c0cc08c306605300315e04300b220c6ce0ec18a0c009a98bacb8a80ac98c500463ad63

In [535]:
# Poll for job status after payment
for i in range(360):
    resp = client.get(f"/sumi/status/{payment_job_id}")
    status_data = resp.json()
    job_status = status_data.get("status")

    print(f"[{i+1}/360] Status: {job_status}")

    if job_status in ("completed", "failed"):
        print("\nFinal result:")
        pprint(status_data)
        break

    time.sleep(1)
else:
    print("\nCurrent status:")
    pprint(status_data)

[1/360] Status: awaiting_payment
[2/360] Status: awaiting_payment
[3/360] Status: awaiting_payment
[4/360] Status: awaiting_payment
[5/360] Status: awaiting_payment
[6/360] Status: awaiting_payment
[7/360] Status: awaiting_payment
[8/360] Status: awaiting_payment
[9/360] Status: awaiting_payment
[10/360] Status: awaiting_payment
[11/360] Status: awaiting_payment
[12/360] Status: awaiting_payment
[13/360] Status: awaiting_payment
[14/360] Status: awaiting_payment
[15/360] Status: awaiting_payment
[16/360] Status: awaiting_payment
[17/360] Status: awaiting_payment
[18/360] Status: awaiting_payment
[19/360] Status: awaiting_payment
[20/360] Status: awaiting_payment
[21/360] Status: awaiting_payment
[22/360] Status: awaiting_payment
[23/360] Status: awaiting_payment
[24/360] Status: awaiting_payment
[25/360] Status: awaiting_payment
[26/360] Status: awaiting_payment
[27/360] Status: awaiting_payment
[28/360] Status: awaiting_payment
[29/360] Status: awaiting_payment
[30/360] Status: awaiti

## HITL/Lock Payment Integration Demo (Two-Step Masumi Test Flow)

This demo showcases the two-step Masumi test flow with:
1. Initial form submission (Name + Date validation)
2. Payment initialization and wait for `FundsLocked`
3. Lock/HITL step (greeting form) - `awaiting_input` status
4. Provide input to release lock
5. Job completion and payment finalization

The flow uses `tests.test_masumi:fast_app` which implements:
- Date validation (must be in future)
- Lock callback for human greeting input
- Lease callback to process the greeting
- Markdown result generation

In [536]:
# Update sumi-test with agentIdentifier for payment integration
resp = client.post("/expose/", json={
    "name": "sumi-test",
    "display": "Masumi Two-Stage Service",
    "network": "Preprod",
    "bootstrap": "import_path: tests.test_masumi:fast_app",
    "meta": [{
        "url": "/sumi-test/",
        "data": """display: Two-Stage HITL Demo
description: Two-stage service with payment, date validation, and HITL lock
agentIdentifier: 7e8bdaf2b2b919a3a4b94002cafb50086c0c845fe535d07a77ab7f77ee8e013e3f92bbd2aa41065577ac087a0d55ff96430087324c023b021267ca37
tags:
  - masumi
  - hitl
  - payment
  - two-stage
"""
    }]
})
print(f"sumi-test update: {resp.status_code}")
pprint(resp.json())

sumi-test update: 201
{'bootstrap': 'import_path: tests.test_masumi:fast_app',
 'created': '2025-12-10T08:02:39.125288',
 'display': 'Masumi Two-Stage Service',
 'enabled': True,
 'etag': '1765350349.531877',
 'flow_stats': '0/0',
 'heartbeat': 1765350349.501764,
 'meta': [{'data': 'display: Two-Stage HITL Demo\n'
                   'description: Two-stage service with payment, date '
                   'validation, and HITL lock\n'
                   'agentIdentifier: '
                   '7e8bdaf2b2b919a3a4b94002cafb50086c0c845fe535d07a77ab7f77ee8e013e3f92bbd2aa41065577ac087a0d55ff96430087324c023b021267ca37\n'
                   'tags:\n'
                   '  - masumi\n'
                   '  - hitl\n'
                   '  - payment\n'
                   '  - two-stage\n',
           'enabled': True,
           'heartbeat': None,
           'state': None,
           'url': '/sumi-test/'}],
 'name': 'sumi-test',
 'needs_reboot': False,
 'network': 'Preprod',
 'stale': False,
 'state

In [537]:
# Refresh sumi-test to pick up changes
with client.stream("POST", "/boot/refresh/sumi-test") as resp:
    print(f"Refresh status: {resp.status_code}")
    for line in resp.iter_lines():
        if line:
            print(line)

Refresh status: 200
ℹ Refreshing expose 'sumi-test' at 2025-12-10 08:05:50
▶ [UPDATE] Step 1/4: Disabling expose 'sumi-test'
  [sumi-test] Expose disabled → enabled=False
▶ [DEPLOY] Step 2/4: Running boot process (remove flows)
ℹ [Boot 1/2] Boot started at 2025-12-10 08:05:50
▶ [DEPLOY] [Boot 1/2] Starting Ray Serve deployment
  • [serve_config.yaml] [Boot 1/2] Loading global configuration → OK
  • [form-example] [Boot 1/2] Prepared deployment config → route=/form-example
  • [jkx] [Boot 1/2] Prepared deployment config → route=/jkx
  • [prime] [Boot 1/2] Prepared deployment config → route=/prime
  • [/var/folders/n9/f3p_gzsj7vjfgfw70yqksywcn5rfs9/T/serve_deploy_4p0764i3.yaml] [Boot 1/2] Created merged deployment config
  • [serve] [Boot 1/2] Running serve deploy (3 applications)
  [Boot 1/2] serve deploy command → success
✓ [Boot 1/2] Deployment initiated (3 applications)
▶ [HEALTH] [Boot 1/2] Waiting for deployments to complete (timeout: 1800s)
  [form-example] [Boot 1/2] Reached fina

### Check Availability and Input Schema

In [538]:
# Check availability for sumi-test
resp = client.get("/sumi/sumi-test/availability")
print(f"Status: {resp.status_code}")
pprint(resp.json())

Status: 200
{'message': 'Two-Stage HITL Demo is ready to accept jobs',
 'status': 'available',
 'type': 'masumi-agent'}


In [539]:
# Get input schema for sumi-test (two-stage flow)
resp = client.get("/sumi/sumi-test/input_schema")
print(f"Status: {resp.status_code}")
schema = resp.json()
pprint(schema)

# Display required fields
if schema.get("input_data"):
    print("\nInput Fields:")
    for field in schema["input_data"]:
        validations = field.get("validations") or {}
        required = "required" if not validations.get("optional") else "optional"
        print(f"  • {field.get('name', field.get('id'))} ({field.get('type')}) - {required}")

Status: 200
{'input_data': [{'data': {'placeholder': 'Enter your name'},
                 'id': 'name',
                 'name': 'Name',
                 'type': 'text',
                 'validations': {'optional': True}},
                {'data': None,
                 'id': 'date',
                 'name': 'Date',
                 'type': 'date',
                 'validations': {'min': '2025-12-11'}}],
 'input_groups': None}

Input Fields:
  • Name (text) - optional
  • Date (date) - required


### Start Job with Payment (Two-Stage Flow)

Submit a job with valid future date. The flow will:
1. Initialize payment with Masumi
2. Wait for `FundsLocked` (status: `awaiting_payment`)
3. After payment, create a lock for human input (status: `awaiting_input`)

In [540]:
import secrets
from datetime import date, timedelta

# Generate unique identifier for this job
hitl_identifier = secrets.token_hex(13)
print(f"Identifier: {hitl_identifier}")

# Calculate a future date (7 days from now)
future_date = (date.today() + timedelta(days=7)).isoformat()
print(f"Future date: {future_date}")

Identifier: afdc7c8873929521e868dceb78
Future date: 2025-12-17


In [541]:
# Start a job on sumi-test (with payment and HITL lock)
resp = client.post("/sumi/sumi-test/start_job", json={
    "identifier_from_purchaser": hitl_identifier,
    "input_data": {
        "name": "HITL Test User",
        "date": future_date
    }
})

print(f"Status: {resp.status_code}")
result = resp.json()
pprint(result)

hitl_job_id = result.get("job_id")
print(f"\nJob ID: {hitl_job_id}")

Status: 201
{'error': None,
 'identifier_from_purchaser': 'afdc7c8873929521e868dceb78',
 'input_schema': None,
 'job_id': '69391bdc6dd6594b06ce8ff2',
 'result': None,
 'runtime': None,
 'started_at': 74365.902,
 'status': 'running',
 'updated_at': 74366.028}

Job ID: 69391bdc6dd6594b06ce8ff2


In [543]:
# Poll until job requires payment
awaiting_payment_count = 0
for i in range(60):
    resp = client.get(f"/sumi/status/{hitl_job_id}")
    status_data = resp.json()
    job_status = status_data.get("status")

    print(f"[{i+1}/60] Status: {job_status}")

    if job_status == "awaiting_payment":
        awaiting_payment_count += 1
        if awaiting_payment_count >= 3:
            print("\nJob awaiting payment - need to submit payment")
            pprint(status_data)
            break

    if job_status in ("awaiting_input", "completed", "failed"):
        print(f"\nJob reached {job_status}:")
        pprint(status_data)
        break

    time.sleep(1)
else:
    print("Timeout waiting for payment status")

[1/60] Status: awaiting_payment
[2/60] Status: awaiting_payment
[3/60] Status: awaiting_payment

Job awaiting payment - need to submit payment
{'error': None,
 'identifier_from_purchaser': 'afdc7c8873929521e868dceb78',
 'input_schema': None,
 'job_id': '69391bdc6dd6594b06ce8ff2',
 'result': None,
 'runtime': 2.0807948112487793,
 'started_at': 1765350366.5765991,
 'status': 'awaiting_payment',
 'updated_at': 1765350368.657394}


### Simulate Payment (Masumi Purchase API)

Extract payment info and simulate the purchase to lock funds.

In [544]:
# Extract payment information from the event stream
import json as json_module

hitl_payment_info = None
with client.stream("GET", f"/outputs/stream/{hitl_job_id}") as resp:
    for line in resp.iter_lines():
        if line and line.startswith("data:") and hitl_payment_info is None:
            data_part = line[5:].strip()
            if ":" in data_part:
                ts, json_str = data_part.split(":", 1)
                try:
                    parsed = json_module.loads(json_str)
                    if "dict" in parsed and "blockchainIdentifier" in parsed["dict"]:
                        hitl_payment_info = parsed["dict"]
                        print("Found payment event:")
                        pprint(hitl_payment_info)
                        break
                except:
                    pass

if not hitl_payment_info:
    print("No payment event found - job may not have payment configured")

Found payment event:
{'agentIdentifier': '7e8bdaf2b2b919a3a4b94002cafb50086c0c845fe535d07a77ab7f77ee8e013e3f92bbd2aa41065577ac087a0d55ff96430087324c023b021267ca37',
 'blockchainIdentifier': '3187630180980d85810c066016029bc0ac24404c0461b4ca2924f2a8b0c221a4c008c00723686e08f74e0ef7d1a20139823449de2300cc391a83c215233c389243caa07d01f027c6478072705012202471b460e18727ea83048c39dbc30f0e6230a816a70f426a04a0aa92992ebd38340616083c15a32c773462209104b98804a43215a4049e143d0c1c3688001d120e2830824480a400ad3d02b423156a1ca3194b060596ba5148382da3243822233d3eaa24076529bd0e32233c00b82f64720e2424b484c2045d350a72312a1c04a0c5a132c60090bc2c34273211080c1604a66f5d51992f5f00cde4817938900c2e950f7011e0a6901c081983464241e8170c129b06c7416950c01c11110d03c518d6da46b8da0d002392f0c83e259803b0c43190206d8512e3360008681256aa07041643c138548b1a3f9d74248cc6381c075c02c462f4992b319b146c84b9c499dc1d23105545f24c79d02e7c1868289300f050adbe01ac25ea1830cd650f3ea457a150a4464c8ec40784438066cca716004d04a369f831c33d10c86740e0b43ced7c15ce9

In [545]:
# Extract variables for purchase request
if hitl_payment_info:
    hitl_aid = hitl_payment_info.get("agentIdentifier")
    hitl_bid = hitl_payment_info.get("blockchainIdentifier")
    hitl_ih = hitl_payment_info.get("inputHash")
    hitl_pbt = hitl_payment_info.get("payByTime")
    hitl_srt = hitl_payment_info.get("submitResultTime")
    hitl_ult = hitl_payment_info.get("unlockTime")
    hitl_edut = hitl_payment_info.get("externalDisputeUnlockTime")
    
    print("Payment variables extracted:")
    print(f"  agentIdentifier: {hitl_aid}")
    print(f"  blockchainIdentifier: {hitl_bid}")
    print(f"  inputHash: {hitl_ih}")
    print(f"  payByTime: {hitl_pbt}")
    print(f"  submitResultTime: {hitl_srt}")

Payment variables extracted:
  agentIdentifier: 7e8bdaf2b2b919a3a4b94002cafb50086c0c845fe535d07a77ab7f77ee8e013e3f92bbd2aa41065577ac087a0d55ff96430087324c023b021267ca37
  blockchainIdentifier: 3187630180980d85810c066016029bc0ac24404c0461b4ca2924f2a8b0c221a4c008c00723686e08f74e0ef7d1a20139823449de2300cc391a83c215233c389243caa07d01f027c6478072705012202471b460e18727ea83048c39dbc30f0e6230a816a70f426a04a0aa92992ebd38340616083c15a32c773462209104b98804a43215a4049e143d0c1c3688001d120e2830824480a400ad3d02b423156a1ca3194b060596ba5148382da3243822233d3eaa24076529bd0e32233c00b82f64720e2424b484c2045d350a72312a1c04a0c5a132c60090bc2c34273211080c1604a66f5d51992f5f00cde4817938900c2e950f7011e0a6901c081983464241e8170c129b06c7416950c01c11110d03c518d6da46b8da0d002392f0c83e259803b0c43190206d8512e3360008681256aa07041643c138548b1a3f9d74248cc6381c075c02c462f4992b319b146c84b9c499dc1d23105545f24c79d02e7c1868289300f050adbe01ac25ea1830cd650f3ea457a150a4464c8ec40784438066cca716004d04a369f831c33d10c86740e0b43ced7c15c

In [546]:
# Simulate payment via Masumi Purchase API
if hitl_payment_info:
    hitl_purchase = httpx.post(
        "https://payment.masumi.network/api/v1/purchase/",
        headers={
            "accept": "application/json",
            "token": "iofsnaiojdoiewqajdriknjonasfoinasd",
            "Content-Type": "application/json"
        },
        json={
            "identifierFromPurchaser": hitl_identifier,
            "network": "Preprod",
            "sellerVkey": "2a581594ee277e1625a4e999b1b2d788f54217c35bd7f50ea9aecd64",
            "blockchainIdentifier": hitl_bid,
            "payByTime": hitl_pbt,
            "submitResultTime": hitl_srt,
            "unlockTime": hitl_ult,
            "externalDisputeUnlockTime": hitl_edut,
            "agentIdentifier": hitl_aid,
            "inputHash": hitl_ih
        },
        timeout=30.0
    )
    print(f"Purchase status: {hitl_purchase.status_code}")
    pprint(hitl_purchase.json())
else:
    print("No payment info available - skipping purchase")

Purchase status: 200
{'data': {'CurrentTransaction': None,
          'NextAction': {'errorNote': None,
                         'errorType': None,
                         'requestedAction': 'FundsLockingRequested'},
          'PaidFunds': [{'amount': '10000000', 'unit': ''}],
          'PaymentSource': {'id': 'cmfx3ni6j000nvbfuhrnx0ehb',
                            'network': 'Preprod',
                            'policyId': '7e8bdaf2b2b919a3a4b94002cafb50086c0c845fe535d07a77ab7f77',
                            'smartContractAddress': 'addr_test1wz7j4kmg2cs7yf92uat3ed4a3u97kr7axxr4avaz0lhwdsqukgwfm'},
          'SellerWallet': {'id': 'cmh3cymtx00gets0eta1htlxu',
                           'walletVkey': '2a581594ee277e1625a4e999b1b2d788f54217c35bd7f50ea9aecd64'},
          'SmartContractWallet': None,
          'WithdrawnForBuyer': [],
          'WithdrawnForSeller': [],
          'blockchainIdentifier': '3187630180980d85810c066016029bc0ac24404c0461b4ca2924f2a8b0c221a4c008c00723686e08

### Wait for HITL Lock (awaiting_input)

After payment is confirmed, the job will create a lock requiring human input.
The status will change to `awaiting_input` with `input_schema` containing the lock form.

In [547]:
# Poll until job reaches awaiting_input status
lock_info = None
for i in range(360):
    resp = client.get(f"/sumi/status/{hitl_job_id}")
    status_data = resp.json()
    job_status = status_data.get("status")

    print(f"[{i+1}/180] Status: {job_status}")

    if job_status == "awaiting_input":
        print("\nJob awaiting human input (HITL lock):")
        pprint(status_data)
        
        # Extract lock info from input_schema
        input_schemas = status_data.get("input_schema", [])
        if input_schemas:
            lock_info = input_schemas[0]
            print(f"\nLock ID: {lock_info.get('lock_id')}")
        break

    if job_status in ("completed", "failed"):
        print(f"\nJob {job_status}:")
        pprint(status_data)
        break

    time.sleep(1)
else:
    print("Timeout waiting for awaiting_input status")

[1/180] Status: awaiting_payment
[2/180] Status: awaiting_payment
[3/180] Status: awaiting_payment
[4/180] Status: awaiting_payment
[5/180] Status: awaiting_payment
[6/180] Status: awaiting_payment
[7/180] Status: awaiting_payment
[8/180] Status: awaiting_payment
[9/180] Status: awaiting_payment
[10/180] Status: awaiting_payment
[11/180] Status: awaiting_payment
[12/180] Status: awaiting_payment
[13/180] Status: awaiting_payment
[14/180] Status: awaiting_payment
[15/180] Status: awaiting_payment
[16/180] Status: awaiting_payment
[17/180] Status: awaiting_payment
[18/180] Status: awaiting_payment
[19/180] Status: awaiting_payment
[20/180] Status: awaiting_payment
[21/180] Status: awaiting_payment
[22/180] Status: awaiting_payment
[23/180] Status: awaiting_payment
[24/180] Status: awaiting_payment
[25/180] Status: awaiting_payment
[26/180] Status: awaiting_payment
[27/180] Status: awaiting_payment
[28/180] Status: awaiting_payment
[29/180] Status: awaiting_payment
[30/180] Status: awaiti

In [548]:
resp = client.get(f"/sumi/status/{hitl_job_id}")
status_data = resp.json()
pprint(resp.json())

{'error': None,
 'identifier_from_purchaser': 'afdc7c8873929521e868dceb78',
 'input_schema': [{'expires_at': 1765361265.372782,
                   'input_data': [{'data': {'placeholder': 'Say hello...'},
                                   'id': 'greeting',
                                   'name': 'Your Greeting',
                                   'type': 'text',
                                   'validations': None}],
                   'input_groups': None,
                   'lock_id': '129cc7b8-d4ba-4918-a6f0-7465a17701c7'}],
 'job_id': '69391bdc6dd6594b06ce8ff2',
 'result': None,
 'runtime': 98.79622769355774,
 'started_at': 1765350366.5765991,
 'status': 'awaiting_input',
 'updated_at': 1765350465.3728268}


### Get Lock Schema

Use `/sumi/lock/{fid}/{lid}` to get the full input schema for the lock.

In [549]:
# Get lock schema via Sumi API
if lock_info:
    lock_id = lock_info.get("lock_id")
    
    resp = client.get(f"/sumi/lock/{hitl_job_id}/{lock_id}")
    print(f"Lock schema status: {resp.status_code}")
    lock_schema = resp.json()
    pprint(lock_schema)
    
    # Display the lock form fields
    if lock_schema.get("input_schema", {}).get("input_data"):
        print("\nLock Form Fields:")
        for field in lock_schema["input_schema"]["input_data"]:
            print(f"  • {field.get('name', field.get('id'))} ({field.get('type')})")
else:
    print("No lock info available")

Lock schema status: 200
{'expires_at': 1765361265.372782,
 'input_schema': {'input_data': [{'data': {'placeholder': 'Say hello...'},
                                  'id': 'greeting',
                                  'name': 'Your Greeting',
                                  'type': 'text',
                                  'validations': None}],
                  'input_groups': None},
 'job_id': '69391bdc6dd6594b06ce8ff2',
 'prompt': 'greeting-lock',
 'status': 'pending',
 'status_id': '129cc7b8-d4ba-4918-a6f0-7465a17701c7'}

Lock Form Fields:
  • Your Greeting (text)


### Provide Input (Release Lock)

Submit the greeting input to release the lock and continue job execution.

In [550]:
# Provide input to release the lock
if lock_info:
    lock_id = lock_info.get("lock_id")
    
    resp = client.post(
        f"/sumi/lock/{hitl_job_id}/{lock_id}",
        json={
            "input_data": {
                "greeting": "Hello from HITL Demo!"
            }
        }
    )
    print(f"Provide input status: {resp.status_code}")
    provide_result = resp.json()
    pprint(provide_result)
else:
    print("No lock info available")

Provide input status: 201
{'input_hash': 'fd78b3075fef311586e38fee323cbc91449113da3870b8e980c58e9f622eef32',
 'status': 'success'}


### Wait for Job Completion

After the lock is released, the job will:
1. Process the greeting input
2. Generate the markdown result
3. Submit result hash to Masumi (payment finalization)
4. Complete with status `completed`

In [551]:
# Poll for job completion
for i in range(120):
    resp = client.get(f"/sumi/status/{hitl_job_id}")
    status_data = resp.json()
    job_status = status_data.get("status")

    print(f"[{i+1}/120] Status: {job_status}")

    if job_status == "completed":
        print("\n✓ Job completed successfully!")
        print("\nResult:")
        pprint(status_data.get("result"))
        break

    if job_status == "failed":
        print("\n✗ Job failed!")
        print("\nError:")
        print(status_data.get("error"))
        break

    time.sleep(1)
else:
    print("\nCurrent status:")
    pprint(status_data)

[1/120] Status: completed

✓ Job completed successfully!

Result:
{'Markdown': {'body': 'You say Hello from HITL Demo!, I say **goodbye**.'}}


### Verify Payment Finalization

Check the event stream for the `final_result` payment event.

In [552]:
# Check for payment finalization in event stream
final_payment = None
with client.stream("GET", f"/outputs/stream/{hitl_job_id}") as resp:
    for line in resp.iter_lines():
        if line and line.startswith("data:"):
            data_part = line[5:].strip()
            if ":" in data_part:
                ts, json_str = data_part.split(":", 1)
                try:
                    parsed = json_module.loads(json_str)
                    if "dict" in parsed:
                        d = parsed["dict"]
                        if d.get("step") == "final_result":
                            final_payment = d
                            print("Payment finalization event found:")
                            pprint(final_payment)
                except:
                    pass

if not final_payment:
    print("No payment finalization event found")

Payment finalization event found:
{'resultHash': 'd3ca4ae36032e3090854468c314ad1869479be795e7164e0cb3f115b612f768d',
 'step': 'final_result'}


### HITL/Lock Payment Flow Summary

| Step | Status | Description |
|------|--------|-------------|
| 1 | `running` | Job starts, validates inputs |
| 2 | `awaiting_payment` | Payment initialized, waiting for `FundsLocked` |
| 3 | `running` | Payment confirmed, job continues |
| 4 | `awaiting_input` | Lock created, waiting for human input |
| 5 | `running` | Lock released, processing input |
| 6 | `completed` | Result generated, payment finalized |

**Key Endpoints Used:**
- `POST /sumi/{expose}/start_job` - Start job
- `GET /sumi/status/{job_id}` - Poll status
- `GET /sumi/lock/{fid}/{lid}` - Get lock schema
- `POST /sumi/lock/{fid}/{lid}` - Provide input to release lock
- `GET /outputs/stream/{job_id}` - Stream events (payment info)