
Refactor StorageAPI BigQuery sink#38264

Merged
reuvenlax merged 5 commits into apache:master from reuvenlax:refactor_storageapi_conversion
Apr 29, 2026

Conversation

@reuvenlax
Contributor

@reuvenlax reuvenlax commented Apr 22, 2026

Refactors the caching and pinning logic. This change consolidates complicated caching logic spread across StorageApiWriteShardedRecords and StorageApiWriteUnshardedRecords into a common location. This simplifies the caching logic, and also simplifies the append-client pinning logic which had become hard to follow.

@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@reuvenlax reuvenlax force-pushed the refactor_storageapi_conversion branch from c6c467d to f61e264 Compare April 27, 2026 17:47
@reuvenlax reuvenlax force-pushed the refactor_storageapi_conversion branch from fe48b2c to 8918b26 Compare April 27, 2026 18:14
@reuvenlax reuvenlax changed the title Refactor storageapi conversion Refactor StorageAPI BigQuery sink Apr 27, 2026
@apache apache deleted a comment from gemini-code-assist Bot Apr 27, 2026
@reuvenlax
Contributor Author

/gemini

@reuvenlax
Contributor Author

/gemini summary

@reuvenlax
Contributor Author

/gemini review

@gemini-code-assist
Contributor

Summary of Changes

This pull request refactors the StorageAPI BigQuery sink to improve the maintainability and reliability of its caching and client-pinning infrastructure. By consolidating complex, distributed logic into a dedicated cache manager, the changes reduce code duplication and simplify the synchronization protocol required for safe append-client management. These improvements lead to a more robust system for handling stream lifecycle and resource cleanup.

Highlights

  • Consolidated Caching Logic: Introduced a new AppendClientCache class to centralize and simplify the caching and pinning logic previously duplicated across StorageApiWriteShardedRecords and StorageApiWriteUnshardedRecords.
  • Improved Pinning Protocol: Refactored the append-client pinning mechanism to be more robust and easier to maintain, ensuring safe synchronization between cache lookups and client usage.
  • Error Handling and Cleanup: Enhanced error handling during client closure and unpinning, including better support for asynchronous cleanup to avoid blocking main processing threads.
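The pinning protocol in these highlights can be sketched roughly as follows. This is an illustrative stdlib reduction, not the actual Beam code: the names `SimpleClientCache`, `Pinnable`, `getAndPin`, and `invalidate` are invented for the example, and the real `AppendClientCache` in the PR is considerably richer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

interface Pinnable {
    void pin();
    void unpin();
}

// Illustrative sketch: a cache whose entries are pinned atomically with lookup,
// so eviction can never close a client that a caller is still using.
class SimpleClientCache<K, V extends Pinnable> {
    private final Map<K, V> cache = new HashMap<>();

    // Lookup (or load) and pin happen under one monitor, so no other thread can
    // drop the entry between the cache access and the caller taking its pin.
    synchronized V getAndPin(K key, Callable<V> loader) throws Exception {
        V value = cache.get(key);
        if (value == null) {
            value = loader.call();
            value.pin(); // the cache itself holds one pin on each entry
            cache.put(key, value);
        }
        value.pin(); // the caller's pin; released by the caller when done
        return value;
    }

    // Dropping an entry releases only the cache's pin; the underlying client
    // is actually closed when the last remaining pin is released.
    synchronized void invalidate(K key) {
        V value = cache.remove(key);
        if (value != null) {
            value.unpin();
        }
    }
}
```

Because pins are reference counts, eviction and in-flight use compose safely: invalidation merely decrements, and the close happens whenever the final holder unpins.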


Activity
  • The pull request was created by reuvenlax.
  • Automated checks initially failed, preventing review requests.
  • The author requested a summary and review via automated tools.

@reuvenlax reuvenlax requested a review from ahmedabu98 April 27, 2026 18:18
@reuvenlax
Contributor Author

R: @ahmedabu98

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a centralized AppendClientCache to manage the lifecycle and pinning protocol of BigQuery Storage API append clients, refactoring StorageApiWriteUnshardedRecords and StorageApiWritesShardedRecords to use this new component. The changes improve resource management by ensuring clients are pinned during use and closed asynchronously upon eviction. Feedback focuses on ensuring thread safety by synchronizing all cache access methods in AppendClientCache, adhering to logging best practices by using SLF4J's built-in exception handling instead of manual stack trace formatting, and correcting minor typos in log messages.

Comment on lines +92 to +94
public AppendClientInfo get(KeyT key, Callable<AppendClientInfo> loader) throws Exception {
return appendCache.get(key, wrapWithPin(loader));
}
Contributor


high

This method should be synchronized to maintain the atomicity protocol described in the class Javadoc. Without synchronization, a concurrent access via this method could trigger cache maintenance and evict an entry that another thread is currently pinning in getAndPin.

Suggested change
public AppendClientInfo get(KeyT key, Callable<AppendClientInfo> loader) throws Exception {
return appendCache.get(key, wrapWithPin(loader));
}
public synchronized AppendClientInfo get(KeyT key, Callable<AppendClientInfo> loader)
throws Exception {
return appendCache.get(key, wrapWithPin(loader));
}

Comment on lines +105 to +109
public AppendClientInfo put(KeyT key, Callable<AppendClientInfo> loader) throws Exception {
AppendClientInfo info = wrapWithPin(loader).call();
appendCache.put(key, info);
return info;
}
Contributor


high

This method should be synchronized to ensure that the cache access is atomic relative to other operations like getAndPin and invalidate. This prevents races where an entry might be evicted or invalidated while being updated.

Suggested change
public AppendClientInfo put(KeyT key, Callable<AppendClientInfo> loader) throws Exception {
AppendClientInfo info = wrapWithPin(loader).call();
appendCache.put(key, info);
return info;
}
public synchronized AppendClientInfo put(KeyT key, Callable<AppendClientInfo> loader)
throws Exception {
AppendClientInfo info = wrapWithPin(loader).call();
appendCache.put(key, info);
return info;
}

Comment on lines +135 to +137
public void tickle(KeyT key) {
appendCache.getIfPresent(key);
}
Contributor


high

The tickle method should be synchronized. Since it performs a cache access (getIfPresent), it can trigger maintenance and eviction. To protect the atomicity of the pinning protocol used in getAndPin, all cache access methods must be synchronized on the same monitor.

Suggested change
public void tickle(KeyT key) {
appendCache.getIfPresent(key);
}
public synchronized void tickle(KeyT key) {
appendCache.getIfPresent(key);
}

Comment on lines +150 to +156
String msg =
e
+ "\n"
+ Arrays.stream(e.getStackTrace())
.map(StackTraceElement::toString)
.collect(Collectors.joining("\n"));
LOG.warn("Caught exception whilw trying to close append client. Ignoring {}", msg);
Contributor


medium

The manual formatting of the stack trace is unnecessary and inefficient. SLF4J loggers can handle exceptions directly, which includes printing the stack trace. This also fixes a typo ('whilw' -> 'while').

        LOG.warn("Caught exception while trying to close append client. Ignoring", e);

Comment on lines +246 to +252
String msg =
e.toString()
+ "\n"
+ Arrays.stream(e.getStackTrace())
.map(StackTraceElement::toString)
.collect(Collectors.joining("\n"));
System.err.println("Exception happened while executing async task. Ignoring: " + msg);
Contributor


medium

Avoid using System.err.println for error reporting. Use the class logger instead. Additionally, let the logger handle the exception object directly rather than manually formatting the stack trace.

            LOG.error("Exception happened while executing async task. Ignoring", e);

Comment on lines +145 to +146
private static AppendClientCache<KV<String, ShardedKey<?>>> APPEND_CLIENTS =
new AppendClientCache<>(Duration.standardMinutes(5));
Contributor


medium

The APPEND_CLIENTS cache should be declared as final to ensure it is not reassigned and to follow best practices for static constants.

Suggested change
private static AppendClientCache<KV<String, ShardedKey<?>>> APPEND_CLIENTS =
new AppendClientCache<>(Duration.standardMinutes(5));
private static final AppendClientCache<KV<String, ShardedKey<?>>> APPEND_CLIENTS =
new AppendClientCache<>(Duration.standardMinutes(5));

if (schemaMismatchError) {
LOG.info(
"Vortex failed stream open due to incompatible fields. This is likely because the BigTable "
+ "schema was recently updated and Vortex hasn't noticed yet, so retrying. error {}",
Contributor


medium

Typo: 'BigTable' should be 'BigQuery' in this context, as this class handles BigQuery Storage API writes.

            "Vortex failed stream open due to incompatible fields. This is likely because the BigQuery "

@reuvenlax
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a centralized AppendClientCache to manage the lifecycle and pinning of StreamAppendClient objects, refactoring StorageApiWriteUnshardedRecords and StorageApiWritesShardedRecords to use this new component. The changes aim to improve thread safety and resource management when interacting with the BigQuery Storage API. Feedback includes recommendations to use named daemon threads for the closer executor, optimize cache synchronization to prevent bottlenecks during slow client loading, and improve logging practices by simplifying stack trace handling and using appropriate log levels for background task failures.

*/
class AppendClientCache<KeyT extends @NonNull Object> {
private static final Logger LOG = LoggerFactory.getLogger(AppendClientCache.class);
private final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
Contributor


medium

It is recommended to use a named ThreadFactory with daemon threads for the ExecutorService. This improves observability in thread dumps and ensures that the threads do not prevent the JVM from exiting if the process is shutting down.

  private final ExecutorService closeWriterExecutor =
      Executors.newCachedThreadPool(
          new org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder()
              .setDaemon(true)
              .setNameFormat("AppendClientCache-closer-%d")
              .build());

Comment on lines +85 to +89
synchronized (this) {
AppendClientInfo info = appendCache.get(key, wrapWithPin(loader));
info.pinAppendClient();
return info;
}
Contributor


medium

Using synchronized(this) on the entire getAndPin method creates a global bottleneck for all cache accesses. Since the loader (which creates the StreamAppendClient) can be slow due to network calls, this will block all other threads trying to access the cache for different keys. Consider using a more granular locking mechanism, such as Guava's Striped locks, to allow concurrent access for different keys.
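As a rough illustration of the per-key locking this comment suggests, here is a stdlib sketch using one `ReentrantLock` per key (the comment itself proposes Guava's `Striped`, which bounds the number of locks; the `PerKeyLoader` name and structure here are invented for the example, not code from the PR):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Stdlib sketch of per-key locking: a slow load for one key blocks only
// callers contending on that same key, not the whole cache.
class PerKeyLoader<K, V> {
    private final ConcurrentHashMap<K, ReentrantLock> locks = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();

    V getAndLoad(K key, Callable<V> loader) throws Exception {
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock(); // serializes access per key, not globally
        try {
            V value = cache.get(key);
            if (value == null) {
                value = loader.call(); // possibly a slow network call
                cache.put(key, value);
            }
            return value;
        } finally {
            lock.unlock();
        }
    }
}
```

Guava's `Striped.lock(n)` achieves the same effect with a fixed pool of locks, trading a small chance of false contention for bounded memory.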

Comment on lines +150 to +156
String msg =
e
+ "\n"
+ Arrays.stream(e.getStackTrace())
.map(StackTraceElement::toString)
.collect(Collectors.joining("\n"));
LOG.warn("Caught exception whilw trying to close append client. Ignoring {}", msg);
Contributor


medium

The manual formatting of the stack trace is redundant and inefficient. SLF4J's logger can handle exceptions directly, which includes printing the stack trace. Additionally, there is a typo in the log message ('whilw' instead of 'while').

        LOG.warn("Caught exception while trying to close append client. Ignoring", e);

try {
task.run();
} catch (Throwable e) {
LOG.info("Exception happened while executing async task. Ignoring: ", e);
Contributor


medium

Logging an exception at INFO level is unusual. Even if the failure is ignored, it is generally better to log it at WARN or ERROR level to make it more visible in logs when troubleshooting background tasks.

Suggested change
LOG.info("Exception happened while executing async task. Ignoring: ", e);
LOG.warn("Exception happened while executing async task. Ignoring: ", e);

Contributor

@ahmedabu98 ahmedabu98 left a comment


Still wrapping my head around it, but mostly looks good. Happy to see the handleAppendFailure and handleAppendSuccess methods

Comment on lines +92 to +94
public AppendClientInfo putAndPin(KeyT key, Callable<AppendClientInfo> loader) throws Exception {
synchronized (this) {
AppendClientInfo info = wrapWithPin(loader).call();
Contributor


Do we need to unpin any existing AppendClients?

Contributor Author


I'm not sure I understand - do you mean unpin other clients when inserting this one?

Contributor


Yes, unpinning a client if it already exists for this key

Contributor Author


hmm, that's actually a good point. The only place we call it is immediately after a call to invalidate, but there's a race condition there where somebody else could sneak in.

I don't think this could actually cause problems, since the pin owned by the DoFn is always released based on the instance variable (this.appendClientInfo), so even if we "lose" the cache entry we won't lose the pin. However to make things a bit clearer, I got rid of this method.

Contributor


Sorry I think I just got mixed up on the unpinning logic.
I thought we were leaking resources by not unpinning in the consuming processElement, but I just did another pass and it looks fine to me.

extends RetryManager.Operation.Context<AppendRowsResponse> {
final ShardedKey<DestinationT> key;
String streamName = "";
@Nullable StreamAppendClient client = null;
Contributor


Hmmm so we're no longer pinning the client for each AppendRowsContext? Can you briefly explain this change somewhere? maybe in the PR description?

Contributor Author


There's a comment in front of the try-with-resources block explaining this.

Basically pins are ref counts. Since all calls happen in the context of processElements, all we really need is one pin for the entire function - there was never a need for per-RPC pins which was hard to follow. Instead we add one pin for the duration of the function, managed via a try-with-resources block. Since the cache holds onto objects longer, the cache also manages one additional pin per object (though that can be invalidated at any time)
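The one-pin-per-call scheme described here can be sketched as follows (illustrative names only; `PinnedClient` is not Beam's actual `AppendClientInfo`):

```java
// Sketch of pins as ref counts: the cache holds one pin, each processElement
// call takes exactly one more for its whole duration via try-with-resources,
// and the client is closed when the count reaches zero.
class PinnedClient {
    int pins = 1;          // the cache's own pin
    boolean closed = false;

    // Take a pin; the returned handle releases it on close().
    synchronized AutoCloseable pin() {
        pins++;
        return this::unpin;
    }

    synchronized void unpin() {
        if (--pins == 0) {
            closed = true; // stand-in for closing the real append client
        }
    }
}
```

A caller then wraps the entire processElement body in `try (AutoCloseable ignored = client.pin()) { /* all appends */ }`, which replaces the harder-to-follow per-RPC pinning.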

Contributor

@ahmedabu98 ahmedabu98 left a comment


LGTM!

just one typo left tho

}

/** "Refresh" an object by invalidating the old cache entry. */
public AppendClientInfo refreshObjectAndAndPin(KeyT key, Callable<AppendClientInfo> loader)
Contributor


nit: typo

Suggested change
public AppendClientInfo refreshObjectAndAndPin(KeyT key, Callable<AppendClientInfo> loader)
public AppendClientInfo refreshObjectAndPin(KeyT key, Callable<AppendClientInfo> loader)

@github-actions
Contributor

Assigning reviewers:

R: @kennknowles for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@reuvenlax reuvenlax merged commit 281270c into apache:master Apr 29, 2026
17 of 18 checks passed
