Skip to content

Commit

Permalink
feat(plc4j) Better handling of timeouts in plc4j (#821).
Browse files Browse the repository at this point in the history
Signed-off-by: Łukasz Dywicki <luke@code-house.org>
  • Loading branch information
splatch committed Feb 23, 2023
1 parent 65346de commit 15114aa
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,26 @@
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.MessageToMessageCodec;
import io.vavr.control.Either;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.spi.TimeoutManager.CompletionCallback;
import org.apache.plc4x.java.spi.configuration.Configuration;
import org.apache.plc4x.java.spi.events.*;
import org.apache.plc4x.java.spi.internal.DefaultConversationContext;
import org.apache.plc4x.java.spi.internal.DefaultExpectRequestContext;
import org.apache.plc4x.java.spi.internal.DefaultSendRequestContext;
import org.apache.plc4x.java.spi.internal.HandlerRegistration;
import org.apache.plc4x.java.spi.netty.NettyHashTimerTimeoutManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand All @@ -55,15 +58,23 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
private final Queue<HandlerRegistration> registeredHandlers;
private final ChannelPipeline pipeline;
private final boolean passive;
private final TimeoutManager timeoutManager;

public Plc4xNettyWrapper(ChannelPipeline pipeline, boolean passive, Plc4xProtocolBase<T> protocol,
PlcAuthentication authentication, Class<T> clazz) {
this(new NettyHashTimerTimeoutManager(), pipeline, passive, protocol, authentication, clazz);
}

public Plc4xNettyWrapper(TimeoutManager timeoutManager, ChannelPipeline pipeline, boolean passive, Plc4xProtocolBase<T> protocol,
PlcAuthentication authentication, Class<T> clazz) {
super(clazz, Object.class);
this.pipeline = pipeline;
this.passive = passive;
this.registeredHandlers = new ConcurrentLinkedQueue<>();
this.protocolBase = protocol;
this.authentication = authentication;
this.timeoutManager = timeoutManager;
//this.protocolBase.setContext(new DefaultConversationContext<>(this::registerHandler, pipeline.context(""), authentication, passive));
this.protocolBase.setContext(new ConversationContext<T>() {

@Override
Expand Down Expand Up @@ -102,19 +113,15 @@ public void fireDiscovered(Configuration c) {
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public SendRequestContext<T> sendRequest(T packet) {
return new DefaultSendRequestContext<>(handler -> {
logger.trace("Adding Response Handler ...");
registeredHandlers.add(handler);
}, packet, this);
return new DefaultSendRequestContext<>(Plc4xNettyWrapper.this::registerHandler, packet, this);
}

@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public ExpectRequestContext<T> expectRequest(Class<T> clazz, Duration timeout) {
return new DefaultExpectRequestContext<>(handler -> {
logger.trace("Adding Request Handler ...");
registeredHandlers.add(handler);
}, clazz, timeout, this);
return new DefaultExpectRequestContext<>(Plc4xNettyWrapper.this::registerHandler, clazz, timeout, this);
}

});
Expand Down Expand Up @@ -151,15 +158,6 @@ protected void decode(ChannelHandlerContext channelHandlerContext, T t, List<Obj
continue;
}
// Timeout?
final Instant now = Instant.now();
if (registration.getTimeoutAt().isBefore(now)) {
logger.debug("Removing {} as its timed out (timeout of {} was set till {} and now is {})",
registration, registration.getTimeout(), registration.getTimeoutAt(), now);
// pass timeout back to caller so it can do ie. transaction compensation
registration.getOnTimeoutConsumer().accept(new TimeoutException());
iter.remove();
continue;
}
logger.trace("Checking handler {} for Object of type {}", registration, t.getClass().getSimpleName());
if (registration.getExpectClazz().isInstance(t)) {
logger.trace("Handler {} has right expected type {}, checking condition", registration, registration.getExpectClazz().getSimpleName());
Expand Down Expand Up @@ -191,7 +189,7 @@ protected void decode(ChannelHandlerContext channelHandlerContext, T t, List<Obj
}
}
logger.trace("None of {} registered handlers could handle message {}, using default decode method", this.registeredHandlers.size(), t);
protocolBase.decode(new DefaultConversationContext<>(channelHandlerContext, authentication, passive), t);
protocolBase.decode(new DefaultConversationContext<>(this::registerHandler, channelHandlerContext, authentication, passive), t);
}

@Override
Expand All @@ -200,85 +198,60 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
// by sending a connection request to the plc.
logger.debug("User Event triggered {}", evt);
if (evt instanceof ConnectEvent) {
this.protocolBase.onConnect(new DefaultConversationContext<>(ctx, authentication, passive));
this.protocolBase.onConnect(new DefaultConversationContext<>(this::registerHandler, ctx, authentication, passive));
} else if (evt instanceof DisconnectEvent) {
this.protocolBase.onDisconnect(new DefaultConversationContext<>(ctx, authentication, passive));
this.protocolBase.onDisconnect(new DefaultConversationContext<>(this::registerHandler, ctx, authentication, passive));
} else if (evt instanceof DiscoverEvent) {
this.protocolBase.onDiscover(new DefaultConversationContext<>(ctx, authentication, passive));
this.protocolBase.onDiscover(new DefaultConversationContext<>(this::registerHandler, ctx, authentication, passive));
} else if (evt instanceof CloseConnectionEvent) {
this.protocolBase.close(new DefaultConversationContext<>(ctx, authentication, passive));
this.protocolBase.close(new DefaultConversationContext<>(this::registerHandler, ctx, authentication, passive));
} else {
super.userEventTriggered(ctx, evt);
}
}

public class DefaultConversationContext<T1> implements ConversationContext<T1> {
private final ChannelHandlerContext channelHandlerContext;

private final PlcAuthentication authentication;
private final boolean passive;

public DefaultConversationContext(ChannelHandlerContext channelHandlerContext,
PlcAuthentication authentication,
boolean passive) {
this.channelHandlerContext = channelHandlerContext;
this.authentication = authentication;
this.passive = passive;
}

@Override
public Channel getChannel() {
return channelHandlerContext.channel();
}

public PlcAuthentication getAuthentication() {
return authentication;
}

@Override
public boolean isPassive() {
return passive;
}

@Override
public void sendToWire(T1 msg) {
logger.trace("Sending to wire {}", msg);
channelHandlerContext.channel().writeAndFlush(msg);
}

@Override
public void fireConnected() {
logger.trace("Firing Connected!");
channelHandlerContext.pipeline().fireUserEventTriggered(new ConnectedEvent());
}

@Override
public void fireDisconnected() {
logger.trace("Firing Disconnected!");
channelHandlerContext.pipeline().fireUserEventTriggered(new DisconnectedEvent());
}

@Override
public void fireDiscovered(Configuration c) {
logger.trace("Firing Discovered!");
channelHandlerContext.pipeline().fireUserEventTriggered(new DiscoveredEvent(c));
}
/**
* Performs registration of packet handler and makes sure that its timeout will be handled properly.
*
* Since timeouts are controlled by {@link TimeoutManager} there is a need to decorate handler
* operations so both sides know what's going on.
*
* @param handler Handler to be registered.
*/
private void registerHandler(HandlerRegistration handler) {
AtomicReference<HandlerRegistration> deferred = new AtomicReference<>();
CompletionCallback completionCallback = this.timeoutManager.register(new TimedOperation() {
@Override
public Consumer<TimeoutException> getOnTimeoutConsumer() {
return onTimeout(deferred, handler.getOnTimeoutConsumer());
}

@Override
public SendRequestContext<T1> sendRequest(T1 packet) {
return new DefaultSendRequestContext<>(handler -> {
logger.trace("Adding Response Handler ...");
registeredHandlers.add(handler);
}, packet, this);
}
@Override
public Duration getTimeout() {
return handler.getTimeout();
}
});
// wrap handler, so we can catch packet consumer call and inform completion callback.
HandlerRegistration registration = new HandlerRegistration(
handler.getCommands(),
handler.getExpectClazz(),
completionCallback.andThen(handler.getPacketConsumer()),
handler.getOnTimeoutConsumer(),
handler.getErrorConsumer(),
handler.getTimeout()
);
deferred.set(registration);
registeredHandlers.add(registration);
}

@Override
public ExpectRequestContext<T1> expectRequest(Class<T1> clazz, Duration timeout) {
return new DefaultExpectRequestContext<>(handler -> {
logger.trace("Adding Request Handler ...");
registeredHandlers.add(handler);
}, clazz, timeout, this);
}
private Consumer<TimeoutException> onTimeout(AtomicReference<HandlerRegistration> reference, Consumer<TimeoutException> onTimeoutConsumer) {
return new Consumer<TimeoutException>() {
@Override
public void accept(TimeoutException e) {
registeredHandlers.remove(reference.get());
onTimeoutConsumer.accept(e);
}
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.spi;

import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public interface TimedOperation {

Consumer<TimeoutException> getOnTimeoutConsumer();

Duration getTimeout();


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.spi;

import java.util.function.Consumer;

public interface TimeoutManager {

CompletionCallback register(TimedOperation operation);

void stop();

interface CompletionCallback<T> extends Consumer<T> {

void complete();

@Override
default void accept(T data) {
complete();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@
*/
package org.apache.plc4x.java.spi.configuration;

import org.apache.plc4x.java.spi.netty.NettyHashTimerTimeoutManager;
import org.apache.plc4x.java.spi.TimeoutManager;

public interface Configuration {

default TimeoutManager getTimeoutManager() {
return new NettyHashTimerTimeoutManager();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration conf
if (driverContext != null) {
protocol.setDriverContext(driverContext);
}
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(pipeline, passive, protocol, authentication, basePacketClass);
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(configuration.getTimeoutManager(), pipeline, passive, protocol, authentication, basePacketClass);
pipeline.addLast(context);
return protocol;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration conf
if (driverContextClass != null) {
protocol.setDriverContext(configure(configuration, createInstance(driverContextClass)));
}
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(pipeline, passive, protocol,
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(configuration.getTimeoutManager(), pipeline, passive, protocol,
authentication, basePacketClass);
pipeline.addLast(context);
return protocol;
Expand Down
Loading

0 comments on commit 15114aa

Please sign in to comment.