Skip to content

Commit

Permalink
IGNITE-61 - Client fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Valentin Kulichenko committed Feb 6, 2015
1 parent c12c674 commit b8e3588
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 59 deletions.
Expand Up @@ -60,7 +60,7 @@ public class CommunicationMessageCodeGenerator {
};

/** */
private static final String SRC_DIR = U.getGridGainHome() + "/modules/core/src/main/java";
private static final String SRC_DIR = U.getIgniteHome() + "/modules/core/src/main/java";

/** */
private static final Class<?> BASE_CLS = GridTcpCommunicationMessageAdapter.class;
Expand Down
Expand Up @@ -19,6 +19,7 @@

import org.apache.ignite.internal.util.direct.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.gridgain.grid.util.direct.*;

import java.nio.*;

Expand All @@ -32,6 +33,9 @@ public class GridClientHandshakeRequestWrapper extends GridTcpCommunicationMessa
/** Signal char. */
public static final byte HANDSHAKE_HEADER = (byte)0x91;

/** Stream. */
private final GridTcpCommunicationByteBufferStream stream = new GridTcpCommunicationByteBufferStream(null);

/** Handshake bytes. */
private byte[] bytes;

Expand Down Expand Up @@ -59,43 +63,29 @@ public byte[] bytes() {

/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf) {
commState.setBuffer(buf);
stream.setBuffer(buf);

if (!commState.typeWritten) {
if (!commState.putByte(null, directType()))
if (!buf.hasRemaining())
return false;

stream.writeByte(directType());

commState.typeWritten = true;
}

switch (commState.idx) {
case 0:
if (!commState.putByteArray("bytes", bytes))
return false;

commState.idx++;

}
stream.writeByteArray(bytes, 0, bytes.length);

return true;
return stream.lastFinished();
}

/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf) {
commState.setBuffer(buf);
stream.setBuffer(buf);

switch (commState.idx) {
case 0:
bytes = commState.getByteArray("bytes");

if (!commState.lastRead())
return false;

commState.idx++;

}
bytes = stream.readByteArray(GridClientHandshakeRequest.PACKET_SIZE);

return true;
return stream.lastFinished();
}

/** {@inheritDoc} */
Expand Down
Expand Up @@ -62,8 +62,6 @@ public GridClientHandshakeResponseWrapper(byte code) {

/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf) {
commState.setBuffer(buf);

return true;
}

Expand Down
Expand Up @@ -19,6 +19,7 @@

import org.apache.ignite.internal.util.direct.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.gridgain.grid.util.direct.*;

import java.nio.*;
import java.util.*;
Expand All @@ -33,6 +34,9 @@ public class GridClientMessageWrapper extends GridTcpCommunicationMessageAdapter
/** Client request header. */
public static final byte REQ_HEADER = (byte)0x90;

/** Stream. */
private final GridTcpCommunicationByteBufferStream stream = new GridTcpCommunicationByteBufferStream(null);

/** */
private int msgSize;

Expand Down Expand Up @@ -130,43 +134,52 @@ public void message(ByteBuffer msg) {

/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf) {
commState.setBuffer(buf);
stream.setBuffer(buf);

if (!commState.typeWritten) {
if (!commState.putByte(null, directType()))
if (stream.remaining() < 1)
return false;

stream.writeByte(directType());

commState.typeWritten = true;
}

switch (commState.idx) {
case 0:
if (!commState.putInt("msgSize", msgSize))
if (stream.remaining() < 4)
return false;

stream.writeInt(msgSize);

commState.idx++;

case 1:
if (!commState.putLong("reqId", reqId))
if (stream.remaining() < 8)
return false;

stream.writeLong(reqId);

commState.idx++;

case 2:
if (!commState.putUuid("clientId", clientId))
if (stream.remaining() < 16)
return false;

stream.writeByteArray(U.uuidToBytes(clientId), 0, 16);

commState.idx++;

case 3:
if (!commState.putUuid("destId", destId))
if (stream.remaining() < 16)
return false;

stream.writeByteArray(U.uuidToBytes(destId), 0, 16);

commState.idx++;

case 4:
if (!commState.putByteBuffer("msg", msg))
return false;
stream.writeByteArray(msg.array(), msg.position(), msg.remaining());

commState.idx++;

Expand All @@ -177,47 +190,52 @@ public void message(ByteBuffer msg) {

/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf) {
commState.setBuffer(buf);
stream.setBuffer(buf);

switch (commState.idx) {
case 0:
msgSize = commState.getInt("msgSize");

if (!commState.lastRead())
if (stream.remaining() < 4)
return false;

msgSize = stream.readInt();

if (msgSize == 0) // Ping message.
return true;

commState.idx++;

case 1:
reqId = commState.getLong("reqId");

if (!commState.lastRead())
if (stream.remaining() < 8)
return false;

reqId = stream.readLong();

commState.idx++;

case 2:
clientId = commState.getUuid("clientId");

if (!commState.lastRead())
if (stream.remaining() < 16)
return false;

clientId = U.bytesToUuid(stream.readByteArray(16), 0);

commState.idx++;

case 3:
destId = commState.getUuid("destId");

if (!commState.lastRead())
if (stream.remaining() < 16)
return false;

destId = U.bytesToUuid(stream.readByteArray(16), 0);

commState.idx++;

case 4:
msg = commState.getByteBuffer("msg");
byte[] msg0 = stream.readByteArray(msgSize);

if (!commState.lastRead())
if (!stream.lastFinished())
return false;

msg = ByteBuffer.wrap(msg0);

commState.idx++;
}

Expand Down
Expand Up @@ -78,7 +78,7 @@ public GridMemcachedMessageWrapper(GridMemcachedMessage msg, IgniteMarshaller jd
commState.typeWritten = true;
}

stream.writeByteArrayNoLength(bytes);
stream.writeByteArray(bytes, 0, bytes.length);

return stream.lastFinished();
}
Expand Down
Expand Up @@ -106,7 +106,7 @@ public static int longToBytes(long l, byte[] bytes, int off) {
* @return Encoded into byte array {@link UUID}.
*/
public static byte[] uuidToBytes(UUID uuid) {
byte[] bytes = new byte[(Long.SIZE >> 3)* 2];
byte[] bytes = new byte[(Long.SIZE >> 3) * 2];

uuidToBytes(uuid, bytes, 0);

Expand All @@ -122,7 +122,6 @@ public static byte[] uuidToBytes(UUID uuid) {
* @return Number of bytes overwritten in {@code bytes} array.
*/
public static int uuidToBytes(UUID uuid, byte[] bytes, int off) {

ByteBuffer buf = ByteBuffer.wrap(bytes, off, 16);

buf.order(ByteOrder.BIG_ENDIAN);
Expand Down
Expand Up @@ -276,10 +276,10 @@ public void writeByteArray(byte[] val) {
/**
* @param val Array.
*/
public void writeByteArrayNoLength(byte[] val) {
public void writeByteArray(byte[] val, int off, int len) {
assert val != null;

lastFinished = writeArray(val, BYTE_ARR_OFF, val.length, val.length, true);
lastFinished = writeArray(val, BYTE_ARR_OFF + off, len, len, true);
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -419,6 +419,14 @@ public byte[] readByteArray() {
return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF);
}

/**
* @param len Length.
* @return Array.
*/
public byte[] readByteArray(int len) {
return readArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF, len);
}

/** {@inheritDoc} */
public boolean readBoolean() {
assert buf.hasRemaining();
Expand Down Expand Up @@ -639,16 +647,29 @@ private boolean writeArray(Object arr, long off, int len, int bytes, boolean ski
* @return Array or special value if it was not fully read.
*/
private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) {
return readArray(creator, lenShift, off, -1);
}

/**
* @param creator Array creator.
* @param lenShift Array length shift size.
* @param off Base offset.
* @param len Length.
* @return Array or special value if it was not fully read.
*/
private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off, int len) {
assert creator != null;

if (tmpArr == null) {
if (remaining() < 4) {
lastFinished = false;
if (len == -1) {
if (remaining() < 4) {
lastFinished = false;

return null;
}
return null;
}

int len = readInt();
len = readInt();
}

switch (len) {
case -1:
Expand Down

0 comments on commit b8e3588

Please sign in to comment.