From 7c464ea4603a988d8703e727d4ce68b815b98673 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Mon, 27 Jan 2020 14:54:04 +0100 Subject: [PATCH] [BEAM-6957] Enable Counter/Distribution metrics tests for Portable Spark Runner --- runners/spark/job-server/build.gradle | 3 --- .../org/apache/beam/runners/spark/SparkPipelineResult.java | 7 ++++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/runners/spark/job-server/build.gradle b/runners/spark/job-server/build.gradle index 30dc24d09b522..90393b75f5ebe 100644 --- a/runners/spark/job-server/build.gradle +++ b/runners/spark/job-server/build.gradle @@ -99,11 +99,8 @@ def portableValidatesRunnerTask(String name) { includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' excludeCategories 'org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders' excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB' - excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics' excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics' - excludeCategories 'org.apache.beam.sdk.testing.UsesCounterMetrics' excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging' - excludeCategories 'org.apache.beam.sdk.testing.UsesDistributionMetrics' excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage' excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics' excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle' diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java index fd3fbcff6a11e..55953570ce2d5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java @@ -155,9 +155,10 @@ static class PortableBatchMode extends BatchMode implements PortablePipelineResu } @Override - public JobApi.MetricResults portableMetrics() throws UnsupportedOperationException { - LOG.warn("Collecting monitoring infos is not implemented yet in Spark portable runner."); - return JobApi.MetricResults.newBuilder().build(); + public JobApi.MetricResults portableMetrics() { + return JobApi.MetricResults.newBuilder() + .addAllAttempted(MetricsAccumulator.getInstance().value().getMonitoringInfos()) + .build(); } }