Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moves standard URNs into .proto files #4672

Merged
merged 6 commits into from
Apr 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ sdks/python/LICENSE
sdks/python/NOTICE
sdks/python/README.md
sdks/python/apache_beam/portability/api/*pb2*.*
sdks/python/apache_beam/portability/common_urns.py
sdks/python/nosetests.xml

# Ignore IntelliJ files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@


coder:
urn: "urn:beam:coders:bytes:0.1"
urn: "beam:coder:bytes:v1"
nested: false
examples:
"abc": abc
Expand All @@ -47,7 +47,7 @@ examples:
---

coder:
urn: "urn:beam:coders:bytes:0.1"
urn: "beam:coder:bytes:v1"
nested: true
examples:
"\u0003abc": abc
Expand All @@ -58,7 +58,7 @@ examples:
---

coder:
urn: "urn:beam:coders:varint:0.1"
urn: "beam:coder:varint:v1"
examples:
"\0": 0
"\u0001": 1
Expand All @@ -70,19 +70,19 @@ examples:
---

coder:
urn: "urn:beam:coders:kv:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"},
{urn: "urn:beam:coders:varint:0.1"}]
urn: "beam:coder:kv:v1"
components: [{urn: "beam:coder:bytes:v1"},
{urn: "beam:coder:varint:v1"}]
examples:
"\u0003abc\0": {key: abc, value: 0}
"\u0004ab\0c\u000A": {key: "ab\0c", value: 10}

---

coder:
urn: "urn:beam:coders:kv:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"},
{urn: "urn:beam:coders:bytes:0.1"}]
urn: "beam:coder:kv:v1"
components: [{urn: "beam:coder:bytes:v1"},
{urn: "beam:coder:bytes:v1"}]
nested: false
examples:
"\u0003abcdef": {key: abc, value: def}
Expand All @@ -91,9 +91,9 @@ examples:
---

coder:
urn: "urn:beam:coders:kv:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"},
{urn: "urn:beam:coders:bytes:0.1"}]
urn: "beam:coder:kv:v1"
components: [{urn: "beam:coder:bytes:v1"},
{urn: "beam:coder:bytes:v1"}]
nested: true
examples:
"\u0003abc\u0003def": {key: abc, value: def}
Expand All @@ -102,7 +102,7 @@ examples:
---

coder:
urn: "urn:beam:coders:interval_window:0.1"
urn: "beam:coder:interval_window:v1"
examples:
"\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000}
"\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000}
Expand All @@ -112,8 +112,8 @@ examples:
---

coder:
urn: "urn:beam:coders:stream:0.1"
components: [{urn: "urn:beam:coders:varint:0.1"}]
urn: "beam:coder:iterable:v1"
components: [{urn: "beam:coder:varint:v1"}]
examples:
"\0\0\0\u0001\0": [0]
"\0\0\0\u0004\u0001\n\u00c8\u0001\u00e8\u0007": [1, 10, 200, 1000]
Expand All @@ -122,8 +122,8 @@ examples:
---

coder:
urn: "urn:beam:coders:stream:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"}]
urn: "beam:coder:iterable:v1"
components: [{urn: "beam:coder:bytes:v1"}]
examples:
"\0\0\0\u0001\u0003abc": ["abc"]
"\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"]
Expand All @@ -132,8 +132,8 @@ examples:
---

coder:
urn: "urn:beam:coders:stream:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"}]
urn: "beam:coder:iterable:v1"
components: [{urn: "beam:coder:bytes:v1"}]
# This is for iterables of unknown length, where the encoding is not
# deterministic.
non_deterministic: True
Expand All @@ -145,15 +145,15 @@ examples:
---

coder:
urn: "urn:beam:coders:stream:0.1"
components: [{urn: "urn:beam:coders:global_window:0.1"}]
urn: "beam:coder:iterable:v1"
components: [{urn: "beam:coder:global_window:v1"}]
examples:
"\0\0\0\u0001": [""]

---

coder:
urn: "urn:beam:coders:global_window:0.1"
urn: "beam:coder:global_window:v1"
examples:
"": ""

Expand All @@ -162,9 +162,9 @@ examples:
# All windowed values consist of pane infos that represent NO_FIRING until full support is added
# in the Python SDK (BEAM-1522).
coder:
urn: "urn:beam:coders:windowed_value:0.1"
components: [{urn: "urn:beam:coders:varint:0.1"},
{urn: "urn:beam:coders:global_window:0.1"}]
urn: "beam:coder:windowed_value:v1"
components: [{urn: "beam:coder:varint:v1"},
{urn: "beam:coder:global_window:v1"}]
examples:
"\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": {
value: 2,
Expand All @@ -176,9 +176,9 @@ examples:
---

coder:
urn: "urn:beam:coders:windowed_value:0.1"
components: [{urn: "urn:beam:coders:varint:0.1"},
{urn: "urn:beam:coders:interval_window:0.1"}]
urn: "beam:coder:windowed_value:v1"
components: [{urn: "beam:coder:varint:v1"},
{urn: "beam:coder:interval_window:v1"}]
examples:
"\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004": {
value: 4,
Expand Down
141 changes: 141 additions & 0 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "RunnerApi";

import "google/protobuf/any.proto";
import "google/protobuf/descriptor.proto";

// A set of mappings from id to message. This is included as an optional field
// on any proto message that may contain references needing resolution.
Expand Down Expand Up @@ -175,6 +176,88 @@ message PTransform {
DisplayData display_data = 6;
}

message StandardPTransforms {
enum Primitives {
// Represents Beam's parallel do operation.
// Payload: ParDoPayload.
// TODO(BEAM-3595): Change this to beam:transform:pardo:v1.
PAR_DO = 0 [(beam_urn) = "urn:beam:transform:pardo:v1"];

// Represents Beam's flatten operation.
// Payload: None.
FLATTEN = 1 [(beam_urn) = "beam:transform:flatten:v1"];

// Represents Beam's group-by-key operation.
// Payload: None
GROUP_BY_KEY = 2 [(beam_urn) = "beam:transform:group_by_key:v1"];

// Represents the operation generating a single empty element.
IMPULSE = 3 [(beam_urn) = "beam:transform:impulse:v1"];

// Represents the Window.into() operation.
// Payload: WindowIntoPayload.
ASSIGN_WINDOWS = 4 [(beam_urn) = "beam:transform:window_into:v1"];

// Represents the TestStream.
// Payload: TestStreamPayload
TEST_STREAM = 5 [(beam_urn) = "urn:beam:transform:teststream:v1"];

// Represents mapping of main input window into side input window.
// Payload: serialized WindowMappingFn.
MAP_WINDOWS = 6 [(beam_urn) = "beam:transform:map_windows:v1"];
}
enum DeprecatedPrimitives {
// Represents the operation to read a Bounded or Unbounded source.
// Payload: ReadPayload.
READ = 0 [(beam_urn) = "beam:transform:read:v1"];

// Runners should move away from translating `CreatePCollectionView` and treat this as
// part of the translation for a `ParDo` side input.
CREATE_VIEW = 1 [(beam_urn) = "beam:transform:create_view:v1"];
}
enum Composites {
// Represents the Combine.perKey() operation.
// If this is produced by an SDK, it is assumed that the SDK understands
// each of CombineComponents.
// Payload: CombinePayload
COMBINE_PER_KEY = 0 [(beam_urn) = "beam:transform:combine_per_key:v1"];

// Represents the Combine.globally() operation.
// If this is produced by an SDK, it is assumed that the SDK understands
// each of CombineComponents.
// Payload: CombinePayload
COMBINE_GLOBALLY = 1 [(beam_urn) = "beam:transform:combine_globally:v1"];

// Represents the Combine.groupedValues() operation.
// If this is produced by an SDK, it is assumed that the SDK understands
// each of CombineComponents.
// Payload: CombinePayload
COMBINE_GROUPED_VALUES = 2 [(beam_urn) = "beam:transform:combine_grouped_values:v1"];

// Represents the Reshuffle operation.
RESHUFFLE = 3 [(beam_urn) = "beam:transform:reshuffle:v1"];

// Less well-known. Payload: WriteFilesPayload.
WRITE_FILES = 4 [(beam_urn) = "beam:transform:write_files:v1"];
}
enum CombineComponents {
COMBINE_PGBKCV = 0 [(beam_urn) = "beam:transform:combine_pgbkcv:v1"];
COMBINE_MERGE_ACCUMULATORS = 1 [(beam_urn) = "beam:transform:combine_merge_accumulators:v1"];
COMBINE_EXTRACT_OUTPUTS = 2 [(beam_urn) = "beam:transform:combine_extract_outputs:v1"];
}

// This field is needed only as a work-around for a proto compiler bug.
// See https://github.com/google/protobuf/issues/4514
int32 ignored = 1;
}

message StandardSideInputTypes {
enum Enum {
ITERABLE = 0 [(beam_urn) = "beam:side_input:iterable:v1"];
MULTIMAP = 1 [(beam_urn) = "beam:side_input:multimap:v1"];
}
}

// A PCollection!
message PCollection {

Expand Down Expand Up @@ -435,6 +518,34 @@ message Coder {
repeated string component_coder_ids = 2;
}

message StandardCoders {
enum Enum {
// Components: None
BYTES = 0 [(beam_urn) = "beam:coder:bytes:v1"];

// Components: The key and value coder, in that order.
KV = 1 [(beam_urn) = "beam:coder:kv:v1"];

// Components: None
VARINT = 2 [(beam_urn) = "beam:coder:varint:v1"];

// Encodes an iterable of elements.
// Components: Coder for a single element.
ITERABLE = 3 [(beam_urn) = "beam:coder:iterable:v1"];

TIMESTAMP = 4 [(beam_urn) = "beam:coder:timestamp:v1"];
/*
* The following coders are typically not specified by manually by the user,
* but are used at runtime and must be supported by every SDK.
*/

INTERVAL_WINDOW = 5 [(beam_urn) = "beam:coder:interval_window:v1"];
LENGTH_PREFIX = 6 [(beam_urn) = "beam:coder:length_prefix:v1"];
GLOBAL_WINDOW = 7 [(beam_urn) = "beam:coder:global_window:v1"];
WINDOWED_VALUE = 8 [(beam_urn) = "beam:coder:windowed_value:v1"];
}
}

// A windowing strategy describes the window function, triggering, allowed
// lateness, and accumulation mode for a PCollection.
//
Expand Down Expand Up @@ -780,6 +891,36 @@ message SdkFunctionSpec {
string environment_id = 2;
}

extend google.protobuf.EnumValueOptions {
// An extension to be used for specifying the standard URN of various
// pipeline entities, e.g. transforms, functions, coders etc.
// Code should refer to the URNs of those entities by extracting
// it from the (beam_urn) extension, rather than by hard-coding
// the URN.
//
// The recommended pattern for declaring it is (exemplified by coders):
//
// message StandardCoders {
// enum Enum {
// BYTES = 0 [(beam_urn) = "beam:coder:bytes:v1"];
// ...
// }
// }
//
// If there are multiple categories of entities of this type, use the
// following pattern (exemplified by PTransforms):
//
// message StandardPTransforms {
// enum Primitives {
// ...
// }
// enum Composites {
// ...
// }
// }
string beam_urn = 185324356;
}

// A URN along with a parameter object whose schema is determined by the
// URN.
//
Expand Down
25 changes: 20 additions & 5 deletions model/pipeline/src/main/proto/standard_window_fns.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,41 @@ option go_package = "pipeline_v1";
option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "StandardWindowFns";

import "beam_runner_api.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

// beam:windowfn:global_windows:v0.1
// empty payload
message GlobalWindowsPayload {
enum Enum {
// TODO(BEAM-3595): Change this to beam:windowfn:global_windows:v1
PROPERTIES = 0 [(beam_urn) = "beam:windowfn:global_windows:v0.1"];
}
// Empty payload
}

// beam:windowfn:fixed_windows:v0.1
message FixedWindowsPayload {
enum Enum {
// TODO(BEAM-3595): Change this to beam:windowfn:fixed_windows:v1
PROPERTIES = 0 [(beam_urn) = "beam:windowfn:fixed_windows:v0.1"];
}
google.protobuf.Duration size = 1;
google.protobuf.Timestamp offset = 2;
}

// beam:windowfn:sliding_windows:v0.1
message SlidingWindowsPayload {
enum Enum {
// TODO(BEAM-3595): Change this to beam:windowfn:sliding_windows:v1
PROPERTIES = 0 [(beam_urn) = "beam:windowfn:sliding_windows:v0.1"];
}
google.protobuf.Duration size = 1;
google.protobuf.Timestamp offset = 2;
google.protobuf.Duration period = 3;
}

// beam:windowfn:session_windows:v0.1
message SessionsPayload {
enum Enum {
// TODO(BEAM-3595): Change this to beam:windowfn:session_windows:v1
PROPERTIES = 0 [(beam_urn) = "beam:windowfn:session_windows:v0.1"];
}
google.protobuf.Duration gap_size = 1;
}