Add serialization error handler API (elastic#1153)
Add error handler API to wrap the execution of serializing records to JSON 
bulk entries. This should encapsulate the process of extracting document 
metadata from the record to place in the action header, as well as the 
traversal and conversion of the record fields to JSON data.
jbaiera committed Jun 13, 2018
1 parent fc63bc3 commit d9b2503
Showing 10 changed files with 1,001 additions and 5 deletions.
272 changes: 272 additions & 0 deletions docs/src/reference/asciidoc/core/errorhandlers.adoc
@@ -55,6 +55,7 @@ index mapping, or conflict with the current version of the document.
- HTTP status code for the document
- Number of times that the current document has been sent to {es}

There are a few default error handlers provided by the connector:

[[errorhandlers-bulk-http]]
[float]
@@ -346,6 +347,275 @@ You now have a chain of handlers that retries bulk rejections by default (HTTP R
any errors that are conflicts (our own ignore conflicts handler), then ignores any other errors by writing them out to
a file (our own output to file handler).

[[errorhandlers-serialization]]
=== Serialization Error Handlers

Before sending data to {es}, {eh} must serialize each document into a JSON bulk entry. During this process the bulk
operation is determined, document metadata is extracted, and integration-specific data structures are converted into
JSON documents. Inconsistencies in a record's structure can cause exceptions to be thrown during serialization, and
these errors often lead to failed tasks and halted processing.

{ehtm} provides an API to handle serialization errors at the record level. Serialization error handlers are given:

- The integration-specific data structure that could not be serialized
- The exception encountered during serialization
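
For reference, a minimal sketch of this contract is shown below. The class and package names are hypothetical, and the
sketch uses only the `SerializationFailure` accessors and `HandlerResult` values demonstrated in the examples later in
this section:

[source, java]
----
package org.myproject.myhandlers;

import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationFailure;

public class InspectOnlyHandler extends SerializationErrorHandler {
    @Override
    public HandlerResult onError(SerializationFailure entry, ErrorCollector<Object> collector) throws Exception {
        Object record = entry.getRecord();       // the integration-specific data structure that failed
        Exception error = entry.getException();  // the exception encountered during serialization
        for (String message : entry.previousHandlerMessages()) {
            // messages left by handlers that ran earlier in the chain
        }
        return HandlerResult.PASS;               // defer the error to the next handler in the chain
    }
}
----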

NOTE: Serialization Error Handlers are not yet available for Hive. {ehtm} uses Hive's SerDe constructs to convert data into
bulk entries before it is sent to the output format. SerDe objects do not have a cleanup method that is called when the
object reaches the end of its lifecycle. Because of this, serialization error handlers are not supported in Hive, as
they cannot be closed at the end of the job execution.

There are a few default error handlers provided by the connector:

[[errorhandlers-serialization-log]]
[float]
==== Drop and Log Error Handler
Default Handler Name: `log`

When this handler is invoked, it logs a message containing the toString() contents of the data structure that failed to
serialize, the error message, and any previous handler messages. After logging this message, the handler signals that
the error has been acknowledged, thus consuming (ignoring) it.

Available configurations for this handler:

`es.write.data.error.handler.log.logger.name` (required)::
The string name to use when creating the logger instance to log the errors. This setting is required if this handler is used.

`es.write.data.error.handler.log.logger.class` (alternative to logger.name)::
The class name to use when creating the logger instance to log the errors. This setting can be used instead of the
required setting `es.write.data.error.handler.log.logger.name`.

`es.write.data.error.handler.log.logger.level` (default: WARN)::
The logger level to use when logging the error message. Available options are `FATAL`, `ERROR`, `WARN`, `INFO`, `DEBUG`, and `TRACE`.
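
For example, a minimal configuration that logs dropped records at `ERROR` level would look like the following (the
logger name `SerializationErrors` is just an illustration):

[source,ini]
----
es.write.data.error.handlers = log
es.write.data.error.handler.log.logger.name = SerializationErrors
es.write.data.error.handler.log.logger.level = ERROR
----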


[[errorhandlers-serialization-fail]]
[float]
==== Abort on Failure Error Handler
Default Handler Name: `fail`

When this handler is called, it rethrows the error given to it and aborts. This handler is always loaded and is
automatically placed at the end of the list of error handlers.

There are no configurations for this handler.


[[errorhandlers-serialization-use]]
[float]
=== Using Serialization Error Handlers

To configure serialization error handlers, you must specify the handlers in order with the following properties.

Setting `es.write.data.error.handlers`::
Lists the names of the error handlers to use for serialization error handling, in the order that they should be called.
Each default handler can be referenced by its handler name, as the connector knows how to load them. Any handlers
provided by users or third-party code will need to have their handler names defined with the `es.write.data.error.handler.`
prefix.

For serialization failures, the Abort on Failure built-in handler is always placed as the last error handler to catch
any unhandled errors. This error handler forms the default serialization error handling behavior for {eh}, which
matches the behavior from previous versions.

1. Any configured user handlers will go here.
2. Abort on Failure Built-In Handler: Rethrows any errors it encounters

This behavior can be modified by inserting handlers into the chain using the handlers property. Let's say that we want
to log ALL errors and ignore them.

[source,ini]
----
es.write.data.error.handlers = log <1>
----
<1> Specifying the default Drop and Log handler

With the above configuration, the handler list now looks like the following:

1. Drop and Log Handler
2. Abort on Failure Handler

As described above, the built-in `log` error handler has one required setting: the name to use for the logger. The
logger respects whatever logging configuration you have in place, and so it needs a name to log under:

[source,ini]
----
es.write.data.error.handlers = log <1>
es.write.data.error.handler.log.logger.name = SerializationErrors <2>
----
<1> Specifying the default Drop and Log built-in handler
<2> The Drop and Log built-in handler will log all errors to the `SerializationErrors` logger

At this point, the Abort on Failure built-in handler is effectively ignored since the Drop and Log built-in handler will
always mark an error as consumed. This practice can prove to be hazardous, as potentially important errors may simply be
ignored. In many cases, it is preferable for users to write their own error handler to handle expected exceptions.

[[errorhandlers-serialization-user-handlers]]
[float]
==== Writing Your Own Serialization Handlers

Let's say that you are streaming some unstructured data to {es}. In this scenario, your data is not fully sanitized and
may contain field values that cannot be translated to JSON by the connector. You may not want your streaming job to
fail on this data, as you may be expecting it to contain errors. In this situation, you may want to log the data in a
more comprehensive manner than relying on your data's toString() method.

Let's write an error handler for this situation:

[source, java]
----
package org.myproject.myhandlers;

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationFailure;

public class CustomLogOnError extends SerializationErrorHandler { <1>

    private Log logger = ???; <2>

    @Override
    public HandlerResult onError(SerializationFailure entry, ErrorCollector<Object> collector) throws Exception { <3>
        MyRecord record = (MyRecord) entry.getRecord(); <4>
        logger.error("Could not serialize record. " +
                "Record data : " + record.getSpecificField() + ", " + record.getOtherField(), entry.getException()); <5>
        return HandlerResult.HANDLED; <6>
    }
}
----
<1> We create a class that extends the SerializationErrorHandler base class
<2> Create a logger using your preferred logging solution
<3> Override the `onError` method, which will be invoked with the error details
<4> Retrieve the record that failed to serialize, casting it to the record type you expect from your job
<5> Log the specific information from the data that you are interested in
<6> Finally, after logging the error, return `HandlerResult.HANDLED` to signal that the error is acknowledged

Before we can place this handler in the list of serialization error handlers, we must register the handler class with a
name in the settings using `es.write.data.error.handler.[HANDLER-NAME]`:

Setting `es.write.data.error.handler.[HANDLER-NAME]`::
Create a new handler named HANDLER-NAME. The value of this property must be the binary name of the class to
instantiate for this handler.

In this case, let's register a handler name for our custom logging handler:

[source,ini]
----
es.write.data.error.handler.customLog = org.myproject.myhandlers.CustomLogOnError
----

Now that we have a name for the handler, we can use it in the handler list:

[source,ini]
----
es.write.data.error.handlers = customLog
es.write.data.error.handler.customLog = org.myproject.myhandlers.CustomLogOnError
----

Now, your custom logging error handler will be invoked whenever a serialization failure occurs, and it will instruct the
connector that it is ok to ignore those failures and continue processing.

[[errorhandlers-serialization-advanced]]
[float]
==== Advanced Concepts

Instead of logging data and dropping it, what if you wanted to persist it somewhere for safekeeping? And what if you
wanted to pass properties into your handlers to parameterize their behavior? Let's create a handler that stores error
information in a local file for later analysis.

[source, java]
----
package org.myproject.myhandlers;

import ...
import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.write.SerializationFailure;

public class OutputToFileHandler extends SerializationErrorHandler { <1>

    private OutputStream outputStream; <2>
    private BufferedWriter writer;

    @Override
    public void init(Properties properties) { <3>
        try {
            outputStream = new FileOutputStream(properties.getProperty("filename")); <4>
            writer = new BufferedWriter(new OutputStreamWriter(outputStream));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Could not open file", e);
        }
    }

    @Override
    public HandlerResult onError(SerializationFailure entry, ErrorCollector<Object> collector) <5>
            throws Exception {
        writer.write("Record: " + entry.getRecord().toString());
        writer.newLine();
        writer.write("Error: " + entry.getException().getMessage());
        writer.newLine();
        for (String message : entry.previousHandlerMessages()) {
            writer.write("Previous Handler: " + message); <6>
            writer.newLine();
        }
        return HandlerResult.PASS; <7>
    }

    @Override
    public void close() { <8>
        try {
            writer.close();
            outputStream.close();
        } catch (IOException e) {
            throw new RuntimeException("Closing file failed", e);
        }
    }
}
----
<1> Extend the SerializationErrorHandler base class
<2> Some local state for writing data out to a file
<3> We override the `init` method. Any properties for this handler are passed in here.
<4> We are extracting the file to write to from the properties. We'll see how to set this property below.
<5> Overriding the `onError` method to define our behavior.
<6> Write out the error information. This highlights all the available data provided by the `SerializationFailure` object.
<7> Return the `PASS` result to signal that the error should be handed off to the next error handler in the chain.
<8> Finally, close out any internally allocated resources.

Added to this handler are the `init` and `close` methods. The `init` method is called when the handler is first created
at the start of the task and the `close` method is called when the task concludes. The `init` method accepts a properties
parameter, which contains any handler specific properties set by using `es.write.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME]`.

Setting `es.write.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME]`::
Used to pass properties into handlers. HANDLER-NAME is the handler to be configured, and PROPERTY-NAME is the property
to set for the handler.

In our use case, we will configure our file logging error handler like so:

[source,ini]
----
es.write.data.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler <1>
es.write.data.error.handler.writeFile.filename = /path/to/some/output/file <2>
----
<1> We register our new handler with the name `writeFile`
<2> Now we set a property named `filename` for the `writeFile` handler. In the `init` method of the handler, this can be picked up by using `filename` as the property key.

Now to bring it all together:

[source,ini]
----
es.write.data.error.handlers = writeFile,customLog
es.write.data.error.handler.customLog = org.myproject.myhandlers.CustomLogOnError
es.write.data.error.handler.writeFile = org.myproject.myhandlers.OutputToFileHandler
es.write.data.error.handler.writeFile.filename = /path/to/some/output/file
----

You now have a chain of handlers that writes all relevant data about the failure to a file (our writeFile handler), then
logs each error with a custom log line and ignores it so that processing continues (our customLog handler).

[[errorhandlers-read-json]]
=== Deserialization Error Handlers

@@ -369,6 +639,8 @@ exception when it is used in a completely different part of the framework. This is
the most reasonable place to handle exceptions in the scroll reading process, but this does not encapsulate all logic
for each integration.

There are a few default error handlers provided by the connector:

[[errorhandlers-read-json-log]]
[float]
==== Drop and Log Error Handler
2 changes: 2 additions & 0 deletions hive/src/main/java/org/elasticsearch/hadoop/hive/EsSerDe.java
@@ -160,6 +160,8 @@ public Writable serialize(Object data, ObjectInspector objInspector) throws SerD
hiveType.setObjectInspector(objInspector);
hiveType.setObject(data);

// We use the command directly instead of the bulk entry writer since there is no close() method on SerDes.
// See FileSinkOperator#process() for more info on how this is used with the output format.
command.write(hiveType).copyTo(scratchPad);
result.setContent(scratchPad);
return result;
@@ -31,8 +31,8 @@
import org.elasticsearch.hadoop.serialization.ScrollReader.Scroll;
import org.elasticsearch.hadoop.serialization.ScrollReaderConfigBuilder;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.serialization.bulk.BulkCommand;
import org.elasticsearch.hadoop.serialization.bulk.BulkCommands;
import org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter;
import org.elasticsearch.hadoop.serialization.bulk.MetadataExtractor;
import org.elasticsearch.hadoop.serialization.dto.NodeInfo;
import org.elasticsearch.hadoop.serialization.dto.ShardInfo;
@@ -76,10 +76,10 @@ public class RestRepository implements Closeable, StatsAware {
private boolean writeInitialized = false;

private RestClient client;
private BulkCommand command;
// optional extractor passed lazily to BulkCommand
private MetadataExtractor metaExtractor;

private BulkEntryWriter bulkEntryWriter;
private BulkProcessor bulkProcessor;

// Internal
@@ -135,7 +135,7 @@ private void lazyInitWriting() {
this.writeInitialized = true;
this.bulkProcessor = new BulkProcessor(client, resources.getResourceWrite(), settings);
this.trivialBytesRef = new BytesRef();
this.command = BulkCommands.create(settings, metaExtractor, client.internalVersion);
this.bulkEntryWriter = new BulkEntryWriter(settings, BulkCommands.create(settings, metaExtractor, client.internalVersion));
}
}

@@ -167,8 +167,8 @@ public void writeToIndex(Object object) {
Assert.notNull(object, "no object data given");

lazyInitWriting();
BytesRef serialized = null;
serialized = command.write(object);
BytesRef serialized = bulkEntryWriter.writeBulkEntry(object);
if (serialized != null) {
doWriteToIndex(serialized);
}
@@ -220,6 +219,11 @@ public void close() {
stats.aggregate(bulkProcessor.stats());
bulkProcessor = null;
}

if (bulkEntryWriter != null) {
bulkEntryWriter.close();
bulkEntryWriter = null;
}
} finally {
client.close();
// Aggregate stats before discarding them.