Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions sdks/python/apache_beam/metrics/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from apache_beam.metrics.cells import DistributionCell
from apache_beam.metrics.cells import GaugeCell
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.runners.worker import statesampler


class MetricKey(object):
Expand Down Expand Up @@ -149,12 +150,6 @@ def current_container(self):
MetricsEnvironment = _MetricsEnvironment()


def metrics_startup():
"""Initialize metrics context to run."""
global statesampler # pylint: disable=global-variable-not-assigned
from apache_beam.runners.worker import statesampler


class MetricsContainer(object):
"""Holds the metrics of a single step and a single bundle."""
def __init__(self, step_name):
Expand Down
9 changes: 9 additions & 0 deletions sdks/python/apache_beam/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ def gauge(namespace, name):
return Metrics.DelegatingGauge(MetricName(namespace, name))

class DelegatingCounter(Counter):
"""Metrics Counter that Delegates functionality to MetricsEnvironment."""

def __init__(self, metric_name):
super(Metrics.DelegatingCounter, self).__init__()
self.metric_name = metric_name

def inc(self, n=1):
Expand All @@ -102,7 +105,10 @@ def inc(self, n=1):
container.get_counter(self.metric_name).inc(n)

class DelegatingDistribution(Distribution):
"""Metrics Distribution Delegates functionality to MetricsEnvironment."""

def __init__(self, metric_name):
super(Metrics.DelegatingDistribution, self).__init__()
self.metric_name = metric_name

def update(self, value):
Expand All @@ -111,7 +117,10 @@ def update(self, value):
container.get_distribution(self.metric_name).update(value)

class DelegatingGauge(Gauge):
"""Metrics Gauge that Delegates functionality to MetricsEnvironment."""

def __init__(self, metric_name):
super(Metrics.DelegatingGauge, self).__init__()
self.metric_name = metric_name

def set(self, value):
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/metrics/metric_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def test_create_counter_distribution(self):
statesampler.set_current_tracker(sampler)
state1 = sampler.scoped_state('mystep', 'myState',
metrics_container=MetricsContainer('mystep'))
sampler.start()
with state1:
counter_ns = 'aCounterNamespace'
distro_ns = 'aDistributionNamespace'
Expand All @@ -144,6 +145,7 @@ def test_create_counter_distribution(self):
self.assertEqual(
container.distributions[MetricName(distro_ns, name)].get_cumulative(),
DistributionData(12, 2, 2, 10))
sampler.stop()


if __name__ == '__main__':
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from apache_beam.options.pipeline_options import TestOptions
from apache_beam.portability import common_urns
from apache_beam.pvalue import AsSideInput
from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics
from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
from apache_beam.runners.dataflow.internal.names import PropertyNames
Expand Down Expand Up @@ -362,6 +361,8 @@ def run_pipeline(self, pipeline):
result = DataflowPipelineResult(
self.dataflow_client.create_job(self.job), self)

# TODO(BEAM-4274): Circular import runners-metrics. Requires refactoring.
from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics
self._metrics = DataflowMetrics(self.dataflow_client, result, self.job)
result.metric_results = self._metrics
return result
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/runners/direct/direct_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from apache_beam import coders
from apache_beam import typehints
from apache_beam.internal.util import ArgumentPlaceholder
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.options.pipeline_options import DirectOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.value_provider import RuntimeValueProvider
Expand Down Expand Up @@ -357,6 +356,8 @@ def visit_transform(self, applied_ptransform):
pipeline.visit(visitor)
clock = TestClock() if visitor.uses_test_stream else RealClock()

# TODO(BEAM-4274): Circular import runners-metrics. Requires refactoring.
from apache_beam.metrics.execution import MetricsEnvironment
MetricsEnvironment.set_metrics_supported(True)
logging.info('Running pipeline with DirectRunner.')
self.consumer_tracking_visitor = ConsumerTrackingPipelineVisitor()
Expand Down
2 changes: 0 additions & 2 deletions sdks/python/apache_beam/runners/worker/statesampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import threading
from collections import namedtuple

from apache_beam.metrics import execution
from apache_beam.utils.counters import Counter
from apache_beam.utils.counters import CounterName

Expand Down Expand Up @@ -76,7 +75,6 @@ def stop_if_still_running(self):
def start(self):
self.tracked_thread = threading.current_thread()
set_current_tracker(self)
execution.metrics_startup()
super(StateSampler, self).start()
self.started = True

Expand Down