Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-1569 Allow user to change field name conversion when indexing … #1022

Closed
wants to merge 8 commits into from
Expand Up @@ -31,6 +31,7 @@ public class IndexingConfigurations extends Configurations {
public static final String ENABLED_CONF = "enabled";
public static final String INDEX_CONF = "index";
public static final String OUTPUT_PATH_FUNCTION_CONF = "outputPathFunction";
public static final String FIELD_NAME_CONVERTER_CONF = "fieldNameConverter";

public Map<String, Object> getSensorIndexingConfig(String sensorType, boolean emptyMapOnNonExistent) {
Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(getKey(sensorType));
Expand Down Expand Up @@ -61,7 +62,8 @@ public void delete(String sensorType) {
}

public Map<String, Object> getSensorIndexingConfig(String sensorType, String writerName) {
Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(getKey(sensorType));
String key = getKey(sensorType);
Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(key);
if(ret == null) {
return new HashMap();
}
Expand Down Expand Up @@ -147,6 +149,10 @@ public String getOutputPathFunction(String sensorName, String writerName) {
return getOutputPathFunction(getSensorIndexingConfig(sensorName, writerName), sensorName);
}

public String getFieldNameConverter(String sensorName, String writerName) {
return getFieldNameConverter(getSensorIndexingConfig(sensorName, writerName), sensorName);
}

public static boolean isEnabled(Map<String, Object> conf) {
return getAs( ENABLED_CONF
,conf
Expand Down Expand Up @@ -187,6 +193,10 @@ public static String getOutputPathFunction(Map<String, Object> conf, String sens
);
}

public static String getFieldNameConverter(Map<String, Object> conf, String sensorName) {
return getAs(FIELD_NAME_CONVERTER_CONF, conf, "", String.class);
}

public static Map<String, Object> setEnabled(Map<String, Object> conf, boolean enabled) {
Map<String, Object> ret = conf == null?new HashMap<>():conf;
ret.put(ENABLED_CONF, enabled);
Expand All @@ -210,6 +220,12 @@ public static Map<String, Object> setIndex(Map<String, Object> conf, String inde
return ret;
}

public static Map<String, Object> setFieldNameConverter(Map<String, Object> conf, String index) {
Map<String, Object> ret = conf == null ? new HashMap<>(): conf;
ret.put(FIELD_NAME_CONVERTER_CONF, index);
return ret;
}

public static <T> T getAs(String key, Map<String, Object> map, T defaultValue, Class<T> clazz) {
return map == null?defaultValue: ConversionUtils.convert(map.getOrDefault(key, defaultValue), clazz);
}
Expand Down
Expand Up @@ -72,4 +72,9 @@ public Map<String, Object> getGlobalConfig() {
public boolean isDefault(String sensorName) {
return config.orElse(new IndexingConfigurations()).isDefault(sensorName, writerName);
}

@Override
public String getFieldNameConverter(String sensorName) {
return config.orElse(new IndexingConfigurations()).getFieldNameConverter(sensorName, writerName);
}
}
Expand Up @@ -101,4 +101,10 @@ public Map<String, Object> getGlobalConfig() {
public boolean isDefault(String sensorName) {
return false;
}

@Override
public String getFieldNameConverter(String sensorName) {
// not applicable
return null;
}
}
Expand Up @@ -68,4 +68,10 @@ public Map<String, Object> getGlobalConfig() {
public boolean isDefault(String sensorName) {
return false;
}

@Override
public String getFieldNameConverter(String sensorName) {
// not applicable
return null;
}
}
Expand Up @@ -18,17 +18,86 @@

package org.apache.metron.common.configuration.writer;

import org.apache.metron.common.field.FieldNameConverter;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
* Configures a writer to write messages to an endpoint.
*
* <p>Each destination will have its own {@link WriterConfiguration}; for example HDFS, Elasticsearch, and Solr.
* <p>A writer can be configured independently for each source type.
*/
public interface WriterConfiguration extends Serializable {

/**
* Defines the maximum batch size for a given sensor.
*
* @param sensorName The name of the sensor.
* @return The batch size for the sensor.
*/
int getBatchSize(String sensorName);

/**
* Defines the batch timeout for a given sensor. Even if the maximum
* batch size has not been reached, the messages will be written when
* the timeout is reached.
*
* @param sensorName The name of the sensor.
* @return The batch timeout for the sensor.
*/
int getBatchTimeout(String sensorName);

/**
* Returns the batch timeouts for all of the currently configured sensors.
* @return All of the batch timeouts.
*/
List<Integer> getAllConfiguredTimeouts();

/**
* The name of the index to write to for a given sensor.
*
* @param sensorName The name of the sensor.
* @return The name of the index to write to
*/
String getIndex(String sensorName);

/**
* Returns true, if this writer is enabled for the given sensor.
*
* @param sensorName The name of the sensor.
* @return True, if this writer is enabled. Otherwise, false.
*/
boolean isEnabled(String sensorName);

/**
* @param sensorName The name of a sensor.
* @return
*/
Map<String, Object> getSensorConfig(String sensorName);

/**
* Returns the global configuration.
* @return The global configuration.
*/
Map<String, Object> getGlobalConfig();

/**
* Returns true, if the current writer configuration is set to all default values.
*
* @param sensorName The name of the sensor.
* @return True, if the writer is using all default values. Otherwise, false.
*/
boolean isDefault(String sensorName);

/**
* Return the {@link FieldNameConverter} to use
* when writing messages.
*
* @param sensorName The name of the sensor;
* @return
*/
String getFieldNameConverter(String sensorName);
}
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.common.field;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.lang.invoke.MethodHandles;

/**
* A {@link FieldNameConverter} that replaces all field names containing dots
* with colons.
*/
public class DeDotFieldNameConverter implements FieldNameConverter, Serializable {

private static final long serialVersionUID = -3126840090749760299L;
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

@Override
public String convert(String originalField) {

String newField = originalField.replace(".",":");

if(LOG.isDebugEnabled() && originalField.contains(".")) {
LOG.debug("Renamed dotted field; original={}, new={}", originalField, newField);
}

return newField;
}
}
Expand Up @@ -15,10 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.common.interfaces;
package org.apache.metron.common.field;

/**
* Allows field names to be transformed before a message is written to an endpoint.
*/
public interface FieldNameConverter {

String convert(String originalField);

/**
* Convert the field name.
*
* @param originalField The original field name.
* @return The new field name.
*/
String convert(String originalField);
}
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.metron.common.field;

import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;

/**
* Enumerates a set of {@link FieldNameConverter} implementations.
*
* <p>Provides shared instances of each {@link FieldNameConverter}.
*
* <p>Allows the field name converter to be specified using a short-hand
* name, rather than the entire fully-qualified class name.
*/
public enum FieldNameConverters {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit, can we have this implement FieldNameConverter so the enum can be used directly without calling get()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Check 'er out.


/**
* A {@link FieldNameConverter} that does not rename any fields. All field
* names remain unchanged.
*/
NOOP(new NoopFieldNameConverter()),

/**
* A {@link FieldNameConverter} that replaces all field names containing dots
* with colons.
*/
DEDOT(new DeDotFieldNameConverter());

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private FieldNameConverter converter;

FieldNameConverters(FieldNameConverter converter) {
this.converter = converter;
}

/**
* Returns a shared instance of the {@link FieldNameConverter}/
*
* @return A shared {@link FieldNameConverter} instance.
*/
public FieldNameConverter get() {
return converter;
}

/**
* Create a new {@link FieldNameConverter}.
*
* @param sensorType The type of sensor.
* @param config The writer configuration.
* @return
*/
public static FieldNameConverter create(String sensorType, WriterConfiguration config) {

// which field name converter should be used?
String converterName = config.getFieldNameConverter(sensorType);
FieldNameConverter result = null;
try {
result = FieldNameConverters.valueOf(converterName).get();

} catch(IllegalArgumentException e) {
LOG.error("Invalid field name converter, using default; configured={}, knownValues={}, error={}",
converterName, FieldNameConverters.values(), ExceptionUtils.getRootCauseMessage(e));
}

if(result == null) {
// default to the 'DEDOT' field name converter to maintain backwards compatibility
result = FieldNameConverters.DEDOT.get();
}

LOG.debug("Created field name converter; sensorType={}, configured={}, class={}",
sensorType, converterName, ClassUtils.getShortClassName(result, "null"));

return result;
}
}
Expand Up @@ -15,18 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.common.field;

package org.apache.metron.elasticsearch.writer;

import org.junit.Test;

import static org.junit.Assert.*;

public class ElasticsearchFieldNameConverterTest {
/**
* A {@link FieldNameConverter} that does not rename any fields. All field
* names remain unchanged.
*/
public class NoopFieldNameConverter implements FieldNameConverter {

@Test
public void convert() throws Exception {
assertEquals("testfield:with:colons",new ElasticsearchFieldNameConverter().convert("testfield.with.colons"));
}
@Override
public String convert(String originalField) {

}
// no change to the field name
return originalField;
}
}