From d497415adc2e58b4e9912ae89a53444825416366 Mon Sep 17 00:00:00 2001 From: zentol Date: Sun, 18 Oct 2015 20:23:23 +0200 Subject: [PATCH 1/3] [FLINK-2692] Untangle CsvInputFormat --- .../wordcount/BoltTokenizerWordCountPojo.java | 3 +- .../BoltTokenizerWordCountWithNames.java | 3 +- .../api/java/io/CommonCsvInputFormat.java | 258 ------------------ .../flink/api/java/io/CsvInputFormat.java | 91 +++++- .../apache/flink/api/java/io/CsvReader.java | 125 ++++----- .../flink/api/java/io/PojoCsvInputFormat.java | 232 ++++++++++++++++ .../api/java/io/TupleCsvInputFormat.java | 120 ++++++++ .../flink/api/java/tuple/TupleGenerator.java | 13 +- .../typeutils/runtime/TupleSerializer.java | 8 + .../runtime/TupleSerializerBase.java | 2 + .../flink/api/java/io/CsvInputFormatTest.java | 96 ++----- .../optimizer/ReplicatingDataSourceTest.java | 26 +- .../scala/operators/ScalaCsvInputFormat.java | 65 ----- .../scala/operators/ScalaCsvOutputFormat.java | 6 +- .../api/scala/ExecutionEnvironment.scala | 56 ++-- .../scala/typeutils/CaseClassSerializer.scala | 4 + .../api/java/common/PlanBinder.java | 13 +- .../api/scala/io/CsvInputFormatTest.scala | 78 ++---- 18 files changed, 600 insertions(+), 599 deletions(-) delete mode 100644 flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java delete mode 100644 flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java index f72acb333863f..94333e6244919 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java @@ -20,6 +20,7 @@ import backtype.storm.topology.IRichBolt; import org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.io.PojoCsvInputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -122,7 +123,7 @@ private static DataStream getTextDataStream(final StreamExecutionEnvir // read the text file from given input path PojoTypeInfo sourceType = (PojoTypeInfo) TypeExtractor .getForObject(new Sentence("")); - return env.createInput(new CsvInputFormat(new Path( + return env.createInput(new PojoCsvInputFormat(new Path( textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType), sourceType); diff --git a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java index 7617e95814275..aba5217aa3031 100644 --- a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java +++ b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java @@ -21,6 +21,7 @@ import backtype.storm.tuple.Fields; import 
org.apache.flink.api.java.io.CsvInputFormat; +import org.apache.flink.api.java.io.TupleCsvInputFormat; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; @@ -125,7 +126,7 @@ private static DataStream> getTextDataStream(final StreamExecutio // read the text file from given input path TupleTypeInfo> sourceType = (TupleTypeInfo>)TypeExtractor .getForObject(new Tuple1("")); - return env.createInput(new CsvInputFormat>(new Path( + return env.createInput(new TupleCsvInputFormat>(new Path( textPath), CsvInputFormat.DEFAULT_LINE_DELIMITER, CsvInputFormat.DEFAULT_LINE_DELIMITER, sourceType), sourceType); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java deleted file mode 100644 index 444d151dc792e..0000000000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CommonCsvInputFormat.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
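For reference, a minimal construction sketch for the two replacement formats used in the storm examples above; the Sentence POJO and the TypeExtractor calls come from those examples, while the input path is an illustrative assumption. (The surrounding examples pass CsvInputFormat.DEFAULT_LINE_DELIMITER as both the line and the field delimiter, which looks unintended; the sketch below uses DEFAULT_FIELD_DELIMITER for the second argument.)

    // POJO variant, as in BoltTokenizerWordCountPojo
    PojoTypeInfo<Sentence> pojoType =
            (PojoTypeInfo<Sentence>) TypeExtractor.getForObject(new Sentence(""));
    PojoCsvInputFormat<Sentence> pojoFormat = new PojoCsvInputFormat<Sentence>(
            new Path("file:///tmp/sentences.csv"),   // assumed input path
            CsvInputFormat.DEFAULT_LINE_DELIMITER,
            CsvInputFormat.DEFAULT_FIELD_DELIMITER,
            pojoType);

    // Tuple variant, as in BoltTokenizerWordCountWithNames
    TupleTypeInfo<Tuple1<String>> tupleType =
            (TupleTypeInfo<Tuple1<String>>) TypeExtractor.getForObject(new Tuple1<String>(""));
    TupleCsvInputFormat<Tuple1<String>> tupleFormat = new TupleCsvInputFormat<Tuple1<String>>(
            new Path("file:///tmp/sentences.csv"), tupleType);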
- */ - -package org.apache.flink.api.java.io; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.io.GenericCsvInputFormat; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.java.typeutils.PojoTypeInfo; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.apache.flink.types.parser.FieldParser; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -public abstract class CommonCsvInputFormat extends GenericCsvInputFormat { - - private static final long serialVersionUID = 1L; - - public static final String DEFAULT_LINE_DELIMITER = "\n"; - - public static final String DEFAULT_FIELD_DELIMITER = ","; - - protected transient Object[] parsedValues; - - private final Class pojoTypeClass; - - private String[] pojoFieldNames; - - private transient PojoTypeInfo pojoTypeInfo; - private transient Field[] pojoFields; - - public CommonCsvInputFormat(Path filePath, CompositeType typeInformation) { - this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation); - } - - public CommonCsvInputFormat( - Path filePath, - String lineDelimiter, - String fieldDelimiter, - CompositeType compositeTypeInfo) { - super(filePath); - - setDelimiter(lineDelimiter); - setFieldDelimiter(fieldDelimiter); - - Class[] classes = new Class[compositeTypeInfo.getArity()]; - - for (int i = 0; i < compositeTypeInfo.getArity(); i++) { - classes[i] = compositeTypeInfo.getTypeAt(i).getTypeClass(); - } - - setFieldTypes(classes); - - if (compositeTypeInfo instanceof PojoTypeInfo) { - pojoTypeInfo = (PojoTypeInfo) compositeTypeInfo; - - pojoTypeClass = compositeTypeInfo.getTypeClass(); - setOrderOfPOJOFields(compositeTypeInfo.getFieldNames()); - } else { - pojoTypeClass = null; - pojoFieldNames = null; - } - } - - public void setOrderOfPOJOFields(String[] fieldNames) { - Preconditions.checkNotNull(pojoTypeClass, "Field order can only be specified if output type is a POJO."); - Preconditions.checkNotNull(fieldNames); - - int includedCount = 0; - for (boolean isIncluded : fieldIncluded) { - if (isIncluded) { - includedCount++; - } - } - - Preconditions.checkArgument(includedCount == fieldNames.length, includedCount + - " CSV fields and " + fieldNames.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal."); - - for (String field : fieldNames) { - Preconditions.checkNotNull(field, "The field name cannot be null."); - Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1, - "Field \""+ field + "\" is not a member of POJO class " + pojoTypeClass.getName()); - } - - pojoFieldNames = Arrays.copyOfRange(fieldNames, 0, fieldNames.length); - } - - public void setFieldTypes(Class... 
fieldTypes) { - if (fieldTypes == null || fieldTypes.length == 0) { - throw new IllegalArgumentException("Field types must not be null or empty."); - } - - setFieldTypesGeneric(fieldTypes); - } - - public void setFields(int[] sourceFieldIndices, Class[] fieldTypes) { - Preconditions.checkNotNull(sourceFieldIndices); - Preconditions.checkNotNull(fieldTypes); - - checkForMonotonousOrder(sourceFieldIndices, fieldTypes); - - setFieldsGeneric(sourceFieldIndices, fieldTypes); - } - - public void setFields(boolean[] sourceFieldMask, Class[] fieldTypes) { - Preconditions.checkNotNull(sourceFieldMask); - Preconditions.checkNotNull(fieldTypes); - - setFieldsGeneric(sourceFieldMask, fieldTypes); - } - - public Class[] getFieldTypes() { - return super.getGenericFieldTypes(); - } - - @Override - public void open(FileInputSplit split) throws IOException { - super.open(split); - - @SuppressWarnings("unchecked") - FieldParser[] fieldParsers = (FieldParser[]) getFieldParsers(); - - //throw exception if no field parsers are available - if (fieldParsers.length == 0) { - throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input"); - } - - // create the value holders - this.parsedValues = new Object[fieldParsers.length]; - for (int i = 0; i < fieldParsers.length; i++) { - this.parsedValues[i] = fieldParsers[i].createValue(); - } - - // left to right evaluation makes access [0] okay - // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default - if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) { - this.lineDelimiterIsLinebreak = true; - } - - // for POJO type - if (pojoTypeClass != null) { - pojoFields = new Field[pojoFieldNames.length]; - - Map allFields = new HashMap(); - - findAllFields(pojoTypeClass, allFields); - - for (int i = 0; i < pojoFieldNames.length; i++) { - pojoFields[i] = allFields.get(pojoFieldNames[i]); - - if (pojoFields[i] != null) { - pojoFields[i].setAccessible(true); - } else { - throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName()); - } - } - } - - this.commentCount = 0; - this.invalidLineCount = 0; - } - - /** - * Finds all declared fields in a class and all its super classes. 
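A standalone sketch of the field lookup this javadoc describes, assuming only java.lang.reflect; the iterative form below mirrors the recursive method, including its overwrite order (fields put later, i.e. from superclasses, replace same-named entries). The class name is illustrative.

    import java.lang.reflect.Field;
    import java.util.HashMap;
    import java.util.Map;

    final class DeclaredFieldLookup {
        /** Collects the declared fields of clazz and all of its superclasses. */
        static Map<String, Field> findAllFields(Class<?> clazz) {
            Map<String, Field> allFields = new HashMap<String, Field>();
            for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
                for (Field field : c.getDeclaredFields()) {
                    // superclass entries overwrite same-named subclass entries,
                    // matching the recursion order of the original method
                    allFields.put(field.getName(), field);
                }
            }
            return allFields;
        }
    }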
- * - * @param clazz Class for which all declared fields are found - * @param allFields Map containing all found fields so far - */ - private void findAllFields(Class clazz, Map allFields) { - for (Field field: clazz.getDeclaredFields()) { - allFields.put(field.getName(), field); - } - - if (clazz.getSuperclass() != null) { - findAllFields(clazz.getSuperclass(), allFields); - } - } - - @Override - public OUT nextRecord(OUT record) throws IOException { - OUT returnRecord = null; - do { - returnRecord = super.nextRecord(record); - } while (returnRecord == null && !reachedEnd()); - - return returnRecord; - } - - @Override - public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException { - /* - * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n - */ - //Find windows end line, so find carriage return before the newline - if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes -1] == '\r' ) { - //reduce the number of bytes so that the Carriage return is not taken as data - numBytes--; - } - - if (commentPrefix != null && commentPrefix.length <= numBytes) { - //check record for comments - boolean isComment = true; - for (int i = 0; i < commentPrefix.length; i++) { - if (commentPrefix[i] != bytes[offset + i]) { - isComment = false; - break; - } - } - if (isComment) { - this.commentCount++; - return null; - } - } - - if (parseRecord(parsedValues, bytes, offset, numBytes)) { - if (pojoTypeClass == null) { - // result type is tuple - return createTuple(reuse); - } else { - // result type is POJO - for (int i = 0; i < parsedValues.length; i++) { - try { - pojoFields[i].set(reuse, parsedValues[i]); - } catch (IllegalAccessException e) { - throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e); - } - } - return reuse; - } - - } else { - this.invalidLineCount++; - return null; - } - } - - protected abstract OUT createTuple(OUT reuse); -} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java index 7d86f39a9295b..deb2ff9b7d397 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java @@ -18,32 +18,97 @@ package org.apache.flink.api.java.io; +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; +import org.apache.flink.api.common.io.GenericCsvInputFormat; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.api.common.typeutils.CompositeType; -import org.apache.flink.api.java.tuple.Tuple; +import java.io.IOException; import org.apache.flink.core.fs.Path; import org.apache.flink.util.StringUtils; -public class CsvInputFormat extends CommonCsvInputFormat { +public abstract class CsvInputFormat extends GenericCsvInputFormat { private static final long serialVersionUID = 1L; + + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String DEFAULT_FIELD_DELIMITER = ","; + + protected transient Object[] parsedValues; - public CsvInputFormat(Path filePath, CompositeType typeInformation) { - super(filePath, typeInformation); + protected CsvInputFormat(Path filePath) { + super(filePath); } - - public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, CompositeType typeInformation) { - 
super(filePath, lineDelimiter, fieldDelimiter, typeInformation); + + @Override + public void open(FileInputSplit split) throws IOException { + super.open(split); + + @SuppressWarnings("unchecked") + FieldParser[] fieldParsers = (FieldParser[]) getFieldParsers(); + + //throw exception if no field parsers are available + if (fieldParsers.length == 0) { + throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input"); + } + + // create the value holders + this.parsedValues = new Object[fieldParsers.length]; + for (int i = 0; i < fieldParsers.length; i++) { + this.parsedValues[i] = fieldParsers[i].createValue(); + } + + // left to right evaluation makes access [0] okay + // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default + if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) { + this.lineDelimiterIsLinebreak = true; + } + + this.commentCount = 0; + this.invalidLineCount = 0; } @Override - protected OUT createTuple(OUT reuse) { - Tuple result = (Tuple) reuse; - for (int i = 0; i < parsedValues.length; i++) { - result.setField(parsedValues[i], i); + public OUT nextRecord(OUT record) throws IOException { + OUT returnRecord = null; + do { + returnRecord = super.nextRecord(record); + } while (returnRecord == null && !reachedEnd()); + + return returnRecord; + } + + public Class[] getFieldTypes() { + return super.getGenericFieldTypes(); + } + + protected static boolean[] createDefaultMask(int size) { + boolean[] includedMask = new boolean[size]; + for (int x=0; x DataSource pojoType(Class pojoType, String... pojoFields) { @SuppressWarnings("unchecked") PojoTypeInfo typeInfo = (PojoTypeInfo) TypeExtractor.createTypeInfo(pojoType); - CsvInputFormat inputFormat = new CsvInputFormat(path, typeInfo); - Class[] classes = new Class[pojoFields.length]; - for (int i = 0; i < pojoFields.length; i++) { - int pos = typeInfo.getFieldIndex(pojoFields[i]); - if(pos < 0) { - throw new IllegalArgumentException("Field \""+pojoFields[i]+"\" not part of POJO type "+pojoType.getCanonicalName()); - } - classes[i] = typeInfo.getPojoFieldAt(pos).getTypeInformation().getTypeClass(); - } + CsvInputFormat inputFormat = new PojoCsvInputFormat(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, pojoFields, this.includedMask); - configureInputFormat(inputFormat, classes); - inputFormat.setOrderOfPOJOFields(pojoFields); + configureInputFormat(inputFormat); return new DataSource(executionContext, inputFormat, typeInfo, Utils.getCallLocationName()); } @@ -325,14 +316,14 @@ public DataSource tupleType(Class targetType) { @SuppressWarnings("unchecked") TupleTypeInfo typeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(targetType); - CsvInputFormat inputFormat = new CsvInputFormat(path, typeInfo); + CsvInputFormat inputFormat = new TupleCsvInputFormat(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, this.includedMask); Class[] classes = new Class[typeInfo.getArity()]; for (int i = 0; i < typeInfo.getArity(); i++) { classes[i] = typeInfo.getTypeAt(i).getTypeClass(); } - configureInputFormat(inputFormat, classes); + configureInputFormat(inputFormat); return new DataSource(executionContext, inputFormat, typeInfo, Utils.getCallLocationName()); } @@ -340,7 +331,7 @@ public DataSource tupleType(Class targetType) { // Miscellaneous // -------------------------------------------------------------------------------------------- - private void configureInputFormat(CsvInputFormat format, Class... 
types) { + private void configureInputFormat(CsvInputFormat format) { format.setDelimiter(this.lineDelimiter); format.setFieldDelimiter(this.fieldDelimiter); format.setCommentPrefix(this.commentPrefix); @@ -349,12 +340,6 @@ private void configureInputFormat(CsvInputFormat format, Class... types) { if (this.parseQuotedStrings) { format.enableQuotedStringParsing(this.quoteCharacter); } - - if (this.includedMask == null) { - format.setFieldTypes(types); - } else { - format.setFields(this.includedMask, types); - } } // -------------------------------------------------------------------------------------------- @@ -374,8 +359,8 @@ private void configureInputFormat(CsvInputFormat format, Class... types) { */ public DataSource> types(Class type0) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -391,8 +376,8 @@ public DataSource> types(Class type0) { */ public DataSource> types(Class type0, Class type1) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -409,8 +394,8 @@ public DataSource> types(Class type0, Class type */ public DataSource> types(Class type0, Class type1, Class type2) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -428,8 +413,8 @@ public DataSource> types(Class type0, Class< */ public DataSource> types(Class type0, Class type1, Class type2, Class type3) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -448,8 +433,8 @@ public DataSource> types(Class type0 */ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -469,8 +454,8 @@ public DataSource> types(Class 
DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -491,8 +476,8 @@ public DataSource> types */ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -514,8 +499,8 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -538,8 +523,8 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -563,8 +548,8 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -589,8 +574,8 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10) { TupleTypeInfo> types = 
TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -616,8 +601,8 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -644,8 +629,8 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -673,8 +658,8 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -703,8 +688,8 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, 
type5, type6, type7, type8, type9, type10, type11, type12, type13, type14); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -734,8 +719,8 @@ public DataSou */ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -766,8 +751,8 @@ public Da */ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -799,8 +784,8 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -833,8 +818,8 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17, Class type18) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, 
type11, type12, type13, type14, type15, type16, type17, type18); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -868,8 +853,8 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17, Class type18, Class type19) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -904,8 +889,8 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17, Class type18, Class type19, Class type20) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -941,8 +926,8 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17, Class type18, Class type19, Class type20, Class type21) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + 
configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -979,8 +964,8 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17, Class type18, Class type19, Class type20, Class type21, Class type22) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -1018,8 +1003,8 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17, Class type18, Class type19, Class type20, Class type21, Class type22, Class type23) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } @@ -1058,8 +1043,8 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17, Class type18, Class type19, Class type20, Class type21, Class type22, Class type23, Class type24) { TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24); - CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); - configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24); + CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types, this.includedMask); + configureInputFormat(inputFormat); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); } diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
new file mode 100644
index 0000000000000..b38a60583f096
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private Class<OUT> pojoTypeClass;
+
+    private String[] pojoFieldNames;
+
+    private transient PojoTypeInfo<OUT> pojoTypeInfo;
+    private transient Field[] pojoFields;
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo);
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, createDefaultMask(pojoTypeInfo.getArity()));
+    }
+
+    public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo) {
+        this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), createDefaultMask(pojoTypeInfo.getArity()));
+    }
+
+    public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) {
+        this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, createDefaultMask(fieldNames.length));
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, pojoTypeInfo.getFieldNames(), toBooleanMask(includedFieldsMask));
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, includedFieldsMask);
+    }
+
+    public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) {
+        this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+    }
+
+    public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+        super(filePath);
+        boolean[] mask = (includedFieldsMask == null)
+                ? createDefaultMask(fieldNames.length)
+                : toBooleanMask(includedFieldsMask);
+        configure(lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, mask);
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, boolean[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, includedFieldsMask);
+    }
+
+    public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, boolean[] includedFieldsMask) {
+        this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+    }
+
+    public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+        super(filePath);
+        configure(lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, includedFieldsMask);
+    }
+
+    private void configure(String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+
+        if (includedFieldsMask == null) {
+            includedFieldsMask = new boolean[fieldNames.length];
+            for (int x = 0; x < includedFieldsMask.length; x++) {
+                includedFieldsMask[x] = true;
+            }
+        }
+
+        for (String name : fieldNames) {
+            if (name == null) {
+                throw new NullPointerException("Field name must not be null.");
+            }
+            if (pojoTypeInfo.getFieldIndex(name) < 0) {
+                throw new IllegalArgumentException("Field \"" + name + "\" not part of POJO type " + pojoTypeInfo.getTypeClass().getCanonicalName());
+            }
+        }
+
+        setDelimiter(lineDelimiter);
+        setFieldDelimiter(fieldDelimiter);
+
+        Class<?>[] classes = new Class<?>[fieldNames.length];
+
+        for (int i = 0; i < fieldNames.length; i++) {
+            try {
+                classes[i] = pojoTypeInfo.getTypeAt(pojoTypeInfo.getFieldIndex(fieldNames[i])).getTypeClass();
+            } catch (IndexOutOfBoundsException e) {
+                throw new IllegalArgumentException("Invalid field name: " + fieldNames[i]);
+            }
+        }
+
+        this.pojoTypeClass = pojoTypeInfo.getTypeClass();
+        this.pojoTypeInfo = pojoTypeInfo;
+        setFieldsGeneric(includedFieldsMask, classes);
+        setOrderOfPOJOFields(fieldNames);
+    }
+
+    private void setOrderOfPOJOFields(String[] fieldNames) {
+        Preconditions.checkNotNull(fieldNames);
+
+        int includedCount = 0;
+        for (boolean isIncluded : fieldIncluded) {
+            if (isIncluded) {
+                includedCount++;
+            }
+        }
+
+        Preconditions.checkArgument(includedCount == fieldNames.length, includedCount +
+            " CSV fields and " + fieldNames.length + " POJO fields selected. The number of selected CSV and POJO fields must be equal.");
+
+        for (String field : fieldNames) {
+            Preconditions.checkNotNull(field, "The field name cannot be null.");
+            Preconditions.checkArgument(pojoTypeInfo.getFieldIndex(field) != -1,
+                "Field \"" + field + "\" is not a member of POJO class " + pojoTypeClass.getName());
+        }
+
+        pojoFieldNames = Arrays.copyOfRange(fieldNames, 0, fieldNames.length);
+    }
+
+    @Override
+    public void open(FileInputSplit split) throws IOException {
+        super.open(split);
+
+        pojoFields = new Field[pojoFieldNames.length];
+
+        Map<String, Field> allFields = new HashMap<String, Field>();
+
+        findAllFields(pojoTypeClass, allFields);
+
+        for (int i = 0; i < pojoFieldNames.length; i++) {
+            pojoFields[i] = allFields.get(pojoFieldNames[i]);
+
+            if (pojoFields[i] != null) {
+                pojoFields[i].setAccessible(true);
+            } else {
+                throw new RuntimeException("There is no field called \"" + pojoFieldNames[i] + "\" in " + pojoTypeClass.getName());
+            }
+        }
+    }
+
+    /**
+     * Finds all declared fields in a class and all its super classes.
+     *
+     * @param clazz Class for which all declared fields are found
+     * @param allFields Map containing all found fields so far
+     */
+    private void findAllFields(Class<?> clazz, Map<String, Field> allFields) {
+        for (Field field : clazz.getDeclaredFields()) {
+            allFields.put(field.getName(), field);
+        }
+
+        if (clazz.getSuperclass() != null) {
+            findAllFields(clazz.getSuperclass(), allFields);
+        }
+    }
+
+    @Override
+    public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+        /*
+         * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
+         */
+        //Find windows end line, so find carriage return before the newline
+        if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
+            //reduce the number of bytes so that the Carriage return is not taken as data
+            numBytes--;
+        }
+
+        if (commentPrefix != null && commentPrefix.length <= numBytes) {
+            //check record for comments
+            boolean isComment = true;
+            for (int i = 0; i < commentPrefix.length; i++) {
+                if (commentPrefix[i] != bytes[offset + i]) {
+                    isComment = false;
+                    break;
+                }
+            }
+            if (isComment) {
+                this.commentCount++;
+                return null;
+            }
+        }
+
+        if (parseRecord(parsedValues, bytes, offset, numBytes)) {
+            for (int i = 0; i < parsedValues.length; i++) {
+                try {
+                    pojoFields[i].set(reuse, parsedValues[i]);
+                } catch (IllegalAccessException e) {
+                    throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
+                }
+            }
+            return reuse;
+        } else {
+            this.invalidLineCount++;
+            return null;
+        }
+    }
+
+}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
new file mode 100644
index 0000000000000..485bd69b966bc
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
+
+public class TupleCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private TupleSerializerBase<OUT> tupleSerializer;
+
+    public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
+    }
+
+    public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
+        this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo, createDefaultMask(tupleTypeInfo.getArity()));
+    }
+
+    public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
+    }
+
+    public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
+        super(filePath);
+        boolean[] mask = (includedFieldsMask == null)
+                ? createDefaultMask(tupleTypeInfo.getArity())
+                : toBooleanMask(includedFieldsMask);
+        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
+    }
+
+    public TupleCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
+    }
+
+    public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
+        super(filePath);
+        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
+    }
+
+    private void configure(String lineDelimiter, String fieldDelimiter,
+            TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
+
+        if (tupleTypeInfo.getArity() == 0) {
+            throw new IllegalArgumentException("Tuple size must be greater than 0.");
+        }
+
+        if (includedFieldsMask == null) {
+            includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
+        }
+
+        tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
+
+        setDelimiter(lineDelimiter);
+        setFieldDelimiter(fieldDelimiter);
+
+        Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
+
+        for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
+            classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
+        }
+
+        setFieldsGeneric(includedFieldsMask, classes);
+    }
+
+    @Override
+    public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+        /*
+         * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
+         */
+        //Find windows end line, so find carriage return before the newline
+        if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
+            //reduce the number of bytes so that 
the Carriage return is not taken as data + numBytes--; + } + + if (commentPrefix != null && commentPrefix.length <= numBytes) { + //check record for comments + boolean isComment = true; + for (int i = 0; i < commentPrefix.length; i++) { + if (commentPrefix[i] != bytes[offset + i]) { + isComment = false; + break; + } + } + if (isComment) { + this.commentCount++; + return null; + } + } + + if (parseRecord(parsedValues, bytes, offset, numBytes)) { + return tupleSerializer.createOrReuseInstance(parsedValues, reuse); + } else { + this.invalidLineCount++; + return null; + } + } +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java index f306fe0d7c331..e9d5861a7400a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java @@ -469,19 +469,12 @@ private static void modifyCsvReader(File root) throws IOException { // create csv input format sb.append("\t\tCsvInputFormat> inputFormat = new CsvInputFormat> inputFormat = new TupleCsvInputFormat>(path, types);\n"); + sb.append(">>(path, types, this.includedMask);\n"); // configure input format - sb.append("\t\tconfigureInputFormat(inputFormat, "); - for (int i = 0; i < numFields; i++) { - if (i > 0) { - sb.append(", "); - } - sb.append("type" + i); - } - sb.append(");\n"); + sb.append("\t\tconfigureInputFormat(inputFormat);\n"); // return sb.append("\t\treturn new DataSource> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class); - final CsvInputFormat> format = new CsvInputFormat>(PATH, "\n", "|", typeInfo); + final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, "\n", "|", typeInfo); format.setLenient(true); final Configuration parameters = new Configuration(); @@ -121,7 +121,7 @@ public void ignoreSingleCharPrefixComments() { final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class); - final CsvInputFormat> format = new CsvInputFormat>(PATH, "\n", "|", typeInfo); + final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, "\n", "|", typeInfo); format.setCommentPrefix("#"); final Configuration parameters = new Configuration(); @@ -165,7 +165,7 @@ public void ignoreMultiCharPrefixComments() { final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class); - final CsvInputFormat> format = new CsvInputFormat>(PATH, "\n", "|", typeInfo); + final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, "\n", "|", typeInfo); format.setCommentPrefix("//"); final Configuration parameters = new Configuration(); @@ -202,7 +202,7 @@ public void readStringFields() { final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class); - final CsvInputFormat> format = new CsvInputFormat>(PATH, "\n", "|", typeInfo); + final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, "\n", "|", typeInfo); final Configuration parameters = new Configuration(); format.configure(parameters); @@ -245,7 +245,7 @@ public void readMixedQuotedStringFields() { final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo> typeInfo = 
TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class); - final CsvInputFormat> format = new CsvInputFormat>(PATH, "\n", "|", typeInfo); + final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, "\n", "|", typeInfo); final Configuration parameters = new Configuration(); format.configure(parameters); @@ -289,7 +289,7 @@ public void readStringFieldsWithTrailingDelimiters() { final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class); - final CsvInputFormat> format = new CsvInputFormat>(PATH, typeInfo); + final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, typeInfo); format.setFieldDelimiter("|-"); @@ -333,7 +333,7 @@ public void testIntegerFields() throws IOException { final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class, Integer.class, Integer.class); - final CsvInputFormat> format = new CsvInputFormat>(PATH, typeInfo); + final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, typeInfo); format.setFieldDelimiter("|"); @@ -380,7 +380,7 @@ public void testEmptyFields() throws IOException { final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Short.class, Integer.class, Long.class, Float.class, Double.class, Byte.class); - final CsvInputFormat> format = new CsvInputFormat>(PATH, typeInfo); + final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, typeInfo); format.setFieldDelimiter("|"); @@ -431,7 +431,7 @@ public void testDoubleFields() throws IOException { final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Double.class, Double.class, Double.class, Double.class, Double.class); - final CsvInputFormat> format = new CsvInputFormat>(PATH, typeInfo); + final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, typeInfo); format.setFieldDelimiter("|"); @@ -472,7 +472,7 @@ public void testReadFirstN() throws IOException { final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class); - final CsvInputFormat> format = new CsvInputFormat>(PATH, typeInfo); + final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, typeInfo); format.setFieldDelimiter("|"); @@ -509,10 +509,9 @@ public void testReadSparseWithNullFieldsForTypes() throws IOException { final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class); - final CsvInputFormat> format = new CsvInputFormat>(PATH, typeInfo); + final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, typeInfo, new boolean[]{true, false, false, true, false, false, false, true}); format.setFieldDelimiter("|x|"); - format.setFieldTypes(Integer.class, null, null, Integer.class, null, null, null, Integer.class); format.configure(new Configuration()); format.open(split); @@ -547,13 +546,10 @@ public void testReadSparseWithPositionSetter() throws IOException { final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class); - final CsvInputFormat> format = new CsvInputFormat>(PATH, typeInfo); + final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, typeInfo, new int[]{0, 3, 7}); format.setFieldDelimiter("|"); - format.setFields(new int[]{0, 3, 7}, new Class[]{Integer.class, Integer.class, Integer.class}); 
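The sparse-read tests above now express field selection through the new constructors instead of setFields(...). A hedged sketch of the two equivalent styles, reusing the tests' PATH constant and column positions:

    TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo =
            TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);

    // select source columns by position...
    TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>> byPosition =
            new TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>>(
                    PATH, typeInfo, new int[]{0, 3, 7});

    // ...or by an inclusion mask over all source columns
    TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>> byMask =
            new TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>>(
                    PATH, typeInfo, new boolean[]{true, false, false, true, false, false, false, true});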
@@ -547,13 +546,10 @@ public void testReadSparseWithPositionSetter() throws IOException {
 			final FileInputSplit split = createTempFile(fileContent);
 
 			final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
-			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
+			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo, new int[]{0, 3, 7});
 
 			format.setFieldDelimiter("|");
-			format.setFields(new int[]{0, 3, 7}, new Class[]{Integer.class, Integer.class, Integer.class});
-
-
 			format.configure(new Configuration());
 			format.open(split);
@@ -579,7 +575,7 @@ public void testReadSparseWithPositionSetter() throws IOException {
 			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testReadSparseWithMask() throws IOException {
 		try {
@@ -588,13 +584,9 @@ public void testReadSparseWithMask() throws IOException {
 			final FileInputSplit split = createTempFile(fileContent);
 
 			final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
-			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
+			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new TupleCsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo, new boolean[]{true, false, false, true, false, false, false, true});
 
 			format.setFieldDelimiter("&&");
-
-			format.setFields(new boolean[]{true, false, false, true, false, false, false, true}, new
-					Class[]{Integer.class,
-					Integer.class, Integer.class});
 
 			format.configure(new Configuration());
 			format.open(split);
@@ -621,27 +613,6 @@ public void testReadSparseWithMask() throws IOException {
 			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
 		}
 	}
-
-	@Test
-	public void testReadSparseWithShuffledPositions() throws IOException {
-		try {
-			final TupleTypeInfo<Tuple3<Integer, Integer, Integer>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class);
-			final CsvInputFormat<Tuple3<Integer, Integer, Integer>> format = new CsvInputFormat<Tuple3<Integer, Integer, Integer>>(PATH, typeInfo);
-
-			format.setFieldDelimiter("|");
-
-			try {
-				format.setFields(new int[] {8, 1, 3}, new Class[] {Integer.class, Integer.class, Integer.class});
-				fail("Input sequence should have been rejected.");
-			}
-			catch (IllegalArgumentException e) {
-				// that is what we want
-			}
-		}
-		catch (Exception ex) {
-			fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage());
-		}
-	}
 
 	@Test
 	public void testParseStringErrors() throws Exception {
@@ -682,14 +653,11 @@ public void testParserCorrectness() throws Exception {
 			final TupleTypeInfo<Tuple5<Integer, String, String, String, Double>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class, String.class, String.class, Double.class);
-			final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format = new CsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH, typeInfo);
+			final CsvInputFormat<Tuple5<Integer, String, String, String, Double>> format = new TupleCsvInputFormat<Tuple5<Integer, String, String, String, Double>>(PATH, typeInfo);
 
 			format.setSkipFirstLineAsHeader(true);
 			format.setFieldDelimiter(',');
 
-			format.setFields(new boolean[] { true, true, true, true, true }, new Class[] {
-				Integer.class, String.class, String.class, String.class, Double.class });
-
 			format.configure(new Configuration());
 			format.open(split);
@@ -764,7 +732,7 @@ private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreaker
 			wrt.close();
 
 			final TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
-			final CsvInputFormat<Tuple1<String>> inputFormat = new CsvInputFormat<Tuple1<String>>(new Path(tempFile.toURI().toString()), typeInfo);
+			final CsvInputFormat<Tuple1<String>> inputFormat = new TupleCsvInputFormat<Tuple1<String>>(new Path(tempFile.toURI().toString()), typeInfo);
 
 			Configuration parameters = new Configuration();
 			inputFormat.configure(parameters);
@@ -827,7 +795,7 @@ public void testPojoType() throws Exception {
 		@SuppressWarnings("unchecked")
 		PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
-		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+		CsvInputFormat<PojoItem> inputFormat = new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
 
 		inputFormat.configure(new Configuration());
 		FileInputSplit[] splits = inputFormat.createInputSplits(1);
@@ -850,7 +818,7 @@ public void testPojoTypeWithPrivateField() throws Exception {
 		@SuppressWarnings("unchecked")
 		PojoTypeInfo<PrivatePojoItem> typeInfo = (PojoTypeInfo<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
-		CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+		CsvInputFormat<PrivatePojoItem> inputFormat = new PojoCsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
 
 		inputFormat.configure(new Configuration());
 
@@ -886,9 +854,7 @@ public void testPojoTypeWithMappingInformation() throws Exception {
 		@SuppressWarnings("unchecked")
 		PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
-		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
-		inputFormat.setFields(new boolean[]{true, true, true, true}, new Class[]{Integer.class, Double.class, String.class, String.class});
-		inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field3", "field2", "field4"});
+		CsvInputFormat<PojoItem> inputFormat = new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo, new String[]{"field1", "field3", "field2", "field4"});
 
 		inputFormat.configure(new Configuration());
 		FileInputSplit[] splits = inputFormat.createInputSplits(1);
@@ -911,9 +877,7 @@ public void testPojoTypeWithPartialFieldInCSV() throws Exception {
 		@SuppressWarnings("unchecked")
 		PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
-		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
-		inputFormat.setFields(new boolean[]{true, false, true, false, true, true}, new Class[]{Integer.class, String.class, Double.class, String.class});
+		CsvInputFormat<PojoItem> inputFormat = new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo, new boolean[]{true, false, true, false, true, true});
 
 		inputFormat.configure(new Configuration());
 		FileInputSplit[] splits = inputFormat.createInputSplits(1);
@@ -936,9 +900,7 @@ public void testPojoTypeWithMappingInfoAndPartialField() throws Exception {
 		@SuppressWarnings("unchecked")
 		PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
-		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
-		inputFormat.setFields(new boolean[]{true, false, false, true}, new Class[]{Integer.class, String.class});
-		inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field4"});
+		CsvInputFormat<PojoItem> inputFormat = new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo, new String[]{"field1", "field4"}, new boolean[]{true, false, false, true});
 
 		inputFormat.configure(new Configuration());
 		FileInputSplit[] splits = inputFormat.createInputSplits(1);
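The four POJO tests above cover the new constructor matrix: type info only, field-name mapping, include mask, and mapping plus mask. A compact sketch of the two mapped variants, with a stand-in POJO mirroring the test's PojoItem field layout; "items.csv" is a placeholder:

import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;

public class PojoCsvExample {

	// Stand-in POJO with the same field layout the tests imply.
	public static class PojoItem {
		public int field1;
		public String field2;
		public double field3;
		public String field4;
	}

	public static void main(String[] args) {
		@SuppressWarnings("unchecked")
		PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);

		// CSV column i is assigned to the POJO field named at position i.
		PojoCsvInputFormat<PojoItem> mapped = new PojoCsvInputFormat<PojoItem>(
				new Path("items.csv"), typeInfo,
				new String[]{"field1", "field3", "field2", "field4"});

		// Mask plus mapping: only the 'true' columns are parsed, then assigned in order.
		PojoCsvInputFormat<PojoItem> maskedAndMapped = new PojoCsvInputFormat<PojoItem>(
				new Path("items.csv"), typeInfo,
				new String[]{"field1", "field4"}, new boolean[]{true, false, false, true});
	}
}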
@@ -960,24 +922,23 @@ public void testPojoTypeWithInvalidFieldMapping() throws Exception {
 		@SuppressWarnings("unchecked")
 		PojoTypeInfo<PojoItem> typeInfo = (PojoTypeInfo<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
-		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
 
 		try {
-			inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field2"});
+			new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo, new String[]{"field1", "field2"});
 			fail("The number of POJO fields cannot be same as that of selected CSV fields");
 		} catch (IllegalArgumentException e) {
 			// success
 		}
 
 		try {
-			inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field2", null, "field4"});
+			new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo, new String[]{"field1", "field2", null, "field4"});
 			fail("Fields mapping cannot contain null.");
 		} catch (NullPointerException e) {
 			// success
 		}
 
 		try {
-			inputFormat.setOrderOfPOJOFields(new String[]{"field1", "field2", "field3", "field5"});
+			new PojoCsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo, new String[]{"field1", "field2", "field3", "field5"});
 			fail("Invalid field name");
 		} catch (IllegalArgumentException e) {
 			// success
@@ -997,13 +958,12 @@ public void testQuotedStringParsingWithIncludeFields() throws Exception {
 		writer.write(fileContent);
 		writer.close();
 
-		CompositeType<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
-		CsvInputFormat<Tuple2<String, String>> inputFormat = new CsvInputFormat<Tuple2<String, String>>(new Path(tempFile.toURI().toString()), typeInfo);
+		TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
+		CsvInputFormat<Tuple2<String, String>> inputFormat = new TupleCsvInputFormat<Tuple2<String, String>>(new Path(tempFile.toURI().toString()), typeInfo, new boolean[]{true, false, true});
 
 		inputFormat.enableQuotedStringParsing('"');
 		inputFormat.setFieldDelimiter('|');
 		inputFormat.setDelimiter('\n');
-		inputFormat.setFields(new boolean[]{true, false, true}, new Class[]{String.class, String.class});
 
 		inputFormat.configure(new Configuration());
 		FileInputSplit[] splits = inputFormat.createInputSplits(1);
@@ -1029,7 +989,7 @@ public void testQuotedStringParsingWithEscapedQuotes() throws Exception {
 		writer.close();
 
 		TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
-		CsvInputFormat<Tuple2<String, String>> inputFormat = new CsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
+		CsvInputFormat<Tuple2<String, String>> inputFormat = new TupleCsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
 
 		inputFormat.enableQuotedStringParsing('"');
 		inputFormat.setFieldDelimiter('|');
@@ -1064,7 +1024,7 @@ public void testPojoSubclassType() throws Exception {
 		@SuppressWarnings("unchecked")
 		PojoTypeInfo<TwitterPOJO> typeInfo = (PojoTypeInfo<TwitterPOJO>) TypeExtractor.createTypeInfo(TwitterPOJO.class);
-		CsvInputFormat<TwitterPOJO> inputFormat = new CsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
+		CsvInputFormat<TwitterPOJO> inputFormat = new PojoCsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
 
 		inputFormat.configure(new Configuration());
 		FileInputSplit[] splits = inputFormat.createInputSplits(1);
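Note how testPojoTypeWithInvalidFieldMapping now expects the argument checks to fire in the PojoCsvInputFormat constructor itself; with the setters gone, an inconsistent configuration can no longer exist between construction and configure(). The quoted-string test likewise passes its include mask at construction. A short sketch of that combination, using only the overloads and setters exercised above ("quoted.csv" is a placeholder):

import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.fs.Path;

public class QuotedCsvExample {
	public static void main(String[] args) {
		TupleTypeInfo<Tuple2<String, String>> typeInfo =
				TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);

		// Keep columns 0 and 2 of a three-column file; the mask replaces setFields().
		TupleCsvInputFormat<Tuple2<String, String>> format =
				new TupleCsvInputFormat<Tuple2<String, String>>(
						new Path("quoted.csv"), typeInfo, new boolean[]{true, false, true});

		format.enableQuotedStringParsing('"'); // treat "a|b" as a single field
		format.setFieldDelimiter('|');
		format.setDelimiter('\n');
	}
}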
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
index 26af3800e6dd1..cb01436cf8649 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/ReplicatingDataSourceTest.java
@@ -27,7 +27,7 @@
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvInputFormat;
+import org.apache.flink.api.java.io.TupleCsvInputFormat;
 import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.operators.translation.JavaPlan;
 import org.apache.flink.api.java.tuple.Tuple1;
@@ -58,7 +58,7 @@ public void checkJoinWithReplicatedSourceInput() {
 		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -95,7 +95,7 @@ public void checkJoinWithReplicatedSourceInputBehindMap() {
 		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -133,7 +133,7 @@ public void checkJoinWithReplicatedSourceInputBehindFilter() {
 		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -171,7 +171,7 @@ public void checkJoinWithReplicatedSourceInputBehindFlatMap() {
 		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -209,7 +209,7 @@ public void checkJoinWithReplicatedSourceInputBehindMapPartition() {
 		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -247,7 +247,7 @@ public void checkJoinWithReplicatedSourceInputBehindMultiMaps() {
 		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -288,7 +288,7 @@ public void checkCrossWithReplicatedSourceInput() {
 		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -325,7 +325,7 @@ public void checkCrossWithReplicatedSourceInputBehindMap() {
 		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -364,7 +364,7 @@ public void checkJoinWithReplicatedSourceInputChangingparallelism() {
 		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -390,7 +390,7 @@ public void checkJoinWithReplicatedSourceInputBehindMapChangingparallelism() {
 		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -417,7 +417,7 @@ public void checkJoinWithReplicatedSourceInputBehindReduce() {
 		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
@@ -443,7 +443,7 @@ public void checkJoinWithReplicatedSourceInputBehindRebalance() {
 		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);
 		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
-				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new CsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
+				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));
 
 		DataSet<Tuple1<String>> source1 = env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
 		DataSet<Tuple1<String>> source2 = env.readCsvFile("/some/otherpath").types(String.class);
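Every hunk in ReplicatingDataSourceTest is the same one-token substitution, because the optimizer tests only care that the wrapped format is a CSV source. For orientation, a sketch of the wrapping pattern used throughout this file; it mirrors the test setup above and introduces no new API ("/some/path" is the same placeholder the tests use):

import org.apache.flink.api.common.io.ReplicatingInputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class ReplicatedCsvExample {
	public static void main(String[] args) {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		TupleTypeInfo<Tuple1<String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class);

		// Every parallel instance reads the whole file, so a join against the
		// replicated side needs no network shuffle.
		ReplicatingInputFormat<Tuple1<String>, FileInputSplit> rif =
				new ReplicatingInputFormat<Tuple1<String>, FileInputSplit>(
						new TupleCsvInputFormat<Tuple1<String>>(new Path("/some/path"), typeInfo));

		DataSet<Tuple1<String>> replicated =
				env.createInput(rif, new TupleTypeInfo<Tuple1<String>>(BasicTypeInfo.STRING_TYPE_INFO));
	}
}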
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
deleted file mode 100644
index 522cc0c7f3f1e..0000000000000
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.operators;
-
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.io.CommonCsvInputFormat;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.StringUtils;
-
-public class ScalaCsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
-	private static final long serialVersionUID = -7347888812778968640L;
-
-	private final TupleSerializerBase<OUT> tupleSerializer;
-
-	public ScalaCsvInputFormat(Path filePath, CompositeType<OUT> typeInfo) {
-		super(filePath, typeInfo);
-
-		Preconditions.checkArgument(typeInfo instanceof PojoTypeInfo || typeInfo instanceof TupleTypeInfoBase,
-			"Only pojo types or tuple types are supported.");
-
-		if (typeInfo instanceof TupleTypeInfoBase) {
-			TupleTypeInfoBase<OUT> tupleTypeInfo = (TupleTypeInfoBase<OUT>) typeInfo;
-
-			tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
-		} else {
-			tupleSerializer = null;
-		}
-	}
-
-	@Override
-	protected OUT createTuple(OUT reuse) {
-		Preconditions.checkNotNull(tupleSerializer, "The tuple serializer must be initialised." +
-			" It is not initialized if the given type was not a " +
-			TupleTypeInfoBase.class.getName() + ".");
-
-		return tupleSerializer.createInstance(parsedValues);
-	}
-
-	@Override
-	public String toString() {
-		return "Scala CSV Input (" + StringUtils.showControlCharacters(String.valueOf(getFieldDelimiter())) + ") " + getFilePath();
-	}
-}
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
index e29a6e6860dfe..afcdc4a9d22b8 100644
--- a/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvOutputFormat.java
@@ -23,7 +23,7 @@
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.io.CommonCsvInputFormat;
+import org.apache.flink.api.java.io.CsvInputFormat;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
@@ -51,9 +51,9 @@ public class ScalaCsvOutputFormat<T extends Product> extends FileOutputFormat<T>
 	// --------------------------------------------------------------------------------------------
 
-	public static final String DEFAULT_LINE_DELIMITER = CommonCsvInputFormat.DEFAULT_LINE_DELIMITER;
+	public static final String DEFAULT_LINE_DELIMITER = CsvInputFormat.DEFAULT_LINE_DELIMITER;
 
-	public static final String DEFAULT_FIELD_DELIMITER = String.valueOf(CommonCsvInputFormat.DEFAULT_FIELD_DELIMITER);
+	public static final String DEFAULT_FIELD_DELIMITER = String.valueOf(CsvInputFormat.DEFAULT_FIELD_DELIMITER);
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index e27d55a7ae918..7436ac5d46ce1 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo}
 import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv}
 import org.apache.flink.api.scala.hadoop.{mapred, mapreduce}
-import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.Path
 import org.apache.flink.types.StringValue
@@ -297,55 +296,34 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
       s"The type $typeInfo has to be a tuple or pojo type.", null)
 
-    val inputFormat = new ScalaCsvInputFormat[T](
-      new Path(filePath),
-      typeInfo.asInstanceOf[CompositeType[T]])
-    inputFormat.setDelimiter(lineDelimiter)
-    inputFormat.setFieldDelimiter(fieldDelimiter)
-    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
-    inputFormat.setLenient(lenient)
-    inputFormat.setCommentPrefix(ignoreComments)
+    var inputFormat: CsvInputFormat[T] = null
 
-    if (quoteCharacter != null) {
-      inputFormat.enableQuotedStringParsing(quoteCharacter);
-    }
-
-    val classesBuf: ArrayBuffer[Class[_]] = new ArrayBuffer[Class[_]]
     typeInfo match {
       case info: TupleTypeInfoBase[T] =>
-        for (i <- 0 until info.getArity) {
-          classesBuf += info.getTypeAt(i).getTypeClass()
-        }
+        inputFormat = new TupleCsvInputFormat[T](
+          new Path(filePath),
+          typeInfo.asInstanceOf[TupleTypeInfoBase[T]],
+          includedFields)
       case info: PojoTypeInfo[T] =>
        if (pojoFields == null) {
          throw new IllegalArgumentException(
            "POJO fields must be specified (not null) if output type is a POJO.")
-        } else {
-          for (i <- 0 until pojoFields.length) {
-            val pos = info.getFieldIndex(pojoFields(i))
-            if (pos < 0) {
-              throw new IllegalArgumentException(
-                "Field \"" + pojoFields(i) + "\" not part of POJO type " +
-                  info.getTypeClass.getCanonicalName);
-            }
-            classesBuf += info.getPojoFieldAt(pos).getTypeInformation().getTypeClass
-          }
        }
+        inputFormat = new PojoCsvInputFormat[T](
+          new Path(filePath),
+          typeInfo.asInstanceOf[PojoTypeInfo[T]],
+          pojoFields,
+          includedFields)
       case _ => throw new IllegalArgumentException("Type information is not valid.")
     }
 
-    if (includedFields != null) {
-      require(classesBuf.size == includedFields.length, "Number of tuple fields and" +
-        " included fields must match.")
-      inputFormat.setFields(includedFields, classesBuf.toArray)
-    } else {
-      inputFormat.setFieldTypes(classesBuf: _*)
-    }
-
-    if (pojoFields != null) {
-      inputFormat.setOrderOfPOJOFields(pojoFields)
+    if (quoteCharacter != null) {
+      inputFormat.enableQuotedStringParsing(quoteCharacter);
    }
-
+    inputFormat.setDelimiter(lineDelimiter)
+    inputFormat.setFieldDelimiter(fieldDelimiter)
+    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
+    inputFormat.setLenient(lenient)
+    inputFormat.setCommentPrefix(ignoreComments)
 
     wrap(new DataSource[T](javaEnv, inputFormat, typeInfo, getCallLocationName()))
   }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index 3a015f783678b..3824f9accdd59 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -73,6 +73,10 @@ abstract class CaseClassSerializer[T <: Product](
     }
   }
 
+  override def createOrReuseInstance(fields: Array[Object], reuse: T) : T = {
+    createInstance(fields)
+  }
+
   def copy(from: T, reuse: T): T = {
     copy(from)
   }
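createOrReuseInstance is the hook that lets the tuple input format stay agnostic about mutability: Java tuples can be refilled in place, while the CaseClassSerializer override above ignores the reuse object and builds a fresh case class, since case classes are immutable. A self-contained Java sketch of the contract; this createOrReuseInstance is a hypothetical stand-in for the method added to TupleSerializerBase in this series, not the real signature-for-signature code:

import org.apache.flink.api.java.tuple.Tuple2;

public class ReuseSketch {

	// Hypothetical stand-in for TupleSerializerBase.createOrReuseInstance(Object[], T).
	static Tuple2<Integer, String> createOrReuseInstance(Object[] fields, Tuple2<Integer, String> reuse) {
		reuse.f0 = (Integer) fields[0]; // mutable tuple: write the parsed fields in place
		reuse.f1 = (String) fields[1];
		return reuse;
	}

	public static void main(String[] args) {
		Tuple2<Integer, String> reuse = new Tuple2<>();
		Tuple2<Integer, String> out = createOrReuseInstance(new Object[]{1, "a"}, reuse);
		System.out.println(out); // (1,a), and 'out' is the same object as 'reuse'
	}
}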
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
index ca252f813668c..30cd97fa40206 100644
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
@@ -15,11 +15,10 @@
 import java.io.IOException;
 import java.util.HashMap;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvInputFormat;
 import org.apache.flink.api.java.io.PrintingOutputFormat;
+import org.apache.flink.api.java.io.TupleCsvInputFormat;
 import org.apache.flink.api.java.operators.AggregateOperator;
 import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
 import org.apache.flink.api.java.operators.CrossOperator.ProjectCross;
@@ -30,6 +29,7 @@
 import org.apache.flink.api.java.operators.UdfOperator;
 import org.apache.flink.api.java.operators.UnsortedGrouping;
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint;
@@ -258,13 +258,12 @@ protected OperationInfo createOperationInfo(Operation operationIdentifier) throw
 	protected abstract INFO createOperationInfo(AbstractOperation operationIdentifier) throws IOException;
 
 	private void createCsvSource(OperationInfo info) throws IOException {
-		if (!(info.types instanceof CompositeType)) {
-			throw new RuntimeException("The output type of a csv source has to be a tuple or a " +
-				"pojo type. The derived type is " + info);
+		if (!(info.types instanceof TupleTypeInfo)) {
+			throw new RuntimeException("The output type of a csv source has to be a tuple. The derived type is " + info);
 		}
-		sets.put(info.setID, env.createInput(new CsvInputFormat(new Path(info.path),
-			info.lineDelimiter, info.fieldDelimiter, (CompositeType)info.types), info.types)
+		sets.put(info.setID, env.createInput(new TupleCsvInputFormat(new Path(info.path),
+			info.lineDelimiter, info.fieldDelimiter, (TupleTypeInfo)info.types), info.types)
 			.name("CsvSource"));
 	}
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index e49f7374cd0f6..47f99ca80854b 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -21,10 +21,11 @@ import java.io.{File, FileOutputStream, FileWriter, OutputStreamWriter}
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.io.PojoCsvInputFormat
+import org.apache.flink.api.java.io.TupleCsvInputFormat
 import org.apache.flink.api.java.io.CsvInputFormatTest.TwitterPOJO
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo}
+import org.apache.flink.api.java.typeutils.PojoTypeInfo
 import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.{FileInputSplit, Path}
@@ -50,7 +51,7 @@ class CsvInputFormatTest {
       "a test|3|4.0|\n" +
       "#next|5|6.0|\n"
     val split = createTempFile(fileContent)
-    val format = new ScalaCsvInputFormat[(String, Integer, Double)](
+    val format = new TupleCsvInputFormat[(String, Integer, Double)](
       PATH, createTypeInformation[(String, Integer, Double)]
         .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
@@ -92,7 +93,7 @@ class CsvInputFormatTest {
       "a test|3|4.0|\n" +
       "//next|5|6.0|\n"
     val split = createTempFile(fileContent)
-    val format = new ScalaCsvInputFormat[(String, Integer, Double)](
+    val format = new TupleCsvInputFormat[(String, Integer, Double)](
       PATH, createTypeInformation[(String, Integer, Double)]
        .asInstanceOf[CaseClassTypeInfo[(String, Integer, Double)]])
@@ -130,7 +131,7 @@ class CsvInputFormatTest {
     try {
       val fileContent = "abc|def|ghijk\nabc||hhg\n|||"
       val split = createTempFile(fileContent)
-      val format = new ScalaCsvInputFormat[(String, String, String)](
+      val format = new TupleCsvInputFormat[(String, String, String)](
         PATH, createTypeInformation[(String, String, String)]
           .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
@@ -172,7 +173,7 @@ class CsvInputFormatTest {
     try {
       val fileContent = "abc|\"de|f\"|ghijk\n\"a|bc\"||hhg\n|||"
       val split = createTempFile(fileContent)
-      val format = new ScalaCsvInputFormat[(String, String, String)](
+      val format = new TupleCsvInputFormat[(String, String, String)](
         PATH, createTypeInformation[(String, String, String)]
           .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
@@ -215,7 +216,7 @@ class CsvInputFormatTest {
     try {
       val fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n"
       val split = createTempFile(fileContent)
-      val format = new ScalaCsvInputFormat[(String, String, String)](
+      val format = new TupleCsvInputFormat[(String, String, String)](
         PATH, createTypeInformation[(String, String, String)]
          .asInstanceOf[CaseClassTypeInfo[(String, String, String)]])
@@ -256,7 +257,7 @@ class CsvInputFormatTest {
     try {
       val fileContent = "111|222|333|444|555\n666|777|888|999|000|\n"
       val split = createTempFile(fileContent)
-      val format = new ScalaCsvInputFormat[(Int, Int, Int, Int, Int)](
+      val format = new TupleCsvInputFormat[(Int, Int, Int, Int, Int)](
         PATH, createTypeInformation[(Int, Int, Int, Int, Int)].
           asInstanceOf[CaseClassTypeInfo[(Int, Int, Int, Int, Int)]])
       format.setFieldDelimiter("|")
@@ -293,7 +294,7 @@ class CsvInputFormatTest {
       val fileContent = "111|x|222|x|333|x|444|x|555|x|\n" +
         "666|x|777|x|888|x|999|x|000|x|\n"
       val split = createTempFile(fileContent)
-      val format = new ScalaCsvInputFormat[(Int, Int)](
+      val format = new TupleCsvInputFormat[(Int, Int)](
         PATH, createTypeInformation[(Int, Int)].asInstanceOf[CaseClassTypeInfo[(Int, Int)]])
       format.setFieldDelimiter("|x|")
@@ -317,19 +318,17 @@ class CsvInputFormatTest {
       fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
     }
   }
-
   @Test
   def testReadSparseWithPositionSetter(): Unit = {
     try {
       val fileContent: String = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666" +
         "|555|444|333|222|111|"
       val split = createTempFile(fileContent)
-      val format = new ScalaCsvInputFormat[(Int, Int, Int)](
+      val format = new TupleCsvInputFormat[(Int, Int, Int)](
         PATH,
-        createTypeInformation[(Int, Int, Int)].asInstanceOf[CaseClassTypeInfo[(Int, Int, Int)]])
+        createTypeInformation[(Int, Int, Int)].asInstanceOf[CaseClassTypeInfo[(Int, Int, Int)]],
+        Array(0, 3, 7))
       format.setFieldDelimiter("|")
-      format.setFields(Array(0, 3, 7),
-        Array(classOf[Integer], classOf[Integer], classOf[Integer]): Array[Class[_]])
       format.configure(new Configuration)
       format.open(split)
       var result: (Int, Int, Int) = null
@@ -353,28 +352,6 @@ class CsvInputFormatTest {
     }
   }
 
-  @Test
-  def testReadSparseWithShuffledPositions(): Unit = {
-    try {
-      val format = new ScalaCsvInputFormat[(Int, Int, Int)](
-        PATH,
-        createTypeInformation[(Int, Int, Int)].asInstanceOf[CaseClassTypeInfo[(Int, Int, Int)]])
-      format.setFieldDelimiter("|")
-      try {
-        format.setFields(Array(8, 1, 3),
-          Array(classOf[Integer], classOf[Integer], classOf[Integer]): Array[Class[_]])
-        fail("Input sequence should have been rejected.")
-      }
-      catch {
-        case e: IllegalArgumentException => // ignore
-      }
-    }
-    catch {
-      case ex: Exception =>
-        fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage)
-    }
-  }
-
   private def createTempFile(content: String): FileInputSplit = {
     val tempFile = File.createTempFile("test_contents", "tmp")
     tempFile.deleteOnExit()
@@ -402,7 +379,7 @@ class CsvInputFormatTest {
     val wrt = new OutputStreamWriter(new FileOutputStream(tempFile))
     wrt.write(fileContent)
     wrt.close()
-    val inputFormat = new ScalaCsvInputFormat[Tuple1[String]](new Path(tempFile.toURI.toString),
+    val inputFormat = new TupleCsvInputFormat[Tuple1[String]](new Path(tempFile.toURI.toString),
       createTypeInformation[Tuple1[String]].asInstanceOf[CaseClassTypeInfo[Tuple1[String]]])
     val parameters = new Configuration
     inputFormat.configure(parameters)
@@ -432,7 +409,7 @@ class CsvInputFormatTest {
 
   case class CaseClassItem(field1: Int, field2: String, field3: Double)
 
-  private def validatePOJOItem(format: ScalaCsvInputFormat[POJOItem]): Unit = {
+  private def validatePOJOItem(format: PojoCsvInputFormat[POJOItem]): Unit = {
     var result = new POJOItem()
     result = format.nextRecord(result)
     assertEquals(123, result.field1)
@@ -445,7 +422,7 @@ class CsvInputFormatTest {
     assertEquals(1.234, result.field3, 0.001)
   }
 
-  private def validateCaseClassItem(format: ScalaCsvInputFormat[CaseClassItem]): Unit = {
+  private def validateCaseClassItem(format: TupleCsvInputFormat[CaseClassItem]): Unit = {
     var result = format.nextRecord(null)
     assertEquals(123, result.field1)
     assertEquals("HELLO", result.field2)
@@ -463,7 +440,7 @@ class CsvInputFormatTest {
     val tempFile = createTempFile(fileContent)
     val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
       .asInstanceOf[PojoTypeInfo[POJOItem]]
-    val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)
+    val format = new PojoCsvInputFormat[POJOItem](PATH, typeInfo)
 
     format.setDelimiter('\n')
     format.setFieldDelimiter(',')
@@ -477,9 +454,10 @@ class CsvInputFormatTest {
   def testCaseClass(): Unit = {
     val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
     val tempFile = createTempFile(fileContent)
-    val typeInfo: CompositeType[CaseClassItem] = createTypeInformation[CaseClassItem]
-      .asInstanceOf[CompositeType[CaseClassItem]]
-    val format = new ScalaCsvInputFormat[CaseClassItem](PATH, typeInfo)
+    val typeInfo: CaseClassTypeInfo[CaseClassItem] =
+      createTypeInformation[CaseClassItem]
+        .asInstanceOf[CaseClassTypeInfo[CaseClassItem]]
+    val format = new TupleCsvInputFormat[CaseClassItem](PATH, typeInfo)
 
     format.setDelimiter('\n')
     format.setFieldDelimiter(',')
@@ -495,12 +473,11 @@ class CsvInputFormatTest {
     val tempFile = createTempFile(fileContent)
     val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
       .asInstanceOf[PojoTypeInfo[POJOItem]]
-    val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)
+    val format = new PojoCsvInputFormat[POJOItem](
+      PATH, typeInfo, Array("field2", "field1", "field3"))
 
     format.setDelimiter('\n')
     format.setFieldDelimiter(',')
-    format.setFieldTypes(classOf[String], classOf[Integer], classOf[java.lang.Double])
-    format.setOrderOfPOJOFields(Array("field2", "field1", "field3"))
 
     format.configure(new Configuration)
     format.open(tempFile)
@@ -513,13 +490,12 @@ class CsvInputFormatTest {
     val tempFile = createTempFile(fileContent)
     val typeInfo: PojoTypeInfo[POJOItem] = createTypeInformation[POJOItem]
       .asInstanceOf[PojoTypeInfo[POJOItem]]
-    val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)
+    val format = new PojoCsvInputFormat[POJOItem](
+      PATH, typeInfo, Array("field2", "field1", "field3"),
+      Array(true, true, false, true, false))
 
     format.setDelimiter('\n')
     format.setFieldDelimiter(',')
-    format.setFields(Array(true, true, false, true, false),
-      Array(classOf[String], classOf[Integer], classOf[java.lang.Double]): Array[Class[_]])
-    format.setOrderOfPOJOFields(Array("field2", "field1", "field3"))
 
     format.configure(new Configuration)
     format.open(tempFile)
@@ -532,7 +508,7 @@ class CsvInputFormatTest {
     val tempFile = createTempFile(fileContent)
     val typeInfo: PojoTypeInfo[TwitterPOJO] = createTypeInformation[TwitterPOJO]
      .asInstanceOf[PojoTypeInfo[TwitterPOJO]]
-    val format = new ScalaCsvInputFormat[TwitterPOJO](PATH, typeInfo)
+    val format = new PojoCsvInputFormat[TwitterPOJO](PATH, typeInfo)
 
     format.setDelimiter('\n')
     format.setFieldDelimiter(',')

From 89a32e18abf9f547defe7869a058fd5abbabdb18 Mon Sep 17 00:00:00 2001
From: zentol
Date: Wed, 11 Nov 2015 12:48:42 +0100
Subject: [PATCH 2/3] fillRecord added / createDefaultMask used

---
 .../flink/api/java/io/CsvInputFormat.java     | 36 ++++++++++++++
 .../flink/api/java/io/PojoCsvInputFormat.java | 49 +++----------------
 .../api/java/io/TupleCsvInputFormat.java      | 33 +------------
 3 files changed, 46 insertions(+), 72 deletions(-)

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index deb2ff9b7d397..8f0aa64834f15 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -80,6 +80,42 @@ public OUT nextRecord(OUT record) throws IOException {
 		return returnRecord;
 	}
 
+	@Override
+	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+		/*
+		 * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
+		 */
+		//Find windows end line, so find carriage return before the newline
+		if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
+			//reduce the number of bytes so that the Carriage return is not taken as data
+			numBytes--;
+		}
+
+		if (commentPrefix != null && commentPrefix.length <= numBytes) {
+			//check record for comments
+			boolean isComment = true;
+			for (int i = 0; i < commentPrefix.length; i++) {
+				if (commentPrefix[i] != bytes[offset + i]) {
+					isComment = false;
+					break;
+				}
+			}
+			if (isComment) {
+				this.commentCount++;
+				return null;
+			}
+		}
+
+		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
+			return fillRecord(reuse, parsedValues);
+		} else {
+			this.invalidLineCount++;
+			return null;
+		}
+	}
+
+	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
+
 	public Class<?>[] getFieldTypes() {
 		return super.getGenericFieldTypes();
 	}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
index b38a60583f096..2f1139c7f40af 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
@@ -95,10 +95,7 @@ public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelim
 	private void configure(String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
 		if (includedFieldsMask == null) {
-			includedFieldsMask = new boolean[fieldNames.length];
-			for (int x = 0; x < includedFieldsMask.length; x++) {
-				includedFieldsMask[x] = true;
-			}
+			includedFieldsMask = createDefaultMask(fieldNames.length);
 		}
 
 		for (String name : fieldNames) {
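This is the crux of patch 2: readRecord() is hoisted into CsvInputFormat, so the Windows line-ending trim, comment skipping, and invalid-line counting exist exactly once, while subclasses only implement the fillRecord() hook. A self-contained mock of that template-method shape; MiniCsvFormat and PairCsvFormat are invented for illustration and are not the real Flink classes:

// The base class owns parsing and error handling; subclasses only map
// already-parsed values into the output type.
abstract class MiniCsvFormat<OUT> {
	protected int invalidLineCount;

	// Simplified stand-in for CsvInputFormat.readRecord(): null values signal a parse failure.
	public OUT readRecord(OUT reuse, Object[] parsedValues) {
		if (parsedValues == null) {
			invalidLineCount++;
			return null;
		}
		return fillRecord(reuse, parsedValues);
	}

	protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
}

final class PairCsvFormat extends MiniCsvFormat<int[]> {
	@Override
	protected int[] fillRecord(int[] reuse, Object[] parsedValues) {
		reuse[0] = (Integer) parsedValues[0]; // write into the reuse object in place
		reuse[1] = (Integer) parsedValues[1];
		return reuse;
	}
}

The two hunks that follow show the real subclasses shrinking to exactly such hooks.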
@@ -189,44 +186,14 @@ private void findAllFields(Class<?> clazz, Map<String, Field> allFields) {
 	}
 
 	@Override
-	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
-		/*
-		 * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
-		 */
-		//Find windows end line, so find carriage return before the newline
-		if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
-			//reduce the number of bytes so that the Carriage return is not taken as data
-			numBytes--;
-		}
-
-		if (commentPrefix != null && commentPrefix.length <= numBytes) {
-			//check record for comments
-			boolean isComment = true;
-			for (int i = 0; i < commentPrefix.length; i++) {
-				if (commentPrefix[i] != bytes[offset + i]) {
-					isComment = false;
-					break;
-				}
-			}
-			if (isComment) {
-				this.commentCount++;
-				return null;
-			}
-		}
-
-		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-			for (int i = 0; i < parsedValues.length; i++) {
-				try {
-					pojoFields[i].set(reuse, parsedValues[i]);
-				} catch (IllegalAccessException e) {
-					throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
-				}
+	public OUT fillRecord(OUT reuse, Object[] parsedValues) {
+		for (int i = 0; i < parsedValues.length; i++) {
+			try {
+				pojoFields[i].set(reuse, parsedValues[i]);
+			} catch (IllegalAccessException e) {
+				throw new RuntimeException("Parsed value could not be set in POJO field \"" + pojoFieldNames[i] + "\"", e);
 			}
-			return reuse;
-		} else {
-			this.invalidLineCount++;
-			return null;
 		}
+		return reuse;
 	}
-
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
index 485bd69b966bc..f544483d30193 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
@@ -85,36 +85,7 @@ private void configure(String lineDelimiter, String fieldDelimiter,
 	}
 
 	@Override
-	public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws IOException {
-		/*
-		 * Fix to support windows line endings in CSVInputFiles with standard delimiter setup = \n
-		 */
-		//Find windows end line, so find carriage return before the newline
-		if (this.lineDelimiterIsLinebreak == true && numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
-			//reduce the number of bytes so that the Carriage return is not taken as data
-			numBytes--;
-		}
-
-		if (commentPrefix != null && commentPrefix.length <= numBytes) {
-			//check record for comments
-			boolean isComment = true;
-			for (int i = 0; i < commentPrefix.length; i++) {
-				if (commentPrefix[i] != bytes[offset + i]) {
-					isComment = false;
-					break;
-				}
-			}
-			if (isComment) {
-				this.commentCount++;
-				return null;
-			}
-		}
-
-		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-			return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
-		} else {
-			this.invalidLineCount++;
-			return null;
-		}
+	public OUT fillRecord(OUT reuse, Object[] parsedValues) {
+		return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
 	}
 }

From 29b8a88066bcaa22aceda6eba87d8fa1d1a43bf4 Mon Sep 17 00:00:00 2001
From: zentol
Date: Wed, 11 Nov 2015 13:31:17 +0100
Subject: [PATCH 3/3] import fix

---
 .../java/org/apache/flink/api/java/io/TupleCsvInputFormat.java | 1 -
 1 file changed, 1 deletion(-)
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
index f544483d30193..82caddd40c0b4 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java
@@ -19,7 +19,6 @@
 
 import org.apache.flink.core.fs.Path;
 
-import java.io.IOException;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
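After all three patches, constructing a CSV source is a single constructor call plus optional setters. A hedged end-to-end sketch using only constructors and constants that appear in this series; the path is a placeholder, and the delimiter constants are assumed to keep the types implied by the ScalaCsvOutputFormat hunk above (a String line delimiter and a char field delimiter):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.fs.Path;

public class UntangledCsvExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		TupleTypeInfo<Tuple2<Integer, String>> typeInfo =
				TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class);

		// Delimiters are fixed at construction; remaining knobs stay as setters.
		TupleCsvInputFormat<Tuple2<Integer, String>> format =
				new TupleCsvInputFormat<Tuple2<Integer, String>>(
						new Path("/some/path"),
						CsvInputFormat.DEFAULT_LINE_DELIMITER,
						String.valueOf(CsvInputFormat.DEFAULT_FIELD_DELIMITER),
						typeInfo);
		format.setSkipFirstLineAsHeader(true);

		DataSet<Tuple2<Integer, String>> data = env.createInput(format, typeInfo);
		data.print();
	}
}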