Skip to content

Commit

Permalink
fix(plc4j): Fix bug with select returning prematurely when device not…
Browse files Browse the repository at this point in the history
… sending any data (#386)

* Add support for custom handlers used in testing

* Add support for custom handlers used in testing

* Add test to demonstrate problem with current select() implementation

* Change select() and wakeup() logic to match API expectations

* Don't use Java 11 features as PR check build fails

* Fix missing import

* Add missing headers

* Declare dependency used by the new test
  • Loading branch information
vmpn committed Jun 27, 2022
1 parent 4f6369f commit d8fecd3
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 57 deletions.
6 changes: 6 additions & 0 deletions plc4j/transports/serial/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@
<version>0.10.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;

public class SerialChannel extends AbstractNioByteChannel implements DuplexChannel {
Expand Down Expand Up @@ -158,9 +159,10 @@ protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddr
logger.debug("Connecting to Socket Address '{}'", ((SerialSocketAddress) remoteAddress).getIdentifier());

try {
// A bit hacky but to make a Test Connection start the String with TEST
if (((SerialSocketAddress) remoteAddress).getIdentifier().startsWith("TEST")) {
comPort = SerialChannelHandler.DummyHandler.INSTANCE;
// A bit hacky but to support testing check for custom handler
final Optional<SerialChannelHandler> customHandler = ((SerialSocketAddress) remoteAddress).getHandler();
if (customHandler.isPresent()) {
comPort = customHandler.get();
} else {
comPort = new SerialChannelHandler.SerialPortHandler(remoteAddress, config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,54 +64,6 @@ public SerialChannelHandler(SocketAddress address) {
*/
public abstract int write(ByteBuf buf);

public static class DummyHandler extends SerialChannelHandler {

public static final DummyHandler INSTANCE = new DummyHandler(null);

private SerialSelectionKey selectionKey;

public DummyHandler(SocketAddress address) {
super(address);
}

@Override
public boolean open() {
return true;
}

@Override
public String getIdentifier() {
return null;
}

@Override
public void registerSelectionKey(SerialSelectionKey selectionKey) {
this.selectionKey = selectionKey;
}

@Override
public void close() {
// NOOP
}

@Override
public int read(ByteBuf buf) {
buf.writeByte(1);
return 1;
}

@Override
public int write(ByteBuf buf) {
System.out.println("Haha i wrote something");
return 1;
}

public void fireEvent(int readyOp) {
((SerialPollingSelector) this.selectionKey.selector())
.addEvent(new SerialPollingSelector.SelectorEvent(this.selectionKey, readyOp));
}
}


public static class SerialPortHandler extends SerialChannelHandler {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,32 @@ public int select(long timeout) {
}
this.selectPromise = new DefaultPromise<>(executor);
try {
selectPromise.await(timeout);
if(timeout == 0) {
selectPromise.await();
} else {
selectPromise.await(timeout);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Was interrupted", e);
}
return events.size();
final int eventCount = events.size();
logger.debug("returning from select with {} events", eventCount);
return eventCount;
}

@Override
public int select() {
return select(10);
return select(0);
}

@Override
public Selector wakeup() {
logger.debug("being asked to wake up from select");
// throw new NotImplementedException("Not implemented for this selector, should not be needed.");
// NOOP
if (!selectPromise.isDone()) {
selectPromise.setSuccess(null);
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,31 @@

import java.net.SocketAddress;
import java.util.Objects;
import java.util.Optional;

public class SerialSocketAddress extends SocketAddress {

private final String identifier;

private final Optional<SerialChannelHandler> handler;

public SerialSocketAddress(String identifier) {
this.identifier = identifier;
this.handler = Optional.empty();
}

/**
* @param identifier of the port
* @param handler for custom behavior. E.g. testing
*/
public SerialSocketAddress(final String identifier, final SerialChannelHandler handler)
{
this.identifier = identifier;
this.handler = Optional.of(handler);
}

public Optional<SerialChannelHandler> getHandler() {
return handler;
}

public String getIdentifier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.ByteToMessageCodec;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.transport.serial.DummyHandler;
import org.apache.plc4x.java.transport.serial.SerialChannel;
import org.apache.plc4x.java.transport.serial.SerialChannelFactory;
import org.apache.plc4x.java.transport.serial.SerialChannelHandler;
Expand Down Expand Up @@ -53,7 +54,7 @@ public void showAllPorts() {

@Test
public void createChannel() throws PlcConnectionException, InterruptedException, UnknownHostException {
SerialChannelFactory asdf = new SerialChannelFactory(new SerialSocketAddress("TEST-port1"));
SerialChannelFactory asdf = new SerialChannelFactory(new SerialSocketAddress("TEST-port1", DummyHandler.INSTANCE));
// final TcpSocketChannelFactory factory = new TcpSocketChannelFactory(InetAddress.getLocalHost(), 5432);
final Channel channel = asdf.createChannel(new ChannelInitializer<SerialChannel>() {
@Override
Expand All @@ -64,7 +65,7 @@ protected void initChannel(SerialChannel ch) throws Exception {
Thread.sleep(100);
for (int i = 1; i <= 10; i++) {
Thread.sleep(10);
SerialChannelHandler.DummyHandler.INSTANCE.fireEvent(1);
DummyHandler.INSTANCE.fireEvent(1);
}
Thread.sleep(100);
channel.close().sync();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.transport.serial;

import io.netty.buffer.ByteBuf;

import java.net.SocketAddress;

public class DummyHandler extends SerialChannelHandler {

public static final DummyHandler INSTANCE = new DummyHandler(null);

private SerialSelectionKey selectionKey;

public DummyHandler(SocketAddress address) {
super(address);
}

@Override
public boolean open() {
return true;
}

@Override
public String getIdentifier() {
return null;
}

@Override
public void registerSelectionKey(SerialSelectionKey selectionKey) {
this.selectionKey = selectionKey;
}

@Override
public void close() {
// NOOP
}

@Override
public int read(ByteBuf buf) {
buf.writeByte(1);
return 1;
}

@Override
public int write(ByteBuf buf) {
System.out.println("Haha i wrote something");
return 1;
}

public void fireEvent(int readyOp) {
((SerialPollingSelector) this.selectionKey.selector())
.addEvent(new SerialPollingSelector.SelectorEvent(this.selectionKey, readyOp));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.plc4x.java.transport.serial;

import io.netty.buffer.ByteBuf;

class IdleSerialChannelHandler extends SerialChannelHandler {

public IdleSerialChannelHandler() {
super(null);
}

@Override
public boolean open() {
return true;
}

@Override
String getIdentifier() {
return "Emulated Com Port";
}

@Override
void registerSelectionKey(SerialSelectionKey selectionKey) {
/*
* We can ignore registration as we will neven trigger events
*/
}

@Override
public void close() {
}

@Override
public int read(ByteBuf buf) {
return 0;
}

@Override
public int write(ByteBuf buf) {
throw new UnsupportedOperationException();
}
}
Loading

0 comments on commit d8fecd3

Please sign in to comment.