Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/src/reference/asciidoc/core/configuration.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ Whether to invoke an {ref}/indices-refresh.html[index refresh] or not after a bu
Number of retries for a given batch in case {es} is overloaded and data is rejected. Note that only the rejected data is retried. If there is still data rejected after the retries have been performed, the Hadoop job is cancelled (and fails). A negative value indicates infinite retries; be careful in setting this value as it can have unwanted side effects.

`es.batch.write.retry.wait` (default 10s)::
Time to wait between batch write retries.
Time to wait between batch write retries that are caused by bulk rejections.

`es.ser.reader.value.class` (default _depends on the library used_)::
Name of the `ValueReader` implementation for converting JSON to objects. This is set by the framework depending on the library ({mr}, Cascading, Hive, Pig, etc...) used.
Expand Down
347 changes: 347 additions & 0 deletions docs/src/reference/asciidoc/core/errorhandlers.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,347 @@
[[errorhandlers]]
== Error Handlers

NOTE: Error Handlers are a beta feature in 6.2.0. This means that the APIs and configurations detailed
here may be subject to breaking changes between releases while we improve the usability and clarity of the feature.

{ehtm} is designed to be a mostly hands off integration. Most of the features are managed through conventions and
configurations and no substantial amount of code is required to get up and running with the connector. When it comes to
exceptions, substantial effort has been put into handling the most common and expected errors from {es}.
In the case where errors are unexpected or indicative of a real problem, the connector adopts a "fail-fast" approach. We
realize that this approach is not the best for all users, especially those concerned with uptime of their jobs.

In these situations, users can handle unexpected errors by specifying the actions
to take when encountering them. To this end, we have provided a set of APIs and extensions that users can
implement in order to tailor the connector's behavior toward encountering failures in the most common locations to their
needs.

[[errorhandlers-mechanics]]
[float]
=== Error Handler Mechanics

Each type of failure that can be handled in {eh} has its own error handler API tailored to operation being performed.
An error handler is simply a function that is called when an error occurs, and informs {eh} how to proceed.
Multiple error handlers may be specified for each type of failure. When a failure occurs, each error handler will be
executed in the order that they are specified in the configuration.

An error handler is given information about the performed operation and any details about the error that was
encountered. The handler may then acknowledge and consume the failure telling {eh} to ignore the error.
Alternatively, the handler may mark the error to be rethrown, potentially ending the job. Error handlers are also given
the option to modify the operation's parameters and retry it. Finally, the handler may also "pass" the failure on to the
next handler in the list if it does not know how to handle it.

If every handler in the provided list of handlers chooses to "pass", it is marked as an unhandled
error and the exceptions will be rethrown, potentially ending the job. The connector ships with a few default error
handlers that take care of most use cases, but if you find that you need a more specific error handling strategy, you
can always write your own.

In the following sections, we will detail the different types of error handlers, where they are called, how to configure
them, and how to write your own if you need to.


[[errorhandlers-bulk]]
=== Bulk Write Error Handlers

When writing data, the connector batches up documents into a bulk request before sending them to {es}. In the response,
{es} returns a status for each document sent, which may include rejections or failures. A common error encountered
here is a _rejection_ which means that the shard that the document was meant to be written to was too busy to accept
the write. Other failures here might include documents that are refused because they do not conform to the current
index mapping, or conflict with the current version of the document.

{ehtm} provides an API to handle document level errors from bulk responses. Error handlers for bulk writes are given:

- The raw JSON bulk entry that was tried
- Error message
- HTTP status code for the document
- Number of times that the current document has been sent to {es}


[[errorhandlers-bulk-http]]
[float]
==== HTTP Retry Handler
Always configured as the first error handler for bulk writes.

This handler checks the failures for common HTTP codes that can be retried. These codes are usually indicators that the
shard the document would be written to is too busy to accept the write, and the document was rejected to shed load.
This handler is always configured to be run first for bulk failures. All handlers that are configured by the user are
placed in order after this one.

While the handler's position in the list of error handlers cannot be modified, its behavior can be modified by adjusting
the following configurations:

`es.batch.write.retry.policy` (default: simple)::
Defines the policy for determining which http codes are able to be retried. The default value `simple` allows for 429
(Too Many Requests) and 503 (Unavailable) to be retried. Setting the value to `none` will allow no status codes to be
retried.

`es.batch.write.retry.count` (default 3)::
Number of retries for a given batch in case {es} is overloaded and data is rejected. Note that only the rejected data
is retried. If there is still data rejected after the retries have been performed, the Hadoop job is cancelled (and
fails). A negative value indicates infinite retries; be careful in setting this value as it can have unwanted side
effects.

`es.batch.write.retry.wait` (default 10s)::
Time to wait between batch write retries that are caused by bulk rejections.


[[errorhandlers-bulk-log]]
[float]
==== Drop and Log Error Handler
Default Handler Name: `log`

When this handler is invoked it logs a message containing the JSON bulk entry that failed, the error message, and any previous
handler messages. After logging this message, the handler signals that the error has been acknowledged, thus
consuming/ignoring it.

Available configurations for this handler:

`es.write.rest.error.handler.log.logger.name` (required)::
The string name to use when creating the logger instance to log the errors. This setting is required if this handler is used.

`es.write.rest.error.handler.log.logger.class` (alternative to logger.name)::
The class name to use when creating the logger instance to log the errors. This setting can be used instead of the
required setting `es.write.rest.error.handler.log.logger.name`.

`es.write.rest.error.handler.log.logger.level` (default: WARN)::
The logger level to use when logging the error message. Available options are `FATAL`, `ERROR`, `WARN`, `INFO`, `DEBUG`, and `TRACE`.


[[errorhandlers-bulk-fail]]
[float]
==== Abort on Failure Error Handler
Default Handler Name: `fail`

When this handler is called it rethrows the error given to it and aborts. This handler is always loaded and automatically
placed at the end of the list of error handlers.

There are no configurations for this handler.


[[errorhandlers-bulk-use]]
[float]
==== Using Bulk Error Handlers

To configure bulk error handlers, you must specify the handlers in order with the following properties.

Setting `es.write.rest.error.handlers`::
Lists the names of the error handlers to use for bulk write error handling, and the order that they should be called on.
Each default handler can be referenced by their handler name as the connector knows how to load them. Any handlers
provided from users or third party code will need to have their handler names defined with the `es.write.rest.error.handler.`
prefix.

For bulk write failures, the HTTP Retry built-in handler is always placed as the first error handler. Additionally, the Abort on
Failure built-in handler is always placed as the last error handler to catch any unhandled errors. These two error handlers alone
form the default bulk write error handling behavior for {eh}, which matches the behavior from previous versions.

1. HTTP Retry Built-In Handler: Retries benign bulk rejections and failures from {es} and passes any other error down the line
2. Any configured user handlers will go here.
3. Abort on Failure Built-In Handler: Rethrows the any errors it encounters

This behavior is modified by inserting handlers into the chain by using the handlers property. Let's say that we want
to log ALL errors and ignore them.

[source,ini]
----
es.write.rest.error.handlers = log <1>
----
<1> Specifying the default Drop and Log handler

With the above configuration, the handler list now looks like the following:

1. HTTP Retry Handler
2. Drop and Log Handler
3. Abort on Failure Handler

As described above, the built-in `log` error handler has a required setting: What to use for the logger name. The logger
used will respect whatever logging configuration you have in place, and thus needs a name for the logger to use:

[source,ini]
----
es.write.rest.error.handlers = log <1>
es.write.rest.error.handler.log.logger.name = BulkErrors <2>
----
<1> Specifying the default Drop and Log built-in handler
<2> The Drop and Log built-in handler will log all errors to the `BulkErrors` logger

At this point, the Abort on Failure built-in handler is effectively ignored since the Drop and Log built-in handler will
always mark an error as consumed. This practice can prove to be hazardous, as potentially important errors may simply be
ignored. In many cases, it is preferable for users to write their own error handler to handle expected exceptions.

[[errorhandlers-bulk-user-handlers]]
[float]
==== Writing Your Own Bulk Error Handlers

Let's say that you are streaming sensitive transaction data to {es}. In this scenario, your data is carefully versioned
and you take advantage of {es}'s version system to keep from overwriting newer data with older data. Perhaps your data
is distributed in a way that allows newer data to sneak in to {es} before some older bits of data. No worries, the
version system will reject the older data and preserve the integrity of the data in {es}. The problem here is that your
streaming job has failed because conflict errors were returned and the connector was unsure if you were expecting that.

Let's write an error handler for this situation:

[source, java]
----
package org.myproject.myhandlers;

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteErrorHandler;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteFailure;
import org.elasticsearch.hadoop.rest.bulk.handler.DelayableErrorCollector;

public class IgnoreConflictsHandler extends BulkWriteErrorHandler { <1>

private static final Logger LOGGER = ...; <2>

@Override
public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) <3>
throws Exception
{
if (entry.getResponseCode() == 409) { <4>
LOGGER.warn("Encountered conflict response. Ignoring old data.");
return HandlerResult.HANDLED; <5>
}
return collector.pass("Not a conflict response code."); <6>
}
}
----
<1> We create a class and extend the BulkWriteErrorHandler base class
<2> Create a logger using preferred logging solution
<3> Override the `onError` method which will be invoked with the error details
<4> Check the response code from the error to see if it is 409 (Confict)
<5> If it is a conflict, log the error and return `HandlerResult.HANDLED` to signal that the error is acknowledged
<6> If the error is not a conflict we pass it along to the next error handler with the reason we couldn't handle it

Before we can place this handler in the list of bulk write error handlers, we must register the handler class with a
name in the settings using `es.write.rest.error.handler.[HANDLER-NAME]`:

Setting `es.write.rest.error.handler.[HANDLER-NAME]`::
Create a new handler named HANDLER-NAME. The value of this property must be the binary name of the class to
instantiate for this handler.

In this case, lets register a handler name for our ignore conflicts handler:

[source,ini]
----
es.write.rest.error.handler.ignoreConflict = org.myproject.myhandlers.IgnoreConflictsHandler
----

Now that we have a name for the handler, we can use it in the handler list:

[source,ini]
----
es.write.rest.error.handlers = ignoreConflict
es.write.rest.error.handler.ignoreConflict = org.myproject.myhandlers.IgnoreConflictsHandler
----

Now, your ignore conflict error handler will be invoked whenever a bulk failure occurs, and will instruct the connector
that it is ok with ignoring conflict response codes from {es}.

[[errorhandlers-bulk-advanced]]
[float]
==== Advanced Concepts

What if instead of logging data and dropping it, what if you wanted to persist it somewhere for safe keeping? What if
we wanted to pass properties into our handlers to parameterize their behavior? Lets create a handler that stores error
information in a local file for later analysis.

[source, java]
----
package org.myproject.myhandlers;

import ...

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteErrorHandler;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteFailure;
import org.elasticsearch.hadoop.rest.bulk.handler.DelayableErrorCollector;

public class OutputToFileHandler extends BulkWriteErrorHandler { <1>

private OutputStream outputStream; <2>
private BufferedWriter writer;

@Override
public void init(Properties properties) { <3>
try {
outputStream = new FileOutputStream(properties.getProperty("filename")); <4>
writer = new BufferedWriter(new OutputStreamWriter(outputStream));
} catch (FileNotFoundException e) {
throw new RuntimeException("Could not open file", e);
}
}

@Override
public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) <5>
throws Exception
{
writer.write("Code: " + entry.getResponseCode());
writer.newLine();
writer.write("Error: " + entry.getException().getMessage());
writer.newLine();
for (String message : entry.previousHandlerMessages()) {
writer.write("Previous Handler: " + message); <6>
writer.newLine();
}
writer.write("Attempts: " + entry.getNumberOfAttempts());
writer.newLine();
writer.write("Entry: ");
writer.newLine();
IOUtils.copy(entry.getEntryContents(), writer);
writer.newLine();

return HandlerResult.HANDLED; <7>
}

@Override
public void close() { <8>
try {
writer.close();
outputStream.close();
} catch (IOException e) {
throw new RuntimeException("Closing file failed", e);
}
}
}
----
<1> Extend the BulkWriteErrorHandler base class
<2> Some local state for writing data out to a file
<3> We override the `init` method. Any properties for this handler are passed in here.
<4> We are extracting the file to write to from the properties. We'll see how to set this property below.
<5> Overriding the `onError` method to define our behavior.
<6> Write out the error information. This highlights all the available data provided by the `BulkWriteFailure` object.
<7> Return the `HANDLED` result to signal that the error is handled.
<8> Finally, close out any internally allocated resources.

Added to this handler are the `init` and `close` methods. The `init` method is called when the handler is first created
at the start of the task and the `close` method is called when the task concludes. The `init` method accepts a properties
parameter, which contains any handler specific properties set by using `es.write.rest.error.handler.[HANDLER-NAME].[PROPERTY-NAME]`.

Setting `es.write.rest.error.handler.[HANDLER-NAME].[PROPERTY-NAME]`::
Used to pass properties into handlers. HANDLER-NAME is the handler to be configured, and PROPERTY-NAME is the property
to set for the handler.

In our use case, we will configure the our file logging error handler like so:

[source,ini]
----
es.write.rest.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler <1>
es.write.rest.error.handler.writeFile.filename = /path/to/some/output/file <2>
----
<1> We register our new handler with the name `writeFile`
<2> Now we set a property named `filename` for the `writeFile` handler. In the `init` method of the handler, this can be picked up by using `filename` as the property key.

Now to bring it all together with the previous example (ignoring conflicts):

[source,ini]
----
es.write.rest.error.handlers = ignoreConflict,writeFile

es.write.rest.error.handler.ignoreConflict = org.myproject.myhandlers.IgnoreConflictsHandler

es.write.rest.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler
es.write.rest.error.handler.writeFile.filename = /path/to/some/output/file
----

You now have a chain of handlers that retries bulk rejections by default (HTTP Retry built-in handler), then ignores
any errors that are conflicts (our own ignore conflicts handler), then ignores any other errors by writing them out to
a file (our own output to file handler).
2 changes: 2 additions & 0 deletions docs/src/reference/asciidoc/core/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ include::storm.adoc[]

include::mapping.adoc[]

include::errorhandlers.adoc[]

include::metrics.adoc[]

include::performance.adoc[]
Expand Down