diff --git a/model/pipeline/src/main/proto/metrics.proto b/model/pipeline/src/main/proto/metrics.proto index be61f8e78c47..fe59266b95f2 100644 --- a/model/pipeline/src/main/proto/metrics.proto +++ b/model/pipeline/src/main/proto/metrics.proto @@ -292,6 +292,21 @@ message MonitoringInfoSpecs { value: "The remaining amount of work for each active element. Each active element represents an independent amount of work not shared with any other active element." }] }]; + + // The (0-based) index of the latest item processed from the data channel. + // This gives an indication of the SDK's progress through the data channel, + // and is a lower bound on where it is able to split. + // For an SDK that processes items sequentially, this is equivalent to the + // number of items fully processed (or -1 if processing has not yet started). + DATA_CHANNEL_READ_INDEX = 18 [(monitoring_info_spec) = { + urn: "beam:metric:data_channel:read_index:v1", + type: "beam:metrics:sum_int64:v1", + required_labels: [ "PTRANSFORM" ], + annotations: [{ + key: "description", + value: "The read index of the data channel." + }] + }]; } } @@ -511,4 +526,3 @@ message MonitoringInfoTypeUrns { // repeated string column_names = 1; // repeated MonitoringRow row_data = 2; // } - diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py index 2d1524cef33e..426b86a936de 100644 --- a/sdks/python/apache_beam/metrics/monitoring_infos.py +++ b/sdks/python/apache_beam/metrics/monitoring_infos.py @@ -56,6 +56,8 @@ [USER_COUNTER_URN, USER_DISTRIBUTION_URN, USER_GAUGE_URN]) WORK_REMAINING_URN = common_urns.monitoring_info_specs.WORK_REMAINING.spec.urn WORK_COMPLETED_URN = common_urns.monitoring_info_specs.WORK_COMPLETED.spec.urn +DATA_CHANNEL_READ_INDEX = ( + common_urns.monitoring_info_specs.DATA_CHANNEL_READ_INDEX.spec.urn) # TODO(ajamato): Implement the remaining types, i.e. Double types # Extrema types, etc. 
def monitoring_infos(self, transform_id, tag_to_pcollection_id):
    # type: (str, Dict[str, str]) -> Dict[FrozenSet, metrics_pb2.MonitoringInfo]
    """Return the parent's monitoring infos plus the data channel read index.

    The mapping produced by the superclass implementation is extended with
    one extra int64 counter whose URN is
    ``monitoring_infos.DATA_CHANNEL_READ_INDEX`` and whose value is the
    current ``self.index``. The counter is labelled with the given transform.

    Args:
      transform_id: id of the transform this operation belongs to; forwarded
        to the superclass and used as the counter's ``ptransform`` label.
      tag_to_pcollection_id: forwarded unchanged to the superclass.

    Returns:
      The superclass's mapping with the read-index entry added, keyed by
      ``monitoring_infos.to_key``.
    """
    # Ask the parent first so its entries form the base of the result.
    infos_by_key = super(DataInputOperation, self).monitoring_infos(
        transform_id, tag_to_pcollection_id)

    # Report how far this operation has read into the data channel.
    read_index_info = monitoring_infos.int64_counter(
        monitoring_infos.DATA_CHANNEL_READ_INDEX,
        self.index,
        ptransform=transform_id)
    read_index_key = monitoring_infos.to_key(read_index_info)
    infos_by_key[read_index_key] = read_index_info
    return infos_by_key