From 675b6a44e76ae71901bc6d4eaea1d09b6f789ff6 Mon Sep 17 00:00:00 2001 From: Joshi Date: Wed, 1 Jun 2016 13:26:47 -0700 Subject: [PATCH 1/7] [FLINK-3921] StringParser encoding --- .../org/apache/flink/types/parser/StringParser.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index 7dc80e5fd9c43..5ae1268a689ca 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -20,6 +20,8 @@ import org.apache.flink.annotation.PublicEvolving; +import java.nio.charset.Charset; + /** * Converts a variable length field of a byte array into a {@link String}. The byte contents between * delimiters is interpreted as an ASCII string. The string may be quoted in double quotes. For quoted @@ -31,6 +33,7 @@ public class StringParser extends FieldParser { private boolean quotedStringParsing = false; private byte quoteCharacter; private static final byte BACKSLASH = 92; + private final Charset ascii = Charset.forName("US-ASCII"); private String result; @@ -63,11 +66,11 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S // check for proper termination if (i == limit) { // either by end of line - this.result = new String(bytes, startPos+1, i - startPos - 2); + this.result = new String(bytes, startPos+1, i - startPos - 2, ascii); return limit; } else if ( i < delimLimit && delimiterNext(bytes, i, delimiter)) { // or following field delimiter - this.result = new String(bytes, startPos+1, i - startPos - 2); + this.result = new String(bytes, startPos+1, i - startPos - 2, ascii); return i + delimiter.length; } else { // no proper termination @@ -84,11 +87,11 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S if (i >= delimLimit) { // no delimiter found. Take the full string - this.result = new String(bytes, startPos, limit - startPos); + this.result = new String(bytes, startPos, limit - startPos, ascii); return limit; } else { // delimiter found. - this.result = new String(bytes, startPos, i - startPos); + this.result = new String(bytes, startPos, i - startPos, ascii); return i + delimiter.length; } } From 08a5468979ea00f4c6a6c6f9f8831e30027373f3 Mon Sep 17 00:00:00 2001 From: Joshi Date: Sat, 4 Jun 2016 19:15:50 -0700 Subject: [PATCH 2/7] [FLINK-3921] Parser encoding --- .../api/common/io/GenericCsvInputFormat.java | 6 ++- .../flink/types/parser/FieldParser.java | 37 ++++++++++++++++++- .../flink/types/parser/StringParser.java | 10 ++--- .../parser/VarLengthStringParserTest.java | 19 ++++++++++ 4 files changed, 62 insertions(+), 10 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index 31d42ff04c607..3e8b99cdee31e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -50,7 +50,9 @@ public abstract class GenericCsvInputFormat extends DelimitedInputFormat /** The default charset to convert strings to bytes */ private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8"); - + + private static final Charset ASCII_CHARSET = Charset.forName("US-ASCII"); + private static final Class[] EMPTY_TYPES = new Class[0]; private static final boolean[] EMPTY_INCLUDED = new boolean[0]; @@ -392,7 +394,7 @@ protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int nu @SuppressWarnings("unchecked") FieldParser parser = (FieldParser) this.fieldParsers[output]; Object reuse = holders[output]; - startPos = parser.parseField(bytes, startPos, limit, this.fieldDelim, reuse); + startPos = parser.parseField(bytes, startPos, limit, this.fieldDelim, reuse, ASCII_CHARSET); holders[output] = parser.getLastResult(); // check parse result diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index 57689a8b3b271..096c7a4ef8a72 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -19,6 +19,7 @@ package org.apache.flink.types.parser; +import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; @@ -75,8 +76,30 @@ public static enum ParseErrorState { /** Invalid Boolean value **/ BOOLEAN_INVALID } - + + private Charset charset = Charset.forName("US-ASCII"); + private ParseErrorState errorState = ParseErrorState.NONE; + + /** + * Parses the value of a field from the byte array. + * The start position within the byte array and the array's valid length is given. + * The content of the value is delimited by a field delimiter. + * + * @param bytes The byte array that holds the value. + * @param startPos The index where the field starts + * @param limit The limit unto which the byte contents is valid for the parser. The limit is the + * position one after the last valid byte. + * @param delim The field delimiter character + * @param reuse An optional reusable field to hold the value + * @param charset The charset to parse with + * + * @return The index of the next delimiter, if the field was parsed correctly. A value less than 0 otherwise. + */ + public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse, Charset charset){ + this.charset = charset; + return parseField(bytes, startPos, limit, delim, reuse); + } /** * Parses the value of a field from the byte array. @@ -152,7 +175,17 @@ protected void setErrorState(ParseErrorState error) { public ParseErrorState getErrorState() { return this.errorState; } - + + /** + * Gets the Charset for the parser.Default is set to ASCII + * + * @return The charset for the parser. + */ + public Charset getCharset() { + return this.charset; + } + + // -------------------------------------------------------------------------------------------- // Mapping from types to parsers // -------------------------------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index 5ae1268a689ca..3104d1eaebf40 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -33,8 +33,6 @@ public class StringParser extends FieldParser { private boolean quotedStringParsing = false; private byte quoteCharacter; private static final byte BACKSLASH = 92; - private final Charset ascii = Charset.forName("US-ASCII"); - private String result; public void enableQuotedStringParsing(byte quoteCharacter) { @@ -66,11 +64,11 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S // check for proper termination if (i == limit) { // either by end of line - this.result = new String(bytes, startPos+1, i - startPos - 2, ascii); + this.result = new String(bytes, startPos+1, i - startPos - 2, getCharset()); return limit; } else if ( i < delimLimit && delimiterNext(bytes, i, delimiter)) { // or following field delimiter - this.result = new String(bytes, startPos+1, i - startPos - 2, ascii); + this.result = new String(bytes, startPos+1, i - startPos - 2, getCharset()); return i + delimiter.length; } else { // no proper termination @@ -87,11 +85,11 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S if (i >= delimLimit) { // no delimiter found. Take the full string - this.result = new String(bytes, startPos, limit - startPos, ascii); + this.result = new String(bytes, startPos, limit - startPos, getCharset()); return limit; } else { // delimiter found. - this.result = new String(bytes, startPos, i - startPos, ascii); + this.result = new String(bytes, startPos, i - startPos, getCharset()); return i + delimiter.length; } } diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java index 4f9069e9dd7ff..fa5e82bd7e170 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java @@ -26,6 +26,8 @@ import org.apache.flink.types.parser.StringValueParser; import org.junit.Test; +import java.nio.charset.Charset; + public class VarLengthStringParserTest { public StringValueParser parser = new StringValueParser(); @@ -195,4 +197,21 @@ public void testParseInvalidQuotedStrings() { startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos < 0); } + + @Test + public void testParseValidMixedStringsWithCharset() { + + Charset charset = Charset.forName("US-ASCII"); + this.parser = new StringValueParser(); + this.parser.enableQuotedStringParsing((byte) '@'); + + // check valid strings with out whitespaces and trailing delimiter + byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes(); + StringValue s = new StringValue(); + + int startPos = 0; + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[]{'|'}, s, charset); + assertTrue(startPos == 11); + assertTrue(s.getValue().equals("abcde|gh")); + } } From cc1444aad3fd4c3035debe33fa95c97f7e414ffd Mon Sep 17 00:00:00 2001 From: Joshi Date: Sat, 4 Jun 2016 19:19:46 -0700 Subject: [PATCH 3/7] [FLINK-3921] Parser encoding --- .../org/apache/flink/api/common/io/GenericCsvInputFormat.java | 4 ++-- .../main/java/org/apache/flink/types/parser/StringParser.java | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index 3e8b99cdee31e..ee39a362b45f3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -51,7 +51,7 @@ public abstract class GenericCsvInputFormat extends DelimitedInputFormat /** The default charset to convert strings to bytes */ private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8"); - private static final Charset ASCII_CHARSET = Charset.forName("US-ASCII"); + private static Charset charset = Charset.forName("US-ASCII"); private static final Class[] EMPTY_TYPES = new Class[0]; @@ -394,7 +394,7 @@ protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int nu @SuppressWarnings("unchecked") FieldParser parser = (FieldParser) this.fieldParsers[output]; Object reuse = holders[output]; - startPos = parser.parseField(bytes, startPos, limit, this.fieldDelim, reuse, ASCII_CHARSET); + startPos = parser.parseField(bytes, startPos, limit, this.fieldDelim, reuse, charset); holders[output] = parser.getLastResult(); // check parse result diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index 3104d1eaebf40..83778e7ec9aa1 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -20,8 +20,6 @@ import org.apache.flink.annotation.PublicEvolving; -import java.nio.charset.Charset; - /** * Converts a variable length field of a byte array into a {@link String}. The byte contents between * delimiters is interpreted as an ASCII string. The string may be quoted in double quotes. For quoted @@ -33,6 +31,7 @@ public class StringParser extends FieldParser { private boolean quotedStringParsing = false; private byte quoteCharacter; private static final byte BACKSLASH = 92; + private String result; public void enableQuotedStringParsing(byte quoteCharacter) { From 6dfd355198cb712121f6c3656c52b41ea5d68bf7 Mon Sep 17 00:00:00 2001 From: Joshi Date: Mon, 13 Jun 2016 16:40:59 -0700 Subject: [PATCH 4/7] FLINK-3921: updated --- .../api/common/io/GenericCsvInputFormat.java | 32 ++++++++++++++++--- .../flink/types/parser/FieldParser.java | 10 ++++++ .../common/io/GenericCsvInputFormatTest.java | 32 +++++++++++++++++++ 3 files changed, 69 insertions(+), 5 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index ee39a362b45f3..f94826e248230 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -49,9 +49,7 @@ public abstract class GenericCsvInputFormat extends DelimitedInputFormat private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class); /** The default charset to convert strings to bytes */ - private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8"); - - private static Charset charset = Charset.forName("US-ASCII"); + private static Charset charset = Charset.forName("UTF-8"); private static final Class[] EMPTY_TYPES = new Class[0]; @@ -108,6 +106,11 @@ protected GenericCsvInputFormat() { protected GenericCsvInputFormat(Path filePath) { super(filePath); } + + protected GenericCsvInputFormat(Path filePath, Charset charset) { + super(filePath); + this.charset = charset != null ? charset : Charset.forName("UTF-8"); + } // -------------------------------------------------------------------------------------------- @@ -132,7 +135,7 @@ public void setCommentPrefix(char commentPrefix) { } public void setCommentPrefix(String commentPrefix) { - setCommentPrefix(commentPrefix, UTF_8_CHARSET); + setCommentPrefix(commentPrefix, charset); } public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException { @@ -176,7 +179,7 @@ public void setFieldDelimiter(char delimiter) { } public void setFieldDelimiter(String delimiter) { - this.fieldDelim = delimiter.getBytes(UTF_8_CHARSET); + this.fieldDelim = delimiter.getBytes(charset); } public boolean isLenient() { @@ -316,6 +319,25 @@ protected void setFieldsGeneric(boolean[] includedMask, Class[] fieldTypes) { this.fieldIncluded = includedMask; } + /** + * Gets the Charset for the parser.Default is set to UTF-8 + * + * @return The charset for the parser. + */ + public Charset getCharset() { + return this.charset; + } + + /** + * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset + * when doing a parse. + * + * @param charset The charset to set. + */ + protected void setCharset(Charset charset){ + this.charset = charset != null ? charset : Charset.forName("UTF-8"); + } + // -------------------------------------------------------------------------------------------- // Runtime methods // -------------------------------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index 096c7a4ef8a72..808920a05bdf2 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -185,6 +185,16 @@ public Charset getCharset() { return this.charset; } + /** + * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset + * when doing a parse. + * + * @param charset The charset to set. + */ + protected void setCharset(Charset charset){ + this.charset = charset; + } + // -------------------------------------------------------------------------------------------- // Mapping from types to parsers diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java index e3215c655aec2..48b6d7b08169b 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.charset.Charset; import java.util.Arrays; import java.util.zip.DeflaterOutputStream; import java.util.zip.GZIPOutputStream; @@ -547,6 +548,37 @@ public void testReadInvalidContentsLenientWithSkipping() { fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); } } + + @Test + public void testReadWithCharset() throws IOException { + try { + final String fileContent = "abc|def|ghijk"; + final FileInputSplit split = createTempFile(fileContent); + + final Configuration parameters = new Configuration(); + + format.setCharset(Charset.forName("UTF-8")); + format.setFieldDelimiter("|"); + format.setFieldTypesGeneric(StringValue.class, StringValue.class, StringValue.class); + + format.configure(parameters); + format.open(split); + + Value[] values = new Value[] { new StringValue(), new StringValue(), new StringValue()}; + + values = format.nextRecord(values); + assertNotNull(values); + assertEquals("abc", ((StringValue) values[0]).getValue()); + assertEquals("def", ((StringValue) values[1]).getValue()); + assertEquals("ghijk", ((StringValue) values[2]).getValue()); + + assertNull(format.nextRecord(values)); + assertTrue(format.reachedEnd()); + } + catch (Exception ex) { + fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); + } + } @Test public void readWithEmptyField() { From 211665c985d746645576a9a98098a257881bc378 Mon Sep 17 00:00:00 2001 From: Joshi Date: Thu, 8 Sep 2016 14:36:35 -0700 Subject: [PATCH 5/7] FLINK-3921: updated for review comment --- .../org/apache/flink/api/common/io/GenericCsvInputFormat.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index f94826e248230..a7e82d3200ffe 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -26,6 +26,7 @@ import org.apache.flink.types.parser.StringValueParser; import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,7 +110,7 @@ protected GenericCsvInputFormat(Path filePath) { protected GenericCsvInputFormat(Path filePath, Charset charset) { super(filePath); - this.charset = charset != null ? charset : Charset.forName("UTF-8"); + this.charset = Preconditions.checkNotNull(charset); } // -------------------------------------------------------------------------------------------- From 445d578f623bf1baa26f8eb0cb5d099d1d41a2bb Mon Sep 17 00:00:00 2001 From: Joshi Date: Fri, 9 Sep 2016 10:47:11 -0700 Subject: [PATCH 6/7] FLINK-3921: updated for review comment --- .../api/common/io/GenericCsvInputFormat.java | 13 +++++----- .../flink/types/parser/FieldParser.java | 24 ++----------------- .../flink/types/parser/StringParser.java | 2 +- .../parser/VarLengthStringParserTest.java | 4 ++-- 4 files changed, 12 insertions(+), 31 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index a7e82d3200ffe..f22133481b7ac 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -321,7 +321,7 @@ protected void setFieldsGeneric(boolean[] includedMask, Class[] fieldTypes) { } /** - * Gets the Charset for the parser.Default is set to UTF-8 + * Gets the character set for the parser. Default is set to UTF-8. * * @return The charset for the parser. */ @@ -333,10 +333,10 @@ public Charset getCharset() { * Sets the charset of the parser. Called by subclasses of the parser to set the type of charset * when doing a parse. * - * @param charset The charset to set. + * @param charset The character set to set. */ - protected void setCharset(Charset charset){ - this.charset = charset != null ? charset : Charset.forName("UTF-8"); + protected void setCharset(Charset charset) { + this.charset = Preconditions.checkNotNull(charset); } // -------------------------------------------------------------------------------------------- @@ -359,6 +359,7 @@ public void open(FileInputSplit split) throws IOException { FieldParser p = InstantiationUtil.instantiate(parserType, FieldParser.class); + p.setCharset(this.getCharset()); if (this.quotedStringParsing) { if (p instanceof StringParser) { ((StringParser)p).enableQuotedStringParsing(this.quoteCharacter); @@ -417,7 +418,7 @@ protected boolean parseRecord(Object[] holders, byte[] bytes, int offset, int nu @SuppressWarnings("unchecked") FieldParser parser = (FieldParser) this.fieldParsers[output]; Object reuse = holders[output]; - startPos = parser.parseField(bytes, startPos, limit, this.fieldDelim, reuse, charset); + startPos = parser.parseField(bytes, startPos, limit, this.fieldDelim, reuse); holders[output] = parser.getLastResult(); // check parse result @@ -474,7 +475,7 @@ protected int skipFields(byte[] bytes, int startPos, int limit, byte[] delim) { // search for ending quote character, continue when it is escaped i++; - while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)){ + while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)) { i++; } i++; diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index 808920a05bdf2..a3f23860f7a4c 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -77,30 +77,10 @@ public static enum ParseErrorState { BOOLEAN_INVALID } - private Charset charset = Charset.forName("US-ASCII"); + private Charset charset = Charset.forName("UTF-8"); private ParseErrorState errorState = ParseErrorState.NONE; - /** - * Parses the value of a field from the byte array. - * The start position within the byte array and the array's valid length is given. - * The content of the value is delimited by a field delimiter. - * - * @param bytes The byte array that holds the value. - * @param startPos The index where the field starts - * @param limit The limit unto which the byte contents is valid for the parser. The limit is the - * position one after the last valid byte. - * @param delim The field delimiter character - * @param reuse An optional reusable field to hold the value - * @param charset The charset to parse with - * - * @return The index of the next delimiter, if the field was parsed correctly. A value less than 0 otherwise. - */ - public int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse, Charset charset){ - this.charset = charset; - return parseField(bytes, startPos, limit, delim, reuse); - } - /** * Parses the value of a field from the byte array. * The start position within the byte array and the array's valid length is given. @@ -191,7 +171,7 @@ public Charset getCharset() { * * @param charset The charset to set. */ - protected void setCharset(Charset charset){ + public void setCharset(Charset charset) { this.charset = charset; } diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index 83778e7ec9aa1..b6cb28ef08776 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -51,7 +51,7 @@ public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, S i++; // search for ending quote character, continue when it is escaped - while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)){ + while (i < limit && (bytes[i] != quoteCharacter || bytes[i-1] == BACKSLASH)) { i++; } diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java index fa5e82bd7e170..1c5579ef72572 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java @@ -23,7 +23,6 @@ import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; -import org.apache.flink.types.parser.StringValueParser; import org.junit.Test; import java.nio.charset.Charset; @@ -210,7 +209,8 @@ public void testParseValidMixedStringsWithCharset() { StringValue s = new StringValue(); int startPos = 0; - startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[]{'|'}, s, charset); + parser.setCharset(charset); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[]{'|'}, s); assertTrue(startPos == 11); assertTrue(s.getValue().equals("abcde|gh")); } From 342eac100cc3f6796e8c9d0dce57f7ad002d3e03 Mon Sep 17 00:00:00 2001 From: Joshi Date: Mon, 21 Nov 2016 14:44:21 -0800 Subject: [PATCH 7/7] FLINK-3921: updated for review comment --- .../api/common/io/GenericCsvInputFormat.java | 29 ++----------------- .../common/io/GenericCsvInputFormatTest.java | 16 +++++----- .../apache/flink/api/java/io/CsvReader.java | 24 ++++++++++++++- .../flink/api/java/io/CSVReaderTest.java | 9 ++++++ 4 files changed, 43 insertions(+), 35 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index f22133481b7ac..aa68aed605bf3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -32,8 +32,6 @@ import java.io.IOException; import java.nio.charset.Charset; -import java.nio.charset.IllegalCharsetNameException; -import java.nio.charset.UnsupportedCharsetException; import java.util.ArrayList; import java.util.Map; import java.util.TreeMap; @@ -127,32 +125,11 @@ public byte[] getCommentPrefix() { return commentPrefix; } - public void setCommentPrefix(byte[] commentPrefix) { - this.commentPrefix = commentPrefix; - } - - public void setCommentPrefix(char commentPrefix) { - setCommentPrefix(String.valueOf(commentPrefix)); - } - public void setCommentPrefix(String commentPrefix) { setCommentPrefix(commentPrefix, charset); } - public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException { - if (charsetName == null) { - throw new IllegalArgumentException("Charset name must not be null"); - } - - if (commentPrefix != null) { - Charset charset = Charset.forName(charsetName); - setCommentPrefix(commentPrefix, charset); - } else { - this.commentPrefix = null; - } - } - - public void setCommentPrefix(String commentPrefix, Charset charset) { + private void setCommentPrefix(String commentPrefix, Charset charset) { if (charset == null) { throw new IllegalArgumentException("Charset must not be null"); } @@ -325,7 +302,7 @@ protected void setFieldsGeneric(boolean[] includedMask, Class[] fieldTypes) { * * @return The charset for the parser. */ - public Charset getCharset() { + Charset getCharset() { return this.charset; } @@ -335,7 +312,7 @@ public Charset getCharset() { * * @param charset The character set to set. */ - protected void setCharset(Charset charset) { + public void setCharset(Charset charset) { this.charset = Preconditions.checkNotNull(charset); } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java index 48b6d7b08169b..d063ddc8315eb 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java @@ -486,7 +486,7 @@ public void testReadInvalidContents() throws IOException { fail("Input format accepted on invalid input."); } catch (ParseException e) { - ; // all good + // all good } } catch (Exception ex) { @@ -552,7 +552,7 @@ public void testReadInvalidContentsLenientWithSkipping() { @Test public void testReadWithCharset() throws IOException { try { - final String fileContent = "abc|def|ghijk"; + final String fileContent = "\u00bf|Flink|\u00f1"; final FileInputSplit split = createTempFile(fileContent); final Configuration parameters = new Configuration(); @@ -568,9 +568,9 @@ public void testReadWithCharset() throws IOException { values = format.nextRecord(values); assertNotNull(values); - assertEquals("abc", ((StringValue) values[0]).getValue()); - assertEquals("def", ((StringValue) values[1]).getValue()); - assertEquals("ghijk", ((StringValue) values[2]).getValue()); + assertEquals("\u00bf", ((StringValue) values[0]).getValue()); + assertEquals("Flink", ((StringValue) values[1]).getValue()); + assertEquals("\u00f1", ((StringValue) values[2]).getValue()); assertNull(format.nextRecord(values)); assertTrue(format.reachedEnd()); @@ -579,7 +579,7 @@ public void testReadWithCharset() throws IOException { fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); } } - + @Test public void readWithEmptyField() { try { @@ -754,7 +754,7 @@ private FileInputSplit createTempGzipFile(String content) throws IOException { return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"}); } - private final Value[] createIntValues(int num) { + private Value[] createIntValues(int num) { Value[] v = new Value[num]; for (int i = 0; i < num; i++) { @@ -764,7 +764,7 @@ private final Value[] createIntValues(int num) { return v; } - private final Value[] createLongValues(int num) { + private Value[] createLongValues(int num) { Value[] v = new Value[num]; for (int i = 0; i < num; i++) { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java index 8be5dc23840c7..b13b8aab64706 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java @@ -18,6 +18,7 @@ package org.apache.flink.api.java.io; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; @@ -64,6 +65,8 @@ public class CsvReader { protected boolean skipFirstLineAsHeader = false; protected boolean ignoreInvalidLines = false; + + private Charset charset = Charset.forName("UTF-8"); // -------------------------------------------------------------------------------------------- @@ -157,7 +160,25 @@ public CsvReader ignoreComments(String commentPrefix) { this.commentPrefix = commentPrefix; return this; } - + + /** + * Gets the character set for the reader. Default is set to UTF-8. + * + * @return The charset for the reader. + */ + public Charset getCharset() { + return this.charset; + } + + /** + * Sets the charset of the reader + * + * @param charset The character set to set. + */ + public void setCharset(Charset charset) { + this.charset = Preconditions.checkNotNull(charset); + } + /** * Configures which fields of the CSV file should be included and which should be skipped. The * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean @@ -340,6 +361,7 @@ private void configureInputFormat(CsvInputFormat format) { format.setCommentPrefix(this.commentPrefix); format.setSkipFirstLineAsHeader(skipFirstLineAsHeader); format.setLenient(ignoreInvalidLines); + format.setCharset(this.charset); if (this.parseQuotedStrings) { format.enableQuotedStringParsing(this.quoteCharacter); } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java index 8b12315d6c025..e1c8023eda0fc 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull; import java.io.IOException; +import java.nio.charset.Charset; import java.util.Arrays; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -75,6 +76,14 @@ public void testIgnoreComments() { reader.ignoreComments("#"); assertEquals("#", reader.commentPrefix); } + + @Test + public void testCharset() { + CsvReader reader = getCsvReader(); + assertEquals(reader.getCharset(), Charset.forName("UTF-8")); + reader.setCharset(Charset.forName("US-ASCII")); + assertEquals(reader.getCharset(), Charset.forName("US-ASCII")); + } @Test public void testIncludeFieldsDense() {