
only expand update graph if needed #38332

Open

reuvenlax wants to merge 1 commit into apache:master from reuvenlax:dont_expand_full_schema_evolution

Conversation

@reuvenlax
Contributor

No description provided.

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a performance optimization by conditionally expanding the update graph in the BigQuery Storage API conversion logic. By checking for the presence of schema update options before building the graph, the pipeline avoids unnecessary overhead when these features are not in use.

Highlights

  • Conditional Graph Expansion: Added a conditional check to skip the expansion of the update graph when schema update options are not required, optimizing pipeline construction (see the sketch below).
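
The shape of the change, as a minimal sketch: the flag name hasSchemaUpdateOptions is taken from the review summary further down, and the surrounding names are abbreviated from the diff rather than quoted verbatim.

    // Sketch only, not the verbatim patch. When no schema-update options are
    // configured, skip building the schema-update retry sub-graph entirely.
    if (!hasSchemaUpdateOptions) {
      // Nothing will ever need patching or buffering: return the conversion
      // outputs directly, with no extra transforms added to the pipeline.
      return PCollectionTuple.of(successfulWritesTag, result.get(successfulWritesTag))
          .and(failedWritesTag, result.get(failedWritesTag))
          .and(BAD_RECORD_TAG, result.get(BAD_RECORD_TAG));
    } else {
      // Build the full schema-update graph: throttle table patches, buffer
      // elements waiting on schemas, and flatten the retry outputs back in
      // (this is the code shown in the diff below).
    }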


Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help (/gemini help): Displays a list of available commands.
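
For example, commenting /gemini review on this pull request would trigger a fresh review of the diff in its current state.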

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
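
As an illustration only (the file name below follows the convention in the linked documentation; treat it as an assumption and confirm against those instructions):

    .gemini/
      styleguide.md   # free-form review guidance, e.g. "Prefer guard clauses over deep nesting."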

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its suggestions are incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double-check its output and use code with caution.

@reuvenlax reuvenlax requested a review from Abacn April 29, 2026 19:24
@reuvenlax
Contributor Author

FYI @ahmedabu98


@gemini-code-assist (bot) left a comment


Code Review

This pull request introduces a conditional check in StorageApiConvertMessages.java to skip the schema update graph expansion when hasSchemaUpdateOptions is false, optimizing performance when updates are unnecessary. The review feedback suggests using a guard clause to return early, which would simplify the code structure by reducing unnecessary nesting in the method implementation.

Comment on lines +153 to +260
    } else {
      final int numShards =
          input
              .getPipeline()
              .getOptions()
              .as(BigQueryOptions.class)
              .getSchemaUpgradeBufferingShards();

      // Throttle the stream to the patch-table function so that only a single update per table
      // per two seconds gets processed (to match quotas). The combiner merges incremental
      // schemas, so we won't miss any updates.
      PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
          result
              .get(patchTableSchemaTag)
              .apply(
                  "rewindow",
                  Window.<KV<DestinationT, TableSchema>>configure()
                      .triggering(
                          Repeatedly.forever(
                              AfterProcessingTime.pastFirstElementInPane()
                                  .plusDelayOf(Duration.standardSeconds(2))))
                      .discardingFiredPanes())
              .apply("merge schemas", Combine.fewKeys(new MergeSchemaCombineFn()))
              .setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)))
              .apply(
                  "Patch table schema",
                  ParDo.of(
                      new PatchTableSchemaDoFn<>(operationName, bqServices, dynamicDestinations)))
              .setCoder(KvCoder.of(destinationCoder, NullableCoder.of(elementCoder)))
              // We need to make sure that all shards of the buffering transform are notified.
              .apply(
                  "fanout to all shards",
                  FlatMapElements.via(
                      new SimpleFunction<
                          KV<DestinationT, ElementT>,
                          Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
                        @Override
                        public Iterable<KV<ShardedKey<DestinationT>, ElementT>> apply(
                            KV<DestinationT, ElementT> elem) {
                          return IntStream.range(0, numShards)
                              .mapToObj(
                                  i ->
                                      KV.of(
                                          StorageApiConvertMessages.AssignShardFn.getShardedKey(
                                              elem.getKey(), i, numShards),
                                          elem.getValue()))
                              .collect(Collectors.toList());
                        }
                      }))
              .setCoder(
                  KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)))
              .apply(
                  Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
                      .triggering(DefaultTrigger.of()));

      // Any elements that are waiting for a schema update are sent to this stateful DoFn to be
      // buffered.
      // Note: we currently do not provide the DynamicDestinations object access to the side input
      // in this path. This is because side inputs are not currently available from timer
      // callbacks. Since side inputs are generally used for getSchema and in this case we read
      // the schema from the table, this is unlikely to be a problem.
      PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedWaitingElements =
          result
              .get(elementsWaitingForSchemaTag)
              // TODO: Consider using GroupIntoBatches.withShardedKey to get auto sharding here
              // instead of fixed sharding.
              .apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
              .setCoder(
                  KvCoder.of(
                      ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)));

      PCollectionList<KV<ShardedKey<DestinationT>, ElementT>> waitingElementsList =
          PCollectionList.of(shardedWaitingElements).and(tablesPatched);
      PCollectionTuple retryResult =
          waitingElementsList
              .apply("Buffered flatten", Flatten.pCollections())
              .apply(
                  "bufferElements",
                  ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder, convertMessagesDoFn))
                      .withOutputTags(
                          successfulWritesTag,
                          TupleTagList.of(ImmutableList.of(failedWritesTag, BAD_RECORD_TAG))));
      retryResult.get(successfulWritesTag).setCoder(successCoder);
      retryResult.get(failedWritesTag).setCoder(errorCoder);
      retryResult.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));

      // Flatten successes and failures from both the regular transform and the retry transform.
      PCollection<KV<DestinationT, StorageApiWritePayload>> allSuccesses =
          PCollectionList.of(result.get(successfulWritesTag))
              .and(retryResult.get(successfulWritesTag))
              .apply("flattenSuccesses", Flatten.pCollections());
      PCollection<BigQueryStorageApiInsertError> allFailures =
          PCollectionList.of(result.get(failedWritesTag))
              .and(retryResult.get(failedWritesTag))
              .apply("flattenFailures", Flatten.pCollections());
      PCollection<BadRecord> allBadRecords =
          PCollectionList.of(result.get(BAD_RECORD_TAG))
              .and(retryResult.get(BAD_RECORD_TAG))
              .apply("flattenBadRecords", Flatten.pCollections());
      return PCollectionTuple.of(successfulWritesTag, allSuccesses)
          .and(failedWritesTag, allFailures)
          .and(BAD_RECORD_TAG, allBadRecords);
    }
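
The rewindow stanza above is the load-shedding trick at the heart of this graph: a processing-time trigger plus discarding panes caps patch-table RPCs at roughly one per table per two seconds, and the combiner folds everything that arrived in a pane into one update. Below is a self-contained sketch of the same pattern, with illustrative names only: Max stands in for MergeSchemaCombineFn, and withAllowedLateness is stated explicitly here because, unlike in the PR code, no upstream windowing strategy supplies it.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Max;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class ThrottlePatternSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Stand-in for the per-table schema-update stream.
        PCollection<KV<String, Long>> updates =
            p.apply(Create.of(KV.of("table-a", 1L), KV.of("table-a", 2L), KV.of("table-b", 7L)));

        PCollection<KV<String, Long>> throttled =
            updates
                .apply(
                    "rewindow",
                    // Fire at most one pane per key every two seconds of processing
                    // time; discarding fired panes means each firing carries only
                    // what arrived since the previous one.
                    Window.<KV<String, Long>>configure()
                        .triggering(
                            Repeatedly.forever(
                                AfterProcessingTime.pastFirstElementInPane()
                                    .plusDelayOf(Duration.standardSeconds(2))))
                        .discardingFiredPanes()
                        .withAllowedLateness(Duration.ZERO))
                // Collapse each pane to a single element per key, the way
                // MergeSchemaCombineFn merges incremental schema updates.
                .apply("merge updates", Max.longsPerKey());

        p.run().waitUntilFinish();
      }
    }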


Severity: medium

To improve readability and reduce nesting, you can use a guard clause. Instead of wrapping the rest of the method in an else block, you can return early and then have the main logic at the top level of the method.

    final int numShards =
        input
            .getPipeline()
            .getOptions()
            .as(BigQueryOptions.class)
            .getSchemaUpgradeBufferingShards();

    // Throttle the stream to the patch-table function so that only a single update per table per
    // two seconds gets processed (to match quotas). The combiner merges incremental schemas, so
    // we won't miss any updates.
    PCollection<KV<ShardedKey<DestinationT>, ElementT>> tablesPatched =
        result
            .get(patchTableSchemaTag)
            .apply(
                "rewindow",
                Window.<KV<DestinationT, TableSchema>>configure()
                    .triggering(
                        Repeatedly.forever(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(2))))
                    .discardingFiredPanes())
            .apply("merge schemas", Combine.fewKeys(new MergeSchemaCombineFn()))
            .setCoder(KvCoder.of(destinationCoder, ProtoCoder.of(TableSchema.class)))
            .apply(
                "Patch table schema",
                ParDo.of(
                    new PatchTableSchemaDoFn<>(operationName, bqServices, dynamicDestinations)))
            .setCoder(KvCoder.of(destinationCoder, NullableCoder.of(elementCoder)))
            // We need to make sure that all shards of the buffering transform are notified.
            .apply(
                "fanout to all shards",
                FlatMapElements.via(
                    new SimpleFunction<
                        KV<DestinationT, ElementT>,
                        Iterable<KV<ShardedKey<DestinationT>, ElementT>>>() {
                      @Override
                      public Iterable<KV<ShardedKey<DestinationT>, ElementT>> apply(
                          KV<DestinationT, ElementT> elem) {
                        return IntStream.range(0, numShards)
                            .mapToObj(
                                i ->
                                    KV.of(
                                        StorageApiConvertMessages.AssignShardFn.getShardedKey(
                                            elem.getKey(), i, numShards),
                                        elem.getValue()))
                            .collect(Collectors.toList());
                      }
                    }))
            .setCoder(
                KvCoder.of(ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)))
            .apply(
                Window.<KV<ShardedKey<DestinationT>, ElementT>>configure()
                    .triggering(DefaultTrigger.of()));

    // Any elements that are waiting for a schema update are sent to this stateful DoFn to be
    // buffered.
    // Note: we currently do not provide the DynamicDestinations object access to the side input
    // in this path. This is because side inputs are not currently available from timer callbacks.
    // Since side inputs are generally used for getSchema and in this case we read the schema
    // from the table, this is unlikely to be a problem.
    PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedWaitingElements =
        result
            .get(elementsWaitingForSchemaTag)
            // TODO: Consider using GroupIntoBatches.withShardedKey to get auto sharding here
            // instead of fixed sharding.
            .apply("assignShard", ParDo.of(new AssignShardFn<>(numShards)))
            .setCoder(
                KvCoder.of(
                    ShardedKey.Coder.of(destinationCoder), NullableCoder.of(elementCoder)));

    PCollectionList<KV<ShardedKey<DestinationT>, ElementT>> waitingElementsList =
        PCollectionList.of(shardedWaitingElements).and(tablesPatched);
    PCollectionTuple retryResult =
        waitingElementsList
            .apply("Buffered flatten", Flatten.pCollections())
            .apply(
                "bufferElements",
                ParDo.of(new SchemaUpdateHoldingFn<>(elementCoder, convertMessagesDoFn))
                    .withOutputTags(
                        successfulWritesTag,
                        TupleTagList.of(ImmutableList.of(failedWritesTag, BAD_RECORD_TAG))));
    retryResult.get(successfulWritesTag).setCoder(successCoder);
    retryResult.get(failedWritesTag).setCoder(errorCoder);
    retryResult.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()));

    // Flatten successes and failures from both the regular transform and the retry transform.
    PCollection<KV<DestinationT, StorageApiWritePayload>> allSuccesses =
        PCollectionList.of(result.get(successfulWritesTag))
            .and(retryResult.get(successfulWritesTag))
            .apply("flattenSuccesses", Flatten.pCollections());
    PCollection<BigQueryStorageApiInsertError> allFailures =
        PCollectionList.of(result.get(failedWritesTag))
            .and(retryResult.get(failedWritesTag))
            .apply("flattenFailures", Flatten.pCollections());
    PCollection<BadRecord> allBadRecords =
        PCollectionList.of(result.get(BAD_RECORD_TAG))
            .and(retryResult.get(BAD_RECORD_TAG))
            .apply("flattenBadRecords", Flatten.pCollections());
    return PCollectionTuple.of(successfulWritesTag, allSuccesses)
        .and(failedWritesTag, allFailures)
        .and(BAD_RECORD_TAG, allBadRecords);
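
In effect, under this suggestion the no-op case (hasSchemaUpdateOptions false) would return before any of the code above runs, and the schema-update graph construction would sit at the top level of the method instead of inside an else block, cutting one indentation level without changing behavior.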

@reuvenlax
Contributor Author

Adding @ahmedabu98 as a reviewer, since @Abacn will be out of office.
