Writing broken records or texts to a file to store them or retry later #27

Open
frsyuki opened this Issue Jan 29, 2015 · 6 comments

@frsyuki
Contributor
frsyuki commented Jan 29, 2015

Data often include broken records. A bulk import can skip them, but in exceptional cases we want to load them later. To do that, we want those error records written to separate files or databases.

The difficulty in terms of API design is that the format of the broken data differs depending on the plugin type:

  • Encoder, Decoder, some Parser plugins
    • These plugins read data through buffers and can't recognize "records". When they detect broken data, they skip the entire file.
  • Line-based parser plugins
    • Some parser plugins are based on lines (e.g. csv). They can skip a line and continue parsing from the next line.
  • Formatter and Output plugins
    • Formatter and output plugins read records. They can skip a record whose schema is fixed by the previous plugins.
  • Filter plugins (#26)
    • Filter plugins read records. They can skip a record whose schema is fixed by the previous plugins.

So, depending on the plugin, the error output needs to store three kinds of data:

a) file from a certain position
b) line
c) record with various schema
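To make the three cases concrete, here is a rough sketch that models them as three variants of one error payload type. This is only an illustration; `ErrorPayload`, `FileTail`, `BrokenLine`, and `BrokenRecord` are hypothetical names, not part of the Embulk API, and a record is simplified to a column list plus a value map.

```java
import java.util.List;
import java.util.Map;

// Hypothetical payload types for the three kinds of error data (not Embulk API).
interface ErrorPayload {
    String describe();
}

// a) The remainder of a file, starting at the byte offset where reading failed.
final class FileTail implements ErrorPayload {
    final String path;
    final long offset;

    FileTail(String path, long offset) {
        this.path = path;
        this.offset = offset;
    }

    public String describe() {
        return "file " + path + " from offset " + offset;
    }
}

// b) A single broken line from a line-based parser such as csv.
final class BrokenLine implements ErrorPayload {
    final String line;

    BrokenLine(String line) {
        this.line = line;
    }

    public String describe() {
        return "line: " + line;
    }
}

// c) A record whose schema was fixed by the previous plugins (simplified).
final class BrokenRecord implements ErrorPayload {
    final List<String> columnNames;
    final Map<String, Object> values;

    BrokenRecord(List<String> columnNames, Map<String, Object> values) {
        this.columnNames = columnNames;
        this.values = values;
    }

    public String describe() {
        return "record with columns " + columnNames;
    }
}
```

The point of a common type is that an error destination could accept all three kinds through one entry point, while each plugin produces only the kind it can actually recognize.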

@frsyuki frsyuki changed the title from Writing broken records or texts to a separated destination to Writing broken records or texts to a file to retry later Jan 29, 2015
@frsyuki frsyuki changed the title from Writing broken records or texts to a file to retry later to Writing broken records or texts to a file to store them or retry later Jan 29, 2015
@frsyuki
Contributor
frsyuki commented Apr 14, 2015

A possible idea:

XxxPlugin extends interface Plugin;
ErrorReporter Exec.getErrorReporter(Plugin instance);
void ErrorReporter.reportError(XxxException cause, Record record);

static Record Record.copyOf(Schema, PageReader);
static Record Record.ofText(String text);
static RecordBuilder Record.builder();
Schema Record.getSchema();
RecordReader Record.getRecordReader()?

interface ErrorPlugin extends Plugin;
TaskSource ErrorPlugin.configure(ConfigSource config);
ErrorReporter ErrorPlugin.open(TaskSource taskSource, PluginType pluginType);
void ErrorReporter.report(Exception cause, Record record);

static PluginType PluginType.of(Plugin instance);

Plugins (input, parser, etc.) call Exec.getErrorReporter(this) to get an ErrorReporter and call ErrorReporter.reportError(error, Record.of(...)) to report a broken record.
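A minimal in-memory sketch of that flow, assuming heavily simplified types: `TextRecord` stands in for the proposed `Record` (renamed to avoid clashing with `java.lang.Record`), `CollectingErrorReporter` is a hypothetical reporter that just collects messages, and `ToyCsvParser` is a toy parser, not the real csv parser plugin. How the reporter is obtained (`Exec.getErrorReporter(this)` in the proposal) is replaced here by constructor injection.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the proposed Record type; only Record.ofText is mirrored.
final class TextRecord {
    private final String text;

    private TextRecord(String text) {
        this.text = text;
    }

    static TextRecord ofText(String text) {
        return new TextRecord(text);
    }

    String getText() {
        return text;
    }
}

// Simplified version of the proposed ErrorReporter.
interface ErrorReporter {
    void reportError(Exception cause, TextRecord record);
}

// Hypothetical reporter that keeps "<cause class>: <text>" strings in memory.
final class CollectingErrorReporter implements ErrorReporter {
    final List<String> reported = new ArrayList<>();

    @Override
    public void reportError(Exception cause, TextRecord record) {
        reported.add(cause.getClass().getSimpleName() + ": " + record.getText());
    }
}

// Toy line parser: reports lines with the wrong column count instead of failing.
final class ToyCsvParser {
    private final ErrorReporter errorReporter;

    ToyCsvParser(ErrorReporter errorReporter) {
        this.errorReporter = errorReporter;
    }

    // Returns how many lines had exactly expectedColumns comma-separated fields.
    int parse(List<String> lines, int expectedColumns) {
        int ok = 0;
        for (String line : lines) {
            String[] fields = line.split(",", -1);
            if (fields.length == expectedColumns) {
                ok++;
            } else {
                errorReporter.reportError(
                        new IllegalArgumentException("expected " + expectedColumns + " columns"),
                        TextRecord.ofText(line));
            }
        }
        return ok;
    }
}
```

Parsing `"1,alice"`, `"2"`, `"3,carol"` with two expected columns would count two good lines and report `"2"` as a broken one.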

For example, a CsvErrorPlugin could write error records to a file named {exec.getTransactionUniqueName()}.{pluginType}.{schema.hashCode()}.{cause.getClass().getSimpleName()}.{seqId}.csv.
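A small sketch of assembling that file name. `CsvErrorFileNames.fileName` is a hypothetical helper; the transaction unique name and plugin type are passed in as plain strings, and the schema is any object with a `hashCode()`, since the real types are still part of the proposal.

```java
// Hypothetical helper building the CsvErrorPlugin file name:
// {transactionUniqueName}.{pluginType}.{schema.hashCode()}.{causeSimpleName}.{seqId}.csv
final class CsvErrorFileNames {
    private CsvErrorFileNames() {}

    static String fileName(String transactionUniqueName, String pluginType,
                           Object schema, Throwable cause, long seqId) {
        return String.join(".",
                transactionUniqueName,
                pluginType,
                Integer.toString(schema.hashCode()),   // may be negative, e.g. "-1234"
                cause.getClass().getSimpleName(),
                Long.toString(seqId)) + ".csv";
    }
}
```

With this naming, records whose schemas have different hash codes, or that failed with different exception classes, end up in different files.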

Another example, a JsonErrorPlugin, could write errors in {"cause":"{cause.getClass().getSimpleName()}", "message":"{cause.getMessage()}", "schema":{schema}, "record": {record}} format.
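A sketch of producing one such JSON line. `JsonErrorLines` is hypothetical, and it assumes the schema and record are already serialized to JSON by the caller; only the cause name and message are escaped here, with a minimal escaper (quotes, backslashes, control characters).

```java
// Hypothetical formatter for the JsonErrorPlugin line format.
// schemaJson and recordJson are assumed to be valid, already-serialized JSON values.
final class JsonErrorLines {
    private JsonErrorLines() {}

    static String format(Throwable cause, String schemaJson, String recordJson) {
        String message = cause.getMessage();
        return "{\"cause\":" + quote(cause.getClass().getSimpleName())
                + ", \"message\":" + (message == null ? "null" : quote(message))
                + ", \"schema\":" + schemaJson
                + ", \"record\": " + recordJson + "}";
    }

    // Minimal JSON string escaping.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }
}
```

Escaping matters here because error messages often quote the offending input, which may itself contain double quotes.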

@frsyuki
Contributor
frsyuki commented Apr 15, 2015

I found that this idea doesn't work by itself because it doesn't consider distributed execution environments.
Each ErrorReporter (i.e., each ErrorPlugin.open call) needs a unique identifier, a kind of task ID. Such an ID is not available in ExecSession.
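A tiny illustration of the collision problem, assuming a task index is what would provide the unique identifier (where that index would come from is exactly the open question). `ErrorReporterNames` and both methods are hypothetical.

```java
// Hypothetical naming helpers for error output destinations.
final class ErrorReporterNames {
    private ErrorReporterNames() {}

    // Session-level information only: every task, on every machine, gets the same
    // name, so parallel reporters would overwrite or interleave each other's output.
    static String sessionOnly(String transactionUniqueName) {
        return transactionUniqueName + ".errors";
    }

    // With a per-task identifier (assumed here to be the task index), names are unique.
    static String perTask(String transactionUniqueName, int taskIndex) {
        return transactionUniqueName + ".task" + taskIndex + ".errors";
    }
}
```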

@frsyuki
Contributor
frsyuki commented Apr 21, 2015

Here is another idea.

Support an error: field in every plugin configuration. For example,

in:
  type: file
  parser:
    type: csv
  error:
    parser:
      type: raw
    out:
      type: stdout
filters:
  - type: xyz
    error:
      out:
        type: stdout
out:
  type: mysql
  error:
    out: stdout

The error: section has an out: section, which configures an output plugin that writes the error records somewhere (stderr as a message, a local file, a database, etc.).

If the error: section belongs to a FileInputPlugin, it has a parser: section as well as an out: section.

For example, if the CSV parser plugin finds a broken line, it encodes the line into bytes and writes them to the plugin created from the error: section (using the FileOutput interface).

Another example: if the MySQL output plugin finds a broken record, it writes the input record to the plugin created from the error: section (using the PageOutput interface).
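The two routing paths can be sketched with simplified sinks. `ByteSink` and `RecordSink` are hypothetical stand-ins for the byte-oriented FileOutput and record-oriented PageOutput interfaces, not the real Embulk types, and records are simplified to maps.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Simplified stand-in for a byte-oriented sink such as FileOutput.
interface ByteSink {
    void add(byte[] bytes);
}

// Simplified stand-in for a record-oriented sink such as PageOutput.
interface RecordSink {
    void add(Map<String, Object> record);
}

final class ErrorRouting {
    private ErrorRouting() {}

    // Parser side: the broken line is encoded back into bytes (UTF-8 assumed here,
    // plus the line terminator) so the error: section's own parser can read it again.
    static void reportBrokenLine(String line, ByteSink errorSink) {
        errorSink.add((line + "\n").getBytes(StandardCharsets.UTF_8));
    }

    // Output side: the input record is passed through unchanged.
    static void reportBrokenRecord(Map<String, Object> record, RecordSink errorSink) {
        errorSink.add(record);
    }
}
```

This is why the error: section under a file input needs a parser: section: its output plugin receives bytes, while the error: section under an output plugin already receives records.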

We would need some plugins to handle errors smoothly. For example,

  • embulk-filter-limit: throws an exception if the number of records exceeds a certain threshold.
  • embulk-parser-raw: converts a line into a single-field record like {text: "text..."}
  • embulk-output-warning: shows warnings through a logger
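The core logic of the first two helpers is small. Below is a sketch of just that logic, outside any plugin framework; `ErrorHelpers`, `Limit`, and `rawRecord` are hypothetical names, and a record is simplified to a map.

```java
import java.util.Collections;
import java.util.Map;

final class ErrorHelpers {
    private ErrorHelpers() {}

    // Like the proposed embulk-filter-limit: counts records and throws once
    // more than maxRecords have passed through.
    static final class Limit {
        private final long maxRecords;
        private long count;

        Limit(long maxRecords) {
            this.maxRecords = maxRecords;
        }

        void accept() {
            count++;
            if (count > maxRecords) {
                throw new IllegalStateException("number of records exceeded " + maxRecords);
            }
        }
    }

    // Like the proposed embulk-parser-raw: turns a line into a single-field record {text: line}.
    static Map<String, Object> rawRecord(String line) {
        return Collections.singletonMap("text", line);
    }
}
```

Placed in an error: pipeline, a limit like this would turn "a few broken records" into success while still failing the job when almost everything is broken.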
@hito4t
Contributor
hito4t commented Apr 22, 2015

It seems very good!

I have one request.
In many cases, the same error handler may be used for all plugins (in, filter and out).
It would be very convenient if we could define a default error handler for all plugins.

For example:

in:
  type: file
  parser:
    type: csv
  error:
    out:
      type: syslog
out:
  type: mysql
default:
  error:
    out: stdout

In this case, the default error handler is stdout.
The error handler of input plugin is overridden to syslog.
The error handler of output plugin is stdout (default).

The default section may also be useful for other features.
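One possible resolution rule for the default: section, sketched over plain maps as a YAML loader might produce them. `ErrorConfigResolver` is hypothetical, and it assumes a plugin's own error: section replaces the default one entirely rather than being deep-merged with it.

```java
import java.util.Map;

final class ErrorConfigResolver {
    private ErrorConfigResolver() {}

    // Effective error: section for one plugin: its own section if present,
    // otherwise default.error from the root config, otherwise null.
    static Map<String, Object> effectiveError(Map<String, Object> pluginConfig,
                                              Map<String, Object> rootConfig) {
        Object own = pluginConfig.get("error");
        if (own instanceof Map) {
            return cast(own);
        }
        Object defaults = rootConfig.get("default");
        if (defaults instanceof Map) {
            Object fallback = ((Map<?, ?>) defaults).get("error");
            if (fallback instanceof Map) {
                return cast(fallback);
            }
        }
        return null;
    }

    @SuppressWarnings("unchecked")
    private static Map<String, Object> cast(Object o) {
        return (Map<String, Object>) o;
    }
}
```

Whole-section replacement matches the example above (in: uses syslog, out: falls back to stdout); a deep merge would be the alternative if partial overrides were wanted.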

@hito4t
Contributor
hito4t commented Apr 23, 2015

I have one more question.
What's the purpose of "schema.hashCode()"?

@frsyuki
Contributor
frsyuki commented Aug 17, 2015

I think error handling needs some sort of mixin mechanism.
InputPlugin or FileInputPlugin would include an error handling module as a mixin.
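In Java, the closest built-in approximation of a mixin is an interface with default methods. A hypothetical sketch: `ErrorHandlingMixin` and `ToyInputPlugin` are illustrative names, and errors are collected into a list instead of a real error output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mixin: implementing plugins get reportError() for free and
// only have to say where errors go.
interface ErrorHandlingMixin {
    List<String> errorSink();

    default void reportError(Exception cause, String brokenData) {
        errorSink().add(cause.getClass().getSimpleName() + ": " + brokenData);
    }
}

// A toy input plugin that "includes" the mixin.
final class ToyInputPlugin implements ErrorHandlingMixin {
    private final List<String> errors = new ArrayList<>();

    @Override
    public List<String> errorSink() {
        return errors;
    }

    // Treats an empty line as broken data and reports it instead of failing.
    void read(String line) {
        if (line.isEmpty()) {
            reportError(new IllegalArgumentException("empty line"), "<empty>");
        }
    }
}
```

Because Java interfaces cannot hold fields, the mixin has to reach its state through an abstract accessor (`errorSink()` here); that is one of the limits such a mechanism would have to work around.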

@muga muga added this to the v0.9 milestone Apr 12, 2016
@muga muga removed this from the v0.9 milestone Feb 24, 2017