Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {

protected ByteBuffer buf;

protected byte[] heapArr;
protected byte @Nullable [] heapArr;

protected long baseOff;

Expand Down Expand Up @@ -188,7 +188,7 @@ public void setBuffer(ByteBuffer buf) {
this.buf = buf;

heapArr = buf.isDirect() ? null : buf.array();
baseOff = buf.isDirect() ? GridUnsafe.bufferAddress(buf) : BYTE_ARR_OFF;
baseOff = buf.isDirect() ? GridUnsafe.bufferAddress(buf) : BYTE_ARR_OFF + buf.arrayOffset();
Comment thread
ibessonov marked this conversation as resolved.
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ public CommandClosure<WriteCommand> next() {
@Nullable CommandClosure<WriteCommand> done = (CommandClosure<WriteCommand>) iter.done();
ByteBuffer data = iter.getData();

WriteCommand command = done == null ? marshaller.unmarshall(data.array()) : done.command();
WriteCommand command = done == null ? marshaller.unmarshall(data) : done.command();

long commandIndex = iter.getIndex();
long commandTerm = iter.getTerm();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ public byte[] marshall(Object o) {

@SuppressWarnings("unchecked")
@Override
public <T> T unmarshall(byte[] bytes) {
stream.setBuffer(ByteBuffer.wrap(bytes).order(ORDER));
public <T> T unmarshall(ByteBuffer bytes) {
stream.setBuffer(bytes.duplicate().order(ORDER));

return stream.readMessage(messageReader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.raft.util;

import java.nio.ByteBuffer;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.jraft.util.Marshaller;

Expand All @@ -42,7 +43,7 @@ public byte[] marshall(Object o) {
}

@Override
public <T> T unmarshall(byte[] bytes) {
public <T> T unmarshall(ByteBuffer bytes) {
return marshaller.get().unmarshall(bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,38 @@
*/
package org.apache.ignite.raft.jraft.entity.codec.v1;

import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;

/**
* Log entry codec implementation.
* Log entry codec implementation. Data format description:
* <ul>
* <li>Magic header, 1 byte. {@link #MAGIC}</li>
* <li>Log entry type, formally a var-long, effectively 1 byte. {@link EntryType}</li>
* <li>Log index, var-long, from 1 to 9 bytes.</li>
* <li>Log term, var-long, from 1 to 9 bytes.</li>
* <li>Checksum, 8 bytes, Big Endian.</li>
* <li>If type is not {@link EntryType#ENTRY_TYPE_DATA}:<ul>
* <li>Number of peers, val-long. Following it is the array of peers:</li>
* <li>Length of the peer name, 2 bytes, Big Endian.</li>
* <li>ASCII characters of the peer name, according to the read count.</li>
* <li>... same block repeats for "oldPeers", "learners" and "oldLearners".</li>
* </ul></li>
* <li>If type is not {@link EntryType#ENTRY_TYPE_DATA}:<ul>
* <li>The rest of the {@code byte[]} is the data payload.</li>
* </ul></li>
* </ul>
*/
public class LogEntryV1CodecFactory implements LogEntryCodecFactory {

//"Beeep boop beep beep boop beeeeeep" -BB8
public static final byte MAGIC = (byte) 0xB8;

// Size of the magic header.
public static final int PAYLOAD_OFFSET = 1;

private LogEntryV1CodecFactory() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
Expand Down Expand Up @@ -51,98 +52,127 @@ public LogEntry decode(final byte[] content) {
return log;
}

// Refactored to look closer to Ignites code style.
public void decode(final LogEntry log, final byte[] content) {
// 1-5 type
final int iType = Bits.getInt(content, 1);
log.setType(EnumOutter.EntryType.forNumber(iType));
// 5-13 index
// 13-21 term
final long index = Bits.getLong(content, 5);
final long term = Bits.getLong(content, 13);
var reader = new Reader(content);
Comment thread
ibessonov marked this conversation as resolved.
reader.pos = LogEntryV1CodecFactory.PAYLOAD_OFFSET;

int typeNumber = (int)reader.readLong();
EnumOutter.EntryType type = Objects.requireNonNull(EnumOutter.EntryType.forNumber(typeNumber));
log.setType(type);

long index = reader.readLong();
long term = reader.readLong();
log.setId(new LogId(index, term));
// 21-29 checksum
log.setChecksum(Bits.getLong(content, 21));
// 29-33 peer count
int peerCount = Bits.getInt(content, 29);
// peers
int pos = 33;
if (peerCount > 0) {
List<PeerId> peers = new ArrayList<>(peerCount);
while (peerCount-- > 0) {
final short len = Bits.getShort(content, pos);
final byte[] bs = new byte[len];
System.arraycopy(content, pos + 2, bs, 0, len);
// peer len (short in 2 bytes)
// peer str
pos += 2 + len;
final PeerId peer = new PeerId();
peer.parse(AsciiStringUtil.unsafeDecode(bs));
peers.add(peer);

long checksum = Bits.getLong(content, reader.pos);
log.setChecksum(checksum);

int pos = reader.pos + Long.BYTES;

// Peers and learners.
if (type != EnumOutter.EntryType.ENTRY_TYPE_DATA) {
reader.pos = pos;
int peerCount = (int)reader.readLong();
pos = reader.pos;
if (peerCount > 0) {
List<PeerId> peers = new ArrayList<>(peerCount);

pos = readNodesList(pos, content, peerCount, peers);

log.setPeers(peers);
}
log.setPeers(peers);
}
// old peers
int oldPeerCount = Bits.getInt(content, pos);
pos += 4;
if (oldPeerCount > 0) {
List<PeerId> oldPeers = new ArrayList<>(oldPeerCount);
while (oldPeerCount-- > 0) {
final short len = Bits.getShort(content, pos);
final byte[] bs = new byte[len];
System.arraycopy(content, pos + 2, bs, 0, len);
// peer len (short in 2 bytes)
// peer str
pos += 2 + len;
final PeerId peer = new PeerId();
peer.parse(AsciiStringUtil.unsafeDecode(bs));
oldPeers.add(peer);

reader.pos = pos;
int oldPeerCount = (int)reader.readLong();
pos = reader.pos;
if (oldPeerCount > 0) {
List<PeerId> oldPeers = new ArrayList<>(oldPeerCount);

pos = readNodesList(pos, content, oldPeerCount, oldPeers);

log.setOldPeers(oldPeers);
}
log.setOldPeers(oldPeers);
}
// learners
int learnersCount = Bits.getInt(content, pos);
pos += 4;
if (learnersCount > 0) {
List<PeerId> learners = new ArrayList<>(learnersCount);
while (learnersCount-- > 0) {
final short len = Bits.getShort(content, pos);
final byte[] bs = new byte[len];
System.arraycopy(content, pos + 2, bs, 0, len);
// peer len (short in 2 bytes)
// peer str
pos += 2 + len;
final PeerId peer = new PeerId();
peer.parse(AsciiStringUtil.unsafeDecode(bs));
learners.add(peer);

reader.pos = pos;
int learnersCount = (int)reader.readLong();
pos = reader.pos;
if (learnersCount > 0) {
List<PeerId> learners = new ArrayList<>(learnersCount);

pos = readNodesList(pos, content, learnersCount, learners);

log.setLearners(learners);
}

reader.pos = pos;
int oldLearnersCount = (int)reader.readLong();
pos = reader.pos;
if (oldLearnersCount > 0) {
List<PeerId> oldLearners = new ArrayList<>(oldLearnersCount);

pos = readNodesList(pos, content, oldLearnersCount, oldLearners);

log.setOldLearners(oldLearners);
}
log.setLearners(learners);
}
// old learners
int oldLearnersCount = Bits.getInt(content, pos);
pos += 4;
if (oldLearnersCount > 0) {
List<PeerId> oldLearners = new ArrayList<>(oldLearnersCount);
while (oldLearnersCount-- > 0) {
final short len = Bits.getShort(content, pos);
final byte[] bs = new byte[len];
System.arraycopy(content, pos + 2, bs, 0, len);
// peer len (short in 2 bytes)
// peer str
pos += 2 + len;
final PeerId peer = new PeerId();
peer.parse(AsciiStringUtil.unsafeDecode(bs));
oldLearners.add(peer);

// Data.
if (type != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
if (content.length > pos) {
int len = content.length - pos;

ByteBuffer data = ByteBuffer.wrap(content, pos, len).slice();

log.setData(data);
}
log.setOldLearners(oldLearners);
}
}

private static int readNodesList(int pos, byte[] content, int count, List<PeerId> nodes) {
for (int i = 0; i < count; i++) {
short len = Bits.getShort(content, pos);
pos += 2;

PeerId peer = new PeerId();
peer.parse(AsciiStringUtil.unsafeDecode(content, pos, len));
nodes.add(peer);

pos += len;
}

return pos;
}

/*
* Allows reading varlen numbers and tracking position at the same time. Simply having a "readLong" method wasn't enough.
*/
private static class Reader {
private final byte[] content;
int pos;

private Reader(byte[] content) {
this.content = content;
}

// Based on DirectByteBufferStreamImplV1.
long readLong() {
long val = 0;
int shift = 0;

// data
if (content.length > pos) {
final int len = content.length - pos;
ByteBuffer data = ByteBuffer.allocate(len);
data.put(content, pos, len);
data.flip();
log.setData(data);
while (true) {
byte b = content[pos];

pos++;

val |= ((long) b & 0x7F) << shift;

if ((b & 0x80) == 0) {
return val;
} else {
shift += 7;
}
}
}
}
}
Loading