Skip to content

Commit

Permalink
feat(plc4j/ads): Subscriptions for ADS in PLC4J (#265)
Browse files Browse the repository at this point in the history
* protocols: ADS - split reserved field in two fields

The AddDeviceNotification message has an 128 bit reserved field and
sending the message throws a ParseException: "Unsigned Big Integer
can only contain max 64 bits"
So the reserved field is split into two 64 bit fields.

* plc4j: driver-ads returns true on canSubscribe()

* plc4j: add subscriptions to driver-ads

Co-authored-by: Sebastian Rühl <sruehl@apache.org>
  • Loading branch information
rmeister and sruehl committed Dec 4, 2021
1 parent becdd35 commit 5d4eb0a
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ protected boolean canWrite() {
return true;
}

@Override
protected boolean canSubscribe() {
return true;
}

@Override
protected Class<? extends Configuration> getConfigurationType() {
return AdsConfiguration.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,43 @@

import org.apache.plc4x.java.ads.configuration.AdsConfiguration;
import org.apache.plc4x.java.ads.field.*;
import org.apache.plc4x.java.ads.model.AdsSubscriptionHandle;
import org.apache.plc4x.java.ads.readwrite.*;
import org.apache.plc4x.java.ads.readwrite.io.DataItemIO;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
import org.apache.plc4x.java.api.messages.PlcWriteResponse;
import org.apache.plc4x.java.api.messages.*;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.model.PlcField;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.api.types.PlcResponseCode;
import org.apache.plc4x.java.api.types.PlcSubscriptionType;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.spi.ConversationContext;
import org.apache.plc4x.java.spi.Plc4xProtocolBase;
import org.apache.plc4x.java.spi.configuration.HasConfiguration;
import org.apache.plc4x.java.spi.generation.*;
import org.apache.plc4x.java.spi.messages.DefaultPlcReadResponse;
import org.apache.plc4x.java.spi.messages.DefaultPlcWriteResponse;
import org.apache.plc4x.java.spi.messages.*;
import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionField;
import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
import org.apache.plc4x.java.spi.values.IEC61131ValueHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigInteger;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements HasConfiguration<AdsConfiguration> {
public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements HasConfiguration<AdsConfiguration>, PlcSubscriber {

private static final Logger LOGGER = LoggerFactory.getLogger(AdsProtocolLogic.class);

Expand All @@ -62,6 +67,8 @@ public class AdsProtocolLogic extends Plc4xProtocolBase<AmsTCPPacket> implements
private final AtomicLong invokeIdGenerator = new AtomicLong(1);
private RequestTransactionManager tm;

private Map<DefaultPlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> consumers = new ConcurrentHashMap<>();

private final ConcurrentHashMap<SymbolicAdsField, DirectAdsField> symbolicFieldMapping;
private final ConcurrentHashMap<SymbolicAdsField, CompletableFuture<Void>> pendingResolutionRequests;

Expand Down Expand Up @@ -550,6 +557,265 @@ protected PlcWriteResponse convertToPlc4xWriteResponse(PlcWriteRequest writeRequ
return new DefaultPlcWriteResponse(writeRequest, responseCodes);
}

@Override
public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
// Get all ADS addresses in their resolved state.
final CompletableFuture<List<DirectAdsField>> directAdsFieldsFuture =
getDirectAddresses(subscriptionRequest.getFields()
.stream()
.map(field -> ((DefaultPlcSubscriptionField) field).getPlcField())
.collect(Collectors.toList()));

// If all addresses were already resolved we can send the request immediately.
if (directAdsFieldsFuture.isDone()) {
final List<DirectAdsField> fields = directAdsFieldsFuture.getNow(null);
if (fields != null) {
return executeSubscribe(subscriptionRequest);
} else {
final CompletableFuture<PlcSubscriptionResponse> errorFuture = new CompletableFuture<>();
errorFuture.completeExceptionally(new PlcException("Fields are null"));
return errorFuture;
}
}
// If there are still symbolic addresses that have to be resolved, send the
// request as soon as the resolution is done.
// In order to instantly be able to return a future, for the final result we have to
// create a new one which is then completed later on. Unfortunately as soon as the
// directAdsFieldsFuture is completed we still don't have the end result, but we can
// now actually send the delayed read request ... as soon as that future completes
// we can complete the initial one.
else {
CompletableFuture<PlcSubscriptionResponse> delayedSubscribe = new CompletableFuture<>();
directAdsFieldsFuture.handle((directAdsFields, throwable) -> {
if (directAdsFields != null) {
final CompletableFuture<PlcSubscriptionResponse> delayedResponse =
executeSubscribe(subscriptionRequest);
delayedResponse.handle((plcSubscribeResponse, throwable1) -> {
if (plcSubscribeResponse != null) {
delayedSubscribe.complete(plcSubscribeResponse);
} else {
delayedSubscribe.completeExceptionally(throwable1);
}
return this;
});
} else {
delayedSubscribe.completeExceptionally(throwable);
}
return this;
});
return delayedSubscribe;
}
}

private CompletableFuture<PlcSubscriptionResponse> executeSubscribe(PlcSubscriptionRequest subscribeRequest) {
CompletableFuture<PlcSubscriptionResponse> future = new CompletableFuture<>();

List<AdsData> adsData = subscribeRequest.getFields().stream()
.map(field -> (DefaultPlcSubscriptionField) field)
.map(field -> new AdsAddDeviceNotificationRequest(
symbolicFieldMapping.get(field.getPlcField()).getIndexGroup(),
symbolicFieldMapping.get(field.getPlcField()).getIndexOffset(),
(long) ((AdsField) field.getPlcField()).getAdsDataType().getNumBytes() * field.getNumberOfElements(),
field.getPlcSubscriptionType() == PlcSubscriptionType.CYCLIC ? 3 : 4, // if it's not cyclic, it's on change or event
0 , // there is no api for that yet
field.getDuration().orElse(Duration.ZERO).toMillis()))
.collect(Collectors.toList());

List<AmsTCPPacket> amsTCPPackets = adsData.stream().map( data ->
new AmsTCPPacket(new AmsPacket(
configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(),
configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(),
CommandId.ADS_ADD_DEVICE_NOTIFICATION, DEFAULT_COMMAND_STATE, 0, getInvokeId(), data))).collect(Collectors.toList());

Map<String, ResponseItem<PlcSubscriptionHandle>> responses = new HashMap<>();

// Start the first request-transaction (it is ended in the response-handler).
RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
transaction.submit(subscribeRecursively(
subscribeRequest,
subscribeRequest.getFieldNames().iterator(),
responses,
future,
amsTCPPackets.iterator(),
transaction));
return future;
}

private Runnable subscribeRecursively(PlcSubscriptionRequest subscriptionRequest, Iterator<String> fieldNames,
Map<String, ResponseItem<PlcSubscriptionHandle>> responses,
CompletableFuture<PlcSubscriptionResponse> future,
Iterator<AmsTCPPacket> amsTCPPackets,
RequestTransactionManager.RequestTransaction transaction) {
return () -> {
AmsTCPPacket packet = amsTCPPackets.next();
boolean hasMorePackets = amsTCPPackets.hasNext();
String fieldName = fieldNames.next();
context.sendRequest(packet)
.expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest()))
.onTimeout(future::completeExceptionally)
.onError((p, e) -> future.completeExceptionally(e))
.check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == packet.getUserdata().getInvokeId())
.unwrap(response -> (AdsAddDeviceNotificationResponse) response.getUserdata().getData())
.handle(responseAdsData -> {
if (responseAdsData.getResult() == ReturnCode.OK) {
// Collect notification handle from individual response.
responses.put(fieldName, new ResponseItem<>(
parsePlcResponseCode(responseAdsData.getResult()),
new AdsSubscriptionHandle(this,
fieldName,
((AdsField) ((DefaultPlcSubscriptionField) subscriptionRequest.getField(fieldName)).getPlcField()).getAdsDataType(),
responseAdsData.getNotificationHandle())));

// After receiving the last ADD_DEVICE_NOTIFICATION response, complete the PLC4X response.
if (!hasMorePackets) {
final PlcSubscriptionResponse plcSubscriptionResponse = new DefaultPlcSubscriptionResponse(subscriptionRequest, responses);
future.complete(plcSubscriptionResponse);
}
} else {
if (responseAdsData.getResult() == ReturnCode.ADSERR_DEVICE_INVALIDSIZE) {
future.completeExceptionally(
new PlcException("The parameter size was not correct (Internal error)"));
} else {
future.completeExceptionally(new PlcException("Unexpected result " + responseAdsData.getResult()));
}
}
// Finish the request-transaction.
transaction.endRequest();

// Submit the next transaction.
if (hasMorePackets) {
RequestTransactionManager.RequestTransaction nextTransaction = tm.startRequest();
nextTransaction.submit(subscribeRecursively(
subscriptionRequest, fieldNames, responses, future, amsTCPPackets, nextTransaction));
}
});
};
}

@Override
public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) {
CompletableFuture<PlcUnsubscriptionResponse> future = new CompletableFuture<>();

List<Long> notificationHandles = new ArrayList<>();
unsubscriptionRequest.getSubscriptionHandles().stream()
.filter(handle -> handle instanceof AdsSubscriptionHandle)
.map(handle -> (AdsSubscriptionHandle) handle)
.forEach(adsSubscriptionHandle -> {
// Notification handle used for delete notification messages.
notificationHandles.add(adsSubscriptionHandle.getNotificationHandle());
// Remove consumers
consumers.keySet().stream().filter(consumerRegistration ->
consumerRegistration.getSubscriptionHandles().contains(adsSubscriptionHandle))
.forEach(DefaultPlcConsumerRegistration::unregister);
});

List<AdsData> adsData = notificationHandles.stream()
.map(AdsDeleteDeviceNotificationRequest::new)
.collect(Collectors.toList());

List<AmsTCPPacket> amsTCPPackets = adsData.stream().map( data -> new AmsTCPPacket(
new AmsPacket(configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(),
configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(),
CommandId.ADS_DELETE_DEVICE_NOTIFICATION, DEFAULT_COMMAND_STATE, 0, getInvokeId(), data))).collect(Collectors.toList());

// Start the first request-transaction (it is ended in the response-handler)
RequestTransactionManager.RequestTransaction transaction = tm.startRequest();
transaction.submit(unsubscribeRecursively(unsubscriptionRequest, future, amsTCPPackets.iterator(), transaction));
return future;
}

private Runnable unsubscribeRecursively(PlcUnsubscriptionRequest unsubscriptionRequest,
CompletableFuture<PlcUnsubscriptionResponse> future,
Iterator<AmsTCPPacket> amsTCPPackets,
RequestTransactionManager.RequestTransaction transaction){
return () -> {
AmsTCPPacket packet = amsTCPPackets.next();
boolean hasMorePackets = amsTCPPackets.hasNext();
context.sendRequest(packet)
.expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest()))
.onTimeout(future::completeExceptionally)
.onError((p, e) -> future.completeExceptionally(e))
.check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == packet.getUserdata().getInvokeId())
.unwrap(response -> (AdsDeleteDeviceNotificationResponse) response.getUserdata().getData())
.handle(responseAdsData -> {
if (responseAdsData.getResult() == ReturnCode.OK) {
// After receiving the last DELETE_DEVICE_NOTIFICATION response, complete the PLC4X response.
if (!hasMorePackets) {
final PlcUnsubscriptionResponse plcUnsubscriptionResponse = new DefaultPlcUnsubscriptionResponse(unsubscriptionRequest);
future.complete(plcUnsubscriptionResponse);
}
} else {
// TODO: this is more guesswork than knowing it could actually occur
if (responseAdsData.getResult() == ReturnCode.ADSERR_DEVICE_NOTIFYHNDINVALID) {
future.completeExceptionally(
new PlcException("The notification handle is invalid (Internal error)"));
} else {
future.completeExceptionally(new PlcException("Unexpected result " + responseAdsData.getResult()));
}
}
// Finish the request-transaction.
transaction.endRequest();

// Submit the next transaction.
if (hasMorePackets) {
RequestTransactionManager.RequestTransaction nextTransaction = tm.startRequest();
nextTransaction.submit(unsubscribeRecursively(unsubscriptionRequest, future, amsTCPPackets, nextTransaction));
}
});
};
}

@Override
protected void decode(ConversationContext<AmsTCPPacket> context, AmsTCPPacket msg) throws Exception {
if (msg.getUserdata().getData() instanceof AdsDeviceNotificationRequest){
AdsDeviceNotificationRequest notificationData = (AdsDeviceNotificationRequest) msg.getUserdata().getData();
AdsStampHeader[] stamps = notificationData.getAdsStampHeaders();
for (int stamp=0; stamp < notificationData.getStamps(); stamp++){
// convert Windows FILETIME format to unix epoch
long unixEpochTimestamp = stamps[stamp].getTimestamp().divide(BigInteger.valueOf(10000L)).longValue() - 11644473600000L;
AdsNotificationSample[] samples = stamps[stamp].getAdsNotificationSamples();
for (int smpl=0; smpl < stamps[stamp].getSamples(); smpl++){
long handle = samples[smpl].getNotificationHandle();
final AdsNotificationSample sample = samples[smpl];
for (DefaultPlcConsumerRegistration registration : consumers.keySet()){
for(PlcSubscriptionHandle subscriptionHandle : registration.getSubscriptionHandles()){
if (subscriptionHandle instanceof AdsSubscriptionHandle) {
AdsSubscriptionHandle adsHandle = (AdsSubscriptionHandle) subscriptionHandle;
if (adsHandle.getNotificationHandle() == handle)
consumers.get(registration).accept(
new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(unixEpochTimestamp),
convertSampleToPlc4XResult(adsHandle, sample.getData())));
}
}
}
}
}
}
}

private Map<String, ResponseItem<PlcValue>> convertSampleToPlc4XResult(AdsSubscriptionHandle subscriptionHandle, byte[] data) throws
ParseException {
Map<String, ResponseItem<PlcValue>> values = new HashMap<>();
ReadBufferByteBased readBuffer = new ReadBufferByteBased(data, true);
values.put(subscriptionHandle.getPlcFieldName(), new ResponseItem<>(PlcResponseCode.OK,
DataItemIO.staticParse(readBuffer, subscriptionHandle.getAdsDataType().getDataFormatName(), data.length)));
return values;
}

@Override
public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
final DefaultPlcConsumerRegistration consumerRegistration =
new DefaultPlcConsumerRegistration(this, consumer, handles.toArray(new PlcSubscriptionHandle[0]));
consumers.put(consumerRegistration, consumer);
return consumerRegistration;
}

@Override
public void unregister(PlcConsumerRegistration registration) {
DefaultPlcConsumerRegistration consumerRegistration = (DefaultPlcConsumerRegistration) registration;
consumers.remove(consumerRegistration);
}

protected CompletableFuture<List<DirectAdsField>> getDirectAddresses(List<PlcField> fields) {
CompletableFuture<List<DirectAdsField>> future = new CompletableFuture<>();

Expand Down
3 changes: 2 additions & 1 deletion protocols/ads/src/main/resources/protocols/ads/ads.mspec
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@
// 4 bytes The ADS server checks if the value changes in this time slice. The unit is 1ms
[simple uint 32 cycleTime]
// 16bytes Must be set to 0
[reserved uint 128 '0x0000' ]
[reserved uint 64 '0x0000' ]
[reserved uint 64 '0x0000' ]
]
['ADS_ADD_DEVICE_NOTIFICATION', 'true' AdsAddDeviceNotificationResponse
// 4 bytes ADS error number
Expand Down

0 comments on commit 5d4eb0a

Please sign in to comment.