Skip to content
Permalink
Browse files
IGNITE-16946 .NET: Thin client: Add AtomicLong (#10030)
* Add `IIgniteClient.GetAtomicLong` and `IAtomicLongClient` APIs.
* Use partition-aware requests.
  • Loading branch information
ptupitsyn committed May 23, 2022
1 parent c55d689 commit 9fee83a0d1669843dc697e10d1615f1fdc79ac19
Showing 17 changed files with 786 additions and 13 deletions.
@@ -214,7 +214,10 @@ public enum ClientOperation {
ATOMIC_LONG_VALUE_GET_AND_SET(9005),

/** AtomicLong.compareAndSet. */
ATOMIC_LONG_VALUE_COMPARE_AND_SET(9006);
ATOMIC_LONG_VALUE_COMPARE_AND_SET(9006),

/** AtomicLong.compareAndSetAndGet. */
ATOMIC_LONG_VALUE_COMPARE_AND_SET_AND_GET(9007);

/** Code. */
private final int code;
@@ -391,6 +394,7 @@ public ClientNotificationType notificationType() {
return ClientOperationType.ATOMIC_LONG_VALUE_GET_AND_SET;

case ATOMIC_LONG_VALUE_COMPARE_AND_SET:
case ATOMIC_LONG_VALUE_COMPARE_AND_SET_AND_GET:
return ClientOperationType.ATOMIC_LONG_VALUE_COMPARE_AND_SET;

default:
@@ -357,7 +357,6 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
else
w.writeBoolean(false);
}

}, null);
}

@@ -82,6 +82,7 @@
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongExistsRequest;
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongRemoveRequest;
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueAddAndGetRequest;
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueCompareAndSetAndGetRequest;
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueCompareAndSetRequest;
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueGetAndSetRequest;
import org.apache.ignite.internal.processors.platform.client.datastructures.ClientAtomicLongValueGetRequest;
@@ -318,6 +319,9 @@ public class ClientMessageParser implements ClientListenerMessageParser {
/** AtomicLong.compareAndSet. */
private static final short OP_ATOMIC_LONG_VALUE_COMPARE_AND_SET = 9006;

/** AtomicLong.compareAndSetAndGet. */
private static final short OP_ATOMIC_LONG_VALUE_COMPARE_AND_SET_AND_GET = 9007;

/** Marshaller. */
private final GridBinaryMarshaller marsh;

@@ -566,6 +570,9 @@ public ClientListenerRequest decode(BinaryReaderExImpl reader) {

case OP_ATOMIC_LONG_VALUE_COMPARE_AND_SET:
return new ClientAtomicLongValueCompareAndSetRequest(reader);

case OP_ATOMIC_LONG_VALUE_COMPARE_AND_SET_AND_GET:
return new ClientAtomicLongValueCompareAndSetAndGetRequest(reader);
}

return new ClientRawRequest(reader.readLong(), ClientStatus.INVALID_OP_CODE,
@@ -17,11 +17,11 @@

package org.apache.ignite.internal.processors.platform.client.datastructures;

import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.configuration.AtomicConfiguration;
import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongImpl;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientRequest;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -54,11 +54,11 @@ public ClientAtomicLongRequest(BinaryRawReader reader) {
* @param ctx Context.
* @return Atomic long or null.
*/
protected IgniteAtomicLong atomicLong(ClientConnectionContext ctx) {
protected GridCacheAtomicLongImpl atomicLong(ClientConnectionContext ctx) {
AtomicConfiguration cfg = groupName == null ? null : new AtomicConfiguration().setGroupName(groupName);

try {
return ctx.kernalContext().dataStructures().atomicLong(name, cfg, 0, false);
return (GridCacheAtomicLongImpl)ctx.kernalContext().dataStructures().atomicLong(name, cfg, 0, false);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e.getMessage(), e);
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.platform.client.datastructures;

import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicLongImpl;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientLongResponse;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;

/**
* Atomic long compare and set and get request.
*/
public class ClientAtomicLongValueCompareAndSetAndGetRequest extends ClientAtomicLongRequest {
/** */
private final long expected;

/** */
private final long val;

/**
* Constructor.
*
* @param reader Reader.
*/
public ClientAtomicLongValueCompareAndSetAndGetRequest(BinaryRawReader reader) {
super(reader);

expected = reader.readLong();
val = reader.readLong();
}

/** {@inheritDoc} */
@Override public ClientResponse process(ClientConnectionContext ctx) {
GridCacheAtomicLongImpl atomicLong = atomicLong(ctx);

if (atomicLong == null)
return notFoundResponse();

return new ClientLongResponse(requestId(), atomicLong.compareAndSetAndGet(expected, val));
}
}
@@ -24,7 +24,7 @@
import org.apache.ignite.internal.processors.platform.client.ClientResponse;

/**
* Atomic long get and set request.
* Atomic long compare and set request.
*/
public class ClientAtomicLongValueCompareAndSetRequest extends ClientAtomicLongRequest {
/** */
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.platform;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.ignite.Ignite;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobAdapter;
import org.apache.ignite.compute.ComputeJobResult;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* Task to get internal caches.
*/
public class PlatformGetInternalCachesTask extends ComputeTaskAdapter<Object, byte[]> {
/** {@inheritDoc} */
@NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
@Nullable Object arg) {
return Collections.singletonMap(new InternalCachesJob(), F.first(subgrid));
}

/** {@inheritDoc} */
@Override public byte[] reduce(List<ComputeJobResult> results) {
return results.get(0).getData();
}

/**
* Job.
*/
@SuppressWarnings("rawtypes")
private static class InternalCachesJob extends ComputeJobAdapter {
/** */
@SuppressWarnings("unused")
@IgniteInstanceResource
private Ignite ignite;

/** {@inheritDoc} */
@Override public byte[] execute() {
IgniteEx ign = (IgniteEx)ignite;

BinaryContext ctx = new BinaryContext(BinaryCachingMetadataHandler.create(), new IgniteConfiguration(), null);

try (BinaryWriterExImpl writer = new BinaryWriterExImpl(ctx, new BinaryHeapOutputStream(512), null, null)) {
Collection<IgniteInternalCache<?, ?>> caches = ign.cachesx();

writer.writeInt(caches.size());

for (IgniteInternalCache c : caches) {
PlatformConfigurationUtils.writeCacheConfiguration(writer, c.configuration());
}

return writer.out().arrayCopy();
}
}
}
}
@@ -27,6 +27,7 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Client;
using Apache.Ignite.Core.Client.Cache;
using Apache.Ignite.Core.Client.DataStructures;
using Apache.Ignite.Core.Common;
using NUnit.Framework;

@@ -462,6 +463,32 @@ public void DataStreamer_PrimitiveKeyType_RequestIsRoutedToPrimaryNode(int key,
Assert.AreEqual(gridIdx, GetClientRequestGridIndex("Start", RequestNamePrefixStreamer));
}

[Test]
[TestCase("default-grp-partitioned", null, CacheMode.Partitioned, 0)]
[TestCase("default-grp-replicated", null, CacheMode.Replicated, 2)]
[TestCase("custom-grp-partitioned", "testAtomicLong", CacheMode.Partitioned, 1)]
[TestCase("custom-grp-replicated", "testAtomicLong", CacheMode.Replicated, 0)]
public void AtomicLong_RequestIsRoutedToPrimaryNode(
string name, string groupName, CacheMode cacheMode, int gridIdx)
{
var cfg = new AtomicClientConfiguration
{
GroupName = groupName,
CacheMode = cacheMode
};

var atomicLong = Client.GetAtomicLong(name, cfg, 1, true);

// Warm up.
atomicLong.Read();
ClearLoggers();

// Test.
atomicLong.Read();

Assert.AreEqual(gridIdx, GetClientRequestGridIndex("ValueGet", "datastructures.ClientAtomicLong"));
}

protected override IgniteClientConfiguration GetClientConfiguration()
{
var cfg = base.GetClientConfiguration();

0 comments on commit 9fee83a

Please sign in to comment.