Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public BeamFnControlClient(
private class InboundObserver implements StreamObserver<BeamFnApi.InstructionRequest> {
@Override
public void onNext(BeamFnApi.InstructionRequest value) {
LOG.info("InstructionRequest received {}", value);
LOG.debug("Received InstructionRequest {}", value);
Uninterruptibles.putUninterruptibly(bufferedInstructions, value);
}

Expand Down Expand Up @@ -155,6 +155,7 @@ public BeamFnApi.InstructionResponse delegateOnInstructionRequestType(
}

public void sendInstructionResponse(BeamFnApi.InstructionResponse value) {
LOG.debug("Sending InstructionResponse {}", value);
outboundObserver.onNext(value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public <T> CompletableFuture<Void> forInboundConsumer(
KV<String, BeamFnApi.Target> inputLocation,
Coder<WindowedValue<T>> coder,
ThrowingConsumer<WindowedValue<T>> consumer) {
LOG.debug("Registering consumer instruction {} for target {}",
LOG.debug("Registering consumer for instruction {} and target {}",
inputLocation.getKey(),
inputLocation.getValue());

Expand Down Expand Up @@ -106,6 +106,9 @@ public <T> CloseableThrowingConsumer<WindowedValue<T>> forOutboundConsumer(
Coder<WindowedValue<T>> coder) {
BeamFnDataGrpcMultiplexer client = getClientFor(apiServiceDescriptor);

LOG.debug("Creating output consumer for instruction {} and target {}",
outputLocation.getKey(),
outputLocation.getValue());
return new BeamFnDataBufferingOutboundObserver<>(
options, outputLocation, coder, client.getOutboundObserver());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ public void onNext(BeamFnApi.Elements value) {
KV.of(data.getInstructionReference(), data.getTarget());
CompletableFuture<Consumer<BeamFnApi.Elements.Data>> consumer = futureForKey(key);
if (!consumer.isDone()) {
LOG.debug("Received data for key {} without consumer ready.", key);
LOG.debug("Received data for key {} without consumer ready. "
+ "Waiting for consumer to be registered.", key);
}
consumer.get().accept(data);
if (data.getData().isEmpty()) {
Expand Down