# Backend API Demo

This guide walks through manually testing some of StreamWeave's features to demonstrate its capabilities:

- Prefect pipeline orchestration
- rclone data transfers
- File transfer hook system
- File visibility/access rules

The demo includes a "simlab": three simulated instruments with network shares that serve
as the source data for the harvesting demo. In this example, files are transferred into multiple Docker volumes,
which stand in for the transfer destinations of a real deployment.

This page is a static rendering of a Jupyter Notebook, which you can <a href="./backend-demo.ipynb" download>&#x2913; download </a> to run locally.

## Prerequisites

- Docker and Docker Compose installed
- `uv` installed for Python package management
- `git` installed to clone the `streamweave` repository

## Initial setup

Before running the notebook, make sure Jupyter is set up:

> **Jupyter setup** (if needed):
> ```bash
> git clone https://github.com/datasophos/streamweave.git
> cd streamweave/backend
> uv sync
> uv run jupyter lab ../docs/backend-demo.ipynb
> ```

The following cell contains helper commands that will be used throughout the notebook:

In [1]:
import httpx
import json
import os
import subprocess
import threading
import time
import warnings
from pathlib import Path

from cryptography.fernet import Fernet

# Find the repo root regardless of where Jupyter was launched from
def _find_repo_root():
    p = Path.cwd()
    while p != p.parent:
        if (p / "docker-compose.yml").exists():
            return p
        p = p.parent
    raise RuntimeError("Could not find repo root (no docker-compose.yml found)")

REPO_ROOT = _find_repo_root()
SIMLAB_COMPOSE = str(REPO_ROOT / "simlab" / "docker-compose.simlab.yml")

# Generate a Fernet key for this session (used by docker-compose and seed.py)
ENCRYPTION_KEY = Fernet.generate_key().decode()
SECRET_KEY = Fernet.generate_key().decode()

# Environment variables for docker-compose and scripts
STREAMWEAVE_ENV = {
    **os.environ,
    "STREAMWEAVE_ENCRYPTION_KEY": ENCRYPTION_KEY,
    "SECRET_KEY": SECRET_KEY,
    "DATABASE_URL": "postgresql+asyncpg://streamweave:streamweave@localhost:5432/streamweave",
}

BASE_URL = "http://localhost:8000"
PREFECT_API_URL = "http://localhost:4200/api"

client = httpx.Client(base_url=BASE_URL, timeout=30)
prefect = httpx.Client(base_url=PREFECT_API_URL, timeout=30)


def pp(resp, n: int | None = None):
    """
    Pretty-print a JSON response.
    
    Prints the first `n` items, if given as an argument.
    """
    try:
        data = resp.json()
        if n is not None and isinstance(data, list) and len(data) > n:
            data = [*data[:n], "..."]
        print(json.dumps(data, indent=2))
    except Exception:
        print(f"HTTP {resp.status_code}: {resp.text}")


def run(cmd, **kwargs):
    """Run a shell command, streaming stdout normally and stderr in yellow."""
    YELLOW = "\033[33m"
    RESET = "\033[0m"
    
    # Use STREAMWEAVE_ENV by default if no env is specified
    if "env" not in kwargs:
        kwargs["env"] = STREAMWEAVE_ENV
    
    with subprocess.Popen(
        cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, **kwargs
    ) as proc:
        def _stream_stderr():
            for line in proc.stderr:
                print(f"{YELLOW}{line}{RESET}", end="", flush=True)

        # Drain stderr on a background thread so neither pipe can fill up and block
        t = threading.Thread(target=_stream_stderr)
        t.start()

        for line in proc.stdout:
            print(line, end="", flush=True)

        t.join()

    if proc.returncode != 0:
        warnings.warn(f"Command exited with code {proc.returncode}: {cmd}")
    
    return proc


def wait_for_flow_run(flow_run_id: str, timeout: int = 120) -> str:
    """
    Wait for a Prefect flow run to complete.
    
    Args:
        flow_run_id: The UUID of the flow run to wait for
        timeout: Maximum seconds to wait (default 120)
    
    Returns:
        The final state type (COMPLETED, FAILED, CANCELLED, CRASHED, or TIMEOUT)
    """
    terminal_states = ("COMPLETED", "FAILED", "CANCELLED", "CRASHED")
    
    print(f"Waiting for flow run {flow_run_id} to complete...")
    for attempt in range(timeout):
        flow_run = prefect.get(f"/flow_runs/{flow_run_id}").json()
        # `state` may be null before Prefect assigns one, so fall back to an empty dict
        state = (flow_run.get("state") or {}).get("type", "UNKNOWN")
        if state in terminal_states:
            print(f"Flow run finished with state: {state}")
            return state
        print(f"  State: {state} (attempt {attempt + 1}/{timeout})")
        time.sleep(1)
    
    print(f"Warning: Flow run did not complete within {timeout} seconds")
    return "TIMEOUT"


## 1. Start the Stack

### 1a. Start the "simlab" stack

In [2]:
_ = run(f"env COMPOSE_FILE={SIMLAB_COMPOSE} sh -c 'docker compose down -v && docker compose up -d --build'")

Container simlab-xray-diffraction-01-1 Stopping
Container simlab-microscope-01-1 Stopping
Container simlab-spectrometer-01-1 Stopping
Container simlab-spectrometer-01-1 Stopped
Container simlab-spectrometer-01-1 Removing
Container simlab-microscope-01-1 Stopped
Container simlab-microscope-01-1 Removing
Container simlab-spectrometer-01-1 Removed
Container simlab-xray-diffraction-01-1 Stopped
Container simlab-xray-diffraction-01-1 Removing
Container simlab-microscope-01-1 Removed
Container simlab-xray-diffraction-01-1 Removed
Network streamweave-simlab Removing
Network streamweave-simlab Resource is still in use
Image simlab-xray-diffraction-01 Building
Image simlab-microscope-01 Building
Image simlab-spectrometer-01 Building
#1 [internal] load local bake definitions
#1 reading from stdin 1.62kB done
#1

In [3]:
# remove any files that were created in previous iterations of running this notebook:
_ = run(f"docker compose -f {SIMLAB_COMPOSE} exec microscope-01 sh -c 'rm -rf /data/user_a /data/user_b /data/scratch.tmp'")

# print the currently running containers
_ = run(f"docker compose -f {SIMLAB_COMPOSE} ps")

NAME                           IMAGE                        COMMAND                  SERVICE               CREATED        STATUS                                     PORTS
simlab-microscope-01-1         simlab-microscope-01         "/sbin/tini -- /usr/…"   microscope-01         1 second ago   Up Less than a second (health: starting)   0.0.0.0:4451->445/tcp, [::]:4451->445/tcp
simlab-spectrometer-01-1       simlab-spectrometer-01       "/sbin/tini -- /usr/…"   spectrometer-01       1 second ago   Up Less than a second (health: starting)   0.0.0.0:4452->445/tcp, [::]:4452->445/tcp
simlab-xray-diffraction-01-1   simlab-xray-diffraction-01   "/sbin/tini -- /usr/…"   xray-diffraction-01   1 second ago   Up Less than a second (health: starting)   0.0.0.0:4453->445/tcp, [::]:4453->445/tcp


You should see three containers: `microscope-01`, `spectrometer-01`, and `xray-diffraction-01`. Their status may read `health: starting` for a few seconds before they report healthy. Each container hosts a [Samba](https://www.samba.org/) share that stands in for data produced on a real instrument computer.
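
As a rough cross-check that does not depend on Docker's health status, you could confirm that each published SMB port accepts TCP connections. This is only a sketch: `SIMLAB_PORTS` and `port_is_open` are hypothetical names, the port numbers are copied from the `docker compose ps` output above, and an open port says nothing about whether Samba authentication will succeed.

```python
import socket

# Host ports copied from the `docker compose ps` output (each maps to container port 445).
SIMLAB_PORTS = {
    "microscope-01": 4451,
    "spectrometer-01": 4452,
    "xray-diffraction-01": 4453,
}


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


for name, port in SIMLAB_PORTS.items():
    status = "open" if port_is_open("localhost", port) else "closed"
    print(f"{name}: port {port} is {status}")
```

If a port stays closed, `docker compose logs` for that service is usually the next place to look.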

### 1b. Start the main stack

The first part of this command will clear any data you have in the local stack. Comment out the `docker compose down` part if you do not wish to do that:

In [4]:
_ = run("docker compose down -v")
_ = run("docker compose up -d --build")

Container streamweave-worker-1 Stopping
Container streamweave-api-1 Stopping
Container streamweave-worker-1 Stopped
Container streamweave-worker-1 Removing
Container streamweave-worker-1 Removed
Container streamweave-api-1 Stopped
Container streamweave-api-1 Removing
Container streamweave-api-1 Removed
Container streamweave-postgres-1 Stopping
Container streamweave-prefect-server-1 Stopping
Container streamweave-postgres-1 Stopped
Container streamweave-postgres-1 Removing
Container streamweave-postgres-1 Removed
Container streamweave-prefect-server-1 Stopped
Container streamweave-prefect-server-1 Removing
Container streamweave-prefect-server-1 Removed
Container streamweave-redis-1 Stopping
Container streamweave-prefect-postgres-1 Stopping
Container streamweave-prefect-postgres-1 Stoppe

This starts:
- **postgres**: app database (port 5432)
- **redis**: Prefect cache (port 6379)
- **prefect-postgres**: Prefect's own database
- **prefect-server**: Prefect UI + API (port 4200)
- **api**: StreamWeave FastAPI (port 8000)
- **worker**: Prefect worker with rclone installed

In [5]:
_ = run("docker compose ps")

NAME                             IMAGE                        COMMAND                  SERVICE            CREATED         STATUS                   PORTS
streamweave-api-1                streamweave-api              "sh -c 'alembic upgr…"   api                7 seconds ago   Up Less than a second    0.0.0.0:8000->8000/tcp, [::]:8000->8000/tcp
streamweave-postgres-1           postgres:16-alpine           "docker-entrypoint.s…"   postgres           7 seconds ago   Up 6 seconds (healthy)   0.0.0.0:5432->5432/tcp, [::]:5432->5432/tcp
streamweave-prefect-postgres-1   postgres:16-alpine           "docker-entrypoint.s…"   prefect-postgres   7 seconds ago   Up 6 seconds (healthy)   5432/tcp
streamweave-prefect-server-1     prefecthq/prefect:3-latest   "/usr/bin/tini -g --…"   prefect-server     7 seconds ago   Up Less than a second    0.0.0.0:4200->4200/tcp, [::]:4200->4200/tcp
streamweave-redis-1              redis:7-alpine               "docker-entrypoint.s…"   redis              7 seconds ag

### 1c. Check API status

In [6]:
# Wait for the API to be ready (retries up to 30 seconds)
for attempt in range(30):
    try:
        resp = client.get("/health")
        pp(resp)
        break
    except httpx.RequestError:
        print(f"Waiting for API... (attempt {attempt + 1}/30)")
        time.sleep(1)
else:
    raise RuntimeError("API did not become available within 30 seconds")

Waiting for API... (attempt 1/30)
Waiting for API... (attempt 2/30)
Waiting for API... (attempt 3/30)
{
  "status": "ok"
}


Expected: `{"status": "ok"}`

The Prefect UI is available at http://localhost:4200.

## 2. Create Admin User

The `create-admin.py` script is interactive, but it also accepts arguments that set the admin user's email and password:

In [7]:
import os

ADMIN_EMAIL = "admin@test.org"
ADMIN_PASSWORD = "adminpassword123"

_ = run(
    f"python {REPO_ROOT / 'scripts' / 'create-admin.py'}"
    f" --email {ADMIN_EMAIL} --password {ADMIN_PASSWORD}",
    env={**os.environ, "DATABASE_URL": "postgresql+asyncpg://streamweave:streamweave@localhost:5432/streamweave"},
    cwd=str(REPO_ROOT / "backend"),
)

Admin user created: admin@test.org (id=599dac7f-0685-4ab2-951d-c354bdcb4d00)


## 3. Get an Auth Token

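The login below reuses the `ADMIN_EMAIL` and `ADMIN_PASSWORD` values from the admin creation step. If you created the admin with different credentials, update those variables first.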

In [8]:
resp = client.post("/auth/jwt/login", data={"username": ADMIN_EMAIL, "password": ADMIN_PASSWORD})
TOKEN = resp.json()["access_token"]
AUTH = {"Authorization": f"Bearer {TOKEN}"}
print(f"Token acquired (first 20 chars): {TOKEN[:20]}...")

Token acquired (first 20 chars): eyJhbGciOiJIUzI1NiIs...
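
The `eyJ...` prefix is the start of a base64url-encoded JSON header, which is what a JWT looks like. If you want to see what the token carries, the sketch below decodes the header and payload without verifying the signature. `decode_jwt_segment` and `peek_jwt` are hypothetical helper names, and the claims you will find depend on the server; the source does not list them.

```python
import base64
import json


def decode_jwt_segment(segment: str) -> dict:
    """Decode one base64url-encoded JWT segment (header or payload) into a dict."""
    padded = segment + "=" * (-len(segment) % 4)  # JWTs strip the trailing '=' padding
    return json.loads(base64.urlsafe_b64decode(padded))


def peek_jwt(token: str) -> tuple[dict, dict]:
    """Return (header, payload) WITHOUT verifying the signature. For inspection only."""
    header_b64, payload_b64, _signature = token.split(".")
    return decode_jwt_segment(header_b64), decode_jwt_segment(payload_b64)
```

In the notebook you would call `peek_jwt(TOKEN)`. Never use unverified claims for authorization decisions.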


## 4. Seed the Database

Run the seed script locally to create instruments, storage locations, schedules, and hooks:

In [9]:
_ = run(
    "python seed.py",
    cwd=str(REPO_ROOT / "simlab"),
)

# sleep to allow seeding to finish when running notebook all at once
time.sleep(4)

Simlab seed data created successfully.
  Service account: simlab-service (id=57f526ca-efc3-45c2-92b7-34ff999ff122)
  Instruments: Microscope 01, Spectrometer 01, X-Ray Diffraction 01
  Storage: Archive Storage, Restricted Storage
  Schedules: 3 created
  Hooks: 2 created


## 5. Verify Seeded Data

### 5a. List instruments

In [10]:
resp = client.get("/api/instruments", headers=AUTH)
pp(resp)

[
  {
    "id": "4bca34e9-5309-4d8d-b78c-af9ef64845f8",
    "name": "Microscope 01",
    "description": "Simulated optical microscope",
    "location": "Lab A, Room 101",
    "pid": null,
    "cifs_host": "microscope-01",
    "cifs_share": "microscope",
    "cifs_base_path": "/",
    "service_account_id": "57f526ca-efc3-45c2-92b7-34ff999ff122",
    "transfer_adapter": "rclone",
    "transfer_config": null,
    "enabled": true,
    "created_at": "2026-02-24T17:05:37.343035Z",
    "updated_at": "2026-02-24T17:05:37.343035Z"
  },
  {
    "id": "65186f54-074d-430e-b2ca-c24b4861e990",
    "name": "Spectrometer 01",
    "description": "Simulated UV-Vis spectrometer",
    "location": "Lab B, Room 205",
    "pid": null,
    "cifs_host": "spectrometer-01",
    "cifs_share": "spectrometer",
    "cifs_base_path": "/",
    "service_account_id": "57f526ca-efc3-45c2-92b7-34ff999ff122",
    "transfer_adapter": "rclone",
    "transfer_config": null,
    "enabled": true,
    "created_at": "2026-02-24T1

Expected: 3 instruments (Microscope 01, Spectrometer 01, X-Ray Diffraction 01).

### 5b. List storage locations

In [11]:
resp = client.get("/api/storage-locations", headers=AUTH)
pp(resp)

[
  {
    "id": "7cb0e261-f511-4ff3-96af-0bd3323d0ba7",
    "name": "Archive Storage",
    "type": "posix",
    "connection_config": {},
    "base_path": "/storage/archive",
    "enabled": true,
    "created_at": "2026-02-24T17:05:37.343035Z",
    "updated_at": "2026-02-24T17:05:37.343035Z"
  },
  {
    "id": "51c43e6b-e524-4fd0-aa92-3b40d8fe7cda",
    "name": "Restricted Storage",
    "type": "posix",
    "connection_config": {},
    "base_path": "/storage/restricted",
    "enabled": true,
    "created_at": "2026-02-24T17:05:37.343035Z",
    "updated_at": "2026-02-24T17:05:37.343035Z"
  }
]


Expected: 2 locations (Archive Storage at `/storage/archive`, Restricted Storage at `/storage/restricted`).

### 5c. List schedules

In [12]:
resp = client.get("/api/schedules", headers=AUTH)
schedules = resp.json()
pp(resp)

[
  {
    "id": "29f057d4-4d8d-4de9-b741-390c4b5ac0c2",
    "instrument_id": "4bca34e9-5309-4d8d-b78c-af9ef64845f8",
    "default_storage_location_id": "7cb0e261-f511-4ff3-96af-0bd3323d0ba7",
    "cron_expression": "*/15 * * * *",
    "prefect_deployment_id": null,
    "enabled": true,
    "created_at": "2026-02-24T17:05:37.343035Z",
    "updated_at": "2026-02-24T17:05:37.343035Z"
  },
  {
    "id": "1649bd4b-b44d-4ca0-a699-533dcc75adee",
    "instrument_id": "65186f54-074d-430e-b2ca-c24b4861e990",
    "default_storage_location_id": "7cb0e261-f511-4ff3-96af-0bd3323d0ba7",
    "cron_expression": "*/15 * * * *",
    "prefect_deployment_id": null,
    "enabled": true,
    "created_at": "2026-02-24T17:05:37.343035Z",
    "updated_at": "2026-02-24T17:05:37.343035Z"
  },
  {
    "id": "43b115e4-7fef-42cd-b781-dc70a7381a40",
    "instrument_id": "20bfa6b2-87a4-4d41-9c97-d7f59bd92723",
    "default_storage_location_id": "51c43e6b-e524-4fd0-aa92-3b40d8fe7cda",
    "cron_expression": "*/15 * * *

Expected: 3 schedules with `cron_expression: "*/15 * * * *"`. Note that `prefect_deployment_id` will be **null** — the seed script writes directly to the DB and doesn't create Prefect deployments.
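
The cron expression `*/15 * * * *` fires whenever the minute is a multiple of 15, every hour of every day. The sketch below computes the next fire time for that one shape of expression using only the standard library. `next_step_minute` is a hypothetical helper, not part of StreamWeave or Prefect, and it does not parse general cron syntax.

```python
from datetime import datetime, timedelta


def next_step_minute(now: datetime, step: int = 15) -> datetime:
    """Next time strictly after `now` whose minute is a multiple of `step`.

    Models only a minute field of `*/step` with every other cron field set to `*`.
    """
    if step <= 0 or 60 % step != 0:
        raise ValueError("step must be a positive divisor of 60")
    base = now.replace(second=0, microsecond=0)
    # Minutes remaining until the next multiple of `step` (a full step if already on one).
    minutes_to_add = step - (base.minute % step)
    return base + timedelta(minutes=minutes_to_add)


print(next_step_minute(datetime(2026, 2, 24, 17, 5, 37)))
```

For real schedules with arbitrary cron expressions, a dedicated cron parsing library is the safer choice.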

### 5c-extra. Re-create schedules through the API

The seed script (for demonstration purposes) bypasses the API, so schedules have no Prefect deployments. Delete the seeded schedules and re-create them through the API, which triggers deployment creation.

In [13]:
# Get instrument and storage IDs
instruments = client.get("/api/instruments", headers=AUTH).json()
storage_locs = client.get("/api/storage-locations", headers=AUTH).json()
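# Look up storage locations by name so the demo does not depend on API ordering
STORAGE_ID = next(s["id"] for s in storage_locs if s["name"] == "Archive Storage")
RESTRICTED_ID = next(s["id"] for s in storage_locs if s["name"] == "Restricted Storage")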

# Delete existing seeded schedules
for s in schedules:
    client.delete(f"/api/schedules/{s['id']}", headers=AUTH)
print(f"Deleted {len(schedules)} seeded schedules")

# Re-create via API (this creates Prefect deployments)
for i, inst in enumerate(instruments):
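    # As in the seed data, only the X-ray instrument writes to restricted storage
    stor = RESTRICTED_ID if inst["name"] == "X-Ray Diffraction 01" else STORAGE_ID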
    resp = client.post("/api/schedules", headers=AUTH, json={
        "instrument_id": inst["id"],
        "default_storage_location_id": stor,
        "cron_expression": "*/15 * * * *",
        "enabled": True,
    })
    data = resp.json()
    print(json.dumps({"id": data["id"], "prefect_deployment_id": data.get("prefect_deployment_id")}, indent=2))

Deleted 3 seeded schedules
{
  "id": "725fa608-f4da-40f5-980a-2aa977ffe7e3",
  "prefect_deployment_id": "a28a83b2-4d95-45ba-acd3-6b664c9ba1e7"
}
{
  "id": "0293924e-a6de-459c-98c0-6d73303d576b",
  "prefect_deployment_id": "97ee019b-aa52-43b0-84e9-f472b9951c50"
}
{
  "id": "a86e9e55-9518-4b9f-9a24-321f3ea3b428",
  "prefect_deployment_id": "decb9a53-cd72-4922-959c-9a955338e70f"
}


Each schedule should now have an `id` and a non-null `prefect_deployment_id`.

Save the first schedule ID for later:

In [14]:
SCHEDULE_ID = client.get("/api/schedules", headers=AUTH).json()[0]["id"]
print(f"Schedule ID: {SCHEDULE_ID}")

Schedule ID: 725fa608-f4da-40f5-980a-2aa977ffe7e3


### 5d. List hooks

In [15]:
resp = client.get("/api/hooks", headers=AUTH)
pp(resp)

[
  {
    "id": "b77c8db2-8fdf-4995-9739-cc03289297bd",
    "name": "Microscope File Filter",
    "description": "Exclude temporary files from microscope harvests",
    "trigger": "pre_transfer",
    "implementation": "builtin",
    "builtin_name": "file_filter",
    "script_path": null,
    "webhook_url": null,
    "config": {
      "exclude_patterns": [
        "*.tmp",
        "*.lock",
        "~$*"
      ]
    },
    "instrument_id": "4bca34e9-5309-4d8d-b78c-af9ef64845f8",
    "priority": 0,
    "enabled": true
  },
  {
    "id": "0d6f6b94-b1ed-466d-8e8e-a26cbdfc43dd",
    "name": "Microscope Metadata Enrichment",
    "description": "Extract experiment and run info from file paths",
    "trigger": "post_transfer",
    "implementation": "builtin",
    "builtin_name": "metadata_enrichment",
    "script_path": null,
    "webhook_url": null,
    "config": {
      "rules": [
        {
          "pattern": "/(?P<username>[a-z][a-z0-9_]*)/(?P<experiment>experiment_\\d+)",
          "sour

Expected: 2 hooks on the microscope:

- **Microscope File Filter** (pre_transfer): excludes `*.tmp`, `*.lock`, `~$*`
- **Microscope Metadata Enrichment** (post_transfer): extracts `username` and `experiment` from paths like `{username}/{experiment}/...`
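
To get a feel for which names the three exclusion patterns catch, the sketch below applies them with Python's `fnmatch` module. It assumes the patterns are shell-style globs matched against the file name only; the source does not show how the built-in `file_filter` hook performs its matching, and `is_excluded` is a hypothetical helper.

```python
from fnmatch import fnmatchcase
from pathlib import PurePosixPath

# Patterns copied from the hook's `config.exclude_patterns`.
EXCLUDE_PATTERNS = ("*.tmp", "*.lock", "~$*")


def is_excluded(source_path: str, patterns: tuple[str, ...] = EXCLUDE_PATTERNS) -> bool:
    """Return True if the file name (last path component) matches any pattern."""
    name = PurePosixPath(source_path).name
    # fnmatchcase is case-sensitive on every platform, unlike fnmatch.fnmatch.
    return any(fnmatchcase(name, pattern) for pattern in patterns)


for path in ("microscope/scratch.tmp", "microscope/experiment_001.csv", "microscope/~$notes.docx"):
    print(path, "->", "excluded" if is_excluded(path) else "kept")
```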

## 6. Test Prefect Integration

### 6a. Check Prefect UI

Open http://localhost:4200 in a browser. After re-creating schedules via the API (step 5c), you should see:

**Deployments** tab: 3 deployments named `harvest-{instrument_name}`:

<img style="margin-left: 4em;" src="_static/20260224_prefect_deployments.png" width="800">

**Work Pools** tab: a pool named **streamweave-worker-pool** with an active worker:

<img style="margin-left: 4em;" src="_static/20260224_prefect_workpools.png" width="800">

The **harvest-instrument** flow will appear under Flows once the first run is triggered.

### 6b. Trigger a manual harvest

In [16]:
resp = client.post(f"/api/schedules/{SCHEDULE_ID}/trigger", headers=AUTH)
pp(resp)
FLOW_RUN_ID = resp.json().get("flow_run_id")

{
  "flow_run_id": "ebf11ded-7ff8-4388-be9d-53324abaab5a",
  "schedule_id": "725fa608-f4da-40f5-980a-2aa977ffe7e3"
}


Expected response:
```json
{
  "flow_run_id": "<uuid>",
  "schedule_id": "<uuid>"
}
```

If you get a `400` error about "no Prefect deployment", the schedule wasn't linked to Prefect during seeding. Re-create the schedule through the API (see step 5c-extra).

### 6c. Monitor in Prefect UI

Go to http://localhost:4200/flow-runs and watch the triggered flow run. It will:

1. Run `discover_files_task`, which discovers files on the instrument's Samba share (2 in this example)
2. Run `transfer_single_file_task` once for each new file, transferring it via rclone (2 runs in this example)
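
The two steps above follow a discover-then-transfer shape, which can be sketched in plain Python as follows. This is an illustration only: `harvest`, its parameters, and the in-memory set of already transferred paths are hypothetical, and the source does not show how StreamWeave decides that a file is new or how Prefect schedules the individual tasks.

```python
from typing import Callable, Iterable


def harvest(
    discover: Callable[[], Iterable[str]],
    transfer: Callable[[str], None],
    already_transferred: set[str],
) -> list[str]:
    """Discover files, then transfer each one that has not been transferred yet."""
    transferred = []
    for path in discover():
        if path in already_transferred:
            continue  # handled by an earlier harvest; nothing to do
        transfer(path)
        already_transferred.add(path)
        transferred.append(path)
    return transferred


# Usage with stand-in callables: the second harvest finds nothing new.
seen: set[str] = set()
files = ["microscope/experiment_001.csv", "microscope/experiment_002.csv"]
print(harvest(lambda: files, lambda p: None, seen))
print(harvest(lambda: files, lambda p: None, seen))
```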

#### *Flow run diagram*

This diagram shows the timing and status (green = success; red = error) of each part of the flow. In this figure, we can see one `discover_files_task` run followed by six `transfer_single_file_task` runs, one for each file that needed to be transferred:

<img style="margin-left: 4em;" src="_static/20260224_prefect_flowrun_figure.png" width="800">

#### *Logs*

The full logs for the flow run are shown in the Prefect UI, which is very useful for debugging:

<img style="margin-left: 4em;" src="_static/20260224_prefect_flowrun_logs.png" width="800">


## 7. Verify Harvest Results

### 7a. Check file records

In [17]:
wait_for_flow_run(FLOW_RUN_ID)

resp = client.get("/api/files", headers=AUTH)
print(f"\nFound {len(resp.json())} files")
pp(resp)

Waiting for flow run ebf11ded-7ff8-4388-be9d-53324abaab5a to complete...
  State: SCHEDULED (attempt 1/120)
  State: SCHEDULED (attempt 2/120)
  State: SCHEDULED (attempt 3/120)
  State: SCHEDULED (attempt 4/120)
  State: SCHEDULED (attempt 5/120)
  State: PENDING (attempt 6/120)
  State: PENDING (attempt 7/120)
Flow run finished with state: COMPLETED

Found 2 files
[
  {
    "id": "f6b5344e-2f79-43c9-b8d4-9d43d166f027",
    "persistent_id": "ark:/99999/fk4hwtjkzdjnzaa7bqundhd35qkzm",
    "persistent_id_type": "ark",
    "instrument_id": "4bca34e9-5309-4d8d-b78c-af9ef64845f8",
    "source_path": "microscope/experiment_002.csv",
    "filename": "experiment_002.csv",
    "size_bytes": 174,
    "source_mtime": "2026-02-23T15:59:36.848000Z",
    "xxhash": "4a76430697fcf84b",
    "sha256": null,
    "first_discovered_at": "2026-02-24T17:05:49.906603Z",
    "metadata_": {},
    "owner_id": null
  },
  {
    "id": "6b64c77c-100a-4e99-8a8c-8ec6cce076e1",
    "persistent_id": "ark:/99999/fk455j

For each file you should see:
- `persistent_id` starting with `ark:/99999/fk4...` (unique ARK identifier)
- `instrument_id` matching the harvested instrument
- `source_path` matching the file's path on the instrument
- `filename` — the file name
- `xxhash` — checksum computed after transfer
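
A checksum like this lets you confirm that a transferred copy matches its source. The sketch below shows the general technique of hashing a file in chunks and comparing digests. It uses SHA-256 from the standard library as a stand-in, because xxhash requires a third-party package; `file_digest` and `copies_match` are hypothetical helpers, not StreamWeave functions.

```python
import hashlib
from pathlib import Path


def file_digest(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in fixed-size chunks so large files never load fully into memory."""
    digest = hashlib.sha256()  # stand-in for xxhash, which is not in the standard library
    with path.open("rb") as handle:
        while chunk := handle.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()


def copies_match(source: Path, destination: Path) -> bool:
    """Return True if both files hash to the same digest."""
    return file_digest(source) == file_digest(destination)
```

Non-cryptographic hashes such as xxHash are generally chosen for speed when the goal is detecting accidental corruption rather than tampering.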

### 7b. Check transfer records

In [18]:
wait_for_flow_run(FLOW_RUN_ID)
resp = client.get("/api/transfers", headers=AUTH)
print(f"Found {len(resp.json())} transfers")
pp(resp)

Waiting for flow run ebf11ded-7ff8-4388-be9d-53324abaab5a to complete...
Flow run finished with state: COMPLETED
Found 2 transfers
[
  {
    "id": "3ff499b1-7989-4bee-b539-58d6eea94e9d",
    "file_id": "6b64c77c-100a-4e99-8a8c-8ec6cce076e1",
    "storage_location_id": "7cb0e261-f511-4ff3-96af-0bd3323d0ba7",
    "destination_path": "/storage/archive/Microscope 01/microscope/experiment_001.csv",
    "transfer_adapter": "rclone",
    "status": "completed",
    "bytes_transferred": 302,
    "source_checksum": null,
    "dest_checksum": "2660dec29d8c3f7b",
    "checksum_verified": false,
    "started_at": "2026-02-24T17:05:49.842858Z",
    "completed_at": "2026-02-24T17:05:49.896039Z",
    "error_message": null,
    "prefect_flow_run_id": null
  },
  {
    "id": "1164b5a5-40f7-44aa-ba06-6009fc9ef0b2",
    "file_id": "f6b5344e-2f79-43c9-b8d4-9d43d166f027",
    "storage_location_id": "7cb0e261-f511-4ff3-96af-0bd3323d0ba7",
    "destination_path": "/storage/archive/Microscope 01/microscope/exp

Each transfer should have:
- `status`: `"completed"` or `"skipped"`
- `dest_checksum` — xxhash of the transferred file
- `destination_path` — where the file was written under `/storage/`
- `bytes_transferred` — file size
- `started_at` and `completed_at` timestamps
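
When the list grows beyond a handful of entries, a small summary is easier to read than the raw JSON. The following sketch counts transfers by `status` and totals the elapsed time from the `started_at` and `completed_at` fields. `parse_timestamp` and `summarize_transfers` are hypothetical helpers; the field names come from the response above, and records without both timestamps are simply left out of the total.

```python
from collections import Counter
from datetime import datetime


def parse_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 timestamp such as '2026-02-24T17:05:49.842858Z'."""
    # Older Python versions do not accept a trailing 'Z' in fromisoformat().
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def summarize_transfers(transfers: list[dict]) -> dict:
    """Count transfers by status and total the seconds spent on timed ones."""
    by_status = Counter(t["status"] for t in transfers)
    total_seconds = sum(
        (parse_timestamp(t["completed_at"]) - parse_timestamp(t["started_at"])).total_seconds()
        for t in transfers
        if t.get("started_at") and t.get("completed_at")
    )
    return {"by_status": dict(by_status), "total_seconds": total_seconds}
```

In the notebook, `summarize_transfers(resp.json())` would be called on the `/api/transfers` response.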

### 7c. Verify files on disk

In [19]:
_ = run("docker compose exec api tree /storage/archive/")

/storage/archive/
└── Microscope 01
    └── microscope
        ├── experiment_001.csv
        └── experiment_002.csv

3 directories, 2 files
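
Comparing this tree with the earlier API responses suggests that each destination path is the storage location's `base_path`, followed by the instrument name, followed by the file's `source_path`. The sketch below reproduces that layout. It is inferred from this demo's output only; `destination_path` is a hypothetical helper and the actual path construction in StreamWeave may be configurable or differ.

```python
from pathlib import PurePosixPath


def destination_path(base_path: str, instrument_name: str, source_path: str) -> str:
    """Join base path, instrument name, and source path (layout inferred from the demo output)."""
    return str(PurePosixPath(base_path) / instrument_name / source_path)


print(destination_path("/storage/archive", "Microscope 01", "microscope/experiment_001.csv"))
```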


## 8. Test Pre-Transfer Hook (File Filter)

### 8a. Create a file that the pre-transfer hook should ignore

In [20]:
# create a file for the microscope-01 instrument that matches the exclusion filter in the built-in pre-transfer hook
_ = run(f"docker compose -f {SIMLAB_COMPOSE} exec microscope-01 sh -c 'echo temp data > /data/scratch.tmp'")
_ = run(f"docker compose -f {SIMLAB_COMPOSE} exec microscope-01 ls -lah /data/")

total 16K    
drwxr-xr-x    5 root     root         160 Feb 24 17:05 .
drwxr-xr-x    1 root     root        4.0K Feb 24 17:05 ..
-rw-r--r--    1 root     root         302 Feb 23 15:59 experiment_001.csv
-rw-r--r--    1 root     root         174 Feb 23 15:59 experiment_002.csv
-rw-r--r--    1 root     root          10 Feb 24 17:05 scratch.tmp


### 8b. Trigger another harvest

In [21]:
resp = client.post(f"/api/schedules/{SCHEDULE_ID}/trigger", headers=AUTH)
IGNORE_FLOW_RUN_ID = resp.json().get("flow_run_id")
pp(resp)

{
  "flow_run_id": "531c0deb-9f72-486d-a370-374eecd3dc4e",
  "schedule_id": "725fa608-f4da-40f5-980a-2aa977ffe7e3"
}


### 8c. Verify the .tmp file was skipped

The seeded microscope has a demonstration pre-transfer hook that excludes files matching certain patterns; such hooks are configured per instrument. The cell below shows that `scratch.tmp` is discovered (a file record is created for it) but not transferred, because the pre-transfer hook causes its transfer to be recorded as skipped.


In [22]:
wait_for_flow_run(IGNORE_FLOW_RUN_ID)

print("Files:\n------")
resp = client.get("/api/files", headers=AUTH)
pp(resp)
# microscope/scratch.tmp will be in the file list printed out at this step

files = resp.json()
scratch = next((f for f in files if f["filename"] == "scratch.tmp"), None)
# firmly assert that the file was found
assert scratch is not None, "FAIL: scratch.tmp file record not found"

transfers = client.get(f"/api/transfers?file_id={scratch['id']}", headers=AUTH).json()
# firmly assert that the file was not transferred
assert all(t["status"] == "skipped" for t in transfers), "FAIL: scratch.tmp should only have skipped transfers"
print("PASS: scratch.tmp was correctly filtered by the pre-transfer hook (transfer skipped)")

Waiting for flow run 531c0deb-9f72-486d-a370-374eecd3dc4e to complete...
  State: SCHEDULED (attempt 1/120)
  State: SCHEDULED (attempt 2/120)
  State: SCHEDULED (attempt 3/120)
  State: SCHEDULED (attempt 4/120)
  State: SCHEDULED (attempt 5/120)
  State: SCHEDULED (attempt 6/120)
  State: SCHEDULED (attempt 7/120)
  State: PENDING (attempt 8/120)
Flow run finished with state: COMPLETED
Files:
------
[
  {
    "id": "0fa9c05f-73e4-4939-bf45-b724e43f07ac",
    "persistent_id": "ark:/99999/fk4254liryuqvggvc4dmaqhtrczpu",
    "persistent_id_type": "ark",
    "instrument_id": "4bca34e9-5309-4d8d-b78c-af9ef64845f8",
    "source_path": "microscope/scratch.tmp",
    "filename": "scratch.tmp",
    "size_bytes": 10,
    "source_mtime": "2026-02-24T17:05:50.533000Z",
    "xxhash": null,
    "sha256": null,
    "first_discovered_at": "2026-02-24T17:05:58.485254Z",
    "metadata_": {},
    "owner_id": null
  },
  {
    "id": "f6b5344e-2f79-43c9-b8d4-9d43d166f027",
    "persistent_id": "ark:/99999

Prefect logs show the file being skipped:

<img style="margin-left: 4em;" src="_static/20260224_prefect_flowrun_logs_skip_tmp.png" width="800">


## 9. Test Post-Transfer Hook (Metadata Enrichment)

The microscope harvester also has a demonstration post-transfer hook that extracts `username` and `experiment` from file paths matching the pattern `{username}/{experiment}/...`. This shows how a simple metadata extraction step can run as part of the data transfer process.


In [23]:
# Add three example CSV files in different folders for two different users:
_ = run(f"""docker compose -f {SIMLAB_COMPOSE} exec microscope-01 sh -c '
mkdir -p /data/user_a/experiment_001 /data/user_a/experiment_002 /data/user_b/experiment_003 &&
printf "time,value\\n0,1.0\\n1,2.0\\n" > /data/user_a/experiment_001/scan_01.csv &&
printf "time,value\\n0,1.0\\n1,2.0\\n" > /data/user_a/experiment_002/scan_01.csv &&
printf "time,value\\n0,1.0\\n1,2.0\\n" > /data/user_b/experiment_003/scan_01.csv
'""")

In [24]:
# show current file listing on the "microscope-01" simulated instrument
_ = run(f"docker compose -f {SIMLAB_COMPOSE} exec microscope-01 tree /data/")

/data/
├── experiment_001.csv
├── experiment_002.csv
├── scratch.tmp
├── user_a
│   ├── experiment_001
│   │   └── scan_01.csv
│   └── experiment_002
│       └── scan_01.csv
└── user_b
    └── experiment_003
        └── scan_01.csv

5 directories, 6 files


### 9a. Clear and re-harvest to see metadata enrichment

We remove all transferred files, file records, and transfer records so that the next harvest processes every file again:

In [25]:
# Run rm through a shell inside the container so the glob expands there, not on the host
_ = run("docker compose exec worker sh -c 'rm -rf /storage/*'")
_ = run('docker compose exec postgres psql -U streamweave -c "DELETE FROM file_transfers; DELETE FROM file_records;"')

resp = client.post(f"/api/schedules/{SCHEDULE_ID}/trigger", headers=AUTH)
METADATA_FLOW_RUN_ID = resp.json().get("flow_run_id")
pp(resp)

DELETE 3
DELETE 3
{
  "flow_run_id": "81550980-c8bc-4adc-92f7-92a9b45d0156",
  "schedule_id": "725fa608-f4da-40f5-980a-2aa977ffe7e3"
}


### 9b. Check enriched metadata

In [26]:
# Wait for the harvest to complete
wait_for_flow_run(METADATA_FLOW_RUN_ID)

resp = client.get("/api/files", headers=AUTH)
for f in resp.json():
    print(json.dumps({"filename": f["filename"], "source_path": f["source_path"], "metadata_": f.get("metadata_")}, indent=2))

Waiting for flow run 81550980-c8bc-4adc-92f7-92a9b45d0156 to complete...
  State: SCHEDULED (attempt 1/120)
  State: SCHEDULED (attempt 2/120)
  State: SCHEDULED (attempt 3/120)
  State: SCHEDULED (attempt 4/120)
  State: SCHEDULED (attempt 5/120)
  State: SCHEDULED (attempt 6/120)
  State: SCHEDULED (attempt 7/120)
  State: SCHEDULED (attempt 8/120)
  State: SCHEDULED (attempt 9/120)
  State: SCHEDULED (attempt 10/120)
  State: RUNNING (attempt 11/120)
Flow run finished with state: COMPLETED
{
  "filename": "scan_01.csv",
  "source_path": "microscope/user_a/experiment_001/scan_01.csv",
  "metadata_": {
    "username": "user_a",
    "experiment": "experiment_001"
  }
}
{
  "filename": "scan_01.csv",
  "source_path": "microscope/user_a/experiment_002/scan_01.csv",
  "metadata_": {
    "username": "user_a",
    "experiment": "experiment_002"
  }
}
{
  "filename": "scan_01.csv",
  "source_path": "microscope/user_b/experiment_003/scan_01.csv",
  "metadata_": {
    "username": "user_b",
   

Files under user directories should have enriched metadata like:
```json
{
  "filename": "scan_01.csv",
  "source_path": "microscope/user_a/experiment_001/scan_01.csv",
  "metadata_": {
    "username": "user_a",
    "experiment": "experiment_001"
  }
}
```

Files at the root level (like `experiment_001.csv`) won't match the pattern and will have empty metadata — that's expected.
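
You can reproduce both outcomes by applying the hook's regular expression directly. The pattern below is copied from the hook configuration listed earlier, with the JSON escaping removed. The sketch assumes the pattern is searched within `source_path`; the rest of the rule is truncated in the output above, so that detail is a guess, and `extract_metadata` is a hypothetical helper.

```python
import re

# Pattern from the hook's first rule; how the hook applies it is an assumption.
RULE = re.compile(r"/(?P<username>[a-z][a-z0-9_]*)/(?P<experiment>experiment_\d+)")


def extract_metadata(source_path: str) -> dict:
    """Return the named groups captured from the path, or {} when nothing matches."""
    match = RULE.search(source_path)
    return match.groupdict() if match else {}


print(extract_metadata("microscope/user_a/experiment_001/scan_01.csv"))
print(extract_metadata("microscope/experiment_001.csv"))
```

The root-level file fails to match because the pattern needs two consecutive path segments after a slash: a username directory and then an `experiment_<digits>` directory.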

## 10. User-Scoped Access Control Demo

Files are private by default. Access is granted explicitly to users, groups, or projects via the `FileAccessGrant` system.
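
The rule "private unless a grant says otherwise" can be modeled in a few lines. The sketch below is a simplified illustration, not StreamWeave's implementation: the `Grant` dataclass and `visible_file_ids` function are hypothetical, the `"group"` and `"project"` type strings are assumed by analogy with the `"user"` value used later in this section, and admin users (who see every file in this demo) are not modeled.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Grant:
    file_id: str
    grantee_type: str  # "user" appears in the demo; "group" and "project" are assumed
    grantee_id: str


def visible_file_ids(
    grants: list[Grant],
    user_id: str,
    group_ids: frozenset[str] = frozenset(),
    project_ids: frozenset[str] = frozenset(),
) -> set[str]:
    """Return IDs of files reachable through at least one grant; all others stay hidden."""
    memberships = {"user": {user_id}, "group": group_ids, "project": project_ids}
    return {
        grant.file_id
        for grant in grants
        if grant.grantee_id in memberships.get(grant.grantee_type, ())
    }


grants = [Grant("file-1", "user", "alice"), Grant("file-2", "group", "lab-a")]
print(visible_file_ids(grants, "alice"))
print(visible_file_ids(grants, "bob"))
```

With no matching grant the result is an empty set, which mirrors the empty file list a new user sees before any access is granted.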


### 10a. Create a regular (non-admin) user

In [27]:
resp = client.post("/auth/register", json={
    "email": "researcher@test.org",
    "password": "testpassword123",
})
pp(resp)

{
  "id": "e20b41d4-b5b4-4b54-bc10-6d8fd3c72d2c",
  "email": "researcher@test.org",
  "is_active": true,
  "is_superuser": false,
  "is_verified": false,
  "role": "user"
}


### 10b. Get regular user token and user ID

In [28]:
resp = client.post("/auth/jwt/login", data={"username": "researcher@test.org", "password": "testpassword123"})
USER_TOKEN = resp.json()["access_token"]
USER_AUTH = {"Authorization": f"Bearer {USER_TOKEN}"}

resp = client.get("/users/me", headers=USER_AUTH)
USER_ID = resp.json()["id"]
print(f"User ID: {USER_ID}")

User ID: e20b41d4-b5b4-4b54-bc10-6d8fd3c72d2c


### 10c. Verify user sees no files (no access granted)

In [29]:
resp = client.get("/api/files", headers=USER_AUTH)
print("Files:", resp.json())
# Expected: []

resp = client.get("/api/transfers", headers=USER_AUTH)
print("Transfers:", resp.json())
# Expected: []

Files: []
Transfers: []


### 10d. Grant direct user access to a file

In [30]:
# Pick a file to grant access to
FILE_ID = client.get("/api/files", headers=AUTH).json()[0]["id"]
print(f"File ID: {FILE_ID}")

# Grant the user access (admin-only endpoint)
resp = client.post(f"/api/files/{FILE_ID}/access", headers=AUTH, json={
    "grantee_type": "user",
    "grantee_id": USER_ID,
})
pp(resp)

File ID: fc38212d-4f71-405c-be81-26aa8f173beb
{
  "id": "84dbfebb-8032-4e27-8ba9-fb518fc4437f",
  "file_id": "fc38212d-4f71-405c-be81-26aa8f173beb",
  "grantee_type": "user",
  "grantee_id": "e20b41d4-b5b4-4b54-bc10-6d8fd3c72d2c",
  "granted_at": "2026-02-24T17:06:11.583824Z"
}


Expected response:
```json
{
  "id": "<grant-uuid>",
  "file_id": "<file-uuid>",
  "grantee_type": "user",
  "grantee_id": "<user-uuid>",
  "granted_at": "2026-02-23T..."
}
```
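If you want to check the shape of a grant programmatically rather than by eye, a small validator along these lines may help. `check_grant` is a hypothetical helper, and the set of allowed `grantee_type` values is inferred from this guide (user, group, project) rather than taken from the API schema.

```python
import uuid
from datetime import datetime

REQUIRED_KEYS = {"id", "file_id", "grantee_type", "grantee_id", "granted_at"}
GRANTEE_TYPES = {"user", "group", "project"}  # assumed from this guide


def check_grant(grant: dict) -> None:
    """Raise ValueError if the grant payload does not have the expected shape."""
    missing = REQUIRED_KEYS - grant.keys()
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    for key in ("id", "file_id", "grantee_id"):
        uuid.UUID(grant[key])  # raises ValueError on a malformed UUID
    if grant["grantee_type"] not in GRANTEE_TYPES:
        raise ValueError(f"unexpected grantee_type: {grant['grantee_type']}")
    # Normalise the trailing "Z" so fromisoformat also works on older Pythons.
    datetime.fromisoformat(grant["granted_at"].replace("Z", "+00:00"))


# The grant returned by the cell above.
sample = {
    "id": "84dbfebb-8032-4e27-8ba9-fb518fc4437f",
    "file_id": "fc38212d-4f71-405c-be81-26aa8f173beb",
    "grantee_type": "user",
    "grantee_id": "e20b41d4-b5b4-4b54-bc10-6d8fd3c72d2c",
    "granted_at": "2026-02-24T17:06:11.583824Z",
}
check_grant(sample)
print("grant has the expected shape")
```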

### 10e. Verify user now sees the granted file

In [31]:
resp = client.get("/api/files", headers=USER_AUTH)
print(f"Files visible to user: {len(resp.json())}")
# Expected: exactly 1 file

resp = client.get(f"/api/files/{FILE_ID}", headers=USER_AUTH)
pp(resp)
# Expected: 200 with full file details

Files visible to user: 1
{
  "id": "fc38212d-4f71-405c-be81-26aa8f173beb",
  "persistent_id": "ark:/99999/fk4biqspimyync43nspm5elixje3m",
  "persistent_id_type": "ark",
  "instrument_id": "4bca34e9-5309-4d8d-b78c-af9ef64845f8",
  "source_path": "microscope/user_a/experiment_001/scan_01.csv",
  "filename": "scan_01.csv",
  "size_bytes": 23,
  "source_mtime": "2026-02-24T17:05:59.485000Z",
  "xxhash": "7c0d455ba2126c3d",
  "sha256": null,
  "first_discovered_at": "2026-02-24T17:06:10.729750Z",
  "metadata_": {
    "username": "user_a",
    "experiment": "experiment_001"
  },
  "owner_id": null
}


### 10f. Verify 404 for files without access

In [32]:
OTHER_FILE = client.get("/api/files", headers=AUTH).json()[1]["id"]

resp = client.get(f"/api/files/{OTHER_FILE}", headers=USER_AUTH)
pp(resp)
# Expected: {"detail": "File not found"} (404, not 403 — avoids leaking existence)

{
  "detail": "File not found"
}
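Returning 404 instead of 403 means a caller cannot tell a file that does not exist from one they are not allowed to see. Here is a minimal sketch of that rule, assuming a hypothetical in-memory `files` mapping and a precomputed set of file IDs visible to the caller; neither reflects StreamWeave's actual implementation.

```python
NOT_FOUND = (404, {"detail": "File not found"})


def get_file(files: dict, visible_ids: set, file_id: str) -> tuple:
    """Return (status_code, body) for a file lookup by a non-admin user."""
    # A missing file and a file the caller cannot see take the same branch,
    # so the response does not reveal which of the two is the case.
    if file_id not in files or file_id not in visible_ids:
        return NOT_FOUND
    return 200, files[file_id]


files = {"f1": {"filename": "scan_01.csv"}, "f2": {"filename": "scan_02.csv"}}
visible = {"f1"}

print(get_file(files, visible, "f1"))    # granted file
print(get_file(files, visible, "f2"))    # exists, but no grant
print(get_file(files, visible, "nope"))  # does not exist
```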


### 10g. List and revoke a grant

In [33]:
# List grants for the file (admin only)
resp = client.get(f"/api/files/{FILE_ID}/access", headers=AUTH)
pp(resp)

# Revoke the grant
GRANT_ID = resp.json()[0]["id"]
resp = client.delete(f"/api/files/{FILE_ID}/access/{GRANT_ID}", headers=AUTH)
print(f"Delete status: {resp.status_code}")
# Expected: 204

# Verify user can no longer see the file
resp = client.get(f"/api/files/{FILE_ID}", headers=USER_AUTH)
pp(resp)
# Expected: {"detail": "File not found"}

[
  {
    "id": "84dbfebb-8032-4e27-8ba9-fb518fc4437f",
    "file_id": "fc38212d-4f71-405c-be81-26aa8f173beb",
    "grantee_type": "user",
    "grantee_id": "e20b41d4-b5b4-4b54-bc10-6d8fd3c72d2c",
    "granted_at": "2026-02-24T17:06:11.583824Z"
  }
]
Delete status: 204
{
  "detail": "File not found"
}


## 10B. Group-Based Access Demo


### 10B-a. Create a group and add the user

In [34]:
# Create group
resp = client.post("/api/groups", headers=AUTH, json={
    "name": "Lab A Researchers",
    "description": "All researchers in Lab A",
})
GROUP_ID = resp.json()["id"]
print(f"Group ID: {GROUP_ID}")

# Add the regular user to the group
resp = client.post(f"/api/groups/{GROUP_ID}/members", headers=AUTH, json={"user_id": USER_ID})
pp(resp)

Group ID: 646512c1-d81b-434a-b798-aeb402cd5dca
{
  "group_id": "646512c1-d81b-434a-b798-aeb402cd5dca",
  "user_id": "e20b41d4-b5b4-4b54-bc10-6d8fd3c72d2c"
}


### 10B-b. Grant the group access to a file

In [35]:
resp = client.post(f"/api/files/{FILE_ID}/access", headers=AUTH, json={
    "grantee_type": "group",
    "grantee_id": GROUP_ID,
})
pp(resp)

{
  "id": "ca7145e7-ae7b-4518-8657-58461cfd898f",
  "file_id": "fc38212d-4f71-405c-be81-26aa8f173beb",
  "grantee_type": "group",
  "grantee_id": "646512c1-d81b-434a-b798-aeb402cd5dca",
  "granted_at": "2026-02-24T17:06:11.640838Z"
}


### 10B-c. Verify user sees the file via group membership

In [36]:
resp = client.get(f"/api/files/{FILE_ID}", headers=USER_AUTH)
pp(resp)
# Expected: 200 — user can see the file because they're in the granted group

{
  "id": "fc38212d-4f71-405c-be81-26aa8f173beb",
  "persistent_id": "ark:/99999/fk4biqspimyync43nspm5elixje3m",
  "persistent_id_type": "ark",
  "instrument_id": "4bca34e9-5309-4d8d-b78c-af9ef64845f8",
  "source_path": "microscope/user_a/experiment_001/scan_01.csv",
  "filename": "scan_01.csv",
  "size_bytes": 23,
  "source_mtime": "2026-02-24T17:05:59.485000Z",
  "xxhash": "7c0d455ba2126c3d",
  "sha256": null,
  "first_discovered_at": "2026-02-24T17:06:10.729750Z",
  "metadata_": {
    "username": "user_a",
    "experiment": "experiment_001"
  },
  "owner_id": null
}


### 10B-d. Groups CRUD (Create, Read, Update, Delete)

In [37]:
# List groups
print("=== All groups ===")
pp(client.get("/api/groups", headers=AUTH))

# Get group details
print("\n=== Group details ===")
pp(client.get(f"/api/groups/{GROUP_ID}", headers=AUTH))

# List group members
print("\n=== Group members ===")
pp(client.get(f"/api/groups/{GROUP_ID}/members", headers=AUTH))

# Update group
print("\n=== Update group ===")
pp(client.patch(f"/api/groups/{GROUP_ID}", headers=AUTH, json={"description": "Updated description"}))

=== All groups ===
[
  {
    "id": "646512c1-d81b-434a-b798-aeb402cd5dca",
    "name": "Lab A Researchers",
    "description": "All researchers in Lab A",
    "created_at": "2026-02-24T17:06:11.627889Z",
    "updated_at": "2026-02-24T17:06:11.627889Z"
  }
]

=== Group details ===
{
  "id": "646512c1-d81b-434a-b798-aeb402cd5dca",
  "name": "Lab A Researchers",
  "description": "All researchers in Lab A",
  "created_at": "2026-02-24T17:06:11.627889Z",
  "updated_at": "2026-02-24T17:06:11.627889Z"
}

=== Group members ===
[
  {
    "group_id": "646512c1-d81b-434a-b798-aeb402cd5dca",
    "user_id": "e20b41d4-b5b4-4b54-bc10-6d8fd3c72d2c"
  }
]

=== Update group ===
{
  "id": "646512c1-d81b-434a-b798-aeb402cd5dca",
  "name": "Lab A Researchers",
  "description": "Updated description",
  "created_at": "2026-02-24T17:06:11.627889Z",
  "updated_at": "2026-02-24T17:06:11.661656Z"
}


In [38]:
# Remove member
resp = client.delete(f"/api/groups/{GROUP_ID}/members/{USER_ID}", headers=AUTH)
print(f"Remove member status: {resp.status_code}")
# Expected: 204

# Verify user lost access (group membership removed)
resp = client.get(f"/api/files/{FILE_ID}", headers=USER_AUTH)
pp(resp)
# Expected: {"detail": "File not found"}

Remove member status: 204
{
  "detail": "File not found"
}


## 10C. Project-Based File Access Demo

Projects can contain both individual users and entire groups. When a file is granted to a project, all members (direct users + users in member groups) can see it.
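The resolution rule can be pictured with a toy in-memory model. `AccessModel` below is a hypothetical sketch of the idea only; it is not StreamWeave's schema or query logic, and all IDs are made up.

```python
from dataclasses import dataclass, field


@dataclass
class AccessModel:
    """Toy model of user, group, and project grants."""

    group_members: dict = field(default_factory=dict)    # group_id -> {user_id}
    project_members: dict = field(default_factory=dict)  # project_id -> {(type, id)}
    grants: dict = field(default_factory=dict)           # file_id -> {(type, id)}

    def principals(self, user_id: str) -> set:
        """All (type, id) pairs through which a user can hold a grant."""
        found = {("user", user_id)}
        found |= {
            ("group", group_id)
            for group_id, members in self.group_members.items()
            if user_id in members
        }
        for project_id, members in self.project_members.items():
            if members & found:  # direct member, or member via a group
                found.add(("project", project_id))
        return found

    def can_access(self, user_id: str, file_id: str) -> bool:
        return bool(self.grants.get(file_id, set()) & self.principals(user_id))


model = AccessModel(
    group_members={"lab-a": {"researcher"}},
    project_members={"study": {("group", "lab-a"), ("user", "postdoc")}},
    grants={"scan_01": {("project", "study")}},
)
print(model.can_access("researcher", "scan_01"))  # via group, then project
print(model.can_access("postdoc", "scan_01"))     # direct project member
print(model.can_access("stranger", "scan_01"))    # no path to the grant
```

Because access is derived from membership at lookup time, removing a user from a group or project revokes access without touching the grant itself.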


### 10C-a. Create a project with user and group members

In [39]:
# Re-add user to the group (removed in previous step)
client.post(f"/api/groups/{GROUP_ID}/members", headers=AUTH, json={"user_id": USER_ID})

# Create project
resp = client.post("/api/projects", headers=AUTH, json={
    "name": "Microscopy Study 2026",
    "description": "Main research project",
})
PROJECT_ID = resp.json()["id"]
print(f"Project ID: {PROJECT_ID}")

# Add the group as a project member
resp = client.post(f"/api/projects/{PROJECT_ID}/members", headers=AUTH, json={
    "member_type": "group",
    "member_id": GROUP_ID,
})
pp(resp)

Project ID: 7d75d750-a051-4405-88de-ddfaaff5e98e
{
  "id": "b13ade48-a23e-4547-a0a3-57949ce214cd",
  "project_id": "7d75d750-a051-4405-88de-ddfaaff5e98e",
  "member_type": "group",
  "member_id": "646512c1-d81b-434a-b798-aeb402cd5dca"
}


### 10C-b. Grant the project access to a file

In [40]:
# Clean up previous grants on the file
grants = client.get(f"/api/files/{FILE_ID}/access", headers=AUTH).json()
for g in grants:
    client.delete(f"/api/files/{FILE_ID}/access/{g['id']}", headers=AUTH)
print(f"Cleaned up {len(grants)} existing grants")

# Grant project access
resp = client.post(f"/api/files/{FILE_ID}/access", headers=AUTH, json={
    "grantee_type": "project",
    "grantee_id": PROJECT_ID,
})
pp(resp)

Cleaned up 1 existing grants
{
  "id": "7d714f34-dbeb-494e-913b-1aa576fe1a86",
  "file_id": "fc38212d-4f71-405c-be81-26aa8f173beb",
  "grantee_type": "project",
  "grantee_id": "7d75d750-a051-4405-88de-ddfaaff5e98e",
  "granted_at": "2026-02-24T17:06:11.719943Z"
}


### 10C-c. Verify user sees the file via project → group → user chain

In [41]:
resp = client.get(f"/api/files/{FILE_ID}", headers=USER_AUTH)
pp(resp)
# Expected: 200 — user can see the file because:
#   user ∈ group → group ∈ project → project has file grant

{
  "id": "fc38212d-4f71-405c-be81-26aa8f173beb",
  "persistent_id": "ark:/99999/fk4biqspimyync43nspm5elixje3m",
  "persistent_id_type": "ark",
  "instrument_id": "4bca34e9-5309-4d8d-b78c-af9ef64845f8",
  "source_path": "microscope/user_a/experiment_001/scan_01.csv",
  "filename": "scan_01.csv",
  "size_bytes": 23,
  "source_mtime": "2026-02-24T17:05:59.485000Z",
  "xxhash": "7c0d455ba2126c3d",
  "sha256": null,
  "first_discovered_at": "2026-02-24T17:06:10.729750Z",
  "metadata_": {
    "username": "user_a",
    "experiment": "experiment_001"
  },
  "owner_id": null
}


### 10C-d. Test direct user membership in projects

In [42]:
# Create a second user
resp = client.post("/auth/register", json={"email": "postdoc@test.org", "password": "testpassword123"})
pp(resp)

resp = client.post("/auth/jwt/login", data={"username": "postdoc@test.org", "password": "testpassword123"})
POSTDOC_TOKEN = resp.json()["access_token"]
POSTDOC_AUTH = {"Authorization": f"Bearer {POSTDOC_TOKEN}"}
POSTDOC_ID = client.get("/users/me", headers=POSTDOC_AUTH).json()["id"]
print(f"Postdoc ID: {POSTDOC_ID}")

# Add postdoc directly to the project (not via group)
resp = client.post(f"/api/projects/{PROJECT_ID}/members", headers=AUTH, json={
    "member_type": "user",
    "member_id": POSTDOC_ID,
})
pp(resp)

# Postdoc can also see the file
resp = client.get(f"/api/files/{FILE_ID}", headers=POSTDOC_AUTH)
print(f"\nPostdoc file access status: {resp.status_code}")
# Expected: 200

{
  "id": "48b4470f-f365-4eb4-9a13-526122f9fd62",
  "email": "postdoc@test.org",
  "is_active": true,
  "is_superuser": false,
  "is_verified": false,
  "role": "user"
}
Postdoc ID: 48b4470f-f365-4eb4-9a13-526122f9fd62
{
  "id": "4c14f918-a3e4-437c-a402-35784ca749ee",
  "project_id": "7d75d750-a051-4405-88de-ddfaaff5e98e",
  "member_type": "user",
  "member_id": "48b4470f-f365-4eb4-9a13-526122f9fd62"
}

Postdoc file access status: 200


### 10C-e. Projects CRUD

In [43]:
# List projects
print("=== All projects ===")
pp(client.get("/api/projects", headers=AUTH))

# List project members
print("\n=== Project members ===")
pp(client.get(f"/api/projects/{PROJECT_ID}/members", headers=AUTH))
# Expected: 2 members (1 group + 1 direct user)

=== All projects ===
[
  {
    "id": "7d75d750-a051-4405-88de-ddfaaff5e98e",
    "name": "Microscopy Study 2026",
    "description": "Main research project",
    "created_at": "2026-02-24T17:06:11.689293Z",
    "updated_at": "2026-02-24T17:06:11.689293Z"
  }
]

=== Project members ===
[
  {
    "id": "b13ade48-a23e-4547-a0a3-57949ce214cd",
    "project_id": "7d75d750-a051-4405-88de-ddfaaff5e98e",
    "member_type": "group",
    "member_id": "646512c1-d81b-434a-b798-aeb402cd5dca"
  },
  {
    "id": "4c14f918-a3e4-437c-a402-35784ca749ee",
    "project_id": "7d75d750-a051-4405-88de-ddfaaff5e98e",
    "member_type": "user",
    "member_id": "48b4470f-f365-4eb4-9a13-526122f9fd62"
  }
]


In [44]:
# Remove postdoc from project
resp = client.delete(f"/api/projects/{PROJECT_ID}/members/{POSTDOC_ID}", headers=AUTH)
print(f"Remove member status: {resp.status_code}")
# Expected: 204

# Postdoc loses access
resp = client.get(f"/api/files/{FILE_ID}", headers=POSTDOC_AUTH)
pp(resp)
# Expected: {"detail": "File not found"}

Remove member status: 204
{
  "detail": "File not found"
}


### 10C-f. Non-admin users cannot manage groups/projects/grants

In [45]:
# All of these should return 403
for endpoint in ["/api/groups", "/api/projects", f"/api/files/{FILE_ID}/access"]:
    resp = client.get(endpoint, headers=USER_AUTH)
    print(f"GET {endpoint}: {resp.status_code} — {resp.json()}")
# Expected: {"detail": "Admin access required"}

GET /api/groups: 403 — {'detail': 'Admin access required'}
GET /api/projects: 403 — {'detail': 'Admin access required'}
GET /api/files/fc38212d-4f71-405c-be81-26aa8f173beb/access: 403 — {'detail': 'Admin access required'}



## 11. File & Transfer API Filtering Demo



### 11a. Filter files by instrument

In [46]:
INSTRUMENT_ID = instruments[0]["id"]

resp = client.get(f"/api/files?instrument_id={INSTRUMENT_ID}", headers=AUTH)
print(f"Files for instrument {INSTRUMENT_ID}: {len(resp.json())}")

Files for instrument 4bca34e9-5309-4d8d-b78c-af9ef64845f8: 6
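Interpolating a filter into the URL by hand works for UUIDs, but values with special characters need encoding. The sketch below builds the query string with the standard library. `filtered_url` is a hypothetical helper; only the `instrument_id` filter is taken from this guide, and the second example exists purely to show encoding.

```python
from urllib.parse import urlencode


def filtered_url(path: str, **filters) -> str:
    """Append the non-None filters to path as a URL-encoded query string."""
    params = {key: value for key, value in filters.items() if value is not None}
    return f"{path}?{urlencode(params)}" if params else path


print(filtered_url("/api/files", instrument_id="4bca34e9-5309-4d8d-b78c-af9ef64845f8"))
print(filtered_url("/api/files", instrument_id=None))
print(filtered_url("/api/example", name="Microscope 01"))  # space is encoded
```

With `httpx`, passing `params={"instrument_id": INSTRUMENT_ID}` to `client.get` achieves the same encoding without building the URL yourself.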


### 11b. Filter transfers by file

In [47]:
FILE_ID = client.get("/api/files", headers=AUTH).json()[0]["id"]

resp = client.get(f"/api/transfers?file_id={FILE_ID}", headers=AUTH)
pp(resp)

[
  {
    "id": "4bf5abaf-b2ed-412b-87f6-3c7326d80ba9",
    "file_id": "fc38212d-4f71-405c-be81-26aa8f173beb",
    "storage_location_id": "7cb0e261-f511-4ff3-96af-0bd3323d0ba7",
    "destination_path": "/storage/archive/Microscope 01/microscope/user_a/experiment_001/scan_01.csv",
    "transfer_adapter": "rclone",
    "status": "completed",
    "bytes_transferred": 23,
    "source_checksum": null,
    "dest_checksum": "7c0d455ba2126c3d",
    "checksum_verified": false,
    "started_at": "2026-02-24T17:06:10.730384Z",
    "completed_at": "2026-02-24T17:06:10.790206Z",
    "error_message": null,
    "prefect_flow_run_id": null
  }
]


### 11c. Get single file by ID

In [48]:
resp = client.get(f"/api/files/{FILE_ID}", headers=AUTH)
pp(resp)

{
  "id": "fc38212d-4f71-405c-be81-26aa8f173beb",
  "persistent_id": "ark:/99999/fk4biqspimyync43nspm5elixje3m",
  "persistent_id_type": "ark",
  "instrument_id": "4bca34e9-5309-4d8d-b78c-af9ef64845f8",
  "source_path": "microscope/user_a/experiment_001/scan_01.csv",
  "filename": "scan_01.csv",
  "size_bytes": 23,
  "source_mtime": "2026-02-24T17:05:59.485000Z",
  "xxhash": "7c0d455ba2126c3d",
  "sha256": null,
  "first_discovered_at": "2026-02-24T17:06:10.729750Z",
  "metadata_": {
    "username": "user_a",
    "experiment": "experiment_001"
  },
  "owner_id": null
}


Verify all fields are present: `persistent_id`, `persistent_id_type`, `source_path`, `filename`, `xxhash`, `first_discovered_at`, `metadata_`.

### 11d. Get single transfer by ID

In [49]:
TRANSFER_ID = client.get("/api/transfers", headers=AUTH).json()[0]["id"]

resp = client.get(f"/api/transfers/{TRANSFER_ID}", headers=AUTH)
pp(resp)

{
  "id": "0866d35b-f4f2-492d-a562-2a816fd27da8",
  "file_id": "37bc26ec-ed36-4148-ba35-e27b4b3103ac",
  "storage_location_id": "7cb0e261-f511-4ff3-96af-0bd3323d0ba7",
  "destination_path": "/storage/archive/Microscope 01/microscope/experiment_001.csv",
  "transfer_adapter": "rclone",
  "status": "completed",
  "bytes_transferred": 302,
  "source_checksum": null,
  "dest_checksum": "2660dec29d8c3f7b",
  "checksum_verified": false,
  "started_at": "2026-02-24T17:06:10.399059Z",
  "completed_at": "2026-02-24T17:06:10.458032Z",
  "error_message": null,
  "prefect_flow_run_id": null
}


## 12. Test Schedule CRUD with Prefect Sync

### 12a. Create a new schedule

In [50]:
resp = client.post("/api/schedules", headers=AUTH, json={
    "instrument_id": INSTRUMENT_ID,
    "default_storage_location_id": STORAGE_ID,
    "cron_expression": "0 */6 * * *",
    "enabled": True,
})
pp(resp)

{
  "id": "f1598bd0-af1d-403d-8713-7431e03e5ec1",
  "instrument_id": "4bca34e9-5309-4d8d-b78c-af9ef64845f8",
  "default_storage_location_id": "7cb0e261-f511-4ff3-96af-0bd3323d0ba7",
  "cron_expression": "0 */6 * * *",
  "prefect_deployment_id": "a28a83b2-4d95-45ba-acd3-6b664c9ba1e7",
  "enabled": true,
  "created_at": "2026-02-24T17:06:11.923544Z",
  "updated_at": "2026-02-24T17:06:11.925086Z"
}


Check that `prefect_deployment_id` is populated, which indicates that the Prefect deployment was created.
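To read a cron expression such as `0 */6 * * *`, it can help to expand its fields into concrete values. The helper below is a simplified sketch that handles only `*`, `*/n`, and single numbers; real cron syntax also allows ranges and lists, and this is not how Prefect parses schedules.

```python
def expand_field(field: str, low: int, high: int) -> list:
    """Expand one cron field ('*', '*/n', or a single number) into its values."""
    if field == "*":
        return list(range(low, high + 1))
    if field.startswith("*/"):
        step = int(field[2:])
        return list(range(low, high + 1, step))
    return [int(field)]


minute, hour, *_ = "0 */6 * * *".split()
print(expand_field(minute, 0, 59))  # minutes at which the schedule fires
print(expand_field(hour, 0, 23))    # hours at which the schedule fires
```

For the expression used above, this gives minute 0 of hours 0, 6, 12, and 18, that is, four runs per day.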

### 12b. Update the schedule

In [51]:
client.get("/api/schedules", headers=AUTH).json()

[{'id': '725fa608-f4da-40f5-980a-2aa977ffe7e3',
  'instrument_id': '4bca34e9-5309-4d8d-b78c-af9ef64845f8',
  'default_storage_location_id': '7cb0e261-f511-4ff3-96af-0bd3323d0ba7',
  'cron_expression': '*/15 * * * *',
  'prefect_deployment_id': 'a28a83b2-4d95-45ba-acd3-6b664c9ba1e7',
  'enabled': True,
  'created_at': '2026-02-24T17:05:41.521453Z',
  'updated_at': '2026-02-24T17:05:41.523843Z'},
 {'id': '0293924e-a6de-459c-98c0-6d73303d576b',
  'instrument_id': '65186f54-074d-430e-b2ca-c24b4861e990',
  'default_storage_location_id': '7cb0e261-f511-4ff3-96af-0bd3323d0ba7',
  'cron_expression': '*/15 * * * *',
  'prefect_deployment_id': '97ee019b-aa52-43b0-84e9-f472b9951c50',
  'enabled': True,
  'created_at': '2026-02-24T17:05:42.798288Z',
  'updated_at': '2026-02-24T17:05:42.799405Z'},
 {'id': 'a86e9e55-9518-4b9f-9a24-321f3ea3b428',
  'instrument_id': '20bfa6b2-87a4-4d41-9c97-d7f59bd92723',
  'default_storage_location_id': '51c43e6b-e524-4fd0-aa92-3b40d8fe7cda',
  'cron_expression': '*/

In [52]:
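# Note: [0] selects the first schedule in the listing above, which is the
# pre-existing schedule for this instrument rather than the one created in 12a.
NEW_SCHEDULE_ID = client.get("/api/schedules", headers=AUTH).json()[0]["id"]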

resp = client.patch(f"/api/schedules/{NEW_SCHEDULE_ID}", headers=AUTH, json={
    "cron_expression": "0 */12 * * *",
})
pp(resp)

{
  "id": "725fa608-f4da-40f5-980a-2aa977ffe7e3",
  "instrument_id": "4bca34e9-5309-4d8d-b78c-af9ef64845f8",
  "default_storage_location_id": "7cb0e261-f511-4ff3-96af-0bd3323d0ba7",
  "cron_expression": "0 */12 * * *",
  "prefect_deployment_id": "a28a83b2-4d95-45ba-acd3-6b664c9ba1e7",
  "enabled": true,
  "created_at": "2026-02-24T17:05:41.521453Z",
  "updated_at": "2026-02-24T17:06:12.010835Z"
}


Verify in the Prefect UI that the deployment schedule was updated.

## 13. Test Idempotent Discovery

Trigger the same harvest twice — the second run should find zero new files.
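One common way to make discovery idempotent is to key each file on a stable identity and skip keys that are already recorded. The sketch below assumes the key is the pair of instrument ID and source path. That choice, and the `discover` helper itself, are illustrative assumptions rather than a description of StreamWeave's internals.

```python
def discover(known: set, instrument_id: str, listing: list) -> list:
    """Record unseen (instrument, path) pairs and return only the new paths."""
    new_paths = []
    for path in listing:
        key = (instrument_id, path)
        if key not in known:
            known.add(key)
            new_paths.append(path)
    return new_paths


known = set()
listing = ["microscope/experiment_001.csv", "microscope/user_a/experiment_001/scan_01.csv"]

first = discover(known, "microscope-01", listing)
second = discover(known, "microscope-01", listing)
print(f"First run found {len(first)} new files")
print(f"Second run found {len(second)} new files")
```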

In [53]:
SCHEDULE_ID

'725fa608-f4da-40f5-980a-2aa977ffe7e3'

In [54]:
# First trigger
resp = client.post(f"/api/schedules/{SCHEDULE_ID}/trigger", headers=AUTH)
FLOW_RUN_ID = resp.json().get("flow_run_id")
print("Trigger 1:", resp.json())
wait_for_flow_run(FLOW_RUN_ID)

before = len(client.get("/api/files", headers=AUTH).json())
print(f"Files before: {before}\n")

# Second trigger
resp = client.post(f"/api/schedules/{SCHEDULE_ID}/trigger", headers=AUTH)
print("Trigger 2:", resp.json())
FLOW_RUN_ID = resp.json().get("flow_run_id")
wait_for_flow_run(FLOW_RUN_ID)

after = len(client.get("/api/files", headers=AUTH).json())
print(f"Files after: {after}\n")

if before == after:
    print("PASS: No duplicate files")
else:
    print("FAIL: Duplicate files created")

Trigger 1: {'flow_run_id': '12a6ac99-7067-4e23-a245-692db7d52a42', 'schedule_id': '725fa608-f4da-40f5-980a-2aa977ffe7e3'}
Waiting for flow run 12a6ac99-7067-4e23-a245-692db7d52a42 to complete...
  State: SCHEDULED (attempt 1/120)
  State: SCHEDULED (attempt 2/120)
  State: SCHEDULED (attempt 3/120)
  State: SCHEDULED (attempt 4/120)
  State: SCHEDULED (attempt 5/120)
  State: PENDING (attempt 6/120)
  State: RUNNING (attempt 7/120)
Flow run finished with state: COMPLETED
Files before: 6

Trigger 2: {'flow_run_id': '836fecca-bbb1-4c9a-8280-bdf253751018', 'schedule_id': '725fa608-f4da-40f5-980a-2aa977ffe7e3'}
Waiting for flow run 836fecca-bbb1-4c9a-8280-bdf253751018 to complete...
  State: SCHEDULED (attempt 1/120)
  State: SCHEDULED (attempt 2/120)
  State: SCHEDULED (attempt 3/120)
  State: SCHEDULED (attempt 4/120)
  State: SCHEDULED (attempt 5/120)
  State: SCHEDULED (attempt 6/120)
  State: SCHEDULED (attempt 7/120)
  State: SCHEDULED (attempt 8/120)
  State: SCHEDULED (attempt 9/12
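The polling output above follows a bounded-retry pattern: check the state, stop on a terminal state, otherwise wait and try again up to a fixed number of attempts. The sketch below illustrates that pattern only. It is not the notebook's `wait_for_flow_run` helper; `poll_until_terminal`, its parameters, and the set of terminal state names are assumptions.

```python
import time

# Assumed terminal state names; adjust to match your orchestrator.
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


def poll_until_terminal(get_state, max_attempts=120, delay=1.0, sleep=time.sleep):
    """Call get_state() until it returns a terminal state or attempts run out."""
    for attempt in range(1, max_attempts + 1):
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        print(f"  State: {state} (attempt {attempt}/{max_attempts})")
        sleep(delay)
    raise TimeoutError(f"no terminal state after {max_attempts} attempts")


# Simulated state sequence standing in for real API calls.
states = iter(["SCHEDULED", "PENDING", "RUNNING", "COMPLETED"])
final = poll_until_terminal(lambda: next(states), sleep=lambda _: None)
print(f"Flow run finished with state: {final}")
```

Injecting `get_state` and `sleep` keeps the loop testable without a running server or real delays.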

## 14. Verify ARK Identifiers

Files are assigned unique identifiers ([ARK](https://arks.org/about/), by default) when they are discovered. Future versions of StreamWeave will also support [handles](https://handle.net/) and [DOIs](https://doi.org) for globally persistent identification.

In [55]:
files = client.get("/api/files", headers=AUTH).json()
arks = [f["persistent_id"] for f in files]

print(f"Total files: {len(arks)}")
print(f"Unique ARKs: {len(set(arks))}")
print(f"All start with ark:/99999/fk4: {all(a.startswith('ark:/99999/fk4') for a in arks)}")
print("\nSample ARKs:")
for ark in arks[:5]:
    print(f"  {ark}")

Total files: 6
Unique ARKs: 6
All start with ark:/99999/fk4: True

Sample ARKs:
  ark:/99999/fk4biqspimyync43nspm5elixje3m
  ark:/99999/fk4dkgmm4ypvzbipe4q2zrkqafbaq
  ark:/99999/fk4tspym44rwjgrpjekiuihzr3bqi
  ark:/99999/fk422lxsysd3nbppjq6jj5bap7hhm
  ark:/99999/fk4uk2tvgse5bghpbzl557rogbriu


## 15. Cleanup

Run these commands in a terminal from the repository root to tear everything down, or run the following cell to do the same from the notebook:

```bash
docker compose down -v
cd simlab && docker compose -f docker-compose.simlab.yml down -v && cd ..
```

In [56]:
# remove additional demo files created in notebook
_ = run(f"docker compose -f {SIMLAB_COMPOSE} exec microscope-01 sh -c 'rm -rf /data/user_a /data/user_b /data/scratch.tmp'")

# teardown "simlab" stack
_ = run(f"docker compose -f {SIMLAB_COMPOSE} down -v")

# teardown main streamweave stack
_ = run("docker compose down -v")

client.close()

 Container simlab-spectrometer-01-1 Stopping
 Container simlab-xray-diffraction-01-1 Stopping
 Container simlab-microscope-01-1 Stopping
 Container simlab-microscope-01-1 Stopped
 Container simlab-microscope-01-1 Removing
 Container simlab-spectrometer-01-1 Stopped
 Container simlab-spectrometer-01-1 Removing
 Container simlab-xray-diffraction-01-1 Stopped
 Container simlab-xray-diffraction-01-1 Removing
 Container simlab-microscope-01-1 Removed
 Container simlab-spectrometer-01-1 Removed
 Container simlab-xray-diffraction-01-1 Removed
 Network streamweave-simlab Removing
 Network streamweave-simlab Resource is still in use
 Container streamweave-api-1 Stopping
 Container streamweave-worker-1 Stopping
 Container streamweave-worker-1 Stopped
 Container streamweave-worker-1 Removing
 Container streamweave-

Confirm no containers are still running:

In [57]:
_ = run(f"docker compose -f {SIMLAB_COMPOSE} ps")
_ = run("docker compose ps")

NAME      IMAGE     COMMAND   SERVICE   CREATED   STATUS    PORTS
NAME      IMAGE     COMMAND   SERVICE   CREATED   STATUS    PORTS


## 16. Conclusion

This guide demonstrated the core capabilities of the **StreamWeave** backend, a research data management platform designed to automate the discovery, transfer, and governance of instrument-generated data.

### Features Covered

- **Automated File Discovery & Transfer** — Schedules that automatically harvest files from instrument sources, with checksum verification and idempotent processing
- **Persistent Identifiers (ARK)** — Every discovered file receives a unique, standards-compliant ARK identifier for long-term reference
- **Workflow Orchestration** — Prefect-powered flow execution with real-time monitoring, manual triggers, and scheduled runs
- **Extensible Hooks System** — Pre-transfer hooks for filtering files and post-transfer hooks for metadata enrichment
- **Fine-Grained Access Control** — User, group, and project-based permissions with hierarchical inheritance
- **Full API Coverage** — RESTful endpoints for instruments, storage locations, schedules, files, transfers, and access management

### Use Cases

StreamWeave is ideal for:
- Research core facilities managing data from multiple scientific instruments
- Laboratories requiring automated data archival with provenance tracking
- Organizations needing compliant data governance with audit trails



### Interested in deploying StreamWeave for your organization?

For deployment assistance, custom integrations, or enterprise support, contact us at:

**[https://datasophos.co/#contact](https://datasophos.co/#contact)**