# toy graph

* Create a basic data node.
* Apply a basic data transformation.

## Checkpoint

* Initialize checkpoint facility in execution context.
* Reconcile checkpoint state when building operation.

## non-trivial data flow

* Employ some looping and parallel data flow.

The essential difference between an abstract graph and a concrete graph is that
the concrete graph has definite and immutable inputs and outputs, whereas an
abstract graph may have placeholder inputs and outputs that define connectivity
and high-level relationships. This may be a distinction of usage rather than of
function if nodes can generally be used to prototype equivalent nodes with
different inputs, and if running nodes can cause additional nodes to be
generated.

Checkpoint abstraction should come into play when deserializing the concrete graph.
Artifacts associated with prior execution may or may not exist. Nodes will be 
initialized in an appropriate state.

In [None]:
import os

In [None]:
class WorkerContext:
    """Data staging context whose checkpoints are backed by files.

    Attributes:
        storage_system: absolute path derived from *working_directory*.
    """
    def __init__(self, working_directory='.'):
        # Resolve to an absolute path up front, relative to the current
        # directory at construction time.
        root = os.path.abspath(working_directory)
        self.storage_system = root
        # TODO: On initialization, we should check for a clean and usable working directory.
        # TODO: We can also preemptively discover existing checkpoint state.

In [None]:
def add_node(context, node):
    """Register an operation node with the Context.

    Control interface: the execution manager calls this to add a task to
    the worker.

    Arguments:
        context: Translates work into concrete tasks and manages the resources for execution and data.
        node: portable representation of a unit of work.
    """
    # The builder is requested but not used further yet; nothing is returned.
    node_builder(context, node)
    

In [None]:
import os

from serialization import Integer64

# Question: where can/should we optimize data store and checkpoint writers to
#  reduce data transformations? We need to be able to write blocks of serialized
#  data, but we should allow optimizations for known data formats.

class DataStore:
    """Abstract interface for stateful data resources."""

    class ReadHandle:
        """Handle on a DataStore that is open for read-only access."""

    class WriteHandle:
        """Handle on a DataStore that is open for writing."""

    def open(self, mode):
        """Open the store (placeholder: does nothing and returns None).

        Intended behavior: check the file state and, if it is safe, open the
        file and return a handle.
        """
        return None

    def close(self):
        """Close the store (placeholder: does nothing and returns None).

        Intended behavior: close and release the filehandle, but retain
        access to the backing resources and maximize the ability to reopen
        the file through a later call.
        """
        return None

In [None]:
class CheckpointPublisher:
    """Push published state into a checkpoint data store.

    Offers one standard publishing interface, whatever the backing store.

    Instances are composed for the state data structures of distinct operations.
    """


class CheckpointFacility:
    """File-backed checkpoint state manager for one concrete node."""

    def __init__(self, context, node):
        # Identify the node first, then acquire its storage within *context*.
        node_id = get_uid(node)
        self.uid = node_id
        self.datastore = get_datastore(context, node)

    def write(self, state):
        """Update the checkpoint with new state.

        Placeholder: not implemented yet; returns None.
        """
        return None

    def load(self, receiver):
        """Apply the checkpoint state to receiver.

        This aspect will evolve to a Director that can be applied to an
        operation node builder.

        Placeholder: not implemented yet; returns None.
        """
        return None

In [None]:
def get_filesystem(context):
    """Return the root filesystem node managed by *context*.

    This is the value of the Context's ``storage_system`` attribute.
    """
    filesystem_root = context.storage_system
    return filesystem_root


def get_store_type(context, node):
    """Name the file container type to use for *node* in *context*.

    Both arguments are currently ignored: the answer is always 'numpy'.
    """
    # In a given Context, we could choose a file container type based on data
    # type and/or local configuration, such as JSON, pickle, numpy, HDF5, XDR, etc.
    store_type = 'numpy'
    return store_type


def lock_store(store, owner):
    """Get data store handle for owner.

    Intended contract: assert ownership of the data store. A store with no
    declared owner is claimed for *owner*; a store that already declares an
    owner is accepted only if that owner matches.

    Raises:
        OwnershipError: if ownership cannot be established.
        ValueError: if *store* or *owner* are invalid.

    Returns:
        DataStore handle, with ownership established.

    Warning:
        It is the responsibility of the calling framework to release ownership
        of the data store, even in the event of exceptional termination. We will
        not bother to make this more robust, because the locking will move to
        the level of the Context resources as part of the Session context manager.
    """
    # Not implemented yet: no locking is performed and no handle is produced.
    return None

In [None]:
def get_datastore(context, node):
    """Get the storage handle for a node in this context.

    If there is not yet a storage resource for the node, initialize one.

    A fully initialized store (metadata, ownership record and a zero-filled
    array artifact) is staged in a temporary directory and then renamed to
    the canonical location named after the node's uid. If the rename fails
    (for example because the canonical location is already occupied), the
    staged copy is discarded and the existing store is locked instead.

    Arguments:
        context: provides the root storage location (see get_filesystem).
        node: node record, consumed by node_uid, node_shape and node_dtype.

    Returns:
        Whatever lock_store returns for the canonical store path.

    Raises:
        OSError: if the staging directory or its files cannot be created.
        Exceptions raised by lock_store propagate unchanged.
    """
    # Design roadmap
    # 1. Initialize or discover valid data store.
    #    * create initialized data store in temporary directory with ownership claim.
    #    * try to move to canonical location.
    #    * if canonical location already exists, reinitialize from existing store
    #      and remove temporary data store.
    # 2. Take ownership of valid data store.
    #    * if canonical location already exists, attempt to lock (create lock directory with ownership metadata)
    #    * if lock already exists, produce error output. else, reinitialize from store.
    # 3. Remove lock information when WorkerContext releases DataStore handle.

    import json
    import numpy
    import shutil
    import tempfile

    storage_system = get_filesystem(context)

    # NOTE(review): the uid is used below as a JSON value and as a path
    # component, so node_uid is assumed to return a str -- confirm.
    uid = node_uid(node)

    store_type = get_store_type(context, node)

    # 1. Create an initialized data store in a temporary directory.
    # All staging directories share one parent so stale ones are easy to find.
    # mkdtemp requires the parent to exist, so create it on first use.
    staging_root = os.path.join(storage_system, 'tmp')
    os.makedirs(staging_root, exist_ok=True)
    temp_store = tempfile.mkdtemp(dir=staging_root)
    # describe data store type, fingerprinting details...
    metadata = {'uid': uid,
                'file_type': store_type}
    with open(os.path.join(temp_store, 'metadata.json'), 'w') as fh:
        # json.dump takes the object first and the file second.
        json.dump(metadata, fh)
    # "lock" the data store for the current context.
    owner = {'pid': os.getpid(),
             'object': id(context)}
    lockfile = os.path.join(temp_store, 'owner')
    # Exclusive creation refuses to overwrite an existing ownership record,
    # and (unlike an assert) is still enforced under `python -O`.
    with open(lockfile, 'x') as fh:
        json.dump(owner, fh)
    # Create artifact
    npz_file = os.path.join(temp_store, '.'.join([uid, 'npz']))
    data = numpy.zeros(shape=node_shape(node), dtype=node_dtype(node))
    numpy.savez(npz_file, data=data)

    # 2. Try to rename the directory to the canonical name.
    #    In order to get an exception, os.rename destination needs to clash with
    #    a non-empty directory.
    store_path = os.path.join(storage_system, uid)
    try:
        os.rename(temp_store, store_path)
    except OSError:
        # Ref https://docs.python.org/3/library/os.html#os.rename
        # for more specific exceptions.

        # 3. If the canonical directory name is already in use, the staged
        #    copy is redundant: remove it and fall through to take ownership
        #    of the existing store.
        shutil.rmtree(temp_store, ignore_errors=True)

    # 4. Take ownership (of the new store, or of the pre-existing one, which
    #    is then to be reinitialized from its contents).
    # TODO: What are the exception conditions? For now they propagate.
    store = lock_store(store_path, owner)

    # TODO: Protocol for unlocking the backing store OR lock Context instead.
    return store

# Test program

In [None]:
"""Mock up the Context node builder protocol to test internal details."""
import json

# Example payload for the mock protocol: a 2x2 nested list wrapped in the
# Integer64 type imported from the serialization module.
my_array = Integer64([[1, 2], [3, 4]])

def receive_node(serialized):
    """Build a local node description from a serialized Integer64 record.

    Arguments:
        serialized: JSON text accepted by ``Integer64.from_json``.

    Returns:
        dict with 'uid' (taken from the record's fingerprint) and 'record'
        (the same JSON text parsed with ``json.loads``).
    """
    fingerprint = Integer64.from_json(serialized).fingerprint()
    return {
        'uid': fingerprint.uid(),
        'record': json.loads(serialized),
    }

# Receive the node record by round-tripping the example array through JSON,
# then unpack the pieces that the following cells use.
node = receive_node(my_array.to_json())
uid, node_record = node['uid'], node['record']

In [None]:
# Create a local handle to the node
# 1. Check whether the node already exists.
# 2. Initialize checkpoint facility for the node.
# 3. Allow checkpoint facility to update the node.
# 4. Subscribe to needed resources.

import json
import numpy
import tempfile

# Worker context using the default working directory ('.').
context = WorkerContext()

# Root location under which the data stores for this context are created.
storage_system = get_filesystem(context)

# Name of the container format to use for this node's data store.
store_type = get_store_type(context, node)

In [None]:
# 1. Create an initialized data store in a temporary directory.

# Make it easy to find the stale temporary directories: every staged store
# is created under one shared 'tmp' directory.
tmpdir = os.path.join(storage_system, 'tmp')
# exist_ok=True creates the directory only if needed, without the gap between
# an existence check and the creation call.
os.makedirs(tmpdir, exist_ok=True)
assert os.path.exists(tmpdir)

# Create an initialized data store in a temporary directory.
temp_store = tempfile.mkdtemp(dir=tmpdir)

In [None]:
# Describe the data store: the node it belongs to (hex form of the uid) and
# its container format. Fingerprinting details could be added here.
metadata = dict(uid=str(uid.hex()), file_type=store_type)
with open(os.path.join(temp_store, 'metadata.json'), mode='w') as fh:
    json.dump(metadata, fh)

In [None]:
# "lock" the data store for the current context.

def check_owner(owner, candidate):
    """Report whether *candidate* matches every field of *owner*.

    Arguments:
        owner: mapping of ownership fields that must all match.
        candidate: mapping to compare against, e.g. the content of an
            existing lock file. Keys not present in *owner* are ignored.

    Returns:
        True if every key of *owner* is present in *candidate* with an equal
        value (trivially True for an empty *owner*); False otherwise.
    """
    # A field missing from the candidate is a mismatch, not an error: an
    # incomplete ownership record must never be accepted as ours.
    return all(key in candidate and candidate[key] == value
               for key, value in owner.items())

# Ownership is identified by the current process and the Context instance.
owner = {'pid': os.getpid(),
         'object': id(context)}
lockfile = os.path.join(temp_store, 'owner')
try:
    # Exclusive creation ('x') fails if the file already exists, so there is
    # no window between checking for the lock and writing it.
    with open(lockfile, 'x') as fh:
        json.dump(obj=owner, fp=fh)
except FileExistsError:
    # Already locked: acceptable only if the recorded owner is this context.
    with open(lockfile, 'r') as fh:
        candidate = json.load(fh)
    assert check_owner(owner, candidate)

In [None]:
# Create artifact

def node_shape(node):
    """Infer the array shape of a node's input data from its nested lists.

    Only the first element at each nesting level is inspected, so ragged
    data is not detected. An empty list ends the descent and contributes a
    dimension of length 0.

    Arguments:
        node: mapping with the data at ``node['record']['input']['data']``.

    Returns:
        Tuple of lengths, outermost dimension first.

    Raises:
        AssertionError: if the expected keys are missing or the data is not
            a list.
    """
    # TODO: generalize.
    # Note: The shape is either part of the input metadata or is inferrable from the
    # small number of native types.
    assert 'record' in node
    assert 'input' in node['record']
    assert 'data' in node['record']['input']
    data = node['record']['input']['data']
    assert isinstance(data, list)

    dims = []
    level = data
    while isinstance(level, (list, tuple)):
        dims.append(len(level))
        if not level:
            # Nothing to descend into: an empty level is the innermost one.
            break
        level = level[0]
    return tuple(dims)

def node_dtype(node):
    """Return the numpy dtype for a node's data.

    Only nodes whose record names the operation ('scalems', 'Integer64')
    are supported; for those the dtype is ``numpy.int64``.

    Raises:
        AssertionError: if the record or its operation is missing, or the
            operation is anything else.
    """
    # TODO: generalize, per graph schema specification.
    assert 'record' in node
    record = node['record']
    assert 'operation' in record
    operation = tuple(record['operation'])
    assert operation == ('scalems', 'Integer64')
    return numpy.int64

# The artifact lives inside the staged store and is named '<uid hex>.npz'.
npz_file = os.path.join(temp_store, '.'.join([uid.hex(), 'npz']))
# Placeholder contents: zeros with the shape and dtype derived from the node.
data = numpy.zeros(shape=node_shape(node), dtype=node_dtype(node))
numpy.savez(npz_file, data=data)
# Sanity check that the archive was written where expected.
assert os.path.exists(npz_file)

In [None]:
# 2. Try to rename the directory to the canonical name.
#    In order to get an exception, os.rename destination needs to clash with
#    a non-empty directory.
# The uid is not a string here (the metadata and artifact cells call
# uid.hex()), so the directory is named with the same hex form.
store_path = os.path.join(storage_system, uid.hex())
try:
    os.rename(temp_store, store_path)
    store = lock_store(store_path, owner)
except OSError:
    # Ref https://docs.python.org/3/library/os.html#os.rename
    # for more specific exceptions.

    # 3. If the canonical directory name is already in use, try to take ownership.
    # 4. If ownership taken, reinitialize from its contents.
    # TODO: What are the exception conditions? For now, anything raised by
    # lock_store simply propagates.
    store = lock_store(store_path, owner)


# Accept queries to the node status

# Accept subscriptions to the node

# Accept a request to publish the node results (trigger execution)