Skip to content

Commit

Permalink
Refactor FnApiRunner to operate directly on the runner API protos.
Browse files Browse the repository at this point in the history
This allows for optimization and execution of pipelines in other languages
over the Fn API (modulo aligning URNs and using the runner API for Coders).

The only portions of the pipeline that are deserialized are the Coders.
  • Loading branch information
robertwb committed Aug 4, 2017
1 parent 9e6530a commit 5e71d53
Show file tree
Hide file tree
Showing 7 changed files with 678 additions and 18 deletions.
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/coders/stream.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ cdef class InputStream(object):
cdef bytes all
cdef char* allc

cpdef size_t size(self) except? -1
cpdef ssize_t size(self) except? -1
cpdef bytes read(self, size_t len)
cpdef long read_byte(self) except? -1
cpdef libc.stdint.int64_t read_var_int64(self) except? -1
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/coders/stream.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ cdef class InputStream(object):
# unsigned char here.
return <long>(<unsigned char> self.allc[self.pos - 1])

cpdef size_t size(self) except? -1:
cpdef ssize_t size(self) except? -1:
return len(self.all) - self.pos

cpdef bytes read_all(self, bint nested=False):
Expand Down
8 changes: 7 additions & 1 deletion sdks/python/apache_beam/runners/pipeline_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self, context, obj_type, proto_map=None):
self._obj_type = obj_type
self._obj_to_id = {}
self._id_to_obj = {}
self._id_to_proto = proto_map if proto_map else {}
self._id_to_proto = dict(proto_map) if proto_map else {}
self._counter = 0

def _unique_ref(self, obj=None, label=None):
Expand All @@ -66,6 +66,12 @@ def get_by_id(self, id):
self._id_to_proto[id], self._pipeline_context)
return self._id_to_obj[id]

def __getitem__(self, id):
return self.get_by_id(id)

def __contains__(self, id):
return id in self._id_to_proto


class PipelineContext(object):
"""For internal use only; no backwards-compatibility guarantees.
Expand Down
Loading

0 comments on commit 5e71d53

Please sign in to comment.