Skip to content

Commit

Permalink
fix(plc4j): Changed the SPI to pass along PlcAuthentication informati…
Browse files Browse the repository at this point in the history
…on to the ProtocolLogic
  • Loading branch information
chrisdutz committed Aug 30, 2022
1 parent 16c7d37 commit 1045cf0
Show file tree
Hide file tree
Showing 15 changed files with 101 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,22 @@ public DriverContext getDriverContext() {

@Override
public void setContext(ConversationContext<C> context) {
delegate.setContext(new ConversationContextWrapper<>(context, wireType, adapter, frameHandler));
delegate.setContext(new ConversationContextWrapper<>(context, wireType, adapter, frameHandler, context.getAuthentication()));
}

@Override
public void onConnect(ConversationContext<C> context) {
delegate.onConnect(new ConversationContextWrapper<>(context, wireType, adapter, frameHandler));
delegate.onConnect(new ConversationContextWrapper<>(context, wireType, adapter, frameHandler, context.getAuthentication()));
}

@Override
public void onDisconnect(ConversationContext<C> context) {
delegate.onDisconnect(new ConversationContextWrapper<>(context, wireType, adapter, frameHandler));
delegate.onDisconnect(new ConversationContextWrapper<>(context, wireType, adapter, frameHandler, context.getAuthentication()));
}

@Override
protected void decode(ConversationContext<C> context, C msg) throws Exception {
delegate.decode(new ConversationContextWrapper<>(context, wireType, adapter, frameHandler), frameHandler.fromCAN(adapter.apply(msg)));
delegate.decode(new ConversationContextWrapper<>(context, wireType, adapter, frameHandler, context.getAuthentication()), frameHandler.fromCAN(adapter.apply(msg)));
}

@Override
Expand All @@ -102,7 +102,7 @@ public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptio

@Override
public void close(ConversationContext<C> context) {
delegate.close(new ConversationContextWrapper<>(context, wireType, adapter, frameHandler));
delegate.close(new ConversationContextWrapper<>(context, wireType, adapter, frameHandler, context.getAuthentication()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.Duration;
import java.util.function.Function;

import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.spi.ConversationContext;
import org.apache.plc4x.java.spi.configuration.Configuration;
import org.apache.plc4x.java.transport.can.CANTransport.FrameHandler;
Expand All @@ -34,11 +35,19 @@ public class ConversationContextWrapper<C, T> implements ConversationContext<T>
private final Function<C, FrameData> adapter;
private final FrameHandler<C, T> frameHandler;

public ConversationContextWrapper(ConversationContext<C> delegate, Class<C> wireType, Function<C, FrameData> adapter, FrameHandler<C, T> frameHandler) {
private final PlcAuthentication authentication;

public ConversationContextWrapper(ConversationContext<C> delegate, Class<C> wireType, Function<C, FrameData> adapter, FrameHandler<C, T> frameHandler, PlcAuthentication authentication) {
this.delegate = delegate;
this.wireType = wireType;
this.adapter = adapter;
this.frameHandler = frameHandler;
this.authentication = authentication;
}

@Override
public PlcAuthentication getAuthentication() {
return authentication;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.plc4x.java.opcua;

import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.opcua.field.OpcuaField;
import org.apache.plc4x.java.opcua.field.OpcuaPlcFieldHandler;
Expand Down Expand Up @@ -237,12 +236,8 @@ public PlcConnection getConnection(String connectionString) throws PlcConnection
awaitDisconnectComplete,
awaitDiscoverComplete,
getStackConfigurer(),
getOptimizer());
}

@Override
public PlcConnection getConnection(String url, PlcAuthentication authentication) throws PlcConnectionException {
throw new PlcConnectionException("Authentication not supported.");
getOptimizer(),
null);
}

/** Estimate the Length of a Packet */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public class SimulatedConnection extends AbstractPlcConnection implements PlcRea
private final Map<Integer, Consumer<PlcSubscriptionEvent>> consumerIdMap = new ConcurrentHashMap<>();

public SimulatedConnection(SimulatedDevice device) {
super(true, true, true, false, new SimulatedFieldHandler(), new IEC61131ValueHandler(), null);
super(true, true, true, false,
new SimulatedFieldHandler(), new IEC61131ValueHandler(), null, null);
this.device = device;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.plc4x.java.spi;

import io.netty.channel.Channel;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.spi.configuration.Configuration;

Expand All @@ -31,6 +32,8 @@

public interface ConversationContext<T> {

PlcAuthentication getAuthentication();

Channel getChannel();

boolean isPassive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.MessageToMessageCodec;
import io.vavr.control.Either;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.spi.configuration.Configuration;
import org.apache.plc4x.java.spi.events.*;
import org.apache.plc4x.java.spi.internal.DefaultExpectRequestContext;
Expand All @@ -48,17 +49,28 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
private static final Logger logger = LoggerFactory.getLogger(Plc4xNettyWrapper.class);

private final Plc4xProtocolBase<T> protocolBase;

private final PlcAuthentication authentication;

private final Queue<HandlerRegistration> registeredHandlers;
private final ChannelPipeline pipeline;
private final boolean passive;

public Plc4xNettyWrapper(ChannelPipeline pipeline, boolean passive, Plc4xProtocolBase<T> protocol, Class<T> clazz) {
public Plc4xNettyWrapper(ChannelPipeline pipeline, boolean passive, Plc4xProtocolBase<T> protocol,
PlcAuthentication authentication, Class<T> clazz) {
super(clazz, Object.class);
this.pipeline = pipeline;
this.passive = passive;
this.registeredHandlers = new ConcurrentLinkedQueue<>();
this.protocolBase = protocol;
this.authentication = authentication;
this.protocolBase.setContext(new ConversationContext<T>() {

@Override
public PlcAuthentication getAuthentication() {
return authentication;
}

@Override
public Channel getChannel() {
return pipeline.channel();
Expand Down Expand Up @@ -179,7 +191,7 @@ protected void decode(ChannelHandlerContext channelHandlerContext, T t, List<Obj
}
}
logger.trace("None of {} registered handlers could handle message {}, using default decode method", this.registeredHandlers.size(), t);
protocolBase.decode(new DefaultConversationContext<>(channelHandlerContext, passive), t);
protocolBase.decode(new DefaultConversationContext<>(channelHandlerContext, authentication, passive), t);
}

@Override
Expand All @@ -188,24 +200,29 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
// by sending a connection request to the plc.
logger.debug("User Event triggered {}", evt);
if (evt instanceof ConnectEvent) {
this.protocolBase.onConnect(new DefaultConversationContext<>(ctx, passive));
this.protocolBase.onConnect(new DefaultConversationContext<>(ctx, authentication, passive));
} else if (evt instanceof DisconnectEvent) {
this.protocolBase.onDisconnect(new DefaultConversationContext<>(ctx, passive));
this.protocolBase.onDisconnect(new DefaultConversationContext<>(ctx, authentication, passive));
} else if (evt instanceof DiscoverEvent) {
this.protocolBase.onDiscover(new DefaultConversationContext<>(ctx, passive));
this.protocolBase.onDiscover(new DefaultConversationContext<>(ctx, authentication, passive));
} else if (evt instanceof CloseConnectionEvent) {
this.protocolBase.close(new DefaultConversationContext<>(ctx, passive));
this.protocolBase.close(new DefaultConversationContext<>(ctx, authentication, passive));
} else {
super.userEventTriggered(ctx, evt);
}
}

public class DefaultConversationContext<T1> implements ConversationContext<T1> {
private final ChannelHandlerContext channelHandlerContext;

private final PlcAuthentication authentication;
private final boolean passive;

public DefaultConversationContext(ChannelHandlerContext channelHandlerContext, boolean passive) {
public DefaultConversationContext(ChannelHandlerContext channelHandlerContext,
PlcAuthentication authentication,
boolean passive) {
this.channelHandlerContext = channelHandlerContext;
this.authentication = authentication;
this.passive = passive;
}

Expand All @@ -214,6 +231,10 @@ public Channel getChannel() {
return channelHandlerContext.channel();
}

public PlcAuthentication getAuthentication() {
return authentication;
}

@Override
public boolean isPassive() {
return passive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.commons.lang3.NotImplementedException;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcUnsupportedOperationException;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.metadata.PlcConnectionMetadata;
Expand Down Expand Up @@ -50,6 +51,7 @@ public abstract class AbstractPlcConnection implements PlcConnection, PlcConnect
private PlcValueHandler valueHandler;
private Plc4xProtocolBase<?> protocol;
private BaseOptimizer optimizer;
private PlcAuthentication authentication;

/**
* @deprecated only for compatibility reasons.
Expand All @@ -58,15 +60,17 @@ public abstract class AbstractPlcConnection implements PlcConnection, PlcConnect
protected AbstractPlcConnection() {
}

protected AbstractPlcConnection(boolean canRead, boolean canWrite, boolean canSubscribe, boolean canBrowse, PlcFieldHandler fieldHandler, PlcValueHandler valueHandler,
BaseOptimizer optimizer) {
protected AbstractPlcConnection(boolean canRead, boolean canWrite, boolean canSubscribe, boolean canBrowse,
PlcFieldHandler fieldHandler, PlcValueHandler valueHandler,
BaseOptimizer optimizer, PlcAuthentication authentication) {
this.canRead = canRead;
this.canWrite = canWrite;
this.canSubscribe = canSubscribe;
this.canBrowse = canBrowse;
this.fieldHandler = fieldHandler;
this.valueHandler = valueHandler;
this.optimizer = optimizer;
this.authentication = authentication;
}

public void setProtocol(Plc4xProtocolBase<?> protocol) {
Expand Down Expand Up @@ -113,6 +117,10 @@ public PlcValueHandler getPlcValueHandler() {
return this.valueHandler;
}

protected PlcAuthentication getAuthentication() {
return authentication;
}

@Override
public PlcReadRequest.Builder readRequestBuilder() {
if (!canRead()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.MessageToMessageCodec;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.listener.EventListener;
import org.apache.plc4x.java.spi.Plc4xNettyWrapper;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
Expand Down Expand Up @@ -52,6 +53,7 @@ public class CustomProtocolStackConfigurer<BASE_PACKET_CLASS extends Message> im
private final Function<Configuration, ? extends ToIntFunction<ByteBuf>> packetSizeEstimator;
private final Function<Configuration, ? extends Consumer<ByteBuf>> corruptPacketRemover;
private final MessageToMessageCodec<ByteBuf, ByteBuf> encryptionHandler;

private final Object[] parserArgs;

public static <BPC extends Message> CustomProtocolStackBuilder<BPC> builder(Class<BPC> basePacketClass, Function<Configuration, ? extends MessageInput<BPC>> messageInput) {
Expand Down Expand Up @@ -87,8 +89,9 @@ private ChannelHandler getMessageCodec(Configuration configuration) {

/** Applies the given Stack to the Pipeline */
@Override
public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(
Configuration configuration, ChannelPipeline pipeline, boolean passive, List<EventListener> ignore) {
public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(Configuration configuration, ChannelPipeline pipeline,
PlcAuthentication authentication, boolean passive,
List<EventListener> ignore) {
if (this.encryptionHandler != null) {
pipeline.addLast(this.encryptionHandler);
}
Expand All @@ -98,7 +101,7 @@ public Plc4xProtocolBase<BASE_PACKET_CLASS> configurePipeline(
if (driverContext != null) {
protocol.setDriverContext(driverContext);
}
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(pipeline, passive, protocol, basePacketClass);
Plc4xNettyWrapper<BASE_PACKET_CLASS> context = new Plc4xNettyWrapper<>(pipeline, passive, protocol, authentication, basePacketClass);
pipeline.addLast(context);
return protocol;
}
Expand Down Expand Up @@ -173,7 +176,8 @@ public CustomProtocolStackBuilder<BASE_PACKET_CLASS> withEncryptionHandler(Messa
public CustomProtocolStackConfigurer<BASE_PACKET_CLASS> build() {
assert this.protocol != null;
return new CustomProtocolStackConfigurer<>(
basePacketClass, byteOrder, parserArgs, protocol, driverContext, messageInput, packetSizeEstimator, corruptPacketRemover, encryptionHandler);
basePacketClass, byteOrder, parserArgs, protocol, driverContext, messageInput, packetSizeEstimator,
corruptPacketRemover, encryptionHandler);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import org.apache.plc4x.java.api.EventPlcConnection;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcIoException;
import org.apache.plc4x.java.api.listener.ConnectionStateListener;
Expand All @@ -33,7 +34,6 @@
import org.apache.plc4x.java.spi.optimizer.BaseOptimizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.plc4x.java.api.value.PlcValueHandler;

import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -54,6 +54,7 @@ public class DefaultNettyPlcConnection extends AbstractPlcConnection implements

protected final Configuration configuration;
protected final ChannelFactory channelFactory;

protected final boolean awaitSessionSetupComplete;
protected final boolean awaitSessionDisconnectComplete;
protected final boolean awaitSessionDiscoverComplete;
Expand All @@ -68,8 +69,9 @@ public DefaultNettyPlcConnection(boolean canRead, boolean canWrite, boolean canS
PlcFieldHandler fieldHandler, PlcValueHandler valueHandler, Configuration configuration,
ChannelFactory channelFactory, boolean awaitSessionSetupComplete,
boolean awaitSessionDisconnectComplete, boolean awaitSessionDiscoverComplete,
ProtocolStackConfigurer stackConfigurer, BaseOptimizer optimizer) {
super(canRead, canWrite, canSubscribe, canBrowse, fieldHandler, valueHandler, optimizer);
ProtocolStackConfigurer stackConfigurer, BaseOptimizer optimizer,
PlcAuthentication authentication) {
super(canRead, canWrite, canSubscribe, canBrowse, fieldHandler, valueHandler, optimizer, authentication);
this.configuration = configuration;
this.channelFactory = channelFactory;
this.awaitSessionSetupComplete = awaitSessionSetupComplete;
Expand Down Expand Up @@ -220,7 +222,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
// Initialize via Transport Layer
channelFactory.initializePipeline(pipeline);
// Initialize Protocol Layer
setProtocol(stackConfigurer.configurePipeline(configuration, pipeline, channelFactory.isPassive()));
setProtocol(stackConfigurer.configurePipeline(configuration, pipeline, getAuthentication(),
channelFactory.isPassive()));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ protected void initializePipeline(ChannelFactory channelFactory) {

@Override
public PlcConnection getConnection(String connectionString) throws PlcConnectionException {
return getConnection(connectionString, null);
}

@Override
public PlcConnection getConnection(String connectionString, PlcAuthentication authentication) throws PlcConnectionException {
// Split up the connection string into its individual segments.
Matcher matcher = URI_PATTERN.matcher(connectionString);
if (!matcher.matches()) {
Expand All @@ -122,7 +127,7 @@ public PlcConnection getConnection(String connectionString) throws PlcConnection
throw new PlcConnectionException("Unsupported configuration");
}

// Try to find a transport in order to create a communication channel.
// Try to find a suitable transport-type for creating the communication channel.
Transport transport = null;
ServiceLoader<Transport> transportLoader = ServiceLoader.load(
Transport.class, Thread.currentThread().getContextClassLoader());
Expand Down Expand Up @@ -177,12 +182,8 @@ public PlcConnection getConnection(String connectionString) throws PlcConnection
awaitDisconnectComplete,
awaitDiscoverComplete,
getStackConfigurer(transport),
getOptimizer());
}

@Override
public PlcConnection getConnection(String url, PlcAuthentication authentication) throws PlcConnectionException {
throw new PlcConnectionException("Authentication not supported.");
getOptimizer(),
authentication);
}


Expand Down
Loading

0 comments on commit 1045cf0

Please sign in to comment.