Skip to content
Merged
11 changes: 10 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,16 @@ matrix:
jdk: openjdk8

- language: java
jdk: oraclejdk8
jdk: openjdk11

- language: java
jdk: openjdk12

- language: java
jdk: oraclejdk11

- language: java
jdk: oraclejdk12

- language: android
jdk: openjdk8
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/org/zeromq/ZMQ.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand Down Expand Up @@ -3757,7 +3758,12 @@ public void unregister(SelectableChannel channel)
*/
private void unregisterInternal(Object socket)
{
items.removeIf(item -> item.socket == socket || item.getRawSocket() == socket);
for (Iterator<PollItem> it = items.iterator(); it.hasNext(); ) {
PollItem item = it.next();
if (item.socket == socket || item.getRawSocket() == socket) {
it.remove();
}
}
}

/**
Expand Down
54 changes: 33 additions & 21 deletions src/main/java/zmq/Msg.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
import java.util.Arrays;

import zmq.io.Metadata;
import zmq.util.Utils;
Expand Down Expand Up @@ -104,7 +105,6 @@ enum Type
private SocketChannel fileDesc;

private final int size;
private byte[] data;
private final ByteBuffer buf;
// keep track of relative write position
private int writeIndex = 0;
Expand All @@ -122,7 +122,6 @@ public Msg(int capacity)
this.flags = 0;
this.size = capacity;
this.buf = ByteBuffer.wrap(new byte[capacity]).order(ByteOrder.BIG_ENDIAN);
this.data = buf.array();
}

public Msg(byte[] src)
Expand All @@ -133,7 +132,6 @@ public Msg(byte[] src)
this.type = Type.DATA;
this.flags = 0;
this.size = src.length;
this.data = src;
this.buf = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN);
}

Expand All @@ -145,12 +143,6 @@ public Msg(final ByteBuffer src)
this.type = Type.DATA;
this.flags = 0;
this.buf = src.duplicate();
if (buf.hasArray() && buf.position() == 0 && buf.limit() == buf.capacity()) {
this.data = buf.array();
}
else {
this.data = null;
}
this.size = buf.remaining();
}

Expand All @@ -163,10 +155,6 @@ public Msg(final Msg m)
this.flags = m.flags;
this.size = m.size;
this.buf = m.buf != null ? m.buf.duplicate() : null;
if (m.data != null) {
this.data = new byte[this.size];
System.arraycopy(m.data, 0, this.data, 0, m.size);
}
}

private Msg(Msg src, ByteArrayOutputStream out)
Expand Down Expand Up @@ -223,13 +211,36 @@ public void initDelimiter()
flags = 0;
}

/**
* Returns the message data.
*
* If possible, a reference to the data is returned, without copy.
* Otherwise a new byte array will be allocated and the data will be copied.
*
* @return the message data.
*/
public byte[] data()
{
if (data == null) {
data = new byte[buf.remaining()];
buf.duplicate().get(data);
if (buf.hasArray()) {
byte[] array = buf.array();
int offset = buf.arrayOffset();

if (buf.arrayOffset() == 0 && array.length == size) {
// If the backing array is exactly what we need, return it without copy.
return array;
}
else {
// Else use it to make an efficient copy.
return Arrays.copyOfRange(array, offset, offset + size);
}
}
return data;

// No backing array -> use ByteBuffer#get().
byte[] array = new byte[size];
ByteBuffer dup = buf.duplicate();
dup.position(0);
dup.get(array);
return array;
}

public ByteBuffer buf()
Expand Down Expand Up @@ -342,14 +353,15 @@ public Msg put(ByteBuffer src)
public int getBytes(int index, byte[] dst, int off, int len)
{
int count = Math.min(len, size - index);
if (data == null) {

if (buf.hasArray()) {
System.arraycopy(buf.array(), buf.arrayOffset() + index, dst, off, count);
}
else {
ByteBuffer dup = buf.duplicate();
dup.position(index);
dup.get(dst, off, count);
}
else {
System.arraycopy(data, index, dst, off, count);
}

return count;
}
Expand Down
48 changes: 37 additions & 11 deletions src/test/java/guide/mtrelay.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package guide;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZContext;
import org.zeromq.ZMQ.Socket;

/**
* Multithreaded relay
Expand Down Expand Up @@ -48,8 +47,6 @@ public void run()
// Bind to inproc: endpoint, then start upstream thread
Socket receiver = context.createSocket(SocketType.PAIR);
receiver.bind("inproc://step2");
Thread step1 = new Step1(context);
step1.start();

// Wait for signal
receiver.recv(0);
Expand All @@ -58,28 +55,57 @@ public void run()
// Connect to step3 and tell it we're ready
Socket xmitter = context.createSocket(SocketType.PAIR);
xmitter.connect("inproc://step3");
System.out.println("Step 2 ready, signaling step 3");
xmitter.send("READY", 0);

xmitter.close();
}

}

public static void main(String[] args)
private static class Step3 extends Thread
{
try (ZContext context = new ZContext()) {
private ZContext context;

private Step3(ZContext context)
{
this.context = context;
}

@Override
public void run()
{
// Bind to inproc: endpoint, then start upstream thread
Socket receiver = context.createSocket(SocketType.PAIR);
receiver.bind("inproc://step3");

// Step 2 relays the signal to step 3
Thread step2 = new Step2(context);
step2.start();

// Wait for signal
receiver.recv(0);
receiver.close();

System.out.println("Step 3 ready");
}

}

public static void main(String[] args) throws InterruptedException
{
try (ZContext context = new ZContext()) {
// Step 1 signals to step 2
Thread step1 = new Step1(context);
step1.start();

// Step 2 relays the signal from step 1 to step 3
Thread step2 = new Step2(context);
step2.start();

// Step 3 waits for signal from step 2
Thread step3 = new Step3(context);
step3.start();

step1.join();
step2.join();
step3.join();

System.out.println("Test successful!");
}
}
Expand Down
86 changes: 86 additions & 0 deletions src/test/java/zmq/TestMsg.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package zmq;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.junit.Assert.assertThat;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -143,4 +144,89 @@ Msg initDirectMsg(Function<Integer, ByteBuffer> allocator)
final Msg msg = new Msg(buffer);
return msg;
}

// Check that data returned by Msg#getBytes(int, byte[], int, int) and Msg#get(int) are
// consistent.
@Test
public void testGetBytesSameAsGet()
{
Msg msg1 = new Msg(new byte[] {42});
Msg msg2 = new Msg(msg1);

msg2.put(5);

byte firstByte = msg2.get(0);

byte[] data = new byte[1];
msg2.getBytes(0, data, 0, 1);

assertThat(data[0], is(firstByte));
}

// Check that Msg#data() is correct when the backing array has an offset.
@Test
public void testDataNonZeroOffset()
{
byte[] data = new byte[]{10, 11, 12};

ByteBuffer buffer = ByteBuffer.wrap(data, 1, 2).slice();
Msg msg = new Msg(buffer);

assertThat(msg.data(), is(new byte[]{11, 12}));
}

// Check that Msg#data() is correct when the end of the backing array is not used by the buffer.
@Test
public void testDataArrayExtendsFurther()
{
byte[] data = new byte[]{10, 11, 12};

ByteBuffer buffer = ByteBuffer.wrap(data, 0, 2).slice();
Msg msg = new Msg(buffer);

assertThat(msg.data(), is(new byte[]{10, 11}));
}

// Check that data returned by Msg#getBytes(int, byte[], int, int) is correct when the backing
// array has an offset.
@Test
public void testGetBytesNonZeroOffset()
{
byte[] data = new byte[]{10, 11, 12};

ByteBuffer buffer = ByteBuffer.wrap(data, 1, 2).slice();
Msg msg = new Msg(buffer);

byte[] gotData = new byte[2];
msg.getBytes(0, gotData, 0, 2);

assertThat(msg.data(), is(new byte[]{11, 12}));
}

// Check that data returned by Msg#getBytes(int, byte[], int, int) is correct when the end of
// the backing array is not used by the buffer.
@Test
public void testGetBytesArrayExtendsFurther()
{
byte[] data = new byte[]{10, 11, 12};

ByteBuffer buffer = ByteBuffer.wrap(data, 0, 2).slice();
Msg msg = new Msg(buffer);

byte[] gotData = new byte[2];
msg.getBytes(0, gotData, 0, 2);

assertThat(msg.data(), is(new byte[]{10, 11}));
}

// Check that Msg#data() doesn't make unnecessary copies.
@Test
public void testDataNoCopy()
{
byte[] data = new byte[]{10, 11, 12};

Msg msg = new Msg(data);

assertThat(msg.data(), sameInstance(data));
}
}