Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-1569 Allow user to change field name conversion when indexing … #1022

Closed
wants to merge 8 commits into from
Expand Up @@ -31,6 +31,7 @@ public class IndexingConfigurations extends Configurations {
public static final String ENABLED_CONF = "enabled";
public static final String INDEX_CONF = "index";
public static final String OUTPUT_PATH_FUNCTION_CONF = "outputPathFunction";
public static final String FIELD_NAME_CONVERTER_CONF = "fieldNameConverter";

public Map<String, Object> getSensorIndexingConfig(String sensorType, boolean emptyMapOnNonExistent) {
Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(getKey(sensorType));
Expand Down Expand Up @@ -61,7 +62,8 @@ public void delete(String sensorType) {
}

public Map<String, Object> getSensorIndexingConfig(String sensorType, String writerName) {
Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(getKey(sensorType));
String key = getKey(sensorType);
Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(key);
if(ret == null) {
return new HashMap();
}
Expand Down Expand Up @@ -147,6 +149,10 @@ public String getOutputPathFunction(String sensorName, String writerName) {
return getOutputPathFunction(getSensorIndexingConfig(sensorName, writerName), sensorName);
}

public String getFieldNameConverter(String sensorName, String writerName) {
return getFieldNameConverter(getSensorIndexingConfig(sensorName, writerName), sensorName);
}

public static boolean isEnabled(Map<String, Object> conf) {
return getAs( ENABLED_CONF
,conf
Expand Down Expand Up @@ -187,6 +193,10 @@ public static String getOutputPathFunction(Map<String, Object> conf, String sens
);
}

public static String getFieldNameConverter(Map<String, Object> conf, String sensorName) {
return getAs(FIELD_NAME_CONVERTER_CONF, conf, "", String.class);
}

public static Map<String, Object> setEnabled(Map<String, Object> conf, boolean enabled) {
Map<String, Object> ret = conf == null?new HashMap<>():conf;
ret.put(ENABLED_CONF, enabled);
Expand All @@ -210,6 +220,12 @@ public static Map<String, Object> setIndex(Map<String, Object> conf, String inde
return ret;
}

public static Map<String, Object> setFieldNameConverter(Map<String, Object> conf, String index) {
Map<String, Object> ret = conf == null ? new HashMap<>(): conf;
ret.put(FIELD_NAME_CONVERTER_CONF, index);
return ret;
}

public static <T> T getAs(String key, Map<String, Object> map, T defaultValue, Class<T> clazz) {
return map == null?defaultValue: ConversionUtils.convert(map.getOrDefault(key, defaultValue), clazz);
}
Expand Down
Expand Up @@ -72,4 +72,9 @@ public Map<String, Object> getGlobalConfig() {
public boolean isDefault(String sensorName) {
return config.orElse(new IndexingConfigurations()).isDefault(sensorName, writerName);
}

@Override
public String getFieldNameConverter(String sensorName) {
return config.orElse(new IndexingConfigurations()).getFieldNameConverter(sensorName, writerName);
}
}
Expand Up @@ -101,4 +101,10 @@ public Map<String, Object> getGlobalConfig() {
public boolean isDefault(String sensorName) {
return false;
}

@Override
public String getFieldNameConverter(String sensorName) {
// not applicable
return null;
}
}
Expand Up @@ -68,4 +68,10 @@ public Map<String, Object> getGlobalConfig() {
public boolean isDefault(String sensorName) {
return false;
}

@Override
public String getFieldNameConverter(String sensorName) {
// not applicable
return null;
}
}
Expand Up @@ -18,17 +18,86 @@

package org.apache.metron.common.configuration.writer;

import org.apache.metron.common.field.FieldNameConverter;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
* Configures a writer to write messages to an endpoint.
*
* <p>Each destination will have its own {@link WriterConfiguration}; for example HDFS, Elasticsearch, and Solr.
* <p>A writer can be configured independently for each source type.
*/
public interface WriterConfiguration extends Serializable {

/**
* Defines the maximum batch size for a given sensor.
*
* @param sensorName The name of the sensor.
* @return The batch size for the sensor.
*/
int getBatchSize(String sensorName);

/**
* Defines the batch timeout for a given sensor. Even if the maximum
* batch size has not been reached, the messages will be written when
* the timeout is reached.
*
* @param sensorName The name of the sensor.
* @return The batch timeout for the sensor.
*/
int getBatchTimeout(String sensorName);

/**
* Returns the batch timeouts for all of the currently configured sensors.
* @return All of the batch timeouts.
*/
List<Integer> getAllConfiguredTimeouts();

/**
* The name of the index to write to for a given sensor.
*
* @param sensorName The name of the sensor.
* @return The name of the index to write to
*/
String getIndex(String sensorName);

/**
* Returns true, if this writer is enabled for the given sensor.
*
* @param sensorName The name of the sensor.
* @return True, if this writer is enabled. Otherwise, false.
*/
boolean isEnabled(String sensorName);

/**
* @param sensorName The name of a sensor.
* @return
*/
Map<String, Object> getSensorConfig(String sensorName);

/**
* Returns the global configuration.
* @return The global configuration.
*/
Map<String, Object> getGlobalConfig();

/**
* Returns true, if the current writer configuration is set to all default values.
*
* @param sensorName The name of the sensor.
* @return True, if the writer is using all default values. Otherwise, false.
*/
boolean isDefault(String sensorName);

/**
* Return the {@link FieldNameConverter} to use
* when writing messages.
*
* @param sensorName The name of the sensor;
* @return
*/
String getFieldNameConverter(String sensorName);
}
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.common.field;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.lang.invoke.MethodHandles;

/**
* A {@link FieldNameConverter} that replaces all field names containing dots
* with colons.
*/
public class DeDotFieldNameConverter implements FieldNameConverter, Serializable {

private static final long serialVersionUID = -3126840090749760299L;
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Override
public String convert(String originalField) {

String newField = originalField.replace(".",":");

if(LOG.isDebugEnabled() && originalField.contains(".")) {
LOG.debug("Renamed dotted field; original={}, new={}", originalField, newField);
}

return newField;
}
}
Expand Up @@ -15,10 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.common.interfaces;
package org.apache.metron.common.field;

/**
* Allows field names to be transformed before a message is written to an endpoint.
*/
public interface FieldNameConverter {

String convert(String originalField);

/**
* Convert the field name.
*
* @param originalField The original field name.
* @return The new field name.
*/
String convert(String originalField);
}
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.metron.common.field;

import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;

/**
* Enumerates a set of {@link FieldNameConverter} implementations.
*
* <p>Provides shared instances of each {@link FieldNameConverter}.
*
* <p>Allows the field name converter to be specified using a short-hand
* name, rather than the entire fully-qualified class name.
*/
public enum FieldNameConverters implements FieldNameConverter {

/**
* A {@link FieldNameConverter} that does not rename any fields. All field
* names remain unchanged.
*/
NOOP(new NoopFieldNameConverter()),

/**
* A {@link FieldNameConverter} that replaces all field names containing dots
* with colons.
*/
DEDOT(new DeDotFieldNameConverter());

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private FieldNameConverter converter;

FieldNameConverters(FieldNameConverter converter) {
this.converter = converter;
}

/**
* Returns a shared instance of the {@link FieldNameConverter}/
*
* @return A shared {@link FieldNameConverter} instance.
*/
public FieldNameConverter get() {
return converter;
}

/**
* Allows the {@link FieldNameConverters} enums to be used directly as a {@link FieldNameConverter}.
*
* {@code
* FieldNameConverter converter = FieldNameConverters.DEDOT;
* }
*
* @param originalField The original field name.
* @return
*/
@Override
public String convert(String originalField) {
return converter.convert(originalField);
}

/**
* Create a new {@link FieldNameConverter}.
*
* @param sensorType The type of sensor.
* @param config The writer configuration.
* @return
*/
public static FieldNameConverter create(String sensorType, WriterConfiguration config) {
FieldNameConverter result = null;

// which field name converter has been configured?
String converterName = config.getFieldNameConverter(sensorType);
if(StringUtils.isNotBlank(converterName)) {
try {
result = FieldNameConverters.valueOf(converterName);

} catch (IllegalArgumentException e) {
LOG.error("Invalid field name converter, using default; configured={}, knownValues={}, error={}",
converterName, FieldNameConverters.values(), ExceptionUtils.getRootCauseMessage(e));
}
}

if(result == null) {
// if no converter defined or an invalid converter is defined, default to 'DEDOT'
result = FieldNameConverters.DEDOT;
}

LOG.debug("Created field name converter; sensorType={}, configured={}, class={}",
sensorType, converterName, ClassUtils.getShortClassName(result, "null"));

return result;
}
}
Expand Up @@ -15,18 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.common.field;

package org.apache.metron.elasticsearch.writer;

import org.junit.Test;

import static org.junit.Assert.*;

public class ElasticsearchFieldNameConverterTest {
/**
* A {@link FieldNameConverter} that does not rename any fields. All field
* names remain unchanged.
*/
public class NoopFieldNameConverter implements FieldNameConverter {

@Test
public void convert() throws Exception {
assertEquals("testfield:with:colons",new ElasticsearchFieldNameConverter().convert("testfield.with.colons"));
}
@Override
public String convert(String originalField) {

}
// no change to the field name
return originalField;
}
}