Feature deserialization handlers (#1149)
Make the parser instance in ScrollReader a local field. Add a block-aware JSON
parser implementation that tracks how deep in a nested structure it is and
allows using logic like ParsingUtils.skipBlock combined with better
state tracking.
jbaiera committed May 23, 2018
1 parent 50564b5 commit df2e678
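
The commit message describes a parser that tracks how deep it is in a nested JSON structure so that callers can skip the remainder of the block they are currently in. As a rough, hypothetical illustration of that idea only (not the connector's actual implementation; the class name, method names, and demo below are invented for this sketch), a depth-tracking wrapper around a Jackson parser could look like this:

[source, java]
----
import java.io.IOException;

import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;

// Hypothetical sketch of the depth-tracking idea; not the connector's parser.
public class DepthTrackingJsonParser {

    private final JsonParser parser;
    private int depth = 0;

    public DepthTrackingJsonParser(JsonParser parser) {
        this.parser = parser;
    }

    // Advances the underlying parser and keeps the nesting depth up to date.
    public JsonToken nextToken() throws IOException {
        JsonToken token = parser.nextToken();
        if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) {
            depth++;
        }
        else if (token == JsonToken.END_OBJECT || token == JsonToken.END_ARRAY) {
            depth--;
        }
        return token;
    }

    // Consumes tokens until the structure has unwound back to the given depth,
    // i.e. the "skip the rest of this block" logic the commit message describes.
    public void skipToDepth(int targetDepth) throws IOException {
        while (depth > targetDepth && nextToken() != null) {
            // keep reading; depth bookkeeping happens in nextToken()
        }
    }

    public int currentDepth() {
        return depth;
    }

    public static void main(String[] args) throws IOException {
        String json = "{\"outer\":{\"inner\":[1,2,3]},\"after\":true}";
        DepthTrackingJsonParser p = new DepthTrackingJsonParser(new JsonFactory().createJsonParser(json));
        p.nextToken();                     // START_OBJECT, depth becomes 1
        p.nextToken();                     // FIELD_NAME "outer"
        p.nextToken();                     // START_OBJECT, depth becomes 2
        p.skipToDepth(1);                  // skip the remainder of the "outer" block
        System.out.println(p.nextToken()); // FIELD_NAME ("after")
    }
}
----
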
Showing 28 changed files with 2,132 additions and 273 deletions.
@@ -34,7 +34,7 @@
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.ScrollQuery;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.ScrollReader.ScrollReaderConfig;
import org.elasticsearch.hadoop.serialization.ScrollReaderConfigBuilder;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.serialization.dto.mapping.Mapping;
import org.elasticsearch.hadoop.serialization.dto.mapping.MappingSet;
@@ -116,7 +116,7 @@ public TupleEntryIterator openForRead(FlowProcess<Properties> flowProcess, Scrol
.readMetadata(settings.getReadMetadata())
.filters(QueryUtils.parseFilters(settings))
.fields(StringUtils.concatenate(fields, ","));
input = queryBuilder.build(client, new ScrollReader(new ScrollReaderConfig(new JdkValueReader(), mapping, settings)));
input = queryBuilder.build(client, new ScrollReader(ScrollReaderConfigBuilder.builder(new JdkValueReader(), mapping, settings)));
}
return new TupleEntrySchemeIterator<Properties, ScrollQuery>(flowProcess, getScheme(), input, getIdentifier());
}
291 changes: 290 additions & 1 deletion docs/src/reference/asciidoc/core/errorhandlers.adoc
@@ -119,7 +119,7 @@ There are no configurations for this handler.

[[errorhandlers-bulk-use]]
[float]
==== Using Bulk Error Handlers
=== Using Bulk Error Handlers

To configure bulk error handlers, you must specify the handlers in order with the following properties.

@@ -345,3 +345,292 @@ es.write.rest.error.handler.writeFile.filename = /path/to/some/output/file
You now have a chain of handlers that retries bulk rejections by default (HTTP Retry built-in handler), then ignores
any errors that are conflicts (our own ignore conflicts handler), then ignores any other errors by writing them out to
a file (our own output to file handler).

[[errorhandlers-read-json]]
=== Deserialization Error Handlers

When reading data, the connector executes scroll requests against the configured indices and reads their contents. For
each hit in a scroll search result, the connector attempts to deserialize it into an integration-specific record type.
When using MapReduce, this data type is either a `MapWritable` or `Text` (for raw JSON data). For an integration like
Spark SQL, which uses data schemas, the resulting data type is a `Row` object.

{es} stores documents in Lucene indices. These documents sometimes have loose definitions, or structures that cannot
be parsed into a schema-based data type for one reason or another. Sometimes a field is simply stored in a format that
cannot be read correctly.

{ehtm} provides an API to handle document-level deserialization errors from scroll responses. Error handlers for scroll reads are given:

- The raw JSON search result that was being read
- The exception that was encountered

NOTE: Deserialization error handlers only allow handling of errors that occur while parsing documents from scroll
responses. A search result may be read successfully yet still be malformed, causing an exception when it is used in a
completely different part of the framework. These handlers are invoked at the most reasonable point in the scroll
reading process for handling exceptions, but they do not encapsulate all of the logic for each integration.

[[errorhandlers-read-json-log]]
[float]
==== Drop and Log Error Handler
Default Handler Name: `log`

When this handler is invoked it logs a message containing the JSON search hit that failed, the error message, and any previous
handler messages. After logging this message, the handler signals that the error has been acknowledged, thus
consuming/ignoring it.

Available configurations for this handler:

`es.read.data.error.handler.log.logger.name` (required)::
The string name to use when creating the logger instance to log the errors. This setting is required if this handler is used.

`es.read.data.error.handler.log.logger.class` (alternative to logger.name)::
The class name to use when creating the logger instance to log the errors. This setting can be used instead of the
required setting `es.read.data.error.handler.log.logger.name`.

`es.read.data.error.handler.log.logger.level` (default: WARN)::
The logger level to use when logging the error message. Available options are `FATAL`, `ERROR`, `WARN`, `INFO`, `DEBUG`, and `TRACE`.
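
As a sketch of how these settings fit together, a configuration that uses only this handler might look like the
following (the logger name and the `INFO` level are illustrative values):

[source,ini]
----
es.read.data.error.handlers = log
es.read.data.error.handler.log.logger.name = DeserializationErrors
es.read.data.error.handler.log.logger.level = INFO
----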


[[errorhandlers-read-json-fail]]
[float]
==== Abort on Failure Error Handler
Default Handler Name: `fail`

When this handler is called it rethrows the error given to it and aborts. This handler is always loaded and automatically
placed at the end of the list of error handlers.

There are no configurations for this handler.


[[errorhandlers-read-json-use]]
[float]
=== Using Deserialization Error Handlers

To configure deserialization error handlers, you must specify the handlers in order with the following properties.

Setting `es.read.data.error.handlers`::
Lists the names of the error handlers to use for deserialization error handling, and the order in which they should be
called. Each default handler can be referenced by its handler name, as the connector knows how to load them. Any
handlers provided by users or third-party code will need to have their handler names defined with the
`es.read.data.error.handler.` prefix.

For deserialization failures, the Abort on Failure built-in handler is always placed as the last error handler to catch
any unhandled errors. This error handler alone forms the default deserialization error handling behavior for {eh}, which
matches the behavior from previous versions.

1. Any configured user handlers will go here.
2. Abort on Failure Built-In Handler: Rethrows any errors it encounters

This behavior can be modified by inserting handlers into the chain using the handlers property. Let's say that we want
to log ALL errors and ignore them.

[source,ini]
----
es.read.data.error.handlers = log <1>
----
<1> Specifying the default Drop and Log handler

With the above configuration, the handler list now looks like the following:

1. Drop and Log Handler
2. Abort on Failure Handler

As described above, the built-in `log` error handler has one required setting: the name to use for the logger. The
logger will respect whatever logging configuration you have in place, and thus needs a name to log under:

[source,ini]
----
es.read.data.error.handlers = log <1>
es.read.data.error.handler.log.logger.name = DeserializationErrors <2>
----
<1> Specifying the default Drop and Log built-in handler
<2> The Drop and Log built-in handler will log all errors to the `DeserializationErrors` logger

At this point, the Abort on Failure built-in handler is effectively ignored since the Drop and Log built-in handler will
always mark an error as consumed. This practice can prove to be hazardous, as potentially important errors may simply be
ignored. In many cases, it is preferable for users to write their own error handler to handle expected exceptions.

[[errorhandlers-read-json-user-handlers]]
[float]
==== Writing Your Own Deserialization Error Handlers

Let's say that you are reading a large index of log data from {es}. In this scenario, your log data is highly
unstructured, and not all of its contents are critical to your process. Due to the volume of data being read, your job
takes a long time to complete. In this case, you might want to replace records that cannot be read with a dummy record
to mark the failure, and not interrupt your processing. The offending data should be logged and dropped.

Let's write an error handler for this situation:

[source, java]
----
package org.myproject.myhandlers;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationFailure;

public class ReturnDummyHandler extends DeserializationErrorHandler { <1>

    private static final Logger LOGGER = ...; <2>
    private static final String DUMMY_RECORD = "..."; <3>

    @Override
    public HandlerResult onError(DeserializationFailure entry, ErrorCollector<byte[]> collector) <4>
            throws Exception
    {
        BufferedReader reader = new BufferedReader(new InputStreamReader(entry.getHitContents()));
        StringBuilder hitContent = new StringBuilder();
        for (String line = reader.readLine(); line != null; line = reader.readLine()) { <5>
            hitContent.append(line);
        }
        LOGGER.warn("Encountered malformed record during read. Replacing with dummy record. " + <6>
                "Malformed Data: " + hitContent, entry.getException());
        return collector.retry(DUMMY_RECORD.getBytes()); <7>
    }
}
----
<1> We create a class that extends the `DeserializationErrorHandler` base class
<2> Create a logger using your preferred logging solution
<3> We create a String to use as the dummy record that should be deserialized instead
<4> Override the `onError` method, which will be invoked with the error details
<5> We read the contents of the failed search hit as a String
<6> We log the contents of the failed document, as well as the exception that details the cause of the failure
<7> Finally, we return the dummy data contents to be deserialized instead of the malformed data

Before we can place this handler in the list of deserialization error handlers, we must register the handler class with a
name in the settings using `es.read.data.error.handler.[HANDLER-NAME]`:

Setting `es.read.data.error.handler.[HANDLER-NAME]`::
Create a new handler named HANDLER-NAME. The value of this property must be the binary name of the class to
instantiate for this handler.

In this case, let's register a handler name for our dummy record handler:

[source,ini]
----
es.read.data.error.handler.returnDummy = org.myproject.myhandlers.ReturnDummyHandler
----

Now that we have a name for the handler, we can use it in the handler list:

[source,ini]
----
es.read.data.error.handlers = returnDummy
es.read.data.error.handler.returnDummy = org.myproject.myhandlers.ReturnDummyHandler
----

Now, your dummy data error handler will be invoked whenever a deserialization failure occurs, and will instruct the
connector to use your provided dummy record instead of the malformed data.
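
A useful variation on this pattern is a handler that consumes only the failures it recognizes as expected and defers
everything else to the remaining handlers in the chain (ultimately the built-in Abort on Failure handler). The sketch
below is hypothetical: the "malformed date" message check is an invented stand-in for whatever failure you consider
expected, and it assumes the collector's `pass(String)` method for handing an unrecognized error to the next handler.

[source, java]
----
package org.myproject.myhandlers;

import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationFailure;

public class IgnoreKnownIssuesHandler extends DeserializationErrorHandler {

    @Override
    public HandlerResult onError(DeserializationFailure entry, ErrorCollector<byte[]> collector)
            throws Exception
    {
        String message = entry.getException().getMessage();
        // "malformed date" is a hypothetical marker for an error we expect and are willing to drop
        if (message != null && message.contains("malformed date")) {
            return HandlerResult.HANDLED; // consume the error; the offending document is dropped
        }
        // Anything we do not recognize is passed along to the next handler in the chain
        return collector.pass("Unrecognized deserialization problem");
    }
}
----

As with `returnDummy` above, you would register this class under a handler name of your choosing and add that name to
`es.read.data.error.handlers`.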

[[errorhandlers-read-json-advanced]]
[float]
==== Advanced Concepts

What if, instead of logging the data and dropping it, you wanted to persist it somewhere for safekeeping? What if you
wanted to pass properties into your handlers to parameterize their behavior? Let's create a handler that stores error
information in a local file for later analysis.

[source, java]
----
package org.myproject.myhandlers;

import ...

import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationErrorHandler;
import org.elasticsearch.hadoop.serialization.handler.read.DeserializationFailure;

public class ReturnDummyAndLogToFileHandler extends DeserializationErrorHandler { <1>

    private static final String DUMMY_RECORD = "...";

    private OutputStream outputStream; <2>
    private BufferedWriter writer;

    @Override
    public void init(Properties properties) { <3>
        try {
            outputStream = new FileOutputStream(properties.getProperty("filename")); <4>
            writer = new BufferedWriter(new OutputStreamWriter(outputStream));
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Could not open file", e);
        }
    }

    @Override
    public HandlerResult onError(DeserializationFailure entry, ErrorCollector<byte[]> collector) <5>
            throws Exception
    {
        BufferedReader reader = new BufferedReader(new InputStreamReader(entry.getHitContents()));
        StringBuilder hitContent = new StringBuilder();
        for (String line = reader.readLine(); line != null; line = reader.readLine()) { <6>
            hitContent.append(line);
        }

        writer.write("Error: " + entry.getException().getMessage());
        writer.newLine();
        for (String message : entry.previousHandlerMessages()) {
            writer.write("Previous Handler: " + message); <7>
            writer.newLine();
        }
        writer.write("Entry: ");
        writer.newLine();
        writer.write(hitContent.toString());
        writer.newLine();

        return collector.retry(DUMMY_RECORD.getBytes()); <8>
    }

    @Override
    public void close() { <9>
        try {
            writer.close();
            outputStream.close();
        } catch (IOException e) {
            throw new RuntimeException("Closing file failed", e);
        }
    }
}
----
<1> Extend the DeserializationErrorHandler base class
<2> Some local state for writing data out to a file
<3> We override the `init` method. Any properties for this handler are passed in here
<4> We are extracting the file to write to from the properties. We'll see how to set this property below
<5> Overriding the `onError` method to define our behavior
<6> Read the contents of the failed search hit
<7> Write out the error information. This highlights all the available data provided by the `DeserializationFailure` object
<8> Perform a retry operation, using our dummy record
<9> Finally, close out any internally allocated resources

Added to this handler are the `init` and `close` methods. The `init` method is called when the scroll query is first
created at the start of the task, and the `close` method is called when the scroll query is closed as the task
concludes. The `init` method accepts a properties parameter, which contains any handler-specific properties set by
using `es.read.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME]`.

Setting `es.read.data.error.handler.[HANDLER-NAME].[PROPERTY-NAME]`::
Used to pass properties into handlers. HANDLER-NAME is the handler to be configured, and PROPERTY-NAME is the property
to set for the handler.

In our use case, we will configure our file-logging error handler like so:

[source,ini]
----
es.read.data.error.handler.writeFile = org.myproject.myhandlers.ReturnDummyAndLogToFileHandler <1>
es.read.data.error.handler.writeFile.filename = /path/to/some/output/file <2>
----
<1> We register our new handler with the name `writeFile`
<2> Now we set a property named `filename` for the `writeFile` handler. In the `init` method of the handler, this can be picked up by using `filename` as the property key.

Now to bring it all together with the previous example:

[source,ini]
----
es.read.data.error.handlers = writeFile
es.read.data.error.handler.writeFile = org.myproject.myhandlers.ReturnDummyAndLogToFileHandler
es.read.data.error.handler.writeFile.filename = /path/to/some/output/file
----

You now have a handler that replaces malformed data with dummy records and logs each malformed record, along with its
error information, to a custom file.
@@ -25,9 +25,10 @@
import org.apache.hadoop.io.Text;
import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.ScrollReader.ScrollReaderConfig;
import org.elasticsearch.hadoop.serialization.ScrollReaderConfigBuilder;
import org.elasticsearch.hadoop.serialization.dto.mapping.FieldParser;
import org.elasticsearch.hadoop.serialization.dto.mapping.Mapping;
import org.elasticsearch.hadoop.util.TestSettings;
import org.junit.Test;

import static org.hamcrest.Matchers.containsString;
@@ -40,7 +41,12 @@ public class HiveValueReaderTest {

@Test
public void testDateMapping() throws Exception {
ScrollReader reader = new ScrollReader(new ScrollReaderConfig(new HiveValueReader(), mapping("hive-date-mappingresponse.json"), false, "_mapping", false, false));
ScrollReaderConfigBuilder scrollCfg = ScrollReaderConfigBuilder.builder(new HiveValueReader(), new TestSettings())
.setResolvedMapping(mapping("hive-date-mappingresponse.json"))
.setReadMetadata(false)
.setReturnRawJson(false)
.setIgnoreUnmappedFields(false);
ScrollReader reader = new ScrollReader(scrollCfg);
InputStream stream = getClass().getResourceAsStream("hive-date-source.json");
List<Object[]> read = reader.read(stream).getHits();
assertEquals(1, read.size());
@@ -19,6 +19,7 @@
package org.elasticsearch.hadoop.integration.rest;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@@ -32,7 +33,7 @@
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.ScrollQuery;
import org.elasticsearch.hadoop.serialization.ScrollReader;
import org.elasticsearch.hadoop.serialization.ScrollReader.ScrollReaderConfig;
import org.elasticsearch.hadoop.serialization.ScrollReaderConfigBuilder;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.serialization.builder.JdkValueWriter;
import org.elasticsearch.hadoop.serialization.dto.mapping.MappingSet;
@@ -93,8 +94,16 @@ public void testQueryBuilder() throws Exception {
.filters(QueryUtils.parseFilters(settings));
MappingSet mappingSet = client.getMappings();

ScrollReaderConfig scrollReaderConfig = new ScrollReaderConfig(new JdkValueReader(), mappingSet.getResolvedView(), true, "_metadata", false, false);
ScrollReader reader = new ScrollReader(scrollReaderConfig);
ScrollReaderConfigBuilder scrollCfg = ScrollReaderConfigBuilder.builder(new JdkValueReader(), settings)
.setResolvedMapping(mappingSet.getResolvedView())
.setReadMetadata(true)
.setMetadataName("_metadata")
.setReturnRawJson(false)
.setIgnoreUnmappedFields(false)
.setIncludeFields(Collections.<String>emptyList())
.setExcludeFields(Collections.<String>emptyList())
.setIncludeArrayFields(Collections.<String>emptyList());
ScrollReader reader = new ScrollReader(scrollCfg);

int count = 0;
for (ScrollQuery query = qb.build(client, reader); query.hasNext();) {