diff --git a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/ADSPlcDriver.java b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/ADSPlcDriver.java index 70576535cc8..adaf58ec5c1 100644 --- a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/ADSPlcDriver.java +++ b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/ADSPlcDriver.java @@ -63,6 +63,11 @@ protected boolean canWrite() { return true; } + @Override + protected boolean canSubscribe() { + return true; + } + @Override protected Class getConfigurationType() { return AdsConfiguration.class; diff --git a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java index 6a84d05e569..0c1332c7f1d 100644 --- a/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java +++ b/plc4j/drivers/ads/src/main/java/org/apache/plc4x/java/ads/protocol/AdsProtocolLogic.java @@ -20,38 +20,43 @@ import org.apache.plc4x.java.ads.configuration.AdsConfiguration; import org.apache.plc4x.java.ads.field.*; +import org.apache.plc4x.java.ads.model.AdsSubscriptionHandle; import org.apache.plc4x.java.ads.readwrite.*; import org.apache.plc4x.java.ads.readwrite.io.DataItemIO; import org.apache.plc4x.java.api.exceptions.PlcException; import org.apache.plc4x.java.api.exceptions.PlcRuntimeException; -import org.apache.plc4x.java.api.messages.PlcReadRequest; -import org.apache.plc4x.java.api.messages.PlcReadResponse; -import org.apache.plc4x.java.api.messages.PlcWriteRequest; -import org.apache.plc4x.java.api.messages.PlcWriteResponse; +import org.apache.plc4x.java.api.messages.*; +import org.apache.plc4x.java.api.model.PlcConsumerRegistration; import org.apache.plc4x.java.api.model.PlcField; +import org.apache.plc4x.java.api.model.PlcSubscriptionHandle; import org.apache.plc4x.java.api.types.PlcResponseCode; +import org.apache.plc4x.java.api.types.PlcSubscriptionType; import org.apache.plc4x.java.api.value.PlcValue; import org.apache.plc4x.java.spi.ConversationContext; import org.apache.plc4x.java.spi.Plc4xProtocolBase; import org.apache.plc4x.java.spi.configuration.HasConfiguration; import org.apache.plc4x.java.spi.generation.*; -import org.apache.plc4x.java.spi.messages.DefaultPlcReadResponse; -import org.apache.plc4x.java.spi.messages.DefaultPlcWriteResponse; +import org.apache.plc4x.java.spi.messages.*; import org.apache.plc4x.java.spi.messages.utils.ResponseItem; +import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration; +import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionField; import org.apache.plc4x.java.spi.transaction.RequestTransactionManager; import org.apache.plc4x.java.spi.values.IEC61131ValueHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigInteger; import java.time.Duration; +import java.time.Instant; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; -public class AdsProtocolLogic extends Plc4xProtocolBase implements HasConfiguration { +public class AdsProtocolLogic extends Plc4xProtocolBase implements HasConfiguration, PlcSubscriber { private static final Logger LOGGER = LoggerFactory.getLogger(AdsProtocolLogic.class); @@ -62,6 +67,8 @@ public class AdsProtocolLogic extends Plc4xProtocolBase implements private final AtomicLong invokeIdGenerator = new AtomicLong(1); private RequestTransactionManager tm; + private Map> consumers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap symbolicFieldMapping; private final ConcurrentHashMap> pendingResolutionRequests; @@ -550,6 +557,265 @@ protected PlcWriteResponse convertToPlc4xWriteResponse(PlcWriteRequest writeRequ return new DefaultPlcWriteResponse(writeRequest, responseCodes); } + @Override + public CompletableFuture subscribe(PlcSubscriptionRequest subscriptionRequest) { + // Get all ADS addresses in their resolved state. + final CompletableFuture> directAdsFieldsFuture = + getDirectAddresses(subscriptionRequest.getFields() + .stream() + .map(field -> ((DefaultPlcSubscriptionField) field).getPlcField()) + .collect(Collectors.toList())); + + // If all addresses were already resolved we can send the request immediately. + if (directAdsFieldsFuture.isDone()) { + final List fields = directAdsFieldsFuture.getNow(null); + if (fields != null) { + return executeSubscribe(subscriptionRequest); + } else { + final CompletableFuture errorFuture = new CompletableFuture<>(); + errorFuture.completeExceptionally(new PlcException("Fields are null")); + return errorFuture; + } + } + // If there are still symbolic addresses that have to be resolved, send the + // request as soon as the resolution is done. + // In order to instantly be able to return a future, for the final result we have to + // create a new one which is then completed later on. Unfortunately as soon as the + // directAdsFieldsFuture is completed we still don't have the end result, but we can + // now actually send the delayed read request ... as soon as that future completes + // we can complete the initial one. + else { + CompletableFuture delayedSubscribe = new CompletableFuture<>(); + directAdsFieldsFuture.handle((directAdsFields, throwable) -> { + if (directAdsFields != null) { + final CompletableFuture delayedResponse = + executeSubscribe(subscriptionRequest); + delayedResponse.handle((plcSubscribeResponse, throwable1) -> { + if (plcSubscribeResponse != null) { + delayedSubscribe.complete(plcSubscribeResponse); + } else { + delayedSubscribe.completeExceptionally(throwable1); + } + return this; + }); + } else { + delayedSubscribe.completeExceptionally(throwable); + } + return this; + }); + return delayedSubscribe; + } + } + + private CompletableFuture executeSubscribe(PlcSubscriptionRequest subscribeRequest) { + CompletableFuture future = new CompletableFuture<>(); + + List adsData = subscribeRequest.getFields().stream() + .map(field -> (DefaultPlcSubscriptionField) field) + .map(field -> new AdsAddDeviceNotificationRequest( + symbolicFieldMapping.get(field.getPlcField()).getIndexGroup(), + symbolicFieldMapping.get(field.getPlcField()).getIndexOffset(), + (long) ((AdsField) field.getPlcField()).getAdsDataType().getNumBytes() * field.getNumberOfElements(), + field.getPlcSubscriptionType() == PlcSubscriptionType.CYCLIC ? 3 : 4, // if it's not cyclic, it's on change or event + 0 , // there is no api for that yet + field.getDuration().orElse(Duration.ZERO).toMillis())) + .collect(Collectors.toList()); + + List amsTCPPackets = adsData.stream().map( data -> + new AmsTCPPacket(new AmsPacket( + configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(), + configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(), + CommandId.ADS_ADD_DEVICE_NOTIFICATION, DEFAULT_COMMAND_STATE, 0, getInvokeId(), data))).collect(Collectors.toList()); + + Map> responses = new HashMap<>(); + + // Start the first request-transaction (it is ended in the response-handler). + RequestTransactionManager.RequestTransaction transaction = tm.startRequest(); + transaction.submit(subscribeRecursively( + subscribeRequest, + subscribeRequest.getFieldNames().iterator(), + responses, + future, + amsTCPPackets.iterator(), + transaction)); + return future; + } + + private Runnable subscribeRecursively(PlcSubscriptionRequest subscriptionRequest, Iterator fieldNames, + Map> responses, + CompletableFuture future, + Iterator amsTCPPackets, + RequestTransactionManager.RequestTransaction transaction) { + return () -> { + AmsTCPPacket packet = amsTCPPackets.next(); + boolean hasMorePackets = amsTCPPackets.hasNext(); + String fieldName = fieldNames.next(); + context.sendRequest(packet) + .expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest())) + .onTimeout(future::completeExceptionally) + .onError((p, e) -> future.completeExceptionally(e)) + .check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == packet.getUserdata().getInvokeId()) + .unwrap(response -> (AdsAddDeviceNotificationResponse) response.getUserdata().getData()) + .handle(responseAdsData -> { + if (responseAdsData.getResult() == ReturnCode.OK) { + // Collect notification handle from individual response. + responses.put(fieldName, new ResponseItem<>( + parsePlcResponseCode(responseAdsData.getResult()), + new AdsSubscriptionHandle(this, + fieldName, + ((AdsField) ((DefaultPlcSubscriptionField) subscriptionRequest.getField(fieldName)).getPlcField()).getAdsDataType(), + responseAdsData.getNotificationHandle()))); + + // After receiving the last ADD_DEVICE_NOTIFICATION response, complete the PLC4X response. + if (!hasMorePackets) { + final PlcSubscriptionResponse plcSubscriptionResponse = new DefaultPlcSubscriptionResponse(subscriptionRequest, responses); + future.complete(plcSubscriptionResponse); + } + } else { + if (responseAdsData.getResult() == ReturnCode.ADSERR_DEVICE_INVALIDSIZE) { + future.completeExceptionally( + new PlcException("The parameter size was not correct (Internal error)")); + } else { + future.completeExceptionally(new PlcException("Unexpected result " + responseAdsData.getResult())); + } + } + // Finish the request-transaction. + transaction.endRequest(); + + // Submit the next transaction. + if (hasMorePackets) { + RequestTransactionManager.RequestTransaction nextTransaction = tm.startRequest(); + nextTransaction.submit(subscribeRecursively( + subscriptionRequest, fieldNames, responses, future, amsTCPPackets, nextTransaction)); + } + }); + }; + } + + @Override + public CompletableFuture unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) { + CompletableFuture future = new CompletableFuture<>(); + + List notificationHandles = new ArrayList<>(); + unsubscriptionRequest.getSubscriptionHandles().stream() + .filter(handle -> handle instanceof AdsSubscriptionHandle) + .map(handle -> (AdsSubscriptionHandle) handle) + .forEach(adsSubscriptionHandle -> { + // Notification handle used for delete notification messages. + notificationHandles.add(adsSubscriptionHandle.getNotificationHandle()); + // Remove consumers + consumers.keySet().stream().filter(consumerRegistration -> + consumerRegistration.getSubscriptionHandles().contains(adsSubscriptionHandle)) + .forEach(DefaultPlcConsumerRegistration::unregister); + }); + + List adsData = notificationHandles.stream() + .map(AdsDeleteDeviceNotificationRequest::new) + .collect(Collectors.toList()); + + List amsTCPPackets = adsData.stream().map( data -> new AmsTCPPacket( + new AmsPacket(configuration.getTargetAmsNetId(), configuration.getTargetAmsPort(), + configuration.getSourceAmsNetId(), configuration.getSourceAmsPort(), + CommandId.ADS_DELETE_DEVICE_NOTIFICATION, DEFAULT_COMMAND_STATE, 0, getInvokeId(), data))).collect(Collectors.toList()); + + // Start the first request-transaction (it is ended in the response-handler) + RequestTransactionManager.RequestTransaction transaction = tm.startRequest(); + transaction.submit(unsubscribeRecursively(unsubscriptionRequest, future, amsTCPPackets.iterator(), transaction)); + return future; + } + + private Runnable unsubscribeRecursively(PlcUnsubscriptionRequest unsubscriptionRequest, + CompletableFuture future, + Iterator amsTCPPackets, + RequestTransactionManager.RequestTransaction transaction){ + return () -> { + AmsTCPPacket packet = amsTCPPackets.next(); + boolean hasMorePackets = amsTCPPackets.hasNext(); + context.sendRequest(packet) + .expectResponse(AmsTCPPacket.class, Duration.ofMillis(configuration.getTimeoutRequest())) + .onTimeout(future::completeExceptionally) + .onError((p, e) -> future.completeExceptionally(e)) + .check(responseAmsPacket -> responseAmsPacket.getUserdata().getInvokeId() == packet.getUserdata().getInvokeId()) + .unwrap(response -> (AdsDeleteDeviceNotificationResponse) response.getUserdata().getData()) + .handle(responseAdsData -> { + if (responseAdsData.getResult() == ReturnCode.OK) { + // After receiving the last DELETE_DEVICE_NOTIFICATION response, complete the PLC4X response. + if (!hasMorePackets) { + final PlcUnsubscriptionResponse plcUnsubscriptionResponse = new DefaultPlcUnsubscriptionResponse(unsubscriptionRequest); + future.complete(plcUnsubscriptionResponse); + } + } else { + // TODO: this is more guesswork than knowing it could actually occur + if (responseAdsData.getResult() == ReturnCode.ADSERR_DEVICE_NOTIFYHNDINVALID) { + future.completeExceptionally( + new PlcException("The notification handle is invalid (Internal error)")); + } else { + future.completeExceptionally(new PlcException("Unexpected result " + responseAdsData.getResult())); + } + } + // Finish the request-transaction. + transaction.endRequest(); + + // Submit the next transaction. + if (hasMorePackets) { + RequestTransactionManager.RequestTransaction nextTransaction = tm.startRequest(); + nextTransaction.submit(unsubscribeRecursively(unsubscriptionRequest, future, amsTCPPackets, nextTransaction)); + } + }); + }; + } + + @Override + protected void decode(ConversationContext context, AmsTCPPacket msg) throws Exception { + if (msg.getUserdata().getData() instanceof AdsDeviceNotificationRequest){ + AdsDeviceNotificationRequest notificationData = (AdsDeviceNotificationRequest) msg.getUserdata().getData(); + AdsStampHeader[] stamps = notificationData.getAdsStampHeaders(); + for (int stamp=0; stamp < notificationData.getStamps(); stamp++){ + // convert Windows FILETIME format to unix epoch + long unixEpochTimestamp = stamps[stamp].getTimestamp().divide(BigInteger.valueOf(10000L)).longValue() - 11644473600000L; + AdsNotificationSample[] samples = stamps[stamp].getAdsNotificationSamples(); + for (int smpl=0; smpl < stamps[stamp].getSamples(); smpl++){ + long handle = samples[smpl].getNotificationHandle(); + final AdsNotificationSample sample = samples[smpl]; + for (DefaultPlcConsumerRegistration registration : consumers.keySet()){ + for(PlcSubscriptionHandle subscriptionHandle : registration.getSubscriptionHandles()){ + if (subscriptionHandle instanceof AdsSubscriptionHandle) { + AdsSubscriptionHandle adsHandle = (AdsSubscriptionHandle) subscriptionHandle; + if (adsHandle.getNotificationHandle() == handle) + consumers.get(registration).accept( + new DefaultPlcSubscriptionEvent(Instant.ofEpochMilli(unixEpochTimestamp), + convertSampleToPlc4XResult(adsHandle, sample.getData()))); + } + } + } + } + } + } + } + + private Map> convertSampleToPlc4XResult(AdsSubscriptionHandle subscriptionHandle, byte[] data) throws + ParseException { + Map> values = new HashMap<>(); + ReadBufferByteBased readBuffer = new ReadBufferByteBased(data, true); + values.put(subscriptionHandle.getPlcFieldName(), new ResponseItem<>(PlcResponseCode.OK, + DataItemIO.staticParse(readBuffer, subscriptionHandle.getAdsDataType().getDataFormatName(), data.length))); + return values; + } + + @Override + public PlcConsumerRegistration register(Consumer consumer, Collection handles) { + final DefaultPlcConsumerRegistration consumerRegistration = + new DefaultPlcConsumerRegistration(this, consumer, handles.toArray(new PlcSubscriptionHandle[0])); + consumers.put(consumerRegistration, consumer); + return consumerRegistration; + } + + @Override + public void unregister(PlcConsumerRegistration registration) { + DefaultPlcConsumerRegistration consumerRegistration = (DefaultPlcConsumerRegistration) registration; + consumers.remove(consumerRegistration); + } + protected CompletableFuture> getDirectAddresses(List fields) { CompletableFuture> future = new CompletableFuture<>(); diff --git a/protocols/ads/src/main/resources/protocols/ads/ads.mspec b/protocols/ads/src/main/resources/protocols/ads/ads.mspec index 4541f7ab5fa..5870c071756 100644 --- a/protocols/ads/src/main/resources/protocols/ads/ads.mspec +++ b/protocols/ads/src/main/resources/protocols/ads/ads.mspec @@ -281,7 +281,8 @@ // 4 bytes The ADS server checks if the value changes in this time slice. The unit is 1ms [simple uint 32 cycleTime] // 16bytes Must be set to 0 - [reserved uint 128 '0x0000' ] + [reserved uint 64 '0x0000' ] + [reserved uint 64 '0x0000' ] ] ['ADS_ADD_DEVICE_NOTIFICATION', 'true' AdsAddDeviceNotificationResponse // 4 bytes ADS error number