diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java index decac328ba263..b861806a6f367 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/CommunicationMessageCodeGenerator.java @@ -60,7 +60,7 @@ public class CommunicationMessageCodeGenerator { }; /** */ - private static final String SRC_DIR = U.getGridGainHome() + "/modules/core/src/main/java"; + private static final String SRC_DIR = U.getIgniteHome() + "/modules/core/src/main/java"; /** */ private static final Class BASE_CLS = GridTcpCommunicationMessageAdapter.class; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java index 1c0366a99cd8b..b7b986fe9833c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java @@ -19,6 +19,7 @@ import org.apache.ignite.internal.util.direct.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.grid.util.direct.*; import java.nio.*; @@ -32,6 +33,9 @@ public class GridClientHandshakeRequestWrapper extends GridTcpCommunicationMessa /** Signal char. */ public static final byte HANDSHAKE_HEADER = (byte)0x91; + /** Stream. */ + private final GridTcpCommunicationByteBufferStream stream = new GridTcpCommunicationByteBufferStream(null); + /** Handshake bytes. */ private byte[] bytes; @@ -59,43 +63,29 @@ public byte[] bytes() { /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); + stream.setBuffer(buf); if (!commState.typeWritten) { - if (!commState.putByte(null, directType())) + if (!buf.hasRemaining()) return false; + stream.writeByte(directType()); + commState.typeWritten = true; } - switch (commState.idx) { - case 0: - if (!commState.putByteArray("bytes", bytes)) - return false; - - commState.idx++; - - } + stream.writeByteArray(bytes, 0, bytes.length); - return true; + return stream.lastFinished(); } /** {@inheritDoc} */ @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); + stream.setBuffer(buf); - switch (commState.idx) { - case 0: - bytes = commState.getByteArray("bytes"); - - if (!commState.lastRead()) - return false; - - commState.idx++; - - } + bytes = stream.readByteArray(GridClientHandshakeRequest.PACKET_SIZE); - return true; + return stream.lastFinished(); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java index 8bb10a33040e4..7ce67b4784901 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java @@ -62,8 +62,6 @@ public GridClientHandshakeResponseWrapper(byte code) { /** {@inheritDoc} */ @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); - return true; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java index 4ba6a4fc7059a..fe3246833657d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java @@ -19,6 +19,7 @@ import org.apache.ignite.internal.util.direct.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.grid.util.direct.*; import java.nio.*; import java.util.*; @@ -33,6 +34,9 @@ public class GridClientMessageWrapper extends GridTcpCommunicationMessageAdapter /** Client request header. */ public static final byte REQ_HEADER = (byte)0x90; + /** Stream. */ + private final GridTcpCommunicationByteBufferStream stream = new GridTcpCommunicationByteBufferStream(null); + /** */ private int msgSize; @@ -130,43 +134,52 @@ public void message(ByteBuffer msg) { /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf) { - commState.setBuffer(buf); + stream.setBuffer(buf); if (!commState.typeWritten) { - if (!commState.putByte(null, directType())) + if (stream.remaining() < 1) return false; + stream.writeByte(directType()); + commState.typeWritten = true; } switch (commState.idx) { case 0: - if (!commState.putInt("msgSize", msgSize)) + if (stream.remaining() < 4) return false; + stream.writeInt(msgSize); + commState.idx++; case 1: - if (!commState.putLong("reqId", reqId)) + if (stream.remaining() < 8) return false; + stream.writeLong(reqId); + commState.idx++; case 2: - if (!commState.putUuid("clientId", clientId)) + if (stream.remaining() < 16) return false; + stream.writeByteArray(U.uuidToBytes(clientId), 0, 16); + commState.idx++; case 3: - if (!commState.putUuid("destId", destId)) + if (stream.remaining() < 16) return false; + stream.writeByteArray(U.uuidToBytes(destId), 0, 16); + commState.idx++; case 4: - if (!commState.putByteBuffer("msg", msg)) - return false; + stream.writeByteArray(msg.array(), msg.position(), msg.remaining()); commState.idx++; @@ -177,47 +190,52 @@ public void message(ByteBuffer msg) { /** {@inheritDoc} */ @Override public boolean readFrom(ByteBuffer buf) { - commState.setBuffer(buf); + stream.setBuffer(buf); switch (commState.idx) { case 0: - msgSize = commState.getInt("msgSize"); - - if (!commState.lastRead()) + if (stream.remaining() < 4) return false; + msgSize = stream.readInt(); + + if (msgSize == 0) // Ping message. + return true; + commState.idx++; case 1: - reqId = commState.getLong("reqId"); - - if (!commState.lastRead()) + if (stream.remaining() < 8) return false; + reqId = stream.readLong(); + commState.idx++; case 2: - clientId = commState.getUuid("clientId"); - - if (!commState.lastRead()) + if (stream.remaining() < 16) return false; + clientId = U.bytesToUuid(stream.readByteArray(16), 0); + commState.idx++; case 3: - destId = commState.getUuid("destId"); - - if (!commState.lastRead()) + if (stream.remaining() < 16) return false; + destId = U.bytesToUuid(stream.readByteArray(16), 0); + commState.idx++; case 4: - msg = commState.getByteBuffer("msg"); + byte[] msg0 = stream.readByteArray(msgSize); - if (!commState.lastRead()) + if (!stream.lastFinished()) return false; + msg = ByteBuffer.wrap(msg0); + commState.idx++; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java index 906efa2c8a093..7d93503cd77f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java @@ -78,7 +78,7 @@ public GridMemcachedMessageWrapper(GridMemcachedMessage msg, IgniteMarshaller jd commState.typeWritten = true; } - stream.writeByteArrayNoLength(bytes); + stream.writeByteArray(bytes, 0, bytes.length); return stream.lastFinished(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridClientByteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridClientByteUtils.java index f15e13bf10afd..1c7cbd92e25ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridClientByteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridClientByteUtils.java @@ -106,7 +106,7 @@ public static int longToBytes(long l, byte[] bytes, int off) { * @return Encoded into byte array {@link UUID}. */ public static byte[] uuidToBytes(UUID uuid) { - byte[] bytes = new byte[(Long.SIZE >> 3)* 2]; + byte[] bytes = new byte[(Long.SIZE >> 3) * 2]; uuidToBytes(uuid, bytes, 0); @@ -122,7 +122,6 @@ public static byte[] uuidToBytes(UUID uuid) { * @return Number of bytes overwritten in {@code bytes} array. */ public static int uuidToBytes(UUID uuid, byte[] bytes, int off) { - ByteBuffer buf = ByteBuffer.wrap(bytes, off, 16); buf.order(ByteOrder.BIG_ENDIAN); diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationByteBufferStream.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationByteBufferStream.java index ea4a36f611f59..76b6eb78e1d0d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationByteBufferStream.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationByteBufferStream.java @@ -276,10 +276,10 @@ public void writeByteArray(byte[] val) { /** * @param val Array. */ - public void writeByteArrayNoLength(byte[] val) { + public void writeByteArray(byte[] val, int off, int len) { assert val != null; - lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length, true); + lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len, true); } /** {@inheritDoc} */ @@ -419,6 +419,14 @@ public byte[] readByteArray() { return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF); } + /** + * @param len Length. + * @return Array. + */ + public byte[] readByteArray(int len) { + return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF, len); + } + /** {@inheritDoc} */ public boolean readBoolean() { assert buf.hasRemaining(); @@ -639,16 +647,29 @@ private boolean writeArray(Object arr, long off, int len, int bytes, boolean ski * @return Array or special value if it was not fully read. */ private T readArray(ArrayCreator creator, int lenShift, long off) { + return readArray(creator, lenShift, off, -1); + } + + /** + * @param creator Array creator. + * @param lenShift Array length shift size. + * @param off Base offset. + * @param len Length. + * @return Array or special value if it was not fully read. + */ + private T readArray(ArrayCreator creator, int lenShift, long off, int len) { assert creator != null; if (tmpArr == null) { - if (remaining() < 4) { - lastFinished = false; + if (len == -1) { + if (remaining() < 4) { + lastFinished = false; - return null; - } + return null; + } - int len = readInt(); + len = readInt(); + } switch (len) { case -1: