In [None]:
# from syft_rds import *

In [None]:
from __future__ import annotations

from syft_rds.db.db import StorableItemRegistry

StorableItemRegistry.items

In [None]:
from syft_rds.models.dataset import Dataset, DatasetCreate

In [None]:
from syft_event import SyftEvents
from syft_rds.api.api import API
from syft_core import Client
from syft_rds.service.dataset_service import DatasetService
from syft_rds.connection.connection import get_connection
from syft_rds.service.context import BaseRPCContext
from syft_rds.models.job import JobCreate
from syft_rds.models.job import Job
from syft_rds.models.action_object import ActionObject
import shutil

In [None]:
from syft_rds.db.db import StorableItemRegistry

StorableItemRegistry.items

In [None]:
client = Client.load()

In [None]:
# Input policy: constrains the inputs of a syftfunction.
# Output policy: defines how the outputs of a syftfunction are modified and how many outputs can be created.

In [None]:
from uuid import UUID
from syft_core import SyftBoxURL
from syft_rds.models.code import CodeCreate, Code
from syft_rds.models.request import Request, RequestCreate
from syft_rds.service.code_service import CodeService
from syft_rds.service.job_service import JobService
from syft_rds.service.request_service import RequestService


# Event app on which the request handlers defined below are registered.
box = SyftEvents("my-rds-app")

# TODO: pass this in the handler
# Shared context built from the loaded client and the event app; the handlers
# currently read it as a module-level global.
context = BaseRPCContext(client=client, box=box)


@box.on_request("/apis/list")
def get_apis() -> dict:
    """List the RPC entries registered on ``box``.

    Returns:
        A dict built from ``box``'s RPC table: every key is converted with
        ``SyftBoxURL.from_path(key, client.workspace)`` and every value is
        passed through unchanged.
    """
    # NOTE(review): ``_SyftEvents__rpc`` is the name-mangled private ``__rpc``
    # attribute of ``SyftEvents``; no public accessor is visible from here, so
    # this lookup is fragile if the library renames that attribute.
    return {
        SyftBoxURL.from_path(k, client.workspace): v
        for k, v in box._SyftEvents__rpc.items()
    }


@box.on_request("/datasets/create")
def create_dataset(dataset: DatasetCreate) -> Dataset:
    """Handle ``/datasets/create``: store ``dataset`` via the dataset service.

    Returns the value produced by ``DatasetService.create_item``.
    """
    return DatasetService.from_context(context).create_item(dataset)


@box.on_request("/datasets/list")
def list_datasets() -> list[Dataset]:
    """Handle ``/datasets/list``: return the dataset service's item listing."""
    return DatasetService.from_context(context).list_items()


@box.on_request("/code/create")
def create_code(code: CodeCreate) -> Code:
    """Handle ``/code/create``: store ``code`` via the code service.

    Returns the value produced by ``CodeService.create_item``.
    """
    return CodeService.from_context(context).create_item(code)


@box.on_request("/requests/create")
def create_request(request: RequestCreate) -> Request:
    """Handle ``/requests/create``: store ``request`` via the request service.

    Returns the value produced by ``RequestService.create_item``.
    """
    return RequestService.from_context(context).create_item(request)


@box.on_request("/requests/approve")
def approve_request(request_id: str) -> Request:
    """Handle ``/requests/approve``: approve the request with the given id.

    Args:
        request_id: String form of the request's UUID. It arrives as a string
            because UUIDs are not yet serialized directly (see TODO below).

    Returns:
        The value produced by ``RequestService.approve_request``.

    Raises:
        ValueError: If ``request_id`` is not a well-formed UUID string.
    """
    # TODO, fix serialization for UUIDs
    request_uid = UUID(request_id)
    request_service = RequestService.from_context(context)
    return request_service.approve_request(request_uid)


@box.on_request("/jobs/spawn")
def spawn_job(job: JobCreate) -> Job:
    """Handle ``/jobs/spawn``: ask the job service to spawn ``job``.

    Returns the value produced by ``JobService.spawn_job``.
    """
    # todo: maybe we dont need jobcreate and we can just pass in the args (need to fix serialization)
    return JobService.from_context(context).spawn_job(job)


@box.on_request("/jobs/get_result")
def get_job_result(job_id: str) -> ActionObject:
    """Handle ``/jobs/get_result``: look up the result of the job ``job_id``.

    ``job_id`` is the string form of the job's UUID; it is parsed into a
    ``UUID`` before being handed to ``JobService.get_job_result`` together
    with the shared context.
    """
    service = JobService.from_context(context)
    return service.get_job_result(context=context, job_id=UUID(job_id))

In [None]:
# Delete this app's directory under the client's datasite (presumably to start
# the walkthrough from a clean state). ignore_errors=True means a missing
# directory or a failed removal does not raise.
app_data_dir = client.my_datasite / "apps" / context.box.app_name
shutil.rmtree(app_data_dir, ignore_errors=True)

In [None]:
# Open a connection to the event app and build an API client addressed to the
# loaded client's own email.
# NOTE(review): mock=True presumably avoids a real network/file transport —
# confirm against get_connection.
conn = get_connection(box, mock=True)
api = API.from_email(client.email, conn)

In [None]:
# Create a dataset through the API client.
api.datasets.create(body=DatasetCreate(name="my-dataset", description="my dataset"))

In [None]:
# List the datasets through the API client; the first entry is used below.
datasets = api.datasets.list()

In [None]:
# Use the first listed dataset for the request and job created later.
dataset = datasets[0]

In [None]:
# Register a code item; "my-code" is a placeholder string (the mock executor
# in this notebook never runs it).
code = api.code.create(body=CodeCreate(code_str="my-code"))

In [None]:
# Create a request that refers to the code item and passes the dataset's uid
# as the keyword argument "y".
request = api.requests.create(
    body=RequestCreate(code_id=code.uid, kwargs={"y": dataset.uid})
)

In [None]:
request

In [None]:
# Approve the request. The uid is sent as a string and parsed back into a UUID
# by the /requests/approve handler.
api.requests.approve(body=str(request.uid))  # TODO: fix serialization for UUIDs

In [None]:
# Spawn a job for the same code item with the same keyword arguments as the
# request above.
job = api.jobs.spawn(body=JobCreate(code_id=code.uid, kwargs={"y": dataset.uid}))

In [None]:
job

## Run the job (mock executor)

In [None]:
class Executor:
    """Mock job executor: reads a stored job and writes a placeholder result.

    No user code is actually run. ``execute_code`` loads one stored job,
    validates that its code item exists and parses, and stores the constant
    ``1`` as the job's result.
    """

    def __init__(self, client: Client, context: BaseRPCContext):
        """Keep the client and the RPC context used to locate stored items."""
        self.client = client
        self.context = context

    def write_result(self, res_obj: ActionObject):
        """Write ``res_obj`` as JSON to ``<ActionObject item dir>/<uid>.json``.

        The target directory is created first if it does not exist.
        """
        res_path = self.context.item_dir(ActionObject) / f"{res_obj.uid}.json"
        res_path.parent.mkdir(parents=True, exist_ok=True)
        res_path.write_text(res_obj.model_dump_json())

    def execute_code(self):
        """Process the first stored job and write a placeholder result for it.

        The first ``*.json`` file yielded by globbing the job item directory is
        loaded as a ``Job``; the code item named ``<job.code_id>.json`` is then
        parsed as ``Code`` to check it is valid. The result written is an
        ``ActionObject`` holding ``1`` under the job's ``result_id``.

        Raises:
            FileNotFoundError: If no job file exists, or the job's code file
                is missing.
        """
        job_dir = self.context.item_dir(Job)
        # Glob order is filesystem-dependent; this mock just takes whichever
        # job file comes first.
        job_file = next(iter(job_dir.glob("*.json")), None)
        if job_file is None:
            raise FileNotFoundError(f"No job files found in {job_dir}")
        job = Job.model_validate_json(job_file.read_text())

        # The file name is fully determined by the code id, so address it
        # directly instead of scanning the directory.
        code_file = self.context.item_dir(Code) / f"{job.code_id}.json"
        if not code_file.is_file():
            raise FileNotFoundError(
                f"Code file for job {job_file.name} not found: {code_file}"
            )
        # Parsed only for validation; the mock does not use the code yet.
        Code.model_validate_json(code_file.read_text())

        # execute code here
        res = 1
        res_obj = ActionObject(syft_action_data=res, uid=job.result_id)
        self.write_result(res_obj)

In [None]:
# Run the mock executor once: it reads a stored job and writes a placeholder
# result for it.
executor = Executor(client, context)
executor.execute_code()

# Back to client side

In [None]:
# Fetch the job's result. The uid is sent as a string and converted to a UUID
# by the /jobs/get_result handler.
api.jobs.get_result(body=str(job.uid))

In [None]:
job.result_id

# Appendix

In [None]:
# Call the /apis/list endpoint directly through the connection rather than the
# API wrapper. The app name in the URL matches the SyftEvents app created above.
# NOTE(review): the meaning of expiry/cache is defined by conn.send — not
# visible here.
res = conn.send(
    url=f"syft://{client.email}/api_data/my-rds-app/rpc/apis/list",
    body={},
    expiry="5m",
    cache=True,
)

In [None]:
res

In [None]:
# Call the /datasets/create endpoint directly through the connection, passing
# the same DatasetCreate payload that the API wrapper was given earlier.
res = conn.send(
    url=f"syft://{client.email}/api_data/my-rds-app/rpc/datasets/create",
    body=DatasetCreate(name="my-dataset", description="my dataset"),
    expiry="5m",
    cache=True,
)

In [None]:
res