From 6ce02526ef56769fd03c5100e3529c883f567831 Mon Sep 17 00:00:00 2001 From: Sergio Jr Date: Wed, 29 May 2024 08:30:03 -0300 Subject: [PATCH] Add support for custom event writer (#388) * Pass json serializer options through converters * Update file generator * Add unit test * Add integration tests * Fix tests * Fix test code style * Add type-specialized property converter write methods * Use cached DateTimeOffset JsonConverter when default JsonSerializerOptions * Use cached DateTimeOffset JsonConverter when default JsonSerializerOptions * Consolidate JsonConverter getter --- .../EcsDocumentJsonConverter.Generated.cs | 136 +++++++-------- .../Serialization/EcsDocumentJsonConverter.cs | 15 +- .../Serialization/EcsJsonConverterBase.cs | 47 +++-- ...ertiesReaderJsonConverterBase.Generated.cs | 26 +-- .../PropertiesReaderJsonConverterBase.cs | 10 +- .../CustomEventWriter.cs | 20 +++ .../DataStreamIngestionTests.cs | 153 +++++++++++------ .../IndexIngestionTests.cs | 162 ++++++++++++------ .../TestDocument.cs | 2 + .../Elastic.CommonSchema.Tests/Serializes.cs | 20 +++ .../EcsDocumentJsonConverter.Generated.cshtml | 32 ++-- ...esReaderJsonConverterBase.Generated.cshtml | 12 +- 12 files changed, 396 insertions(+), 239 deletions(-) create mode 100644 tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/CustomEventWriter.cs diff --git a/src/Elastic.CommonSchema/Serialization/EcsDocumentJsonConverter.Generated.cs b/src/Elastic.CommonSchema/Serialization/EcsDocumentJsonConverter.Generated.cs index 4b3d9362..c612a4e7 100644 --- a/src/Elastic.CommonSchema/Serialization/EcsDocumentJsonConverter.Generated.cs +++ b/src/Elastic.CommonSchema/Serialization/EcsDocumentJsonConverter.Generated.cs @@ -24,7 +24,8 @@ public partial class EcsDocumentJsonConverter : EcsJsonConverterBase : EcsJsonConverterBase ReadString(ref reader, ref loglevel), "ecs.version" => ReadString(ref reader, ref ecsVersion), - "metadata" => ReadProp(ref reader, "metadata", ecsEvent, (b, v) => b.Metadata = v), - "@timestamp" => ReadDateTime(ref reader, ref @timestamp), - "message" => ReadProp(ref reader, "message", ecsEvent, (b, v) => b.Message = v), - "tags" => ReadProp(ref reader, "tags", ecsEvent, (b, v) => b.Tags = v), - "span.id" => ReadProp(ref reader, "span.id", ecsEvent, (b, v) => b.SpanId = v), - "trace.id" => ReadProp(ref reader, "trace.id", ecsEvent, (b, v) => b.TraceId = v), - "transaction.id" => ReadProp(ref reader, "transaction.id", ecsEvent, (b, v) => b.TransactionId = v), - "labels" => ReadProp(ref reader, "labels", ecsEvent, (b, v) => b.Labels = v), + "metadata" => ReadProp(ref reader, "metadata", ecsEvent, (b, v) => b.Metadata = v, options), + "@timestamp" => ReadDateTime(ref reader, ref @timestamp, options), + "message" => ReadProp(ref reader, "message", ecsEvent, (b, v) => b.Message = v, options), + "tags" => ReadProp(ref reader, "tags", ecsEvent, (b, v) => b.Tags = v, options), + "span.id" => ReadProp(ref reader, "span.id", ecsEvent, (b, v) => b.SpanId = v, options), + "trace.id" => ReadProp(ref reader, "trace.id", ecsEvent, (b, v) => b.TraceId = v, options), + "transaction.id" => ReadProp(ref reader, "transaction.id", ecsEvent, (b, v) => b.TransactionId = v, options), + "labels" => ReadProp(ref reader, "labels", ecsEvent, (b, v) => b.Labels = v, options), "agent" => ReadProp(ref reader, "agent", EcsJsonContext.Default.Agent, ecsEvent, (b, v) => b.Agent = v), "as" => ReadProp(ref reader, "as", EcsJsonContext.Default.As, ecsEvent, (b, v) => b.As = v), "client" => ReadProp(ref reader, "client", EcsJsonContext.Default.Client, ecsEvent, (b, v) => b.Client = v), @@ -94,7 +95,7 @@ public partial class EcsDocumentJsonConverter : EcsJsonConverterBase WriteProp(writer, k, v)); + value.WriteAdditionalProperties((k, v) => WriteProp(writer, k, v, options)); writer.WriteEndObject(); } } diff --git a/src/Elastic.CommonSchema/Serialization/EcsDocumentJsonConverter.cs b/src/Elastic.CommonSchema/Serialization/EcsDocumentJsonConverter.cs index fffeaea9..d7f63df2 100644 --- a/src/Elastic.CommonSchema/Serialization/EcsDocumentJsonConverter.cs +++ b/src/Elastic.CommonSchema/Serialization/EcsDocumentJsonConverter.cs @@ -42,7 +42,7 @@ public partial class EcsDocumentJsonConverter where TBase : EcsDocument, if (reader.TokenType != JsonTokenType.PropertyName) throw new JsonException(); - var _ = ReadProperties(ref reader, ecsEvent, ref timestamp, ref loglevel, ref ecsVersion); + var _ = ReadProperties(ref reader, ecsEvent, ref timestamp, ref loglevel, ref ecsVersion, options); } if (!string.IsNullOrEmpty(loglevel)) { @@ -65,11 +65,11 @@ private static void WriteMessage(Utf8JsonWriter writer, EcsDocument value) writer.WriteString("message", value.Message); } - private static void WriteLogEntity(Utf8JsonWriter writer, Log? value) { + private static void WriteLogEntity(Utf8JsonWriter writer, Log? value, JsonSerializerOptions options) { if (value == null) return; if (!value.ShouldSerialize) return; - WriteProp(writer, "log", value, EcsJsonContext.Default.Log); + WriteProp(writer, "log", value, EcsJsonContext.Default.Log, options); } private static void WriteLogLevel(Utf8JsonWriter writer, EcsDocument value) @@ -78,23 +78,24 @@ private static void WriteLogLevel(Utf8JsonWriter writer, EcsDocument value) writer.WriteString("log.level", value.Log?.Level); } - private static void WriteEcsEntity(Utf8JsonWriter writer, Ecs? value) + private static void WriteEcsEntity(Utf8JsonWriter writer, Ecs? value, JsonSerializerOptions options) { if (value == null) return; if (!value.ShouldSerialize) return; - WriteProp(writer, "ecs", value, EcsJsonContext.Default.Ecs); + WriteProp(writer, "ecs", value, EcsJsonContext.Default.Ecs, options); } private static void WriteEcsVersion(Utf8JsonWriter writer, EcsDocument value) => writer.WriteString("ecs.version", value.Ecs?.Version ?? EcsDocument.Version); - private static void WriteTimestamp(Utf8JsonWriter writer, BaseFieldSet value) + private static void WriteTimestamp(Utf8JsonWriter writer, BaseFieldSet value, JsonSerializerOptions options) { if (!value.Timestamp.HasValue) return; writer.WritePropertyName("@timestamp"); - EcsJsonConfiguration.DateTimeOffsetConverter.Write(writer, value.Timestamp.Value, EcsJsonConfiguration.SerializerOptions); + var converter = GetDateTimeOffsetConverter(options); + converter.Write(writer, value.Timestamp.Value, options); } } diff --git a/src/Elastic.CommonSchema/Serialization/EcsJsonConverterBase.cs b/src/Elastic.CommonSchema/Serialization/EcsJsonConverterBase.cs index 11cfc8d6..68070d19 100644 --- a/src/Elastic.CommonSchema/Serialization/EcsJsonConverterBase.cs +++ b/src/Elastic.CommonSchema/Serialization/EcsJsonConverterBase.cs @@ -13,7 +13,13 @@ namespace Elastic.CommonSchema.Serialization public abstract class EcsJsonConverterBase : JsonConverter { /// - protected static bool ReadDateTime(ref Utf8JsonReader reader, ref DateTimeOffset? dateTime) + protected static JsonConverter GetDateTimeOffsetConverter(JsonSerializerOptions options) => + options == EcsJsonConfiguration.SerializerOptions + ? EcsJsonConfiguration.DateTimeOffsetConverter + : (JsonConverter)options.GetConverter(typeof(DateTimeOffset)); + + /// + protected static bool ReadDateTime(ref Utf8JsonReader reader, ref DateTimeOffset? dateTime, JsonSerializerOptions options) { if (reader.TokenType == JsonTokenType.Null) { @@ -21,7 +27,8 @@ protected static bool ReadDateTime(ref Utf8JsonReader reader, ref DateTimeOffset return true; } - dateTime = EcsJsonConfiguration.DateTimeOffsetConverter.Read(ref reader, typeof(DateTimeOffset), EcsJsonConfiguration.SerializerOptions); + var converter = GetDateTimeOffsetConverter(options); + dateTime = converter.Read(ref reader, typeof(DateTimeOffset), options); return true; } @@ -34,11 +41,27 @@ protected static bool ReadString(ref Utf8JsonReader reader, ref string? stringPr } /// - protected static void WriteProp(Utf8JsonWriter writer, string key, TValue value) + protected static void WritePropLong(Utf8JsonWriter writer, string key, long? value) + { + if (value == null) return; + + writer.WritePropertyName(key); + writer.WriteNumberValue(value.Value); + } + + /// + protected static void WritePropString(Utf8JsonWriter writer, string key, string? value) { if (value == null) return; - var options = EcsJsonConfiguration.SerializerOptions; + writer.WritePropertyName(key); + writer.WriteStringValue(value); + } + + /// + protected static void WriteProp(Utf8JsonWriter writer, string key, TValue value, JsonSerializerOptions options) + { + if (value == null) return; writer.WritePropertyName(key); // Attempt to use existing converter first before re-entering through JsonSerializer.Serialize(). @@ -47,7 +70,8 @@ protected static void WriteProp(Utf8JsonWriter writer, string key, TValu } /// - protected static void WriteProp(Utf8JsonWriter writer, string key, TValue value, JsonTypeInfo typeInfo) + protected static void WriteProp(Utf8JsonWriter writer, string key, TValue value, JsonTypeInfo typeInfo, + JsonSerializerOptions options) { if (value == null) return; @@ -56,16 +80,14 @@ protected static void WriteProp(Utf8JsonWriter writer, string key, TValu //To support user supplied subtypes if (type != typeof(TValue)) - JsonSerializer.Serialize(writer, value, type, EcsJsonConfiguration.SerializerOptions); + JsonSerializer.Serialize(writer, value, type, options); else JsonSerializer.Serialize(writer, value, typeInfo); } - internal static object? ReadPropDeserialize(ref Utf8JsonReader reader, Type type) + internal static object? ReadPropDeserialize(ref Utf8JsonReader reader, Type type, JsonSerializerOptions options) { if (reader.TokenType == JsonTokenType.Null) return null; - var options = EcsJsonConfiguration.SerializerOptions; - return JsonSerializer.Deserialize(ref reader, type, options); } @@ -100,19 +122,18 @@ protected static bool ReadPropString(ref Utf8JsonReader reader, string key, T b, /// // ReSharper disable once UnusedParameter.Local (key is used for readability) - private static TValue? ReadProp(ref Utf8JsonReader reader, string key) where TValue : class + private static TValue? ReadProp(ref Utf8JsonReader reader, string key, JsonSerializerOptions options) where TValue : class { if (reader.TokenType == JsonTokenType.Null) return null; - var options = EcsJsonConfiguration.SerializerOptions; return JsonSerializer.Deserialize(ref reader, options); } /// - protected static bool ReadProp(ref Utf8JsonReader reader, string key, T b, Action set) + protected static bool ReadProp(ref Utf8JsonReader reader, string key, T b, Action set, JsonSerializerOptions options) where TValue : class { - set(b, ReadProp(ref reader, key)); + set(b, ReadProp(ref reader, key, options)); return true; } diff --git a/src/Elastic.CommonSchema/Serialization/PropertiesReaderJsonConverterBase.Generated.cs b/src/Elastic.CommonSchema/Serialization/PropertiesReaderJsonConverterBase.Generated.cs index da243d12..7e92eb2e 100644 --- a/src/Elastic.CommonSchema/Serialization/PropertiesReaderJsonConverterBase.Generated.cs +++ b/src/Elastic.CommonSchema/Serialization/PropertiesReaderJsonConverterBase.Generated.cs @@ -21,7 +21,7 @@ namespace Elastic.CommonSchema.Serialization; internal partial class LogEntityJsonConverter : PropertiesReaderJsonConverterBase { /// - protected override bool ReadProperties(ref Utf8JsonReader reader, Log ecsEvent) + protected override bool ReadProperties(ref Utf8JsonReader reader, Log ecsEvent, JsonSerializerOptions options) { var propertyName = reader.GetString(); reader.Read(); @@ -33,12 +33,12 @@ protected override bool ReadProperties(ref Utf8JsonReader reader, Log ecsEvent) "origin.file.line" => ReadPropLong(ref reader, "origin.file.line", ecsEvent, (b, v) => b.OriginFileLine = v), "origin.file.name" => ReadPropString(ref reader, "origin.file.name", ecsEvent, (b, v) => b.OriginFileName = v), "origin.function" => ReadPropString(ref reader, "origin.function", ecsEvent, (b, v) => b.OriginFunction = v), - "syslog" => ReadProp(ref reader, "syslog", ecsEvent, (b, v) => b.Syslog = v), - _ => ReadProperty(ref reader, propertyName, ecsEvent) + "syslog" => ReadProp(ref reader, "syslog", ecsEvent, (b, v) => b.Syslog = v, options), + _ => ReadProperty(ref reader, propertyName, ecsEvent, options) }; } - private partial bool ReadProperty(ref Utf8JsonReader reader, string propertyName, Log ecsEvent); + private partial bool ReadProperty(ref Utf8JsonReader reader, string propertyName, Log ecsEvent, JsonSerializerOptions options); /// public override void Write(Utf8JsonWriter writer, Log value, JsonSerializerOptions options) @@ -50,12 +50,12 @@ public override void Write(Utf8JsonWriter writer, Log value, JsonSerializerOptio } writer.WriteStartObject(); - WriteProp(writer, "file.path", value.FilePath); - WriteProp(writer, "logger", value.Logger); - WriteProp(writer, "origin.file.line", value.OriginFileLine); - WriteProp(writer, "origin.file.name", value.OriginFileName); - WriteProp(writer, "origin.function", value.OriginFunction); - WriteProp(writer, "syslog", value.Syslog); + WritePropString(writer, "file.path", value.FilePath); + WritePropString(writer, "logger", value.Logger); + WritePropLong(writer, "origin.file.line", value.OriginFileLine); + WritePropString(writer, "origin.file.name", value.OriginFileName); + WritePropString(writer, "origin.function", value.OriginFunction); + WriteProp(writer, "syslog", value.Syslog, options); writer.WriteEndObject(); } @@ -65,18 +65,18 @@ public override void Write(Utf8JsonWriter writer, Log value, JsonSerializerOptio internal partial class EcsEntityJsonConverter : PropertiesReaderJsonConverterBase { /// - protected override bool ReadProperties(ref Utf8JsonReader reader, Ecs ecsEvent) + protected override bool ReadProperties(ref Utf8JsonReader reader, Ecs ecsEvent, JsonSerializerOptions options) { var propertyName = reader.GetString(); reader.Read(); return propertyName switch { "version" => ReadPropString(ref reader, "version", ecsEvent, (b, v) => b.Version = v), - _ => ReadProperty(ref reader, propertyName, ecsEvent) + _ => ReadProperty(ref reader, propertyName, ecsEvent, options) }; } - private partial bool ReadProperty(ref Utf8JsonReader reader, string propertyName, Ecs ecsEvent); + private partial bool ReadProperty(ref Utf8JsonReader reader, string propertyName, Ecs ecsEvent, JsonSerializerOptions options); /// public override void Write(Utf8JsonWriter writer, Ecs value, JsonSerializerOptions options) diff --git a/src/Elastic.CommonSchema/Serialization/PropertiesReaderJsonConverterBase.cs b/src/Elastic.CommonSchema/Serialization/PropertiesReaderJsonConverterBase.cs index 1a900c32..883de095 100644 --- a/src/Elastic.CommonSchema/Serialization/PropertiesReaderJsonConverterBase.cs +++ b/src/Elastic.CommonSchema/Serialization/PropertiesReaderJsonConverterBase.cs @@ -42,19 +42,19 @@ public abstract class PropertiesReaderJsonConverterBase : EcsJsonConverterBas if (reader.TokenType != JsonTokenType.PropertyName) throw new JsonException(); - var _ = ReadProperties(ref reader, ecsEvent); + var _ = ReadProperties(ref reader, ecsEvent, options); } return ecsEvent; } /// Handle reading the current property - protected abstract bool ReadProperties(ref Utf8JsonReader reader, T ecsEvent); + protected abstract bool ReadProperties(ref Utf8JsonReader reader, T ecsEvent, JsonSerializerOptions options); } internal partial class EcsEntityJsonConverter { - private partial bool ReadProperty(ref Utf8JsonReader reader, string propertyName, Ecs ecsEvent) => false; + private partial bool ReadProperty(ref Utf8JsonReader reader, string propertyName, Ecs ecsEvent, JsonSerializerOptions options) => false; } internal partial class LogEntityJsonConverter @@ -76,7 +76,7 @@ private class LogOriginInvalid public LogFileOriginInvalid? File { get; set; } } - private partial bool ReadProperty(ref Utf8JsonReader reader, string propertyName, Log ecsEvent) => + private partial bool ReadProperty(ref Utf8JsonReader reader, string propertyName, Log ecsEvent, JsonSerializerOptions options) => propertyName switch { "origin" => ReadProp(ref reader, "origin", ecsEvent, (b, v) => @@ -86,7 +86,7 @@ propertyName switch if (v.File == null) return; b.OriginFileLine = v.File.Line; b.OriginFileName = v.File.Name; - }), + }, options), _ => false }; } diff --git a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/CustomEventWriter.cs b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/CustomEventWriter.cs new file mode 100644 index 00000000..e10fce86 --- /dev/null +++ b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/CustomEventWriter.cs @@ -0,0 +1,20 @@ +using System.Buffers; +using System.Text.Json; +using System.Text.Json.Serialization; + +namespace Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests; + +public class CustomEventWriter : IElasticsearchEventWriter +{ + // ReSharper disable once StaticMemberInGenericType + private static readonly JsonSerializerOptions s_serializerOptions = new() { Converters = { new JsonStringEnumConverter() } }; + + public Action, T>? WriteToArrayBuffer + { + get => throw new NotImplementedException(); + set => throw new NotImplementedException(); + } + + public Func? WriteToStreamAsync { get; set; } = + (stream, doc, ctx) => JsonSerializer.SerializeAsync(stream, doc, s_serializerOptions, ctx); +} diff --git a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs index 2383cf57..6185ed64 100644 --- a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs +++ b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs @@ -2,75 +2,116 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information +using System.Text.Json; using Elastic.Channels; -using Elastic.Channels.Diagnostics; using Elastic.Clients.Elasticsearch.IndexManagement; +using Elastic.CommonSchema; using Elastic.Ingest.Elasticsearch.DataStreams; -using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Transport; using FluentAssertions; using Xunit; using Xunit.Abstractions; using HttpMethod = Elastic.Transport.HttpMethod; -namespace Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests +namespace Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests; + +public class DataStreamIngestionTests : IntegrationTestBase { - public class DataStreamIngestionTests : IntegrationTestBase + public DataStreamIngestionTests(IngestionCluster cluster, ITestOutputHelper output) : base(cluster, output) + { + } + + [Fact] + public async Task EnsureDocumentsEndUpInDataStream() + { + var targetDataStream = new DataStreamName("timeseriesdocs", "dotnet"); + var slim = new CountdownEvent(1); + var options = new DataStreamChannelOptions(Client.Transport) + { + DataStream = targetDataStream, + BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 }, + }; + var channel = new EcsDataStreamChannel(options); + + var bootstrapped = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default"); + bootstrapped.Should().BeTrue("Expected to be able to bootstrap data stream channel"); + + var dataStream = + await Client.Indices.GetDataStreamAsync(new GetDataStreamRequest(targetDataStream.ToString())); + dataStream.DataStreams.Should().BeNullOrEmpty(); + + channel.TryWrite(new TimeSeriesDocument { Timestamp = DateTimeOffset.Now, Message = "hello-world" }); + if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) + throw new Exception($"No flush occurred in 10 seconds: {channel}", channel.DiagnosticsListener?.ObservedException); + + var refreshResult = await Client.Indices.RefreshAsync(targetDataStream.ToString()); + refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation); + var searchResult = await Client.SearchAsync(s => s.Indices(targetDataStream.ToString())); + searchResult.Total.Should().Be(1); + + var storedDocument = searchResult.Documents.First(); + storedDocument.Message.Should().Be("hello-world"); + + var hit = searchResult.Hits.First(); + hit.Index.Should().StartWith($".ds-{targetDataStream}-"); + + // the following throws in the 8.0.4 version of the client + // The JSON value could not be converted to Elastic.Clients.Elasticsearch.HealthStatus. Path: $.data_stre... + // await Client.Indices.GetDataStreamAsync(new GetDataStreamRequest(targetDataStream.ToString()) + var getDataStream = + await Client.Transport.RequestAsync(HttpMethod.GET, $"/_data_stream/{targetDataStream}"); + + getDataStream.ApiCallDetails.HttpStatusCode.Should() + .Be(200, "{0}", getDataStream.ApiCallDetails.DebugInformation); + + //this ensures the data stream was setup using the expected bootstrapped template + getDataStream.ApiCallDetails.DebugInformation.Should() + .Contain(@$"""template"" : ""{targetDataStream.GetTemplateName()}"""); + + //this ensures the data stream is managed by the expected ilm_policy + getDataStream.ApiCallDetails.DebugInformation.Should() + .Contain(@"""ilm_policy"" : ""7-days-default"""); + } + + [Fact] + public async Task UseCustomEventWriter() { - public DataStreamIngestionTests(IngestionCluster cluster, ITestOutputHelper output) : base(cluster, output) + var targetDataStream = new DataStreamName("customtimeseriesdocs", "dotnet"); + var slim = new CountdownEvent(1); + var options = new DataStreamChannelOptions(Client.Transport) { - } + DataStream = targetDataStream, + BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 }, + EventWriter = new CustomEventWriter() + }; + var channel = new EcsDataStreamChannel(options); - [Fact] - public async Task EnsureDocumentsEndUpInDataStream() + var bootstrapped = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default"); + bootstrapped.Should().BeTrue("Expected to be able to bootstrap data stream channel"); + + var dataStream = + await Client.Indices.GetDataStreamAsync(new GetDataStreamRequest(targetDataStream.ToString())); + dataStream.DataStreams.Should().BeNullOrEmpty(); + + channel.TryWrite(new TimeSeriesDocument { - var targetDataStream = new DataStreamName("timeseriesdocs", "dotnet"); - var slim = new CountdownEvent(1); - var options = new DataStreamChannelOptions(Client.Transport) - { - DataStream = targetDataStream, - BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 }, - }; - var channel = new EcsDataStreamChannel(options); - - var bootstrapped = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default"); - bootstrapped.Should().BeTrue("Expected to be able to bootstrap data stream channel"); - - var dataStream = - await Client.Indices.GetDataStreamAsync(new GetDataStreamRequest(targetDataStream.ToString())); - dataStream.DataStreams.Should().BeNullOrEmpty(); - - channel.TryWrite(new TimeSeriesDocument { Timestamp = DateTimeOffset.Now, Message = "hello-world" }); - if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) - throw new Exception($"No flush occurred in 10 seconds: {channel}", channel.DiagnosticsListener?.ObservedException); - - var refreshResult = await Client.Indices.RefreshAsync(targetDataStream.ToString()); - refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation); - var searchResult = await Client.SearchAsync(s => s.Indices(targetDataStream.ToString())); - searchResult.Total.Should().Be(1); - - var storedDocument = searchResult.Documents.First(); - storedDocument.Message.Should().Be("hello-world"); - - var hit = searchResult.Hits.First(); - hit.Index.Should().StartWith($".ds-{targetDataStream}-"); - - // the following throws in the 8.0.4 version of the client - // The JSON value could not be converted to Elastic.Clients.Elasticsearch.HealthStatus. Path: $.data_stre... - // await Client.Indices.GetDataStreamAsync(new GetDataStreamRequest(targetDataStream.ToString()) - var getDataStream = - await Client.Transport.RequestAsync(HttpMethod.GET, $"/_data_stream/{targetDataStream}"); - - getDataStream.ApiCallDetails.HttpStatusCode.Should() - .Be(200, "{0}", getDataStream.ApiCallDetails.DebugInformation); - - //this ensures the data stream was setup using the expected bootstrapped template - getDataStream.ApiCallDetails.DebugInformation.Should() - .Contain(@$"""template"" : ""{targetDataStream.GetTemplateName()}"""); - - //this ensures the data stream is managed by the expected ilm_policy - getDataStream.ApiCallDetails.DebugInformation.Should() - .Contain(@"""ilm_policy"" : ""7-days-default"""); - } + Timestamp = DateTimeOffset.Parse("2024-05-27T23:56:15.785Z"), + Message = "Hello World!", + Metadata = new MetadataDictionary { { "MyEnum", MyEnum.Two } } + }); + + if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) + throw new Exception($"No flush occurred in 10 seconds: {channel}", channel.DiagnosticsListener?.ObservedException); + + var refreshResult = await Client.Indices.RefreshAsync(targetDataStream.ToString()); + refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation); + + var searchResult = await Client.SearchAsync(s => s.Indices(targetDataStream.ToString())); + searchResult.Total.Should().Be(1); + + var root = searchResult.Documents.First().RootElement; + root.GetProperty("@timestamp").GetString().Should().Be("2024-05-27T23:56:15.785+00:00"); + root.GetProperty("message").GetString().Should().Be("Hello World!"); + root.GetProperty("metadata").GetProperty("MyEnum").GetString().Should().Be("Two"); } } diff --git a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/IndexIngestionTests.cs b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/IndexIngestionTests.cs index 6e7af5d8..a736a1b8 100644 --- a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/IndexIngestionTests.cs +++ b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/IndexIngestionTests.cs @@ -2,77 +2,125 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information +using System.Text.Json; using Elastic.Channels; -using Elastic.Channels.Diagnostics; using Elastic.Clients.Elasticsearch.IndexManagement; +using Elastic.CommonSchema; using Elastic.Ingest.Elasticsearch.Indices; -using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Transport; -using Elasticsearch.IntegrationDefaults; using FluentAssertions; using Xunit; using Xunit.Abstractions; using HttpMethod = Elastic.Transport.HttpMethod; -namespace Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests +namespace Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests; + +public class IndexIngestionTests : IntegrationTestBase { - public class IndexIngestionTests : IntegrationTestBase + public IndexIngestionTests(IngestionCluster cluster, ITestOutputHelper output) : base(cluster, output) + { + } + + [Fact] + public async Task EnsureDocumentsEndUpInIndex() { - public IndexIngestionTests(IngestionCluster cluster, ITestOutputHelper output) : base(cluster, output) + var indexPrefix = "catalog-data-"; + var slim = new CountdownEvent(1); + var options = new IndexChannelOptions(Client.Transport) { - } + IndexFormat = indexPrefix + "{0:yyyy.MM.dd}", + BulkOperationIdLookup = c => c.Id!, + TimestampLookup = c => c.Created, + BufferOptions = new BufferOptions + { + WaitHandle = slim, ExportMaxConcurrency = 1, + }, + }; + var channel = new EcsIndexChannel(options); + var bootstrapped = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default"); + bootstrapped.Should().BeTrue("Expected to be able to bootstrap index channel"); + + var date = DateTimeOffset.Now; + var indexName = string.Format(options.IndexFormat, date); + + var index = await Client.Indices.GetAsync(new GetIndexRequest(indexName)); + index.Indices.Should().BeNullOrEmpty(); + + channel.TryWrite(new CatalogDocument { Created = date, Title = "Hello World!", Id = "hello-world" }); + if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) + throw new Exception($"No flush occurred in 10 seconds: {channel.DiagnosticsListener}", channel.DiagnosticsListener?.ObservedException); + + var refreshResult = await Client.Indices.RefreshAsync(indexName); + refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation); + var searchResult = await Client.SearchAsync(s => s.Indices(indexName)); + searchResult.Total.Should().Be(1); + + var storedDocument = searchResult.Documents.First(); + storedDocument.Id.Should().Be("hello-world"); + storedDocument.Title.Should().Be("Hello World!"); + + var hit = searchResult.Hits.First(); + hit.Index.Should().Be(indexName); + + // bug in client + // https://github.com/elastic/elasticsearch-net/issues/7221 + var indexResponse = + await Client.Transport.RequestAsync(HttpMethod.GET, $"/{indexName}"); + + indexResponse.ApiCallDetails.HasSuccessfulStatusCode.Should().BeTrue("{0}", indexResponse.ApiCallDetails.DebugInformation); - [Fact] - public async Task EnsureDocumentsEndUpInIndex() + // Bug in client, for now assume template was applied because the ILM policy is set on the index. + indexResponse.ApiCallDetails.DebugInformation.Should().Contain(@"""7-days-default"""); + } + + [Fact] + public async Task UseCustomEventWriter() + { + var indexPrefix = "custom-catalog-data-"; + var slim = new CountdownEvent(1); + var options = new IndexChannelOptions(Client.Transport) { - var indexPrefix = "catalog-data-"; - var slim = new CountdownEvent(1); - var options = new IndexChannelOptions(Client.Transport) + IndexFormat = indexPrefix + "{0:yyyy.MM.dd}", + BulkOperationIdLookup = c => c.Id!, + TimestampLookup = c => c.Created, + BufferOptions = new BufferOptions { - IndexFormat = indexPrefix + "{0:yyyy.MM.dd}", - BulkOperationIdLookup = c => c.Id!, - TimestampLookup = c => c.Created, - BufferOptions = new BufferOptions - { - WaitHandle = slim, ExportMaxConcurrency = 1, - }, - }; - var channel = new EcsIndexChannel(options); - var bootstrapped = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default"); - bootstrapped.Should().BeTrue("Expected to be able to bootstrap index channel"); - - var date = DateTimeOffset.Now; - var indexName = string.Format(options.IndexFormat, date); - - var index = await Client.Indices.GetAsync(new GetIndexRequest(indexName)); - index.Indices.Should().BeNullOrEmpty(); - - channel.TryWrite(new CatalogDocument { Created = date, Title = "Hello World!", Id = "hello-world" }); - if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) - throw new Exception($"No flush occurred in 10 seconds: {channel.DiagnosticsListener}", channel.DiagnosticsListener?.ObservedException); - - var refreshResult = await Client.Indices.RefreshAsync(indexName); - refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation); - var searchResult = await Client.SearchAsync(s => s.Indices(indexName)); - searchResult.Total.Should().Be(1); - - var storedDocument = searchResult.Documents.First(); - storedDocument.Id.Should().Be("hello-world"); - storedDocument.Title.Should().Be("Hello World!"); - - var hit = searchResult.Hits.First(); - hit.Index.Should().Be(indexName); - - // bug in client - // https://github.com/elastic/elasticsearch-net/issues/7221 - var indexResponse = - await Client.Transport.RequestAsync(HttpMethod.GET, $"/{indexName}"); - - indexResponse.ApiCallDetails.HasSuccessfulStatusCode.Should().BeTrue("{0}", indexResponse.ApiCallDetails.DebugInformation); - - // Bug in client, for now assume template was applied because the ILM policy is set on the index. - indexResponse.ApiCallDetails.DebugInformation.Should().Contain(@"""7-days-default"""); - - } + WaitHandle = slim, ExportMaxConcurrency = 1, + }, + EventWriter = new CustomEventWriter() + }; + var channel = new EcsIndexChannel(options); + + var bootstrapped = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default"); + bootstrapped.Should().BeTrue("Expected to be able to bootstrap index channel"); + + var date = DateTimeOffset.Parse("2024-05-27T23:56:15.785Z"); + var indexName = string.Format(options.IndexFormat, date); + + var index = await Client.Indices.GetAsync(new GetIndexRequest(indexName)); + index.Indices.Should().BeNullOrEmpty(); + + channel.TryWrite(new CatalogDocument + { + Created = date, + Title = "Hello World!", + Id = "hello-world", + Metadata = new MetadataDictionary { { "MyEnum", MyEnum.Two } } + }); + + if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) + throw new Exception($"No flush occurred in 10 seconds: {channel.DiagnosticsListener}", channel.DiagnosticsListener?.ObservedException); + + var refreshResult = await Client.Indices.RefreshAsync(indexName); + refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation); + + var searchResult = await Client.SearchAsync(s => s.Indices(indexName)); + searchResult.Total.Should().Be(1); + + var root = searchResult.Documents.First().RootElement; + root.GetProperty("created").GetString().Should().Be("2024-05-27T23:56:15.785+00:00"); + root.GetProperty("title").GetString().Should().Be("Hello World!"); + root.GetProperty("id").GetString().Should().Be("hello-world"); + root.GetProperty("metadata").GetProperty("MyEnum").GetString().Should().Be("Two"); } } diff --git a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/TestDocument.cs b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/TestDocument.cs index 715f3673..5ec7327a 100644 --- a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/TestDocument.cs +++ b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/TestDocument.cs @@ -7,6 +7,8 @@ namespace Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests; +public enum MyEnum { One, Two, Three } + public class TimeSeriesDocument : EcsDocument { } diff --git a/tests/Elastic.CommonSchema.Tests/Serializes.cs b/tests/Elastic.CommonSchema.Tests/Serializes.cs index 3398c35e..7fc4c10a 100644 --- a/tests/Elastic.CommonSchema.Tests/Serializes.cs +++ b/tests/Elastic.CommonSchema.Tests/Serializes.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information using System; +using System.Text.Json; using System.Text.Json.Serialization; using Elastic.CommonSchema.Serialization; using FluentAssertions; @@ -232,5 +233,24 @@ public void SerializesDocumentInTheReadMe() deserialized.Dns.Answers[1].Data.Should().NotBeNull().And.EndWith(".117"); deserialized.Metadata.Should().NotBeNull().And.HaveCount(1); } + + private enum MyEnum { One, Two, Three } + + [Fact] + public void UseCustomSerializer() + { + var ecsDocument = new EcsDocument + { + Timestamp = DateTimeOffset.Parse("2024-05-27T23:56:15.785Z"), + Message = "Hello World!", + Metadata = new MetadataDictionary { { "MyEnum", MyEnum.Two } }, + Ecs = new Ecs { Version = "8.11.0" } + }; + + var serializerOptions = new JsonSerializerOptions { Converters = { new JsonStringEnumConverter() } }; + var json = JsonSerializer.Serialize(ecsDocument, serializerOptions); + + json.Should().Be("{\"@timestamp\":\"2024-05-27T23:56:15.785+00:00\",\"message\":\"Hello World!\",\"ecs.version\":\"8.11.0\",\"metadata\":{\"MyEnum\":\"Two\"}}"); + } } } diff --git a/tools/Elastic.CommonSchema.Generator/Views/EcsDocumentJsonConverter.Generated.cshtml b/tools/Elastic.CommonSchema.Generator/Views/EcsDocumentJsonConverter.Generated.cshtml index cbfa91fe..9e23fa31 100644 --- a/tools/Elastic.CommonSchema.Generator/Views/EcsDocumentJsonConverter.Generated.cshtml +++ b/tools/Elastic.CommonSchema.Generator/Views/EcsDocumentJsonConverter.Generated.cshtml @@ -27,7 +27,8 @@ namespace Elastic.CommonSchema.Serialization TBase ecsEvent, ref DateTimeOffset? timestamp, ref string loglevel, - ref string ecsVersion + ref string ecsVersion, + JsonSerializerOptions options ) { var propertyName = reader.GetString(); @@ -36,21 +37,21 @@ namespace Elastic.CommonSchema.Serialization { "log.level" => ReadString(ref reader, ref loglevel), "ecs.version" => ReadString(ref reader, ref ecsVersion), - "metadata" => ReadProp@(Raw(""))(ref reader, "metadata", ecsEvent, (b, v) => b.Metadata = v), + "metadata" => ReadProp@(Raw(""))(ref reader, "metadata", ecsEvent, (b, v) => b.Metadata = v, options), @foreach (var property in Model.Base.BaseFieldSet.ValueProperties) { var name = property.JsonProperty; if (name == "@timestamp") -{ "@(name)" => ReadDateTime(ref reader, ref @(name)), +{ "@(name)" => ReadDateTime(ref reader, ref @(name), options), } else -{ "@(name)" => ReadProp<@(Raw(property.ClrType))>(ref reader, "@name", ecsEvent, (b, v) => b.@(property.Name) = v), +{ "@(name)" => ReadProp<@(Raw(property.ClrType))>(ref reader, "@name", ecsEvent, (b, v) => b.@(property.Name) = v, options), } } @foreach (var property in Model.Base.BaseFieldSet.InlineObjectProperties) { var name = property.JsonProperty; - "@(name)" => ReadProp<@(Raw(property.InlineObject.Name))>(ref reader, "@name", ecsEvent, (b, v) => b.@(property.Name) = v), + "@(name)" => ReadProp<@(Raw(property.InlineObject.Name))>(ref reader, "@name", ecsEvent, (b, v) => b.@(property.Name) = v, options), } @foreach (var entity in Model.EntityClasses) @@ -62,7 +63,7 @@ namespace Elastic.CommonSchema.Serialization typeof(@(Model.Base.Name)) == ecsEvent.GetType() ? false : ecsEvent.TryRead(propertyName, out var t) - ? ecsEvent.ReceiveProperty(propertyName, ReadPropDeserialize(ref reader, t)) + ? ecsEvent.ReceiveProperty(propertyName, ReadPropDeserialize(ref reader, t, options)) : false }; } @@ -77,12 +78,12 @@ namespace Elastic.CommonSchema.Serialization } writer.WriteStartObject(); - WriteTimestamp(writer, value); + WriteTimestamp(writer, value, options); WriteLogLevel(writer, value); WriteMessage(writer, value); WriteEcsVersion(writer, value); - WriteLogEntity(writer, value.Log); - WriteEcsEntity(writer, value.Ecs); + WriteLogEntity(writer, value.Log, options); + WriteEcsEntity(writer, value.Ecs, options); // Base fields @{ @@ -94,15 +95,16 @@ namespace Elastic.CommonSchema.Serialization { continue; } - WriteProp(writer, "@field.JsonProperty", value.@field.Name); - + WriteProp(writer, "@field.JsonProperty", value.@field.Name, options); + } @foreach (var property in Model.Base.BaseFieldSet.InlineObjectProperties) { var name = property.JsonProperty; - WriteProp(writer, "@(name)", value.@(property.InlineObject.Name)); + WriteProp(writer, "@(name)", value.@(property.InlineObject.Name), options); } + // Complex types @foreach (var entity in Model.EntityClasses) { @@ -111,13 +113,13 @@ namespace Elastic.CommonSchema.Serialization { continue; } - WriteProp(writer, "@(entityName)", value.@(entity.Name), EcsJsonContext.Default.@(entity.Name)); + WriteProp(writer, "@(entityName)", value.@(entity.Name), EcsJsonContext.Default.@(entity.Name), options); } - WriteProp(writer, "metadata", value.Metadata); + WriteProp(writer, "metadata", value.Metadata, options); if (typeof(@Model.Base.Name) != value.GetType()) - value.WriteAdditionalProperties((k, v) => WriteProp(writer, k, v)); + value.WriteAdditionalProperties((k, v) => WriteProp(writer, k, v, options)); writer.WriteEndObject(); } } diff --git a/tools/Elastic.CommonSchema.Generator/Views/PropertiesReaderJsonConverterBase.Generated.cshtml b/tools/Elastic.CommonSchema.Generator/Views/PropertiesReaderJsonConverterBase.Generated.cshtml index 211a4497..2fcccec1 100644 --- a/tools/Elastic.CommonSchema.Generator/Views/PropertiesReaderJsonConverterBase.Generated.cshtml +++ b/tools/Elastic.CommonSchema.Generator/Views/PropertiesReaderJsonConverterBase.Generated.cshtml @@ -27,7 +27,7 @@ namespace Elastic.CommonSchema.Serialization; internal partial class @(entity.Name)EntityJsonConverter : PropertiesReaderJsonConverterBase@(Raw("<"))@(entity.Name)@Raw(">") { /// - protected override bool ReadProperties(ref Utf8JsonReader reader, @entity.Name ecsEvent) + protected override bool ReadProperties(ref Utf8JsonReader reader, @entity.Name ecsEvent, JsonSerializerOptions options) { var propertyName = reader.GetString(); reader.Read(); @@ -42,14 +42,14 @@ internal partial class @(entity.Name)EntityJsonConverter : PropertiesReaderJsonC @foreach (var property in entity.BaseFieldSet.InlineObjectProperties) { var name = property.JsonProperty; - "@(name)" => ReadProp<@(Raw(property.InlineObject.Name))>(ref reader, "@name", ecsEvent, (b, v) => b.@(property.Name) = v), + "@(name)" => ReadProp<@(Raw(property.InlineObject.Name))>(ref reader, "@name", ecsEvent, (b, v) => b.@(property.Name) = v, options), } - _ => ReadProperty(ref reader, propertyName, ecsEvent) + _ => ReadProperty(ref reader, propertyName, ecsEvent, options) }; } - private partial bool ReadProperty(ref Utf8JsonReader reader, string propertyName, @entity.Name ecsEvent); + private partial bool ReadProperty(ref Utf8JsonReader reader, string propertyName, @entity.Name ecsEvent, JsonSerializerOptions options); /// public override void Write(Utf8JsonWriter writer, @entity.Name value, JsonSerializerOptions options) @@ -67,13 +67,13 @@ internal partial class @(entity.Name)EntityJsonConverter : PropertiesReaderJsonC { continue; } - WriteProp(writer, "@field.JsonProperty", value.@field.Name); + WriteProp@(Raw(field.ReadJsonType))(writer, "@field.JsonProperty", value.@field.Name); } @foreach (var property in entity.BaseFieldSet.InlineObjectProperties) { var name = property.JsonProperty; - WriteProp(writer, "@(name)", value.@(property.Name)); + WriteProp(writer, "@(name)", value.@(property.Name), options); }