diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java index 825300137b80e..684bea7c1c0e9 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java @@ -213,7 +213,7 @@ public int hashCode() { // -------------------------------------------------------------------------------------------- - private interface RuntimeConverter extends Serializable { + interface RuntimeConverter extends Serializable { Object convert(JsonNode node); } @@ -230,7 +230,7 @@ private static RuntimeConverter createRowRuntimeConverter( return assembleRowRuntimeConverter(ignoreParseErrors, isTopLevel, fieldNames, fieldConverters); } - private static RuntimeConverter[] createFieldRuntimeConverters(boolean ignoreParseErrors, TypeInformation[] fieldTypes) { + static RuntimeConverter[] createFieldRuntimeConverters(boolean ignoreParseErrors, TypeInformation[] fieldTypes) { final RuntimeConverter[] fieldConverters = new RuntimeConverter[fieldTypes.length]; for (int i = 0; i < fieldTypes.length; i++) { fieldConverters[i] = createNullableRuntimeConverter(fieldTypes[i], ignoreParseErrors); @@ -365,7 +365,7 @@ private static RuntimeConverter createByteArrayRuntimeConverter(boolean ignorePa }; } - private static void validateArity(int expected, int actual, boolean ignoreParseErrors) { + static void validateArity(int expected, int actual, boolean ignoreParseErrors) { if (expected != actual && !ignoreParseErrors) { throw new RuntimeException("Row length mismatch. " + expected + " fields expected but was " + actual + "."); diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java new file mode 100644 index 0000000000000..279aca7e60c1e --- /dev/null +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowCsvInputFormat.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.csv.CsvRowDeserializationSchema.RuntimeConverter; +import org.apache.flink.types.Row; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MappingIterator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import org.apache.commons.compress.utils.BoundedInputStream; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.NoSuchElementException; + +import static org.apache.flink.formats.csv.CsvRowDeserializationSchema.createFieldRuntimeConverters; +import static org.apache.flink.formats.csv.CsvRowDeserializationSchema.validateArity; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Input format that reads csv into {@link Row}. + * + *

Different from old csv {@code org.apache.flink.api.java.io.RowCsvInputFormat}: + * 1.New csv will emit this row (Fill null the remaining fields) when row is too short. + * But Old csv will skip this too short row. + * 2.New csv, escape char will be removed. But old csv will keep the escape char. + * + *

These can be continuously improved in new csv input format: + * 1.New csv not support configure comment char. The comment char is "#". + * 2.New csv not support configure multi chars field delimiter. + * 3.New csv not support read first N, it will throw exception. + * 4.Only support configure line delimiter: "\r" or "\n" or "\r\n". + */ +public class RowCsvInputFormat extends FileInputFormat { + + private static final long serialVersionUID = 1L; + + private final TypeInformation[] fieldTypes; + private final CsvSchema csvSchema; + + private final String[] selectedFieldNames; + private final boolean ignoreParseErrors; + + /** Runtime instance that performs the actual work. */ + private transient RuntimeConverter runtimeConverter; + + private transient MappingIterator iterator; + + private transient boolean end; + + private RowCsvInputFormat( + Path[] filePaths, + TypeInformation[] fieldTypes, + CsvSchema csvSchema, + int[] selectedFields, + boolean ignoreParseErrors) { + setFilePaths(filePaths); + + this.fieldTypes = checkNotNull(fieldTypes); + this.csvSchema = checkNotNull(csvSchema); + checkArgument(fieldTypes.length == csvSchema.size()); + this.ignoreParseErrors = ignoreParseErrors; + this.selectedFieldNames = Arrays.stream(checkNotNull(selectedFields)) + .mapToObj(csvSchema::columnName).toArray(String[]::new); + } + + @Override + public boolean supportsMultiPaths() { + return true; + } + + @Override + public void open(FileInputSplit split) throws IOException { + super.open(split); + + prepareConverter(); + + InputStream realStream = stream; + + long realStart = splitStart; + if (splitStart != 0) { + realStart = findNextLegalSeparator(); + } + + if (splitLength != READ_WHOLE_SPLIT_FLAG) { + stream.seek(splitStart + splitLength); + long firstByteOfNextLine = findNextLegalSeparator(); + stream.seek(realStart); + realStream = new BoundedInputStream(stream, firstByteOfNextLine - realStart); + } else { + stream.seek(realStart); + } + + this.iterator = new CsvMapper().readerFor(JsonNode.class).with(csvSchema).readValues(realStream); + } + + private void prepareConverter() { + RuntimeConverter[] fieldConverters = createFieldRuntimeConverters(ignoreParseErrors, fieldTypes); + + this.runtimeConverter = (node) -> { + final int nodeSize = node.size(); + + validateArity(csvSchema.size(), nodeSize, ignoreParseErrors); + + Row row = new Row(selectedFieldNames.length); + for (int i = 0; i < Math.min(selectedFieldNames.length, nodeSize); i++) { + // Jackson only supports mapping by name in the first level + row.setField(i, fieldConverters[i].convert(node.get(selectedFieldNames[i]))); + } + return row; + }; + } + + /** + * Find next legal line separator and set next offset (first byte offset of next line) to stream. + * + * NOTE: Because of the particularity of UTF-8 encoding, we can determine the number of bytes + * of this character only by comparing the first byte, so we do not need to traverse M*N in comparison. + */ + private long findNextLegalSeparator() throws IOException { + boolean usesEscapeChar = csvSchema.usesEscapeChar(); + byte[] escapeBytes = Character.toString((char) csvSchema.getEscapeChar()) + .getBytes(StandardCharsets.UTF_8); + long startPos = stream.getPos(); + + byte b; + while ((b = (byte) stream.read()) != -1) { + if (b == '\r' || b == '\n') { + // If there may be escape tags ahead + if (usesEscapeChar && stream.getPos() - startPos <= escapeBytes.length) { + long front = stream.getPos() - escapeBytes.length - 1; + if (front > 0) { + stream.seek(front); + byte[] readBytes = new byte[escapeBytes.length]; + stream.read(readBytes); // we have judge front must bigger than zero + stream.read(); // back to current next one + if (Arrays.equals(escapeBytes, readBytes)) { + // equal, we should skip this one line separator + continue; + } + } + } + + long pos = stream.getPos(); + + // deal with "\r\n", next one maybe '\n', so we need skip it. + if (b == '\r' && (byte) stream.read() == '\n') { + return stream.getPos(); + } else { + return pos; + } + } else if (usesEscapeChar && b == escapeBytes[0]) { + boolean equal = true; + for (int i = 1; i < escapeBytes.length; i++) { + if ((byte) stream.read() != escapeBytes[i]) { + equal = false; + break; + } + } + if (equal) { + // equal, we should skip next one + stream.skip(1); + } + } + } + return stream.getPos(); + } + + @Override + public boolean reachedEnd() throws IOException { + return end; + } + + @Override + public Row nextRecord(Row record) throws IOException { + Row returnRecord = null; + do { + try { + JsonNode root = iterator.nextValue(); + returnRecord = (Row) runtimeConverter.convert(root); + } catch (NoSuchElementException e) { + end = true; + } catch (Throwable t) { + if (!ignoreParseErrors) { + throw new IOException("Failed to deserialize CSV row.", t); + } + } + } while (returnRecord == null && !reachedEnd()); + + return returnRecord; + } + + /** + * Create a builder. + */ + public static Builder builder(TypeInformation typeInfo, Path... filePaths) { + return new Builder(typeInfo, filePaths); + } + + /** + * A builder for creating a {@link RowCsvInputFormat}. + */ + public static class Builder implements Serializable { + + private final Path[] filePaths; + private final TypeInformation[] fieldTypes; + private CsvSchema csvSchema; + private boolean ignoreParseErrors; + private int[] selectedFields; + + /** + * Creates a row CSV input format for the given {@link TypeInformation} and file paths + * with optional parameters. + */ + private Builder(TypeInformation typeInfo, Path... filePaths) { + checkNotNull(filePaths, "File paths must not be null."); + checkNotNull(typeInfo, "Type information must not be null."); + + if (!(typeInfo instanceof RowTypeInfo)) { + throw new IllegalArgumentException("Row type information expected."); + } + + this.filePaths = filePaths; + this.fieldTypes = ((RowTypeInfo) typeInfo).getFieldTypes(); + this.csvSchema = CsvRowSchemaConverter.convert((RowTypeInfo) typeInfo); + } + + public Builder setFieldDelimiter(char delimiter) { + this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(delimiter).build(); + return this; + } + + public Builder setAllowComments(boolean allowComments) { + this.csvSchema = this.csvSchema.rebuild().setAllowComments(allowComments).build(); + return this; + } + + public Builder setArrayElementDelimiter(String delimiter) { + checkNotNull(delimiter, "Array element delimiter must not be null."); + this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(delimiter).build(); + return this; + } + + public Builder setQuoteCharacter(char c) { + this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build(); + return this; + } + + public Builder setEscapeCharacter(char c) { + this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build(); + return this; + } + + public Builder setNullLiteral(String nullLiteral) { + checkNotNull(nullLiteral, "Null literal must not be null."); + this.csvSchema = this.csvSchema.rebuild().setNullValue(nullLiteral).build(); + return this; + } + + public Builder setIgnoreParseErrors(boolean ignoreParseErrors) { + this.ignoreParseErrors = ignoreParseErrors; + return this; + } + + public Builder setSelectedFields(int[] selectedFields) { + this.selectedFields = selectedFields; + return this; + } + + public RowCsvInputFormat build() { + if (selectedFields == null) { + selectedFields = new int[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + selectedFields[i] = i; + } + } + + return new RowCsvInputFormat( + filePaths, + fieldTypes, + csvSchema, + selectedFields, + ignoreParseErrors); + } + } +} diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/RowCsvInputFormatSplitTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/RowCsvInputFormatSplitTest.java new file mode 100644 index 0000000000000..1725de96a6e88 --- /dev/null +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/RowCsvInputFormatSplitTest.java @@ -0,0 +1,155 @@ +package org.apache.flink.formats.csv; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.flink.formats.csv.RowCsvInputFormatTest.PATH; +import static org.apache.flink.formats.csv.RowCsvInputFormatTest.createTempFile; +import static org.junit.Assert.assertEquals; + +/** + * Test split logic for {@link RowCsvInputFormat}. + */ +public class RowCsvInputFormatSplitTest { + + @Test + public void readAll() throws Exception { + test("11$\n1,222\n" + "22$2,333\n", 0, -1, '$', asList(Row.of("11\n1", "222"), Row.of("222", "333"))); + } + + @Test + public void readStartOffset() throws Exception { + test("11$\n1,222\n" + "22$2,333\n", 1, -1, '$', singletonList(Row.of("222", "333"))); + } + + @Test + public void readStartOffsetWithSeparator() throws Exception { + test("11$\n1,222\n" + "22$2,333\n", 3, -1, '$', singletonList(Row.of("222", "333"))); + } + + @Test + public void readLengthWithSeparator() throws Exception { + test("11$\n1,222\n" + "22$\n2,333\n", 0, 13, '$', asList(Row.of("11\n1", "222"), Row.of("22\n2", "333"))); + } + + @Test + public void readLengthWithMultiBytesEscapeChar() throws Exception { + test("11好\n1,222\n" + "22好\n2,333\n", 0, 13, '好', asList(Row.of("11\n1", "222"), Row.of("22\n2", "333"))); + } + + @Test + public void readLengthWithMultiBytesEscapeChar2() throws Exception { + test("11好\n1,222\n" + "22好\n2,333\n", 0, 16, '好', asList(Row.of("11\n1", "222"), Row.of("22\n2", "333"))); + } + + @Test + public void readLengthWithMultiBytesEscapeChar3() throws Exception { + test("11好\n1,222\n" + "22好\n2,333\n", 0, 18, '好', asList(Row.of("11\n1", "222"), Row.of("22\n2", "333"))); + } + + @Test + public void readStartOffsetAndLength() throws Exception { + test("11好\n1,222\n" + "22好\n2,333\n", 3, 18, '好', singletonList(Row.of("22\n2", "333"))); + } + + @Test + public void readMultiLineSeparator() throws Exception { + test("111,222\r\n" + "222,333\r\n", 3, 18, '好', singletonList(Row.of("222", "333"))); + } + + @Test + public void readRLineSeparator() throws Exception { + test("111,222\r" + "222,333\r", 3, 18, '好', singletonList(Row.of("222", "333"))); + } + + @Test + public void testQuotationMark() throws Exception { + test("\"111\",222\r" + "222,333\r", 0, 18, '$', asList(Row.of("111", "222"), Row.of("222", "333"))); + test("\"111\",222\r" + "222,333\r", 3, 18, '$', singletonList(Row.of("222", "333"))); + test("\"111\",222\r" + "222,333\r", 5, 18, '$', singletonList(Row.of("222", "333"))); + test("\"111\",222\r" + "222,333\r", 6, 18, '$', singletonList(Row.of("222", "333"))); + + testOneField("\"111\"\r" + "222\r", 0, 18, '$', asList(Row.of("111"), Row.of("222"))); + testOneField("\"111\"\r" + "222\r", 3, 18, '$', singletonList(Row.of("222"))); + testOneField("\"111\"\r" + "222\r", 5, 18, '$', singletonList(Row.of("222"))); + } + + @Test + public void testSurroundEscapedDelimiter() throws Exception { + test("$11$1,222\r" + "222,333\r", 0, 18, '$', asList(Row.of("111", "222"), Row.of("222", "333"))); + test("$11$1,222\r" + "222,333\r", 3, 18, '$', singletonList(Row.of("222", "333"))); + test("$11$1,222\r" + "222,333\r", 5, 18, '$', singletonList(Row.of("222", "333"))); + test("$11$1,222\r" + "222,333\r", 6, 18, '$', singletonList(Row.of("222", "333"))); + + testOneField("123*'4**\r" + "123*'4**\n", 0, 18, '*', asList(Row.of("123'4*"), Row.of("123'4*"))); + testOneField("123*'4**\r" + "123*'4**\n", 3, 18, '*', singletonList(Row.of("123'4*"))); + testOneField("123*'4**\r" + "123*'4**\n", 4, 18, '*', singletonList(Row.of("123'4*"))); + testOneField("123*'4**\r" + "123*'4**\n", 5, 18, '*', singletonList(Row.of("123'4*"))); + + testOneField("'123''4**'\r" + "'123''4**'\n", 0, 18, '*', asList(Row.of("'123''4*'"), Row.of("'123''4*'"))); + testOneField("'123''4**'\r" + "'123''4**'\n", 3, 18, '*', singletonList(Row.of("'123''4*'"))); + testOneField("'123''4**'\r" + "'123''4**'\n", 4, 18, '*', singletonList(Row.of("'123''4*'"))); + testOneField("'123''4**'\r" + "'123''4**'\n", 5, 18, '*', singletonList(Row.of("'123''4*'"))); + testOneField("'123''4**'\r" + "'123''4**'\n", 6, 18, '*', singletonList(Row.of("'123''4*'"))); + } + + private void test(String content, long offset, long length, char escapeChar, List expected) throws Exception { + test( + content, + offset, + length, + escapeChar, + expected, + new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO}); + } + + private void testOneField(String content, long offset, long length, char escapeChar, List expected) throws Exception { + test( + content, + offset, + length, + escapeChar, + expected, + new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO}); + } + + private void test( + String content, + long offset, long length, + char escapeChar, + List expected, + TypeInformation[] fieldTypes) throws Exception { + FileInputSplit split = createTempFile(content, offset, length); + + RowCsvInputFormat.Builder builder = RowCsvInputFormat.builder(new RowTypeInfo(fieldTypes), PATH) + .setEscapeCharacter(escapeChar); + + RowCsvInputFormat format = builder.build(); + format.configure(new Configuration()); + format.open(split); + + List rows = new ArrayList<>(); + while (!format.reachedEnd()) { + Row result = new Row(3); + result = format.nextRecord(result); + if (result == null) { + break; + } else { + rows.add(result); + } + } + + assertEquals(expected, rows); + } +} diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/RowCsvInputFormatTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/RowCsvInputFormatTest.java new file mode 100644 index 0000000000000..509dd2e587f84 --- /dev/null +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/RowCsvInputFormatTest.java @@ -0,0 +1,756 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; + +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; + +import static junit.framework.TestCase.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link RowCsvInputFormat}. + */ +public class RowCsvInputFormatTest { + + static final Path PATH = new Path("an/ignored/file/"); + + // static variables for testing the removal of \r\n to \n + private static final String FIRST_PART = "That is the first part"; + private static final String SECOND_PART = "That is the second part"; + + @Test + public void ignoreInvalidLines() throws Exception { + String fileContent = + "#description of the data\n" + + "header1|header2|header3|\n" + + "this is|1|2.0|\n" + + "//a comment\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n"; + + FileInputSplit split = createTempFile(fileContent); + + TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO}; + RowCsvInputFormat.Builder builder = RowCsvInputFormat.builder(new RowTypeInfo(fieldTypes), PATH) + .setFieldDelimiter('|') + .setIgnoreParseErrors(false); + + RowCsvInputFormat format = builder.build(); + Configuration parameters = new Configuration(); + format.configure(parameters); + format.open(split); + + Row result = new Row(3); + try { + result = format.nextRecord(result); + fail("RuntimeException was not thrown! (Row length mismatch. 3 fields expected but was 1)"); + } catch (IOException ignored) { + } // => ok + + try { + result = format.nextRecord(result); + fail("NumberFormatException was not thrown! (For input string: \"header2\")"); + } catch (IOException ignored) { + } // => ok + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("this is", result.getField(0)); + assertEquals(1, result.getField(1)); + assertEquals(2.0, result.getField(2)); + + try { + result = format.nextRecord(result); + fail("RuntimeException was not thrown! (Row length mismatch. 3 fields expected but was 1)"); + } catch (IOException ignored) { + } // => ok + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("a test", result.getField(0)); + assertEquals(3, result.getField(1)); + assertEquals(4.0, result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("#next", result.getField(0)); + assertEquals(5, result.getField(1)); + assertEquals(6.0, result.getField(2)); + + result = format.nextRecord(result); + assertNull(result); + + // re-open with lenient = true + builder.setIgnoreParseErrors(true); + format = builder.build(); + format.configure(parameters); + format.open(split); + + result = new Row(3); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("#description of the data", result.getField(0)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("header1", result.getField(0)); + assertNull(result.getField(1)); + assertNull(result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("this is", result.getField(0)); + assertEquals(1, result.getField(1)); + assertEquals(2.0, result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("//a comment", result.getField(0)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("a test", result.getField(0)); + assertEquals(3, result.getField(1)); + assertEquals(4.0, result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("#next", result.getField(0)); + assertEquals(5, result.getField(1)); + assertEquals(6.0, result.getField(2)); + result = format.nextRecord(result); + assertNull(result); + } + + @Test + public void ignorePrefixComments() throws Exception { + String fileContent = + "#description of the data\n" + + "#successive commented line\n" + + "this is|1|2.0|\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n"; + + FileInputSplit split = createTempFile(fileContent); + + TypeInformation[] fieldTypes = new TypeInformation[]{ + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO}; + + RowCsvInputFormat.Builder builder = RowCsvInputFormat.builder(new RowTypeInfo(fieldTypes), PATH) + .setFieldDelimiter('|') + .setAllowComments(true); + + RowCsvInputFormat format = builder.build(); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("this is", result.getField(0)); + assertEquals(1, result.getField(1)); + assertEquals(2.0, result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("a test", result.getField(0)); + assertEquals(3, result.getField(1)); + assertEquals(4.0, result.getField(2)); + + result = format.nextRecord(result); + assertNull(result); + } + + @Test + public void readStringFields() throws Exception { + String fileContent = "abc|def|ghijk\nabc||hhg\n|||\n||"; + + FileInputSplit split = createTempFile(fileContent); + + TypeInformation[] fieldTypes = new TypeInformation[]{ + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO}; + + RowCsvInputFormat.Builder builder = RowCsvInputFormat.builder(new RowTypeInfo(fieldTypes), PATH) + .setFieldDelimiter('|'); + + RowCsvInputFormat format = builder.build(); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.getField(0)); + assertEquals("def", result.getField(1)); + assertEquals("ghijk", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("hhg", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("", result.getField(2)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void readMixedQuotedStringFields() throws Exception { + String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||\n"; + + FileInputSplit split = createTempFile(fileContent); + + TypeInformation[] fieldTypes = new TypeInformation[]{ + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO}; + + RowCsvInputFormat.Builder builder = RowCsvInputFormat.builder(new RowTypeInfo(fieldTypes), PATH) + .setFieldDelimiter('|') + .setQuoteCharacter('@'); + + RowCsvInputFormat format = builder.build(); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("a|b|c", result.getField(0)); + assertEquals("def", result.getField(1)); + assertEquals("ghijk", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("|hhg", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("", result.getField(2)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void testTailingEmptyFields() throws Exception { + String fileContent = "abc|def|ghijk\n" + + "abc|def|\n" + + "abc||\n" + + "|||\n" + + "||\n" + + "abc|def\n"; + + FileInputSplit split = createTempFile(fileContent); + + TypeInformation[] fieldTypes = new TypeInformation[]{ + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO}; + + RowCsvInputFormat.Builder builder = RowCsvInputFormat.builder(new RowTypeInfo(fieldTypes), PATH) + .setFieldDelimiter('|'); + + RowCsvInputFormat format = builder.build(); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.getField(0)); + assertEquals("def", result.getField(1)); + assertEquals("ghijk", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.getField(0)); + assertEquals("def", result.getField(1)); + assertEquals("", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("", result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("", result.getField(0)); + assertEquals("", result.getField(1)); + assertEquals("", result.getField(2)); + + try { + format.nextRecord(result); + fail("RuntimeException: Row length mismatch. 3 fields expected but was 2"); + } catch (IOException e) {} + } + + @Test + public void testIntegerFields() throws Exception { + String fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"; + + FileInputSplit split = createTempFile(fileContent); + + TypeInformation[] fieldTypes = new TypeInformation[]{ + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO}; + + RowCsvInputFormat.Builder builder = RowCsvInputFormat.builder(new RowTypeInfo(fieldTypes), PATH) + .setFieldDelimiter('|'); + + RowCsvInputFormat format = builder.build(); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(5); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(111, result.getField(0)); + assertEquals(222, result.getField(1)); + assertEquals(333, result.getField(2)); + assertEquals(444, result.getField(3)); + assertEquals(555, result.getField(4)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(666, result.getField(0)); + assertEquals(777, result.getField(1)); + assertEquals(888, result.getField(2)); + assertEquals(999, result.getField(3)); + assertEquals(0, result.getField(4)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void testEmptyFields() throws Exception { + String fileContent = + ",,,,,,,,\n" + + ",,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,\n" + + ",,,,,,,,\n"; + + FileInputSplit split = createTempFile(fileContent); + + TypeInformation[] fieldTypes = new TypeInformation[]{ + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.BYTE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.FLOAT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.SHORT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO}; + + RowCsvInputFormat.Builder builder = RowCsvInputFormat.builder(new RowTypeInfo(fieldTypes), PATH) + .setFieldDelimiter(',') + .setNullLiteral(""); + + RowCsvInputFormat format = builder.build(); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(8); + int linesCnt = fileContent.split("\n").length; + + for (int i = 0; i < linesCnt; i++) { + result = format.nextRecord(result); + assertNull(result.getField(i)); + } + + // ensure no more rows + assertNull(format.nextRecord(result)); + assertTrue(format.reachedEnd()); + } + + @Test + public void testDoubleFields() throws Exception { + String fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n"; + + FileInputSplit split = createTempFile(fileContent); + + TypeInformation[] fieldTypes = new TypeInformation[]{ + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO}; + + RowCsvInputFormat.Builder builder = RowCsvInputFormat.builder(new RowTypeInfo(fieldTypes), PATH) + .setFieldDelimiter('|'); + + RowCsvInputFormat format = builder.build(); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(5); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(11.1, result.getField(0)); + assertEquals(22.2, result.getField(1)); + assertEquals(33.3, result.getField(2)); + assertEquals(44.4, result.getField(3)); + assertEquals(55.5, result.getField(4)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(66.6, result.getField(0)); + assertEquals(77.7, result.getField(1)); + assertEquals(88.8, result.getField(2)); + assertEquals(99.9, result.getField(3)); + assertEquals(0.0, result.getField(4)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void testReadSparseWithPositionSetter() throws Exception { + String fileContent = "111|222|333|444|555|666|777|888|999|000|\n" + + "000|999|888|777|666|555|444|333|222|111|"; + + FileInputSplit split = createTempFile(fileContent); + + TypeInformation[] fieldTypes = new TypeInformation[]{ + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO}; + + RowCsvInputFormat.Builder builder = RowCsvInputFormat.builder(new RowTypeInfo(fieldTypes), PATH) + .setFieldDelimiter('|') + .setSelectedFields(new int[]{0, 3, 7}); + + RowCsvInputFormat format = builder.build(); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + result = format.nextRecord(result); + + assertNotNull(result); + assertEquals(111, result.getField(0)); + assertEquals(444, result.getField(1)); + assertEquals(888, result.getField(2)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(0, result.getField(0)); + assertEquals(777, result.getField(1)); + assertEquals(333, result.getField(2)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void testWindowsLineEndRemoval() throws Exception { + + // check typical use case -- linux file is correct and it is set up to linux(\n) + testRemovingTrailingCR("\n"); + + // check typical windows case -- windows file endings and file has windows file endings set up + testRemovingTrailingCR("\r\n"); + + // check problematic case windows file -- windows file endings(\r\n) + // but linux line endings (\n) set up + testRemovingTrailingCR("\r\n"); + + // check problematic case linux file -- linux file endings (\n) + // but windows file endings set up (\r\n) + // specific setup for windows line endings will expect \r\n because + // it has to be set up and is not standard. + } + + @Test + public void testQuotedStringParsingWithIncludeFields() throws Exception { + String fileContent = "\"20:41:52-1-3-2015\"|\"Re: Taskmanager memory error in Eclipse\"|" + + "\"Blahblah \"|\"blaaa\"|\"blubb\""; + File tempFile = File.createTempFile("CsvReaderQuotedString", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile)); + writer.write(fileContent); + writer.close(); + + TypeInformation[] fieldTypes = new TypeInformation[]{ + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO}; + + RowCsvInputFormat.Builder builder = RowCsvInputFormat + .builder(new RowTypeInfo(fieldTypes), new Path(tempFile.toURI().toString())) + .setFieldDelimiter('|') + .setSelectedFields(new int[]{0, 2}) + .setQuoteCharacter('"'); + + RowCsvInputFormat inputFormat = builder.build(); + inputFormat.configure(new Configuration()); + + FileInputSplit[] splits = inputFormat.createInputSplits(1); + inputFormat.open(splits[0]); + + Row record = inputFormat.nextRecord(new Row(2)); + assertEquals("20:41:52-1-3-2015", record.getField(0)); + assertEquals("Blahblah ", record.getField(1)); + } + + @Test + public void testQuotedStringParsingWithEscapedQuotes() throws Exception { + String fileContent = "\"\\\"Hello\\\" World\"|\"We are\\\" young\""; + File tempFile = File.createTempFile("CsvReaderQuotedString", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile)); + writer.write(fileContent); + writer.close(); + + TypeInformation[] fieldTypes = new TypeInformation[]{ + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO}; + + RowCsvInputFormat.Builder builder = RowCsvInputFormat + .builder(new RowTypeInfo(fieldTypes), new Path(tempFile.toURI().toString())) + .setFieldDelimiter('|') + .setQuoteCharacter('"') + .setEscapeCharacter('\\'); + + RowCsvInputFormat inputFormat = builder.build(); + inputFormat.configure(new Configuration()); + + FileInputSplit[] splits = inputFormat.createInputSplits(1); + inputFormat.open(splits[0]); + + Row record = inputFormat.nextRecord(new Row(2)); + assertEquals("\"Hello\" World", record.getField(0)); + assertEquals("We are\" young", record.getField(1)); + } + + @Test + public void testSqlTimeFields() throws Exception { + String fileContent = "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5\n" + + "1990-10-14|02:42:25|1990-10-14 02:42:25.123|1990-1-4 2:2:5.3\n"; + + FileInputSplit split = createTempFile(fileContent); + + TypeInformation[] fieldTypes = new TypeInformation[]{ + SqlTimeTypeInfo.DATE, + SqlTimeTypeInfo.TIME, + SqlTimeTypeInfo.TIMESTAMP, + SqlTimeTypeInfo.TIMESTAMP}; + + RowCsvInputFormat.Builder builder = RowCsvInputFormat.builder(new RowTypeInfo(fieldTypes), PATH) + .setFieldDelimiter('|'); + + RowCsvInputFormat format = builder.build(); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(4); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(Date.valueOf("1990-10-14"), result.getField(0)); + assertEquals(Time.valueOf("02:42:25"), result.getField(1)); + assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.getField(2)); + assertEquals(Timestamp.valueOf("1990-01-04 02:02:05"), result.getField(3)); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(Date.valueOf("1990-10-14"), result.getField(0)); + assertEquals(Time.valueOf("02:42:25"), result.getField(1)); + assertEquals(Timestamp.valueOf("1990-10-14 02:42:25.123"), result.getField(2)); + assertEquals(Timestamp.valueOf("1990-01-04 02:02:05.3"), result.getField(3)); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + + @Test + public void testScanOrder() throws Exception { + String fileContent = + // first row + "111|222|333|444|555|666|777|888|999|000|\n" + + // second row + "000|999|888|777|666|555|444|333|222|111|"; + FileInputSplit split = createTempFile(fileContent); + + TypeInformation[] fieldTypes = new TypeInformation[]{ + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO}; + + RowCsvInputFormat.Builder builder = RowCsvInputFormat.builder(new RowTypeInfo(fieldTypes), PATH) + .setFieldDelimiter('|') + .setSelectedFields(new int[]{7, 3, 0}); + + RowCsvInputFormat format = builder.build(); + format.configure(new Configuration()); + format.open(split); + + Row result = new Row(3); + + // check first row + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(888, result.getField(0)); + assertEquals(444, result.getField(1)); + assertEquals(111, result.getField(2)); + + // check second row + result = format.nextRecord(result); + assertNotNull(result); + assertEquals(333, result.getField(0)); + assertEquals(777, result.getField(1)); + assertEquals(0, result.getField(2)); + } + + private static FileInputSplit createTempFile(String content) throws IOException { + return createTempFile(content, 0, null); + } + + static FileInputSplit createTempFile(String content, long start, Long length) throws IOException { + File tempFile = File.createTempFile("test_contents", "tmp"); + tempFile.deleteOnExit(); + OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8); + wrt.write(content); + wrt.close(); + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), start, + length == null ? tempFile.length() : length, new String[]{"localhost"}); + } + + private static void testRemovingTrailingCR(String lineBreakerInFile) throws IOException { + String fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile; + + // create input file + File tempFile = File.createTempFile("CsvInputFormatTest", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); + wrt.write(fileContent); + wrt.close(); + + TypeInformation[] fieldTypes = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO}; + + RowCsvInputFormat.Builder builder = RowCsvInputFormat + .builder(new RowTypeInfo(fieldTypes), new Path(tempFile.toURI().toString())); + + RowCsvInputFormat inputFormat = builder.build(); + + inputFormat.configure(new Configuration()); + + FileInputSplit[] splits = inputFormat.createInputSplits(1); + inputFormat.open(splits[0]); + + Row result = inputFormat.nextRecord(new Row(1)); + assertNotNull("Expecting to not return null", result); + assertEquals(FIRST_PART, result.getField(0)); + + result = inputFormat.nextRecord(result); + assertNotNull("Expecting to not return null", result); + assertEquals(SECOND_PART, result.getField(0)); + } +}