In [1]:
# syft absolute
import syft as sy
from syft import ActionObject
from syft.client.syncing import compare_states



In [2]:
# Options shared by both nodes; only the name and the side type differ.
shared_launch_options = {
    "dev_mode": True,
    "reset": True,
    "local_db": True,
    "n_consumers": 1,
    "create_producer": True,
}

# Low-side node.
node_low = sy.orchestra.launch(
    name="test_l", node_side_type="low", **shared_launch_options
)
# High-side node.
node_high = sy.orchestra.launch(
    name="test_h", node_side_type="high", **shared_launch_options
)

Staging Protocol Changes...
SQLite Store Path:
!open file:///tmp/8a1c04544655402190588aec30079bc3.sqlite

Creating default worker image with tag='local-dev'
Building default worker image with tag=local-dev
Setting up worker poolname=default-pool workers=1 image_uid=3a9cb0c9324145d391d5bffbedd32447 in_memory=True
Created default worker pool.
Data Migrated to latest version !!!
Staging Protocol Changes...
SQLite Store Path:
!open file:///tmp/8212e6797fde4c3fba4fc53ab555a886.sqlite

Creating default worker image with tag='local-dev'
Building default worker image with tag=local-dev
Setting up worker poolname=default-pool workers=1 image_uid=e127de1b42e444458e8e093f6f27e38c in_memory=True
Created default worker pool.
Data Migrated to latest version !!!


In [3]:
# Open one session per node, using the same account on both sides.
shared_login = {"email": "info@openmined.org", "password": "changethis"}
client_low = node_low.login(**shared_login)
client_high = node_high.login(**shared_login)

Logged into <test_l: Low side Domain> as <info@openmined.org>


Logged into <test_h: High side Domain> as <info@openmined.org>


In [4]:
# Register a second account on the low side, then log in with it.
ds_email = "newuser@openmined.org"
ds_password = "pw"

client_low.register(
    email=ds_email,
    name="John Doe",
    password=ds_password,
    password_verify=ds_password,
)
client_low_ds = node_low.login(email=ds_email, password=ds_password)

Logged into <test_l: Low side Domain> as <newuser@openmined.org>


# Create datasets

In [5]:
# third party
import numpy as np

In [6]:
# Mock and private arrays for the high-side asset.
mock_high = np.array([10, 11, 12, 13, 14])
private_high = np.array([15, 16, 17, 18, 19])

# Single asset carrying both arrays; its shape is taken from the private data.
asset_high = sy.Asset(
    name="numpy-data",
    mock=mock_high,
    data=private_high,
    shape=private_high.shape,
    mock_is_real=True,
)
dataset_high = sy.Dataset(
    name="my-dataset",
    description="abc",
    asset_list=[asset_high],
)

client_high.upload_dataset(dataset_high)

  0%|          | 0/1 [00:00<?, ?it/s]

Uploading: numpy-data


100%|██████████| 1/1 [00:00<00:00,  8.14it/s]


In [7]:
# Mock array for the low-side asset (original note: "do_high.mock").
mock_low = np.array([0, 1, 2, 3, 4])

# No private array is stored on the low side: the asset's data is an empty
# ActionObject created with data_node_id=client_high.id.
# Original note, kept for reference:
#   private_low = np.array([5, 6, 7, 8, 9])  # AOEmpty? create new type AO
empty_private_data = ActionObject.empty(data_node_id=client_high.id)

asset_low = sy.Asset(
    name="numpy-data",
    mock=mock_low,
    data=empty_private_data,
    shape=mock_low.shape,
    mock_is_real=True,
)
# The low-side dataset reuses the id, name and description of dataset_high.
dataset_low = sy.Dataset(
    id=dataset_high.id,
    name="my-dataset",
    description="abc",
    asset_list=[asset_low],
)

res = client_low.upload_dataset(dataset_low)

100%|██████████| 1/1 [00:00<00:00, 25.04it/s]

Uploading: numpy-data





# Make Requests

In [8]:
# First asset of the first dataset, looked up through the low-side
# newuser@openmined.org session.
data_low = client_low_ds.datasets[0].assets[0]


@sy.syft_function_single_use(data=data_low)
def compute_mean(data) -> float:
    """Print a progress message and return ``data.mean()``.

    ``data`` only needs to provide a ``mean()`` method; the notebook passes
    NumPy arrays.
    """
    print("Computing mean...")
    mean_value = data.mean()
    return mean_value


# Call the function directly on the asset's mock data.
compute_mean(data=data_low.mock)

SyftInfo: Creating a node with n_consumers=2 (the default value)
Staging Protocol Changes...
SQLite Store Path:
!open file:///tmp/475fa0af23f8422c867946d245a1b399.sqlite

Creating default worker image with tag='local-dev'
Building default worker image with tag=local-dev
Setting up worker poolname=default-pool workers=2 image_uid=1eee6d3d82004a26aeefdfbabb25f6ab in_memory=True
Created default worker pool.
Data Migrated to latest version !!!
Logged into <ephemeral_node_compute_mean_2868: High side Domain> as <info@openmined.org>


Approving request for domain ephemeral_node_compute_mean_2868
override True
Computing mean...
SyftInfo: Landing the ephmeral node...


```python
Pointer
```
2.0

In [9]:
# Submit compute_mean as a code-execution request from the low-side user session.
client_low_ds.code.request_code_execution(compute_mean)

## Sync to high side

In [10]:
# Capture the current sync state of each side.
low_state = client_low.sync.get_state()

high_state = client_high.sync.get_state()

In [11]:
# Compare the low-side state against the high-side state.
diff_state = compare_states(low_state, high_state)

# Bare expression so the notebook displays the diff.
diff_state

In [12]:
# syft absolute
# state.objs_to_sync
from syft.client.syncing import resolve

# Resolve the diff with decision="low"; the call returns one resolved state
# for the low side and one for the high side.
resolved_state_low, resolved_state_high = resolve(diff_state, decision="low")

Decision: Syncing all objects from low side


In [13]:
# Print both resolved states, each under its own label, separated by a blank line.
print(
    "Resolved state low side",
    resolved_state_low,
    "",
    "Resolved state high side",
    resolved_state_high,
    sep="\n",
)

Resolved state low side
ResolvedSyncState(
  create_objs=[],
  update_objs=[],
  delete_objs=[]
)

Resolved state high side
ResolvedSyncState(
  create_objs=[syft.service.request.request.Request, syft.service.code.user_code.UserCode],
  update_objs=[],
  delete_objs=[]
)


In [14]:
# Apply the low-side resolved state through the low-side client.
client_low.apply_state(resolved_state_low)

In [15]:
# Apply the high-side resolved state through the high-side client.
client_high.apply_state(resolved_state_high)

SyftInfo: Node Landed!


# Run code on the high side and sync back the result

In [16]:
# First asset of the first dataset on the high side.
data_high = client_high.datasets[0].assets[0]

In [17]:
# Call compute_mean on the high side with blocking=False and display what it
# returns (kept as job_high for the following cells).
job_high = client_high.code.compute_mean(data=data_high, blocking=False)
display(job_high)

```python
class Job:
    id: UID = 76d13946679149f591566d1fd26a0678
    status: JobStatus.CREATED
    has_parent: False
    result: syft.service.action.action_data_empty.ObjectNotReady
    logs:

0 
    
```

In [18]:
# Wait for the job, then fetch its result; the bare expression displays it.
job_high.wait().get()

override True


29/02/24 17:27:37 FUNCTION LOG (76d13946679149f591566d1fd26a0678): Computing mean...


17.0

In [19]:
# Job info requested with public metadata and the result included.
job_info = job_high.info(public_metadata=True, result=True)

# First request on the high side, and the job's result object; both are used
# when depositing the result in the next cells.
request = client_high.requests[0]
result_obj = job_high.result

In [20]:
# syft absolute
from syft import SyftError
from syft import SyftSuccess

# Depositing the job's result object directly is expected to be rejected:
# the call should return a SyftError.
accept_res = request.accept_by_depositing_result(result_obj)

assert isinstance(accept_res, SyftError)
accept_res

In [21]:
# Depositing the job info instead is expected to succeed with a SyftSuccess.
accept_res = request.accept_by_depositing_result(job_info)

assert isinstance(accept_res, SyftSuccess)
accept_res

Approving request for domain test_h
Approving request for domain test_h


In [22]:
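# Disabled alternative, kept for reference: deposit the fetched result value
# (job_info.result.get()) instead of the job info itself.
# accept_res = request.accept_by_depositing_result(job_info.result.get())

# assert isinstance(accept_res, SyftSuccess)
# accept_res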

In [23]:
# Re-fetch the job: its result was overwritten when the request was accepted.
job_high = client_high.jobs[0]

# Action store and blob-storage partition of the high-side node.
action_service_high = node_high.python_node.get_service("actionservice")
action_store_high = action_service_high.store
blob_service_high = node_high.python_node.get_service("blobstorageservice")
blob_store_high = blob_service_high.stash.partition

# Permission entry expected for the low-side user on both stores.
ds_read_permission = f"{client_low_ds.verify_key}_READ"

action_permissions_high = action_store_high.permissions[job_high.result.id.id]
assert ds_read_permission in action_permissions_high

blob_permissions_high = blob_store_high.permissions[
    job_high.result.syft_blob_storage_entry_id
]
assert ds_read_permission in blob_permissions_high

## Sync back to low

In [24]:
# Display the first job on the high side.
client_high.jobs[0]

```python
class Job:
    id: UID = 76d13946679149f591566d1fd26a0678
    status: JobStatus.COMPLETED
    has_parent: False
    result: 17.0
    logs:

0 Computing mean...
JOB COMPLETED
    
```

In [25]:
# Capture both sync states again, replacing the values from the first round.
low_state = client_low.sync.get_state()
high_state = client_high.sync.get_state()

# Bare expression so the notebook displays the high-side state.
high_state

In [26]:
# Second comparison of the low-side state against the high-side state.
diff_state_2 = compare_states(low_state, high_state)

# Bare expression so the notebook displays the diff.
diff_state_2

In [27]:
# Resolve the second diff with decision="high" and all_permissions=True;
# this rebinds both resolved-state names from the first round.
resolved_state_low, resolved_state_high = resolve(
    diff_state_2, decision="high", all_permissions=True
)

Decision: Syncing all objects from high side


In [28]:
# Show the resolved state that will be applied to the low side.
print(resolved_state_low)

ResolvedSyncState(
  create_objs=[syft.service.job.job_stash.Job, syft.service.log.log.SyftLog, Pointer:
17.0],
  update_objs=[syft.service.request.request.Request, syft.service.code.user_code.UserCode],
  delete_objs=[]
)


In [29]:
# Apply the second-round low-side resolved state through the low-side client.
client_low.apply_state(resolved_state_low)

In [30]:
# Apply the second-round high-side resolved state through the high-side client.
client_high.apply_state(resolved_state_high)

In [31]:
# Action store and blob-storage partition of the low-side node.
action_service_low = node_low.python_node.get_service("actionservice")
action_store_low = action_service_low.store
blob_service_low = node_low.python_node.get_service("blobstorageservice")
blob_store_low = blob_service_low.stash.partition

# Permission entry expected for the low-side user on both stores, looked up
# under the ids of the high-side job's result.
ds_read_permission = f"{client_low_ds.verify_key}_READ"

action_permissions_low = action_store_low.permissions[job_high.result.id.id]
assert ds_read_permission in action_permissions_low

blob_permissions_low = blob_store_low.permissions[
    job_high.result.syft_blob_storage_entry_id
]
assert ds_read_permission in blob_permissions_low

# Run code on the low side

## Run

In [32]:
# Call compute_mean from the low-side user session on the low-side asset.
res_low = client_low_ds.code.compute_mean(data=data_low)

# Fetch the value; the bare expression displays it.
res_low.get()

override False


17.0

In [33]:
# Fetch the value a second time.
res_low.get()

17.0

In [34]:
# First code object visible to the low-side user.
code = client_low_ds.code[0]

# The value fetched on the low side equals the mean of the private high-side array.
assert res_low.get() == private_high.mean()

# The low-side result, the high-side job result and the code's last recorded
# output must all share one id (checked pairwise, left to right).
low_result_uid = res_low.id
high_result_uid = job_high.result.id.id
assert low_result_uid == high_result_uid
last_output_uid = code.output_policy.last_output_ids[0].id.id
assert high_result_uid == last_output_uid

# Both results must point at the same blob-storage entry.
high_blob_entry_id = job_high.result.syft_blob_storage_entry_id
assert high_blob_entry_id == res_low.syft_blob_storage_entry_id

In [35]:
# Display the mean of the private high-side array for reference.
private_high.mean()

17.0

In [36]:
# Same call as before, now with blocking=False; the bare name displays what
# the call returned.
job_low = client_low_ds.code.compute_mean(data=data_low, blocking=False)
job_low

```python
class Job:
    id: UID = 76d13946679149f591566d1fd26a0678
    status: JobStatus.COMPLETED
    has_parent: False
    result: 17.0
    logs:

0 Log a37d9c30bd254132b5155c595f9ad47c not available
JOB COMPLETED
    
```

In [37]:
# Capture the low-side sync state again.
# NOTE(review): no later cell visible in this notebook reads low_state — confirm
# whether this call is still needed.
low_state = client_low.sync.get_state()

In [38]:
# Wait for the low-side job, then fetch its result; the bare expression displays it.
job_low.wait().get()

17.0

In [39]:
# Request the logs of the low-side job.
job_low.logs()

Log a37d9c30bd254132b5155c595f9ad47c not available


In [40]:
# The low-side and high-side jobs must have the same job id ...
assert job_low.id == job_high.id
# ... the same result id ...
assert job_low.result.id == job_high.result.id
# ... and the same blob-storage entry behind the result.
assert (
    job_low.result.syft_blob_storage_entry_id
    == job_high.result.syft_blob_storage_entry_id
)

## TODO

- Hard-deleting objects (in a clean way)
    - We *should* be able to delete without messing up a node's state, because we make a single choice for a whole hierarchy
    - What happens if we delete a queued/running job?
- Soft-deleting/archiving objects
    - More work: needs a new flag on every object in every stash