From f96d1d6082491a986e6841cd4a2ab6da9a50d054 Mon Sep 17 00:00:00 2001 From: smdmts Date: Sun, 28 May 2017 20:11:16 +0900 Subject: [PATCH 1/4] add clean illegal char mode to json parser. --- .../embulk/standards/JsonParserPlugin.java | 85 ++++++++++++++- .../standards/TestJsonParserPlugin.java | 100 +++++++++++++++++- 2 files changed, 176 insertions(+), 9 deletions(-) diff --git a/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java b/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java index 6020aab1b..f23b7595e 100644 --- a/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java +++ b/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java @@ -1,6 +1,10 @@ package org.embulk.standards; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.io.CharSource; +import com.google.common.io.CharStreams; import org.embulk.config.Config; import org.embulk.config.ConfigDefault; import org.embulk.config.ConfigSource; @@ -18,10 +22,18 @@ import org.embulk.spi.json.JsonParser; import org.embulk.spi.type.Types; import org.embulk.spi.util.FileInputInputStream; +import org.jruby.embed.io.ReaderInputStream; import org.msgpack.value.Value; import org.slf4j.Logger; +import javax.annotation.Nullable; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CodingErrorAction; +import java.nio.charset.StandardCharsets; +import java.util.regex.Pattern; public class JsonParserPlugin implements ParserPlugin @@ -32,6 +44,10 @@ public interface PluginTask @Config("stop_on_invalid_record") @ConfigDefault("false") boolean getStopOnInvalidRecord(); + + @Config("clean_illegal_char") + @ConfigDefault("false") + boolean getCleanIllegalChar(); } private final Logger log; @@ -63,9 +79,9 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu final Column column = schema.getColumn(0); // record column try (PageBuilder pageBuilder = newPageBuilder(schema, output); - FileInputInputStream in = new FileInputInputStream(input)) { + FileInputInputStream in = new FileInputInputStream(input)) { while (in.nextFile()) { - try (JsonParser.Stream stream = newJsonStream(in)) { + try (JsonParser.Stream stream = newJsonStream(in, task)) { Value value; while ((value = stream.next()) != null) { try { @@ -73,7 +89,6 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu throw new JsonRecordValidateException( String.format("A Json record must not represent map value but it's %s", value.getValueType().name())); } - pageBuilder.setJson(column, value); pageBuilder.addRecord(); } @@ -99,12 +114,72 @@ private PageBuilder newPageBuilder(Schema schema, PageOutput output) return new PageBuilder(Exec.getBufferAllocator(), schema, output); } - private JsonParser.Stream newJsonStream(FileInputInputStream in) + private JsonParser.Stream newJsonStream(FileInputInputStream in , PluginTask task) throws IOException { - return new JsonParser().open(in); + if (task.getCleanIllegalChar()) + { + final CharsetDecoder charsetDecoder = StandardCharsets.UTF_8.newDecoder(); + charsetDecoder.onMalformedInput(CodingErrorAction.IGNORE); + charsetDecoder.onUnmappableCharacter(CodingErrorAction.IGNORE); + + Iterable lines = Lists.transform(CharStreams.readLines(new BufferedReader(new InputStreamReader(in, charsetDecoder))), cleanIllegalBackslashFunction); + return new JsonParser().open(new ReaderInputStream(CharSource.concat(lines).openStream())); + } + else + { + return new JsonParser().open(in); + } } + Function cleanIllegalBackslashFunction = new Function() + { + Pattern p = Pattern.compile("\\p{XDigit}+"); + @Override + public CharSource apply(@Nullable String input) + { + assert input != null; + int index = 0; + StringBuilder s = new StringBuilder(); + char[] charArray = input.toCharArray(); + for (char c:charArray) { + if (c == '\\') { + if (charArray.length > index + 1) { + char next = charArray[index + 1]; + switch (next) { + case 'b': + case 'f': + case 'n': + case 'r': + case 't': + case '"': + case '\\': + case '/': + s.append(c); + break; + case 'u': // hexstring + if (charArray.length > index + 5) { + char[] hexChars = { charArray[index + 2] , charArray[index + 3] , charArray[index + 4] ,charArray[index + 5] }; + String hexString = new String(hexChars); + if (p.matcher(hexString).matches()) { + s.append(c); + } + } + break; + default: + // ignore backslash. + break; + } + } + } else { + s.append(c); + } + index++; + } + return CharSource.wrap(s.toString()); + } + }; + static class JsonRecordValidateException extends DataException { diff --git a/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java b/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java index f1520ce44..b3d622307 100644 --- a/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java +++ b/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java @@ -1,6 +1,7 @@ package org.embulk.standards; import com.google.common.collect.ImmutableList; +import com.google.common.io.CharSource; import org.embulk.EmbulkTestRuntime; import org.embulk.config.ConfigSource; import org.embulk.config.TaskSource; @@ -16,9 +17,10 @@ import org.junit.Test; import org.msgpack.value.Value; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CodingErrorAction; import java.util.List; import java.util.Map; @@ -60,7 +62,7 @@ public void readNormalJson() "\"_c1\":-10,\n" + "\"_c2\":\"エンバルク\",\n" + "\"_c3\":[\"e0\",\"e1\"]\n" + - "}", + "}", "[1, 2, 3]", // this line should be skipped. "\"embulk\"", // this line should be skipped. "10", // this line should be skipped. @@ -132,6 +134,96 @@ public void readBrokenJson() } } + + @Test + public void cleanIllegalChar() + throws Exception + { + + ConfigSource config = this.config.deepCopy().set("clean_illegal_char", true); + transaction(config, new InputStreamFileInput(runtime.getBufferAllocator(), provider(createBrokenRecord()))); + + List records = Pages.toObjects(plugin.newSchema(), output.pages); + assertEquals(2, records.size()); + + Object[] record; + Map map; + { // "{\"_c0\":true,\"_c1\":10,\"_c2\":\"embulk\",\"_c3\":{\"k\":\"v\"}}" + record = records.get(0); + assertEquals(1, record.length); + map = ((Value)record[0]).asMapValue().map(); + + assertEquals(newBoolean(true), map.get(newString("_c0"))); + assertEquals(newInteger(10L), map.get(newString("_c1"))); + assertEquals(newString("embulk"), map.get(newString("_c2"))); + assertEquals(newMap(newString("k"), newString("v")), map.get(newString("_c3"))); + } + { // "{"_c0":"embulk0xF00x5cabc"}" + record = records.get(1); + assertEquals(1, record.length); + map = ((Value)record[0]).asMapValue().map(); + + assertEquals(newString("embulkabc"), map.get(newString("_c0"))); + } + + } + + @Test + public void defaultCleanIllegalChar() + throws Exception + { + try { + transaction(config, new InputStreamFileInput(runtime.getBufferAllocator(), provider(createBrokenRecord()))); + fail(); + } catch (Throwable t) { + assertTrue(t instanceof DataException); + } + } + + private ByteArrayInputStream createBrokenRecord() throws Exception { + // out of utf-8 range's byte. + // 0x5c is backslash. + byte[] brokenBytes = { Integer.valueOf(0xF0).byteValue() , 0x5c }; + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + // normal record + outputStream.write("{\"_c0\":true,\"_c1\":10,\"_c2\":\"embulk\",\"_c3\":{\"k\":\"v\"}}\n".getBytes("UTF-8")); + + // contain illegal char record + outputStream.write("{\"_c0\":\"embulk".getBytes("UTF-8")); + outputStream.write(brokenBytes); // append dust bytes. + outputStream.write("abc".getBytes("UTF-8")); + outputStream.write("\"}\n".getBytes("UTF-8")); + + return new ByteArrayInputStream(outputStream.toByteArray()); + } + + @Test + public void checkCleanBackslash() + throws Exception + { + { + String json = "{\\\"_c0\\\":true,\\\"_c1\\\":10,\\\"_c2\\\":\\\"embulk\\\",\\\"_c3\\\":{\\\"k\\\":\\\"v\\\"}}"; + CharSource actual = plugin.cleanIllegalBackslashFunction.apply(json); + assertEquals(json , actual.read()); + } + + { + String json = "{\"abc\b\f\n\r\t\\\\u0001\":\"efg\"}"; + CharSource actual = plugin.cleanIllegalBackslashFunction.apply(json); + assertEquals(json , actual.read()); + } + + { + // {"\a":"b"}\ \a and last \ is not allowed. + String json = "{\"\\a\":\"b\"}\\"; + CharSource actual = plugin.cleanIllegalBackslashFunction.apply(json); + // backslash will removed. + assertEquals("{\"a\":\"b\"}" , actual.read()); + } + + } + private ConfigSource config() { return runtime.getExec().newConfigSource(); From a662d92919e1ffff17bcfa2fd8ae851795f054bb Mon Sep 17 00:00:00 2001 From: smdmts Date: Tue, 30 May 2017 01:48:39 +0900 Subject: [PATCH 2/4] add InvalidEscapeStringPolicy --- .../embulk/standards/JsonParserPlugin.java | 148 ++++++++------- .../standards/TestJsonParserPlugin.java | 174 ++++++++++++------ 2 files changed, 205 insertions(+), 117 deletions(-) diff --git a/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java b/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java index f23b7595e..22a7d5dd6 100644 --- a/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java +++ b/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java @@ -30,14 +30,31 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; -import java.nio.charset.CharsetDecoder; -import java.nio.charset.CodingErrorAction; -import java.nio.charset.StandardCharsets; import java.util.regex.Pattern; public class JsonParserPlugin implements ParserPlugin { + + public enum InvalidEscapeStringPolicy + { + PASSTHROUGH("PASSTHROUGH"), + SKIP("SKIP"), + UNESCAPE("UNESCAPE"); + + private final String string; + + private InvalidEscapeStringPolicy(String string) + { + this.string = string; + } + + public String getString() + { + return string; + } + } + public interface PluginTask extends Task { @@ -45,9 +62,9 @@ public interface PluginTask @ConfigDefault("false") boolean getStopOnInvalidRecord(); - @Config("clean_illegal_char") - @ConfigDefault("false") - boolean getCleanIllegalChar(); + @Config("invalid_string_escapes") + @ConfigDefault("\"PASSTHROUGH\"") + InvalidEscapeStringPolicy getInvalidEscapeStringPolicy(); } private final Logger log; @@ -115,70 +132,77 @@ private PageBuilder newPageBuilder(Schema schema, PageOutput output) } private JsonParser.Stream newJsonStream(FileInputInputStream in , PluginTask task) - throws IOException - { - if (task.getCleanIllegalChar()) - { - final CharsetDecoder charsetDecoder = StandardCharsets.UTF_8.newDecoder(); - charsetDecoder.onMalformedInput(CodingErrorAction.IGNORE); - charsetDecoder.onUnmappableCharacter(CodingErrorAction.IGNORE); - - Iterable lines = Lists.transform(CharStreams.readLines(new BufferedReader(new InputStreamReader(in, charsetDecoder))), cleanIllegalBackslashFunction); - return new JsonParser().open(new ReaderInputStream(CharSource.concat(lines).openStream())); - } - else - { - return new JsonParser().open(in); + throws IOException { + InvalidEscapeStringPolicy policy = task.getInvalidEscapeStringPolicy(); + switch (policy) { + case SKIP: + case UNESCAPE: + Iterable lines = Lists.transform(CharStreams.readLines(new BufferedReader(new InputStreamReader(in))), + invalidEscapeStringFunction(policy)); + return new JsonParser().open(new ReaderInputStream(CharSource.concat(lines).openStream())); + case PASSTHROUGH: + default: + return new JsonParser().open(in); } } - Function cleanIllegalBackslashFunction = new Function() - { - Pattern p = Pattern.compile("\\p{XDigit}+"); - @Override - public CharSource apply(@Nullable String input) - { - assert input != null; - int index = 0; - StringBuilder s = new StringBuilder(); - char[] charArray = input.toCharArray(); - for (char c:charArray) { - if (c == '\\') { - if (charArray.length > index + 1) { - char next = charArray[index + 1]; - switch (next) { - case 'b': - case 'f': - case 'n': - case 'r': - case 't': - case '"': - case '\\': - case '/': - s.append(c); - break; - case 'u': // hexstring - if (charArray.length > index + 5) { - char[] hexChars = { charArray[index + 2] , charArray[index + 3] , charArray[index + 4] ,charArray[index + 5] }; - String hexString = new String(hexChars); - if (p.matcher(hexString).matches()) { - s.append(c); + Function invalidEscapeStringFunction(final InvalidEscapeStringPolicy policy) { + return new Function() { + final Pattern p = Pattern.compile("\\p{XDigit}+"); + @Override + public CharSource apply(@Nullable String input) { + assert input != null; + if (policy == InvalidEscapeStringPolicy.PASSTHROUGH) { + return CharSource.wrap(input); + } + StringBuilder s = new StringBuilder(); + char[] charArray = input.toCharArray(); + for (int i=0 ; i < charArray.length ; i++) { + char c = charArray[i]; + if (c == '\\') { + if (charArray.length > i + 1) { + char next = charArray[i + 1]; + switch (next) { + case 'b': + case 'f': + case 'n': + case 'r': + case 't': + case '"': + case '\\': + case '/': + s.append(c); + break; + case 'u': // hexstring such as \u0001 + if (charArray.length > i + 5) { + char[] hexChars = {charArray[i + 2], charArray[i + 3], charArray[i + 4], charArray[i + 5]}; + String hexString = new String(hexChars); + if (p.matcher(hexString).matches()) { + s.append(c); + } } - } - break; - default: - // ignore backslash. - break; + break; + default: + switch (policy) { + case SKIP: + i++; + break; + case UNESCAPE: + break; + } + break; + } } + } else { + s.append(c); } - } else { - s.append(c); } - index++; + return CharSource.wrap(s.toString()); } - return CharSource.wrap(s.toString()); - } - }; + }; + } + + static class JsonRecordValidateException extends DataException diff --git a/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java b/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java index b3d622307..d75d8772a 100644 --- a/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java +++ b/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java @@ -5,13 +5,12 @@ import org.embulk.EmbulkTestRuntime; import org.embulk.config.ConfigSource; import org.embulk.config.TaskSource; -import org.embulk.spi.DataException; -import org.embulk.spi.FileInput; -import org.embulk.spi.ParserPlugin; -import org.embulk.spi.Schema; +import org.embulk.spi.*; import org.embulk.spi.TestPageBuilderReader.MockPageOutput; import org.embulk.spi.util.InputStreamFileInput; +import org.embulk.spi.util.Newline; import org.embulk.spi.util.Pages; +import org.joda.time.DateTimeZone; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -19,11 +18,10 @@ import java.io.*; import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; -import java.nio.charset.CodingErrorAction; import java.util.List; import java.util.Map; +import static org.embulk.standards.JsonParserPlugin.InvalidEscapeStringPolicy.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -50,6 +48,17 @@ public void createResource() output = new MockPageOutput(); } + @Test + public void checkDefaultValues() + { + ConfigSource config = Exec.newConfigSource(); + + JsonParserPlugin.PluginTask task = config.loadConfig(JsonParserPlugin.PluginTask.class); + assertEquals(false, task.getStopOnInvalidRecord()); + assertEquals(JsonParserPlugin.InvalidEscapeStringPolicy.PASSTHROUGH, task.getInvalidEscapeStringPolicy()); + } + + @Test public void readNormalJson() throws Exception @@ -134,90 +143,145 @@ public void readBrokenJson() } } - @Test - public void cleanIllegalChar() + public void useDefaultInvalidEscapeStringFunction() throws Exception { - - ConfigSource config = this.config.deepCopy().set("clean_illegal_char", true); - transaction(config, new InputStreamFileInput(runtime.getBufferAllocator(), provider(createBrokenRecord()))); - - List records = Pages.toObjects(plugin.newSchema(), output.pages); - assertEquals(2, records.size()); - - Object[] record; - Map map; - { // "{\"_c0\":true,\"_c1\":10,\"_c2\":\"embulk\",\"_c3\":{\"k\":\"v\"}}" - record = records.get(0); - assertEquals(1, record.length); - map = ((Value)record[0]).asMapValue().map(); - - assertEquals(newBoolean(true), map.get(newString("_c0"))); - assertEquals(newInteger(10L), map.get(newString("_c1"))); - assertEquals(newString("embulk"), map.get(newString("_c2"))); - assertEquals(newMap(newString("k"), newString("v")), map.get(newString("_c3"))); + try { + transaction(config, fileInput( + "{\"\\a\":\"b\"}\\" // throw DataException + )); + fail(); } - { // "{"_c0":"embulk0xF00x5cabc"}" - record = records.get(1); - assertEquals(1, record.length); - map = ((Value)record[0]).asMapValue().map(); - - assertEquals(newString("embulkabc"), map.get(newString("_c0"))); + catch (Throwable t) { + assertTrue(t instanceof DataException); } - } @Test - public void defaultCleanIllegalChar() + public void usePassthroughInvalidEscapeStringFunction() throws Exception { try { - transaction(config, new InputStreamFileInput(runtime.getBufferAllocator(), provider(createBrokenRecord()))); + ConfigSource config = this.config.deepCopy().set("invalid_string_escapes", "PASSTHROUGH"); + transaction(config, fileInput( + "{\"\\a\":\"b\"}\\" // throw DataException + )); fail(); - } catch (Throwable t) { + } + catch (Throwable t) { assertTrue(t instanceof DataException); } } - private ByteArrayInputStream createBrokenRecord() throws Exception { - // out of utf-8 range's byte. - // 0x5c is backslash. - byte[] brokenBytes = { Integer.valueOf(0xF0).byteValue() , 0x5c }; - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + @Test + public void useSkipInvalidEscapeString() + throws Exception + { + ConfigSource config = this.config.deepCopy().set("invalid_string_escapes", "SKIP"); + transaction(config, fileInput( + "{\"\\a\":\"b\"}\\" + )); - // normal record - outputStream.write("{\"_c0\":true,\"_c1\":10,\"_c2\":\"embulk\",\"_c3\":{\"k\":\"v\"}}\n".getBytes("UTF-8")); + List records = Pages.toObjects(plugin.newSchema(), output.pages); + assertEquals(1, records.size()); + Object[] record = records.get(0); + Map map = ((Value)record[0]).asMapValue().map(); + assertEquals(newString("b"), map.get(newString(""))); + } - // contain illegal char record - outputStream.write("{\"_c0\":\"embulk".getBytes("UTF-8")); - outputStream.write(brokenBytes); // append dust bytes. - outputStream.write("abc".getBytes("UTF-8")); - outputStream.write("\"}\n".getBytes("UTF-8")); + @Test + public void useUnEscapeInvalidEscapeString() + throws Exception + { + ConfigSource config = this.config.deepCopy().set("invalid_string_escapes", "UNESCAPE"); + transaction(config, fileInput( + "{\"\\a\":\"b\"}\\" + )); - return new ByteArrayInputStream(outputStream.toByteArray()); + List records = Pages.toObjects(plugin.newSchema(), output.pages); + assertEquals(1, records.size()); + Object[] record = records.get(0); + Map map = ((Value)record[0]).asMapValue().map(); + assertEquals(newString("b"), map.get(newString("a"))); } @Test - public void checkCleanBackslash() + public void checkInvalidEscapeStringFunction() throws Exception { + //PASSTHROUGH { String json = "{\\\"_c0\\\":true,\\\"_c1\\\":10,\\\"_c2\\\":\\\"embulk\\\",\\\"_c3\\\":{\\\"k\\\":\\\"v\\\"}}"; - CharSource actual = plugin.cleanIllegalBackslashFunction.apply(json); + CharSource actual = plugin.invalidEscapeStringFunction(PASSTHROUGH).apply(json); assertEquals(json , actual.read()); } { - String json = "{\"abc\b\f\n\r\t\\\\u0001\":\"efg\"}"; - CharSource actual = plugin.cleanIllegalBackslashFunction.apply(json); + String json = "{\"abc\b\f\n\r\t\\\\u0001\":\"efg\"}\\"; + CharSource actual = plugin.invalidEscapeStringFunction(PASSTHROUGH).apply(json); assertEquals(json , actual.read()); } { - // {"\a":"b"}\ \a and last \ is not allowed. String json = "{\"\\a\":\"b\"}\\"; - CharSource actual = plugin.cleanIllegalBackslashFunction.apply(json); + CharSource actual = plugin.invalidEscapeStringFunction(PASSTHROUGH).apply(json); + assertEquals(json , actual.read()); + } + + //SKIP + { + String json = "{\\\"_c0\\\":true,\\\"_c1\\\":10,\\\"_c2\\\":\\\"embulk\\\",\\\"_c3\\\":{\\\"k\\\":\\\"v\\\"}}"; + CharSource actual = plugin.invalidEscapeStringFunction(SKIP).apply(json); + assertEquals(json , actual.read()); + } + + { + // valid charset u0001 + String json = "{\"abc\b\f\n\r\t\\\\u0001\":\"efg\"}\\"; + CharSource actual = plugin.invalidEscapeStringFunction(SKIP).apply(json); + assertEquals("{\"abc\b\f\n\r\t\\\\u0001\":\"efg\"}" , actual.read()); + } + + + { + // invalid charset u000x remove forwarding backslash + String json = "{\"\\u000x\":\"efg\"}\\"; + CharSource actual = plugin.invalidEscapeStringFunction(SKIP).apply(json); + assertEquals("{\"u000x\":\"efg\"}" , actual.read()); + } + + { + String json = "{\"\\a\":\"b\"}\\"; + CharSource actual = plugin.invalidEscapeStringFunction(SKIP).apply(json); + // backslash and `a` will removed. + assertEquals("{\"\":\"b\"}" , actual.read()); + } + + //UNESCAPE + { + String json = "{\\\"_c0\\\":true,\\\"_c1\\\":10,\\\"_c2\\\":\\\"embulk\\\",\\\"_c3\\\":{\\\"k\\\":\\\"v\\\"}}"; + CharSource actual = plugin.invalidEscapeStringFunction(UNESCAPE).apply(json); + assertEquals(json , actual.read()); + } + + { + String json = "{\"abc\b\f\n\r\t\\\\u0001\":\"efg\"}\\"; + CharSource actual = plugin.invalidEscapeStringFunction(UNESCAPE).apply(json); + assertEquals("{\"abc\b\f\n\r\t\\\\u0001\":\"efg\"}" , actual.read()); + } + + { + // invalid charset u000x remove forwarding backslash + String json = "{\"\\u000x\":\"efg\"}\\"; + CharSource actual = plugin.invalidEscapeStringFunction(UNESCAPE).apply(json); + assertEquals("{\"u000x\":\"efg\"}" , actual.read()); + } + + + { + String json = "{\"\\a\":\"b\"}\\"; + CharSource actual = plugin.invalidEscapeStringFunction(UNESCAPE).apply(json); // backslash will removed. assertEquals("{\"a\":\"b\"}" , actual.read()); } From 5ad7f8572786e4b1186387ad8249f3f432966b8f Mon Sep 17 00:00:00 2001 From: "masatoshi.shimada" Date: Tue, 30 May 2017 14:09:37 +0900 Subject: [PATCH 3/4] fix reviewed. --- .../embulk/standards/JsonParserPlugin.java | 55 +++++++++++-------- .../standards/TestJsonParserPlugin.java | 40 +++++++++++--- 2 files changed, 64 insertions(+), 31 deletions(-) diff --git a/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java b/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java index 22a7d5dd6..ccc4288c3 100644 --- a/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java +++ b/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java @@ -23,6 +23,7 @@ import org.embulk.spi.type.Types; import org.embulk.spi.util.FileInputInputStream; import org.jruby.embed.io.ReaderInputStream; +import org.msgpack.core.Preconditions; import org.msgpack.value.Value; import org.slf4j.Logger; @@ -96,7 +97,7 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu final Column column = schema.getColumn(0); // record column try (PageBuilder pageBuilder = newPageBuilder(schema, output); - FileInputInputStream in = new FileInputInputStream(input)) { + FileInputInputStream in = new FileInputInputStream(input)) { while (in.nextFile()) { try (JsonParser.Stream stream = newJsonStream(in, task)) { Value value; @@ -106,6 +107,7 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu throw new JsonRecordValidateException( String.format("A Json record must not represent map value but it's %s", value.getValueType().name())); } + pageBuilder.setJson(column, value); pageBuilder.addRecord(); } @@ -131,8 +133,9 @@ private PageBuilder newPageBuilder(Schema schema, PageOutput output) return new PageBuilder(Exec.getBufferAllocator(), schema, output); } - private JsonParser.Stream newJsonStream(FileInputInputStream in , PluginTask task) - throws IOException { + private JsonParser.Stream newJsonStream(FileInputInputStream in, PluginTask task) + throws IOException + { InvalidEscapeStringPolicy policy = task.getInvalidEscapeStringPolicy(); switch (policy) { case SKIP: @@ -146,22 +149,26 @@ private JsonParser.Stream newJsonStream(FileInputInputStream in , PluginTask tas } } - Function invalidEscapeStringFunction(final InvalidEscapeStringPolicy policy) { - return new Function() { - final Pattern p = Pattern.compile("\\p{XDigit}+"); + Function invalidEscapeStringFunction(final InvalidEscapeStringPolicy policy) + { + return new Function() + { + final Pattern digitsPattern = Pattern.compile("\\p{XDigit}+"); + @Override - public CharSource apply(@Nullable String input) { - assert input != null; + public CharSource apply(@Nullable String input) + { + Preconditions.checkNotNull(input); if (policy == InvalidEscapeStringPolicy.PASSTHROUGH) { return CharSource.wrap(input); } - StringBuilder s = new StringBuilder(); + StringBuilder builder = new StringBuilder(); char[] charArray = input.toCharArray(); - for (int i=0 ; i < charArray.length ; i++) { - char c = charArray[i]; + for (int characterIndex = 0; characterIndex < charArray.length; characterIndex++) { + char c = charArray[characterIndex]; if (c == '\\') { - if (charArray.length > i + 1) { - char next = charArray[i + 1]; + if (charArray.length > characterIndex + 1) { + char next = charArray[characterIndex + 1]; switch (next) { case 'b': case 'f': @@ -171,21 +178,22 @@ public CharSource apply(@Nullable String input) { case '"': case '\\': case '/': - s.append(c); + builder.append(c); break; case 'u': // hexstring such as \u0001 - if (charArray.length > i + 5) { - char[] hexChars = {charArray[i + 2], charArray[i + 3], charArray[i + 4], charArray[i + 5]}; + if (charArray.length > characterIndex + 5) { + char[] hexChars = {charArray[characterIndex + 2], charArray[characterIndex + 3], charArray[characterIndex + 4], + charArray[characterIndex + 5]}; String hexString = new String(hexChars); - if (p.matcher(hexString).matches()) { - s.append(c); + if (digitsPattern.matcher(hexString).matches()) { + builder.append(c); } } break; default: switch (policy) { case SKIP: - i++; + characterIndex++; break; case UNESCAPE: break; @@ -193,17 +201,16 @@ public CharSource apply(@Nullable String input) { break; } } - } else { - s.append(c); + } + else { + builder.append(c); } } - return CharSource.wrap(s.toString()); + return CharSource.wrap(builder.toString()); } }; } - - static class JsonRecordValidateException extends DataException { diff --git a/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java b/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java index d75d8772a..39ae852b8 100644 --- a/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java +++ b/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java @@ -5,23 +5,28 @@ import org.embulk.EmbulkTestRuntime; import org.embulk.config.ConfigSource; import org.embulk.config.TaskSource; -import org.embulk.spi.*; +import org.embulk.spi.DataException; +import org.embulk.spi.Exec; +import org.embulk.spi.FileInput; +import org.embulk.spi.ParserPlugin; +import org.embulk.spi.Schema; import org.embulk.spi.TestPageBuilderReader.MockPageOutput; import org.embulk.spi.util.InputStreamFileInput; -import org.embulk.spi.util.Newline; import org.embulk.spi.util.Pages; -import org.joda.time.DateTimeZone; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.msgpack.value.Value; -import java.io.*; -import java.nio.charset.Charset; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; import java.util.List; import java.util.Map; -import static org.embulk.standards.JsonParserPlugin.InvalidEscapeStringPolicy.*; +import static org.embulk.standards.JsonParserPlugin.InvalidEscapeStringPolicy.PASSTHROUGH; +import static org.embulk.standards.JsonParserPlugin.InvalidEscapeStringPolicy.SKIP; +import static org.embulk.standards.JsonParserPlugin.InvalidEscapeStringPolicy.UNESCAPE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -71,7 +76,7 @@ public void readNormalJson() "\"_c1\":-10,\n" + "\"_c2\":\"エンバルク\",\n" + "\"_c3\":[\"e0\",\"e1\"]\n" + - "}", + "}", "[1, 2, 3]", // this line should be skipped. "\"embulk\"", // this line should be skipped. "10", // this line should be skipped. @@ -258,6 +263,16 @@ public void checkInvalidEscapeStringFunction() assertEquals("{\"\":\"b\"}" , actual.read()); } + { + // end of lines backspash. + String json = "{\"\\a\":\"b\"}" + + "\n" + + "\\"; + CharSource actual = plugin.invalidEscapeStringFunction(SKIP).apply(json); + // backslash and `a` will removed. + assertEquals("{\"\":\"b\"}\n" , actual.read()); + } + //UNESCAPE { String json = "{\\\"_c0\\\":true,\\\"_c1\\\":10,\\\"_c2\\\":\\\"embulk\\\",\\\"_c3\\\":{\\\"k\\\":\\\"v\\\"}}"; @@ -286,6 +301,17 @@ public void checkInvalidEscapeStringFunction() assertEquals("{\"a\":\"b\"}" , actual.read()); } + { + // end of lines backspash. + String json = "{\"\\a\":\"b\"}" + + "\n" + + "\\"; + CharSource actual = plugin.invalidEscapeStringFunction(SKIP).apply(json); + // backslash and `a` will removed. + assertEquals("{\"\":\"b\"}\n" , actual.read()); + } + + } private ConfigSource config() From 310a1a7f9a348f3039167cf66ee6086f50545a74 Mon Sep 17 00:00:00 2001 From: "masatoshi.shimada" Date: Thu, 1 Jun 2017 11:14:09 +0900 Subject: [PATCH 4/4] modify SKIP mode in removing backslash with u. --- .../main/java/org/embulk/standards/JsonParserPlugin.java | 5 +++++ .../java/org/embulk/standards/TestJsonParserPlugin.java | 6 +++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java b/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java index ccc4288c3..24563dcfb 100644 --- a/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java +++ b/embulk-standards/src/main/java/org/embulk/standards/JsonParserPlugin.java @@ -187,6 +187,11 @@ public CharSource apply(@Nullable String input) String hexString = new String(hexChars); if (digitsPattern.matcher(hexString).matches()) { builder.append(c); + } else { + if (policy == InvalidEscapeStringPolicy.SKIP) { + // remove \\u + characterIndex++; + } } } break; diff --git a/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java b/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java index 39ae852b8..9a902c2e5 100644 --- a/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java +++ b/embulk-standards/src/test/java/org/embulk/standards/TestJsonParserPlugin.java @@ -250,10 +250,10 @@ public void checkInvalidEscapeStringFunction() { - // invalid charset u000x remove forwarding backslash - String json = "{\"\\u000x\":\"efg\"}\\"; + // invalid charset \\u12xY remove forwarding backslash and u + String json = "{\"\\u12xY\":\"efg\"}\\"; CharSource actual = plugin.invalidEscapeStringFunction(SKIP).apply(json); - assertEquals("{\"u000x\":\"efg\"}" , actual.read()); + assertEquals("{\"12xY\":\"efg\"}" , actual.read()); } {