
Update the MapReduce framework.

Incorporates changes from r319-r339.
1 parent fce3cac commit 0bdde503d97906e807a92b421ce933f810d2eeec @cloudysunny14 committed Aug 3, 2012
Showing with 2,073 additions and 537 deletions.
  1. +1 −1 appengine-mapreduce2GCS/googlestorage_test/input_readers_test.py
  2. +19 −0 src/mapreduce/errors.py
  3. +92 −68 src/mapreduce/handlers.py
  4. +137 −180 src/mapreduce/input_readers.py
  5. +1 −18 src/mapreduce/lib/files/blobstore.py
  6. +212 −49 src/mapreduce/lib/files/file.py
  7. +307 −1 src/mapreduce/lib/files/file_service_pb.py
  8. +203 −0 src/mapreduce/lib/files/gs.py
  9. +9 −0 src/mapreduce/lib/files/testutil.py
  10. +44 −11 src/mapreduce/lib/key_range/__init__.py
  11. +15 −11 test/mapreduce_test/gs_files_test.py → src/mapreduce/lib/pipeline/index.yaml
  12. +1 −1 src/mapreduce/lib/pipeline/models.py
  13. +192 −124 src/mapreduce/lib/pipeline/pipeline.py
  14. +160 −0 src/mapreduce/lib/pipeline/status_ui.py
  15. +28 −1 src/mapreduce/lib/pipeline/ui/common.css
  16. +41 −0 src/mapreduce/lib/pipeline/ui/root_list.css
  17. +61 −0 src/mapreduce/lib/pipeline/ui/root_list.html
  18. +124 −0 src/mapreduce/lib/pipeline/ui/root_list.js
  19. +16 −5 src/mapreduce/lib/pipeline/ui/status.css
  20. +1 −1 src/mapreduce/lib/pipeline/ui/status.html
  21. +35 −10 src/mapreduce/mapreduce_pipeline.py
  22. +39 −32 src/mapreduce/output_writers.py
  23. 0 test/{mapreduce_test → mapreduce}/base_handler_test.py
  24. 0 test/{mapreduce_test → mapreduce}/combiner_test.py
  25. 0 test/{mapreduce_test → mapreduce}/context_test.py
  26. 0 test/{mapreduce_test → mapreduce}/control_test.py
  27. +22 −0 test/{mapreduce_test → mapreduce}/end_to_end_test.py
  28. +106 −8 test/{mapreduce_test → mapreduce}/handlers_test.py
  29. +155 −7 test/{mapreduce_test → mapreduce}/input_readers_test.py
  30. 0 test/{mapreduce_test → mapreduce}/large_mapreduce_test.py
  31. 0 test/{mapreduce_test → mapreduce}/main_test.py
  32. 0 test/{mapreduce_test → mapreduce}/mapper_pipeline_test.py
  33. +43 −9 test/{mapreduce_test → mapreduce}/mapreduce_pipeline_test.py
  34. 0 test/{mapreduce_test → mapreduce}/model_test.py
  35. 0 test/{mapreduce_test → mapreduce}/namespace_range_test.py
  36. 0 test/{mapreduce_test → mapreduce}/operation/counters_test.py
  37. 0 test/{mapreduce_test → mapreduce}/operation/db_test.py
  38. 0 test/{mapreduce_test → mapreduce}/output_writers_end_to_end_test.py
  39. +9 −0 test/{mapreduce_test → mapreduce}/output_writers_test.py
  40. 0 test/{mapreduce_test → mapreduce}/quota_test.py
  41. 0 test/{mapreduce_test → mapreduce}/shuffler_end_to_end_test.py
  42. 0 test/{mapreduce_test → mapreduce}/shuffler_test.py
  43. 0 test/{mapreduce_test → mapreduce}/status_test.py
  44. 0 test/{mapreduce_test → mapreduce}/util_test.py
2 appengine-mapreduce2GCS/googlestorage_test/input_readers_test.py
@@ -27,7 +27,7 @@
class GoogleStorageInputReaderTest(testutil.HandlerTestBase):
READER_NAME = (
- "mapreduce.input_readers.CloudStorageLineInputReader")
+ "googlestorage.input_readers.GoogleStorageLineInputReader")
def assertDone(self, reader):
self.assertRaises(StopIteration, reader.next)
19 src/mapreduce/errors.py
@@ -25,8 +25,11 @@
"BadWriterParamsError",
"BadYamlError",
"Error",
+ "FailJobError",
"MissingYamlError",
"MultipleDocumentsInMrYaml",
+ "NotEnoughArgumentsError",
+ "RetrySliceError",
"ShuffleServiceError",
]
@@ -65,3 +68,19 @@ class ShuffleServiceError(Error):
class BadCombinerOutputError(Error):
"""Combiner outputs data instead of yielding it."""
+
+class FailJobError(Error):
+ """The job will be failed if this exception is thrown anywhere."""
+
+
+class RetrySliceError(Error):
+ """The slice will be retried up to some maximum number of times.
+
+ The job will be failed if the slice can't progress before maximum
+ number of retries.
+ """
+
+
+class NotEnoughArgumentsError(Error):
+ """Required argument is missing."""
+
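The three new exception types give mapper code an explicit protocol for failure handling: raising RetrySliceError asks the framework to re-run the current slice (up to the retry limit added in handlers.py below), FailJobError aborts the whole job, and NotEnoughArgumentsError replaces the handler-local class of the same name. A minimal sketch of a map function using the first two; the entity kind and its payload property are hypothetical, and only the exception-raising pattern comes from this commit:

from mapreduce import errors
from mapreduce import operation as op


def process(entity):
    """Map function that escalates problems explicitly instead of letting them leak."""
    try:
        text = entity.payload.decode("utf-8")  # 'payload' is a made-up property
    except UnicodeDecodeError, e:
        # Unrecoverable bad input: give up on the whole job.
        raise errors.FailJobError("corrupt entity %s: %s" % (entity.key(), e))
    except RuntimeError, e:
        # Transient trouble: let the framework retry this slice.
        raise errors.RetrySliceError(str(e))
    yield op.counters.Increment("bytes_processed", len(text))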
160 src/mapreduce/handlers.py
@@ -41,6 +41,11 @@
from mapreduce import quota
from mapreduce import util
+try:
+ from google.appengine.ext import ndb
+except ImportError:
+ ndb = None
+
# TODO(user): Make this a product of the reader or in quotas.py
_QUOTA_BATCH_SIZE = 20
@@ -52,22 +57,14 @@
# Delay between consecutive controller callback invocations.
_CONTROLLER_PERIOD_SEC = 2
+# How many times to cope with a RetrySliceError before totally
+# giving up and aborting the whole job.
+_RETRY_SLICE_ERROR_MAX_RETRIES = 10
+
# Set of strings of various test-injected faults.
_TEST_INJECTED_FAULTS = set()
-class Error(Exception):
- """Base class for exceptions in this module."""
-
-
-class NotEnoughArgumentsError(Error):
- """Required argument is missing."""
-
-
-class NoDataError(Error):
- """There is no data present for a desired input."""
-
-
def _run_task_hook(hooks, method, task, queue_name):
"""Invokes hooks.method(task, queue_name).
@@ -134,10 +131,8 @@ def handle(self):
if control and control.command == model.MapreduceControl.ABORT:
logging.info("Abort command received by shard %d of job '%s'",
shard_state.shard_number, shard_state.mapreduce_id)
- if tstate.output_writer:
- tstate.output_writer.finalize(ctx, shard_state.shard_number)
- # We recieved a command to abort. We don't care if we override
- # some data.
+ # NOTE: When aborting, specifically do not finalize the output writer
+ # because it might be in a bad state.
shard_state.active = False
shard_state.result_status = model.ShardState.RESULT_ABORTED
shard_state.put(config=util.create_datastore_write_config(spec))
@@ -154,6 +149,16 @@ def handle(self):
else:
quota_consumer = None
+ # Tell NDB to never cache anything in memcache or in-process. This ensures
+ # that entities fetched from Datastore input_readers via NDB will not bloat
+ # up the request memory size and Datastore Puts will avoid doing calls
+ # to memcache. Without this you get soft memory limit exits, which hurts
+ # overall throughput.
+ if ndb is not None:
+ ndb_ctx = ndb.get_context()
+ ndb_ctx.set_cache_policy(lambda key: False)
+ ndb_ctx.set_memcache_policy(lambda key: False)
+
context.Context._set(ctx)
try:
# consume quota ahead, because we do not want to run a datastore
@@ -162,49 +167,67 @@ def handle(self):
scan_aborted = False
entity = None
- # We shouldn't fetch an entity from the reader if there's not enough
- # quota to process it. Perform all quota checks proactively.
- if not quota_consumer or quota_consumer.consume():
- for entity in input_reader:
- if isinstance(entity, db.Model):
- shard_state.last_work_item = repr(entity.key())
- else:
- shard_state.last_work_item = repr(entity)[:100]
-
- scan_aborted = not self.process_data(
- entity, input_reader, ctx, tstate)
-
- # Check if we've got enough quota for the next entity.
- if (quota_consumer and not scan_aborted and
- not quota_consumer.consume()):
- scan_aborted = True
- if scan_aborted:
- break
- else:
+ try:
+ # We shouldn't fetch an entity from the reader if there's not enough
+ # quota to process it. Perform all quota checks proactively.
+ if not quota_consumer or quota_consumer.consume():
+ for entity in input_reader:
+ if isinstance(entity, db.Model):
+ shard_state.last_work_item = repr(entity.key())
+ else:
+ shard_state.last_work_item = repr(entity)[:100]
+
+ scan_aborted = not self.process_data(
+ entity, input_reader, ctx, tstate)
+
+ # Check if we've got enough quota for the next entity.
+ if (quota_consumer and not scan_aborted and
+ not quota_consumer.consume()):
+ scan_aborted = True
+ if scan_aborted:
+ break
+ else:
+ scan_aborted = True
+
+ if not scan_aborted:
+ logging.info("Processing done for shard %d of job '%s'",
+ shard_state.shard_number, shard_state.mapreduce_id)
+ # We consumed extra quota item at the end of for loop.
+ # Just be nice here and give it back :)
+ if quota_consumer:
+ quota_consumer.put(1)
+ shard_state.active = False
+ shard_state.result_status = model.ShardState.RESULT_SUCCESS
+
+ operation.counters.Increment(
+ context.COUNTER_MAPPER_WALLTIME_MS,
+ int((time.time() - self._start_time)*1000))(ctx)
+
+ # TODO(user): Mike said we don't want this happen in case of
+ # exception while scanning. Figure out when it's appropriate to skip.
+ ctx.flush()
+ except errors.RetrySliceError, e:
+ logging.error("Slice error: %s", e)
+ retry_count = int(
+ os.environ.get("HTTP_X_APPENGINE_TASKRETRYCOUNT") or 0)
+ if retry_count <= _RETRY_SLICE_ERROR_MAX_RETRIES:
+ raise
+ logging.error("Too many retries: %d, failing the job", retry_count)
scan_aborted = True
-
-
- if not scan_aborted:
- logging.info("Processing done for shard %d of job '%s'",
- shard_state.shard_number, shard_state.mapreduce_id)
- # We consumed extra quota item at the end of for loop.
- # Just be nice here and give it back :)
- if quota_consumer:
- quota_consumer.put(1)
shard_state.active = False
- shard_state.result_status = model.ShardState.RESULT_SUCCESS
-
- operation.counters.Increment(
- context.COUNTER_MAPPER_WALLTIME_MS,
- int((time.time() - self._start_time)*1000))(ctx)
-
- # TODO(user): Mike said we don't want this happen in case of
- # exception while scanning. Figure out when it's appropriate to skip.
- ctx.flush()
+ shard_state.result_status = model.ShardState.RESULT_FAILED
+ except errors.FailJobError, e:
+ logging.error("Job failed: %s", e)
+ scan_aborted = True
+ shard_state.active = False
+ shard_state.result_status = model.ShardState.RESULT_FAILED
if not shard_state.active:
- # shard is going to stop. Finalize output writer if any.
- if tstate.output_writer:
+ # shard is going to stop. Don't finalize output writer unless the job is
+ # going to be successful, because writer might be stuck in some bad state
+ # otherwise.
+ if (shard_state.result_status == model.ShardState.RESULT_SUCCESS and
+ tstate.output_writer):
tstate.output_writer.finalize(ctx, shard_state.shard_number)
config = util.create_datastore_write_config(spec)
@@ -219,6 +242,8 @@ def handle(self):
def tx():
fresh_shard_state = db.get(
model.ShardState.get_key_by_shard_id(shard_id))
+ if not fresh_shard_state:
+ raise db.Rollback()
if (not fresh_shard_state.active or
"worker_active_state_collision" in _TEST_INJECTED_FAULTS):
shard_state.active = False
@@ -273,8 +298,6 @@ def process_data(self, data, input_reader, ctx, transient_shard_state):
output_writer.write(output, ctx)
if self._time() - self._start_time > _SLICE_DURATION_SEC:
- logging.debug("Spent %s seconds. Rescheduling",
- self._time() - self._start_time)
return False
return True
@@ -372,11 +395,6 @@ def handle(self):
spec = model.MapreduceSpec.from_json_str(
self.request.get("mapreduce_spec"))
- # TODO(user): Make this logging prettier.
- logging.debug("post: id=%s headers=%s spec=%s",
- spec.mapreduce_id, self.request.headers,
- self.request.get("mapreduce_spec"))
-
state, control = db.get([
model.MapreduceState.get_key_by_job_id(spec.mapreduce_id),
model.MapreduceControl.get_key_by_job_id(spec.mapreduce_id),
@@ -407,6 +425,8 @@ def handle(self):
state.active_shards = len(active_shards)
state.failed_shards = len(failed_shards)
state.aborted_shards = len(aborted_shards)
+ if not control and failed_shards:
+ model.MapreduceControl.abort(spec.mapreduce_id)
if (not state.active and control and
control.command == model.MapreduceControl.ABORT):
@@ -512,9 +532,13 @@ def _finalize_job(mapreduce_spec, mapreduce_state, base_path):
base_path: handler base path.
"""
config = util.create_datastore_write_config(mapreduce_spec)
- # Enqueue done_callback if needed.
- if mapreduce_spec.mapper.output_writer_class():
+
+ # Only finalize the output writers if we the job is successful.
+ if (mapreduce_spec.mapper.output_writer_class() and
+ mapreduce_state.result_status == model.MapreduceState.RESULT_SUCCESS):
mapreduce_spec.mapper.output_writer_class().finalize_job(mapreduce_state)
+
+ # Enqueue done_callback if needed.
def put_state(state):
state.put(config=config)
done_callback = mapreduce_spec.params.get(
@@ -680,11 +704,11 @@ def _get_required_param(self, param_name):
parameter value
Raises:
- NotEnoughArgumentsError: if parameter is not specified.
+ errors.NotEnoughArgumentsError: if parameter is not specified.
"""
value = self.request.get(param_name)
if not value:
- raise NotEnoughArgumentsError(param_name + " not specified")
+ raise errors.NotEnoughArgumentsError(param_name + " not specified")
return value
@classmethod
@@ -821,11 +845,11 @@ def _get_required_param(self, param_name):
parameter value
Raises:
- NotEnoughArgumentsError: if parameter is not specified.
+ errors.NotEnoughArgumentsError: if parameter is not specified.
"""
value = self.request.get(param_name)
if not value:
- raise NotEnoughArgumentsError(param_name + " not specified")
+ raise errors.NotEnoughArgumentsError(param_name + " not specified")
return value
@classmethod
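The worker's new error handling leans on the task queue for retries: App Engine reports the current task's retry count in the X-AppEngine-TaskRetryCount header, which reaches the handler as the HTTP_X_APPENGINE_TASKRETRYCOUNT environment variable. A condensed, hedged restatement of the control flow added above, with process_entities standing in for the worker's loop over the input reader:

import logging
import os

from mapreduce import errors

_RETRY_SLICE_ERROR_MAX_RETRIES = 10


def run_slice(process_entities):
    """Return True if the slice completed, False if the shard must be failed."""
    try:
        process_entities()
        return True
    except errors.RetrySliceError, e:
        logging.error("Slice error: %s", e)
        retry_count = int(
            os.environ.get("HTTP_X_APPENGINE_TASKRETRYCOUNT") or 0)
        if retry_count <= _RETRY_SLICE_ERROR_MAX_RETRIES:
            # Re-raise so the task queue re-runs the worker task, which
            # retries the slice.
            raise
        logging.error("Too many retries: %d, failing the job", retry_count)
        return False
    except errors.FailJobError, e:
        logging.error("Job failed: %s", e)
        return False

Note the related change in this file: shard and job output writers are now finalized only when the result status is RESULT_SUCCESS, so a failed or aborted run never calls finalize on a writer that may be in a bad state.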
317 src/mapreduce/input_readers.py
@@ -31,19 +31,22 @@
"DatastoreEntityInputReader",
"DatastoreInputReader",
"DatastoreKeyInputReader",
+ "RandomStringInputReader",
"Error",
"InputReader",
"LogInputReader",
"NamespaceInputReader",
"RecordsReader",
- "CloudStorageLineInputReader"
]
# pylint: disable-msg=C6409
import base64
import copy
import logging
+import random
+import string
+import StringIO
import time
import zipfile
@@ -66,10 +69,6 @@
from mapreduce import operation
from mapreduce import util
-try:
- from cStringIO import StringIO
-except:
- from StringIO import StringIO
# Classes moved to errors module. Copied here for compatibility.
Error = errors.Error
@@ -246,6 +245,7 @@ class AbstractDatastoreInputReader(InputReader):
KEY_RANGE_PARAM = "key_range"
NAMESPACE_RANGE_PARAM = "namespace_range"
CURRENT_KEY_RANGE_PARAM = "current_key_range"
+ FILTERS_PARAM = "filters"
# TODO(user): Add support for arbitrary queries. It's not possible to
# support them without cursors since right now you can't even serialize query
@@ -255,7 +255,8 @@ def __init__(self,
key_ranges=None,
ns_range=None,
batch_size=_BATCH_SIZE,
- current_key_range=None):
+ current_key_range=None,
+ filters=None):
"""Create new AbstractDatastoreInputReader object.
This is internal constructor. Use split_query in a concrete class instead.
@@ -268,6 +269,9 @@ def __init__(self,
key_ranges or ns_range can be non-None.
batch_size: size of read batch as int.
current_key_range: the current key_range.KeyRange being processed.
+ filters: optional list of filters to apply to the query. Each filter is
+ a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
+ User filters are applied first.
"""
assert key_ranges is not None or ns_range is not None, (
"must specify one of 'key_ranges' or 'ns_range'")
@@ -282,7 +286,7 @@ def __init__(self,
self._ns_range = ns_range
self._batch_size = int(batch_size)
self._current_key_range = current_key_range
-
+ self._filters = filters
@classmethod
def _get_raw_entity_kind(cls, entity_kind):
@@ -293,7 +297,6 @@ def _get_raw_entity_kind(cls, entity_kind):
entity_kind, cls.__name__)
return entity_kind
-
def __iter__(self):
"""Iterates over the given KeyRanges or NamespaceRange.
@@ -411,8 +414,6 @@ def _split_input_from_namespace(cls, app, namespace, entity_kind,
# With one shard we don't need to calculate any splitpoints at all.
return [key_range.KeyRange(namespace=namespace, _app=app)]
- # we use datastore.Query instead of ext.db.Query here, because we can't
- # erase ordering on db.Query once we set it.
ds_query = datastore.Query(kind=raw_entity_kind,
namespace=namespace,
_app=app,
@@ -523,6 +524,20 @@ def validate(cls, mapper_spec):
"Expected a single namespace string")
if cls.NAMESPACES_PARAM in params:
raise BadReaderParamsError("Multiple namespaces are no longer supported")
+ if cls.FILTERS_PARAM in params:
+ filters = params[cls.FILTERS_PARAM]
+ if not isinstance(filters, list):
+ raise BadReaderParamsError("Expected list for filters parameter")
+ for f in filters:
+ if not isinstance(f, tuple):
+ raise BadReaderParamsError("Filter should be a tuple: %s", f)
+ if len(f) != 3:
+ raise BadReaderParamsError("Filter should be a 3-tuple: %s", f)
+ if not isinstance(f[0], basestring):
+ raise BadReaderParamsError("First element should be string: %s", f)
+ if f[1] != "=":
+ raise BadReaderParamsError(
+ "Only equality filters are supported: %s", f)
@classmethod
def split_input(cls, mapper_spec):
@@ -552,6 +567,7 @@ def split_input(cls, mapper_spec):
shard_count = mapper_spec.shard_count
namespace = params.get(cls.NAMESPACE_PARAM)
app = params.get(cls._APP_PARAM)
+ filters = params.get(cls.FILTERS_PARAM)
if namespace is None:
# It is difficult to efficiently shard large numbers of namespaces because
@@ -578,21 +594,27 @@ def split_input(cls, mapper_spec):
return [cls(entity_kind_name,
key_ranges=None,
ns_range=ns_range,
- batch_size=batch_size)
+ batch_size=batch_size,
+ filters=filters)
for ns_range in ns_ranges]
elif not namespace_keys:
return [cls(entity_kind_name,
key_ranges=None,
ns_range=namespace_range.NamespaceRange(),
- batch_size=shard_count)]
+ batch_size=shard_count,
+ filters=filters)]
else:
namespaces = [namespace_key.name() or ""
for namespace_key in namespace_keys]
else:
namespaces = [namespace]
- return cls._split_input_from_params(
+ readers = cls._split_input_from_params(
app, namespaces, entity_kind_name, params, shard_count)
+ if filters:
+ for reader in readers:
+ reader._filters = filters
+ return readers
def to_json(self):
"""Serializes all the data in this query range into json form.
@@ -624,7 +646,8 @@ def to_json(self):
self.NAMESPACE_RANGE_PARAM: namespace_range_json,
self.CURRENT_KEY_RANGE_PARAM: current_key_range_json,
self.ENTITY_KIND_PARAM: self._entity_kind,
- self.BATCH_SIZE_PARAM: self._batch_size}
+ self.BATCH_SIZE_PARAM: self._batch_size,
+ self.FILTERS_PARAM: self._filters}
return json_dict
@classmethod
@@ -664,7 +687,8 @@ def from_json(cls, json):
key_ranges,
ns_range,
json[cls.BATCH_SIZE_PARAM],
- current_key_range)
+ current_key_range,
+ filters=json.get(cls.FILTERS_PARAM))
class DatastoreInputReader(AbstractDatastoreInputReader):
@@ -682,7 +706,8 @@ def _iter_key_range(self, k_range):
cursor = None
while True:
query = k_range.make_ascending_query(
- util.for_name(self._entity_kind))
+ util.for_name(self._entity_kind),
+ filters=self._filters)
if isinstance(query, db.Query):
# Old db version.
if cursor:
@@ -746,7 +771,7 @@ class DatastoreKeyInputReader(AbstractDatastoreInputReader):
def _iter_key_range(self, k_range):
raw_entity_kind = self._get_raw_entity_kind(self._entity_kind)
query = k_range.make_ascending_datastore_query(
- raw_entity_kind, keys_only=True)
+ raw_entity_kind, keys_only=True, filters=self._filters)
for key in query.Run(
config=datastore_query.QueryOptions(batch_size=self._batch_size)):
yield key, key
@@ -758,7 +783,7 @@ class DatastoreEntityInputReader(AbstractDatastoreInputReader):
def _iter_key_range(self, k_range):
raw_entity_kind = self._get_raw_entity_kind(self._entity_kind)
query = k_range.make_ascending_datastore_query(
- raw_entity_kind)
+ raw_entity_kind, self._filters)
for entity in query.Run(
config=datastore_query.QueryOptions(batch_size=self._batch_size)):
yield entity.key(), entity
@@ -1243,7 +1268,7 @@ def next(self):
raise StopIteration()
entry = self._entries.pop()
value = self._zip.read(entry.filename)
- self._filestream = StringIO(value)
+ self._filestream = StringIO.StringIO(value)
if self._initial_offset:
self._filestream.seek(self._initial_offset)
self._filestream.readline()
@@ -1313,6 +1338,95 @@ def __str__(self):
self._next_offset())
+class RandomStringInputReader(InputReader):
+ """RandomStringInputReader generates random strings as output.
+
+ Primary usage is to populate output with testing entries.
+ """
+
+ # Total number of entries this reader should generate.
+ COUNT = "count"
+ # Length of the generated strings.
+ STRING_LENGTH = "string_length"
+
+ DEFAULT_STRING_LENGTH = 10
+
+ def __init__(self, count, string_length):
+ """Initialize input reader.
+
+ Args:
+ count: number of entries this shard should generate.
+ string_length: the length of generated random strings.
+ """
+ self._count = count
+ self._string_length = string_length
+
+ def __iter__(self):
+ ctx = context.get()
+
+ while self._count:
+ self._count -= 1
+ start_time = time.time()
+ content = "".join(random.choice(string.ascii_lowercase)
+ for _ in range(self._string_length))
+ if ctx:
+ operation.counters.Increment(
+ COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
+ operation.counters.Increment(COUNTER_IO_READ_BYTES, len(content))(ctx)
+ yield content
+
+ @classmethod
+ def split_input(cls, mapper_spec):
+ params = _get_params(mapper_spec)
+ count = params[cls.COUNT]
+ string_length = cls.DEFAULT_STRING_LENGTH
+ if cls.STRING_LENGTH in params:
+ string_length = params[cls.STRING_LENGTH]
+
+ shard_count = mapper_spec.shard_count
+ count_per_shard = count // shard_count
+
+ mr_input_readers = [
+ cls(count_per_shard, string_length) for _ in range(shard_count)]
+
+ left = count - count_per_shard*shard_count
+ if left > 0:
+ mr_input_readers.append(cls(left, string_length))
+
+ return mr_input_readers
+
+ @classmethod
+ def validate(cls, mapper_spec):
+ if mapper_spec.input_reader_class() != cls:
+ raise BadReaderParamsError("Mapper input reader class mismatch")
+
+ params = _get_params(mapper_spec)
+ if cls.COUNT not in params:
+ raise BadReaderParamsError("Must specify %s" % cls.COUNT)
+ if not isinstance(params[cls.COUNT], int):
+ raise BadReaderParamsError("%s should be an int but is %s" %
+ (cls.COUNT, type(params[cls.COUNT])))
+ if params[cls.COUNT] <= 0:
+ raise BadReaderParamsError("%s should be a positive int")
+ if cls.STRING_LENGTH in params and not (
+ isinstance(params[cls.STRING_LENGTH], int) and
+ params[cls.STRING_LENGTH] > 0):
+ raise BadReaderParamsError("%s should be a positive int but is %s" %
+ (cls.STRING_LENGTH, params[cls.STRING_LENGTH]))
+ if (not isinstance(mapper_spec.shard_count, int) or
+ mapper_spec.shard_count <= 0):
+ raise BadReaderParamsError(
+ "shard_count should be a positive int but is %s" %
+ mapper_spec.shard_count)
+
+ @classmethod
+ def from_json(cls, json):
+ return cls(json[cls.COUNT], json[cls.STRING_LENGTH])
+
+ def to_json(self):
+ return {self.COUNT: self._count, self.STRING_LENGTH: self._string_length}
+
+
class ConsistentKeyReader(DatastoreKeyInputReader):
"""A key reader which reads consistent data from datastore.
@@ -1641,6 +1755,10 @@ def __iter__(self):
COUNTER_IO_READ_MSEC, int((time.time() - start_time) * 1000))(ctx)
operation.counters.Increment(COUNTER_IO_READ_BYTES, len(record))(ctx)
yield record
+ except (files.ExistenceError), e:
+ raise errors.FailJobError("ExistenceError: %s" % e)
+ except (files.UnknownError), e:
+ raise errors.RetrySliceError("UnknownError: %s" % e)
except EOFError:
self._filenames.pop(0)
if not self._filenames:
@@ -1939,164 +2057,3 @@ def __str__(self):
params.append("%s=%s" % (key, value))
return "LogInputReader(%s)" % ", ".join(params)
-
-class CloudStorageLineInputReader(InputReader):
- """Input reader for files from a stored in the CloudStorage.
-
- You requires activate the cloud storage and create bucket.
- The class shouldn't be instantiated directly. Use the split_input class method
- instead.
- """
- # TODO(user): Should we set this based on MAX_BLOB_FETCH_SIZE?
- _BLOB_BUFFER_SIZE = 64000
-
- # Maximum number of shards to allow.
- _MAX_SHARD_COUNT = 256
-
- # Maximum number of file path
- _MAX_FILE_PATHS_COUNT = 1
-
- # Mapreduce parameters.
- FILE_PATHS_PARAM = "file_paths"
- # Serialyzation parameters.
- INITIAL_POSITION_PARAM = "initial_position"
- START_POSITION_PARAM = "start_position"
- END_POSITION_PARAM = "end_position"
- FILE_PATH_PARAM = "file_path"
-
- def __init__(self, file_path, start_position, end_position):
- """Initializes this instance with the given blob key and character range.
-
- This BlobstoreInputReader will read from the first record starting after
- strictly after start_position until the first record ending at or after
- end_position (exclusive). As an exception, if start_position is 0, then
- this InputReader starts reading at the first record.
-
- Args:
- blob_key: the BlobKey that this input reader is processing.
- start_position: the position to start reading at.
- end_position: a position in the last record to read.
- """
- self._file_path = file_path
- self._start_position = start_position
- self._end_position = end_position
- self._has_iterated = False
- with files.open(self._file_path, 'r') as fp:
- fp.seek(self._start_position, 0)
- value = fp.read(self._BLOB_BUFFER_SIZE)
- self._filestream = StringIO(value)
- self._read_before_start = bool(start_position)
-
- @classmethod
- def validate(cls, mapper_spec):
- """Validates mapper spec and all mapper parameters.
-
- Args:
- mapper_spec: The MapperSpec for this InputReader.
-
- Raises:
- BadReaderParamsError: required parameters are missing or invalid.
- """
- if mapper_spec.input_reader_class() != cls:
- raise BadReaderParamsError("Mapper input reader class mismatch")
- params = _get_params(mapper_spec)
- if cls.FILE_PATHS_PARAM not in params:
- raise BadReaderParamsError("Must specify 'file_path' for mapper input")
-
- file_paths = params[cls.FILE_PATHS_PARAM]
- if isinstance(file_paths, basestring):
- # This is a mechanism to allow multiple blob keys (which do not contain
- # commas) in a single string. It may go away.
- file_paths = file_paths.split(",")
- if len(file_paths) > cls._MAX_FILE_PATHS_COUNT:
- raise BadReaderParamsError("Too many 'file_paht' for mapper input")
- if not file_paths:
- raise BadReaderParamsError("No 'file_pahts' specified for mapper input")
-
- @classmethod
- def split_input(cls, mapper_spec):
- """Returns a list of shard_count input_spec_shards for input_spec.
-
- Args:
- mapper_spec: The mapper specification to split from. Must contain
- 'blob_keys' parameter with one or more blob keys.
-
- Returns:
- A list of BlobstoreInputReaders corresponding to the specified shards.
- """
- params = _get_params(mapper_spec)
- file_paths = params[cls.FILE_PATHS_PARAM]
- if isinstance(file_paths, basestring):
- # This is a mechanism to allow multiple blob keys (which do not contain
- # commas) in a single string. It may go away.
- file_paths = file_paths.split(",")
-
- file_sizes = {}
- for file_path in file_paths:
- with files.open(file_path, 'r') as fp:
- fp.seek(0,2)
- file_sizes[file_path] = fp.tell()
-
- shard_count = min(cls._MAX_SHARD_COUNT, mapper_spec.shard_count)
-
- shards_per_blob = shard_count // len(file_paths)
- if shards_per_blob == 0:
- shards_per_blob = 1
-
- chunks = []
- for blob_key, blob_size in file_sizes.items():
- blob_chunk_size = blob_size // shards_per_blob
- for i in xrange(shards_per_blob - 1):
- chunks.append(CloudStorageLineInputReader.from_json(
- {cls.FILE_PATH_PARAM: blob_key,
- cls.INITIAL_POSITION_PARAM: blob_chunk_size * i,
- cls.END_POSITION_PARAM: blob_chunk_size * (i + 1)}))
- chunks.append(CloudStorageLineInputReader.from_json(
- {cls.FILE_PATH_PARAM: blob_key,
- cls.INITIAL_POSITION_PARAM: blob_chunk_size * (shards_per_blob - 1),
- cls.END_POSITION_PARAM: blob_size}))
-
- return chunks
-
- def next(self):
- """Returns the next input from as an (offset, line) tuple."""
- self._has_iterated = True
-
- if self._read_before_start:
- self._filestream.readline()
- self._read_before_start = False
-
- start_position = self._filestream.tell()
-
- if start_position > self._end_position:
- self.stopIteration()
-
- line = self._filestream.readline()
-
- if not line:
- self.stopIteration()
-
- return start_position, line.rstrip("\n")
-
- def stopIteration(self):
- self._filestream.close()
- self._filestream = None
- raise StopIteration()
-
- def to_json(self):
- """Returns an json-compatible input shard spec for remaining inputs."""
- return {self.FILE_PATH_PARAM: self._file_path,
- self.INITIAL_POSITION_PARAM: self._start_position,
- self.END_POSITION_PARAM: self._end_position}
-
- def __str__(self):
- """Returns the string representation of this BlobstoreLineInputReader."""
- return "blobstore.BlobKey(%r):[%d, %d]" % (
- self._file_path, self._filestream.tell(), self._end_position)
-
- @classmethod
- def from_json(cls, json):
- """Instantiates an instance of this InputReader for the given shard spec."""
- return cls(json[cls.FILE_PATH_PARAM],
- json[cls.INITIAL_POSITION_PARAM],
- json[cls.END_POSITION_PARAM])
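Two reader changes stand out: the datastore readers now accept an optional "filters" parameter (a list of (property, "=", value) tuples, equality only), and the new RandomStringInputReader generates test data from just "count" and "string_length" parameters, while CloudStorageLineInputReader is removed from this module in favor of googlestorage.input_readers.GoogleStorageLineInputReader (see the test update at the top of this commit). A hedged sketch of starting one job with each reader, assuming the usual control.start_map entry point; the handler paths and the model name are made up:

from mapreduce import control

# Map over only the Entry entities whose 'lang' property equals "en".
# Each filter is a (property, operation, value) tuple; only "=" is accepted.
control.start_map(
    name="reprocess english entries",
    handler_spec="myapp.mappers.process",       # hypothetical map function
    reader_spec="mapreduce.input_readers.DatastoreInputReader",
    mapper_parameters={
        "entity_kind": "myapp.models.Entry",    # hypothetical model
        "filters": [("lang", "=", "en")],
        "batch_size": 50,
    },
    shard_count=8)

# Generate 10000 random 16-character strings, e.g. to exercise an output writer.
control.start_map(
    name="populate test data",
    handler_spec="myapp.mappers.store_string",  # hypothetical map function
    reader_spec="mapreduce.input_readers.RandomStringInputReader",
    mapper_parameters={"count": 10000, "string_length": 16},
    shard_count=4)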
19 src/mapreduce/lib/files/blobstore.py
@@ -37,7 +37,6 @@
_BLOBSTORE_FILESYSTEM = files.BLOBSTORE_FILESYSTEM
_BLOBSTORE_DIRECTORY = '/' + _BLOBSTORE_FILESYSTEM + '/'
_BLOBSTORE_NEW_FILE_NAME = 'new'
-_CREATION_HANDLE_PREFIX = 'writable:'
_MIME_TYPE_PARAMETER = 'content_type'
_BLOBINFO_UPLOADED_FILENAME_PARAMETER = 'file_name'
@@ -102,7 +101,7 @@ def get_blob_key(create_file_name):
(create_file_name, _BLOBSTORE_DIRECTORY))
ticket = create_file_name[len(_BLOBSTORE_DIRECTORY):]
- if not ticket.startswith(_CREATION_HANDLE_PREFIX):
+ if not ticket.startswith(files._CREATION_HANDLE_PREFIX):
return blobstore.BlobKey(ticket)
@@ -153,19 +152,3 @@ def get_file_name(blob_key):
if not isinstance(blob_key, (blobstore.BlobKey, basestring)):
raise files.InvalidArgumentError('Expected string or blobstore.BlobKey')
return '%s%s' % (_BLOBSTORE_DIRECTORY, blob_key)
-
-
-def _delete(filename):
- """Permanently delete a file.
-
- Args:
- filename: finalized file name as string.
- """
-
- blob_key = get_blob_key(filename)
- if blob_key is None:
- return
- blob_info = blobstore.BlobInfo.get(blob_key)
- if blob_info is None:
- return
- blob_info.delete()
261 src/mapreduce/lib/files/file.py
@@ -52,13 +52,13 @@
'delete',
'finalize',
+ 'listdir',
'open',
'stat',
'BufferedFile',
]
-import gc
import os
import sys
import StringIO
@@ -72,6 +72,8 @@
GS_FILESYSTEM = 'gs'
FILESYSTEMS = (BLOBSTORE_FILESYSTEM, GS_FILESYSTEM)
READ_BLOCK_SIZE = 1024 * 512
+_CREATION_HANDLE_PREFIX = 'writable:'
+_DEFAULT_BUFFER_SIZE = 512 * 1024
class Error(Exception):
@@ -400,7 +402,7 @@ def read(self, size=None):
buf.close()
def _verify_read_mode(self):
- if self._mode != 'r':
+ if self._mode not in ('r', 'rb'):
raise WrongOpenModeError('File is opened for write.')
def _open(self):
@@ -411,9 +413,9 @@ def _open(self):
request.set_exclusive_lock(self._exclusive_lock)
request.set_content_type(self._content_type)
- if self._mode == 'a' or self._mode == 'ab':
+ if self._mode in ('a', 'ab'):
request.set_open_mode(file_service_pb.OpenRequest.APPEND)
- elif self._mode == 'r' or self._mode == 'rb':
+ elif self._mode in ('r', 'rb'):
request.set_open_mode(file_service_pb.OpenRequest.READ)
else:
raise UnsupportedOpenModeError('Unsupported open mode: %s', self._mode)
@@ -464,11 +466,17 @@ def stat(self):
file_stat.filename = file_stat_pb.filename()
file_stat.finalized = file_stat_pb.finalized()
file_stat.st_size = file_stat_pb.length()
+ file_stat.st_mtime = file_stat_pb.mtime()
+ file_stat.st_ctime = file_stat_pb.ctime()
return file_stat
-def open(filename, mode='r', content_type=RAW, exclusive_lock=False):
+def open(filename,
+ mode='r',
+ content_type=RAW,
+ exclusive_lock=False,
+ buffering=0):
"""Open a file.
Args:
@@ -477,10 +485,17 @@ def open(filename, mode='r', content_type=RAW, exclusive_lock=False):
content_type: File's content type. Value from FileContentType.ContentType
enum.
exclusive_lock: If file should be exclusively locked. All other exclusive
- lock attempts will file untile file is correctly closed.
+ lock attempts will file until file is correctly closed.
+ buffering: optional argument similar to the one in Python's open.
+ It specifies the file's desired buffer size: 0 means unbuffered, positive
+ value means use a buffer of that size, any negative value means the
+ default size. Only read buffering is supported.
Returns:
File object.
+
+ Raises:
+ InvalidArgumentError: Raised when given illegal argument value or type.
"""
if not filename:
raise InvalidArgumentError('Filename is empty')
@@ -489,12 +504,48 @@ def open(filename, mode='r', content_type=RAW, exclusive_lock=False):
(filename.__class__, filename))
if content_type != RAW:
raise InvalidArgumentError('Invalid content type')
+ if not (isinstance(buffering, int) or isinstance(buffering, long)):
+ raise InvalidArgumentError('buffering should be an int but is %s'
+ % buffering)
+
+ if mode == 'r' or mode == 'rb':
+ if buffering > 0:
+ return BufferedFile(filename, buffering)
+ elif buffering < 0:
+ return BufferedFile(filename, _DEFAULT_BUFFER_SIZE)
+
+ return _File(filename,
+ mode=mode,
+ content_type=content_type,
+ exclusive_lock=exclusive_lock)
+
- f = _File(filename,
- mode=mode,
- content_type=content_type,
- exclusive_lock=exclusive_lock)
- return f
+def listdir(path, **kwargs):
+ """Return a sorted list of filenames (matching a pattern) in the given path.
+
+ Only Google Cloud Storage paths are supported in current implementation.
+
+ Args:
+ path: a Google Cloud Storage path of "/gs/bucketname" form.
+ kwargs: other keyword arguments to be relayed to Google Cloud Storage.
+ This can be used to select certain files with names matching a pattern.
+ See mapreduce.lib.files.gs.listdir for details.
+
+ Returns:
+ a list containing filenames (matching a pattern) from the given path.
+ Sorted by Python String.
+ """
+
+ from mapreduce.lib.files import gs
+
+ if not isinstance(path, basestring):
+ raise InvalidArgumentError('path should be a string, but is %s(%r)' %
+ (path.__class__.__name__, path))
+
+ if path.startswith(gs._GS_PREFIX):
+ return gs.listdir(path, kwargs)
+ else:
+ raise InvalidFileNameError('Unsupported path: %s' % path)
def finalize(filename, content_type=RAW):
@@ -527,12 +578,12 @@ class _FileStat(object):
filename: the uploaded filename of the file;
finalized: whether the file is finalized. This is always true by now;
st_size: number of bytes of the file;
- st_ctime: creation time. Currently not set;
- st_mtime: modification time. Currently not set.;
+ st_ctime: creation time;
+ st_mtime: modification time.
"""
def __init__(self):
self.filename = None
- self.finlized = True
+ self.finalized = True
self.st_size = None
self.st_ctime = None
self.st_mtime = None
@@ -597,21 +648,67 @@ def _create(filesystem, content_type=RAW, filename=None, params=None):
return response.filename()
-def delete(filename):
- """Permanently delete a file.
+def __checkIsFinalizedName(filename):
+ """Check if a filename is finalized.
+
+ A filename is finalized when it has creation handle prefix, which is the same
+ for both blobstore and gs files.
+
+ Args:
+ filename: a gs or blobstore filename that starts with '/gs/' or
+ '/blobstore/'
+
+ Raises:
+ InvalidFileNameError: raised when filename is finalized.
+ """
+ if filename.split('/')[2].startswith(_CREATION_HANDLE_PREFIX):
+ raise InvalidFileNameError('File %s should have finalized filename' %
+ filename)
+
+
+def delete(*filenames):
+ """Permanently delete files.
+
+ Delete on non-finalized/non-existent files is a no-op.
Args:
- filename: finalized file name as string.
+ filenames: finalized file names as strings. filename should has format
+ "/gs/bucket/filename" or "/blobstore/blobkey".
+
+ Raises:
+ InvalidFileNameError: Raised when any filename is not of valid format or
+ not a finalized name.
+ IOError: Raised if any problem occurs contacting the backend system.
"""
+
from mapreduce.lib.files import blobstore as files_blobstore
+ from mapreduce.lib.files import gs
+ from google.appengine.ext import blobstore
- if not isinstance(filename, basestring):
- raise InvalidArgumentError('Filename should be a string, but is %s(%r)' %
- (filename.__class__.__name__, filename))
- if filename.startswith(files_blobstore._BLOBSTORE_DIRECTORY):
- files_blobstore._delete(filename)
- else:
- raise InvalidFileNameError( 'Unsupported file name: %s' % filename)
+ blobkeys = []
+
+ for filename in filenames:
+ if not isinstance(filename, basestring):
+ raise InvalidArgumentError('Filename should be a string, but is %s(%r)' %
+ (filename.__class__.__name__, filename))
+ if filename.startswith(files_blobstore._BLOBSTORE_DIRECTORY):
+ __checkIsFinalizedName(filename)
+ blobkey = files_blobstore.get_blob_key(filename)
+ if blobkey:
+ blobkeys.append(blobkey)
+ elif filename.startswith(gs._GS_PREFIX):
+
+ __checkIsFinalizedName(filename)
+ blobkeys.append(blobstore.create_gs_key(filename))
+ else:
+ raise InvalidFileNameError('Filename should start with /%s or /%s' %
+ (files_blobstore._BLOBSTORE_DIRECTORY,
+ gs._GS_PREFIX))
+
+ try:
+ blobstore.delete(blobkeys)
+ except Exception, e:
+ raise IOError('Blobstore failure.', e)
def _get_capabilities():
@@ -630,9 +727,7 @@ def _get_capabilities():
class BufferedFile(object):
"""BufferedFile is a file-like object reading underlying file in chunks."""
- _BUFFER_SIZE = 512 * 1024
-
- def __init__(self, filename, buffer_size=_BUFFER_SIZE):
+ def __init__(self, filename, buffer_size=_DEFAULT_BUFFER_SIZE):
"""Constructor.
Args:
@@ -644,6 +739,18 @@ def __init__(self, filename, buffer_size=_BUFFER_SIZE):
self._buffer = ''
self._buffer_pos = 0
self._buffer_size = buffer_size
+ self._eof = False
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, atype, value, traceback):
+ self.close()
+
+ def close(self):
+ self._buffer = ''
+ self._eof = True
+ self._buffer_pos = 0
def tell(self):
"""Return file's current position."""
@@ -659,36 +766,87 @@ def read(self, size):
Returns:
A string with data read.
"""
- while len(self._buffer) - self._buffer_pos < size:
- self._buffer = self._buffer[self._buffer_pos:]
- self._buffer_pos = 0
- with open(self._filename, 'r') as f:
- f.seek(self._position + len(self._buffer))
- data = f.read(self._buffer_size)
- if not data:
- break
- self._buffer += data
- gc.collect()
+ data_list = []
+ while True:
+ result = self.__readBuffer(size)
+ data_list.append(result)
+ size -= len(result)
+ if size == 0 or self._eof:
+ return ''.join(data_list)
+ self.__refillBuffer()
+
+ def readline(self, size=-1):
+ """Read one line delimited by '\n' from the file.
+
+ A trailing newline character is kept in the string. It may be absent when a
+ file ends with an incomplete line. If the size argument is non-negative,
+ it specifies the maximum string size (counting the newline) to return. An
+ empty string is returned only when EOF is encountered immediately.
- if len(self._buffer) - self._buffer_pos < size:
- result = self._buffer[self._buffer_pos:]
- self._buffer = ''
- self._buffer_pos = 0
- self._position += len(result)
- return result
- else:
- result = self._buffer[self._buffer_pos:self._buffer_pos + size]
- self._buffer_pos += size
- self._position += size
- return result
+ Args:
+ size: Maximum number of bytes to read. If not specified, readline stops
+ only on '\n' or EOF.
+
+ Returns:
+ The data read as a string.
+ """
+ data_list = []
+
+ while True:
+
+ if size < 0:
+ end_pos = len(self._buffer)
+ else:
+ end_pos = self._buffer_pos + size
+ newline_pos = self._buffer.find('\n', self._buffer_pos, end_pos)
+
+ if newline_pos != -1:
+
+ data_list.append(self.__readBuffer(newline_pos + 1 - self._buffer_pos))
+ return ''.join(data_list)
+ else:
+ result = self.__readBuffer(size)
+ data_list.append(result)
+ size -= len(result)
+ if size == 0 or self._eof:
+ return ''.join(data_list)
+ self.__refillBuffer()
+
+ def __readBuffer(self, size):
+ """Read chars from self._buffer.
+
+ Args:
+ size: number of chars to read. Read the entire buffer if negative.
+
+ Returns:
+ chars read in string.
+ """
+ if size < 0:
+ size = len(self._buffer) - self._buffer_pos
+ result = self._buffer[self._buffer_pos:self._buffer_pos+size]
+
+ self._position += len(result)
+
+ self._buffer_pos += len(result)
+ return result
+
+ def __refillBuffer(self):
+ """Refill _buffer with another read from source."""
+ with open(self._filename, 'r') as f:
+ f.seek(self._position)
+ data = f.read(self._buffer_size)
+ self._eof = len(data) < self._buffer_size
+ self._buffer = data
+ self._buffer_pos = 0
def seek(self, offset, whence=os.SEEK_SET):
"""Set the file's current position.
Args:
offset: seek offset as number.
whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
- and os.SEEK_CUR (seek relative to the current position).
+ os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
+ (seek relative to the end, offset should be negative).
"""
if whence == os.SEEK_SET:
self._position = offset
@@ -698,6 +856,11 @@ def seek(self, offset, whence=os.SEEK_SET):
self._position += offset
self._buffer = ''
self._buffer_pos = 0
+ elif whence == os.SEEK_END:
+ file_stat = stat(self._filename)
+ self._position = file_stat.st_size + offset
+ self._buffer = ''
+ self._buffer_pos = 0
else:
raise InvalidArgumentError('Whence mode %d is not supported', whence)
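The Files API wrapper gains read buffering (open() with buffering > 0 returns a BufferedFile, which now supports readline(), the context-manager protocol and os.SEEK_END), stat results with st_mtime and st_ctime, a listdir() for /gs/ paths, and a delete() that accepts multiple finalized filenames. A hedged sketch of the read-then-delete path; the bucket and object names are invented, and the import mirrors how input_readers.py uses this package:

import os

from mapreduce.lib import files

PATH = "/gs/my-bucket/logs/2012-08-03.txt"   # hypothetical finalized file

info = files.stat(PATH)
print "%s: %d bytes" % (info.filename, info.st_size)

# A positive buffering value makes open() return a BufferedFile.
with files.open(PATH, "r", buffering=512 * 1024) as f:
    for _ in xrange(10):                     # first ten lines
        line = f.readline()
        if not line:
            break
        print line.rstrip("\n")
    f.seek(-100, os.SEEK_END)                # peek at the tail
    tail = f.read(100)                       # assumes the file has >= 100 bytes

# delete() now takes any number of finalized /gs/ or /blobstore/ names.
files.delete(PATH)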
308 src/mapreduce/lib/files/file_service_pb.py
@@ -5099,7 +5099,313 @@ def _BuildTagLookupTable(sparse, maxtag, default=None):
_STYLE = """"""
_STYLE_CONTENT_TYPE = """"""
_PROTO_DESCRIPTOR_NAME = 'apphosting.files.GetDefaultGsBucketNameResponse'
+class ListDirRequest(ProtocolBuffer.ProtocolMessage):
+ has_path_ = 0
+ path_ = ""
+ has_marker_ = 0
+ marker_ = ""
+ has_max_keys_ = 0
+ max_keys_ = 0
+ has_prefix_ = 0
+ prefix_ = ""
+
+ def __init__(self, contents=None):
+ if contents is not None: self.MergeFromString(contents)
+
+ def path(self): return self.path_
+
+ def set_path(self, x):
+ self.has_path_ = 1
+ self.path_ = x
+
+ def clear_path(self):
+ if self.has_path_:
+ self.has_path_ = 0
+ self.path_ = ""
+
+ def has_path(self): return self.has_path_
+
+ def marker(self): return self.marker_
+
+ def set_marker(self, x):
+ self.has_marker_ = 1
+ self.marker_ = x
+
+ def clear_marker(self):
+ if self.has_marker_:
+ self.has_marker_ = 0
+ self.marker_ = ""
+
+ def has_marker(self): return self.has_marker_
+
+ def max_keys(self): return self.max_keys_
+
+ def set_max_keys(self, x):
+ self.has_max_keys_ = 1
+ self.max_keys_ = x
+
+ def clear_max_keys(self):
+ if self.has_max_keys_:
+ self.has_max_keys_ = 0
+ self.max_keys_ = 0
+
+ def has_max_keys(self): return self.has_max_keys_
+
+ def prefix(self): return self.prefix_
+
+ def set_prefix(self, x):
+ self.has_prefix_ = 1
+ self.prefix_ = x
+
+ def clear_prefix(self):
+ if self.has_prefix_:
+ self.has_prefix_ = 0
+ self.prefix_ = ""
+
+ def has_prefix(self): return self.has_prefix_
+
+
+ def MergeFrom(self, x):
+ assert x is not self
+ if (x.has_path()): self.set_path(x.path())
+ if (x.has_marker()): self.set_marker(x.marker())
+ if (x.has_max_keys()): self.set_max_keys(x.max_keys())
+ if (x.has_prefix()): self.set_prefix(x.prefix())
+
+ def Equals(self, x):
+ if x is self: return 1
+ if self.has_path_ != x.has_path_: return 0
+ if self.has_path_ and self.path_ != x.path_: return 0
+ if self.has_marker_ != x.has_marker_: return 0
+ if self.has_marker_ and self.marker_ != x.marker_: return 0
+ if self.has_max_keys_ != x.has_max_keys_: return 0
+ if self.has_max_keys_ and self.max_keys_ != x.max_keys_: return 0
+ if self.has_prefix_ != x.has_prefix_: return 0
+ if self.has_prefix_ and self.prefix_ != x.prefix_: return 0
+ return 1
+
+ def IsInitialized(self, debug_strs=None):
+ initialized = 1
+ if (not self.has_path_):
+ initialized = 0
+ if debug_strs is not None:
+ debug_strs.append('Required field: path not set.')
+ return initialized
+
+ def ByteSize(self):
+ n = 0
+ n += self.lengthString(len(self.path_))
+ if (self.has_marker_): n += 1 + self.lengthString(len(self.marker_))
+ if (self.has_max_keys_): n += 1 + self.lengthVarInt64(self.max_keys_)
+ if (self.has_prefix_): n += 1 + self.lengthString(len(self.prefix_))
+ return n + 1
+
+ def ByteSizePartial(self):
+ n = 0
+ if (self.has_path_):
+ n += 1
+ n += self.lengthString(len(self.path_))
+ if (self.has_marker_): n += 1 + self.lengthString(len(self.marker_))
+ if (self.has_max_keys_): n += 1 + self.lengthVarInt64(self.max_keys_)
+ if (self.has_prefix_): n += 1 + self.lengthString(len(self.prefix_))
+ return n
+
+ def Clear(self):
+ self.clear_path()
+ self.clear_marker()
+ self.clear_max_keys()
+ self.clear_prefix()
+
+ def OutputUnchecked(self, out):
+ out.putVarInt32(10)
+ out.putPrefixedString(self.path_)
+ if (self.has_marker_):
+ out.putVarInt32(18)
+ out.putPrefixedString(self.marker_)
+ if (self.has_max_keys_):
+ out.putVarInt32(24)
+ out.putVarInt64(self.max_keys_)
+ if (self.has_prefix_):
+ out.putVarInt32(34)
+ out.putPrefixedString(self.prefix_)
+
+ def OutputPartial(self, out):
+ if (self.has_path_):
+ out.putVarInt32(10)
+ out.putPrefixedString(self.path_)
+ if (self.has_marker_):
+ out.putVarInt32(18)
+ out.putPrefixedString(self.marker_)
+ if (self.has_max_keys_):
+ out.putVarInt32(24)
+ out.putVarInt64(self.max_keys_)
+ if (self.has_prefix_):
+ out.putVarInt32(34)
+ out.putPrefixedString(self.prefix_)
+
+ def TryMerge(self, d):
+ while d.avail() > 0:
+ tt = d.getVarInt32()
+ if tt == 10:
+ self.set_path(d.getPrefixedString())
+ continue
+ if tt == 18:
+ self.set_marker(d.getPrefixedString())
+ continue
+ if tt == 24:
+ self.set_max_keys(d.getVarInt64())
+ continue
+ if tt == 34:
+ self.set_prefix(d.getPrefixedString())
+ continue
+
+
+ if (tt == 0): raise ProtocolBuffer.ProtocolBufferDecodeError
+ d.skipData(tt)
+
+
+ def __str__(self, prefix="", printElemNumber=0):
+ res=""
+ if self.has_path_: res+=prefix+("path: %s\n" % self.DebugFormatString(self.path_))
+ if self.has_marker_: res+=prefix+("marker: %s\n" % self.DebugFormatString(self.marker_))
+ if self.has_max_keys_: res+=prefix+("max_keys: %s\n" % self.DebugFormatInt64(self.max_keys_))
+ if self.has_prefix_: res+=prefix+("prefix: %s\n" % self.DebugFormatString(self.prefix_))
+ return res
+
+
+ def _BuildTagLookupTable(sparse, maxtag, default=None):
+ return tuple([sparse.get(i, default) for i in xrange(0, 1+maxtag)])
+
+ kpath = 1
+ kmarker = 2
+ kmax_keys = 3
+ kprefix = 4
+
+ _TEXT = _BuildTagLookupTable({
+ 0: "ErrorCode",
+ 1: "path",
+ 2: "marker",
+ 3: "max_keys",
+ 4: "prefix",
+ }, 4)
+
+ _TYPES = _BuildTagLookupTable({
+ 0: ProtocolBuffer.Encoder.NUMERIC,
+ 1: ProtocolBuffer.Encoder.STRING,
+ 2: ProtocolBuffer.Encoder.STRING,
+ 3: ProtocolBuffer.Encoder.NUMERIC,
+ 4: ProtocolBuffer.Encoder.STRING,
+ }, 4, ProtocolBuffer.Encoder.MAX_TYPE)
+
+
+ _STYLE = """"""
+ _STYLE_CONTENT_TYPE = """"""
+ _PROTO_DESCRIPTOR_NAME = 'apphosting.files.ListDirRequest'
+class ListDirResponse(ProtocolBuffer.ProtocolMessage):
+
+ def __init__(self, contents=None):
+ self.filenames_ = []
+ if contents is not None: self.MergeFromString(contents)
+
+ def filenames_size(self): return len(self.filenames_)
+ def filenames_list(self): return self.filenames_
+
+ def filenames(self, i):
+ return self.filenames_[i]
+
+ def set_filenames(self, i, x):
+ self.filenames_[i] = x
+
+ def add_filenames(self, x):
+ self.filenames_.append(x)
+
+ def clear_filenames(self):
+ self.filenames_ = []
+
+
+ def MergeFrom(self, x):
+ assert x is not self
+ for i in xrange(x.filenames_size()): self.add_filenames(x.filenames(i))
+
+ def Equals(self, x):
+ if x is self: return 1
+ if len(self.filenames_) != len(x.filenames_): return 0
+ for e1, e2 in zip(self.filenames_, x.filenames_):
+ if e1 != e2: return 0
+ return 1
+
+ def IsInitialized(self, debug_strs=None):
+ initialized = 1
+ return initialized
+
+ def ByteSize(self):
+ n = 0
+ n += 1 * len(self.filenames_)
+ for i in xrange(len(self.filenames_)): n += self.lengthString(len(self.filenames_[i]))
+ return n
+
+ def ByteSizePartial(self):
+ n = 0
+ n += 1 * len(self.filenames_)
+ for i in xrange(len(self.filenames_)): n += self.lengthString(len(self.filenames_[i]))
+ return n
+
+ def Clear(self):
+ self.clear_filenames()
+
+ def OutputUnchecked(self, out):
+ for i in xrange(len(self.filenames_)):
+ out.putVarInt32(10)
+ out.putPrefixedString(self.filenames_[i])
+
+ def OutputPartial(self, out):
+ for i in xrange(len(self.filenames_)):
+ out.putVarInt32(10)
+ out.putPrefixedString(self.filenames_[i])
+
+ def TryMerge(self, d):
+ while d.avail() > 0:
+ tt = d.getVarInt32()
+ if tt == 10:
+ self.add_filenames(d.getPrefixedString())
+ continue
+
+
+ if (tt == 0): raise ProtocolBuffer.ProtocolBufferDecodeError
+ d.skipData(tt)
+
+
+ def __str__(self, prefix="", printElemNumber=0):
+ res=""
+ cnt=0
+ for e in self.filenames_:
+ elm=""
+ if printElemNumber: elm="(%d)" % cnt
+ res+=prefix+("filenames%s: %s\n" % (elm, self.DebugFormatString(e)))
+ cnt+=1
+ return res
+
+
+ def _BuildTagLookupTable(sparse, maxtag, default=None):
+ return tuple([sparse.get(i, default) for i in xrange(0, 1+maxtag)])
+
+ kfilenames = 1
+
+ _TEXT = _BuildTagLookupTable({
+ 0: "ErrorCode",
+ 1: "filenames",
+ }, 1)
+
+ _TYPES = _BuildTagLookupTable({
+ 0: ProtocolBuffer.Encoder.NUMERIC,
+ 1: ProtocolBuffer.Encoder.STRING,
+ }, 1, ProtocolBuffer.Encoder.MAX_TYPE)
+
+
+ _STYLE = """"""
+ _STYLE_CONTENT_TYPE = """"""
+ _PROTO_DESCRIPTOR_NAME = 'apphosting.files.ListDirResponse'
if _extension_runtime:
pass
-__all__ = ['FileServiceErrors','KeyValue','KeyValues','FileContentType','CreateRequest_Parameter','CreateRequest','CreateResponse','OpenRequest','OpenResponse','CloseRequest','CloseResponse','FileStat','StatRequest','StatResponse','AppendRequest','AppendResponse','DeleteRequest','DeleteResponse','ReadRequest','ReadResponse','ReadKeyValueRequest','ReadKeyValueResponse_KeyValue','ReadKeyValueResponse','ShuffleEnums','ShuffleInputSpecification','ShuffleOutputSpecification','ShuffleRequest_Callback','ShuffleRequest','ShuffleResponse','GetShuffleStatusRequest','GetShuffleStatusResponse','GetCapabilitiesRequest','GetCapabilitiesResponse','FinalizeRequest','FinalizeResponse','GetDefaultGsBucketNameRequest','GetDefaultGsBucketNameResponse']
+__all__ = ['FileServiceErrors','KeyValue','KeyValues','FileContentType','CreateRequest_Parameter','CreateRequest','CreateResponse','OpenRequest','OpenResponse','CloseRequest','CloseResponse','FileStat','StatRequest','StatResponse','AppendRequest','AppendResponse','DeleteRequest','DeleteResponse','ReadRequest','ReadResponse','ReadKeyValueRequest','ReadKeyValueResponse_KeyValue','ReadKeyValueResponse','ShuffleEnums','ShuffleInputSpecification','ShuffleOutputSpecification','ShuffleRequest_Callback','ShuffleRequest','ShuffleResponse','GetShuffleStatusRequest','GetShuffleStatusResponse','GetCapabilitiesRequest','GetCapabilitiesResponse','FinalizeRequest','FinalizeResponse','GetDefaultGsBucketNameRequest','GetDefaultGsBucketNameResponse','ListDirRequest','ListDirResponse']
203 src/mapreduce/lib/files/gs.py
@@ -30,7 +30,16 @@
__all__ = ['create']
+import os
+import re
+from urllib import urlencode
+from xml.dom import minidom
+
+from google.appengine.api import app_identity
+from google.appengine.api import urlfetch
from mapreduce.lib.files import file as files
+from mapreduce.lib.files import file_service_pb
+
@@ -44,6 +53,200 @@
_USER_METADATA_PREFIX = 'x-goog-meta-'
+
+_GS_RESTFUL_URL = 'commondatastorage.googleapis.com'
+_GS_RESTFUL_SCOPE_READ_ONLY = (
+ 'https://www.googleapis.com/auth/devstorage.read_only')
+_GS_RESTFUL_API_VERSION = '2'
+_GS_BUCKETPATH_REGEX = re.compile(r'/gs/[a-z0-9\.\-_]{3,}$')
+_GS_FILEPATH_REGEX = re.compile(r'/gs/[a-z0-9\.\-_]{3,}')
+
+
+def parseGlob(filename):
+ """Parse a Gs filename or a filename pattern. Handle escape of '*' and '/'.
+
+ Args:
+ filename: a filename or filename pattern.
+ filename must be a valid gs filepath in the format of
+ '/gs/bucket/filename'. filename pattern has format '/gs/bucket/prefix*'.
+ filename pattern represents filenames with the given prefix in the bucket.
+ Please escape '*' and '\' with '\' if your filename contains them. We
+ recommend using Python raw string to simplify escape expressions.
+
+ Returns:
+ A (string, string) tuple if filename is a pattern. The first string is
+ the bucket name, second is the prefix or '' if prefix doesn't exist.
+ Properly escaped filename if filename is not a pattern.
+
+ example
+ '/gs/bucket1/file1' => '/gs/bucket1/file1'
+ '/gs/bucket2/*' => ('gs/bucket2', '') all files under bucket2
+ '/gs/bucket3/p*' => ('gs/bucket2', 'p') files under bucket3 with
+ a prefix 'p' in its name
+ r'/gs/bucket/file\*' => '/gs/bucket/file*'
+ r'/gs/bucket/file\\*' => ('/gs/bucket', r'file\') all files under bucket
+ with prefix r'file\'
+ r'/gs/bucket/file\\\*' => '/gs/bucket/file\*'
+ r'/gs/bucket/file\**' => ('/gs/bucket', 'file*') all files under bucket
+ with prefix 'file*'
+
+ Raises:
+ mapreduce.lib.files.InvalidFileNameError if filename is illegal.
+ """
+ if not filename:
+ raise files.InvalidFileNameError('filename is None.')
+ if not isinstance(filename, basestring):
+ raise files.InvalidFileNameError('filename %s should be of type string' %
+ filename)
+ match = _GS_FILEPATH_REGEX.match(filename)
+ if not match:
+ raise files.InvalidFileNameError(
+ 'filename %s should start with/gs/bucketname', filename)
+
+ bucketname = match.group(0)
+ rest = filename[len(bucketname):]
+
+ if not rest or (len(rest) == 1 and rest[0] == '/'):
+
+ return bucketname, ''
+
+ if not rest.startswith('/'):
+ raise files.InvalidFileNameError(
+ 'Expect / to separate bucketname and filename in %s' % filename)
+
+ i = 1
+
+ prefix = False
+
+ processed = ''
+ while i < len(rest):
+ char = rest[i]
+ if char == '\\':
+ if i + 1 == len(rest):
+
+ processed += char
+ else:
+
+ processed += rest[i + 1]
+ i += 1
+ elif char == '*':
+
+ if i + 1 != len(rest):
+ raise files.InvalidFileNameError('* as a wildcard is not the last.')
+ prefix = True
+ else:
+ processed += char
+ i += 1
+
+ if prefix:
+ return bucketname, processed
+ else:
+ return bucketname + '/' + processed
+
+
+def listdir(path, kwargs=None):
+ """Return a sorted list of filenames (matching a pattern) in the given path.
+
+ Sorting (decrease by string) is done automatically by Google Cloud Storage.
+
+ Args:
+ path: a Google Cloud Storage path of "/gs/bucketname" form.
+ kwargs: other keyword arguments to be relayed to Google Cloud Storage.
+ This can be used to select certain files with names matching a pattern.
+
+ Supported keywords:
+ marker: a string after which (exclusive) to start listing.
+ max_keys: the maximum number of filenames to return.
+ prefix: limits the returned filenames to those with this prefix. no regex.
+
+ See Google Cloud Storage documentation for more details and examples.
+ https://developers.google.com/storage/docs/reference-methods#getbucket
+
+ Returns:
+ a sorted list containing filenames (matching a pattern) from
+ the given path. The last filename can be used as a marker for another
+ request for more files.
+ """
+ if not path:
+ raise files.InvalidFileNameError('Empty path')
+ elif not isinstance(path, basestring):
+ raise files.InvalidFileNameError('Expected string for path %s' % path)
+ elif not _GS_BUCKETPATH_REGEX.match(path):
+ raise files.InvalidFileNameError(
+ 'Google storage path must have the form /gs/bucketname')
+
+
+
+ if kwargs and kwargs.has_key('max_keys'):
+ kwargs['max-keys'] = kwargs['max_keys']
+ kwargs.pop('max_keys')
+
+
+ if not os.environ.get('DATACENTER'):
+ return _listdir_local(path, kwargs)
+
+ bucketname = path[len(_GS_PREFIX):]
+
+ request_headers = {
+ 'Authorization': 'OAuth %s' % app_identity.get_access_token(
+ _GS_RESTFUL_SCOPE_READ_ONLY)[0],
+ 'x-goog-api-version': _GS_RESTFUL_API_VERSION
+ }
+
+ url = 'https://%s/%s' % (_GS_RESTFUL_URL, bucketname)
+
+ if kwargs:
+ url += '/?' + urlencode(kwargs)
+
+ response = urlfetch.fetch(url=url,
+ headers=request_headers,
+ deadline=60)
+
+ if response.status_code == 404:
+ raise files.InvalidFileNameError('Bucket %s does not exist.' % bucketname)
+ elif response.status_code == 401:
+ raise files.PermissionDeniedError('Permission denied to read bucket %s.' %
+ bucketname)
+
+ dom = minidom.parseString(response.content)
+
+ def __textValue(node):
+ return node.firstChild.nodeValue
+
+
+ error = dom.getElementsByTagName('Error')
+ if len(error) == 1:
+ details = error[0].getElementsByTagName('Details')
+ if len(details) == 1:
+ raise files.InvalidParameterError(__textValue(details[0]))
+ else:
+ code = __textValue(error[0].getElementsByTagName('Code')[0])
+ msg = __textValue(error[0].getElementsByTagName('Message')[0])
+ raise files.InvalidParameterError('%s: %s' % (code, msg))
+
+ return ['/'.join([path, __textValue(key)]) for key in
+ dom.getElementsByTagName('Key')]
+
+
+def _listdir_local(path, kwargs):
+ """Dev app server version of listdir.
+
+ See listdir for doc.
+ """
+ request = file_service_pb.ListDirRequest()
+ response = file_service_pb.ListDirResponse()
+ request.set_path(path)
+
+ if kwargs and kwargs.has_key('marker'):
+ request.set_marker(kwargs['marker'])
+ if kwargs and kwargs.has_key('max-keys'):
+ request.set_max_keys(kwargs['max-keys'])
+ if kwargs and kwargs.has_key('prefix'):
+ request.set_prefix(kwargs['prefix'])
+ files._make_call('ListDir', request, response)
+ return response.filenames_list()
+
+
def create(filename,
mime_type='application/octet-stream',
acl=None,
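Google Cloud Storage returns listings in pages, so the last filename of each listdir() result can serve as the marker for the next request. A hedged sketch of walking a whole bucket with the new API; the bucket name is invented, and stripping the bucket prefix from the marker reflects how gs.listdir() builds the returned paths:

from mapreduce.lib import files


def iter_bucket(bucket_path, prefix=None, page_size=100):
    """Yield every filename under a /gs/<bucket> path, page by page."""
    marker = None
    while True:
        kwargs = {"max_keys": page_size}
        if prefix:
            kwargs["prefix"] = prefix
        if marker:
            kwargs["marker"] = marker
        page = files.listdir(bucket_path, **kwargs)
        for filename in page:
            yield filename
        if len(page) < page_size:
            return
        # listdir() returns full "/gs/bucket/object" paths, but the GCS
        # marker parameter expects the bare object name.
        marker = page[-1][len(bucket_path) + 1:]


for name in iter_bucket("/gs/my-bucket", prefix="logs/"):
    print name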
9 src/mapreduce/lib/files/testutil.py
@@ -25,6 +25,7 @@
from google.appengine.api import apiproxy_stub
+from mapreduce.lib.files import file_service_pb
class TestFileServiceStub(apiproxy_stub.APIProxyStub):
@@ -55,6 +56,14 @@ def _Dynamic_Read(self, request, response):
pos = request.pos()
response.set_data(content[pos:pos + request.max_bytes()])
+ def _Dynamic_Stat(self, request, response):
+ file_stat = response.add_stat()
+ file_stat.set_length(len(self.get_content(request.filename())))
+ file_stat.set_filename(request.filename())
+ file_stat.set_content_type(file_service_pb.FileContentType.RAW)
+ file_stat.set_finalized(True)
+ response.set_more_files_found(False)
+
def get_content(self, filename):
"""Get current in-memory file content."""
return self._file_content.get(filename, '')
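
A hedged sketch of how the new _Dynamic_Stat handler could be driven from a
test. It assumes the StatRequest/StatResponse message names in
file_service_pb, that TestFileServiceStub() takes no constructor arguments,
and that content can be seeded through the stub's in-memory _file_content
dict; the filename is arbitrary:

    from mapreduce.lib.files import file_service_pb
    from mapreduce.lib.files import testutil

    stub = testutil.TestFileServiceStub()
    stub._file_content['/gs/my_bucket/data'] = 'hello'  # seed test content

    request = file_service_pb.StatRequest()    # assumed message name
    response = file_service_pb.StatResponse()  # assumed message name
    request.set_filename('/gs/my_bucket/data')
    stub._Dynamic_Stat(request, response)

    assert response.stat(0).length() == 5      # length of 'hello'
    assert not response.more_files_found()
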
View
55 src/mapreduce/lib/key_range/__init__.py
@@ -157,19 +157,28 @@ def advance(self, key):
key = key.to_old_key()
self.key_start = key
- def filter_query(self, query):
+ def filter_query(self, query, filters=None):
"""Add query filter to restrict to this key range.
Args:
query: A db.Query or ndb.Query instance.
+ filters: optional list of filters to apply to the query. Each filter is
+ a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
+ User filters are applied first.
Returns:
The input query restricted to this key range.
"""
if ndb is not None:
if _IsNdbQuery(query):
- return self.filter_ndb_query(query)
+ return self.filter_ndb_query(query, filters=filters)
assert not _IsNdbQuery(query)
+
+
+ if filters:
+ for f in filters:
+ query.filter("%s %s" % (f[0], f[1]), f[2])
+
if self.include_start:
start_comparator = ">="
else:
@@ -184,16 +193,25 @@ def filter_query(self, query):
query.filter("__key__ %s" % end_comparator, self.key_end)
return query
- def filter_ndb_query(self, query):
+ def filter_ndb_query(self, query, filters=None):
"""Add query filter to restrict to this key range.
Args:
query: An ndb.Query instance.
+ filters: optional list of filters to apply to the query. Each filter is
+ a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
+ User filters are applied first.
Returns:
The input query restricted to this key range.
"""
assert _IsNdbQuery(query)
+
+
+ if filters:
+ for f in filters:
+ query = query.filter(ndb.FilterNode(*f))
+
if self.include_start:
start_comparator = ">="
else:
@@ -212,16 +230,24 @@ def filter_ndb_query(self, query):
self.key_end))
return query
- def filter_datastore_query(self, query):
+ def filter_datastore_query(self, query, filters=None):
"""Add query filter to restrict to this key range.
Args:
query: A datastore.Query instance.
+ filters: optional list of filters to apply to the query. Each filter is
+ a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
+ User filters are applied first.
Returns:
The input query restricted to this key range.
"""
assert isinstance(query, datastore.Query)
+
+ if filters:
+ for f in filters:
+ query.update({"%s %s" % (f[0], f[1]): f[2]})
+
if self.include_start:
start_comparator = ">="
else:
@@ -330,28 +356,32 @@ def make_directed_datastore_query(self, kind, keys_only=False):
query = self.filter_datastore_query(query)
return query
- def make_ascending_query(self, kind_class, keys_only=False):
+ def make_ascending_query(self, kind_class, keys_only=False, filters=None):
"""Construct a query for this key range without setting the scan direction.
Args:
kind_class: A kind implementation class (a subclass of either
db.Model or ndb.Model).
keys_only: bool, default False, query only for keys.
+ filters: optional list of filters to apply to the query. Each filter is
+ a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
+ User filters are applied first.
Returns:
A db.Query or ndb.Query instance (corresponding to kind_class).
"""
if ndb is not None:
if issubclass(kind_class, ndb.Model):
- return self.make_ascending_ndb_query(kind_class, keys_only=keys_only)
+ return self.make_ascending_ndb_query(
+ kind_class, keys_only=keys_only, filters=filters)
assert self._app is None, '_app is not supported for db.Query'
query = db.Query(kind_class, namespace=self.namespace, keys_only=keys_only)
query.order("__key__")
- query = self.filter_query(query)
+ query = self.filter_query(query, filters=filters)
return query
- def make_ascending_ndb_query(self, kind_class, keys_only=False):
+ def make_ascending_ndb_query(self, kind_class, keys_only=False, filters=None):
"""Construct an NDB query for this key range, without the scan direction.
Args:
@@ -369,16 +399,19 @@ def make_ascending_ndb_query(self, kind_class, keys_only=False):
query = kind_class.query(app=self._app,
namespace=self.namespace,
default_options=default_options)
- query = self.filter_ndb_query(query)
+ query = self.filter_ndb_query(query, filters=filters)
query = query.order(kind_class._key)
return query
- def make_ascending_datastore_query(self, kind, keys_only=False):
+ def make_ascending_datastore_query(self, kind, keys_only=False, filters=None):
"""Construct a query for this key range without setting the scan direction.
Args:
kind: A string.
keys_only: bool, default False, use keys_only on Query?
+ filters: optional list of filters to apply to the query. Each filter is
+ a tuple: (<property_name_as_str>, <query_operation_as_str>, <value>).
+ User filters are applied first.
Returns:
A datastore.Query instance.
@@ -389,7 +422,7 @@ def make_ascending_datastore_query(self, kind, keys_only=False):
keys_only=keys_only)
query.Order(("__key__", datastore.Query.ASCENDING))
- query = self.filter_datastore_query(query)
+ query = self.filter_datastore_query(query, filters=filters)
return query
def split_range(self, batch_size=0):
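
A hedged sketch of the new filters argument; EntryModel and its status
property are hypothetical, and each filter is a (property, operator, value)
tuple applied before the __key__ range restriction:

    from google.appengine.ext import db
    from mapreduce.lib import key_range

    class EntryModel(db.Model):        # hypothetical kind, for illustration
      status = db.StringProperty()

    kr = key_range.KeyRange(
        key_start=db.Key.from_path('EntryModel', 1),
        key_end=db.Key.from_path('EntryModel', 1000),
        include_start=True,
        include_end=False)

    # Equivalent forms exist for ndb and datastore.Query via
    # filter_ndb_query() / filter_datastore_query().
    query = kr.make_ascending_query(
        EntryModel, filters=[('status', '=', 'active')])
    active_entries = query.fetch(10)
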
View
26 test/mapreduce_test/gs_files_test.py → src/mapreduce/lib/pipeline/index.yaml 100644 → 100755
@@ -1,6 +1,4 @@
-#!/usr/bin/env python
-#
-# Copyright 2010 Google Inc.
+# Copyright 2012 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,14 +11,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+#
+
-'''
-Created on 2012/06/26
+# Add these indexes to your app to get a functional root status UI.
+indexes:
-@author: cloudysunny14
-'''
-import unittest
+- kind: _AE_Pipeline_Record
+ properties:
+ - name: is_root_pipeline
+ - name: start_time
+ direction: desc
-class CloudStorageInputReaderTest(unittest.TestCase):
- """Test Datastore{,Key,Entity}InputReader classes."""
-
+- kind: _AE_Pipeline_Record
+ properties:
+ - name: class_path
+ - name: start_time
+ direction: desc
View
2 src/mapreduce/lib/pipeline/models.py
@@ -59,7 +59,7 @@ class _PipelineRecord(db.Model):
root_pipeline = db.SelfReferenceProperty(
collection_name='child_pipelines_set')
fanned_out = db.ListProperty(db.Key, indexed=False)
- start_time = db.DateTimeProperty(indexed=False)
+ start_time = db.DateTimeProperty(indexed=True)
finalized_time = db.DateTimeProperty(indexed=False)
# One of these two will be set, depending on the size of the params.
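
For context, a hedged sketch of the root-list style query this index change
(together with the index.yaml entries above) is meant to serve; the actual
query lives in get_root_list(), and the is_root_pipeline property is assumed
from the index definition:

    from mapreduce.lib.pipeline import models

    # Most recent root pipelines first; needs the composite
    # (is_root_pipeline, -start_time) index declared in index.yaml.
    query = (models._PipelineRecord.all()
             .filter('is_root_pipeline =', True)
             .order('-start_time'))
    recent_roots = query.fetch(100)
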
View
316 src/mapreduce/lib/pipeline/pipeline.py
@@ -22,10 +22,12 @@
'PipelineRuntimeError', 'SlotNotFilledError', 'SlotNotDeclaredError',
'UnexpectedPipelineError', 'PipelineStatusError', 'Slot', 'Pipeline',
'PipelineFuture', 'After', 'InOrder', 'Retry', 'Abort', 'get_status_tree',
- 'create_handlers_map', 'set_enforce_auth',
+ 'get_pipeline_names', 'get_root_list', 'create_handlers_map',
+ 'set_enforce_auth',
]
import datetime
+import hashlib
import itertools
import logging
import os
@@ -47,6 +49,7 @@
# Relative imports
import models
from mapreduce.lib import simplejson
+import status_ui
import util as mr_util
@@ -62,12 +65,9 @@
# - Consider using sha1 of the UUID for user-supplied pipeline keys to ensure
# that they keys are definitely not sequential or guessable (Python's uuid1
# method generates roughly sequential IDs).
-# - Ability to list all root pipelines that are live on simple page.
# Potential TODOs:
# - Add support for ANY N barriers.
-# - Add a global 'flags' value passed in to start() that all pipelines have
-# access to; makes it easy to pass along Channel API IDs and such.
# - Allow Pipelines to declare they are "short" and optimize the evaluate()
# function to run as many of them in quick succession.
# - Add support in all Pipelines for hold/release where up-stream
@@ -114,7 +114,6 @@ class Retry(PipelineUserError):
class Abort(PipelineUserError):
"""The currently running pipeline should be aborted up to the root."""
-
class PipelineStatusError(Error):
"""Exceptions raised when trying to collect pipeline status."""
@@ -342,6 +341,29 @@ def __getattr__(self, name):
return slot
+class _PipelineMeta(type):
+ """Meta-class for recording all Pipelines that have been defined."""
+
+ # List of all Pipeline classes that have been seen.
+ _all_classes = []
+
+ def __new__(meta, name, bases, cls_dict):
+ """Initializes the class path of a Pipeline and saves it."""
+ cls = type.__new__(meta, name, bases, cls_dict)
+ meta._all_classes.append(cls)
+ return cls
+
+
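
A minimal sketch of what the registry enables, assuming the Pipeline base
class (defined later in this module) uses _PipelineMeta as its metaclass;
MyJob is a hypothetical user pipeline:

    from mapreduce.lib.pipeline import pipeline

    class MyJob(pipeline.Pipeline):    # hypothetical user-defined pipeline
      def run(self, count):
        return count * 2

    # Merely defining the subclass records it; the new get_pipeline_names()
    # and the root-list UI can build on this registry.
    assert MyJob in pipeline._PipelineMeta._all_classes
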
+class ClassProperty(object):
+ """Descriptor that lets us have read-only class properties."""
+
+ def __init__(self, method):