Skip to content

Commit

Permalink
Merge pull request #11373 from boyuanzz/timers
Browse files Browse the repository at this point in the history
[BEAM-9562] Update Element.timer and Element.Timer to Element.timers and Element.Timers
  • Loading branch information
boyuanzz committed Apr 10, 2020
2 parents c17e7c9 + d4c283e commit 99fa8ac
Show file tree
Hide file tree
Showing 11 changed files with 333 additions and 284 deletions.
4 changes: 2 additions & 2 deletions model/fn-execution/src/main/proto/beam_fn_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ message Elements {

// Represent the encoded user timer for a given instruction, transform and
// timer id.
message Timer {
message Timers {
// (Required) A reference to an active instruction request with the given
// instruction id.
string instruction_id = 1;
Expand Down Expand Up @@ -516,7 +516,7 @@ message Elements {
repeated Data data = 1;

// (Optional) A list of timer byte streams.
repeated Timer timer = 2;
repeated Timers timers = 2;
}

// Stable
Expand Down
506 changes: 255 additions & 251 deletions sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go

Large diffs are not rendered by default.

51 changes: 48 additions & 3 deletions sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void onNext(BeamFnApi.Elements value) {
}
}

for (BeamFnApi.Elements.Timer timer : value.getTimerList()) {
for (BeamFnApi.Elements.Timers timer : value.getTimersList()) {
try {
LogicalEndpoint key =
LogicalEndpoint.timer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void close() throws Exception {
// This will add an empty data block representing the end of stream.
if (outputLocation.isTimer()) {
elements
.addTimerBuilder()
.addTimersBuilder()
.setInstructionId(outputLocation.getInstructionId())
.setTransformId(outputLocation.getTransformId())
.setTimerFamilyId(outputLocation.getTimerFamilyId())
Expand Down Expand Up @@ -121,7 +121,7 @@ private BeamFnApi.Elements.Builder convertBufferForTransmission() {

if (outputLocation.isTimer()) {
elements
.addTimerBuilder()
.addTimersBuilder()
.setInstructionId(outputLocation.getInstructionId())
.setTransformId(outputLocation.getTransformId())
.setTimerFamilyId(outputLocation.getTimerFamilyId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public class BeamFnDataGrpcMultiplexerTest {
.setInstructionId(DATA_LOCATION.getInstructionId())
.setTransformId(DATA_LOCATION.getTransformId())
.setData(ByteString.copyFrom(new byte[1])))
.addTimer(
BeamFnApi.Elements.Timer.newBuilder()
.addTimers(
BeamFnApi.Elements.Timers.newBuilder()
.setInstructionId(TIMER_LOCATION.getInstructionId())
.setTransformId(TIMER_LOCATION.getTransformId())
.setTimerFamilyId(TIMER_LOCATION.getTimerFamilyId())
Expand All @@ -64,8 +64,8 @@ public class BeamFnDataGrpcMultiplexerTest {
.setInstructionId(DATA_LOCATION.getInstructionId())
.setTransformId(DATA_LOCATION.getTransformId())
.setIsLast(true))
.addTimer(
BeamFnApi.Elements.Timer.newBuilder()
.addTimers(
BeamFnApi.Elements.Timers.newBuilder()
.setInstructionId(TIMER_LOCATION.getInstructionId())
.setTransformId(TIMER_LOCATION.getTransformId())
.setTimerFamilyId(TIMER_LOCATION.getTimerFamilyId())
Expand Down Expand Up @@ -122,6 +122,6 @@ public void testInboundObserverBlocksTillConsumerConnects() throws Exception {
contains(KV.of(ELEMENTS.getData(0).getData(), false), KV.of(ByteString.EMPTY, true)));
assertThat(
timerInboundValues,
contains(KV.of(ELEMENTS.getTimer(0).getTimers(), false), KV.of(ByteString.EMPTY, true)));
contains(KV.of(ELEMENTS.getTimers(0).getTimers(), false), KV.of(ByteString.EMPTY, true)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ public void testConfiguredBufferLimit() throws Exception {

BeamFnApi.Elements.Builder builder = messageWithDataBuilder(new byte[1]);
if (endpoint.isTimer()) {
builder.addTimer(
BeamFnApi.Elements.Timer.newBuilder()
builder.addTimers(
BeamFnApi.Elements.Timers.newBuilder()
.setInstructionId(endpoint.getInstructionId())
.setTransformId(endpoint.getTransformId())
.setTimerFamilyId(endpoint.getTimerFamilyId())
Expand All @@ -183,8 +183,8 @@ BeamFnApi.Elements.Builder messageWithDataBuilder(byte[]... datum) throws IOExce
}
if (endpoint.isTimer()) {
return BeamFnApi.Elements.newBuilder()
.addTimer(
BeamFnApi.Elements.Timer.newBuilder()
.addTimers(
BeamFnApi.Elements.Timers.newBuilder()
.setInstructionId(endpoint.getInstructionId())
.setTransformId(endpoint.getTransformId())
.setTimerFamilyId(endpoint.getTimerFamilyId())
Expand All @@ -206,7 +206,7 @@ BeamFnApi.Elements messageWithData(byte[]... datum) throws IOException {
BeamFnApi.Elements endMessageWithData() throws IOException {
BeamFnApi.Elements.Builder builder = messageWithDataBuilder();
if (endpoint.isTimer()) {
builder.getTimerBuilder(0).setIsLast(true);
builder.getTimersBuilder(0).setIsLast(true);
} else {
builder.getDataBuilder(0).setIsLast(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ BeamFnApi.Elements.Builder messageWithDataBuilder(byte[]... datum) throws IOExce
}
if (endpoint.isTimer()) {
return BeamFnApi.Elements.newBuilder()
.addTimer(
BeamFnApi.Elements.Timer.newBuilder()
.addTimers(
BeamFnApi.Elements.Timers.newBuilder()
.setInstructionId(endpoint.getInstructionId())
.setTransformId(endpoint.getTransformId())
.setTimerFamilyId(endpoint.getTimerFamilyId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ def process_bundle(self,
expect_reads,
abort_callback=lambda:
(result_future.is_done() and result_future.get().error)):
if isinstance(output, beam_fn_api_pb2.Elements.Timer):
if isinstance(output, beam_fn_api_pb2.Elements.Timers):
with BundleManager._lock:
self._get_buffer(
expected_output_timers[(
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/worker/bundle_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ def process_bundle(self, instruction_id):
for data_channel, expected_inputs in data_channels.items():
for element in data_channel.input_elements(instruction_id,
expected_inputs):
if isinstance(element, beam_fn_api_pb2.Elements.Timer):
if isinstance(element, beam_fn_api_pb2.Elements.Timers):
timer_coder_impl = (
self.timers_info[(
element.transform_id,
Expand Down
22 changes: 11 additions & 11 deletions sdks/python/apache_beam/runners/worker/data_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import apache_beam.coders.slow_stream
OutputStream = apache_beam.coders.slow_stream.OutputStream
DataOrTimers = \
Union[beam_fn_api_pb2.Elements.Data, beam_fn_api_pb2.Elements.Timer]
Union[beam_fn_api_pb2.Elements.Data, beam_fn_api_pb2.Elements.Timers]
else:
OutputStream = type(coder_impl.create_OutputStream())

Expand Down Expand Up @@ -223,7 +223,7 @@ def input_elements(self,
):
# type: (...) -> Iterator[DataOrTimers]

"""Returns an iterable of all Element.Data and Element.Timer bundles for
"""Returns an iterable of all Element.Data and Element.Timers bundles for
instruction_id.
This iterable terminates only once the full set of data has been recieved
Expand Down Expand Up @@ -309,7 +309,7 @@ def input_elements(self,
other_inputs = []
for element in self._inputs:
if element.instruction_id == instruction_id:
if isinstance(element, beam_fn_api_pb2.Elements.Timer):
if isinstance(element, beam_fn_api_pb2.Elements.Timers):
if not element.is_last:
yield element
if isinstance(element, beam_fn_api_pb2.Elements.Data):
Expand All @@ -323,7 +323,7 @@ def output_timer_stream(self, instruction_id, transform_id, timer_family_id):
def add_to_inverse_output(timer):
if timer:
self._inverse._inputs.append(
beam_fn_api_pb2.Elements.Timer(
beam_fn_api_pb2.Elements.Timers(
instruction_id=instruction_id,
transform_id=transform_id,
timer_family_id=timer_family_id,
Expand All @@ -333,7 +333,7 @@ def add_to_inverse_output(timer):
def close_stream(timer):
add_to_inverse_output(timer)
self._inverse._inputs.append(
beam_fn_api_pb2.Elements.Timer(
beam_fn_api_pb2.Elements.Timers(
instruction_id=instruction_id,
transform_id=transform_id,
timer_family_id='',
Expand Down Expand Up @@ -428,7 +428,7 @@ def input_elements(self,
t, v, tb = self._exc_info
raise_(t, v, tb)
else:
if isinstance(element, beam_fn_api_pb2.Elements.Timer):
if isinstance(element, beam_fn_api_pb2.Elements.Timers):
if element.is_last:
done_inputs.add((element.transform_id, element.timer_family_id))
else:
Expand Down Expand Up @@ -476,7 +476,7 @@ def output_timer_stream(self, instruction_id, transform_id, timer_family_id):
def add_to_send_queue(timer):
if timer:
self._to_send.put(
beam_fn_api_pb2.Elements.Timer(
beam_fn_api_pb2.Elements.Timers(
instruction_id=instruction_id,
transform_id=transform_id,
timer_family_id=timer_family_id,
Expand All @@ -486,7 +486,7 @@ def add_to_send_queue(timer):
def close_callback(timer):
add_to_send_queue(timer)
self._to_send.put(
beam_fn_api_pb2.Elements.Timer(
beam_fn_api_pb2.Elements.Timers(
instruction_id=instruction_id,
transform_id=transform_id,
timer_family_id=timer_family_id,
Expand Down Expand Up @@ -514,19 +514,19 @@ def _write_outputs(self):
data_stream = []
timer_stream = []
for stream in streams:
if isinstance(stream, beam_fn_api_pb2.Elements.Timer):
if isinstance(stream, beam_fn_api_pb2.Elements.Timers):
timer_stream.append(stream)
elif isinstance(stream, beam_fn_api_pb2.Elements.Data):
data_stream.append(stream)
else:
raise ValueError('Unexpected output element type %s' % type(stream))
yield beam_fn_api_pb2.Elements(data=data_stream, timer=timer_stream)
yield beam_fn_api_pb2.Elements(data=data_stream, timers=timer_stream)

def _read_inputs(self, elements_iterator):
# type: (Iterable[beam_fn_api_pb2.Elements]) -> None
try:
for elements in elements_iterator:
for timer in elements.timer:
for timer in elements.timers:
self._receiving_queue(timer.instruction_id).put(timer)
for data in elements.data:
self._receiving_queue(data.instruction_id).put(data)
Expand Down

0 comments on commit 99fa8ac

Please sign in to comment.