Skip to content

Commit

Permalink
IGNITE-19710 .NET: Add Data Streamer schema synchronization (#2548)
Browse files Browse the repository at this point in the history
Add support for schema changes in `DataStreamer`:
* Retry on outdated schema error
* Retry on unmapped columns error
* Keep the original rows (tuples or POCOs) along with the serialized data, and re-serialize them on retry using the new schema
* Perf: `DataStreamerBenchmark` results not affected (pooled arrays don't increase memory usage, and the overhead to fill them is not noticeable)
  • Loading branch information
ptupitsyn committed Sep 5, 2023
1 parent 91dbe86 commit 5ac19cf
Show file tree
Hide file tree
Showing 11 changed files with 233 additions and 74 deletions.
4 changes: 2 additions & 2 deletions modules/platforms/dotnet/Apache.Ignite.Benchmarks/Program.cs
Expand Up @@ -18,9 +18,9 @@
namespace Apache.Ignite.Benchmarks;

using BenchmarkDotNet.Running;
using Table.Serialization;
using Table;

// Benchmark launcher: Main hands a benchmark class to BenchmarkDotNet's BenchmarkRunner.
// NOTE(review): two Main declarations with the same signature appear below. They look like the
// before/after pair of a diff view (serializer benchmark -> data streamer benchmark);
// confirm which one is current and keep only that one.
internal static class Program
{
// Runs SerializerHandlerReadBenchmarks.
private static void Main() => BenchmarkRunner.Run<SerializerHandlerReadBenchmarks>();
// Runs DataStreamerBenchmark.
private static void Main() => BenchmarkRunner.Run<DataStreamerBenchmark>();
}
Expand Up @@ -82,7 +82,7 @@ public async Task GlobalSetup()

_client = await IgniteClient.StartAsync(cfg);
_table = (await _client.Tables.GetTableAsync(FakeServer.ExistingTableName))!;
_data = Enumerable.Range(1, 100_000).Select(x => new IgniteTuple { ["id"] = x, ["name"] = "name " + x }).ToList();
_data = Enumerable.Range(1, 100_000).Select(x => new IgniteTuple { ["id"] = x }).ToList();
}

[GlobalCleanup]
Expand Down
Expand Up @@ -18,6 +18,7 @@
namespace Apache.Ignite.Tests.Table;

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
Expand Down Expand Up @@ -335,6 +336,73 @@ public async Task TestClientUsesLatestSchemaOnReadKv()
Assert.AreEqual("name1", res.Value);
}

// Streams 20 rows with BatchSize = 2 while the table gains a VAL column (default 'FOO') halfway
// through the source sequence, then checks one row produced before the ALTER and one produced after it.
[Test]
public async Task TestSchemaUpdateWhileStreaming([Values(true, false)] bool insertNewColumn)
{
    await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} (KEY bigint PRIMARY KEY)");

    var recordView = (await Client.Tables.GetTableAsync(TestTableName))!.RecordBinaryView;

    await recordView.StreamDataAsync(ProduceRows(), DataStreamerOptions.Default with { BatchSize = 2 });

    // Key 1 was produced before the ALTER TABLE, so VAL must hold the column default.
    var rowBeforeAlter = await recordView.GetAsync(null, GetTuple(1));
    Assert.AreEqual("FOO", rowBeforeAlter.Value["VAL"]);

    // Key 19 was produced after the ALTER TABLE; VAL is explicit only when insertNewColumn is set.
    string expectedVal;

    if (insertNewColumn)
    {
        expectedVal = "BAR_19";
    }
    else
    {
        expectedVal = "FOO";
    }

    var rowAfterAlter = await recordView.GetAsync(null, GetTuple(19));
    Assert.AreEqual(expectedVal, rowAfterAlter.Value["VAL"]);

    async IAsyncEnumerable<IIgniteTuple> ProduceRows()
    {
        // Keys 0..9 are produced while the table still has only the KEY column.
        for (var key = 0; key < 10; key++)
        {
            yield return GetTuple(key);
        }

        // Change the schema in the middle of the stream.
        // The added column has a default value, so the remaining rows are not required to provide it.
        await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD COLUMN VAL varchar DEFAULT 'FOO'");
        await WaitForNewSchemaOnAllNodes(TestTableName, 2);

        // Keys 10..19 are produced after the schema change.
        for (var key = 10; key < 20; key++)
        {
            if (insertNewColumn)
            {
                yield return GetTuple(key, "BAR_" + key);
            }
            else
            {
                yield return GetTuple(key);
            }
        }
    }
}

// Inserts a row first (so the client has used the original schema), alters the table, and only then
// streams a row that carries the new VAL column.
[Test]
public async Task TestSchemaUpdateBeforeStreaming()
{
    await Client.Sql.ExecuteAsync(null, $"CREATE TABLE {TestTableName} (KEY bigint PRIMARY KEY)");

    var recordView = (await Client.Tables.GetTableAsync(TestTableName))!.RecordBinaryView;

    // A regular insert makes the client cache the old schema.
    await recordView.InsertAsync(null, GetTuple(-1));

    // Add the VAL column on the server side.
    await Client.Sql.ExecuteAsync(null, $"ALTER TABLE {TestTableName} ADD COLUMN VAL varchar DEFAULT 'FOO'");
    await WaitForNewSchemaOnAllNodes(TestTableName, 2);

    // The streamed row uses a column the client does not know about yet;
    // the unmapped column exception is expected to trigger a schema reload.
    var rowsToStream = new[] { GetTuple(1, "BAR") };
    await recordView.StreamDataAsync(rowsToStream.ToAsyncEnumerable());

    // Row written with the old schema reads back the column default.
    var rowBeforeAlter = await recordView.GetAsync(null, GetTuple(-1));
    Assert.AreEqual("FOO", rowBeforeAlter.Value["VAL"]);

    // Row written with the new schema reads back the streamed value.
    var rowAfterAlter = await recordView.GetAsync(null, GetTuple(1));
    Assert.AreEqual("BAR", rowAfterAlter.Value["VAL"]);
}

private async Task WaitForNewSchemaOnAllNodes(string tableName, int schemaVer, int timeoutMs = 5000)
{
// TODO IGNITE-18733, IGNITE-18449: remove this workaround when issues are fixed.
Expand Down
Expand Up @@ -87,15 +87,16 @@ public Memory<byte> GetWrittenMemory()
public void Advance(int count)
{
// Moves the write index forward by count. A negative count is rejected by assertion only.
Debug.Assert(count >= 0, "count >= 0");

// Throws when the advance would move _index beyond _buffer.Length.
// Landing exactly on _buffer.Length passes this check.
if (_index > _buffer.Length - count)
{
throw new InvalidOperationException("Can't advance past buffer limit.");
}
// NOTE(review): this assertion is strict (<), so it fails when _index + count == _buffer.Length,
// a case the throwing check above accepts. The two checks look like the before/after lines of a
// diff view; confirm which one is intended and keep only that one.
Debug.Assert(_index + count < _buffer.Length, "_index + count < _buffer.Length");

_index += count;
}

/// <summary>
/// Returns the buffer to its initial state by moving the write index back to the prefix size.
/// </summary>
public void Reset()
{
    _index = _prefixSize;
}

/// <summary>
/// Gets a span for writing.
/// </summary>
Expand All @@ -122,24 +123,6 @@ public Span<byte> GetSpanAndAdvance(int size)
return span;
}

/// <summary>
/// Returns a span for writing <paramref name="size"/> bytes at <paramref name="position"/>.
/// Calls <c>CheckAndResizeBuffer</c> first when the requested range ends past the current write index.
/// </summary>
/// <param name="position">Position, offset by the prefix size when mapped onto the underlying array.</param>
/// <param name="size">Size of the requested span.</param>
/// <returns>Span for writing.</returns>
public Span<byte> GetSpan(int position, int size)
{
    var start = _prefixSize + position;

    // Number of bytes by which the requested range ends past the current write index.
    var shortfall = start + size - _index;

    if (shortfall > 0)
    {
        CheckAndResizeBuffer(shortfall);
    }

    return _buffer.AsSpan(start, size);
}

/// <inheritdoc />
public void Dispose()
{
Expand Down

0 comments on commit 5ac19cf

Please sign in to comment.