diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java index 7ef9f00785..5b67aa5254 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java @@ -31,6 +31,7 @@ public class IndexingConfigurations extends Configurations { public static final String ENABLED_CONF = "enabled"; public static final String INDEX_CONF = "index"; public static final String OUTPUT_PATH_FUNCTION_CONF = "outputPathFunction"; + public static final String FIELD_NAME_CONVERTER_CONF = "fieldNameConverter"; public Map getSensorIndexingConfig(String sensorType, boolean emptyMapOnNonExistent) { Map ret = (Map) getConfigurations().get(getKey(sensorType)); @@ -61,7 +62,8 @@ public void delete(String sensorType) { } public Map getSensorIndexingConfig(String sensorType, String writerName) { - Map ret = (Map) getConfigurations().get(getKey(sensorType)); + String key = getKey(sensorType); + Map ret = (Map) getConfigurations().get(key); if(ret == null) { return new HashMap(); } @@ -147,6 +149,10 @@ public String getOutputPathFunction(String sensorName, String writerName) { return getOutputPathFunction(getSensorIndexingConfig(sensorName, writerName), sensorName); } + public String getFieldNameConverter(String sensorName, String writerName) { + return getFieldNameConverter(getSensorIndexingConfig(sensorName, writerName), sensorName); + } + public static boolean isEnabled(Map conf) { return getAs( ENABLED_CONF ,conf @@ -187,6 +193,10 @@ public static String getOutputPathFunction(Map conf, String sens ); } + public static String getFieldNameConverter(Map conf, String sensorName) { + return getAs(FIELD_NAME_CONVERTER_CONF, conf, "", String.class); + } + public static Map setEnabled(Map conf, boolean enabled) { Map ret = conf == null?new HashMap<>():conf; ret.put(ENABLED_CONF, enabled); @@ -210,6 +220,12 @@ public static Map setIndex(Map conf, String inde return ret; } + public static Map setFieldNameConverter(Map conf, String index) { + Map ret = conf == null ? new HashMap<>(): conf; + ret.put(FIELD_NAME_CONVERTER_CONF, index); + return ret; + } + public static T getAs(String key, Map map, T defaultValue, Class clazz) { return map == null?defaultValue: ConversionUtils.convert(map.getOrDefault(key, defaultValue), clazz); } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java index beb9373543..ab25a80753 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java @@ -72,4 +72,9 @@ public Map getGlobalConfig() { public boolean isDefault(String sensorName) { return config.orElse(new IndexingConfigurations()).isDefault(sensorName, writerName); } + + @Override + public String getFieldNameConverter(String sensorName) { + return config.orElse(new IndexingConfigurations()).getFieldNameConverter(sensorName, writerName); + } } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java index ae74c65872..4603b32d69 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java @@ -101,4 +101,10 @@ public Map getGlobalConfig() { public boolean isDefault(String sensorName) { return false; } + + @Override + public String getFieldNameConverter(String sensorName) { + // not applicable + return null; + } } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java index e50bd2bb33..2d5a62c549 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java @@ -68,4 +68,10 @@ public Map getGlobalConfig() { public boolean isDefault(String sensorName) { return false; } + + @Override + public String getFieldNameConverter(String sensorName) { + // not applicable + return null; + } } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java index 2354f95602..4abb582a4c 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java @@ -18,17 +18,86 @@ package org.apache.metron.common.configuration.writer; +import org.apache.metron.common.field.FieldNameConverter; + import java.io.Serializable; import java.util.List; import java.util.Map; +/** + * Configures a writer to write messages to an endpoint. + * + *

Each destination will have its own {@link WriterConfiguration}; for example HDFS, Elasticsearch, and Solr. + *

A writer can be configured independently for each source type. + */ public interface WriterConfiguration extends Serializable { + + /** + * Defines the maximum batch size for a given sensor. + * + * @param sensorName The name of the sensor. + * @return The batch size for the sensor. + */ int getBatchSize(String sensorName); + + /** + * Defines the batch timeout for a given sensor. Even if the maximum + * batch size has not been reached, the messages will be written when + * the timeout is reached. + * + * @param sensorName The name of the sensor. + * @return The batch timeout for the sensor. + */ int getBatchTimeout(String sensorName); + + /** + * Returns the batch timeouts for all of the currently configured sensors. + * @return All of the batch timeouts. + */ List getAllConfiguredTimeouts(); + + /** + * The name of the index to write to for a given sensor. + * + * @param sensorName The name of the sensor. + * @return The name of the index to write to + */ String getIndex(String sensorName); + + /** + * Returns true, if this writer is enabled for the given sensor. + * + * @param sensorName The name of the sensor. + * @return True, if this writer is enabled. Otherwise, false. + */ boolean isEnabled(String sensorName); + + /** + * @param sensorName The name of a sensor. + * @return + */ Map getSensorConfig(String sensorName); + + /** + * Returns the global configuration. + * @return The global configuration. + */ Map getGlobalConfig(); + + /** + * Returns true, if the current writer configuration is set to all default values. + * + * @param sensorName The name of the sensor. + * @return True, if the writer is using all default values. Otherwise, false. + */ boolean isDefault(String sensorName); + + /** + * Return the {@link FieldNameConverter} to use + * when writing messages. + * + * @param sensorName The name of the sensor; + * @return + */ + String getFieldNameConverter(String sensorName); } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/DeDotFieldNameConverter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/DeDotFieldNameConverter.java new file mode 100644 index 0000000000..a571ce5d53 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/DeDotFieldNameConverter.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.common.field; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.lang.invoke.MethodHandles; + +/** + * A {@link FieldNameConverter} that replaces all field names containing dots + * with colons. + */ +public class DeDotFieldNameConverter implements FieldNameConverter, Serializable { + + private static final long serialVersionUID = -3126840090749760299L; + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Override + public String convert(String originalField) { + + String newField = originalField.replace(".",":"); + + if(LOG.isDebugEnabled() && originalField.contains(".")) { + LOG.debug("Renamed dotted field; original={}, new={}", originalField, newField); + } + + return newField; + } +} diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/FieldNameConverter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverter.java similarity index 73% rename from metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/FieldNameConverter.java rename to metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverter.java index 92e7ec64ad..0c15ec44f9 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/FieldNameConverter.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverter.java @@ -15,10 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.metron.common.interfaces; +package org.apache.metron.common.field; +/** + * Allows field names to be transformed before a message is written to an endpoint. + */ public interface FieldNameConverter { - String convert(String originalField); - + /** + * Convert the field name. + * + * @param originalField The original field name. + * @return The new field name. + */ + String convert(String originalField); } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java new file mode 100644 index 0000000000..d5858ed272 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.field; + +import org.apache.commons.lang.ClassUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; + +/** + * Enumerates a set of {@link FieldNameConverter} implementations. + * + *

Provides shared instances of each {@link FieldNameConverter}. + * + *

Allows the field name converter to be specified using a short-hand + * name, rather than the entire fully-qualified class name. + */ +public enum FieldNameConverters implements FieldNameConverter { + + /** + * A {@link FieldNameConverter} that does not rename any fields. All field + * names remain unchanged. + */ + NOOP(new NoopFieldNameConverter()), + + /** + * A {@link FieldNameConverter} that replaces all field names containing dots + * with colons. + */ + DEDOT(new DeDotFieldNameConverter()); + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private FieldNameConverter converter; + + FieldNameConverters(FieldNameConverter converter) { + this.converter = converter; + } + + /** + * Returns a shared instance of the {@link FieldNameConverter}/ + * + * @return A shared {@link FieldNameConverter} instance. + */ + public FieldNameConverter get() { + return converter; + } + + /** + * Allows the {@link FieldNameConverters} enums to be used directly as a {@link FieldNameConverter}. + * + * {@code + * FieldNameConverter converter = FieldNameConverters.DEDOT; + * } + * + * @param originalField The original field name. + * @return + */ + @Override + public String convert(String originalField) { + return converter.convert(originalField); + } + + /** + * Create a new {@link FieldNameConverter}. + * + * @param sensorType The type of sensor. + * @param config The writer configuration. + * @return + */ + public static FieldNameConverter create(String sensorType, WriterConfiguration config) { + FieldNameConverter result = null; + + // which field name converter has been configured? + String converterName = config.getFieldNameConverter(sensorType); + if(StringUtils.isNotBlank(converterName)) { + try { + result = FieldNameConverters.valueOf(converterName); + + } catch (IllegalArgumentException e) { + LOG.error("Invalid field name converter, using default; configured={}, knownValues={}, error={}", + converterName, FieldNameConverters.values(), ExceptionUtils.getRootCauseMessage(e)); + } + } + + if(result == null) { + // if no converter defined or an invalid converter is defined, default to 'DEDOT' + result = FieldNameConverters.DEDOT; + } + + LOG.debug("Created field name converter; sensorType={}, configured={}, class={}", + sensorType, converterName, ClassUtils.getShortClassName(result, "null")); + + return result; + } +} diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverterTest.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/NoopFieldNameConverter.java similarity index 70% rename from metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverterTest.java rename to metron-platform/metron-common/src/main/java/org/apache/metron/common/field/NoopFieldNameConverter.java index 3e52581fb8..a7f3f7c5a0 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverterTest.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/NoopFieldNameConverter.java @@ -15,18 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.metron.common.field; -package org.apache.metron.elasticsearch.writer; - -import org.junit.Test; - -import static org.junit.Assert.*; - -public class ElasticsearchFieldNameConverterTest { +/** + * A {@link FieldNameConverter} that does not rename any fields. All field + * names remain unchanged. + */ +public class NoopFieldNameConverter implements FieldNameConverter { - @Test - public void convert() throws Exception { - assertEquals("testfield:with:colons",new ElasticsearchFieldNameConverter().convert("testfield.with.colons")); - } + @Override + public String convert(String originalField) { -} \ No newline at end of file + // no change to the field name + return originalField; + } +} diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverter.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/DeDotFieldNameConverterTest.java similarity index 59% rename from metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverter.java rename to metron-platform/metron-common/src/test/java/org/apache/metron/common/field/DeDotFieldNameConverterTest.java index 57e07ea863..cc1102f256 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverter.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/DeDotFieldNameConverterTest.java @@ -15,18 +15,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.metron.elasticsearch.writer; -import org.apache.metron.common.interfaces.FieldNameConverter; -import java.io.Serializable; +package org.apache.metron.common.field; -public class ElasticsearchFieldNameConverter implements FieldNameConverter, Serializable { +import org.junit.Test; - private static final long serialVersionUID = -3126840090749760299L; +import static org.junit.Assert.assertEquals; - @Override - public String convert(String originalField) { - return originalField.replace(".",":"); - } +public class DeDotFieldNameConverterTest { -} + @Test + public void testWithColons() throws Exception { + String actual = new DeDotFieldNameConverter().convert("testfield.with.colons"); + assertEquals("testfield:with:colons", actual); + } + + @Test + public void testNoColons() throws Exception { + String actual = new DeDotFieldNameConverter().convert("test-field-no-colons"); + assertEquals("test-field-no-colons", actual); + } +} \ No newline at end of file diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java new file mode 100644 index 0000000000..2c263f2f92 --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.field; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.IndexingConfigurations; +import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Test the {@link FieldNameConverters} class. + */ +public class FieldNameConvertersTest { + + private WriterConfiguration createConfig(String writer, String sensor, String json) throws Exception { + + IndexingConfigurations indexingConfig = new IndexingConfigurations(); + indexingConfig.updateSensorIndexingConfig(sensor, json.getBytes()); + return new IndexingWriterConfiguration(writer, indexingConfig); + } + + /** + * { + * "elasticsearch": { + * + * "index": "theIndex", + * "batchSize": 100, + * "batchTimeout": 1000, + * "enabled": true, + * "fieldNameConverter": "DEDOT" + * } + * } + */ + @Multiline + private static String jsonWithDedot; + + /** + * The factory should be able to create a {@link DeDotFieldNameConverter}. + */ + @Test + public void testCreateDedot() throws Exception { + + final String writer = "elasticsearch"; + final String sensor = "bro"; + WriterConfiguration config = createConfig(writer, sensor, jsonWithDedot); + + // validate the converter created for 'bro' + FieldNameConverter converter = FieldNameConverters.create(sensor, config); + assertEquals(FieldNameConverters.DEDOT, converter); + } + + /** + * { + * "elasticsearch": { + * + * "index": "theIndex", + * "batchSize": 100, + * "batchTimeout": 1000, + * "enabled": true, + * "fieldNameConverter": "NOOP" + * } + * } + */ + @Multiline + private static String jsonWithNoop; + + /** + * The factory should be able to create a {@link NoopFieldNameConverter}. + */ + @Test + public void testCreateNoop() throws Exception { + + final String writer = "elasticsearch"; + final String sensor = "bro"; + WriterConfiguration config = createConfig(writer, sensor, jsonWithNoop); + + // validate the converter created for 'bro' + FieldNameConverter converter = FieldNameConverters.create(sensor, config); + assertEquals(FieldNameConverters.NOOP, converter); + } + + /** + * { + * "elasticsearch": { + * + * "index": "theIndex", + * "batchSize": 100, + * "batchTimeout": 1000, + * "enabled": true + * } + * } + */ + @Multiline + private static String jsonWithNoConverter; + + /** + * The factory should create a default {@link FieldNameConverter} if none has been defined + * by the user in the writer configuration. + */ + @Test + public void testCreateDefault() throws Exception { + + final String writer = "elasticsearch"; + final String sensor = "bro"; + WriterConfiguration config = createConfig(writer, sensor, jsonWithNoConverter); + + // if none defined, should default to 'DEDOT' + FieldNameConverter converter = FieldNameConverters.create(sensor, config); + assertEquals(FieldNameConverters.DEDOT, converter); + } + + /** + * If the user changes the {@link FieldNameConverter} in the writer configuration, the new + * {@link FieldNameConverter} should be used after the old one expires. + */ + @Test + public void testConfigChange() throws Exception { + + final String writer = "elasticsearch"; + final String sensor = "bro"; + + // no converter defined in config, should use 'DEDOT' converter + WriterConfiguration config = createConfig(writer, sensor, jsonWithNoConverter); + assertEquals(FieldNameConverters.DEDOT, FieldNameConverters.create(sensor, config)); + + // an 'updated' config uses the 'NOOP' converter + WriterConfiguration newConfig = createConfig(writer, sensor, jsonWithNoop); + assertEquals(FieldNameConverters.NOOP, FieldNameConverters.create(sensor, newConfig)); + } + + /** + * { + * "elasticsearch": { + * + * "index": "theIndex", + * "batchSize": 100, + * "batchTimeout": 1000, + * "enabled": true, + * "fieldNameConverter": "INVALID" + * } + * } + */ + @Multiline + private static String jsonWithInvalidConverter; + + /** + * If an invalid field name converter is specified, it should fall-back to using the + * default, noop converter. + */ + @Test + public void testCreateInvalid() throws Exception { + + final String writer = "elasticsearch"; + final String sensor = "bro"; + WriterConfiguration config = createConfig(writer, sensor, jsonWithInvalidConverter); + + // if invalid value defined, it should fall-back to using default 'DEDOT' + FieldNameConverter converter = FieldNameConverters.create(sensor, config); + assertEquals(FieldNameConverters.DEDOT, converter); + } + + /** + * { + * "elasticsearch": { + * + * "index": "theIndex", + * "batchSize": 100, + * "batchTimeout": 1000, + * "enabled": true, + * "fieldNameConverter": "" + * } + * } + */ + @Multiline + private static String jsonWithBlankConverter; + + /** + * If the field name converter field is blank, it should fall-back to using the + * default converter. + */ + @Test + public void testCreateBlank() throws Exception { + + final String writer = "elasticsearch"; + final String sensor = "bro"; + WriterConfiguration config = createConfig(writer, sensor, jsonWithInvalidConverter); + + // if invalid value defined, it should fall-back to using default 'DEDOT' + FieldNameConverter converter = FieldNameConverters.create(sensor, config); + assertEquals(FieldNameConverters.DEDOT, converter); + } +} diff --git a/metron-platform/metron-elasticsearch/pom.xml b/metron-platform/metron-elasticsearch/pom.xml index 141d8aaf6b..22d37cac88 100644 --- a/metron-platform/metron-elasticsearch/pom.xml +++ b/metron-platform/metron-elasticsearch/pom.xml @@ -216,6 +216,12 @@ log4j-core ${global_log4j_core_version} + + com.google.guava + guava-testlib + ${global_guava_version} + test + diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index 5959623785..4b8dd083e9 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -17,15 +17,10 @@ */ package org.apache.metron.elasticsearch.writer; -import java.io.Serializable; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Iterator; -import java.util.List; -import java.util.Map; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.common.interfaces.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverters; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; @@ -40,31 +35,53 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A {@link BulkMessageWriter} that writes messages to Elasticsearch. + */ public class ElasticsearchWriter implements BulkMessageWriter, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * The Elasticsearch client. + */ private transient TransportClient client; + + /** + * A simple data formatter used to build the appropriate Elasticsearch index name. + */ private SimpleDateFormat dateFormat; - private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriter.class); - private FieldNameConverter fieldNameConverter = new ElasticsearchFieldNameConverter(); + @Override public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) { + Map globalConfiguration = configurations.getGlobalConfig(); client = ElasticsearchUtils.getClient(globalConfiguration); dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration); } - @Override public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable tuples, List messages) throws Exception { + + // fetch the field name converter for this sensor type + FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations); + final String indexPostfix = dateFormat.format(new Date()); BulkRequestBuilder bulkRequest = client.prepareBulk(); - for(JSONObject message: messages) { JSONObject esDoc = new JSONObject(); for(Object k : message.keySet()){ - deDot(k.toString(), message, esDoc); + copyField(k.toString(), message, esDoc, fieldNameConverter); } String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations); @@ -125,19 +142,30 @@ public void close() throws Exception { client.close(); } + /** + * Copies the value of a field from the source message to the destination message. + * + *

A field name may also be transformed in the destination message by the {@link FieldNameConverter}. + * + * @param sourceFieldName The name of the field to copy from the source message + * @param source The source message. + * @param destination The destination message. + * @param fieldNameConverter The field name converter that transforms the field name + * between the source and destination. + */ //JSONObject doesn't expose map generics @SuppressWarnings("unchecked") - private void deDot(String field, JSONObject origMessage, JSONObject message){ - - if(field.contains(".")){ + private void copyField( + String sourceFieldName, + JSONObject source, + JSONObject destination, + FieldNameConverter fieldNameConverter) { - LOG.debug("Dotted field: {}", field); - - } - String newkey = fieldNameConverter.convert(field); - message.put(newkey,origMessage.get(field)); + // allow the field name to be transformed + String destinationFieldName = fieldNameConverter.convert(sourceFieldName); + // copy the field + destination.put(destinationFieldName, source.get(sourceFieldName)); } - } diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java index e3047b6631..df5e96a8f8 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java @@ -18,9 +18,10 @@ package org.apache.metron.elasticsearch.integration; import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.common.interfaces.FieldNameConverter; +import org.apache.metron.common.field.DeDotFieldNameConverter; +import org.apache.metron.common.field.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverters; import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; -import org.apache.metron.elasticsearch.writer.ElasticsearchFieldNameConverter; import org.apache.metron.indexing.integration.IndexingIntegrationTest; import org.apache.metron.integration.ComponentRunner; import org.apache.metron.integration.InMemoryComponent; @@ -43,7 +44,7 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes private String indexDir = "target/elasticsearch"; private String dateFormat = "yyyy.MM.dd.HH"; private String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date()); - private FieldNameConverter fieldNameConverter = new ElasticsearchFieldNameConverter(); + private FieldNameConverter fieldNameConverter = FieldNameConverters.DEDOT; /** * { * "yaf_doc": { diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md index f4a45010e6..a9a8ed9192 100644 --- a/metron-platform/metron-indexing/README.md +++ b/metron-platform/metron-indexing/README.md @@ -50,6 +50,8 @@ and sent to By default, errors during indexing are sent back into the `indexing` kafka queue so that they can be indexed and archived. ## Sensor Indexing Configuration + + The sensor specific configuration is intended to configure the indexing used for a given sensor type (e.g. `snort`). @@ -58,16 +60,16 @@ Just like the global config, the format is a JSON stored in zookeeper and on dis * `hdfs` * `solr` -Depending on how you start the indexing topology, it will have either -elasticsearch or solr and hdfs writers running. +Depending on how you start the indexing topology, it will have either Elasticsearch or Solr and HDFS writers running. + +| Property | Description | Default Value | +|----------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| `index` | The name of the index to write to. | Defaults to the name of the sensor. | +| `batchSize` | The size of the batch that is written to the indices at once. | Defaults to `1`; no batching. | +| `batchTimeout` | The timeout after which a batch will be flushed even if `batchSize` has not been met. | Defaults to a duration which is a fraction of the Storm parameter `topology.message.timeout.secs`, if left undefined or set to 0. Ignored if batchSize is `1`, since this disables batching.| +| `enabled` | A boolean indicating whether the writer is enabled. | Defaults to `true` | +| `fieldNameConverter` | Defines how field names are transformed before being written to the index. Only applicable to `elasticsearch`. | Defaults to `DEDOT`. Acceptable values are `DEDOT` that replaces all '.' with ':' or `NOOP` that does not change the field names . | -The configuration for an individual writer-specific configuration is a JSON map with the following fields: -* `index` : The name of the index to write to (defaulted to the name of the sensor). -* `batchSize` : The size of the batch that is written to the indices at once. Defaults to `1` (no batching). -* `batchTimeout` : The timeout after which a batch will be flushed even if batchSize has not been met. Optional. -If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm -parameter `topology.message.timeout.secs`. Ignored if batchSize is `1`, since this disables batching. -* `enabled` : Whether the writer is enabled (default `true`). ### Meta Alerts Alerts can be grouped, after appropriate searching, into a set of alerts called a meta alert. A meta alert is useful for maintaining the context of searching and grouping during further investigations. Standard searches can return meta alerts, but grouping and other aggregation or sorting requests will not, because there's not a clear way to aggregate in many cases if there are multiple alerts contained in the meta alert. All meta alerts will have the source type of metaalert, regardless of the contained alert's origins. diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java index b8af6a3eee..2416c864ec 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java @@ -18,16 +18,14 @@ package org.apache.metron.indexing.integration; -import com.google.common.base.Function; import com.google.common.collect.Iterables; -import org.apache.metron.common.interfaces.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.integration.*; import org.apache.metron.integration.components.KafkaComponent; import org.apache.metron.integration.utils.TestUtils; import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; -import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.*; diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java index 1671ab3273..2e703f667b 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java @@ -22,7 +22,7 @@ import org.apache.metron.TestConstants; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.ConfigurationsUtils; -import org.apache.metron.common.interfaces.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.enrichment.integration.components.ConfigUploadComponent; import org.apache.metron.integration.BaseIntegrationTest; @@ -38,7 +38,6 @@ import org.junit.Assert; import org.junit.Test; -import javax.annotation.Nullable; import java.io.File; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java index b1404e286d..da884f1442 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java @@ -301,7 +301,6 @@ public boolean isEnabled(String sensorName) { @Override public Map getSensorConfig(String sensorName) { return sensorConfig; - } @Override @@ -313,6 +312,11 @@ public Map getGlobalConfig() { public boolean isDefault(String sensorName) { return false; } + + @Override + public String getFieldNameConverter(String sensorName) { + return null; + } }; } } diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java index 7c907fd7da..256f23b800 100644 --- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java @@ -20,7 +20,7 @@ import com.google.common.base.Function; import org.apache.metron.common.configuration.Configurations; import org.apache.metron.common.configuration.ConfigurationsUtils; -import org.apache.metron.common.interfaces.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.enrichment.integration.utils.SampleUtil; import org.apache.metron.indexing.integration.IndexingIntegrationTest; @@ -34,7 +34,6 @@ import org.apache.metron.solr.integration.components.SolrComponent; import javax.annotation.Nullable; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Properties;