Skip to content

Commit

Permalink
feat(plc4j) Better handling of timeouts in plc4j (#821).
Browse files Browse the repository at this point in the history
Signed-off-by: Łukasz Dywicki <luke@code-house.org>
  • Loading branch information
splatch committed Feb 23, 2023
1 parent 65346de commit fd174ff
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,26 @@
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.MessageToMessageCodec;
import io.vavr.control.Either;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.spi.TimeoutManager.CompletionCallback;
import org.apache.plc4x.java.spi.configuration.Configuration;
import org.apache.plc4x.java.spi.events.*;
import org.apache.plc4x.java.spi.internal.DefaultConversationContext;
import org.apache.plc4x.java.spi.internal.DefaultExpectRequestContext;
import org.apache.plc4x.java.spi.internal.DefaultSendRequestContext;
import org.apache.plc4x.java.spi.internal.HandlerRegistration;
import org.apache.plc4x.java.spi.netty.NettyHashTimerTimeoutManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand All @@ -55,15 +58,23 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
private final Queue<HandlerRegistration> registeredHandlers;
private final ChannelPipeline pipeline;
private final boolean passive;
private final TimeoutManager timeoutManager;

public Plc4xNettyWrapper(ChannelPipeline pipeline, boolean passive, Plc4xProtocolBase<T> protocol,
PlcAuthentication authentication, Class<T> clazz) {
this(new NettyHashTimerTimeoutManager(), pipeline, passive, protocol, authentication, clazz);
}

public Plc4xNettyWrapper(TimeoutManager timeoutManager, ChannelPipeline pipeline, boolean passive, Plc4xProtocolBase<T> protocol,
PlcAuthentication authentication, Class<T> clazz) {
super(clazz, Object.class);
this.pipeline = pipeline;
this.passive = passive;
this.registeredHandlers = new ConcurrentLinkedQueue<>();
this.protocolBase = protocol;
this.authentication = authentication;
this.timeoutManager = timeoutManager;
//this.protocolBase.setContext(new DefaultConversationContext<>(this::registerHandler, pipeline.context(""), authentication, passive));
this.protocolBase.setContext(new ConversationContext<T>() {

@Override
Expand Down Expand Up @@ -102,19 +113,15 @@ public void fireDiscovered(Configuration c) {
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public SendRequestContext<T> sendRequest(T packet) {
return new DefaultSendRequestContext<>(handler -> {
logger.trace("Adding Response Handler ...");
registeredHandlers.add(handler);
}, packet, this);
return new DefaultSendRequestContext<>(Plc4xNettyWrapper.this::registerHandler, packet, this);
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public ExpectRequestContext<T> expectRequest(Class<T> clazz, Duration timeout) {
return new DefaultExpectRequestContext<>(handler -> {
logger.trace("Adding Request Handler ...");
registeredHandlers.add(handler);
}, clazz, timeout, this);
return new DefaultExpectRequestContext<>(Plc4xNettyWrapper.this::registerHandler, clazz, timeout, this);
}

});
Expand Down Expand Up @@ -151,15 +158,6 @@ protected void decode(ChannelHandlerContext channelHandlerContext, T t, List<Obj
continue;
}
// Timeout?
final Instant now = Instant.now();
if (registration.getTimeoutAt().isBefore(now)) {
logger.debug("Removing {} as its timed out (timeout of {} was set till {} and now is {})",
registration, registration.getTimeout(), registration.getTimeoutAt(), now);
// pass timeout back to caller so it can do ie. transaction compensation
registration.getOnTimeoutConsumer().accept(new TimeoutException());
iter.remove();
continue;
}
logger.trace("Checking handler {} for Object of type {}", registration, t.getClass().getSimpleName());
if (registration.getExpectClazz().isInstance(t)) {
logger.trace("Handler {} has right expected type {}, checking condition", registration, registration.getExpectClazz().getSimpleName());
Expand Down Expand Up @@ -191,7 +189,7 @@ protected void decode(ChannelHandlerContext channelHandlerContext, T t, List<Obj
}
}
logger.trace("None of {} registered handlers could handle message {}, using default decode method", this.registeredHandlers.size(), t);
protocolBase.decode(new DefaultConversationContext<>(channelHandlerContext, authentication, passive), t);
protocolBase.decode(new DefaultConversationContext<>(this::registerHandler, channelHandlerContext, authentication, passive), t);
}

@Override
Expand All @@ -200,85 +198,60 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
// by sending a connection request to the plc.
logger.debug("User Event triggered {}", evt);
if (evt instanceof ConnectEvent) {
this.protocolBase.onConnect(new DefaultConversationContext<>(ctx, authentication, passive));
this.protocolBase.onConnect(new DefaultConversationContext<>(this::registerHandler, ctx, authentication, passive));
} else if (evt instanceof DisconnectEvent) {
this.protocolBase.onDisconnect(new DefaultConversationContext<>(ctx, authentication, passive));
this.protocolBase.onDisconnect(new DefaultConversationContext<>(this::registerHandler, ctx, authentication, passive));
} else if (evt instanceof DiscoverEvent) {
this.protocolBase.onDiscover(new DefaultConversationContext<>(ctx, authentication, passive));
this.protocolBase.onDiscover(new DefaultConversationContext<>(this::registerHandler, ctx, authentication, passive));
} else if (evt instanceof CloseConnectionEvent) {
this.protocolBase.close(new DefaultConversationContext<>(ctx, authentication, passive));
this.protocolBase.close(new DefaultConversationContext<>(this::registerHandler, ctx, authentication, passive));
} else {
super.userEventTriggered(ctx, evt);
}
}

public class DefaultConversationContext<T1> implements ConversationContext<T1> {
private final ChannelHandlerContext channelHandlerContext;

private final PlcAuthentication authentication;
private final boolean passive;

public DefaultConversationContext(ChannelHandlerContext channelHandlerContext,
PlcAuthentication authentication,
boolean passive) {
this.channelHandlerContext = channelHandlerContext;
this.authentication = authentication;
this.passive = passive;
}

@Override
public Channel getChannel() {
return channelHandlerContext.channel();
}

public PlcAuthentication getAuthentication() {
return authentication;
}

@Override
public boolean isPassive() {
return passive;
}

@Override
public void sendToWire(T1 msg) {
logger.trace("Sending to wire {}", msg);
channelHandlerContext.channel().writeAndFlush(msg);
}

@Override
public void fireConnected() {
logger.trace("Firing Connected!");
channelHandlerContext.pipeline().fireUserEventTriggered(new ConnectedEvent());
}

@Override
public void fireDisconnected() {
logger.trace("Firing Disconnected!");
channelHandlerContext.pipeline().fireUserEventTriggered(new DisconnectedEvent());
}

@Override
public void fireDiscovered(Configuration c) {
logger.trace("Firing Discovered!");
channelHandlerContext.pipeline().fireUserEventTriggered(new DiscoveredEvent(c));
}
/**
* Performs registration of packet handler and makes sure that its timeout will be handled properly.
*
* Since timeouts are controlled by {@link TimeoutManager} there is a need to decorate handler
* operations so both sides know what's going on.
*
* @param handler Handler to be registered.
*/
private void registerHandler(HandlerRegistration handler) {
AtomicReference<HandlerRegistration> deferred = new AtomicReference<>();
CompletionCallback completionCallback = this.timeoutManager.register(new TimedOperation() {
@Override
public Consumer<TimeoutException> getOnTimeoutConsumer() {
return onTimeout(deferred, handler.getOnTimeoutConsumer());
}

@Override
public SendRequestContext<T1> sendRequest(T1 packet) {
return new DefaultSendRequestContext<>(handler -> {
logger.trace("Adding Response Handler ...");
registeredHandlers.add(handler);
}, packet, this);
}
@Override
public Duration getTimeout() {
return handler.getTimeout();
}
});
// wrap handler, so we can catch packet consumer call and inform completion callback.
HandlerRegistration registration = new HandlerRegistration(
handler.getCommands(),
handler.getExpectClazz(),
completionCallback.andThen(handler.getPacketConsumer()),
handler.getOnTimeoutConsumer(),
handler.getErrorConsumer(),
handler.getTimeout()
);
deferred.set(registration);
registeredHandlers.add(registration);
}

@Override
public ExpectRequestContext<T1> expectRequest(Class<T1> clazz, Duration timeout) {
return new DefaultExpectRequestContext<>(handler -> {
logger.trace("Adding Request Handler ...");
registeredHandlers.add(handler);
}, clazz, timeout, this);
}
private Consumer<TimeoutException> onTimeout(AtomicReference<HandlerRegistration> reference, Consumer<TimeoutException> onTimeoutConsumer) {
return new Consumer<TimeoutException>() {
@Override
public void accept(TimeoutException e) {
registeredHandlers.remove(reference.get());
onTimeoutConsumer.accept(e);
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
*/
package org.apache.plc4x.java.spi.configuration;

import org.apache.plc4x.java.spi.netty.NettyHashTimerTimeoutManager;
import org.apache.plc4x.java.spi.TimeoutManager;

public interface Configuration {

default TimeoutManager getTimeoutManager() {
return new NettyHashTimerTimeoutManager();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration conf
if (driverContext != null) {
protocol.setDriverContext(driverContext);
}
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(pipeline, passive, protocol, authentication, basePacketClass);
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(configuration.getTimeoutManager(), pipeline, passive, protocol, authentication, basePacketClass);
pipeline.addLast(context);
return protocol;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration conf
if (driverContextClass != null) {
protocol.setDriverContext(configure(configuration, createInstance(driverContextClass)));
}
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(pipeline, passive, protocol,
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(configuration.getTimeoutManager(), pipeline, passive, protocol,
authentication, basePacketClass);
pipeline.addLast(context);
return protocol;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.plc4x.java.spi.internal;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.time.Duration;
import java.util.function.Consumer;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.spi.ConversationContext;
import org.apache.plc4x.java.spi.configuration.Configuration;
import org.apache.plc4x.java.spi.events.ConnectedEvent;
import org.apache.plc4x.java.spi.events.DisconnectedEvent;
import org.apache.plc4x.java.spi.events.DiscoveredEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultConversationContext<T1> implements ConversationContext<T1> {
private final Logger logger = LoggerFactory.getLogger(DefaultConversationContext.class);

private final Consumer<HandlerRegistration> handlerRegistrar;

private final ChannelHandlerContext channelHandlerContext;
private final PlcAuthentication authentication;
private final boolean passive;

public DefaultConversationContext(Consumer<HandlerRegistration> handlerRegistrar,
ChannelHandlerContext channelHandlerContext,
PlcAuthentication authentication,
boolean passive) {
this.handlerRegistrar = handlerRegistrar;
this.channelHandlerContext = channelHandlerContext;
this.authentication = authentication;
this.passive = passive;
}
@Override
public Channel getChannel() {
return channelHandlerContext.channel();
}

public PlcAuthentication getAuthentication() {
return authentication;
}

@Override
public boolean isPassive() {
return passive;
}

@Override
public void sendToWire(T1 msg) {
logger.trace("Sending to wire {}", msg);
channelHandlerContext.channel().writeAndFlush(msg);
}

@Override
public void fireConnected() {
logger.trace("Firing Connected!");
channelHandlerContext.pipeline().fireUserEventTriggered(new ConnectedEvent());
}

@Override
public void fireDisconnected() {
logger.trace("Firing Disconnected!");
channelHandlerContext.pipeline().fireUserEventTriggered(new DisconnectedEvent());
}

@Override
public void fireDiscovered(Configuration c) {
logger.trace("Firing Discovered!");
channelHandlerContext.pipeline().fireUserEventTriggered(new DiscoveredEvent(c));
}

@Override
public SendRequestContext<T1> sendRequest(T1 packet) {
return new DefaultSendRequestContext<>(handler -> {
logger.trace("Adding Response Handler ...");
handlerRegistrar.accept(handler);
}, packet, this);
}

@Override
public ExpectRequestContext<T1> expectRequest(Class<T1> clazz, Duration timeout) {
return new DefaultExpectRequestContext<>(handler -> {
logger.trace("Adding Request Handler ...");
handlerRegistrar.accept(handler);
}, clazz, timeout, this);
}
}

0 comments on commit fd174ff

Please sign in to comment.