From 8918b260346658414dc2c8184e3e622f4fc20c43 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Thu, 2 Apr 2026 16:27:40 -0700 Subject: [PATCH 1/5] Refactor StorageApi BigQuery sink to simplify cache management and management of pins. --- .../io/gcp/bigquery/AppendClientCache.java | 148 +++ .../sdk/io/gcp/bigquery/AppendClientInfo.java | 66 +- .../sdk/io/gcp/bigquery/BigQueryServices.java | 2 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 2 +- .../StorageApiWriteUnshardedRecords.java | 148 +-- .../StorageApiWritesShardedRecords.java | 951 +++++++++--------- .../io/gcp/testing/FakeDatasetService.java | 4 +- 7 files changed, 695 insertions(+), 626 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java new file mode 100644 index 000000000000..b465f4f3aa44 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encapsulates the cache of {@link AppendClientInfo} objects and the synchronization protocol + * required to use them safely. The Guava cache object is thread-safe. However our protocol requires + * that client pin the StreamAppendClient after looking up the cache, and we must ensure that the + * cache is not accessed in between the lookup and the pin (any access of the cache could trigger + * element expiration). + */ +class AppendClientCache { + private static final Logger LOG = LoggerFactory.getLogger(AppendClientCache.class); + private final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool(); + + private final Cache appendCache; + + @SuppressWarnings({"FutureReturnValueIgnored"}) + AppendClientCache(Duration expireAfterAccess) { + this.appendCache = + CacheBuilder.newBuilder() + .expireAfterAccess(expireAfterAccess.getMillis(), TimeUnit.MILLISECONDS) + .removalListener( + (RemovalNotification removal) -> { + LOG.info("Expiring append client for {}", removal.getKey()); + final @Nullable AppendClientInfo appendClientInfo = removal.getValue(); + if (appendClientInfo != null) { + // Remove the pin owned by the cache itself. 
Since the client has not been + // marked as closed, we + // can call unpin in this thread without worrying about blocking the thread. + appendClientInfo.unpinAppendClient(null); + // Close the client in another thread to avoid blocking the main thread. + closeWriterExecutor.submit(appendClientInfo::close); + } + }) + .build(); + } + + // The cache itself always own one pin on the object. This Callable is always used to ensure that + // the cache + // adds a pin before loading a value. + private static Callable wrapWithPin(Callable loader) { + return () -> { + AppendClientInfo client = loader.call(); + client.pinAppendClient(); + return client; + }; + } + + /** + * Atomically get an append client from the cache and add a pin. This pin is owned by the client, + * which has the responsibility of removing it. If the client is not in the cache, loader will be + * used to load the client; in this case an additional pin will be added owned by the cache, + * removed when the item is evicted. + */ + public AppendClientInfo getAndPin(KeyT key, Callable loader) throws Exception { + synchronized (this) { + AppendClientInfo info = appendCache.get(key, wrapWithPin(loader)); + info.pinAppendClient(); + return info; + } + } + + public AppendClientInfo get(KeyT key, Callable loader) throws Exception { + return appendCache.get(key, wrapWithPin(loader)); + } + + public AppendClientInfo putAndPin(KeyT key, Callable loader) throws Exception { + synchronized (this) { + AppendClientInfo info = wrapWithPin(loader).call(); + appendCache.put(key, info); + info.pinAppendClient(); + return info; + } + } + + public AppendClientInfo put(KeyT key, Callable loader) throws Exception { + AppendClientInfo info = wrapWithPin(loader).call(); + appendCache.put(key, info); + return info; + } + + public void invalidate(KeyT key, AppendClientInfo expectedClient) { + // The default stream is cached across multiple different DoFns. 
If they all try + // and + // invalidate, then we can get races between threads invalidating and recreating + // streams. For this reason, + // we check to see that the cache still contains the object we created before + // invalidating (in case another + // thread has already invalidated and recreated the stream). + synchronized (this) { + AppendClientInfo cachedAppendClient = appendCache.getIfPresent(key); + if (cachedAppendClient != null + && System.identityHashCode(cachedAppendClient) + == System.identityHashCode(expectedClient)) { + appendCache.invalidate(key); + } + } + } + + public void invalidate(KeyT key) { + synchronized (this) { + appendCache.invalidate(key); + } + } + + public void tickle(KeyT key) { + appendCache.getIfPresent(key); + } + + public void clear() { + synchronized (this) { + appendCache.invalidateAll(); + } + } + + public void unpinAsync(AppendClientInfo appendClientInfo) { + appendClientInfo.unpinAppendClient(closeWriterExecutor); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java index e9adc8097604..67b8784b62d2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java @@ -27,13 +27,19 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import com.google.protobuf.Message; -import java.util.function.Consumer; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import javax.annotation.Nullable; +import org.apache.beam.sdk.function.ThrowingConsumer; +import org.apache.beam.sdk.function.ThrowingRunnable; import 
org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Container class used by {@link StorageApiWritesShardedRecords} and {@link @@ -42,6 +48,8 @@ */ @AutoValue abstract class AppendClientInfo { + private static final Logger LOG = LoggerFactory.getLogger(AppendClientInfo.class); + private final Counter activeStreamAppendClients = Metrics.counter(AppendClientInfo.class, "activeStreamAppendClients"); @@ -49,7 +57,7 @@ abstract class AppendClientInfo { abstract TableSchema getTableSchema(); - abstract Consumer getCloseAppendClient(); + abstract ThrowingConsumer getCloseAppendClient(); abstract com.google.api.services.bigquery.model.TableSchema getJsonTableSchema(); @@ -65,7 +73,8 @@ abstract static class Builder { abstract Builder setTableSchema(TableSchema value); - abstract Builder setCloseAppendClient(Consumer value); + abstract Builder setCloseAppendClient( + ThrowingConsumer value); abstract Builder setJsonTableSchema(com.google.api.services.bigquery.model.TableSchema value); @@ -83,7 +92,7 @@ abstract static class Builder { static AppendClientInfo of( TableSchema tableSchema, DescriptorProtos.DescriptorProto descriptor, - Consumer closeAppendClient) + ThrowingConsumer closeAppendClient) throws Exception { return new AutoValue_AppendClientInfo.Builder() .setTableSchema(tableSchema) @@ -97,7 +106,7 @@ static AppendClientInfo of( static AppendClientInfo of( TableSchema tableSchema, - Consumer closeAppendClient, + ThrowingConsumer closeAppendClient, boolean includeCdcColumns) throws Exception { return of( @@ -134,7 +143,18 @@ public AppendClientInfo withAppendClient( public void close() { BigQueryServices.StreamAppendClient client = getStreamAppendClient(); if (client != null) { - getCloseAppendClient().accept(client); + try { + getCloseAppendClient().accept(client); + } catch (Exception e) { + // We ignore errors when closing 
clients. + String msg = + e + + "\n" + + Arrays.stream(e.getStackTrace()) + .map(StackTraceElement::toString) + .collect(Collectors.joining("\n")); + LOG.warn("Caught exception whilw trying to close append client. Ignoring {}", msg); + } activeStreamAppendClients.dec(); } } @@ -199,4 +219,38 @@ public TableRow toTableRow(ByteString protoBytes, Predicate includeField throw new RuntimeException(e); } } + + public void pinAppendClient() { + BigQueryServices.StreamAppendClient client = + Preconditions.checkStateNotNull(getStreamAppendClient()); + client.pin(); + } + + public void unpinAppendClient(@Nullable ExecutorService executor) { + BigQueryServices.StreamAppendClient client = + Preconditions.checkStateNotNull(getStreamAppendClient()); + if (executor != null) { + runAsyncIgnoreFailure(executor, client::unpin); + } else { + client.unpin(); + } + } + + @SuppressWarnings({"FutureReturnValueIgnored"}) + private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) { + executor.submit( + () -> { + try { + task.run(); + } catch (Throwable e) { + String msg = + e.toString() + + "\n" + + Arrays.stream(e.getStackTrace()) + .map(StackTraceElement::toString) + .collect(Collectors.joining("\n")); + System.err.println("Exception happened while executing async task. Ignoring: " + msg); + } + }); + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index 78e714a7ccd6..66458a8339f9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -270,7 +270,7 @@ default long getInflightWaitSeconds() { /** * Unpin this object. If the object has been closed, this will release any underlying resources. 
*/ - void unpin() throws Exception; + void unpin(); } /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 2a9cc7649c21..aa9a5fd310b0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -1617,7 +1617,7 @@ public void pin() { } @Override - public void unpin() throws Exception { + public void unpin() { boolean closeWriter; synchronized (this) { Preconditions.checkState(pins > 0, "Tried to unpin when pins==0"); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 600f49b0be3e..8ce9869e3966 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -47,9 +47,6 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; @@ -89,9 +86,6 @@ import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; -import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -122,56 +116,17 @@ public class StorageApiWriteUnshardedRecords private final Coder successfulRowsCoder; private final boolean autoUpdateSchema; private final boolean ignoreUnknownValues; - private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool(); private final BigQueryIO.Write.CreateDisposition createDisposition; private final @Nullable String kmsKey; private final boolean usesCdc; private final AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation; private final @Nullable Map bigLakeConfiguration; - /** - * The Guava cache object is thread-safe. However our protocol requires that client pin the - * StreamAppendClient after looking up the cache, and we must ensure that the cache is not - * accessed in between the lookup and the pin (any access of the cache could trigger element - * expiration). Therefore most used of APPEND_CLIENTS should synchronize. 
- */ - private static final Cache APPEND_CLIENTS = - CacheBuilder.newBuilder() - .expireAfterAccess(15, TimeUnit.MINUTES) - .removalListener( - (RemovalNotification removal) -> { - LOG.info("Expiring append client for {}", removal.getKey()); - final @Nullable AppendClientInfo appendClientInfo = removal.getValue(); - if (appendClientInfo != null) { - appendClientInfo.close(); - } - }) - .build(); + private static final AppendClientCache APPEND_CLIENTS = + new AppendClientCache<>(Duration.standardMinutes(15)); static void clearCache() { - APPEND_CLIENTS.invalidateAll(); - } - - // Run a closure asynchronously, ignoring failures. - private interface ThrowingRunnable { - void run() throws Exception; - } - - private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) { - executor.submit( - () -> { - try { - task.run(); - } catch (Exception e) { - String msg = - e.toString() - + "\n" - + Arrays.stream(e.getStackTrace()) - .map(StackTraceElement::toString) - .collect(Collectors.joining("\n")); - System.err.println("Exception happened while executing async task. 
Ignoring: " + msg); - } - }); + APPEND_CLIENTS.clear(); } public StorageApiWriteUnshardedRecords( @@ -362,18 +317,9 @@ public TableDestination getTableDestination() { void teardown() { maybeTickleCache(); - if (appendClientInfo != null) { - StreamAppendClient client = appendClientInfo.getStreamAppendClient(); - if (client != null) { - runAsyncIgnoreFailure(closeWriterExecutor, client::unpin); - } - // if this is a PENDING stream, we won't be using it again after cleaning up this - // destination state, so clear it from the cache - if (!useDefaultStream) { - APPEND_CLIENTS.invalidate(streamName); - } - appendClientInfo = null; - } + // if this is a PENDING stream, we won't be using it again after cleaning up this + // destination state, so clear it from the cache + invalidateAppendClient(!useDefaultStream); } String getDefaultStreamName() { @@ -419,18 +365,7 @@ AppendClientInfo generateClient(@Nullable TableSchema updatedSchema) throws Exce AppendClientInfo.of( schemaAndDescriptor.tableSchema, schemaAndDescriptor.descriptor, - // Make sure that the client is always closed in a different thread to avoid - // blocking. - client -> - runAsyncIgnoreFailure( - closeWriterExecutor, - () -> { - synchronized (APPEND_CLIENTS) { - // Remove the pin owned by the cache. - client.unpin(); - client.close(); - } - }))); + AutoCloseable::close)); CreateTableHelpers.createTableWrapper( () -> { @@ -446,9 +381,6 @@ AppendClientInfo generateClient(@Nullable TableSchema updatedSchema) throws Exce return null; }, tryCreateTable); - - // This pin is "owned" by the cache. 
- Preconditions.checkStateNotNull(appendClientInfo.get().getStreamAppendClient()).pin(); return appendClientInfo.get(); } @@ -515,23 +447,14 @@ AppendClientInfo getAppendClientInfo( try { if (this.appendClientInfo == null) { getOrCreateStreamName(); - final AppendClientInfo newAppendClientInfo; - synchronized (APPEND_CLIENTS) { - if (lookupCache) { - newAppendClientInfo = - APPEND_CLIENTS.get( + this.appendClientInfo = + lookupCache + ? APPEND_CLIENTS.getAndPin( + getStreamAppendClientCacheEntryKey(), () -> generateClient(updatedSchema)) + : APPEND_CLIENTS.putAndPin( getStreamAppendClientCacheEntryKey(), () -> generateClient(updatedSchema)); - } else { - newAppendClientInfo = generateClient(updatedSchema); - // override the clients in the cache. - APPEND_CLIENTS.put(getStreamAppendClientCacheEntryKey(), newAppendClientInfo); - } - // This pin is "owned" by the current DoFn. - Preconditions.checkStateNotNull(newAppendClientInfo.getStreamAppendClient()).pin(); - } - nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1)); - this.appendClientInfo = newAppendClientInfo; } + nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1)); return Preconditions.checkStateNotNull(appendClientInfo); } catch (Exception e) { throw new RuntimeException(e); @@ -540,37 +463,24 @@ AppendClientInfo getAppendClientInfo( void maybeTickleCache() { if (appendClientInfo != null && Instant.now().isAfter(nextCacheTickle)) { - synchronized (APPEND_CLIENTS) { - APPEND_CLIENTS.getIfPresent(getStreamAppendClientCacheEntryKey()); - } + APPEND_CLIENTS.tickle(getStreamAppendClientCacheEntryKey()); nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1)); } } - void invalidateWriteStream() { - if (appendClientInfo != null) { - synchronized (APPEND_CLIENTS) { - // Unpin in a different thread, as it may execute a blocking close. 
- StreamAppendClient client = appendClientInfo.getStreamAppendClient(); - if (client != null) { - runAsyncIgnoreFailure(closeWriterExecutor, client::unpin); - } - // The default stream is cached across multiple different DoFns. If they all try and - // invalidate, then we can get races between threads invalidating and recreating - // streams. For this reason, - // we check to see that the cache still contains the object we created before - // invalidating (in case another - // thread has already invalidated and recreated the stream). - String cacheEntryKey = getStreamAppendClientCacheEntryKey(); - @Nullable - AppendClientInfo cachedAppendClient = APPEND_CLIENTS.getIfPresent(cacheEntryKey); - if (cachedAppendClient != null - && System.identityHashCode(cachedAppendClient) - == System.identityHashCode(appendClientInfo)) { - APPEND_CLIENTS.invalidate(cacheEntryKey); - } + void invalidateAppendClient(boolean invalidateCache) { + if (this.appendClientInfo != null) { + // Unpin in a different thread, as it may execute a blocking close. + StreamAppendClient client = appendClientInfo.getStreamAppendClient(); + if (client != null) { + APPEND_CLIENTS.unpinAsync(Preconditions.checkStateNotNull(this.appendClientInfo)); + } + if (invalidateCache) { + APPEND_CLIENTS.invalidate( + getStreamAppendClientCacheEntryKey(), + Preconditions.checkStateNotNull(this.appendClientInfo)); } - appendClientInfo = null; + this.appendClientInfo = null; } } @@ -677,7 +587,7 @@ long flush( LOG.info("Schema out of date: refreshing table schema for {}.", tableUrn); // Refresh our view of the schema and try again.. this.messageConverter.updateSchemaFromTable(); - invalidateWriteStream(); + invalidateAppendClient(true); this.appendClientInfo = Preconditions.checkStateNotNull( getAppendClientInfo( @@ -874,7 +784,7 @@ long flush( if (!quotaError) { // This forces us to close and reopen all gRPC connections to Storage API on error, // which empirically fixes random stuckness issues. 
- invalidateWriteStream(); + invalidateAppendClient(true); allowedRetry = 5; } else { allowedRetry = 35; @@ -1031,7 +941,7 @@ void postFlush() { TableSchemaUpdateUtils.getUpdatedSchema( this.messageConverter.getTableSchema(), updatedTableSchemaReturned); if (updatedTableSchema.isPresent()) { - invalidateWriteStream(); + invalidateAppendClient(true); appendClientInfo = Preconditions.checkStateNotNull( getAppendClientInfo(false, updatedTableSchema.get())); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 5771ea5074b8..93b0cf2d0ded 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -37,23 +37,18 @@ import io.grpc.Status.Code; import java.io.IOException; import java.time.Instant; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -104,9 +99,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; @@ -149,44 +141,12 @@ public class StorageApiWritesShardedRecords succussfulRowsCoder; private final TupleTag> flushTag = new TupleTag<>("flushTag"); - private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool(); - - private static final Cache>, AppendClientInfo> APPEND_CLIENTS = - CacheBuilder.newBuilder() - .expireAfterAccess(5, TimeUnit.MINUTES) - .removalListener( - (RemovalNotification>, AppendClientInfo> removal) -> { - final @Nullable AppendClientInfo appendClientInfo = removal.getValue(); - if (appendClientInfo != null) { - appendClientInfo.close(); - } - }) - .build(); - static void clearCache() { - APPEND_CLIENTS.invalidateAll(); - } + private static AppendClientCache>> APPEND_CLIENTS = + new AppendClientCache<>(Duration.standardMinutes(5)); - // Run a closure asynchronously, ignoring failures. - private interface ThrowingRunnable { - void run() throws Exception; - } - - private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) { - executor.submit( - () -> { - try { - task.run(); - } catch (Exception e) { - String msg = - e.toString() - + "\n" - + Arrays.stream(e.getStackTrace()) - .map(StackTraceElement::toString) - .collect(Collectors.joining("\n")); - System.err.println("Exception happened while executing async task. 
Ignoring: " + msg); - } - }); + static void clearCache() { + APPEND_CLIENTS.clear(); } public StorageApiWritesShardedRecords( @@ -282,7 +242,6 @@ static class AppendRowsContext extends RetryManager.Operation.Context { final ShardedKey key; String streamName = ""; - @Nullable StreamAppendClient client = null; long offset = -1; long numRows = 0; long tryIteration = 0; @@ -487,6 +446,46 @@ public void onTeardown() { } } + // Holder for an AppendClientHolder. Maintains a pin on the client as long as it's active. + private class AppendClientHolder implements AutoCloseable { + private final KV> key; + private final Callable appendClientInfoCallable; + + private AppendClientInfo appendClientInfo; + private boolean valid; + + public AppendClientHolder( + ShardedKey key, Callable appendClientInfoCallable) + throws Exception { + this.key = messageConverters.getAppendClientKey(key); + this.appendClientInfoCallable = appendClientInfoCallable; + this.appendClientInfo = APPEND_CLIENTS.getAndPin(this.key, appendClientInfoCallable); + this.valid = true; + } + + void invalidateAndReset() throws Exception { + APPEND_CLIENTS.unpinAsync(this.appendClientInfo); // Make sure to unpin in another thread. + APPEND_CLIENTS.invalidate(key); + this.appendClientInfo = APPEND_CLIENTS.getAndPin(key, appendClientInfoCallable); + } + + @Override + public void close() { + APPEND_CLIENTS.unpinAsync(this.appendClientInfo); // Make sure to unpin in another thread. 
+ this.valid = false; + } + + AppendClientInfo get() { + org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState( + valid); + return appendClientInfo; + } + + StreamAppendClient getStreamAppendClient() { + return Preconditions.checkStateNotNull(appendClientInfo.getStreamAppendClient()); + } + } + private CreateRetryManagerResult createRetryManager( ShardedKey key, Iterable messages, @@ -554,6 +553,214 @@ private CreateRetryManagerResult createRetryManager( retryManager, failedRows, recordsAppended, histogramUpdates); } + private void handleAppendFailure( + Iterable> failedContexts, + TableReference tableReference, + String shortTableId, + AppendClientInfo appendClientInfo, + Callable tryCreateTable, + BiConsumer>, Boolean> initializeContexts, + Consumer>> clearClients, + ValueState streamOffset, + MultiOutputReceiver o) { + // The first context is always the one that fails. + AppendRowsContext failedContext = + Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null)); + BigQuerySinkMetrics.reportFailedRPCMetrics( + failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableId); + String errorCode = BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError()); + + // AppendSerializationError means that BigQuery detected errors on individual rows, e.g. + // a row not conforming to bigQuery invariants. These errors are persistent, so we redirect + // those rows to the + // failedInserts PCollection, and retry with the remaining rows. + if (failedContext.getError() != null + && failedContext.getError() instanceof Exceptions.AppendSerializationError) { + Exceptions.AppendSerializationError error = + Preconditions.checkArgumentNotNull( + (Exceptions.AppendSerializationError) failedContext.getError()); + + Set failedRowIndices = error.getRowIndexToErrorMessage().keySet(); + for (int failedIndex : failedRowIndices) { + // Convert the message to a TableRow and send it to the failedRows collection. 
+ TableRow failedRow = failedContext.failsafeTableRows.get(failedIndex); + if (failedRow == null) { + ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex); + failedRow = appendClientInfo.toTableRow(protoBytes, Predicates.alwaysTrue()); + } + org.joda.time.Instant timestamp = failedContext.timestamps.get(failedIndex); + o.get(failedRowsTag) + .outputWithTimestamp( + new BigQueryStorageApiInsertError( + failedRow, + error.getRowIndexToErrorMessage().get(failedIndex), + tableReference), + timestamp); + } + int failedRows = failedRowIndices.size(); + rowsSentToFailedRowsCollection.inc(failedRows); + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.FAILED, errorCode, shortTableId) + .inc(failedRows); + + // Remove the failed row from the payload, so we retry the batch without the failed + // rows. + ProtoRows.Builder retryRows = ProtoRows.newBuilder(); + @Nullable List timestamps = Lists.newArrayList(); + for (int i = 0; i < failedContext.protoRows.getSerializedRowsCount(); ++i) { + if (!failedRowIndices.contains(i)) { + ByteString rowBytes = failedContext.protoRows.getSerializedRows(i); + retryRows.addSerializedRows(rowBytes); + timestamps.add(failedContext.timestamps.get(i)); + } + } + failedContext.protoRows = retryRows.build(); + failedContext.timestamps = timestamps; + int retriedRows = failedContext.protoRows.getSerializedRowsCount(); + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId) + .inc(retriedRows); + + // Since we removed rows, we need to update the insert offsets for all remaining rows. 
+ long offset = failedContext.offset; + for (AppendRowsContext context : failedContexts) { + context.offset = offset; + offset += context.protoRows.getSerializedRowsCount(); + } + streamOffset.write(offset); + return; + } + + Throwable error = Preconditions.checkStateNotNull(failedContext.getError()); + Status.Code statusCode = Status.fromThrowable(error).getCode(); + + // This means that the offset we have stored does not match the current end of + // the stream in the Storage API. Usually this happens because a crash or a bundle + // failure + // happened after an append but before the worker could checkpoint it's + // state. The records that were appended in a failed bundle will be retried, + // meaning that the unflushed tail of the stream must be discarded to prevent + // duplicates. + boolean offsetMismatch = + statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS); + + boolean quotaError = statusCode.equals(Code.RESOURCE_EXHAUSTED); + if (!offsetMismatch) { + // Don't log errors for expected offset mismatch. These will be logged as warnings + // below. + LOG.error("Got error {} closing {}", failedContext.getError(), failedContext.streamName); + } + + try { + // TODO: Only do this on explicit NOT_FOUND errors once BigQuery reliably produces + // them. + tryCreateTable.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + if (!quotaError) { + // For known errors (offset mismatch, not found) we must reestablish + // the streams. + // However we've seen that doing this fixes random stuckness issues by reestablishing + // gRPC connections, + // so we close the clients for all non-quota errors. 
+ + clearClients.accept(failedContexts); + } + appendFailures.inc(); + int retriedRows = failedContext.protoRows.getSerializedRowsCount(); + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId) + .inc(retriedRows); + + // Schema mismatched exceptions can happen if the table was recently updated. Since + // vortex caches schemas + // we might see the new schema before vortex does. In this case, we simply need to + // retry. + Exceptions.@Nullable StorageException storageException = Exceptions.toStorageException(error); + boolean schemaMismatchError = + (storageException instanceof Exceptions.SchemaMismatchedException); + if (!schemaMismatchError) { + // There's no special error code for missing required fields, and that can also happen + // due to vortex + // being delayed at seeing a new schema. We're forced to parse the description to + // determine that this has happened. + // TODO: Vortex team to introduce a special storage error code for this, so we don't + // have to parse + // descriptions. + Status status = Status.fromThrowable(error); + if (status.getCode() == Code.INVALID_ARGUMENT) { + String description = status.getDescription(); + schemaMismatchError = description != null && description.contains("incompatible fields"); + } + } + if (schemaMismatchError) { + LOG.info( + "Vortex failed stream open due to incompatible fields. This is likely because the BigTable " + + "schema was recently updated and Vortex hasn't noticed yet, so retrying. error {}", + Preconditions.checkStateNotNull(error).toString()); + } + + boolean explicitStreamFinalized = + failedContext.getError() instanceof StreamFinalizedException; + // This implies that the stream doesn't exist or has already been finalized. In this + // case we have no choice but to create a new stream. 
+ boolean streamDoesNotExist = + explicitStreamFinalized + || statusCode.equals(Code.INVALID_ARGUMENT) + || statusCode.equals(Code.NOT_FOUND) + || statusCode.equals(Code.FAILED_PRECONDITION); + streamDoesNotExist = streamDoesNotExist && !schemaMismatchError; + + if (offsetMismatch || streamDoesNotExist) { + appendOffsetFailures.inc(); + LOG.warn( + "Append to {} failed. Will retry with a new stream", + failedContext, + failedContext.getError()); + // Finalize the stream and clear streamName so a new stream will be created. + o.get(flushTag) + .output(KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true))); + // Reinitialize all contexts with the new stream and new offsets. + initializeContexts.accept(failedContexts, true); + + // Offset failures imply that all subsequent parallel appends will also fail. + // Retry them all. + } + } + + private void handleAppendSuccess( + AppendRowsContext context, + String shortTableId, + AppendClientInfo appendClientInfo, + MultiOutputReceiver o) { + AppendRowsResponse response = Preconditions.checkStateNotNull(context.getResult()); + o.get(flushTag) + .output( + KV.of( + context.streamName, + new Operation( + context.offset + context.protoRows.getSerializedRowsCount() - 1, false))); + int flushedRows = context.protoRows.getSerializedRowsCount(); + flushesScheduled.inc(flushedRows); + BigQuerySinkMetrics.reportSuccessfulRpcMetrics( + context, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableId); + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.SUCCESSFUL, BigQuerySinkMetrics.OK, shortTableId) + .inc(flushedRows); + + if (successfulRowsTag != null) { + for (int i = 0; i < context.protoRows.getSerializedRowsCount(); ++i) { + ByteString protoBytes = context.protoRows.getSerializedRows(i); + org.joda.time.Instant timestamp = context.timestamps.get(i); + o.get(successfulRowsTag) + .outputWithTimestamp( + appendClientInfo.toTableRow(protoBytes, successfulRowsPredicate), 
timestamp); + } + } + } + @ProcessElement public void process( ProcessContext c, @@ -631,10 +838,12 @@ public void process( () -> { @Nullable TableSchema tableSchema; DescriptorProtos.DescriptorProto descriptor; - TableSchema updatedSchemaValue = updatedSchema.read(); + TableSchema updatedSchemaValue = autoUpdateSchema ? updatedSchema.read() : null; + if (autoUpdateSchema && updatedSchemaValue != null) { - // We've seen an updated schema, so we use that instead of querying the - // MessageConverter. + // This means that Vortex has told us in the past that the table schema has been + // updated. We should use + // this updated schema instead of the initial schema from the messageConverter. tableSchema = updatedSchemaValue; descriptor = TableRowToStorageApiProto.descriptorSchemaFromTableSchema( @@ -648,8 +857,8 @@ public void process( if (autoUpdateSchema) { // A StreamWriter ignores table schema updates that happen prior to its creation. // So before creating a StreamWriter below, we fetch the table schema to check if we - // missed an update. - // If so, use the new schema instead of the base schema + // missed an update. If so, use the new schema instead of the base schema. + // TODO: There's still a race here! @Nullable TableSchema streamSchema = MoreObjects.firstNonNull( @@ -671,475 +880,223 @@ public void process( AppendClientInfo.of( Preconditions.checkStateNotNull(tableSchema), descriptor, - // Make sure that the client is always closed in a different thread - // to - // avoid blocking. - client -> - runAsyncIgnoreFailure( - closeWriterExecutor, - () -> { - // Remove the pin that is "owned" by the cache. - client.unpin(); - client.close(); - })) + AutoCloseable::close) .withAppendClient( writeStreamService, getOrCreateStream, false, defaultMissingValueInterpretation); - // This pin is "owned" by the cache. 
- Preconditions.checkStateNotNull(info.getStreamAppendClient()).pin(); return info; }; - AtomicReference appendClientInfo = - new AtomicReference<>( - APPEND_CLIENTS.get( - messageConverters.getAppendClientKey(element.getKey()), getAppendClientInfo)); - String currentStream = getOrCreateStream.get(); - if (!currentStream.equals(appendClientInfo.get().getStreamName())) { - // Cached append client is inconsistent with persisted state. Throw away cached item and - // force it to be - // recreated. - APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey())); - appendClientInfo.set( - APPEND_CLIENTS.get( - messageConverters.getAppendClientKey(element.getKey()), getAppendClientInfo)); - } - - TableSchema updatedSchemaValue = updatedSchema.read(); - if (autoUpdateSchema && updatedSchemaValue != null) { - if (appendClientInfo.get().hasSchemaChanged(updatedSchemaValue)) { - appendClientInfo.set( - AppendClientInfo.of( - updatedSchemaValue, appendClientInfo.get().getCloseAppendClient(), false)); - APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey())); - APPEND_CLIENTS.put( - messageConverters.getAppendClientKey(element.getKey()), appendClientInfo.get()); + // The StreamWriter has two pins on it. The static cache holds a pin, as it continues to cache + // values after this + // method exits, so must hold the pin. The local AppendClientHolder also holds a pin, as the + // cache could in + // theory evict the object during execution and we want a pin held throughout the execution of + // this function. + try (AppendClientHolder appendClientHolder = + new AppendClientHolder(element.getKey(), getAppendClientInfo)) { + String currentStream = getOrCreateStream.get(); + if (!currentStream.equals(appendClientHolder.get().getStreamName())) { + // Cached append client is inconsistent with persisted state. Throw away cached item and + // force it to be recreated. 
+ appendClientHolder.invalidateAndReset(); } - } - // Initialize stream names and offsets for all contexts. This will be called initially, but - // will also be called if we roll over to a new stream on a retry. - BiConsumer>, Boolean> initializeContexts = - (contexts, isFailure) -> { - try { - if (isFailure) { - // Clear the stream name, forcing a new one to be created. - streamName.write(""); - } - appendClientInfo.set( - appendClientInfo - .get() - .withAppendClient( - writeStreamService, - getOrCreateStream, - false, - defaultMissingValueInterpretation)); - StreamAppendClient streamAppendClient = - Preconditions.checkArgumentNotNull( - appendClientInfo.get().getStreamAppendClient()); - String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read()); - long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read()); - for (AppendRowsContext context : contexts) { - context.streamName = streamNameRead; - streamAppendClient.pin(); - context.client = appendClientInfo.get().getStreamAppendClient(); - context.offset = currentOffset; - ++context.tryIteration; - currentOffset = context.offset + context.protoRows.getSerializedRowsCount(); - } - streamOffset.write(currentOffset); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; - - Consumer>> clearClients = - (contexts) -> { - APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey())); - appendClientInfo.set(appendClientInfo.get().withNoAppendClient()); - APPEND_CLIENTS.put( - messageConverters.getAppendClientKey(element.getKey()), appendClientInfo.get()); - for (AppendRowsContext context : contexts) { - if (context.client != null) { - // Unpin in a different thread, as it may execute a blocking close. 
- runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin); - context.client = null; - } - } - }; - - Function, ApiFuture> runOperation = - context -> { - if (context.protoRows.getSerializedRowsCount() == 0) { - // This might happen if all rows in a batch failed and were sent to the failed-rows - // PCollection. - return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build()); - } - try { - appendClientInfo.set( - appendClientInfo - .get() - .withAppendClient( - writeStreamService, - getOrCreateStream, - false, - defaultMissingValueInterpretation)); - return Preconditions.checkStateNotNull(appendClientInfo.get().getStreamAppendClient()) - .appendRows(context.offset, context.protoRows); - } catch (Exception e) { - throw new RuntimeException(e); - } - }; + TableSchema updatedSchemaValue = autoUpdateSchema ? updatedSchema.read() : null; + if (autoUpdateSchema && updatedSchemaValue != null) { + if (appendClientHolder.get().hasSchemaChanged(updatedSchemaValue)) { + appendClientHolder.invalidateAndReset(); + } + } - Function>, RetryType> onError = - failedContexts -> { - // The first context is always the one that fails. - AppendRowsContext failedContext = - Preconditions.checkStateNotNull(Iterables.getFirst(failedContexts, null)); - BigQuerySinkMetrics.reportFailedRPCMetrics( - failedContext, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableId); - String errorCode = - BigQuerySinkMetrics.throwableToGRPCCodeString(failedContext.getError()); - - // AppendSerializationError means that BigQuery detected errors on individual rows, e.g. - // a row not conforming - // to bigQuery invariants. These errors are persistent, so we redirect those rows to the - // failedInserts - // PCollection, and retry with the remaining rows. 
- if (failedContext.getError() != null - && failedContext.getError() instanceof Exceptions.AppendSerializationError) { - Exceptions.AppendSerializationError error = - Preconditions.checkArgumentNotNull( - (Exceptions.AppendSerializationError) failedContext.getError()); - - Set failedRowIndices = error.getRowIndexToErrorMessage().keySet(); - for (int failedIndex : failedRowIndices) { - // Convert the message to a TableRow and send it to the failedRows collection. - TableRow failedRow = failedContext.failsafeTableRows.get(failedIndex); - if (failedRow == null) { - // TODO: MAKE SURE WE USE UPDATED DESCRIPTOR - ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex); - failedRow = - appendClientInfo.get().toTableRow(protoBytes, Predicates.alwaysTrue()); + // Initialize stream names and offsets for all contexts. This will be called initially, but + // will also be called if we roll over to a new stream on a retry. + BiConsumer>, Boolean> initializeContexts = + (contexts, isFailure) -> { + try { + if (isFailure) { + // Clear the stream name, forcing a new one to be created. + streamName.write(""); } - org.joda.time.Instant timestamp = failedContext.timestamps.get(failedIndex); - o.get(failedRowsTag) - .outputWithTimestamp( - new BigQueryStorageApiInsertError( - failedRow, - error.getRowIndexToErrorMessage().get(failedIndex), - tableReference), - timestamp); - } - int failedRows = failedRowIndices.size(); - rowsSentToFailedRowsCollection.inc(failedRows); - BigQuerySinkMetrics.appendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.FAILED, errorCode, shortTableId) - .inc(failedRows); - - // Remove the failed row from the payload, so we retry the batch without the failed - // rows. 
- ProtoRows.Builder retryRows = ProtoRows.newBuilder(); - @Nullable List timestamps = Lists.newArrayList(); - for (int i = 0; i < failedContext.protoRows.getSerializedRowsCount(); ++i) { - if (!failedRowIndices.contains(i)) { - ByteString rowBytes = failedContext.protoRows.getSerializedRows(i); - retryRows.addSerializedRows(rowBytes); - timestamps.add(failedContext.timestamps.get(i)); + String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read()); + long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read()); + for (AppendRowsContext context : contexts) { + context.streamName = streamNameRead; + context.offset = currentOffset; + ++context.tryIteration; + currentOffset = context.offset + context.protoRows.getSerializedRowsCount(); } + streamOffset.write(currentOffset); + } catch (Exception e) { + throw new RuntimeException(e); } - failedContext.protoRows = retryRows.build(); - failedContext.timestamps = timestamps; - int retriedRows = failedContext.protoRows.getSerializedRowsCount(); - BigQuerySinkMetrics.appendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId) - .inc(retriedRows); - - // Since we removed rows, we need to update the insert offsets for all remaining rows. - long offset = failedContext.offset; - for (AppendRowsContext context : failedContexts) { - context.offset = offset; - offset += context.protoRows.getSerializedRowsCount(); + }; + + Consumer>> clearClients = + (contexts) -> { + try { + appendClientHolder.invalidateAndReset(); + } catch (Exception e) { + throw new RuntimeException(e); } - streamOffset.write(offset); - return RetryType.RETRY_ALL_OPERATIONS; - } - - Throwable error = Preconditions.checkStateNotNull(failedContext.getError()); - - Status.Code statusCode = Status.fromThrowable(error).getCode(); - - // This means that the offset we have stored does not match the current end of - // the stream in the Storage API. 
Usually this happens because a crash or a bundle - // failure - // happened after an append but before the worker could checkpoint it's - // state. The records that were appended in a failed bundle will be retried, - // meaning that the unflushed tail of the stream must be discarded to prevent - // duplicates. - boolean offsetMismatch = - statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS); - - boolean quotaError = statusCode.equals(Code.RESOURCE_EXHAUSTED); - if (!offsetMismatch) { - // Don't log errors for expected offset mismatch. These will be logged as warnings - // below. - LOG.error( - "Got error {} closing {}", failedContext.getError(), failedContext.streamName); - } - - try { - // TODO: Only do this on explicit NOT_FOUND errors once BigQuery reliably produces - // them. - tryCreateTable.call(); - } catch (Exception e) { - throw new RuntimeException(e); - } - - if (!quotaError) { - // For known errors (offset mismatch, not found) we must reestablish - // the streams. - // However we've seen that doing this fixes random stuckness issues by reestablishing - // gRPC connections, - // so we close the clients for all non-quota errors. - - clearClients.accept(failedContexts); - } - appendFailures.inc(); - int retriedRows = failedContext.protoRows.getSerializedRowsCount(); - BigQuerySinkMetrics.appendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.RETRIED, errorCode, shortTableId) - .inc(retriedRows); - - // Schema mismatched exceptions can happen if the table was recently updated. Since - // vortex caches schemas - // we might see the new schema before vortex does. In this case, we simply need to - // retry. 
- Exceptions.@Nullable StorageException storageException = - Exceptions.toStorageException(error); - boolean schemaMismatchError = - (storageException instanceof Exceptions.SchemaMismatchedException); - if (!schemaMismatchError) { - // There's no special error code for missing required fields, and that can also happen - // due to vortex - // being delayed at seeing a new schema. We're forced to parse the description to - // determine that this has happened. - // TODO: Vortex team to introduce a special storage error code for this, so we don't - // have to parse - // descriptions. - Status status = Status.fromThrowable(error); - if (status.getCode() == Code.INVALID_ARGUMENT) { - String description = status.getDescription(); - schemaMismatchError = - description != null && description.contains("incompatible fields"); + }; + + Function, ApiFuture> runOperation = + context -> { + if (context.protoRows.getSerializedRowsCount() == 0) { + // This might happen if all rows in a batch failed and were sent to the failed-rows + // PCollection. + return ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().build()); } - } - if (schemaMismatchError) { - LOG.info( - "Vortex failed stream open due to incompatible fields. This is likely because the BigTable " - + "schema was recently updated and Vortex hasn't noticed yet, so retrying. error {}", - Preconditions.checkStateNotNull(error).toString()); - } - - boolean explicitStreamFinalized = - failedContext.getError() instanceof StreamFinalizedException; - // This implies that the stream doesn't exist or has already been finalized. In this - // case we have no choice but to create a new stream. 
- boolean streamDoesNotExist = - explicitStreamFinalized - || statusCode.equals(Code.INVALID_ARGUMENT) - || statusCode.equals(Code.NOT_FOUND) - || statusCode.equals(Code.FAILED_PRECONDITION); - streamDoesNotExist = streamDoesNotExist && !schemaMismatchError; - - if (offsetMismatch || streamDoesNotExist) { - appendOffsetFailures.inc(); - LOG.warn( - "Append to {} failed. Will retry with a new stream", - failedContext, - failedContext.getError()); - // Finalize the stream and clear streamName so a new stream will be created. - o.get(flushTag) - .output( - KV.of( - failedContext.streamName, new Operation(failedContext.offset - 1, true))); - // Reinitialize all contexts with the new stream and new offsets. - initializeContexts.accept(failedContexts, true); - - // Offset failures imply that all subsequent parallel appends will also fail. - // Retry them all. + try { + return appendClientHolder + .getStreamAppendClient() + .appendRows(context.offset, context.protoRows); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + + Function>, RetryType> onError = + failedContexts -> { + handleAppendFailure( + failedContexts, + tableReference, + shortTableId, + appendClientHolder.get(), + tryCreateTable, + initializeContexts, + clearClients, + streamOffset, + o); return RetryType.RETRY_ALL_OPERATIONS; - } - - return RetryType.RETRY_ALL_OPERATIONS; - }; + }; + Consumer> onSuccess = + context -> handleAppendSuccess(context, shortTableId, appendClientHolder.get(), o); + + BackOff backoff = + FluentBackoff.DEFAULT + .withInitialBackoff(Duration.standardSeconds(1)) + .withMaxBackoff(Duration.standardMinutes(1)) + .withMaxRetries(500) + .withThrottledTimeCounter( + BigQuerySinkMetrics.throttledTimeCounter( + BigQuerySinkMetrics.RpcMethod.OPEN_WRITE_STREAM)) + .backoff(); + CreateRetryManagerResult createRetryManagerResult; + do { + // Each ProtoRows object contains at most 1MB of rows. + // TODO: Push messageFromTableRow up to top level. 
That way we can skip TableRow entirely + // if + // already proto or already schema. + Iterable messages = + new SplittingIterable( + element.getValue(), + splitSize, + // Unknown field merger + (bytes, tableRow) -> + appendClientHolder.get().mergeNewFields(bytes, tableRow, ignoreUnknownValues), + // Convert back to TableRow + bytes -> appendClientHolder.get().toTableRow(bytes, Predicates.alwaysTrue()), + // Failed rows consumer + (failedRow, errorMessage) -> { + o.get(failedRowsTag) + .outputWithTimestamp( + new BigQueryStorageApiInsertError( + failedRow.getValue(), errorMessage, tableReference), + failedRow.getTimestamp()); + rowsSentToFailedRowsCollection.inc(); + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.FAILED, + BigQuerySinkMetrics.PAYLOAD_TOO_LARGE, + shortTableId) + .inc(1); + }, + // Get the currently-known TableSchema hash + () -> appendClientHolder.get().getTableSchemaHash(), + () -> + TableRowToStorageApiProto.wrapDescriptorProto( + messageConverter.getDescriptor(false)), + autoUpdateSchema, + elementTs); + + createRetryManagerResult = + createRetryManager( + element.getKey(), + messages, + runOperation, + onError, + onSuccess, + appendClientHolder.get(), + tableReference); + if (createRetryManagerResult.getSchemaMismatchSeen()) { + // TODO: The call to updateSchemaFromTable will throttle the DoFn (both because of the + // RPC + // call and because + // the cache has a delay on refresh). We should update throttling counters here as well. + LOG.info("Schema out of date: refreshing table schema for {}", tableId); + // Force the message converter to get the schema again from the table. + messageConverter.updateSchemaFromTable(); + // Close all RPC clients that were opened with the old descriptor. Clear the cache, + // forcing us to create a new append client with the updated descriptor. 
+ appendClientHolder.invalidateAndReset(); + } + } while (createRetryManagerResult.getSchemaMismatchSeen() + && BackOffUtils.next(Sleeper.DEFAULT, backoff)); + + // Output any rows that failed along the way. + createRetryManagerResult + .getFailedRows() + .forEach( + tv -> o.get(failedRowsTag).outputWithTimestamp(tv.getValue(), tv.getTimestamp())); + rowsSentToFailedRowsCollection.inc(createRetryManagerResult.getFailedRows().size()); + BigQuerySinkMetrics.appendRowsRowStatusCounter( + BigQuerySinkMetrics.RowStatus.FAILED, + BigQuerySinkMetrics.PAYLOAD_TOO_LARGE, + shortTableId) + .inc(createRetryManagerResult.getFailedRows().size()); + + recordsAppended.inc(createRetryManagerResult.getRecordsAppended()); + createRetryManagerResult.getHistogramValues().forEach(appendSizeDistribution::update); + + Instant now = Instant.now(); + + RetryManager> retryManager = + Preconditions.checkStateNotNull(createRetryManagerResult.getRetryManager()); + int numAppends = retryManager.getRemainingOperationCount(); + Iterable> contexts = retryManager.getRemainingContexts(); + + if (numAppends > 0) { + initializeContexts.accept(contexts, false); + retryManager.run(true); - Consumer> onSuccess = - context -> { - AppendRowsResponse response = Preconditions.checkStateNotNull(context.getResult()); - o.get(flushTag) - .output( - KV.of( - context.streamName, - new Operation( - context.offset + context.protoRows.getSerializedRowsCount() - 1, - false))); - int flushedRows = context.protoRows.getSerializedRowsCount(); - flushesScheduled.inc(flushedRows); - BigQuerySinkMetrics.reportSuccessfulRpcMetrics( - context, BigQuerySinkMetrics.RpcMethod.APPEND_ROWS, shortTableId); - BigQuerySinkMetrics.appendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.SUCCESSFUL, BigQuerySinkMetrics.OK, shortTableId) - .inc(flushedRows); - - if (successfulRowsTag != null) { - for (int i = 0; i < context.protoRows.getSerializedRowsCount(); ++i) { - ByteString protoBytes = context.protoRows.getSerializedRows(i); 
- org.joda.time.Instant timestamp = context.timestamps.get(i); - o.get(successfulRowsTag) - .outputWithTimestamp( - appendClientInfo.get().toTableRow(protoBytes, successfulRowsPredicate), - timestamp); + appendSplitDistribution.update(numAppends); + if (autoUpdateSchema) { + @Nullable + StreamAppendClient streamAppendClient = appendClientHolder.getStreamAppendClient(); + TableSchema originalSchema = appendClientHolder.get().getTableSchema(); + + @Nullable + TableSchema updatedSchemaReturned = + (streamAppendClient != null) ? streamAppendClient.getUpdatedSchema() : null; + // Update the table schema and clear the append client. + if (updatedSchemaReturned != null) { + Optional newSchema = + TableSchemaUpdateUtils.getUpdatedSchema(originalSchema, updatedSchemaReturned); + if (newSchema.isPresent()) { + APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey())); + LOG.debug( + "Fetched updated schema for table {}:\n\t{}", tableId, updatedSchemaReturned); + updatedSchema.write(newSchema.get()); } } - }; - - BackOff backoff = - FluentBackoff.DEFAULT - .withInitialBackoff(Duration.standardSeconds(1)) - .withMaxBackoff(Duration.standardMinutes(1)) - .withMaxRetries(500) - .withThrottledTimeCounter( - BigQuerySinkMetrics.throttledTimeCounter( - BigQuerySinkMetrics.RpcMethod.OPEN_WRITE_STREAM)) - .backoff(); - CreateRetryManagerResult createRetryManagerResult; - do { - // Each ProtoRows object contains at most 1MB of rows. - // TODO: Push messageFromTableRow up to top level. That we we cans skip TableRow entirely if - // already proto or already schema. 
- Iterable messages = - new SplittingIterable( - element.getValue(), - splitSize, - // Unknown field merger - (bytes, tableRow) -> - appendClientInfo.get().mergeNewFields(bytes, tableRow, ignoreUnknownValues), - // Convert back to TableRow - bytes -> appendClientInfo.get().toTableRow(bytes, Predicates.alwaysTrue()), - // Failed rows consumer - (failedRow, errorMessage) -> { - o.get(failedRowsTag) - .outputWithTimestamp( - new BigQueryStorageApiInsertError( - failedRow.getValue(), errorMessage, tableReference), - failedRow.getTimestamp()); - rowsSentToFailedRowsCollection.inc(); - BigQuerySinkMetrics.appendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.FAILED, - BigQuerySinkMetrics.PAYLOAD_TOO_LARGE, - shortTableId) - .inc(1); - }, - // Get the currently-known TableSchema hash - () -> appendClientInfo.get().getTableSchemaHash(), - () -> - TableRowToStorageApiProto.wrapDescriptorProto( - messageConverter.getDescriptor(false)), - autoUpdateSchema, - elementTs); - - createRetryManagerResult = - createRetryManager( - element.getKey(), - messages, - runOperation, - onError, - onSuccess, - appendClientInfo.get(), - tableReference); - if (createRetryManagerResult.getSchemaMismatchSeen()) { - // TODO: The call to updateSchemaFromTable will throttle the DoFn (both because of the RPC - // call and because - // the cache has a delay on refresh). We should update throttling counters here as well. - LOG.info("Schema out of date: refreshing table schema for {}", tableId); - // Force the message converter to get the schema again from the table. - messageConverter.updateSchemaFromTable(); - // Close all RPC clients that were opened with the old descriptor. Clear the cache, - // forcing us to create a new append client with the updated descriptor. 
- APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey())); - appendClientInfo.set( - APPEND_CLIENTS.get( - messageConverters.getAppendClientKey(element.getKey()), getAppendClientInfo)); - } - } while (createRetryManagerResult.getSchemaMismatchSeen() - && BackOffUtils.next(Sleeper.DEFAULT, backoff)); - - // Output any rows that failed along they way. - createRetryManagerResult - .getFailedRows() - .forEach( - tv -> o.get(failedRowsTag).outputWithTimestamp(tv.getValue(), tv.getTimestamp())); - rowsSentToFailedRowsCollection.inc(createRetryManagerResult.getFailedRows().size()); - BigQuerySinkMetrics.appendRowsRowStatusCounter( - BigQuerySinkMetrics.RowStatus.FAILED, - BigQuerySinkMetrics.PAYLOAD_TOO_LARGE, - shortTableId) - .inc(createRetryManagerResult.getFailedRows().size()); - - recordsAppended.inc(createRetryManagerResult.getRecordsAppended()); - createRetryManagerResult.getHistogramValues().forEach(appendSizeDistribution::update); - - Instant now = Instant.now(); - - RetryManager> retryManager = - Preconditions.checkStateNotNull(createRetryManagerResult.getRetryManager()); - int numAppends = retryManager.getRemainingOperationCount(); - Iterable> contexts = retryManager.getRemainingContexts(); - - if (numAppends > 0) { - initializeContexts.accept(contexts, false); - try { - retryManager.run(true); - } finally { - // Make sure that all pins are removed. - for (AppendRowsContext context : contexts) { - if (context.client != null) { - runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin); - } - } - } - appendSplitDistribution.update(numAppends); - - if (autoUpdateSchema) { - @Nullable - StreamAppendClient streamAppendClient = appendClientInfo.get().getStreamAppendClient(); - TableSchema originalSchema = appendClientInfo.get().getTableSchema(); - ; - @Nullable - TableSchema updatedSchemaReturned = - (streamAppendClient != null) ? 
streamAppendClient.getUpdatedSchema() : null; - // Update the table schema and clear the append client. - if (updatedSchemaReturned != null) { - Optional newSchema = - TableSchemaUpdateUtils.getUpdatedSchema(originalSchema, updatedSchemaReturned); - if (newSchema.isPresent()) { - appendClientInfo.set( - AppendClientInfo.of( - newSchema.get(), appendClientInfo.get().getCloseAppendClient(), false)); - APPEND_CLIENTS.invalidate(messageConverters.getAppendClientKey(element.getKey())); - APPEND_CLIENTS.put( - messageConverters.getAppendClientKey(element.getKey()), appendClientInfo.get()); - LOG.debug( - "Fetched updated schema for table {}:\n\t{}", tableId, updatedSchemaReturned); - updatedSchema.write(newSchema.get()); - } } - } - java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now()); - appendLatencyDistribution.update(timeElapsed.toMillis()); + java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now()); + appendLatencyDistribution.update(timeElapsed.toMillis()); + } } idleTimer.offset(streamIdleTime).withNoOutputTimestamp().setRelative(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index 1849df422425..814f4eec421f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -764,7 +764,7 @@ public StreamAppendClient getStreamAppendClient( AppendRowsRequest.MissingValueInterpretation missingValueInterpretation) throws Exception { return new StreamAppendClient() { - private Descriptor protoDescriptor; + private Descriptor protoDescriptor = null; private TableSchema currentSchema; private @Nullable com.google.cloud.bigquery.storage.v1.TableSchema updatedSchema; 
TableRowToStorageApiProto.SchemaInformation schemaInformation; @@ -900,7 +900,7 @@ public void close() throws Exception {} public void pin() {} @Override - public void unpin() throws Exception {} + public void unpin() {} }; } From 4e8a1bbcb65ceab3784dffc36ef936fea37bdc2d Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Mon, 27 Apr 2026 12:20:10 -0700 Subject: [PATCH 2/5] fixes --- .../sdk/io/gcp/bigquery/AppendClientCache.java | 14 +++----------- .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 8 +------- .../bigquery/StorageApiWritesShardedRecords.java | 4 ++-- 3 files changed, 6 insertions(+), 20 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java index b465f4f3aa44..73c2743003b1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java @@ -89,10 +89,6 @@ public AppendClientInfo getAndPin(KeyT key, Callable loader) t } } - public AppendClientInfo get(KeyT key, Callable loader) throws Exception { - return appendCache.get(key, wrapWithPin(loader)); - } - public AppendClientInfo putAndPin(KeyT key, Callable loader) throws Exception { synchronized (this) { AppendClientInfo info = wrapWithPin(loader).call(); @@ -102,12 +98,6 @@ public AppendClientInfo putAndPin(KeyT key, Callable loader) t } } - public AppendClientInfo put(KeyT key, Callable loader) throws Exception { - AppendClientInfo info = wrapWithPin(loader).call(); - appendCache.put(key, info); - return info; - } - public void invalidate(KeyT key, AppendClientInfo expectedClient) { // The default stream is cached across multiple different DoFns. 
If they all try // and @@ -133,7 +123,9 @@ public void invalidate(KeyT key) { } public void tickle(KeyT key) { - appendCache.getIfPresent(key); + synchronized (this) { + appendCache.getIfPresent(key); + } } public void clear() { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java index 67b8784b62d2..68fd73b7b9b1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java @@ -243,13 +243,7 @@ private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunn try { task.run(); } catch (Throwable e) { - String msg = - e.toString() - + "\n" - + Arrays.stream(e.getStackTrace()) - .map(StackTraceElement::toString) - .collect(Collectors.joining("\n")); - System.err.println("Exception happened while executing async task. Ignoring: " + msg); + LOG.info("Exception happened while executing async task. 
Ignoring: ", e); } }); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index 93b0cf2d0ded..d9750e9ae631 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -142,7 +142,7 @@ public class StorageApiWritesShardedRecords> flushTag = new TupleTag<>("flushTag"); - private static AppendClientCache>> APPEND_CLIENTS = + private static final AppendClientCache>> APPEND_CLIENTS = new AppendClientCache<>(Duration.standardMinutes(5)); static void clearCache() { @@ -697,7 +697,7 @@ private void handleAppendFailure( } if (schemaMismatchError) { LOG.info( - "Vortex failed stream open due to incompatible fields. This is likely because the BigTable " + "Vortex failed stream open due to incompatible fields. This is likely because the BigQuery " + "schema was recently updated and Vortex hasn't noticed yet, so retrying. 
error {}", Preconditions.checkStateNotNull(error).toString()); } From c012d5f39e9863d765fb2257504dcbc0d4b4b464 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 28 Apr 2026 10:17:26 -0700 Subject: [PATCH 3/5] fixes --- .../beam/sdk/io/gcp/bigquery/AppendClientInfo.java | 10 +--------- .../gcp/bigquery/StorageApiWritesShardedRecords.java | 7 +++---- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java index 68fd73b7b9b1..55c6007e1986 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java @@ -27,11 +27,9 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import com.google.protobuf.Message; -import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.function.Predicate; import java.util.function.Supplier; -import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.beam.sdk.function.ThrowingConsumer; import org.apache.beam.sdk.function.ThrowingRunnable; @@ -147,13 +145,7 @@ public void close() { getCloseAppendClient().accept(client); } catch (Exception e) { // We ignore errors when closing clients. - String msg = - e - + "\n" - + Arrays.stream(e.getStackTrace()) - .map(StackTraceElement::toString) - .collect(Collectors.joining("\n")); - LOG.warn("Caught exception whilw trying to close append client. Ignoring {}", msg); + LOG.warn("Caught exception whilw trying to close append client. 
Ignoring", e); } activeStreamAppendClients.dec(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index d9750e9ae631..cbace6e7ff40 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -905,10 +905,9 @@ public void process( } TableSchema updatedSchemaValue = autoUpdateSchema ? updatedSchema.read() : null; - if (autoUpdateSchema && updatedSchemaValue != null) { - if (appendClientHolder.get().hasSchemaChanged(updatedSchemaValue)) { - appendClientHolder.invalidateAndReset(); - } + if ((autoUpdateSchema && updatedSchemaValue != null) + && appendClientHolder.get().hasSchemaChanged(updatedSchemaValue)) { + appendClientHolder.invalidateAndReset(); } // Initialize stream names and offsets for all contexts. 
This will be called initially, but From 4748fcd2b7ee128c94c7dcced4fb22f0326104b2 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 28 Apr 2026 11:15:11 -0700 Subject: [PATCH 4/5] fixes --- .../beam/sdk/io/gcp/bigquery/AppendClientCache.java | 10 +++++----- .../gcp/bigquery/StorageApiWriteUnshardedRecords.java | 8 ++++++-- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java index 73c2743003b1..a9ac8dfe5905 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java @@ -89,12 +89,12 @@ public AppendClientInfo getAndPin(KeyT key, Callable loader) t } } - public AppendClientInfo putAndPin(KeyT key, Callable loader) throws Exception { + /** "Refresh" an object by invalidating the old cache entry. 
*/ + public AppendClientInfo refreshObjectAndAndPin(KeyT key, Callable loader) + throws Exception { synchronized (this) { - AppendClientInfo info = wrapWithPin(loader).call(); - appendCache.put(key, info); - info.pinAppendClient(); - return info; + appendCache.invalidate(key); + return getAndPin(key, loader); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 8ce9869e3966..8954d4dee737 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -451,7 +451,7 @@ AppendClientInfo getAppendClientInfo( lookupCache ? APPEND_CLIENTS.getAndPin( getStreamAppendClientCacheEntryKey(), () -> generateClient(updatedSchema)) - : APPEND_CLIENTS.putAndPin( + : APPEND_CLIENTS.refreshObjectAndAndPin( getStreamAppendClientCacheEntryKey(), () -> generateClient(updatedSchema)); } nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1)); @@ -941,7 +941,11 @@ void postFlush() { TableSchemaUpdateUtils.getUpdatedSchema( this.messageConverter.getTableSchema(), updatedTableSchemaReturned); if (updatedTableSchema.isPresent()) { - invalidateAppendClient(true); + invalidateAppendClient(false); + // TODO: This overwrites whatever is in the cache which can cause races between + // threads. + // A better approach would be to check the cache, and keep whichever schema is + // "larger". 
appendClientInfo = Preconditions.checkStateNotNull( getAppendClientInfo(false, updatedTableSchema.get())); From 4199c880ae2a9d73c21fb29a865680e15300cb37 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Tue, 28 Apr 2026 11:51:44 -0700 Subject: [PATCH 5/5] typo --- .../org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java | 2 +- .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java index a9ac8dfe5905..b172a4ac3046 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientCache.java @@ -90,7 +90,7 @@ public AppendClientInfo getAndPin(KeyT key, Callable loader) t } /** "Refresh" an object by invalidating the old cache entry. */ - public AppendClientInfo refreshObjectAndAndPin(KeyT key, Callable loader) + public AppendClientInfo refreshObjectAndPin(KeyT key, Callable loader) throws Exception { synchronized (this) { appendCache.invalidate(key); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 8954d4dee737..2dfc8b2f1c00 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -451,7 +451,7 @@ AppendClientInfo getAppendClientInfo( lookupCache ? 
APPEND_CLIENTS.getAndPin( getStreamAppendClientCacheEntryKey(), () -> generateClient(updatedSchema)) - : APPEND_CLIENTS.refreshObjectAndAndPin( + : APPEND_CLIENTS.refreshObjectAndPin( getStreamAppendClientCacheEntryKey(), () -> generateClient(updatedSchema)); } nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));