Skip to content
Permalink
Browse files
IGNITE-16930 .NET: Thin 3.0: Add Compute.ExecuteColocated (#809)
* Implement `ExecuteColocated` in .NET client. Send requests to default node, partition awareness will be added later (IGNITE-16930).
* To avoid extra table request on every `ExecuteColocated` call (we need table id and schemas), cache tables by name. If a table gets dropped and created again with the same name and a different id, retry the operation.
  • Loading branch information
ptupitsyn committed May 17, 2022
1 parent 8c075cd commit 32afeaaf02d8eb4d0d41f8cefb49263f52976ede
Showing 19 changed files with 395 additions and 76 deletions.
@@ -23,25 +23,33 @@ namespace Apache.Ignite.Tests.Compute
using System.Net;
using System.Threading.Tasks;
using Ignite.Compute;
using Ignite.Table;
using Internal.Network;
using Network;
using NUnit.Framework;
using Table;

/// <summary>
/// Tests <see cref="ICompute"/>.
/// </summary>
public class ComputeTests : IgniteTestsBase
{
private const string ConcatJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$ConcatJob";
private const string ItThinClientComputeTest = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest";

private const string NodeNameJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$NodeNameJob";
private const string ConcatJob = ItThinClientComputeTest + "$ConcatJob";

private const string ErrorJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$ErrorJob";
private const string NodeNameJob = ItThinClientComputeTest + "$NodeNameJob";

private const string EchoJob = "org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$EchoJob";
private const string ErrorJob = ItThinClientComputeTest + "$ErrorJob";

private const string EchoJob = ItThinClientComputeTest + "$EchoJob";

private const string PlatformTestNodeRunner = "org.apache.ignite.internal.runner.app.PlatformTestNodeRunner";

private const string CreateTableJob = PlatformTestNodeRunner + "$CreateTableJob";

private const string DropTableJob = PlatformTestNodeRunner + "$DropTableJob";

[Test]
public async Task TestGetClusterNodes()
{
@@ -177,6 +185,69 @@ async Task Test(object val, object? expected = null)
}
}

[Test]
[TestCase(1, "")]
[TestCase(2, "_2")]
[TestCase(3, "")]
[TestCase(5, "_2")]
public async Task TestExecuteColocated(int key, string nodeName)
{
var keyTuple = new IgniteTuple { [KeyCol] = key };
var resNodeName = await Client.Compute.ExecuteColocatedAsync<string>(TableName, keyTuple, NodeNameJob);

var keyPoco = new Poco { Key = key };
var resNodeName2 = await Client.Compute.ExecuteColocatedAsync<string, Poco>(TableName, keyPoco, NodeNameJob);

var expectedNodeName = PlatformTestNodeRunner + nodeName;
Assert.AreEqual(expectedNodeName, resNodeName);
Assert.AreEqual(expectedNodeName, resNodeName2);
}

[Test]
public void TestExecuteColocatedThrowsWhenTableDoesNotExist()
{
var ex = Assert.ThrowsAsync<IgniteClientException>(async () =>
await Client.Compute.ExecuteColocatedAsync<string>("unknownTable", new IgniteTuple(), EchoJob));

Assert.AreEqual("Table 'unknownTable' does not exist.", ex!.Message);
}

[Test]
public void TestExecuteColocatedThrowsWhenKeyColumnIsMissing()
{
var ex = Assert.ThrowsAsync<IgniteClientException>(async () =>
await Client.Compute.ExecuteColocatedAsync<string>(TableName, new IgniteTuple(), EchoJob));

StringAssert.Contains("Missed key column: KEY", ex!.Message);
}

[Test]
public async Task TestExecuteColocatedUpdatesTableCacheOnTableDrop()
{
// Create table and use it in ExecuteColocated.
var nodes = await GetNodeAsync(0);
var tableName = await Client.Compute.ExecuteAsync<string>(nodes, CreateTableJob, "PUB.drop-me");

try
{
var keyTuple = new IgniteTuple { [KeyCol] = 1 };
var resNodeName = await Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, NodeNameJob);

// Drop table and create a new one with a different ID, then execute a computation again.
// This should update the cached table and complete the computation successfully.
await Client.Compute.ExecuteAsync<string>(nodes, DropTableJob, tableName);
await Client.Compute.ExecuteAsync<string>(nodes, CreateTableJob, tableName);

var resNodeName2 = await Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, NodeNameJob);

Assert.AreEqual(resNodeName, resNodeName2);
}
finally
{
await Client.Compute.ExecuteAsync<string>(nodes, DropTableJob, tableName);
}
}

private async Task<List<IClusterNode>> GetNodeAsync(int index) =>
(await Client.GetClusterNodesAsync()).OrderBy(n => n.Name).Skip(index).Take(1).ToList();
}
@@ -18,7 +18,6 @@
namespace Apache.Ignite.Tests
{
using System;
using System.Linq;
using System.Threading.Tasks;
using Ignite.Table;
using Log;
@@ -30,7 +29,7 @@ namespace Apache.Ignite.Tests
/// </summary>
public class IgniteTestsBase
{
protected const string TableName = "PUB.tbl1";
protected const string TableName = "PUB.TBL1";

protected const string KeyCol = "key";

@@ -74,7 +73,7 @@ public async Task OneTimeSetUp()
[OneTimeTearDown]
public void OneTimeTearDown()
{
// ReSharper disable once ConstantConditionalAccessQualifier
// ReSharper disable once ConstantConditionalAccessQualifier, ConditionalAccessQualifierIsNonNullableAccordingToAPIContract
Client?.Dispose();

Assert.Greater(_eventListener.BuffersRented, 0);
@@ -83,10 +82,8 @@ public void OneTimeTearDown()
}

[TearDown]
public async Task TearDown()
public void TearDown()
{
await TupleView.DeleteAllAsync(null, Enumerable.Range(-5, 20).Select(x => GetTuple(x)));

Assert.AreEqual(_eventListener.BuffersReturned, _eventListener.BuffersRented);
}

@@ -15,6 +15,7 @@
* limitations under the License.
*/

// ReSharper disable MustUseReturnValue
namespace Apache.Ignite.Tests
{
using System;
@@ -30,6 +30,12 @@ namespace Apache.Ignite.Tests.Table
/// </summary>
public class RecordViewBinaryTests : IgniteTestsBase
{
[TearDown]
public async Task CleanTable()
{
await TupleView.DeleteAllAsync(null, Enumerable.Range(-1, 12).Select(x => GetTuple(x)));
}

[Test]
public async Task TestUpsertGet()
{
@@ -29,6 +29,12 @@ namespace Apache.Ignite.Tests.Table
/// </summary>
public class RecordViewPocoTests : IgniteTestsBase
{
[TearDown]
public async Task CleanTable()
{
await TupleView.DeleteAllAsync(null, Enumerable.Range(-1, 12).Select(x => GetTuple(x)));
}

[Test]
public async Task TestUpsertGet()
{
@@ -33,7 +33,7 @@ public async Task TestGetTables()
var tables = await Client.Tables.GetTablesAsync();

Assert.AreEqual(1, tables.Count);
Assert.AreEqual("PUB.TBL1", tables[0].Name);
Assert.AreEqual(TableName, tables[0].Name);
}

[Test]
@@ -42,7 +42,7 @@ public async Task TestGetExistingTable()
var table = await Client.Tables.GetTableAsync(TableName);

Assert.IsNotNull(table);
Assert.AreEqual("PUB.tbl1", table!.Name);
Assert.AreEqual(TableName, table!.Name);
}

[Test]
@@ -28,6 +28,12 @@ namespace Apache.Ignite.Tests.Transactions
/// </summary>
public class TransactionsTests : IgniteTestsBase
{
[TearDown]
public async Task CleanTable()
{
await TupleView.DeleteAllAsync(null, Enumerable.Range(1, 2).Select(x => GetTuple(x)));
}

[Test]
public async Task TestRecordViewBinaryOperations()
{
@@ -35,6 +35,11 @@ public enum ClientErrorCode
/// <summary>
/// Authentication or authorization failure.
/// </summary>
AuthFailed = 2
AuthFailed = 2,

/// <summary>
/// Table id does not exist.
/// </summary>
TableIdDoesNotExist = 3
}
}
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Compute
using System.Collections.Generic;
using System.Threading.Tasks;
using Network;
using Table;

/// <summary>
/// Ignite Compute API provides distributed job execution functionality.
@@ -36,6 +37,30 @@ public interface ICompute
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task<T> ExecuteAsync<T>(IEnumerable<IClusterNode> nodes, string jobClassName, params object[] args);

/// <summary>
/// Executes a job represented by the given class on one node where the given key is located.
/// </summary>
/// <param name="tableName">Name of the table to be used with <paramref name="key"/> to determine target node.</param>
/// <param name="key">Table key to be used to determine the target node for job execution.</param>
/// <param name="jobClassName">Java class name of the job to execute.</param>
/// <param name="args">Job arguments.</param>
/// <typeparam name="T">Job result type.</typeparam>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task<T> ExecuteColocatedAsync<T>(string tableName, IIgniteTuple key, string jobClassName, params object[] args);

/// <summary>
/// Executes a job represented by the given class on one node where the given key is located.
/// </summary>
/// <param name="tableName">Name of the table to be used with <paramref name="key"/> to determine target node.</param>
/// <param name="key">Table key to be used to determine the target node for job execution.</param>
/// <param name="jobClassName">Java class name of the job to execute.</param>
/// <param name="args">Job arguments.</param>
/// <typeparam name="T">Job result type.</typeparam>
/// <typeparam name="TKey">Key type.</typeparam>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
Task<T> ExecuteColocatedAsync<T, TKey>(string tableName, TKey key, string jobClassName, params object[] args)
where TKey : class; // TODO: Remove class constraint (IGNITE-16355)

/// <summary>
/// Executes a compute job represented by the given class on all of the specified nodes.
/// </summary>
@@ -29,9 +29,6 @@ public class IgniteClientException : Exception
/** Error code field. */
private const string ErrorCodeField = "StatusCode";

/** Error code. */
private readonly ClientErrorCode _errorCode;

/// <summary>
/// Initializes a new instance of the <see cref="IgniteClientException"/> class.
/// </summary>
@@ -70,7 +67,7 @@ public IgniteClientException(string message, Exception? cause)
public IgniteClientException(string message, Exception? cause, ClientErrorCode statusCode)
: base(message, cause)
{
_errorCode = statusCode;
ErrorCode = statusCode;
}

/// <summary>
@@ -81,9 +78,14 @@ public IgniteClientException(string message, Exception? cause, ClientErrorCode s
protected IgniteClientException(SerializationInfo info, StreamingContext ctx)
: base(info, ctx)
{
_errorCode = (ClientErrorCode) info.GetInt32(ErrorCodeField);
ErrorCode = (ClientErrorCode)info.GetInt32(ErrorCodeField);
}

/// <summary>
/// Gets the error code.
/// </summary>
public ClientErrorCode ErrorCode { get; }

/// <summary>
/// When overridden in a derived class, sets the <see cref="SerializationInfo" />
/// with information about the exception.
@@ -96,7 +98,7 @@ public override void GetObjectData(SerializationInfo info, StreamingContext cont
{
base.GetObjectData(info, context);

info.AddValue(ErrorCodeField, (int) _errorCode);
info.AddValue(ErrorCodeField, (int) ErrorCode);
}
}
}

0 comments on commit 32afeaa

Please sign in to comment.