From 1bcfda2c8423fb4010e4a6fab46d87de819ff401 Mon Sep 17 00:00:00 2001 From: twalthr Date: Tue, 26 Jul 2016 16:24:24 +0200 Subject: [PATCH 1/2] [FLINK-4081] [core] [table] FieldParsers should support empty strings --- .../flink/types/parser/BooleanParser.java | 4 ++ .../flink/types/parser/DoubleParser.java | 4 ++ .../flink/types/parser/DoubleValueParser.java | 4 ++ .../flink/types/parser/FloatParser.java | 4 ++ .../flink/types/parser/FloatValueParser.java | 4 ++ .../flink/types/parser/StringParser.java | 8 +++- .../flink/types/parser/StringValueParser.java | 6 +++ .../types/parser/BooleanValueParserTest.java | 2 +- .../flink/types/parser/ByteParserTest.java | 3 -- .../types/parser/ByteValueParserTest.java | 2 - .../flink/types/parser/DoubleParserTest.java | 3 -- .../types/parser/DoubleValueParserTest.java | 2 - .../flink/types/parser/FloatParserTest.java | 3 -- .../types/parser/FloatValueParserTest.java | 2 - .../flink/types/parser/IntParserTest.java | 3 -- .../types/parser/IntValueParserTest.java | 2 - .../flink/types/parser/LongParserTest.java | 3 -- .../types/parser/LongValueParserTest.java | 2 - .../flink/types/parser/ParserTestBase.java | 2 + .../types/parser/QuotedStringParserTest.java | 2 +- .../parser/QuotedStringValueParserTest.java | 4 +- .../flink/types/parser/ShortParserTest.java | 3 -- .../types/parser/ShortValueParserTest.java | 2 - .../parser/UnquotedStringParserTest.java | 5 +-- .../parser/VarLengthStringParserTest.java | 1 - .../table/runtime/io/RowCsvInputFormat.scala | 12 ++++-- .../runtime/io/RowCsvInputFormatTest.scala | 37 ++++++++++--------- 27 files changed, 66 insertions(+), 63 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java index 90fa41e7f785a..646af01d275d1 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java @@ -44,6 +44,10 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, Boole while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delim)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); + return -1; + } break; } i++; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java index 0b2f5a24257a1..63c8a3f93a934 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java @@ -39,6 +39,10 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, D while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); + return -1; + } break; } i++; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java index 333e6c9ad4aec..22d94019c0165 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java @@ -39,6 +39,10 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, D while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); + return -1; + } break; } i++; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java index a47877e31d23b..fcce89a4b8ed4 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java @@ -39,6 +39,10 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, F while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); + return -1; + } break; } i++; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java index b6da3d327b1fa..e9dec0cd06194 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java @@ -39,6 +39,10 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, F while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); + return -1; + } break; } i++; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index 9cee9909e75a5..ba40373173669 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -47,7 +47,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S final int delimLimit = limit - delimiter.length + 1; if(quotedStringParsing && bytes[i] == quoteCharacter) { - // quoted string parsing enabled and first character Vis a quote + // quoted string parsing enabled and first character is a quote i++; // search for ending quote character, continue when it is escaped @@ -84,10 +84,16 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S if (i >= delimLimit) { // no delimiter found. Take the full string + if (limit == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); // mark empty string + } this.result = new String(bytes, startPos, limit - startPos); return limit; } else { // delimiter found. + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); // mark empty string + } this.result = new String(bytes, startPos, i - startPos); return i + delimiter.length; } diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java index bc899522ae864..bdaf1295475e2 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java @@ -90,10 +90,16 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S if (i >= delimLimit) { // no delimiter found. Take the full string + if (limit == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); // mark empty strings + } reusable.setValueAscii(bytes, startPos, limit - startPos); return limit; } else { // delimiter found. + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_STRING); // mark empty strings + } reusable.setValueAscii(bytes, startPos, i - startPos); return i + delimiter.length; } diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java index ff1885d690891..3b120e2b5bed0 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java @@ -19,9 +19,9 @@ package org.apache.flink.types.parser; - import org.apache.flink.types.BooleanValue; + public class BooleanValueParserTest extends ParserTestBase { diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java index dac51445def65..579f0039300bf 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java @@ -19,9 +19,6 @@ package org.apache.flink.types.parser; -import org.apache.flink.types.parser.ByteParser; -import org.apache.flink.types.parser.FieldParser; - public class ByteParserTest extends ParserTestBase { diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java index 31b60d44f47b3..f5abe05cbd626 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java @@ -20,8 +20,6 @@ package org.apache.flink.types.parser; import org.apache.flink.types.ByteValue; -import org.apache.flink.types.parser.ByteValueParser; -import org.apache.flink.types.parser.FieldParser; public class ByteValueParserTest extends ParserTestBase { diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java index bda8252bceff4..98655d1a535ea 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java @@ -19,9 +19,6 @@ package org.apache.flink.types.parser; -import org.apache.flink.types.parser.DoubleParser; -import org.apache.flink.types.parser.FieldParser; - public class DoubleParserTest extends ParserTestBase { diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java index fbbb5f2e013fd..dfe8936ec61b4 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java @@ -20,8 +20,6 @@ package org.apache.flink.types.parser; import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.parser.DoubleValueParser; -import org.apache.flink.types.parser.FieldParser; public class DoubleValueParserTest extends ParserTestBase { diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java index d05557f95d076..480f1fb47bf39 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java @@ -19,9 +19,6 @@ package org.apache.flink.types.parser; -import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.types.parser.FloatParser; - public class FloatParserTest extends ParserTestBase { diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java index 5c6e1c3343c6d..d71b20a238acc 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java @@ -20,8 +20,6 @@ package org.apache.flink.types.parser; import org.apache.flink.types.FloatValue; -import org.apache.flink.types.parser.FloatValueParser; -import org.apache.flink.types.parser.FieldParser; public class FloatValueParserTest extends ParserTestBase { diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java index 1d33b5196451a..f5870864bf783 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java @@ -19,9 +19,6 @@ package org.apache.flink.types.parser; -import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.types.parser.IntParser; - public class IntParserTest extends ParserTestBase { diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java index eb0403ebe8889..a70d65c58bfd9 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java @@ -20,8 +20,6 @@ package org.apache.flink.types.parser; import org.apache.flink.types.IntValue; -import org.apache.flink.types.parser.IntValueParser; -import org.apache.flink.types.parser.FieldParser; public class IntValueParserTest extends ParserTestBase { diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java index b17de9d70c87f..d32eef11d61eb 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java @@ -19,9 +19,6 @@ package org.apache.flink.types.parser; -import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.types.parser.LongParser; - public class LongParserTest extends ParserTestBase { diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java index f4d82a0ba3b81..b9c5eec0e5990 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java @@ -20,8 +20,6 @@ package org.apache.flink.types.parser; import org.apache.flink.types.LongValue; -import org.apache.flink.types.parser.LongValueParser; -import org.apache.flink.types.parser.FieldParser; public class LongValueParserTest extends ParserTestBase { diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java index b979a38503e1a..7dd47abd8e2c7 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java @@ -414,6 +414,8 @@ public void testEmptyFieldInIsolation() { byte[] bytes = emptyString.getBytes(); int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + assertEquals(FieldParser.ParseErrorState.EMPTY_STRING, parser.getErrorState()); + if(this.allowsEmptyField()) { assertTrue("Parser declared the empty string as invalid.", numRead != -1); assertEquals("Invalid number of bytes read returned.", bytes.length, numRead); diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java index d4b7e1fd7ca38..6fda78a4b4fab 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java @@ -65,4 +65,4 @@ public FieldParser getParser() { public Class getTypeClass() { return String.class; } -} \ No newline at end of file +} diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java index 2801582eec099..2cf901cb80eb6 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java @@ -20,8 +20,6 @@ package org.apache.flink.types.parser; import org.apache.flink.types.StringValue; -import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.types.parser.StringValueParser; public class QuotedStringValueParserTest extends ParserTestBase { @@ -69,4 +67,4 @@ public FieldParser getParser() { public Class getTypeClass() { return StringValue.class; } -} \ No newline at end of file +} diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java index 201714b129080..822d87147e589 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java @@ -19,9 +19,6 @@ package org.apache.flink.types.parser; -import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.types.parser.ShortParser; - public class ShortParserTest extends ParserTestBase { diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java index 59e9c521181bb..c4b5f0221369f 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java @@ -20,8 +20,6 @@ package org.apache.flink.types.parser; import org.apache.flink.types.ShortValue; -import org.apache.flink.types.parser.ShortValueParser; -import org.apache.flink.types.parser.FieldParser; public class ShortValueParserTest extends ParserTestBase { diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java index 8e751924dc0a2..739bd765747d5 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java @@ -19,9 +19,6 @@ package org.apache.flink.types.parser; -import org.apache.flink.types.parser.StringParser; -import org.apache.flink.types.parser.FieldParser; - public class UnquotedStringParserTest extends ParserTestBase { @@ -58,4 +55,4 @@ public FieldParser getParser() { public Class getTypeClass() { return String.class; } -} \ No newline at end of file +} diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java index 4f9069e9dd7ff..1fe885068bbab 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java @@ -23,7 +23,6 @@ import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; -import org.apache.flink.types.parser.StringValueParser; import org.junit.Test; public class VarLengthStringParserTest { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala index 1eb056ccc2739..748a20bc4b1ac 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala @@ -36,7 +36,8 @@ class RowCsvInputFormat( rowTypeInfo: RowTypeInfo, lineDelimiter: String = DEFAULT_LINE_DELIMITER, fieldDelimiter: String = DEFAULT_FIELD_DELIMITER, - includedFieldsMask: Array[Boolean] = null) + includedFieldsMask: Array[Boolean] = null, + emptyStringAsNull: Boolean = false) extends CsvInputFormat[Row](filePath) { if (rowTypeInfo.getArity == 0) { @@ -134,7 +135,7 @@ class RowCsvInputFormat( holders(output)) if (!isLenient && (parser.getErrorState ne ParseErrorState.NONE)) { - // Row is able to handle null values + // the error state EMPTY_STRING is ignored if (parser.getErrorState ne ParseErrorState.EMPTY_STRING) { throw new ParseException(s"Parsing error for column $field of row '" + new String(bytes, offset, numBytes) @@ -143,8 +144,11 @@ class RowCsvInputFormat( } holders(output) = parser.getLastResult - // check parse result - if (startPos < 0) { + // check parse result: + // the result is null if it is invalid + // or empty with emptyStringAsNull enabled + if (startPos < 0 || + (emptyStringAsNull && (parser.getErrorState eq ParseErrorState.EMPTY_STRING))) { holders(output) = null startPos = skipFields(bytes, latestValidPos, limit, fieldDelimiter) } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala index 540776d2d30a9..d709a5b60e9ce 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala @@ -379,39 +379,40 @@ class RowCsvInputFormatTest { @Test def testEmptyFields() { val fileContent = - "|0|0|0|0|0|\n" + - "1||1|1|1|1|\n" + - "2|2||2|2|2|\n" + - "3|3|3||3|3|\n" + - "4|4|4|4||4|\n" + - "5|5|5|5|5||\n" + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" val split = createTempFile(fileContent) - // TODO: FLOAT_TYPE_INFO and DOUBLE_TYPE_INFO don't handle correctly null values val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.SHORT_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.BYTE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.FLOAT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.BYTE_TYPE_INFO)) + BasicTypeInfo.SHORT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO)) - val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo) - format.setFieldDelimiter("|") + val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo, emptyStringAsNull = true) + format.setFieldDelimiter(",") format.configure(new Configuration) format.open(split) - var result = new Row(6) + var result = new Row(8) val linesCnt = fileContent.split("\n").length - var i = 0 - while (i < linesCnt) { + for (i <- 0 until linesCnt) yield { result = format.nextRecord(result) assertNull(result.productElement(i)) - i += 1 } - + // ensure no more rows assertNull(format.nextRecord(result)) assertTrue(format.reachedEnd) From 3241f6cf5ea61129b279186dbc89bdb0178cc2d1 Mon Sep 17 00:00:00 2001 From: twalthr Date: Mon, 15 Aug 2016 11:05:17 +0200 Subject: [PATCH 2/2] Renamed EMPTY_STRING to EMPTY_COLUMN --- .../org/apache/flink/types/parser/BooleanParser.java | 2 +- .../java/org/apache/flink/types/parser/ByteParser.java | 2 +- .../org/apache/flink/types/parser/ByteValueParser.java | 2 +- .../org/apache/flink/types/parser/DoubleParser.java | 2 +- .../apache/flink/types/parser/DoubleValueParser.java | 2 +- .../org/apache/flink/types/parser/FieldParser.java | 4 ++-- .../org/apache/flink/types/parser/FloatParser.java | 2 +- .../apache/flink/types/parser/FloatValueParser.java | 2 +- .../java/org/apache/flink/types/parser/IntParser.java | 2 +- .../org/apache/flink/types/parser/IntValueParser.java | 2 +- .../java/org/apache/flink/types/parser/LongParser.java | 2 +- .../org/apache/flink/types/parser/LongValueParser.java | 2 +- .../org/apache/flink/types/parser/ShortParser.java | 2 +- .../apache/flink/types/parser/ShortValueParser.java | 2 +- .../org/apache/flink/types/parser/StringParser.java | 4 ++-- .../apache/flink/types/parser/StringValueParser.java | 4 ++-- .../org/apache/flink/types/parser/ParserTestBase.java | 2 +- .../flink/api/table/runtime/io/RowCsvInputFormat.scala | 10 +++++----- .../api/table/runtime/io/RowCsvInputFormatTest.scala | 2 +- 19 files changed, 26 insertions(+), 26 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java index 646af01d275d1..f8b890a46bbe0 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java @@ -45,7 +45,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, Boole while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delim)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } break; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java index a521ac1ae20d0..7ee257ed85d64 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java @@ -48,7 +48,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, B for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } this.result = (byte) (neg ? -val : val); diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java index 4cda98c1ae57b..c79f5d4dc9b74 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java @@ -55,7 +55,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, B if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } reusable.setValue((byte) (neg ? -val : val)); diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java index 63c8a3f93a934..8af496dfbd50e 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java @@ -40,7 +40,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, D while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } break; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java index 22d94019c0165..5c657be779484 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java @@ -40,7 +40,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, D while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } break; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index 67c1bd7913602..5f17840a6bc17 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -69,8 +69,8 @@ public static enum ParseErrorState { /** The parser found characters between the end of the quoted string and the delimiter. */ UNQUOTED_CHARS_AFTER_QUOTED_STRING, - /** The string is empty. */ - EMPTY_STRING, + /** The column is empty. */ + EMPTY_COLUMN, /** Invalid Boolean value **/ BOOLEAN_INVALID diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java index fcce89a4b8ed4..3304f24d591db 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java @@ -40,7 +40,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, F while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } break; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java index e9dec0cd06194..26ee47bae05f4 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java @@ -40,7 +40,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, F while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } break; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java index 4d2ae7ca84229..4e5d43fa4f745 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java @@ -57,7 +57,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, I for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } this.result = (int) (neg ? -val : val); diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java index d487c6621a06d..0229bc7381ede 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java @@ -57,7 +57,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, I for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } reusable.setValue((int) (neg ? -val : val)); diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java index c7b76d265a5cd..79eb0803eb141 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java @@ -51,7 +51,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, L for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } this.result = neg ? -val : val; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java index 597abc0ea75ac..5ddd40c098de6 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java @@ -54,7 +54,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, L for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } reusable.setValue(neg ? -val : val); diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java index 3afa7613dda21..c458a3f6fae6f 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java @@ -55,7 +55,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } this.result = (short) (neg ? -val : val); diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java index 880af254dd9de..47471a300af51 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java @@ -57,7 +57,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } reusable.setValue((short) (neg ? -val : val)); diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index ba40373173669..1a2c7e3b57354 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -85,14 +85,14 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S if (i >= delimLimit) { // no delimiter found. Take the full string if (limit == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); // mark empty string + setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column } this.result = new String(bytes, startPos, limit - startPos); return limit; } else { // delimiter found. if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); // mark empty string + setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column } this.result = new String(bytes, startPos, i - startPos); return i + delimiter.length; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java index bdaf1295475e2..7ac75587eb021 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java @@ -91,14 +91,14 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S if (i >= delimLimit) { // no delimiter found. Take the full string if (limit == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); // mark empty strings + setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column } reusable.setValueAscii(bytes, startPos, limit - startPos); return limit; } else { // delimiter found. if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); // mark empty strings + setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column } reusable.setValueAscii(bytes, startPos, i - startPos); return i + delimiter.length; diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java index 7dd47abd8e2c7..9b021472dd04c 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java @@ -414,7 +414,7 @@ public void testEmptyFieldInIsolation() { byte[] bytes = emptyString.getBytes(); int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); - assertEquals(FieldParser.ParseErrorState.EMPTY_STRING, parser.getErrorState()); + assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState()); if(this.allowsEmptyField()) { assertTrue("Parser declared the empty string as invalid.", numRead != -1); diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala index 748a20bc4b1ac..b0ab801fbf011 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala @@ -37,7 +37,7 @@ class RowCsvInputFormat( lineDelimiter: String = DEFAULT_LINE_DELIMITER, fieldDelimiter: String = DEFAULT_FIELD_DELIMITER, includedFieldsMask: Array[Boolean] = null, - emptyStringAsNull: Boolean = false) + emptyColumnAsNull: Boolean = false) extends CsvInputFormat[Row](filePath) { if (rowTypeInfo.getArity == 0) { @@ -135,8 +135,8 @@ class RowCsvInputFormat( holders(output)) if (!isLenient && (parser.getErrorState ne ParseErrorState.NONE)) { - // the error state EMPTY_STRING is ignored - if (parser.getErrorState ne ParseErrorState.EMPTY_STRING) { + // the error state EMPTY_COLUMN is ignored + if (parser.getErrorState ne ParseErrorState.EMPTY_COLUMN) { throw new ParseException(s"Parsing error for column $field of row '" + new String(bytes, offset, numBytes) + s"' originated by ${parser.getClass.getSimpleName}: ${parser.getErrorState}.") @@ -146,9 +146,9 @@ class RowCsvInputFormat( // check parse result: // the result is null if it is invalid - // or empty with emptyStringAsNull enabled + // or empty with emptyColumnAsNull enabled if (startPos < 0 || - (emptyStringAsNull && (parser.getErrorState eq ParseErrorState.EMPTY_STRING))) { + (emptyColumnAsNull && (parser.getErrorState eq ParseErrorState.EMPTY_COLUMN))) { holders(output) = null startPos = skipFields(bytes, latestValidPos, limit, fieldDelimiter) } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala index d709a5b60e9ce..db01b6909dcac 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala @@ -400,7 +400,7 @@ class RowCsvInputFormatTest { BasicTypeInfo.SHORT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)) - val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo, emptyStringAsNull = true) + val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo, emptyColumnAsNull = true) format.setFieldDelimiter(",") format.configure(new Configuration) format.open(split)