Skip to content
Permalink
Browse files
IGNITE-16983 Java thin: Add AtomicLong partition awareness (#10024)
`AtomicLong` is just a cache entry underneath, where `GridCacheInternalKeyImpl` is the key. Use partition awareness to send requests to the primary node and improve performance.
  • Loading branch information
ptupitsyn committed May 16, 2022
1 parent 6d021ee commit b2c6f413b3215d1000ffa19e665ad70e02cd8f8b
Showing 4 changed files with 81 additions and 11 deletions.
@@ -21,6 +21,7 @@
import org.apache.ignite.client.ClientAtomicLong;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.jetbrains.annotations.Nullable;

/**
@@ -36,6 +37,9 @@ public class ClientAtomicLongImpl implements ClientAtomicLong {
/** */
private final ReliableChannel ch;

/** Cache id. */
private final int cacheId;

/**
* Constructor.
*
@@ -48,6 +52,9 @@ public ClientAtomicLongImpl(String name, @Nullable String groupName, ReliableCha
this.name = name;
this.groupName = groupName;
this.ch = ch;

String groupNameInternal = groupName == null ? DataStructuresProcessor.DEFAULT_DS_GROUP_NAME : groupName;
cacheId = ClientUtils.cacheId(DataStructuresProcessor.ATOMICS_CACHE_NAME + "@" + groupNameInternal);
}

/** {@inheritDoc} */
@@ -57,7 +64,7 @@ public ClientAtomicLongImpl(String name, @Nullable String groupName, ReliableCha

/** {@inheritDoc} */
@Override public long get() throws IgniteException {
return ch.service(ClientOperation.ATOMIC_LONG_VALUE_GET, this::writeName, in -> in.in().readLong());
return ch.affinityService(cacheId, affinityKey(), ClientOperation.ATOMIC_LONG_VALUE_GET, this::writeName, in -> in.in().readLong());
}

/** {@inheritDoc} */
@@ -72,7 +79,7 @@ public ClientAtomicLongImpl(String name, @Nullable String groupName, ReliableCha

/** {@inheritDoc} */
@Override public long addAndGet(long l) throws IgniteException {
return ch.service(ClientOperation.ATOMIC_LONG_VALUE_ADD_AND_GET, out -> {
return ch.affinityService(cacheId, affinityKey(), ClientOperation.ATOMIC_LONG_VALUE_ADD_AND_GET, out -> {
writeName(out);
out.out().writeLong(l);
}, in -> in.in().readLong());
@@ -95,15 +102,15 @@ public ClientAtomicLongImpl(String name, @Nullable String groupName, ReliableCha

/** {@inheritDoc} */
@Override public long getAndSet(long l) throws IgniteException {
return ch.service(ClientOperation.ATOMIC_LONG_VALUE_GET_AND_SET, out -> {
return ch.affinityService(cacheId, affinityKey(), ClientOperation.ATOMIC_LONG_VALUE_GET_AND_SET, out -> {
writeName(out);
out.out().writeLong(l);
}, in -> in.in().readLong());
}

/** {@inheritDoc} */
@Override public boolean compareAndSet(long expVal, long newVal) throws IgniteException {
return ch.service(ClientOperation.ATOMIC_LONG_VALUE_COMPARE_AND_SET, out -> {
return ch.affinityService(cacheId, affinityKey(), ClientOperation.ATOMIC_LONG_VALUE_COMPARE_AND_SET, out -> {
writeName(out);
out.out().writeLong(expVal);
out.out().writeLong(newVal);
@@ -112,12 +119,13 @@ public ClientAtomicLongImpl(String name, @Nullable String groupName, ReliableCha

/** {@inheritDoc} */
@Override public boolean removed() {
return ch.service(ClientOperation.ATOMIC_LONG_EXISTS, this::writeName, in -> !in.in().readBoolean());
return ch.affinityService(cacheId, affinityKey(), ClientOperation.ATOMIC_LONG_EXISTS, this::writeName,
in -> !in.in().readBoolean());
}

/** {@inheritDoc} */
@Override public void close() {
ch.service(ClientOperation.ATOMIC_LONG_REMOVE, this::writeName, null);
ch.affinityService(cacheId, affinityKey(), ClientOperation.ATOMIC_LONG_REMOVE, this::writeName, null);
}

/**
@@ -131,4 +139,14 @@ private void writeName(PayloadOutputChannel out) {
w.writeString(groupName);
}
}

/**
* Gets the affinity key for this data structure.
*
* @return Affinity key.
*/
private String affinityKey() {
// GridCacheInternalKeyImpl uses name as AffinityKeyMapped.
return name;
}
}
@@ -25,6 +25,7 @@
import org.apache.ignite.client.ClientAtomicLong;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.junit.Test;
import static org.apache.ignite.testframework.GridTestUtils.assertContains;
@@ -41,6 +42,11 @@ public class AtomicLongTest extends AbstractThinClientTest {
startGrids(1);
}

/** {@inheritDoc} */
@Override protected ClientConfiguration getClientConfiguration() {
return super.getClientConfiguration().setPartitionAwarenessEnabled(true);
}

/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
stopAllGrids();
@@ -209,19 +215,23 @@ public void testCustomConfigurationPropagatesToServer() {
try (IgniteClient client = startClient(0)) {
client.atomicLong(name, cfg1, 1, true);
client.atomicLong(name, cfg2, 2, true);
client.atomicLong(name, 3, true);
}

List<IgniteInternalCache<?, ?>> caches = new ArrayList<>(grid(0).cachesx());
assertEquals(3, caches.size());
assertEquals(4, caches.size());

IgniteInternalCache<?, ?> partitionedCache = caches.get(1);
IgniteInternalCache<?, ?> replicatedCache = caches.get(2);
IgniteInternalCache<?, ?> defaultCache = caches.get(3);

assertEquals("ignite-sys-atomic-cache@atomic-long-group-partitioned", partitionedCache.name());
assertEquals("ignite-sys-atomic-cache@atomic-long-group-replicated", replicatedCache.name());
assertEquals("ignite-sys-atomic-cache@default-ds-group", defaultCache.name());

assertEquals(2, partitionedCache.configuration().getBackups());
assertEquals(Integer.MAX_VALUE, replicatedCache.configuration().getBackups());
assertEquals(1, defaultCache.configuration().getBackups());
}

/**
@@ -148,11 +148,11 @@ protected void assertOpOnChannel(TestTcpClientChannel expCh, ClientOperation exp

assertNotNull("Unexpected (null) next operation [expCh=" + expCh + ", expOp=" + expOp + ']', nextChOp);

assertEquals("Unexpected channel for opertation [expCh=" + expCh + ", expOp=" + expOp +
", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1());

assertEquals("Unexpected operation on channel [expCh=" + expCh + ", expOp=" + expOp +
", nextOpCh=" + nextChOp + ']', expOp, nextChOp.get2());
", nextOpCh=" + nextChOp + ']', expOp, nextChOp.get2());

assertEquals("Unexpected channel for operation [expCh=" + expCh + ", expOp=" + expOp +
", nextOpCh=" + nextChOp + ']', expCh, nextChOp.get1());
}

/**
@@ -20,9 +20,14 @@
import java.util.function.Function;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.client.ClientAtomicConfiguration;
import org.apache.ignite.client.ClientAtomicLong;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongEx;
import org.junit.Test;

/**
@@ -172,6 +177,43 @@ public void testScanQuery() throws IgniteCheckedException {
}
}

/**
* Test atomic long.
*/
@Test
public void testAtomicLong() {
testAtomicLong("default-grp-partitioned", null, CacheMode.PARTITIONED);
testAtomicLong("default-grp-replicated", null, CacheMode.REPLICATED);
testAtomicLong("custom-grp-partitioned", "testAtomicLong", CacheMode.PARTITIONED);
testAtomicLong("custom-grp-replicated", "testAtomicLong", CacheMode.REPLICATED);
}

/**
* Test atomic long.
*/
private void testAtomicLong(String name, String grpName, CacheMode cacheMode) {
ClientAtomicConfiguration cfg = new ClientAtomicConfiguration()
.setGroupName(grpName)
.setCacheMode(cacheMode);

ClientAtomicLong clientAtomicLong = client.atomicLong(name, cfg, 1, true);
GridCacheAtomicLongEx serverAtomicLong = (GridCacheAtomicLongEx)grid(0).atomicLong(
name, new AtomicConfiguration().setGroupName(grpName), 0, false);

String cacheName = "ignite-sys-atomic-cache@" + (grpName == null ? "default-ds-group" : grpName);
IgniteInternalCache<Object, Object> cache = grid(0).context().cache().cache(cacheName);

// Warm up.
clientAtomicLong.get();
opsQueue.clear();

// Test.
clientAtomicLong.get();
TestTcpClientChannel opCh = affinityChannel(serverAtomicLong.key(), cache);

assertOpOnChannel(opCh, ClientOperation.ATOMIC_LONG_VALUE_GET);
}

/**
* @param cacheName Cache name.
*/

0 comments on commit b2c6f41

Please sign in to comment.