Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.msgpack.core.MessagePack.Code;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -83,15 +84,28 @@ private static MessageSizeException overflowU32Size(int u32) {
* @param b Actual format.
* @return Exception to throw.
*/
private static MessagePackException unexpected(String expected, byte b) {
private MessagePackException unexpected(String expected, byte b) {
MessageFormat format = MessageFormat.valueOf(b);

if (format == MessageFormat.NEVER_USED) {
return new MessageNeverUsedFormatException(String.format("Expected %s, but encountered 0xC1 \"NEVER_USED\" byte", expected));
} else {
String name = format.getValueType().name();
String typeName = name.charAt(0) + name.substring(1).toLowerCase();
return new MessageTypeException(String.format("Expected %s, but got %s (%02x)", expected, typeName, b));

// Convert all bytes from the start of the buffer to the current position to a string for debugging
ByteBuf slice = buf.slice(0, buf.readerIndex());

int maxBufSliceLen = 256;
if (slice.readableBytes() > maxBufSliceLen) {
slice = slice.slice(slice.readableBytes() - maxBufSliceLen, maxBufSliceLen);
}

String bufContent = ByteBufUtil.hexDump(slice);
int problemPos = buf.readerIndex() - 1;

return new MessageTypeException(
String.format("Expected %s, but got %s (%02x) at pos %s: '%s'", expected, typeName, b, problemPos, bufContent));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,13 @@ public static void writeTxMeta(
out.packLong(tx.getTimeout());
} else if (tx.remote()) {
PendingTxPartitionEnlistment token = tx.enlistedPartition(null);
out.packString(token.primaryNodeConsistentId());
String consistentId = token.primaryNodeConsistentId();

if (consistentId == null) {
throw new IllegalStateException("Primary node consistent ID must not be null for remote transactions: " + tx);
}

out.packString(consistentId);
out.packLong(token.consistencyToken());
out.packBoolean(TxState.ABORTED == tx.state()); // No-op enlistment.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static CompletableFuture<ResponseWriter> process(

if (table != null) {
ZonePartitionId replicationGroupId = table.internalTable().targetReplicationGroupId(partId);
enlistedPartitions.computeIfAbsent(replicationGroupId, k -> new PendingTxPartitionEnlistment(null, 0))
enlistedPartitions.computeIfAbsent(replicationGroupId, k -> new PendingTxPartitionEnlistment("UNUSED", 0))
.addTableId(tableId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ private <T> CompletableFuture<T> send(
try {
ClientMessageUnpacker unpacker = fut.join();

return completedFuture(complete(payloadReader, notificationFut, unpacker));
return completedFuture(complete(payloadReader, notificationFut, unpacker, opCode));
} catch (Throwable t) {
expectedException = true;
throw sneakyThrow(ViewUtils.ensurePublicException(t));
Expand All @@ -443,7 +443,7 @@ private <T> CompletableFuture<T> send(
CompletableFuture<T> resFut = new CompletableFuture<>();

fut.handle((unpacker, err) -> {
completeAsync(payloadReader, notificationFut, unpacker, err, resFut);
completeAsync(payloadReader, notificationFut, unpacker, err, resFut, opCode);
return null;
});

Expand Down Expand Up @@ -476,7 +476,8 @@ private <T> void completeAsync(
@Nullable CompletableFuture<PayloadInputChannel> notificationFut,
ClientMessageUnpacker unpacker,
@Nullable Throwable err,
CompletableFuture<T> resFut
CompletableFuture<T> resFut,
int opCode
) {
if (err != null) {
assert unpacker == null : "unpacker must be null if err is not null";
Expand All @@ -497,7 +498,7 @@ private <T> void completeAsync(
// With handleAsync et al we can't close the unpacker in that case.
asyncContinuationExecutor.execute(() -> {
try {
resFut.complete(complete(payloadReader, notificationFut, unpacker));
resFut.complete(complete(payloadReader, notificationFut, unpacker, opCode));
} catch (Throwable t) {
resFut.completeExceptionally(ViewUtils.ensurePublicException(t));
}
Expand All @@ -516,11 +517,13 @@ private <T> void completeAsync(
* @param payloadReader Payload reader.
* @param notificationFut Notify future.
* @param unpacker Unpacked message.
* @param opCode Op code.
*/
private <T> @Nullable T complete(
@Nullable PayloadReader<T> payloadReader,
@Nullable CompletableFuture<PayloadInputChannel> notificationFut,
ClientMessageUnpacker unpacker
ClientMessageUnpacker unpacker,
int opCode
) {
try (unpacker) {
if (payloadReader != null) {
Expand All @@ -529,9 +532,10 @@ private <T> void completeAsync(

return null;
} catch (Throwable e) {
log.error("Failed to deserialize server response [remoteAddress=" + cfg.getAddress() + "]: " + e.getMessage(), e);
log.error("Failed to deserialize server response [remoteAddress=" + cfg.getAddress() + ", opCode=" + opCode + "]: "
+ e.getMessage(), e);

throw new IgniteException(PROTOCOL_ERR, "Failed to deserialize server response: " + e.getMessage(), e);
throw new IgniteException(PROTOCOL_ERR, "Failed to deserialize server response for op " + opCode + ": " + e.getMessage(), e);
}
}

Expand Down Expand Up @@ -736,7 +740,7 @@ private CompletableFuture<Object> handshakeAsync(ProtocolVersion ver) throws Ign
CompletableFuture<Object> resFut = new CompletableFuture<>();

fut.handle((unpacker, err) -> {
completeAsync(r -> handshakeRes(r.in()), null, unpacker, err, resFut);
completeAsync(r -> handshakeRes(r.in()), null, unpacker, err, resFut, -1);
return null;
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ public static void readTx(
"Encountered no-op on first direct enlistment, server version upgrade is required"));
}
} else {
String consistentId = payloadChannel.in().unpackString();
long token = payloadChannel.in().unpackLong();
String consistentId = in.unpackString();
long token = in.unpackLong();

// Test if no-op enlistment.
if (payloadChannel.in().unpackBoolean()) {
if (in.unpackBoolean()) {
payloadChannel.clientChannel().inflights().removeInflight(tx.txId(), null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,31 +579,32 @@ public void testNonNullableColumnWithDefaultValueSetNullThrowsException() {

@Test
public void testGetNullValueThrows() {
testNullValueThrows(view -> view.get(null, DEFAULT_ID), "getNullable");
testNullValueThrows(view -> view.get(null, DEFAULT_ID), "getNullable", 12);
}

@Test
public void testGetAndPutNullValueThrows() {
testNullValueThrows(view -> view.getAndPut(null, DEFAULT_ID, DEFAULT_NAME), "getNullableAndPut");
testNullValueThrows(view -> view.getAndPut(null, DEFAULT_ID, DEFAULT_NAME), "getNullableAndPut", 16);
}

@Test
public void testGetAndRemoveNullValueThrows() {
testNullValueThrows(view -> view.getAndRemove(null, DEFAULT_ID), "getNullableAndRemove");
testNullValueThrows(view -> view.getAndRemove(null, DEFAULT_ID), "getNullableAndRemove", 32);
}

@Test
public void testGetAndReplaceNullValueThrows() {
testNullValueThrows(view -> view.getAndReplace(null, DEFAULT_ID, DEFAULT_NAME), "getNullableAndReplace");
testNullValueThrows(view -> view.getAndReplace(null, DEFAULT_ID, DEFAULT_NAME), "getNullableAndReplace", 26);
}

private void testNullValueThrows(Consumer<KeyValueView<Long, String>> run, String methodName) {
private void testNullValueThrows(Consumer<KeyValueView<Long, String>> run, String methodName, int op) {
KeyValueView<Long, String> primitiveView = defaultTable().keyValueView(Mapper.of(Long.class), Mapper.of(String.class));
primitiveView.put(null, DEFAULT_ID, null);

var ex = assertThrowsWithCause(() -> run.accept(primitiveView), UnexpectedNullValueException.class);
assertEquals(
format("Failed to deserialize server response: Got unexpected null value: use `{}` sibling method instead.", methodName),
format("Failed to deserialize server response for op {}: Got unexpected null value: use `{}` sibling method instead.",
op, methodName),
ex.getMessage());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ void testResultSetMappingColumnNameMismatch() {
IgniteException.class,
() -> client().sql().execute((Transaction) null, Mapper.of(Pojo.class), query));

assertEquals("Failed to deserialize server response: No mapped object field found for column 'FOO'", e.getMessage());
assertEquals("Failed to deserialize server response for op 50: No mapped object field found for column 'FOO'", e.getMessage());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,15 @@ public class PartitionEnlistment {
@IgniteToStringInclude
protected final Set<Integer> tableIds;

/**
* Constructs a {@code PartitionEnlistment} instance.
*
* @param primaryNodeConsistentId The consistent ID of the primary node.
* @param tableIds A set of table IDs for which the partition is enlisted.
*/
public PartitionEnlistment(String primaryNodeConsistentId, Set<Integer> tableIds) {
assert primaryNodeConsistentId != null : "Primary node consistent ID cannot be null";

this.primaryNodeConsistentId = primaryNodeConsistentId;
this.tableIds = tableIds;
}
Expand Down