From b8f8631fa7dfb400017e3a62b40eeef94e5cab91 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Wed, 6 Jul 2016 11:24:36 -0400 Subject: [PATCH] [REST] Add mechanic for viewing bulk error messages. fixes #720 --- .../hadoop/rest/BulkResponse.java | 72 +++++++++++++++++++ .../elasticsearch/hadoop/rest/RestClient.java | 31 +++++--- .../hadoop/rest/RestRepository.java | 24 ++++--- .../hadoop/rest/ParseBulkErrorsTest.java | 10 +-- .../java/org/elasticsearch/storm/EsBolt.java | 3 +- 5 files changed, 114 insertions(+), 26 deletions(-) create mode 100644 mr/src/main/java/org/elasticsearch/hadoop/rest/BulkResponse.java diff --git a/mr/src/main/java/org/elasticsearch/hadoop/rest/BulkResponse.java b/mr/src/main/java/org/elasticsearch/hadoop/rest/BulkResponse.java new file mode 100644 index 00000000000000..83c278a3a28884 --- /dev/null +++ b/mr/src/main/java/org/elasticsearch/hadoop/rest/BulkResponse.java @@ -0,0 +1,72 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.hadoop.rest; + +import java.util.BitSet; +import java.util.Collections; +import java.util.List; + +/** + * Simple response object that tracks useful information about a bulk indexing response. + * This includes the determined response code, the number of docs indexed, the docs that + * did not make it, and a sample of error messages for the user to peruse. + */ +public class BulkResponse { + + static BulkResponse ok(int totalWrites) { + return new BulkResponse(totalWrites); + } + + private final int httpStatus; + private final int totalWrites; + private final BitSet leftovers; + private final List errorExamples; + + /** + * Creates a bulk response denoting that everything is OK + * @param totalWrites + */ + private BulkResponse(int totalWrites) { + this(HttpStatus.OK, totalWrites, new BitSet(), Collections.emptyList()); + } + + public BulkResponse(int httpStatus, int totalWrites, BitSet leftovers, List errorExamples) { + this.httpStatus = httpStatus; + this.totalWrites = totalWrites; + this.leftovers = leftovers; + this.errorExamples = errorExamples; + } + + public int getHttpStatus() { + return httpStatus; + } + + public int getTotalWrites() { + return totalWrites; + } + + public BitSet getLeftovers() { + return leftovers; + } + + public List getErrorExamples() { + return errorExamples; + } +} diff --git a/mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java b/mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java index d7911d178ce759..1abcfc19dd4f48 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -64,6 +63,8 @@ public class RestClient implements Closeable, StatsAware { + private final static int MAX_BULK_ERROR_MESSAGES = 5; + private NetworkClient network; private final ObjectMapper mapper; private final TimeValue scrollKeepAlive; @@ -149,9 +150,9 @@ private T parseContent(InputStream content, String string) { return (T) (string != null ? map.get(string) : map); } - public BitSet bulk(Resource resource, TrackingBytesArray data) { + public BulkResponse bulk(Resource resource, TrackingBytesArray data) { Retry retry = retryPolicy.init(); - int httpStatus = 0; + BulkResponse processedResponse; boolean isRetry = false; @@ -175,14 +176,14 @@ public BitSet bulk(Resource resource, TrackingBytesArray data) { isRetry = true; - httpStatus = (retryFailedEntries(response, data) ? HttpStatus.SERVICE_UNAVAILABLE : HttpStatus.OK); - } while (data.length() > 0 && retry.retry(httpStatus)); + processedResponse = processBulkResponse(response, data); + } while (data.length() > 0 && retry.retry(processedResponse.getHttpStatus())); - return data.leftoversPosition(); + return processedResponse; } @SuppressWarnings("rawtypes") - boolean retryFailedEntries(Response response, TrackingBytesArray data) { + BulkResponse processBulkResponse(Response response, TrackingBytesArray data) { InputStream content = response.body(); try { ObjectReader r = JsonFactory.objectReader(mapper, Map.class); @@ -192,12 +193,15 @@ boolean retryFailedEntries(Response response, TrackingBytesArray data) { // recorded bytes are ack here stats.bytesAccepted += data.length(); stats.docsAccepted += data.entries(); - return false; + return BulkResponse.ok(data.entries()); } } finally { countStreamStats(content); } + int docsSent = data.entries(); + List errorMessageSample = new ArrayList(MAX_BULK_ERROR_MESSAGES); + int errorMessagesSoFar = 0; int entryToDeletePosition = 0; // head of the list for (Iterator iterator = r.readValues(parser); iterator.hasNext();) { Map map = iterator.next(); @@ -208,6 +212,14 @@ boolean retryFailedEntries(Response response, TrackingBytesArray data) { if (error != null && !error.isEmpty()) { if ((status != null && HttpStatus.canRetry(status)) || error.contains("EsRejectedExecutionException")) { entryToDeletePosition++; + if (errorMessagesSoFar < MAX_BULK_ERROR_MESSAGES) { + // We don't want to spam the log with the same error message 1000 times. + // Chances are that the error message is the same across all failed writes + // and if it is not, then there are probably only a few different errors which + // this should pick up. + errorMessageSample.add(error); + errorMessagesSoFar++; + } } else { String message = (status != null ? @@ -222,7 +234,8 @@ boolean retryFailedEntries(Response response, TrackingBytesArray data) { } } - return entryToDeletePosition > 0; + int httpStatusToReport = entryToDeletePosition > 0 ? HttpStatus.SERVICE_UNAVAILABLE : HttpStatus.OK; + return new BulkResponse(httpStatusToReport, docsSent, data.leftoversPosition(), errorMessageSample); // catch IO/parsing exceptions } catch (IOException ex) { throw new EsHadoopParsingException(ex); diff --git a/mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java b/mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java index 52b3e1ecc9baa9..c48613c8796111 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java @@ -165,8 +165,7 @@ public void writeToIndex(Object object) { /** * Writes the objects to index. * - * @param data as a byte array - * @param size the length to use from the given array + * @param ba The data as a bytes array */ public void writeProcessedToIndex(BytesArray ba) { Assert.notNull(ba, "no data given"); @@ -211,8 +210,8 @@ private void doWriteToIndex(BytesRef payload) { } } - public BitSet tryFlush() { - BitSet bulkResult = EMPTY; + public BulkResponse tryFlush() { + BulkResponse bulkResult; try { // double check data - it might be a false flush (called on clean-up) @@ -223,17 +222,16 @@ public BitSet tryFlush() { bulkResult = client.bulk(resourceW, data); executedBulkWrite = true; + } else { + bulkResult = BulkResponse.ok(0); } } catch (EsHadoopException ex) { hadWriteErrors = true; throw ex; } - // discard the data buffer, only if it was properly sent/processed - //if (bulkResult.isEmpty()) { // always discard data since there's no code path that uses the in flight data discard(); - //} return bulkResult; } @@ -244,9 +242,15 @@ public void discard() { } public void flush() { - BitSet bulk = tryFlush(); - if (!bulk.isEmpty()) { - throw new EsHadoopException(String.format("Could not write all entries [%s/%s] (maybe ES was overloaded?). Bailing out...", bulk.cardinality(), bulk.size())); + BulkResponse bulk = tryFlush(); + if (!bulk.getLeftovers().isEmpty()) { + String header = String.format("Could not write all entries [%s/%s] (Maybe ES was overloaded?). Error sample (first [%s] error messages):\n", bulk.getLeftovers().cardinality(), bulk.getTotalWrites(), bulk.getErrorExamples().size()); + StringBuilder message = new StringBuilder(header); + for (String errors : bulk.getErrorExamples()) { + message.append("\t").append(errors).append("\n"); + } + message.append("Bailing out..."); + throw new EsHadoopException(message.toString()); } } diff --git a/mr/src/test/java/org/elasticsearch/hadoop/rest/ParseBulkErrorsTest.java b/mr/src/test/java/org/elasticsearch/hadoop/rest/ParseBulkErrorsTest.java index c6bbca8bd9dd96..95cd491ed156d4 100644 --- a/mr/src/test/java/org/elasticsearch/hadoop/rest/ParseBulkErrorsTest.java +++ b/mr/src/test/java/org/elasticsearch/hadoop/rest/ParseBulkErrorsTest.java @@ -87,7 +87,7 @@ public void testParseBulkErrorsInES2x() throws Exception { assertEquals("{0, 1, 2, 3, 4}", inputData.leftoversPosition().toString()); Response response = new SimpleResponse(HttpStatus.OK, getClass().getResourceAsStream("bulk-retry-output-es2x.json"), ""); - assertTrue(rc.retryFailedEntries(response, inputData)); + assertTrue(rc.processBulkResponse(response, inputData).getHttpStatus() == HttpStatus.SERVICE_UNAVAILABLE); assertEquals(3, inputData.entries()); assertEquals("{1, 3, 4}", inputData.leftoversPosition().toString()); String string = inputData.toString(); @@ -112,7 +112,7 @@ public void testParseBulkErrorsInES1x() throws Exception { assertEquals("{0, 1, 2, 3, 4}", inputData.leftoversPosition().toString()); Response response = new SimpleResponse(HttpStatus.OK, getClass().getResourceAsStream("bulk-retry-output-es1x.json"), ""); - assertTrue(rc.retryFailedEntries(response, inputData)); + assertTrue(rc.processBulkResponse(response, inputData).getHttpStatus() == HttpStatus.SERVICE_UNAVAILABLE); assertEquals(3, inputData.entries()); assertEquals("{1, 3, 4}", inputData.leftoversPosition().toString()); String string = inputData.toString(); @@ -137,7 +137,7 @@ public void testParseBulkErrorsInES10x() throws Exception { assertEquals("{0, 1, 2, 3, 4}", inputData.leftoversPosition().toString()); Response response = new SimpleResponse(HttpStatus.OK, getClass().getResourceAsStream("bulk-retry-output-es10x.json"), ""); - assertTrue(rc.retryFailedEntries(response, inputData)); + assertTrue(rc.processBulkResponse(response, inputData).getHttpStatus() == HttpStatus.SERVICE_UNAVAILABLE); assertEquals(3, inputData.entries()); assertEquals("{1, 3, 4}", inputData.leftoversPosition().toString()); String string = inputData.toString(); @@ -162,7 +162,7 @@ public void testParseBulkErrorsInES090x() throws Exception { assertEquals("{0, 1, 2, 3, 4}", inputData.leftoversPosition().toString()); Response response = new SimpleResponse(HttpStatus.OK, getClass().getResourceAsStream("bulk-retry-output-es090x.json"), ""); - assertTrue(rc.retryFailedEntries(response, inputData)); + assertTrue(rc.processBulkResponse(response, inputData).getHttpStatus() == HttpStatus.SERVICE_UNAVAILABLE); assertEquals(3, inputData.entries()); assertEquals("{1, 3, 4}", inputData.leftoversPosition().toString()); String string = inputData.toString(); @@ -187,7 +187,7 @@ public void testParseBulkErrorsInES5x() throws Exception { assertEquals("{0, 1, 2, 3, 4}", inputData.leftoversPosition().toString()); Response response = new SimpleResponse(HttpStatus.OK, getClass().getResourceAsStream("bulk-retry-output-es5x.json"), ""); - assertTrue(rc.retryFailedEntries(response, inputData)); + assertTrue(rc.processBulkResponse(response, inputData).getHttpStatus() == HttpStatus.SERVICE_UNAVAILABLE); assertEquals(3, inputData.entries()); assertEquals("{1, 3, 4}", inputData.leftoversPosition().toString()); String string = inputData.toString(); diff --git a/storm/src/main/java/org/elasticsearch/storm/EsBolt.java b/storm/src/main/java/org/elasticsearch/storm/EsBolt.java index 499b2039146df1..b9477e7a461470 100644 --- a/storm/src/main/java/org/elasticsearch/storm/EsBolt.java +++ b/storm/src/main/java/org/elasticsearch/storm/EsBolt.java @@ -144,8 +144,7 @@ private void flushWithAck() { BitSet flush = null; try { - flush = writer.repository.tryFlush(); - writer.repository.discard(); + flush = writer.repository.tryFlush().getLeftovers(); } catch (EsHadoopException ex) { // fail all recorded tuples for (Tuple input : inflightTuples) {