Skip to content

Commit

Permalink
[REST] Add mechanic for viewing bulk error messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
jbaiera committed Jul 11, 2016
1 parent c3fc0d2 commit b8f8631
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 26 deletions.
72 changes: 72 additions & 0 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/BulkResponse.java
@@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.hadoop.rest;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;

/**
 * Simple response object that tracks useful information about a bulk indexing response.
 * This includes the determined response code, the number of docs indexed, the docs that
 * did not make it, and a sample of error messages for the user to peruse.
 * <p>
 * Instances are snapshots: the leftovers bit set and the error sample are copied on
 * construction and are never handed out by reference, so later changes made by the
 * caller (or by consumers of the getters) cannot alter an existing response.
 */
public class BulkResponse {

    /**
     * Creates a response denoting that everything went through.
     *
     * @param totalWrites the number of entries covered by the bulk request
     * @return a response with an OK status, no leftovers and no error messages
     */
    static BulkResponse ok(int totalWrites) {
        return new BulkResponse(totalWrites);
    }

    private final int httpStatus;
    private final int totalWrites;
    private final BitSet leftovers;
    private final List<String> errorExamples;

    /**
     * Creates a bulk response denoting that everything is OK.
     *
     * @param totalWrites the number of entries covered by the bulk request
     */
    private BulkResponse(int totalWrites) {
        this(HttpStatus.OK, totalWrites, new BitSet(), Collections.<String>emptyList());
    }

    /**
     * Creates a bulk response from the outcome of a bulk request.
     *
     * @param httpStatus    the status code determined for the bulk request
     * @param totalWrites   the number of entries covered by the bulk request
     * @param leftovers     the positions of the entries that were not written;
     *                      {@code null} is treated as "no leftovers"
     * @param errorExamples a sample of the error messages that were returned;
     *                      {@code null} is treated as "no messages"
     */
    public BulkResponse(int httpStatus, int totalWrites, BitSet leftovers, List<String> errorExamples) {
        this.httpStatus = httpStatus;
        this.totalWrites = totalWrites;
        // Copy the mutable arguments so that the caller keeping (and changing) its own
        // references cannot modify this response after the fact.
        this.leftovers = (leftovers == null ? new BitSet() : (BitSet) leftovers.clone());
        this.errorExamples = (errorExamples == null
                ? Collections.<String>emptyList()
                : Collections.unmodifiableList(new ArrayList<String>(errorExamples)));
    }

    /**
     * @return the status code determined for the bulk request
     */
    public int getHttpStatus() {
        return httpStatus;
    }

    /**
     * @return the number of entries covered by the bulk request
     */
    public int getTotalWrites() {
        return totalWrites;
    }

    /**
     * @return a copy of the positions of the entries that were not written; never
     *         {@code null}, empty when everything was written
     */
    public BitSet getLeftovers() {
        // BitSet is mutable, so hand out a copy rather than the internal instance.
        return (BitSet) leftovers.clone();
    }

    /**
     * @return a read-only sample of the error messages that were returned; never
     *         {@code null}, empty when there were no errors
     */
    public List<String> getErrorExamples() {
        return errorExamples;
    }
}
31 changes: 22 additions & 9 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java
Expand Up @@ -22,7 +22,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -64,6 +63,8 @@

public class RestClient implements Closeable, StatsAware {

private final static int MAX_BULK_ERROR_MESSAGES = 5;

private NetworkClient network;
private final ObjectMapper mapper;
private final TimeValue scrollKeepAlive;
Expand Down Expand Up @@ -149,9 +150,9 @@ private <T> T parseContent(InputStream content, String string) {
return (T) (string != null ? map.get(string) : map);
}

public BitSet bulk(Resource resource, TrackingBytesArray data) {
public BulkResponse bulk(Resource resource, TrackingBytesArray data) {
Retry retry = retryPolicy.init();
int httpStatus = 0;
BulkResponse processedResponse;

boolean isRetry = false;

Expand All @@ -175,14 +176,14 @@ public BitSet bulk(Resource resource, TrackingBytesArray data) {

isRetry = true;

httpStatus = (retryFailedEntries(response, data) ? HttpStatus.SERVICE_UNAVAILABLE : HttpStatus.OK);
} while (data.length() > 0 && retry.retry(httpStatus));
processedResponse = processBulkResponse(response, data);
} while (data.length() > 0 && retry.retry(processedResponse.getHttpStatus()));

return data.leftoversPosition();
return processedResponse;
}

@SuppressWarnings("rawtypes")
boolean retryFailedEntries(Response response, TrackingBytesArray data) {
BulkResponse processBulkResponse(Response response, TrackingBytesArray data) {
InputStream content = response.body();
try {
ObjectReader r = JsonFactory.objectReader(mapper, Map.class);
Expand All @@ -192,12 +193,15 @@ boolean retryFailedEntries(Response response, TrackingBytesArray data) {
// recorded bytes are ack here
stats.bytesAccepted += data.length();
stats.docsAccepted += data.entries();
return false;
return BulkResponse.ok(data.entries());
}
} finally {
countStreamStats(content);
}

int docsSent = data.entries();
List<String> errorMessageSample = new ArrayList<String>(MAX_BULK_ERROR_MESSAGES);
int errorMessagesSoFar = 0;
int entryToDeletePosition = 0; // head of the list
for (Iterator<Map> iterator = r.readValues(parser); iterator.hasNext();) {
Map map = iterator.next();
Expand All @@ -208,6 +212,14 @@ boolean retryFailedEntries(Response response, TrackingBytesArray data) {
if (error != null && !error.isEmpty()) {
if ((status != null && HttpStatus.canRetry(status)) || error.contains("EsRejectedExecutionException")) {
entryToDeletePosition++;
if (errorMessagesSoFar < MAX_BULK_ERROR_MESSAGES) {
// We don't want to spam the log with the same error message 1000 times.
// Chances are that the error message is the same across all failed writes
// and if it is not, then there are probably only a few different errors which
// this should pick up.
errorMessageSample.add(error);
errorMessagesSoFar++;
}
}
else {
String message = (status != null ?
Expand All @@ -222,7 +234,8 @@ boolean retryFailedEntries(Response response, TrackingBytesArray data) {
}
}

return entryToDeletePosition > 0;
int httpStatusToReport = entryToDeletePosition > 0 ? HttpStatus.SERVICE_UNAVAILABLE : HttpStatus.OK;
return new BulkResponse(httpStatusToReport, docsSent, data.leftoversPosition(), errorMessageSample);
// catch IO/parsing exceptions
} catch (IOException ex) {
throw new EsHadoopParsingException(ex);
Expand Down
24 changes: 14 additions & 10 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java
Expand Up @@ -165,8 +165,7 @@ public void writeToIndex(Object object) {
/**
* Writes the objects to index.
*
* @param data as a byte array
* @param size the length to use from the given array
* @param ba The data as a bytes array
*/
public void writeProcessedToIndex(BytesArray ba) {
Assert.notNull(ba, "no data given");
Expand Down Expand Up @@ -211,8 +210,8 @@ private void doWriteToIndex(BytesRef payload) {
}
}

public BitSet tryFlush() {
BitSet bulkResult = EMPTY;
public BulkResponse tryFlush() {
BulkResponse bulkResult;

try {
// double check data - it might be a false flush (called on clean-up)
Expand All @@ -223,17 +222,16 @@ public BitSet tryFlush() {

bulkResult = client.bulk(resourceW, data);
executedBulkWrite = true;
} else {
bulkResult = BulkResponse.ok(0);
}
} catch (EsHadoopException ex) {
hadWriteErrors = true;
throw ex;
}

// discard the data buffer, only if it was properly sent/processed
//if (bulkResult.isEmpty()) {
// always discard data since there's no code path that uses the in flight data
discard();
//}

return bulkResult;
}
Expand All @@ -244,9 +242,15 @@ public void discard() {
}

public void flush() {
BitSet bulk = tryFlush();
if (!bulk.isEmpty()) {
throw new EsHadoopException(String.format("Could not write all entries [%s/%s] (maybe ES was overloaded?). Bailing out...", bulk.cardinality(), bulk.size()));
BulkResponse bulk = tryFlush();
if (!bulk.getLeftovers().isEmpty()) {
String header = String.format("Could not write all entries [%s/%s] (Maybe ES was overloaded?). Error sample (first [%s] error messages):\n", bulk.getLeftovers().cardinality(), bulk.getTotalWrites(), bulk.getErrorExamples().size());
StringBuilder message = new StringBuilder(header);
for (String errors : bulk.getErrorExamples()) {
message.append("\t").append(errors).append("\n");
}
message.append("Bailing out...");
throw new EsHadoopException(message.toString());
}
}

Expand Down
Expand Up @@ -87,7 +87,7 @@ public void testParseBulkErrorsInES2x() throws Exception {
assertEquals("{0, 1, 2, 3, 4}", inputData.leftoversPosition().toString());

Response response = new SimpleResponse(HttpStatus.OK, getClass().getResourceAsStream("bulk-retry-output-es2x.json"), "");
assertTrue(rc.retryFailedEntries(response, inputData));
assertTrue(rc.processBulkResponse(response, inputData).getHttpStatus() == HttpStatus.SERVICE_UNAVAILABLE);
assertEquals(3, inputData.entries());
assertEquals("{1, 3, 4}", inputData.leftoversPosition().toString());
String string = inputData.toString();
Expand All @@ -112,7 +112,7 @@ public void testParseBulkErrorsInES1x() throws Exception {
assertEquals("{0, 1, 2, 3, 4}", inputData.leftoversPosition().toString());

Response response = new SimpleResponse(HttpStatus.OK, getClass().getResourceAsStream("bulk-retry-output-es1x.json"), "");
assertTrue(rc.retryFailedEntries(response, inputData));
assertTrue(rc.processBulkResponse(response, inputData).getHttpStatus() == HttpStatus.SERVICE_UNAVAILABLE);
assertEquals(3, inputData.entries());
assertEquals("{1, 3, 4}", inputData.leftoversPosition().toString());
String string = inputData.toString();
Expand All @@ -137,7 +137,7 @@ public void testParseBulkErrorsInES10x() throws Exception {
assertEquals("{0, 1, 2, 3, 4}", inputData.leftoversPosition().toString());

Response response = new SimpleResponse(HttpStatus.OK, getClass().getResourceAsStream("bulk-retry-output-es10x.json"), "");
assertTrue(rc.retryFailedEntries(response, inputData));
assertTrue(rc.processBulkResponse(response, inputData).getHttpStatus() == HttpStatus.SERVICE_UNAVAILABLE);
assertEquals(3, inputData.entries());
assertEquals("{1, 3, 4}", inputData.leftoversPosition().toString());
String string = inputData.toString();
Expand All @@ -162,7 +162,7 @@ public void testParseBulkErrorsInES090x() throws Exception {
assertEquals("{0, 1, 2, 3, 4}", inputData.leftoversPosition().toString());

Response response = new SimpleResponse(HttpStatus.OK, getClass().getResourceAsStream("bulk-retry-output-es090x.json"), "");
assertTrue(rc.retryFailedEntries(response, inputData));
assertTrue(rc.processBulkResponse(response, inputData).getHttpStatus() == HttpStatus.SERVICE_UNAVAILABLE);
assertEquals(3, inputData.entries());
assertEquals("{1, 3, 4}", inputData.leftoversPosition().toString());
String string = inputData.toString();
Expand All @@ -187,7 +187,7 @@ public void testParseBulkErrorsInES5x() throws Exception {
assertEquals("{0, 1, 2, 3, 4}", inputData.leftoversPosition().toString());

Response response = new SimpleResponse(HttpStatus.OK, getClass().getResourceAsStream("bulk-retry-output-es5x.json"), "");
assertTrue(rc.retryFailedEntries(response, inputData));
assertTrue(rc.processBulkResponse(response, inputData).getHttpStatus() == HttpStatus.SERVICE_UNAVAILABLE);
assertEquals(3, inputData.entries());
assertEquals("{1, 3, 4}", inputData.leftoversPosition().toString());
String string = inputData.toString();
Expand Down
3 changes: 1 addition & 2 deletions storm/src/main/java/org/elasticsearch/storm/EsBolt.java
Expand Up @@ -144,8 +144,7 @@ private void flushWithAck() {
BitSet flush = null;

try {
flush = writer.repository.tryFlush();
writer.repository.discard();
flush = writer.repository.tryFlush().getLeftovers();
} catch (EsHadoopException ex) {
// fail all recorded tuples
for (Tuple input : inflightTuples) {
Expand Down

0 comments on commit b8f8631

Please sign in to comment.