Skip to content

Commit

Permalink
Moves standard URNs into .proto files
Browse files Browse the repository at this point in the history
  • Loading branch information
jkff committed Feb 14, 2018
1 parent 350aed5 commit 9e78fe4
Show file tree
Hide file tree
Showing 20 changed files with 447 additions and 611 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@


coder:
urn: "urn:beam:coders:bytes:0.1"
urn: "beam:coder:bytes:v1"
nested: false
examples:
"abc": abc
Expand All @@ -47,7 +47,7 @@ examples:
---

coder:
urn: "urn:beam:coders:bytes:0.1"
urn: "beam:coder:bytes:v1"
nested: true
examples:
"\u0003abc": abc
Expand All @@ -58,7 +58,7 @@ examples:
---

coder:
urn: "urn:beam:coders:varint:0.1"
urn: "beam:coder:varint:v1"
examples:
"\0": 0
"\u0001": 1
Expand All @@ -70,19 +70,19 @@ examples:
---

coder:
urn: "urn:beam:coders:kv:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"},
{urn: "urn:beam:coders:varint:0.1"}]
urn: "beam:coder:kv:v1"
components: [{urn: "beam:coder:bytes:v1"},
{urn: "beam:coder:varint:v1"}]
examples:
"\u0003abc\0": {key: abc, value: 0}
"\u0004ab\0c\u000A": {key: "ab\0c", value: 10}

---

coder:
urn: "urn:beam:coders:kv:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"},
{urn: "urn:beam:coders:bytes:0.1"}]
urn: "beam:coder:kv:v1"
components: [{urn: "beam:coder:bytes:v1"},
{urn: "beam:coder:bytes:v1"}]
nested: false
examples:
"\u0003abcdef": {key: abc, value: def}
Expand All @@ -91,9 +91,9 @@ examples:
---

coder:
urn: "urn:beam:coders:kv:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"},
{urn: "urn:beam:coders:bytes:0.1"}]
urn: "beam:coder:kv:v1"
components: [{urn: "beam:coder:bytes:v1"},
{urn: "beam:coder:bytes:v1"}]
nested: true
examples:
"\u0003abc\u0003def": {key: abc, value: def}
Expand All @@ -102,7 +102,7 @@ examples:
---

coder:
urn: "urn:beam:coders:interval_window:0.1"
urn: "beam:coder:interval_window:v1"
examples:
"\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {end: 1454293425000, span: 3600000}
"\u0080\u0000\u0001\u0053\u0034\u00ec\u0074\u00e8\u0080\u0090\u00fb\u00d3\u0009" : {end: 1456881825000, span: 2592000000}
Expand All @@ -112,8 +112,8 @@ examples:
---

coder:
urn: "urn:beam:coders:stream:0.1"
components: [{urn: "urn:beam:coders:varint:0.1"}]
urn: "beam:coder:iterable:v1"
components: [{urn: "beam:coder:varint:v1"}]
examples:
"\0\0\0\u0001\0": [0]
"\0\0\0\u0004\u0001\n\u00c8\u0001\u00e8\u0007": [1, 10, 200, 1000]
Expand All @@ -122,8 +122,8 @@ examples:
---

coder:
urn: "urn:beam:coders:stream:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"}]
urn: "beam:coder:iterable:v1"
components: [{urn: "beam:coder:bytes:v1"}]
examples:
"\0\0\0\u0001\u0003abc": ["abc"]
"\0\0\0\u0002\u0004ab\0c\u0004de\0f": ["ab\0c", "de\0f"]
Expand All @@ -132,8 +132,8 @@ examples:
---

coder:
urn: "urn:beam:coders:stream:0.1"
components: [{urn: "urn:beam:coders:bytes:0.1"}]
urn: "beam:coder:iterable:v1"
components: [{urn: "beam:coder:bytes:v1"}]
# This is for iterables of unknown length, where the encoding is not
# deterministic.
non_deterministic: True
Expand All @@ -145,15 +145,15 @@ examples:
---

coder:
urn: "urn:beam:coders:stream:0.1"
components: [{urn: "urn:beam:coders:global_window:0.1"}]
urn: "beam:coder:iterable:v1"
components: [{urn: "beam:coder:global_window:v1"}]
examples:
"\0\0\0\u0001": [""]

---

coder:
urn: "urn:beam:coders:global_window:0.1"
urn: "beam:coder:global_window:v1"
examples:
"": ""

Expand All @@ -162,9 +162,9 @@ examples:
# All windowed values consist of pane infos that represent NO_FIRING until full support is added
# in the Python SDK (BEAM-1522).
coder:
urn: "urn:beam:coders:windowed_value:0.1"
components: [{urn: "urn:beam:coders:varint:0.1"},
{urn: "urn:beam:coders:global_window:0.1"}]
urn: "beam:coder:windowed_value:v1"
components: [{urn: "beam:coder:varint:v1"},
{urn: "beam:coder:global_window:v1"}]
examples:
"\u0080\0\u0001R\u009a\u00a4\u009bh\0\0\0\u0001\u000f\u0002": {
value: 2,
Expand All @@ -176,9 +176,9 @@ examples:
---

coder:
urn: "urn:beam:coders:windowed_value:0.1"
components: [{urn: "urn:beam:coders:varint:0.1"},
{urn: "urn:beam:coders:interval_window:0.1"}]
urn: "beam:coder:windowed_value:v1"
components: [{urn: "beam:coder:varint:v1"},
{urn: "beam:coder:interval_window:v1"}]
examples:
"\u007f\u00ff\u00ff\u00ff\u00ff\u00f9\u00e5\u0080\0\0\0\u0001\u0080\0\u0001R\u009a\u00a4\u009bh\u00c0\u008b\u0011\u000f\u0004": {
value: 4,
Expand Down
10 changes: 0 additions & 10 deletions model/pipeline/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,6 @@
<description>Portable definitions for building pipelines</description>

<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
<resource>
<directory>${project.build.directory}/original_sources_to_package</directory>
</resource>
</resources>

<plugins>
<!-- Skip the checkstyle plugin on generated code -->
<plugin>
Expand Down
110 changes: 110 additions & 0 deletions model/pipeline/src/main/proto/beam_runner_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "RunnerApi";

import "google/protobuf/any.proto";
import "google/protobuf/descriptor.proto";

// A set of mappings from id to message. This is included as an optional field
// on any proto message that may contain references needing resolution.
Expand Down Expand Up @@ -175,6 +176,72 @@ message PTransform {
DisplayData display_data = 6;
}

// Holder for the URNs of the standard PTransforms. Every value of the nested
// enum carries its URN in the (standard_urn) enum value option.
message StandardPTransformUrns {
  enum Enum {
    // Beam's parallel do operation.
    // Payload: ParDoPayload.
    // TODO(BEAM-3595): Change this to beam:transform:pardo:v1.
    PAR_DO = 0 [(standard_urn) = "urn:beam:transform:pardo:v1"];

    // Beam's flatten operation.
    // Payload: None.
    FLATTEN = 1 [(standard_urn) = "beam:transform:flatten:v1"];

    // Beam's group-by-key operation.
    // Payload: None.
    GROUP_BY_KEY = 2 [(standard_urn) = "beam:transform:group_by_key:v1"];

    // The operation that generates a single empty element.
    IMPULSE = 3 [(standard_urn) = "beam:transform:impulse:v1"];

    // The operation that reads a Bounded or Unbounded source.
    // Payload: ReadPayload.
    READ = 4 [(standard_urn) = "beam:transform:read:v1"];

    // The Window.into() operation.
    // Payload: WindowIntoPayload.
    ASSIGN_WINDOWS = 5 [(standard_urn) = "beam:transform:window_into:v1"];

    // The TestStream.
    // Payload: TestStreamPayload.
    // NOTE(review): like PAR_DO, this URN has a "urn:" prefix that the other
    // values lack -- confirm whether BEAM-3595 should cover it as well.
    TEST_STREAM = 6 [(standard_urn) = "urn:beam:transform:teststream:v1"];

    // The Combine.perKey() operation.
    // An SDK that produces this transform is assumed to understand pgbkcv,
    // merge_accumulators and extract_outputs.
    // Payload: CombinePayload.
    COMBINE_PER_KEY = 7
        [(standard_urn) = "beam:transform:combine_per_key:v1"];

    // The Combine.globally() operation.
    // An SDK that produces this transform is assumed to understand pgbkcv,
    // merge_accumulators and extract_outputs.
    // Payload: CombinePayload.
    COMBINE_GLOBALLY = 8
        [(standard_urn) = "beam:transform:combine_globally:v1"];

    // The Combine.groupedValues() operation.
    // An SDK that produces this transform is assumed to understand pgbkcv,
    // merge_accumulators and extract_outputs.
    // Payload: CombinePayload.
    COMBINE_GROUPED_VALUES = 9
        [(standard_urn) = "beam:transform:combine_grouped_values:v1"];

    // The pgbkcv, merge_accumulators and extract_outputs transforms that the
    // Combine entries above refer to.
    COMBINE_PGBKCV = 10
        [(standard_urn) = "beam:transform:combine_pgbkcv:v1"];
    COMBINE_MERGE_ACCUMULATORS = 11
        [(standard_urn) = "beam:transform:combine_merge_accumulators:v1"];
    COMBINE_EXTRACT_OUTPUTS = 12
        [(standard_urn) = "beam:transform:combine_extract_outputs:v1"];

    // The Reshuffle operation.
    RESHUFFLE = 13 [(standard_urn) = "beam:transform:reshuffle:v1"];

    // Less well-known.
    // Payload: WriteFilesPayload.
    // NOTE(review): the version suffix here is "0.1" while the other values
    // use "v1" -- confirm this is intentional.
    WRITE_FILES = 14 [(standard_urn) = "beam:transform:write_files:0.1"];

    // Deprecated: runners should move away from translating
    // `CreatePCollectionView` and instead treat this as part of the
    // translation for a `ParDo` side input.
    CREATE_VIEW = 15 [(standard_urn) = "beam:transform:create_view:v1"];

    MAP_WINDOWS = 16 [(standard_urn) = "beam:transform:map_windows:v1"];
  }
}

// A PCollection!
message PCollection {

Expand Down Expand Up @@ -403,6 +470,33 @@ message Coder {
repeated string component_coder_ids = 2;
}

// Holder for the URNs of the standard coders. Every value of the nested enum
// carries its URN in the (standard_urn) enum value option.
message StandardCoderUrns {
  enum Enum {
    // Components: None.
    BYTES = 0 [(standard_urn) = "beam:coder:bytes:v1"];

    // Components: the key coder and the value coder, in that order.
    KV = 1 [(standard_urn) = "beam:coder:kv:v1"];

    // Components: None.
    VARINT = 2 [(standard_urn) = "beam:coder:varint:v1"];

    // Encodes an iterable of elements.
    // Components: the coder for a single element.
    ITERABLE = 3 [(standard_urn) = "beam:coder:iterable:v1"];

    // The coders below are typically not specified manually by the user, but
    // they are used at runtime and must be supported by every SDK.

    INTERVAL_WINDOW = 4 [(standard_urn) = "beam:coder:interval_window:v1"];
    LENGTH_PREFIX = 5 [(standard_urn) = "beam:coder:length_prefix:v1"];
    GLOBAL_WINDOW = 6 [(standard_urn) = "beam:coder:global_window:v1"];
    WINDOWED_VALUE = 7 [(standard_urn) = "beam:coder:windowed_value:v1"];
  }
}

// A windowing strategy describes the window function, triggering, allowed
// lateness, and accumulation mode for a PCollection.
//
Expand Down Expand Up @@ -748,6 +842,22 @@ message SdkFunctionSpec {
string environment_id = 2;
}

extend google.protobuf.EnumValueOptions {
  // Enum value option that holds the standard URN of a pipeline entity,
  // e.g. a transform, function or coder.
  //
  // Code should obtain the URN of such an entity by reading it from the
  // (standard_urn) extension instead of hard-coding the URN string.
  //
  // Recommended declaration pattern, illustrated with coders:
  //
  //   message StandardCoderUrns {
  //     enum Enum {
  //       BYTES = 0 [(standard_urn) = "beam:coder:bytes:v1"];
  //       ...
  //     }
  //   }
  string standard_urn = 185324356;
}

// A URN along with a parameter object whose schema is determined by the
// URN.
//
Expand Down
25 changes: 20 additions & 5 deletions model/pipeline/src/main/proto/standard_window_fns.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,41 @@ option go_package = "pipeline_v1";
option java_package = "org.apache.beam.model.pipeline.v1";
option java_outer_classname = "StandardWindowFns";

import "beam_runner_api.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

// Holder for the URNs of the standard window fns. Every value of the nested
// enum carries its URN in the (standard_urn) enum value option and names the
// payload message, if any, that accompanies that URN.
message StandardWindowFnUrns {
  enum Enum {
    // Empty payload.
    // TODO(BEAM-3595): Change this to beam:windowfn:global_windows:v1
    GLOBAL_WINDOWS = 0
        [(standard_urn) = "beam:windowfn:global_windows:v0.1"];

    // Payload of type FixedWindowsPayload.
    // TODO(BEAM-3595): Change this to beam:windowfn:fixed_windows:v1
    FIXED_WINDOWS = 1
        [(standard_urn) = "beam:windowfn:fixed_windows:v0.1"];

    // Payload of type SlidingWindowsPayload.
    // TODO(BEAM-3595): Change this to beam:windowfn:sliding_windows:v1
    SLIDING_WINDOWS = 2
        [(standard_urn) = "beam:windowfn:sliding_windows:v0.1"];

    // Payload of type SessionsPayload.
    // TODO(BEAM-3595): Change this to beam:windowfn:session_windows:v1
    SESSION_WINDOWS = 3
        [(standard_urn) = "beam:windowfn:session_windows:v0.1"];
  }
}

// Payload accompanying the fixed-windows window fn, whose URN
// (beam:windowfn:fixed_windows:v0.1) is declared by
// StandardWindowFnUrns.Enum.FIXED_WINDOWS.
message FixedWindowsPayload {
  // NOTE(review): presumably the length of each window -- confirm.
  google.protobuf.Duration size = 1;

  // NOTE(review): presumably where window boundaries are anchored -- confirm.
  google.protobuf.Timestamp offset = 2;
}

// Payload accompanying the sliding-windows window fn, whose URN
// (beam:windowfn:sliding_windows:v0.1) is declared by
// StandardWindowFnUrns.Enum.SLIDING_WINDOWS.
message SlidingWindowsPayload {
  // NOTE(review): presumably the length of each window -- confirm.
  google.protobuf.Duration size = 1;

  // NOTE(review): presumably where window boundaries are anchored -- confirm.
  google.protobuf.Timestamp offset = 2;

  // NOTE(review): presumably the interval between the starts of consecutive
  // windows -- confirm.
  google.protobuf.Duration period = 3;
}

// Payload accompanying the session-windows window fn, whose URN
// (beam:windowfn:session_windows:v0.1) is declared by
// StandardWindowFnUrns.Enum.SESSION_WINDOWS.
message SessionsPayload {
  // NOTE(review): presumably the inactivity gap that separates two
  // sessions -- confirm.
  google.protobuf.Duration gap_size = 1;
}

0 comments on commit 9e78fe4

Please sign in to comment.