Skip to content

Commit

Permalink
Introduce HdrHistogram and response/service/wait separation to stress…
Browse files Browse the repository at this point in the history
… tool

Patch by Nitsan Wakart; reviewed by tjake for CASSANDRA-11853
  • Loading branch information
nitsanw authored and tjake committed May 27, 2016
1 parent d9b192e commit 89f275c
Show file tree
Hide file tree
Showing 28 changed files with 1,456 additions and 509 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -71,3 +71,4 @@ lib/jsr223/jython/*.jar
lib/jsr223/jython/cachedir
lib/jsr223/scala/*.jar

/.ant-targets-build.xml
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
3.8
* Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853)
* entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718)
* Support older ant versions (CASSANDRA-11807)
* Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623)
Expand Down
15 changes: 15 additions & 0 deletions NEWS.txt
Expand Up @@ -13,6 +13,21 @@ restore snapshots created with the previous major version using the
'sstableloader' tool. You can upgrade the file format of your snapshots
using the provided 'sstableupgrade' tool.


3.8
===

New features
------------
- A new option has been added to cassandra-stress "-rate fixed={number}/s"
that forces a scheduled rate of operations/sec over time. Using this, stress can
accurately account for coordinated ommission from the stress process.
- The cassandra-stress "-rate limit=" option has been renamed to "-rate throttle="
- hdr histograms have been added to stress runs, it's output can be saved to disk using:
"-log hdrfile=" option. This histogram includes response/service/wait times when used with the
fixed or throttle rate options. The histogram file can be plotted on
http://hdrhistogram.github.io/HdrHistogram/plotFiles.html

3.7
===

Expand Down
3 changes: 3 additions & 0 deletions NOTICE.txt
Expand Up @@ -83,3 +83,6 @@ BSD 3-clause
ASM
(http://asm.ow2.org/)
Copyright (c) 2000-2011 INRIA, France Telecom

HdrHistogram
http://hdrhistogram.org
1 change: 1 addition & 0 deletions build.xml
Expand Up @@ -350,6 +350,7 @@
<dependency groupId="net.jpountz.lz4" artifactId="lz4" version="1.3.0"/>
<dependency groupId="com.ning" artifactId="compress-lzf" version="0.8.4"/>
<dependency groupId="com.google.guava" artifactId="guava" version="18.0"/>
<dependency groupId="org.hdrhistogram" artifactId="HdrHistogram" version="2.1.9"/>
<dependency groupId="commons-cli" artifactId="commons-cli" version="1.1"/>
<dependency groupId="commons-codec" artifactId="commons-codec" version="1.2"/>
<dependency groupId="org.apache.commons" artifactId="commons-lang3" version="3.1"/>
Expand Down
Binary file added lib/HdrHistogram-2.1.9.jar
Binary file not shown.
936 changes: 936 additions & 0 deletions lib/licenses/hdrhistogram-2.1.9.txt

Large diffs are not rendered by default.

17 changes: 13 additions & 4 deletions tools/stress/src/org/apache/cassandra/stress/Operation.java
Expand Up @@ -33,7 +33,7 @@
public abstract class Operation
{
public final StressSettings settings;
public final Timer timer;
private final Timer timer;

public Operation(Timer timer, StressSettings settings)
{
Expand All @@ -48,7 +48,7 @@ public static interface RunOp
public int rowCount();
}

public abstract boolean ready(WorkManager permits, RateLimiter rateLimiter);
public abstract int ready(WorkManager permits);

public boolean isWrite()
{
Expand All @@ -72,7 +72,7 @@ public void run(JavaDriverClient client) throws IOException
throw new UnsupportedOperationException();
}

public void timeWithRetry(RunOp run) throws IOException
public final void timeWithRetry(RunOp run) throws IOException
{
timer.start();

Expand Down Expand Up @@ -108,7 +108,7 @@ public void timeWithRetry(RunOp run) throws IOException
exceptionMessage = getExceptionMessage(e);
}
}

timer.stop(run.partitionCount(), run.rowCount(), !success);

if (!success)
Expand Down Expand Up @@ -140,4 +140,13 @@ else if (settings.log.level.compareTo(SettingsLog.Level.MINIMAL) > 0)
System.err.println(message);
}

public void close()
{
timer.close();
}

public void intendedStartNs(long intendedTime)
{
timer.intendedTimeNs(intendedTime);
}
}
165 changes: 136 additions & 29 deletions tools/stress/src/org/apache/cassandra/stress/StressAction.java
Expand Up @@ -24,24 +24,25 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

import org.apache.cassandra.stress.operations.OpDistribution;
import org.apache.cassandra.stress.operations.OpDistributionFactory;
import org.apache.cassandra.stress.settings.ConnectionAPI;
import org.apache.cassandra.stress.settings.SettingsCommand;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.transport.SimpleClient;

import com.google.common.util.concurrent.Uninterruptibles;

public class StressAction implements Runnable
{

private final StressSettings settings;
private final PrintStream output;

public StressAction(StressSettings settings, PrintStream out)
{
this.settings = settings;
Expand All @@ -58,13 +59,14 @@ public void run()

if (!settings.command.noWarmup)
warmup(settings.command.getFactory(settings));

if (settings.command.truncate == SettingsCommand.TruncateWhen.ONCE)
settings.command.truncateTables(settings);

// TODO : move this to a new queue wrapper that gates progress based on a poisson (or configurable) distribution
RateLimiter rateLimiter = null;
if (settings.rate.opRateTargetPerSecond > 0)
rateLimiter = RateLimiter.create(settings.rate.opRateTargetPerSecond);
UniformRateLimiter rateLimiter = null;
if (settings.rate.opsPerSecond > 0)
rateLimiter = new UniformRateLimiter(settings.rate.opsPerSecond);

boolean success;
if (settings.rate.minThreads > 0)
Expand All @@ -85,6 +87,7 @@ public void run()
}

// type provided separately to support recursive call for mixed command with each command type it is performing
@SuppressWarnings("resource") // warmupOutput doesn't need closing
private void warmup(OpDistributionFactory operations)
{
PrintStream warmupOutput = new PrintStream(new OutputStream() { @Override public void write(int b) throws IOException { } } );
Expand Down Expand Up @@ -113,7 +116,7 @@ private void warmup(OpDistributionFactory operations)

// TODO : permit varying more than just thread count
// TODO : vary thread count based on percentage improvement of previous increment, not by fixed amounts
private boolean runMulti(boolean auto, RateLimiter rateLimiter)
private boolean runMulti(boolean auto, UniformRateLimiter rateLimiter)
{
if (settings.command.targetUncertainty >= 0)
output.println("WARNING: uncertainty mode (err<) results in uneven workload between thread runs, so should be used for high level analysis only");
Expand Down Expand Up @@ -166,7 +169,7 @@ private boolean runMulti(boolean auto, RateLimiter rateLimiter)
} while (!auto || (hasAverageImprovement(results, 3, 0) && hasAverageImprovement(results, 5, settings.command.targetUncertainty)));

// summarise all results
StressMetrics.summarise(runIds, results, output, settings.samples.historyCount);
StressMetrics.summarise(runIds, results, output);
return true;
}

Expand All @@ -191,7 +194,7 @@ private StressMetrics run(OpDistributionFactory operations,
int threadCount,
long opCount,
long duration,
RateLimiter rateLimiter,
UniformRateLimiter rateLimiter,
TimeUnit durationUnits,
PrintStream output,
boolean isWarmup)
Expand All @@ -210,20 +213,35 @@ private StressMetrics run(OpDistributionFactory operations,

final StressMetrics metrics = new StressMetrics(output, settings.log.intervalMillis, settings);

final CountDownLatch releaseConsumers = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(threadCount);
final CountDownLatch start = new CountDownLatch(threadCount);
final Consumer[] consumers = new Consumer[threadCount];
int sampleCount = settings.samples.liveCount / threadCount;
for (int i = 0; i < threadCount; i++)
{

consumers[i] = new Consumer(operations.get(metrics.getTiming(), sampleCount, isWarmup),
done, workManager, metrics, rateLimiter);
consumers[i] = new Consumer(operations.get(metrics.getTiming(), isWarmup),
done, start, releaseConsumers, workManager, metrics, rateLimiter);
}

// starting worker threadCount
for (int i = 0; i < threadCount; i++)
consumers[i].start();

// wait for the lot of them to get their pants on
try
{
start.await();
}
catch (InterruptedException e)
{
throw new RuntimeException("Unexpected interruption", e);
}
// start counting from NOW!
if(rateLimiter != null)
rateLimiter.start();
// release the hounds!!!
releaseConsumers.countDown();

metrics.start();

if (durationUnits != null)
Expand Down Expand Up @@ -264,40 +282,123 @@ else if (opCount <= 0)
return metrics;
}

private class Consumer extends Thread
/**
* Provides a 'next operation time' for rate limited operation streams. The rate limiter is thread safe and is to be
* shared by all consumer threads.
*/
private static class UniformRateLimiter
{
long start = Long.MIN_VALUE;
final long intervalNs;
final AtomicLong opIndex = new AtomicLong();

UniformRateLimiter(int opsPerSec)
{
intervalNs = 1000000000 / opsPerSec;
}

void start()
{
start = System.nanoTime();
}

/**
* @param partitionCount
* @return expect start time in ns for the operation
*/
long acquire(int partitionCount)
{
long currOpIndex = opIndex.getAndAdd(partitionCount);
return start + currOpIndex * intervalNs;
}
}

/**
* Provides a blocking stream of operations per consumer.
*/
private static class StreamOfOperations
{
private final OpDistribution operations;
private final UniformRateLimiter rateLimiter;
private final WorkManager workManager;

public StreamOfOperations(OpDistribution operations, UniformRateLimiter rateLimiter, WorkManager workManager)
{
this.operations = operations;
this.rateLimiter = rateLimiter;
this.workManager = workManager;
}

/**
* This method will block until the next operation becomes available.
*
* @return next operation or null if no more ops are coming
*/
Operation nextOp()
{
Operation op = operations.next();
final int partitionCount = op.ready(workManager);
if (partitionCount == 0)
return null;
if (rateLimiter != null)
{
long intendedTime = rateLimiter.acquire(partitionCount);
op.intendedStartNs(intendedTime);
long now;
while ((now = System.nanoTime()) < intendedTime)
{
LockSupport.parkNanos(intendedTime - now);
}
}
return op;
}

void close()
{
operations.closeTimers();
}

void abort()
{
workManager.stop();
}
}

private class Consumer extends Thread
{
private final StreamOfOperations opStream;
private final StressMetrics metrics;
private final RateLimiter rateLimiter;
private volatile boolean success = true;
private final WorkManager workManager;
private final CountDownLatch done;
private final CountDownLatch start;
private final CountDownLatch releaseConsumers;

public Consumer(OpDistribution operations,
CountDownLatch done,
CountDownLatch start,
CountDownLatch releaseConsumers,
WorkManager workManager,
StressMetrics metrics,
RateLimiter rateLimiter)
UniformRateLimiter rateLimiter)
{
this.done = done;
this.rateLimiter = rateLimiter;
this.workManager = workManager;
this.start = start;
this.releaseConsumers = releaseConsumers;
this.metrics = metrics;
this.operations = operations;
this.opStream = new StreamOfOperations(operations, rateLimiter, workManager);
}

public void run()
{
operations.initTimers();

try
{
SimpleClient sclient = null;
ThriftClient tclient = null;
JavaDriverClient jclient = null;

switch (settings.mode.api)

final ConnectionAPI clientType = settings.mode.api;
switch (clientType)
{
case JAVA_DRIVER_NATIVE:
jclient = settings.getJavaDriverClient();
Expand All @@ -313,15 +414,21 @@ public void run()
throw new IllegalStateException();
}

// synchronize the start of all the consumer threads
start.countDown();

releaseConsumers.await();

while (true)
{
Operation op = operations.next();
if (!op.ready(workManager, rateLimiter))
// Assumption: All ops are thread local, operations are never shared across threads.
Operation op = opStream.nextOp();
if (op == null)
break;

try
{
switch (settings.mode.api)
switch (clientType)
{
case JAVA_DRIVER_NATIVE:
op.run(jclient);
Expand All @@ -343,7 +450,7 @@ public void run()
e.printStackTrace(output);

success = false;
workManager.stop();
opStream.abort();
metrics.cancel();
return;
}
Expand All @@ -357,7 +464,7 @@ public void run()
finally
{
done.countDown();
operations.closeTimers();
opStream.close();
}
}
}
Expand Down

0 comments on commit 89f275c

Please sign in to comment.