From c478de1f099d62c0f5271cd9b8ad5124089ad735 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Mon, 21 May 2018 16:50:25 -0400 Subject: [PATCH 1/8] METRON-1569 Allow user to change field name conversion when indexing to Elasticsearch --- .../configuration/IndexingConfigurations.java | 18 +- .../writer/IndexingWriterConfiguration.java | 5 + .../writer/ParserWriterConfiguration.java | 6 + .../SingleBatchConfigurationFacade.java | 6 + .../writer/WriterConfiguration.java | 69 +++++ .../common/field/DeDotFieldNameConverter.java | 46 ++++ .../FieldNameConverter.java | 14 +- .../common/field/FieldNameConverters.java | 48 ++++ .../common/field/NoopFieldNameConverter.java} | 24 +- .../field/DeDotFieldNameConverterTest.java | 38 +++ metron-platform/metron-elasticsearch/pom.xml | 6 + .../CachedFieldNameConverterFactory.java | 155 +++++++++++ .../writer/ElasticsearchWriter.java | 74 ++++-- ...er.java => FieldNameConverterFactory.java} | 24 +- .../ElasticsearchIndexingIntegrationTest.java | 6 +- .../CachedFieldNameConverterFactoryTest.java | 250 ++++++++++++++++++ metron-platform/metron-indexing/README.md | 20 +- .../HDFSIndexingIntegrationTest.java | 4 +- .../integration/IndexingIntegrationTest.java | 3 +- .../SimpleHBaseEnrichmentWriterTest.java | 6 +- .../SolrIndexingIntegrationTest.java | 3 +- 21 files changed, 758 insertions(+), 67 deletions(-) create mode 100644 metron-platform/metron-common/src/main/java/org/apache/metron/common/field/DeDotFieldNameConverter.java rename metron-platform/metron-common/src/main/java/org/apache/metron/common/{interfaces => field}/FieldNameConverter.java (73%) create mode 100644 metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java rename metron-platform/{metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverterTest.java => metron-common/src/main/java/org/apache/metron/common/field/NoopFieldNameConverter.java} (70%) create mode 100644 metron-platform/metron-common/src/test/java/org/apache/metron/common/field/DeDotFieldNameConverterTest.java create mode 100644 metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactory.java rename metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/{ElasticsearchFieldNameConverter.java => FieldNameConverterFactory.java} (62%) create mode 100644 metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactoryTest.java diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java index 7ef9f00785..5b67aa5254 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java @@ -31,6 +31,7 @@ public class IndexingConfigurations extends Configurations { public static final String ENABLED_CONF = "enabled"; public static final String INDEX_CONF = "index"; public static final String OUTPUT_PATH_FUNCTION_CONF = "outputPathFunction"; + public static final String FIELD_NAME_CONVERTER_CONF = "fieldNameConverter"; public Map getSensorIndexingConfig(String sensorType, boolean emptyMapOnNonExistent) { Map ret = (Map) getConfigurations().get(getKey(sensorType)); @@ -61,7 +62,8 @@ public void delete(String sensorType) { } public Map getSensorIndexingConfig(String sensorType, String writerName) { - Map ret = (Map) getConfigurations().get(getKey(sensorType)); + String key = getKey(sensorType); + Map ret = (Map) getConfigurations().get(key); if(ret == null) { return new HashMap(); } @@ -147,6 +149,10 @@ public String getOutputPathFunction(String sensorName, String writerName) { return getOutputPathFunction(getSensorIndexingConfig(sensorName, writerName), sensorName); } + public String getFieldNameConverter(String sensorName, String writerName) { + return getFieldNameConverter(getSensorIndexingConfig(sensorName, writerName), sensorName); + } + public static boolean isEnabled(Map conf) { return getAs( ENABLED_CONF ,conf @@ -187,6 +193,10 @@ public static String getOutputPathFunction(Map conf, String sens ); } + public static String getFieldNameConverter(Map conf, String sensorName) { + return getAs(FIELD_NAME_CONVERTER_CONF, conf, "", String.class); + } + public static Map setEnabled(Map conf, boolean enabled) { Map ret = conf == null?new HashMap<>():conf; ret.put(ENABLED_CONF, enabled); @@ -210,6 +220,12 @@ public static Map setIndex(Map conf, String inde return ret; } + public static Map setFieldNameConverter(Map conf, String index) { + Map ret = conf == null ? new HashMap<>(): conf; + ret.put(FIELD_NAME_CONVERTER_CONF, index); + return ret; + } + public static T getAs(String key, Map map, T defaultValue, Class clazz) { return map == null?defaultValue: ConversionUtils.convert(map.getOrDefault(key, defaultValue), clazz); } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java index beb9373543..ab25a80753 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/IndexingWriterConfiguration.java @@ -72,4 +72,9 @@ public Map getGlobalConfig() { public boolean isDefault(String sensorName) { return config.orElse(new IndexingConfigurations()).isDefault(sensorName, writerName); } + + @Override + public String getFieldNameConverter(String sensorName) { + return config.orElse(new IndexingConfigurations()).getFieldNameConverter(sensorName, writerName); + } } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java index ae74c65872..4603b32d69 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/ParserWriterConfiguration.java @@ -101,4 +101,10 @@ public Map getGlobalConfig() { public boolean isDefault(String sensorName) { return false; } + + @Override + public String getFieldNameConverter(String sensorName) { + // not applicable + return null; + } } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java index e50bd2bb33..2d5a62c549 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/SingleBatchConfigurationFacade.java @@ -68,4 +68,10 @@ public Map getGlobalConfig() { public boolean isDefault(String sensorName) { return false; } + + @Override + public String getFieldNameConverter(String sensorName) { + // not applicable + return null; + } } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java index 2354f95602..4abb582a4c 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/writer/WriterConfiguration.java @@ -18,17 +18,86 @@ package org.apache.metron.common.configuration.writer; +import org.apache.metron.common.field.FieldNameConverter; + import java.io.Serializable; import java.util.List; import java.util.Map; +/** + * Configures a writer to write messages to an endpoint. + * + *

Each destination will have its own {@link WriterConfiguration}; for example HDFS, Elasticsearch, and Solr. + *

A writer can be configured independently for each source type. + */ public interface WriterConfiguration extends Serializable { + + /** + * Defines the maximum batch size for a given sensor. + * + * @param sensorName The name of the sensor. + * @return The batch size for the sensor. + */ int getBatchSize(String sensorName); + + /** + * Defines the batch timeout for a given sensor. Even if the maximum + * batch size has not been reached, the messages will be written when + * the timeout is reached. + * + * @param sensorName The name of the sensor. + * @return The batch timeout for the sensor. + */ int getBatchTimeout(String sensorName); + + /** + * Returns the batch timeouts for all of the currently configured sensors. + * @return All of the batch timeouts. + */ List getAllConfiguredTimeouts(); + + /** + * The name of the index to write to for a given sensor. + * + * @param sensorName The name of the sensor. + * @return The name of the index to write to + */ String getIndex(String sensorName); + + /** + * Returns true, if this writer is enabled for the given sensor. + * + * @param sensorName The name of the sensor. + * @return True, if this writer is enabled. Otherwise, false. + */ boolean isEnabled(String sensorName); + + /** + * @param sensorName The name of a sensor. + * @return + */ Map getSensorConfig(String sensorName); + + /** + * Returns the global configuration. + * @return The global configuration. + */ Map getGlobalConfig(); + + /** + * Returns true, if the current writer configuration is set to all default values. + * + * @param sensorName The name of the sensor. + * @return True, if the writer is using all default values. Otherwise, false. + */ boolean isDefault(String sensorName); + + /** + * Return the {@link FieldNameConverter} to use + * when writing messages. + * + * @param sensorName The name of the sensor; + * @return + */ + String getFieldNameConverter(String sensorName); } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/DeDotFieldNameConverter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/DeDotFieldNameConverter.java new file mode 100644 index 0000000000..a571ce5d53 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/DeDotFieldNameConverter.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.common.field; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.lang.invoke.MethodHandles; + +/** + * A {@link FieldNameConverter} that replaces all field names containing dots + * with colons. + */ +public class DeDotFieldNameConverter implements FieldNameConverter, Serializable { + + private static final long serialVersionUID = -3126840090749760299L; + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Override + public String convert(String originalField) { + + String newField = originalField.replace(".",":"); + + if(LOG.isDebugEnabled() && originalField.contains(".")) { + LOG.debug("Renamed dotted field; original={}, new={}", originalField, newField); + } + + return newField; + } +} diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/FieldNameConverter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverter.java similarity index 73% rename from metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/FieldNameConverter.java rename to metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverter.java index 92e7ec64ad..0c15ec44f9 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/interfaces/FieldNameConverter.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverter.java @@ -15,10 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.metron.common.interfaces; +package org.apache.metron.common.field; +/** + * Allows field names to be transformed before a message is written to an endpoint. + */ public interface FieldNameConverter { - String convert(String originalField); - + /** + * Convert the field name. + * + * @param originalField The original field name. + * @return The new field name. + */ + String convert(String originalField); } diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java new file mode 100644 index 0000000000..318a27381f --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.field; + +/** + * Allows the field name converter to be specified using a short-hand + * name, rather than the entire fully-qualified class name. + */ +public enum FieldNameConverters { + + /** + * A {@link FieldNameConverter} that does not rename any fields. All field + * names remain unchanged. + */ + NOOP(new NoopFieldNameConverter()), + + /** + * A {@link FieldNameConverter} that replaces all field names containing dots + * with colons. + */ + DEDOT(new DeDotFieldNameConverter()); + + private FieldNameConverter converter; + + FieldNameConverters(FieldNameConverter converter) { + this.converter = converter; + } + + public FieldNameConverter get() { + return converter; + } +} diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverterTest.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/NoopFieldNameConverter.java similarity index 70% rename from metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverterTest.java rename to metron-platform/metron-common/src/main/java/org/apache/metron/common/field/NoopFieldNameConverter.java index 3e52581fb8..a7f3f7c5a0 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverterTest.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/NoopFieldNameConverter.java @@ -15,18 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.metron.common.field; -package org.apache.metron.elasticsearch.writer; - -import org.junit.Test; - -import static org.junit.Assert.*; - -public class ElasticsearchFieldNameConverterTest { +/** + * A {@link FieldNameConverter} that does not rename any fields. All field + * names remain unchanged. + */ +public class NoopFieldNameConverter implements FieldNameConverter { - @Test - public void convert() throws Exception { - assertEquals("testfield:with:colons",new ElasticsearchFieldNameConverter().convert("testfield.with.colons")); - } + @Override + public String convert(String originalField) { -} \ No newline at end of file + // no change to the field name + return originalField; + } +} diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/DeDotFieldNameConverterTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/DeDotFieldNameConverterTest.java new file mode 100644 index 0000000000..cc1102f256 --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/DeDotFieldNameConverterTest.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.field; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class DeDotFieldNameConverterTest { + + @Test + public void testWithColons() throws Exception { + String actual = new DeDotFieldNameConverter().convert("testfield.with.colons"); + assertEquals("testfield:with:colons", actual); + } + + @Test + public void testNoColons() throws Exception { + String actual = new DeDotFieldNameConverter().convert("test-field-no-colons"); + assertEquals("test-field-no-colons", actual); + } +} \ No newline at end of file diff --git a/metron-platform/metron-elasticsearch/pom.xml b/metron-platform/metron-elasticsearch/pom.xml index 141d8aaf6b..22d37cac88 100644 --- a/metron-platform/metron-elasticsearch/pom.xml +++ b/metron-platform/metron-elasticsearch/pom.xml @@ -216,6 +216,12 @@ log4j-core ${global_log4j_core_version} + + com.google.guava + guava-testlib + ${global_guava_version} + test + diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactory.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactory.java new file mode 100644 index 0000000000..3f8a0193ad --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactory.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.writer; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.commons.lang.ClassUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.field.DeDotFieldNameConverter; +import org.apache.metron.common.field.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverters; +import org.apache.metron.common.field.NoopFieldNameConverter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.TimeUnit; + +/** + * A {@link FieldNameConverterFactory} that is backed by a cache. + * + *

Each sensor type can use a different {@link FieldNameConverter} implementation. + * + *

The {@link WriterConfiguration} allows a user to define the {@link FieldNameConverter} + * that should be used for a given sensor type. + * + *

The {@link FieldNameConverter}s are maintained in a cache for a fixed period of time + * after they are created. Once they expire, the {@link WriterConfiguration} is used to + * reload the {@link FieldNameConverter}. + * + *

The user can change the {@link FieldNameConverter} in use at runtime. A change + * to this configuration is recognized once the old {@link FieldNameConverter} expires + * from the cache. + * + *

Defining a shorter expiration interval allows config changes to be recognized more + * quickly, but also can impact performance negatively. + */ +public class CachedFieldNameConverterFactory implements FieldNameConverterFactory { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * A cache that contains a {@link FieldNameConverter} for each sensor type. + * + * A user can alter the {@link FieldNameConverter} for a given sensor at any time + * by altering the Indexing configuration. The actual {@link FieldNameConverter} + * in use for a given sensor will only change once the original converter has + * expired from the cache. + */ + private Cache fieldNameConverters; + + /** + * Creates a {@link CachedFieldNameConverterFactory}. + * + * @param expires The duration before {@link FieldNameConverter}s are expired. + * @param expiresUnits The units before {@link FieldNameConverter}s are expired. + */ + public CachedFieldNameConverterFactory(int expires, TimeUnit expiresUnits) { + + fieldNameConverters = createFieldNameConverterCache(expires, expiresUnits); + } + + /** + * Creates a {@link CachedFieldNameConverterFactory} where the cache expires after 5 minutes. + */ + public CachedFieldNameConverterFactory() { + + this(5, TimeUnit.MINUTES); + } + + /** + * Creates a {@link CachedFieldNameConverterFactory} using the given cache. This should only + * be used for testing. + * + * @param fieldNameConverters A {@link Cache} containing {@link FieldNameConverter}s. + */ + public CachedFieldNameConverterFactory(Cache fieldNameConverters) { + + this.fieldNameConverters = fieldNameConverters; + } + + /** + * Creates a cache of {@link FieldNameConverter}s, one for each source type. + * + * @return A cache of {@link FieldNameConverter}s. + */ + private Cache createFieldNameConverterCache(int expire, TimeUnit expireUnits) { + + return Caffeine + .newBuilder() + .expireAfterWrite(expire, expireUnits) + .build(); + } + + /** + * Create a new {@link FieldNameConverter}. + * + * @param sensorType The type of sensor. + * @param config The writer configuration. + * @return + */ + @Override + public FieldNameConverter create(String sensorType, WriterConfiguration config) { + + return fieldNameConverters.get(sensorType, (s) -> createInstance(sensorType, config)); + } + + /** + * Create a new {@link FieldNameConverter}. + * + * @param sensorType The type of sensor. + * @param config The writer configuration. + * @return + */ + private FieldNameConverter createInstance(String sensorType, WriterConfiguration config) { + + // default to the 'DEDOT' field name converter to maintain backwards compatibility + FieldNameConverter result = new DeDotFieldNameConverter(); + + // which field name converter should be used? + String converterName = config.getFieldNameConverter(sensorType); + if(StringUtils.isNotBlank(converterName)) { + + try { + result = FieldNameConverters.valueOf(converterName).get(); + + } catch(IllegalArgumentException e) { + LOG.error("Unable to create field name converter, using default; value={}, error={}", + converterName, ExceptionUtils.getRootCauseMessage(e)); + } + } + + LOG.debug("Created field name converter; sensorType={}, configuredName={}, class={}", + sensorType, converterName, ClassUtils.getShortClassName(result, "null")); + + return result; + } +} diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index 5959623785..a1e3f2f56e 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -17,15 +17,9 @@ */ package org.apache.metron.elasticsearch.writer; -import java.io.Serializable; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Iterator; -import java.util.List; -import java.util.Map; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.common.interfaces.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; @@ -40,31 +34,58 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A {@link BulkMessageWriter} that writes messages to Elasticsearch. + */ public class ElasticsearchWriter implements BulkMessageWriter, Serializable { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * The Elasticsearch client. + */ private transient TransportClient client; + + /** + * A simple data formatter used to build the appropriate Elasticsearch index name. + */ private SimpleDateFormat dateFormat; - private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriter.class); - private FieldNameConverter fieldNameConverter = new ElasticsearchFieldNameConverter(); + + /** + * A factory that creates {@link FieldNameConverter} objects. + */ + private FieldNameConverterFactory fieldNameConverterFactory; @Override public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) { + Map globalConfiguration = configurations.getGlobalConfig(); client = ElasticsearchUtils.getClient(globalConfiguration); dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration); + fieldNameConverterFactory = new CachedFieldNameConverterFactory(); } - @Override public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable tuples, List messages) throws Exception { + + // fetch the field name converter for this sensor type + FieldNameConverter fieldNameConverter = fieldNameConverterFactory.create(sensorType, configurations); + final String indexPostfix = dateFormat.format(new Date()); BulkRequestBuilder bulkRequest = client.prepareBulk(); - for(JSONObject message: messages) { JSONObject esDoc = new JSONObject(); for(Object k : message.keySet()){ - deDot(k.toString(), message, esDoc); + copyField(k.toString(), message, esDoc, fieldNameConverter); } String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations); @@ -125,19 +146,30 @@ public void close() throws Exception { client.close(); } + /** + * Copies the value of a field from the source message to the destination message. + * + *

A field name may also be transformed in the destination message by the {@link FieldNameConverter}. + * + * @param sourceFieldName The name of the field to copy from the source message + * @param source The source message. + * @param destination The destination message. + * @param fieldNameConverter The field name converter that transforms the field name + * between the source and destination. + */ //JSONObject doesn't expose map generics @SuppressWarnings("unchecked") - private void deDot(String field, JSONObject origMessage, JSONObject message){ - - if(field.contains(".")){ + private void copyField( + String sourceFieldName, + JSONObject source, + JSONObject destination, + FieldNameConverter fieldNameConverter) { - LOG.debug("Dotted field: {}", field); - - } - String newkey = fieldNameConverter.convert(field); - message.put(newkey,origMessage.get(field)); + // allow the field name to be transformed + String destinationFieldName = fieldNameConverter.convert(sourceFieldName); + // copy the field + destination.put(destinationFieldName, source.get(sourceFieldName)); } - } diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/FieldNameConverterFactory.java similarity index 62% rename from metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverter.java rename to metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/FieldNameConverterFactory.java index 57e07ea863..1371a580cb 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchFieldNameConverter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/FieldNameConverterFactory.java @@ -17,16 +17,20 @@ */ package org.apache.metron.elasticsearch.writer; -import org.apache.metron.common.interfaces.FieldNameConverter; -import java.io.Serializable; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.field.FieldNameConverter; -public class ElasticsearchFieldNameConverter implements FieldNameConverter, Serializable { - - private static final long serialVersionUID = -3126840090749760299L; - - @Override - public String convert(String originalField) { - return originalField.replace(".",":"); - } +/** + * A factory that creates {@link FieldNameConverter} objects. + */ +public interface FieldNameConverterFactory { + /** + * Create a {@link FieldNameConverter} object. + * + * @param sensorType The type of sensor. + * @param config The writer configuration. + * @return A {@link FieldNameConverter} object. + */ + FieldNameConverter create(String sensorType, WriterConfiguration config); } diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java index e3047b6631..c4a2ceca20 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java @@ -18,9 +18,9 @@ package org.apache.metron.elasticsearch.integration; import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.common.interfaces.FieldNameConverter; +import org.apache.metron.common.field.DeDotFieldNameConverter; +import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; -import org.apache.metron.elasticsearch.writer.ElasticsearchFieldNameConverter; import org.apache.metron.indexing.integration.IndexingIntegrationTest; import org.apache.metron.integration.ComponentRunner; import org.apache.metron.integration.InMemoryComponent; @@ -43,7 +43,7 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes private String indexDir = "target/elasticsearch"; private String dateFormat = "yyyy.MM.dd.HH"; private String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date()); - private FieldNameConverter fieldNameConverter = new ElasticsearchFieldNameConverter(); + private FieldNameConverter fieldNameConverter = new DeDotFieldNameConverter(); /** * { * "yaf_doc": { diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactoryTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactoryTest.java new file mode 100644 index 0000000000..5198189a75 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactoryTest.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.writer; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.testing.FakeTicker; +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.configuration.IndexingConfigurations; +import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.field.DeDotFieldNameConverter; +import org.apache.metron.common.field.FieldNameConverter; +import org.apache.metron.common.field.NoopFieldNameConverter; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +/** + * Tests the {@link CachedFieldNameConverterFactory}. + */ +public class CachedFieldNameConverterFactoryTest { + + private CachedFieldNameConverterFactory factory; + private Cache cache; + private FakeTicker ticker; + + @Before + public void setup() throws Exception { + + // allows us to advance time in the cache + ticker = new FakeTicker(); + + // a cache configured for testing + cache = Caffeine.newBuilder() + .expireAfterWrite(5, TimeUnit.MINUTES) + .executor(Runnable::run) + .ticker(ticker::read) + .recordStats() + .build(); + + // the factory being tested + factory = new CachedFieldNameConverterFactory(cache); + } + + private WriterConfiguration createConfig(String writer, String sensor, String json) throws Exception { + + IndexingConfigurations indexingConfig = new IndexingConfigurations(); + indexingConfig.updateSensorIndexingConfig(sensor, json.getBytes()); + return new IndexingWriterConfiguration(writer, indexingConfig); + } + + /** + * { + * "elasticsearch": { + * + * "index": "theIndex", + * "batchSize": 100, + * "batchTimeout": 1000, + * "enabled": true, + * "fieldNameConverter": "DEDOT" + * } + * } + */ + @Multiline + private static String jsonWithDedot; + + /** + * The factory should be able to create the {@link FieldNameConverter} + * that has been defined by the user. + */ + @Test + public void testCreateDedot() throws Exception { + + final String writer = "elasticsearch"; + final String sensor = "bro"; + WriterConfiguration config = createConfig(writer, sensor, jsonWithDedot); + + // validate the converter created for 'bro' + FieldNameConverter converter = factory.create(sensor, config); + assertTrue(converter instanceof DeDotFieldNameConverter); + } + + /** + * { + * "elasticsearch": { + * + * "index": "theIndex", + * "batchSize": 100, + * "batchTimeout": 1000, + * "enabled": true, + * "fieldNameConverter": "NOOP" + * } + * } + */ + @Multiline + private static String jsonWithNoop; + + /** + * The factory should be able to create the {@link FieldNameConverter} + * that has been defined by the user. + */ + @Test + public void testCreateNoop() throws Exception { + + final String writer = "elasticsearch"; + final String sensor = "bro"; + WriterConfiguration config = createConfig(writer, sensor, jsonWithNoop); + + // validate the converter created for 'bro' + FieldNameConverter converter = factory.create(sensor, config); + assertTrue(converter instanceof NoopFieldNameConverter); + } + + /** + * { + * "elasticsearch": { + * + * "index": "theIndex", + * "batchSize": 100, + * "batchTimeout": 1000, + * "enabled": true + * } + * } + */ + @Multiline + private static String jsonWithNoConverter; + + /** + * The factory should create a default {@link FieldNameConverter} if none has been defined + * by the user in the writer configuration. + */ + @Test + public void testCreateDefault() throws Exception { + + final String writer = "elasticsearch"; + final String sensor = "bro"; + WriterConfiguration config = createConfig(writer, sensor, jsonWithNoConverter); + + // if none defined, should default to 'DEDOT' + FieldNameConverter converter = factory.create(sensor, config); + assertTrue(converter instanceof DeDotFieldNameConverter); + } + + /** + * The factory should cache and reuse {@link FieldNameConverter} objects. + */ + @Test + public void testCacheUsage() throws Exception { + + final String writer = "elasticsearch"; + final String sensor = "bro"; + WriterConfiguration config = createConfig(writer, sensor, jsonWithNoConverter); + + // validate the converter created for 'bro' + FieldNameConverter converter1 = factory.create(sensor, config); + assertNotNull(converter1); + assertEquals(1, cache.stats().requestCount()); + assertEquals(0, cache.stats().hitCount()); + assertEquals(1, cache.stats().missCount()); + + // the converter should come from the cache on the next request + FieldNameConverter converter2 = factory.create(sensor, config); + assertNotNull(converter2); + assertEquals(2, cache.stats().requestCount()); + assertEquals(1, cache.stats().hitCount()); + assertEquals(1, cache.stats().missCount()); + + assertSame(converter1, converter2); + } + + /** + * If the user changes the {@link FieldNameConverter} in the writer configuration, the new + * {@link FieldNameConverter} should be used after the old one expires. + */ + @Test + public void testConfigChange() throws Exception { + + final String writer = "elasticsearch"; + final String sensor = "bro"; + + // no converter defined in config, should use 'DEDOT' converter + WriterConfiguration config = createConfig(writer, sensor, jsonWithNoConverter); + assertTrue(factory.create(sensor, config) instanceof DeDotFieldNameConverter); + + // an 'updated' config uses the 'NOOP' converter + WriterConfiguration newConfig = createConfig(writer, sensor, jsonWithNoop); + + // even though config has changed, the cache has not expired yet, still using 'DEDOT' + assertTrue(factory.create(sensor, newConfig) instanceof DeDotFieldNameConverter); + + // advance 30 minutes + ticker.advance(8, TimeUnit.MINUTES); + + // now the 'NOOP' converter should be used + assertTrue(factory.create(sensor, newConfig) instanceof NoopFieldNameConverter); + } + + /** + * { + * "elasticsearch": { + * + * "index": "theIndex", + * "batchSize": 100, + * "batchTimeout": 1000, + * "enabled": true, + * "fieldNameConverter": "INVALID" + * } + * } + */ + @Multiline + private static String jsonWithInvalidConverter; + + /** + * If an invalid field name converter is specified, it should fall-back to using the + * default, noop converter. + */ + @Test + public void testCreateInvalid() throws Exception { + + final String writer = "elasticsearch"; + final String sensor = "bro"; + WriterConfiguration config = createConfig(writer, sensor, jsonWithInvalidConverter); + + // if invalid value defined, it should fall-back to using default 'DEDOT' + FieldNameConverter converter = factory.create(sensor, config); + assertTrue(converter instanceof DeDotFieldNameConverter); + } +} diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md index f4a45010e6..a9a8ed9192 100644 --- a/metron-platform/metron-indexing/README.md +++ b/metron-platform/metron-indexing/README.md @@ -50,6 +50,8 @@ and sent to By default, errors during indexing are sent back into the `indexing` kafka queue so that they can be indexed and archived. ## Sensor Indexing Configuration + + The sensor specific configuration is intended to configure the indexing used for a given sensor type (e.g. `snort`). @@ -58,16 +60,16 @@ Just like the global config, the format is a JSON stored in zookeeper and on dis * `hdfs` * `solr` -Depending on how you start the indexing topology, it will have either -elasticsearch or solr and hdfs writers running. +Depending on how you start the indexing topology, it will have either Elasticsearch or Solr and HDFS writers running. + +| Property | Description | Default Value | +|----------------------|---------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| `index` | The name of the index to write to. | Defaults to the name of the sensor. | +| `batchSize` | The size of the batch that is written to the indices at once. | Defaults to `1`; no batching. | +| `batchTimeout` | The timeout after which a batch will be flushed even if `batchSize` has not been met. | Defaults to a duration which is a fraction of the Storm parameter `topology.message.timeout.secs`, if left undefined or set to 0. Ignored if batchSize is `1`, since this disables batching.| +| `enabled` | A boolean indicating whether the writer is enabled. | Defaults to `true` | +| `fieldNameConverter` | Defines how field names are transformed before being written to the index. Only applicable to `elasticsearch`. | Defaults to `DEDOT`. Acceptable values are `DEDOT` that replaces all '.' with ':' or `NOOP` that does not change the field names . | -The configuration for an individual writer-specific configuration is a JSON map with the following fields: -* `index` : The name of the index to write to (defaulted to the name of the sensor). -* `batchSize` : The size of the batch that is written to the indices at once. Defaults to `1` (no batching). -* `batchTimeout` : The timeout after which a batch will be flushed even if batchSize has not been met. Optional. -If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm -parameter `topology.message.timeout.secs`. Ignored if batchSize is `1`, since this disables batching. -* `enabled` : Whether the writer is enabled (default `true`). ### Meta Alerts Alerts can be grouped, after appropriate searching, into a set of alerts called a meta alert. A meta alert is useful for maintaining the context of searching and grouping during further investigations. Standard searches can return meta alerts, but grouping and other aggregation or sorting requests will not, because there's not a clear way to aggregate in many cases if there are multiple alerts contained in the meta alert. All meta alerts will have the source type of metaalert, regardless of the contained alert's origins. diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java index b8af6a3eee..2416c864ec 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HDFSIndexingIntegrationTest.java @@ -18,16 +18,14 @@ package org.apache.metron.indexing.integration; -import com.google.common.base.Function; import com.google.common.collect.Iterables; -import org.apache.metron.common.interfaces.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.integration.*; import org.apache.metron.integration.components.KafkaComponent; import org.apache.metron.integration.utils.TestUtils; import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; -import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.*; diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java index 1671ab3273..2e703f667b 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java @@ -22,7 +22,7 @@ import org.apache.metron.TestConstants; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.ConfigurationsUtils; -import org.apache.metron.common.interfaces.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.enrichment.integration.components.ConfigUploadComponent; import org.apache.metron.integration.BaseIntegrationTest; @@ -38,7 +38,6 @@ import org.junit.Assert; import org.junit.Test; -import javax.annotation.Nullable; import java.io.File; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java index b1404e286d..da884f1442 100644 --- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java +++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java @@ -301,7 +301,6 @@ public boolean isEnabled(String sensorName) { @Override public Map getSensorConfig(String sensorName) { return sensorConfig; - } @Override @@ -313,6 +312,11 @@ public Map getGlobalConfig() { public boolean isDefault(String sensorName) { return false; } + + @Override + public String getFieldNameConverter(String sensorName) { + return null; + } }; } } diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java index 7c907fd7da..256f23b800 100644 --- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java @@ -20,7 +20,7 @@ import com.google.common.base.Function; import org.apache.metron.common.configuration.Configurations; import org.apache.metron.common.configuration.ConfigurationsUtils; -import org.apache.metron.common.interfaces.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.enrichment.integration.utils.SampleUtil; import org.apache.metron.indexing.integration.IndexingIntegrationTest; @@ -34,7 +34,6 @@ import org.apache.metron.solr.integration.components.SolrComponent; import javax.annotation.Nullable; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Properties; From 898c812f4bbbb5b2152f2d53394ae37ec3631c9a Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Fri, 25 May 2018 10:12:28 -0400 Subject: [PATCH 2/8] Addressed review comments when default converter is created --- .../elasticsearch/writer/CachedFieldNameConverterFactory.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactory.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactory.java index 3f8a0193ad..cbe1554497 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactory.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactory.java @@ -23,10 +23,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.common.field.DeDotFieldNameConverter; import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.common.field.FieldNameConverters; -import org.apache.metron.common.field.NoopFieldNameConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -132,7 +130,7 @@ public FieldNameConverter create(String sensorType, WriterConfiguration config) private FieldNameConverter createInstance(String sensorType, WriterConfiguration config) { // default to the 'DEDOT' field name converter to maintain backwards compatibility - FieldNameConverter result = new DeDotFieldNameConverter(); + FieldNameConverter result = FieldNameConverters.DEDOT.get(); // which field name converter should be used? String converterName = config.getFieldNameConverter(sensorType); From b18c3bf37c806e89cda48fb744828e27bafbdc8a Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Fri, 25 May 2018 10:13:39 -0400 Subject: [PATCH 3/8] FieldNameConverters needs to return a new instance on each get(), rather reusing the same instance each time. --- .../common/field/FieldNameConverters.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java index 318a27381f..d03188bbbc 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java @@ -18,6 +18,8 @@ package org.apache.metron.common.field; +import org.apache.metron.common.utils.ReflectionUtils; + /** * Allows the field name converter to be specified using a short-hand * name, rather than the entire fully-qualified class name. @@ -28,21 +30,27 @@ public enum FieldNameConverters { * A {@link FieldNameConverter} that does not rename any fields. All field * names remain unchanged. */ - NOOP(new NoopFieldNameConverter()), + NOOP(NoopFieldNameConverter.class.getName()), /** * A {@link FieldNameConverter} that replaces all field names containing dots * with colons. */ - DEDOT(new DeDotFieldNameConverter()); + DEDOT(DeDotFieldNameConverter.class.getName()); - private FieldNameConverter converter; + /** + * The name of a class that implements {@link FieldNameConverter}. + */ + private String className; - FieldNameConverters(FieldNameConverter converter) { - this.converter = converter; + FieldNameConverters(String className) { + this.className = className; } + /** + * Returns a new instance of the {@link FieldNameConverter}. + */ public FieldNameConverter get() { - return converter; + return ReflectionUtils.createInstance(className); } } From cee3994644a3c1dff7751a6f13de4938676e8a3e Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Fri, 25 May 2018 12:41:38 -0400 Subject: [PATCH 4/8] Removed 'caching' factory and replaced with 'shared' factory --- .../common/field/FieldNameConverters.java | 27 ++-- .../CachedFieldNameConverterFactory.java | 153 ------------------ .../writer/ElasticsearchWriter.java | 2 +- .../writer/FieldNameConverterFactory.java | 7 + .../SharedFieldNameConverterFactory.java | 72 +++++++++ ... SharedFieldNameConverterFactoryTest.java} | 70 +------- 6 files changed, 100 insertions(+), 231 deletions(-) delete mode 100644 metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactory.java create mode 100644 metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactory.java rename metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/{CachedFieldNameConverterFactoryTest.java => SharedFieldNameConverterFactoryTest.java} (69%) diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java index d03188bbbc..d41d498330 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java @@ -18,10 +18,12 @@ package org.apache.metron.common.field; -import org.apache.metron.common.utils.ReflectionUtils; - /** - * Allows the field name converter to be specified using a short-hand + * Enumerates a set of {@link FieldNameConverter} implementations. + * + *

Provides shared instances of each {@link FieldNameConverter}. + * + *

Allows the field name converter to be specified using a short-hand * name, rather than the entire fully-qualified class name. */ public enum FieldNameConverters { @@ -30,27 +32,26 @@ public enum FieldNameConverters { * A {@link FieldNameConverter} that does not rename any fields. All field * names remain unchanged. */ - NOOP(NoopFieldNameConverter.class.getName()), + NOOP(new NoopFieldNameConverter()), /** * A {@link FieldNameConverter} that replaces all field names containing dots * with colons. */ - DEDOT(DeDotFieldNameConverter.class.getName()); + DEDOT(new DeDotFieldNameConverter()); - /** - * The name of a class that implements {@link FieldNameConverter}. - */ - private String className; + private FieldNameConverter converter; - FieldNameConverters(String className) { - this.className = className; + FieldNameConverters(FieldNameConverter converter) { + this.converter = converter; } /** - * Returns a new instance of the {@link FieldNameConverter}. + * Returns a shared instance of the {@link FieldNameConverter}/ + * + * @return A shared {@link FieldNameConverter} instance. */ public FieldNameConverter get() { - return ReflectionUtils.createInstance(className); + return converter; } } diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactory.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactory.java deleted file mode 100644 index cbe1554497..0000000000 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactory.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.metron.elasticsearch.writer; - -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import org.apache.commons.lang.ClassUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.common.field.FieldNameConverter; -import org.apache.metron.common.field.FieldNameConverters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.invoke.MethodHandles; -import java.util.concurrent.TimeUnit; - -/** - * A {@link FieldNameConverterFactory} that is backed by a cache. - * - *

Each sensor type can use a different {@link FieldNameConverter} implementation. - * - *

The {@link WriterConfiguration} allows a user to define the {@link FieldNameConverter} - * that should be used for a given sensor type. - * - *

The {@link FieldNameConverter}s are maintained in a cache for a fixed period of time - * after they are created. Once they expire, the {@link WriterConfiguration} is used to - * reload the {@link FieldNameConverter}. - * - *

The user can change the {@link FieldNameConverter} in use at runtime. A change - * to this configuration is recognized once the old {@link FieldNameConverter} expires - * from the cache. - * - *

Defining a shorter expiration interval allows config changes to be recognized more - * quickly, but also can impact performance negatively. - */ -public class CachedFieldNameConverterFactory implements FieldNameConverterFactory { - - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - /** - * A cache that contains a {@link FieldNameConverter} for each sensor type. - * - * A user can alter the {@link FieldNameConverter} for a given sensor at any time - * by altering the Indexing configuration. The actual {@link FieldNameConverter} - * in use for a given sensor will only change once the original converter has - * expired from the cache. - */ - private Cache fieldNameConverters; - - /** - * Creates a {@link CachedFieldNameConverterFactory}. - * - * @param expires The duration before {@link FieldNameConverter}s are expired. - * @param expiresUnits The units before {@link FieldNameConverter}s are expired. - */ - public CachedFieldNameConverterFactory(int expires, TimeUnit expiresUnits) { - - fieldNameConverters = createFieldNameConverterCache(expires, expiresUnits); - } - - /** - * Creates a {@link CachedFieldNameConverterFactory} where the cache expires after 5 minutes. - */ - public CachedFieldNameConverterFactory() { - - this(5, TimeUnit.MINUTES); - } - - /** - * Creates a {@link CachedFieldNameConverterFactory} using the given cache. This should only - * be used for testing. - * - * @param fieldNameConverters A {@link Cache} containing {@link FieldNameConverter}s. - */ - public CachedFieldNameConverterFactory(Cache fieldNameConverters) { - - this.fieldNameConverters = fieldNameConverters; - } - - /** - * Creates a cache of {@link FieldNameConverter}s, one for each source type. - * - * @return A cache of {@link FieldNameConverter}s. - */ - private Cache createFieldNameConverterCache(int expire, TimeUnit expireUnits) { - - return Caffeine - .newBuilder() - .expireAfterWrite(expire, expireUnits) - .build(); - } - - /** - * Create a new {@link FieldNameConverter}. - * - * @param sensorType The type of sensor. - * @param config The writer configuration. - * @return - */ - @Override - public FieldNameConverter create(String sensorType, WriterConfiguration config) { - - return fieldNameConverters.get(sensorType, (s) -> createInstance(sensorType, config)); - } - - /** - * Create a new {@link FieldNameConverter}. - * - * @param sensorType The type of sensor. - * @param config The writer configuration. - * @return - */ - private FieldNameConverter createInstance(String sensorType, WriterConfiguration config) { - - // default to the 'DEDOT' field name converter to maintain backwards compatibility - FieldNameConverter result = FieldNameConverters.DEDOT.get(); - - // which field name converter should be used? - String converterName = config.getFieldNameConverter(sensorType); - if(StringUtils.isNotBlank(converterName)) { - - try { - result = FieldNameConverters.valueOf(converterName).get(); - - } catch(IllegalArgumentException e) { - LOG.error("Unable to create field name converter, using default; value={}, error={}", - converterName, ExceptionUtils.getRootCauseMessage(e)); - } - } - - LOG.debug("Created field name converter; sensorType={}, configuredName={}, class={}", - sensorType, converterName, ClassUtils.getShortClassName(result, "null")); - - return result; - } -} diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index a1e3f2f56e..e31c3bb076 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -70,7 +70,7 @@ public void init(Map stormConf, TopologyContext topologyContext, WriterConfigura Map globalConfiguration = configurations.getGlobalConfig(); client = ElasticsearchUtils.getClient(globalConfiguration); dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration); - fieldNameConverterFactory = new CachedFieldNameConverterFactory(); + fieldNameConverterFactory = new SharedFieldNameConverterFactory(); } @Override diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/FieldNameConverterFactory.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/FieldNameConverterFactory.java index 1371a580cb..16a4ac95d3 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/FieldNameConverterFactory.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/FieldNameConverterFactory.java @@ -22,6 +22,13 @@ /** * A factory that creates {@link FieldNameConverter} objects. + * + *

The {@link WriterConfiguration} allows a user to define the {@link FieldNameConverter} + * that should be used. + * + *

Each sensor type can use a different {@link FieldNameConverter} implementation. + * + *

The user can change the {@link FieldNameConverter} in use at runtime. */ public interface FieldNameConverterFactory { diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactory.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactory.java new file mode 100644 index 0000000000..18b02d7b9a --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.writer; + +import org.apache.commons.lang.ClassUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.field.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; + +/** + * A {@link FieldNameConverterFactory} that returns instances of {@link FieldNameConverter} + * that are shared and reused. + * + *

The instances are created and managed by the {@link FieldNameConverters} class. + */ +public class SharedFieldNameConverterFactory implements FieldNameConverterFactory { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * Create a new {@link FieldNameConverter}. + * + * @param sensorType The type of sensor. + * @param config The writer configuration. + * @return + */ + @Override + public FieldNameConverter create(String sensorType, WriterConfiguration config) { + + // default to the 'DEDOT' field name converter to maintain backwards compatibility + FieldNameConverter result = FieldNameConverters.DEDOT.get(); + + // which field name converter should be used? + String converterName = config.getFieldNameConverter(sensorType); + if(StringUtils.isNotBlank(converterName)) { + + try { + result = FieldNameConverters.valueOf(converterName).get(); + + } catch(IllegalArgumentException e) { + LOG.error("Unable to create field name converter, using default; value={}, error={}", + converterName, ExceptionUtils.getRootCauseMessage(e)); + } + } + + LOG.debug("Created field name converter; sensorType={}, configuredName={}, class={}", + sensorType, converterName, ClassUtils.getShortClassName(result, "null")); + + return result; + } +} diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactoryTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactoryTest.java similarity index 69% rename from metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactoryTest.java rename to metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactoryTest.java index 5198189a75..dcc332d19c 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/CachedFieldNameConverterFactoryTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactoryTest.java @@ -17,9 +17,6 @@ */ package org.apache.metron.elasticsearch.writer; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.google.common.testing.FakeTicker; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.configuration.IndexingConfigurations; import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; @@ -30,38 +27,20 @@ import org.junit.Before; import org.junit.Test; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; /** - * Tests the {@link CachedFieldNameConverterFactory}. + * Tests the {@link SharedFieldNameConverterFactory}. */ -public class CachedFieldNameConverterFactoryTest { +public class SharedFieldNameConverterFactoryTest { - private CachedFieldNameConverterFactory factory; - private Cache cache; - private FakeTicker ticker; + private SharedFieldNameConverterFactory factory; @Before public void setup() throws Exception { - // allows us to advance time in the cache - ticker = new FakeTicker(); - - // a cache configured for testing - cache = Caffeine.newBuilder() - .expireAfterWrite(5, TimeUnit.MINUTES) - .executor(Runnable::run) - .ticker(ticker::read) - .recordStats() - .build(); - // the factory being tested - factory = new CachedFieldNameConverterFactory(cache); + factory = new SharedFieldNameConverterFactory(); } private WriterConfiguration createConfig(String writer, String sensor, String json) throws Exception { @@ -87,8 +66,7 @@ private WriterConfiguration createConfig(String writer, String sensor, String js private static String jsonWithDedot; /** - * The factory should be able to create the {@link FieldNameConverter} - * that has been defined by the user. + * The factory should be able to create a {@link DeDotFieldNameConverter}. */ @Test public void testCreateDedot() throws Exception { @@ -118,8 +96,7 @@ public void testCreateDedot() throws Exception { private static String jsonWithNoop; /** - * The factory should be able to create the {@link FieldNameConverter} - * that has been defined by the user. + * The factory should be able to create a {@link NoopFieldNameConverter}. */ @Test public void testCreateNoop() throws Exception { @@ -163,33 +140,6 @@ public void testCreateDefault() throws Exception { assertTrue(converter instanceof DeDotFieldNameConverter); } - /** - * The factory should cache and reuse {@link FieldNameConverter} objects. - */ - @Test - public void testCacheUsage() throws Exception { - - final String writer = "elasticsearch"; - final String sensor = "bro"; - WriterConfiguration config = createConfig(writer, sensor, jsonWithNoConverter); - - // validate the converter created for 'bro' - FieldNameConverter converter1 = factory.create(sensor, config); - assertNotNull(converter1); - assertEquals(1, cache.stats().requestCount()); - assertEquals(0, cache.stats().hitCount()); - assertEquals(1, cache.stats().missCount()); - - // the converter should come from the cache on the next request - FieldNameConverter converter2 = factory.create(sensor, config); - assertNotNull(converter2); - assertEquals(2, cache.stats().requestCount()); - assertEquals(1, cache.stats().hitCount()); - assertEquals(1, cache.stats().missCount()); - - assertSame(converter1, converter2); - } - /** * If the user changes the {@link FieldNameConverter} in the writer configuration, the new * {@link FieldNameConverter} should be used after the old one expires. @@ -206,14 +156,6 @@ public void testConfigChange() throws Exception { // an 'updated' config uses the 'NOOP' converter WriterConfiguration newConfig = createConfig(writer, sensor, jsonWithNoop); - - // even though config has changed, the cache has not expired yet, still using 'DEDOT' - assertTrue(factory.create(sensor, newConfig) instanceof DeDotFieldNameConverter); - - // advance 30 minutes - ticker.advance(8, TimeUnit.MINUTES); - - // now the 'NOOP' converter should be used assertTrue(factory.create(sensor, newConfig) instanceof NoopFieldNameConverter); } From d79702da172059682e2a2f4211d739cf272c51e6 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Fri, 25 May 2018 14:43:28 -0400 Subject: [PATCH 5/8] Moved create to FileNameConverters --- .../common/field/FieldNameConverters.java | 41 ++++++++++ .../field/FieldNameConvertersTest.java} | 79 ++++++++++--------- .../writer/ElasticsearchWriter.java | 8 +- .../writer/FieldNameConverterFactory.java | 43 ---------- .../SharedFieldNameConverterFactory.java | 72 ----------------- 5 files changed, 83 insertions(+), 160 deletions(-) rename metron-platform/{metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactoryTest.java => metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java} (69%) delete mode 100644 metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/FieldNameConverterFactory.java delete mode 100644 metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactory.java diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java index d41d498330..45adbd8650 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java @@ -18,6 +18,14 @@ package org.apache.metron.common.field; +import org.apache.commons.lang.ClassUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; + /** * Enumerates a set of {@link FieldNameConverter} implementations. * @@ -40,6 +48,8 @@ public enum FieldNameConverters { */ DEDOT(new DeDotFieldNameConverter()); + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private FieldNameConverter converter; FieldNameConverters(FieldNameConverter converter) { @@ -54,4 +64,35 @@ public enum FieldNameConverters { public FieldNameConverter get() { return converter; } + + /** + * Create a new {@link FieldNameConverter}. + * + * @param sensorType The type of sensor. + * @param config The writer configuration. + * @return + */ + public static FieldNameConverter create(String sensorType, WriterConfiguration config) { + + // which field name converter should be used? + String converterName = config.getFieldNameConverter(sensorType); + FieldNameConverter result = null; + try { + result = FieldNameConverters.valueOf(converterName).get(); + + } catch(IllegalArgumentException e) { + LOG.error("Unable to create field name converter, using default; badValue={}, knownValues={}, error={}", + converterName, FieldNameConverters.values(), ExceptionUtils.getRootCauseMessage(e)); + } + + if(result == null) { + // default to the 'DEDOT' field name converter to maintain backwards compatibility + result = FieldNameConverters.DEDOT.get(); + } + + LOG.debug("Created field name converter; sensorType={}, configuredName={}, class={}", + sensorType, converterName, ClassUtils.getShortClassName(result, "null")); + + return result; + } } diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactoryTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java similarity index 69% rename from metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactoryTest.java rename to metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java index dcc332d19c..5a8604c622 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactoryTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java @@ -1,47 +1,17 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.metron.elasticsearch.writer; +package org.apache.metron.common.field; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.configuration.IndexingConfigurations; import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration; import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.common.field.DeDotFieldNameConverter; -import org.apache.metron.common.field.FieldNameConverter; -import org.apache.metron.common.field.NoopFieldNameConverter; -import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertTrue; /** - * Tests the {@link SharedFieldNameConverterFactory}. + * Test the {@link FieldNameConverters} class. */ -public class SharedFieldNameConverterFactoryTest { - - private SharedFieldNameConverterFactory factory; - - @Before - public void setup() throws Exception { - - // the factory being tested - factory = new SharedFieldNameConverterFactory(); - } +public class FieldNameConvertersTest { private WriterConfiguration createConfig(String writer, String sensor, String json) throws Exception { @@ -76,7 +46,7 @@ public void testCreateDedot() throws Exception { WriterConfiguration config = createConfig(writer, sensor, jsonWithDedot); // validate the converter created for 'bro' - FieldNameConverter converter = factory.create(sensor, config); + FieldNameConverter converter = FieldNameConverters.create(sensor, config); assertTrue(converter instanceof DeDotFieldNameConverter); } @@ -106,7 +76,7 @@ public void testCreateNoop() throws Exception { WriterConfiguration config = createConfig(writer, sensor, jsonWithNoop); // validate the converter created for 'bro' - FieldNameConverter converter = factory.create(sensor, config); + FieldNameConverter converter = FieldNameConverters.create(sensor, config); assertTrue(converter instanceof NoopFieldNameConverter); } @@ -136,7 +106,7 @@ public void testCreateDefault() throws Exception { WriterConfiguration config = createConfig(writer, sensor, jsonWithNoConverter); // if none defined, should default to 'DEDOT' - FieldNameConverter converter = factory.create(sensor, config); + FieldNameConverter converter = FieldNameConverters.create(sensor, config); assertTrue(converter instanceof DeDotFieldNameConverter); } @@ -152,11 +122,11 @@ public void testConfigChange() throws Exception { // no converter defined in config, should use 'DEDOT' converter WriterConfiguration config = createConfig(writer, sensor, jsonWithNoConverter); - assertTrue(factory.create(sensor, config) instanceof DeDotFieldNameConverter); + assertTrue(FieldNameConverters.create(sensor, config) instanceof DeDotFieldNameConverter); // an 'updated' config uses the 'NOOP' converter WriterConfiguration newConfig = createConfig(writer, sensor, jsonWithNoop); - assertTrue(factory.create(sensor, newConfig) instanceof NoopFieldNameConverter); + assertTrue(FieldNameConverters.create(sensor, newConfig) instanceof NoopFieldNameConverter); } /** @@ -186,7 +156,38 @@ public void testCreateInvalid() throws Exception { WriterConfiguration config = createConfig(writer, sensor, jsonWithInvalidConverter); // if invalid value defined, it should fall-back to using default 'DEDOT' - FieldNameConverter converter = factory.create(sensor, config); + FieldNameConverter converter = FieldNameConverters.create(sensor, config); + assertTrue(converter instanceof DeDotFieldNameConverter); + } + + /** + * { + * "elasticsearch": { + * + * "index": "theIndex", + * "batchSize": 100, + * "batchTimeout": 1000, + * "enabled": true, + * "fieldNameConverter": "" + * } + * } + */ + @Multiline + private static String jsonWithBlankConverter; + + /** + * If the field name converter field is blank, it should fall-back to using the + * default converter. + */ + @Test + public void testCreateBlank() throws Exception { + + final String writer = "elasticsearch"; + final String sensor = "bro"; + WriterConfiguration config = createConfig(writer, sensor, jsonWithInvalidConverter); + + // if invalid value defined, it should fall-back to using default 'DEDOT' + FieldNameConverter converter = FieldNameConverters.create(sensor, config); assertTrue(converter instanceof DeDotFieldNameConverter); } } diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index e31c3bb076..4b8dd083e9 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -20,6 +20,7 @@ import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.field.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverters; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; @@ -59,10 +60,6 @@ public class ElasticsearchWriter implements BulkMessageWriter, Seria */ private SimpleDateFormat dateFormat; - /** - * A factory that creates {@link FieldNameConverter} objects. - */ - private FieldNameConverterFactory fieldNameConverterFactory; @Override public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) { @@ -70,14 +67,13 @@ public void init(Map stormConf, TopologyContext topologyContext, WriterConfigura Map globalConfiguration = configurations.getGlobalConfig(); client = ElasticsearchUtils.getClient(globalConfiguration); dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration); - fieldNameConverterFactory = new SharedFieldNameConverterFactory(); } @Override public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable tuples, List messages) throws Exception { // fetch the field name converter for this sensor type - FieldNameConverter fieldNameConverter = fieldNameConverterFactory.create(sensorType, configurations); + FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations); final String indexPostfix = dateFormat.format(new Date()); BulkRequestBuilder bulkRequest = client.prepareBulk(); diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/FieldNameConverterFactory.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/FieldNameConverterFactory.java deleted file mode 100644 index 16a4ac95d3..0000000000 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/FieldNameConverterFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.metron.elasticsearch.writer; - -import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.common.field.FieldNameConverter; - -/** - * A factory that creates {@link FieldNameConverter} objects. - * - *

The {@link WriterConfiguration} allows a user to define the {@link FieldNameConverter} - * that should be used. - * - *

Each sensor type can use a different {@link FieldNameConverter} implementation. - * - *

The user can change the {@link FieldNameConverter} in use at runtime. - */ -public interface FieldNameConverterFactory { - - /** - * Create a {@link FieldNameConverter} object. - * - * @param sensorType The type of sensor. - * @param config The writer configuration. - * @return A {@link FieldNameConverter} object. - */ - FieldNameConverter create(String sensorType, WriterConfiguration config); -} diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactory.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactory.java deleted file mode 100644 index 18b02d7b9a..0000000000 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/SharedFieldNameConverterFactory.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.metron.elasticsearch.writer; - -import org.apache.commons.lang.ClassUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.metron.common.configuration.writer.WriterConfiguration; -import org.apache.metron.common.field.FieldNameConverter; -import org.apache.metron.common.field.FieldNameConverters; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.invoke.MethodHandles; - -/** - * A {@link FieldNameConverterFactory} that returns instances of {@link FieldNameConverter} - * that are shared and reused. - * - *

The instances are created and managed by the {@link FieldNameConverters} class. - */ -public class SharedFieldNameConverterFactory implements FieldNameConverterFactory { - - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - /** - * Create a new {@link FieldNameConverter}. - * - * @param sensorType The type of sensor. - * @param config The writer configuration. - * @return - */ - @Override - public FieldNameConverter create(String sensorType, WriterConfiguration config) { - - // default to the 'DEDOT' field name converter to maintain backwards compatibility - FieldNameConverter result = FieldNameConverters.DEDOT.get(); - - // which field name converter should be used? - String converterName = config.getFieldNameConverter(sensorType); - if(StringUtils.isNotBlank(converterName)) { - - try { - result = FieldNameConverters.valueOf(converterName).get(); - - } catch(IllegalArgumentException e) { - LOG.error("Unable to create field name converter, using default; value={}, error={}", - converterName, ExceptionUtils.getRootCauseMessage(e)); - } - } - - LOG.debug("Created field name converter; sensorType={}, configuredName={}, class={}", - sensorType, converterName, ClassUtils.getShortClassName(result, "null")); - - return result; - } -} From 00f02eb13c2ad32db14c5530758b4a08e437c910 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Fri, 25 May 2018 18:19:55 -0400 Subject: [PATCH 6/8] Added license and small fixups --- .../common/field/FieldNameConverters.java | 6 +++--- .../common/field/FieldNameConvertersTest.java | 18 ++++++++++++++++++ .../ElasticsearchIndexingIntegrationTest.java | 3 ++- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java index 45adbd8650..5b70a004e9 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -81,7 +81,7 @@ public static FieldNameConverter create(String sensorType, WriterConfiguration c result = FieldNameConverters.valueOf(converterName).get(); } catch(IllegalArgumentException e) { - LOG.error("Unable to create field name converter, using default; badValue={}, knownValues={}, error={}", + LOG.error("Invalid field name converter, using default; configured={}, knownValues={}, error={}", converterName, FieldNameConverters.values(), ExceptionUtils.getRootCauseMessage(e)); } @@ -90,7 +90,7 @@ public static FieldNameConverter create(String sensorType, WriterConfiguration c result = FieldNameConverters.DEDOT.get(); } - LOG.debug("Created field name converter; sensorType={}, configuredName={}, class={}", + LOG.debug("Created field name converter; sensorType={}, configured={}, class={}", sensorType, converterName, ClassUtils.getShortClassName(result, "null")); return result; diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java index 5a8604c622..57be53a18c 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.metron.common.field; import org.adrianwalker.multilinestring.Multiline; diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java index c4a2ceca20..3400d7c506 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java @@ -20,6 +20,7 @@ import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.field.DeDotFieldNameConverter; import org.apache.metron.common.field.FieldNameConverter; +import org.apache.metron.common.field.FieldNameConverters; import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; import org.apache.metron.indexing.integration.IndexingIntegrationTest; import org.apache.metron.integration.ComponentRunner; @@ -43,7 +44,7 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes private String indexDir = "target/elasticsearch"; private String dateFormat = "yyyy.MM.dd.HH"; private String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date()); - private FieldNameConverter fieldNameConverter = new DeDotFieldNameConverter(); + private FieldNameConverter fieldNameConverter = FieldNameConverters.DEDOT.get(); /** * { * "yaf_doc": { From b7363f303195279113540b704fd842584eb5dfc6 Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 29 May 2018 17:48:24 -0400 Subject: [PATCH 7/8] FieldNameConverters is-a FieldNameConverter --- .../common/field/FieldNameConverters.java | 22 ++++++++++++++++--- .../common/field/FieldNameConvertersTest.java | 16 +++++++------- .../ElasticsearchIndexingIntegrationTest.java | 2 +- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java index 5b70a004e9..c7bb3fd3fa 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java @@ -19,6 +19,7 @@ package org.apache.metron.common.field; import org.apache.commons.lang.ClassUtils; +import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.slf4j.Logger; @@ -34,7 +35,7 @@ *

Allows the field name converter to be specified using a short-hand * name, rather than the entire fully-qualified class name. */ -public enum FieldNameConverters { +public enum FieldNameConverters implements FieldNameConverter { /** * A {@link FieldNameConverter} that does not rename any fields. All field @@ -65,6 +66,21 @@ public FieldNameConverter get() { return converter; } + /** + * Allows the {@link FieldNameConverters} enums to be used directly as a {@link FieldNameConverter}. + * + * {@code + * FieldNameConverter converter = FieldNameConverters.DEDOT; + * } + * + * @param originalField The original field name. + * @return + */ + @Override + public String convert(String originalField) { + return converter.convert(originalField); + } + /** * Create a new {@link FieldNameConverter}. * @@ -78,7 +94,7 @@ public static FieldNameConverter create(String sensorType, WriterConfiguration c String converterName = config.getFieldNameConverter(sensorType); FieldNameConverter result = null; try { - result = FieldNameConverters.valueOf(converterName).get(); + result = FieldNameConverters.valueOf(converterName); } catch(IllegalArgumentException e) { LOG.error("Invalid field name converter, using default; configured={}, knownValues={}, error={}", @@ -87,7 +103,7 @@ public static FieldNameConverter create(String sensorType, WriterConfiguration c if(result == null) { // default to the 'DEDOT' field name converter to maintain backwards compatibility - result = FieldNameConverters.DEDOT.get(); + result = FieldNameConverters.DEDOT; } LOG.debug("Created field name converter; sensorType={}, configured={}, class={}", diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java index 57be53a18c..2c263f2f92 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/field/FieldNameConvertersTest.java @@ -24,7 +24,7 @@ import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.junit.Test; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; /** * Test the {@link FieldNameConverters} class. @@ -65,7 +65,7 @@ public void testCreateDedot() throws Exception { // validate the converter created for 'bro' FieldNameConverter converter = FieldNameConverters.create(sensor, config); - assertTrue(converter instanceof DeDotFieldNameConverter); + assertEquals(FieldNameConverters.DEDOT, converter); } /** @@ -95,7 +95,7 @@ public void testCreateNoop() throws Exception { // validate the converter created for 'bro' FieldNameConverter converter = FieldNameConverters.create(sensor, config); - assertTrue(converter instanceof NoopFieldNameConverter); + assertEquals(FieldNameConverters.NOOP, converter); } /** @@ -125,7 +125,7 @@ public void testCreateDefault() throws Exception { // if none defined, should default to 'DEDOT' FieldNameConverter converter = FieldNameConverters.create(sensor, config); - assertTrue(converter instanceof DeDotFieldNameConverter); + assertEquals(FieldNameConverters.DEDOT, converter); } /** @@ -140,11 +140,11 @@ public void testConfigChange() throws Exception { // no converter defined in config, should use 'DEDOT' converter WriterConfiguration config = createConfig(writer, sensor, jsonWithNoConverter); - assertTrue(FieldNameConverters.create(sensor, config) instanceof DeDotFieldNameConverter); + assertEquals(FieldNameConverters.DEDOT, FieldNameConverters.create(sensor, config)); // an 'updated' config uses the 'NOOP' converter WriterConfiguration newConfig = createConfig(writer, sensor, jsonWithNoop); - assertTrue(FieldNameConverters.create(sensor, newConfig) instanceof NoopFieldNameConverter); + assertEquals(FieldNameConverters.NOOP, FieldNameConverters.create(sensor, newConfig)); } /** @@ -175,7 +175,7 @@ public void testCreateInvalid() throws Exception { // if invalid value defined, it should fall-back to using default 'DEDOT' FieldNameConverter converter = FieldNameConverters.create(sensor, config); - assertTrue(converter instanceof DeDotFieldNameConverter); + assertEquals(FieldNameConverters.DEDOT, converter); } /** @@ -206,6 +206,6 @@ public void testCreateBlank() throws Exception { // if invalid value defined, it should fall-back to using default 'DEDOT' FieldNameConverter converter = FieldNameConverters.create(sensor, config); - assertTrue(converter instanceof DeDotFieldNameConverter); + assertEquals(FieldNameConverters.DEDOT, converter); } } diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java index 3400d7c506..df5e96a8f8 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java @@ -44,7 +44,7 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes private String indexDir = "target/elasticsearch"; private String dateFormat = "yyyy.MM.dd.HH"; private String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date()); - private FieldNameConverter fieldNameConverter = FieldNameConverters.DEDOT.get(); + private FieldNameConverter fieldNameConverter = FieldNameConverters.DEDOT; /** * { * "yaf_doc": { From 30315ee9a6b6b1a3e26820d588099ccdfe9e004b Mon Sep 17 00:00:00 2001 From: Nick Allen Date: Tue, 29 May 2018 17:51:54 -0400 Subject: [PATCH 8/8] Don't want an error level log if the user has not defined a converter --- .../common/field/FieldNameConverters.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java index c7bb3fd3fa..d5858ed272 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/field/FieldNameConverters.java @@ -89,20 +89,22 @@ public String convert(String originalField) { * @return */ public static FieldNameConverter create(String sensorType, WriterConfiguration config) { - - // which field name converter should be used? - String converterName = config.getFieldNameConverter(sensorType); FieldNameConverter result = null; - try { - result = FieldNameConverters.valueOf(converterName); - } catch(IllegalArgumentException e) { - LOG.error("Invalid field name converter, using default; configured={}, knownValues={}, error={}", - converterName, FieldNameConverters.values(), ExceptionUtils.getRootCauseMessage(e)); + // which field name converter has been configured? + String converterName = config.getFieldNameConverter(sensorType); + if(StringUtils.isNotBlank(converterName)) { + try { + result = FieldNameConverters.valueOf(converterName); + + } catch (IllegalArgumentException e) { + LOG.error("Invalid field name converter, using default; configured={}, knownValues={}, error={}", + converterName, FieldNameConverters.values(), ExceptionUtils.getRootCauseMessage(e)); + } } if(result == null) { - // default to the 'DEDOT' field name converter to maintain backwards compatibility + // if no converter defined or an invalid converter is defined, default to 'DEDOT' result = FieldNameConverters.DEDOT; }