[ML] Use CSV ingest processor in find_file_structure ingest pipeline #51492

Merged
127 changes: 120 additions & 7 deletions docs/reference/ml/anomaly-detection/apis/find-file-structure.asciidoc
@@ -145,8 +145,8 @@ to request analysis of 100000 lines to achieve some variety.
value is `true`. Otherwise, the default value is `false`.

`timeout`::
(Optional, <<time-units,time units>>) Sets the maximum amount of time that the
structure analysis may take. If the analysis is still running when the
timeout expires then it will be aborted. The default value is 25 seconds.

`timestamp_field`::
@@ -163,8 +163,8 @@ also specified.
For structured file formats, if you specify this parameter, the field must exist
within the file.

If this parameter is not specified, the structure finder makes a decision about
which field (if any) is the primary timestamp field. For structured file
formats, it is not compulsory to have a timestamp in the file.
--

@@ -213,14 +213,14 @@ format from a built-in set.
The following table provides the appropriate `timeformat` values for some example timestamps:

|===
| Timeformat | Presentation

| yyyy-MM-dd HH:mm:ssZ | 2019-04-20 13:15:22+0000
| EEE, d MMM yyyy HH:mm:ss Z | Sat, 20 Apr 2019 13:15:22 +0000
| dd.MM.yy HH:mm:ss.SSS | 20.04.19 13:15:22.285
|===

See
https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html[the Java date/time format documentation]
for more information about date and time format syntax.
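As a quick sanity check, each pattern in the table above can be exercised directly with java.time.format.DateTimeFormatter. The following standalone sketch (not part of this change; the class name and locale choice are illustrative) parses the three example timestamps:

import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class TimeformatCheck {
    public static void main(String[] args) {
        // Patterns containing a zone offset (Z) parse to ZonedDateTime.
        ZonedDateTime t1 = ZonedDateTime.parse("2019-04-20 13:15:22+0000",
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ssZ", Locale.ENGLISH));
        ZonedDateTime t2 = ZonedDateTime.parse("Sat, 20 Apr 2019 13:15:22 +0000",
            DateTimeFormatter.ofPattern("EEE, d MMM yyyy HH:mm:ss Z", Locale.ENGLISH));
        // The third pattern carries no zone information, so it parses to LocalDateTime.
        LocalDateTime t3 = LocalDateTime.parse("20.04.19 13:15:22.285",
            DateTimeFormatter.ofPattern("dd.MM.yy HH:mm:ss.SSS", Locale.ENGLISH));
        System.out.println(t1 + " / " + t2 + " / " + t3);
    }
}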

@@ -675,6 +675,30 @@ If the request does not encounter errors, you receive the following result:
"ingest_pipeline" : {
"description" : "Ingest pipeline created by file structure finder",
"processors" : [
{
"csv" : {
"field" : "message",
"target_fields" : [
"VendorID",
"tpep_pickup_datetime",
"tpep_dropoff_datetime",
"passenger_count",
"trip_distance",
"RatecodeID",
"store_and_fwd_flag",
"PULocationID",
"DOLocationID",
"payment_type",
"fare_amount",
"extra",
"mta_tax",
"tip_amount",
"tolls_amount",
"improvement_surcharge",
"total_amount"
]
}
},
{
"date" : {
"field" : "tpep_pickup_datetime",
@@ -683,6 +707,95 @@ If the request does not encounter errors, you receive the following result:
"yyyy-MM-dd HH:mm:ss"
]
}
},
{
"convert" : {
"field" : "DOLocationID",
"type" : "long"
}
},
{
"convert" : {
"field" : "PULocationID",
"type" : "long"
}
},
{
"convert" : {
"field" : "RatecodeID",
"type" : "long"
}
},
{
"convert" : {
"field" : "VendorID",
"type" : "long"
}
},
{
"convert" : {
"field" : "extra",
"type" : "double"
}
},
{
"convert" : {
"field" : "fare_amount",
"type" : "double"
}
},
{
"convert" : {
"field" : "improvement_surcharge",
"type" : "double"
}
},
{
"convert" : {
"field" : "mta_tax",
"type" : "double"
}
},
{
"convert" : {
"field" : "passenger_count",
"type" : "long"
}
},
{
"convert" : {
"field" : "payment_type",
"type" : "long"
}
},
{
"convert" : {
"field" : "tip_amount",
"type" : "double"
}
},
{
"convert" : {
"field" : "tolls_amount",
"type" : "double"
}
},
{
"convert" : {
"field" : "total_amount",
"type" : "double"
}
},
{
"convert" : {
"field" : "trip_distance",
"type" : "double"
}
},
{
"remove" : {
"field" : "message"
}
}
]
},
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/DelimitedFileStructureFinder.java

@@ -93,7 +93,18 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
// null to allow GC before timestamp search
sampleLines = null;

Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker);

SortedMap<String, Object> mappings = mappingsAndFieldStats.v1();

List<String> columnNamesList = Arrays.asList(columnNames);
char delimiter = (char) csvPreference.getDelimiterChar();
char quoteChar = csvPreference.getQuoteChar();

Map<String, Object> csvProcessorSettings = makeCsvProcessorSettings("message", columnNamesList, delimiter, quoteChar,
trimFields);

FileStructure.Builder structureBuilder = new FileStructure.Builder(FileStructure.Format.DELIMITED)
.setCharset(charsetName)
.setHasByteOrderMarker(hasByteOrderMarker)
@@ -102,8 +113,19 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
.setNumMessagesAnalyzed(sampleRecords.size())
.setHasHeaderRow(isHeaderInFile)
.setDelimiter(delimiter)
.setQuote(csvPreference.getQuoteChar())
.setColumnNames(Arrays.stream(columnNames).collect(Collectors.toList()));
.setQuote(quoteChar)
.setColumnNames(columnNamesList);

if (isHeaderInFile) {
String quote = String.valueOf(quoteChar);
String twoQuotes = quote + quote;
String optQuote = quote.replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + "?";
String delimiterMatcher =
(delimiter == '\t') ? "\\t" : String.valueOf(delimiter).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1");
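// Worked example (comment added for illustration, not in the original change):
// with delimiter ',' and quote '"', a header line of VendorID,fare_amount yields
// the exclude pattern ^"?VendorID"?,"?fare_amount"? so a repeated header row can
// be skipped, e.g. via the exclude_lines setting of a generated Filebeat config.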
structureBuilder.setExcludeLinesPattern("^" + Arrays.stream(header)
.map(column -> optQuote + column.replace(quote, twoQuotes).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + optQuote)
.collect(Collectors.joining(delimiterMatcher)));
}

if (trimFields) {
structureBuilder.setShouldTrimFields(true);
@@ -135,32 +157,20 @@ static DelimitedFileStructureFinder makeDelimitedFileStructureFinder(List<String
}
}

if (isHeaderInFile) {
String quote = String.valueOf(csvPreference.getQuoteChar());
String twoQuotes = quote + quote;
String optQuote = quote.replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + "?";
String delimiterMatcher =
(delimiter == '\t') ? "\\t" : String.valueOf(delimiter).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1");
structureBuilder.setExcludeLinesPattern("^" + Arrays.stream(header)
.map(column -> optQuote + column.replace(quote, twoQuotes).replaceAll(REGEX_NEEDS_ESCAPE_PATTERN, "\\\\$1") + optQuote)
.collect(Collectors.joining(delimiterMatcher)));
}

boolean needClientTimeZone = timeField.v2().hasTimezoneDependentParsing();

structureBuilder.setTimestampField(timeField.v1())
.setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats())
.setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats())
.setNeedClientTimezone(needClientTimeZone)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timeField.v1(),
timeField.v2().getJavaTimestampFormats(), needClientTimeZone))
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), csvProcessorSettings,
mappings, timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone))
.setMultilineStartPattern(timeLineRegex);
} else {
structureBuilder.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(),
csvProcessorSettings, mappings, null, null, false));
}

Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
FileStructureUtils.guessMappingsAndCalculateFieldStats(explanation, sampleRecords, timeoutChecker);

SortedMap<String, Object> mappings = mappingsAndFieldStats.v1();
if (timeField != null) {
mappings.put(FileStructureUtils.DEFAULT_TIMESTAMP_FIELD, FileStructureUtils.DATE_MAPPING_WITHOUT_FORMAT);
}
@@ -579,4 +589,24 @@ static boolean canCreateFromSample(List<String> explanation, String sample, int
private static boolean notUnexpectedEndOfFile(SuperCsvException e) {
return e.getMessage().startsWith("unexpected end of file while reading quoted column") == false;
}

static Map<String, Object> makeCsvProcessorSettings(String field, List<String> targetFields, char separator, char quote, boolean trim) {

Map<String, Object> csvProcessorSettings = new LinkedHashMap<>();
csvProcessorSettings.put("field", field);
csvProcessorSettings.put("target_fields", Collections.unmodifiableList(targetFields));
if (separator != ',') {
// The value must be String, not Character, as XContent only works with String
csvProcessorSettings.put("separator", String.valueOf(separator));
}
if (quote != '"') {
// The value must be String, not Character, as XContent only works with String
csvProcessorSettings.put("quote", String.valueOf(quote));
}
csvProcessorSettings.put("ignore_missing", false);
if (trim) {
csvProcessorSettings.put("trim", true);
}
return Collections.unmodifiableMap(csvProcessorSettings);
}
}
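For illustration (not part of this PR), a minimal driver showing what makeCsvProcessorSettings emits. CsvSettingsDemo is a hypothetical class that would need to live in the same package, since the method is package-private:

package org.elasticsearch.xpack.ml.filestructurefinder;

import java.util.Arrays;
import java.util.Map;

public class CsvSettingsDemo {
    public static void main(String[] args) {
        // Pipe-delimited input, default quote character, trimming enabled.
        Map<String, Object> settings = DelimitedFileStructureFinder.makeCsvProcessorSettings(
            "message", Arrays.asList("name", "age"), '|', '"', true);
        // Prints: {field=message, target_fields=[name, age], separator=|, ignore_missing=false, trim=true}
        // "separator" appears because '|' differs from the default ','; the default quote '"' is omitted.
        System.out.println(settings);
    }
}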
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/FileStructureUtils.java

@@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.filestructurefinder;

import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.grok.Grok;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.xpack.core.ml.filestructurefinder.FieldStats;
@@ -31,6 +32,8 @@ public final class FileStructureUtils {
public static final String MAPPING_PROPERTIES_SETTING = "properties";
public static final Map<String, String> DATE_MAPPING_WITHOUT_FORMAT =
Collections.singletonMap(MAPPING_TYPE_SETTING, "date");
public static final Set<String> CONVERTIBLE_TYPES =
Collections.unmodifiableSet(Sets.newHashSet("integer", "long", "float", "double", "boolean"));

private static final int NUM_TOP_HITS = 10;
// NUMBER Grok pattern doesn't support scientific notation, so we extend it
@@ -352,6 +355,9 @@ static boolean isMoreLikelyTextThanKeyword(String str) {
* @param grokPattern The Grok pattern used for parsing semi-structured text formats. <code>null</code> for
* fully structured formats.
* @param customGrokPatternDefinitions The definitions for any custom patterns that {@code grokPattern} uses.
* @param csvProcessorSettings The CSV processor settings for delimited formats. <code>null</code> for
* non-delimited formats.
* @param mappingsForConversions Mappings (or partial mappings) that will be considered for field type conversions.
* @param timestampField The input field containing the timestamp to be parsed into <code>@timestamp</code>.
* <code>null</code> if there is no timestamp.
* @param timestampFormats Timestamp formats to be used for parsing {@code timestampField}.
@@ -360,10 +366,12 @@ public static Map<String, Object> makeIngestPipelineDefinition(String grokPatter
* @return The ingest pipeline definition, or <code>null</code> if none is required.
*/
public static Map<String, Object> makeIngestPipelineDefinition(String grokPattern, Map<String, String> customGrokPatternDefinitions,
Map<String, Object> csvProcessorSettings,
Map<String, Object> mappingsForConversions,
String timestampField, List<String> timestampFormats,
boolean needClientTimezone) {

if (grokPattern == null && timestampField == null) {
if (grokPattern == null && csvProcessorSettings == null && timestampField == null) {
return null;
}

@@ -384,6 +392,10 @@ public static Map<String, Object> makeIngestPipelineDefinition(String grokPatter
assert customGrokPatternDefinitions.isEmpty();
}

if (csvProcessorSettings != null) {
processors.add(Collections.singletonMap("csv", csvProcessorSettings));
}

if (timestampField != null) {
Map<String, Object> dateProcessorSettings = new LinkedHashMap<>();
dateProcessorSettings.put("field", timestampField);
@@ -394,6 +406,32 @@ public static Map<String, Object> makeIngestPipelineDefinition(String grokPatter
processors.add(Collections.singletonMap("date", dateProcessorSettings));
}

for (Map.Entry<String, Object> mapping : mappingsForConversions.entrySet()) {
String fieldName = mapping.getKey();
Object values = mapping.getValue();
if (values instanceof Map) {
Object type = ((Map<?, ?>) values).get(MAPPING_TYPE_SETTING);
if (CONVERTIBLE_TYPES.contains(type)) {
Map<String, Object> convertProcessorSettings = new LinkedHashMap<>();
convertProcessorSettings.put("field", fieldName);
convertProcessorSettings.put("type", type);
convertProcessorSettings.put("ignore_missing", true);
processors.add(Collections.singletonMap("convert", convertProcessorSettings));
}
}
}

// This removes the unparsed message field for delimited formats (unless the same field name is used for one of the columns)
if (csvProcessorSettings != null) {
Object field = csvProcessorSettings.get("field");
assert field != null;
Object targetFields = csvProcessorSettings.get("target_fields");
assert targetFields instanceof List;
if (((List<?>) targetFields).contains(field) == false) {
processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", field)));
}
}

// This removes the interim timestamp field used for semi-structured text formats
if (grokPattern != null && timestampField != null) {
processors.add(Collections.singletonMap("remove", Collections.singletonMap("field", timestampField)));
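To make the new pipeline assembly concrete, here is a minimal sketch (not part of the PR; the field names and mappings are invented) that calls makeIngestPipelineDefinition the way the delimited finder now does when no timestamp field was detected:

package org.elasticsearch.xpack.ml.filestructurefinder;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class PipelineSketch {
    public static void main(String[] args) {
        // CSV processor settings, shaped like makeCsvProcessorSettings output.
        Map<String, Object> csvSettings = new LinkedHashMap<>();
        csvSettings.put("field", "message");
        csvSettings.put("target_fields", Arrays.asList("id", "note"));
        csvSettings.put("ignore_missing", false);

        // Guessed mappings: "id" has a convertible type, "note" does not.
        Map<String, Object> mappings = new TreeMap<>();
        mappings.put("id", Collections.singletonMap("type", "long"));
        mappings.put("note", Collections.singletonMap("type", "keyword"));

        // With no grok pattern and no timestamp field, the resulting pipeline
        // holds a csv processor, one convert processor (field "id" to long,
        // ignore_missing true), and a remove processor for the raw "message" field.
        Map<String, Object> pipeline = FileStructureUtils.makeIngestPipelineDefinition(
            null, Collections.emptyMap(), csvSettings, mappings, null, null, false);
        System.out.println(pipeline);
    }
}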
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/NdJsonFileStructureFinder.java

@@ -61,8 +61,10 @@ static NdJsonFileStructureFinder makeNdJsonFileStructureFinder(List<String> expl
.setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats())
.setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats())
.setNeedClientTimezone(needClientTimeZone)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), timeField.v1(),
timeField.v2().getJavaTimestampFormats(), needClientTimeZone));
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null,
// Note: no convert processors are added based on mappings for NDJSON input
// because it's reasonable that _source matches the supplied JSON precisely
Collections.emptyMap(), timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone));
}

Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/TextLogFileStructureFinder.java

@@ -150,9 +150,8 @@ static TextLogFileStructureFinder makeTextLogFileStructureFinder(List<String> ex
.setJavaTimestampFormats(timestampFormatFinder.getJavaTimestampFormats())
.setNeedClientTimezone(needClientTimeZone)
.setGrokPattern(grokPattern)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern,
customGrokPatternDefinitions, interimTimestampField,
timestampFormatFinder.getJavaTimestampFormats(), needClientTimeZone))
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(grokPattern, customGrokPatternDefinitions, null, mappings,
interimTimestampField, timestampFormatFinder.getJavaTimestampFormats(), needClientTimeZone))
.setMappings(mappings)
.setFieldStats(fieldStats)
.setExplanation(explanation)
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/filestructurefinder/XmlFileStructureFinder.java

@@ -102,8 +102,9 @@ static XmlFileStructureFinder makeXmlFileStructureFinder(List<String> explanatio
.setJodaTimestampFormats(timeField.v2().getJodaTimestampFormats())
.setJavaTimestampFormats(timeField.v2().getJavaTimestampFormats())
.setNeedClientTimezone(needClientTimeZone)
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(),
topLevelTag + "." + timeField.v1(), timeField.v2().getJavaTimestampFormats(), needClientTimeZone));
.setIngestPipeline(FileStructureUtils.makeIngestPipelineDefinition(null, Collections.emptyMap(), null,
Collections.emptyMap(), topLevelTag + "." + timeField.v1(), timeField.v2().getJavaTimestampFormats(),
needClientTimeZone));
}

Tuple<SortedMap<String, Object>, SortedMap<String, FieldStats>> mappingsAndFieldStats =