diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java new file mode 100644 index 000000000000..6ae35d247be0 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiers.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.metrics; + +/** + * Helper functions to generate resource labels strings for GCP entitites These can be used on + * MonitoringInfo 'resource' labels. See example entities: + * + *

https://s.apache.org/beam-gcp-debuggability For GCP entities, populate the RESOURCE label with + * the aip.dev/122 format: https://google.aip.dev/122 If an official GCP format does not exist, try + * to use the following format. //whatever.googleapis.com/parents/{parentId}/whatevers/{whateverId} + */ +public class GcpResourceIdentifiers { + + public static String bigQueryTable(String projectId, String datasetId, String tableId) { + return String.format( + "//bigquery.googleapis.com/projects/%s/datasets/%s/tables/%s", + projectId, datasetId, tableId); + } +} diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ServiceCallMetric.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ServiceCallMetric.java new file mode 100644 index 000000000000..17f55ba35f01 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ServiceCallMetric.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.metrics; + +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/* + * Metric class which records Service API call metrics. + * This class will capture a request count metric for the specified + * request_count_urn and base_labels. + * When Call() is invoked the status must be provided, which will + * be converted to a canonical GCP status code, if possible. + */ +public class ServiceCallMetric { + + public static final Map CANONICAL_STATUS_MAP = + ImmutableMap.builder() + .put(200, "ok") + .put(400, "out_of_range") + .put(401, "unauthenticated") + .put(403, "permission_denied") + .put(404, "not_found") + .put(409, "already_exists") + .put(429, "resource_exhausted") + .put(499, "cancelled") + .put(500, "internal") + .put(501, "not_implemented") + .put(503, "unavailable") + .put(504, "deadline_exceeded") + .build(); + + public static final String CANONICAL_STATUS_UNKNOWN = "unknown"; + + public static final Map STATUS_NORMALIZATION_MAP = + ImmutableMap.builder() + .put("outofrange", "out_of_range") + .put("permissiondenied", "permission_denied") + .put("notfound", "not_found") + .put("alreadyexists", "already_exists") + .put("resourceexhausted", "resource_exhausted") + .put("notimplemented", "not_implemented") + .put("unavailable", "unavailable") + .put("deadlineexceeded", "deadline_exceeded") + .build(); + + private HashMap labels; + private final String requestCountUrn; + + public ServiceCallMetric(String requestCountUrn, HashMap baseLabels) { + this.requestCountUrn = requestCountUrn; + this.labels = baseLabels; + } + + public void call(int httpStatusCode) { + String canonicalStatusCode = ServiceCallMetric.convertToCanonicalStatusString(httpStatusCode); + call(canonicalStatusCode); + } + + public void call(String statusCode) { + labels.put( + MonitoringInfoConstants.Labels.STATUS, + ServiceCallMetric.convertToCanonicalStatusString(statusCode)); + // MonitoringInfoMetricName will copy labels. So its safe to reuse this reference. + MonitoringInfoMetricName name = MonitoringInfoMetricName.named(requestCountUrn, labels); + Counter counter = LabeledMetrics.counter(name, true); + counter.inc(); + } + + /** Converts an http status code to a canonical GCP status code string. */ + public static String convertToCanonicalStatusString(int httpStatusCode) { + return CANONICAL_STATUS_MAP.getOrDefault(httpStatusCode, CANONICAL_STATUS_UNKNOWN); + } + + /** + * Converts an status code string to a canonical GCP status code string. This is used to make + * strings like "notFound" to "not_found". If a mapping cannot be created known, then + * statusCode.toLowerCase() will be returned. + */ + public static String convertToCanonicalStatusString(String statusCode) { + if (statusCode == null) { + return CANONICAL_STATUS_UNKNOWN; + } + String normalizedStatus = STATUS_NORMALIZATION_MAP.get(statusCode.toLowerCase()); + if (normalizedStatus != null) { + return normalizedStatus; + } + return statusCode.toLowerCase(); + } +} diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiersTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiersTest.java new file mode 100644 index 000000000000..97afa096f64b --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/GcpResourceIdentifiersTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.metrics; + +import org.junit.Assert; +import org.junit.Test; + +public class GcpResourceIdentifiersTest { + @Test + public void testBigQueryTable() { + String resource = GcpResourceIdentifiers.bigQueryTable("myProject", "myDataset", "myTableId"); + Assert.assertEquals( + "//bigquery.googleapis.com/projects/myProject/datasets/myDataset/tables/myTableId", + resource); + } +} diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ServiceCallMetricTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ServiceCallMetricTest.java new file mode 100644 index 000000000000..182ebd31b182 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/ServiceCallMetricTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.metrics; + +import java.util.HashMap; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.junit.Assert; +import org.junit.Test; + +public class ServiceCallMetricTest { + + @Test + public void testCall() { + // Test that its on the ProcessWideMetricContainer. + MetricsContainerImpl container = new MetricsContainerImpl(null); + MetricsEnvironment.setProcessWideContainer(container); + + String urn = MonitoringInfoConstants.Urns.API_REQUEST_COUNT; + HashMap labels = new HashMap(); + labels.put("key", "value"); + ServiceCallMetric metric = + new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, labels); + + // Http Status Code + metric.call(200); + labels.put(MonitoringInfoConstants.Labels.STATUS, "ok"); + MonitoringInfoMetricName name = MonitoringInfoMetricName.named(urn, labels); + Assert.assertEquals(1, (long) container.getCounter(name).getCumulative()); + + // Normalize the status by lower casing and mapping to a canonical name with underscores. + metric.call("notFound"); + labels.put(MonitoringInfoConstants.Labels.STATUS, "not_found"); + name = MonitoringInfoMetricName.named(urn, labels); + Assert.assertEquals(1, (long) container.getCounter(name).getCumulative()); + + // Normalize the status by lower casing and mapping to a canonical name with underscores. + metric.call("PERMISSIONDENIED"); + labels.put(MonitoringInfoConstants.Labels.STATUS, "permission_denied"); + name = MonitoringInfoMetricName.named(urn, labels); + Assert.assertEquals(1, (long) container.getCounter(name).getCumulative()); + + // Accept other string codes passed in, even if they aren't in the canonical map. + metric.call("something_else"); + labels.put(MonitoringInfoConstants.Labels.STATUS, "something_else"); + name = MonitoringInfoMetricName.named(urn, labels); + Assert.assertEquals(1, (long) container.getCounter(name).getCumulative()); + + // Map unknown numeric codes to "unknown" + metric.call(123); + labels.put(MonitoringInfoConstants.Labels.STATUS, "unknown"); + name = MonitoringInfoMetricName.named(urn, labels); + Assert.assertEquals(1, (long) container.getCounter(name).getCumulative()); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index fd32a84db313..5e9cdb706754 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -37,6 +37,7 @@ import com.google.api.services.bigquery.Bigquery.Tables; import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.DatasetReference; +import com.google.api.services.bigquery.model.ErrorProto; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationExtract; @@ -97,9 +98,9 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import org.apache.beam.runners.core.metrics.LabeledMetrics; +import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers; import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; -import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName; +import org.apache.beam.runners.core.metrics.ServiceCallMetric; import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; @@ -131,8 +132,9 @@ * service. */ @SuppressWarnings({"keyfor", "nullness"}) // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -class BigQueryServicesImpl implements BigQueryServices { +// TODO(https://issues.apache.org/jira/browse/BEAM-10402) +class BigQueryServicesImpl implements BigQueryServices { private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class); // How frequently to log while polling. @@ -449,6 +451,7 @@ public void close() throws Exception {} @VisibleForTesting static class DatasetServiceImpl implements DatasetService { + // Backoff: 200ms * 1.5 ^ n, n=[1,5] private static final FluentBackoff INSERT_BACKOFF_FACTORY = FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5); @@ -470,24 +473,6 @@ static class DatasetServiceImpl implements DatasetService { private ExecutorService executor; - protected static final Map CANONICAL_STATUS_MAP = - ImmutableMap.builder() - .put(200, "ok") - .put(400, "out_of_range") - .put(401, "unauthenticated") - .put(403, "permission_denied") - .put(404, "not_found") - .put(409, "already_exists") - .put(429, "resource_exhausted") - .put(499, "cancelled") - .put(500, "internal") - .put(501, "not_implemented") - .put(503, "unavailable") - .put(504, "deadline_exceeded") - .build(); - - protected static final String CANONICAL_STATUS_UNKNOWN = "unknown"; - @VisibleForTesting DatasetServiceImpl( Bigquery client, @Nullable BigQueryWriteClient newWriteClient, PipelineOptions options) { @@ -853,6 +838,24 @@ long insertAll( if (!ignoreInsertIds) { idsToPublish = insertIdList; } + + HashMap baseLabels = new HashMap(); + // TODO(ajamato): Add Ptransform label. Populate it as empty for now to prevent the + // SpecMonitoringInfoValidator from dropping the MonitoringInfo. + baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); + baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery"); + baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchWrite"); + baseLabels.put( + MonitoringInfoConstants.Labels.RESOURCE, + GcpResourceIdentifiers.bigQueryTable( + ref.getProjectId(), ref.getDatasetId(), ref.getTableId())); + baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID, ref.getProjectId()); + baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_DATASET, ref.getDatasetId()); + baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE, ref.getTableId()); + + ServiceCallMetric serviceCallMetric = + new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels); + while (true) { List> retryRows = new ArrayList<>(); List retryIds = (idsToPublish != null) ? new ArrayList<>() : null; @@ -896,6 +899,8 @@ long insertAll( .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content) .setPrettyPrint(false); + // Create final reference (which cannot change). + // So the lamba expression can refer to rowsInsertedForRequest to use on error. futures.add( executor.submit( () -> { @@ -905,13 +910,25 @@ long insertAll( long totalBackoffMillis = 0L; while (true) { try { - return insert.execute().getInsertErrors(); + List response = + insert.execute().getInsertErrors(); + if (response == null || response.isEmpty()) { + serviceCallMetric.call("ok"); + } else { + for (TableDataInsertAllResponse.InsertErrors insertErrors : response) { + for (ErrorProto insertError : insertErrors.getErrors()) { + serviceCallMetric.call(insertError.getReason()); + } + } + } + return response; } catch (IOException e) { - recordError(e); GoogleJsonError.ErrorInfo errorInfo = getErrorInfo(e); if (errorInfo == null) { + serviceCallMetric.call(ServiceCallMetric.CANONICAL_STATUS_UNKNOWN); throw e; } + serviceCallMetric.call(errorInfo.getReason()); /** * TODO(BEAM-10584): Check for QUOTA_EXCEEDED error will be replaced by * ApiErrorExtractor.INSTANCE.quotaExceeded(e) after the next release of @@ -958,15 +975,18 @@ long insertAll( if (errors == null) { continue; } + for (TableDataInsertAllResponse.InsertErrors error : errors) { if (error.getIndex() == null) { throw new IOException("Insert failed: " + error + ", other errors: " + allErrors); } - int errorIndex = error.getIndex().intValue() + strideIndices.get(i); if (retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error))) { allErrors.add(error); retryRows.add(rowsToPublish.get(errorIndex)); + // TODO (BEAM-12139): Select the retry rows(using errorIndex) from the batch of rows + // which attempted insertion in this call. Not the entire set of rows in + // rowsToPublish. if (retryIds != null) { retryIds.add(idsToPublish.get(errorIndex)); } @@ -1045,23 +1065,6 @@ protected GoogleJsonError.ErrorInfo getErrorInfo(IOException e) { return errorInfo; } - protected void recordError(IOException e) { - if (e instanceof GoogleJsonResponseException) { - int errorCode = ((GoogleJsonResponseException) e).getDetails().getCode(); - String canonicalGcpStatus = - CANONICAL_STATUS_MAP.getOrDefault(errorCode, CANONICAL_STATUS_UNKNOWN); - LabeledMetrics.counter( - MonitoringInfoMetricName.named( - MonitoringInfoConstants.Urns.API_REQUEST_COUNT, - ImmutableMap.builder() - .putAll(API_METRIC_LABEL) - .put(MonitoringInfoConstants.Labels.STATUS, canonicalGcpStatus) - .build()), - true) - .inc(); - } - } - @Override public Table patchTableDescription( TableReference tableReference, @Nullable String tableDescription) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index 6a0c9cb7dfce..52543dbaa019 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -68,13 +68,19 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; +import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; +import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName; import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter; import org.apache.beam.sdk.extensions.gcp.util.FastNanoClockAndSleeper; import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.extensions.gcp.util.Transport; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -132,6 +138,10 @@ public LowLevelHttpResponse execute() throws IOException { new Bigquery.Builder( transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()) .build(); + + // Setup the ProcessWideContainer for testing metrics are set. + MetricsContainerImpl container = new MetricsContainerImpl(null); + MetricsEnvironment.setProcessWideContainer(container); } @FunctionalInterface @@ -169,6 +179,30 @@ private void verifyAllResponsesAreRead() throws IOException { } } + private void verifyWriteMetricWasSet( + String projectId, String dataset, String table, String status, long count) { + // Verify the metric as reported. + HashMap labels = new HashMap(); + // TODO(ajamato): Add Ptransform label. Populate it as empty for now to prevent the + // SpecMonitoringInfoValidator from dropping the MonitoringInfo. + labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, ""); + labels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery"); + labels.put(MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchWrite"); + labels.put( + MonitoringInfoConstants.Labels.RESOURCE, + GcpResourceIdentifiers.bigQueryTable(projectId, dataset, table)); + labels.put(MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID, projectId); + labels.put(MonitoringInfoConstants.Labels.BIGQUERY_DATASET, dataset); + labels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE, table); + labels.put(MonitoringInfoConstants.Labels.STATUS, status); + + MonitoringInfoMetricName name = + MonitoringInfoMetricName.named(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, labels); + MetricsContainerImpl container = + (MetricsContainerImpl) MetricsEnvironment.getProcessWideContainer(); + assertEquals(count, (long) container.getCounter(name).getCumulative()); + } + /** Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds. */ @Test public void testStartLoadJobSucceeds() throws IOException, InterruptedException { @@ -625,6 +659,8 @@ public void testInsertRateLimitRetry() throws Exception { verifyAllResponsesAreRead(); expectedLogs.verifyInfo("BigQuery insertAll error, retrying:"); + + verifyWriteMetricWasSet("project", "dataset", "table", "ratelimitexceeded", 1); } /** Tests that {@link DatasetServiceImpl#insertAll} retries quota exceeded attempts. */ @@ -667,6 +703,8 @@ public void testInsertQuotaExceededRetry() throws Exception { verifyAllResponsesAreRead(); expectedLogs.verifyInfo("BigQuery insertAll error, retrying:"); + + verifyWriteMetricWasSet("project", "dataset", "table", "quotaexceeded", 1); } /** Tests that {@link DatasetServiceImpl#insertAll} can stop quotaExceeded retry attempts. */ @@ -720,6 +758,8 @@ public void testInsertStoppedRetry() throws Exception { false); verifyAllResponsesAreRead(); + + verifyWriteMetricWasSet("project", "dataset", "table", "quotaexceeded", 1); } // A BackOff that makes a total of 4 attempts @@ -776,6 +816,9 @@ public void testInsertRetrySelectRows() throws Exception { false); verifyAllResponsesAreRead(); + + verifyWriteMetricWasSet("project", "dataset", "table", "unknown", 1); + verifyWriteMetricWasSet("project", "dataset", "table", "ok", 1); } /** Tests that {@link DatasetServiceImpl#insertAll} fails gracefully when persistent issues. */ @@ -786,13 +829,18 @@ public void testInsertFailsGracefully() throws Exception { List> rows = ImmutableList.of(wrapValue(new TableRow()), wrapValue(new TableRow())); + ErrorProto errorProto = new ErrorProto().setReason("schemaMismatch"); final TableDataInsertAllResponse row1Failed = new TableDataInsertAllResponse() - .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L))); + .setInsertErrors( + ImmutableList.of( + new InsertErrors().setIndex(1L).setErrors(ImmutableList.of(errorProto)))); final TableDataInsertAllResponse row0Failed = new TableDataInsertAllResponse() - .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(0L))); + .setInsertErrors( + ImmutableList.of( + new InsertErrors().setIndex(0L).setErrors(ImmutableList.of(errorProto)))); MockSetupFunction row0FailureResponseFunction = response -> { @@ -837,12 +885,14 @@ public void testInsertFailsGracefully() throws Exception { } catch (IOException e) { assertThat(e, instanceOf(IOException.class)); assertThat(e.getMessage(), containsString("Insert failed:")); - assertThat(e.getMessage(), containsString("[{\"index\":0}]")); + assertThat(e.getMessage(), containsString("[{\"errors\":[{\"reason\":\"schemaMismatch\"}]")); } // Verify the exact number of retries as well as log messages. verifyAllResponsesAreRead(); expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery"); + + verifyWriteMetricWasSet("project", "dataset", "table", "schemamismatch", 4); } /** @@ -856,6 +906,8 @@ public void testFailInsertOtherRetry() throws Exception { List> rows = new ArrayList<>(); rows.add(wrapValue(new TableRow())); + final TableDataInsertAllResponse allRowsSucceeded = + new TableDataInsertAllResponse().setInsertErrors(ImmutableList.of()); // First response is 403 non-{rate-limited, quota-exceeded}, second response has valid payload // but should not be invoked. setupMockResponses( @@ -898,6 +950,8 @@ public void testFailInsertOtherRetry() throws Exception { verify(responses[1], never()).getContent(); verify(responses[1], never()).getContentType(); } + + verifyWriteMetricWasSet("project", "dataset", "table", "actually forbidden", 1); } /** @@ -972,6 +1026,8 @@ public void testInsertRetryPolicy() throws InterruptedException, IOException { false); assertEquals(1, failedInserts.size()); expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery"); + + verifyWriteMetricWasSet("project", "dataset", "table", "timeout", 2); } /** @@ -1042,6 +1098,8 @@ public void testSkipInvalidRowsIgnoreUnknownIgnoreInsertIdsValuesStreaming() assertTrue(parsedRequest.getIgnoreUnknownValues()); assertNull(parsedRequest.getRows().get(0).getInsertId()); assertNull(parsedRequest.getRows().get(1).getInsertId()); + + verifyWriteMetricWasSet("project", "dataset", "table", "ok", 2); } /** A helper to convert a string response back to a {@link GenericJson} subclass. */