Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions model/pipeline/src/main/proto/metrics.proto
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ message MonitoringInfo {
BIGQUERY_TABLE = 13 [(label_props) = { name: "BIGQUERY_TABLE" }];
BIGQUERY_VIEW = 14 [(label_props) = { name: "BIGQUERY_VIEW" }];
BIGQUERY_QUERY_NAME = 15 [(label_props) = { name: "BIGQUERY_QUERY_NAME" }];
GCS_BUCKET = 16 [(label_props) = { name: "GCS_BUCKET"}];
GCS_PROJECT_ID = 17 [(label_props) = { name: "GCS_PROJECT_ID"}];
}

// A set of key and value labels which define the scope of the metric. For
Expand Down
69 changes: 64 additions & 5 deletions sdks/python/apache_beam/io/gcp/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@
from itertools import islice

from apache_beam.internal.http_client import get_new_http
from apache_beam.internal.metrics.metric import ServiceCallMetric
from apache_beam.io.filesystemio import Downloader
from apache_beam.io.filesystemio import DownloaderStream
from apache_beam.io.filesystemio import PipeStream
from apache_beam.io.filesystemio import Uploader
from apache_beam.io.filesystemio import UploaderStream
from apache_beam.io.gcp import resource_identifiers
from apache_beam.metrics import monitoring_infos
from apache_beam.utils import retry

__all__ = ['GcsIO']
Expand Down Expand Up @@ -153,6 +156,14 @@ def __init__(self, storage_client=None):
response_encoding='utf8')
self.client = storage_client
self._rewrite_cb = None
self.bucket_to_project_number = {}

def get_project_number(self, bucket):
if bucket not in self.bucket_to_project_number:
bucket_metadata = self.get_bucket(bucket_name=bucket)
self.bucket_to_project_number[bucket] = bucket_metadata.projectNumber

return self.bucket_to_project_number[bucket]

def _set_rewrite_response_callback(self, callback):
"""For testing purposes only. No backward compatibility guarantees.
Expand Down Expand Up @@ -208,13 +219,20 @@ def open(
"""
if mode == 'r' or mode == 'rb':
downloader = GcsDownloader(
self.client, filename, buffer_size=read_buffer_size)
self.client,
filename,
buffer_size=read_buffer_size,
get_project_number=self.get_project_number)
return io.BufferedReader(
DownloaderStream(
downloader, read_buffer_size=read_buffer_size, mode=mode),
buffer_size=read_buffer_size)
elif mode == 'w' or mode == 'wb':
uploader = GcsUploader(self.client, filename, mime_type)
uploader = GcsUploader(
self.client,
filename,
mime_type,
get_project_number=self.get_project_number)
return io.BufferedWriter(
UploaderStream(uploader, mode=mode), buffer_size=128 * 1024)
else:
Expand Down Expand Up @@ -555,19 +573,37 @@ def list_prefix(self, path):


class GcsDownloader(Downloader):
def __init__(self, client, path, buffer_size):
def __init__(self, client, path, buffer_size, get_project_number):
self._client = client
self._path = path
self._bucket, self._name = parse_gcs_path(path)
self._buffer_size = buffer_size
self._get_project_number = get_project_number

project_number = self._get_project_number(self._bucket)

# Create a request count metric
resource = resource_identifiers.GoogleCloudStorageBucket(self._bucket)
labels = {
monitoring_infos.SERVICE_LABEL: 'Storage',
monitoring_infos.METHOD_LABEL: 'Objects.get',
monitoring_infos.RESOURCE_LABEL: resource,
monitoring_infos.GCS_BUCKET_LABEL: self._bucket,
monitoring_infos.GCS_PROJECT_ID_LABEL: project_number
}
service_call_metric = ServiceCallMetric(
request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
base_labels=labels)

# Get object state.
self._get_request = (
storage.StorageObjectsGetRequest(
bucket=self._bucket, object=self._name))
try:
metadata = self._get_object_metadata(self._get_request)
service_call_metric.call('ok')
except HttpError as http_error:
service_call_metric.call(http_error)
if http_error.status_code == 404:
raise IOError(errno.ENOENT, 'Not found: %s' % self._path)
else:
Expand All @@ -586,7 +622,12 @@ def __init__(self, client, path, buffer_size):
auto_transfer=False,
chunksize=self._buffer_size,
num_retries=20)
self._client.objects.Get(self._get_request, download=self._downloader)

try:
self._client.objects.Get(self._get_request, download=self._downloader)
service_call_metric.call('ok')
except HttpError as e:
service_call_metric.call(e)

@retry.with_exponential_backoff(
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
Expand All @@ -605,11 +646,12 @@ def get_range(self, start, end):


class GcsUploader(Uploader):
def __init__(self, client, path, mime_type):
def __init__(self, client, path, mime_type, get_project_number):
self._client = client
self._path = path
self._bucket, self._name = parse_gcs_path(path)
self._mime_type = mime_type
self._get_project_number = get_project_number

# Set up communication with child thread.
parent_conn, child_conn = multiprocessing.Pipe()
Expand Down Expand Up @@ -643,9 +685,26 @@ def _start_upload(self):
#
# The uploader by default transfers data in chunks of 1024 * 1024 bytes at
# a time, buffering writes until that size is reached.

project_number = self._get_project_number(self._bucket)

# Create a request count metric
resource = resource_identifiers.GoogleCloudStorageBucket(self._bucket)
labels = {
monitoring_infos.SERVICE_LABEL: 'Storage',
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was the project not available on the API response after the first call? When you ran it on a pipeline were you able to see it populated on the API responses with logging?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I debugged it and also logged it but it seems that the acl is empty, so I couldn't get the project.
<Object acl: [] bucket: 'gcsio-metrics-test' generation: 1 name: 'dummy_mode_file' size: 5242980>

monitoring_infos.METHOD_LABEL: 'Objects.insert',
monitoring_infos.RESOURCE_LABEL: resource,
monitoring_infos.GCS_BUCKET_LABEL: self._bucket,
monitoring_infos.GCS_PROJECT_ID_LABEL: project_number
}
service_call_metric = ServiceCallMetric(
request_count_urn=monitoring_infos.API_REQUEST_COUNT_URN,
base_labels=labels)
try:
self._client.objects.Insert(self._insert_request, upload=self._upload)
service_call_metric.call('ok')
except Exception as e: # pylint: disable=broad-except
service_call_metric.call(e)
_LOGGER.error(
'Error in _start_upload while inserting file %s: %s',
self._path,
Expand Down
66 changes: 65 additions & 1 deletion sdks/python/apache_beam/io/gcp/gcsio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,20 @@

# Protect against environments where apitools library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.metrics.metricbase import MetricName

try:
from apache_beam.io.gcp import gcsio
from apache_beam.io.gcp import gcsio, resource_identifiers
from apache_beam.io.gcp.internal.clients import storage
from apitools.base.py.exceptions import HttpError
except ImportError:
HttpError = None
# pylint: enable=wrong-import-order, wrong-import-position

DEFAULT_GCP_PROJECT = 'apache-beam-testing'
DEFAULT_PROJECT_NUMBER = 1


class FakeGcsClient(object):
Expand All @@ -50,6 +55,7 @@ class FakeGcsClient(object):

def __init__(self):
self.objects = FakeGcsObjects()
self.buckets = FakeGcsBuckets()
# Referenced in GcsIO.copy_batch() and GcsIO.delete_batch().
self._http = object()

Expand Down Expand Up @@ -79,6 +85,17 @@ def get_metadata(self):
updated=last_updated_datetime)


class FakeGcsBuckets(object):
def __init__(self):
pass

def get_bucket(self, bucket):
return storage.Bucket(name=bucket, projectNumber=DEFAULT_PROJECT_NUMBER)

def Get(self, get_request):
return self.get_bucket(get_request.bucket)


class FakeGcsObjects(object):
def __init__(self):
self.files = {}
Expand Down Expand Up @@ -751,6 +768,53 @@ def test_mime_binary_encoding(self):
generator._handle_text(message)
self.assertEqual(test_msg.encode('ascii'), output_buffer.getvalue())

def test_downloader_monitoring_info(self):
file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
file_size = 5 * 1024 * 1024 + 100
random_file = self._insert_random_file(self.client, file_name, file_size)
self.gcs.open(file_name, 'r')

resource = resource_identifiers.GoogleCloudStorageBucket(random_file.bucket)
labels = {
monitoring_infos.SERVICE_LABEL: 'Storage',
monitoring_infos.METHOD_LABEL: 'Objects.get',
monitoring_infos.RESOURCE_LABEL: resource,
monitoring_infos.GCS_BUCKET_LABEL: random_file.bucket,
monitoring_infos.GCS_PROJECT_ID_LABEL: DEFAULT_PROJECT_NUMBER,
monitoring_infos.STATUS_LABEL: 'ok'
}

metric_name = MetricName(
None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels)
metric_value = MetricsEnvironment.process_wide_container().get_counter(
metric_name).get_cumulative()

self.assertEqual(metric_value, 2)

def test_uploader_monitoring_info(self):
file_name = 'gs://gcsio-metrics-test/dummy_mode_file'
file_size = 5 * 1024 * 1024 + 100
random_file = self._insert_random_file(self.client, file_name, file_size)
f = self.gcs.open(file_name, 'w')

resource = resource_identifiers.GoogleCloudStorageBucket(random_file.bucket)
labels = {
monitoring_infos.SERVICE_LABEL: 'Storage',
monitoring_infos.METHOD_LABEL: 'Objects.insert',
monitoring_infos.RESOURCE_LABEL: resource,
monitoring_infos.GCS_BUCKET_LABEL: random_file.bucket,
monitoring_infos.GCS_PROJECT_ID_LABEL: DEFAULT_PROJECT_NUMBER,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these labels supposed to map to MonitoringInfo.labels? That's a map<string, string> but this value is an integer.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right, I converted this value to str() and it worked 👍

monitoring_infos.STATUS_LABEL: 'ok'
}

f.close()
metric_name = MetricName(
None, None, urn=monitoring_infos.API_REQUEST_COUNT_URN, labels=labels)
metric_value = MetricsEnvironment.process_wide_container().get_counter(
Copy link
Member

@TheNeuralBit TheNeuralBit Jun 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming this process_wide_container is a global container that ends up being shared between tests, because we often run many tests in one process.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the proper pattern it to reset it before each test

MetricsEnvironment.set_process_wide_container(new MetricsContainerImpl())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a MetricsEnvironment.process_wide_container().reset() at the begining of the tests

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The failing test does have a reset() call:

MetricsEnvironment.process_wide_container().reset()

Shouldn't that have prevented this flake?

metric_name).get_cumulative()

self.assertEqual(metric_value, 1)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/io/gcp/resource_identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@
def BigQueryTable(project_id, dataset_id, table_id):
return '//bigquery.googleapis.com/projects/%s/datasets/%s/tables/%s' % (
project_id, dataset_id, table_id)


def GoogleCloudStorageBucket(bucket_id):
return '//storage.googleapis.com/buckets/%s' % bucket_id
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/metrics/monitoring_infos.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@
common_urns.monitoring_info_labels.BIGQUERY_VIEW.label_props.name)
BIGQUERY_QUERY_NAME_LABEL = (
common_urns.monitoring_info_labels.BIGQUERY_QUERY_NAME.label_props.name)
GCS_PROJECT_ID_LABEL = (
common_urns.monitoring_info_labels.GCS_PROJECT_ID.label_props.name)
GCS_BUCKET_LABEL = (
common_urns.monitoring_info_labels.GCS_BUCKET.label_props.name)


def extract_counter_value(monitoring_info_proto):
Expand Down