[ML] Use CSV ingest processor in find_file_structure ingest pipeline (#51492)

Changes the find_file_structure response to include a CSV
ingest processor in the ingest pipeline it suggests.

Previously the Kibana file upload functionality parsed CSV
in the browser, but parsing CSV in the ingest pipeline
makes the Kibana file upload functionality more easily
interchangeable with Filebeat, so that the configurations
it creates can be reused to import data with the same
structure repeatedly in production.
droberts195 committed Jan 28, 2020
1 parent a8bd4d0 commit 550254e
Showing 8 changed files with 403 additions and 41 deletions.
127 changes: 120 additions & 7 deletions docs/reference/ml/anomaly-detection/apis/find-file-structure.asciidoc
@@ -145,8 +145,8 @@ to request analysis of 100000 lines to achieve some variety.
value is `true`. Otherwise, the default value is `false`.

`timeout`::
(Optional, <<time-units,time units>>) Sets the maximum amount of time that the
structure analysis may take. If the analysis is still running when the
timeout expires then it will be aborted. The default value is 25 seconds.

`timestamp_field`::
@@ -163,8 +163,8 @@ also specified.
For structured file formats, if you specify this parameter, the field must exist
within the file.

If this parameter is not specified, the structure finder makes a decision about
which field (if any) is the primary timestamp field. For structured file
formats, it is not compulsory to have a timestamp in the file.
--

@@ -213,14 +213,14 @@ format from a built-in set.
The following table provides the appropriate `timeformat` values for some example timestamps:

|===
| Timeformat | Presentation

| yyyy-MM-dd HH:mm:ssZ | 2019-04-20 13:15:22+0000
| EEE, d MMM yyyy HH:mm:ss Z | Sat, 20 Apr 2019 13:15:22 +0000
| dd.MM.yy HH:mm:ss.SSS | 20.04.19 13:15:22.285
|===

See
https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html[the Java date/time format documentation]
for more information about date and time format syntax.
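
As a quick check (an illustrative standalone snippet, not part of this commit or these docs), the first two rows of the table can be reproduced with `java.time` directly:

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class TimeformatCheck {
    public static void main(String[] args) {
        // Row 1 of the table: yyyy-MM-dd HH:mm:ssZ
        DateTimeFormatter f1 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ssZ", Locale.ROOT);
        System.out.println(OffsetDateTime.parse("2019-04-20 13:15:22+0000", f1)); // 2019-04-20T13:15:22Z

        // Row 2 of the table: EEE, d MMM yyyy HH:mm:ss Z
        DateTimeFormatter f2 = DateTimeFormatter.ofPattern("EEE, d MMM yyyy HH:mm:ss Z", Locale.ROOT);
        System.out.println(OffsetDateTime.parse("Sat, 20 Apr 2019 13:15:22 +0000", f2)); // 2019-04-20T13:15:22Z
    }
}
```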

@@ -675,6 +675,30 @@ If the request does not encounter errors, you receive the following result:
"ingest_pipeline" : {
"description" : "Ingest pipeline created by file structure finder",
"processors" : [
{
"csv" : {
"field" : "message",
"target_fields" : [
"VendorID",
"tpep_pickup_datetime",
"tpep_dropoff_datetime",
"passenger_count",
"trip_distance",
"RatecodeID",
"store_and_fwd_flag",
"PULocationID",
"DOLocationID",
"payment_type",
"fare_amount",
"extra",
"mta_tax",
"tip_amount",
"tolls_amount",
"improvement_surcharge",
"total_amount"
]
}
},
{
"date" : {
"field" : "tpep_pickup_datetime",
@@ -683,6 +707,95 @@ If the request does not encounter errors, you receive the following result:
"yyyy-MM-dd HH:mm:ss"
]
}
},
{
"convert" : {
"field" : "DOLocationID",
"type" : "long"
}
},
{
"convert" : {
"field" : "PULocationID",
"type" : "long"
}
},
{
"convert" : {
"field" : "RatecodeID",
"type" : "long"
}
},
{
"convert" : {
"field" : "VendorID",
"type" : "long"
}
},
{
"convert" : {
"field" : "extra",
"type" : "double"
}
},
{
"convert" : {
"field" : "fare_amount",
"type" : "double"
}
},
{
"convert" : {
"field" : "improvement_surcharge",
"type" : "double"
}
},
{
"convert" : {
"field" : "mta_tax",
"type" : "double"
}
},
{
"convert" : {
"field" : "passenger_count",
"type" : "long"
}
},
{
"convert" : {
"field" : "payment_type",
"type" : "long"
}
},
{
"convert" : {
"field" : "tip_amount",
"type" : "double"
}
},
{
"convert" : {
"field" : "tolls_amount",
"type" : "double"
}
},
{
"convert" : {
"field" : "total_amount",
"type" : "double"
}
},
{
"convert" : {
"field" : "trip_distance",
"type" : "double"
}
},
{
"remove" : {
"field" : "message"
}
}
]
},
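
Before using a suggested pipeline for real ingestion, it can be exercised against sample documents via the ingest simulate API. A minimal sketch using the low-level Java REST client (the endpoint and client calls are standard; the abbreviated pipeline and sample line here are illustrative, not the full suggestion above):

```java
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class SimulateSuggestedPipeline {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Request request = new Request("POST", "/_ingest/pipeline/_simulate");
            // Abbreviated version of the suggested pipeline, applied to one sample CSV line
            request.setJsonEntity("{"
                + "\"pipeline\":{\"processors\":["
                + "{\"csv\":{\"field\":\"message\",\"target_fields\":[\"VendorID\",\"tpep_pickup_datetime\"]}},"
                + "{\"convert\":{\"field\":\"VendorID\",\"type\":\"long\"}},"
                + "{\"remove\":{\"field\":\"message\"}}]},"
                + "\"docs\":[{\"_source\":{\"message\":\"2,2019-04-20 13:15:22\"}}]"
                + "}");
            Response response = client.performRequest(request);
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}
```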
DelimitedFileStructureFinder.java
@@ -93,7 +93,18 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
// null to allow GC before timestamp search
sampleLines = null;

Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker);

SortedMap<String, Object> mappings = mappingsAndFieldStats.v1();

List<String> columnNamesList = Arrays.asList(columnNames);
char delimiter = (char) csvPreference.getDelimiterChar();
char quoteChar = csvPreference.getQuoteChar();

Map<String, Object> csvProcessorSettings = makeCsvProcessorSettings("message", columnNamesList, delimiter, quoteChar,
trimFields);

FileStructure.Builder structureBuilder = new FileStructure.Builder(FileStructure.Format.DELIMITED)
.setCharset(charsetName)
.setHasByteOrderMarker(hasByteOrderMarker)
@@ -102,8 +113,19 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
.setNumMessagesAnalyzed(sampleRecords.size())
.setHasHeaderRow(isHeaderInFile)
.setDelimiter(delimiter)
.setQuote(csvPreference.getQuoteChar())
.setColumnNames(Arrays.stream(columnNames).collect(Collectors.toList()));
.setQuote(quoteChar)
.setColumnNames(columnNamesList);

if (isHeaderInFile) {
String quote = String.valueOf(quoteChar);
String twoQuotes = quote + quote;
String optQuote = quote.replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + "?";
String delimiterMatcher =
(delimiter == '\t') ? "\\t" : String.valueOf(delimiter).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1");
structureBuilder.setExcludeLinesPattern("^" + Arrays.stream(header)
.map(column -> optQuote + column.replace(quote, twoQuotes).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + optQuote)
.collect(Collectors.joining(delimiterMatcher)));
}
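// For example, with the default delimiter ',' and quote '"', a header of
// [foo, bar] produces the exclude lines pattern: ^"?foo"?,"?bar"?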

if (trimFields) {
structureBuilder.setShouldTrimFields(true);
@@ -135,32 +157,20 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
}
}

if (isHeaderInFile) {
String quote = String.valueOf(csvPreference.getQuoteChar());
String twoQuotes = quote + quote;
String optQuote = quote.replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + "?";
String delimiterMatcher =
(delimiter == '\t') ? "\\t" : String.valueOf(delimiter).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1");
structureBuilder.setExcludeLinesPattern("^" + Arrays.stream(header)
.map(column -> optQuote + column.replace(quote, twoQuotes).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + optQuote)
.collect(Collectors.joining(delimiterMatcher)));
}

boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();

structureBuilder.setTimestampField(timeField.v1())
.setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats())
.setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats())
.setNeedClientTimezone(needClientTimeZone)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timeField.v1(),
timeField.v2().getJavaTimestampFormats(), needClientTimeZone))
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings,
mappings, timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone))
.setMultilineStartPattern(timeLineRegex);
} else {
structureBuilder.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(),
csvProcessorSettings, mappings, null, null, false));
}

Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker);

SortedMap<String, Object> mappings = mappingsAndFieldStats.v1();
if (timeField != null) {
mappings.put(FileStructureUtils.DEFAULT_TIMESTAMP_FIELD, FileStructureUtils.DATE_MAPPING_WITHOUT_FORMAT);
}
@@ -579,4 +589,24 @@ static boolean canCreateFromSample(List<String> explanation, String sample, int
private static boolean notUnexpectedEndOfFile(SuperCsvException e) {
return e.getMessage().startsWith("unexpected end of file while reading quoted column") == false;
}

static Map<String, Object> makeCsvProcessorSettings(String field, List<String> targetFields, char separator, char quote, boolean trim) {

Map<String, Object> csvProcessorSettings = new LinkedHashMap<>();
csvProcessorSettings.put("field", field);
csvProcessorSettings.put("target_fields", Collections.unmodifiableList(targetFields));
if (separator != ',') {
// The value must be String, not Character, as XContent only works with String
csvProcessorSettings.put("separator", String.valueOf(separator));
}
if (quote != '"') {
// The value must be String, not Character, as XContent only works with String
csvProcessorSettings.put("quote", String.valueOf(quote));
}
csvProcessorSettings.put("ignore_missing", false);
if (trim) {
csvProcessorSettings.put("trim", true);
}
return Collections.unmodifiableMap(csvProcessorSettings);
}
}
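
For reference, a hypothetical test-scope usage (the method is package-private, so this would live in a test in the same package) showing the map it builds:

```java
import java.util.Arrays;
import java.util.Map;

// Inside a test in org.elasticsearch.xpack.ml.filestructurefinder:
Map<String, Object> settings =
    DelimitedFileStructureFinder.makeCsvProcessorSettings("message", Arrays.asList("name", "age"), ';', '"', true);
// Insertion order: {field=message, target_fields=[name, age], separator=;, ignore_missing=false, trim=true}
// "separator" is present because ';' differs from the default ',';
// "quote" is absent because '"' is the CSV processor's default.
```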
FileStructureUtils.java
@@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.filestructurefinder;

import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.grok.Grok;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.xpack.core.ml.filestructurefinder.FieldStats;
@@ -31,6 +32,8 @@ public final class FileStructureUtils {
public static final String MAPPING_PROPERTIES_SETTING = "properties";
public static final Map<String, String> DATE_MAPPING_WITHOUT_FORMAT =
Collections.singletonMap(MAPPING_TYPE_SETTING, "date");
public static final Set<String> CONVERTIBLE_TYPES =
Collections.unmodifiableSet(Sets.newHashSet("integer", "long", "float", "double", "boolean"));

private static final int NUM_TOP_HITS = 10;
// NUMBER Grok pattern doesn't support scientific notation, so we extend it
@@ -352,6 +355,9 @@ static boolean isMoreLikelyTextThanKeyword(String str) {
* @param grokPattern The Grok pattern used for parsing semi-structured text formats. <code>null</code> for
* fully structured formats.
* @param customGrokPatternDefinitions The definitions for any custom patterns that {@code grokPattern} uses.
* @param csvProcessorSettings The CSV processor settings for delimited formats. <code>null</code> for
* non-delimited formats.
* @param mappingsForConversions Mappings (or partial mappings) that will be considered for field type conversions.
* @param timestampField The input field containing the timestamp to be parsed into <code>@timestamp</code>.
* <code>null</code> if there is no timestamp.
* @param timestampFormats Timestamp formats to be used for parsing {@code timestampField}.
@@ -360,10 +366,12 @@ static boolean isMoreLikelyTextThanKeyword(String str) {
* @return The ingest pipeline definition, or <code>null</code> if none is required.
*/
public static Map<String, Object> makeIngestPipelineDefinition(String grokPattern, Map<String, String> customGrokPatternDefinitions,
Map<String, Object> csvProcessorSettings,
Map<String, Object> mappingsForConversions,
String timestampField, List<String> timestampFormats,
boolean needClientTimezone) {

if (grokPattern == null && timestampField == null) {
if (grokPattern == null && csvProcessorSettings == null && timestampField == null) {
return null;
}

@@ -384,6 +392,10 @@ public static Map<String, Object> makeIngestPipelineDefinition(String grokPatter
assert customGrokPatternDefinitions.isEmpty();
}

if (csvProcessorSettings != null) {
processors.add(Collections.singletonMap("csv", csvProcessorSettings));
}

if (timestampField != null) {
Map<String, Object> dateProcessorSettings = new LinkedHashMap<>();
dateProcessorSettings.put("field", timestampField);
@@ -394,6 +406,32 @@ public static Map<String, Object> makeIngestPipelineDefinition(String grokPatter
processors.add(Collections.singletonMap("date", dateProcessorSettings));
}

for (Map.Entry<String, Object> mapping : mappingsForConversions.entrySet()) {
String fieldName = mapping.getKey();
Object values = mapping.getValue();
if (values instanceof Map) {
Object type = ((Map<?, ?>) values).get(MAPPING_TYPE_SETTING);
if (CONVERTIBLE_TYPES.contains(type)) {
Map<String, Object> convertProcessorSettings = new LinkedHashMap<>();
convertProcessorSettings.put("field", fieldName);
convertProcessorSettings.put("type", type);
convertProcessorSettings.put("ignore_missing", true);
processors.add(Collections.singletonMap("convert", convertProcessorSettings));
}
}
}

// This removes the unparsed message field for delimited formats (unless the same field name is used for one of the columns)
if (csvProcessorSettings != null) {
Object field = csvProcessorSettings.get("field");
assert field != null;
Object targetFields = csvProcessorSettings.get("target_fields");
assert targetFields instanceof List;
if (((List<?>) targetFields).contains(field) == false) {
processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", field)));
}
}

// This removes the interim timestamp field used for semi-structured text formats
if (grokPattern != null && timestampField != null) {
processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", timestampField)));
NdJsonFileStructureFinder.java
@@ -61,8 +61,10 @@ static NdJsonFileStructureFinder makeNdJsonFileStructureFinder(List<String> expl
.setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats())
.setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats())
.setNeedClientTimezone(needClientTimeZone)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timeField.v1(),
timeField.v2().getJavaTimestampFormats(), needClientTimeZone));
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null,
// Note: no convert processors are added based on mappings for NDJSON input
// because it's reasonable that _source matches the supplied JSON precisely
Collections.emptyMap(), timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone));
}

Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
TextLogFileStructureFinder.java
@@ -150,9 +150,8 @@ static TextLogFileStructureFinder makeTextLogFileStructureFinder(List<String> ex
.setJavaTimestampFormats(timestampFormatFinder.getJavaTimestampFormats())
.setNeedClientTimezone(needClientTimeZone)
.setGrokPattern(grokPattern)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern,
customGrokPatternDefinitions, interimTimestampField,
timestampFormatFinder.getJavaTimestampFormats(), needClientTimeZone))
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern, customGrokPatternDefinitions, null, mappings,
interimTimestampField, timestampFormatFinder.getJavaTimestampFormats(), needClientTimeZone))
.setMappings(mappings)
.setFieldStats(fieldStats)
.setExplanation(explanation)
XmlFileStructureFinder.java
@@ -102,8 +102,9 @@ static XmlFileStructureFinder makeXmlFileStructureFinder(List<String> explanatio
.setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats())
.setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats())
.setNeedClientTimezone(needClientTimeZone)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(),
topLevelTag + "." + timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone));
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null,
Collections.emptyMap(), topLevelTag + "." + timeField.v1(), timeField.v2().getJavaTimestampFormats(),
needClientTimeZone));
}

Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =