diff --git a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java index 8552ea126ab..0f3c2a39f24 100644 --- a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java +++ b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java @@ -115,13 +115,12 @@ public void close(ConversationContext context) { @Override public void onConnect(ConversationContext context) { - final CompletableFuture future = new CompletableFuture<>(); // If we have connection credentials available, try to set up the AMS routes. CompletableFuture setupAmsRouteFuture; if (context.getAuthentication() != null) { if (!(context.getAuthentication() instanceof PlcUsernamePasswordAuthentication)) { - future.completeExceptionally(new PlcConnectionException( + context.getChannel().pipeline().fireExceptionCaught(new PlcConnectionException( "This type of connection only supports username-password authentication")); return; } @@ -152,15 +151,15 @@ public void onConnect(ConversationContext context) { RequestTransactionManager.RequestTransaction readDeviceInfoTx = tm.startRequest(); readDeviceInfoTx.submit(() -> context.sendRequest(new AmsTCPPacket(readDeviceInfoRequest)) .expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest())) - .onTimeout(future::completeExceptionally) - .onError((p, e) -> future.completeExceptionally(e)) + .onTimeout(e -> context.getChannel().pipeline().fireExceptionCaught(e)) + .onError((p, e) -> context.getChannel().pipeline().fireExceptionCaught(e)) .check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == readDeviceInfoRequest.getInvokeId()) .unwrap(response -> (AdsReadDeviceInfoResponse) response.getUserdata()) .handle(readDeviceInfoResponse -> { readDeviceInfoTx.endRequest(); if (readDeviceInfoResponse.getResult() != ReturnCode.OK) { // TODO: Handle this - future.completeExceptionally(new PlcException("Result is " + readDeviceInfoResponse.getResult())); + context.getChannel().pipeline().fireExceptionCaught(new PlcException("Result is " + readDeviceInfoResponse.getResult())); return; } @@ -178,15 +177,15 @@ public void onConnect(ConversationContext context) { RequestTransactionManager.RequestTransaction readOnlineVersionNumberTx = tm.startRequest(); readOnlineVersionNumberTx.submit(() -> context.sendRequest(new AmsTCPPacket(readOnlineVersionNumberRequest)) .expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest())) - .onTimeout(future::completeExceptionally) - .onError((p, e) -> future.completeExceptionally(e)) + .onTimeout(e -> context.getChannel().pipeline().fireExceptionCaught(e)) + .onError((p, e) -> context.getChannel().pipeline().fireExceptionCaught(e)) .check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == readOnlineVersionNumberRequest.getInvokeId()) .unwrap(response -> (AdsReadWriteResponse) response.getUserdata()) .handle(readOnlineVersionNumberResponse -> { readOnlineVersionNumberTx.endRequest(); if (readOnlineVersionNumberResponse.getResult() != ReturnCode.OK) { // TODO: Handle this - future.completeExceptionally(new PlcException("Result is " + readOnlineVersionNumberResponse.getResult())); + context.getChannel().pipeline().fireExceptionCaught(new PlcException("Result is " + readOnlineVersionNumberResponse.getResult())); return; } try { @@ -201,15 +200,15 @@ public void onConnect(ConversationContext context) { RequestTransactionManager.RequestTransaction readSymbolVersionNumberTx = tm.startRequest(); readSymbolVersionNumberTx.submit(() -> context.sendRequest(new AmsTCPPacket(readSymbolVersionNumberRequest)) .expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest())) - .onTimeout(future::completeExceptionally) - .onError((p, e) -> future.completeExceptionally(e)) + .onTimeout(e -> context.getChannel().pipeline().fireExceptionCaught(e)) + .onError((p, e) -> context.getChannel().pipeline().fireExceptionCaught(e)) .check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == readSymbolVersionNumberRequest.getInvokeId()) .unwrap(response -> (AdsReadResponse) response.getUserdata()) .handle(readSymbolVersionNumberResponse -> { readSymbolVersionNumberTx.endRequest(); if (readSymbolVersionNumberResponse.getResult() != ReturnCode.OK) { // TODO: Handle this - future.completeExceptionally(new PlcException("Result is " + readSymbolVersionNumberResponse.getResult())); + context.getChannel().pipeline().fireExceptionCaught(new PlcException("Result is " + readSymbolVersionNumberResponse.getResult())); return; } try { @@ -226,11 +225,11 @@ public void onConnect(ConversationContext context) { } }); } catch (ParseException e) { - future.completeExceptionally(new PlcConnectionException("Error reading the symbol version of data type and symbol data.", e)); + context.getChannel().pipeline().fireExceptionCaught(new PlcConnectionException("Error reading the symbol version of data type and symbol data.", e)); } })); } catch (ParseException e) { - future.completeExceptionally(new PlcConnectionException("Error reading the online version of data type and symbol data.", e)); + context.getChannel().pipeline().fireExceptionCaught(new PlcConnectionException("Error reading the online version of data type and symbol data.", e)); } })); }));