From 7ef07411d479b5eb0e6d28ee84f60b14c92b70ca Mon Sep 17 00:00:00 2001 From: Aparna Ravindra Date: Fri, 18 Oct 2019 13:26:15 +0530 Subject: [PATCH] setEvictionPolicy and setTriggerPolicy --- .../src/jvm/org/apache/storm/Config.java | 24 +++++++++++++++---- .../storm/topology/WindowedBoltExecutor.java | 17 +++++++++---- .../storm/topology/base/BaseWindowedBolt.java | 11 +++++++++ 3 files changed, 43 insertions(+), 9 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index 4bcb2e3aae5..4811e074f9b 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -29,6 +29,7 @@ import org.apache.storm.policy.IWaitStrategy; import org.apache.storm.serialization.IKryoDecorator; import org.apache.storm.serialization.IKryoFactory; +import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.Utils; import org.apache.storm.validation.ConfigValidation; import org.apache.storm.validation.ConfigValidation.EventLoggerRegistryValidator; @@ -53,6 +54,8 @@ import org.apache.storm.validation.ConfigValidationAnnotations.IsType; import org.apache.storm.validation.ConfigValidationAnnotations.NotNull; import org.apache.storm.validation.ConfigValidationAnnotations.Password; +import org.apache.storm.windowing.EvictionPolicy; +import org.apache.storm.windowing.TriggerPolicy; /** * Topology configs are specified as a plain old map. This class provides a convenient way to create a topology config map by providing @@ -248,14 +251,14 @@ public class Config extends HashMap { */ @IsPositiveNumber(includeZero = true) public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_ONHEAP_MEMORY_MB = - "topology.metrics.consumer.resources.onheap.memory.mb"; + "topology.metrics.consumer.resources.onheap.memory.mb"; /** * The maximum amount of memory an instance of a metrics consumer will take off heap. This enables the scheduler to allocate slots on * machines with enough available memory. A default value will be set for this config if user does not override */ @IsPositiveNumber(includeZero = true) public static final String TOPOLOGY_METRICS_CONSUMER_RESOURCES_OFFHEAP_MEMORY_MB = - "topology.metrics.consumer.resources.offheap.memory.mb"; + "topology.metrics.consumer.resources.offheap.memory.mb"; /** * The config indicates the percentage of cpu for a core an instance(executor) of a metrics consumer will use. Assuming the a core value * to be 100, a value of 10 indicates 10% of the core. The P in PCORE represents the term "physical". A default value will be set for @@ -857,7 +860,7 @@ public class Config extends HashMap { @NotNull @IsPositiveNumber(includeZero = true) public static final String TOPOLOGY_BACKPRESSURE_WAIT_PROGRESSIVE_LEVEL3_SLEEP_MILLIS = - "topology.backpressure.wait.progressive.level3.sleep.millis"; + "topology.backpressure.wait.progressive.level3.sleep.millis"; /** * Configures steps used to determine progression to the next level of wait .. if using WaitStrategyProgressive for BackPressure. */ @@ -1461,7 +1464,7 @@ public class Config extends HashMap { * Impersonation user ACL config entries. */ @IsMapEntryCustom(keyValidatorClasses = { ConfigValidation.StringValidator.class }, - valueValidatorClasses = { ConfigValidation.ImpersonationAclUserEntryValidator.class }) + valueValidatorClasses = { ConfigValidation.ImpersonationAclUserEntryValidator.class }) public static final String NIMBUS_IMPERSONATION_ACL = "nimbus.impersonation.acl"; /** * A whitelist of the RAS scheduler strategies allowed by nimbus. Should be a list of fully-qualified class names or null to allow all. @@ -1676,6 +1679,17 @@ public class Config extends HashMap { //DO NOT CHANGE UNLESS WE ADD IN STATE NOT STORED IN THE PARENT CLASS private static final long serialVersionUID = -1550278723792864455L; + /** + * Specify the trigger policy for the window*/ + @IsType(type = TriggerPolicy.class) + public static final String TOPOLOGY_BOLTS_WINDOW_TRIGGER_POLICY = "topology.bolts.window.trigger.policy"; + + /** + * Specify the eviction policy for the window + */ + @IsType(type = EvictionPolicy.class) + public static final String TOPOLOGY_BOLTS_WINDOW_EVICTION_POLICY = "topology.bolts.window.eviction.policy"; + public static void setClasspath(Map conf, String cp) { conf.put(Config.TOPOLOGY_CLASSPATH, cp); } @@ -1938,7 +1952,7 @@ public void setTopologyComponentWorkerConstraints(String component1, String comp if (component1 != null && component2 != null) { List constraintPair = Arrays.asList(component1, component2); List> constraints = (List>) computeIfAbsent(Config.TOPOLOGY_RAS_CONSTRAINTS, - (k) -> new ArrayList<>(1)); + (k) -> new ArrayList<>(1)); constraints.add(constraintPair); } } diff --git a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java index e6a12d9bdf2..8adde9af6f5 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java +++ b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java @@ -205,10 +205,19 @@ private WindowManager initWindowManager(WindowLifecycleListener li } // validate validate(topoConf, windowLengthCount, windowLengthDuration, - slidingIntervalCount, slidingIntervalDuration); - evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration); - triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration, - manager, evictionPolicy); + slidingIntervalCount, slidingIntervalDuration); + if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_EVICTION_POLICY) && topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_EVICTION_POLICY) instanceof EvictionPolicy ) { + evictionPolicy = (EvictionPolicy)topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_EVICTION_POLICY); + + }else{ + evictionPolicy = getEvictionPolicy(windowLengthCount, windowLengthDuration); + } + if (topoConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_TRIGGER_POLICY) && topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_TRIGGER_POLICY) instanceof TriggerPolicy ) { + triggerPolicy = (TriggerPolicy) topoConf.get(Config.TOPOLOGY_BOLTS_WINDOW_TRIGGER_POLICY); + }else{ + triggerPolicy = getTriggerPolicy(slidingIntervalCount, slidingIntervalDuration, + manager, evictionPolicy); + } manager.setEvictionPolicy(evictionPolicy); manager.setTriggerPolicy(triggerPolicy); return manager; diff --git a/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java b/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java index f411e87dbdd..a2310908875 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java +++ b/storm-client/src/jvm/org/apache/storm/topology/base/BaseWindowedBolt.java @@ -22,7 +22,10 @@ import org.apache.storm.topology.IWindowedBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TupleFieldTimestampExtractor; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.windowing.EvictionPolicy; import org.apache.storm.windowing.TimestampExtractor; +import org.apache.storm.windowing.TriggerPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,6 +229,14 @@ public BaseWindowedBolt withWatermarkInterval(Duration interval) { windowConfiguration.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, interval.value); return this; } + public BaseWindowedBolt withTriggerPolicy(TriggerPolicy triggerPolicy) { + windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_TRIGGER_POLICY, triggerPolicy); + return this; + } + public BaseWindowedBolt withEvictionPolicy(EvictionPolicy evictionPolicy){ + windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_EVICTION_POLICY, evictionPolicy); + return this; + } @Override public void prepare(Map topoConf, TopologyContext context, OutputCollector collector) {