From 1eeead432f756d6327dbf038c4bcbbc4da2d0ead Mon Sep 17 00:00:00 2001 From: Thanuja Date: Wed, 29 Jul 2015 18:51:34 +0530 Subject: [PATCH] Adding Metering and Monitoring Service Implementation --- .../AutoscalerCloudControllerClient.java | 16 +- .../autoscaler/rule/RuleTasksDelegator.java | 23 +- .../publisher/HealthStatisticsNotifier.java | 10 +- .../messaging/topology/TopologyBuilder.java | 70 ++- .../impl/CloudControllerServiceImpl.java | 6 +- .../impl/CloudControllerServiceUtil.java | 15 +- .../services/impl/InstanceCreator.java | 18 +- .../publisher/BAMUsageDataPublisher.java | 44 +- .../util/CloudControllerConstants.java | 4 + .../common/constants/StratosConstants.java | 8 +- .../publisher/HealthStatisticsPublisher.java | 3 +- .../publisher/InFlightRequestPublisher.java | 4 +- .../cep/WSO2CEPHealthStatisticsPublisher.java | 9 +- .../cep/WSO2CEPInFlightRequestPublisher.java | 6 +- .../LoadBalancerStatisticsNotifier.java | 3 +- .../MockHealthStatisticsNotifier.java | 3 + .../healthstatspublisher/healthstats.py | 5 +- .../HealthStatsEventFormatter.xml | 30 ++ .../eventformatters/RIFEventFormatter.xml | 31 ++ .../DASDefaultWSO2EventOutputAdaptor.xml | 29 ++ .../stream-manager-config.xml | 486 +++++++++--------- extensions/das/README.md | 10 + .../CloudControllerEventReceiver.xml | 29 ++ .../HealthStatsEventReceiver.xml | 29 ++ .../eventreceivers/RIFEventReceiver.xml | 29 ++ .../cartridge_agent_health_stats.xml | 85 +++ .../eventsink/in_flight_requests.xml | 64 +++ .../org_apache_stratos_cloud_controller.xml | 211 ++++++++ .../cartridge_agent_health_stats_1.0.0.json | 40 ++ .../in_flight_requests_1.0.0.json | 28 + ...apache.stratos.cloud.controller_1.0.0.json | 112 ++++ extensions/das/artifacts/sparkscript/CCEvent | 18 + extensions/das/pom.xml | 40 ++ extensions/das/spark-udf/pom.xml | 36 ++ .../das/extension/spark/udf/TimeUDF.java | 49 ++ extensions/pom.xml | 4 +- .../main/conf/drools/dependent-scaling.drl | 4 +- .../src/main/conf/drools/mincheck.drl | 5 +- .../src/main/conf/drools/scaling.drl | 7 +- 39 files changed, 1311 insertions(+), 312 deletions(-) create mode 100644 extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml create mode 100644 extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml create mode 100755 extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml create mode 100644 extensions/das/README.md create mode 100644 extensions/das/artifacts/eventreceivers/CloudControllerEventReceiver.xml create mode 100644 extensions/das/artifacts/eventreceivers/HealthStatsEventReceiver.xml create mode 100644 extensions/das/artifacts/eventreceivers/RIFEventReceiver.xml create mode 100644 extensions/das/artifacts/eventsink/cartridge_agent_health_stats.xml create mode 100644 extensions/das/artifacts/eventsink/in_flight_requests.xml create mode 100644 extensions/das/artifacts/eventsink/org_apache_stratos_cloud_controller.xml create mode 100644 extensions/das/artifacts/eventstreams/cartridge_agent_health_stats_1.0.0.json create mode 100644 extensions/das/artifacts/eventstreams/in_flight_requests_1.0.0.json create mode 100644 extensions/das/artifacts/eventstreams/org.apache.stratos.cloud.controller_1.0.0.json create mode 100644 extensions/das/artifacts/sparkscript/CCEvent create mode 100644 extensions/das/pom.xml create mode 100644 extensions/das/spark-udf/pom.xml create mode 100644 extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java index f944a9fb37..c65a5f7e5b 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/AutoscalerCloudControllerClient.java @@ -84,7 +84,8 @@ private AutoscalerCloudControllerClient() { public synchronized MemberContext startInstance(PartitionRef partition, String clusterId, String clusterInstanceId, String networkPartitionId, boolean isPrimary, - int minMemberCount) throws SpawningException { + int minMemberCount, String autoscalingReason, + long scalingTime) throws SpawningException { try { if (log.isInfoEnabled()) { log.info(String.format("Trying to spawn an instance via cloud controller: " + @@ -115,8 +116,18 @@ public synchronized MemberContext startInstance(PartitionRef partition, minCountProp.setName(StratosConstants.MIN_COUNT); minCountProp.setValue(String.valueOf(minMemberCount)); + Property autoscalingReasonProp = new Property(); + autoscalingReasonProp.setName(StratosConstants.SCALING_REASON); + autoscalingReasonProp.setValue(autoscalingReason); + + Property scalingTimeProp = new Property(); + scalingTimeProp.setName(StratosConstants.SCALING_TIME); + scalingTimeProp.setValue(String.valueOf(scalingTime)); + memberContextProps.addProperty(isPrimaryProp); memberContextProps.addProperty(minCountProp); + memberContextProps.addProperty(autoscalingReasonProp); + memberContextProps.addProperty(scalingTimeProp); instanceContext.setProperties(AutoscalerUtil.toStubProperties(memberContextProps)); long startTime = System.currentTimeMillis(); @@ -228,7 +239,8 @@ public void terminateInstanceForcefully(String memberId) throws Exception { public void terminateAllInstances(String clusterId) throws RemoteException, CloudControllerServiceInvalidClusterExceptionException { if (log.isInfoEnabled()) { - log.info(String.format("Terminating all instances of cluster via cloud controller: [cluster] %s", clusterId)); + log.info(String.format("Terminating all instances of cluster via cloud controller: " + + "[cluster] %s", clusterId)); } long startTime = System.currentTimeMillis(); stub.terminateInstances(clusterId); diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java index 51443a12a5..733ce579df 100644 --- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java +++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/RuleTasksDelegator.java @@ -36,7 +36,6 @@ import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher; import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor; import org.apache.stratos.cloud.controller.stub.domain.MemberContext; -import org.apache.stratos.common.client.CloudControllerServiceClient; import org.apache.stratos.common.constants.StratosConstants; /** @@ -48,7 +47,8 @@ public class RuleTasksDelegator { private static final Log log = LogFactory.getLog(RuleTasksDelegator.class); - public double getPredictedValueForNextMinute(float average, float gradient, float secondDerivative, int timeInterval) { + public double getPredictedValueForNextMinute(float average, float gradient, float secondDerivative, + int timeInterval) { double predictedValue; // s = u * t + 0.5 * a * t * t if (log.isDebugEnabled()) { @@ -175,9 +175,11 @@ public void delegateInstanceCleanup(String memberId) { * @param clusterId Cluster id * @param clusterInstanceId Instance id * @param isPrimary Is a primary member + * @param autoscalingReason scaling reason for member + * @param scalingTime scaling time */ public void delegateSpawn(ClusterLevelPartitionContext clusterMonitorPartitionContext, String clusterId, - String clusterInstanceId, boolean isPrimary) { + String clusterInstanceId, boolean isPrimary, String autoscalingReason, long scalingTime) { try { String nwPartitionId = clusterMonitorPartitionContext.getNetworkPartitionId(); @@ -199,14 +201,15 @@ public void delegateSpawn(ClusterLevelPartitionContext clusterMonitorPartitionCo clusterId, clusterInstanceId, clusterMonitorPartitionContext.getNetworkPartitionId(), isPrimary, - minimumCountOfNetworkPartition); + minimumCountOfNetworkPartition, autoscalingReason, scalingTime); if (memberContext != null) { ClusterLevelPartitionContext partitionContext = clusterInstanceContext. getPartitionCtxt(clusterMonitorPartitionContext.getPartitionId()); partitionContext.addPendingMember(memberContext); partitionContext.addMemberStatsContext(new MemberStatsContext(memberContext.getMemberId())); if (log.isDebugEnabled()) { - log.debug(String.format("Pending member added, [member] %s [partition] %s", memberContext.getMemberId(), + log.debug(String.format("Pending member added, [member] %s [partition] %s", + memberContext.getMemberId(), memberContext.getPartition().getId())); } @@ -245,7 +248,8 @@ public void delegateScalingOverMaxNotification(String clusterId, String networkP clusterMonitor.sendScalingOverMaxEvent(networkPartitionId, instanceId); } - public void delegateScalingDownBeyondMinNotification(String clusterId, String networkPartitionId, String instanceId) { + public void delegateScalingDownBeyondMinNotification(String clusterId, String networkPartitionId, + String instanceId) { if (log.isDebugEnabled()) { log.debug("Scaling down lower min notification is going to the [parentInstance] " + instanceId); } @@ -268,8 +272,8 @@ public void delegateTerminate(ClusterLevelPartitionContext clusterMonitorPartiti clusterMonitorPartitionContext.removeMemberStatsContext(memberId); } else if (clusterMonitorPartitionContext.pendingMemberAvailable(memberId)) { - log.info(String.format("[scale-down] Moving pending member to termination pending list [member id] %s " + - "[partition] %s [network partition] %s", memberId, + log.info(String.format("[scale-down] Moving pending member to termination pending list " + + "[member id] %s " + "[partition] %s [network partition] %s", memberId, clusterMonitorPartitionContext.getPartitionId(), clusterMonitorPartitionContext.getNetworkPartitionId())); clusterMonitorPartitionContext.movePendingMemberToObsoleteMembers(memberId); @@ -280,7 +284,8 @@ public void delegateTerminate(ClusterLevelPartitionContext clusterMonitorPartiti } } - public void delegateTerminateDependency(ClusterLevelPartitionContext clusterMonitorPartitionContext, String memberId) { + public void delegateTerminateDependency(ClusterLevelPartitionContext clusterMonitorPartitionContext, + String memberId) { try { //calling SM to send the instance notification event. if (log.isDebugEnabled()) { diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java index 74c51564f5..5ab2ebf37c 100644 --- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java +++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/statistics/publisher/HealthStatisticsNotifier.java @@ -51,7 +51,8 @@ public HealthStatisticsNotifier() { File pluginFile = new File(pluginFileName); if ((pluginFile != null) && (pluginFile.exists())) { - List pluginClass = PluginLoader.loadPluginClassesFromJar(pluginFile, IHealthStatisticsReader.class); + List pluginClass = PluginLoader.loadPluginClassesFromJar(pluginFile, + IHealthStatisticsReader.class); if (!pluginClass.isEmpty()) { try { log.trace("Instantiating new instance of plugin type " + pluginClass); @@ -63,7 +64,8 @@ public HealthStatisticsNotifier() { } } } else { - log.error("Plugin not found or malformed: " + pluginFileName + ((pluginFile == null) ? " NULL" : "Doesn't exist")); + log.error("Plugin not found or malformed: " + pluginFileName + ((pluginFile == null) ? " NULL" : + "Doesn't exist")); } } if (this.statsReader == null) { @@ -95,7 +97,7 @@ public void run() { if (log.isDebugEnabled()) { log.debug(String.format("Publishing memory consumption: %f", stats.getMemoryUsage())); } - statsPublisher.publish( + statsPublisher.publish(System.currentTimeMillis(), CartridgeAgentConfiguration.getInstance().getClusterId(), CartridgeAgentConfiguration.getInstance().getClusterInstanceId(), CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(), @@ -108,7 +110,7 @@ public void run() { if (log.isDebugEnabled()) { log.debug(String.format("Publishing load average: %f", stats.getProcessorUsage())); } - statsPublisher.publish( + statsPublisher.publish(System.currentTimeMillis(), CartridgeAgentConfiguration.getInstance().getClusterId(), CartridgeAgentConfiguration.getInstance().getClusterInstanceId(), CartridgeAgentConfiguration.getInstance().getNetworkPartitionId(), diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java index f04a11fb0f..419c7119cc 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java @@ -31,6 +31,7 @@ import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher; import org.apache.stratos.cloud.controller.util.CloudControllerUtil; import org.apache.stratos.common.Property; +import org.apache.stratos.common.constants.StratosConstants; import org.apache.stratos.messaging.domain.application.ClusterDataHolder; import org.apache.stratos.messaging.domain.instance.ClusterInstance; import org.apache.stratos.messaging.domain.topology.*; @@ -67,7 +68,8 @@ public static void handleServiceCreated(List cartridgeList) { TopologyManager.acquireWriteLock(); for (Cartridge cartridge : cartridgeList) { if (!topology.serviceExists(cartridge.getType())) { - ServiceType serviceType = cartridge.isMultiTenant() ? ServiceType.MultiTenant : ServiceType.SingleTenant; + ServiceType serviceType = cartridge.isMultiTenant() ? ServiceType.MultiTenant : + ServiceType.SingleTenant; service = new Service(cartridge.getType(), serviceType); Properties properties = new Properties(); @@ -199,14 +201,14 @@ public static void handleApplicationClustersCreated(String appId, List } log.debug("Creating cluster port mappings: [appication-id] " + appId); - for(Cluster cluster : appClusters) { + for (Cluster cluster : appClusters) { String cartridgeType = cluster.getServiceName(); Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); - if(cartridge == null) { + if (cartridge == null) { throw new CloudControllerException("Cartridge not found: [cartridge-type] " + cartridgeType); } - for(PortMapping portMapping : cartridge.getPortMappings()) { + for (PortMapping portMapping : cartridge.getPortMappings()) { ClusterPortMapping clusterPortMapping = new ClusterPortMapping(appId, cluster.getClusterId(), portMapping.getName(), portMapping.getProtocol(), portMapping.getPort(), portMapping.getProxyPort()); @@ -406,6 +408,11 @@ public static void handleMemberCreatedEvent(MemberContext memberContext) { String partitionId = memberContext.getPartition().getId(); String lbClusterId = memberContext.getLbClusterId(); long initTime = memberContext.getInitTime(); + String autoscalingReason = memberContext.getProperties().getProperty( + StratosConstants.SCALING_REASON).getValue(); + long scalingTime = Long.parseLong(memberContext.getProperties().getProperty( + StratosConstants.SCALING_TIME).getValue()); + if (cluster.memberExists(memberId)) { log.warn(String.format("Member %s already exists", memberId)); @@ -421,6 +428,19 @@ public static void handleMemberCreatedEvent(MemberContext memberContext) { member.setProperties(CloudControllerUtil.toJavaUtilProperties(memberContext.getProperties())); cluster.addMember(member); TopologyManager.updateTopology(topology); + //member created time + Long timeStamp = System.currentTimeMillis(); + //publishing to BAM + BAMUsageDataPublisher + .publish(memberContext.getMemberId(), + memberContext.getPartition().getId(), + memberContext.getNetworkPartitionId(), + memberContext.getClusterId(), + memberContext.getClusterInstanceId(), + memberContext.getCartridgeType(), + MemberStatus.Created.toString(), + timeStamp, autoscalingReason, + scalingTime, null); } finally { TopologyManager.releaseWriteLock(); } @@ -479,16 +499,18 @@ public static void handleMemberInitializedEvent(MemberContext memberContext) { log.info("Member status updated to initialized"); TopologyManager.updateTopology(topology); - + //member intialized time + Long timeStamp = System.currentTimeMillis(); TopologyEventPublisher.sendMemberInitializedEvent(memberContext); //publishing data BAMUsageDataPublisher.publish(memberContext.getMemberId(), memberContext.getPartition().getId(), memberContext.getNetworkPartitionId(), + memberContext.getClusterInstanceId(), memberContext.getClusterId(), memberContext.getCartridgeType(), MemberStatus.Initialized.toString(), - null); + timeStamp, null, null, null); } } finally { TopologyManager.releaseWriteLock(); @@ -542,16 +564,19 @@ public static void handleMemberStarted(InstanceStartedEvent instanceStartedEvent log.info("member started event adding status started"); TopologyManager.updateTopology(topology); + //member started time + Long timeStamp = System.currentTimeMillis(); //memberStartedEvent. TopologyEventPublisher.sendMemberStartedEvent(instanceStartedEvent); //publishing data BAMUsageDataPublisher.publish(instanceStartedEvent.getMemberId(), instanceStartedEvent.getPartitionId(), instanceStartedEvent.getNetworkPartitionId(), + instanceStartedEvent.getClusterInstanceId(), instanceStartedEvent.getClusterId(), instanceStartedEvent.getServiceName(), MemberStatus.Starting.toString(), - null); + timeStamp, null, null, null); } } finally { TopologyManager.releaseWriteLock(); @@ -602,7 +627,8 @@ public static void handleMemberActivated(InstanceActivatedEvent instanceActivate TopologyManager.acquireWriteLock(); // try update lifecycle state if (!member.isStateTransitionValid(MemberStatus.Active)) { - log.error("Invalid state transition from [" + member.getStatus() + "] to [" + MemberStatus.Active + "]"); + log.error("Invalid state transition from [" + member.getStatus() + "] to [" + + MemberStatus.Active + "]"); return; } else { member.setStatus(MemberStatus.Active); @@ -644,7 +670,8 @@ public static void handleMemberActivated(InstanceActivatedEvent instanceActivate memberActivatedEvent.setDefaultPublicIP(member.getDefaultPublicIP()); memberActivatedEvent.setMemberPublicIPs(member.getMemberPublicIPs()); TopologyManager.updateTopology(topology); - + //member activated time + Long timeStamp = System.currentTimeMillis(); // Publish member activated event TopologyEventPublisher.sendMemberActivatedEvent(memberActivatedEvent); @@ -652,10 +679,11 @@ public static void handleMemberActivated(InstanceActivatedEvent instanceActivate BAMUsageDataPublisher.publish(memberActivatedEvent.getMemberId(), memberActivatedEvent.getPartitionId(), memberActivatedEvent.getNetworkPartitionId(), + memberActivatedEvent.getClusterInstanceId(), memberActivatedEvent.getClusterId(), memberActivatedEvent.getServiceName(), MemberStatus.Active.toString(), - null); + timeStamp, null, null, null); } } finally { TopologyManager.releaseWriteLock(); @@ -694,6 +722,8 @@ public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent inst instanceReadyToShutdownEvent.getMemberId(), instanceReadyToShutdownEvent.getNetworkPartitionId(), instanceReadyToShutdownEvent.getPartitionId()); + //member ReadyToShutDown state change time + Long timeStamp = null; try { TopologyManager.acquireWriteLock(); @@ -706,6 +736,7 @@ public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent inst log.info("Member Ready to shut down event adding status started"); TopologyManager.updateTopology(topology); + timeStamp = System.currentTimeMillis(); } finally { TopologyManager.releaseWriteLock(); } @@ -714,10 +745,11 @@ public static void handleMemberReadyToShutdown(InstanceReadyToShutdownEvent inst BAMUsageDataPublisher.publish(instanceReadyToShutdownEvent.getMemberId(), instanceReadyToShutdownEvent.getPartitionId(), instanceReadyToShutdownEvent.getNetworkPartitionId(), + instanceReadyToShutdownEvent.getClusterInstanceId(), instanceReadyToShutdownEvent.getClusterId(), instanceReadyToShutdownEvent.getServiceName(), MemberStatus.ReadyToShutDown.toString(), - null); + timeStamp, null, null, null); //termination of particular instance will be handled by autoscaler } @@ -834,7 +866,8 @@ public static void handleMemberSuspended() { } } - public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent clusterStatusClusterActivatedEvent) { + public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEvent + clusterStatusClusterActivatedEvent) { Topology topology = TopologyManager.getTopology(); Service service = topology.getService(clusterStatusClusterActivatedEvent.getServiceName()); @@ -888,7 +921,8 @@ public static void handleClusterActivatedEvent(ClusterStatusClusterActivatedEven } else { log.error(String.format("Cluster state transition is not valid: [cluster-id] %s " + " [instance-id] %s [current-status] %s [status-requested] %s", - clusterStatusClusterActivatedEvent.getClusterId(), clusterStatusClusterActivatedEvent.getInstanceId(), + clusterStatusClusterActivatedEvent.getClusterId(), + clusterStatusClusterActivatedEvent.getInstanceId(), context.getStatus(), status)); return; } @@ -997,8 +1031,8 @@ public static void handleClusterTerminatedEvent(ClusterStatusClusterTerminatedEv cluster.removeInstanceContext(event.getInstanceId()); TopologyManager.updateTopology(topology); //publishing data - ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent(event.getAppId(), - event.getServiceName(), event.getClusterId(), event.getInstanceId()); + ClusterInstanceTerminatedEvent clusterTerminatedEvent = new ClusterInstanceTerminatedEvent( + event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId()); TopologyEventPublisher.sendClusterTerminatedEvent(clusterTerminatedEvent); } else { @@ -1041,15 +1075,15 @@ public static void handleClusterTerminatingEvent(ClusterStatusClusterTerminating log.info("Cluster Terminating started for " + cluster.getClusterId()); TopologyManager.updateTopology(topology); //publishing data - ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent(event.getAppId(), - event.getServiceName(), event.getClusterId(), event.getInstanceId()); + ClusterInstanceTerminatingEvent clusterTerminaingEvent = new ClusterInstanceTerminatingEvent( + event.getAppId(), event.getServiceName(), event.getClusterId(), event.getInstanceId()); TopologyEventPublisher.sendClusterTerminatingEvent(clusterTerminaingEvent); // Remove kubernetes services if available ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(event.getClusterId()); - if(StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) { + if (StringUtils.isNotBlank(clusterContext.getKubernetesClusterId())) { KubernetesIaas.removeKubernetesServices(event.getAppId(), event.getClusterId()); } } else { diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java index 4d51cc1e80..2b19b052a0 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java @@ -447,13 +447,13 @@ public MemberContext startInstance(InstanceContext instanceContext) throws clusterContext.setVolumes(volumes); } - // Handle member created event - TopologyBuilder.handleMemberCreatedEvent(memberContext); - // Persist member context CloudControllerContext.getInstance().addMemberContext(memberContext); CloudControllerContext.getInstance().persist(); + // Handle member created event + TopologyBuilder.handleMemberCreatedEvent(memberContext); + // Start instance in a new thread if (log.isDebugEnabled()) { log.debug(String.format("Starting instance creator thread: [cluster] %s [cluster-instance] %s " + diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java index 37580eb92b..e7be3a668b 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceUtil.java @@ -65,18 +65,21 @@ public static void executeMemberTerminationPostProcess(MemberContext memberConte TopologyBuilder.handleMemberTerminated(memberContext.getCartridgeType(), memberContext.getClusterId(), memberContext.getNetworkPartitionId(), partitionId, memberContext.getMemberId()); - + //member terminated time + Long timeStamp = System.currentTimeMillis(); // Publish statistics to BAM BAMUsageDataPublisher.publish(memberContext.getMemberId(), partitionId, memberContext.getNetworkPartitionId(), + memberContext.getClusterInstanceId(), memberContext.getClusterId(), memberContext.getCartridgeType(), MemberStatus.Terminated.toString(), - null); + timeStamp, null, null, null); // Remove member context - CloudControllerContext.getInstance().removeMemberContext(memberContext.getClusterId(), memberContext.getMemberId()); + CloudControllerContext.getInstance().removeMemberContext(memberContext.getClusterId(), + memberContext.getMemberId()); // Persist cloud controller context CloudControllerContext.getInstance().persist(); @@ -87,7 +90,8 @@ public static boolean isValidIpAddress(String ip) { return isValid; } - public static IaasProvider validatePartitionAndGetIaasProvider(Partition partition, IaasProvider iaasProvider) throws InvalidPartitionException { + public static IaasProvider validatePartitionAndGetIaasProvider(Partition partition, IaasProvider iaasProvider) + throws InvalidPartitionException { if (iaasProvider != null) { // if this is a IaaS based partition Iaas iaas = iaasProvider.getIaas(); @@ -104,7 +108,8 @@ public static IaasProvider validatePartitionAndGetIaasProvider(Partition partiti } } - public static boolean validatePartition(Partition partition, IaasProvider iaasProvider) throws InvalidPartitionException { + public static boolean validatePartition(Partition partition, IaasProvider iaasProvider) + throws InvalidPartitionException { validatePartitionAndGetIaasProvider(partition, iaasProvider); return true; } diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java index 77cfea2c53..c0dbf57eb6 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/InstanceCreator.java @@ -27,8 +27,6 @@ import org.apache.stratos.cloud.controller.exception.CartridgeNotFoundException; import org.apache.stratos.cloud.controller.iaases.Iaas; import org.apache.stratos.cloud.controller.messaging.topology.TopologyBuilder; -import org.apache.stratos.cloud.controller.statistics.publisher.BAMUsageDataPublisher; -import org.apache.stratos.messaging.domain.topology.MemberStatus; import java.util.concurrent.locks.Lock; @@ -68,7 +66,8 @@ public void run() { memberContext = startInstance(iaas, memberContext, payload); if (log.isInfoEnabled()) { - log.info(String.format("Instance started successfully: [cartridge-type] %s [cluster-id] %s [instance-id] %s " + + log.info(String.format("Instance started successfully: [cartridge-type] %s [cluster-id] %s " + + "[instance-id] %s " + "[default-private-ip] %s [default-public-ip] %s", memberContext.getCartridgeType(), memberContext.getClusterId(), memberContext.getInstanceId(), memberContext.getDefaultPrivateIP(), @@ -84,16 +83,6 @@ public void run() { // Update topology TopologyBuilder.handleMemberInitializedEvent(memberContext); - - // Publish instance creation statistics to BAM - BAMUsageDataPublisher.publish( - memberContext.getMemberId(), - memberContext.getPartition().getId(), - memberContext.getNetworkPartitionId(), - memberContext.getClusterId(), - memberContext.getCartridgeType(), - MemberStatus.Initialized.toString(), - memberContext.getInstanceMetadata()); } catch (Exception e) { String message = String.format("Could not start instance: [cartridge-type] %s [cluster-id] %s", memberContext.getCartridgeType(), memberContext.getClusterId()); @@ -105,7 +94,8 @@ public void run() { } } - private MemberContext startInstance(Iaas iaas, MemberContext memberContext, byte[] payload) throws CartridgeNotFoundException { + private MemberContext startInstance(Iaas iaas, MemberContext memberContext, byte[] payload) throws + CartridgeNotFoundException { memberContext = iaas.startInstance(memberContext, payload); // Validate instance id diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java index d5aabbdae4..690bc5934d 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/statistics/publisher/BAMUsageDataPublisher.java @@ -52,12 +52,31 @@ public class BAMUsageDataPublisher { private static StreamDefinition streamDefinition; private static final String cloudControllerEventStreamVersion = "1.0.0"; + /** + * Publish events to BAM + * + * @param memberId member id + * @param partitionId partition id + * @param networkId network partition id + * @param clusterId cluster id + * @param clusterInstanceId cluster instance id + * @param serviceName service name + * @param status member status + * @param timeStamp time + * @param autoscalingReason scaling reason related to member + * @param scalingTime scaling time + * @param metadata meta-data + */ public static void publish(String memberId, String partitionId, String networkId, String clusterId, + String clusterInstanceId, String serviceName, String status, + Long timeStamp, + String autoscalingReason, + Long scalingTime, InstanceMetadata metadata) { if (!CloudControllerConfig.getInstance().isBAMDataPublisherEnabled()) { return; @@ -79,16 +98,23 @@ public static void publish(String memberId, MemberContext memberContext = CloudControllerContext.getInstance().getMemberContextOfMemberId(memberId); String cartridgeType = memberContext.getCartridgeType(); Cartridge cartridge = CloudControllerContext.getInstance().getCartridge(cartridgeType); + String instanceType = CloudControllerContext.getInstance().getIaasProviderOfPartition(cartridgeType, + partitionId).getProperty(CloudControllerConstants.INSTANCE_TYPE); //Construct the data to be published List payload = new ArrayList(); // Payload values + payload.add(timeStamp); payload.add(memberId); payload.add(serviceName); payload.add(clusterId); + payload.add(clusterInstanceId); payload.add(handleNull(memberContext.getLbClusterId())); payload.add(handleNull(partitionId)); payload.add(handleNull(networkId)); + payload.add(handleNull(instanceType)); + payload.add(handleNull(autoscalingReason)); + payload.add(handleNull(scalingTime)); if (cartridge != null) { payload.add(handleNull(String.valueOf(cartridge.isMultiTenant()))); } else { @@ -129,12 +155,14 @@ public static void publish(String memberId, try { if (log.isDebugEnabled()) { - log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion())); + log.debug(String.format("Publishing BAM event: [stream] %s [version] %s", streamDefinition.getName(), + streamDefinition.getVersion())); } dataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event); } catch (AgentException e) { if (log.isErrorEnabled()) { - log.error(String.format("Could not publish BAM event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e); + log.error(String.format("Could not publish BAM event: [stream] %s [version] %s", + streamDefinition.getName(), streamDefinition.getVersion()), e); } } } @@ -151,12 +179,17 @@ private static StreamDefinition initializeStream() throws Exception { streamDefinition.setDescription("Instances booted up by the Cloud Controller"); // Payload definition List payloadData = new ArrayList(); + payloadData.add(new Attribute(CloudControllerConstants.TIME_STAMP, AttributeType.LONG)); payloadData.add(new Attribute(CloudControllerConstants.MEMBER_ID_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.CARTRIDGE_TYPE_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.CLUSTER_INSTANCE_ID_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.LB_CLUSTER_ID_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.PARTITION_ID_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.NETWORK_ID_COL, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.INSTANCE_TYPE, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.SCALING_REASON, AttributeType.STRING)); + payloadData.add(new Attribute(CloudControllerConstants.SCALING_TIME, AttributeType.LONG)); payloadData.add(new Attribute(CloudControllerConstants.IS_MULTI_TENANT_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.IAAS_COL, AttributeType.STRING)); payloadData.add(new Attribute(CloudControllerConstants.STATUS_COL, AttributeType.STRING)); @@ -210,4 +243,11 @@ private static String handleNull(String val) { } return val; } + + private static Long handleNull(Long val) { + if (val == null) { + return -1L; + } + return val; + } } diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java index 5e6115f604..2cb0c31181 100644 --- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java +++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/util/CloudControllerConstants.java @@ -103,6 +103,7 @@ public final class CloudControllerConstants { public static final String MEMBER_ID_COL = "memberId"; public static final String CARTRIDGE_TYPE_COL = "cartridgeType"; public static final String CLUSTER_ID_COL = "clusterId"; + public static final String CLUSTER_INSTANCE_ID_COL = "clusterInstanceId"; public static final String PARTITION_ID_COL = "partitionId"; public static final String NETWORK_ID_COL = "networkId"; public static final String ALIAS_COL = "alias"; @@ -122,6 +123,9 @@ public final class CloudControllerConstants { public static final String PRIV_IP_COL = "privateIPAddresses"; public static final String PUB_IP_COL = "publicIPAddresses"; public static final String ALLOCATE_IP_COL = "allocateIPAddresses"; + public static final String TIME_STAMP = "timeStamp"; + public static final String SCALING_REASON = "scalingReason"; + public static final String SCALING_TIME = "scalingTime"; /** * Properties diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java index 1275f5c45c..af46cfe687 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/constants/StratosConstants.java @@ -91,7 +91,8 @@ public class StratosConstants { // metering constants public static final String THROTTLING_ALL_ACTION = "all_actions"; - public static final String THROTTLING_IN_DATA_ACTION = "in_data_action"; //this covers registry capacity + registry bandwidth + public static final String THROTTLING_IN_DATA_ACTION = + "in_data_action"; //this covers registry capacity + registry bandwidth public static final String THROTTLING_OUT_DATA_ACTION = "out_data_action"; //this covers registry bandwidth public static final String THROTTLING_ADD_USER_ACTION = "add_user_action"; public static final String THROTTLING_SERVICE_IN_BANDWIDTH_ACTION = "service_in_bandwith_action"; @@ -158,6 +159,8 @@ public class StratosConstants { public static final String MAX_CHECK_DROOL_FILE = "maxcheck.drl"; public static final String OBSOLETE_CHECK_DROOL_FILE = "obsoletecheck.drl"; public static final String MIN_COUNT = "MIN_COUNT"; + public static final String SCALING_REASON = "SCALING_REASON"; + public static final String SCALING_TIME = "SCALING_TIME"; // Policy and definition related constants public static final int PUBLIC_DEFINITION = 0; @@ -165,7 +168,8 @@ public class StratosConstants { // member expiry timeout constants public static final String PENDING_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.pendingMemberExpiryTimeout"; public static final String OBSOLETED_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.obsoletedMemberExpiryTimeout"; - public static final String PENDING_TERMINATION_MEMBER_EXPIRY_TIMEOUT = "autoscaler.member.pendingTerminationMemberExpiryTimeout"; + public static final String PENDING_TERMINATION_MEMBER_EXPIRY_TIMEOUT = + "autoscaler.member.pendingTerminationMemberExpiryTimeout"; public static final String FILTER_VALUE_SEPARATOR = ","; public static final String TOPOLOGY_APPLICATION_FILTER = "stratos.topology.application.filter"; diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java index dd7ddd458b..95b04ff2b8 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/HealthStatisticsPublisher.java @@ -27,6 +27,7 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher { /** * Publish health statistics to complex event processor. * + * @param timeStamp time * @param clusterId Cluster id of the member * @param clusterInstanceId Cluster instance id of the member * @param networkPartitionId Network partition id of the member @@ -35,6 +36,6 @@ public interface HealthStatisticsPublisher extends StatisticsPublisher { * @param health Health type: memory_consumption | load_average * @param value Health type value */ - void publish(String clusterId, String clusterInstanceId, String networkPartitionId, + void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId, String memberId, String partitionId, String health, double value); } diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java index 289be8bc7c..af9c8e904c 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/InFlightRequestPublisher.java @@ -27,10 +27,12 @@ public interface InFlightRequestPublisher extends StatisticsPublisher { /** * Publish in-flight request count. * + * @param timeStamp time * @param clusterId Cluster id * @param clusterInstanceId Cluster instance id * @param networkPartitionId Network partition id of the cluster * @param inFlightRequestCount In-flight request count of the cluster */ - void publish(String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount); + void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId, + int inFlightRequestCount); } diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java index 1dc42409a0..d5c9265efe 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPHealthStatisticsPublisher.java @@ -52,6 +52,7 @@ private static StreamDefinition createStreamDefinition() { // Set payload definition List payloadData = new ArrayList(); + payloadData.add(new Attribute("time_stamp", AttributeType.LONG)); payloadData.add(new Attribute("cluster_id", AttributeType.STRING)); payloadData.add(new Attribute("cluster_instance_id", AttributeType.STRING)); payloadData.add(new Attribute("network_partition_id", AttributeType.STRING)); @@ -70,6 +71,7 @@ private static StreamDefinition createStreamDefinition() { /** * Publish health statistics to cep. * + * @param timeStamp * @param clusterId * @param clusterInstanceId * @param networkPartitionId @@ -79,13 +81,16 @@ private static StreamDefinition createStreamDefinition() { * @param value */ @Override - public void publish(String clusterId, String clusterInstanceId, String networkPartitionId, String memberId, String partitionId, String health, double value) { + public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId, + String memberId, String partitionId, String health, double value) { if (log.isDebugEnabled()) { - log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s [partition] %s [member] %s [health] %s [value] %f", + log.debug(String.format("Publishing health statistics: [cluster] %s [network-partition] %s " + + "[partition] %s [member] %s [health] %s [value] %f", clusterId, networkPartitionId, partitionId, memberId, health, value)); } // Set payload values List payload = new ArrayList(); + payload.add(timeStamp); payload.add(clusterId); payload.add(clusterInstanceId); payload.add(networkPartitionId); diff --git a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java index 2ed888381a..f51eb91038 100644 --- a/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java +++ b/components/org.apache.stratos.common/src/main/java/org/apache/stratos/common/statistics/publisher/wso2/cep/WSO2CEPInFlightRequestPublisher.java @@ -51,6 +51,7 @@ private static StreamDefinition createStreamDefinition() { List payloadData = new ArrayList(); // Set payload definition + payloadData.add(new Attribute("time_stamp", AttributeType.LONG)); payloadData.add(new Attribute("cluster_id", AttributeType.STRING)); payloadData.add(new Attribute("cluster_instance_id", AttributeType.STRING)); payloadData.add(new Attribute("network_partition_id", AttributeType.STRING)); @@ -65,15 +66,18 @@ private static StreamDefinition createStreamDefinition() { /** * Publish in-flight request count of a cluster. * + * @param timeStamp * @param clusterId * @param clusterInstanceId * @param networkPartitionId * @param inFlightRequestCount */ @Override - public void publish(String clusterId, String clusterInstanceId, String networkPartitionId, int inFlightRequestCount) { + public void publish(Long timeStamp, String clusterId, String clusterInstanceId, String networkPartitionId, + int inFlightRequestCount) { // Set payload values List payload = new ArrayList(); + payload.add(timeStamp); payload.add(clusterId); payload.add(clusterInstanceId); payload.add(networkPartitionId); diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java index dc2233d915..1dd12c7ec3 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java @@ -81,7 +81,8 @@ public void run() { for (Cluster cluster : service.getClusters()) { // Publish in-flight request count of load balancer's network partition int requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId()); - inFlightRequestPublisher.publish(cluster.getClusterId(), clusterInstanceId, + inFlightRequestPublisher.publish(System.currentTimeMillis(), cluster.getClusterId(), + clusterInstanceId, networkPartitionId, requestCount); if (log.isDebugEnabled()) { diff --git a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java index c2d1c6c03a..0dc5e67e55 100644 --- a/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java +++ b/components/org.apache.stratos.mock.iaas/src/main/java/org/apache/stratos/mock/iaas/statistics/publisher/MockHealthStatisticsNotifier.java @@ -69,6 +69,7 @@ public void run() { mockMemberContext.getMemberId(), memoryConsumption)); } healthStatisticsPublisher.publish( + System.currentTimeMillis(), mockMemberContext.getClusterId(), mockMemberContext.getClusterInstanceId(), mockMemberContext.getNetworkPartitionId(), @@ -93,6 +94,7 @@ public void run() { mockMemberContext.getMemberId(), loadAvereage)); } healthStatisticsPublisher.publish( + System.currentTimeMillis(), mockMemberContext.getClusterId(), mockMemberContext.getClusterInstanceId(), mockMemberContext.getNetworkPartitionId(), @@ -116,6 +118,7 @@ public void run() { mockMemberContext.getMemberId(), requestsInFlight)); } inFlightRequestPublisher.publish( + System.currentTimeMillis(), mockMemberContext.getClusterId(), mockMemberContext.getClusterInstanceId(), mockMemberContext.getNetworkPartitionId(), diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py index 9753c3e429..aae9e9d600 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py @@ -17,7 +17,7 @@ from threading import Thread import multiprocessing - +import time import psutil from abstracthealthstatisticspublisher import * @@ -124,6 +124,7 @@ def create_stream_definition(): stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION # stream_def.add_payloaddata_attribute() + stream_def.add_payloaddata_attribute("time_stamp", StreamDefinition.LONG) stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING) stream_def.add_payloaddata_attribute("cluster_instance_id", StreamDefinition.STRING) stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING) @@ -141,6 +142,7 @@ def publish_memory_usage(self, memory_usage): """ event = ThriftEvent() + event.payloadData.append(int(round(time.time() * 1000))) event.payloadData.append(self.cartridge_agent_config.cluster_id) event.payloadData.append(self.cartridge_agent_config.cluster_instance_id) event.payloadData.append(self.cartridge_agent_config.network_partition_id) @@ -159,6 +161,7 @@ def publish_load_average(self, load_avg): """ event = ThriftEvent() + event.payloadData.append(int(round(time.time() * 1000))) event.payloadData.append(self.cartridge_agent_config.cluster_id) event.payloadData.append(self.cartridge_agent_config.cluster_instance_id) event.payloadData.append(self.cartridge_agent_config.network_partition_id) diff --git a/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml b/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml new file mode 100644 index 0000000000..bcef15ff0a --- /dev/null +++ b/extensions/cep/artifacts/eventformatters/HealthStatsEventFormatter.xml @@ -0,0 +1,30 @@ + + + + + + + cartridge_agent_health_stats + 1.0.0 + + diff --git a/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml b/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml new file mode 100644 index 0000000000..3cfd4a966a --- /dev/null +++ b/extensions/cep/artifacts/eventformatters/RIFEventFormatter.xml @@ -0,0 +1,31 @@ + + + + + + + in_flight_requests + 1.0.0 + + + diff --git a/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml b/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml new file mode 100755 index 0000000000..5cec300c3d --- /dev/null +++ b/extensions/cep/artifacts/outputeventadaptors/DASDefaultWSO2EventOutputAdaptor.xml @@ -0,0 +1,29 @@ + + + + admin + tcp://localhost:7612 + admin + ssl://localhost:7712 + diff --git a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml index 4c4c7e0c9c..a256770434 100644 --- a/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml +++ b/extensions/cep/artifacts/streamdefinitions/stream-manager-config.xml @@ -23,287 +23,289 @@ - in-flight request count - in-flight requests - - - - - - - - - - - + in-flight request count + in-flight requests + + + + + + + + + + + + - gradient of in flight request count - gradient in flight requests - - - - - - - - - - - + gradient of in flight request count + gradient in flight requests + + + + + + + + + + + - average of in-flight request count - average in-flight requests - - - - - - - - - - - + average of in-flight request count + average in-flight requests + + + + + + + + + + + - second derivative of in-flight request count - second derivative in-flight requests - - - - - - - - - - + second derivative of in-flight request count + second derivative in-flight requests + + + + + + + + + + - agent health stats - agent health stats - - - - - - - - - - - - - - + agent health stats + agent health stats + + + + + + + + + + + + + + + - average load average stats - average load average stats - - - - - - - - - - - + average load average stats + average load average stats + + + + + + + + + + + - average memory consumption stats - average memory consumption stats - - - - - - - - - - - + average memory consumption stats + average memory consumption stats + + + + + + + + + + + - gradient load average stats - gradient load average stats - - - - - - - - - - - + gradient load average stats + gradient load average stats + + + + + + + + + + + - gradient memoryconsumption stats - gradient memoryconsumption stats - - - - - - - - - - - + gradient memoryconsumption stats + gradient memoryconsumption stats + + + + + + + + + + + - second derivative memory consumption stats - second derivative memory consumption stats - - - - - - - - - - - + second derivative memory consumption stats + second derivative memory consumption stats + + + + + + + + + + + - second derivative load average stats - second derivative load average stats - - - - - - - - - - - + second derivative load average stats + second derivative load average stats + + + + + + + + + + + - fault message - fault message - - - - - - - - - - - + fault message + fault message + + + + + + + + + + + - average load average stats - average load average stats - - - - - - - - - - - - + average load average stats + average load average stats + + + + + + + + + + + + - average memory consumption stats - average memory consumption stats - - - - - - - - - - - - + average memory consumption stats + average memory consumption stats + + + + + + + + + + + + - gradient load average stats - gradient load average stats - - - - - - - - - - - - + gradient load average stats + gradient load average stats + + + + + + + + + + + + - gradient memoryconsumption stats - gradient memoryconsumption stats - - - - - - - - - - - - + gradient memoryconsumption stats + gradient memoryconsumption stats + + + + + + + + + + + + - second derivative memory consumption stats - second derivative memory consumption stats - - - - - - - - - - - - + second derivative memory consumption stats + second derivative memory consumption stats + + + + + + + + + + + + - second derivative load average stats - second derivative load average stats - - - - - - - - - - - - + second derivative load average stats + second derivative load average stats + + + + + + + + + + + + diff --git a/extensions/das/README.md b/extensions/das/README.md new file mode 100644 index 0000000000..00be2970ef --- /dev/null +++ b/extensions/das/README.md @@ -0,0 +1,10 @@ +# Apache Stratos DAS Extensions + +Apache Stratos Data Analytics Server (DAS) extensions include DAS artifacts and spark udf to run spark script. +These extensions need to be deployed manually when running DAS externally. + +Please refer below link for more information on WSO2 DAS. +https://docs.wso2.com/display/DAS300/WSO2+Data+Analytics+Server+Documentation + +Thank you for using Apache Stratos! +The Stratos Team \ No newline at end of file diff --git a/extensions/das/artifacts/eventreceivers/CloudControllerEventReceiver.xml b/extensions/das/artifacts/eventreceivers/CloudControllerEventReceiver.xml new file mode 100644 index 0000000000..05789db2ed --- /dev/null +++ b/extensions/das/artifacts/eventreceivers/CloudControllerEventReceiver.xml @@ -0,0 +1,29 @@ + + + + + false + + + + diff --git a/extensions/das/artifacts/eventreceivers/HealthStatsEventReceiver.xml b/extensions/das/artifacts/eventreceivers/HealthStatsEventReceiver.xml new file mode 100644 index 0000000000..7e0a5ce1f7 --- /dev/null +++ b/extensions/das/artifacts/eventreceivers/HealthStatsEventReceiver.xml @@ -0,0 +1,29 @@ + + + + + false + + + + diff --git a/extensions/das/artifacts/eventreceivers/RIFEventReceiver.xml b/extensions/das/artifacts/eventreceivers/RIFEventReceiver.xml new file mode 100644 index 0000000000..b11c016995 --- /dev/null +++ b/extensions/das/artifacts/eventreceivers/RIFEventReceiver.xml @@ -0,0 +1,29 @@ + + + + + false + + + + diff --git a/extensions/das/artifacts/eventsink/cartridge_agent_health_stats.xml b/extensions/das/artifacts/eventsink/cartridge_agent_health_stats.xml new file mode 100644 index 0000000000..b870bc2963 --- /dev/null +++ b/extensions/das/artifacts/eventsink/cartridge_agent_health_stats.xml @@ -0,0 +1,85 @@ + + + + + + time_stamp + false + false + false + LONG + + + cluster_id + false + false + false + STRING + + + cluster_instance_id + false + false + false + STRING + + + network_partition_id + false + false + false + STRING + + + member_id + false + false + false + STRING + + + partition_id + false + false + false + STRING + + + health_description + false + false + false + STRING + + + value + false + false + false + DOUBLE + + + + cartridge_agent_health_stats:1.0.0 + + EVENT_STORE + \ No newline at end of file diff --git a/extensions/das/artifacts/eventsink/in_flight_requests.xml b/extensions/das/artifacts/eventsink/in_flight_requests.xml new file mode 100644 index 0000000000..d4ca48b58d --- /dev/null +++ b/extensions/das/artifacts/eventsink/in_flight_requests.xml @@ -0,0 +1,64 @@ + + + + + + time_stamp + false + false + false + LONG + + + cluster_id + false + false + false + STRING + + + cluster_instance_id + false + false + false + STRING + + + network_partition_id + false + false + false + STRING + + + in_flight_request_count + false + false + false + DOUBLE + + + + in_flight_requests:1.0.0 + + EVENT_STORE + \ No newline at end of file diff --git a/extensions/das/artifacts/eventsink/org_apache_stratos_cloud_controller.xml b/extensions/das/artifacts/eventsink/org_apache_stratos_cloud_controller.xml new file mode 100644 index 0000000000..f0dae099bc --- /dev/null +++ b/extensions/das/artifacts/eventsink/org_apache_stratos_cloud_controller.xml @@ -0,0 +1,211 @@ + + + + + + timeStamp + false + false + false + LONG + + + memberId + false + false + false + STRING + + + cartridgeType + false + false + false + STRING + + + clusterId + false + false + false + STRING + + + clusterInstanceId + false + false + false + STRING + + + lbclusterId + false + false + false + STRING + + + partitionId + false + false + false + STRING + + + networkId + false + false + false + STRING + + + instanceType + false + false + false + STRING + + + scalingReason + false + false + false + STRING + + + scalingTime + false + false + false + LONG + + + isMultiTenant + false + false + false + STRING + + + iaas + false + false + false + STRING + + + status + false + false + false + STRING + + + hostName + false + false + false + STRING + + + hypervisor + false + false + false + STRING + + + ram + false + false + false + STRING + + + imageId + false + false + false + STRING + + + loginPort + false + false + false + INTEGER + + + osName + false + false + false + STRING + + + osVersion + false + false + false + STRING + + + osArch + false + false + false + STRING + + + is64bitOS + false + false + false + STRING + + + privateIPAddresses + false + false + false + STRING + + + publicIPAddresses + false + false + false + STRING + + + allocateIPAddresses + false + false + false + STRING + + + + org.apache.stratos.cloud.controller:1.0.0 + + EVENT_STORE + \ No newline at end of file diff --git a/extensions/das/artifacts/eventstreams/cartridge_agent_health_stats_1.0.0.json b/extensions/das/artifacts/eventstreams/cartridge_agent_health_stats_1.0.0.json new file mode 100644 index 0000000000..ec61229e44 --- /dev/null +++ b/extensions/das/artifacts/eventstreams/cartridge_agent_health_stats_1.0.0.json @@ -0,0 +1,40 @@ +{ + "name": "cartridge_agent_health_stats", + "version": "1.0.0", + "nickName": "", + "description": "", + "payloadData": [ + { + "name": "time_stamp", + "type": "LONG" + }, + { + "name": "cluster_id", + "type": "STRING" + }, + { + "name": "cluster_instance_id", + "type": "STRING" + }, + { + "name": "network_partition_id", + "type": "STRING" + }, + { + "name": "member_id", + "type": "STRING" + }, + { + "name": "partition_id", + "type": "STRING" + }, + { + "name": "health_description", + "type": "STRING" + }, + { + "name": "value", + "type": "DOUBLE" + } + ] +} \ No newline at end of file diff --git a/extensions/das/artifacts/eventstreams/in_flight_requests_1.0.0.json b/extensions/das/artifacts/eventstreams/in_flight_requests_1.0.0.json new file mode 100644 index 0000000000..8c5232a9ea --- /dev/null +++ b/extensions/das/artifacts/eventstreams/in_flight_requests_1.0.0.json @@ -0,0 +1,28 @@ +{ + "name": "in_flight_requests", + "version": "1.0.0", + "nickName": "", + "description": "", + "payloadData": [ + { + "name": "time_stamp", + "type": "LONG" + }, + { + "name": "cluster_id", + "type": "STRING" + }, + { + "name": "cluster_instance_id", + "type": "STRING" + }, + { + "name": "network_partition_id", + "type": "STRING" + }, + { + "name": "in_flight_request_count", + "type": "DOUBLE" + } + ] +} \ No newline at end of file diff --git a/extensions/das/artifacts/eventstreams/org.apache.stratos.cloud.controller_1.0.0.json b/extensions/das/artifacts/eventstreams/org.apache.stratos.cloud.controller_1.0.0.json new file mode 100644 index 0000000000..de1025f3d1 --- /dev/null +++ b/extensions/das/artifacts/eventstreams/org.apache.stratos.cloud.controller_1.0.0.json @@ -0,0 +1,112 @@ +{ + "name": "org.apache.stratos.cloud.controller", + "version": "1.0.0", + "nickName": "cloud.controller", + "description": "Instances booted up by the Cloud Controller", + "payloadData": [ + { + "name": "timeStamp", + "type": "LONG" + }, + { + "name": "memberId", + "type": "STRING" + }, + { + "name": "cartridgeType", + "type": "STRING" + }, + { + "name": "clusterId", + "type": "STRING" + }, + { + "name": "clusterInstanceId", + "type": "STRING" + }, + { + "name": "lbclusterId", + "type": "STRING" + }, + { + "name": "partitionId", + "type": "STRING" + }, + { + "name": "networkId", + "type": "STRING" + }, + { + "name": "instanceType", + "type": "STRING" + }, + { + "name": "scalingReason", + "type": "STRING" + }, + { + "name": "scalingTime", + "type": "LONG" + }, + { + "name": "isMultiTenant", + "type": "STRING" + }, + { + "name": "iaas", + "type": "STRING" + }, + { + "name": "status", + "type": "STRING" + }, + { + "name": "hostName", + "type": "STRING" + }, + { + "name": "hypervisor", + "type": "STRING" + }, + { + "name": "ram", + "type": "STRING" + }, + { + "name": "imageId", + "type": "STRING" + }, + { + "name": "loginPort", + "type": "INT" + }, + { + "name": "osName", + "type": "STRING" + }, + { + "name": "osVersion", + "type": "STRING" + }, + { + "name": "osArch", + "type": "STRING" + }, + { + "name": "is64bitOS", + "type": "STRING" + }, + { + "name": "privateIPAddresses", + "type": "STRING" + }, + { + "name": "publicIPAddresses", + "type": "STRING" + }, + { + "name": "allocateIPAddresses", + "type": "STRING" + } + ] +} \ No newline at end of file diff --git a/extensions/das/artifacts/sparkscript/CCEvent b/extensions/das/artifacts/sparkscript/CCEvent new file mode 100644 index 0000000000..b8bb6fd2bd --- /dev/null +++ b/extensions/das/artifacts/sparkscript/CCEvent @@ -0,0 +1,18 @@ +CREATE TEMPORARY TABLE memberstatus +USING CarbonAnalytics +OPTIONS (tableName "ORG_APACHE_STRATOS_CLOUD_CONTROLLER"); + +CREATE TEMPORARY TABLE memberstatusnew +USING CarbonAnalytics +OPTIONS (tableName "CLUSTER_MEMBER_NEW", + schema "startTime String, endTime String, clusterId STRING, activatedInstanceCount INT, terminatedInstanceCount INT, activeInstanceCount INT"); + +;WITH InstanceCount as +(select clusterId, count(case when status='Active' and timeStamp > current_time(null)-60000 and timeStamp <= current_time(null) then 1 else NULL end) as activatedInstanceCount, count(case when status='Terminated' and timeStamp > current_time(null)-60000 and timeStamp <= current_time(null) then 1 else NULL end) as terminatedInstanceCount, (sum(case when status='Active' then 1 else 0 end) - sum(case when status='Terminated' then 1 else 0 end))as activeInstanceCount from memberstatus group by clusterId) +INSERT INTO table memberstatusnew select time(current_time(null)-60000),time(current_time(null)),clusterId, activatedInstanceCount, terminatedInstanceCount,activeInstanceCount from InstanceCount; + +CREATE TEMPORARY TABLE membersnew +USING CarbonAnalytics +OPTIONS (tableName "MEMBER_NEW",schema "clusterId STRING, clusterInstanceId STRING, networkId STRING, partitionId STRING, cartridgeType STRING, instanceType STRING, memberId STRING, scalingTime LONG,scalingReason STRING, timeStamp STRING"); + +INSERT INTO TABLE membersnew select clusterId,clusterInstanceId,networkId,partitionId,cartridgeType,instanceType, memberId,scalingTime,scalingReason,time(timeStamp)as timeStamp FROM memberstatus where status='Created'; \ No newline at end of file diff --git a/extensions/das/pom.xml b/extensions/das/pom.xml new file mode 100644 index 0000000000..d21d1be294 --- /dev/null +++ b/extensions/das/pom.xml @@ -0,0 +1,40 @@ + + + + + stratos-extensions + org.apache.stratos + 4.1.1-SNAPSHOT + + 4.0.0 + + org.apache.stratos + strats-das-extension + pom + Apache Stratos - DAS Extension + Apache Stratos extensions for DAS. + + spark-udf + + + + \ No newline at end of file diff --git a/extensions/das/spark-udf/pom.xml b/extensions/das/spark-udf/pom.xml new file mode 100644 index 0000000000..ced0f0a53e --- /dev/null +++ b/extensions/das/spark-udf/pom.xml @@ -0,0 +1,36 @@ + + + + + strats-das-extension + org.apache.stratos + 4.1.1-SNAPSHOT + + 4.0.0 + + org.apache.stratos + apache-stratos-spark-udf + Apache Stratos - Spark UDF + Apache Stratos Spark UDF for DAS + + + \ No newline at end of file diff --git a/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java b/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java new file mode 100644 index 0000000000..0b8f408fcc --- /dev/null +++ b/extensions/das/spark-udf/src/main/java/org/apache/stratos/das/extension/spark/udf/TimeUDF.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.stratos.das.extension.spark.udf; + +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * Implementing UDF for implementing spark sql query related to time. + */ +public class TimeUDF { + /** + * Convert time(ms) to DateFormat + * + * @param timeStamp time in ms + * @return date as String + */ + public String time(Long timeStamp) { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm"); + Date date = new Date(timeStamp.longValue()); + return sdf.format(date); + } + + /** + * Get the current time in ms + * + * @param param + * @return + */ + public long current_time(Integer param) { + return System.currentTimeMillis(); + } +} \ No newline at end of file diff --git a/extensions/pom.xml b/extensions/pom.xml index c57bc8a410..07c2733965 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -17,7 +17,8 @@ ~ specific language governing permissions and limitations ~ under the License. --> - + org.apache.stratos @@ -36,6 +37,7 @@ cep/stratos-cep-extension cep/distribution/ load-balancer + das diff --git a/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl index 56e9164796..a8102da2b4 100644 --- a/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl +++ b/products/stratos/modules/distribution/src/main/conf/drools/dependent-scaling.drl @@ -84,7 +84,9 @@ dialect "mvel" log.info("[dependency-scale] [scale-up] Partition available, hence trying to spawn an instance to scale up!" ); log.debug("[dependency-scale] [scale-up] " + " [partition] " + partitionContext.getPartitionId() + " [cluster] " + clusterId ); - delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary); + long scalingTime = System.currentTimeMillis(); + String scalingReason = "Dependency scaling"; + delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary,scalingReason,scalingTime); count++; } else { partitionsAvailable = false; diff --git a/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl b/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl index 96b60da2ef..4eaab2bc3f 100755 --- a/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl +++ b/products/stratos/modules/distribution/src/main/conf/drools/mincheck.drl @@ -84,7 +84,10 @@ dialect "mvel" log.info("[min-check] Partition available, hence trying to spawn an instance to fulfil minimum count!" + " [cluster] " + clusterId); log.debug("[min-check] " + " [partition] " + partitionContext.getPartitionId() + " [cluster] " + clusterId); - delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary); + long scalingTime = System.currentTimeMillis(); + String scalingReason = "Scaling up to fulfil minimum count, [Cluster Min Members] "+clusterInstanceContext.getMinInstanceCount()+" [Additional instances to be created] " + additionalInstances; + delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary,scalingReason,scalingTime); + count++; } else { diff --git a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl index e6f8f67f74..3b4a91601d 100644 --- a/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl +++ b/products/stratos/modules/distribution/src/main/conf/drools/scaling.drl @@ -164,6 +164,10 @@ dialect "mvel" boolean partitionsAvailable = true; int count = 0; + String autoscalingReason = (numberOfRequiredInstances == numberOfInstancesReuquiredBasedOnRif)?"Scaling up due to RIF, [Predicted Value] "+rifPredictedValue+" [Threshold] "+rifThreshold:(numberOfRequiredInstances== numberOfInstancesReuquiredBasedOnMemoryConsumption)?"Scaling up due to MC, [Predicted Value] "+mcPredictedValue+" [Threshold] "+mcThreshold:"Scaling up due to LA, [Predicted Value] "+laPredictedValue+" [Threshold] "+laThreshold; + autoscalingReason += " [Number of required instances] "+numberOfRequiredInstances+" [Cluster Max Members] "+clusterMaxMembers+" [Additional instances to be created] " + additionalInstances; + + while(count != additionalInstances && partitionsAvailable){ ClusterLevelPartitionContext partitionContext = (ClusterLevelPartitionContext) partitionAlgorithm.getNextScaleUpPartitionContext(clusterInstanceContext.getPartitionCtxtsAsAnArray()); @@ -182,7 +186,8 @@ dialect "mvel" " [laPredictedValue] " + laPredictedValue + " [laThreshold] " + laThreshold); log.debug("[scale-up] " + " [partition] " + partitionContext.getPartitionId() + " [cluster] " + clusterId ); - delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary); + long scalingTime = System.currentTimeMillis(); + delegator.delegateSpawn(partitionContext, clusterId, clusterInstanceContext.getId(), isPrimary,autoscalingReason,scalingTime); count++; } else {