Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,8 @@ public void testDropThinConnectionsOnRemove() throws Exception {
"--typeName", "Type0",
"--out", typeFile.toString()));

// Executes command to check disconnect / reconnect.
GridTestUtils.assertThrows(log, () ->
cli.createCache(new ClientCacheConfiguration().setName("test")),
Exception.class, null);
// Executes command to reconnect.
cli.createCache(new ClientCacheConfiguration().setName("test"));

createType(cli.binary(), "Type0", "str");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public <T> T service(
) throws ClientException, ClientError {
ClientConnectionException failure = null;

for (int i = 0; i < channels.length; i++) {
for (int i = 0; i <= channels.length; i++) {
ClientChannel ch = null;

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,14 +530,8 @@ private void writeCacheInfo(PayloadOutputChannel payloadCh) {
flags |= WITH_EXPIRY_POLICY_FLAG_MASK;
}

if (tx != null) {
if (tx.clientChannel() != payloadCh.clientChannel()) {
throw new ClientException("Transaction context has been lost due to connection errors. " +
"Cache operations are prohibited until current transaction closed.");
}

if (tx != null)
flags |= TRANSACTIONAL_FLAG_MASK;
}

out.writeByte(flags);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package org.apache.ignite.internal.client.thin;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientTransaction;
import org.apache.ignite.client.ClientTransactions;
Expand All @@ -42,14 +39,8 @@ class TcpClientTransactions implements ClientTransactions {
/** Marshaller. */
private final ClientBinaryMarshaller marsh;

/** Tx counter (used to generate tx UID). */
private final AtomicLong txCnt = new AtomicLong();

/** Current thread transaction UID. */
private final ThreadLocal<Long> threadLocTxUid = new ThreadLocal<>();

/** Tx map (Tx UID to Tx). */
private final Map<Long, TcpClientTransaction> txMap = new ConcurrentHashMap<>();
private final ThreadLocal<TcpClientTransaction> threadLocTxUid = new ThreadLocal<>();

/** Tx config. */
private final ClientTransactionConfiguration txCfg;
Expand Down Expand Up @@ -108,9 +99,7 @@ private ClientTransaction txStart0(TransactionConcurrency concurrency, Transacti
res -> new TcpClientTransaction(res.in().readInt(), res.clientChannel())
);

threadLocTxUid.set(tx0.txUid);

txMap.put(tx0.txUid, tx0);
threadLocTxUid.set(tx0);

return tx0;
}
Expand All @@ -126,13 +115,11 @@ private ClientTransaction txStart0(TransactionConcurrency concurrency, Transacti
* Current thread transaction.
*/
TcpClientTransaction tx() {
Long txUid = threadLocTxUid.get();
TcpClientTransaction tx0 = threadLocTxUid.get();

if (txUid == null)
if (tx0 == null)
return null;

TcpClientTransaction tx0 = txMap.get(txUid);

// Also check isClosed() flag, since transaction can be closed by another thread.
return tx0 == null || tx0.isClosed() ? null : tx0;
}
Expand Down Expand Up @@ -183,9 +170,6 @@ private class ClientTransactionsWithLabel implements ClientTransactions {
*
*/
class TcpClientTransaction implements ClientTransaction {
/** Unique client-side transaction id. */
private final long txUid;

/** Server-side transaction id. */
private final int txId;

Expand All @@ -200,19 +184,18 @@ class TcpClientTransaction implements ClientTransaction {
* @param clientCh Client channel.
*/
private TcpClientTransaction(int id, ClientChannel clientCh) {
txUid = txCnt.incrementAndGet();
txId = id;
this.clientCh = clientCh;
}

/** {@inheritDoc} */
@Override public void commit() {
Long threadTxUid;
TcpClientTransaction threadTxUid;

if (closed || (threadTxUid = threadLocTxUid.get()) == null)
throw new ClientException("The transaction is already closed");

if (txUid != threadTxUid)
if (txId != threadTxUid.txId)
throw new ClientException("You can commit transaction only from the thread it was started");

endTx(true);
Expand Down Expand Up @@ -248,14 +231,9 @@ private void endTx(boolean committed) {
}, null);
}
finally {
txMap.remove(txUid);

closed = true;

Long threadTxUid = threadLocTxUid.get();

if (threadTxUid != null && txUid == threadTxUid)
threadLocTxUid.set(null);
threadLocTxUid.set(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte
/** Connection-related metadata key. */
public static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();

/** Previous session metadata key. */
public static final int CONN_CTX_META_KEY_PREV = GridNioSessionMetaKey.nextUniqueKey();

/** Next connection id. */
private static AtomicInteger nextConnId = new AtomicInteger(1);

Expand All @@ -90,6 +93,9 @@ public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte
/** Thin client configuration. */
private final ThinClientConfiguration thinCfg;

/** Previous session context. */
private volatile ClientListenerConnectionContext storedCtx;

/**
* Constructor.
*
Expand Down Expand Up @@ -145,17 +151,29 @@ public ClientListenerNioListener(GridKernalContext ctx, GridSpinBusyLock busyLoc
ClientListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY);

if (connCtx == null) {
try {
onHandshake(ses, msg);
}
catch (Exception e) {
U.error(log, "Failed to handle handshake request " +
"(probably, connection has already been closed).", e);
}
synchronized (this) {
connCtx = ses.meta(CONN_CTX_META_KEY);

return;
if (connCtx == null) {
try {
ses.addMeta(CONN_CTX_META_KEY_PREV, storedCtx);

onHandshake(ses, msg);

storedCtx = null;
}
catch (Exception e) {
U.error(log, "Failed to handle handshake request " +
"(probably, connection has already been closed).", e);
}

return;
}
}
}

storedCtx = connCtx;

ClientListenerMessageParser parser = connCtx.parser();
ClientListenerRequestHandler handler = connCtx.handler();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
import org.apache.ignite.internal.processors.platform.client.tx.ClientTxContext;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.CONN_CTX_META_KEY_PREV;
import static org.apache.ignite.internal.processors.platform.client.ClientBitmaskFeature.USER_ATTRIBUTES;
import static org.apache.ignite.internal.processors.platform.client.ClientProtocolVersionFeature.AUTHORIZATION;
import static org.apache.ignite.internal.processors.platform.client.ClientProtocolVersionFeature.BITMAP_FEATURES;
Expand Down Expand Up @@ -219,6 +221,16 @@ public ClientProtocolContext currentProtocolContext() {
handler = new ClientRequestHandler(this, authCtx, currentProtocolContext);
parser = new ClientMessageParser(this, currentProtocolContext);

@Nullable Object connCtx0 = ses.meta(CONN_CTX_META_KEY_PREV);

assert connCtx0 == null || connCtx0 instanceof ClientConnectionContext;

if (connCtx0 != null) {
ClientConnectionContext connCtx = (ClientConnectionContext)connCtx0;

txIdSeq.set(connCtx.lastTxId() + 1);
}

this.ses = ses;
}

Expand Down Expand Up @@ -288,6 +300,7 @@ public ClientAffinityTopologyVersion checkAffinityTopologyVersion() {

/**
* Next transaction id for this connection.
* txId == 0 - reserved.
*/
public int nextTxId() {
int txId = txIdSeq.incrementAndGet();
Expand Down Expand Up @@ -380,4 +393,11 @@ public void decrementActiveTasksCount() {

assert cnt >= 0 : "Unexpected active tasks count: " + cnt;
}

/**
* @return Tx id.
*/
public int lastTxId() {
return txIdSeq.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -690,18 +690,9 @@ public void testTransactions() throws Exception {
ClientListenerProcessor.class, ClientProcessorMXBean.class);

try (ClientTransaction tx = client.transactions().txStart()) {
cache.put(1, "value6");

mxBean.dropAllConnections();

try {
cache.put(1, "value7");

fail();
}
catch (ClientException expected) {
// No-op.
}
cache.put(1, "value6");

// Start new transaction doesn't recover cache operations on failed channel.
try (ClientTransaction tx1 = client.transactions().txStart()) {
Expand All @@ -711,29 +702,17 @@ public void testTransactions() throws Exception {
// No-op.
}

try {
cache.get(1);

fail();
}
catch (ClientException expected) {
// No-op.
}
assertEquals("value6", cache.get(1));

// Close outdated transaction doesn't recover cache operations on failed channel.
tx0.close();

try {
cache.get(1);

fail();
}
catch (ClientException expected) {
// No-op.
}
assertEquals("value6", cache.get(1));
}

assertEquals("value5", cache.get(1));
assertEquals("value6", cache.get(1));

cache.put(1, "value5");

// Test concurrent transactions in different connections.
try (IgniteClient client1 = Ignition.startClient(getClientConfiguration())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void testFailover() throws Exception {

Throwable[] suppressed = ex.getSuppressed();

assertEquals(suppressed.length, CLUSTER_SIZE - 1);
assertEquals(CLUSTER_SIZE, suppressed.length);

assertTrue(Stream.of(suppressed).allMatch(t -> t instanceof ClientConnectionException));
}
Expand Down Expand Up @@ -202,7 +202,6 @@ public void testQueryConsistencyOnFailover() throws Exception {
* Test that client works properly with servers txId intersection.
*/
@Test
@SuppressWarnings("ThrowableNotThrown")
public void testTxWithIdIntersection() throws Exception {
int CLUSTER_SIZE = 2;

Expand Down Expand Up @@ -230,6 +229,8 @@ public void testTxWithIdIntersection() throws Exception {
// Another thread puts to cache here.
barrier.await(1, TimeUnit.SECONDS);

cache.put(1, 0);

tx.commit();

barrier.await(1, TimeUnit.SECONDS);
Expand All @@ -247,11 +248,7 @@ public void testTxWithIdIntersection() throws Exception {
// same transaction id as we started in this thread.
barrier.await(1, TimeUnit.SECONDS);

GridTestUtils.assertThrows(null, () -> {
cache.put(0, 0);

return null;
}, ClientException.class, "Transaction context has been lost due to connection errors");
cache.put(0, 0);

tx.close();

Expand All @@ -260,7 +257,9 @@ public void testTxWithIdIntersection() throws Exception {
// Another thread commit transaction here.
barrier.await(1, TimeUnit.SECONDS);

assertFalse(cache.containsKey(0));
assertEquals((int)cache.get(0), 0);

assertEquals((int)cache.get(1), 0);
}
}

Expand All @@ -281,11 +280,16 @@ public void testReconnectionThrottling() throws Exception {
) {
ClientCache<Integer, Integer> cache = client.createCache("cache");

// Fill throttling periods array.
for (int i = 0; i < throttlingRetries; i++) {
// Attempts to reconnect within throttlingRetries should pass.
cache.put(0, 0);

dropAllThinClientConnections(Ignition.allGrids().get(0));
}

for (int i = 0; i < throttlingRetries; i++) {
// Attempts to reconnect within throttlingRetries should pass.
dropAllThinClientConnections(Ignition.allGrids().get(0));

GridTestUtils.assertThrowsWithCause(() -> cache.put(0, 0), ClientConnectionException.class);
}
Expand Down