Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add clean illegal characters mode to json parser. #651

Merged
merged 5 commits into from Jun 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,6 +1,10 @@
package org.embulk.standards;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.io.CharSource;
import com.google.common.io.CharStreams;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigSource;
Expand All @@ -18,20 +22,50 @@
import org.embulk.spi.json.JsonParser;
import org.embulk.spi.type.Types;
import org.embulk.spi.util.FileInputInputStream;
import org.jruby.embed.io.ReaderInputStream;
import org.msgpack.core.Preconditions;
import org.msgpack.value.Value;
import org.slf4j.Logger;

import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

public class JsonParserPlugin
implements ParserPlugin
{

/**
 * How backslash escapes that JSON does not define are treated before parsing.
 *
 * <ul>
 * <li>{@code PASSTHROUGH} - the input is handed to the parser untouched.</li>
 * <li>{@code SKIP} - the backslash and the character following it are removed.</li>
 * <li>{@code UNESCAPE} - only the backslash is removed; the following character is kept.</li>
 * </ul>
 */
public enum InvalidEscapeStringPolicy
{
    PASSTHROUGH("PASSTHROUGH"),
    SKIP("SKIP"),
    UNESCAPE("UNESCAPE");

    /** Textual form of this policy, identical to the constant name. */
    private final String label;

    InvalidEscapeStringPolicy(String label)
    {
        this.label = label;
    }

    /**
     * @return the textual form given to this constant at declaration
     */
    public String getString()
    {
        return this.label;
    }
}

/**
 * Configuration options accepted by the JSON parser plugin.
 */
public interface PluginTask
extends Task
{
// Option "stop_on_invalid_record"; defaults to false.
// NOTE(review): the code consuming this flag is not visible in this view; presumably it
// decides whether an invalid record aborts the run instead of being skipped - confirm.
@Config("stop_on_invalid_record")
@ConfigDefault("false")
boolean getStopOnInvalidRecord();

// Option "invalid_string_escapes"; defaults to PASSTHROUGH.
// Read by newJsonStream to decide whether each input line is rewritten before parsing.
@Config("invalid_string_escapes")
@ConfigDefault("\"PASSTHROUGH\"")
InvalidEscapeStringPolicy getInvalidEscapeStringPolicy();
}

private final Logger log;
Expand Down Expand Up @@ -66,7 +100,7 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu
FileInputInputStream in = new FileInputInputStream(input)) {
while (in.nextFile()) {
boolean evenOneJsonParsed = false;
try (JsonParser.Stream stream = newJsonStream(in)) {
try (JsonParser.Stream stream = newJsonStream(in, task)) {
Value value;
while ((value = stream.next()) != null) {
try {
Expand Down Expand Up @@ -107,10 +141,87 @@ private PageBuilder newPageBuilder(Schema schema, PageOutput output)
return new PageBuilder(Exec.getBufferAllocator(), schema, output);
}

private JsonParser.Stream newJsonStream(FileInputInputStream in)
private JsonParser.Stream newJsonStream(FileInputInputStream in, PluginTask task)
throws IOException
{
return new JsonParser().open(in);
InvalidEscapeStringPolicy policy = task.getInvalidEscapeStringPolicy();
switch (policy) {
case SKIP:
case UNESCAPE:
Iterable<CharSource> lines = Lists.transform(CharStreams.readLines(new BufferedReader(new InputStreamReader(in))),
invalidEscapeStringFunction(policy));
return new JsonParser().open(new ReaderInputStream(CharSource.concat(lines).openStream()));
case PASSTHROUGH:
default:
return new JsonParser().open(in);
}
}

/**
 * Creates a function that rewrites one line of JSON text according to {@code policy}.
 *
 * <p>Escapes defined by JSON ({@code \b \f \n \r \t \" \\ \/} and {@code \}{@code uXXXX}
 * with four hexadecimal digits) are always copied unchanged. Any other backslash
 * sequence is handled as follows:
 * <ul>
 * <li>{@code PASSTHROUGH} - the whole line is returned untouched;</li>
 * <li>{@code SKIP} - the backslash and the character after it are removed;</li>
 * <li>{@code UNESCAPE} - only the backslash is removed.</li>
 * </ul>
 * A backslash that is the last character of the line is removed under both
 * {@code SKIP} and {@code UNESCAPE}.
 *
 * @param policy how invalid escapes are treated
 * @return a function mapping a non-null line to its rewritten form
 */
Function<String, CharSource> invalidEscapeStringFunction(final InvalidEscapeStringPolicy policy)
{
    return new Function<String, CharSource>()
    {
        final Pattern digitsPattern = Pattern.compile("\\p{XDigit}+");

        @Override
        public CharSource apply(@Nullable String input)
        {
            Preconditions.checkNotNull(input);
            if (policy == InvalidEscapeStringPolicy.PASSTHROUGH) {
                return CharSource.wrap(input);
            }
            StringBuilder builder = new StringBuilder(input.length());
            char[] charArray = input.toCharArray();
            for (int characterIndex = 0; characterIndex < charArray.length; characterIndex++) {
                char c = charArray[characterIndex];
                if (c != '\\') {
                    builder.append(c);
                    continue;
                }
                if (characterIndex + 1 >= charArray.length) {
                    // Dangling backslash at the end of the line: dropped.
                    continue;
                }
                char next = charArray[characterIndex + 1];
                switch (next) {
                    case 'b':
                    case 'f':
                    case 'n':
                    case 'r':
                    case 't':
                    case '"':
                    case '\\':
                    case '/':
                        // Copy the escape as a pair and step over the escaped character, so
                        // the second backslash of "\\\\" is never mistaken for the start of
                        // another escape sequence.
                        builder.append(c).append(next);
                        characterIndex++;
                        break;
                    case 'u': // hexstring such as \u0001
                        boolean wellFormed = charArray.length > characterIndex + 5
                                && digitsPattern.matcher(new String(charArray, characterIndex + 2, 4)).matches();
                        if (wellFormed) {
                            // Keep the backslash; 'u' and the digits are copied by later iterations.
                            builder.append(c);
                        }
                        else if (policy == InvalidEscapeStringPolicy.SKIP) {
                            // Malformed or truncated: remove "\\u" (the backslash is simply not appended).
                            characterIndex++;
                        }
                        // UNESCAPE: only the backslash is dropped.
                        break;
                    default:
                        if (policy == InvalidEscapeStringPolicy.SKIP) {
                            // Remove the escaped character together with the backslash.
                            characterIndex++;
                        }
                        // UNESCAPE: only the backslash is dropped.
                        break;
                }
            }
            return CharSource.wrap(builder.toString());
        }
    };
}

static class JsonRecordValidateException
Expand Down
@@ -1,10 +1,12 @@
package org.embulk.standards;

import com.google.common.collect.ImmutableList;
import com.google.common.io.CharSource;
import org.embulk.EmbulkTestRuntime;
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskSource;
import org.embulk.spi.DataException;
import org.embulk.spi.Exec;
import org.embulk.spi.FileInput;
import org.embulk.spi.ParserPlugin;
import org.embulk.spi.Schema;
Expand All @@ -22,6 +24,9 @@
import java.util.List;
import java.util.Map;

import static org.embulk.standards.JsonParserPlugin.InvalidEscapeStringPolicy.PASSTHROUGH;
import static org.embulk.standards.JsonParserPlugin.InvalidEscapeStringPolicy.SKIP;
import static org.embulk.standards.JsonParserPlugin.InvalidEscapeStringPolicy.UNESCAPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand All @@ -48,6 +53,17 @@ public void createResource()
output = new MockPageOutput();
}

/** An empty configuration yields the documented defaults for every option. */
@Test
public void checkDefaultValues()
{
    final ConfigSource emptyConfig = Exec.newConfigSource();
    final JsonParserPlugin.PluginTask defaults = emptyConfig.loadConfig(JsonParserPlugin.PluginTask.class);

    assertEquals(false, defaults.getStopOnInvalidRecord());
    assertEquals(JsonParserPlugin.InvalidEscapeStringPolicy.PASSTHROUGH, defaults.getInvalidEscapeStringPolicy());
}


@Test
public void readNormalJson()
throws Exception
Expand Down Expand Up @@ -132,6 +148,172 @@ public void readBrokenJson()
}
}

/** Without an explicit policy, an invalid escape makes the transaction fail with DataException. */
@Test
public void useDefaultInvalidEscapeStringFunction()
        throws Exception
{
    final String lineWithInvalidEscape = "{\"\\a\":\"b\"}\\"; // throw DataException
    try {
        transaction(config, fileInput(lineWithInvalidEscape));
        fail();
    }
    catch (Throwable thrown) {
        assertTrue(thrown instanceof DataException);
    }
}

/** With PASSTHROUGH set explicitly, an invalid escape still fails with DataException. */
@Test
public void usePassthroughInvalidEscapeStringFunction()
        throws Exception
{
    final String lineWithInvalidEscape = "{\"\\a\":\"b\"}\\"; // throw DataException
    try {
        ConfigSource passthroughConfig = this.config.deepCopy().set("invalid_string_escapes", "PASSTHROUGH");
        transaction(passthroughConfig, fileInput(lineWithInvalidEscape));
        fail();
    }
    catch (Throwable thrown) {
        assertTrue(thrown instanceof DataException);
    }
}

/** With SKIP, the invalid escape and its character vanish, leaving an empty key. */
@Test
public void useSkipInvalidEscapeString()
        throws Exception
{
    ConfigSource skipConfig = this.config.deepCopy().set("invalid_string_escapes", "SKIP");
    transaction(skipConfig, fileInput("{\"\\a\":\"b\"}\\"));

    List<Object[]> parsed = Pages.toObjects(plugin.newSchema(), output.pages);
    assertEquals(1, parsed.size());

    Object[] onlyRecord = parsed.get(0);
    Value json = (Value) onlyRecord[0];
    Map<Value, Value> fields = json.asMapValue().map();
    assertEquals(newString("b"), fields.get(newString("")));
}

/** With UNESCAPE, only the backslash is removed, so the key becomes "a". */
@Test
public void useUnEscapeInvalidEscapeString()
        throws Exception
{
    ConfigSource unescapeConfig = this.config.deepCopy().set("invalid_string_escapes", "UNESCAPE");
    transaction(unescapeConfig, fileInput("{\"\\a\":\"b\"}\\"));

    List<Object[]> parsed = Pages.toObjects(plugin.newSchema(), output.pages);
    assertEquals(1, parsed.size());

    Object[] onlyRecord = parsed.get(0);
    Value json = (Value) onlyRecord[0];
    Map<Value, Value> fields = json.asMapValue().map();
    assertEquals(newString("b"), fields.get(newString("a")));
}

/**
 * Exercises the line-rewriting function directly for each policy: PASSTHROUGH must
 * return the input unchanged, SKIP must remove an invalid escape together with its
 * character, and UNESCAPE must remove only the backslash.
 */
@Test
public void checkInvalidEscapeStringFunction()
        throws Exception
{
    //PASSTHROUGH
    {
        String json = "{\\\"_c0\\\":true,\\\"_c1\\\":10,\\\"_c2\\\":\\\"embulk\\\",\\\"_c3\\\":{\\\"k\\\":\\\"v\\\"}}";
        CharSource actual = plugin.invalidEscapeStringFunction(PASSTHROUGH).apply(json);
        assertEquals(json, actual.read());
    }

    {
        String json = "{\"abc\b\f\n\r\t\\\\u0001\":\"efg\"}\\";
        CharSource actual = plugin.invalidEscapeStringFunction(PASSTHROUGH).apply(json);
        assertEquals(json, actual.read());
    }

    {
        String json = "{\"\\a\":\"b\"}\\";
        CharSource actual = plugin.invalidEscapeStringFunction(PASSTHROUGH).apply(json);
        assertEquals(json, actual.read());
    }

    //SKIP
    {
        String json = "{\\\"_c0\\\":true,\\\"_c1\\\":10,\\\"_c2\\\":\\\"embulk\\\",\\\"_c3\\\":{\\\"k\\\":\\\"v\\\"}}";
        CharSource actual = plugin.invalidEscapeStringFunction(SKIP).apply(json);
        assertEquals(json, actual.read());
    }

    {
        // valid charset u0001; only the trailing backslash is removed.
        String json = "{\"abc\b\f\n\r\t\\\\u0001\":\"efg\"}\\";
        CharSource actual = plugin.invalidEscapeStringFunction(SKIP).apply(json);
        assertEquals("{\"abc\b\f\n\r\t\\\\u0001\":\"efg\"}", actual.read());
    }

    {
        // invalid charset \\u12xY: the leading backslash and `u` are removed.
        String json = "{\"\\u12xY\":\"efg\"}\\";
        CharSource actual = plugin.invalidEscapeStringFunction(SKIP).apply(json);
        assertEquals("{\"12xY\":\"efg\"}", actual.read());
    }

    {
        String json = "{\"\\a\":\"b\"}\\";
        CharSource actual = plugin.invalidEscapeStringFunction(SKIP).apply(json);
        // backslash and `a` are removed.
        assertEquals("{\"\":\"b\"}", actual.read());
    }

    {
        // backslash at the end of the last line.
        String json = "{\"\\a\":\"b\"}" +
                "\n" +
                "\\";
        CharSource actual = plugin.invalidEscapeStringFunction(SKIP).apply(json);
        // backslash and `a` are removed.
        assertEquals("{\"\":\"b\"}\n", actual.read());
    }

    //UNESCAPE
    {
        String json = "{\\\"_c0\\\":true,\\\"_c1\\\":10,\\\"_c2\\\":\\\"embulk\\\",\\\"_c3\\\":{\\\"k\\\":\\\"v\\\"}}";
        CharSource actual = plugin.invalidEscapeStringFunction(UNESCAPE).apply(json);
        assertEquals(json, actual.read());
    }

    {
        // valid charset u0001; only the trailing backslash is removed.
        String json = "{\"abc\b\f\n\r\t\\\\u0001\":\"efg\"}\\";
        CharSource actual = plugin.invalidEscapeStringFunction(UNESCAPE).apply(json);
        assertEquals("{\"abc\b\f\n\r\t\\\\u0001\":\"efg\"}", actual.read());
    }

    {
        // invalid charset u000x: only the leading backslash is removed.
        String json = "{\"\\u000x\":\"efg\"}\\";
        CharSource actual = plugin.invalidEscapeStringFunction(UNESCAPE).apply(json);
        assertEquals("{\"u000x\":\"efg\"}", actual.read());
    }

    {
        String json = "{\"\\a\":\"b\"}\\";
        CharSource actual = plugin.invalidEscapeStringFunction(UNESCAPE).apply(json);
        // backslash is removed.
        assertEquals("{\"a\":\"b\"}", actual.read());
    }

    {
        // backslash at the end of the last line.
        String json = "{\"\\a\":\"b\"}" +
                "\n" +
                "\\";
        // This case previously called SKIP again, leaving UNESCAPE untested for multi-line input.
        CharSource actual = plugin.invalidEscapeStringFunction(UNESCAPE).apply(json);
        // only the backslashes are removed; `a` and the newline are kept.
        assertEquals("{\"a\":\"b\"}\n", actual.read());
    }
}

private ConfigSource config()
{
return runtime.getExec().newConfigSource();
Expand Down