Skip to content

Commit

Permalink
Merge pull request #4672: Moves standard URNs into .proto files
Browse files Browse the repository at this point in the history
Moves standard URNs into .proto files
  • Loading branch information
jkff committed Apr 25, 2018
2 parents 5bb482f + 58502e0 commit bbabb42
Show file tree
Hide file tree
Showing 34 changed files with 632 additions and 648 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ sdks/python/LICENSE
sdks/python/NOTICE
sdks/python/README.md
sdks/python/apache_beam/portability/api/*pb2*.*
sdks/python/apache_beam/portability/common_urns.py
sdks/python/nosetests.xml

# Ignore IntelliJ files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@


coder:
urn: "urn:beam:coders:bytes:0.1"
urn: "beam:coder:bytes:v1"
nested: false
examples:
"abc": abc
Expand All @@ -47,7 +47,7 @@ examples:
---

coder:
urn: "urn:beam:coders:bytes:0.1"
urn: "beam:coder:bytes:v1"
nested: true
examples:
"\u0003abc": abc
Expand All @@ -58,7 +58,7 @@ examples:
---

coder:
urn: "urn:beam:coders:varint:0.1"
urn: "beam:coder:varint:v1"
examples:
"\0": 0
"\u0001": 1
Expand All @@ -70,19 +70,19 @@ examples:
---

coder:
urn: "urn:beam:coders:kv:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"},
{urn: "urn:beam:coders:varint:0.1"}]
urn: "beam:coder:kv:v1"
components: [{urn: "beam:coder:bytes:v1"},
{urn: "beam:coder:varint:v1"}]
examples:
"\u0003abc\0": {key: abc, value: 0}
"\u0004ab\0c\u000A": {key: "ab\0c", value: 10}

---

coder:
urn: "urn:beam:coders:kv:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"},
{urn: "urn:beam:coders:bytes:0.1"}]
urn: "beam:coder:kv:v1"
components: [{urn: "beam:coder:bytes:v1"},
{urn: "beam:coder:bytes:v1"}]
nested: false
examples:
"\u0003abcdef": {key: abc, value: def}
Expand All @@ -91,9 +91,9 @@ examples:
---

coder:
urn: "urn:beam:coders:kv:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"},
{urn: "urn:beam:coders:bytes:0.1"}]
urn: "beam:coder:kv:v1"
components: [{urn: "beam:coder:bytes:v1"},
{urn: "beam:coder:bytes:v1"}]
nested: true
examples:
"\u0003abc\u0003def": {key: abc, value: def}
Expand All @@ -102,7 +102,7 @@ examples:
---

coder:
urn: "urn:beam:coders:interval_window:0.1"
urn: "beam:coder:interval_window:v1"
examples:
"\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000}
"\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000}
Expand All @@ -112,8 +112,8 @@ examples:
---

coder:
urn: "urn:beam:coders:stream:0.1"
components: [{urn: "urn:beam:coders:varint:0.1"}]
urn: "beam:coder:iterable:v1"
components: [{urn: "beam:coder:varint:v1"}]
examples:
"\0\0\0\u0001\0": [0]
"\0\0\0\u0004\u0001\n\u00c8\u0001\u00e8\u0007": [1, 10, 200, 1000]
Expand All @@ -122,8 +122,8 @@ examples:
---

coder:
urn: "urn:beam:coders:stream:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"}]
urn: "beam:coder:iterable:v1"
components: [{urn: "beam:coder:bytes:v1"}]
examples:
"\0\0\0\u0001\u0003abc": ["abc"]
"\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"]
Expand All @@ -132,8 +132,8 @@ examples:
---

coder:
urn: "urn:beam:coders:stream:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"}]
urn: "beam:coder:iterable:v1"
components: [{urn: "beam:coder:bytes:v1"}]
# This is for iterables of unknown length, where the encoding is not
# deterministic.
non_deterministic: True
Expand All @@ -145,15 +145,15 @@ examples:
---

coder:
urn: "urn:beam:coders:stream:0.1"
components: [{urn: "urn:beam:coders:global_window:0.1"}]
urn: "beam:coder:iterable:v1"
components: [{urn: "beam:coder:global_window:v1"}]
examples:
"\0\0\0\u0001": [""]

---

coder:
urn: "urn:beam:coders:global_window:0.1"
urn: "beam:coder:global_window:v1"
examples:
"": ""

Expand All @@ -162,9 +162,9 @@ examples:
# All windowed values consist of pane infos that represent NO_FIRING until full support is added
# in the Python SDK (BEAM-1522).
coder:
urn: "urn:beam:coders:windowed_value:0.1"
components: [{urn: "urn:beam:coders:varint:0.1"},
{urn: "urn:beam:coders:global_window:0.1"}]
urn: "beam:coder:windowed_value:v1"
components: [{urn: "beam:coder:varint:v1"},
{urn: "beam:coder:global_window:v1"}]
examples:
"\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": {
value: 2,
Expand All @@ -176,9 +176,9 @@ examples:
---

coder:
urn: "urn:beam:coders:windowed_value:0.1"
components: [{urn: "urn:beam:coders:varint:0.1"},
{urn: "urn:beam:coders:interval_window:0.1"}]
urn: "beam:coder:windowed_value:v1"
components: [{urn: "beam:coder:varint:v1"},
{urn: "beam:coder:interval_window:v1"}]
examples:
"\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004": {
value: 4,
Expand Down
141 changes: 141 additions & 0 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "RunnerApi";

import "google/protobuf/any.proto";
import "google/protobuf/descriptor.proto";

// A set of mappings from id to message. This is included as an optional field
// on any proto message that may contain references needing resolution.
Expand Down Expand Up @@ -175,6 +176,88 @@ message PTransform {
DisplayData display_data = 6;
}

// Standard PTransform URNs, grouped by category. Every enum value carries its
// URN in the (beam_urn) enum-value option.
message StandardPTransforms {
  // Primitive transforms.
  enum Primitives {
    // Beam's parallel do operation.
    // Payload: ParDoPayload.
    // TODO(BEAM-3595): Change this to beam:transform:pardo:v1.
    PAR_DO = 0 [(beam_urn) = "urn:beam:transform:pardo:v1"];

    // Beam's flatten operation.
    // Payload: None.
    FLATTEN = 1 [(beam_urn) = "beam:transform:flatten:v1"];

    // Beam's group-by-key operation.
    // Payload: None.
    GROUP_BY_KEY = 2 [(beam_urn) = "beam:transform:group_by_key:v1"];

    // The operation that generates a single empty element.
    IMPULSE = 3 [(beam_urn) = "beam:transform:impulse:v1"];

    // The Window.into() operation.
    // Payload: WindowIntoPayload.
    ASSIGN_WINDOWS = 4 [(beam_urn) = "beam:transform:window_into:v1"];

    // The TestStream.
    // Payload: TestStreamPayload.
    // NOTE(review): this URN carries the same "urn:" prefix as PAR_DO but has
    // no TODO; presumably it is also in scope for BEAM-3595 -- confirm.
    TEST_STREAM = 5 [(beam_urn) = "urn:beam:transform:teststream:v1"];

    // Mapping of a main input window into a side input window.
    // Payload: serialized WindowMappingFn.
    MAP_WINDOWS = 6 [(beam_urn) = "beam:transform:map_windows:v1"];
  }

  // Primitive transforms that are deprecated.
  enum DeprecatedPrimitives {
    // The operation that reads a Bounded or Unbounded source.
    // Payload: ReadPayload.
    READ = 0 [(beam_urn) = "beam:transform:read:v1"];

    // Runners should move away from translating `CreatePCollectionView` and
    // instead treat it as part of the translation for a `ParDo` side input.
    CREATE_VIEW = 1 [(beam_urn) = "beam:transform:create_view:v1"];
  }

  // Composite transforms.
  enum Composites {
    // The Combine.perKey() operation.
    // When an SDK produces this, the SDK is assumed to understand each of
    // CombineComponents.
    // Payload: CombinePayload.
    COMBINE_PER_KEY = 0 [(beam_urn) = "beam:transform:combine_per_key:v1"];

    // The Combine.globally() operation.
    // When an SDK produces this, the SDK is assumed to understand each of
    // CombineComponents.
    // Payload: CombinePayload.
    COMBINE_GLOBALLY = 1 [(beam_urn) = "beam:transform:combine_globally:v1"];

    // The Combine.groupedValues() operation.
    // When an SDK produces this, the SDK is assumed to understand each of
    // CombineComponents.
    // Payload: CombinePayload.
    COMBINE_GROUPED_VALUES = 2
        [(beam_urn) = "beam:transform:combine_grouped_values:v1"];

    // The Reshuffle operation.
    RESHUFFLE = 3 [(beam_urn) = "beam:transform:reshuffle:v1"];

    // Less well-known.
    // Payload: WriteFilesPayload.
    WRITE_FILES = 4 [(beam_urn) = "beam:transform:write_files:v1"];
  }

  // The components referred to by the Composites combine transforms above.
  // NOTE(review): the individual values are undocumented in the original;
  // "PGBKCV" presumably abbreviates partial group-by-key plus combine-values
  // -- confirm before relying on it.
  enum CombineComponents {
    COMBINE_PGBKCV = 0 [(beam_urn) = "beam:transform:combine_pgbkcv:v1"];
    COMBINE_MERGE_ACCUMULATORS = 1
        [(beam_urn) = "beam:transform:combine_merge_accumulators:v1"];
    COMBINE_EXTRACT_OUTPUTS = 2
        [(beam_urn) = "beam:transform:combine_extract_outputs:v1"];
  }

  // Present only to work around a proto compiler bug; carries no data.
  // See https://github.com/google/protobuf/issues/4514
  int32 ignored = 1;
}

// Standard side input type URNs. Each enum value carries its URN in the
// (beam_urn) enum-value option.
message StandardSideInputTypes {
  enum Enum {
    // URN: beam:side_input:iterable:v1
    ITERABLE = 0 [(beam_urn) = "beam:side_input:iterable:v1"];

    // URN: beam:side_input:multimap:v1
    MULTIMAP = 1 [(beam_urn) = "beam:side_input:multimap:v1"];
  }
}

// A PCollection!
message PCollection {

Expand Down Expand Up @@ -435,6 +518,34 @@ message Coder {
repeated string component_coder_ids = 2;
}

// Standard coder URNs. Each enum value carries its URN in the (beam_urn)
// enum-value option.
message StandardCoders {
  enum Enum {
    // Components: None
    BYTES = 0 [(beam_urn) = "beam:coder:bytes:v1"];

    // Components: The key and value coder, in that order.
    KV = 1 [(beam_urn) = "beam:coder:kv:v1"];

    // Components: None
    VARINT = 2 [(beam_urn) = "beam:coder:varint:v1"];

    // Encodes an iterable of elements.
    // Components: Coder for a single element.
    ITERABLE = 3 [(beam_urn) = "beam:coder:iterable:v1"];

    // URN: beam:coder:timestamp:v1
    TIMESTAMP = 4 [(beam_urn) = "beam:coder:timestamp:v1"];

    // The following coders are typically not specified manually by the user,
    // but are used at runtime and must be supported by every SDK.
    //
    // (This section comment is kept detached, with a blank line on both
    // sides, so that protoc does not attach it to a neighbouring enum value.)

    // URN: beam:coder:interval_window:v1
    INTERVAL_WINDOW = 5 [(beam_urn) = "beam:coder:interval_window:v1"];

    // URN: beam:coder:length_prefix:v1
    LENGTH_PREFIX = 6 [(beam_urn) = "beam:coder:length_prefix:v1"];

    // URN: beam:coder:global_window:v1
    GLOBAL_WINDOW = 7 [(beam_urn) = "beam:coder:global_window:v1"];

    // URN: beam:coder:windowed_value:v1
    WINDOWED_VALUE = 8 [(beam_urn) = "beam:coder:windowed_value:v1"];
  }
}

// A windowing strategy describes the window function, triggering, allowed
// lateness, and accumulation mode for a PCollection.
//
Expand Down Expand Up @@ -780,6 +891,36 @@ message SdkFunctionSpec {
string environment_id = 2;
}

extend google.protobuf.EnumValueOptions {
  // Enum-value option that records the standard URN of a pipeline entity
  // (for example a transform, function or coder).
  //
  // Code should obtain an entity's URN by reading this (beam_urn) option
  // from the corresponding enum value instead of hard-coding the string.
  //
  // Recommended declaration pattern, shown for coders:
  //
  //   message StandardCoders {
  //     enum Enum {
  //       BYTES = 0 [(beam_urn) = "beam:coder:bytes:v1"];
  //       ...
  //     }
  //   }
  //
  // When entities of one type fall into several categories, declare one
  // enum per category, shown for PTransforms:
  //
  //   message StandardPTransforms {
  //     enum Primitives {
  //       ...
  //     }
  //     enum Composites {
  //       ...
  //     }
  //   }
  string beam_urn = 185324356;
}

// A URN along with a parameter object whose schema is determined by the
// URN.
//
Expand Down
25 changes: 20 additions & 5 deletions model/pipeline/src/main/proto/standard_window_fns.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,41 @@ option go_package = "pipeline_v1";
option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "StandardWindowFns";

import "beam_runner_api.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

// Payload message for the global windows WindowFn
// (beam:windowfn:global_windows:v0.1). It declares no fields; the nested
// enum exists to carry the URN.
message GlobalWindowsPayload {
  enum Enum {
    // Holds the URN of this WindowFn in its (beam_urn) option.
    // TODO(BEAM-3595): Change this to beam:windowfn:global_windows:v1
    PROPERTIES = 0 [(beam_urn) = "beam:windowfn:global_windows:v0.1"];
  }

  // Empty payload.
}

// Payload message for the fixed windows WindowFn
// (beam:windowfn:fixed_windows:v0.1).
message FixedWindowsPayload {
  enum Enum {
    // Holds the URN of this WindowFn in its (beam_urn) option.
    // TODO(BEAM-3595): Change this to beam:windowfn:fixed_windows:v1
    PROPERTIES = 0 [(beam_urn) = "beam:windowfn:fixed_windows:v0.1"];
  }

  // Window size, expressed as a Duration.
  google.protobuf.Duration size = 1;

  // Window offset, expressed as a Timestamp.
  google.protobuf.Timestamp offset = 2;
}

// Payload message for the sliding windows WindowFn
// (beam:windowfn:sliding_windows:v0.1).
message SlidingWindowsPayload {
  enum Enum {
    // Holds the URN of this WindowFn in its (beam_urn) option.
    // TODO(BEAM-3595): Change this to beam:windowfn:sliding_windows:v1
    PROPERTIES = 0 [(beam_urn) = "beam:windowfn:sliding_windows:v0.1"];
  }

  // Window size, expressed as a Duration.
  google.protobuf.Duration size = 1;

  // Window offset, expressed as a Timestamp.
  google.protobuf.Timestamp offset = 2;

  // Period, expressed as a Duration.
  // NOTE(review): presumably the interval between the starts of successive
  // windows -- confirm against the WindowFn implementation.
  google.protobuf.Duration period = 3;
}

// Payload message for the session windows WindowFn
// (beam:windowfn:session_windows:v0.1).
message SessionsPayload {
  enum Enum {
    // Holds the URN of this WindowFn in its (beam_urn) option.
    // TODO(BEAM-3595): Change this to beam:windowfn:session_windows:v1
    PROPERTIES = 0 [(beam_urn) = "beam:windowfn:session_windows:v0.1"];
  }

  // Gap size, expressed as a Duration.
  google.protobuf.Duration gap_size = 1;
}

0 comments on commit bbabb42

Please sign in to comment.