Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,12 @@
<version>${protobuf.version}</version>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>

<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-common-protos</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions runners/core-construction-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,24 @@
*/
package org.apache.beam.runners.core.construction;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.sdk.common.runner.v1.StandardWindowFns;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
Expand Down Expand Up @@ -153,9 +158,16 @@ public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime
}
}

// This URN says that the WindowFn is just a UDF blob the indicated SDK understands
public static final String GLOBAL_WINDOWS_FN = "beam:windowfn:global_windows:v0.1";
public static final String FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1";
public static final String SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1";
public static final String SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1";
// This URN says that the WindowFn is just a UDF blob the Java SDK understands
// TODO: standardize such things
public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW the urn: at the beginning is the URI scheme that makes it a URN. The official syntax is "urn:" Namespace : Namespace-specific format

(https://tools.ietf.org/html/rfc8141#section-2)

I agree the scheme is mostly pointless but wanted to just note the departure.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(in particular, top level schemes and the remainder of the URI have exactly the same relationship as URN namespaces and the NSS, as far as I can tell)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know urns were an official thing with a spec... We can prefix everything with urn: if you want.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I just wanted to note it. I find the prefixing a bit tedious. I've been doing it because following internet standards is my first instinct, but I don't think it is worthwhile. We should lay out the namespaces in a section of the runner guide on the website.

public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious - does the v add anything in particular?

Copy link
Copy Markdown
Contributor Author

@robertwb robertwb May 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes it clearer that this number is a version field.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say drop it

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use 'v' elsewhere (e.g. in the package name itself). Otherwise it looks like a ratio or something.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is in the package name to make it a valid package name. Not a huge deal. If you feel strongly about it I'm fine with it.

public static final String OLD_SERIALIZED_JAVA_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
// Remove this once the dataflow worker understands all the above formats.
private static final boolean USE_OLD_SERIALIZED_JAVA_WINDOWFN_URN = true;

/**
* Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link
Expand All @@ -164,19 +176,80 @@ public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime
*/
public static SdkFunctionSpec toProto(
WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) {
return SdkFunctionSpec.newBuilder()
// TODO: Set environment ID
.setSpec(
FunctionSpec.newBuilder()
.setUrn(CUSTOM_WINDOWFN_URN)
.setParameter(
Any.pack(
BytesValue.newBuilder()
.setValue(
ByteString.copyFrom(
SerializableUtils.serializeToByteArray(windowFn)))
.build())))
.build();
// TODO: Set environment IDs
if (USE_OLD_SERIALIZED_JAVA_WINDOWFN_URN) {
return SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(OLD_SERIALIZED_JAVA_WINDOWFN_URN)
.setParameter(
Any.pack(
BytesValue.newBuilder()
.setValue(
ByteString.copyFrom(
SerializableUtils.serializeToByteArray(windowFn)))
.build())))
.build();
} else if (windowFn instanceof GlobalWindows) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have some sort of registration scheme for WindowFns in the medium-long term, even if only to simplify this method. File a JIRA?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return SdkFunctionSpec.newBuilder()
.setSpec(FunctionSpec.newBuilder().setUrn(GLOBAL_WINDOWS_FN))
.build();
} else if (windowFn instanceof FixedWindows) {
return SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(FIXED_WINDOWS_FN)
.setParameter(
Any.pack(
StandardWindowFns.FixedWindowsPayload.newBuilder()
.setSize(Durations.fromMillis(
((FixedWindows) windowFn).getSize().getMillis()))
.setOffset(Timestamps.fromMillis(
((FixedWindows) windowFn).getOffset().getMillis()))
.build())))
.build();
} else if (windowFn instanceof SlidingWindows) {
return SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(SLIDING_WINDOWS_FN)
.setParameter(
Any.pack(
StandardWindowFns.SlidingWindowsPayload.newBuilder()
.setSize(Durations.fromMillis(
((SlidingWindows) windowFn).getSize().getMillis()))
.setOffset(Timestamps.fromMillis(
((SlidingWindows) windowFn).getOffset().getMillis()))
.setPeriod(Durations.fromMillis(
((SlidingWindows) windowFn).getPeriod().getMillis()))
.build())))
.build();
} else if (windowFn instanceof Sessions) {
return SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(SESSION_WINDOWS_FN)
.setParameter(
Any.pack(
StandardWindowFns.SessionsPayload.newBuilder()
.setGapSize(Durations.fromMillis(
((Sessions) windowFn).getGapDuration().getMillis()))
.build())))
.build();
} else {
return SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(SERIALIZED_JAVA_WINDOWFN_URN)
.setParameter(
Any.pack(
BytesValue.newBuilder()
.setValue(
ByteString.copyFrom(
SerializableUtils.serializeToByteArray(windowFn)))
.build())))
.build();
}
}

/**
Expand Down Expand Up @@ -261,18 +334,38 @@ public static RunnerApi.WindowingStrategy toProto(

public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec)
throws InvalidProtocolBufferException {
checkArgument(
windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),
"Only Java-serialized %s instances are supported, with URN %s. But found URN %s",
WindowFn.class.getSimpleName(),
CUSTOM_WINDOWFN_URN,
windowFnSpec.getSpec().getUrn());

Object deserializedWindowFn =
SerializableUtils.deserializeFromByteArray(
switch (windowFnSpec.getSpec().getUrn()) {
case GLOBAL_WINDOWS_FN:
return new GlobalWindows();
case FIXED_WINDOWS_FN:
StandardWindowFns.FixedWindowsPayload fixedParams =
windowFnSpec.getSpec().getParameter().unpack(
StandardWindowFns.FixedWindowsPayload.class);
return FixedWindows.of(
Duration.millis(Durations.toMillis(fixedParams.getSize())))
.withOffset(Duration.millis(Timestamps.toMillis(fixedParams.getOffset())));
case SLIDING_WINDOWS_FN:
StandardWindowFns.SlidingWindowsPayload slidingParams =
windowFnSpec.getSpec().getParameter().unpack(
StandardWindowFns.SlidingWindowsPayload.class);
return SlidingWindows.of(
Duration.millis(Durations.toMillis(slidingParams.getSize())))
.every(Duration.millis(Durations.toMillis(slidingParams.getPeriod())))
.withOffset(Duration.millis(Timestamps.toMillis(slidingParams.getOffset())));
case SESSION_WINDOWS_FN:
StandardWindowFns.SessionsPayload sessionParams =
windowFnSpec.getSpec().getParameter().unpack(
StandardWindowFns.SessionsPayload.class);
return Sessions.withGapDuration(
Duration.millis(Durations.toMillis(sessionParams.getGapSize())));
case SERIALIZED_JAVA_WINDOWFN_URN:
case OLD_SERIALIZED_JAVA_WINDOWFN_URN:
return (WindowFn<?, ?>) SerializableUtils.deserializeFromByteArray(
windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
"WindowFn");

return (WindowFn<?, ?>) deserializedWindowFn;
default:
throw new IllegalArgumentException(
"Unknown or unsupported WindowFn: " + windowFnSpec.getSpec().getUrn());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use WindowFn.class.getSimpleName()

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
Expand Down Expand Up @@ -62,6 +64,13 @@ private static ToProtoAndBackSpec toProtoAndBackSpec(WindowingStrategy windowing
public static Iterable<ToProtoAndBackSpec> data() {
return ImmutableList.of(
toProtoAndBackSpec(WindowingStrategy.globalDefault()),
toProtoAndBackSpec(WindowingStrategy.of(
FixedWindows.of(Duration.millis(11)).withOffset(Duration.millis(3)))),
toProtoAndBackSpec(WindowingStrategy.of(
SlidingWindows.of(Duration.millis(37)).every(Duration.millis(3))
.withOffset(Duration.millis(2)))),
toProtoAndBackSpec(WindowingStrategy.of(
Sessions.withGapDuration(Duration.millis(389)))),
toProtoAndBackSpec(
WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
Expand Down
53 changes: 53 additions & 0 deletions sdks/common/runner-api/src/main/proto/standard_window_fns.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Protocol Buffers describing the Runner API, which is the runner-independent,
* SDK-independent definition of the Beam model.
*/

syntax = "proto3";

package org.apache.beam.runner_api.v1;

option java_package = "org.apache.beam.sdk.common.runner.v1";
option java_outer_classname = "StandardWindowFns";

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

// beam:windowfn:global_windows:v0.1
// empty payload

// beam:windowfn:fixed_windows:v0.1
message FixedWindowsPayload {
google.protobuf.Duration size = 1;
google.protobuf.Timestamp offset = 2;
}

// beam:windowfn:sliding_windows:v0.1
message SlidingWindowsPayload {
google.protobuf.Duration size = 1;
google.protobuf.Timestamp offset = 2;
google.protobuf.Duration period = 3;
}

// beam:windowfn:session_windows:v0.1
message SessionsPayload {
google.protobuf.Duration gap_size = 1;
}
39 changes: 38 additions & 1 deletion sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import threading
import time
import traceback
import urllib

from apache_beam import error
from apache_beam import coders
Expand Down Expand Up @@ -416,7 +417,9 @@ def run_GroupByKey(self, transform_node):
PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
windowing = transform_node.transform.get_windowing(
transform_node.inputs)
step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(windowing))
step.add_property(
PropertyNames.SERIALIZED_FN,
self.serialize_windowing_strategy(windowing))

def run_ParDo(self, transform_node):
transform = transform_node.transform
Expand Down Expand Up @@ -697,6 +700,40 @@ def run__NativeWrite(self, transform_node):
PropertyNames.STEP_NAME: input_step.proto.name,
PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})

@classmethod
def serialize_windowing_strategy(cls, windowing):
from apache_beam.runners import pipeline_context
from apache_beam.runners.api import beam_runner_api_pb2
context = pipeline_context.PipelineContext()
windowing_proto = windowing.to_runner_api(context)
return cls.byte_array_to_json_string(
beam_runner_api_pb2.MessageWithComponents(
components=context.to_runner_api(),
windowing_strategy=windowing_proto).SerializeToString())

@classmethod
def deserialize_windowing_strategy(cls, serialized_data):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.runners import pipeline_context
from apache_beam.runners.api import beam_runner_api_pb2
from apache_beam.transforms.core import Windowing
proto = beam_runner_api_pb2.MessageWithComponents()
proto.ParseFromString(cls.json_string_to_byte_array(serialized_data))
return Windowing.from_runner_api(
proto.windowing_strategy,
pipeline_context.PipelineContext(proto.components))

@staticmethod
def byte_array_to_json_string(raw_bytes):
"""Implements org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString."""
return urllib.quote(raw_bytes)

@staticmethod
def json_string_to_byte_array(encoded_string):
"""Implements org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray."""
return urllib.unquote(encoded_string)


class DataflowPipelineResult(PipelineResult):
"""Represents the state of a pipeline run on the Dataflow service."""
Expand Down
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.transforms.core import _GroupByKeyOnly
from apache_beam.transforms.core import Windowing
from apache_beam.transforms import window
from apache_beam.typehints import typehints

# Protect against environments where apitools library is not available.
Expand Down Expand Up @@ -240,6 +242,15 @@ def _test_flatten_input_visitor(self, input_type, output_type, num_inputs):
for _ in range(num_inputs):
self.assertEqual(inputs[0].element_type, output_type)

def test_serialize_windowing_strategy(self):
# This just tests the basic path; more complete tests
# are in window_test.py.
strategy = Windowing(window.FixedWindows(10))
self.assertEqual(
strategy,
DataflowRunner.deserialize_windowing_strategy(
DataflowRunner.serialize_windowing_strategy(strategy)))


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
# Update this version to the next version whenever there is a change that will
# require changes to the execution environment.
# This should be in the beam-[version]-[date] format, date is optional.
BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170518'
BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170601'

# Standard file names used for staging files.
WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
Expand Down
Loading