diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index ee64f912872f..3f323dda8d57 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -58,16 +58,27 @@
org.apache.beam
beam-sdks-common-runner-api
+
org.apache.beam
beam-sdks-java-core
+
+ com.google.protobuf
+ protobuf-java
+
+
com.fasterxml.jackson.core
jackson-annotations
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
com.google.code.findbugs
jsr305
@@ -88,6 +99,12 @@
slf4j-api
+
+ com.google.auto.value
+ auto-value
+ provided
+
+
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
similarity index 96%
rename from sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
rename to runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
index 9595362c54e5..353be0520686 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.util;
+package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
@@ -36,9 +36,10 @@
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Triggers;
-import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes;
import org.joda.time.Duration;
@@ -82,7 +83,7 @@ public static RunnerApi.AccumulationMode toProto(AccumulationMode accumulationMo
}
}
- public static RunnerApi.ClosingBehavior toProto(Window.ClosingBehavior closingBehavior) {
+ public static RunnerApi.ClosingBehavior toProto(ClosingBehavior closingBehavior) {
switch (closingBehavior) {
case FIRE_ALWAYS:
return RunnerApi.ClosingBehavior.EMIT_ALWAYS;
@@ -138,7 +139,7 @@ public static RunnerApi.OutputTime toProto(OutputTimeFn> outputTimeFn) {
/**
* Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where
- * {@link RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link RunnerApi.FunctionSpec}
+ * {@link RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link FunctionSpec}
* for the input {@link WindowFn}.
*/
public static RunnerApi.MessageWithComponents toProto(WindowFn, ?> windowFn)
@@ -148,8 +149,8 @@ public static RunnerApi.MessageWithComponents toProto(WindowFn, ?> windowFn)
// TODO: re-use components
String windowCoderId = UUID.randomUUID().toString();
- RunnerApi.SdkFunctionSpec windowFnSpec =
- RunnerApi.SdkFunctionSpec.newBuilder()
+ SdkFunctionSpec windowFnSpec =
+ SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(CUSTOM_WINDOWFN_URN)
@@ -212,7 +213,7 @@ public static RunnerApi.MessageWithComponents toProto(WindowingStrategy, ?> wi
}
/**
- * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link RunnerApi.Components}
+ * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link Components}
* to the SDK's {@link WindowingStrategy}.
*/
public static WindowingStrategy, ?> fromProto(RunnerApi.MessageWithComponents proto)
@@ -233,7 +234,7 @@ public static RunnerApi.MessageWithComponents toProto(WindowingStrategy, ?> wi
* the provided components to dereferences identifiers found in the proto.
*/
public static WindowingStrategy, ?> fromProto(
- RunnerApi.WindowingStrategy proto, RunnerApi.Components components)
+ RunnerApi.WindowingStrategy proto, Components components)
throws InvalidProtocolBufferException {
SdkFunctionSpec windowFnSpec = proto.getWindowFn();
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
similarity index 97%
rename from sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java
rename to runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
index 5d3de51cb731..b603d65ea4dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.util;
+package org.apache.beam.runners.core.construction;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
@@ -28,6 +28,7 @@
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
import org.joda.time.Duration;
import org.junit.Test;