Skip to content

Commit

Permalink
Iterated on some PR changes, unit tests in next commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Amato committed Aug 15, 2018
1 parent b25d883 commit cf5d89d
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
10 changes: 7 additions & 3 deletions sdks/python/apache_beam/metrics/monitoring_infos.py
Expand Up @@ -48,6 +48,10 @@
DISTRIBUTION_INT64_TYPE = 'beam:metrics:distribution_int_64'
LATEST_INT64_TYPE = 'beam:metrics:latest_int_64'

COUNTER_TYPES = set([SUM_INT64_TYPE])
DISTRIBUTION_TYPES = set([DISTRIBUTION_INT64_TYPE])
GAUGE_TYPES = set([LATEST_INT64_TYPE])


def to_timestamp_proto(timestamp_secs):
"""Converts seconds since epoch to a google.protobuf.Timestamp.
Expand Down Expand Up @@ -182,17 +186,17 @@ def user_metric_urn(namespace, name):

def is_counter(monitoring_info_proto):
"""Returns true if the monitoring info is a coutner metric."""
return monitoring_info_proto.type in [SUM_INT64_TYPE]
return monitoring_info_proto.type in COUNTER_TYPES


def is_distribution(monitoring_info_proto):
"""Returns true if the monitoring info is a distrbution metric."""
return monitoring_info_proto.type in [DISTRIBUTION_INT64_TYPE]
return monitoring_info_proto.type in DISTRIBUTION_TYPES


def is_gauge(monitoring_info_proto):
"""Returns true if the monitoring info is a gauge metric."""
return monitoring_info_proto.type in [LATEST_INT64_TYPE]
return monitoring_info_proto.type in GAUGE_TYPES


def is_user_monitoring_info(monitoring_info_proto):
Expand Down
2 changes: 0 additions & 2 deletions sdks/python/apache_beam/runners/worker/sdk_worker.py
Expand Up @@ -223,7 +223,6 @@ def register(self, request, instruction_id):
instruction_id=instruction_id,
register=beam_fn_api_pb2.RegisterResponse())

# Are these methods dead code? No. invoked via do_instruction
def process_bundle(self, request, instruction_id):
process_bundle_desc = self.fns[request.process_bundle_descriptor_reference]
state_handler = self.state_handler_factory.create_state_handler(
Expand All @@ -245,7 +244,6 @@ def process_bundle(self, request, instruction_id):
metrics=processor.metrics(),
monitoring_infos=processor.monitoring_infos()))

# Are these methods dead code?
def process_bundle_progress(self, request, instruction_id):
# It is an error to get progress for a not-in-flight bundle.
processor = self.bundle_processors.get(request.instruction_reference)
Expand Down

0 comments on commit cf5d89d

Please sign in to comment.