Skip to content

Commit

Permalink
Merge 54b969f into 2363ee5
Browse files Browse the repository at this point in the history
  • Loading branch information
sb2nov committed Dec 2, 2016
2 parents 2363ee5 + 54b969f commit 9b27161
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
19 changes: 17 additions & 2 deletions sdks/python/apache_beam/internal/apiclient.py
Expand Up @@ -46,7 +46,6 @@
from apache_beam.internal.clients import storage
import apache_beam.internal.clients.dataflow as dataflow


# Hostnames of the Google Cloud BigQuery, Compute and Storage API services.
# NOTE(review): how these constants are consumed is not visible in this hunk.
BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
COMPUTE_API_SERVICE = 'compute.googleapis.com'
STORAGE_API_SERVICE = 'storage.googleapis.com'
Expand All @@ -55,13 +54,19 @@
class Step(object):
"""Wrapper for a dataflow Step protobuf."""

def __init__(self, step_kind, step_name):
def __init__(self, step_kind, step_name, additional_properties=None):
  """Create the wrapped dataflow.Step proto and replay any given properties.

  Args:
    step_kind: Value stored on the instance and used as the proto's kind.
    step_name: Value stored on the instance and used as the proto's name.
    additional_properties: Optional iterable of (name, value, with_type)
      tuples; each one is forwarded to add_property. None means no extra
      properties.
  """
  self.step_kind = step_kind
  self.step_name = step_name
  self.proto = dataflow.Step(kind=step_kind, name=step_name)
  self.proto.properties = {}
  # Every add_property call is recorded here so __reduce__ can replay it.
  self._additional_properties = []

  if additional_properties is None:
    pending = ()
  else:
    pending = additional_properties
  for prop_name, prop_value, prop_with_type in pending:
    self.add_property(prop_name, prop_value, prop_with_type)

def add_property(self, name, value, with_type=False):
  """Add a property to the step proto and record it for later pickling.

  Args:
    name: Key of the property on the proto.
    value: Property value; converted with to_json_value before it is stored
      on the proto.
    with_type: Forwarded to to_json_value as its with_type argument.
  """
  # Keep the raw arguments so __reduce__ can hand them back to __init__.
  recorded = (name, value, with_type)
  self._additional_properties.append(recorded)

  proto_properties = self.proto.properties.additionalProperties
  property_cls = dataflow.Step.PropertiesValue.AdditionalProperty
  json_value = to_json_value(value, with_type=with_type)
  proto_properties.append(property_cls(key=name, value=json_value))
Expand All @@ -77,6 +82,11 @@ def _get_outputs(self):
outputs.append(entry_prop.value.string_value)
return outputs

def __reduce__(self):
  """Pickle hook: describe how to rebuild this Step.

  Returns:
    A (callable, args) pair made of the Step class and the kind, name and
    recorded (name, value, with_type) property tuples for its constructor.
  """
  constructor_args = (
      self.step_kind, self.step_name, self._additional_properties)
  return (Step, constructor_args)

def get_output(self, tag=None):
"""Returns name if it is one of the outputs or first output if name is None.
Expand Down Expand Up @@ -327,6 +337,11 @@ def __init__(self, options):
self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')

def __reduce__(self):
  """Pickle hook: describe how to rebuild this Job.

  Returns:
    A (callable, args) pair made of the Job class and a one-element tuple
    holding this instance's options.
  """
  constructor_args = (self.options,)
  return (Job, constructor_args)


class DataflowApplicationClient(object):
"""A Dataflow API client used by application code to create and query jobs."""
Expand Down
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/internal/pickler.py
Expand Up @@ -204,6 +204,14 @@ def loads(encoded):


def dump_session(file_path):
  """Pickle the current python session to be used in the worker.

  Note: The first dump produced by dill's dump_session is inconsistent, so the
  session is dumped, loaded back, and then dumped a second time to get
  consistent results in the worker and in the running session.
  Check: https://github.com/uqfoundation/dill/issues/195

  Args:
    file_path: Passed unchanged to dill.dump_session and dill.load_session.

  Returns:
    Whatever the final dill.dump_session call returns.
  """
  # Warm-up round trip: dump once and load the result back into this session.
  dill.dump_session(file_path)
  dill.load_session(file_path)
  # The second dump is the one whose result is handed back to the caller.
  final_dump = dill.dump_session(file_path)
  return final_dump


Expand Down

0 comments on commit 9b27161

Please sign in to comment.