In [82]:
import uuid
import msgpack
from abc import ABC, abstractmethod

class NodeFactory():
    """
    Client-side helper that asks a server for node information and builds local nodes from it.

    Every request is a dict passed to server.request(); the reply is decoded with msgpack.unpackb.
    server: any object with a request(dict) method that returns a msgpack encoded reply.
    node_class: optional class used by __call__ to build nodes. It is called with the keyword
    arguments uid, edges, meta and data. When it is not given, BlueskyNode is looked up at call time.
    """
    def __init__(self, server, node_class=None):
        self.server = server
        self.rootid = None
        self.node_class = node_class

    def _request(self, payload):
        """
        Send one request dict to the server and return the decoded reply.
        """
        return msgpack.unpackb(self.server.request(payload))

    def get_root(self):
        """
        Ask the server for the uid of the root node, remember it in self.rootid and return it.
        """
        self.rootid = self._request({'request': 'send_root'})
        return self.rootid

    def get_header(self, uid, **kwargs):
        """
        Request the header of the node with the given uid.
        The 'kwargs' entry is always included (possibly empty) so that the request has the
        same shape as the data and edges requests.
        """
        return self._request({'request': 'send_header', 'uid': uid, 'kwargs': kwargs})

    def get_data(self, uid, **kwargs):
        """
        Request data of the node with the given uid. kwargs are forwarded to the server.
        """
        return self._request({'request': 'send_data', 'uid': uid, 'kwargs': kwargs})

    def get_edges(self, uid, **kwargs):
        """
        Request the edges of the node with the given uid. kwargs are forwarded to the server.
        """
        return self._request({'request': 'send_edges', 'uid': uid, 'kwargs': kwargs})

    def __call__(self, uid):
        """
        Build a local node for the given uid.
        Edges and header are requested immediately. The data partitions are requested lazily,
        one request per partition, when the node's data is iterated.
        """
        # I'll make this dynamic later.
        # Can use entrypoints or importlib to discover node classes.
        node_class = self.node_class if self.node_class is not None else BlueskyNode
        # The partition index has to be a keyword: get_data only takes uid positionally,
        # and the keywords are what the server passes on to the node.
        # NOTE(review): this is a one-shot generator over a fixed 10 partitions; it cannot be
        # indexed or iterated twice.
        partitions = (self.get_data(uid, index=i) for i in range(10))
        node = node_class(uid=uid,
                          edges=self.get_edges(uid),
                          meta=self.get_header(uid),
                          data=partitions)
        return node


class BaseNode(ABC):
    """
    This is an abstract base class that defines the required methods and attributes for a node in the graph.
    This class is designed to access data locally as well as remotely.

    uid: identifier of the node, a fresh uuid4 string is generated when it is not given.
    edges: dict mapping an edge name to an adjacent node, or to the uid of an adjacent node.
    meta: header data of the node.
    data: the data partitions of the node, None means that they have not been loaded yet.
    server: when given, a NodeFactory bound to this server is stored in self.factory and is
    used to build adjacent nodes on demand.
    """
    def __init__(self, uid=None, edges=None, meta=None, data=None, server=None):
        self.uid = uid or str(uuid.uuid4())
        self.edges = edges or {}
        self.info = {'module': 'Base', 'class': 'BaseNode', 'version': '0.0.1'}
        self.meta = meta
        self.data = data
        # Without a server the node can only follow edges that already hold node objects.
        self.factory = NodeFactory(server) if server else None

    @abstractmethod
    def read(self):
        """
        Convert raw data in self.data to a python object.
        """
        pass

    @abstractmethod
    def load_meta(self):
        """
        Load header data from the datasource.
        """
        pass

    @abstractmethod
    def load_data(self):
        """
        This method loads the data from the source to self.data
        self.data is a list of partitions, each partition must be serializable.
        """
        pass

    def send_meta(self, **kwargs):
        """
        Sends header data to client.
        load_meta is called on every request before self.meta is returned.
        """
        self.load_meta()
        return self.meta

    def send_header(self, **kwargs):
        """
        Same as send_meta.
        NodeFactory.get_header issues a 'send_header' request, so the node answers to that name too.
        """
        return self.send_meta(**kwargs)

    def send_data(self, index, **kwargs):
        """
        Sends the data partitions to the client.
        Returns the partition at position index, loading the data first if needed.
        """
        if self.data is None:
            self.load_data()
        return self.data[index]

    def send_edges(self, **kwargs):
        """
        Send the list of edges to the client.
        Adjacent nodes are replaced by their uid, every other edge value is sent unchanged.
        """
        return {**{key:value.uid for (key, value) in self.edges.items() if isinstance(value, BaseNode)},
                **{key:value for (key, value) in self.edges.items() if not isinstance(value, BaseNode)}}

    def __getitem__(self, edge):
        """
        Access adjacent nodes.
        On the server side edges are the actual adjacent nodes.
        On the client side edges are the uids of the adjacent nodes.
        On the client side the factory access the server for the data to construct the node locally.

        Raises KeyError when there is no edge with this name, and TypeError when the edge holds
        a uid but the node has no factory to resolve it.
        """
        target = self.edges[edge]
        # The check has to look at what the edge holds, not at the edge name itself.
        if isinstance(target, BaseNode):
            return target
        if self.factory is None:
            raise TypeError('edge {!r} holds {!r} instead of a node, and there is no server '
                            'to build the node from'.format(edge, target))
        # Build the node from its uid once and keep it, so later lookups return the same object.
        self.edges[edge] = self.factory(target)
        return self.edges[edge]

In [89]:
import numpy

class NumpyTestNode(BaseNode):
    """
    Small test node whose data is 100 partitions, each holding the integers 0 to 9.
    """
    def read(self):
        """
        Return all partitions flattened into a single one dimensional numpy array.
        """
        # Convert data to a different format.
        if self.data is None:
            self.load_data()
        flat_data = [item for sublist in self.data for item in sublist]
        return numpy.array(flat_data)

    def load_meta(self):
        """
        Add the test header field to self.meta.
        """
        # Load header data from a data source.
        # meta is None unless the caller passed one in, so start from an empty header then.
        if self.meta is None:
            self.meta = {}
        self.meta.update({'field1': 'testing'})

    def load_data(self):
        """
        Fill self.data with the 100 test partitions.
        """
        self.data = [list(range(10)) for _ in range(100)]

In [None]:
class ServerSim:
    """
    In-process stand-in for a server: it answers request dicts with msgpack encoded replies.

    A request is a dict with a 'request' entry naming what is wanted. Every request other
    than 'send_root' also needs a 'uid' entry, and may carry a 'kwargs' dict that is passed
    to the node method as keyword arguments.
    """
    def __init__(self, root_node):
        self.root = root_node
        # Nodes are looked up by uid when a request comes in, so the key must be the uid itself.
        # Only the root node is registered here.
        self.nodes = {root_node.uid: root_node}

    def request(self, request):
        """
        Answer one request dict and return the msgpack encoded result.
        Raises KeyError when the uid is not known to the server.
        """
        if request['request'] == 'send_root':
            return msgpack.packb(self.send_root())
        node = self.nodes[request['uid']]
        # NOTE(review): the request name is used directly as a method name on the node, so any
        # attribute of the node can be called this way. Acceptable for a simulation; a real
        # server should only accept a fixed set of request names.
        handler = getattr(node, request['request'])
        # Requests without arguments may leave out 'kwargs' entirely.
        kwargs = request.get('kwargs') or {}
        return msgpack.packb(handler(**kwargs))

    def send_root(self):
        """
        Return the uid of the root node.
        """
        return self.root.uid

## Remote mode: start a server, and get the root on the client side

In [None]:
# Start a server whose graph is rooted at root_node.
root_node = BlueskyNode()
server = ServerSim(root_node)

# Create a local stand-in for the root node: ask the server for the root uid, then
# build a node with that uid which keeps a factory bound to the server.
# A concrete node class is used because BaseNode is abstract, and the server is passed
# by keyword because the first positional parameter of the constructor is uid.
# NOTE(review): BlueskyNode is not defined in this notebook; this assumes it accepts the
# BaseNode constructor keywords -- confirm.
root_uid = NodeFactory(server).get_root()
local_copy = BlueskyNode(uid=root_uid, server=server)

## Local mode: build the graph locally and access it locally

In [90]:
# Build a node with no server attached and read it. read() loads the partitions on
# first use and returns them flattened into a single numpy array, which is displayed
# as the cell output.
root_node = NumpyTestNode()
root_node.read()

array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
       2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3,
       4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5,
       6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7,
       8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
       0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
       2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3,
       4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5,
       6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7,
       8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
       0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1,
       2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3,
       4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5,
       6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0,