Skip to content

Commit

Permalink
refactor(s7): Renamed some things and moved some classes to more refl…
Browse files Browse the repository at this point in the history
…ect the structure of other drivers
  • Loading branch information
chrisdutz committed May 11, 2023
1 parent e82f6e5 commit 5a890b7
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 47 deletions.
Expand Up @@ -20,6 +20,7 @@

import io.netty.buffer.ByteBuf;
import org.apache.plc4x.java.s7.readwrite.configuration.S7Configuration;
import org.apache.plc4x.java.s7.readwrite.connection.S7HGeneratedDriverBase;
import org.apache.plc4x.java.s7.readwrite.context.S7DriverContext;
import org.apache.plc4x.java.s7.readwrite.tag.S7Tag;
import org.apache.plc4x.java.s7.readwrite.optimizer.S7Optimizer;
Expand All @@ -33,7 +34,6 @@

import java.util.function.Consumer;
import java.util.function.ToIntFunction;
import org.apache.plc4x.java.s7.readwrite.protocol.S7HGeneratedDriverBase;

public class S7Driver extends S7HGeneratedDriverBase {

Expand Down
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.s7.readwrite.protocol;
package org.apache.plc4x.java.s7.readwrite.connection;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -115,9 +115,9 @@ public void connect() throws PlcConnectionException {

// Inject the configuration
ConfigurationFactory.configure(configuration, channelFactory);

if (secondaryChannelFactory != null)
if (secondaryChannelFactory != null) {
ConfigurationFactory.configure(configuration, secondaryChannelFactory);
}

channel = new EmbeddedChannel(getChannelHandler(sessionSetupCompleteFuture, sessionDisconnectCompleteFuture, sessionDiscoveredCompleteFuture));
channel.pipeline().addFirst(s7hmux);
Expand All @@ -133,14 +133,15 @@ public void connect() throws PlcConnectionException {
*/
doPrimaryTcpConnections();

if (secondaryChannelFactory != null)
if (secondaryChannelFactory != null) {
doSecondaryTcpConnections();
}

//If it is not possible to generate a TCP connection.
//Safety shutdown all executors in the channels.
if (primary_channel == null)
if (secondary_channel == null) {
sendChannelDisconectEvent();
sendChannelDisconnectEvent();
throw new PlcConnectionException("Connection is not possible.");
}

Expand Down Expand Up @@ -241,7 +242,7 @@ public void doSecondaryTcpConnections() {
* In this way, a controlled shutdown of the execution services is achieved.
* The user application must take the measures to make the connection again.
*/
protected void sendChannelDisconectEvent() {
protected void sendChannelDisconnectEvent() {
logger.trace("Channels was not created, firing DisconnectEvent Event");
// Send an event to the pipeline telling the Protocol filters what's going on.
channel.pipeline().fireUserEventTriggered(new DisconnectEvent());
Expand Down
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.s7.readwrite.protocol;
package org.apache.plc4x.java.s7.readwrite.connection;

import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.authentication.PlcAuthentication;
Expand All @@ -39,6 +39,9 @@

import static org.apache.plc4x.java.spi.configuration.ConfigurationFactory.configure;

/**
* Customized version of a GeneratedDriverBase that supports opening two connections to a remote host.
*/
public class S7HGeneratedDriverBase extends GeneratedDriverBase<TPKTPacket> {

private static final Logger logger = LoggerFactory.getLogger(S7HGeneratedDriverBase.class);
Expand All @@ -51,7 +54,7 @@ public class S7HGeneratedDriverBase extends GeneratedDriverBase<TPKTPacket> {

@Override
public PlcConnection getConnection(String connectionString) throws PlcConnectionException {
// Split up the connection string into it's individual segments.
// Split up the connection string into its individual segments.
Matcher smatcher = URI_PATTERN.matcher(connectionString);
Matcher hmatcher = URI_H_PATTERN.matcher(connectionString);
if (!smatcher.matches() && !hmatcher.matches()) {
Expand Down Expand Up @@ -106,25 +109,21 @@ public PlcConnection getConnection(String connectionString) throws PlcConnection
}
configure(configuration, channelFactory);

// Create an instance of the communication channel which the driver should use.
// Give drivers the option to customize the channel.
initializePipeline(channelFactory);

// Create an instance of the secondary communication channel the driver should use (When using a HA connection).
ChannelFactory secondaryChannelFactory = null;
if (hmatcher.matches()) {
secondaryChannelFactory = transport.createChannelFactory(transportConfig2);
if (secondaryChannelFactory == null) {
logger.info("Unable to get channel factory from url " + transportConfig2);
} else {
configure(configuration, secondaryChannelFactory);
initializePipeline(secondaryChannelFactory);
}
}

if (hmatcher.matches())
configure(configuration, secondaryChannelFactory);

// Give drivers the option to customize the channel.
initializePipeline(channelFactory);

// Give drivers the option to customize the channel.
if (hmatcher.matches())
initializePipeline(secondaryChannelFactory);

// Make the "await setup complete" overridable via system property.
boolean awaitSetupComplete = awaitSetupComplete();
if (System.getProperty(PROPERTY_PLC4X_FORCE_AWAIT_SETUP_COMPLETE) != null) {
Expand Down
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.s7.readwrite.protocol;
package org.apache.plc4x.java.s7.readwrite.connection;

import io.netty.channel.Channel;

Expand Down
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.s7.readwrite.protocol;
package org.apache.plc4x.java.s7.readwrite.connection;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -55,7 +55,7 @@ public class S7HMuxImpl extends MessageToMessageCodec<ByteBuf, ByteBuf> implemen
* both TCP channels are disconnected.
* Default value: false
*/
final static AttributeKey<Boolean> IS_CONNECTED = AttributeKey.valueOf("IS_CONNECTED");
public final static AttributeKey<Boolean> IS_CONNECTED = AttributeKey.valueOf("IS_CONNECTED");

/*
* This attribute indicates to the other handlers which channel is being used,
Expand All @@ -65,7 +65,7 @@ public class S7HMuxImpl extends MessageToMessageCodec<ByteBuf, ByteBuf> implemen
* the correct values will be defined in the connection URL.
* Default value: true
*/
final static AttributeKey<Boolean> IS_PRIMARY = AttributeKey.valueOf("IS_PRIMARY");
public final static AttributeKey<Boolean> IS_PRIMARY = AttributeKey.valueOf("IS_PRIMARY");

/*
* This is the maximum waiting time for reading on the TCP channel.
Expand All @@ -76,31 +76,31 @@ public class S7HMuxImpl extends MessageToMessageCodec<ByteBuf, ByteBuf> implemen
* will be restored automatically, which is done every 4 seconds.
* Default value: 8 sec.
*/
final static AttributeKey<Integer> READ_TIME_OUT = AttributeKey.valueOf("READ_TIME_OUT");
public final static AttributeKey<Integer> READ_TIME_OUT = AttributeKey.valueOf("READ_TIME_OUT");

/*
* If your application requires sampling times greater than the
* set "watchdog" time, it is important that the PING option is activated,
* this will prevent the TCP channel from being closed unnecessarily.
* Default value: false
*/
final static AttributeKey<Boolean> IS_PING_ACTIVE = AttributeKey.valueOf("IS_PIN_ACTIVE");
public final static AttributeKey<Boolean> IS_PING_ACTIVE = AttributeKey.valueOf("IS_PIN_ACTIVE");

/*
* Time value in seconds at which the execution of the PING will be scheduled.
* Generally set by developer experience, but generally should be the same
* as READ_TIME_OUT / 2.
* Default value: -1
*/
final static AttributeKey<Integer> PING_TIME = AttributeKey.valueOf("PING_TIME");
public final static AttributeKey<Integer> PING_TIME = AttributeKey.valueOf("PING_TIME");

/*
* Time for supervision of TCP channels. If the channel is not active,
* a safe stop of the EventLoop must be performed, to ensure that
* no additional tasks are created.
* Default value: 4
*/
final static AttributeKey<Integer> RETRY_TIME = AttributeKey.valueOf("RETRY_TIME");
public final static AttributeKey<Integer> RETRY_TIME = AttributeKey.valueOf("RETRY_TIME");

ChannelHandlerContext embed_ctx = null;
protected Channel embeded_channel = null;
Expand Down
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.s7.readwrite.protocol;
package org.apache.plc4x.java.s7.readwrite.exceptions;

public class S7HAbortRequestException {

Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.s7.events.S7CyclicEvent;
import org.apache.plc4x.java.s7.readwrite.*;
import org.apache.plc4x.java.s7.readwrite.connection.S7HMuxImpl;
import org.apache.plc4x.java.s7.readwrite.context.S7DriverContext;
import org.apache.plc4x.java.s7.readwrite.tag.*;
import org.apache.plc4x.java.s7.readwrite.types.S7ControllerType;
Expand Down Expand Up @@ -297,14 +298,14 @@ public CompletableFuture<PlcReadResponse> read(PlcReadRequest readRequest) {
* Maps the S7ReadResponse of a PlcReadRequest to a PlcReadResponse
*/
private CompletableFuture<PlcReadResponse> toPlcReadResponse(PlcReadRequest readRequest, CompletableFuture<S7Message> response) {
CompletableFuture<PlcReadResponse> client_future = new CompletableFuture<>();
activeRequests.get(response).setRight(client_future);
CompletableFuture<PlcReadResponse> clientFuture = new CompletableFuture<>();
activeRequests.get(response).setRight(clientFuture);

try {
clientExecutorService.execute(() -> {
try {
PlcReadResponse plcItems = (PlcReadResponse) decodeReadResponse(response.get(), readRequest);
client_future.complete(plcItems);
clientFuture.complete(plcItems);
} catch (Exception e) {
logger.info("uh", e);
}
Expand All @@ -313,7 +314,7 @@ private CompletableFuture<PlcReadResponse> toPlcReadResponse(PlcReadRequest read
logger.info("uh", e);
}

return client_future;
return clientFuture;

// TODO: whoever out commented this describe why it is out commented and describe what the above does different
// return response
Expand Down Expand Up @@ -441,7 +442,7 @@ public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionReque
future.completeExceptionally(new PlcRuntimeException("Disconnected"));
return future;
}
if (!isFeatureSupported()) {
if (!isSubscriptionSupported()) {
CompletableFuture<PlcSubscriptionResponse> future = new CompletableFuture<>();
future.completeExceptionally(new PlcRuntimeException("Not Supported"));
return future;
Expand Down Expand Up @@ -568,7 +569,7 @@ public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptio
future.completeExceptionally(new PlcRuntimeException("Disconnected"));
return future;
}
if (!isFeatureSupported()) {
if (!isSubscriptionSupported()) {
CompletableFuture<PlcUnsubscriptionResponse> future = new CompletableFuture<>();
future.completeExceptionally(new PlcRuntimeException("Not Supported"));
return future;
Expand Down Expand Up @@ -1401,8 +1402,6 @@ private PlcResponse decodeReadResponse(S7Message responseMessage, PlcReadRequest
for (String tagName : plcReadRequest.getTagNames()) {

if (plcReadRequest.getTag(tagName) instanceof S7SzlTag) {

S7SzlTag tag = (S7SzlTag) plcReadRequest.getTag(tagName);
S7PayloadUserDataItemCpuFunctionReadSzlResponse payloadItem = (S7PayloadUserDataItemCpuFunctionReadSzlResponse) payloadItems.get(index);
responseCode = decodeResponseCode(payloadItem.getReturnCode());

Expand All @@ -1414,18 +1413,15 @@ private PlcResponse decodeReadResponse(S7Message responseMessage, PlcReadRequest
for (byte b : data) plcvalues.add(new PlcSINT(b));

if (parameteritem.getLastDataUnit() == 1) {
CompletableFuture<S7MessageUserData> next_future = null;
S7ParameterUserData next_parameter = null;
S7PayloadUserData next_payload = null;
S7PayloadUserDataItemCpuFunctionReadSzlResponse next_payloadItem = null;
CompletableFuture<S7MessageUserData> nextFuture;
S7ParameterUserData next_parameter;
S7PayloadUserData next_payload;
S7PayloadUserDataItemCpuFunctionReadSzlResponse next_payloadItem;

while (parameteritem.getLastDataUnit() == 1) {
//TODO: Just wait for one answer!. Pending for other packages for rearm.
next_future = reassembledMessage(parameteritem.getSequenceNumber());

S7MessageUserData msg = null;

msg = next_future.get();
nextFuture = reassembledMessage(parameteritem.getSequenceNumber());
S7MessageUserData msg = nextFuture.get();
if (msg != null) {
next_parameter = (S7ParameterUserData) msg.getParameter();
parameteritem = (S7ParameterUserDataItemCPUFunctions) next_parameter.getItems().get(0);
Expand Down Expand Up @@ -1742,7 +1738,7 @@ private void setChannelFeatures() {
}


private boolean isFeatureSupported() {
private boolean isSubscriptionSupported() {
return (s7DriverContext.getControllerType() == S7ControllerType.S7_300) ||
(s7DriverContext.getControllerType() == S7ControllerType.S7_400);
}
Expand Down

0 comments on commit 5a890b7

Please sign in to comment.