Skip to content

Commit

Permalink
[JFG-1118] Exclusive access load balancing in multi kernel mode (#830)
Browse files Browse the repository at this point in the history
* [JFG-1115] added ExclusiveAccessRandomLoadBalancer load balancer

* [JFG-1115] Added ExclusiveAccessCircularLoadBalancer.java

* Bug fix

* Impoved logs

* [JFG-1117] added ExclusiveAccessOneIterationLoadBalancer
	Master now is aware of empty transactions
	(when load balancer did't return anything to handle)
	and initiates termination scenario when every node has empty transactions

* [JFG-1117] Refactor structure of exclusive access load balancers

* [JFG-1118] exclusive access load balancing in multi kernel mode

* [JFG-1118] fixed serialization issue.

* [JFG-1118] fixes after review

* [JFG-1118] More fixes after review

* Update environment.properties
  • Loading branch information
a-badaev authored and dlatnikov committed Feb 14, 2017
1 parent e6e9324 commit 7007607
Show file tree
Hide file tree
Showing 35 changed files with 689 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ chassis.monitoring.command.execution.ttl=30000
#chassis.monitoring.executor.pool.core=2
#chassis.monitoring.executor.pool.max=10


# begin: following section is used for docu generation - Monitoring metrics ids

# # # Setup for monitoring metrics # # #
Expand Down Expand Up @@ -276,6 +277,12 @@ realtime.status.report.interval.seconds=15
# 10 min = 600000
#chassis.master.node.awaittime=600000

# # # Load balancer timeouts # # #
# Timeout which is used by some load balancer implementations (like exclusive or unique) to poll next <endpoint,query> pair.
# For such balancers <endpoint,query> pair can be used only by single visrtual user at one time. As a result when number of virtual users
# is larger than number of pairs, virtual users will wait for free pair. No ;onger that this timeout
load.balancer.poll.timeout.in.millis=600000


# ****************************************************************** #
# Reporting #
Expand Down Expand Up @@ -489,4 +496,4 @@ chassis.default.invoker.connect.timeout.in.milliseconds=60000
#chassis.monitoring.mon_gc_major_unit.showSummary=true
#chassis.monitoring.mon_gc_major_time.showSummary=true

# end: following section is used for docu generation - How to enable summary calculation for monitoring metrics
# end: following section is used for docu generation - How to enable summary calculation for monitoring metrics
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ chassis.decision.maker.with.limits.decisionWhenNoMetricForLimit=OK
chassis.decision.maker.with.limits.decisionWhenNoBaselineForMetric=FATAL
chassis.decision.maker.with.limits.decisionWhenSeveralLimitsMatchSingleMetric=FATAL


### Section for properties which is going to be used during configuration construction
# Timeout which is used by some load balancer implementations
# (those which provide pairs in unique access and infinitly circly through provided set of pairs.
# As a result there are a contention on access to those pairs in case there are more consumers then pairs in the provided set)
# to poll next pair.
load.balancer.poll.timeout.in.millis=600000

# begin: following section is used for docu generation - Default setup for monitoring metrics

# monitoring metrics default setup
Expand Down
3 changes: 3 additions & 0 deletions chassis/configuration/configuration/master/master.conf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@
</bean>
</property>
<property name="limitSetConfig" ref="limitSetConfig"/>
<property name="configurationProperties">
<bean class="com.griddynamics.jagger.util.generators.ConfigurationProperties"/>
</property>
</bean>
</beans>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.griddynamics.jagger.engine.e1.process;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.griddynamics.jagger.coordinator.NodeContext;
import com.griddynamics.jagger.engine.e1.Provider;
import com.griddynamics.jagger.engine.e1.ProviderUtil;
Expand All @@ -19,6 +16,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
Expand All @@ -39,8 +40,9 @@ public abstract class AbstractWorkloadProcess implements WorkloadProcess {
protected final NodeContext context;
protected final TimeoutsConfiguration timeoutsConfiguration;

protected int samplesCountStartedFromTerminatedThreads = 0;
protected int samplesCountFinishedFromTerminatedThreads = 0;
protected volatile int samplesCountStartedFromTerminatedThreads = 0;
protected volatile int samplesCountFinishedFromTerminatedThreads = 0;
protected volatile long emptyTransactionsFromTerminatedThreads = 0;

private static final Logger log = LoggerFactory.getLogger(AbstractWorkloadProcess.class);

Expand Down Expand Up @@ -107,15 +109,18 @@ public void stop() {
public WorkloadStatus getStatus() {
int started = samplesCountStartedFromTerminatedThreads;
int finished = samplesCountFinishedFromTerminatedThreads;
long emptyTrn = emptyTransactionsFromTerminatedThreads;
int runningThreads = 0;

for (WorkloadService thread : threads) {
started += thread.getStartedSamples();
finished += thread.getFinishedSamples();
emptyTrn += thread.getEmptyTransactions();
if (thread.isRunning()) {
runningThreads ++;
}
}
return new WorkloadStatus(started, finished, runningThreads);
return new WorkloadStatus(started, finished, runningThreads, emptyTrn);
}

/**
Expand All @@ -126,7 +131,7 @@ public WorkloadStatus getStatus() {
protected void startNewThread(int delay) {

log.debug("Adding new workload thread");
Scenario<Object, Object, Object> scenario = command.getScenarioFactory().get(context);
Scenario<Object, Object, Object> scenario = command.getScenarioFactory().get(context, command.getKernelInfo());

List<InvocationListener<?, ?, ?>> listeners = Lists.newArrayList();
for (Provider<InvocationListener<Object, Object, Object>> listener : command.getListeners()){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,8 @@

package com.griddynamics.jagger.engine.e1.process;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.Uninterruptibles;
import static com.griddynamics.jagger.util.TimeUtils.sleepMillis;

import com.griddynamics.jagger.engine.e1.collector.Validator;
import com.griddynamics.jagger.engine.e1.collector.invocation.InvocationListener;
import com.griddynamics.jagger.engine.e1.scenario.Flushable;
Expand All @@ -38,6 +32,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.Uninterruptibles;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -46,10 +49,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import static com.griddynamics.jagger.util.TimeUtils.sleepMillis;

public abstract class AbstractWorkloadService extends AbstractExecutionThreadService implements WorkloadService {
private static final Logger log = LoggerFactory.getLogger(AbstractWorkloadService.class);

Expand All @@ -60,6 +62,7 @@ public abstract class AbstractWorkloadService extends AbstractExecutionThreadSer
private final AtomicInteger delay = new AtomicInteger(0);
private final AtomicInteger startedSamples = new AtomicInteger(0);
private final AtomicInteger finishedSamples = new AtomicInteger(0);
private final AtomicLong emptyTransactions = new AtomicLong(0);

public static WorkloadServiceBuilder builder(Scenario<Object, Object, Object> scenario) {
return new WorkloadServiceBuilder(scenario);
Expand All @@ -77,7 +80,10 @@ protected void run() throws Exception {
while (isRunning() && !terminationRequired()) {
log.debug("Scenario {} doTransaction called", scenario);
startedSamples.incrementAndGet();
scenario.doTransaction();
if (!scenario.doTransaction()) {
emptyTransactions.incrementAndGet();
log.debug("empty transaction occurred");
}
log.debug("Sleep between invocations for {}", delay);
sleepMillis(delay.get());
finishedSamples.incrementAndGet();
Expand All @@ -104,16 +110,22 @@ protected Executor executor() {
}

protected abstract boolean terminationRequired();


@Override
public Integer getStartedSamples() {
return startedSamples.get();
}


@Override
public Integer getFinishedSamples() {
return finishedSamples.get();
}


@Override
public long getEmptyTransactions() {
return emptyTransactions.get();
}

public void changeDelay(int delay) {
this.delay.set(delay);
}
Expand Down Expand Up @@ -254,6 +266,7 @@ private class InvokeOnDemandWorkloadService implements WorkloadService {
private final AtomicInteger delay = new AtomicInteger(0);
private final AtomicInteger startedSamples = new AtomicInteger(0);
private final AtomicInteger finishedSamples = new AtomicInteger(0);
private final AtomicLong emptyTransactions = new AtomicLong(0);

private final ReentrantLock lock = new ReentrantLock();

Expand Down Expand Up @@ -281,6 +294,7 @@ public ListenableFuture<State> start() {
if (delegate != null) {
startedSamples.addAndGet(delegate.getStartedSamples());
finishedSamples.addAndGet(delegate.getFinishedSamples());
emptyTransactions.addAndGet(delegate.getEmptyTransactions());
}

delegate = new PredefinedSamplesWorkloadService(flushables, 1) {
Expand Down Expand Up @@ -414,7 +428,17 @@ public Integer getFinishedSamples() {
lock.unlock();
}
}


@Override
public long getEmptyTransactions() {
lock.lock();
try {
return emptyTransactions.get() + delegate.getEmptyTransactions();
} finally {
lock.unlock();
}
}

@Override
public boolean isRunning() {
return delegate != null && delegate.isRunning();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.griddynamics.jagger.engine.e1.process;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Service;
import com.griddynamics.jagger.coordinator.NodeContext;
import com.griddynamics.jagger.engine.e1.scenario.WorkloadConfiguration;
import com.griddynamics.jagger.exception.TechnicalException;
Expand All @@ -11,6 +8,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Service;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -71,9 +72,10 @@ public void changeConfiguration(WorkloadConfiguration configuration) {
for (Iterator<WorkloadService> it = threads.iterator(); it.hasNext(); ){
WorkloadService workloadService = it.next();
if (workloadService.state().equals(Service.State.TERMINATED)) {
it.remove();
samplesCountStartedFromTerminatedThreads += workloadService.getStartedSamples();
samplesCountFinishedFromTerminatedThreads += workloadService.getFinishedSamples();
it.remove();
emptyTransactionsFromTerminatedThreads += workloadService.getEmptyTransactions();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.griddynamics.jagger.engine.e1.collector.invocation.InvocationListener;
import com.griddynamics.jagger.engine.e1.scenario.KernelSideObjectProvider;
import com.griddynamics.jagger.engine.e1.scenario.ScenarioCollector;
import com.griddynamics.jagger.invoker.KernelInfo;
import com.griddynamics.jagger.invoker.ScenarioFactory;

import java.util.List;
Expand All @@ -38,7 +39,8 @@ public class StartWorkloadProcess implements Command<String> {
private List<KernelSideObjectProvider<ScenarioCollector<Object, Object, Object>>> collectors;
private List<Provider<InvocationListener<Object, Object, Object>>> listeners;
private int poolSize;

private KernelInfo kernelInfo;

public static StartWorkloadProcess create(String sessionId, ScenarioContext scenarioContext, int poolSize) {
return new StartWorkloadProcess(sessionId, scenarioContext, poolSize);
}
Expand Down Expand Up @@ -106,23 +108,27 @@ public ScenarioContext getScenarioContext() {
return this.scenarioContext;
}

@Override
public String toString() {
return "StartWorkloadProcess{" +
"scenarioFactory=" + scenarioFactory +
", sessionId='" + sessionId + '\'' +
", scenarioContext=" + scenarioContext +
", collectors=" + collectors +
", listeners=" + listeners +
", poolSize=" + poolSize +
'}';
}


public int getPoolSize() {
return poolSize;
}

public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
}

public void setKernelInfo(KernelInfo kernelInfo) {
this.kernelInfo = kernelInfo;
}

public KernelInfo getKernelInfo() {
return kernelInfo;
}

@Override
public String toString() {
return "StartWorkloadProcess{" + "scenarioFactory=" + scenarioFactory + ", sessionId='" + sessionId + '\''
+ ", scenarioContext=" + scenarioContext + ", validators=" + validators + ", collectors=" + collectors
+ ", listeners=" + listeners + ", poolSize=" + poolSize + ", kernelInfo=" + kernelInfo + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ public interface WorkloadService extends Service {

Integer getStartedSamples();
Integer getFinishedSamples();
long getEmptyTransactions();
void changeDelay(int delay);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ public class WorkloadStatus implements Serializable {
private final int startedSamples;
private final int finishedSamples;
private final int currentThreadNumber;
private final long emptyTransactions;

public WorkloadStatus(int startedSamples, int finishedSamples, int currentThreadNumber) {
public WorkloadStatus(int startedSamples, int finishedSamples, int currentThreadNumber, long emptyTransactions) {
this.startedSamples = startedSamples;
this.finishedSamples = finishedSamples;
this.currentThreadNumber = currentThreadNumber;
this.emptyTransactions = emptyTransactions;
}

public int getStartedSamples() {
Expand All @@ -46,13 +48,14 @@ public int getFinishedSamples() {
public int getCurrentThreadNumber() {
return currentThreadNumber;
}


public long getEmptyTransactions() {
return emptyTransactions;
}

@Override
public String toString() {
return "WorkloadStatus{" +
"startedSamples=" + startedSamples +
", finishedSamples=" + finishedSamples +
", currentThreadNumber=" + currentThreadNumber +
'}';
return "WorkloadStatus{" + "startedSamples=" + startedSamples + ", finishedSamples=" + finishedSamples
+ ", currentThreadNumber=" + currentThreadNumber + ", emptyTransactions=" + emptyTransactions + '}';
}
}
Loading

0 comments on commit 7007607

Please sign in to comment.