diff --git a/docs/reference/ml/anomaly-detection/apis/find-file-structure.asciidoc b/docs/reference/ml/anomaly-detection/apis/find-file-structure.asciidoc
index 4f85e39d60aa0..0a1b2aaefef80 100644
--- a/docs/reference/ml/anomaly-detection/apis/find-file-structure.asciidoc
+++ b/docs/reference/ml/anomaly-detection/apis/find-file-structure.asciidoc
@@ -145,8 +145,8 @@ to request analysis of 100000 lines to achieve some variety.
 value is `true`. Otherwise, the default value is `false`.

 `timeout`::
-  (Optional, <>) Sets the maximum amount of time that the 
-  structure analysis make take. If the analysis is still running when the 
+  (Optional, <>) Sets the maximum amount of time that the
+  structure analysis may take. If the analysis is still running when the
   timeout expires then it will be aborted. The default value is 25 seconds.

 `timestamp_field`::
@@ -163,8 +163,8 @@ also specified.

 For structured file formats, if you specify this parameter, the field must
 exist within the file.

-If this parameter is not specified, the structure finder makes a decision about 
-which field (if any) is the primary timestamp field. For structured file 
+If this parameter is not specified, the structure finder makes a decision about
+which field (if any) is the primary timestamp field. For structured file
 formats, it is not compulsory to have a timestamp in the file.
 --
@@ -213,14 +213,14 @@ format from a built-in set.
 The following table provides the appropriate `timeformat` values for some
 example timestamps:

 |===
-| Timeformat                 | Presentation 
+| Timeformat                 | Presentation

 | yyyy-MM-dd HH:mm:ssZ       | 2019-04-20 13:15:22+0000
-| EEE, d MMM yyyy HH:mm:ss Z | Sat, 20 Apr 2019 13:15:22 +0000 
+| EEE, d MMM yyyy HH:mm:ss Z | Sat, 20 Apr 2019 13:15:22 +0000
 | dd.MM.yy HH:mm:ss.SSS      | 20.04.19 13:15:22.285
 |===

-See 
+See
 https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html[the Java date/time format documentation]
 for more information about date and time format syntax.
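As a quick standalone illustration of the format syntax that the table and the linked `DateTimeFormatter` documentation describe, the first table row can be round-tripped with the JDK's `java.time` API. This snippet is an editorial aside, not part of the documented API or of this change:

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

public class TimeformatExample {
    public static void main(String[] args) {
        // First table row: pattern "yyyy-MM-dd HH:mm:ssZ" corresponds to "2019-04-20 13:15:22+0000"
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ssZ");
        OffsetDateTime parsed = OffsetDateTime.parse("2019-04-20 13:15:22+0000", formatter);
        System.out.println(parsed);                   // 2019-04-20T13:15:22Z
        System.out.println(formatter.format(parsed)); // 2019-04-20 13:15:22+0000
    }
}
```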
@@ -675,6 +675,30 @@ If the request does not encounter errors, you receive the following result: "ingest_pipeline" : { "description" : "Ingest pipeline created by file structure finder", "processors" : [ + { + "csv" : { + "field" : "message", + "target_fields" : [ + "VendorID", + "tpep_pickup_datetime", + "tpep_dropoff_datetime", + "passenger_count", + "trip_distance", + "RatecodeID", + "store_and_fwd_flag", + "PULocationID", + "DOLocationID", + "payment_type", + "fare_amount", + "extra", + "mta_tax", + "tip_amount", + "tolls_amount", + "improvement_surcharge", + "total_amount" + ] + } + }, { "date" : { "field" : "tpep_pickup_datetime", @@ -683,6 +707,95 @@ If the request does not encounter errors, you receive the following result: "yyyy-MM-dd HH:mm:ss" ] } + }, + { + "convert" : { + "field" : "DOLocationID", + "type" : "long" + } + }, + { + "convert" : { + "field" : "PULocationID", + "type" : "long" + } + }, + { + "convert" : { + "field" : "RatecodeID", + "type" : "long" + } + }, + { + "convert" : { + "field" : "VendorID", + "type" : "long" + } + }, + { + "convert" : { + "field" : "extra", + "type" : "double" + } + }, + { + "convert" : { + "field" : "fare_amount", + "type" : "double" + } + }, + { + "convert" : { + "field" : "improvement_surcharge", + "type" : "double" + } + }, + { + "convert" : { + "field" : "mta_tax", + "type" : "double" + } + }, + { + "convert" : { + "field" : "passenger_count", + "type" : "long" + } + }, + { + "convert" : { + "field" : "payment_type", + "type" : "long" + } + }, + { + "convert" : { + "field" : "tip_amount", + "type" : "double" + } + }, + { + "convert" : { + "field" : "tolls_amount", + "type" : "double" + } + }, + { + "convert" : { + "field" : "total_amount", + "type" : "double" + } + }, + { + "convert" : { + "field" : "trip_distance", + "type" : "double" + } + }, + { + "remove" : { + "field" : "message" + } } ] }, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinder.java index b947ed2d9cffe..befdf3d229f21 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinder.java @@ -93,7 +93,18 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List, SortedMap> mappingsAndFieldStats = + FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker); + + SortedMap mappings = mappingsAndFieldStats.v1(); + + List columnNamesList = Arrays.asList(columnNames); char delimiter = (char) csvPreference.getDelimiterChar(); + char quoteChar = csvPreference.getQuoteChar(); + + Map csvProcessorSettings = makeCsvProcessorSettings("message", columnNamesList, delimiter, quoteChar, + trimFields); + FileStructure.Builder structureBuilder = new FileStructure.Builder(FileStructure.Format.DELIMITED) .setCharset(charsetName) .setHasByteOrderMarker(hasByteOrderMarker) @@ -102,8 +113,19 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List optQuote + column.replace(quote, twoQuotes).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + optQuote) + .collect(Collectors.joining(delimiterMatcher))); + } if (trimFields) { structureBuilder.setShouldTrimFields(true); @@ -135,32 +157,20 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List 
optQuote + column.replace(quote, twoQuotes).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + optQuote) - .collect(Collectors.joining(delimiterMatcher))); - } - boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing(); structureBuilder.setTimestampField(timeField.v1()) .setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats()) .setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats()) .setNeedClientTimezone(needClientTimeZone) - .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timeField.v1(), - timeField.v2().getJavaTimestampFormats(), needClientTimeZone)) + .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings, + mappings, timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone)) .setMultilineStartPattern(timeLineRegex); + } else { + structureBuilder.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), + csvProcessorSettings, mappings, null, null, false)); } - Tuple, SortedMap> mappingsAndFieldStats = - FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker); - - SortedMap mappings = mappingsAndFieldStats.v1(); if (timeField != null) { mappings.put(FileStructureUtils.DEFAULT_TIMESTAMP_FIELD, FileStructureUtils.DATE_MAPPING_WITHOUT_FORMAT); } @@ -579,4 +589,24 @@ static boolean canCreateFromSample(List explanation, String sample, int private static boolean notUnexpectedEndOfFile(SuperCsvException e) { return e.getMessage().startsWith("unexpected end of file while reading quoted column") == false; } + + static Map makeCsvProcessorSettings(String field, List targetFields, char separator, char quote, boolean trim) { + + Map csvProcessorSettings = new LinkedHashMap<>(); + csvProcessorSettings.put("field", field); + csvProcessorSettings.put("target_fields", Collections.unmodifiableList(targetFields)); + if (separator != ',') { + // The value must be String, not Character, as XContent only works with String + csvProcessorSettings.put("separator", String.valueOf(separator)); + } + if (quote != '"') { + // The value must be String, not Character, as XContent only works with String + csvProcessorSettings.put("quote", String.valueOf(quote)); + } + csvProcessorSettings.put("ignore_missing", false); + if (trim) { + csvProcessorSettings.put("trim", true); + } + return Collections.unmodifiableMap(csvProcessorSettings); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtils.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtils.java index e4945d3709860..14df58a35ce8f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtils.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtils.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.filestructurefinder; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.grok.Grok; import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.xpack.core.ml.filestructurefinder.FieldStats; @@ -31,6 +32,8 @@ public final class FileStructureUtils { public static final String MAPPING_PROPERTIES_SETTING = "properties"; public static final Map DATE_MAPPING_WITHOUT_FORMAT = Collections.singletonMap(MAPPING_TYPE_SETTING, "date"); + public static 
final Set CONVERTIBLE_TYPES = + Collections.unmodifiableSet(Sets.newHashSet("integer", "long", "float", "double", "boolean")); private static final int NUM_TOP_HITS = 10; // NUMBER Grok pattern doesn't support scientific notation, so we extend it @@ -352,6 +355,9 @@ static boolean isMoreLikelyTextThanKeyword(String str) { * @param grokPattern The Grok pattern used for parsing semi-structured text formats. null for * fully structured formats. * @param customGrokPatternDefinitions The definitions for any custom patterns that {@code grokPattern} uses. + * @param csvProcessorSettings The CSV processor settings for delimited formats. null for + * non-delimited formats. + * @param mappingsForConversions Mappings (or partial mappings) that will be considered for field type conversions. * @param timestampField The input field containing the timestamp to be parsed into @timestamp. * null if there is no timestamp. * @param timestampFormats Timestamp formats to be used for parsing {@code timestampField}. @@ -360,10 +366,12 @@ static boolean isMoreLikelyTextThanKeyword(String str) { * @return The ingest pipeline definition, or null if none is required. */ public static Map makeIngestPipelineDefinition(String grokPattern, Map customGrokPatternDefinitions, + Map csvProcessorSettings, + Map mappingsForConversions, String timestampField, List timestampFormats, boolean needClientTimezone) { - if (grokPattern == null && timestampField == null) { + if (grokPattern == null && csvProcessorSettings == null && timestampField == null) { return null; } @@ -384,6 +392,10 @@ public static Map makeIngestPipelineDefinition(String grokPatter assert customGrokPatternDefinitions.isEmpty(); } + if (csvProcessorSettings != null) { + processors.add(Collections.singletonMap("csv", csvProcessorSettings)); + } + if (timestampField != null) { Map dateProcessorSettings = new LinkedHashMap<>(); dateProcessorSettings.put("field", timestampField); @@ -394,6 +406,32 @@ public static Map makeIngestPipelineDefinition(String grokPatter processors.add(Collections.singletonMap("date", dateProcessorSettings)); } + for (Map.Entry mapping : mappingsForConversions.entrySet()) { + String fieldName = mapping.getKey(); + Object values = mapping.getValue(); + if (values instanceof Map) { + Object type = ((Map) values).get(MAPPING_TYPE_SETTING); + if (CONVERTIBLE_TYPES.contains(type)) { + Map convertProcessorSettings = new LinkedHashMap<>(); + convertProcessorSettings.put("field", fieldName); + convertProcessorSettings.put("type", type); + convertProcessorSettings.put("ignore_missing", true); + processors.add(Collections.singletonMap("convert", convertProcessorSettings)); + } + } + } + + // This removes the unparsed message field for delimited formats (unless the same field name is used for one of the columns) + if (csvProcessorSettings != null) { + Object field = csvProcessorSettings.get("field"); + assert field != null; + Object targetFields = csvProcessorSettings.get("target_fields"); + assert targetFields instanceof List; + if (((List) targetFields).contains(field) == false) { + processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", field))); + } + } + // This removes the interim timestamp field used for semi-structured text formats if (grokPattern != null && timestampField != null) { processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", timestampField))); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/NdJsonFileStructureFinder.java 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/NdJsonFileStructureFinder.java index 1b405eb685fa2..da103630be586 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/NdJsonFileStructureFinder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/NdJsonFileStructureFinder.java @@ -61,8 +61,10 @@ static NdJsonFileStructureFinder makeNdJsonFileStructureFinder(List expl .setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats()) .setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats()) .setNeedClientTimezone(needClientTimeZone) - .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timeField.v1(), - timeField.v2().getJavaTimestampFormats(), needClientTimeZone)); + .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null, + // Note: no convert processors are added based on mappings for NDJSON input + // because it's reasonable that _source matches the supplied JSON precisely + Collections.emptyMap(), timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone)); } Tuple, SortedMap> mappingsAndFieldStats = diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TextLogFileStructureFinder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TextLogFileStructureFinder.java index e47d045dd257e..e5e9576b316aa 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TextLogFileStructureFinder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TextLogFileStructureFinder.java @@ -150,9 +150,8 @@ static TextLogFileStructureFinder makeTextLogFileStructureFinder(List ex .setJavaTimestampFormats(timestampFormatFinder.getJavaTimestampFormats()) .setNeedClientTimezone(needClientTimeZone) .setGrokPattern(grokPattern) - .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern, - customGrokPatternDefinitions, interimTimestampField, - timestampFormatFinder.getJavaTimestampFormats(), needClientTimeZone)) + .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern, customGrokPatternDefinitions, null, mappings, + interimTimestampField, timestampFormatFinder.getJavaTimestampFormats(), needClientTimeZone)) .setMappings(mappings) .setFieldStats(fieldStats) .setExplanation(explanation) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/XmlFileStructureFinder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/XmlFileStructureFinder.java index 91fc61bcbd4b4..94e698d269c5c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/XmlFileStructureFinder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/XmlFileStructureFinder.java @@ -102,8 +102,9 @@ static XmlFileStructureFinder makeXmlFileStructureFinder(List explanatio .setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats()) .setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats()) .setNeedClientTimezone(needClientTimeZone) - .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), - topLevelTag + "." 
+ timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone)); + .setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null, + Collections.emptyMap(), topLevelTag + "." + timeField.v1(), timeField.v2().getJavaTimestampFormats(), + needClientTimeZone)); } Tuple, SortedMap> mappingsAndFieldStats = diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinderTests.java index 993343084a848..fbe94c92e8e56 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinderTests.java @@ -15,11 +15,14 @@ import java.util.BitSet; import java.util.Collections; import java.util.List; +import java.util.Map; import static org.elasticsearch.xpack.ml.filestructurefinder.DelimitedFileStructureFinder.levenshteinFieldwiseCompareRows; import static org.elasticsearch.xpack.ml.filestructurefinder.DelimitedFileStructureFinder.levenshteinDistance; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; public class DelimitedFileStructureFinderTests extends FileStructureTestCase { @@ -583,4 +586,39 @@ public void testRowContainsDuplicateNonEmptyValues() { assertNull(DelimitedFileStructureFinder.findDuplicateNonEmptyValues(Arrays.asList("a", "", ""))); assertNull(DelimitedFileStructureFinder.findDuplicateNonEmptyValues(Arrays.asList("", "a", ""))); } + + public void testMakeCsvProcessorSettings() { + + String field = randomAlphaOfLength(10); + List targetFields = Arrays.asList(generateRandomStringArray(10, field.length() - 1, false , false)); + char separator = randomFrom(',', ';', '\t', '|'); + char quote = randomFrom('"', '\''); + boolean trim = randomBoolean(); + Map settings = DelimitedFileStructureFinder.makeCsvProcessorSettings(field, targetFields, separator, quote, trim); + assertThat(settings.get("field"), equalTo(field)); + assertThat(settings.get("target_fields"), equalTo(targetFields)); + assertThat(settings.get("ignore_missing"), equalTo(false)); + if (separator == ',') { + assertThat(settings, not(hasKey("separator"))); + } else { + assertThat(settings.get("separator"), equalTo(String.valueOf(separator))); + } + if (quote == '"') { + assertThat(settings, not(hasKey("quote"))); + } else { + assertThat(settings.get("quote"), equalTo(String.valueOf(quote))); + } + if (trim) { + assertThat(settings.get("trim"), equalTo(true)); + } else { + assertThat(settings, not(hasKey("trim"))); + } + } + + static Map randomCsvProcessorSettings() { + String field = randomAlphaOfLength(10); + return DelimitedFileStructureFinder.makeCsvProcessorSettings(field, + Arrays.asList(generateRandomStringArray(10, field.length() - 1, false , false)), randomFrom(',', ';', '\t', '|'), + randomFrom('"', '\''), randomBoolean()); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtilsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtilsTests.java index a0f54c6b6f24f..d52c3cf87ddb9 100644 --- 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtilsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtilsTests.java @@ -18,6 +18,8 @@ import static org.elasticsearch.xpack.ml.filestructurefinder.FileStructureOverrides.EMPTY_OVERRIDES; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; public class FileStructureUtilsTests extends FileStructureTestCase { @@ -346,21 +348,22 @@ public void testGuessMappingsAndCalculateFieldStats() { assertNull(fieldStats.get("nothing")); } - public void testMakeIngestPipelineDefinitionGivenStructuredWithoutTimestamp() { + public void testMakeIngestPipelineDefinitionGivenNdJsonWithoutTimestamp() { - assertNull(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null, null, false)); + assertNull(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null, Collections.emptyMap(), null, null, + false)); } @SuppressWarnings("unchecked") - public void testMakeIngestPipelineDefinitionGivenStructuredWithTimestamp() { + public void testMakeIngestPipelineDefinitionGivenNdJsonWithTimestamp() { String timestampField = randomAlphaOfLength(10); List timestampFormats = randomFrom(Collections.singletonList("ISO8601"), Arrays.asList("EEE MMM dd HH:mm:ss yyyy", "EEE MMM d HH:mm:ss yyyy")); boolean needClientTimezone = randomBoolean(); - Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timestampField, - timestampFormats, needClientTimezone); + Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null, + Collections.emptyMap(), timestampField, timestampFormats, needClientTimezone); assertNotNull(pipeline); assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description")); @@ -379,6 +382,144 @@ public void testMakeIngestPipelineDefinitionGivenStructuredWithTimestamp() { assertEquals(Collections.emptyMap(), pipeline); } + @SuppressWarnings("unchecked") + public void testMakeIngestPipelineDefinitionGivenDelimitedWithoutTimestamp() { + + Map csvProcessorSettings = DelimitedFileStructureFinderTests.randomCsvProcessorSettings(); + + Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings, + Collections.emptyMap(), null, null, false); + assertNotNull(pipeline); + + assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description")); + + List> processors = (List>) pipeline.remove("processors"); + assertNotNull(processors); + assertEquals(2, processors.size()); + + Map csvProcessor = (Map) processors.get(0).get("csv"); + assertNotNull(csvProcessor); + assertThat(csvProcessor.get("field"), instanceOf(String.class)); + assertThat(csvProcessor.get("target_fields"), instanceOf(List.class)); + + Map removeProcessor = (Map) processors.get(1).get("remove"); + assertNotNull(removeProcessor); + assertThat(csvProcessor.get("field"), equalTo(csvProcessorSettings.get("field"))); + + // After removing the two expected fields there should be nothing left in the pipeline + assertEquals(Collections.emptyMap(), pipeline); + } + + @SuppressWarnings("unchecked") + public void testMakeIngestPipelineDefinitionGivenDelimitedWithFieldInTargetFields() { + + Map csvProcessorSettings = new HashMap<>(DelimitedFileStructureFinderTests.randomCsvProcessorSettings()); + // Hack it 
so the field to be parsed is also one of the column names + String firstTargetField = ((List) csvProcessorSettings.get("target_fields")).get(0); + csvProcessorSettings.put("field", firstTargetField); + + Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings, + Collections.emptyMap(), null, null, false); + assertNotNull(pipeline); + + assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description")); + + List> processors = (List>) pipeline.remove("processors"); + assertNotNull(processors); + assertEquals(1, processors.size()); // 1 because there's no "remove" processor this time + + Map csvProcessor = (Map) processors.get(0).get("csv"); + assertNotNull(csvProcessor); + assertThat(csvProcessor.get("field"), equalTo(firstTargetField)); + assertThat(csvProcessor.get("target_fields"), instanceOf(List.class)); + assertThat(csvProcessor.get("ignore_missing"), equalTo(false)); + + // After removing the two expected fields there should be nothing left in the pipeline + assertEquals(Collections.emptyMap(), pipeline); + } + + @SuppressWarnings("unchecked") + public void testMakeIngestPipelineDefinitionGivenDelimitedWithConversion() { + + Map csvProcessorSettings = DelimitedFileStructureFinderTests.randomCsvProcessorSettings(); + boolean expectConversion = randomBoolean(); + String mappingType = expectConversion ? randomFrom("long", "double", "boolean") : randomFrom("keyword", "text", "date"); + String firstTargetField = ((List) csvProcessorSettings.get("target_fields")).get(0); + Map mappingsForConversions = + Collections.singletonMap(firstTargetField, Collections.singletonMap(FileStructureUtils.MAPPING_TYPE_SETTING, mappingType)); + + Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings, + mappingsForConversions, null, null, false); + assertNotNull(pipeline); + + assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description")); + + List> processors = (List>) pipeline.remove("processors"); + assertNotNull(processors); + assertEquals(expectConversion ? 
3 : 2, processors.size()); + + Map csvProcessor = (Map) processors.get(0).get("csv"); + assertNotNull(csvProcessor); + assertThat(csvProcessor.get("field"), instanceOf(String.class)); + assertThat(csvProcessor.get("target_fields"), instanceOf(List.class)); + assertThat(csvProcessor.get("ignore_missing"), equalTo(false)); + + if (expectConversion) { + Map convertProcessor = (Map) processors.get(1).get("convert"); + assertNotNull(convertProcessor); + assertThat(convertProcessor.get("field"), equalTo(firstTargetField)); + assertThat(convertProcessor.get("type"), equalTo(mappingType)); + assertThat(convertProcessor.get("ignore_missing"), equalTo(true)); + } + + Map removeProcessor = (Map) processors.get(processors.size() - 1).get("remove"); + assertNotNull(removeProcessor); + assertThat(removeProcessor.get("field"), equalTo(csvProcessorSettings.get("field"))); + + // After removing the two expected fields there should be nothing left in the pipeline + assertEquals(Collections.emptyMap(), pipeline); + } + + @SuppressWarnings("unchecked") + public void testMakeIngestPipelineDefinitionGivenDelimitedWithTimestamp() { + + Map csvProcessorSettings = DelimitedFileStructureFinderTests.randomCsvProcessorSettings(); + + String timestampField = randomAlphaOfLength(10); + List timestampFormats = randomFrom(Collections.singletonList("ISO8601"), + Arrays.asList("EEE MMM dd HH:mm:ss yyyy", "EEE MMM d HH:mm:ss yyyy")); + boolean needClientTimezone = randomBoolean(); + + Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings, + Collections.emptyMap(), timestampField, timestampFormats, needClientTimezone); + assertNotNull(pipeline); + + assertEquals("Ingest pipeline created by file structure finder", pipeline.remove("description")); + + List> processors = (List>) pipeline.remove("processors"); + assertNotNull(processors); + assertEquals(3, processors.size()); + + Map csvProcessor = (Map) processors.get(0).get("csv"); + assertNotNull(csvProcessor); + assertThat(csvProcessor.get("field"), instanceOf(String.class)); + assertThat(csvProcessor.get("target_fields"), instanceOf(List.class)); + assertThat(csvProcessor.get("ignore_missing"), equalTo(false)); + + Map dateProcessor = (Map) processors.get(1).get("date"); + assertNotNull(dateProcessor); + assertEquals(timestampField, dateProcessor.get("field")); + assertEquals(needClientTimezone, dateProcessor.containsKey("timezone")); + assertEquals(timestampFormats, dateProcessor.get("formats")); + + Map removeProcessor = (Map) processors.get(2).get("remove"); + assertNotNull(removeProcessor); + assertThat(removeProcessor.get("field"), equalTo(csvProcessorSettings.get("field"))); + + // After removing the two expected fields there should be nothing left in the pipeline + assertEquals(Collections.emptyMap(), pipeline); + } + @SuppressWarnings("unchecked") public void testMakeIngestPipelineDefinitionGivenSemiStructured() { @@ -388,8 +529,8 @@ public void testMakeIngestPipelineDefinitionGivenSemiStructured() { Arrays.asList("EEE MMM dd HH:mm:ss yyyy", "EEE MMM d HH:mm:ss yyyy")); boolean needClientTimezone = randomBoolean(); - Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(grokPattern, Collections.emptyMap(), timestampField, - timestampFormats, needClientTimezone); + Map pipeline = FileStructureUtils.makeIngestPipelineDefinition(grokPattern, Collections.emptyMap(), null, + Collections.emptyMap(), timestampField, timestampFormats, needClientTimezone); assertNotNull(pipeline); assertEquals("Ingest 
pipeline created by file structure finder", pipeline.remove("description"));
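Taken together, the changes above mean that for a delimited file the generated ingest pipeline now starts with a `csv` processor that splits the raw `message` field into the detected column names, optionally parses the timestamp with a `date` processor, adds one `convert` processor (with `ignore_missing: true`) for every column whose guessed mapping is a convertible type, and finally removes the raw `message` field unless that name is itself one of the columns. The sketch below rebuilds that processor list with plain JDK collections to make the ordering concrete; it mirrors the logic of `makeCsvProcessorSettings` and `makeIngestPipelineDefinition` shown in the diff, but it is an illustrative standalone program rather than the Elasticsearch code itself, and the column names `time`, `count`, `price` and the `ISO8601` format are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DelimitedPipelineSketch {

    // Mirrors CONVERTIBLE_TYPES from FileStructureUtils in the diff
    private static final Set<String> CONVERTIBLE_TYPES =
        new HashSet<>(Arrays.asList("integer", "long", "float", "double", "boolean"));

    public static void main(String[] args) {
        // Hypothetical three-column file; the whole unparsed line lives in "message"
        List<String> targetFields = Arrays.asList("time", "count", "price");

        // Equivalent of makeCsvProcessorSettings("message", targetFields, ';', '"', true):
        // separator is emitted because it differs from ','; quote is omitted because '"' is the default
        Map<String, Object> csv = new LinkedHashMap<>();
        csv.put("field", "message");
        csv.put("target_fields", targetFields);
        csv.put("separator", ";");
        csv.put("ignore_missing", false);
        csv.put("trim", true);

        // Guessed mappings; only convertible types get a convert processor
        Map<String, Map<String, String>> mappings = new LinkedHashMap<>();
        mappings.put("time", Collections.singletonMap("type", "date"));
        mappings.put("count", Collections.singletonMap("type", "long"));
        mappings.put("price", Collections.singletonMap("type", "double"));

        List<Map<String, Object>> processors = new ArrayList<>();
        processors.add(Collections.singletonMap("csv", csv));

        // date processor for the detected timestamp column
        Map<String, Object> date = new LinkedHashMap<>();
        date.put("field", "time");
        date.put("formats", Collections.singletonList("ISO8601"));
        processors.add(Collections.singletonMap("date", date));

        // one convert processor per column whose mapping type is convertible ("date" is skipped)
        for (Map.Entry<String, Map<String, String>> entry : mappings.entrySet()) {
            String type = entry.getValue().get("type");
            if (CONVERTIBLE_TYPES.contains(type)) {
                Map<String, Object> convert = new LinkedHashMap<>();
                convert.put("field", entry.getKey());
                convert.put("type", type);
                convert.put("ignore_missing", true);
                processors.add(Collections.singletonMap("convert", convert));
            }
        }

        // drop the raw line, but only because "message" is not itself one of the column names
        if (targetFields.contains("message") == false) {
            processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", "message")));
        }

        processors.forEach(System.out::println);
        // {csv={field=message, target_fields=[time, count, price], separator=;, ignore_missing=false, trim=true}}
        // {date={field=time, formats=[ISO8601]}}
        // {convert={field=count, type=long, ignore_missing=true}}
        // {convert={field=price, type=double, ignore_missing=true}}
        // {remove={field=message}}
    }
}
```

Running a semicolon-separated line through a pipeline with these processors would yield typed `count` and `price` fields and a parsed `@timestamp`, which is roughly what the new unit tests above assert about processor structure and ordering.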