Skip to content

Commit

Permalink
Begin work on Eventual message and service type
Browse files Browse the repository at this point in the history
  • Loading branch information
iamtrask committed Jul 29, 2020
1 parent 32932e3 commit f3a825d
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 29 deletions.
115 changes: 111 additions & 4 deletions examples/Dev.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@
"name": "stdout",
"output_type": "stream",
"text": [
"the old_message is for me!!!\n"
"the message is for me!!!\n"
]
}
],
Expand All @@ -385,7 +385,7 @@
"name": "stdout",
"output_type": "stream",
"text": [
"the old_message is for me!!!\n"
"the message is for me!!!\n"
]
}
],
Expand All @@ -402,7 +402,7 @@
"name": "stdout",
"output_type": "stream",
"text": [
"the old_message is for me!!!\n"
"the message is for me!!!\n"
]
}
],
Expand Down Expand Up @@ -445,7 +445,7 @@
{
"data": {
"text/plain": [
"ObjectStore:{723: tensor([1, 2, 3, 4]), 594: tensor([2, 4, 6, 8]), 748: tensor([ 4, 8, 12, 16])}"
"ObjectStore:{172: tensor([1, 2, 3, 4]), 958: tensor([2, 4, 6, 8])}"
]
},
"execution_count": 8,
Expand All @@ -457,6 +457,113 @@
"alice.store"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"the old_message is for me!!!\n"
]
},
{
"data": {
"text/plain": [
"tensor([2, 4, 6, 8])"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"y.get()"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Exception ignored in: <function Pointer.__del__ at 0x7fd408895430>\n",
"Traceback (most recent call last):\n",
" File \"/Users/atrask/Dropbox/Laboratory/openmined/PySyft/PySyft/src/syft/core/pointer/pointer.py\", line 27, in __del__\n",
" self.location.send_eventual_msg_without_reply(msg=obj_msg)\n",
" File \"/Users/atrask/Dropbox/Laboratory/openmined/PySyft/PySyft/src/syft/decorators/syft_decorator.py\", line 18, in wrapper\n",
" return function(*args, **kwargs)\n",
" File \"/Users/atrask/Dropbox/Laboratory/openmined/PySyft/PySyft/src/syft/decorators/typecheck.py\", line 78, in decorator\n",
" return typechecked(decorated)(*args, **kwargs)\n",
" File \"/Users/atrask/opt/anaconda3/envs/syft/lib/python3.8/site-packages/typeguard/__init__.py\", line 840, in wrapper\n",
" retval = func(*args, **kwargs)\n",
" File \"/Users/atrask/Dropbox/Laboratory/openmined/PySyft/PySyft/src/syft/core/nodes/common/client.py\", line 57, in send_eventual_msg_without_reply\n",
" return self.routes[0].send_eventual_msg_without_reply(msg=msg)\n",
" File \"/Users/atrask/Dropbox/Laboratory/openmined/PySyft/PySyft/src/syft/core/io/route.py\", line 136, in send_eventual_msg_without_reply\n",
" self.connection.send_msg_without_reply(msg=msg)\n",
" File \"/Users/atrask/Dropbox/Laboratory/openmined/PySyft/PySyft/src/syft/core/io/virtual.py\", line 41, in send_msg_without_reply\n",
" self.server.recv_msg_without_reply(msg=msg)\n",
" File \"/Users/atrask/Dropbox/Laboratory/openmined/PySyft/PySyft/src/syft/decorators/syft_decorator.py\", line 18, in wrapper\n",
" return function(*args, **kwargs)\n",
" File \"/Users/atrask/Dropbox/Laboratory/openmined/PySyft/PySyft/src/syft/decorators/typecheck.py\", line 78, in decorator\n",
" return typechecked(decorated)(*args, **kwargs)\n",
" File \"/Users/atrask/opt/anaconda3/envs/syft/lib/python3.8/site-packages/typeguard/__init__.py\", line 840, in wrapper\n",
" retval = func(*args, **kwargs)\n",
" File \"/Users/atrask/Dropbox/Laboratory/openmined/PySyft/PySyft/src/syft/core/io/virtual.py\", line 31, in recv_msg_without_reply\n",
" self.node.recv_msg_without_reply(msg=msg)\n",
" File \"/Users/atrask/Dropbox/Laboratory/openmined/PySyft/PySyft/src/syft/decorators/syft_decorator.py\", line 18, in wrapper\n",
" return function(*args, **kwargs)\n",
" File \"/Users/atrask/Dropbox/Laboratory/openmined/PySyft/PySyft/src/syft/decorators/typecheck.py\", line 78, in decorator\n",
" return typechecked(decorated)(*args, **kwargs)\n",
" File \"/Users/atrask/opt/anaconda3/envs/syft/lib/python3.8/site-packages/typeguard/__init__.py\", line 839, in wrapper\n",
" check_argument_types(memo)\n",
" File \"/Users/atrask/opt/anaconda3/envs/syft/lib/python3.8/site-packages/typeguard/__init__.py\", line 700, in check_argument_types\n",
" raise exc from None\n",
" File \"/Users/atrask/opt/anaconda3/envs/syft/lib/python3.8/site-packages/typeguard/__init__.py\", line 698, in check_argument_types\n",
" check_type(description, value, expected_type, memo)\n",
" File \"/Users/atrask/opt/anaconda3/envs/syft/lib/python3.8/site-packages/typeguard/__init__.py\", line 620, in check_type\n",
" raise TypeError(\n",
"TypeError: type of argument \"msg\" must be syft.core.message.ImmediateSyftMessageWithoutReply; got syft.core.nodes.common.action.garbage_collect_object_action.GarbageCollectObjectAction instead\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Deleted:<syft.ast.klass.torch.TensorPointer object at 0x7fd41858a460>\n"
]
}
],
"source": [
"del xp"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"ObjectStore:{669: tensor([1, 2, 3, 4])}"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"alice.store"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand Down
8 changes: 6 additions & 2 deletions src/syft/core/io/route.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
from typing import Set
from syft.core.message import SyftMessageWithReply
from syft.core.message import SyftMessageWithoutReply
from syft.core.message import EventualSyftMessageWithoutReply
from .connection import ClientConnection
from typing import List

Expand Down Expand Up @@ -128,8 +129,11 @@ def __init__(self, source: Location, destination: Location, connection: ClientCo
self.schema = RouteSchema(source=source, destination=destination)
self.connection = connection

def send_msg_without_reply(self, msg:SyftMessageWithoutReply) -> None:
def send_msg_without_reply(self, msg: SyftMessageWithoutReply) -> None:
self.connection.send_msg_without_reply(msg=msg)

def send_eventual_msg_without_reply(self, msg: EventualSyftMessageWithoutReply) -> None:
self.connection.send_msg_without_reply(msg=msg)

def send_msg_with_reply(self, msg:SyftMessageWithReply) -> SyftMessageWithoutReply:
def send_msg_with_reply(self, msg: SyftMessageWithReply) -> SyftMessageWithoutReply:
return self.connection.send_msg_with_reply(msg=msg)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .common import EventualSyftMessageWithoutReply


class GarbageCollectObjectAction(EventualSyftMessageWithoutReply):
def __init__(self, obj_id, address, msg_id=None):
super().__init__(address=address, msg_id=msg_id)
self.obj_id = obj_id
5 changes: 5 additions & 0 deletions src/syft/core/nodes/common/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from syft.core.message import ImmediateSyftMessageWithReply
from syft.core.message import ImmediateSyftMessageWithoutReply
from syft.core.message import EventualSyftMessageWithoutReply
from ....decorators import syft_decorator
from ....common.id import UID
from ...io.address import Address
Expand Down Expand Up @@ -51,6 +52,10 @@ def send_msg_with_reply(self, msg: ImmediateSyftMessageWithReply) -> ImmediateSy
def send_msg_without_reply(self, msg: ImmediateSyftMessageWithoutReply) -> None:
return self.routes[0].send_msg_without_reply(msg=msg)

@syft_decorator(typechecking=True)
def send_eventual_msg_without_reply(self, msg: EventualSyftMessageWithoutReply) -> None:
return self.routes[0].send_eventual_msg_without_reply(msg=msg)

@syft_decorator(typechecking=True)
def __repr__(self) -> str:
return f"<Client pointing to node with id:{self.node_id}>"
82 changes: 67 additions & 15 deletions src/syft/core/nodes/common/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ....util import get_subclasses
from syft.core.message import SyftMessage
from syft.core.message import ImmediateSyftMessageWithReply
from syft.core.message import EventualSyftMessageWithoutReply
from syft.core.message import ImmediateSyftMessageWithoutReply

# CORE IMPORTS
Expand All @@ -22,8 +23,9 @@
from .service.child_node_lifecycle_service import ChildNodeLifecycleService
from .client import Client
from .service.heritage_update_service import HeritageUpdateService
from .service.obj_action_service import ObjectActionServiceWithoutReply
from .service.obj_action_service import ObjectActionServiceWithReply
from .service.obj_action_service import ImmediateObjectActionServiceWithoutReply
from .service.obj_action_service import EventualObjectActionServiceWithoutReply
from .service.obj_action_service import ImmediateObjectActionServiceWithReply
from ...io.address import Address
from ...io.virtual import create_virtual_connection
from ...io.route import SoloRoute
Expand All @@ -34,6 +36,7 @@

from .location_aware_object import LocationAwareObject


class Node(AbstractNode, LocationAwareObject):

"""
Expand Down Expand Up @@ -109,21 +112,28 @@ def __init__(self, name: str = None, address: Address = None):
# which addresses that old_message
self.msg_without_reply_router = {}

# for services which return a reply
self.services_with_reply = list()

# for services which do not return a reply
self.services_without_reply = list()
# for messages which don't need to be run right now
# and will not generate a reply.
self.eventual_msg_without_reply_router = {}

# This is the list of services which all nodes support.
# You can read more about them by reading their respective
# class documentation.
self.services_without_reply.append(ReprService)
self.services_without_reply.append(HeritageUpdateService)
self.services_without_reply.append(ChildNodeLifecycleService)
self.services_without_reply.append(ObjectActionServiceWithoutReply)

self.services_with_reply.append(ObjectActionServiceWithReply)
# for services which run immediately and do not return a reply
self.immediate_services_without_reply = list()
self.immediate_services_without_reply.append(ReprService)
self.immediate_services_without_reply.append(HeritageUpdateService)
self.immediate_services_without_reply.append(ChildNodeLifecycleService)
self.immediate_services_without_reply.append(ImmediateObjectActionServiceWithoutReply)

# for services which run immediately and return a reply
self.immediate_services_with_reply = list()
self.immediate_services_with_reply.append(ImmediateObjectActionServiceWithReply)

# for services which can run at a later time and do not return a reply
self.eventual_services_without_reply = list()
self.eventual_services_without_reply.append(EventualObjectActionServiceWithoutReply)

# This is a special service which cannot be listed in any
# of the other services because it handles messages of all
Expand Down Expand Up @@ -191,7 +201,7 @@ def recv_msg_with_reply(self, msg: ImmediateSyftMessageWithReply) -> ImmediateSy
def recv_msg_without_reply(self, msg: ImmediateSyftMessageWithoutReply) -> None:

if self.message_is_for_me(msg):
print("the old_message is for me!!!")
print("the message is for me!!!")
try: # we use try/except here because it's marginally faster in Python

self.msg_without_reply_router[type(msg)].process(node=self, msg=msg)
Expand All @@ -208,6 +218,32 @@ def recv_msg_without_reply(self, msg: ImmediateSyftMessageWithoutReply) -> None:

raise e

else:
print("the message is not for me...")
self.message_without_reply_forwarding_service.process(node=self, msg=msg)


@syft_decorator(typechecking=True)
def recv_eventual_msg_without_reply(self, msg: EventualSyftMessageWithoutReply) -> None:

if self.message_is_for_me(msg):
print("the old_message is for me!!!")
try: # we use try/except here because it's marginally faster in Python

self.eventual_msg_without_reply_router[type(msg)].process(node=self, msg=msg)

except KeyError as e:

if type(msg) not in self.eventual_msg_without_reply_router:
raise KeyError(
f"The node {self.id} of type {type(self)} cannot process messages of type "
+ f"{type(msg)} because there is no service running to process it."
)

self.ensure_services_have_been_registered_error_if_not()

raise e

else:
print("the old_message is not for me...")
self.message_without_reply_forwarding_service.process(node=self, msg=msg)
Expand All @@ -230,7 +266,7 @@ def _register_services(self) -> None:
correspond to it, but each old_message type can only have one service (per node
subclass) which corresponds to it."""

for s in self.services_with_reply:
for s in self.immediate_services_with_reply:
# Create a single instance of the service to cache in the router corresponding
# to one or more old_message types.
service_instance = s()
Expand All @@ -244,7 +280,7 @@ def _register_services(self) -> None:
for handler_type_subclass in get_subclasses(obj_type=handler_type):
self.msg_with_reply_router[handler_type_subclass] = service_instance

for s in self.services_without_reply:
for s in self.immediate_services_without_reply:
# Create a single instance of the service to cache in the router corresponding
# to one or more old_message types.
service_instance = s()
Expand All @@ -260,6 +296,22 @@ def _register_services(self) -> None:
handler_type_subclass
] = service_instance

for s in self.eventual_services_without_reply:
# Create a single instance of the service to cache in the router corresponding
# to one or more old_message types.
service_instance = s()
for handler_type in s.message_handler_types():

# for each explicitly supported type, add it to the router
self.eventual_msg_without_reply_router[handler_type] = service_instance

# for all sub-classes of the explicitly supported type, add them
# to the router as well.
for handler_type_subclass in get_subclasses(obj_type=handler_type):
self.eventual_msg_without_reply_router[
handler_type_subclass
] = service_instance

# Set the services_registered flag to true so that we know that all services
# have been properly registered. This mostly exists because someone might
# accidentally delete (forget to call) this method inside the __init__ function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def message_handler_types() -> List[type]:

class MessageWithReplyForwardingService(ImmediateNodeServiceWithReply):
@syft_decorator(typechecking=True)
def process(self, node: AbstractNode, msg: ImmediateSyftMessageWithReply) -> None:
def process(self, node: AbstractNode, msg: ImmediateSyftMessageWithReply) -> ImmediateSyftMessageWithoutReply:

addr = msg.address
pri_addr = addr.pri_address
Expand Down

0 comments on commit f3a825d

Please sign in to comment.