diff --git a/.travis.yml b/.travis.yml index d57017d0a..75ef9247d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,16 @@ matrix: jdk: openjdk8 - language: java - jdk: oraclejdk8 + jdk: openjdk11 + + - language: java + jdk: openjdk12 + + - language: java + jdk: oraclejdk11 + + - language: java + jdk: oraclejdk12 - language: android jdk: openjdk8 diff --git a/src/main/java/org/zeromq/ZMQ.java b/src/main/java/org/zeromq/ZMQ.java index 0fa759245..2761632d5 100644 --- a/src/main/java/org/zeromq/ZMQ.java +++ b/src/main/java/org/zeromq/ZMQ.java @@ -6,6 +6,7 @@ import java.nio.channels.Selector; import java.nio.charset.Charset; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; @@ -3757,7 +3758,12 @@ public void unregister(SelectableChannel channel) */ private void unregisterInternal(Object socket) { - items.removeIf(item -> item.socket == socket || item.getRawSocket() == socket); + for (Iterator it = items.iterator(); it.hasNext(); ) { + PollItem item = it.next(); + if (item.socket == socket || item.getRawSocket() == socket) { + it.remove(); + } + } } /** diff --git a/src/main/java/zmq/Msg.java b/src/main/java/zmq/Msg.java index 6738639ba..633490607 100644 --- a/src/main/java/zmq/Msg.java +++ b/src/main/java/zmq/Msg.java @@ -4,6 +4,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.SocketChannel; +import java.util.Arrays; import zmq.io.Metadata; import zmq.util.Utils; @@ -104,7 +105,6 @@ enum Type private SocketChannel fileDesc; private final int size; - private byte[] data; private final ByteBuffer buf; // keep track of relative write position private int writeIndex = 0; @@ -122,7 +122,6 @@ public Msg(int capacity) this.flags = 0; this.size = capacity; this.buf = ByteBuffer.wrap(new byte[capacity]).order(ByteOrder.BIG_ENDIAN); - this.data = buf.array(); } public Msg(byte[] src) @@ -133,7 +132,6 @@ public Msg(byte[] src) this.type = Type.DATA; this.flags = 0; this.size = src.length; - this.data = src; this.buf = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN); } @@ -145,12 +143,6 @@ public Msg(final ByteBuffer src) this.type = Type.DATA; this.flags = 0; this.buf = src.duplicate(); - if (buf.hasArray() && buf.position() == 0 && buf.limit() == buf.capacity()) { - this.data = buf.array(); - } - else { - this.data = null; - } this.size = buf.remaining(); } @@ -163,10 +155,6 @@ public Msg(final Msg m) this.flags = m.flags; this.size = m.size; this.buf = m.buf != null ? m.buf.duplicate() : null; - if (m.data != null) { - this.data = new byte[this.size]; - System.arraycopy(m.data, 0, this.data, 0, m.size); - } } private Msg(Msg src, ByteArrayOutputStream out) @@ -223,13 +211,36 @@ public void initDelimiter() flags = 0; } + /** + * Returns the message data. + * + * If possible, a reference to the data is returned, without copy. + * Otherwise a new byte array will be allocated and the data will be copied. + * + * @return the message data. + */ public byte[] data() { - if (data == null) { - data = new byte[buf.remaining()]; - buf.duplicate().get(data); + if (buf.hasArray()) { + byte[] array = buf.array(); + int offset = buf.arrayOffset(); + + if (buf.arrayOffset() == 0 && array.length == size) { + // If the backing array is exactly what we need, return it without copy. + return array; + } + else { + // Else use it to make an efficient copy. + return Arrays.copyOfRange(array, offset, offset + size); + } } - return data; + + // No backing array -> use ByteBuffer#get(). + byte[] array = new byte[size]; + ByteBuffer dup = buf.duplicate(); + dup.position(0); + dup.get(array); + return array; } public ByteBuffer buf() @@ -342,14 +353,15 @@ public Msg put(ByteBuffer src) public int getBytes(int index, byte[] dst, int off, int len) { int count = Math.min(len, size - index); - if (data == null) { + + if (buf.hasArray()) { + System.arraycopy(buf.array(), buf.arrayOffset() + index, dst, off, count); + } + else { ByteBuffer dup = buf.duplicate(); dup.position(index); dup.get(dst, off, count); } - else { - System.arraycopy(data, index, dst, off, count); - } return count; } diff --git a/src/test/java/guide/mtrelay.java b/src/test/java/guide/mtrelay.java index cdeefe1c8..e8ffb1654 100644 --- a/src/test/java/guide/mtrelay.java +++ b/src/test/java/guide/mtrelay.java @@ -1,9 +1,8 @@ package guide; import org.zeromq.SocketType; -import org.zeromq.ZMQ; -import org.zeromq.ZMQ.Socket; import org.zeromq.ZContext; +import org.zeromq.ZMQ.Socket; /** * Multithreaded relay @@ -48,8 +47,6 @@ public void run() // Bind to inproc: endpoint, then start upstream thread Socket receiver = context.createSocket(SocketType.PAIR); receiver.bind("inproc://step2"); - Thread step1 = new Step1(context); - step1.start(); // Wait for signal receiver.recv(0); @@ -58,28 +55,57 @@ public void run() // Connect to step3 and tell it we're ready Socket xmitter = context.createSocket(SocketType.PAIR); xmitter.connect("inproc://step3"); + System.out.println("Step 2 ready, signaling step 3"); xmitter.send("READY", 0); - xmitter.close(); } } - public static void main(String[] args) + private static class Step3 extends Thread { - try (ZContext context = new ZContext()) { + private ZContext context; + + private Step3(ZContext context) + { + this.context = context; + } + + @Override + public void run() + { // Bind to inproc: endpoint, then start upstream thread Socket receiver = context.createSocket(SocketType.PAIR); receiver.bind("inproc://step3"); - // Step 2 relays the signal to step 3 - Thread step2 = new Step2(context); - step2.start(); - // Wait for signal receiver.recv(0); receiver.close(); + System.out.println("Step 3 ready"); + } + + } + + public static void main(String[] args) throws InterruptedException + { + try (ZContext context = new ZContext()) { + // Step 1 signals to step 2 + Thread step1 = new Step1(context); + step1.start(); + + // Step 2 relays the signal from step 1 to step 3 + Thread step2 = new Step2(context); + step2.start(); + + // Step 3 waits for signal from step 2 + Thread step3 = new Step3(context); + step3.start(); + + step1.join(); + step2.join(); + step3.join(); + System.out.println("Test successful!"); } } diff --git a/src/test/java/zmq/TestMsg.java b/src/test/java/zmq/TestMsg.java index b50994c3a..011e4655c 100644 --- a/src/test/java/zmq/TestMsg.java +++ b/src/test/java/zmq/TestMsg.java @@ -1,6 +1,7 @@ package zmq; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.sameInstance; import static org.junit.Assert.assertThat; import java.nio.ByteBuffer; @@ -143,4 +144,89 @@ Msg initDirectMsg(Function allocator) final Msg msg = new Msg(buffer); return msg; } + + // Check that data returned by Msg#getBytes(int, byte[], int, int) and Msg#get(int) are + // consistent. + @Test + public void testGetBytesSameAsGet() + { + Msg msg1 = new Msg(new byte[] {42}); + Msg msg2 = new Msg(msg1); + + msg2.put(5); + + byte firstByte = msg2.get(0); + + byte[] data = new byte[1]; + msg2.getBytes(0, data, 0, 1); + + assertThat(data[0], is(firstByte)); + } + + // Check that Msg#data() is correct when the backing array has an offset. + @Test + public void testDataNonZeroOffset() + { + byte[] data = new byte[]{10, 11, 12}; + + ByteBuffer buffer = ByteBuffer.wrap(data, 1, 2).slice(); + Msg msg = new Msg(buffer); + + assertThat(msg.data(), is(new byte[]{11, 12})); + } + + // Check that Msg#data() is correct when the end of the backing array is not used by the buffer. + @Test + public void testDataArrayExtendsFurther() + { + byte[] data = new byte[]{10, 11, 12}; + + ByteBuffer buffer = ByteBuffer.wrap(data, 0, 2).slice(); + Msg msg = new Msg(buffer); + + assertThat(msg.data(), is(new byte[]{10, 11})); + } + + // Check that data returned by Msg#getBytes(int, byte[], int, int) is correct when the backing + // array has an offset. + @Test + public void testGetBytesNonZeroOffset() + { + byte[] data = new byte[]{10, 11, 12}; + + ByteBuffer buffer = ByteBuffer.wrap(data, 1, 2).slice(); + Msg msg = new Msg(buffer); + + byte[] gotData = new byte[2]; + msg.getBytes(0, gotData, 0, 2); + + assertThat(msg.data(), is(new byte[]{11, 12})); + } + + // Check that data returned by Msg#getBytes(int, byte[], int, int) is correct when the end of + // the backing array is not used by the buffer. + @Test + public void testGetBytesArrayExtendsFurther() + { + byte[] data = new byte[]{10, 11, 12}; + + ByteBuffer buffer = ByteBuffer.wrap(data, 0, 2).slice(); + Msg msg = new Msg(buffer); + + byte[] gotData = new byte[2]; + msg.getBytes(0, gotData, 0, 2); + + assertThat(msg.data(), is(new byte[]{10, 11})); + } + + // Check that Msg#data() doesn't make unnecessary copies. + @Test + public void testDataNoCopy() + { + byte[] data = new byte[]{10, 11, 12}; + + Msg msg = new Msg(data); + + assertThat(msg.data(), sameInstance(data)); + } }