Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelStatics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@ internal static class ElasticsearchChannelStatics
public static readonly byte[] DocUpdateHeaderEnd = " }"u8.ToArray();

public static readonly byte[] ScriptedHashUpsertStart =
"{ \"scripted_upsert\": true, \"upsert\": {}, \"script\": { \"source\": \"if (ctx.op != 'create') { if (ctx._source."u8.ToArray();
"{ \"scripted_upsert\": true, \"upsert\": {}, \"script\": { \"source\": \"if (ctx._source."u8.ToArray();

public static readonly byte[] ScriptedHashUpsertMiddle =
" == params.hash ) { ctx.op = 'noop' } } ctx._source = params.doc\", \"params\": { \"hash\": "u8.ToArray();
public static readonly byte[] ScriptedHashUpsertAfterIfCheck = " == params.hash ) { "u8.ToArray();

public static readonly byte[] ScriptedHashUpsertDocPreamble =
public static readonly byte[] ScriptedHashUpdateScript = "ctx.op = 'noop'"u8.ToArray();

public static readonly byte[] ScriptedHashParamComma = ", "u8.ToArray();
public static readonly byte[] ScriptedHashKeySeparator = ": "u8.ToArray();

public static readonly byte[] ScriptedHashAfterIfCheckOp =
" } else { ctx._source = params.doc } \", \"params\": { \"hash\": "u8.ToArray();

public static readonly byte[] ScriptHashDocAsParameter =
", \"doc\":"u8.ToArray();

public static readonly byte[] ScriptedHashUpsertEnd = " } } }"u8.ToArray();
Expand Down
47 changes: 42 additions & 5 deletions src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
using System;
using System.Collections;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;
using Elastic.Transport;
Expand All @@ -11,10 +13,33 @@ namespace Elastic.Ingest.Elasticsearch.Indices;
/// <summary>
/// For scripted hash bulk upserts returns per operation the field and the hash to use for the scripted upsert
/// </summary>
/// <param name="Field">The field to check the previous hash against</param>
/// <param name="Hash">The current hash of the document</param>
public record HashedBulkUpdate(string Field, string Hash)
public record HashedBulkUpdate
{
/// <summary>
/// For scripted hash bulk upserts returns per operation the field and the hash to use for the scripted upsert
/// </summary>
/// <param name="field">The field to check the previous hash against</param>
/// <param name="hash">The current hash of the document</param>
/// <param name="updateScript">The update script</param>
/// <param name="parameters"></param>
public HashedBulkUpdate(string field, string hash, string? updateScript, IDictionary<string, string>? parameters)
: this(field, hash)
{
UpdateScript = updateScript;
Parameters = parameters;
}

/// <summary>
/// For scripted hash bulk upserts returns per operation the field and the hash to use for the scripted upsert
/// </summary>
/// <param name="field">The field to check the previous hash against</param>
/// <param name="hash">The current hash of the document</param>
public HashedBulkUpdate(string field, string hash)
{
Field = field;
Hash = hash;
}

/// <summary>
/// A short SHA256 hash of the provided <paramref name="components"/>
/// </summary>
Expand All @@ -23,13 +48,25 @@ public record HashedBulkUpdate(string Field, string Hash)
public static string CreateHash(params string[] components)
{
#if NET8_0_OR_GREATER
return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(string.Join("", components))))[..8];
return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(string.Join("", components))))[..16].ToLowerInvariant();
#else
var sha = SHA256.Create();
var hash = sha.ComputeHash(Encoding.UTF8.GetBytes(string.Join("", components)));
return BitConverter.ToString(hash).Replace("-", "")[..8];
return BitConverter.ToString(hash).Replace("-", "")[..16].ToLowerInvariant();
#endif
}

/// <summary>The field to check the previous hash against</summary>
public string Field { get; init; }

/// <summary>The current hash of the document</summary>
public string Hash { get; init; }

/// <summary>Optional update script if hashes match defaults to ''</summary>
public string? UpdateScript { get; init; }

/// <summary> Optional additional parameters for <see cref="UpdateScript"/> </summary>
public IDictionary<string, string>? Parameters { get; init; }
};

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,36 @@ public static ReadOnlyMemory<byte> GetBytes<TEvent>(ArraySegment<TEvent> page,
var field = Encoding.UTF8.GetBytes(hashUpdate.UpdateInformation.Field);
bufferWriter.Write(field);
writer.Reset();
bufferWriter.Write(ScriptedHashUpsertMiddle);
bufferWriter.Write(ScriptedHashUpsertAfterIfCheck);
writer.Reset();

bufferWriter.Write(ScriptedHashUpdateScript);
writer.Reset();
if (hashUpdate.UpdateInformation.UpdateScript is not null)
{
bufferWriter.Write(Encoding.UTF8.GetBytes(hashUpdate.UpdateInformation.UpdateScript));
writer.Reset();
}
else
{
bufferWriter.Write(ScriptedHashAfterIfCheckOp);
writer.Reset();
}
var hash = hashUpdate.UpdateInformation.Hash;
JsonSerializer.Serialize(writer, hash, options.SerializerOptions);
bufferWriter.Write(ScriptedHashUpsertDocPreamble);

if (hashUpdate.UpdateInformation.Parameters is not null)
foreach (var (key, value) in hashUpdate.UpdateInformation.Parameters)
{
bufferWriter.Write(ScriptedHashParamComma);
writer.Reset();
JsonSerializer.Serialize(writer, key, options.SerializerOptions);
bufferWriter.Write(ScriptedHashKeySeparator);
writer.Reset();
JsonSerializer.Serialize(writer, value, options.SerializerOptions);
}

bufferWriter.Write(ScriptHashDocAsParameter);
writer.Reset();
}

Expand Down Expand Up @@ -137,10 +162,31 @@ await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(),
await stream.WriteAsync(ScriptedHashUpsertStart, 0, ScriptedHashUpsertStart.Length, ctx).ConfigureAwait(false);
var field = Encoding.UTF8.GetBytes(hashUpdate.UpdateInformation.Field);
await stream.WriteAsync(field, 0, field.Length, ctx).ConfigureAwait(false);
await stream.WriteAsync(ScriptedHashUpsertMiddle, 0, ScriptedHashUpsertMiddle.Length, ctx).ConfigureAwait(false);
await stream.WriteAsync(ScriptedHashUpsertAfterIfCheck, 0, ScriptedHashUpsertAfterIfCheck.Length, ctx).ConfigureAwait(false);

if (hashUpdate.UpdateInformation.UpdateScript is { } script && !string.IsNullOrWhiteSpace(script))
{
var bytes = Encoding.UTF8.GetBytes(script);
await stream.WriteAsync(bytes, 0, bytes.Length, ctx).ConfigureAwait(false);
}
else
await stream.WriteAsync(ScriptedHashUpdateScript, 0, ScriptedHashUpdateScript.Length, ctx).ConfigureAwait(false);

await stream.WriteAsync(ScriptedHashAfterIfCheckOp, 0, ScriptedHashAfterIfCheckOp.Length, ctx).ConfigureAwait(false);

var hash = hashUpdate.UpdateInformation.Hash;
await JsonSerializer.SerializeAsync(stream, hash, options.SerializerOptions, ctx).ConfigureAwait(false);
await stream.WriteAsync(ScriptedHashUpsertDocPreamble, 0, ScriptedHashUpsertDocPreamble.Length, ctx).ConfigureAwait(false);

if (hashUpdate.UpdateInformation.Parameters is { } parameters)
foreach (var kv in parameters)
{
await stream.WriteAsync(ScriptedHashParamComma, 0, ScriptedHashParamComma.Length, ctx ).ConfigureAwait(false);
await JsonSerializer.SerializeAsync(stream, kv.Key, options.SerializerOptions, ctx).ConfigureAwait(false);
await stream.WriteAsync(ScriptedHashKeySeparator, 0, ScriptedHashKeySeparator.Length, ctx ).ConfigureAwait(false);
await JsonSerializer.SerializeAsync(stream, kv.Value, options.SerializerOptions, ctx).ConfigureAwait(false);
}

await stream.WriteAsync(ScriptHashDocAsParameter, 0, ScriptHashDocAsParameter.Length, ctx).ConfigureAwait(false);
}

if (options.EventWriter?.WriteToStreamAsync != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<ItemGroup>
<ProjectReference Include="..\Elastic.Channels\Elastic.Channels.csproj" />
<PackageReference Include="Elastic.Transport" Version="0.10.0" />
<PackageReference Include="Elastic.Transport" Version="0.10.1" />
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
</ItemGroup>

Expand Down
Loading