Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions storm-client/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.serialization.IKryoDecorator;
import org.apache.storm.serialization.IKryoFactory;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;
import org.apache.storm.validation.ConfigValidation;
import org.apache.storm.validation.ConfigValidation.EventLoggerRegistryValidator;
Expand All @@ -53,6 +54,8 @@
import org.apache.storm.validation.ConfigValidationAnnotations.IsType;
import org.apache.storm.validation.ConfigValidationAnnotations.NotNull;
import org.apache.storm.validation.ConfigValidationAnnotations.Password;
import org.apache.storm.windowing.EvictionPolicy;
import org.apache.storm.windowing.TriggerPolicy;

/**
* Topology configs are specified as a plain old map. This class provides a convenient way to create a topology config map by providing
Expand Down Expand Up @@ -248,14 +251,14 @@ public class Config extends HashMap<String, Object> {
*/
@IsPositiveNumber(includeZero = true)
public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB =
"topology.metrics.consumer.resources.onheap.memory.mb";
"topology.metrics.consumer.resources.onheap.memory.mb";
/**
* The maximum amount of memory an instance of a metrics consumer will take off heap. This enables the scheduler to allocate slots on
* machines with enough available memory. A default value will be set for this config if user does not override
*/
@IsPositiveNumber(includeZero = true)
public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB =
"topology.metrics.consumer.resources.offheap.memory.mb";
"topology.metrics.consumer.resources.offheap.memory.mb";
/**
* The config indicates the percentage of cpu for a core an instance(executor) of a metrics consumer will use. Assuming the a core value
* to be 100, a value of 10 indicates 10% of the core. The P in PCORE represents the term "physical". A default value will be set for
Expand Down Expand Up @@ -857,7 +860,7 @@ public class Config extends HashMap<String, Object> {
@NotNull
@IsPositiveNumber(includeZero = true)
public static final String TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS =
"topology.backpressure.wait.progressive.level3.sleep.millis";
"topology.backpressure.wait.progressive.level3.sleep.millis";
/**
* Configures steps used to determine progression to the next level of wait .. if using WaitStrategyProgressive for BackPressure.
*/
Expand Down Expand Up @@ -1461,7 +1464,7 @@ public class Config extends HashMap<String, Object> {
* Impersonation user ACL config entries.
*/
@IsMapEntryCustom(keyValidatorClasses = { ConfigValidation.StringValidator.class },
valueValidatorClasses = { ConfigValidation.ImpersonationAclUserEntryValidator.class })
valueValidatorClasses = { ConfigValidation.ImpersonationAclUserEntryValidator.class })
public static final String NIMBUS_IMPERSONATION_ACL = "nimbus.impersonation.acl";
/**
* A whitelist of the RAS scheduler strategies allowed by nimbus. Should be a list of fully-qualified class names or null to allow all.
Expand Down Expand Up @@ -1676,6 +1679,17 @@ public class Config extends HashMap<String, Object> {
//DO NOT CHANGE UNLESS WE ADD IN STATE NOT STORED IN THE PARENT CLASS
private static final long serialVersionUID = -1550278723792864455L;

/**
* Specify the trigger policy for the window*/
@IsType(type = TriggerPolicy.class)
public static final String TOPOLOGY_BOLTS_WINDOW_TRIGGER_POLICY = "topology.bolts.window.trigger.policy";

/**
* Specify the eviction policy for the window
*/
@IsType(type = EvictionPolicy.class)
public static final String TOPOLOGY_BOLTS_WINDOW_EVICTION_POLICY = "topology.bolts.window.eviction.policy";

public static void setClasspath(Map<String, Object> conf, String cp) {
conf.put(Config.TOPOLOGY_CLASSPATH, cp);
}
Expand Down Expand Up @@ -1938,7 +1952,7 @@ public void setTopologyComponentWorkerConstraints(String component1, String comp
if (component1 != null && component2 != null) {
List<String> constraintPair = Arrays.asList(component1, component2);
List<List<String>> constraints = (List<List<String>>) computeIfAbsent(Config.TOPOLOGY_RAS_CONSTRAINTS,
(k) -> new ArrayList<>(1));
(k) -> new ArrayList<>(1));
constraints.add(constraintPair);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,19 @@ private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> li
}
// validate
validate(topoConf, windowLengthCount, windowLengthDuration,
slidingIntervalCount, slidingIntervalDuration);
evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
manager, evictionPolicy);
slidingIntervalCount, slidingIntervalDuration);
if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_EVICTION_POLICY) && topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_EVICTION_POLICY) instanceof EvictionPolicy ) {
evictionPolicy = (EvictionPolicy<Tuple, ?>)topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_EVICTION_POLICY);

}else{
evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration);
}
if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_TRIGGER_POLICY) && topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_TRIGGER_POLICY) instanceof TriggerPolicy ) {
triggerPolicy = (TriggerPolicy<Tuple,?>) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_TRIGGER_POLICY);
}else{
triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration,
manager, evictionPolicy);
}
manager.setEvictionPolicy(evictionPolicy);
manager.setTriggerPolicy(triggerPolicy);
return manager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import org.apache.storm.topology.IWindowedBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TupleFieldTimestampExtractor;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.EvictionPolicy;
import org.apache.storm.windowing.TimestampExtractor;
import org.apache.storm.windowing.TriggerPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -226,6 +229,14 @@ public BaseWindowedBolt withWatermarkInterval(Duration interval) {
windowConfiguration.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, interval.value);
return this;
}
public BaseWindowedBolt withTriggerPolicy(TriggerPolicy<Tuple, ?> triggerPolicy) {
windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_TRIGGER_POLICY, triggerPolicy);
return this;
}
public BaseWindowedBolt withEvictionPolicy(EvictionPolicy<Tuple, ?> evictionPolicy){
windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_EVICTION_POLICY, evictionPolicy);
return this;
}

@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
Expand Down