From 2b127b48920234ea21e640d701c08de0b9079590 Mon Sep 17 00:00:00 2001
From: Martijn Laarman
Date: Tue, 14 Oct 2025 17:02:00 +0200
Subject: [PATCH] Allow passing a custom script and params to scripted updates
 when the hash matches

---
 .../ElasticsearchChannelStatics.cs            |  15 +-
 .../Indices/IndexChannelOptions.cs            |  47 +++-
 .../Serialization/BulkRequestDataFactory.cs   |  54 ++++-
 .../Elastic.Ingest.Transport.csproj           |   2 +-
 .../CustomScriptHashIngestionTests.cs         | 222 ++++++++++++++++++
 .../ScriptedHashUpdateIngestionTests.cs       |   2 +-
 .../TestDocument.cs                           |   6 +
 .../Elastic.Ingest.Elasticsearch.Tests.csproj |   2 +-
 8 files changed, 334 insertions(+), 16 deletions(-)
 create mode 100644 tests/Elastic.Ingest.Elasticsearch.IntegrationTests/CustomScriptHashIngestionTests.cs

diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelStatics.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelStatics.cs
index adc5a56..1e4457e 100644
--- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelStatics.cs
+++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelStatics.cs
@@ -18,12 +18,19 @@ internal static class ElasticsearchChannelStatics
 	public static readonly byte[] DocUpdateHeaderEnd = " }"u8.ToArray();
 
 	public static readonly byte[] ScriptedHashUpsertStart =
-		"{ \"scripted_upsert\": true, \"upsert\": {}, \"script\": { \"source\": \"if (ctx.op != 'create') { if (ctx._source."u8.ToArray();
+		"{ \"scripted_upsert\": true, \"upsert\": {}, \"script\": { \"source\": \"if (ctx._source."u8.ToArray();
 
-	public static readonly byte[] ScriptedHashUpsertMiddle =
-		" == params.hash ) { ctx.op = 'noop' } } ctx._source = params.doc\", \"params\": { \"hash\": "u8.ToArray();
+	public static readonly byte[] ScriptedHashUpsertAfterIfCheck = " == params.hash ) { "u8.ToArray();
 
-	public static readonly byte[] ScriptedHashUpsertDocPreamble =
+	public static readonly byte[] ScriptedHashUpdateScript = "ctx.op = 'noop'"u8.ToArray();
+
+	public static readonly byte[] ScriptedHashParamComma = ", "u8.ToArray();
+	public static readonly byte[] ScriptedHashKeySeparator = ": "u8.ToArray();
+
+	public static readonly byte[] ScriptedHashAfterIfCheckOp =
+		" } else { ctx._source = params.doc } \", \"params\": { \"hash\": "u8.ToArray();
+
+	public static readonly byte[] ScriptHashDocAsParameter = ", \"doc\":"u8.ToArray();
 
 	public static readonly byte[] ScriptedHashUpsertEnd = " } } }"u8.ToArray();
 
diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs
index 0bfbb28..99da22f 100644
--- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs
+++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs
@@ -2,6 +2,8 @@
 // Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information
 using System;
+using System.Collections;
+using System.Collections.Generic;
 using System.Security.Cryptography;
 using System.Text;
 using Elastic.Transport;
@@ -11,10 +13,33 @@ namespace Elastic.Ingest.Elasticsearch.Indices;
 
 /// <summary>
 /// For scripted hash bulk upserts, returns per operation the field and hash to use for the scripted upsert
 /// </summary>
-/// <param name="Field">The field to check the previous hash against</param>
-/// <param name="Hash">The current hash of the document</param>
-public record HashedBulkUpdate(string Field, string Hash)
+public record HashedBulkUpdate
 {
+	/// <summary>
+	/// For scripted hash bulk upserts, returns per operation the field and hash to use for the scripted upsert
+	/// </summary>
+	/// <param name="field">The field to check the previous hash against</param>
+	/// <param name="hash">The current hash of the document</param>
+	/// <param name="updateScript">The update script to run when the hashes match</param>
+	/// <param name="parameters">Optional additional parameters made available to the update script</param>
+	public HashedBulkUpdate(string field, string hash, string? updateScript, IDictionary<string, object>? parameters)
+		: this(field, hash)
+	{
+		UpdateScript = updateScript;
+		Parameters = parameters;
+	}
+
+	/// <summary>
+	/// For scripted hash bulk upserts, returns per operation the field and hash to use for the scripted upsert
+	/// </summary>
+	/// <param name="field">The field to check the previous hash against</param>
+	/// <param name="hash">The current hash of the document</param>
+	public HashedBulkUpdate(string field, string hash)
+	{
+		Field = field;
+		Hash = hash;
+	}
+
 	/// <summary>
 	/// A short SHA256 hash of the provided <paramref name="components"/>
 	/// </summary>
@@ -23,13 +48,25 @@ public record HashedBulkUpdate(string Field, string Hash)
 	public static string CreateHash(params string[] components)
 	{
 #if NET8_0_OR_GREATER
-		return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(string.Join("", components))))[..8];
+		return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(string.Join("", components))))[..16].ToLowerInvariant();
 #else
 		var sha = SHA256.Create();
 		var hash = sha.ComputeHash(Encoding.UTF8.GetBytes(string.Join("", components)));
-		return BitConverter.ToString(hash).Replace("-", "")[..8];
+		return BitConverter.ToString(hash).Replace("-", "")[..16].ToLowerInvariant();
 #endif
 	}
+
+	/// <summary>The field to check the previous hash against</summary>
+	public string Field { get; init; }
+
+	/// <summary>The current hash of the document</summary>
+	public string Hash { get; init; }
+
+	/// <summary>Optional update script to run when the hashes match, defaults to <c>ctx.op = 'noop'</c></summary>
+	public string? UpdateScript { get; init; }
+
+	/// <summary>Optional additional parameters for <see cref="UpdateScript"/></summary>
+	public IDictionary<string, object>? Parameters { get; init; }
 };
 
 /// <summary>
diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs
index 31946fb..e2204a3 100644
--- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs
+++ b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs
@@ -62,11 +62,36 @@ public static ReadOnlyMemory<byte> GetBytes<TEvent>(ArraySegment<TEvent> page,
 			var field = Encoding.UTF8.GetBytes(hashUpdate.UpdateInformation.Field);
 			bufferWriter.Write(field);
 			writer.Reset();
-			bufferWriter.Write(ScriptedHashUpsertMiddle);
+			bufferWriter.Write(ScriptedHashUpsertAfterIfCheck);
 			writer.Reset();
+
+			if (hashUpdate.UpdateInformation.UpdateScript is { } script && !string.IsNullOrWhiteSpace(script))
+			{
+				bufferWriter.Write(Encoding.UTF8.GetBytes(script));
+				writer.Reset();
+			}
+			else
+			{
+				bufferWriter.Write(ScriptedHashUpdateScript);
+				writer.Reset();
+			}
+			bufferWriter.Write(ScriptedHashAfterIfCheckOp);
+			writer.Reset();
 			var hash = hashUpdate.UpdateInformation.Hash;
 			JsonSerializer.Serialize(writer, hash, options.SerializerOptions);
-			bufferWriter.Write(ScriptedHashUpsertDocPreamble);
+
+			if (hashUpdate.UpdateInformation.Parameters is not null)
+				foreach (var (key, value) in hashUpdate.UpdateInformation.Parameters)
+				{
+					bufferWriter.Write(ScriptedHashParamComma);
+					writer.Reset();
+					JsonSerializer.Serialize(writer, key, options.SerializerOptions);
+					bufferWriter.Write(ScriptedHashKeySeparator);
+					writer.Reset();
+					JsonSerializer.Serialize(writer, value, options.SerializerOptions);
+				}
+
+			bufferWriter.Write(ScriptHashDocAsParameter);
 			writer.Reset();
 		}
 
@@ -137,10 +162,31 @@ await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(),
 			await stream.WriteAsync(ScriptedHashUpsertStart, 0, ScriptedHashUpsertStart.Length, ctx).ConfigureAwait(false);
 			var field = Encoding.UTF8.GetBytes(hashUpdate.UpdateInformation.Field);
 			await stream.WriteAsync(field, 0, field.Length, ctx).ConfigureAwait(false);
-			await stream.WriteAsync(ScriptedHashUpsertMiddle, 0, ScriptedHashUpsertMiddle.Length, ctx).ConfigureAwait(false);
+			await stream.WriteAsync(ScriptedHashUpsertAfterIfCheck, 0, ScriptedHashUpsertAfterIfCheck.Length, ctx).ConfigureAwait(false);
+
+			if (hashUpdate.UpdateInformation.UpdateScript is { } script && !string.IsNullOrWhiteSpace(script))
+			{
+				var bytes = Encoding.UTF8.GetBytes(script);
+				await stream.WriteAsync(bytes, 0, bytes.Length, ctx).ConfigureAwait(false);
+			}
+			else
+				await stream.WriteAsync(ScriptedHashUpdateScript, 0, ScriptedHashUpdateScript.Length, ctx).ConfigureAwait(false);
+
+			await stream.WriteAsync(ScriptedHashAfterIfCheckOp, 0, ScriptedHashAfterIfCheckOp.Length, ctx).ConfigureAwait(false);
+
 			var hash = hashUpdate.UpdateInformation.Hash;
 			await JsonSerializer.SerializeAsync(stream, hash, options.SerializerOptions, ctx).ConfigureAwait(false);
-			await stream.WriteAsync(ScriptedHashUpsertDocPreamble, 0, ScriptedHashUpsertDocPreamble.Length, ctx).ConfigureAwait(false);
+
+			if (hashUpdate.UpdateInformation.Parameters is { } parameters)
+				foreach (var kv in parameters)
+				{
+					await stream.WriteAsync(ScriptedHashParamComma, 0, ScriptedHashParamComma.Length, ctx).ConfigureAwait(false);
+					await JsonSerializer.SerializeAsync(stream, kv.Key, options.SerializerOptions, ctx).ConfigureAwait(false);
+					await stream.WriteAsync(ScriptedHashKeySeparator, 0, ScriptedHashKeySeparator.Length, ctx).ConfigureAwait(false);
+					await JsonSerializer.SerializeAsync(stream, kv.Value, options.SerializerOptions, ctx).ConfigureAwait(false);
+				}
+
+			await stream.WriteAsync(ScriptHashDocAsParameter, 0, ScriptHashDocAsParameter.Length, ctx).ConfigureAwait(false);
 		}
 
 		if (options.EventWriter?.WriteToStreamAsync != null)
diff --git a/src/Elastic.Ingest.Transport/Elastic.Ingest.Transport.csproj b/src/Elastic.Ingest.Transport/Elastic.Ingest.Transport.csproj
index 7bd2884..bf8a93b 100644
--- a/src/Elastic.Ingest.Transport/Elastic.Ingest.Transport.csproj
+++ b/src/Elastic.Ingest.Transport/Elastic.Ingest.Transport.csproj
@@ -15,7 +15,7 @@
-
+
diff --git a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/CustomScriptHashIngestionTests.cs b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/CustomScriptHashIngestionTests.cs
new file mode 100644
index 0000000..5a08f43
--- /dev/null
+++ b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/CustomScriptHashIngestionTests.cs
@@ -0,0 +1,222 @@
+// Licensed to Elasticsearch B.V under one or more agreements.
+// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Elastic.Channels;
+using Elastic.Clients.Elasticsearch.IndexManagement;
+using Elastic.Ingest.Elasticsearch.Catalog;
+using Elastic.Ingest.Elasticsearch.Indices;
+using FluentAssertions;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Elastic.Ingest.Elasticsearch.IntegrationTests;
+
+public class CustomScriptHashIngestionTests
+	: IntegrationTestBase, IAsyncLifetime
+{
+	public CustomScriptHashIngestionTests(IngestionCluster cluster, ITestOutputHelper output) : base(cluster, output)
+	{
+		IndexBatchDate = DateTimeOffset.UtcNow;
+		IndexBatchDateUpdate = IndexBatchDate.AddHours(1);
+		IndexPrefix = "scripted-data-";
+		Slim = new CountdownEvent(1);
+		Channel = CreateChannel(IndexPrefix, Slim);
+		IndexName = Channel.IndexName;
+	}
+
+	public string IndexPrefix { get; }
+	public string IndexName { get; set; }
+	public CountdownEvent Slim { get; }
+	public CatalogIndexChannel<HashDocument> Channel { get; set; }
+	public DateTimeOffset IndexBatchDate { get; }
+	public DateTimeOffset IndexBatchDateUpdate { get; }
+
+	/// <inheritdoc />
+	public async Task InitializeAsync()
+	{
+		await Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default");
+		IndexName = Channel.IndexName;
+	}
+
+	/// <inheritdoc />
+	public Task DisposeAsync()
+	{
+		Channel.Dispose();
+		return Task.CompletedTask;
+	}
+
+	[Fact]
+	public async Task EnsureDocumentsEndUpInIndex()
+	{
+		var bootstrapped = await Channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default");
+		bootstrapped.Should().BeTrue("Expected to be able to bootstrap index channel");
+
+		var searchIndex = Channel.Options.ActiveSearchAlias;
+
+		// Verify the index does not exist yet
+		var index = await Client.Indices.GetAsync(new GetIndexRequest(IndexName));
+		index.Indices.Should().BeNullOrEmpty();
+
+		await WriteInitialDocuments(searchIndex, IndexName, expectedVersion: 1);
+
+		// Verify the index exists and has the correct settings
+		index = await Client.Indices.GetAsync(new GetIndexRequest(IndexName));
+		index.Indices.Should().NotBeNullOrEmpty();
+		index.Indices[IndexName].Settings?.Index?.Lifecycle?.Name?.Should().NotBeNull().And.Be("7-days-default");
+
+		// Write the same 100 documents again over the same channel;
+		// since we are using a custom script that doesn't noop, all documents should be updated
+		await WriteInitialDocuments(searchIndex, IndexName, expectedVersion: 2);
+
+		var refreshResult = await Client.Indices.RefreshAsync(IndexName);
+		refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation);
+
+		// Now only update half the documents, assert the version is 3 for all the documents
+		await UpdateHalfOfDocumentsSkipFirst10(searchIndex, IndexName, expectedVersion: 3);
+	}
+
+	private async Task WriteInitialDocuments(string searchIndex, string indexName, int expectedVersion)
+	{
+		// Write 100 documents
+		for (var i = 0; i < 100; i++)
+			Channel.TryWrite(new HashDocument { Title = "Hello World!", Id = $"hello-world-{i}", IndexBatchDate = IndexBatchDate, LastUpdated = IndexBatchDate });
+		if (!Slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10)))
+			throw new Exception($"Documents were not persisted within 10 seconds: {Channel}");
+
+		Slim.Reset();
+		await Channel.RefreshAsync();
+		await Channel.ApplyAliasesAsync();
+
+		var searchResult = await Client.SearchAsync<HashDocument>(s => s
+			.Indices(searchIndex)
+			.Aggregations(a => a.Add("max", a1 => a1.Max(m => m.Field("_version"))))
+		);
+		searchResult.Total.Should().Be(100);
+		var maxVersion = searchResult.Aggregations?.GetMax("max")?.Value;
+		maxVersion.Should().Be(expectedVersion);
+
+		var storedDocument = searchResult.Documents.First();
+		storedDocument.Id.Should().StartWith("hello-world");
+		storedDocument.Title.Should().Be("Hello World!");
+
+		var hit = searchResult.Hits.First();
+		hit.Index.Should().Be(indexName);
+	}
+
+	private async Task UpdateHalfOfDocumentsSkipFirst10(string searchIndex, string indexName, int expectedVersion)
+	{
+		for (var i = 10; i < 100; i++)
+		{
+			var title = "Hello World!";
+			if (i % 2 == 0)
+				title += $"{i:N0}-{expectedVersion:N0}";
+			Channel.TryWrite(new HashDocument
+			{
+				Title = title, Id = $"hello-world-{i}", IndexBatchDate = IndexBatchDateUpdate, LastUpdated = IndexBatchDateUpdate
+			});
+		}
+		if (!Slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10)))
+			throw new Exception($"Updates did not go through within 10s: {Channel}");
+
+		Slim.Reset();
+		await Channel.RefreshAsync();
+		await Channel.ApplyAliasesAsync();
+		var searchResult = await Client.SearchAsync<HashDocument>(s => s
+			.Indices(searchIndex)
+			.Size(1)
+			.Aggregations(a => a
+				.Add("max", a1 => a1.Max(m => m.Field("_version")))
+				.Add("terms-index", a1 => a1.Terms(m => m.Field("index_batch_date")))
+				.Add("terms-updated", a1 => a1.Terms(m => m.Field("last_updated")))
+			)
+		);
+		searchResult.Total.Should().Be(100);
+		var maxVersion = searchResult.Aggregations?.GetMax("max")?.Value;
+		maxVersion.Should().Be(expectedVersion);
+
+		// since we skip 10, we only expect 90 to be part of this update
+		var ingestDates = searchResult.Aggregations?.GetLongTerms("terms-index")?.Buckets;
+		ingestDates.Should().NotBeNullOrEmpty().And.HaveCount(2);
+		ingestDates!.First(d => d.Key == IndexBatchDate.ToUnixTimeMilliseconds()).DocCount.Should().Be(10);
+		ingestDates!.First(d => d.Key == IndexBatchDateUpdate.ToUnixTimeMilliseconds()).DocCount.Should().Be(90);
+
+		// since we update half of 90, we expect 45 documents to actually have the updated timestamp
+		var updateDates = searchResult.Aggregations?.GetLongTerms("terms-updated")?.Buckets;
+		updateDates.Should().NotBeNullOrEmpty().And.HaveCount(2);
+		updateDates!.First(d => d.Key == IndexBatchDate.ToUnixTimeMilliseconds()).DocCount.Should().Be(55);
+		updateDates!.First(d => d.Key == IndexBatchDateUpdate.ToUnixTimeMilliseconds()).DocCount.Should().Be(45);
+
+		var hit = searchResult.Hits.First();
+		hit.Index.Should().Be(indexName);
+	}
+
+	private CatalogIndexChannel<HashDocument> CreateChannel(string indexPrefix, CountdownEvent slim, bool useScriptedHashBulkUpsert = true)
+	{
+		var options = new CatalogIndexChannelOptions<HashDocument>(Client.Transport)
+		{
+			IndexFormat = indexPrefix + "{0:yyyy.MM.dd.HH-mm-ss-fffffff}",
+			ActiveSearchAlias = indexPrefix + "search",
+			BulkOperationIdLookup = c => c.Id,
+			ScriptedHashBulkUpsertLookup = !useScriptedHashBulkUpsert ? null : (c, channelHash) =>
+			{
+				var hash = HashedBulkUpdate.CreateHash(channelHash, c.Id, c.Title ?? string.Empty);
+				c.Hash = hash;
+				return new HashedBulkUpdate("hash", hash, "ctx._source.index_batch_date = params.index_batch_date",
+					new Dictionary<string, object>
+					{
+						{ "index_batch_date", c.IndexBatchDate.ToString("o") }
+					});
+			},
+			BufferOptions = new BufferOptions
+			{
+				WaitHandle = slim, OutboundBufferMaxSize = 100,
+			},
+
+			// language=json
+			GetMappingSettings = () =>
+				"""
+				{
+				  "analysis": {
+				    "analyzer": {
+				      "my_analyzer": {
+				        "type": "custom",
+				        "tokenizer": "standard",
+				        "char_filter": [ "html_strip" ],
+				        "filter": [ "lowercase", "asciifolding" ]
+				      }
+				    }
+				  }
+				}
+				""",
+			// language=json
+			GetMapping = () =>
+				"""
+				{
+				  "properties": {
+				    "hash": { "type": "keyword" },
+				    "title": {
+				      "type": "text",
+				      "search_analyzer": "my_analyzer",
+				      "fields": {
+				        "keyword": { "type": "keyword" }
+				      }
+				    }
+				  }
+				}
+				"""
+		};
+		var channel = new CatalogIndexChannel<HashDocument>(options);
+		return channel;
+	}
+}
diff --git a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/ScriptedHashUpdateIngestionTests.cs b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/ScriptedHashUpdateIngestionTests.cs
index 0400312..9e55bc5 100644
--- a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/ScriptedHashUpdateIngestionTests.cs
+++ b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/ScriptedHashUpdateIngestionTests.cs
@@ -103,7 +103,7 @@ private async Task UpdateHalfOfDocuments(CatalogIndexChannel chann
 		{
 			var title = "Hello World!";
 			if (i % 2 == 0)
-				title += $"{i:N}-{expectedVersion:N}";
+				title += $"{i:N0}-{expectedVersion:N0}";
 			channel.TryWrite(new HashDocument { Title = title , Id = $"hello-world-{i}" });
 		}
 		if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10)))
diff --git a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/TestDocument.cs b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/TestDocument.cs
index ed2efca..2e52ad6 100644
--- a/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/TestDocument.cs
+++ b/tests/Elastic.Ingest.Elasticsearch.IntegrationTests/TestDocument.cs
@@ -40,4 +40,10 @@ public class HashDocument
 	[JsonPropertyName("hash")]
 	public string Hash { get; set; } = string.Empty;
+
+	[JsonPropertyName("index_batch_date")]
+	public DateTimeOffset IndexBatchDate { get; set; }
+
+	[JsonPropertyName("last_updated")]
+	public DateTimeOffset LastUpdated { get; set; }
 }
diff --git a/tests/Elastic.Ingest.Elasticsearch.Tests/Elastic.Ingest.Elasticsearch.Tests.csproj b/tests/Elastic.Ingest.Elasticsearch.Tests/Elastic.Ingest.Elasticsearch.Tests.csproj
index aff72f2..6e13e01 100644
--- a/tests/Elastic.Ingest.Elasticsearch.Tests/Elastic.Ingest.Elasticsearch.Tests.csproj
+++ b/tests/Elastic.Ingest.Elasticsearch.Tests/Elastic.Ingest.Elasticsearch.Tests.csproj
@@ -5,7 +5,7 @@
-
+
       all
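
Reference for reviewers: when no custom UpdateScript is supplied, the byte fragments in
ElasticsearchChannelStatics assemble into a bulk update operation body along these lines
(a sketch; the "hash" field name, the hash value, and the document payload are illustrative):

  { "scripted_upsert": true, "upsert": {}, "script": {
      "source": "if (ctx._source.hash == params.hash ) { ctx.op = 'noop' } else { ctx._source = params.doc } ",
      "params": { "hash": "abcd1234deadbeef", "doc": { "title": "Hello World!", "hash": "abcd1234deadbeef" } } } }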
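
With a custom script and parameters, as configured in CustomScriptHashIngestionTests above, the same
fragments produce (again a sketch with illustrative values; note each extra parameter is emitted after
"hash" and before "doc"):

  { "scripted_upsert": true, "upsert": {}, "script": {
      "source": "if (ctx._source.hash == params.hash ) { ctx._source.index_batch_date = params.index_batch_date } else { ctx._source = params.doc } ",
      "params": { "hash": "abcd1234deadbeef", "index_batch_date": "2025-10-14T15:02:00.0000000+00:00", "doc": { "title": "Hello World!", "hash": "abcd1234deadbeef" } } } }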