Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
anthonygauthier committed Jan 17, 2018
2 parents 84337ff + 659cff6 commit 7350c34
Showing 1 changed file with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.jmeter.visualizers.backend.AbstractBackendListenerClient;
import org.apache.jmeter.visualizers.backend.BackendListenerContext;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -51,7 +51,7 @@ public class ElasticsearchBackend extends AbstractBackendListenerClient {
private static final long DEFAULT_TIMEOUT_MS = 200L;
private static final Logger logger = LoggerFactory.getLogger(ElasticsearchBackend.class);

private Client client;
private PreBuiltTransportClient client;
private String index;
private int buildNumber;
private int bulkSize;
Expand Down Expand Up @@ -84,7 +84,8 @@ public void setupTest(BackendListenerContext context) throws Exception {
Settings settings = Settings.builder().put("cluster.name", context.getParameter(ES_CLUSTER)).build();
String host = context.getParameter(ES_HOST);
int port = Integer.parseInt(context.getParameter(ES_PORT));
this.client = new PreBuiltTransportClient(settings).addTransportAddress(
this.client = new PreBuiltTransportClient(settings);
this.client.addTransportAddress(
new InetSocketTransportAddress(InetAddress.getByName(host), port));
this.bulkRequest = this.client.prepareBulk();
super.setupTest(context);
Expand All @@ -101,7 +102,16 @@ public void handleSampleResults(List<SampleResult> results, BackendListenerConte

if(this.bulkRequest.numberOfActions() >= this.bulkSize) {
try {
this.bulkRequest.get(TimeValue.timeValueMillis(timeoutMs));
BulkResponse bulkResponse = this.bulkRequest.get(TimeValue.timeValueMillis(timeoutMs));
if (bulkResponse.hasFailures()) {
if(logger.isErrorEnabled()) {
logger.error("Failed to write a result on {}: {}",
index, bulkResponse.buildFailureMessage());
}
} else {
logger.debug("Wrote {} results in {}.",
index);
}
} catch (Exception e) {
logger.error("Error sending data to ES, data will be lost", e);
} finally {
Expand Down

0 comments on commit 7350c34

Please sign in to comment.