Skip to content

Commit

Permalink
Eng 11138 topology aware system test 2 (#4071)
Browse files Browse the repository at this point in the history
ENG-11131 add --topologyaware option to enable topology aware feature in java client for voltkv, voter, voltkvqa and txnid2
  • Loading branch information
jwiebema1 authored and Phil Rosegay committed Nov 16, 2016
1 parent 2c6f5b8 commit f7b78d5
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 50 deletions.
37 changes: 24 additions & 13 deletions examples/voltkv/client/voltkv/AsyncBenchmark.java
Expand Up @@ -145,6 +145,9 @@ static class KVConfig extends CLIConfig {
@Option(desc = "Filename to write raw summary statistics to.")
String statsfile = "";

@Option(desc = "Enable topology awareness")
boolean topologyaware = false;

@Override
public void validate() {
if (duration <= 0) exitWithMessageAndUsage("duration must be > 0");
Expand Down Expand Up @@ -191,6 +194,10 @@ public AsyncBenchmark(KVConfig config) {
ClientConfig clientConfig = new ClientConfig("", "", new StatusListener());
clientConfig.setMaxTransactionsPerSecond(config.ratelimit);

if (config.topologyaware) {
clientConfig.setTopologyChangeAware(true);
}

client = ClientFactory.createClient(clientConfig);

periodicStatsContext = client.createStatsContext();
Expand Down Expand Up @@ -243,20 +250,24 @@ void connect(String servers) throws InterruptedException {
System.out.println("Connecting to VoltDB...");

String[] serverArray = servers.split(",");
final CountDownLatch connections = new CountDownLatch(serverArray.length);

// use a new thread to connect to each server
for (final String server : serverArray) {
new Thread(new Runnable() {
@Override
public void run() {
connectToOneServerWithRetry(server);
connections.countDown();
}
}).start();
if (config.topologyaware) {
connectToOneServerWithRetry(serverArray[0]);
} else {
final CountDownLatch connections = new CountDownLatch(serverArray.length);

// use a new thread to connect to each server
for (final String server : serverArray) {
new Thread(new Runnable() {
@Override
public void run() {
connectToOneServerWithRetry(server);
connections.countDown();
}
}).start();
}
// block until all have connected
connections.await();
}
// block until all have connected
connections.await();
}

/**
Expand Down
37 changes: 24 additions & 13 deletions examples/voter/client/voter/AsyncBenchmark.java
Expand Up @@ -137,6 +137,9 @@ static class VoterConfig extends CLIConfig {
@Option(desc = "Password for connection.")
String password = "";

@Option(desc = "Enable topology awareness")
boolean topologyaware = false;

@Override
public void validate() {
if (duration <= 0) exitWithMessageAndUsage("duration must be > 0");
Expand Down Expand Up @@ -174,6 +177,10 @@ public AsyncBenchmark(VoterConfig config) {
ClientConfig clientConfig = new ClientConfig(config.user, config.password, new StatusListener());
clientConfig.setMaxTransactionsPerSecond(config.ratelimit);

if (config.topologyaware) {
clientConfig.setTopologyChangeAware(true);
}

client = ClientFactory.createClient(clientConfig);

periodicStatsContext = client.createStatsContext();
Expand Down Expand Up @@ -225,20 +232,24 @@ void connect(String servers) throws InterruptedException {
System.out.println("Connecting to VoltDB...");

String[] serverArray = servers.split(",");
final CountDownLatch connections = new CountDownLatch(serverArray.length);

// use a new thread to connect to each server
for (final String server : serverArray) {
new Thread(new Runnable() {
@Override
public void run() {
connectToOneServerWithRetry(server);
connections.countDown();
}
}).start();
if (config.topologyaware) {
connectToOneServerWithRetry(serverArray[0]);
} else {
final CountDownLatch connections = new CountDownLatch(serverArray.length);

// use a new thread to connect to each server
for (final String server : serverArray) {
new Thread(new Runnable() {
@Override
public void run() {
connectToOneServerWithRetry(server);
connections.countDown();
}
}).start();
}
// block until all have connected
connections.await();
}
// block until all have connected
connections.await();
}

/**
Expand Down
36 changes: 23 additions & 13 deletions tests/test_apps/txnid-selfcheck2/src/txnIdSelfCheck/Benchmark.java
Expand Up @@ -176,6 +176,9 @@ private static class Config extends CLIConfig {
String disabledthreads = "none";
ArrayList<String> disabledThreads = null;

@Option(desc = "Enable topology awareness")
boolean topologyaware = false;

@Override
public void validate() {
if (duration <= 0) exitWithMessageAndUsage("duration must be > 0");
Expand Down Expand Up @@ -351,6 +354,9 @@ public void run() {

StatusListener statusListener = new StatusListener();
ClientConfig clientConfig = new ClientConfig("", "", statusListener);
if (config.topologyaware) {
clientConfig.setTopologyChangeAware(true);
}
client = ClientFactory.createClient(clientConfig);
}

Expand Down Expand Up @@ -387,20 +393,24 @@ private void connectToOneServerWithRetry(String server) {
private void connect() throws InterruptedException {
log.info("Connecting to VoltDB...");

final CountDownLatch connections = new CountDownLatch(1);

// use a new thread to connect to each server
for (final String server : config.parsedServers) {
new Thread(new Runnable() {
@Override
public void run() {
connectToOneServerWithRetry(server);
connections.countDown();
}
}).start();
if (config.topologyaware) {
connectToOneServerWithRetry(config.parsedServers[0]);
} else {
final CountDownLatch connections = new CountDownLatch(1);

// use a new thread to connect to each server
for (final String server : config.parsedServers) {
new Thread(new Runnable() {
@Override
public void run() {
connectToOneServerWithRetry(server);
connections.countDown();
}
}).start();
}
// block until at least one connection is established
connections.await();
}
// block until at least one connection is established
connections.await();
}

/**
Expand Down
33 changes: 22 additions & 11 deletions tests/test_apps/voltkvqa/src/voltkvqa/AsyncBenchmark.java
Expand Up @@ -199,6 +199,9 @@ static class KVConfig extends CLIConfig {
@Option(desc = "Filename to write raw summary statistics to.")
String statsfile = "";

@Option(desc = "Enable topology awareness")
boolean topologyaware = false;

@Override
public void validate() {
if (duration <= 0) exitWithMessageAndUsage("duration must be > 0");
Expand Down Expand Up @@ -308,6 +311,9 @@ public AsyncBenchmark(KVConfig config) {
ClientConfig clientConfig = new ClientConfig("", "", new StatusListener());
clientConfig.setReconnectOnConnectionLoss(config.recover);

if (config.topologyaware) {
clientConfig.setTopologyChangeAware(true);
}
if (config.autotune) {
clientConfig.enableAutoTune();
clientConfig.setAutoTuneTargetInternalLatency(config.latencytarget);
Expand Down Expand Up @@ -409,24 +415,29 @@ else if((e.getMessage().contains("Client instance is shutdown"))) {
void connect(String servers, final String info) throws InterruptedException {
String[] serverArray = servers.split(",");
String msg;
final CountDownLatch connections = new CountDownLatch(1);
if(debug) {
msg = "\n=========\n" + info + "\nIn connect Server counts: " + serverArray.length +
", Server Names: " + servers;
prt(msg);
}

// use a new thread to connect to each server
for (final String server : serverArray) {
new Thread(new Runnable() {
@Override
public void run() {
connectToOneServerWithRetry(server, info);
connections.countDown();
}
}).start();
if (config.topologyaware) {
//client.createConnection(serverArray[0]);
connectToOneServerWithRetry(serverArray[0], info);
} else {
final CountDownLatch connections = new CountDownLatch(1);
// use a new thread to connect to each server
for (final String server : serverArray) {
new Thread(new Runnable() {
@Override
public void run() {
connectToOneServerWithRetry(server, info);
connections.countDown();
}
}).start();
}
connections.await();
}
connections.await();
}

/**
Expand Down

0 comments on commit f7b78d5

Please sign in to comment.