diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java index 33ef3166734..53804e0ec83 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java @@ -23,23 +23,26 @@ import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.MessageToMessageCodec; import io.vavr.control.Either; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.apache.plc4x.java.api.authentication.PlcAuthentication; +import org.apache.plc4x.java.spi.TimeoutManager.CompletionCallback; import org.apache.plc4x.java.spi.configuration.Configuration; import org.apache.plc4x.java.spi.events.*; +import org.apache.plc4x.java.spi.internal.DefaultConversationContext; import org.apache.plc4x.java.spi.internal.DefaultExpectRequestContext; import org.apache.plc4x.java.spi.internal.DefaultSendRequestContext; import org.apache.plc4x.java.spi.internal.HandlerRegistration; +import org.apache.plc4x.java.spi.netty.NettyHashTimerTimeoutManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; -import java.time.Instant; import java.util.Deque; import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -55,8 +58,14 @@ public class Plc4xNettyWrapper extends MessageToMessageCodec { private final Queue registeredHandlers; private final ChannelPipeline pipeline; private final boolean passive; + private final TimeoutManager timeoutManager; public Plc4xNettyWrapper(ChannelPipeline pipeline, boolean passive, Plc4xProtocolBase protocol, + PlcAuthentication authentication, Class clazz) { + this(new NettyHashTimerTimeoutManager(), pipeline, passive, protocol, authentication, clazz); + } + + public Plc4xNettyWrapper(TimeoutManager timeoutManager, ChannelPipeline pipeline, boolean passive, Plc4xProtocolBase protocol, PlcAuthentication authentication, Class clazz) { super(clazz, Object.class); this.pipeline = pipeline; @@ -64,6 +73,8 @@ public Plc4xNettyWrapper(ChannelPipeline pipeline, boolean passive, Plc4xProtoco this.registeredHandlers = new ConcurrentLinkedQueue<>(); this.protocolBase = protocol; this.authentication = authentication; + this.timeoutManager = timeoutManager; + //this.protocolBase.setContext(new DefaultConversationContext<>(this::registerHandler, pipeline.context(""), authentication, passive)); this.protocolBase.setContext(new ConversationContext() { @Override @@ -102,19 +113,15 @@ public void fireDiscovered(Configuration c) { } @Override + @SuppressWarnings({"unchecked", "rawtypes"}) public SendRequestContext sendRequest(T packet) { - return new DefaultSendRequestContext<>(handler -> { - logger.trace("Adding Response Handler ..."); - registeredHandlers.add(handler); - }, packet, this); + return new DefaultSendRequestContext<>(Plc4xNettyWrapper.this::registerHandler, packet, this); } @Override + @SuppressWarnings({"unchecked", "rawtypes"}) public ExpectRequestContext expectRequest(Class clazz, Duration timeout) { - return new DefaultExpectRequestContext<>(handler -> { - logger.trace("Adding Request Handler ..."); - registeredHandlers.add(handler); - }, clazz, timeout, this); + return new DefaultExpectRequestContext<>(Plc4xNettyWrapper.this::registerHandler, clazz, timeout, this); } }); @@ -151,15 +158,6 @@ protected void decode(ChannelHandlerContext channelHandlerContext, T t, List(channelHandlerContext, authentication, passive), t); + protocolBase.decode(new DefaultConversationContext<>(this::registerHandler, channelHandlerContext, authentication, passive), t); } @Override @@ -200,85 +198,60 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc // by sending a connection request to the plc. logger.debug("User Event triggered {}", evt); if (evt instanceof ConnectEvent) { - this.protocolBase.onConnect(new DefaultConversationContext<>(ctx, authentication, passive)); + this.protocolBase.onConnect(new DefaultConversationContext<>(this::registerHandler, ctx, authentication, passive)); } else if (evt instanceof DisconnectEvent) { - this.protocolBase.onDisconnect(new DefaultConversationContext<>(ctx, authentication, passive)); + this.protocolBase.onDisconnect(new DefaultConversationContext<>(this::registerHandler, ctx, authentication, passive)); } else if (evt instanceof DiscoverEvent) { - this.protocolBase.onDiscover(new DefaultConversationContext<>(ctx, authentication, passive)); + this.protocolBase.onDiscover(new DefaultConversationContext<>(this::registerHandler, ctx, authentication, passive)); } else if (evt instanceof CloseConnectionEvent) { - this.protocolBase.close(new DefaultConversationContext<>(ctx, authentication, passive)); + this.protocolBase.close(new DefaultConversationContext<>(this::registerHandler, ctx, authentication, passive)); } else { super.userEventTriggered(ctx, evt); } } - public class DefaultConversationContext implements ConversationContext { - private final ChannelHandlerContext channelHandlerContext; - - private final PlcAuthentication authentication; - private final boolean passive; - - public DefaultConversationContext(ChannelHandlerContext channelHandlerContext, - PlcAuthentication authentication, - boolean passive) { - this.channelHandlerContext = channelHandlerContext; - this.authentication = authentication; - this.passive = passive; - } - - @Override - public Channel getChannel() { - return channelHandlerContext.channel(); - } - - public PlcAuthentication getAuthentication() { - return authentication; - } - - @Override - public boolean isPassive() { - return passive; - } - - @Override - public void sendToWire(T1 msg) { - logger.trace("Sending to wire {}", msg); - channelHandlerContext.channel().writeAndFlush(msg); - } - - @Override - public void fireConnected() { - logger.trace("Firing Connected!"); - channelHandlerContext.pipeline().fireUserEventTriggered(new ConnectedEvent()); - } - - @Override - public void fireDisconnected() { - logger.trace("Firing Disconnected!"); - channelHandlerContext.pipeline().fireUserEventTriggered(new DisconnectedEvent()); - } - - @Override - public void fireDiscovered(Configuration c) { - logger.trace("Firing Discovered!"); - channelHandlerContext.pipeline().fireUserEventTriggered(new DiscoveredEvent(c)); - } + /** + * Performs registration of packet handler and makes sure that its timeout will be handled properly. + * + * Since timeouts are controlled by {@link TimeoutManager} there is a need to decorate handler + * operations so both sides know what's going on. + * + * @param handler Handler to be registered. + */ + private void registerHandler(HandlerRegistration handler) { + AtomicReference deferred = new AtomicReference<>(); + CompletionCallback completionCallback = this.timeoutManager.register(new TimedOperation() { + @Override + public Consumer getOnTimeoutConsumer() { + return onTimeout(deferred, handler.getOnTimeoutConsumer()); + } - @Override - public SendRequestContext sendRequest(T1 packet) { - return new DefaultSendRequestContext<>(handler -> { - logger.trace("Adding Response Handler ..."); - registeredHandlers.add(handler); - }, packet, this); - } + @Override + public Duration getTimeout() { + return handler.getTimeout(); + } + }); + // wrap handler, so we can catch packet consumer call and inform completion callback. + HandlerRegistration registration = new HandlerRegistration( + handler.getCommands(), + handler.getExpectClazz(), + completionCallback.andThen(handler.getPacketConsumer()), + handler.getOnTimeoutConsumer(), + handler.getErrorConsumer(), + handler.getTimeout() + ); + deferred.set(registration); + registeredHandlers.add(registration); + } - @Override - public ExpectRequestContext expectRequest(Class clazz, Duration timeout) { - return new DefaultExpectRequestContext<>(handler -> { - logger.trace("Adding Request Handler ..."); - registeredHandlers.add(handler); - }, clazz, timeout, this); - } + private Consumer onTimeout(AtomicReference reference, Consumer onTimeoutConsumer) { + return new Consumer() { + @Override + public void accept(TimeoutException e) { + registeredHandlers.remove(reference.get()); + onTimeoutConsumer.accept(e); + } + }; } } diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/configuration/Configuration.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/configuration/Configuration.java index ce47796d1b4..10de17cf3ca 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/configuration/Configuration.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/configuration/Configuration.java @@ -18,6 +18,13 @@ */ package org.apache.plc4x.java.spi.configuration; +import org.apache.plc4x.java.spi.netty.NettyHashTimerTimeoutManager; +import org.apache.plc4x.java.spi.TimeoutManager; + public interface Configuration { + default TimeoutManager getTimeoutManager() { + return new NettyHashTimerTimeoutManager(); + } + } diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/CustomProtocolStackConfigurer.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/CustomProtocolStackConfigurer.java index 395a029d36b..82e350c72f9 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/CustomProtocolStackConfigurer.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/CustomProtocolStackConfigurer.java @@ -101,7 +101,7 @@ public Plc4xProtocolBase configurePipeline(Configuration conf if (driverContext != null) { protocol.setDriverContext(driverContext); } - Plc4xNettyWrapper context = new Plc4xNettyWrapper<>(pipeline, passive, protocol, authentication, basePacketClass); + Plc4xNettyWrapper context = new Plc4xNettyWrapper<>(configuration.getTimeoutManager(), pipeline, passive, protocol, authentication, basePacketClass); pipeline.addLast(context); return protocol; } diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/SingleProtocolStackConfigurer.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/SingleProtocolStackConfigurer.java index 8c60e5262bc..59c3c00b8b2 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/SingleProtocolStackConfigurer.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/SingleProtocolStackConfigurer.java @@ -111,7 +111,7 @@ public Plc4xProtocolBase configurePipeline(Configuration conf if (driverContextClass != null) { protocol.setDriverContext(configure(configuration, createInstance(driverContextClass))); } - Plc4xNettyWrapper context = new Plc4xNettyWrapper<>(pipeline, passive, protocol, + Plc4xNettyWrapper context = new Plc4xNettyWrapper<>(configuration.getTimeoutManager(), pipeline, passive, protocol, authentication, basePacketClass); pipeline.addLast(context); return protocol; diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java new file mode 100644 index 00000000000..96de091dd03 --- /dev/null +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultConversationContext.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.plc4x.java.spi.internal; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import java.time.Duration; +import java.util.function.Consumer; +import org.apache.plc4x.java.api.authentication.PlcAuthentication; +import org.apache.plc4x.java.spi.ConversationContext; +import org.apache.plc4x.java.spi.configuration.Configuration; +import org.apache.plc4x.java.spi.events.ConnectedEvent; +import org.apache.plc4x.java.spi.events.DisconnectedEvent; +import org.apache.plc4x.java.spi.events.DiscoveredEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultConversationContext implements ConversationContext { + private final Logger logger = LoggerFactory.getLogger(DefaultConversationContext.class); + + private final Consumer handlerRegistrar; + + private final ChannelHandlerContext channelHandlerContext; + private final PlcAuthentication authentication; + private final boolean passive; + + public DefaultConversationContext(Consumer handlerRegistrar, + ChannelHandlerContext channelHandlerContext, + PlcAuthentication authentication, + boolean passive) { + this.handlerRegistrar = handlerRegistrar; + this.channelHandlerContext = channelHandlerContext; + this.authentication = authentication; + this.passive = passive; + } + @Override + public Channel getChannel() { + return channelHandlerContext.channel(); + } + + public PlcAuthentication getAuthentication() { + return authentication; + } + + @Override + public boolean isPassive() { + return passive; + } + + @Override + public void sendToWire(T1 msg) { + logger.trace("Sending to wire {}", msg); + channelHandlerContext.channel().writeAndFlush(msg); + } + + @Override + public void fireConnected() { + logger.trace("Firing Connected!"); + channelHandlerContext.pipeline().fireUserEventTriggered(new ConnectedEvent()); + } + + @Override + public void fireDisconnected() { + logger.trace("Firing Disconnected!"); + channelHandlerContext.pipeline().fireUserEventTriggered(new DisconnectedEvent()); + } + + @Override + public void fireDiscovered(Configuration c) { + logger.trace("Firing Discovered!"); + channelHandlerContext.pipeline().fireUserEventTriggered(new DiscoveredEvent(c)); + } + + @Override + public SendRequestContext sendRequest(T1 packet) { + return new DefaultSendRequestContext<>(handler -> { + logger.trace("Adding Response Handler ..."); + handlerRegistrar.accept(handler); + }, packet, this); + } + + @Override + public ExpectRequestContext expectRequest(Class clazz, Duration timeout) { + return new DefaultExpectRequestContext<>(handler -> { + logger.trace("Adding Request Handler ..."); + handlerRegistrar.accept(handler); + }, clazz, timeout, this); + } +} \ No newline at end of file diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java index 7dc9216d8b5..c71b203a3d8 100644 --- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/HandlerRegistration.java @@ -21,10 +21,8 @@ import io.vavr.control.Either; import java.time.Duration; -import java.time.Instant; import java.util.Deque; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -46,7 +44,6 @@ public class HandlerRegistration { private final BiConsumer errorConsumer; private final Duration timeout; - private final Instant timeoutAt; private volatile boolean cancelled = false; private volatile boolean handled = false; @@ -58,7 +55,6 @@ public HandlerRegistration(Deque, Predicate>> commands, this.onTimeoutConsumer = onTimeoutConsumer; this.errorConsumer = errorConsumer; this.timeout = timeout; - this.timeoutAt = Instant.now().plus(timeout); } public Deque, Predicate>> getCommands() { @@ -85,10 +81,6 @@ public Duration getTimeout() { return timeout; } - public Instant getTimeoutAt() { - return timeoutAt; - } - public void cancel() { this.cancelled = true; } @@ -109,4 +101,5 @@ public boolean hasHandled() { public String toString() { return "HandlerRegistration#" + id; } + } diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/netty/NettyHashTimerTimeoutManager.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/netty/NettyHashTimerTimeoutManager.java new file mode 100644 index 00000000000..823baedee76 --- /dev/null +++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/netty/NettyHashTimerTimeoutManager.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.plc4x.java.spi.netty; + +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import org.apache.plc4x.java.spi.TimedOperation; +import org.apache.plc4x.java.spi.TimeoutManager; + +public class NettyHashTimerTimeoutManager implements TimeoutManager { + + private final Timer timer; + + public NettyHashTimerTimeoutManager() { + this(100L); + } + + public NettyHashTimerTimeoutManager(long tick) { + HashedWheelTimer wheelTimer = new HashedWheelTimer(tick, TimeUnit.MILLISECONDS); + timer = wheelTimer; + wheelTimer.start(); + } + + @Override + public CompletionCallback register(TimedOperation operation) { + Timeout newTimeout = timer.newTimeout(timeout -> { + if (timeout.isCancelled()) { + return; + } + TimeoutException exception = new TimeoutException(); + operation.getOnTimeoutConsumer().accept(exception); + }, operation.getTimeout().toMillis(), TimeUnit.MILLISECONDS); + + return new TimeoutCompletionCallback(newTimeout); + } + + @Override + public void stop() { + Set timeouts = timer.stop(); + timeouts.forEach(Timeout::cancel); + } + + static class TimeoutCompletionCallback implements CompletionCallback { + + private final Timeout timeout; + private Consumer onTimeout; + + TimeoutCompletionCallback(Timeout timeout) { + this.timeout = timeout; + } + + @Override + public void complete() { + timeout.cancel(); + } + } +} diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java index 3b360c3b918..629d38f14d7 100644 --- a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java +++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/Plc4xNettyWrapperTest.java @@ -90,7 +90,7 @@ void conversationTimeoutTest() throws Exception { Thread.sleep(750); - verify(false, false, false); + verify(true, false, false); wrapper.decode(channelHandlerContext, new Date(), new ArrayList<>()); verify(true, false, false); diff --git a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContextTest.java b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContextTest.java index b3ae6bce6da..11b28df2fcf 100644 --- a/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContextTest.java +++ b/plc4j/spi/src/test/java/org/apache/plc4x/java/spi/internal/DefaultSendRequestContextTest.java @@ -43,7 +43,7 @@ class DefaultSendRequestContextTest { Consumer finisher; @Mock - Plc4xNettyWrapper.DefaultConversationContext context; + ConversationContext context; DefaultSendRequestContext SUT;