From 5e66377e6761ed8f3cf951b83f18d2a21952ccac Mon Sep 17 00:00:00 2001 From: David Roberts Date: Mon, 27 Jan 2020 17:06:30 +0000 Subject: [PATCH] [ML] Use CSV ingest processor in find_file_structure ingest pipeline Changes the find_file_structure response to include a CSV ingest processor in the ingest pipeline it suggests. Previously the Kibana file upload functionality parsed CSV in the browser, but by parsing CSV in the ingest pipeline it makes the Kibana file upload functionality more easily interchangeable with Filebeat such that the configurations it creates can more easily be used to import data with the same structure repeatedly in production. --- .../apis/find-file-structure.asciidoc | 127 +++++++++++++- .../DelimitedFileStructureFinder.java | 68 +++++--- .../FileStructureUtils.java | 40 ++++- .../NdJsonFileStructureFinder.java | 6 +- .../TextLogFileStructureFinder.java | 5 +- .../XmlFileStructureFinder.java | 5 +- .../DelimitedFileStructureFinderTests.java | 38 +++++ .../FileStructureUtilsTests.java | 155 +++++++++++++++++- 8 files changed, 403 insertions(+), 41 deletions(-) diff --git a/docs/reference/ml/anomaly-detection/apis/find-file-structure.asciidoc b/docs/reference/ml/anomaly-detection/apis/find-file-structure.asciidoc index 4f85e39d60aa0..0a1b2aaefef80 100644 --- a/docs/reference/ml/anomaly-detection/apis/find-file-structure.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/find-file-structure.asciidoc @@ -145,8 +145,8 @@ to request analysis of 100000 lines to achieve some variety. value is `true`. Otherwise, the default value is `false`. `timeout`:: - (Optional, <>) Sets the maximum amount of time that the - structure analysis make take. If the analysis is still running when the + (Optional, <>) Sets the maximum amount of time that the + structure analysis may take. If the analysis is still running when the timeout expires then it will be aborted. The default value is 25 seconds. `timestamp_field`:: @@ -163,8 +163,8 @@ also specified. 
For structured file formats, if you specify this parameter, the field must exist within the file. -If this parameter is not specified, the structure finder makes a decision about -which field (if any) is the primary timestamp field. For structured file +If this parameter is not specified, the structure finder makes a decision about +which field (if any) is the primary timestamp field. For structured file formats, it is not compulsory to have a timestamp in the file. -- @@ -213,14 +213,14 @@ format from a built-in set. The following table provides the appropriate `timeformat` values for some example timestamps: |=== -| Timeformat | Presentation +| Timeformat | Presentation | yyyy-MM-dd HH:mm:ssZ | 2019-04-20 13:15:22+0000 -| EEE, d MMM yyyy HH:mm:ss Z | Sat, 20 Apr 2019 13:15:22 +0000 +| EEE, d MMM yyyy HH:mm:ss Z | Sat, 20 Apr 2019 13:15:22 +0000 | dd.MM.yy HH:mm:ss.SSS | 20.04.19 13:15:22.285 |=== -See +See https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html[the Java date/time format documentation] for more information about date and time format syntax. 
@@ -675,6 +675,30 @@ If the request does not encounter errors, you receive the following result: "ingest_pipeline" : { "description" : "Ingest pipeline created by file structure finder", "processors" : [ + { + "csv" : { + "field" : "message", + "target_fields" : [ + "VendorID", + "tpep_pickup_datetime", + "tpep_dropoff_datetime", + "passenger_count", + "trip_distance", + "RatecodeID", + "store_and_fwd_flag", + "PULocationID", + "DOLocationID", + "payment_type", + "fare_amount", + "extra", + "mta_tax", + "tip_amount", + "tolls_amount", + "improvement_surcharge", + "total_amount" + ] + } + }, { "date" : { "field" : "tpep_pickup_datetime", @@ -683,6 +707,95 @@ If the request does not encounter errors, you receive the following result: "yyyy-MM-dd HH:mm:ss" ] } + }, + { + "convert" : { + "field" : "DOLocationID", + "type" : "long" + } + }, + { + "convert" : { + "field" : "PULocationID", + "type" : "long" + } + }, + { + "convert" : { + "field" : "RatecodeID", + "type" : "long" + } + }, + { + "convert" : { + "field" : "VendorID", + "type" : "long" + } + }, + { + "convert" : { + "field" : "extra", + "type" : "double" + } + }, + { + "convert" : { + "field" : "fare_amount", + "type" : "double" + } + }, + { + "convert" : { + "field" : "improvement_surcharge", + "type" : "double" + } + }, + { + "convert" : { + "field" : "mta_tax", + "type" : "double" + } + }, + { + "convert" : { + "field" : "passenger_count", + "type" : "long" + } + }, + { + "convert" : { + "field" : "payment_type", + "type" : "long" + } + }, + { + "convert" : { + "field" : "tip_amount", + "type" : "double" + } + }, + { + "convert" : { + "field" : "tolls_amount", + "type" : "double" + } + }, + { + "convert" : { + "field" : "total_amount", + "type" : "double" + } + }, + { + "convert" : { + "field" : "trip_distance", + "type" : "double" + } + }, + { + "remove" : { + "field" : "message" + } } ] }, diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinder.java index b947ed2d9cffe..befdf3d229f21 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinder.java @@ -93,7 +93,18 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List, SortedMap> mappingsAndFieldStats = + FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker); + + SortedMap mappings = mappingsAndFieldStats.v1(); + + List columnNamesList = Arrays.asList(columnNames); char delimiter = (char) csvPreference.getDelimiterChar(); + char quoteChar = csvPreference.getQuoteChar(); + + Map csvProcessorSettings = makeCsvProcessorSettings("message", columnNamesList, delimiter, quoteChar, + trimFields); + FileStructure.Builder structureBuilder = new FileStructure.Builder(FileStructure.Format.DELIMITED) .setCharset(charsetName) .setHasByteOrderMarker(hasByteOrderMarker) @@ -102,8 +113,19 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List optQuote + column.replace(quote, twoQuotes).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + optQuote) + .collect(Collectors.joining(delimiterMatcher))); + } if (trimFields) { structureBuilder.setShouldTrimFields(true); @@ -135,32 +157,20 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List optQuote + column.replace(quote, twoQuotes).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + optQuote) - .collect(Collectors.joining(delimiterMatcher))); - } - boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing(); structureBuilder.setTimestampField(timeField.v1()) 
.setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats()) .setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats()) .setNeedClientTimezone(needClientTimeZone) - .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timeField.v1(), - timeField.v2().getJavaTimestampFormats(), needClientTimeZone)) + .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings, + mappings, timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone)) .setMultilineStartPattern(timeLineRegex); + } else { + structureBuilder.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), + csvProcessorSettings, mappings, null, null, false)); } - Tuple, SortedMap> mappingsAndFieldStats = - FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker); - - SortedMap mappings = mappingsAndFieldStats.v1(); if (timeField != null) { mappings.put(FileStructureUtils.DEFAULT_TIMESTAMP_FIELD, FileStructureUtils.DATE_MAPPING_WITHOUT_FORMAT); } @@ -579,4 +589,24 @@ static boolean canCreateFromSample(List explanation, String sample, int private static boolean notUnexpectedEndOfFile(SuperCsvException e) { return e.getMessage().startsWith("unexpected end of file while reading quoted column") == false; } + + static Map makeCsvProcessorSettings(String field, List targetFields, char separator, char quote, boolean trim) { + + Map csvProcessorSettings = new LinkedHashMap<>(); + csvProcessorSettings.put("field", field); + csvProcessorSettings.put("target_fields", Collections.unmodifiableList(targetFields)); + if (separator != ',') { + // The value must be String, not Character, as XContent only works with String + csvProcessorSettings.put("separator", String.valueOf(separator)); + } + if (quote != '"') { + // The value must be String, not Character, as XContent only works with String + 
csvProcessorSettings.put("quote", String.valueOf(quote)); + } + csvProcessorSettings.put("ignore_missing", false); + if (trim) { + csvProcessorSettings.put("trim", true); + } + return Collections.unmodifiableMap(csvProcessorSettings); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtils.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtils.java index e4945d3709860..14df58a35ce8f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtils.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtils.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.filestructurefinder; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.grok.Grok; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.xpack.core.ml.filestructurefinder.FieldStats; @@ -31,6 +32,8 @@ public final class FileStructureUtils { public static final String MAPPING_PROPERTIES_SETTING = "properties"; public static final Map DATE_MAPPING_WITHOUT_FORMAT = Collections.singletonMap(MAPPING_TYPE_SETTING, "date"); + public static final Set CONVERTIBLE_TYPES = + Collections.unmodifiableSet(Sets.newHashSet("integer", "long", "float", "double", "boolean")); private static final int NUM_TOP_HITS = 10; // NUMBER Grok pattern doesn't support scientific notation, so we extend it @@ -352,6 +355,9 @@ static boolean isMoreLikelyTextThanKeyword(String str) { * @param grokPattern The Grok pattern used for parsing semi-structured text formats. null for * fully structured formats. * @param customGrokPatternDefinitions The definitions for any custom patterns that {@code grokPattern} uses. + * @param csvProcessorSettings The CSV processor settings for delimited formats. null for + * non-delimited formats. 
+ * @param mappingsForConversions Mappings (or partial mappings) that will be considered for field type conversions. * @param timestampField The input field containing the timestamp to be parsed into @timestamp. * null if there is no timestamp. * @param timestampFormats Timestamp formats to be used for parsing {@code timestampField}. @@ -360,10 +366,12 @@ static boolean isMoreLikelyTextThanKeyword(String str) { * @return The ingest pipeline definition, or null if none is required. */ public static Map makeIngestPipelineDefinition(String grokPattern, Map customGrokPatternDefinitions, + Map csvProcessorSettings, + Map mappingsForConversions, String timestampField, List timestampFormats, boolean needClientTimezone) { - if (grokPattern == null && timestampField == null) { + if (grokPattern == null && csvProcessorSettings == null && timestampField == null) { return null; } @@ -384,6 +392,10 @@ public static Map makeIngestPipelineDefinition(String grokPatter assert customGrokPatternDefinitions.isEmpty(); } + if (csvProcessorSettings != null) { + processors.add(Collections.singletonMap("csv", csvProcessorSettings)); + } + if (timestampField != null) { Map dateProcessorSettings = new LinkedHashMap<>(); dateProcessorSettings.put("field", timestampField); @@ -394,6 +406,32 @@ public static Map makeIngestPipelineDefinition(String grokPatter processors.add(Collections.singletonMap("date", dateProcessorSettings)); } + for (Map.Entry mapping : mappingsForConversions.entrySet()) { + String fieldName = mapping.getKey(); + Object values = mapping.getValue(); + if (values instanceof Map) { + Object type = ((Map) values).get(MAPPING_TYPE_SETTING); + if (CONVERTIBLE_TYPES.contains(type)) { + Map convertProcessorSettings = new LinkedHashMap<>(); + convertProcessorSettings.put("field", fieldName); + convertProcessorSettings.put("type", type); + convertProcessorSettings.put("ignore_missing", true); + processors.add(Collections.singletonMap("convert", convertProcessorSettings)); + } + } + 
} + + // This removes the unparsed message field for delimited formats (unless the same field name is used for one of the columns) + if (csvProcessorSettings != null) { + Object field = csvProcessorSettings.get("field"); + assert field != null; + Object targetFields = csvProcessorSettings.get("target_fields"); + assert targetFields instanceof List; + if (((List) targetFields).contains(field) == false) { + processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", field))); + } + } + // This removes the interim timestamp field used for semi-structured text formats if (grokPattern != null && timestampField != null) { processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", timestampField))); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/NdJsonFileStructureFinder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/NdJsonFileStructureFinder.java index 1b405eb685fa2..da103630be586 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/NdJsonFileStructureFinder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/NdJsonFileStructureFinder.java @@ -61,8 +61,10 @@ static NdJsonFileStructureFinder makeNdJsonFileStructureFinder(List expl .setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats()) .setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats()) .setNeedClientTimezone(needClientTimeZone) - .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timeField.v1(), - timeField.v2().getJavaTimestampFormats(), needClientTimeZone)); + .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null, + // Note: no convert processors are added based on mappings for NDJSON input + // because it's reasonable that _source matches the supplied JSON precisely + Collections.emptyMap(), 
timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone)); } Tuple, SortedMap> mappingsAndFieldStats = diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TextLogFileStructureFinder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TextLogFileStructureFinder.java index e47d045dd257e..e5e9576b316aa 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TextLogFileStructureFinder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TextLogFileStructureFinder.java @@ -150,9 +150,8 @@ static TextLogFileStructureFinder makeTextLogFileStructureFinder(List ex .setJavaTimestampFormats(timestampFormatFinder.getJavaTimestampFormats()) .setNeedClientTimezone(needClientTimeZone) .setGrokPattern(grokPattern) - .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern, - customGrokPatternDefinitions, interimTimestampField, - timestampFormatFinder.getJavaTimestampFormats(), needClientTimeZone)) + .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern, customGrokPatternDefinitions, null, mappings, + interimTimestampField, timestampFormatFinder.getJavaTimestampFormats(), needClientTimeZone)) .setMappings(mappings) .setFieldStats(fieldStats) .setExplanation(explanation) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/XmlFileStructureFinder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/XmlFileStructureFinder.java index 91fc61bcbd4b4..94e698d269c5c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/XmlFileStructureFinder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/XmlFileStructureFinder.java @@ -102,8 +102,9 @@ static XmlFileStructureFinder makeXmlFileStructureFinder(List explanatio 
.setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats()) .setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats()) .setNeedClientTimezone(needClientTimeZone) - .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), - topLevelTag + "." + timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone)); + .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null, + Collections.emptyMap(), topLevelTag + "." + timeField.v1(), timeField.v2().getJavaTimestampFormats(), + needClientTimeZone)); } Tuple, SortedMap> mappingsAndFieldStats = diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinderTests.java index 993343084a848..fbe94c92e8e56 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinderTests.java @@ -15,11 +15,14 @@ import java.util.BitSet; import java.util.Collections; import java.util.List; +import java.util.Map; import static org.elasticsearch.xpack.ml.filestructurefinder.DelimitedFileStructureFinder.levenshteinFieldwiseCompareRows; import static org.elasticsearch.xpack.ml.filestructurefinder.DelimitedFileStructureFinder.levenshteinDistance; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; public class DelimitedFileStructureFinderTests extends FileStructureTestCase { @@ -583,4 +586,39 @@ public void testRowContainsDuplicateNonEmptyValues() { assertNull(DelimitedFileStructureFinder.findDuplicateNonEmptyValues(Arrays.asList("a", "", ""))); 
assertNull(DelimitedFileStructureFinder.findDuplicateNonEmptyValues(Arrays.asList("", "a", ""))); } + + public void testMakeCsvProcessorSettings() { + + String field = randomAlphaOfLength(10); + List targetFields = Arrays.asList(generateRandomStringArray(10, field.length() - 1, false , false)); + char separator = randomFrom(',', ';', '\t', '|'); + char quote = randomFrom('"', '\''); + boolean trim = randomBoolean(); + Map settings = DelimitedFileStructureFinder.makeCsvProcessorSettings(field, targetFields, separator, quote, trim); + assertThat(settings.get("field"), equalTo(field)); + assertThat(settings.get("target_fields"), equalTo(targetFields)); + assertThat(settings.get("ignore_missing"), equalTo(false)); + if (separator == ',') { + assertThat(settings, not(hasKey("separator"))); + } else { + assertThat(settings.get("separator"), equalTo(String.valueOf(separator))); + } + if (quote == '"') { + assertThat(settings, not(hasKey("quote"))); + } else { + assertThat(settings.get("quote"), equalTo(String.valueOf(quote))); + } + if (trim) { + assertThat(settings.get("trim"), equalTo(true)); + } else { + assertThat(settings, not(hasKey("trim"))); + } + } + + static Map randomCsvProcessorSettings() { + String field = randomAlphaOfLength(10); + return DelimitedFileStructureFinder.makeCsvProcessorSettings(field, + Arrays.asList(generateRandomStringArray(10, field.length() - 1, false , false)), randomFrom(',', ';', '\t', '|'), + randomFrom('"', '\''), randomBoolean()); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtilsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtilsTests.java index a0f54c6b6f24f..d52c3cf87ddb9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtilsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtilsTests.java @@ -18,6 
+18,8 @@ import static org.elasticsearch.xpack.ml.filestructurefinder.FileStructureOverrides.EMPTY_OVERRIDES; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; public class FileStructureUtilsTests extends FileStructureTestCase { @@ -346,21 +348,22 @@ public void testGuessMappingsAndCalculateFieldStats() { assertNull(fieldStats.get("nothing")); } - public void testMakeIngestPipelineDefinitionGivenStructuredWithoutTimestamp() { + public void testMakeIngestPipelineDefinitionGivenNdJsonWithoutTimestamp() { - assertNull(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null, null, false)); + assertNull(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null, Collections.emptyMap(), null, null, + false)); } @SuppressWarnings("unchecked") - public void testMakeIngestPipelineDefinitionGivenStructuredWithTimestamp() { + public void testMakeIngestPipelineDefinitionGivenNdJsonWithTimestamp() { String timestampField = randomAlphaOfLength(10); List timestampFormats = randomFrom(Collections.singletonList("ISO8601"), Arrays.asList("EEE MMM dd HH:mm:ss yyyy", "EEE MMM d HH:mm:ss yyyy")); boolean needClientTimezone = randomBoolean(); - Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timestampField, - timestampFormats, needClientTimezone); + Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null, + Collections.emptyMap(), timestampField, timestampFormats, needClientTimezone); assertNotNull(pipeline); assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description")); @@ -379,6 +382,144 @@ public void testMakeIngestPipelineDefinitionGivenStructuredWithTimestamp() { assertEquals(Collections.emptyMap(), pipeline); } + @SuppressWarnings("unchecked") + public void testMakeIngestPipelineDefinitionGivenDelimitedWithoutTimestamp() 
{ + + Map csvProcessorSettings = DelimitedFileStructureFinderTests.randomCsvProcessorSettings(); + + Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings, + Collections.emptyMap(), null, null, false); + assertNotNull(pipeline); + + assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description")); + + List> processors = (List>) pipeline.remove("processors"); + assertNotNull(processors); + assertEquals(2, processors.size()); + + Map csvProcessor = (Map) processors.get(0).get("csv"); + assertNotNull(csvProcessor); + assertThat(csvProcessor.get("field"), instanceOf(String.class)); + assertThat(csvProcessor.get("target_fields"), instanceOf(List.class)); + + Map removeProcessor = (Map) processors.get(1).get("remove"); + assertNotNull(removeProcessor); + assertThat(removeProcessor.get("field"), equalTo(csvProcessorSettings.get("field"))); + + // After removing the two expected fields there should be nothing left in the pipeline + assertEquals(Collections.emptyMap(), pipeline); + } + + @SuppressWarnings("unchecked") + public void testMakeIngestPipelineDefinitionGivenDelimitedWithFieldInTargetFields() { + + Map csvProcessorSettings = new HashMap<>(DelimitedFileStructureFinderTests.randomCsvProcessorSettings()); + // Hack it so the field to be parsed is also one of the column names + String firstTargetField = ((List) csvProcessorSettings.get("target_fields")).get(0); + csvProcessorSettings.put("field", firstTargetField); + + Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings, + Collections.emptyMap(), null, null, false); + assertNotNull(pipeline); + + assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description")); + + List> processors = (List>) pipeline.remove("processors"); + assertNotNull(processors); + assertEquals(1, processors.size()); // 1 because there's no "remove" processor this 
time + + Map csvProcessor = (Map) processors.get(0).get("csv"); + assertNotNull(csvProcessor); + assertThat(csvProcessor.get("field"), equalTo(firstTargetField)); + assertThat(csvProcessor.get("target_fields"), instanceOf(List.class)); + assertThat(csvProcessor.get("ignore_missing"), equalTo(false)); + + // After removing the two expected fields there should be nothing left in the pipeline + assertEquals(Collections.emptyMap(), pipeline); + } + + @SuppressWarnings("unchecked") + public void testMakeIngestPipelineDefinitionGivenDelimitedWithConversion() { + + Map csvProcessorSettings = DelimitedFileStructureFinderTests.randomCsvProcessorSettings(); + boolean expectConversion = randomBoolean(); + String mappingType = expectConversion ? randomFrom("long", "double", "boolean") : randomFrom("keyword", "text", "date"); + String firstTargetField = ((List) csvProcessorSettings.get("target_fields")).get(0); + Map mappingsForConversions = + Collections.singletonMap(firstTargetField, Collections.singletonMap(FileStructureUtils.MAPPING_TYPE_SETTING, mappingType)); + + Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings, + mappingsForConversions, null, null, false); + assertNotNull(pipeline); + + assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description")); + + List> processors = (List>) pipeline.remove("processors"); + assertNotNull(processors); + assertEquals(expectConversion ? 
3 : 2, processors.size()); + + Map csvProcessor = (Map) processors.get(0).get("csv"); + assertNotNull(csvProcessor); + assertThat(csvProcessor.get("field"), instanceOf(String.class)); + assertThat(csvProcessor.get("target_fields"), instanceOf(List.class)); + assertThat(csvProcessor.get("ignore_missing"), equalTo(false)); + + if (expectConversion) { + Map convertProcessor = (Map) processors.get(1).get("convert"); + assertNotNull(convertProcessor); + assertThat(convertProcessor.get("field"), equalTo(firstTargetField)); + assertThat(convertProcessor.get("type"), equalTo(mappingType)); + assertThat(convertProcessor.get("ignore_missing"), equalTo(true)); + } + + Map removeProcessor = (Map) processors.get(processors.size() - 1).get("remove"); + assertNotNull(removeProcessor); + assertThat(removeProcessor.get("field"), equalTo(csvProcessorSettings.get("field"))); + + // After removing the two expected fields there should be nothing left in the pipeline + assertEquals(Collections.emptyMap(), pipeline); + } + + @SuppressWarnings("unchecked") + public void testMakeIngestPipelineDefinitionGivenDelimitedWithTimestamp() { + + Map csvProcessorSettings = DelimitedFileStructureFinderTests.randomCsvProcessorSettings(); + + String timestampField = randomAlphaOfLength(10); + List timestampFormats = randomFrom(Collections.singletonList("ISO8601"), + Arrays.asList("EEE MMM dd HH:mm:ss yyyy", "EEE MMM d HH:mm:ss yyyy")); + boolean needClientTimezone = randomBoolean(); + + Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings, + Collections.emptyMap(), timestampField, timestampFormats, needClientTimezone); + assertNotNull(pipeline); + + assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description")); + + List> processors = (List>) pipeline.remove("processors"); + assertNotNull(processors); + assertEquals(3, processors.size()); + + Map csvProcessor = (Map) processors.get(0).get("csv"); + 
assertNotNull(csvProcessor); + assertThat(csvProcessor.get("field"), instanceOf(String.class)); + assertThat(csvProcessor.get("target_fields"), instanceOf(List.class)); + assertThat(csvProcessor.get("ignore_missing"), equalTo(false)); + + Map dateProcessor = (Map) processors.get(1).get("date"); + assertNotNull(dateProcessor); + assertEquals(timestampField, dateProcessor.get("field")); + assertEquals(needClientTimezone, dateProcessor.containsKey("timezone")); + assertEquals(timestampFormats, dateProcessor.get("formats")); + + Map removeProcessor = (Map) processors.get(2).get("remove"); + assertNotNull(removeProcessor); + assertThat(removeProcessor.get("field"), equalTo(csvProcessorSettings.get("field"))); + + // After removing the two expected fields there should be nothing left in the pipeline + assertEquals(Collections.emptyMap(), pipeline); + } + @SuppressWarnings("unchecked") public void testMakeIngestPipelineDefinitionGivenSemiStructured() { @@ -388,8 +529,8 @@ public void testMakeIngestPipelineDefinitionGivenSemiStructured() { Arrays.asList("EEE MMM dd HH:mm:ss yyyy", "EEE MMM d HH:mm:ss yyyy")); boolean needClientTimezone = randomBoolean(); - Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(grokPattern, Collections.emptyMap(), timestampField, - timestampFormats, needClientTimezone); + Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(grokPattern, Collections.emptyMap(), null, + Collections.emptyMap(), timestampField, timestampFormats, needClientTimezone); assertNotNull(pipeline); assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description"));