Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions runners/core-construction-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,27 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-common-runner-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
Expand All @@ -88,6 +99,12 @@
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<scope>provided</scope>
</dependency>

<!-- test dependencies -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;
package org.apache.beam.runners.core.construction;

import static com.google.common.base.Preconditions.checkArgument;

Expand All @@ -36,9 +36,10 @@
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Triggers;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes;
import org.joda.time.Duration;
Expand Down Expand Up @@ -82,7 +83,7 @@ public static RunnerApi.AccumulationMode toProto(AccumulationMode accumulationMo
}
}

public static RunnerApi.ClosingBehavior toProto(Window.ClosingBehavior closingBehavior) {
public static RunnerApi.ClosingBehavior toProto(ClosingBehavior closingBehavior) {
switch (closingBehavior) {
case FIRE_ALWAYS:
return RunnerApi.ClosingBehavior.EMIT_ALWAYS;
Expand Down Expand Up @@ -138,7 +139,7 @@ public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {

/**
* Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where
* {@link RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link RunnerApi.FunctionSpec}
* {@link RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link FunctionSpec}
* for the input {@link WindowFn}.
*/
public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> windowFn)
Expand All @@ -148,8 +149,8 @@ public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> windowFn)
// TODO: re-use components
String windowCoderId = UUID.randomUUID().toString();

RunnerApi.SdkFunctionSpec windowFnSpec =
RunnerApi.SdkFunctionSpec.newBuilder()
SdkFunctionSpec windowFnSpec =
SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(CUSTOM_WINDOWFN_URN)
Expand Down Expand Up @@ -212,7 +213,7 @@ public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> wi
}

/**
* Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link RunnerApi.Components}
* Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link Components}
* to the SDK's {@link WindowingStrategy}.
*/
public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto)
Expand All @@ -233,7 +234,7 @@ public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> wi
* the provided components to dereferences identifiers found in the proto.
*/
public static WindowingStrategy<?, ?> fromProto(
RunnerApi.WindowingStrategy proto, RunnerApi.Components components)
RunnerApi.WindowingStrategy proto, Components components)
throws InvalidProtocolBufferException {

SdkFunctionSpec windowFnSpec = proto.getWindowFn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.util;
package org.apache.beam.runners.core.construction;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
Expand All @@ -28,6 +28,7 @@
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
import org.joda.time.Duration;
import org.junit.Test;
Expand Down