Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/trigger_files/beam_PostCommit_Java_DataflowV1.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1,
Expand Down
1 change: 1 addition & 0 deletions .github/trigger_files/beam_PostCommit_Java_DataflowV2.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 3,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"revision": 1
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2,
"https://github.com/apache/beam/pull/34294": "noting that PR #34294 should run this test",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"https://github.com/apache/beam/pull/35213": "Eliminating getPane() in favor of getPaneInfo()",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"https://github.com/apache/beam/pull/35213": "Eliminating getPane() in favor of getPaneInfo()",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"https://github.com/apache/beam/pull/35213": "Eliminating getPane() in favor of getPaneInfo()",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test"
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,7 @@ class BeamModulePlugin implements Plugin<Project> {

List<String> skipDefRegexes = []
skipDefRegexes << "AutoValue_.*"
skipDefRegexes << "AutoBuilder_.*"
skipDefRegexes << "AutoOneOf_.*"
skipDefRegexes << ".*\\.jmh_generated\\..*"
skipDefRegexes += configuration.generatedClassPatterns
Expand Down Expand Up @@ -1287,7 +1288,8 @@ class BeamModulePlugin implements Plugin<Project> {
'**/org/apache/beam/gradle/**',
'**/org/apache/beam/model/**',
'**/org/apache/beam/runners/dataflow/worker/windmill/**',
'**/AutoValue_*'
'**/AutoValue_*',
'**/AutoBuilder_*',
]

def jacocoEnabled = project.hasProperty('enableJacocoReport')
Expand Down
1 change: 1 addition & 0 deletions runners/core-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.jackson_core
implementation library.java.jackson_databind
implementation library.java.hamcrest
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.junit
testImplementation library.java.mockito_core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public static <K, V> Iterable<WindowedValue<V>> dropExpiredWindows(
if (input == null) {
return null;
}
return input.explodeWindows();
// The generics in this chain of calls line up best if we drop the covariance
// in the return value of explodeWindows()
return (Iterable<WindowedValue<V>>) input.explodeWindows();
})
.filter(
input -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.OutputBuilderSuppliers;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
Expand Down Expand Up @@ -180,7 +181,8 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {

@Override
public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.windowedReceiver(processContext, null);
return DoFnOutputReceivers.windowedReceiver(
processContext, OutputBuilderSuppliers.supplierForElement(element), null);
}

@Override
Expand All @@ -190,7 +192,8 @@ public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {

@Override
public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.windowedMultiReceiver(processContext, null);
return DoFnOutputReceivers.windowedMultiReceiver(
processContext, OutputBuilderSuppliers.supplierForElement(element));
}

@Override
Expand Down Expand Up @@ -385,12 +388,12 @@ public PaneInfo pane() {

@Override
public String currentRecordId() {
return element.getCurrentRecordId();
return element.getRecordId();
}

@Override
public Long currentRecordOffset() {
return element.getCurrentRecordOffset();
return element.getRecordOffset();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1057,8 +1057,13 @@ private void prefetchOnTrigger(
}

// Output the actual value.
outputter.output(
WindowedValues.of(KV.of(key, toOutput), outputTimestamp, windows, paneInfo));
WindowedValues.<KV<K, OutputT>>builder()
.setValue(KV.of(key, toOutput))
.setTimestamp(outputTimestamp)
.setWindows(windows)
.setPaneInfo(paneInfo)
.setReceiver(outputter)
.output();
});

reduceFn.onTrigger(renamedTriggerContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.OutputBuilderSupplier;
import org.apache.beam.sdk.util.OutputBuilderSuppliers;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
Expand Down Expand Up @@ -113,7 +115,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out

final @Nullable SchemaCoder<OutputT> mainOutputSchemaCoder;

private @Nullable Map<TupleTag<?>, Coder<?>> outputCoders;
private final @Nullable Map<TupleTag<?>, Coder<?>> outputCoders;

private final @Nullable DoFnSchemaInformation doFnSchemaInformation;

Expand Down Expand Up @@ -395,6 +397,8 @@ private class DoFnProcessContext extends DoFn<InputT, OutputT>.ProcessContext
/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
private @Nullable StateNamespace namespace;

private final OutputBuilderSupplier builderSupplier;

/**
* The state namespace for this context.
*
Expand All @@ -412,6 +416,7 @@ private StateNamespace getNamespace() {
private DoFnProcessContext(WindowedValue<InputT> elem) {
fn.super();
this.elem = elem;
this.builderSupplier = OutputBuilderSuppliers.supplierForElement(elem);
}

@Override
Expand Down Expand Up @@ -494,8 +499,17 @@ public <T> void outputWindowedValue(
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
SimpleDoFnRunner.this.outputWindowedValue(
tag, WindowedValues.of(output, timestamp, windows, paneInfo));
builderSupplier
.builder(output)
.setTimestamp(timestamp)
.setWindows(windows)
.setPaneInfo(paneInfo)
.setReceiver(
wv -> {
checkTimestamp(elem.getTimestamp(), wv.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(tag, wv);
})
.output();
}

@Override
Expand All @@ -520,12 +534,12 @@ public Instant timestamp() {

@Override
public String currentRecordId() {
return elem.getCurrentRecordId();
return elem.getRecordId();
}

@Override
public Long currentRecordOffset() {
return elem.getCurrentRecordOffset();
return elem.getRecordOffset();
}

public Collection<? extends BoundedWindow> windows() {
Expand Down Expand Up @@ -604,17 +618,18 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {

@Override
public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.windowedReceiver(this, mainOutputTag);
return DoFnOutputReceivers.windowedReceiver(this, builderSupplier, mainOutputTag);
}

@Override
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.rowReceiver(this, mainOutputTag, mainOutputSchemaCoder);
return DoFnOutputReceivers.rowReceiver(
this, builderSupplier, mainOutputTag, mainOutputSchemaCoder);
}

@Override
public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders);
return DoFnOutputReceivers.windowedMultiReceiver(this, builderSupplier, outputCoders);
}

@Override
Expand Down Expand Up @@ -710,6 +725,7 @@ private class OnTimerArgumentProvider<KeyT> extends DoFn<InputT, OutputT>.OnTime
private final TimeDomain timeDomain;
private final String timerId;
private final KeyT key;
private final OutputBuilderSupplier builderSupplier;

/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
private @Nullable StateNamespace namespace;
Expand Down Expand Up @@ -742,6 +758,13 @@ private OnTimerArgumentProvider(
this.timestamp = timestamp;
this.timeDomain = timeDomain;
this.key = key;
this.builderSupplier =
OutputBuilderSuppliers.supplierForElement(
WindowedValues.builder()
.setValue(null)
.setTimestamp(timestamp)
.setWindow(window)
.setPaneInfo(PaneInfo.NO_FIRING));
}

@Override
Expand Down Expand Up @@ -828,17 +851,19 @@ public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {

@Override
public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.windowedReceiver(this, mainOutputTag);
return DoFnOutputReceivers.windowedReceiver(this, builderSupplier, mainOutputTag);
}

@Override
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.rowReceiver(this, mainOutputTag, mainOutputSchemaCoder);
return DoFnOutputReceivers.rowReceiver(
this, builderSupplier, mainOutputTag, mainOutputSchemaCoder);
}

@Override
public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders);
// ... what to doooo 0...
return DoFnOutputReceivers.windowedMultiReceiver(this, builderSupplier, outputCoders);
}

@Override
Expand Down Expand Up @@ -978,8 +1003,14 @@ public <T> void outputWindowedValue(
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
checkTimestamp(timestamp(), timestamp);
SimpleDoFnRunner.this.outputWindowedValue(
tag, WindowedValues.of(output, timestamp, windows, paneInfo));

builderSupplier
.builder(output)
.setTimestamp(timestamp)
.setWindows(windows)
.setPaneInfo(paneInfo)
.setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
.output();
}

@Override
Expand Down Expand Up @@ -1015,6 +1046,8 @@ private class OnWindowExpirationArgumentProvider<KeyT>
private final BoundedWindow window;
private final Instant timestamp;
private final KeyT key;
private final OutputBuilderSupplier builderSupplier;

/** Lazily initialized; should only be accessed via {@link #getNamespace()}. */
private @Nullable StateNamespace namespace;

Expand All @@ -1037,6 +1070,13 @@ private OnWindowExpirationArgumentProvider(BoundedWindow window, Instant timesta
this.window = window;
this.timestamp = timestamp;
this.key = key;
this.builderSupplier =
OutputBuilderSuppliers.supplierForElement(
WindowedValues.<Void>builder()
.setValue(null)
.setWindow(window)
.setTimestamp(timestamp)
.setPaneInfo(PaneInfo.NO_FIRING));
}

@Override
Expand Down Expand Up @@ -1109,17 +1149,18 @@ public KeyT key() {

@Override
public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.windowedReceiver(this, mainOutputTag);
return DoFnOutputReceivers.windowedReceiver(this, builderSupplier, mainOutputTag);
}

@Override
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.rowReceiver(this, mainOutputTag, mainOutputSchemaCoder);
return DoFnOutputReceivers.rowReceiver(
this, builderSupplier, mainOutputTag, mainOutputSchemaCoder);
}

@Override
public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders);
return DoFnOutputReceivers.windowedMultiReceiver(this, builderSupplier, outputCoders);
}

@Override
Expand Down Expand Up @@ -1241,8 +1282,13 @@ public <T> void outputWindowedValue(
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
checkTimestamp(this.timestamp, timestamp);
SimpleDoFnRunner.this.outputWindowedValue(
tag, WindowedValues.of(output, timestamp, windows, paneInfo));
builderSupplier
.builder(output)
.setTimestamp(timestamp)
.setWindows(windows)
.setPaneInfo(paneInfo)
.setReceiver(wv -> SimpleDoFnRunner.this.outputWindowedValue(tag, wv))
.output();
}

@Override
Expand Down
Loading
Loading