From ae858a54357252a6b3487c5cf25f3fa0a492f77a Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Mon, 5 Jun 2017 09:28:17 -0700 Subject: [PATCH] [BEAM-1347] Add additional logging --- .../beam/fn/harness/control/BeamFnControlClient.java | 3 ++- .../beam/fn/harness/data/BeamFnDataGrpcClient.java | 5 ++++- .../beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java | 9 +++++++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java index e40bb2f033f3..1c4d277dfde7 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java @@ -89,7 +89,7 @@ public BeamFnControlClient( private class InboundObserver implements StreamObserver { @Override public void onNext(BeamFnApi.InstructionRequest value) { - LOG.info("InstructionRequest received {}", value); + LOG.debug("Received InstructionRequest {}", value); Uninterruptibles.putUninterruptibly(bufferedInstructions, value); } @@ -155,6 +155,7 @@ public BeamFnApi.InstructionResponse delegateOnInstructionRequestType( } public void sendInstructionResponse(BeamFnApi.InstructionResponse value) { + LOG.debug("Sending InstructionResponse {}", value); outboundObserver.onNext(value); } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java index 4137cd7d3f05..8351626f27bf 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClient.java @@ -78,7 +78,7 @@ public CompletableFuture forInboundConsumer( KV inputLocation, Coder> coder, ThrowingConsumer> consumer) { - LOG.debug("Registering consumer instruction {} for target {}", + LOG.debug("Registering consumer for instruction {} and target {}", inputLocation.getKey(), inputLocation.getValue()); @@ -106,6 +106,9 @@ public CloseableThrowingConsumer> forOutboundConsumer( Coder> coder) { BeamFnDataGrpcMultiplexer client = getClientFor(apiServiceDescriptor); + LOG.debug("Creating output consumer for instruction {} and target {}", + outputLocation.getKey(), + outputLocation.getValue()); return new BeamFnDataBufferingOutboundObserver<>( options, outputLocation, coder, client.getOutboundObserver()); } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java index 53dfe11cc301..8ee549166eb3 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcMultiplexer.java @@ -84,7 +84,7 @@ public CompletableFuture> futureForKey( KV key) { return consumers.computeIfAbsent( key, - (KV providedKey) -> new CompletableFuture<>()); + (KV unused) -> new CompletableFuture<>()); } /** @@ -102,7 +102,12 @@ public void onNext(BeamFnApi.Elements value) { try { KV key = KV.of(data.getInstructionReference(), data.getTarget()); - futureForKey(key).get().accept(data); + CompletableFuture> consumer = futureForKey(key); + if (!consumer.isDone()) { + LOG.debug("Received data for key {} without consumer ready. " + + "Waiting for consumer to be registered.", key); + } + consumer.get().accept(data); if (data.getData().isEmpty()) { consumers.remove(key); }