From f06f5e1ee8d745a2cfb8c66055291d1c099a3d60 Mon Sep 17 00:00:00 2001 From: Pablo Date: Wed, 15 Feb 2017 15:16:12 -0800 Subject: [PATCH 1/2] Making metrics queriable in Python SDK. --- .../examples/cookbook/datastore_wordcount.py | 9 +- sdks/python/apache_beam/examples/wordcount.py | 11 +- sdks/python/apache_beam/metrics/metric.py | 2 +- sdks/python/apache_beam/pipeline_test.py | 17 +++ .../runners/dataflow/dataflow_metrics.py | 77 +++++++++++- .../runners/dataflow/dataflow_metrics_test.py | 116 ++++++++++++++++++ .../runners/dataflow/dataflow_runner.py | 8 +- .../runners/dataflow/dataflow_runner_test.py | 5 - .../runners/dataflow/internal/apiclient.py | 13 ++ 9 files changed, 246 insertions(+), 12 deletions(-) diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py index bb5d5c083927..9fa10d3bb260 100644 --- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py +++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py @@ -75,6 +75,7 @@ from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore from apache_beam.metrics import Metrics +from apache_beam.metrics.metric import MetricsFilter from apache_beam.utils.pipeline_options import GoogleCloudOptions from apache_beam.utils.pipeline_options import PipelineOptions from apache_beam.utils.pipeline_options import SetupOptions @@ -247,8 +248,12 @@ def run(argv=None): result = read_from_datastore(gcloud_options.project, known_args, pipeline_options) - result.metrics().query() - #TODO(pabloem)(BEAM-1366) Fix these once metrics are 100% queriable. + empty_lines_filter = MetricsFilter().with_name('empty_lines') + query_result = result.metrics().query(empty_lines_filter) + if query_result['counters']: + empty_lines_counter = query_result['counters'][0] + logging.info('number of empty lines: %d', empty_lines_counter.committed) + # TODO(pabloem)(BEAM-1366): Add querying of MEAN metrics. if __name__ == '__main__': diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index 4d482b02c489..50c03286dcdb 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -27,6 +27,7 @@ from apache_beam.io import ReadFromText from apache_beam.io import WriteToText from apache_beam.metrics import Metrics +from apache_beam.metrics.metric import MetricsFilter from apache_beam.utils.pipeline_options import PipelineOptions from apache_beam.utils.pipeline_options import SetupOptions @@ -38,6 +39,8 @@ def __init__(self): super(WordExtractingDoFn, self).__init__() self.words_counter = Metrics.counter(self.__class__, 'words') self.word_lengths_counter = Metrics.counter(self.__class__, 'word_lengths') + self.word_lengths_dist = Metrics.distribution( + self.__class__, 'word_len_dist') self.empty_line_counter = Metrics.counter(self.__class__, 'empty_lines') def process(self, element): @@ -58,6 +61,7 @@ def process(self, element): for w in words: self.words_counter.inc() self.word_lengths_counter.inc(len(w)) + self.word_lengths_dist.update(len(w)) return words @@ -100,7 +104,12 @@ def run(argv=None): # Actually run the pipeline (all operations above are deferred). result = p.run() result.wait_until_finish() - #TODO(pabloem)(BEAM-1366) Add querying of metrics once they are queriable. + empty_lines_filter = MetricsFilter().with_name('empty_lines') + query_result = result.metrics().query(empty_lines_filter) + if query_result['counters']: + empty_lines_counter = query_result['counters'][0] + logging.info('number of empty lines: %d', empty_lines_counter.committed) + # TODO(pabloem)(BEAM-1366): Add querying of MEAN metrics. if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index 13ca77b08be4..a0e3cbab2aa0 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -105,7 +105,7 @@ def matches(filter, metric_key): else: return False - def query(self, filter): + def query(self, filter=None): raise NotImplementedError diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 6c2512f4ae23..ad5dd1a32cd9 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -21,6 +21,9 @@ import platform import unittest +from nose.plugins.attrib import attr + +from apache_beam.metrics import Metrics from apache_beam.pipeline import Pipeline from apache_beam.pipeline import PipelineOptions from apache_beam.pipeline import PipelineVisitor @@ -50,6 +53,7 @@ class _Reader(object): def __init__(self, vals): self._vals = vals + self._output_counter = Metrics.counter('main', 'outputs') def __enter__(self): return self @@ -59,6 +63,7 @@ def __exit__(self, exception_type, exception_value, traceback): def __iter__(self): for v in self._vals: + self._output_counter.inc() yield v def __init__(self, vals): @@ -132,6 +137,18 @@ def test_create_singleton_pcollection(self): assert_that(pcoll, equal_to([[1, 2, 3]])) pipeline.run() + @attr('ValidatesRunner') + def test_metrics_in_source(self): + pipeline = TestPipeline() + pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3, 4, 5, 6])) + assert_that(pcoll, equal_to([1, 2, 3, 4, 5, 6])) + res = pipeline.run() + metric_results = res.metrics().query() + outputs_counter = metric_results['counters'][0] + self.assertEqual(outputs_counter.key.step, 'read') + self.assertEqual(outputs_counter.key.metric.name, 'outputs') + self.assertEqual(outputs_counter.committed, 6) + def test_read(self): pipeline = TestPipeline() pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3])) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py index 1d86f2f9727c..db4b7f919913 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py @@ -21,13 +21,88 @@ service. """ +from collections import defaultdict + +from apache_beam.metrics.execution import MetricKey +from apache_beam.metrics.execution import MetricResult from apache_beam.metrics.metric import MetricResults +from apache_beam.metrics.metricbase import MetricName # TODO(pabloem)(JIRA-1381) Implement this once metrics are queriable from # dataflow service class DataflowMetrics(MetricResults): + """Implementation of MetricResults class for the Dataflow runner.""" + + def __init__(self, dataflow_client=None, job_result=None): + """Initialize the Dataflow metrics object. + + Args: + dataflow_client: apiclient.DataflowApplicationClient to interact with the + dataflow service. + job_result: DataflowPipelineResult with the state and id information of + the job + """ + super(DataflowMetrics, self).__init__() + self._dataflow_client = dataflow_client + self.job_result = job_result + self._queried_after_termination = False + self._cached_metrics = None + + def _populate_metric_results(self, response): + """Take a list of metrics, and convert it to a list of MetricResult.""" + user_metrics = [metric + for metric in response.metrics + if metric.name.origin == 'user'] + + # Get the tentative/committed versions of every metric together. + metrics_by_name = defaultdict(lambda: {}) + for metric in user_metrics: + tentative = [prop + for prop in metric.name.context.additionalProperties + if prop.key == 'tentative' and prop.value == 'true'] + key = 'tentative' if tentative else 'committed' + metrics_by_name[metric.name.name][key] = metric + + # Now we create the MetricResult elements. + result = [] + for name, metric in metrics_by_name.iteritems(): + if (name.endswith('(DIST)') or + name.endswith('[MIN]') or + name.endswith('[MAX]') or + name.endswith('[MEAN]') or + name.endswith('[COUNT]')): + # Distributions are not yet fully supported in this runner + continue + [step, namespace, name] = name.split('/') + key = MetricKey(step, MetricName(namespace, name)) + attempted = metric['tentative'].scalar.integer_value + committed = metric['committed'].scalar.integer_value + result.append(MetricResult(key, attempted=attempted, committed=committed)) + + return result + + def _get_metrics_from_dataflow(self): + """Return cached metrics or query the dataflow service.""" + try: + job_id = self.job_result.job_id() + except AttributeError: + job_id = None + if not job_id: + raise ValueError('Can not query metrics. Job id is unknown.') + + if self._cached_metrics: + return self._cached_metrics + + job_metrics = self._dataflow_client.get_job_metrics(job_id) + # If the job has terminated, metrics will not change and we can cache them. + if self.job_result._is_in_terminal_state(): + self._cached_metrics = job_metrics + return job_metrics def query(self, filter=None): - return {'counters': [], + response = self._get_metrics_from_dataflow() + counters = self._populate_metric_results(response) + # TODO(pabloem): Populate distributions once they are available. + return {'counters': [c for c in counters if self.matches(filter, c.key)], 'distributions': []} diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py index 5475ac739132..4d95443f33c0 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py @@ -18,3 +18,119 @@ Tests corresponding to the DataflowRunner implementation of MetricsResult, the DataflowMetrics class. """ +import unittest + +import mock + +from apache_beam.metrics.execution import MetricKey +from apache_beam.metrics.execution import MetricResult +from apache_beam.metrics.metricbase import MetricName +from apache_beam.runners.dataflow import dataflow_metrics + + +class DictToObject(object): + """Translate from a dict(list()) structure to an object structure""" + def __init__(self, data): + for name, value in data.iteritems(): + setattr(self, name, self._wrap(value)) + + def _wrap(self, value): + if isinstance(value, (tuple, list, set, frozenset)): + return type(value)([self._wrap(v) for v in value]) + else: + return DictToObject(value) if isinstance(value, dict) else value + + +class TestDataflowMetrics(unittest.TestCase): + + BASIC_COUNTER_LIST = {"metrics": [ + {"name": {"context": + {"additionalProperties":[ + {"key": "original_name", + "value": "user-split-split/__main__.WordExtractingDoFn/" + "empty_lines_TentativeAggregateValue"}, + {"key": "step", "value": "split"}]}, + "name": "split/__main__.WordExtractingDoFn/empty_lines", + "origin": "user"}, + "scalar": {"integer_value": 1080}, + "updateTime": "2017-02-23T01:13:36.659Z"}, + {"name": {"context": + {"additionalProperties": [ + {"key": "original_name", + "value": "user-split-split/__main__.WordExtractingDoFn/" + "empty_lines_TentativeAggregateValue"}, + {"key": "step", "value": "split"}, + {"key": "tentative", "value": "true"}]}, + "name": "split/__main__.WordExtractingDoFn/empty_lines", + "origin": "user"}, + "scalar": {"integer_value": 1080}, + "updateTime": "2017-02-23T01:13:36.659Z"}, + {"name": {"context": + {"additionalProperties": [ + {"key": "original_name", + "value": "user-split-split/__main__.WordExtractingDoFn/" + "words_TentativeAggregateValue"}, + {"key": "step", "value": "split"}]}, + "name": "split/__main__.WordExtractingDoFn/words", + "origin": "user"}, + "scalar": {"integer_value": 26181}, + "updateTime": "2017-02-23T01:13:36.659Z"}, + {"name": {"context": + {"additionalProperties": [ + {"key": "original_name", + "value": "user-split-split/__main__.WordExtractingDoFn/" + "words_TentativeAggregateValue"}, + {"key": "step", "value": "split"}, + {"key": "tentative", "value": "true"}]}, + "name": "split/__main__.WordExtractingDoFn/words", + "origin": "user"}, + "scalar": {"integer_value": 26185}, + "updateTime": "2017-02-23T01:13:36.659Z"} + ]} + + def setup_mock_client_result(self): + mock_client = mock.Mock() + mock_query_result = DictToObject(self.BASIC_COUNTER_LIST) + mock_client.get_job_metrics.return_value = mock_query_result + mock_job_result = mock.Mock() + mock_job_result.job_id.return_value = 1 + mock_job_result._is_in_terminal_state.return_value = False + return mock_client, mock_job_result + + def test_cache_functions(self): + mock_client, mock_job_result = self.setup_mock_client_result() + dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result) + + # At first creation, we should always query dataflow. + self.assertTrue(dm._cached_metrics is None) + + # Right after querying, we still query again. + dm.query() + self.assertTrue(dm._cached_metrics is None) + + # The job has ended. The query should not run again after this. + mock_job_result._is_in_terminal_state.return_value = True + dm.query() + self.assertTrue(dm._cached_metrics) + + def test_query_counters(self): + mock_client, mock_job_result = self.setup_mock_client_result() + dm = dataflow_metrics.DataflowMetrics(mock_client, mock_job_result) + query_result = dm.query() + expected_counters = [ + MetricResult( + MetricKey('split', + MetricName('__main__.WordExtractingDoFn', 'empty_lines')), + 1080, 1080), + MetricResult( + MetricKey('split', + MetricName('__main__.WordExtractingDoFn', 'words')), + 26181, 26185), + ] + self.assertEqual(sorted(query_result['counters'], + key=lambda x: x.key.metric.name), + sorted(expected_counters, + key=lambda x: x.key.metric.name)) + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index ebc902408750..6ce9406bcc0e 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -171,9 +171,13 @@ def run(self, pipeline): pipeline.options, job_version) # Create the job - return DataflowPipelineResult( + result = DataflowPipelineResult( self.dataflow_client.create_job(self.job), self) + self._metrics = DataflowMetrics(self.dataflow_client, result) + result.metric_results = self._metrics + return result + def _get_typehint_based_encoding(self, typehint, window_coder): """Returns an encoding based on a typehint object.""" return self._get_cloud_encoding(self._get_coder(typehint, @@ -639,7 +643,7 @@ def job_id(self): return self._job.id def metrics(self): - return DataflowMetrics() + return getattr(self, 'metric_results', None) @property def has_job(self): diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 4a0815a64b67..cc5928a2e439 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -55,11 +55,6 @@ class DataflowRunnerTest(unittest.TestCase): '--temp_location=/dev/null', '--no_auth=True'] - def test_dataflow_runner_has_metrics(self): - df_result = DataflowPipelineResult('somejob', 'somerunner') - self.assertTrue(df_result.metrics()) - self.assertTrue(df_result.metrics().query()) - @mock.patch('time.sleep', return_value=None) def test_wait_until_finish(self, patched_time_sleep): values_enum = dataflow_api.Job.CurrentStateValueValuesEnum diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index d4fa3ce09d02..269ec7aae588 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -436,6 +436,19 @@ def create_job_description(self, job): # TODO(silviuc): Remove the debug logging eventually. logging.info('JOB: %s', job) + @retry.with_exponential_backoff() # Using retry defaults from utils/retry.py + def get_job_metrics(self, job_id): + request = dataflow.DataflowProjectsJobsGetMetricsRequest() + request.jobId = job_id + request.projectId = self.google_cloud_options.project + try: + response = self._client.projects_jobs.GetMetrics(request) + except exceptions.BadStatusCodeError as e: + logging.error('HTTP status %d. Unable to query metrics', + e.response.status) + raise + return response + def submit_job_description(self, job): """Creates and excutes a job request.""" request = dataflow.DataflowProjectsJobsCreateRequest() From d294e6f7013715845496385cc6731ae94aa8e893 Mon Sep 17 00:00:00 2001 From: Pablo Date: Fri, 24 Feb 2017 12:05:52 -0800 Subject: [PATCH 2/2] Addressing comments. --- sdks/python/apache_beam/pipeline_test.py | 4 ++-- .../apache_beam/runners/dataflow/dataflow_metrics.py | 3 +++ .../runners/dataflow/dataflow_metrics_test.py | 12 ++++++++++++ .../apache_beam/runners/dataflow/dataflow_runner.py | 3 ++- .../runners/dataflow/internal/apiclient.py | 2 +- 5 files changed, 20 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index ad5dd1a32cd9..a08edf875df8 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -140,12 +140,12 @@ def test_create_singleton_pcollection(self): @attr('ValidatesRunner') def test_metrics_in_source(self): pipeline = TestPipeline() - pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3, 4, 5, 6])) + pcoll = pipeline | Read(FakeSource([1, 2, 3, 4, 5, 6])) assert_that(pcoll, equal_to([1, 2, 3, 4, 5, 6])) res = pipeline.run() metric_results = res.metrics().query() outputs_counter = metric_results['counters'][0] - self.assertEqual(outputs_counter.key.step, 'read') + self.assertEqual(outputs_counter.key.step, 'Read') self.assertEqual(outputs_counter.key.metric.name, 'outputs') self.assertEqual(outputs_counter.committed, 6) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py index db4b7f919913..db5a2bcf8c4f 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py @@ -22,6 +22,7 @@ """ from collections import defaultdict +from warnings import warn from apache_beam.metrics.execution import MetricKey from apache_beam.metrics.execution import MetricResult @@ -72,6 +73,8 @@ def _populate_metric_results(self, response): name.endswith('[MAX]') or name.endswith('[MEAN]') or name.endswith('[COUNT]')): + warn('Distribution metrics will be ignored in the MetricsResult.query' + 'method. You can see them in the Dataflow User Interface.') # Distributions are not yet fully supported in this runner continue [step, namespace, name] = name.split('/') diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py index 4d95443f33c0..8d18fae29bc4 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py @@ -85,6 +85,18 @@ class TestDataflowMetrics(unittest.TestCase): "name": "split/__main__.WordExtractingDoFn/words", "origin": "user"}, "scalar": {"integer_value": 26185}, + "updateTime": "2017-02-23T01:13:36.659Z"}, + {"name": {"context": + {"additionalProperties": [ + {"key": "original_name", + "value": "user-split-split/__main__.WordExtractingDoFn/" + "secretdistribution(DIST)"}, + {"key": "step", "value": "split"}, + {"key": "tentative", "value": "true"}]}, + "name": + "split/__main__.WordExtractingDoFn/secretdistribution(DIST)", + "origin": "user"}, + "scalar": {"integer_value": 15}, "updateTime": "2017-02-23T01:13:36.659Z"} ]} diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 6ce9406bcc0e..25f2fd4f358e 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -638,12 +638,13 @@ def __init__(self, job, runner): """Job is a Job message from the Dataflow API.""" self._job = job self._runner = runner + self.metric_results = None def job_id(self): return self._job.id def metrics(self): - return getattr(self, 'metric_results', None) + return self.metric_results @property def has_job(self): diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 269ec7aae588..e980b14208c4 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -436,7 +436,7 @@ def create_job_description(self, job): # TODO(silviuc): Remove the debug logging eventually. logging.info('JOB: %s', job) - @retry.with_exponential_backoff() # Using retry defaults from utils/retry.py + @retry.with_exponential_backoff(num_retries=3, initial_delay_secs=3) def get_job_metrics(self, job_id): request = dataflow.DataflowProjectsJobsGetMetricsRequest() request.jobId = job_id