In [None]:
SYFT_VERSION = ">=0.9,<1.0.0"
package_string = f'"syft{SYFT_VERSION}"'

In [2]:
# stdlib
import os
import time

# third party
import docker
import numpy as np

# syft absolute
import syft as sy

sy.requires(SYFT_VERSION)

# syft absolute
from syft.service.worker.image_registry import SyftImageRegistry
from syft.service.worker.worker_image import SyftWorkerImage

# Local registry to test external registry


class LocalRegistryContainer:
    def __init__(self):
        self.name = "local_registry"
        self.client = docker.from_env()

    def start(self, host_port=5678):
        existing = self.get()
        if existing:
            return existing

        result = self.client.containers.run(
            "registry:2",
            name=self.name,
            detach=True,
            ports={"5000/tcp": host_port},
            labels={"orgs.openmined.syft": "local-registry"},
        )

        return result

    def teardown(self):
        existing = self.get()
        if existing:
            existing.stop()
            existing.remove()

    def get(self):
        try:
            result = self.client.containers.get(self.name)
            if result.status == "running":
                return result
        except docker.errors.NotFound:
            return None


local_registry_container = LocalRegistryContainer()

  "cipher": algorithms.TripleDES,
  "class": algorithms.TripleDES,


✅ The installed version of syft==0.9.1b1 matches the requirement >=0.9 and the requirement <1.0.0


In [3]:
# Uncomment this to run the whole docker based custom workers
# os.environ["ORCHESTRA_DEPLOYMENT_TYPE"] = "container_stack"
# os.environ["DEV_MODE"] = "True"


# Disable inmemory worker for container stack
running_as_container = os.environ.get("ORCHESTRA_DEPLOYMENT_TYPE") in (
    "container_stack",
)

In [4]:
datasite = sy.orchestra.launch(
    name="Evaluating PySyft",
    dev_mode=True,
    create_producer=True,
    reset=True,
    port=8081,
)

Autoreload enabled
Starting Evaluating PySyft server on 0.0.0.0:8081
Found `reset=True` in the launch configuration. Resetting the server...


INFO:     Will watch for changes in these directories: ['/home/joserico/.local/lib/python3.10/site-packages/syft']
INFO:     Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)
INFO:     Started reloader process [7569] using WatchFiles


Waiting for server to start

  "cipher": algorithms.TripleDES,
  "class": algorithms.TripleDES,


..

INFO:     Started server process [7583]
INFO:     Waiting for application startup.
INFO:     Application startup complete.


.WARN: private key is based on server name: Evaluating PySyft in dev_mode. Don't run this in production.
INFO:     127.0.0.1:43074 - "GET /api/v2/metadata HTTP/1.1" 200 OK
 Done.


INFO:     Stopping reloader process [7569]


In [5]:
datasite_client = datasite.login(email="sba23021@student.cct.ie", password="abc54321")

INFO:     127.0.0.1:43078 - "GET /api/v2/metadata HTTP/1.1" 200 OK
INFO:     127.0.0.1:43078 - "POST /api/v2/login HTTP/1.1" 200 OK
INFO:     127.0.0.1:43078 - "GET /api/v2/api?verify_key=ba46610f539d671d18f0ffa7775c829a7614bc10a201da7e4b0ae2065959a8be&communication_protocol=3 HTTP/1.1" 200 OK
INFO:     127.0.0.1:43086 - "POST /api/v2/api_call HTTP/1.1" 200 OK
Logged into <Evaluating PySyft: High side Datasite> as <sba23021@student.cct.ie>


We should see a default worker pool

In [6]:
datasite_client.worker_pools

INFO:     127.0.0.1:43088 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:43092 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:43104 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:43106 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [7]:
syft_base_worker_tag = (
    "local-dev"
    if (bool(os.environ["DEV_MODE"]) and running_as_container)
    else sy.__version__
)
syft_base_worker_tag = "0.9.0-beta.5"

#### Submit Dockerfile

In [8]:
opendp_dockerfile_str = f"""
FROM openmined/syft-backend:{syft_base_worker_tag}

RUN uv pip install opendp

""".strip()

docker_tag = "openmined/custom-worker-opendp:1.0.0"

In [9]:
docker_config = sy.DockerWorkerConfig(dockerfile=opendp_dockerfile_str)

In [10]:
# test image build locally
test_build_res = docker_config.test_image_build(tag=docker_tag)
test_build_res

In [11]:
assert isinstance(test_build_res, sy.SyftSuccess), str(test_build_res)

In [12]:
assert docker_config.dockerfile == opendp_dockerfile_str

In [13]:
submit_result = datasite_client.api.services.worker_image.submit(
    worker_config=docker_config
)

INFO:     127.0.0.1:39998 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [14]:
submit_result

In [15]:
assert isinstance(submit_result, sy.SyftSuccess), str(submit_result)

In [16]:
dockerfile_list = datasite_client.images.get_all()
dockerfile_list

INFO:     127.0.0.1:40004 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [17]:
assert len(datasite_client.images.get_all()) == 2

INFO:     127.0.0.1:40010 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [18]:
workerimage: SyftWorkerImage = None
for image in dockerfile_list:
    if not image.is_prebuilt and image.config.dockerfile == opendp_dockerfile_str:
        workerimage = image
        break

assert isinstance(workerimage, SyftWorkerImage), str(workerimage)

In [19]:
workerimage

```python
class SyftWorkerImage:
  id: str = 359deec955ac40e193722e5e0aaa61ab
  image_identifier: str = None
  image_hash: str = None
  created_at: str = 2024-08-03 21:48:19
  built_at: str = None
  config: str = FROM openmined/syft-backend:0.9.0-beta.5

RUN uv pip install opendp

```

#### Add Local Registry in Syft

In [20]:
registry_add_result = datasite_client.api.services.image_registry.add("localhost:5678")
registry_add_result

INFO:     127.0.0.1:40022 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [21]:
assert isinstance(registry_add_result, sy.SyftSuccess), str(registry_add_result)

In [22]:
images = datasite_client.api.services.image_registry.get_all()
assert len(images) == 1
images

INFO:     127.0.0.1:40030 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [23]:
local_registry = images[0]
local_registry

```python
class SyftImageRegistry:
  id: str = 5fda50b78df04ee9ac0218ef1c8771fa
  url: str = "localhost:5678"

```

In [24]:
assert isinstance(local_registry, SyftImageRegistry), str(local_registry)

#### Build Image

In [25]:
pull = False if syft_base_worker_tag == "local-dev" else True
pull

True

In [26]:
registry_uid = local_registry.id if running_as_container else local_registry.id

docker_build_result = datasite_client.api.services.worker_image.build(
    image_uid=workerimage.id,
    tag=docker_tag,
    registry_uid=registry_uid,
    pull_image=pull,
)
docker_build_result

INFO:     127.0.0.1:40046 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [27]:
workerimage.config.dockerfile

'FROM openmined/syft-backend:0.9.0-beta.5\n\nRUN uv pip install opendp'

In [28]:
assert isinstance(docker_build_result, sy.SyftSuccess), str(docker_build_result)

In [29]:
image_list = datasite_client.images.get_all()
image_list

INFO:     127.0.0.1:40050 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [30]:
for image in image_list:
    if image.id == workerimage.id:
        workerimage = (
            image  # we can also index with string using the repo_with_tag format
        )

if running_as_container:
    image_list[workerimage.built_image_tag]
    assert image_list[workerimage.built_image_tag] == workerimage

workerimage

```python
class SyftWorkerImage:
  id: str = 359deec955ac40e193722e5e0aaa61ab
  image_identifier: str = localhost:5678/openmined/custom-worker-opendp:1.0.0
  image_hash: str = None
  created_at: str = 2024-08-03 21:48:19
  built_at: str = None
  config: str = FROM openmined/syft-backend:0.9.0-beta.5

RUN uv pip install opendp

```

In [31]:
def get_image_hash(tag) -> str:
    client = docker.from_env()
    try:
        image = client.images.get(tag)
        return image.id
    except docker.errors.ImageNotFound:
        return None

In [32]:
if running_as_container:
    assert workerimage.image_hash == get_image_hash(
        workerimage.built_image_tag
    ), "Worker Image image_hash does not match with built image hash"

#### Push Image to Local Registry

In [33]:
push_result = None
if running_as_container:
    # stdlib
    from time import sleep

    local_registry_container.start()
    sleep(5)

    push_result = datasite_client.api.services.worker_image.push(workerimage.id)
    assert isinstance(push_result, sy.SyftSuccess), str(push_result)

In [34]:
push_result

In [35]:
if running_as_container:
    # third party
    import requests

    base_url = f"http://{workerimage.image_identifier.registry_host}"
    expected_tag = workerimage.image_identifier.tag
    search_tag = "openmined/custom-worker-opendp"

    repos = requests.get(f"{base_url}/v2/_catalog").json()["repositories"]
    tags = requests.get(f"{base_url}/v2/{search_tag}/tags/list").json()
    tags = tags["tags"]

    print(tags)

    assert (
        search_tag in repos
    ), f"'{search_tag}' not uploaded to local registry | {repos}"
    assert (
        expected_tag in tags
    ), f"'{search_tag}' with tag {expected_tag} not available | {tags}"

#### Delete locally built image to force pull from local registry

This should make the subsequent `worker_pool.launch` pull from registry at 'localhost:5678`

In [36]:
# stdlib
from time import sleep


def remove_local_image(tag):
    client = docker.from_env()
    try:
        client.images.remove(tag)
    except docker.errors.ImageNotFound:
        pass


if running_as_container:
    remove_local_image(workerimage.built_image_tag)

#### Create Worker Pool From Image

In [37]:
worker_pool_name = "opendp-pool"
worker_pool_res = datasite_client.api.services.worker_pool.launch(
    pool_name=worker_pool_name,
    image_uid=workerimage.id,
    num_workers=2,
)

INFO:     127.0.0.1:40064 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [38]:
assert len(worker_pool_res) == 2

In [39]:
for status in worker_pool_res:
    assert status.error is None
    if running_as_container:
        assert status.worker.image.image_hash == get_image_hash(
            workerimage.built_image_tag
        ), "Worker Pool Image image_hash does not match with built image hash"

In [40]:
worker_pool_list = datasite_client.worker_pools
worker_pool_list

INFO:     127.0.0.1:40080 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40086 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40102 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40112 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40128 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40140 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40142 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40146 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40150 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40156 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40164 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40174 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40188 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40198 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40206 - "POST /api/v2/api_ca

In [41]:
assert len(datasite_client.worker_pools.get_all()) == 2
worker_pool = None
for pool in worker_pool_list:
    if pool.name == worker_pool_name:
        worker_pool = pool
        break
assert worker_pool is not None
assert len(worker_pool.workers) == 2

INFO:     127.0.0.1:40236 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40242 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40244 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40256 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40264 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40270 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40284 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [42]:
# We can filter pools based on the image id upon which the pools were built
datasite_client.api.services.worker_pool.filter_by_image_id(image_uid=workerimage.id)

INFO:     127.0.0.1:40296 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40302 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40310 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40322 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40324 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40328 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40338 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40350 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40366 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40380 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40390 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40400 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40406 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40422 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40430 - "POST /api/v2/api_ca

In [43]:
# Delete the second worker
second_worker = worker_pool.workers[1]

INFO:     127.0.0.1:40460 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40464 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40476 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40480 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [44]:
second_worker

```python
class SyftWorker:
  id: str = 2a143b0064b44b6ab7f24df156d7d481
  name: str = "opendp-pool-2"
  container_id: str = None
  image: str = None
  status: str = WorkerStatus.RUNNING
  healthcheck: str = WorkerHealth.HEALTHY
  worker_pool_name: str = "opendp-pool"
  created_at: str = 2024-08-03 21:48:19

```

#### Get Worker Logs

In [45]:
raw_worker_logs = datasite_client.api.services.worker.logs(
    uid=second_worker.id,
    raw=True,
)
raw_worker_logs

INFO:     127.0.0.1:40482 - "POST /api/v2/api_call HTTP/1.1" 200 OK


b'Logs not implemented for In Memory Workers'

In [46]:
assert isinstance(raw_worker_logs, bytes)

In [47]:
worker_logs = datasite_client.api.services.worker.logs(
    uid=second_worker.id,
)
worker_logs

INFO:     127.0.0.1:40492 - "POST /api/v2/api_call HTTP/1.1" 200 OK


'Logs not implemented for In Memory Workers'

In [48]:
assert isinstance(worker_logs, str)

#### Delete Worker from Pool

In [49]:
worker_delete_res = datasite_client.api.services.worker.delete(
    uid=second_worker.id, force=True
)

INFO:     127.0.0.1:40506 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [50]:
worker_delete_res

In [51]:
assert isinstance(worker_delete_res, sy.SyftSuccess), str(worker_delete_res)

In [52]:
# Refetch the worker pool
# Ensure that the deleted worker's id is not present
for pool in datasite_client.api.services.worker_pool.get_all():
    if pool.name == worker_pool_name:
        worker_pool = pool
assert len(worker_pool.workers) == 1
for worker in worker_pool.workers:
    assert second_worker.id != worker.id

INFO:     127.0.0.1:40522 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40528 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40530 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40544 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40548 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [53]:
worker_pool

INFO:     127.0.0.1:40552 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40562 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40572 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40588 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40598 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40608 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40620 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40628 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40642 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40650 - "POST /api/v2/api_call HTTP/1.1" 200 OK


### Syft function

In [54]:
data = np.array([1, 2, 3])
data_action_obj = sy.ActionObject.from_obj(data)

data_pointer = data_action_obj.send(datasite_client)
data_pointer

INFO:     127.0.0.1:40666 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40674 - "POST /api/v2/api_call HTTP/1.1" 200 OK



**Pointer**

array([1, 2, 3])


In [55]:
@sy.syft_function(
    input_policy=sy.ExactMatch(x=data_pointer),
    output_policy=sy.SingleExecutionExactOutput(),
    worker_pool_name=worker_pool_name,
)
def custom_worker_func(x):
    # third party

    return {"y": x + 1}

INFO:     127.0.0.1:40686 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [56]:
custom_worker_func

```python
class SubmitUserCode:
  id: str = None
  func_name: str = "custom_worker_func"
  code: str = "@sy.syft_function(
    input_policy=sy.ExactMatch(x=data_pointer),
    output_policy=sy.SingleExecutionExactOutput(),
    worker_pool_name=worker_pool_name,
)
def custom_worker_func(x):
    # third party

    return {"y": x + 1}
"

```

In [57]:
assert custom_worker_func.worker_pool_name == worker_pool.name

In [58]:
request = datasite_client.code.request_code_execution(custom_worker_func)
request

INFO:     127.0.0.1:40688 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:43078 - "GET /api/v2/api?verify_key=ba46610f539d671d18f0ffa7775c829a7614bc10a201da7e4b0ae2065959a8be&communication_protocol=3 HTTP/1.1" 200 OK
INFO:     127.0.0.1:40702 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40710 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [59]:
datasite_client.requests[-1].approve(approve_nested=True)

INFO:     127.0.0.1:40724 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40732 - "POST /api/v2/api_call HTTP/1.1" 200 OK
Approving request on change custom_worker_func for datasite Evaluating PySyft
INFO:     127.0.0.1:40744 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [60]:
job = datasite_client.code.custom_worker_func(x=data_pointer, blocking=False)
job

INFO:     127.0.0.1:40760 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40768 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40776 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40780 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40796 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40798 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40806 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40816 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40822 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40832 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40842 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [61]:
worker_pool = datasite_client.worker_pools[worker_pool_name]
worker_pool

INFO:     127.0.0.1:40856 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40864 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40874 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40878 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40886 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40894 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40906 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40920 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40932 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40936 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40948 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [62]:
job.wait()

INFO:     127.0.0.1:40950 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40962 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40976 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40978 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40984 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:40994 - "POST /api/v2/api_call HTTP/1.1" 200 OK


  "cipher": algorithms.TripleDES,
  "class": algorithms.TripleDES,


INFO:     127.0.0.1:40998 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41000 - "POST /api/v2/api_call HTTP/1.1" 200 OK




INFO:     127.0.0.1:41006 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41012 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41014 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41020 - "POST /api/v2/api_call HTTP/1.1" 200 OK



**Pointer**

{'y': array([2, 3, 4])}


In [63]:
assert job.status.value == "completed"

In [64]:
job = datasite_client.jobs[-1]
job

INFO:     127.0.0.1:41032 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41034 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41036 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41044 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41052 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41060 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41076 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41084 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41092 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41106 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41120 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [65]:
job.job_worker_id

<UID: 4629df8835e9457787236a23b9541152>

In [66]:
# Disabling it due to Race Condition Error
# assert job.job_worker_id is not None

In [67]:
# Sleeping so that consumer state is updated
time.sleep(5)

In [69]:
# Once the work is done by the worker, its state is returned to idle again.
consuming_worker_is_now_idle = False
for worker in datasite_client.worker_pools[worker_pool_name].workers:
    if worker.id == job.job_worker_id:
        consuming_worker_is_now_idle = worker.consumer_state.value.lower() == "idle"

assert consuming_worker_is_now_idle is True

INFO:     127.0.0.1:33946 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:33958 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:33962 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [70]:
# Validate the result received from the syft function
result = job.wait().get()
result_matches = result["y"] == data + 1
assert result_matches.all()

INFO:     127.0.0.1:41294 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41300 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41302 - "POST /api/v2/api_call HTTP/1.1" 200 OK


#### Worker Image

In [71]:
# delete the remaining workers
for worker in worker_pool.workers:
    res = datasite_client.api.services.worker.delete(
        uid=worker.id,
    )
    assert isinstance(res, sy.SyftSuccess), str(res)

INFO:     127.0.0.1:41308 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41324 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41326 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [72]:
delete_res = datasite_client.api.services.worker_image.remove(workerimage.id)
delete_res

INFO:     127.0.0.1:41342 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [73]:
# Since the containers are delete, we should be able to delete the image
assert isinstance(delete_res, sy.SyftSuccess), str(delete_res)

In [74]:
if running_as_container:
    local_registry_container.teardown()

#### Worker Pool and Image Creation Request/Approval

In [75]:
custom_dockerfile_str_2 = f"""
FROM openmined/syft-backend:{syft_base_worker_tag}

RUN uv pip install opendp
""".strip()

docker_config_2 = sy.DockerWorkerConfig(dockerfile=custom_dockerfile_str_2)

In [76]:
submit_result = datasite_client.api.services.worker_image.submit(
    worker_config=docker_config_2
)
submit_result

INFO:     127.0.0.1:41358 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [77]:
datasite_client.images

INFO:     127.0.0.1:41362 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [78]:
# get the image that's not built
workerimage_2 = None
for im in datasite_client.images:
    if im.config == docker_config_2:
        workerimage_2 = im

INFO:     127.0.0.1:41366 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41372 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41382 - "POST /api/v2/api_call HTTP/1.1" 200 OK


##### Build image first then create pool

In [79]:
docker_tag_2 = "openmined/custom-worker-opendp:latest"

docker_build_result = datasite_client.api.services.worker_image.build(
    image_uid=workerimage_2.id,
    tag=docker_tag_2,
    pull=pull,
)
docker_build_result

In [80]:
opendp_pool_name = "second-opendp-pool"
pool_create_request = datasite_client.api.services.worker_pool.pool_creation_request(
    pool_name=opendp_pool_name, num_workers=2, image_uid=workerimage_2.id
)
pool_create_request

INFO:     127.0.0.1:41386 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41400 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [81]:
assert len(pool_create_request.changes) == 1

In [82]:
# get the pending request and approve it
req_result = pool_create_request.approve()
req_result

Approving request for datasite Evaluating PySyft
INFO:     127.0.0.1:41406 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [83]:
assert isinstance(req_result, sy.SyftSuccess), str(req_result)

In [84]:
datasite_client.worker_pools[opendp_pool_name]

INFO:     127.0.0.1:41422 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41428 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41444 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41452 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41468 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41480 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41494 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41498 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41514 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41518 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41520 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41536 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41538 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41550 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41552 - "POST /api/v2/api_ca

In [85]:
assert datasite_client.worker_pools[opendp_pool_name]
assert len(datasite_client.worker_pools[opendp_pool_name].workers) == 2

INFO:     127.0.0.1:41598 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41614 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41628 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41642 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41646 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:41660 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [86]:
# default, opendp-pool, second-opendp-pool
assert len(datasite_client.worker_pools.get_all()) == 3

INFO:     127.0.0.1:41676 - "POST /api/v2/api_call HTTP/1.1" 200 OK


Remove all `second-opendp-pool` workers

In [87]:
for worker in datasite_client.worker_pools["second-opendp-pool"].workers:
    res = datasite_client.api.services.worker.delete(uid=worker.id, force=True)
    assert isinstance(res, sy.SyftSuccess), str(res)

assert len(datasite_client.worker_pools["second-opendp-pool"].workers) == 0

INFO:     127.0.0.1:52766 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52774 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52788 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52790 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52800 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52810 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52820 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52824 - "POST /api/v2/api_call HTTP/1.1" 200 OK


Remove the `second-opendp-pool`'s worker image

In [88]:
delete_res = datasite_client.api.services.worker_image.remove(workerimage_2.id)
delete_res

INFO:     127.0.0.1:52832 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [89]:
# Since the containers are delete, we should be able to delete the image
assert isinstance(delete_res, sy.SyftSuccess), str(delete_res)

##### Request to build the image and create the pool at the same time

In [90]:
custom_dockerfile_str_3 = f"""
FROM openmined/syft-backend:{syft_base_worker_tag}

RUN uv pip install recordlinkage
""".strip()

docker_config_3 = sy.DockerWorkerConfig(dockerfile=custom_dockerfile_str_3)

docker_tag_3 = "openmined/custom-worker-recordlinkage:latest"

In [91]:
recordlinkage_pool_name = "recordlinkage-pool"
pool_image_create_request = (
    datasite_client.api.services.worker_pool.create_image_and_pool_request(
        pool_name=recordlinkage_pool_name,
        num_workers=2,
        tag=docker_tag_3,
        config=docker_config_3,
        reason="I want to do some more cool data science with PySyft and recordlinkage",
        pull_image=pull,
    )
)
pool_image_create_request

INFO:     127.0.0.1:52838 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52846 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [92]:
assert len(pool_image_create_request.changes) == 2
assert pool_image_create_request.changes[0].config == docker_config_3
assert pool_image_create_request.changes[1].num_workers == 2
assert pool_image_create_request.changes[1].pool_name == recordlinkage_pool_name

In [93]:
# get the pending request and approve it
req_result = pool_image_create_request.approve()
req_result

Approving request for datasite Evaluating PySyft
INFO:     127.0.0.1:52858 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [94]:
assert isinstance(req_result, sy.SyftSuccess), str(req_result)

In [95]:
# Get updated request object and status
for req in datasite_client.requests:
    if req.id == pool_image_create_request.id:
        pool_image_create_request = req

assert pool_image_create_request.status.value == 2

INFO:     127.0.0.1:52868 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52882 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52892 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52908 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [96]:
image_exists = False
recordlinkage_image = None

for im in datasite_client.images.get_all():
    if im.image_identifier and im.image_identifier.repo_with_tag == docker_tag_3:
        image_exists = True
        recordlinkage_image = im
assert image_exists
assert recordlinkage_image
recordlinkage_image

INFO:     127.0.0.1:52910 - "POST /api/v2/api_call HTTP/1.1" 200 OK


```python
class SyftWorkerImage:
  id: str = 1b281e86049a4bb2af4f27519a0a1220
  image_identifier: str = docker.io/openmined/custom-worker-recordlinkage:latest
  image_hash: str = None
  created_at: str = 2024-08-03 21:48:19
  built_at: str = None
  config: str = FROM openmined/syft-backend:0.9.0-beta.5

RUN uv pip install recordlinkage

```

In [97]:
recordlinkage_pool = datasite_client.worker_pools[recordlinkage_pool_name]

assert recordlinkage_pool
assert len(recordlinkage_pool.workers) == 2

INFO:     127.0.0.1:52926 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52928 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52930 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52942 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52950 - "POST /api/v2/api_call HTTP/1.1" 200 OK


Cleanup `recordlinkage-pool` workers

In [98]:
for worker in recordlinkage_pool.workers:
    res = datasite_client.api.services.worker.delete(uid=worker.id, force=True)
    assert isinstance(res, sy.SyftSuccess), str(res)

INFO:     127.0.0.1:52966 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52974 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52982 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52988 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:52998 - "POST /api/v2/api_call HTTP/1.1" 200 OK
INFO:     127.0.0.1:53006 - "POST /api/v2/api_call HTTP/1.1" 200 OK


Cleanup `recordlinkage-pool`'s image

In [99]:
delete_res = datasite_client.api.services.worker_image.remove(recordlinkage_image.id)
delete_res

INFO:     127.0.0.1:53010 - "POST /api/v2/api_call HTTP/1.1" 200 OK


In [100]:
datasite.land()

Stopping Evaluating PySyft


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [7583]


## Reference:
https://github.com/OpenMined/PySyft/tree/dev/notebooks/api/0.8