From 9b2500490a90d8711377719a745daf850d5ce728 Mon Sep 17 00:00:00 2001 From: Asiri Liyana Arachchi Date: Mon, 18 Aug 2014 12:04:50 +0530 Subject: [PATCH 01/10] instance based Auto Scaling --- .../autoscaler/NetworkPartitionContext.java | 33 +++++++++++- .../AutoscalerHealthStatEventReceiver.java | 54 ++++++++++++++++++- .../autoscaler/monitor/ClusterMonitor.java | 2 + .../LoadBalancerStatisticsReader.java | 5 ++ .../LoadBalancerStatisticsNotifier.java | 11 +++- .../WSO2CEPInFlightRequestPublisher.java | 7 ++- .../LoadBalancerStatisticsCollector.java | 38 +++++++++++++ .../stat/AverageRequestsInFlightEvent.java | 11 +++- .../AverageRuestsServingCapabilityEvent.java | 31 +++++++++++ ...equestsServingCapabilityEventListener.java | 14 +++++ ...estsServingCapabilityMessageProcessor.java | 49 +++++++++++++++++ .../stat/HealthStatMessageProcessorChain.java | 5 ++ .../AverageInFlightRequestsEventFormatter.xml | 2 +- .../AverageServedRequestsEventFormatter.xml | 32 +++++++++++ .../AverageInFlightRequestsFinder.xml | 15 ++++-- .../stream-manager-config.xml | 20 ++++++- ...uestHandlingCapabilityWindowProcessor.java | 49 +++++++++++++++++ .../distribution/src/main/conf/scaling.drl | 25 +++++++-- 18 files changed, 390 insertions(+), 13 deletions(-) create mode 100644 components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRuestsServingCapabilityEvent.java create mode 100644 components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/health/stat/AverageRequestsServingCapabilityEventListener.java create mode 100644 components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/AverageRequestsServingCapabilityMessageProcessor.java create mode 100644 extensions/cep/artifacts/eventformatters/AverageServedRequestsEventFormatter.xml create mode 100644 extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java index bb9bd6053a..cc133a435a 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java @@ -41,6 +41,8 @@ public class NetworkPartitionContext implements Serializable{ private boolean scaleDownAllowed = false; private int scaleDownWaitCount = 5; //TODO get from a config private int scaleDownRequestsCount = 0; + private float averageRequestsServedPerInstance; + private float requestsServedPerInstance; // private String defaultLbClusterId; // @@ -59,6 +61,8 @@ public class NetworkPartitionContext implements Serializable{ private boolean loadAverageReset = false, averageLoadAverageReset = false, gradientLoadAverageReset = false, secondDerivativeLoadAverageRest = false; + //boolean values to keep whether average requests served per instance parameters are reset or not + private boolean averageRequestServedPerInstanceReset= false; //FIXME this should be populated via PartitionGroups a.k.a. NetworkPartitions private int minInstanceCount = 1, maxInstanceCount = 1; @@ -237,12 +241,32 @@ public void setCurrentPartitionIndex(int currentPartitionIndex) { this.currentPartitionIndex = currentPartitionIndex; } + + + public float getAverageRequestsServedPerInstance() { return averageRequestsServedPerInstance;} + + public void setAverageRequestsServedPerInstance(float averageRequestServedPerInstance) { + this.averageRequestsServedPerInstance = averageRequestServedPerInstance; + averageRequestServedPerInstanceReset = true; + + if(log.isDebugEnabled()){ + log.debug(String.format("Average Requesets Served Per Instance stats are reset, ready to do scale check [network partition] %s" + , this.id)); + + } + } + + + + public float getRequestsServedPerInstance() { return requestsServedPerInstance;} + public float getAverageRequestsInFlight() { return requestsInFlight.getAverage(); } - public void setAverageRequestsInFlight(float averageRequestsInFlight) { + public void setAverageRequestsInFlight(float averageRequestsInFlight, float requestsServedPerInstance) { requestsInFlight.setAverage(averageRequestsInFlight); + this.requestsServedPerInstance = requestsServedPerInstance; averageRifReset = true; if(secondDerivativeRifRest && gradientRifReset){ rifReset = true; @@ -285,6 +309,13 @@ public void setRequestsInFlightGradient(float requestsInFlightGradient) { } } + public boolean isAverageRequestServedPerInstanceReset() {return averageRequestServedPerInstanceReset;} + + public void setAverageRequestServedPerInstanceReset(boolean averageRequestServedPerInstanceReset) { + this.averageRequestServedPerInstanceReset = averageRequestServedPerInstanceReset; + + } + public boolean isRifReset() { return rifReset; } diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java index 2bcfb52cf6..d7521e2cec 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java @@ -172,6 +172,13 @@ protected void onEvent(org.apache.stratos.messaging.event.Event event) { String clusterId = e.getClusterId(); String networkPartitionId = e.getNetworkPartitionId(); Float floatValue = e.getValue(); + Float servedCount = e.getServedCount(); + Float activeInstances = e.getActiveInstances(); + Float requestsServedPerInstance = servedCount/activeInstances; + if(requestsServedPerInstance.isInfinite()){ + requestsServedPerInstance = 0f; + } + if (log.isDebugEnabled()) { @@ -194,7 +201,7 @@ protected void onEvent(org.apache.stratos.messaging.event.Event event) { if(null != monitor){ NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); if(null != networkPartitionContext){ - networkPartitionContext.setAverageRequestsInFlight(floatValue); + networkPartitionContext.setAverageRequestsInFlight(floatValue, requestsServedPerInstance); } else { if(log.isDebugEnabled()) { log.debug(String.format("Network partition context is not available for :" + @@ -204,6 +211,51 @@ protected void onEvent(org.apache.stratos.messaging.event.Event event) { } } + }); + healthStatEventReceiver.addEventListener(new AverageRequestsServingCapabilityEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + + AverageRuestsServingCapabilityEvent e = (AverageRuestsServingCapabilityEvent) event; + String clusterId = e.getClusterId(); + String networkPartitionId = e.getNetworkPartitionId(); + Float floatValue = e.getValue(); + + + log.info("[AverageRequestsServingCapabilityEventListener]" + + " ========= "+floatValue); + + if (log.isDebugEnabled()) { + log.debug(String.format("Average Requests Served per Instance event: [cluster] %s [network-partition] %s [value] %s", + clusterId, networkPartitionId, floatValue)); + } + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractMonitor monitor; + + if(asCtx.monitorExist(clusterId)){ + monitor = asCtx.getMonitor(clusterId); + }else if(asCtx.lbMonitorExist(clusterId)){ + monitor = asCtx.getLBMonitor(clusterId); + }else{ + if(log.isDebugEnabled()){ + log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); + } + return; + } + if(null != monitor){ + NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); + if(null != networkPartitionContext){ + networkPartitionContext.setAverageRequestsServedPerInstance(floatValue); + + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + } + }); healthStatEventReceiver.addEventListener(new GradientOfLoadAverageEventListener() { @Override diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java index 5bb478e94f..2a80db352f 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java @@ -156,6 +156,7 @@ private void monitor() { boolean rifReset = networkPartitionContext.isRifReset(); boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset(); boolean loadAverageReset = networkPartitionContext.isLoadAverageReset(); + boolean averageRequestServedPerInstanceReset = networkPartitionContext.isAverageRequestServedPerInstanceReset(); if (log.isDebugEnabled()) { log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset + " flag of loadAverageReset" + loadAverageReset); @@ -167,6 +168,7 @@ private void monitor() { scaleCheckKnowledgeSession.setGlobal("rifReset", rifReset); scaleCheckKnowledgeSession.setGlobal("mcReset", memoryConsumptionReset); scaleCheckKnowledgeSession.setGlobal("laReset", loadAverageReset); + scaleCheckKnowledgeSession.setGlobal("arspiReset", averageRequestServedPerInstanceReset); scaleCheckKnowledgeSession.setGlobal("lbRef", lbReferenceType); scaleCheckKnowledgeSession.setGlobal("isPrimary", false); scaleCheckKnowledgeSession.setGlobal("primaryMembers", primaryMemberListInNetworkPartition); diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java index 4a83aee69f..79386bd652 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java @@ -19,6 +19,8 @@ package org.apache.stratos.load.balancer.common.statistics; +import org.apache.stratos.messaging.domain.topology.Cluster; + /** * Load balancer statistics reader interface. */ @@ -29,4 +31,7 @@ public interface LoadBalancerStatisticsReader { * @param clusterId */ int getInFlightRequestCount(String clusterId); + int getServedRequestCount(String clusterId); + int getActiveInstancesCount(Cluster cluster); + } diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java index 4fe2504b74..7d123e5060 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java @@ -74,12 +74,21 @@ public void run() { try { TopologyManager.acquireReadLock(); int requestCount; + int servedRequestCount; + int activeInstancesCount; for (Service service : TopologyManager.getTopology().getServices()) { for (Cluster cluster : service.getClusters()) { if (!cluster.isLbCluster()) { // Publish in-flight request count of load balancer's network partition requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId()); - inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId, requestCount); + servedRequestCount = statsReader.getServedRequestCount(cluster.getClusterId()); + if(requestCount == 0) { + servedRequestCount = 0; + } + activeInstancesCount = statsReader.getActiveInstancesCount(cluster); + inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId,activeInstancesCount, requestCount, servedRequestCount); + log.info(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d [active instances] %d [RIF] %d ", + cluster.getClusterId(), networkPartitionId, servedRequestCount , activeInstancesCount ,requestCount )); if (log.isDebugEnabled()) { log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d", cluster.getClusterId(), networkPartitionId, requestCount)); diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java index 519a687b80..24d5257c15 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java @@ -47,7 +47,9 @@ private static StreamDefinition createStreamDefinition() { // Payload definition payloadData.add(new Attribute("cluster_id", AttributeType.STRING)); payloadData.add(new Attribute("network_partition_id", AttributeType.STRING)); + payloadData.add(new Attribute("active_instances_count", AttributeType.DOUBLE)); payloadData.add(new Attribute("in_flight_request_count", AttributeType.DOUBLE)); + payloadData.add(new Attribute("served_request_count", AttributeType.DOUBLE)); streamDefinition.setPayloadData(payloadData); return streamDefinition; } catch (Exception e) { @@ -65,13 +67,16 @@ public WSO2CEPInFlightRequestPublisher() { * @param clusterId * @param networkPartitionId * @param inFlightRequestCount + * @param servedRequestCount */ - public void publish(String clusterId, String networkPartitionId, int inFlightRequestCount) { + public void publish(String clusterId, String networkPartitionId,int activeInstancesCount, int inFlightRequestCount, int servedRequestCount) { List payload = new ArrayList(); // Payload values payload.add(clusterId); payload.add(networkPartitionId); + payload.add((double)activeInstancesCount); payload.add((double)inFlightRequestCount); + payload.add((double)servedRequestCount); super.publish(payload.toArray()); } } diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java index 3557d3a926..556d2fe301 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java @@ -22,6 +22,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Member; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -35,9 +37,11 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe private static volatile LoadBalancerStatisticsCollector instance; // Map private Map clusterIdRequestCountMap; + private Map clusterIdServedRequestCountMap; private LoadBalancerStatisticsCollector() { clusterIdRequestCountMap = new ConcurrentHashMap(); + clusterIdServedRequestCountMap = new ConcurrentHashMap(); } public static LoadBalancerStatisticsCollector getInstance() { @@ -75,6 +79,33 @@ public int getInFlightRequestCount(String clusterId) { } } + /** + * Returns the number of requests served since the last time this function was called. + */ + public int getServedRequestCount(String clusterId){ + synchronized (LoadBalancerStatisticsCollector.class) { + if (clusterIdServedRequestCountMap.containsKey(clusterId)) { + Integer servedCount = clusterIdRequestCountMap.get(clusterId); + if (servedCount != null) { + clusterIdServedRequestCountMap.put(clusterId, 0); + return servedCount; + } + } + return 0; + } + } + + public int getActiveInstancesCount(Cluster cluster) { + int activeInstances = 0; + for( Member member :cluster.getMembers()){ + if(member.isActive()){ + activeInstances++; + } + + }return activeInstances; + + } + void incrementInFlightRequestCount(String clusterId) { synchronized (LoadBalancerStatisticsCollector.class) { if (StringUtils.isBlank(clusterId)) { @@ -118,6 +149,13 @@ void decrementInFlightRequestCount(String clusterId) { } clusterIdRequestCountMap.put(clusterId, count); + Integer servedCount = 0; + if (clusterIdServedRequestCountMap.containsKey(clusterId)) { + servedCount = clusterIdServedRequestCountMap.get(clusterId); + } + servedCount++; + clusterIdServedRequestCountMap.put(clusterId, servedCount); + if (log.isDebugEnabled()) { log.debug(String.format("In-flight request count decremented: [cluster] %s [count] %s ", clusterId, count)); diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRequestsInFlightEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRequestsInFlightEvent.java index 143ae03ce7..0e10af9cf5 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRequestsInFlightEvent.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRequestsInFlightEvent.java @@ -28,11 +28,15 @@ public class AverageRequestsInFlightEvent extends Event { private final String networkPartitionId; private final String clusterId; private final float value; + private final float servedCount; + private final float activeInstances; - public AverageRequestsInFlightEvent(String networkPartitionId, String clusterId, float value) { + public AverageRequestsInFlightEvent(String networkPartitionId, String clusterId,float activeInstances, float value, float servedCount ) { this.networkPartitionId = networkPartitionId; this.clusterId = clusterId; this.value = value; + this.servedCount = servedCount; + this.activeInstances = activeInstances; } @@ -47,4 +51,9 @@ public float getValue() { public String getNetworkPartitionId() { return networkPartitionId; } + + public float getServedCount() { return servedCount;} + + public float getActiveInstances() { return activeInstances;} + } diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRuestsServingCapabilityEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRuestsServingCapabilityEvent.java new file mode 100644 index 0000000000..da508f7c96 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRuestsServingCapabilityEvent.java @@ -0,0 +1,31 @@ +package org.apache.stratos.messaging.event.health.stat; + +/** + * Created by asiri on 8/10/14. + */ +import org.apache.stratos.messaging.event.Event; +public class AverageRuestsServingCapabilityEvent extends Event{ + private final String networkPartitionId; + private final String clusterId; + private final float value; + + public AverageRuestsServingCapabilityEvent(String networkPartitionId, String clusterId, float value) { + this.networkPartitionId = networkPartitionId; + this.clusterId = clusterId; + this.value = value; + + } + public String getClusterId() { + return clusterId; + } + + public float getValue() { + return value; + } + + public String getNetworkPartitionId() { + return networkPartitionId; + } + + +} diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/health/stat/AverageRequestsServingCapabilityEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/health/stat/AverageRequestsServingCapabilityEventListener.java new file mode 100644 index 0000000000..3b7916404e --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/health/stat/AverageRequestsServingCapabilityEventListener.java @@ -0,0 +1,14 @@ +package org.apache.stratos.messaging.listener.health.stat; + +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.listener.EventListener; + +/** + * Created by asiri on 8/10/14. + */ +public class AverageRequestsServingCapabilityEventListener extends EventListener { + @Override + protected void onEvent(Event event) { + + } +} diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/AverageRequestsServingCapabilityMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/AverageRequestsServingCapabilityMessageProcessor.java new file mode 100644 index 0000000000..299e4805d2 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/AverageRequestsServingCapabilityMessageProcessor.java @@ -0,0 +1,49 @@ +package org.apache.stratos.messaging.message.processor.health.stat; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.health.stat.AverageRuestsServingCapabilityEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +/** + * Created by asiri on 8/15/14. + */ +public class AverageRequestsServingCapabilityMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(AverageRequestsServingCapabilityMessageProcessor.class); + + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (AverageRuestsServingCapabilityEvent.class.getName().equals(type)) { + + // Parse complete message and build event + AverageRuestsServingCapabilityEvent event = (AverageRuestsServingCapabilityEvent) Util.jsonToObject(message, AverageRuestsServingCapabilityEvent.class); + + // Notify event listeners + notifyEventListeners(event); + + if(log.isDebugEnabled()){ + log.debug(String.format("%s event processor notified listeners ... " , type)); + } + return true; + } + else { + if(nextProcessor != null) { + return nextProcessor.process(type, message, object); + } + else { + throw new RuntimeException(String.format("Failed to process health stat message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + + +} diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/HealthStatMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/HealthStatMessageProcessorChain.java index f9861f6e49..4da35a9bb8 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/HealthStatMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/HealthStatMessageProcessorChain.java @@ -44,6 +44,7 @@ public class HealthStatMessageProcessorChain extends MessageProcessorChain { private MemberGradientOfMemoryConsumptionMessageProcessor memberGradientOfMemoryConsumptionMessageProcessor; private MemberSecondDerivativeOfLoadAverageMessageProcessor memberSecondDerivativeOfLoadAverageMessageProcessor; private MemberSecondDerivativeOfMemoryConsumptionMessageProcessor memberSecondDerivativeOfMemoryConsumptionMessageProcessor; + private AverageRequestsServingCapabilityMessageProcessor averageRequestsServingCapabilityMessageProcessor; private MemberFaultMessageProcessor memberFaultMessageProcessor; @@ -66,6 +67,8 @@ protected void initialize() { averageRequestsInFlightMessageProcessor = new AverageRequestsInFlightMessageProcessor(); add(averageRequestsInFlightMessageProcessor); + averageRequestsServingCapabilityMessageProcessor = new AverageRequestsServingCapabilityMessageProcessor(); + add(averageRequestsServingCapabilityMessageProcessor); gradientOfRequestsInFlightMessageProcessor = new GradientOfRequestsInFlightMessageProcessor(); add(gradientOfRequestsInFlightMessageProcessor); secondDerivativeOfRequestsInFlightMessageProcessor = new SecondDerivativeOfRequestsInFlightMessageProcessor(); @@ -97,6 +100,8 @@ public void addEventListener(EventListener eventListener) { averageMemoryConsumptionMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof AverageRequestsInFlightEventListener) { averageRequestsInFlightMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof AverageRequestsServingCapabilityEventListener) { + averageRequestsServingCapabilityMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof GradientOfLoadAverageEventListener) { gradientOfLoadAverageMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof GradientOfMemoryConsumptionEventListener) { diff --git a/extensions/cep/artifacts/eventformatters/AverageInFlightRequestsEventFormatter.xml b/extensions/cep/artifacts/eventformatters/AverageInFlightRequestsEventFormatter.xml index 6e6ba967a2..84e95ae34c 100644 --- a/extensions/cep/artifacts/eventformatters/AverageInFlightRequestsEventFormatter.xml +++ b/extensions/cep/artifacts/eventformatters/AverageInFlightRequestsEventFormatter.xml @@ -24,7 +24,7 @@ statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventformatter"> - {"org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent":{"message":{"clusterId":"{{cluster_id}}","networkPartitionId":"{{network_partition_id}}","value":"{{count}}"}}} + {"org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent":{"message":{"clusterId":"{{cluster_id}}","networkPartitionId":"{{network_partition_id}}","activeInstances":"{{instances_count}}","value":"{{count}}" ,"servedCount":"{{served_count}}"}}} summarized-health-stats diff --git a/extensions/cep/artifacts/eventformatters/AverageServedRequestsEventFormatter.xml b/extensions/cep/artifacts/eventformatters/AverageServedRequestsEventFormatter.xml new file mode 100644 index 0000000000..5de01286e3 --- /dev/null +++ b/extensions/cep/artifacts/eventformatters/AverageServedRequestsEventFormatter.xml @@ -0,0 +1,32 @@ + + + + + + + {"org.apache.stratos.messaging.event.health.stat.AverageRuestsServingCapabilityEvent":{"message":{"clusterId":"{{cluster_id}}","networkPartitionId":"{{network_partition_id}}","value":"{{average_served_count}}"}}} + + + summarized-health-stats + + diff --git a/extensions/cep/artifacts/executionplans/AverageInFlightRequestsFinder.xml b/extensions/cep/artifacts/executionplans/AverageInFlightRequestsFinder.xml index 6826dab89b..a8e890fd89 100644 --- a/extensions/cep/artifacts/executionplans/AverageInFlightRequestsFinder.xml +++ b/extensions/cep/artifacts/executionplans/AverageInFlightRequestsFinder.xml @@ -32,16 +32,25 @@ + partition by avg_rif_cluster_partition; + from average_in_flight_requests + select cluster_id, network_partition_id,instances_count, served_count, + stratos:divider(served_count , instances_count) as requests_per_instance + insert into served_requests_per_instance; + from served_requests_per_instance[requests_per_instance>0]#window.timeBatch(10 min) + select cluster_id,network_partition_id, avg(requests_per_instance) as average_served_count + insert into average_served_request_count;]]> + diff --git a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml index 64b5d85e3b..a9472efed7 100644 --- a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml +++ b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml @@ -32,7 +32,9 @@ + + @@ -60,9 +62,25 @@ + + - + + + + average served count per instance + average in-flight requests + + + + + + + + + + second derivative of in-flight request count diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java new file mode 100644 index 0000000000..c2536a53ce --- /dev/null +++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java @@ -0,0 +1,49 @@ +package org.apache.stratos.cep.extension; + +/** + * Created by asiri on 8/9/14. + */ + +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +@SiddhiExtension(namespace = "stratos", function = "divider") +public class MemeberRequestHandlingCapabilityWindowProcessor extends FunctionExecutor { + + Attribute.Type returnType = Attribute.Type.DOUBLE; + + @Override + public void init(Attribute.Type[] types, SiddhiContext siddhiContext) { + } + + @Override + protected Object process(Object obj) { + + double[] value = new double[2]; + if (obj instanceof Object[]) { + int i=0; + for (Object aObj : (Object[]) obj) { + value[i]= Double.parseDouble(String.valueOf(aObj)); + i++; + } + }//to do avoid deviding zero number of active instances won't be zero cz there is min + Double unit = (value[0] / value[1]); + if(!unit.isNaN() && !unit.isInfinite()) + return unit; + else + return 0.0; + + } + + @Override + public void destroy() { + + } + + @Override + public Attribute.Type getReturnType() { + return returnType; + } +} diff --git a/products/stratos/modules/distribution/src/main/conf/scaling.drl b/products/stratos/modules/distribution/src/main/conf/scaling.drl index bd331e7515..b123c9dd8f 100644 --- a/products/stratos/modules/distribution/src/main/conf/scaling.drl +++ b/products/stratos/modules/distribution/src/main/conf/scaling.drl @@ -55,6 +55,7 @@ global java.lang.Boolean mcReset; global java.lang.Boolean laReset; global java.lang.Boolean isPrimary; global java.util.List primaryMembers; +global java.lang.Boolean arspiReset; rule "Scaling Rule" dialect "mvel" @@ -94,8 +95,22 @@ dialect "mvel" loadAverageSecondDerivative : Float() from $networkPartitionContext.getLoadAverageSecondDerivative() laPredictedValue : Double() from $delegator.getPredictedValueForNextMinute(loadAverageAverage, loadAverageGradient, loadAverageSecondDerivative, 1) - scaleUp : Boolean() from ((rifReset && (rifPredictedValue > rifUpperLimit)) || (mcReset && (mcPredictedValue > mcUpperLimit)) || (laReset && (laPredictedValue > laUpperLimit))) - scaleDown : Boolean() from ((rifReset && (rifPredictedValue < rifLowerLimit )) && (mcReset && (mcPredictedValue < mcLowerLimit)) && (laReset && (laPredictedValue < laLowerLimit))) + activeInstancesCount : Integer() from $delegator.getMemberCount(clusterId , 0) + instancesCount : Integer() from $delegator.getMemberCount(clusterId , 1) + + requestsServedPerInstance : Float() from $networkPartitionContext.getRequestsServedPerInstance() + averageRequestsServedPerInstance : Float() from $networkPartitionContext.getAverageRequestsServedPerInstance() + + numberOfInstancesReuquiredBasedOnRif : Integer() from $delegator.getNumberOfInstancesRequiredBasedOnRif(rifPredictedValue, requestsServedPerInstance, averageRequestsServedPerInstance, arspiReset) + numberOfInstancesReuquiredBasedOnMemoryConsumption : Integer() from $delegator.getNumberOfInstancesRequiredBasedOnLoadAndMemoryConsumption(mcUpperLimit , mcLowerLimit, mcPredictedValue ,activeInstancesCount ) + numberOfInstancesReuquiredBasedOnLoadAverage : Integer() from $delegator.getNumberOfInstancesRequiredBasedOnLoadAndMemoryConsumption(laUpperLimit , laLowerLimit, laPredictedValue ,activeInstancesCount ) + + numberOfRequiredInstances : Integer() from $delegator.getMaxNumberOfInstancesRequired(numberOfInstancesReuquiredBasedOnRif, numberOfInstancesReuquiredBasedOnMemoryConsumption ,mcReset ,numberOfInstancesReuquiredBasedOnLoadAverage, laReset) + + + + scaleUp : Boolean() from (instancesCount < numberOfRequiredInstances ) + scaleDown : Boolean() from (activeInstancesCount > numberOfRequiredInstances ) eval(log.debug("[scaling] " + " [cluster] " + clusterId + " RIF Resetted?: " + rifReset)) eval(log.debug("[scaling] " + " [cluster] " + clusterId + " RIF predicted value: " + rifPredictedValue)) @@ -116,19 +131,23 @@ dialect "mvel" then if(scaleUp){ + int additionalInstances = numberOfRequiredInstances - instancesCount ; $networkPartitionContext.resetScaleDownRequestsCount(); + int count=0; + while(count != additionalInstances){ Partition partition = autoscaleAlgorithm.getNextScaleUpPartition($networkPartitionContext, clusterId); if(partition != null){ log.info("[scale-up] Partition available, hence trying to spawn an instance to scale up!" ); log.debug("[scale-up] " + " [partition] " + partition.getId() + " [cluster] " + clusterId ); $delegator.delegateSpawn($networkPartitionContext.getPartitionCtxt(partition.getId()), clusterId, lbRef, isPrimary); + count++; } + } } else if(scaleDown){ log.debug("[scale-down] Decided to Scale down [cluster] " + clusterId); if($networkPartitionContext.getScaleDownRequestsCount() > 5 ){ log.debug("[scale-down] Reached scale down requests threshold [cluster] " + clusterId + " Count " + $networkPartitionContext.getScaleDownRequestsCount()); - $networkPartitionContext.resetScaleDownRequestsCount(); MemberStatsContext selectedMemberStatsContext = null; double lowestOverallLoad = 0.0; boolean foundAValue = false; From 551f696b60f11c59eedb19fd21196b5c990b629e Mon Sep 17 00:00:00 2001 From: Asiri Liyana Arachchi Date: Mon, 18 Aug 2014 13:22:54 +0530 Subject: [PATCH 02/10] missing delegator --- .../autoscaler/rule/RuleTasksDelegator.java | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java index 94def43777..c78c5c033e 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java @@ -33,6 +33,11 @@ import org.apache.stratos.autoscaler.client.cloud.controller.InstanceNotificationClient; import org.apache.stratos.autoscaler.partition.PartitionManager; import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Member; +import org.apache.stratos.messaging.domain.topology.MemberStatus; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; /** * This will have utility methods that need to be executed from rule file... @@ -41,6 +46,7 @@ public class RuleTasksDelegator { public static final double SCALE_UP_FACTOR = 0.8; //get from config public static final double SCALE_DOWN_FACTOR = 0.2; + private static boolean arspiIsSet = false; private static final Log log = LogFactory.getLog(RuleTasksDelegator.class); @@ -56,6 +62,86 @@ public double getPredictedValueForNextMinute(float average, float gradient, floa return predictedValue; } + + public int getNumberOfInstancesRequiredBasedOnRif(float rifPredictedValue , float requestsServedPerInstance , float averageRequestsServedPerInstance , boolean arspiReset){ + + float requestsInstanceCanHandle = requestsServedPerInstance; + + if(arspiReset && averageRequestsServedPerInstance != 0){ + requestsInstanceCanHandle = averageRequestsServedPerInstance; + + } + float numberOfInstances = 0; + if(requestsInstanceCanHandle!=0) { + numberOfInstances = rifPredictedValue / requestsInstanceCanHandle; + arspiReset = true; + + }else{ + arspiReset = false; + } + return (int)Math.ceil(numberOfInstances); + } + + public int getNumberOfInstancesRequiredBasedOnLoadAndMemoryConsumption(float upperLimit , float lowerLimit ,double predictedValue , int activeMemberCount ){ + + double numberOfInstances = 0; + if(predictedValue > upperLimit){ + numberOfInstances = (activeMemberCount*predictedValue)/upperLimit; + }else if((upperLimit >= predictedValue) && (predictedValue >= lowerLimit)){ + numberOfInstances = activeMemberCount; + }else{ + numberOfInstances = (activeMemberCount*predictedValue)/lowerLimit; + } + + return (int)Math.ceil(numberOfInstances); + } + + public int getMaxNumberOfInstancesRequired(int numberOfInstancesReuquiredBasedOnRif , int numberOfInstancesReuquiredBasedOnMemoryConsumption , boolean mcReset , int numberOfInstancesReuquiredBasedOnLoadAverage , boolean laReset){ + int numberOfInstances = 0; + + int rifBasedRequiredInstances = 0; + int mcBasedRequiredInstances = 0; + int laBasedRequiredInstances = 0; + if(arspiIsSet){ + rifBasedRequiredInstances = numberOfInstancesReuquiredBasedOnRif; + } + if(mcReset){ + rifBasedRequiredInstances = numberOfInstancesReuquiredBasedOnMemoryConsumption; + } + if(laReset){ + rifBasedRequiredInstances = numberOfInstancesReuquiredBasedOnLoadAverage; + } + numberOfInstances = Math.max(Math.max(numberOfInstancesReuquiredBasedOnMemoryConsumption,numberOfInstancesReuquiredBasedOnLoadAverage),numberOfInstancesReuquiredBasedOnRif); + return numberOfInstances; + } + + public int getMemberCount(String clusterId , int scalingPara ){ + + int activeMemberCount = 0; + int memberCount = 0; + for( Service service : TopologyManager.getTopology().getServices()) { + if(service.clusterExists(clusterId)) { + Cluster cluster = service.getCluster(clusterId); + + for (Member member : cluster.getMembers()) { + if (member.isActive() || member.getStatus() == MemberStatus.Created || member.getStatus() == MemberStatus.Starting ) { + memberCount++; + if(member.isActive()) { + activeMemberCount++; + } + } + } + } + } + if(scalingPara == 1){ + return memberCount; + }else{ + return activeMemberCount; + } + + + } + public AutoscaleAlgorithm getAutoscaleAlgorithm(String partitionAlgorithm){ AutoscaleAlgorithm autoscaleAlgorithm = null; if(log.isDebugEnabled()){ From 9f80b12aef48cc46836d2e1f34aaf316784868cc Mon Sep 17 00:00:00 2001 From: Asiri Liyana Arachchi Date: Mon, 18 Aug 2014 14:03:25 +0530 Subject: [PATCH 03/10] haproxy --- .../haproxy/extension/HAProxyStatisticsReader.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java index f564e7c8d4..415e0cd044 100644 --- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java +++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java @@ -94,4 +94,14 @@ public int getInFlightRequestCount(String clusterId) { } return 0; } + + @Override + public int getServedRequestCount(String clusterId) { + return 0; + } + + @Override + public int getActiveInstancesCount(Cluster cluster) { + return 0; + } } From 79892c0bba31bdc18c3eac0fd807aceaba15dd10 Mon Sep 17 00:00:00 2001 From: Asiri Liyana Arachchi Date: Sat, 23 Aug 2014 09:10:43 +0530 Subject: [PATCH 04/10] load balancer statistics --- .../balancer/statistics/LoadBalancerStatisticsCollector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java index 556d2fe301..10e38f9364 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java @@ -85,7 +85,7 @@ public int getInFlightRequestCount(String clusterId) { public int getServedRequestCount(String clusterId){ synchronized (LoadBalancerStatisticsCollector.class) { if (clusterIdServedRequestCountMap.containsKey(clusterId)) { - Integer servedCount = clusterIdRequestCountMap.get(clusterId); + Integer servedCount = clusterIdServedRequestCountMap.get(clusterId); if (servedCount != null) { clusterIdServedRequestCountMap.put(clusterId, 0); return servedCount; From d5e42abba074cd0d3c5254ab8a435388a2bf38d5 Mon Sep 17 00:00:00 2001 From: R-Rajkumar Date: Tue, 26 Aug 2014 14:34:29 +0530 Subject: [PATCH 05/10] removing unwanted strings in tenant_management.js --- .../console/themes/theme1/renderers/tenant_management.js | 3 --- 1 file changed, 3 deletions(-) diff --git a/components/org.apache.stratos.manager.console/console/themes/theme1/renderers/tenant_management.js b/components/org.apache.stratos.manager.console/console/themes/theme1/renderers/tenant_management.js index 7be977ae1b..2b86c90ea4 100755 --- a/components/org.apache.stratos.manager.console/console/themes/theme1/renderers/tenant_management.js +++ b/components/org.apache.stratos.manager.console/console/themes/theme1/renderers/tenant_management.js @@ -84,12 +84,9 @@ var render = function (theme, data, meta, require) { name: 'Subscribe to Cartridge', class_name: 'btn-important' }, -<<<<<<< HEAD has_help:false, -======= bamInfo:data.bamInfo, has_help: true, ->>>>>>> 400 help: 'Create cartridges like PHP, Python, Ruby etc.. Or create data cartridges with mySql, PostgreSQL. Directly install applications like Drupal, Wordpress etc..' } } From 349b273b5415519caff81b2c0e58047bdacf7f77 Mon Sep 17 00:00:00 2001 From: R-Rajkumar Date: Tue, 26 Aug 2014 16:07:38 +0530 Subject: [PATCH 06/10] removing duplicate has_true --- .../console/themes/theme1/renderers/tenant_management.js | 1 - 1 file changed, 1 deletion(-) diff --git a/components/org.apache.stratos.manager.console/console/themes/theme1/renderers/tenant_management.js b/components/org.apache.stratos.manager.console/console/themes/theme1/renderers/tenant_management.js index 2b86c90ea4..8c8de86b88 100755 --- a/components/org.apache.stratos.manager.console/console/themes/theme1/renderers/tenant_management.js +++ b/components/org.apache.stratos.manager.console/console/themes/theme1/renderers/tenant_management.js @@ -84,7 +84,6 @@ var render = function (theme, data, meta, require) { name: 'Subscribe to Cartridge', class_name: 'btn-important' }, - has_help:false, bamInfo:data.bamInfo, has_help: true, help: 'Create cartridges like PHP, Python, Ruby etc.. Or create data cartridges with mySql, PostgreSQL. Directly install applications like Drupal, Wordpress etc..' From 348fddfb8c3a2aa70f99463712156db7b2edc70b Mon Sep 17 00:00:00 2001 From: Asiri Liyana Arachchi Date: Mon, 18 Aug 2014 12:04:50 +0530 Subject: [PATCH 07/10] instance based Auto Scaling --- .../autoscaler/NetworkPartitionContext.java | 33 +++++++++++- .../AutoscalerHealthStatEventReceiver.java | 54 ++++++++++++++++++- .../autoscaler/monitor/ClusterMonitor.java | 2 + .../LoadBalancerStatisticsReader.java | 5 ++ .../LoadBalancerStatisticsNotifier.java | 11 +++- .../WSO2CEPInFlightRequestPublisher.java | 7 ++- .../LoadBalancerStatisticsCollector.java | 38 +++++++++++++ .../stat/AverageRequestsInFlightEvent.java | 11 +++- .../AverageRuestsServingCapabilityEvent.java | 31 +++++++++++ ...equestsServingCapabilityEventListener.java | 14 +++++ ...estsServingCapabilityMessageProcessor.java | 49 +++++++++++++++++ .../stat/HealthStatMessageProcessorChain.java | 5 ++ .../AverageInFlightRequestsEventFormatter.xml | 2 +- .../AverageServedRequestsEventFormatter.xml | 32 +++++++++++ .../AverageInFlightRequestsFinder.xml | 15 ++++-- .../stream-manager-config.xml | 20 ++++++- ...uestHandlingCapabilityWindowProcessor.java | 49 +++++++++++++++++ .../distribution/src/main/conf/scaling.drl | 25 +++++++-- 18 files changed, 390 insertions(+), 13 deletions(-) create mode 100644 components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRuestsServingCapabilityEvent.java create mode 100644 components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/health/stat/AverageRequestsServingCapabilityEventListener.java create mode 100644 components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/AverageRequestsServingCapabilityMessageProcessor.java create mode 100644 extensions/cep/artifacts/eventformatters/AverageServedRequestsEventFormatter.xml create mode 100644 extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java index 0b75c492fc..bb36b405ef 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/NetworkPartitionContext.java @@ -40,6 +40,8 @@ public class NetworkPartitionContext implements Serializable{ private final String id; private int scaleDownWaitCount = 5; //TODO get from a config private int scaleDownRequestsCount = 0; + private float averageRequestsServedPerInstance; + private float requestsServedPerInstance; // private String defaultLbClusterId; // @@ -58,6 +60,8 @@ public class NetworkPartitionContext implements Serializable{ private boolean loadAverageReset = false, averageLoadAverageReset = false, gradientLoadAverageReset = false, secondDerivativeLoadAverageRest = false; + //boolean values to keep whether average requests served per instance parameters are reset or not + private boolean averageRequestServedPerInstanceReset= false; //FIXME this should be populated via PartitionGroups a.k.a. NetworkPartitions private int minInstanceCount = 1, maxInstanceCount = 1; @@ -236,12 +240,32 @@ public void setCurrentPartitionIndex(int currentPartitionIndex) { this.currentPartitionIndex = currentPartitionIndex; } + + + public float getAverageRequestsServedPerInstance() { return averageRequestsServedPerInstance;} + + public void setAverageRequestsServedPerInstance(float averageRequestServedPerInstance) { + this.averageRequestsServedPerInstance = averageRequestServedPerInstance; + averageRequestServedPerInstanceReset = true; + + if(log.isDebugEnabled()){ + log.debug(String.format("Average Requesets Served Per Instance stats are reset, ready to do scale check [network partition] %s" + , this.id)); + + } + } + + + + public float getRequestsServedPerInstance() { return requestsServedPerInstance;} + public float getAverageRequestsInFlight() { return requestsInFlight.getAverage(); } - public void setAverageRequestsInFlight(float averageRequestsInFlight) { + public void setAverageRequestsInFlight(float averageRequestsInFlight, float requestsServedPerInstance) { requestsInFlight.setAverage(averageRequestsInFlight); + this.requestsServedPerInstance = requestsServedPerInstance; averageRifReset = true; if(secondDerivativeRifRest && gradientRifReset){ rifReset = true; @@ -284,6 +308,13 @@ public void setRequestsInFlightGradient(float requestsInFlightGradient) { } } + public boolean isAverageRequestServedPerInstanceReset() {return averageRequestServedPerInstanceReset;} + + public void setAverageRequestServedPerInstanceReset(boolean averageRequestServedPerInstanceReset) { + this.averageRequestServedPerInstanceReset = averageRequestServedPerInstanceReset; + + } + public boolean isRifReset() { return rifReset; } diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java index 52952c342c..b0f0808acc 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatEventReceiver.java @@ -171,6 +171,13 @@ protected void onEvent(org.apache.stratos.messaging.event.Event event) { String clusterId = e.getClusterId(); String networkPartitionId = e.getNetworkPartitionId(); Float floatValue = e.getValue(); + Float servedCount = e.getServedCount(); + Float activeInstances = e.getActiveInstances(); + Float requestsServedPerInstance = servedCount/activeInstances; + if(requestsServedPerInstance.isInfinite()){ + requestsServedPerInstance = 0f; + } + if (log.isDebugEnabled()) { @@ -193,7 +200,7 @@ protected void onEvent(org.apache.stratos.messaging.event.Event event) { if(null != monitor){ NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); if(null != networkPartitionContext){ - networkPartitionContext.setAverageRequestsInFlight(floatValue); + networkPartitionContext.setAverageRequestsInFlight(floatValue, requestsServedPerInstance); } else { if(log.isDebugEnabled()) { log.debug(String.format("Network partition context is not available for :" + @@ -203,6 +210,51 @@ protected void onEvent(org.apache.stratos.messaging.event.Event event) { } } + }); + healthStatEventReceiver.addEventListener(new AverageRequestsServingCapabilityEventListener() { + @Override + protected void onEvent(org.apache.stratos.messaging.event.Event event) { + + AverageRuestsServingCapabilityEvent e = (AverageRuestsServingCapabilityEvent) event; + String clusterId = e.getClusterId(); + String networkPartitionId = e.getNetworkPartitionId(); + Float floatValue = e.getValue(); + + + log.info("[AverageRequestsServingCapabilityEventListener]" + + " ========= "+floatValue); + + if (log.isDebugEnabled()) { + log.debug(String.format("Average Requests Served per Instance event: [cluster] %s [network-partition] %s [value] %s", + clusterId, networkPartitionId, floatValue)); + } + AutoscalerContext asCtx = AutoscalerContext.getInstance(); + AbstractMonitor monitor; + + if(asCtx.monitorExist(clusterId)){ + monitor = asCtx.getMonitor(clusterId); + }else if(asCtx.lbMonitorExist(clusterId)){ + monitor = asCtx.getLBMonitor(clusterId); + }else{ + if(log.isDebugEnabled()){ + log.debug(String.format("A cluster monitor is not found in autoscaler context [cluster] %s", clusterId)); + } + return; + } + if(null != monitor){ + NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId); + if(null != networkPartitionContext){ + networkPartitionContext.setAverageRequestsServedPerInstance(floatValue); + + } else { + if(log.isDebugEnabled()) { + log.debug(String.format("Network partition context is not available for :" + + " [network partition] %s", networkPartitionId)); + } + } + } + } + }); healthStatEventReceiver.addEventListener(new GradientOfLoadAverageEventListener() { @Override diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java index 5bb478e94f..2a80db352f 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ClusterMonitor.java @@ -156,6 +156,7 @@ private void monitor() { boolean rifReset = networkPartitionContext.isRifReset(); boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset(); boolean loadAverageReset = networkPartitionContext.isLoadAverageReset(); + boolean averageRequestServedPerInstanceReset = networkPartitionContext.isAverageRequestServedPerInstanceReset(); if (log.isDebugEnabled()) { log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset + " flag of loadAverageReset" + loadAverageReset); @@ -167,6 +168,7 @@ private void monitor() { scaleCheckKnowledgeSession.setGlobal("rifReset", rifReset); scaleCheckKnowledgeSession.setGlobal("mcReset", memoryConsumptionReset); scaleCheckKnowledgeSession.setGlobal("laReset", loadAverageReset); + scaleCheckKnowledgeSession.setGlobal("arspiReset", averageRequestServedPerInstanceReset); scaleCheckKnowledgeSession.setGlobal("lbRef", lbReferenceType); scaleCheckKnowledgeSession.setGlobal("isPrimary", false); scaleCheckKnowledgeSession.setGlobal("primaryMembers", primaryMemberListInNetworkPartition); diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java index 4a83aee69f..79386bd652 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java @@ -19,6 +19,8 @@ package org.apache.stratos.load.balancer.common.statistics; +import org.apache.stratos.messaging.domain.topology.Cluster; + /** * Load balancer statistics reader interface. */ @@ -29,4 +31,7 @@ public interface LoadBalancerStatisticsReader { * @param clusterId */ int getInFlightRequestCount(String clusterId); + int getServedRequestCount(String clusterId); + int getActiveInstancesCount(Cluster cluster); + } diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java index 4fe2504b74..7d123e5060 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java @@ -74,12 +74,21 @@ public void run() { try { TopologyManager.acquireReadLock(); int requestCount; + int servedRequestCount; + int activeInstancesCount; for (Service service : TopologyManager.getTopology().getServices()) { for (Cluster cluster : service.getClusters()) { if (!cluster.isLbCluster()) { // Publish in-flight request count of load balancer's network partition requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId()); - inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId, requestCount); + servedRequestCount = statsReader.getServedRequestCount(cluster.getClusterId()); + if(requestCount == 0) { + servedRequestCount = 0; + } + activeInstancesCount = statsReader.getActiveInstancesCount(cluster); + inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId,activeInstancesCount, requestCount, servedRequestCount); + log.info(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d [active instances] %d [RIF] %d ", + cluster.getClusterId(), networkPartitionId, servedRequestCount , activeInstancesCount ,requestCount )); if (log.isDebugEnabled()) { log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d", cluster.getClusterId(), networkPartitionId, requestCount)); diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java index 519a687b80..24d5257c15 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java @@ -47,7 +47,9 @@ private static StreamDefinition createStreamDefinition() { // Payload definition payloadData.add(new Attribute("cluster_id", AttributeType.STRING)); payloadData.add(new Attribute("network_partition_id", AttributeType.STRING)); + payloadData.add(new Attribute("active_instances_count", AttributeType.DOUBLE)); payloadData.add(new Attribute("in_flight_request_count", AttributeType.DOUBLE)); + payloadData.add(new Attribute("served_request_count", AttributeType.DOUBLE)); streamDefinition.setPayloadData(payloadData); return streamDefinition; } catch (Exception e) { @@ -65,13 +67,16 @@ public WSO2CEPInFlightRequestPublisher() { * @param clusterId * @param networkPartitionId * @param inFlightRequestCount + * @param servedRequestCount */ - public void publish(String clusterId, String networkPartitionId, int inFlightRequestCount) { + public void publish(String clusterId, String networkPartitionId,int activeInstancesCount, int inFlightRequestCount, int servedRequestCount) { List payload = new ArrayList(); // Payload values payload.add(clusterId); payload.add(networkPartitionId); + payload.add((double)activeInstancesCount); payload.add((double)inFlightRequestCount); + payload.add((double)servedRequestCount); super.publish(payload.toArray()); } } diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java index 3557d3a926..556d2fe301 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java @@ -22,6 +22,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Member; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -35,9 +37,11 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe private static volatile LoadBalancerStatisticsCollector instance; // Map private Map clusterIdRequestCountMap; + private Map clusterIdServedRequestCountMap; private LoadBalancerStatisticsCollector() { clusterIdRequestCountMap = new ConcurrentHashMap(); + clusterIdServedRequestCountMap = new ConcurrentHashMap(); } public static LoadBalancerStatisticsCollector getInstance() { @@ -75,6 +79,33 @@ public int getInFlightRequestCount(String clusterId) { } } + /** + * Returns the number of requests served since the last time this function was called. + */ + public int getServedRequestCount(String clusterId){ + synchronized (LoadBalancerStatisticsCollector.class) { + if (clusterIdServedRequestCountMap.containsKey(clusterId)) { + Integer servedCount = clusterIdRequestCountMap.get(clusterId); + if (servedCount != null) { + clusterIdServedRequestCountMap.put(clusterId, 0); + return servedCount; + } + } + return 0; + } + } + + public int getActiveInstancesCount(Cluster cluster) { + int activeInstances = 0; + for( Member member :cluster.getMembers()){ + if(member.isActive()){ + activeInstances++; + } + + }return activeInstances; + + } + void incrementInFlightRequestCount(String clusterId) { synchronized (LoadBalancerStatisticsCollector.class) { if (StringUtils.isBlank(clusterId)) { @@ -118,6 +149,13 @@ void decrementInFlightRequestCount(String clusterId) { } clusterIdRequestCountMap.put(clusterId, count); + Integer servedCount = 0; + if (clusterIdServedRequestCountMap.containsKey(clusterId)) { + servedCount = clusterIdServedRequestCountMap.get(clusterId); + } + servedCount++; + clusterIdServedRequestCountMap.put(clusterId, servedCount); + if (log.isDebugEnabled()) { log.debug(String.format("In-flight request count decremented: [cluster] %s [count] %s ", clusterId, count)); diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRequestsInFlightEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRequestsInFlightEvent.java index 143ae03ce7..0e10af9cf5 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRequestsInFlightEvent.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRequestsInFlightEvent.java @@ -28,11 +28,15 @@ public class AverageRequestsInFlightEvent extends Event { private final String networkPartitionId; private final String clusterId; private final float value; + private final float servedCount; + private final float activeInstances; - public AverageRequestsInFlightEvent(String networkPartitionId, String clusterId, float value) { + public AverageRequestsInFlightEvent(String networkPartitionId, String clusterId,float activeInstances, float value, float servedCount ) { this.networkPartitionId = networkPartitionId; this.clusterId = clusterId; this.value = value; + this.servedCount = servedCount; + this.activeInstances = activeInstances; } @@ -47,4 +51,9 @@ public float getValue() { public String getNetworkPartitionId() { return networkPartitionId; } + + public float getServedCount() { return servedCount;} + + public float getActiveInstances() { return activeInstances;} + } diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRuestsServingCapabilityEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRuestsServingCapabilityEvent.java new file mode 100644 index 0000000000..da508f7c96 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/AverageRuestsServingCapabilityEvent.java @@ -0,0 +1,31 @@ +package org.apache.stratos.messaging.event.health.stat; + +/** + * Created by asiri on 8/10/14. + */ +import org.apache.stratos.messaging.event.Event; +public class AverageRuestsServingCapabilityEvent extends Event{ + private final String networkPartitionId; + private final String clusterId; + private final float value; + + public AverageRuestsServingCapabilityEvent(String networkPartitionId, String clusterId, float value) { + this.networkPartitionId = networkPartitionId; + this.clusterId = clusterId; + this.value = value; + + } + public String getClusterId() { + return clusterId; + } + + public float getValue() { + return value; + } + + public String getNetworkPartitionId() { + return networkPartitionId; + } + + +} diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/health/stat/AverageRequestsServingCapabilityEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/health/stat/AverageRequestsServingCapabilityEventListener.java new file mode 100644 index 0000000000..3b7916404e --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/health/stat/AverageRequestsServingCapabilityEventListener.java @@ -0,0 +1,14 @@ +package org.apache.stratos.messaging.listener.health.stat; + +import org.apache.stratos.messaging.event.Event; +import org.apache.stratos.messaging.listener.EventListener; + +/** + * Created by asiri on 8/10/14. + */ +public class AverageRequestsServingCapabilityEventListener extends EventListener { + @Override + protected void onEvent(Event event) { + + } +} diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/AverageRequestsServingCapabilityMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/AverageRequestsServingCapabilityMessageProcessor.java new file mode 100644 index 0000000000..299e4805d2 --- /dev/null +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/AverageRequestsServingCapabilityMessageProcessor.java @@ -0,0 +1,49 @@ +package org.apache.stratos.messaging.message.processor.health.stat; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.event.health.stat.AverageRuestsServingCapabilityEvent; +import org.apache.stratos.messaging.message.processor.MessageProcessor; +import org.apache.stratos.messaging.util.Util; + +/** + * Created by asiri on 8/15/14. + */ +public class AverageRequestsServingCapabilityMessageProcessor extends MessageProcessor { + private static final Log log = LogFactory.getLog(AverageRequestsServingCapabilityMessageProcessor.class); + + private MessageProcessor nextProcessor; + + @Override + public void setNext(MessageProcessor nextProcessor) { + this.nextProcessor = nextProcessor; + } + + @Override + public boolean process(String type, String message, Object object) { + if (AverageRuestsServingCapabilityEvent.class.getName().equals(type)) { + + // Parse complete message and build event + AverageRuestsServingCapabilityEvent event = (AverageRuestsServingCapabilityEvent) Util.jsonToObject(message, AverageRuestsServingCapabilityEvent.class); + + // Notify event listeners + notifyEventListeners(event); + + if(log.isDebugEnabled()){ + log.debug(String.format("%s event processor notified listeners ... " , type)); + } + return true; + } + else { + if(nextProcessor != null) { + return nextProcessor.process(type, message, object); + } + else { + throw new RuntimeException(String.format("Failed to process health stat message using available message processors: [type] %s [body] %s", type, message)); + } + } + } + + + +} diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/HealthStatMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/HealthStatMessageProcessorChain.java index f9861f6e49..4da35a9bb8 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/HealthStatMessageProcessorChain.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/health/stat/HealthStatMessageProcessorChain.java @@ -44,6 +44,7 @@ public class HealthStatMessageProcessorChain extends MessageProcessorChain { private MemberGradientOfMemoryConsumptionMessageProcessor memberGradientOfMemoryConsumptionMessageProcessor; private MemberSecondDerivativeOfLoadAverageMessageProcessor memberSecondDerivativeOfLoadAverageMessageProcessor; private MemberSecondDerivativeOfMemoryConsumptionMessageProcessor memberSecondDerivativeOfMemoryConsumptionMessageProcessor; + private AverageRequestsServingCapabilityMessageProcessor averageRequestsServingCapabilityMessageProcessor; private MemberFaultMessageProcessor memberFaultMessageProcessor; @@ -66,6 +67,8 @@ protected void initialize() { averageRequestsInFlightMessageProcessor = new AverageRequestsInFlightMessageProcessor(); add(averageRequestsInFlightMessageProcessor); + averageRequestsServingCapabilityMessageProcessor = new AverageRequestsServingCapabilityMessageProcessor(); + add(averageRequestsServingCapabilityMessageProcessor); gradientOfRequestsInFlightMessageProcessor = new GradientOfRequestsInFlightMessageProcessor(); add(gradientOfRequestsInFlightMessageProcessor); secondDerivativeOfRequestsInFlightMessageProcessor = new SecondDerivativeOfRequestsInFlightMessageProcessor(); @@ -97,6 +100,8 @@ public void addEventListener(EventListener eventListener) { averageMemoryConsumptionMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof AverageRequestsInFlightEventListener) { averageRequestsInFlightMessageProcessor.addEventListener(eventListener); + } else if (eventListener instanceof AverageRequestsServingCapabilityEventListener) { + averageRequestsServingCapabilityMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof GradientOfLoadAverageEventListener) { gradientOfLoadAverageMessageProcessor.addEventListener(eventListener); } else if (eventListener instanceof GradientOfMemoryConsumptionEventListener) { diff --git a/extensions/cep/artifacts/eventformatters/AverageInFlightRequestsEventFormatter.xml b/extensions/cep/artifacts/eventformatters/AverageInFlightRequestsEventFormatter.xml index 6e6ba967a2..84e95ae34c 100644 --- a/extensions/cep/artifacts/eventformatters/AverageInFlightRequestsEventFormatter.xml +++ b/extensions/cep/artifacts/eventformatters/AverageInFlightRequestsEventFormatter.xml @@ -24,7 +24,7 @@ statistics="disable" trace="enable" xmlns="http://wso2.org/carbon/eventformatter"> - {"org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent":{"message":{"clusterId":"{{cluster_id}}","networkPartitionId":"{{network_partition_id}}","value":"{{count}}"}}} + {"org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent":{"message":{"clusterId":"{{cluster_id}}","networkPartitionId":"{{network_partition_id}}","activeInstances":"{{instances_count}}","value":"{{count}}" ,"servedCount":"{{served_count}}"}}} summarized-health-stats diff --git a/extensions/cep/artifacts/eventformatters/AverageServedRequestsEventFormatter.xml b/extensions/cep/artifacts/eventformatters/AverageServedRequestsEventFormatter.xml new file mode 100644 index 0000000000..5de01286e3 --- /dev/null +++ b/extensions/cep/artifacts/eventformatters/AverageServedRequestsEventFormatter.xml @@ -0,0 +1,32 @@ + + + + + + + {"org.apache.stratos.messaging.event.health.stat.AverageRuestsServingCapabilityEvent":{"message":{"clusterId":"{{cluster_id}}","networkPartitionId":"{{network_partition_id}}","value":"{{average_served_count}}"}}} + + + summarized-health-stats + + diff --git a/extensions/cep/artifacts/executionplans/AverageInFlightRequestsFinder.xml b/extensions/cep/artifacts/executionplans/AverageInFlightRequestsFinder.xml index 6826dab89b..a8e890fd89 100644 --- a/extensions/cep/artifacts/executionplans/AverageInFlightRequestsFinder.xml +++ b/extensions/cep/artifacts/executionplans/AverageInFlightRequestsFinder.xml @@ -32,16 +32,25 @@ + partition by avg_rif_cluster_partition; + from average_in_flight_requests + select cluster_id, network_partition_id,instances_count, served_count, + stratos:divider(served_count , instances_count) as requests_per_instance + insert into served_requests_per_instance; + from served_requests_per_instance[requests_per_instance>0]#window.timeBatch(10 min) + select cluster_id,network_partition_id, avg(requests_per_instance) as average_served_count + insert into average_served_request_count;]]> + diff --git a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml index 64b5d85e3b..a9472efed7 100644 --- a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml +++ b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml @@ -32,7 +32,9 @@ + + @@ -60,9 +62,25 @@ + + - + + + + average served count per instance + average in-flight requests + + + + + + + + + + second derivative of in-flight request count diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java new file mode 100644 index 0000000000..c2536a53ce --- /dev/null +++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/MemeberRequestHandlingCapabilityWindowProcessor.java @@ -0,0 +1,49 @@ +package org.apache.stratos.cep.extension; + +/** + * Created by asiri on 8/9/14. + */ + +import org.wso2.siddhi.core.config.SiddhiContext; +import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension; + +@SiddhiExtension(namespace = "stratos", function = "divider") +public class MemeberRequestHandlingCapabilityWindowProcessor extends FunctionExecutor { + + Attribute.Type returnType = Attribute.Type.DOUBLE; + + @Override + public void init(Attribute.Type[] types, SiddhiContext siddhiContext) { + } + + @Override + protected Object process(Object obj) { + + double[] value = new double[2]; + if (obj instanceof Object[]) { + int i=0; + for (Object aObj : (Object[]) obj) { + value[i]= Double.parseDouble(String.valueOf(aObj)); + i++; + } + }//to do avoid deviding zero number of active instances won't be zero cz there is min + Double unit = (value[0] / value[1]); + if(!unit.isNaN() && !unit.isInfinite()) + return unit; + else + return 0.0; + + } + + @Override + public void destroy() { + + } + + @Override + public Attribute.Type getReturnType() { + return returnType; + } +} diff --git a/products/stratos/modules/distribution/src/main/conf/scaling.drl b/products/stratos/modules/distribution/src/main/conf/scaling.drl index bd331e7515..b123c9dd8f 100644 --- a/products/stratos/modules/distribution/src/main/conf/scaling.drl +++ b/products/stratos/modules/distribution/src/main/conf/scaling.drl @@ -55,6 +55,7 @@ global java.lang.Boolean mcReset; global java.lang.Boolean laReset; global java.lang.Boolean isPrimary; global java.util.List primaryMembers; +global java.lang.Boolean arspiReset; rule "Scaling Rule" dialect "mvel" @@ -94,8 +95,22 @@ dialect "mvel" loadAverageSecondDerivative : Float() from $networkPartitionContext.getLoadAverageSecondDerivative() laPredictedValue : Double() from $delegator.getPredictedValueForNextMinute(loadAverageAverage, loadAverageGradient, loadAverageSecondDerivative, 1) - scaleUp : Boolean() from ((rifReset && (rifPredictedValue > rifUpperLimit)) || (mcReset && (mcPredictedValue > mcUpperLimit)) || (laReset && (laPredictedValue > laUpperLimit))) - scaleDown : Boolean() from ((rifReset && (rifPredictedValue < rifLowerLimit )) && (mcReset && (mcPredictedValue < mcLowerLimit)) && (laReset && (laPredictedValue < laLowerLimit))) + activeInstancesCount : Integer() from $delegator.getMemberCount(clusterId , 0) + instancesCount : Integer() from $delegator.getMemberCount(clusterId , 1) + + requestsServedPerInstance : Float() from $networkPartitionContext.getRequestsServedPerInstance() + averageRequestsServedPerInstance : Float() from $networkPartitionContext.getAverageRequestsServedPerInstance() + + numberOfInstancesReuquiredBasedOnRif : Integer() from $delegator.getNumberOfInstancesRequiredBasedOnRif(rifPredictedValue, requestsServedPerInstance, averageRequestsServedPerInstance, arspiReset) + numberOfInstancesReuquiredBasedOnMemoryConsumption : Integer() from $delegator.getNumberOfInstancesRequiredBasedOnLoadAndMemoryConsumption(mcUpperLimit , mcLowerLimit, mcPredictedValue ,activeInstancesCount ) + numberOfInstancesReuquiredBasedOnLoadAverage : Integer() from $delegator.getNumberOfInstancesRequiredBasedOnLoadAndMemoryConsumption(laUpperLimit , laLowerLimit, laPredictedValue ,activeInstancesCount ) + + numberOfRequiredInstances : Integer() from $delegator.getMaxNumberOfInstancesRequired(numberOfInstancesReuquiredBasedOnRif, numberOfInstancesReuquiredBasedOnMemoryConsumption ,mcReset ,numberOfInstancesReuquiredBasedOnLoadAverage, laReset) + + + + scaleUp : Boolean() from (instancesCount < numberOfRequiredInstances ) + scaleDown : Boolean() from (activeInstancesCount > numberOfRequiredInstances ) eval(log.debug("[scaling] " + " [cluster] " + clusterId + " RIF Resetted?: " + rifReset)) eval(log.debug("[scaling] " + " [cluster] " + clusterId + " RIF predicted value: " + rifPredictedValue)) @@ -116,19 +131,23 @@ dialect "mvel" then if(scaleUp){ + int additionalInstances = numberOfRequiredInstances - instancesCount ; $networkPartitionContext.resetScaleDownRequestsCount(); + int count=0; + while(count != additionalInstances){ Partition partition = autoscaleAlgorithm.getNextScaleUpPartition($networkPartitionContext, clusterId); if(partition != null){ log.info("[scale-up] Partition available, hence trying to spawn an instance to scale up!" ); log.debug("[scale-up] " + " [partition] " + partition.getId() + " [cluster] " + clusterId ); $delegator.delegateSpawn($networkPartitionContext.getPartitionCtxt(partition.getId()), clusterId, lbRef, isPrimary); + count++; } + } } else if(scaleDown){ log.debug("[scale-down] Decided to Scale down [cluster] " + clusterId); if($networkPartitionContext.getScaleDownRequestsCount() > 5 ){ log.debug("[scale-down] Reached scale down requests threshold [cluster] " + clusterId + " Count " + $networkPartitionContext.getScaleDownRequestsCount()); - $networkPartitionContext.resetScaleDownRequestsCount(); MemberStatsContext selectedMemberStatsContext = null; double lowestOverallLoad = 0.0; boolean foundAValue = false; From d9f7c2b9cecf88e4ac527fc35ea186f219640b00 Mon Sep 17 00:00:00 2001 From: Asiri Liyana Arachchi Date: Mon, 18 Aug 2014 13:22:54 +0530 Subject: [PATCH 08/10] missing delegator --- .../autoscaler/rule/RuleTasksDelegator.java | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java index 94def43777..c78c5c033e 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java @@ -33,6 +33,11 @@ import org.apache.stratos.autoscaler.client.cloud.controller.InstanceNotificationClient; import org.apache.stratos.autoscaler.partition.PartitionManager; import org.apache.stratos.cloud.controller.stub.pojo.MemberContext; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Member; +import org.apache.stratos.messaging.domain.topology.MemberStatus; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; /** * This will have utility methods that need to be executed from rule file... @@ -41,6 +46,7 @@ public class RuleTasksDelegator { public static final double SCALE_UP_FACTOR = 0.8; //get from config public static final double SCALE_DOWN_FACTOR = 0.2; + private static boolean arspiIsSet = false; private static final Log log = LogFactory.getLog(RuleTasksDelegator.class); @@ -56,6 +62,86 @@ public double getPredictedValueForNextMinute(float average, float gradient, floa return predictedValue; } + + public int getNumberOfInstancesRequiredBasedOnRif(float rifPredictedValue , float requestsServedPerInstance , float averageRequestsServedPerInstance , boolean arspiReset){ + + float requestsInstanceCanHandle = requestsServedPerInstance; + + if(arspiReset && averageRequestsServedPerInstance != 0){ + requestsInstanceCanHandle = averageRequestsServedPerInstance; + + } + float numberOfInstances = 0; + if(requestsInstanceCanHandle!=0) { + numberOfInstances = rifPredictedValue / requestsInstanceCanHandle; + arspiReset = true; + + }else{ + arspiReset = false; + } + return (int)Math.ceil(numberOfInstances); + } + + public int getNumberOfInstancesRequiredBasedOnLoadAndMemoryConsumption(float upperLimit , float lowerLimit ,double predictedValue , int activeMemberCount ){ + + double numberOfInstances = 0; + if(predictedValue > upperLimit){ + numberOfInstances = (activeMemberCount*predictedValue)/upperLimit; + }else if((upperLimit >= predictedValue) && (predictedValue >= lowerLimit)){ + numberOfInstances = activeMemberCount; + }else{ + numberOfInstances = (activeMemberCount*predictedValue)/lowerLimit; + } + + return (int)Math.ceil(numberOfInstances); + } + + public int getMaxNumberOfInstancesRequired(int numberOfInstancesReuquiredBasedOnRif , int numberOfInstancesReuquiredBasedOnMemoryConsumption , boolean mcReset , int numberOfInstancesReuquiredBasedOnLoadAverage , boolean laReset){ + int numberOfInstances = 0; + + int rifBasedRequiredInstances = 0; + int mcBasedRequiredInstances = 0; + int laBasedRequiredInstances = 0; + if(arspiIsSet){ + rifBasedRequiredInstances = numberOfInstancesReuquiredBasedOnRif; + } + if(mcReset){ + rifBasedRequiredInstances = numberOfInstancesReuquiredBasedOnMemoryConsumption; + } + if(laReset){ + rifBasedRequiredInstances = numberOfInstancesReuquiredBasedOnLoadAverage; + } + numberOfInstances = Math.max(Math.max(numberOfInstancesReuquiredBasedOnMemoryConsumption,numberOfInstancesReuquiredBasedOnLoadAverage),numberOfInstancesReuquiredBasedOnRif); + return numberOfInstances; + } + + public int getMemberCount(String clusterId , int scalingPara ){ + + int activeMemberCount = 0; + int memberCount = 0; + for( Service service : TopologyManager.getTopology().getServices()) { + if(service.clusterExists(clusterId)) { + Cluster cluster = service.getCluster(clusterId); + + for (Member member : cluster.getMembers()) { + if (member.isActive() || member.getStatus() == MemberStatus.Created || member.getStatus() == MemberStatus.Starting ) { + memberCount++; + if(member.isActive()) { + activeMemberCount++; + } + } + } + } + } + if(scalingPara == 1){ + return memberCount; + }else{ + return activeMemberCount; + } + + + } + public AutoscaleAlgorithm getAutoscaleAlgorithm(String partitionAlgorithm){ AutoscaleAlgorithm autoscaleAlgorithm = null; if(log.isDebugEnabled()){ From 7ac03d9b4165f1ccf9f4865c03532f9e8b604a7e Mon Sep 17 00:00:00 2001 From: Asiri Liyana Arachchi Date: Mon, 18 Aug 2014 14:03:25 +0530 Subject: [PATCH 09/10] haproxy --- .../haproxy/extension/HAProxyStatisticsReader.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java index f564e7c8d4..415e0cd044 100644 --- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java +++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java @@ -94,4 +94,14 @@ public int getInFlightRequestCount(String clusterId) { } return 0; } + + @Override + public int getServedRequestCount(String clusterId) { + return 0; + } + + @Override + public int getActiveInstancesCount(Cluster cluster) { + return 0; + } } From 1e41472e5a71dc157f1a66a916ff1707ad8195c8 Mon Sep 17 00:00:00 2001 From: Asiri Liyana Arachchi Date: Sat, 23 Aug 2014 09:10:43 +0530 Subject: [PATCH 10/10] load balancer statistics --- .../balancer/statistics/LoadBalancerStatisticsCollector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java index 556d2fe301..10e38f9364 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java @@ -85,7 +85,7 @@ public int getInFlightRequestCount(String clusterId) { public int getServedRequestCount(String clusterId){ synchronized (LoadBalancerStatisticsCollector.class) { if (clusterIdServedRequestCountMap.containsKey(clusterId)) { - Integer servedCount = clusterIdRequestCountMap.get(clusterId); + Integer servedCount = clusterIdServedRequestCountMap.get(clusterId); if (servedCount != null) { clusterIdServedRequestCountMap.put(clusterId, 0); return servedCount;