Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ConfigInputPlugin for easier testing. #678

Merged
merged 3 commits into from Jun 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,170 @@
package org.embulk.standards;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
import org.embulk.config.Config;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.Column;
import org.embulk.spi.ColumnVisitor;
import org.embulk.spi.DataException;
import org.embulk.spi.Exec;
import org.embulk.spi.InputPlugin;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
import org.embulk.spi.SchemaConfig;
import org.embulk.spi.json.JsonParser;
import org.embulk.spi.json.JsonParseException;
import org.embulk.spi.time.TimestampParseException;
import org.embulk.spi.time.TimestampParser;
import org.embulk.spi.util.Timestamps;

public class ConfigInputPlugin
implements InputPlugin
{
private interface PluginTask
extends Task, TimestampParser.Task
{
@Config("columns")
SchemaConfig getSchemaConfig();

@Config("values")
List<List<List<JsonNode>>> getValues();
}

@Override
public ConfigDiff transaction(ConfigSource config,
InputPlugin.Control control)
{
final PluginTask task = config.loadConfig(PluginTask.class);
final Schema schema = task.getSchemaConfig().toSchema();
final List<List<List<JsonNode>>> values = task.getValues();
final int taskCount = values.size();

return resume(task.dump(), schema, taskCount, control);
}

@Override
public ConfigDiff resume(TaskSource taskSource,
Schema schema, int taskCount,
InputPlugin.Control control)
{
control.run(taskSource, schema, taskCount);
return Exec.newConfigDiff();
}

@Override
public void cleanup(TaskSource taskSource,
Schema schema, int taskCount,
List<TaskReport> successTaskReports)
{
}

@Override
public TaskReport run(TaskSource taskSource,
Schema schema, int taskIndex,
PageOutput output)
{
final PluginTask task = taskSource.loadTask(PluginTask.class);
final List<List<JsonNode>> taskValues = task.getValues().get(taskIndex);
final TimestampParser[] timestampParsers = Timestamps.newTimestampColumnParsers(task, task.getSchemaConfig());
final JsonParser jsonParser = new JsonParser();

try (final PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output)) {
for (final List<JsonNode> rowValues : taskValues) {
schema.visitColumns(new ColumnVisitor() {
public void booleanColumn(Column column)
{
final JsonNode value = rowValues.get(column.getIndex());
if (value == null || value.isNull()) {
pageBuilder.setNull(column);
}
else {
pageBuilder.setBoolean(column, value.asBoolean());
}
}

public void longColumn(Column column)
{
final JsonNode value = rowValues.get(column.getIndex());
if (value == null || value.isNull()) {
pageBuilder.setNull(column);
}
else {
pageBuilder.setLong(column, value.asLong());
}
}

public void doubleColumn(Column column)
{
final JsonNode value = rowValues.get(column.getIndex());
if (value == null || value.isNull()) {
pageBuilder.setNull(column);
}
else {
pageBuilder.setDouble(column, value.asDouble());
}
}

public void stringColumn(Column column)
{
final JsonNode value = rowValues.get(column.getIndex());
if (value == null || value.isNull()) {
pageBuilder.setNull(column);
}
else {
pageBuilder.setString(column, value.asText());
}
}

public void timestampColumn(Column column)
{
final JsonNode value = rowValues.get(column.getIndex());
if (value == null || value.isNull()) {
pageBuilder.setNull(column);
}
else {
try {
pageBuilder.setTimestamp(column,
timestampParsers[column.getIndex()].parse(value.asText()));
}
catch (TimestampParseException ex) {
throw new DataException(ex);
}
}
}

public void jsonColumn(Column column)
{
final JsonNode value = rowValues.get(column.getIndex());
if (value == null || value.isNull()) {
pageBuilder.setNull(column);
}
else {
try {
pageBuilder.setJson(column, jsonParser.parse(value.toString()));
}
catch (JsonParseException ex) {
throw new DataException(ex);
}
}
}
});
pageBuilder.addRecord();
}
pageBuilder.finish();
}

return Exec.newTaskReport();
}

@Override
public ConfigDiff guess(ConfigSource config)
{
return Exec.newConfigDiff();
}
}
Expand Up @@ -23,6 +23,7 @@ public void configure(Binder binder)
Preconditions.checkNotNull(binder, "binder is null.");

// input plugins
registerPluginTo(binder, InputPlugin.class, "config", ConfigInputPlugin.class);
registerPluginTo(binder, InputPlugin.class, "file", LocalFileInputPlugin.class);

// parser plugins
Expand Down