Skip to content

Commit

Permalink
fixed missing sandbox ams code
Browse files Browse the repository at this point in the history
  • Loading branch information
sruehl committed Dec 11, 2019
1 parent a7ca2bb commit 408054c
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ Licensed to the Apache Software Foundation (ASF) under one
import io.netty.channel.ChannelFuture;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.SystemConfiguration;
import org.apache.plc4x.java.amsads.model.AdsPlcFieldHandler;
import org.apache.plc4x.java.amsads.model.DirectAdsField;
import org.apache.plc4x.java.amsads.model.SymbolicAdsField;
import org.apache.plc4x.java.amsads.readwrite.*;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
Expand All @@ -31,6 +34,8 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.*;

public abstract class AdsAbstractPlcConnection extends NettyPlcConnection implements PlcReader, PlcWriter, PlcProprietarySender {
Expand Down Expand Up @@ -63,22 +68,6 @@ protected AdsAbstractPlcConnection(ChannelFactory channelFactory, AmsNetId targe
this.fieldMapping = new ConcurrentHashMap<>();
}

public AmsNetId getTargetAmsNetId() {
return targetAmsNetId;
}

public int getTargetAmsPort() {
return targetAmsPort;
}

public AmsNetId getSourceAmsNetId() {
return sourceAmsNetId;
}

public int getSourceAmsPort() {
return sourceAmsPort;
}

@Override
public boolean canRead() {
return true;
Expand Down Expand Up @@ -153,16 +142,12 @@ protected void mapFields(SymbolicAdsField symbolicAdsField) {
// resolve it and add it to the map.
fieldMapping.computeIfAbsent(symbolicAdsField, symbolicAdsFieldInternal -> {
LOGGER.debug("Resolving {}", symbolicAdsFieldInternal);
AdsReadWriteRequest adsReadWriteRequest = AdsReadWriteRequest.of(
targetAmsNetId,
targetAmsPort,
sourceAmsNetId,
sourceAmsPort,
Invoke.NONE,
IndexGroup.ReservedGroups.ADSIGRP_SYM_HNDBYNAME,
IndexOffset.NONE,
ReadLength.of(IndexOffset.NUM_BYTES),
Data.of(symbolicAdsFieldInternal.getSymbolicField())
AdsReadWriteRequest adsReadWriteRequest = new AdsReadWriteRequest(
0xF003L,
0L,
4L,
symbolicAdsFieldInternal.getSymbolicField().getBytes().length,
symbolicAdsFieldInternal.getSymbolicField().getBytes()
);

// TODO: This is blocking, should be changed to be async.
Expand All @@ -171,12 +156,15 @@ protected void mapFields(SymbolicAdsField symbolicAdsField) {
InternalPlcProprietaryResponse<AdsReadWriteResponse> getHandleResponse = getFromFuture(getHandelFuture, SYMBOL_RESOLVE_TIMEOUT);
AdsReadWriteResponse response = getHandleResponse.getResponse();

if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
if (response.getResult() != 0L) {
throw new PlcRuntimeException("Non error code received " + response.getResult());
}

IndexOffset symbolHandle = IndexOffset.of(response.getData().getBytes());
return DirectAdsField.of(IndexGroup.ReservedGroups.ADSIGRP_SYM_VALBYHND.getAsLong(), symbolHandle.getAsLong(), symbolicAdsFieldInternal.getAdsDataType(), symbolicAdsFieldInternal.getNumberOfElements());
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.order(ByteOrder.LITTLE_ENDIAN);
buffer.put(response.getData());
Long symbolHandle = buffer.getLong();
return DirectAdsField.of(0xF005, symbolHandle, symbolicAdsFieldInternal.getAdsDataType(), symbolicAdsFieldInternal.getNumberOfElements());
});
}

Expand All @@ -192,16 +180,18 @@ protected static int generateAMSPort() {
public void close() throws PlcConnectionException {
fieldMapping.values().stream()
.parallel()
.map(adsField -> AdsWriteRequest.of(
targetAmsNetId,
targetAmsPort,
sourceAmsNetId,
sourceAmsPort,
Invoke.NONE,
IndexGroup.ReservedGroups.ADSIGRP_SYM_RELEASEHND,
IndexOffset.NONE,
Data.of(IndexGroup.of(adsField.getIndexGroup()).getBytes())
))
.map(adsField -> {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.order(ByteOrder.LITTLE_ENDIAN);
buffer.putLong(adsField.getIndexGroup());
byte[] bytes = buffer.array();
return new AdsWriteRequest(
0xF006L,
0L,
bytes.length,
bytes
);
})
.map(adsWriteRequest -> new PlcRequestContainer<>(new DefaultPlcProprietaryRequest<>(adsWriteRequest), new CompletableFuture<>()))
// We don't need a response so we just supply a throw away future.
.forEach(channel::write);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ protected void initChannel(Channel channel) {
// Build the protocol stack for communicating with the ads protocol.
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new Payload2SerialProtocol());
pipeline.addLast(new SingleMessageRateLimiter());
pipeline.addLast(new Ads2PayloadProtocol());
pipeline.addLast(new Plc4x2AdsProtocol(targetAmsNetId, targetAmsPort, sourceAmsNetId, sourceAmsPort, fieldMapping));
pipeline.addLast(new SingleItemToSingleRequestProtocol(AdsSerialPlcConnection.this, AdsSerialPlcConnection.this, null, timer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ Licensed to the Apache Software Foundation (ASF) under one
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.plc4x.java.amsads.model.*;
import org.apache.plc4x.java.amsads.protocol.Ads2PayloadProtocol;
import org.apache.plc4x.java.amsads.protocol.Payload2TcpProtocol;
import org.apache.plc4x.java.amsads.protocol.Plc4x2AdsProtocol;
import org.apache.plc4x.java.amsads.readwrite.AmsNetId;
import org.apache.plc4x.java.amsads.readwrite.AdsDeviceNotificationRequest;
import org.apache.plc4x.java.amsads.protocol.util.LittleEndianDecoder;
import org.apache.plc4x.java.amsads.readwrite.*;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
import org.apache.plc4x.java.api.messages.*;
Expand All @@ -46,6 +47,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigInteger;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -75,11 +77,11 @@ public class AdsTcpPlcConnection extends AdsAbstractPlcConnection implements Plc
private Map<InternalPlcConsumerRegistration, Consumer<AdsDeviceNotificationRequest>> consumerRegistrations = new HashMap<>();

private AdsTcpPlcConnection(InetAddress address, AmsNetId targetAmsNetId, int targetint) {
this(address, targetAmsNetId, targetint, generateAmsNetId(), generateint());
this(address, targetAmsNetId, targetint, generateAmsNetId(), generateAmsPort());
}

private AdsTcpPlcConnection(InetAddress address, Integer port, AmsNetId targetAmsNetId, int targetint) {
this(address, port, targetAmsNetId, targetint, generateAmsNetId(), generateint());
this(address, port, targetAmsNetId, targetint, generateAmsNetId(), generateAmsPort());
}

private AdsTcpPlcConnection(InetAddress address, AmsNetId targetAmsNetId, int targetint, AmsNetId sourceAmsNetId, int sourceint) {
Expand Down Expand Up @@ -127,14 +129,23 @@ public InetAddress getRemoteAddress() {

protected static AmsNetId generateAmsNetId() {
try {
return AmsNetId.of(Inet4Address.getLocalHost().getHostAddress() + ".1.1");
String hostAddress = Inet4Address.getLocalHost().getHostAddress();
String[] octets = hostAddress.split("\\.");
return new AmsNetId(
Short.parseShort(octets[3]),
Short.parseShort(octets[2]),
Short.parseShort(octets[1]),
Short.parseShort(octets[0]),
(short) 1,
(short) 2
);
} catch (UnknownHostException e) {
throw new PlcRuntimeException(e);
}
}

protected static int generateint() {
return int.of(localPorts.getAndIncrement());
protected static int generateAmsPort() {
return localPorts.getAndIncrement();
}

@Override
Expand All @@ -148,8 +159,8 @@ public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionReque
final SubscriptionPlcField subscriptionPlcField = subscriptionPlcFieldEntry.getValue();
final PlcField field = Objects.requireNonNull(subscriptionPlcField.getPlcField());

final IndexGroup indexGroup;
final IndexOffset indexOffset;
final long indexGroup;
final long indexOffset;
final AdsDataType adsDataType;
final int numberOfElements;
// If this is a symbolic field, it has to be resolved first.
Expand All @@ -160,51 +171,45 @@ public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionReque
if (directAdsField == null) {
throw new PlcRuntimeException("Unresolvable field " + field);
}
indexGroup = IndexGroup.of(directAdsField.getIndexGroup());
indexOffset = IndexOffset.of(directAdsField.getIndexOffset());
indexGroup = directAdsField.getIndexGroup();
indexOffset = directAdsField.getIndexOffset();
adsDataType = directAdsField.getAdsDataType();
numberOfElements = directAdsField.getNumberOfElements();
}
// If it's no symbolic field, we can continue immediately
// without having to do any resolving.
else if (field instanceof DirectAdsField) {
DirectAdsField directAdsField = (DirectAdsField) field;
indexGroup = IndexGroup.of(directAdsField.getIndexGroup());
indexOffset = IndexOffset.of(directAdsField.getIndexOffset());
indexGroup = directAdsField.getIndexGroup();
indexOffset = directAdsField.getIndexOffset();
adsDataType = directAdsField.getAdsDataType();
numberOfElements = directAdsField.getNumberOfElements();
} else {
throw new IllegalArgumentException("Unsupported field type " + field.getClass());
}

final TransmissionMode transmissionMode;
final long transmissionMode;
long cycleTime = 4000000;
switch (subscriptionPlcField.getPlcSubscriptionType()) {
case CYCLIC:
transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERCYCLE;
transmissionMode = 3L;
cycleTime = subscriptionPlcField.getDuration().orElse(Duration.ofSeconds(1)).toMillis();
break;
case CHANGE_OF_STATE:
transmissionMode = TransmissionMode.DefinedValues.ADSTRANS_SERVERONCHA;
transmissionMode = 4L;
break;
default:
throw new PlcRuntimeException("Unmapped type " + subscriptionPlcField.getPlcSubscriptionType());
}

// Prepare the subscription request itself.
AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = AdsAddDeviceNotificationRequest.of(
targetAmsNetId,
targetint,
sourceAmsNetId,
sourceint,
Invoke.NONE,
AdsAddDeviceNotificationRequest adsAddDeviceNotificationRequest = new AdsAddDeviceNotificationRequest(
indexGroup,
indexOffset,
Length.of(adsDataType.getTargetByteSize() * (long) numberOfElements),
adsDataType.getTargetByteSize() * (long) numberOfElements,
transmissionMode,
// We set max delay to cycle time as we don't have a second parameter for this in the plc4j-api
MaxDelay.of(cycleTime + 1),
CycleTime.of(cycleTime)
cycleTime + 1,
cycleTime
);

// Send the request to the plc and wait for a response
Expand All @@ -215,7 +220,7 @@ else if (field instanceof DirectAdsField) {
AdsAddDeviceNotificationResponse response = addDeviceResponse.getResponse();

// Abort if we got anything but a successful response.
if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
if (response.getResult() != 0L) {
throw new PlcRuntimeException("Error code received " + response.getResult());
}
PlcSubscriptionHandle adsSubscriptionHandle = new AdsSubscriptionHandle(this, plcFieldName, adsDataType, response.getNotificationHandle());
Expand All @@ -233,23 +238,17 @@ public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptio
for (InternalPlcSubscriptionHandle internalPlcSubscriptionHandle : internalPlcUnsubscriptionRequest.getInternalPlcSubscriptionHandles()) {
if (internalPlcSubscriptionHandle instanceof AdsSubscriptionHandle) {
AdsSubscriptionHandle adsSubscriptionHandle = (AdsSubscriptionHandle) internalPlcSubscriptionHandle;
AdsDeleteDeviceNotificationRequest adsDeleteDeviceNotificationRequest =
AdsDeleteDeviceNotificationRequest.of(
targetAmsNetId,
targetint,
sourceAmsNetId,
sourceint,
Invoke.NONE,
adsSubscriptionHandle.getNotificationHandle()
);
AdsDeleteDeviceNotificationRequest adsDeleteDeviceNotificationRequest = new AdsDeleteDeviceNotificationRequest(
adsSubscriptionHandle.getNotificationHandle()
);
CompletableFuture<InternalPlcProprietaryResponse<AdsDeleteDeviceNotificationResponse>> deleteDeviceFuture =
new CompletableFuture<>();
channel.writeAndFlush(new PlcRequestContainer<>(new DefaultPlcProprietaryRequest<>(adsDeleteDeviceNotificationRequest), deleteDeviceFuture));

InternalPlcProprietaryResponse<AdsDeleteDeviceNotificationResponse> deleteDeviceResponse =
getFromFuture(deleteDeviceFuture, DEL_DEVICE_TIMEOUT);
AdsDeleteDeviceNotificationResponse response = deleteDeviceResponse.getResponse();
if (response.getResult().toAdsReturnCode() != AdsReturnCode.ADS_CODE_0) {
if (response.getResult() != 0L) {
throw new PlcRuntimeException("Non error code received " + response.getResult());
}
}
Expand All @@ -273,19 +272,23 @@ public InternalPlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> c
}

InternalPlcConsumerRegistration internalPlcConsumerRegistration = new DefaultPlcConsumerRegistration(this, consumer, internalPlcSubscriptionHandles);
Map<NotificationHandle, AdsSubscriptionHandle> notificationHandleAdsSubscriptionHandleMap = Arrays.stream(internalPlcSubscriptionHandles)
Map<Long, AdsSubscriptionHandle> notificationHandleAdsSubscriptionHandleMap = Arrays.stream(internalPlcSubscriptionHandles)
.map(subscriptionHandle -> checkInternal(subscriptionHandle, AdsSubscriptionHandle.class))
.collect(Collectors.toConcurrentMap(AdsSubscriptionHandle::getNotificationHandle, Function.identity()));

Consumer<AdsDeviceNotificationRequest> adsDeviceNotificationRequestConsumer =
adsDeviceNotificationRequest -> adsDeviceNotificationRequest.getAdsStampHeaders().forEach(adsStampHeader -> {
Instant timeStamp = adsStampHeader.getTimeStamp().getAsDate().toInstant();
adsDeviceNotificationRequest -> Arrays.asList(adsDeviceNotificationRequest.getAdsStampHeaders()).forEach(adsStampHeader -> {
BigInteger winTime = adsStampHeader.getTimestamp();
BigInteger timeMillisSince16010101 = winTime.divide(BigInteger.valueOf(10_000));
BigInteger EPOCH_DIFF_IN_MILLIS = BigInteger.valueOf((369L * 365L + 89L) * 86400L * 1000L);
BigInteger subtract = timeMillisSince16010101.subtract(EPOCH_DIFF_IN_MILLIS);
Instant timeStamp = new Date(subtract.longValue()).toInstant();

Map<String, Pair<PlcResponseCode, BaseDefaultFieldItem>> fields = new HashMap<>();
adsStampHeader.getAdsNotificationSamples()
Arrays.asList(adsStampHeader.getAdsNotificationSamples())
.forEach(adsNotificationSample -> {
NotificationHandle notificationHandle = adsNotificationSample.getNotificationHandle();
Data data = adsNotificationSample.getData();
Long notificationHandle = adsNotificationSample.getNotificationHandle();
byte[] data = adsNotificationSample.getData();
AdsSubscriptionHandle adsSubscriptionHandle = notificationHandleAdsSubscriptionHandleMap.get(notificationHandle);
if (adsSubscriptionHandle == null) {
// TODO: we might want to refactor this so that we don't subscribe to everything in the first place.
Expand All @@ -296,7 +299,7 @@ public InternalPlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> c
String plcFieldName = adsSubscriptionHandle.getPlcFieldName();
AdsDataType adsDataType = adsSubscriptionHandle.getAdsDataType();
try {
BaseDefaultFieldItem baseDefaultFieldItem = LittleEndianDecoder.decodeData(adsDataType, data.getBytes());
BaseDefaultFieldItem baseDefaultFieldItem = LittleEndianDecoder.decodeData(adsDataType, data);
fields.put(plcFieldName, Pair.of(PlcResponseCode.OK, baseDefaultFieldItem));
} catch (RuntimeException e) {
LOGGER.error("Can't decode {}", data, e);
Expand Down
Loading

0 comments on commit 408054c

Please sign in to comment.