-
Notifications
You must be signed in to change notification settings - Fork 465
/
AsyncBenchmark.java
472 lines (409 loc) · 19 KB
/
AsyncBenchmark.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
/* This file is part of VoltDB.
* Copyright (C) 2008-2016 VoltDB Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to
* the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
* OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
/*
* This sample uses the native asynchronous request processing protocol
* to post requests to the VoltDB server, thus leveraging to the maximum
* VoltDB's ability to run requests in parallel on multiple database
* partitions, and multiple servers.
*
* While asynchronous processing is (marginally) more convoluted to work
* with and not suited to all workloads, it is the preferred interaction
* model for VoltDB, as it allows a single client with a small number of
* threads to flood VoltDB with requests, guaranteeing blazing throughput
* performance.
*
* Note that this benchmark focuses on throughput performance and
* not low latency performance. This benchmark will likely 'firehose'
* the database cluster (if the cluster is too slow or has too few CPUs)
* and as a result, queue a significant amount of requests on the server
* to maximize throughput measurement. To test VoltDB latency, run the
* SyncBenchmark client, also found in the voter sample directory.
*/
package voter;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.voltdb.CLIConfig;
import org.voltdb.VoltTable;
import org.voltdb.client.Client;
import org.voltdb.client.ClientConfig;
import org.voltdb.client.ClientFactory;
import org.voltdb.client.ClientResponse;
import org.voltdb.client.ClientStats;
import org.voltdb.client.ClientStatsContext;
import org.voltdb.client.ClientStatusListenerExt;
import org.voltdb.client.NullCallback;
import org.voltdb.client.ProcedureCallback;
public class AsyncBenchmark {
// Constants shared by the whole benchmark.

// Comma separated list of contestant names; it is passed to the "Initialize"
// procedure together with the requested number of contestants.
// Every name must be separated from the next one by a comma, otherwise two
// contestants are fused into a single CSV entry.
static final String CONTESTANT_NAMES_CSV =
        "Edwina Burnam,Tabatha Gehling,Kelly Clauss,Jessie Alloway," +
        "Alana Bregman,Jessie Eichman,Allie Rogalski,Nita Coster," +
        "Kurt Walser,Ericka Dieter,Loraine Nygren,Tania Mattioli";

// Separator line printed around section titles; defined once rather than
// typed out at every use.
static final String HORIZONTAL_RULE =
        "----------" + "----------" + "----------" + "----------" +
        "----------" + "----------" + "----------" + "----------" + "\n";

// Result codes returned by the "Vote" procedure (must stay in sync with it).
static final long VOTE_SUCCESSFUL = 0;
static final long ERR_INVALID_CONTESTANT = 1;
static final long ERR_VOTER_OVER_VOTE_LIMIT = 2;
// Command line options, already parsed and validated
final VoterConfig config;

// Client handle used for every call made to the database
final Client client;

// Source of simulated phone calls (phone number plus contestant choice)
PhoneCallGenerator switchboard;

// Drives the periodic statistics printout
Timer timer;

// Wall-clock time, in milliseconds, at which the measured run started
long benchmarkStartTS;

// Client statistics contexts: one is re-baselined at every periodic report,
// the other is read once for the final summary
final ClientStatsContext periodicStatsContext;
final ClientStatsContext fullStatsContext;

// Vote counters, updated from the procedure callbacks
AtomicLong totalVotes = new AtomicLong();
AtomicLong acceptedVotes = new AtomicLong();
AtomicLong badContestantVotes = new AtomicLong();
AtomicLong badVoteCountVotes = new AtomicLong();
AtomicLong failedVotes = new AtomicLong();
/**
 * Command line options of the benchmark, declared through the
 * {@link CLIConfig} helper class: each annotated field is one option,
 * its initializer is the default value, and {@link #validate()} rejects
 * out-of-range values.
 */
static class VoterConfig extends CLIConfig {

    @Option(desc = "Interval for performance feedback, in seconds.")
    long displayinterval = 5;

    @Option(desc = "Benchmark duration, in seconds.")
    int duration = 20;

    @Option(desc = "Warmup duration in seconds.")
    int warmup = 2;

    @Option(desc = "Comma separated list of the form server[:port] to connect to.")
    String servers = "localhost";

    @Option(desc = "Number of contestants in the voting contest (from 1 to 10).")
    int contestants = 6;

    @Option(desc = "Maximum number of votes cast per voter.")
    int maxvotes = 2;

    @Option(desc = "Maximum TPS rate for benchmark.")
    int ratelimit = Integer.MAX_VALUE;

    @Option(desc = "Report latency for async benchmark run.")
    boolean latencyreport = false;

    @Option(desc = "Filename to write raw summary statistics to.")
    String statsfile = "";

    @Option(desc = "User name for connection.")
    String user = "";

    @Option(desc = "Password for connection.")
    String password = "";

    @Option(desc = "Enable topology awareness")
    boolean topologyaware = false;

    /**
     * Checks the numeric options one after the other, in declaration
     * order of the checks below, and reports the first violation found
     * through {@code exitWithMessageAndUsage}.
     */
    @Override
    public void validate() {
        rejectIf(duration <= 0, "duration must be > 0");
        rejectIf(warmup < 0, "warmup must be >= 0");
        rejectIf(displayinterval <= 0, "displayinterval must be > 0");
        rejectIf(contestants <= 0, "contestants must be > 0");
        rejectIf(maxvotes <= 0, "maxvotes must be > 0");
        rejectIf(ratelimit <= 0, "ratelimit must be > 0");
    }

    // Reports the given message (with usage) when the condition marks an invalid option.
    private void rejectIf(boolean invalid, String message) {
        if (invalid) {
            exitWithMessageAndUsage(message);
        }
    }
}
/**
 * Callback notified by the client when a connection to a node is lost.
 * This example only logs the event, and only while the measured run is
 * still within its configured duration.
 */
class StatusListener extends ClientStatusListenerExt {
    /**
     * Logs the lost connection to standard error if the benchmark is
     * still active.
     *
     * @param hostname Host whose connection was lost.
     * @param port Port of the lost connection.
     * @param connectionsLeft Number of remaining connections (unused here).
     * @param cause Reason reported for the disconnect (unused here).
     */
    @Override
    public void connectionLost(String hostname, int port, int connectionsLeft, DisconnectCause cause) {
        final long elapsedMillis = System.currentTimeMillis() - benchmarkStartTS;
        // Multiply as long: duration is an int and "duration * 1000" would
        // overflow for very long runs, making the comparison meaningless.
        final long durationMillis = config.duration * 1000L;
        // if the benchmark is still active
        if (elapsedMillis < durationMillis) {
            System.err.printf("Connection to %s:%d was lost.\n", hostname, port);
        }
    }
}
/**
 * Builds a benchmark instance: creates the VoltDB client from the given
 * options, sets up the statistics contexts and the phone call generator,
 * then prints the effective configuration.
 *
 * @param config Parsed & validated CLI options.
 */
public AsyncBenchmark(VoterConfig config) {
    this.config = config;

    // Client settings: credentials, status listener, rate limit and,
    // only when requested, topology awareness.
    final ClientConfig settings =
            new ClientConfig(config.user, config.password, new StatusListener());
    settings.setMaxTransactionsPerSecond(config.ratelimit);
    if (config.topologyaware) {
        settings.setTopologyChangeAware(true);
    }
    this.client = ClientFactory.createClient(settings);

    // Two independent statistics contexts: periodic reports and final summary.
    this.periodicStatsContext = this.client.createStatsContext();
    this.fullStatsContext = this.client.createStatsContext();

    this.switchboard = new PhoneCallGenerator(config.contestants);

    // Echo the configuration actually in use.
    System.out.print(HORIZONTAL_RULE);
    System.out.println(" Command Line Configuration");
    System.out.println(HORIZONTAL_RULE);
    System.out.println(config.getConfigDumpString());
    if (config.latencyreport) {
        System.out.println("NOTICE: Option latencyreport is ON for async run, please set a reasonable ratelimit.\n");
    }
}
/**
 * Connects to one server, retrying for as long as it takes. The pause
 * between attempts starts at one second and doubles after each failure
 * until it reaches eight seconds. There is no timeout: if the server is
 * never reachable this method never returns.
 *
 * @param server hostname:port or just hostname (hostname can be ip).
 */
void connectToOneServerWithRetry(String server) {
    int backoffMillis = 1000;
    boolean connected = false;
    while (!connected) {
        try {
            client.createConnection(server);
            connected = true;
        }
        catch (Exception e) {
            System.err.printf("Connection failed - retrying in %d second(s).\n", backoffMillis / 1000);
            try {
                Thread.sleep(backoffMillis);
            }
            catch (Exception ignored) {
                // Deliberately ignored: a shortened pause only means the
                // next attempt happens sooner.
            }
            // Double the pause until the cap is reached.
            if (backoffMillis < 8000) {
                backoffMillis *= 2;
            }
        }
    }
    System.out.printf("Connected to VoltDB node at: %s.\n", server);
}
/**
 * Connects to the given servers and blocks until done. When topology
 * awareness is enabled only the first server of the list is contacted;
 * otherwise every server is contacted from its own thread, each retrying
 * until it succeeds, and this call returns once all have connected.
 *
 * @param servers A comma separated list of servers using the hostname:port
 * syntax (where :port is optional).
 * @throws InterruptedException if the wait for the connection threads is interrupted.
 */
void connect(String servers) throws InterruptedException {
    System.out.println("Connecting to VoltDB...");

    final String[] hosts = servers.split(",");

    if (config.topologyaware) {
        connectToOneServerWithRetry(hosts[0]);
        return;
    }

    // One latch count per server; each connector thread releases one.
    final CountDownLatch pending = new CountDownLatch(hosts.length);
    for (int i = 0; i < hosts.length; i++) {
        final String host = hosts[i];
        final Thread connector = new Thread(new Runnable() {
            @Override
            public void run() {
                connectToOneServerWithRetry(host);
                pending.countDown();
            }
        });
        connector.start();
    }

    // Wait for every connector thread to report success.
    pending.await();
}
/**
 * Starts a timer that calls {@link #printStatistics()} at a fixed rate,
 * once every {@code displayinterval} seconds; the first report is
 * printed after one full interval.
 */
public void schedulePeriodicStats() {
    final long periodMillis = config.displayinterval * 1000;
    final TimerTask report = new TimerTask() {
        @Override
        public void run() {
            printStatistics();
        }
    };
    timer = new Timer();
    timer.scheduleAtFixedRate(report, periodMillis, periodMillis);
}
/**
 * Prints a single progress line: elapsed time since the benchmark
 * started, throughput, abort/failure counts and, when latency reporting
 * is enabled, average and 95th percentile latency. The periodic
 * statistics baseline is reset on every call.
 */
public synchronized void printStatistics() {
    final ClientStats snapshot = periodicStatsContext.fetchAndResetBaseline().getStats();

    // Elapsed time in whole seconds, split into hh:mm:ss.
    final long elapsedSeconds = Math.round((snapshot.getEndTimestamp() - benchmarkStartTS) / 1000.0);
    final long hours = elapsedSeconds / 3600;
    final long minutes = (elapsedSeconds / 60) % 60;
    final long seconds = elapsedSeconds % 60;

    System.out.printf("%02d:%02d:%02d ", hours, minutes, seconds);
    System.out.printf("Throughput %d/s, ", snapshot.getTxnThroughput());
    System.out.printf("Aborts/Failures %d/%d",
            snapshot.getInvocationAborts(), snapshot.getInvocationErrors());
    if (config.latencyreport) {
        System.out.printf(", Avg/95%% Latency %.2f/%.2fms",
                snapshot.getAverageLatency(),
                snapshot.kPercentileLatencyAsDouble(0.95));
    }
    System.out.printf("\n");
}
/**
 * Prints the final report: vote tallies collected by the callbacks, the
 * per-contestant results read back from the database, client workload
 * statistics (with latency details when latency reporting is enabled),
 * and finally hands the statistics to the client for CSV output.
 *
 * @throws Exception if anything unexpected happens.
 */
public synchronized void printResults() throws Exception {
    final ClientStats summary = fullStatsContext.fetch().getStats();

    // 1. Vote tallies gathered on the client side
    System.out.printf(
            "\n" +
            HORIZONTAL_RULE +
            " Voting Results\n" +
            HORIZONTAL_RULE +
            "\nA total of %,9d votes were received during the benchmark...\n" +
            " - %,9d Accepted\n" +
            " - %,9d Rejected (Invalid Contestant)\n" +
            " - %,9d Rejected (Maximum Vote Count Reached)\n" +
            " - %,9d Failed (Transaction Error)\n\n",
            totalVotes.get(),
            acceptedVotes.get(),
            badContestantVotes.get(),
            badVoteCountVotes.get(),
            failedVotes.get());

    // 2. Per-contestant results as reported by the "Results" procedure
    final VoltTable board = client.callProcedure("Results").getResults()[0];
    System.out.println("Contestant Name\t\tVotes Received");
    while (board.advanceRow()) {
        final String contestant = board.getString(0);
        final long votes = board.getLong(2);
        System.out.printf("%s\t\t%,14d\n", contestant, votes);
    }
    // The first row is announced as the winner (presumably the procedure
    // returns rows ordered by vote count - verify against "Results").
    System.out.printf("\nThe Winner is: %s\n\n", board.fetchRow(0).getString(0));

    // 3. Client workload statistics
    System.out.print(HORIZONTAL_RULE);
    System.out.println(" Client Workload Statistics");
    System.out.println(HORIZONTAL_RULE);
    System.out.printf("Average throughput: %,9d txns/sec\n", summary.getTxnThroughput());

    if (config.latencyreport) {
        System.out.printf("Average latency: %,9.2f ms\n", summary.getAverageLatency());

        // One output line per percentile, in ascending order.
        final String[] percentileFormats = {
            "10th percentile latency: %,9.2f ms\n",
            "25th percentile latency: %,9.2f ms\n",
            "50th percentile latency: %,9.2f ms\n",
            "75th percentile latency: %,9.2f ms\n",
            "90th percentile latency: %,9.2f ms\n",
            "95th percentile latency: %,9.2f ms\n",
            "99th percentile latency: %,9.2f ms\n",
            "99.5th percentile latency: %,9.2f ms\n",
            "99.9th percentile latency: %,9.2f ms\n",
        };
        final double[] percentiles = {.1, .25, .5, .75, .9, .95, .99, .995, .999};
        for (int i = 0; i < percentiles.length; i++) {
            System.out.printf(percentileFormats[i],
                    summary.kPercentileLatencyAsDouble(percentiles[i]));
        }

        System.out.print("\n" + HORIZONTAL_RULE);
        System.out.println(" System Server Statistics");
        System.out.println(HORIZONTAL_RULE);
        System.out.printf("Reported Internal Avg Latency: %,9.2f ms\n", summary.getAverageInternalLatency());

        System.out.print("\n" + HORIZONTAL_RULE);
        System.out.println(" Latency Histogram");
        System.out.println(HORIZONTAL_RULE);
        System.out.println(summary.latencyHistoReport());
    }

    // 4. Hand the statistics to the client for CSV output; the call is made
    // unconditionally with the configured file name (empty by default).
    client.writeSummaryCSV(summary, config.statsfile);
}
/**
 * Callback invoked with the response to each asynchronous "Vote" call
 * made during the measured run. It counts every response and sorts it
 * into accepted, rejected (two reasons) or failed.
 */
class VoterCallback implements ProcedureCallback {
    @Override
    public void clientCallback(ClientResponse response) throws Exception {
        totalVotes.incrementAndGet();

        // Anything other than SUCCESS is a transaction-level failure.
        if (response.getStatus() != ClientResponse.SUCCESS) {
            failedVotes.incrementAndGet();
            return;
        }

        // The procedure answers with a single scalar result code.
        final long outcome = response.getResults()[0].asScalarLong();
        if (outcome == ERR_INVALID_CONTESTANT) {
            badContestantVotes.incrementAndGet();
        }
        else if (outcome == ERR_VOTER_OVER_VOTE_LIMIT) {
            badVoteCountVotes.incrementAndGet();
        }
        else {
            assert(outcome == VOTE_SUCCESSFUL);
            acceptedVotes.incrementAndGet();
        }
    }
}
/**
 * Core benchmark code.
 * Connect. Initialize. Warm up. Run the measured loop. Drain. Print
 * results. Close the client.
 *
 * The periodic statistics timer is always cancelled and the client is
 * always closed, even when a step throws: the timer runs on a non-daemon
 * thread and would otherwise keep printing and keep the JVM alive after
 * a failure.
 *
 * @throws Exception if anything unexpected happens.
 */
public void runBenchmark() throws Exception {
    System.out.print(HORIZONTAL_RULE);
    System.out.println(" Setup & Initialization");
    System.out.println(HORIZONTAL_RULE);

    try {
        // connect to one or more servers, loop until success
        connect(config.servers);

        // initialize using synchronous call
        System.out.println("\nPopulating Static Tables\n");
        client.callProcedure("Initialize", config.contestants, CONTESTANT_NAMES_CSV);

        System.out.print(HORIZONTAL_RULE);
        System.out.println(" Starting Benchmark");
        System.out.println(HORIZONTAL_RULE);

        // Run the benchmark loop for the requested warmup time
        // The throughput may be throttled depending on client configuration
        System.out.println("Warming up...");
        final long warmupEndTime = System.currentTimeMillis() + (1000L * config.warmup);
        while (warmupEndTime > System.currentTimeMillis()) {
            // Get the next phone call
            PhoneCallGenerator.PhoneCall call = switchboard.receive();

            // asynchronously call the "Vote" procedure; warmup responses
            // are discarded and do not touch the vote counters
            client.callProcedure(new NullCallback(),
                                 "Vote",
                                 call.phoneNumber,
                                 call.contestantNumber,
                                 config.maxvotes);
        }

        // reset the stats after warmup
        fullStatsContext.fetchAndResetBaseline();
        periodicStatsContext.fetchAndResetBaseline();

        // print periodic statistics to the console
        benchmarkStartTS = System.currentTimeMillis();
        schedulePeriodicStats();
        try {
            // Run the benchmark loop for the requested duration
            // The throughput may be throttled depending on client configuration
            System.out.println("\nRunning benchmark...");
            final long benchmarkEndTime = System.currentTimeMillis() + (1000L * config.duration);
            while (benchmarkEndTime > System.currentTimeMillis()) {
                // Get the next phone call
                PhoneCallGenerator.PhoneCall call = switchboard.receive();

                // asynchronously call the "Vote" procedure
                client.callProcedure(new VoterCallback(),
                                     "Vote",
                                     call.phoneNumber,
                                     call.contestantNumber,
                                     config.maxvotes);
            }
        }
        finally {
            // cancel periodic stats printing, whether or not the loop completed
            timer.cancel();
        }

        // block until all outstanding txns return
        client.drain();

        // print the summary results
        printResults();
    }
    finally {
        // close down the client connections, also on failure
        client.close();
    }
}
/**
 * Program entry point: parses the command line into a
 * {@link VoterConfig}, builds the benchmark and runs it.
 *
 * @param args Command line arguments.
 * @throws Exception if anything goes wrong.
 * @see VoterConfig
 */
public static void main(String[] args) throws Exception {
    // turn the raw arguments into validated options
    final VoterConfig options = new VoterConfig();
    options.parse(AsyncBenchmark.class.getName(), args);

    new AsyncBenchmark(options).runBenchmark();
}
}