# Test: Data Owner (Client1)

Automated test notebook for the Data Owner workflow.
Run with: `jupyter execute sc_test_do.ipynb`

In [None]:
import sys
from pathlib import Path

# Configuration
TIMEOUT = 120.0

In [None]:
# Clean shared directory for fresh test
!rm -rf shared/public shared/client1 shared/client2

In [None]:
import scanpy as sc
import anndata as ad
from beaver import Twin
from beaver.runtime import TrustedLoader
import beaver

bv = beaver.connect("shared", user="client1")
print("[DO] Connected as client1")

In [None]:
# Register AnnData serialization
@TrustedLoader.register(ad.AnnData)
def anndata_serialize_file(obj, path):
    obj.write_h5ad(path)

@TrustedLoader.register(ad.AnnData)
def anndata_deserialize_file(path):
    return ad.read_h5ad(path)

print("[DO] Registered AnnData loader")

In [None]:
# Wait for DS to signal ready before loading data
print("[DO] Waiting for DS ready signal...")
env, obj = bv.wait_for_message(
    filter_name="ds_ready",
    timeout=TIMEOUT,
    poll_interval=1.0,
)
assert env is not None, "Timeout waiting for DS ready signal"
print(f"[DO] Received ready signal from {env.sender}")

In [None]:
# Load single-cell data
data_dir = Path("single_cell/data")
private_path = data_dir / "sc_RNAseq_adata_downsampled_to5percent.private.h5ad"
mock_path = data_dir / "sc_RNAseq_adata_downsampled_to5percent.mock.h5ad"
sim_path = data_dir / "adata_simulated.h5ad"

# Create mock from simulated if needed
if not mock_path.exists() and sim_path.exists():
    print("[DO] Creating mock data from simulated...")
    adata_sim = sc.read(sim_path)
    adata_sim.obs.rename(columns={"pct_counts_in_top_50_genes": "pct_counts_mt"}, inplace=True)
    adata_sim.obs.rename(columns={"group": "cell_type"}, inplace=True)
    adata_sim.write_h5ad(mock_path)

adata_private = sc.read(private_path)
adata_mock = sc.read(mock_path)

print(f"[DO] Loaded private: {adata_private.n_obs} cells")
print(f"[DO] Loaded mock: {adata_mock.n_obs} cells")

assert adata_private.n_obs > 0, "Private data is empty"
assert adata_mock.n_obs > 0, "Mock data is empty"

In [None]:
# Create and publish Twin
patient_sc = Twin(
    private=adata_private,
    public=adata_mock,
    owner="client1",
    name="patient_sc",
)

bv.remote_vars["patient_sc"] = patient_sc
print("[DO] Published patient_sc Twin")

# Signal to DS that Twin is ready
bv.send({"status": "twin_ready"}, name="do_twin_ready", user="client2")
print("[DO] Sent twin_ready signal to DS")

# Track processed requests
processed_requests = set()

In [None]:
# Request 1: Violin plot
print("[DO] Waiting for request 1/4 (violin)...")
bv.wait_for_message(timeout=TIMEOUT, poll_interval=1.0)

# Find the first unprocessed computation request in inbox
inbox = bv.inbox()
request_envs = [e for e in inbox if e.name and e.name.startswith("request_") and e.name not in processed_requests]
assert len(request_envs) > 0, "No new request found in inbox"
request_env = request_envs[0]
print(f"[DO] Received: {request_env.name}")

# Load it - this injects the request into globals
request_env.load()
processed_requests.add(request_env.name)

# The loaded variable name is the envelope name
request_obj = globals()[request_env.name]
print(f"[DO] Running: {request_env.name}")

result = request_obj.run_both()
assert result is not None, "run_both() returned None"

result.approve()
print("[DO] ✓ Request 1 complete")

In [None]:
# Request 2: Embedding
print("[DO] Waiting for request 2/4 (embedding)...")
bv.wait_for_message(timeout=TIMEOUT, poll_interval=1.0)

inbox = bv.inbox()
request_envs = [e for e in inbox if e.name and e.name.startswith("request_") and e.name not in processed_requests]
assert len(request_envs) > 0, "No new request found in inbox"
request_env = request_envs[0]
print(f"[DO] Received: {request_env.name}")

request_env.load()
processed_requests.add(request_env.name)
request_obj = globals()[request_env.name]
print(f"[DO] Running: {request_env.name}")

result = request_obj.run_both()
assert result is not None, "run_both() returned None"

result.approve()
print("[DO] ✓ Request 2 complete")

In [None]:
# Request 3: PCA variance
print("[DO] Waiting for request 3/4 (pca)...")
bv.wait_for_message(timeout=TIMEOUT, poll_interval=1.0)

inbox = bv.inbox()
request_envs = [e for e in inbox if e.name and e.name.startswith("request_") and e.name not in processed_requests]
assert len(request_envs) > 0, "No new request found in inbox"
request_env = request_envs[0]
print(f"[DO] Received: {request_env.name}")

request_env.load()
processed_requests.add(request_env.name)
request_obj = globals()[request_env.name]
print(f"[DO] Running: {request_env.name}")

result = request_obj.run_both()
assert result is not None, "run_both() returned None"

result.approve()
print("[DO] ✓ Request 3 complete")

In [None]:
# Request 4: UMAP
print("[DO] Waiting for request 4/4 (umap)...")
bv.wait_for_message(timeout=TIMEOUT, poll_interval=1.0)

inbox = bv.inbox()
request_envs = [e for e in inbox if e.name and e.name.startswith("request_") and e.name not in processed_requests]
assert len(request_envs) > 0, "No new request found in inbox"
request_env = request_envs[0]
print(f"[DO] Received: {request_env.name}")

request_env.load()
processed_requests.add(request_env.name)
request_obj = globals()[request_env.name]
print(f"[DO] Running: {request_env.name}")

result = request_obj.run_both()
assert result is not None, "run_both() returned None"

result.approve()
print("[DO] ✓ Request 4 complete")

In [None]:
# Final assertions
print("\n" + "="*50)
print("[DO] TEST PASSED")
print("[DO] Successfully handled 4 computation requests")
print("="*50)