Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'apache/master' into METRON-1533-NEW
Browse files Browse the repository at this point in the history
  • Loading branch information
nickwallen committed Jun 8, 2018
2 parents 957a942 + 40796c0 commit 9e99bd5
Show file tree
Hide file tree
Showing 150 changed files with 24,253 additions and 32,323 deletions.
15 changes: 15 additions & 0 deletions metron-analytics/metron-profiler/README.md
Expand Up @@ -538,6 +538,8 @@ The Profiler runs as an independent Storm topology. The configuration for the P
| [`profiler.hbase.batch`](#profilerhbasebatch) | The number of puts that are written to HBase in a single batch.
| [`profiler.hbase.flush.interval.seconds`](#profilerhbaseflushintervalseconds) | The maximum number of seconds between batch writes to HBase.
| [`topology.kryo.register`](#topologykryoregister) | Storm will use Kryo serialization for these classes.
| [`profiler.writer.batchSize`](#profilerwriterbatchsize) | The number of records to batch when writing to Kafka.
| [`profiler.writer.batchTimeout`](#profilerwriterbatchtimeout) | The timeout in ms for batching when writing to Kafka.


### `profiler.input.topic`
Expand Down Expand Up @@ -852,6 +854,19 @@ More information on accessing profile data can be found in the [Profiler Client]

More information on using the [`STATS_*` functions in Stellar can be found here](../../metron-platform/metron-common).

### `profiler.writer.batchSize`

*Default*: 15

The number of records to batch when writing to Kafka. This is managed in the global configuration and does not require a topology restart.

### `profiler.writer.batchTimeout`

*Default*: 0

The timeout after which a batch will be flushed even if batchSize has not been met. Optional. If unspecified, or set to `0`, it defaults to a system-determined duration which is a fraction of the Storm parameter `topology.message.timeout.secs`.
Ignored if batchSize is `1`, since this disables batching. This is managed in the global configuration and does not require a topology restart.

## Implementation

## Key Classes
Expand Down
Expand Up @@ -183,8 +183,9 @@ bolts:
className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
constructorArgs:
- "${kafka.zk}"
- "PROFILER"
configMethods:
- name: "withMessageWriter"
- name: "withBulkMessageWriter"
args: [ref: "kafkaWriter"]

streams:
Expand Down
5 changes: 5 additions & 0 deletions metron-platform/metron-common/README.md
Expand Up @@ -92,9 +92,14 @@ but a convenient index is provided here:
| [`stellar.function.resolver.excludes`](../../metron-stellar/stellar-common#stellarfunctionresolverincludesexcludes) | Stellar | CSV String | N/A |
| [`profiler.period.duration`](../../metron-analytics/metron-profiler#profilerperiodduration) | Profiler | Integer | `profiler_period_duration` |
| [`profiler.period.duration.units`](../../metron-analytics/metron-profiler#profilerperioddurationunits) | Profiler | String | `profiler_period_units` |
| [`profiler.writer.batchSize`](../../metron-analytics/metron-profiler/#profilerwriterbatchsize) | Profiler | Integer | N/A |
| [`profiler.writer.batchTimeout`](../../metron-analytics/metron-profiler/#profilerwriterbatchtimeout) | Profiler | Integer | N/A |
| [`update.hbase.table`](../metron-indexing#updatehbasetable) | REST/Indexing | String | `update_hbase_table` |
| [`update.hbase.cf`](../metron-indexing#updatehbasecf) | REST/Indexing | String | `update_hbase_cf` |
| [`geo.hdfs.file`](../metron-enrichment#geohdfsfile) | Enrichment | String | `geo_hdfs_file` |
| [`enrichment.writer.batchSize`](../metron-enrichment#enrichmentwriterbatchsize) | Enrichment | Integer | N/A |
| [`enrichment.writer.batchTimeout`](../metron-enrichment#enrichmentwriterbatchtimeout) | Enrichment | Integer | N/A |
| [`geo.hdfs.file`](../metron-enrichment#geohdfsfile) | Enrichment | String | `geo_hdfs_file` |
| [`source.type.field`](../../metron-interface/metron-alerts#sourcetypefield) | UI | String | N/A |

## Note Configs in Ambari
Expand Down
Expand Up @@ -28,6 +28,8 @@
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.Configurations;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.writer.ConfigurationStrategy;
import org.apache.metron.common.configuration.writer.ConfigurationsStrategies;
import org.apache.metron.zookeeper.SimpleEventListener;
import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
import org.apache.metron.common.zookeeper.configurations.Reloadable;
Expand All @@ -43,12 +45,14 @@ public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends Ba
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private String zookeeperUrl;
private String configurationStrategy;

protected CuratorFramework client;
protected ZKCache cache;
private final CONFIG_T configurations;
public ConfiguredBolt(String zookeeperUrl) {
public ConfiguredBolt(String zookeeperUrl, String configurationStrategy) {
this.zookeeperUrl = zookeeperUrl;
this.configurationStrategy = configurationStrategy;
this.configurations = createUpdater().defaultConfigurations();
}

Expand All @@ -68,7 +72,13 @@ public CONFIG_T getConfigurations() {
return configurations;
}

protected abstract ConfigurationsUpdater<CONFIG_T> createUpdater();
/**
 * Resolves the configuration strategy for this bolt.
 *
 * <p>The strategy is looked up by name through {@code ConfigurationsStrategies.valueOf},
 * using the {@code configurationStrategy} string held by this bolt.
 *
 * @return the strategy matching the stored strategy name
 */
protected ConfigurationStrategy<CONFIG_T> getConfigurationStrategy() {
  final ConfigurationStrategy<CONFIG_T> strategy =
      ConfigurationsStrategies.valueOf(configurationStrategy);
  return strategy;
}

/**
 * Builds the updater for this bolt's configurations.
 *
 * <p>Creation is delegated to the strategy returned by {@link #getConfigurationStrategy()},
 * which is handed this bolt and a supplier of its current configurations.
 *
 * @return the updater produced by the configuration strategy
 */
protected ConfigurationsUpdater<CONFIG_T> createUpdater() {
  final ConfigurationStrategy<CONFIG_T> strategy = getConfigurationStrategy();
  return strategy.createUpdater(this, this::getConfigurations);
}


@Override
Expand Down
Expand Up @@ -17,13 +17,8 @@
*/
package org.apache.metron.common.bolt;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.EnrichmentConfigurations;
import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
import org.apache.metron.common.zookeeper.configurations.EnrichmentUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -33,11 +28,7 @@ public abstract class ConfiguredEnrichmentBolt extends ConfiguredBolt<Enrichment


public ConfiguredEnrichmentBolt(String zookeeperUrl) {
super(zookeeperUrl);
super(zookeeperUrl, "ENRICHMENT");
}

@Override
protected ConfigurationsUpdater<EnrichmentConfigurations> createUpdater() {
return new EnrichmentUpdater(this, this::getConfigurations);
}
}
Expand Up @@ -17,26 +17,17 @@
*/
package org.apache.metron.common.bolt;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.IndexingConfigurations;
import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
import org.apache.metron.common.zookeeper.configurations.IndexingUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// TODO delete - no longer used after removing from BulkMessageWriterBolt?
public abstract class ConfiguredIndexingBolt extends ConfiguredBolt<IndexingConfigurations> {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public ConfiguredIndexingBolt(String zookeeperUrl) {
super(zookeeperUrl);
}

@Override
protected ConfigurationsUpdater<IndexingConfigurations> createUpdater() {
return new IndexingUpdater(this, this::getConfigurations);
super(zookeeperUrl, "INDEXING");
}

}
Expand Up @@ -17,14 +17,9 @@
*/
package org.apache.metron.common.bolt;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import org.apache.metron.common.configuration.ConfigurationType;
import org.apache.metron.common.configuration.ConfigurationsUtils;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
import org.apache.metron.common.zookeeper.configurations.ParserUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,7 +30,7 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt<ParserConfigur
protected final ParserConfigurations configurations = new ParserConfigurations();
private String sensorType;
public ConfiguredParserBolt(String zookeeperUrl, String sensorType) {
super(zookeeperUrl);
super(zookeeperUrl, "PARSERS");
this.sensorType = sensorType;
}

Expand All @@ -47,10 +42,4 @@ public String getSensorType() {
return sensorType;
}


@Override
protected ConfigurationsUpdater<ParserConfigurations> createUpdater() {
return new ParserUpdater(this, this::getConfigurations);
}

}
Expand Up @@ -21,8 +21,6 @@
import java.lang.invoke.MethodHandles;
import org.apache.metron.common.configuration.profiler.ProfilerConfig;
import org.apache.metron.common.configuration.profiler.ProfilerConfigurations;
import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater;
import org.apache.metron.common.zookeeper.configurations.ProfilerUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,16 +32,11 @@ public abstract class ConfiguredProfilerBolt extends ConfiguredBolt<ProfilerConf
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public ConfiguredProfilerBolt(String zookeeperUrl) {
super(zookeeperUrl);
super(zookeeperUrl, "PROFILER");
}

/**
 * Convenience accessor for the profiler definition held by this bolt's configurations.
 *
 * @return whatever {@code getConfigurations().getProfilerConfig()} currently yields
 */
protected ProfilerConfig getProfilerConfig() {
  final ProfilerConfigurations current = getConfigurations();
  return current.getProfilerConfig();
}

@Override
protected ConfigurationsUpdater<ProfilerConfigurations> createUpdater() {
return new ProfilerUpdater(this, this::getConfigurations);
}

}
Expand Up @@ -27,8 +27,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -75,6 +75,11 @@ public void deleteGlobalConfig() {
getConfigurations().remove(ConfigurationType.GLOBAL.getTypeName());
}

/**
 * Reads {@code key} from {@code map} and converts the value to the requested type.
 *
 * <p>When {@code map} is {@code null} the default is returned as-is, without conversion.
 * Otherwise the value found under {@code key} (or {@code defaultValue} when the key has no
 * mapping) is passed through {@code ConversionUtils.convert}.
 *
 * <p>NOTE(review): {@code Map.getOrDefault} only substitutes the default when the key is
 * unmapped; a key explicitly mapped to {@code null} is handed to the converter as
 * {@code null}. Confirm callers that unbox the result can tolerate that.
 *
 * @param key          the configuration key to look up
 * @param map          the configuration map; may be {@code null}
 * @param defaultValue the value to use when the map is {@code null} or lacks the key
 * @param clazz        the type to convert the value to
 * @param <T>          the requested value type
 * @return the converted value, or {@code defaultValue} when {@code map} is {@code null}
 */
public static <T> T getAs(String key, Map<String, Object> map, T defaultValue, Class<T> clazz) {
  if (map == null) {
    return defaultValue;
  }
  final Object raw = map.getOrDefault(key, defaultValue);
  return ConversionUtils.convert(raw, clazz);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Expand Up @@ -27,6 +27,9 @@
import java.util.List;

public class EnrichmentConfigurations extends Configurations {
public static final Integer DEFAULT_KAFKA_BATCH_SIZE = 15;
public static final String BATCH_SIZE_CONF = "enrichment.writer.batchSize";
public static final String BATCH_TIMEOUT_CONF = "enrichment.writer.batchTimeout";

public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) {
return (SensorEnrichmentConfig) getConfigurations().get(getKey(sensorType));
Expand All @@ -49,6 +52,28 @@ public void delete(String sensorType) {
getConfigurations().remove(getKey(sensorType));
}

/**
 * Returns the batch size used when writing to kafka.
 *
 * <p>The value is read from the global config under {@link #BATCH_SIZE_CONF}; when the key
 * is absent, {@link #DEFAULT_KAFKA_BATCH_SIZE} is used.
 * Note: enrichment writes out to 1 kafka topic, so it is not pulling this config by sensor.
 *
 * @return batch size for writing to kafka
 * @see org.apache.metron.common.configuration.EnrichmentConfigurations#BATCH_SIZE_CONF
 */
public int getBatchSize() {
  final Integer batchSize =
      getAs(BATCH_SIZE_CONF, getGlobalConfig(true), DEFAULT_KAFKA_BATCH_SIZE, Integer.class);
  return batchSize;
}

/**
 * Returns the batch timeout used when writing to kafka.
 *
 * <p>The value is read from the global config under {@link #BATCH_TIMEOUT_CONF}; when the
 * key is absent, {@code 0} is used.
 * Note: enrichment writes out to 1 kafka topic, so it is not pulling this config by sensor.
 *
 * @return batch timeout for writing to kafka
 * @see org.apache.metron.common.configuration.EnrichmentConfigurations#BATCH_TIMEOUT_CONF
 */
public int getBatchTimeout() {
  final Integer batchTimeout =
      getAs(BATCH_TIMEOUT_CONF, getGlobalConfig(true), 0, Integer.class);
  return batchTimeout;
}

public List<String> getTypes() {
List<String> ret = new ArrayList<>();
for(String keyedSensor : getConfigurations().keySet()) {
Expand All @@ -62,4 +87,5 @@ public List<String> getTypes() {
/**
 * Builds the configurations-map key for a sensor's enrichment config.
 *
 * <p>The key is the ENRICHMENT configuration type name, a dot, and the sensor type.
 *
 * @param sensorType the sensor whose key is wanted
 * @return the key in the form {@code <enrichment type name>.<sensorType>}
 */
public static String getKey(String sensorType) {
  return new StringBuilder()
      .append(ConfigurationType.ENRICHMENT.getTypeName())
      .append(".")
      .append(sensorType)
      .toString();
}

}
Expand Up @@ -17,13 +17,14 @@
*/
package org.apache.metron.common.configuration;

import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.apache.metron.common.utils.JSONUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.metron.common.utils.JSONUtils;

public class IndexingConfigurations extends Configurations {
public static final String BATCH_SIZE_CONF = "batchSize";
Expand Down Expand Up @@ -226,7 +227,4 @@ public static Map<String, Object> setFieldNameConverter(Map<String, Object> conf
return ret;
}

public static <T> T getAs(String key, Map<String, Object> map, T defaultValue, Class<T> clazz) {
return map == null?defaultValue: ConversionUtils.convert(map.getOrDefault(key, defaultValue), clazz);
}
}
Expand Up @@ -26,6 +26,7 @@
import java.util.List;

public class ParserConfigurations extends Configurations {
public static final Integer DEFAULT_KAFKA_BATCH_SIZE = 15;

public SensorParserConfig getSensorParserConfig(String sensorType) {
return (SensorParserConfig) getConfigurations().get(getKey(sensorType));
Expand Down
Expand Up @@ -19,6 +19,7 @@

import org.apache.metron.stellar.common.StellarAssignment;
import org.apache.metron.stellar.common.StellarProcessor;
import org.apache.metron.stellar.dsl.VariableResolver;
import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -142,8 +143,14 @@ private Map<String, Object> getMessage( Set<String> stellarFields
{

Map<String, Object> messageSegment = new HashMap<>();
for(String variable : stellarFields) {
messageSegment.put(variable, message.get(variable));
if(stellarFields.contains(VariableResolver.ALL_FIELDS)) {
//we need to include all of the fields here.
messageSegment.putAll(message);
}
else {
for (String variable : stellarFields) {
messageSegment.put(variable, message.get(variable));
}
}
return messageSegment;
}
Expand Down
Expand Up @@ -29,6 +29,9 @@
* Used to manage configurations for the Profiler.
*/
public class ProfilerConfigurations extends Configurations {
public static final Integer DEFAULT_KAFKA_BATCH_SIZE = 15;
public static final String BATCH_SIZE_CONF = "profiler.writer.batchSize";
public static final String BATCH_TIMEOUT_CONF = "profiler.writer.batchTimeout";

public ProfilerConfig getProfilerConfig() {
return (ProfilerConfig) getConfigurations().get(getKey());
Expand All @@ -55,4 +58,26 @@ public void delete() {
configurations.remove(getKey());
}

/**
 * Returns the batch size used when writing to kafka.
 *
 * <p>The value is read from the global config under {@link #BATCH_SIZE_CONF}; when the key
 * is absent, {@link #DEFAULT_KAFKA_BATCH_SIZE} is used.
 * Note: profiler writes out to 1 kafka topic, so it is not pulling this config by sensor.
 *
 * @return batch size for writing to kafka
 * @see org.apache.metron.common.configuration.profiler.ProfilerConfigurations#BATCH_SIZE_CONF
 */
public int getBatchSize() {
  final Integer batchSize =
      getAs(BATCH_SIZE_CONF, getGlobalConfig(true), DEFAULT_KAFKA_BATCH_SIZE, Integer.class);
  return batchSize;
}

/**
 * Returns the batch timeout used when writing to kafka.
 *
 * <p>The value is read from the global config under {@link #BATCH_TIMEOUT_CONF}; when the
 * key is absent, {@code 0} is used.
 * Note: profiler writes out to 1 kafka topic, so it is not pulling this config by sensor.
 *
 * @return batch timeout for writing to kafka
 * @see org.apache.metron.common.configuration.profiler.ProfilerConfigurations#BATCH_TIMEOUT_CONF
 */
public int getBatchTimeout() {
  final Integer batchTimeout =
      getAs(BATCH_TIMEOUT_CONF, getGlobalConfig(true), 0, Integer.class);
  return batchTimeout;
}

}

0 comments on commit 9e99bd5

Please sign in to comment.