diff --git a/src/Nest/Ingest/ProcessorFormatter.cs b/src/Nest/Ingest/ProcessorFormatter.cs
index 11f03e37e71..3e1d27b25da 100644
--- a/src/Nest/Ingest/ProcessorFormatter.cs
+++ b/src/Nest/Ingest/ProcessorFormatter.cs
@@ -41,6 +41,7 @@ internal class ProcessorFormatter : IJsonFormatter<IProcessor>
 			{ "drop", 29 },
 			{ "circle", 30 },
 			{ "enrich", 31 },
+			{ "csv", 32 },
 		};
 
 		public IProcessor Deserialize(ref JsonReader reader, IJsonFormatterResolver formatterResolver)
@@ -157,6 +158,9 @@ public IProcessor Deserialize(ref JsonReader reader, IJsonFormatterResolver form
 					case 31:
 						processor = Deserialize<EnrichProcessor>(ref reader, formatterResolver);
 						break;
+					case 32:
+						processor = Deserialize<CsvProcessor>(ref reader, formatterResolver);
+						break;
 				}
 			}
 			else
@@ -185,6 +189,9 @@ public void Serialize(ref JsonWriter writer, IProcessor value, IJsonFormatterRes
 				case "append":
 					Serialize<IAppendProcessor>(ref writer, value, formatterResolver);
 					break;
+				case "csv":
+					Serialize<ICsvProcessor>(ref writer, value, formatterResolver);
+					break;
 				case "convert":
 					Serialize<IConvertProcessor>(ref writer, value, formatterResolver);
 					break;
diff --git a/src/Nest/Ingest/Processors/CsvProcessor.cs b/src/Nest/Ingest/Processors/CsvProcessor.cs
new file mode 100644
index 00000000000..11cb6034c0c
--- /dev/null
+++ b/src/Nest/Ingest/Processors/CsvProcessor.cs
@@ -0,0 +1,114 @@
+using System;
+using System.Linq.Expressions;
+using System.Runtime.Serialization;
+using Elasticsearch.Net;
+using Elasticsearch.Net.Utf8Json;
+
+
+namespace Nest
+{
+	/// <summary>
+	/// Extracts fields from CSV line out of a single text field within a document.
+	/// Any empty field in CSV will be skipped.
+	/// <para />
+	/// Available in Elasticsearch 7.6.0+
+	/// </summary>
+	[InterfaceDataContract]
+	public interface ICsvProcessor : IProcessor
+	{
+		/// <summary>
+		/// The field to extract data from
+		/// </summary>
+		[DataMember(Name = "field")]
+		Field Field { get; set; }
+
+		/// <summary>
+		/// The array of fields to assign extracted values to.
+		/// </summary>
+		[DataMember(Name = "target_fields")]
+		Fields TargetFields { get; set; }
+
+		/// <summary>
+		/// Separator used in CSV, has to be single character string. Defaults to <c>,</c>
+		/// </summary>
+		[DataMember(Name = "separator")]
+		string Separator { get; set; }
+
+		/// <summary>
+		/// Quote used in CSV, has to be single character string. Defaults to <c>"</c>
+		/// </summary>
+		[DataMember(Name = "quote")]
+		string Quote { get; set; }
+
+		/// <summary>
+		/// If <c>true</c> and <see cref="Field" /> does not exist or is null,
+		/// the processor quietly exits without modifying the document. Default is <c>false</c>
+		/// </summary>
+		[DataMember(Name = "ignore_missing")]
+		bool? IgnoreMissing { get; set; }
+
+		/// <summary>
+		/// Trim whitespaces in unquoted fields. Default is <c>false</c>
+		/// </summary>
+		[DataMember(Name = "trim")]
+		bool? Trim { get; set; }
+	}
+
+	/// <inheritdoc cref="ICsvProcessor" />
+	public class CsvProcessor : ProcessorBase, ICsvProcessor
+	{
+		/// <inheritdoc />
+		public Field Field { get; set; }
+		/// <inheritdoc />
+		public Fields TargetFields { get; set; }
+		/// <inheritdoc />
+		public string Separator { get; set; }
+		/// <inheritdoc />
+		public string Quote { get; set; }
+		/// <inheritdoc />
+		public bool? IgnoreMissing { get; set; }
+		/// <inheritdoc />
+		public bool? Trim { get; set; }
+		/// <inheritdoc />
+		protected override string Name => "csv";
+	}
+
+	/// <inheritdoc cref="ICsvProcessor" />
+	public class CsvProcessorDescriptor<T> : ProcessorDescriptorBase<CsvProcessorDescriptor<T>, ICsvProcessor>, ICsvProcessor
+		where T : class
+	{
+		protected override string Name => "csv";
+		Field ICsvProcessor.Field { get; set; }
+		Fields ICsvProcessor.TargetFields { get; set; }
+		bool? ICsvProcessor.IgnoreMissing { get; set; }
+		string ICsvProcessor.Quote { get; set; }
+		string ICsvProcessor.Separator { get; set; }
+		bool? ICsvProcessor.Trim { get; set; }
+
+		/// <inheritdoc cref="ICsvProcessor.Field" />
+		public CsvProcessorDescriptor<T> Field(Field field) => Assign(field, (a, v) => a.Field = v);
+
+		/// <inheritdoc cref="ICsvProcessor.Field" />
+		public CsvProcessorDescriptor<T> Field<TValue>(Expression<Func<T, TValue>> objectPath) =>
+			Assign(objectPath, (a, v) => a.Field = v);
+
+		/// <inheritdoc cref="ICsvProcessor.TargetFields" />
+		public CsvProcessorDescriptor<T> TargetFields(Func<FieldsDescriptor<T>, IPromise<Fields>> targetFields) =>
+			Assign(targetFields, (a, v) => a.TargetFields = v?.Invoke(new FieldsDescriptor<T>())?.Value);
+
+		/// <inheritdoc cref="ICsvProcessor.TargetFields" />
+		public CsvProcessorDescriptor<T> TargetFields(Fields targetFields) => Assign(targetFields, (a, v) => a.TargetFields = v);
+
+		/// <inheritdoc cref="ICsvProcessor.IgnoreMissing" />
+		public CsvProcessorDescriptor<T> IgnoreMissing(bool? ignoreMissing = true) => Assign(ignoreMissing, (a, v) => a.IgnoreMissing = v);
+
+		/// <inheritdoc cref="ICsvProcessor.Trim" />
+		public CsvProcessorDescriptor<T> Trim(bool? trim = true) => Assign(trim, (a, v) => a.Trim = v);
+
+		/// <inheritdoc cref="ICsvProcessor.Quote" />
+		public CsvProcessorDescriptor<T> Quote(string quote) => Assign(quote, (a, v) => a.Quote = v);
+
+		/// <inheritdoc cref="ICsvProcessor.Separator" />
+		public CsvProcessorDescriptor<T> Separator(string separator) => Assign(separator, (a, v) => a.Separator = v);
+	}
+}
diff --git a/src/Nest/Ingest/ProcessorsDescriptor.cs b/src/Nest/Ingest/ProcessorsDescriptor.cs
index d9213e71aa9..a2ae0563337 100644
--- a/src/Nest/Ingest/ProcessorsDescriptor.cs
+++ b/src/Nest/Ingest/ProcessorsDescriptor.cs
@@ -22,6 +22,11 @@ public ProcessorsDescriptor Attachment<T>(Func<AttachmentProcessorDescriptor<T>,
 		public ProcessorsDescriptor Append<T>(Func<AppendProcessorDescriptor<T>, IAppendProcessor> selector) where T : class =>
 			Assign(selector, (a, v) => a.AddIfNotNull(v?.Invoke(new AppendProcessorDescriptor<T>())));
 
+		/// <inheritdoc cref="ICsvProcessor" />
+		public ProcessorsDescriptor Csv<T>(Func<CsvProcessorDescriptor<T>, ICsvProcessor> selector) where T : class =>
+			Assign(selector, (a, v) => a.AddIfNotNull(v?.Invoke(new CsvProcessorDescriptor<T>())));
+
+
 		/// <inheritdoc cref="IConvertProcessor" />
 		public ProcessorsDescriptor Convert<T>(Func<ConvertProcessorDescriptor<T>, IConvertProcessor> selector) where T : class =>
 			Assign(selector, (a, v) => a.AddIfNotNull(v?.Invoke(new ConvertProcessorDescriptor<T>())));
diff --git a/tests/Tests/Ingest/ProcessorAssertions.cs b/tests/Tests/Ingest/ProcessorAssertions.cs
index ea815161b71..41313573bf6 100644
--- a/tests/Tests/Ingest/ProcessorAssertions.cs
+++ b/tests/Tests/Ingest/ProcessorAssertions.cs
@@ -70,6 +70,30 @@ public class Append : ProcessorAssertion
 			public override string Key => "append";
 		}
 
+		[SkipVersion("<7.6.0", "Introduced in Elasticsearch 7.6.0+")]
+		public class Csv : ProcessorAssertion
+		{
+			public override Func<ProcessorsDescriptor, IPromise<IList<IProcessor>>> Fluent => d => d
+				.Csv<Project>(c => c
+					.Field(p => p.Name)
+					.TargetFields(new[] { "targetField1", "targetField2" })
+				);
+
+			public override IProcessor Initializer => new CsvProcessor
+			{
+				Field = "name",
+				TargetFields = new[] { "targetField1", "targetField2" },
+			};
+
+			public override object Json => new
+			{
+				field = "name",
+				target_fields = new[] { "targetField1", "targetField2" },
+			};
+
+			public override string Key => "csv";
+		}
+
 		public class Convert : ProcessorAssertion
 		{
 			public override Func<ProcessorsDescriptor, IPromise<IList<IProcessor>>> Fluent => d => d