Skip to content
Permalink
Browse files
IGNITE-16434 @InternalId integration for table id, optimized direct p…
…roxy usage for tables (#611)
  • Loading branch information
ibessonov committed Feb 3, 2022
1 parent 58b7b08 commit c44f97bb10c1bc28a0fb0b350bcde843f9dce264
Showing 48 changed files with 176 additions and 370 deletions.
@@ -34,7 +34,6 @@
import java.time.LocalTime;
import java.util.BitSet;
import java.util.UUID;
import org.apache.ignite.lang.IgniteUuid;

/**
* ByteBuf-based MsgPack implementation. Replaces {@link org.msgpack.core.MessagePacker} to avoid
@@ -499,32 +498,6 @@ public void packUuid(UUID val) {
buf.writeLong(val.getLeastSignificantBits());
}

/**
* Writes an {@link IgniteUuid}.
*
* @param val {@link IgniteUuid} value.
*/
public void packIgniteUuid(IgniteUuid val) {
assert !closed : "Packer is closed";

buf.writeByte(Code.EXT8);

// Reserve space for varint payload length.
int payloadLenPos = buf.writerIndex();
buf.writeByte(0);

buf.writeByte(ClientMsgPackType.IGNITE_UUID);

UUID globalId = val.globalId();
buf.writeLong(globalId.getMostSignificantBits());
buf.writeLong(globalId.getLeastSignificantBits());

packLong(val.localId());

int payloadLen = buf.writerIndex() - payloadLenPos - 2;
buf.setByte(payloadLenPos, payloadLen);
}

/**
* Writes a decimal.
*
@@ -48,7 +48,6 @@
import java.util.UUID;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteUuid;
import org.msgpack.core.ExtensionTypeHeader;
import org.msgpack.core.MessageFormat;
import org.msgpack.core.MessageFormatException;
@@ -742,26 +741,6 @@ public UUID unpackUuid() {
return new UUID(buf.readLong(), buf.readLong());
}

/**
* Reads an {@link IgniteUuid}.
*
* @return {@link IgniteUuid} value.
* @throws MessageTypeException when type is not {@link IgniteUuid}.
* @throws MessageSizeException when size is not correct.
*/
public IgniteUuid unpackIgniteUuid() {
assert refCnt > 0 : "Unpacker is closed";

var hdr = unpackExtensionTypeHeader();
var type = hdr.getType();

if (type != ClientMsgPackType.IGNITE_UUID) {
throw new MessageTypeException("Expected Ignite UUID extension (1), but got " + type);
}

return new IgniteUuid(new UUID(buf.readLong(), buf.readLong()), unpackLong());
}

/**
* Reads a decimal.
*
@@ -45,9 +45,6 @@ public class ClientMsgPackType {
/** Bit mask. */
public static final byte BITMASK = 8;

/** Ignite UUID. */
public static final byte IGNITE_UUID = 9;

/** Absent value for a column. */
public static final byte NO_VALUE = 10;
}
@@ -26,7 +26,6 @@
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.lang.IgniteUuid;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -181,14 +180,6 @@ public void testReadPayload() {
testUnpacker(p -> p.writePayload(b, 1, 1), p -> p.readPayload(1), new byte[]{5});
}

@ParameterizedTest
@ValueSource(longs = {0, 1, 255, 256, 65535, 65536, Integer.MAX_VALUE, Long.MAX_VALUE, Long.MIN_VALUE, Integer.MIN_VALUE})
public void testUnpackIgniteUuid(long l) {
IgniteUuid id = new IgniteUuid(UUID.randomUUID(), l);

testUnpacker(p -> p.packIgniteUuid(id), ClientMessageUnpacker::unpackIgniteUuid, id);
}

@Test
public void testSkipValues() {
testUnpacker(p -> {
@@ -199,9 +190,9 @@ public void testSkipValues() {
p.packString("x");
p.packNil();
p.packUuid(UUID.randomUUID());
p.packIgniteUuid(new IgniteUuid(UUID.randomUUID(), 123));
p.packLong(123);
p.packUuid(UUID.randomUUID());
p.packIgniteUuid(new IgniteUuid(UUID.randomUUID(), UUID.randomUUID().getLeastSignificantBits()));
p.packLong(UUID.randomUUID().getLeastSignificantBits());

p.packDouble(1.1);
p.packDouble(2.2);
@@ -42,7 +42,6 @@
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.manager.IgniteTables;
@@ -374,7 +373,7 @@ public static SchemaDescriptor readSchema(ClientMessageUnpacker unpacker, TableI
* </ul>
*/
public static TableImpl readTable(ClientMessageUnpacker unpacker, IgniteTables tables) {
IgniteUuid tableId = unpacker.unpackIgniteUuid();
UUID tableId = unpacker.unpackUuid();

try {
return ((IgniteTablesInternal) tables).table(tableId);
@@ -46,7 +46,7 @@ public static CompletableFuture<Void> process(
if (table == null) {
out.packNil();
} else {
out.packIgniteUuid(((TableImpl) table).tableId());
out.packUuid(((TableImpl) table).tableId());
}
});
}
@@ -43,7 +43,7 @@ public static CompletableFuture<Void> process(
for (var table : tables) {
var tableImpl = (TableImpl) table;

out.packIgniteUuid(tableImpl.tableId());
out.packUuid(tableImpl.tableId());
out.packString(table.name());
}
});
@@ -166,7 +166,7 @@ public void putAll(@Nullable Transaction tx, @NotNull Map<K, V> pairs) {
return tbl.doSchemaOutOpAsync(
ClientOp.TUPLE_UPSERT_ALL,
(s, w) -> {
w.out().packIgniteUuid(tbl.tableId());
w.out().packUuid(tbl.tableId());
writeTx(tx, w);
w.out().packInt(s.version());
w.out().packInt(pairs.size());
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import org.apache.ignite.client.IgniteClientException;
import org.apache.ignite.internal.client.PayloadOutputChannel;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
@@ -32,7 +33,6 @@
import org.apache.ignite.internal.marshaller.Marshaller;
import org.apache.ignite.internal.marshaller.MarshallerException;
import org.apache.ignite.internal.marshaller.MarshallerUtil;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.NotNull;
@@ -43,7 +43,7 @@
*/
class ClientRecordSerializer<R> {
/** Table ID. */
private final IgniteUuid tableId;
private final UUID tableId;

/** Mapper. */
private final Mapper<R> mapper;
@@ -57,7 +57,7 @@
* @param tableId Table ID.
* @param mapper Mapper.
*/
public ClientRecordSerializer(IgniteUuid tableId, Mapper<R> mapper) {
public ClientRecordSerializer(UUID tableId, Mapper<R> mapper) {
assert tableId != null;
assert mapper != null;

@@ -72,7 +72,7 @@ public Mapper<R> mapper() {
}

public void writeRec(@Nullable Transaction tx, @Nullable R rec, ClientSchema schema, PayloadOutputChannel out, TuplePart part) {
out.out().packIgniteUuid(tableId);
out.out().packUuid(tableId);
writeTx(tx, out);
out.out().packInt(schema.version());

@@ -98,7 +98,7 @@ public void writeRecs(
PayloadOutputChannel out,
TuplePart part
) {
out.out().packIgniteUuid(tableId);
out.out().packUuid(tableId);
writeTx(tx, out);
out.out().packInt(schema.version());

@@ -120,7 +120,7 @@ public void writeRecs(
PayloadOutputChannel out,
TuplePart part
) {
out.out().packIgniteUuid(tableId);
out.out().packUuid(tableId);
writeTx(tx, out);
out.out().packInt(schema.version());
out.out().packInt(recs.size());
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.client.table;

import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
@@ -32,7 +33,6 @@
import org.apache.ignite.internal.client.tx.ClientTransaction;
import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
@@ -46,7 +46,7 @@
* Client table API implementation.
*/
public class ClientTable implements Table {
private final IgniteUuid id;
private final UUID id;

private final String name;

@@ -65,7 +65,7 @@ public class ClientTable implements Table {
* @param id Table id.
* @param name Table name.
*/
public ClientTable(ReliableChannel ch, IgniteUuid id, String name) {
public ClientTable(ReliableChannel ch, UUID id, String name) {
assert ch != null;
assert id != null;
assert name != null && !name.isEmpty();
@@ -80,7 +80,7 @@ public ClientTable(ReliableChannel ch, IgniteUuid id, String name) {
*
* @return Table id.
*/
public IgniteUuid tableId() {
public UUID tableId() {
return id;
}

@@ -138,7 +138,7 @@ private CompletableFuture<ClientSchema> getSchema(int ver) {

private CompletableFuture<ClientSchema> loadSchema(Integer ver) {
return ch.serviceAsync(ClientOp.SCHEMAS_GET, w -> {
w.out().packIgniteUuid(id);
w.out().packUuid(id);

if (ver == null) {
w.out().packNil();
@@ -104,7 +104,7 @@ public CompletableFuture<List<Table>> tablesAsync() {
var res = new ArrayList<Table>(cnt);

for (int i = 0; i < cnt; i++) {
res.add(new ClientTable(ch, in.unpackIgniteUuid(), in.unpackString()));
res.add(new ClientTable(ch, in.unpackUuid(), in.unpackString()));
}

return res;
@@ -123,6 +123,6 @@ public CompletableFuture<Table> tableAsync(String name) {
Objects.requireNonNull(name);

return ch.serviceAsync(ClientOp.TABLE_GET, w -> w.out().packString(name),
r -> r.in().tryUnpackNil() ? null : new ClientTable(ch, r.in().unpackIgniteUuid(), name));
r -> r.in().tryUnpackNil() ? null : new ClientTable(ch, r.in().unpackUuid(), name));
}
}
@@ -24,10 +24,10 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.client.PayloadOutputChannel;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.NotNull;
@@ -38,14 +38,14 @@
*/
class ClientTupleSerializer {
/** Table ID. */
private final IgniteUuid tableId;
private final UUID tableId;

/**
* Constructor.
*
* @param tableId Table id.
*/
ClientTupleSerializer(IgniteUuid tableId) {
ClientTupleSerializer(UUID tableId) {
this.tableId = tableId;
}

@@ -101,7 +101,7 @@ public void writeTuple(
boolean skipHeader
) {
if (!skipHeader) {
out.out().packIgniteUuid(tableId);
out.out().packUuid(tableId);
writeTx(tx, out);
out.out().packInt(schema.version());
}
@@ -136,7 +136,7 @@ public void writeKvTuple(
boolean skipHeader
) {
if (!skipHeader) {
out.out().packIgniteUuid(tableId);
out.out().packUuid(tableId);
writeTx(tx, out);
out.out().packInt(schema.version());
}
@@ -164,7 +164,7 @@ public void writeKvTuple(
* @param out Out.
*/
public void writeKvTuples(@Nullable Transaction tx, Map<Tuple, Tuple> pairs, ClientSchema schema, PayloadOutputChannel out) {
out.out().packIgniteUuid(tableId);
out.out().packUuid(tableId);
writeTx(tx, out);
out.out().packInt(schema.version());
out.out().packInt(pairs.size());
@@ -189,7 +189,7 @@ public void writeTuples(
PayloadOutputChannel out,
boolean keyOnly
) {
out.out().packIgniteUuid(tableId);
out.out().packUuid(tableId);
writeTx(tx, out);
out.out().packInt(schema.version());
out.out().packInt(tuples.size());

0 comments on commit c44f97b

Please sign in to comment.