From 25795388a7585dc100470e9cb5bcf89833ea3a03 Mon Sep 17 00:00:00 2001 From: Flavio Pompermaier Date: Thu, 19 May 2016 11:41:43 +0200 Subject: [PATCH 1/2] [Flink-3908] Fixed Parser's error state reset --- .../flink/types/parser/BooleanParser.java | 2 +- .../types/parser/BooleanValueParser.java | 2 +- .../apache/flink/types/parser/ByteParser.java | 2 +- .../flink/types/parser/ByteValueParser.java | 6 ++-- .../flink/types/parser/DoubleParser.java | 2 +- .../flink/types/parser/DoubleValueParser.java | 4 +-- .../flink/types/parser/FieldParser.java | 28 ++++++++++++++++--- .../flink/types/parser/FloatParser.java | 2 +- .../flink/types/parser/FloatValueParser.java | 4 +-- .../apache/flink/types/parser/IntParser.java | 2 +- .../flink/types/parser/IntValueParser.java | 4 +-- .../apache/flink/types/parser/LongParser.java | 8 +++--- .../flink/types/parser/LongValueParser.java | 6 ++-- .../flink/types/parser/ShortParser.java | 4 +-- .../flink/types/parser/ShortValueParser.java | 6 ++-- .../flink/types/parser/StringParser.java | 10 +++---- .../flink/types/parser/StringValueParser.java | 10 +++---- 17 files changed, 61 insertions(+), 41 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java index 90fa41e7f785a..83a511a54fa4c 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java @@ -36,7 +36,7 @@ public class BooleanParser extends FieldParser { }; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, Boolean reuse) { + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delim, Boolean reuse) { final int delimLimit = limit - delim.length + 1; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanValueParser.java index a90c198519fcc..da573bb041b6a 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanValueParser.java @@ -29,7 +29,7 @@ public class BooleanValueParser extends FieldParser { private BooleanValue result; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, BooleanValue reuse) { + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delim, BooleanValue reuse) { int returnValue = parser.parseField(bytes, startPos, limit, delim, reuse.getValue()); setErrorState(parser.getErrorState()); reuse.setValue(parser.getLastResult()); diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java index a521ac1ae20d0..bf239c9c0aa5c 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java @@ -27,7 +27,7 @@ public class ByteParser extends FieldParser { private byte result; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Byte reusable) { + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, Byte reusable) { int val = 0; boolean neg = false; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java index 416a498d77646..7e78a2eed7c9a 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java @@ -32,14 +32,14 @@ public class ByteValueParser extends FieldParser { private ByteValue result; @Override - public int parseField(byte[] bytes, int startPos, int limit, 
byte[] delimiter, ByteValue reusable) { + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, ByteValue reusable) { int val = 0; boolean neg = false; this.result = reusable; - final int delimLimit = limit-delimiter.length+1; - + final int delimLimit = limit-delimiter.length + 1; + if (bytes[startPos] == '-') { neg = true; startPos++; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java index 0b2f5a24257a1..f3eeb3139f9d4 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java @@ -32,7 +32,7 @@ public class DoubleParser extends FieldParser { private double result; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reusable) { + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reusable) { int i = startPos; final int delimLimit = limit - delimiter.length + 1; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java index 333e6c9ad4aec..482e337c2b488 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java @@ -31,8 +31,8 @@ public class DoubleValueParser extends FieldParser { private DoubleValue result; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, DoubleValue reusable) { - + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, DoubleValue reusable) { + int i = startPos; final int delimLimit = limit - delimiter.length + 1; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index 57689a8b3b271..73aca1234c20d 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -79,8 +79,9 @@ public static enum ParseErrorState { private ParseErrorState errorState = ParseErrorState.NONE; /** - * Parses the value of a field from the byte array. - * The start position within the byte array and the array's valid length is given. + * Parses the value of a field from the byte array, taking care to properly reset + * the state of this parser. + * The start position within the byte array and the array's valid length is given. * The content of the value is delimited by a field delimiter. * * @param bytes The byte array that holds the value. @@ -92,8 +93,27 @@ public static enum ParseErrorState { * * @return The index of the next delimiter, if the field was parsed correctly. A value less than 0 otherwise. */ - public abstract int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse); - + public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse) { + resetParserState(); + return parseFieldImpl(bytes, startPos, limit, delim, reuse); + } + + /** + * Each parser's logic should be implemented inside this method + * + * @see {@link FieldParser#parseField(byte[], int, int, byte[], Object)} + * */ + protected abstract int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delim, T reuse); + + /** + * Reset the state of the parser. Called as the very first method inside + * {@link FieldParser#parseField(byte[], int, int, byte[], Object)}, by default it just reset + * its error state. + * */ + protected void resetParserState() { + this.errorState = ParseErrorState.NONE; + } + /** * Gets the parsed field. This method returns the value parsed by the last successful invocation of * {@link #parseField(byte[], int, int, byte[], Object)}. 
It objects are mutable and reused, it will return diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java index a47877e31d23b..91d897e14a1d6 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java @@ -30,7 +30,7 @@ public class FloatParser extends FieldParser { private float result; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, Float reusable) { int i = startPos; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java index b6da3d327b1fa..39176cdebd5ad 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java @@ -31,8 +31,8 @@ public class FloatValueParser extends FieldParser { private FloatValue result; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, FloatValue reusable) { - + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, FloatValue reusable) { + int i = startPos; final int delimLimit = limit - delimiter.length + 1; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java index 4d2ae7ca84229..500f71855be69 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java @@ -35,7 +35,7 @@ public class IntParser extends FieldParser { private int result; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Integer + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, Integer reusable) { long val = 0; boolean neg = false; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java index d493e2dffcb64..fca6e0bddbf66 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java @@ -35,11 +35,11 @@ public class IntValueParser extends FieldParser { private IntValue result; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, IntValue reusable) { + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, IntValue reusable) { long val = 0; boolean neg = false; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit-delimiter.length + 1; this.result = reusable; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java index e8ac09b4e2b4b..2e6930fb47b17 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java @@ -31,12 +31,12 @@ public class LongParser extends FieldParser { private long result; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Long reusable) { + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, Long reusable) { long val = 0; boolean neg = false; final int delimLimit = limit - delimiter.length + 1; - + if (bytes[startPos] == '-') { neg = true; startPos++; @@ -72,7 +72,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, L if (i+1 >= limit) { return limit; - } else if (i+1 < delimLimit && delimiterNext(bytes, i+1, 
delimiter)) { + } else if (i + 1 < delimLimit && delimiterNext(bytes, i + 1, delimiter)) { return i + 1 + delimiter.length; } else { setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW); @@ -160,7 +160,7 @@ public static final long parseField(byte[] bytes, int startPos, int length, char if (val < 0) { // this is an overflow/underflow, unless we hit exactly the Long.MIN_VALUE if (neg && val == Long.MIN_VALUE) { - if (length == 1 || bytes[startPos+1] == delimiter) { + if (length == 1 || bytes[startPos + 1] == delimiter) { return Long.MIN_VALUE; } else { throw new NumberFormatException("value overflow"); diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java index eaaf619b86f8b..394db5ee88bbb 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java @@ -32,11 +32,11 @@ public class LongValueParser extends FieldParser { private LongValue result; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, LongValue reusable) { + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, LongValue reusable) { long val = 0; boolean neg = false; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit-delimiter.length + 1; this.result = reusable; @@ -75,7 +75,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, L if (i+1 >= limit) { return limit; - } else if (i+1 < delimLimit && delimiterNext(bytes, i+1, delimiter)) { + } else if (i + 1 < delimLimit && delimiterNext(bytes, i + 1, delimiter)) { return i + 1 + delimiter.length; } else { setErrorState(ParseErrorState.NUMERIC_VALUE_OVERFLOW_UNDERFLOW); diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java index 6f1fc7b5a0797..0a38e08fa1663 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java @@ -35,11 +35,11 @@ public class ShortParser extends FieldParser { private short result; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Short reusable) { + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, Short reusable) { int val = 0; boolean neg = false; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit-delimiter.length + 1; if (bytes[startPos] == '-') { neg = true; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java index c1df9ce961aed..2fdac32faa969 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java @@ -35,12 +35,12 @@ public class ShortValueParser extends FieldParser { private ShortValue result; @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, ShortValue reusable) { + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, ShortValue reusable) { int val = 0; boolean neg = false; - final int delimLimit = limit-delimiter.length+1; - + final int delimLimit = limit-delimiter.length + 1; + this.result = reusable; if (bytes[startPos] == '-') { diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index 7dc80e5fd9c43..7b8390bf518c3 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -40,18 +40,18 @@ public void enableQuotedStringParsing(byte quoteCharacter) { } @Override - public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, String reusable) { + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, String reusable) { int i = startPos; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit-delimiter.length + 1; if(quotedStringParsing && bytes[i] == quoteCharacter) { // quoted string parsing enabled and first character Vis a quote i++; // search for ending quote character, continue when it is escaped - while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)){ + while (i < limit && (bytes[i] != quoteCharacter || bytes[i - 1] == BACKSLASH)) { i++; } @@ -63,11 +63,11 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S // check for proper termination if (i == limit) { // either by end of line - this.result = new String(bytes, startPos+1, i - startPos - 2); + this.result = new String(bytes, startPos + 1, i - startPos - 2); return limit; } else if ( i < delimLimit && delimiterNext(bytes, i, delimiter)) { // or following field delimiter - this.result = new String(bytes, startPos+1, i - startPos - 2); + this.result = new String(bytes, startPos + 1, i - startPos - 2); return i + delimiter.length; } else { // no proper termination diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java index 586c2cc632d5d..06a2dd0dff049 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java @@ -43,19 +43,19 @@ public void enableQuotedStringParsing(byte quoteCharacter) { } @Override - public int parseField(byte[] bytes, int startPos, 
int limit, byte[] delimiter, StringValue reusable) { + public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, StringValue reusable) { this.result = reusable; int i = startPos; - final int delimLimit = limit-delimiter.length+1; + final int delimLimit = limit-delimiter.length + 1; if(quotedStringParsing == true && bytes[i] == quoteCharacter) { // quoted string parsing enabled and first character is a quote i++; // search for ending quote character, continue when it is escaped - while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)){ + while (i < limit && (bytes[i] != quoteCharacter || bytes[i - 1] == BACKSLASH)) { i++; } @@ -67,11 +67,11 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S // check for proper termination if (i == limit) { // either by end of line - reusable.setValueAscii(bytes, startPos+1, i - startPos - 2); + reusable.setValueAscii(bytes, startPos + 1, i - startPos - 2); return limit; } else if ( i < delimLimit && delimiterNext(bytes, i, delimiter)) { // or following field delimiter - reusable.setValueAscii(bytes, startPos+1, i - startPos - 2); + reusable.setValueAscii(bytes, startPos + 1, i - startPos - 2); return i + delimiter.length; } else { // no proper termination From 7efffc6f9651763e9d36cc17da870ca981bf028c Mon Sep 17 00:00:00 2001 From: Flavio Pompermaier Date: Fri, 3 Jun 2016 10:39:06 +0200 Subject: [PATCH 2/2] Implemented suggestions to PR --- .../apache/flink/api/common/io/GenericCsvInputFormat.java | 2 +- .../java/org/apache/flink/types/parser/BooleanParser.java | 2 +- .../org/apache/flink/types/parser/BooleanValueParser.java | 2 +- .../java/org/apache/flink/types/parser/ByteParser.java | 2 +- .../org/apache/flink/types/parser/ByteValueParser.java | 6 +++--- .../java/org/apache/flink/types/parser/DoubleParser.java | 2 +- .../org/apache/flink/types/parser/DoubleValueParser.java | 4 ++-- .../java/org/apache/flink/types/parser/FieldParser.java | 8 ++++---- 
.../java/org/apache/flink/types/parser/FloatParser.java | 2 +- .../org/apache/flink/types/parser/FloatValueParser.java | 4 ++-- .../java/org/apache/flink/types/parser/IntParser.java | 2 +- .../org/apache/flink/types/parser/IntValueParser.java | 4 ++-- .../java/org/apache/flink/types/parser/LongParser.java | 4 ++-- .../org/apache/flink/types/parser/LongValueParser.java | 4 ++-- .../java/org/apache/flink/types/parser/ShortParser.java | 4 ++-- .../org/apache/flink/types/parser/ShortValueParser.java | 6 +++--- .../java/org/apache/flink/types/parser/StringParser.java | 4 ++-- .../org/apache/flink/types/parser/StringValueParser.java | 4 ++-- 18 files changed, 33 insertions(+), 33 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index 31d42ff04c607..e2c54ad0a05fb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -392,7 +392,7 @@ protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int nu @SuppressWarnings("unchecked") FieldParser parser = (FieldParser) this.fieldParsers[output]; Object reuse = holders[output]; - startPos = parser.parseField(bytes, startPos, limit, this.fieldDelim, reuse); + startPos = parser.resetErrorStateAndParse(bytes, startPos, limit, this.fieldDelim, reuse); holders[output] = parser.getLastResult(); // check parse result diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java index 83a511a54fa4c..90fa41e7f785a 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java @@ -36,7 +36,7 @@ public class BooleanParser extends FieldParser { }; 
@Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delim, Boolean reuse) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, Boolean reuse) { final int delimLimit = limit - delim.length + 1; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanValueParser.java index da573bb041b6a..a90c198519fcc 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanValueParser.java @@ -29,7 +29,7 @@ public class BooleanValueParser extends FieldParser { private BooleanValue result; @Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delim, BooleanValue reuse) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, BooleanValue reuse) { int returnValue = parser.parseField(bytes, startPos, limit, delim, reuse.getValue()); setErrorState(parser.getErrorState()); reuse.setValue(parser.getLastResult()); diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java index bf239c9c0aa5c..a521ac1ae20d0 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java @@ -27,7 +27,7 @@ public class ByteParser extends FieldParser { private byte result; @Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, Byte reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Byte reusable) { int val = 0; boolean neg = false; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java index 7e78a2eed7c9a..212a6093cd6f3 
100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java @@ -32,14 +32,14 @@ public class ByteValueParser extends FieldParser { private ByteValue result; @Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, ByteValue reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, ByteValue reusable) { int val = 0; boolean neg = false; this.result = reusable; - final int delimLimit = limit-delimiter.length + 1; - + final int delimLimit = limit -delimiter.length + 1; + if (bytes[startPos] == '-') { neg = true; startPos++; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java index f3eeb3139f9d4..0b2f5a24257a1 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java @@ -32,7 +32,7 @@ public class DoubleParser extends FieldParser { private double result; @Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reusable) { int i = startPos; final int delimLimit = limit - delimiter.length + 1; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java index 482e337c2b488..333e6c9ad4aec 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java @@ -31,8 +31,8 @@ public class DoubleValueParser extends FieldParser { private DoubleValue result; @Override - public int parseFieldImpl(byte[] bytes, int startPos, int 
limit, byte[] delimiter, DoubleValue reusable) { - + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, DoubleValue reusable) { + int i = startPos; final int delimLimit = limit - delimiter.length + 1; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index 73aca1234c20d..67c1bd7913602 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -93,9 +93,9 @@ public static enum ParseErrorState { * * @return The index of the next delimiter, if the field was parsed correctly. A value less than 0 otherwise. */ - public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse) { + public int resetErrorStateAndParse(byte[] bytes, int startPos, int limit, byte[] delim, T reuse) { resetParserState(); - return parseFieldImpl(bytes, startPos, limit, delim, reuse); + return parseField(bytes, startPos, limit, delim, reuse); } /** @@ -103,11 +103,11 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reu * * @see {@link FieldParser#parseField(byte[], int, int, byte[], Object)} * */ - protected abstract int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delim, T reuse); + protected abstract int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse); /** * Reset the state of the parser. Called as the very first method inside - * {@link FieldParser#parseField(byte[], int, int, byte[], Object)}, by default it just reset + * {@link FieldParser#resetErrorStateAndParse(byte[], int, int, byte[], Object)}, by default it just resets * its error state. 
* */ protected void resetParserState() { diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java index 91d897e14a1d6..a47877e31d23b 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java @@ -30,7 +30,7 @@ public class FloatParser extends FieldParser { private float result; @Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, Float + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float reusable) { int i = startPos; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java index 39176cdebd5ad..b6da3d327b1fa 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java @@ -31,8 +31,8 @@ public class FloatValueParser extends FieldParser { private FloatValue result; @Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, FloatValue reusable) { - + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, FloatValue reusable) { + int i = startPos; final int delimLimit = limit - delimiter.length + 1; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java index 500f71855be69..4d2ae7ca84229 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java @@ -35,7 +35,7 @@ public class IntParser extends FieldParser { private int result; @Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] 
delimiter, Integer + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Integer reusable) { long val = 0; boolean neg = false; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java index fca6e0bddbf66..d487c6621a06d 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java @@ -35,11 +35,11 @@ public class IntValueParser extends FieldParser { private IntValue result; @Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, IntValue reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, IntValue reusable) { long val = 0; boolean neg = false; - final int delimLimit = limit-delimiter.length + 1; + final int delimLimit = limit - delimiter.length + 1; this.result = reusable; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java index 2e6930fb47b17..c7b76d265a5cd 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java @@ -31,12 +31,12 @@ public class LongParser extends FieldParser { private long result; @Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, Long reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Long reusable) { long val = 0; boolean neg = false; final int delimLimit = limit - delimiter.length + 1; - + if (bytes[startPos] == '-') { neg = true; startPos++; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java index 
394db5ee88bbb..597abc0ea75ac 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java @@ -32,11 +32,11 @@ public class LongValueParser extends FieldParser { private LongValue result; @Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, LongValue reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, LongValue reusable) { long val = 0; boolean neg = false; - final int delimLimit = limit-delimiter.length + 1; + final int delimLimit = limit - delimiter.length + 1; this.result = reusable; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java index 0a38e08fa1663..3afa7613dda21 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java @@ -35,11 +35,11 @@ public class ShortParser extends FieldParser { private short result; @Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, Short reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Short reusable) { int val = 0; boolean neg = false; - final int delimLimit = limit-delimiter.length + 1; + final int delimLimit = limit - delimiter.length + 1; if (bytes[startPos] == '-') { neg = true; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java index 2fdac32faa969..880af254dd9de 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java @@ -35,12 +35,12 @@ public class ShortValueParser extends FieldParser { private ShortValue 
result; @Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, ShortValue reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, ShortValue reusable) { int val = 0; boolean neg = false; - final int delimLimit = limit-delimiter.length + 1; - + final int delimLimit = limit - delimiter.length + 1; + this.result = reusable; if (bytes[startPos] == '-') { diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index 7b8390bf518c3..9cee9909e75a5 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -40,11 +40,11 @@ public void enableQuotedStringParsing(byte quoteCharacter) { } @Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, String reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, String reusable) { int i = startPos; - final int delimLimit = limit-delimiter.length + 1; + final int delimLimit = limit - delimiter.length + 1; if(quotedStringParsing && bytes[i] == quoteCharacter) { // quoted string parsing enabled and first character Vis a quote diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java index 06a2dd0dff049..bc899522ae864 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java @@ -43,12 +43,12 @@ public void enableQuotedStringParsing(byte quoteCharacter) { } @Override - public int parseFieldImpl(byte[] bytes, int startPos, int limit, byte[] delimiter, StringValue reusable) { + public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, 
StringValue reusable) { this.result = reusable; int i = startPos; - final int delimLimit = limit-delimiter.length + 1; + final int delimLimit = limit - delimiter.length + 1; if(quotedStringParsing == true && bytes[i] == quoteCharacter) { // quoted string parsing enabled and first character is a quote