From d62bf415c39f892914581e76921a73d4eefd3588 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Fri, 14 Jul 2017 10:31:34 +0200 Subject: [PATCH 1/4] [FLINK-7185] Activate checkstyle flink-java/io --- .../api/java/io/CollectionInputFormat.java | 40 +-- .../flink/api/java/io/CsvInputFormat.java | 17 +- .../flink/api/java/io/CsvOutputFormat.java | 26 +- .../apache/flink/api/java/io/CsvReader.java | 122 ++++++---- .../api/java/io/DiscardingOutputFormat.java | 1 - .../api/java/io/IteratorInputFormat.java | 12 +- .../java/io/LocalCollectionOutputFormat.java | 21 +- .../java/io/ParallelIteratorInputFormat.java | 20 +- .../flink/api/java/io/PojoCsvInputFormat.java | 5 + .../api/java/io/PrimitiveInputFormat.java | 5 +- .../api/java/io/PrintingOutputFormat.java | 30 +-- .../flink/api/java/io/RowCsvInputFormat.java | 3 + .../api/java/io/SplitDataProperties.java | 93 +++---- .../flink/api/java/io/TextInputFormat.java | 50 ++-- .../flink/api/java/io/TextOutputFormat.java | 48 ++-- .../api/java/io/TextValueInputFormat.java | 57 ++--- .../api/java/io/TupleCsvInputFormat.java | 9 +- .../java/io/TypeSerializerOutputFormat.java | 6 +- .../flink/api/java/io/CSVReaderTest.java | 90 +++---- .../java/io/CollectionInputFormatTest.java | 66 ++--- .../flink/api/java/io/CsvInputFormatTest.java | 227 +++++++++--------- .../api/java/io/CsvOutputFormatTest.java | 7 +- .../flink/api/java/io/FromElementsTest.java | 8 +- .../api/java/io/PrimitiveInputFormatTest.java | 35 ++- .../api/java/io/RowCsvInputFormatTest.java | 15 +- .../api/java/io/TextInputFormatTest.java | 129 +++++----- .../api/java/io/TypeSerializerFormatTest.java | 6 +- 27 files changed, 613 insertions(+), 535 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java index 90e6712fd3c8a..eebe56f8675f7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java +++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/CollectionInputFormat.java @@ -18,14 +18,6 @@ package org.apache.flink.api.java.io; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.io.GenericInputFormat; import org.apache.flink.api.common.io.NonParallelInput; @@ -34,6 +26,14 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + /** * An input format that returns objects from a collection. */ @@ -55,7 +55,7 @@ public CollectionInputFormat(Collection dataSet, TypeSerializer serializer } this.serializer = serializer; - + this.dataSet = dataSet; } @@ -67,10 +67,10 @@ public boolean reachedEnd() throws IOException { @Override public void open(GenericInputSplit split) throws IOException { super.open(split); - + this.iterator = this.dataSet.iterator(); } - + @Override public T nextRecord(T record) throws IOException { return this.iterator.next(); @@ -80,10 +80,10 @@ public T nextRecord(T record) throws IOException { private void writeObject(ObjectOutputStream out) throws IOException { out.defaultWriteObject(); - + final int size = dataSet.size(); out.writeInt(size); - + if (size > 0) { DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(out); for (T element : dataSet){ @@ -97,7 +97,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE int collectionLength = in.readInt(); List list = new ArrayList(collectionLength); - + if (collectionLength > 0) { try { DataInputViewStreamWrapper 
wrapper = new DataInputViewStreamWrapper(in); @@ -113,9 +113,9 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE dataSet = list; } - + // -------------------------------------------------------------------------------------------- - + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -136,14 +136,14 @@ public String toString() { sb.append(']'); return sb.toString(); } - + // -------------------------------------------------------------------------------------------- - + public static void checkCollection(Collection elements, Class viewedAs) { if (elements == null || viewedAs == null) { throw new NullPointerException(); } - + for (X elem : elements) { if (elem == null) { throw new IllegalArgumentException("The collection must not contain null elements."); @@ -157,7 +157,7 @@ public static void checkCollection(Collection elements, Class viewedAs if (!viewedAs.isAssignableFrom(elem.getClass()) && !(elem.getClass().toString().equals("class scala.runtime.BoxedUnit") && viewedAs.equals(void.class))) { - throw new IllegalArgumentException("The elements in the collection are not all subclasses of " + + throw new IllegalArgumentException("The elements in the collection are not all subclasses of " + viewedAs.getCanonicalName()); } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java index f1a16ea08338f..62e988b18d2e4 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java @@ -21,13 +21,18 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.io.GenericCsvInputFormat; import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; import org.apache.flink.types.parser.FieldParser; import org.apache.flink.util.Preconditions; +import 
org.apache.flink.util.StringUtils; import java.io.IOException; -import org.apache.flink.core.fs.Path; -import org.apache.flink.util.StringUtils; +/** + * InputFormat that reads csv files. + * + * @param + */ @Internal public abstract class CsvInputFormat extends GenericCsvInputFormat { @@ -38,7 +43,7 @@ public abstract class CsvInputFormat extends GenericCsvInputFormat { public static final String DEFAULT_FIELD_DELIMITER = ","; protected transient Object[] parsedValues; - + protected CsvInputFormat(Path filePath) { super(filePath); } @@ -63,7 +68,7 @@ public void open(FileInputSplit split) throws IOException { // left to right evaluation makes access [0] okay // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default - if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) { + if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n') { this.lineDelimiterIsLinebreak = true; } @@ -123,7 +128,7 @@ public Class[] getFieldTypes() { protected static boolean[] createDefaultMask(int size) { boolean[] includedMask = new boolean[size]; - for (int x=0; x - * by default, null values are disallowed. + * + *

By default, null values are disallowed. * * @param allowNulls Flag to indicate whether the output format should accept null values. */ @@ -144,8 +145,8 @@ public void setCharsetName(String charsetName) { * Configures whether the output format should quote string values. String values are fields * of type {@link java.lang.String} and {@link org.apache.flink.types.StringValue}, as well as * all subclasses of the latter. - *

- * By default, strings are not quoted. + * + *

By default, strings are not quoted. * * @param quoteStrings Flag indicating whether string fields should be quoted. */ @@ -215,7 +216,6 @@ public String toString() { } /** - * * The purpose of this method is solely to check whether the data type to be processed * is in fact a tuple type. */ diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java index ce2f4fa43d4ed..47f71740fa91f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java @@ -23,9 +23,32 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.operators.DataSource; -//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator -import org.apache.flink.api.java.tuple.*; -//CHECKSTYLE.ON: AvoidStarImport +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple10; +import org.apache.flink.api.java.tuple.Tuple11; +import org.apache.flink.api.java.tuple.Tuple12; +import org.apache.flink.api.java.tuple.Tuple13; +import org.apache.flink.api.java.tuple.Tuple14; +import org.apache.flink.api.java.tuple.Tuple15; +import org.apache.flink.api.java.tuple.Tuple16; +import org.apache.flink.api.java.tuple.Tuple17; +import org.apache.flink.api.java.tuple.Tuple18; +import org.apache.flink.api.java.tuple.Tuple19; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple20; +import org.apache.flink.api.java.tuple.Tuple21; +import org.apache.flink.api.java.tuple.Tuple22; +import org.apache.flink.api.java.tuple.Tuple23; +import org.apache.flink.api.java.tuple.Tuple24; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import 
org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.tuple.Tuple9; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -44,14 +67,13 @@ public class CsvReader { private final Path path; - + private final ExecutionEnvironment executionContext; - - + protected boolean[] includedMask; - + protected String lineDelimiter = CsvInputFormat.DEFAULT_LINE_DELIMITER; - + protected String fieldDelimiter = CsvInputFormat.DEFAULT_FIELD_DELIMITER; protected String commentPrefix = null; //default: no comments @@ -61,35 +83,35 @@ public class CsvReader { protected char quoteCharacter = '"'; protected boolean skipFirstLineAsHeader = false; - + protected boolean ignoreInvalidLines = false; private String charset = "UTF-8"; - + // -------------------------------------------------------------------------------------------- - + public CsvReader(Path filePath, ExecutionEnvironment executionContext) { Preconditions.checkNotNull(filePath, "The file path may not be null."); Preconditions.checkNotNull(executionContext, "The execution context may not be null."); - + this.path = filePath; this.executionContext = executionContext; } - + public CsvReader(String filePath, ExecutionEnvironment executionContext) { this(new Path(Preconditions.checkNotNull(filePath, "The file path may not be null.")), executionContext); } - + public Path getFilePath() { return this.path; } - + // -------------------------------------------------------------------------------------------- - + /** * Configures the delimiter that separates the lines/rows. The linebreak character * ({@code '\n'}) is used by default. - * + * * @param delimiter The delimiter that separates the rows. 
* @return The CSV reader instance itself, to allow for fluent function chaining. */ @@ -97,15 +119,15 @@ public CsvReader lineDelimiter(String delimiter) { if (delimiter == null || delimiter.length() == 0) { throw new IllegalArgumentException("The delimiter must not be null or an empty string"); } - + this.lineDelimiter = delimiter; return this; } - + /** * Configures the delimiter that separates the fields within a row. The comma character * ({@code ','}) is used by default. - * + * * @param delimiter The delimiter that separates the fields in one row. * @return The CSV reader instance itself, to allow for fluent function chaining. * @@ -148,7 +170,7 @@ public CsvReader parseQuotedStrings(char quoteCharacter) { * Configures the string that starts comments. * By default comments will be treated as invalid lines. * This function only recognizes comments which start at the beginning of the line! - * + * * @param commentPrefix The string that starts the comments. * @return The CSV reader instance itself, to allow for fluent function chaining. */ @@ -156,7 +178,7 @@ public CsvReader ignoreComments(String commentPrefix) { if (commentPrefix == null || commentPrefix.length() == 0) { throw new IllegalArgumentException("The comment prefix must not be null or an empty string"); } - + this.commentPrefix = commentPrefix; return this; } @@ -172,7 +194,7 @@ public String getCharset() { } /** - * Sets the charset of the reader + * Sets the charset of the reader. * * @param charset The character set to set. */ @@ -189,7 +211,7 @@ public void setCharset(String charset) { * the boolean array is {@code true}. * The number of fields in the result is consequently equal to the number of times that {@code true} * occurs in the fields array. - * + * * @param fields The array of flags that describes which fields are to be included and which not. * @return The CSV reader instance itself, to allow for fluent function chaining. 
*/ @@ -197,14 +219,14 @@ public CsvReader includeFields(boolean ... fields) { if (fields == null || fields.length == 0) { throw new IllegalArgumentException("The set of included fields must not be null or empty."); } - + int lastTruePos = -1; for (int i = 0; i < fields.length; i++) { if (fields[i]) { lastTruePos = i; } } - + if (lastTruePos == -1) { throw new IllegalArgumentException("The description of fields to parse excluded all fields. At least one fields must be included."); } @@ -225,13 +247,13 @@ public CsvReader includeFields(boolean ... fields) { * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value * {@code false}). The result contains the fields where the corresponding position in * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}). - * + * * @param mask The string mask defining which fields to include and which to skip. * @return The CSV reader instance itself, to allow for fluent function chaining. */ public CsvReader includeFields(String mask) { boolean[] includedMask = new boolean[mask.length()]; - + for (int i = 0; i < mask.length(); i++) { char c = mask.charAt(i); if (c == '1' || c == 'T' || c == 't') { @@ -240,10 +262,10 @@ public CsvReader includeFields(String mask) { throw new IllegalArgumentException("Mask string may contain only '0' and '1'."); } } - + return includeFields(includedMask); } - + /** * Configures which fields of the CSV file should be included and which should be skipped. The * bits in the value (read from least significant to most significant) define whether the field at @@ -252,14 +274,14 @@ public CsvReader includeFields(String mask) { * non-zero bit. * The parser will skip over all fields where the character at the corresponding bit is zero, and * include the fields where the corresponding bit is one. - *

- * Examples: + * + *

Examples: *

    *
  • A mask of {@code 0x7} would include the first three fields.
  • *
  • A mask of {@code 0x26} (binary {@code 100110} would skip the first fields, include fields * two and three, skip fields four and five, and include field six.
  • *
- * + * * @param mask The bit mask defining which fields to include and which to skip. * @return The CSV reader instance itself, to allow for fluent function chaining. */ @@ -267,36 +289,36 @@ public CsvReader includeFields(long mask) { if (mask == 0) { throw new IllegalArgumentException("The description of fields to parse excluded all fields. At least one fields must be included."); } - + ArrayList fields = new ArrayList(); while (mask != 0) { fields.add((mask & 0x1L) != 0); mask >>>= 1; } - + boolean[] fieldsArray = new boolean[fields.size()]; for (int i = 0; i < fieldsArray.length; i++) { fieldsArray[i] = fields.get(i); } - + return includeFields(fieldsArray); } /** * Sets the CSV reader to ignore the first line. This is useful for files that contain a header line. - * + * * @return The CSV reader instance itself, to allow for fluent function chaining. */ public CsvReader ignoreFirstLine() { skipFirstLineAsHeader = true; return this; } - + /** - * Sets the CSV reader to ignore any invalid lines. + * Sets the CSV reader to ignore any invalid lines. * This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise. - * + * * @return The CSV reader instance itself, to allow for fluent function chaining. */ public CsvReader ignoreInvalidLines(){ @@ -325,12 +347,12 @@ public DataSource pojoType(Class pojoType, String... pojoFields) { return new DataSource(executionContext, inputFormat, typeInfo, Utils.getCallLocationName()); } - + /** * Configures the reader to read the CSV data and parse it to the given type. The type must be a subclass of * {@link Tuple}. The type information for the fields is obtained from the type class. The type * consequently needs to specify all generic field types of the tuple. - * + * * @param targetType The class of the target type, needs to be a subclass of Tuple. * @return The DataSet representing the parsed CSV data. 
*/ @@ -339,24 +361,24 @@ public DataSource tupleType(Class targetType) { if (!Tuple.class.isAssignableFrom(targetType)) { throw new IllegalArgumentException("The target type must be a subclass of " + Tuple.class.getName()); } - + @SuppressWarnings("unchecked") TupleTypeInfo typeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(targetType); CsvInputFormat inputFormat = new TupleCsvInputFormat(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, this.includedMask); - + Class[] classes = new Class[typeInfo.getArity()]; for (int i = 0; i < typeInfo.getArity(); i++) { classes[i] = typeInfo.getTypeAt(i).getTypeClass(); } - + configureInputFormat(inputFormat); return new DataSource(executionContext, inputFormat, typeInfo, Utils.getCallLocationName()); } - + // -------------------------------------------------------------------------------------------- // Miscellaneous // -------------------------------------------------------------------------------------------- - + private void configureInputFormat(CsvInputFormat format) { format.setCharset(this.charset); format.setDelimiter(this.lineDelimiter); @@ -368,10 +390,10 @@ private void configureInputFormat(CsvInputFormat format) { format.enableQuotedStringParsing(this.quoteCharacter); } } - - // -------------------------------------------------------------------------------------------- + + // -------------------------------------------------------------------------------------------- // The following lines are generated. - // -------------------------------------------------------------------------------------------- + // -------------------------------------------------------------------------------------------- // BEGIN_OF_TUPLE_DEPENDENT_CODE // GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. 
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java index f01d86475c873..7358b14bf4028 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/DiscardingOutputFormat.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.java.io; import org.apache.flink.annotation.Public; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java index cb8bd6a0cb51a..05f3ccd98b5ae 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/IteratorInputFormat.java @@ -16,16 +16,15 @@ * limitations under the License. */ - package org.apache.flink.api.java.io; -import java.io.Serializable; -import java.util.Iterator; - import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.io.GenericInputFormat; import org.apache.flink.api.common.io.NonParallelInput; +import java.io.Serializable; +import java.util.Iterator; + /** * An input format that returns objects from an iterator. 
*/ @@ -35,13 +34,12 @@ public class IteratorInputFormat extends GenericInputFormat implements Non private static final long serialVersionUID = 1L; private Iterator iterator; // input data as serializable iterator - - + public IteratorInputFormat(Iterator iterator) { if (!(iterator instanceof Serializable)) { throw new IllegalArgumentException("The data source iterator must be serializable."); } - + this.iterator = iterator; } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java index 65ed6c33a59dc..bcb1cf1060d7e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/LocalCollectionOutputFormat.java @@ -18,13 +18,6 @@ package org.apache.flink.api.java.io; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; - import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.RichOutputFormat; @@ -33,15 +26,21 @@ import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.configuration.Configuration; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + /** - * An output format that writes record into collection + * An output format that adds records to a collection. 
*/ @PublicEvolving public class LocalCollectionOutputFormat extends RichOutputFormat implements InputTypeConfigurable { private static final long serialVersionUID = 1L; - private static Map> RESULT_HOLDER = new HashMap>(); + private static final Map> RESULT_HOLDER = new HashMap>(); private transient ArrayList taskResult; @@ -67,7 +66,6 @@ private int generateRandomId() { @Override public void configure(Configuration parameters) {} - @Override public void open(int taskNumber, int numTasks) throws IOException { this.taskResult = new ArrayList(); @@ -80,7 +78,6 @@ public void writeRecord(T record) throws IOException { this.taskResult.add(recordCopy); } - @Override public void close() throws IOException { synchronized (RESULT_HOLDER) { @@ -93,6 +90,6 @@ public void close() throws IOException { @Override @SuppressWarnings("unchecked") public void setInputType(TypeInformation type, ExecutionConfig executionConfig) { - this.typeSerializer = (TypeSerializer)type.createSerializer(executionConfig); + this.typeSerializer = (TypeSerializer) type.createSerializer(executionConfig); } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java index a6ac853ac4930..25e25b451e096 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/ParallelIteratorInputFormat.java @@ -18,14 +18,13 @@ package org.apache.flink.api.java.io; -import java.io.IOException; -import java.util.Iterator; - import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.io.GenericInputFormat; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.util.SplittableIterator; +import java.io.IOException; +import java.util.Iterator; /** * An input format that generates data in parallel through a {@link SplittableIterator}. 
@@ -34,25 +33,22 @@ public class ParallelIteratorInputFormat extends GenericInputFormat { private static final long serialVersionUID = 1L; - - + private final SplittableIterator source; - + private transient Iterator splitIterator; - - - + public ParallelIteratorInputFormat(SplittableIterator iterator) { this.source = iterator; } - + @Override public void open(GenericInputSplit split) throws IOException { super.open(split); - + this.splitIterator = this.source.getSplit(split.getSplitNumber(), split.getTotalNumberOfSplits()); } - + @Override public boolean reachedEnd() { return !this.splitIterator.hasNext(); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java index 990e9e6c83a29..804d02b22f979 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.api.java.io; import org.apache.flink.annotation.Internal; @@ -29,6 +30,10 @@ import java.util.HashMap; import java.util.Map; +/** + * Input format that reads csv into POJOs. 
+ * @param resulting POJO type + */ @Internal public class PojoCsvInputFormat extends CsvInputFormat { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java index d4547658af082..794703b9f7cd1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java @@ -44,7 +44,6 @@ public class PrimitiveInputFormat extends DelimitedInputFormat { private transient FieldParser parser; - public PrimitiveInputFormat(Path filePath, Class primitiveClass) { super(filePath, null); this.primitiveClass = primitiveClass; @@ -70,7 +69,7 @@ public void open(FileInputSplit split) throws IOException { public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) throws IOException { // Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == NEW_LINE - && offset+numBytes >= 1 && bytes[offset+numBytes-1] == CARRIAGE_RETURN){ + && offset + numBytes >= 1 && bytes[offset + numBytes - 1] == CARRIAGE_RETURN) { numBytes -= 1; } @@ -79,7 +78,7 @@ public OT readRecord(OT reuse, byte[] bytes, int offset, int numBytes) throws IO return parser.getLastResult(); } else { String s = new String(bytes, offset, numBytes, getCharset()); - throw new IOException("Could not parse value: \""+s+"\" as type "+primitiveClass.getSimpleName()); + throw new IOException("Could not parse value: \"" + s + "\" as type " + primitiveClass.getSimpleName()); } } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java index a010fd864756b..cc95690f3eebf 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java +++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java @@ -18,12 +18,16 @@ package org.apache.flink.api.java.io; -import java.io.PrintStream; - import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.configuration.Configuration; +import java.io.PrintStream; + +/** + * Output format that prints results into underlying {@link PrintStream}. + * @param + */ @PublicEvolving public class PrintingOutputFormat extends RichOutputFormat { @@ -37,19 +41,19 @@ public class PrintingOutputFormat extends RichOutputFormat { private boolean target; private transient PrintStream stream; - + private transient String prefix; - + // -------------------------------------------------------------------------------------------- - + /** * Instantiates a printing output format that prints to standard out. */ public PrintingOutputFormat() {} - + /** * Instantiates a printing output format that prints to standard out. - * + * * @param stdErr True, if the format should print to standard error instead of standard out. */ public PrintingOutputFormat(boolean stdErr) { @@ -65,20 +69,18 @@ public PrintingOutputFormat(String sinkIdentifier, boolean stdErr) { this(stdErr); this.sinkIdentifier = sinkIdentifier; } - + public void setTargetToStandardOut() { this.target = STD_OUT; } - + public void setTargetToStandardErr() { this.target = STD_ERR; } - - + @Override public void configure(Configuration parameters) {} - @Override public void open(int taskNumber, int numTasks) { // get the target stream @@ -116,9 +118,9 @@ public void close() { this.prefix = null; this.sinkIdentifier = null; } - + // -------------------------------------------------------------------------------------------- - + @Override public String toString() { return "Print to " + (target == STD_OUT ? 
"System.out" : "System.err"); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java index b752966705890..15ef90ec56032 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java @@ -29,6 +29,9 @@ import java.util.Arrays; +/** + * Input format that reads csv into {@link Row}. + */ @PublicEvolving public class RowCsvInputFormat extends CsvInputFormat implements ResultTypeQueryable { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java index db09380525588..9db90639dac71 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java @@ -22,11 +22,11 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.operators.GenericDataSourceBase; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.common.operators.Keys; import java.util.Arrays; @@ -34,7 +34,7 @@ * SplitDataProperties define data properties on {@link org.apache.flink.core.io.InputSplit} * generated by the {@link org.apache.flink.api.common.io.InputFormat} of a {@link DataSource}. * - * InputSplits are units of input which are distributed among and assigned to parallel data source subtasks. + *

InputSplits are units of input which are distributed among and assigned to parallel data source subtasks. * SplitDataProperties can define that the elements which are generated by the associated InputFormat * are *

    @@ -46,7 +46,7 @@ * are in the defined order. *
* - * IMPORTANT: SplitDataProperties can improve the execution of a program because certain + *

IMPORTANT: SplitDataProperties can improve the execution of a program because certain * data reorganization steps such as shuffling or sorting can be avoided. * HOWEVER, if SplitDataProperties are not correctly defined, the result of the program might be wrong! * @@ -90,8 +90,9 @@ public SplitDataProperties(DataSource source) { /** * Defines that data is partitioned across input splits on the fields defined by field positions. * All records sharing the same key (combination) must be contained in a single input split. - *
- * + * + * + *

* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! * * @@ -106,8 +107,9 @@ public SplitDataProperties splitsPartitionedBy(int... partitionFields) { * Defines that data is partitioned using a specific partitioning method * across input splits on the fields defined by field positions. * All records sharing the same key (combination) must be contained in a single input split. - *
- * + * + * + *

* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! * * @@ -137,8 +139,9 @@ public SplitDataProperties splitsPartitionedBy(String partitionMethodId, int. * Defines that data is partitioned across input splits on the fields defined by field expressions. * Multiple field expressions must be separated by the semicolon ';' character. * All records sharing the same key (combination) must be contained in a single input split. - *
- * + * + * + *

* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! * * @@ -154,8 +157,9 @@ public SplitDataProperties splitsPartitionedBy(String partitionFields) { * across input splits on the fields defined by field expressions. * Multiple field expressions must be separated by the semicolon ';' character. * All records sharing the same key (combination) must be contained in a single input split. - *
- * + * + * + *

* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! * * @@ -165,7 +169,7 @@ public SplitDataProperties splitsPartitionedBy(String partitionFields) { */ public SplitDataProperties splitsPartitionedBy(String partitionMethodId, String partitionFields) { - if(partitionFields == null) { + if (partitionFields == null) { throw new InvalidProgramException("PartitionFields may not be null."); } @@ -175,7 +179,7 @@ public SplitDataProperties splitsPartitionedBy(String partitionMethodId, Stri } this.splitPartitionKeys = getAllFlatKeys(partitionKeysA); - if(partitionMethodId != null) { + if (partitionMethodId != null) { this.splitPartitioner = new SourcePartitionerMarker<>(partitionMethodId); } else { @@ -189,8 +193,9 @@ public SplitDataProperties splitsPartitionedBy(String partitionMethodId, Stri * Defines that the data within an input split is grouped on the fields defined by the field positions. * All records sharing the same key (combination) must be subsequently emitted by the input * format for each input split. - *
- * + * + * + *

* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! * * @@ -199,13 +204,13 @@ public SplitDataProperties splitsPartitionedBy(String partitionMethodId, Stri */ public SplitDataProperties splitsGroupedBy(int... groupFields) { - if(groupFields == null) { + if (groupFields == null) { throw new InvalidProgramException("GroupFields may not be null."); } else if (groupFields.length == 0) { throw new InvalidProgramException("GroupFields may not be empty."); } - if(this.splitOrdering != null) { + if (this.splitOrdering != null) { throw new InvalidProgramException("DataSource may either be grouped or sorted."); } @@ -219,8 +224,9 @@ public SplitDataProperties splitsGroupedBy(int... groupFields) { * Multiple field expressions must be separated by the semicolon ';' character. * All records sharing the same key (combination) must be subsequently emitted by the input * format for each input split. - *
- * + * + * + *

* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! * * @@ -229,7 +235,7 @@ public SplitDataProperties splitsGroupedBy(int... groupFields) { */ public SplitDataProperties splitsGroupedBy(String groupFields) { - if(groupFields == null) { + if (groupFields == null) { throw new InvalidProgramException("GroupFields may not be null."); } @@ -238,7 +244,7 @@ public SplitDataProperties splitsGroupedBy(String groupFields) { throw new InvalidProgramException("GroupFields may not be empty."); } - if(this.splitOrdering != null) { + if (this.splitOrdering != null) { throw new InvalidProgramException("DataSource may either be grouped or sorted."); } @@ -251,8 +257,9 @@ public SplitDataProperties splitsGroupedBy(String groupFields) { * Defines that the data within an input split is sorted on the fields defined by the field positions * in the specified orders. * All records of an input split must be emitted by the input format in the defined order. - *
- * + * + * + *

* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! * * @@ -262,7 +269,7 @@ public SplitDataProperties splitsGroupedBy(String groupFields) { */ public SplitDataProperties splitsOrderedBy(int[] orderFields, Order[] orders) { - if(orderFields == null || orders == null) { + if (orderFields == null || orders == null) { throw new InvalidProgramException("OrderFields or Orders may not be null."); } else if (orderFields.length == 0) { throw new InvalidProgramException("OrderFields may not be empty."); @@ -272,17 +279,17 @@ public SplitDataProperties splitsOrderedBy(int[] orderFields, Order[] orders) throw new InvalidProgramException("Number of OrderFields and Orders must match."); } - if(this.splitGroupKeys != null) { + if (this.splitGroupKeys != null) { throw new InvalidProgramException("DataSource may either be grouped or sorted."); } this.splitOrdering = new Ordering(); - for(int i=0; i splitsOrderedBy(int[] orderFields, Order[] orders) } } // append key - this.splitOrdering.appendOrdering(key, null, orders[i] ); + this.splitOrdering.appendOrdering(key, null, orders[i]); } } return this; @@ -300,8 +307,9 @@ public SplitDataProperties splitsOrderedBy(int[] orderFields, Order[] orders) * Defines that the data within an input split is sorted on the fields defined by the field expressions * in the specified orders. Multiple field expressions must be separated by the semicolon ';' character. * All records of an input split must be emitted by the input format in the defined order. - *
- * + * + * + *

* IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! * * @@ -311,7 +319,7 @@ public SplitDataProperties splitsOrderedBy(int[] orderFields, Order[] orders) */ public SplitDataProperties splitsOrderedBy(String orderFields, Order[] orders) { - if(orderFields == null || orders == null) { + if (orderFields == null || orders == null) { throw new InvalidProgramException("OrderFields or Orders may not be null."); } @@ -324,18 +332,18 @@ public SplitDataProperties splitsOrderedBy(String orderFields, Order[] orders throw new InvalidProgramException("Number of OrderFields and Orders must match."); } - if(this.splitGroupKeys != null) { + if (this.splitGroupKeys != null) { throw new InvalidProgramException("DataSource may either be grouped or sorted."); } this.splitOrdering = new Ordering(); - for(int i=0; i ek = new Keys.ExpressionKeys<>(keyExp, this.type); int[] flatKeys = ek.computeLogicalKeyPositions(); - for(int key : flatKeys) { + for (int key : flatKeys) { // check for duplicates for (int okey : splitOrdering.getFieldPositions()) { if (key == okey) { @@ -343,7 +351,7 @@ public SplitDataProperties splitsOrderedBy(String orderFields, Order[] orders } } // append key - this.splitOrdering.appendOrdering(key, null, orders[i] ); + this.splitOrdering.appendOrdering(key, null, orders[i]); } } return this; @@ -365,25 +373,24 @@ public Ordering getSplitOrder() { return this.splitOrdering; } - /////////////////////// FLAT FIELD EXTRACTION METHODS private int[] getAllFlatKeys(String[] fieldExpressions) { int[] allKeys = null; - for(String keyExp : fieldExpressions) { + for (String keyExp : fieldExpressions) { Keys.ExpressionKeys ek = new Keys.ExpressionKeys<>(keyExp, this.type); int[] flatKeys = ek.computeLogicalKeyPositions(); - if(allKeys == null) { + if (allKeys == null) { allKeys = flatKeys; } else { // check for duplicates - for(int key1 : flatKeys) { - for(int key2 : allKeys) { - if(key1 == key2) { - throw new 
InvalidProgramException("Duplicate fields in field expression "+keyExp); + for (int key1 : flatKeys) { + for (int key2 : allKeys) { + if (key1 == key2) { + throw new InvalidProgramException("Duplicate fields in field expression " + keyExp); } } } @@ -425,7 +432,7 @@ public int partition(T key, int numPartitions) { @Override public boolean equals(Object o) { - if(o instanceof SourcePartitionerMarker) { + if (o instanceof SourcePartitionerMarker) { return this.partitionMarker.equals(((SourcePartitionerMarker) o).partitionMarker); } else { return false; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java index b2554bf9be45a..7d4cf9c42025c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java @@ -18,61 +18,63 @@ package org.apache.flink.api.java.io; -import java.io.IOException; -import java.nio.charset.Charset; - import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.io.DelimitedInputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; +import java.io.IOException; +import java.nio.charset.Charset; + +/** + * Input Format that reads text files. Each line results in another element. + */ @PublicEvolving public class TextInputFormat extends DelimitedInputFormat { - + private static final long serialVersionUID = 1L; - + /** - * Code of \r, used to remove \r from a line when the line ends with \r\n + * Code of \r, used to remove \r from a line when the line ends with \r\n. */ private static final byte CARRIAGE_RETURN = (byte) '\r'; /** - * Code of \n, used to identify if \n is used as delimiter + * Code of \n, used to identify if \n is used as delimiter. */ private static final byte NEW_LINE = (byte) '\n'; - - + /** * The name of the charset to use for decoding. 
*/ private String charsetName = "UTF-8"; - + // -------------------------------------------------------------------------------------------- - + public TextInputFormat(Path filePath) { super(filePath, null); } - - // -------------------------------------------------------------------------------------------- - + + // -------------------------------------------------------------------------------------------- + public String getCharsetName() { return charsetName; } - + public void setCharsetName(String charsetName) { if (charsetName == null) { throw new IllegalArgumentException("Charset must not be null."); } - + this.charsetName = charsetName; } - + // -------------------------------------------------------------------------------------------- @Override public void configure(Configuration parameters) { super.configure(parameters); - + if (charsetName == null || !Charset.isSupported(charsetName)) { throw new RuntimeException("Unsupported charset: " + charsetName); } @@ -83,17 +85,17 @@ public void configure(Configuration parameters) { @Override public String readRecord(String reusable, byte[] bytes, int offset, int numBytes) throws IOException { //Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line - if (this.getDelimiter() != null && this.getDelimiter().length == 1 - && this.getDelimiter()[0] == NEW_LINE && offset+numBytes >= 1 - && bytes[offset+numBytes-1] == CARRIAGE_RETURN){ + if (this.getDelimiter() != null && this.getDelimiter().length == 1 + && this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1 + && bytes[offset + numBytes - 1] == CARRIAGE_RETURN){ numBytes -= 1; } - + return new String(bytes, offset, numBytes, this.charsetName); } - + // -------------------------------------------------------------------------------------------- - + @Override public String toString() { return "TextInputFormat (" + getFilePath() + ") - " + this.charsetName; diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java index d466082d38927..a07c7f199ac62 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java @@ -18,65 +18,73 @@ package org.apache.flink.api.java.io; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.io.FileOutputFormat; +import org.apache.flink.core.fs.Path; + import java.io.IOException; import java.io.Serializable; import java.nio.charset.Charset; import java.nio.charset.IllegalCharsetNameException; import java.nio.charset.UnsupportedCharsetException; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.io.FileOutputFormat; -import org.apache.flink.core.fs.Path; - +/** + * {@link FileOutputFormat} that stores values by calling {@link Object#toString()} method. + * @param type of elements + */ @PublicEvolving public class TextOutputFormat extends FileOutputFormat { private static final long serialVersionUID = 1L; - + private static final int NEWLINE = '\n'; private String charsetName; - + private transient Charset charset; // -------------------------------------------------------------------------------------------- - public static interface TextFormatter extends Serializable { - public String format(IN value); + + /** + * Formatter that transforms values into its {@link String} representations. 
+ * @param type of input elements + */ + public interface TextFormatter extends Serializable { + String format(IN value); } public TextOutputFormat(Path outputPath) { this(outputPath, "UTF-8"); } - + public TextOutputFormat(Path outputPath, String charset) { super(outputPath); this.charsetName = charset; } - - + public String getCharsetName() { return charsetName; } - + public void setCharsetName(String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException { if (charsetName == null) { throw new NullPointerException(); } - + if (!Charset.isSupported(charsetName)) { throw new UnsupportedCharsetException("The charset " + charsetName + " is not supported."); } - + this.charsetName = charsetName; } - + // -------------------------------------------------------------------------------------------- - + @Override public void open(int taskNumber, int numTasks) throws IOException { super.open(taskNumber, numTasks); - + try { this.charset = Charset.forName(charsetName); } @@ -87,16 +95,16 @@ public void open(int taskNumber, int numTasks) throws IOException { throw new IOException("The charset " + charsetName + " is not supported.", e); } } - + @Override public void writeRecord(T record) throws IOException { byte[] bytes = record.toString().getBytes(charset); this.stream.write(bytes); this.stream.write(NEWLINE); } - + // -------------------------------------------------------------------------------------------- - + @Override public String toString() { return "TextOutputFormat (" + getOutputFilePath() + ") - " + this.charsetName; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java index 45a2e3e1dcd8d..4721439fe7fdb 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java @@ -18,6 +18,12 @@ package 
org.apache.flink.api.java.io; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.io.DelimitedInputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.StringValue; + import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; @@ -26,61 +32,58 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.io.DelimitedInputFormat; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.Path; -import org.apache.flink.types.StringValue; - +/** + * Input format that reads text files. + */ @PublicEvolving public class TextValueInputFormat extends DelimitedInputFormat { - + private static final long serialVersionUID = 1L; - + private String charsetName = "UTF-8"; - + private boolean skipInvalidLines; - + private transient CharsetDecoder decoder; - + private transient ByteBuffer byteWrapper; - + private transient boolean ascii; - + // -------------------------------------------------------------------------------------------- - + public TextValueInputFormat(Path filePath) { super(filePath, null); } - - // -------------------------------------------------------------------------------------------- - + + // -------------------------------------------------------------------------------------------- + public String getCharsetName() { return charsetName; } - + public void setCharsetName(String charsetName) { if (charsetName == null) { throw new IllegalArgumentException("The charset name may not be null."); } - + this.charsetName = charsetName; } - + public boolean isSkipInvalidLines() { return skipInvalidLines; } - + public void setSkipInvalidLines(boolean skipInvalidLines) { this.skipInvalidLines = skipInvalidLines; } - + // 
-------------------------------------------------------------------------------------------- @Override public void configure(Configuration parameters) { super.configure(parameters); - + if (charsetName == null || !Charset.isSupported(charsetName)) { throw new RuntimeException("Unsupported charset: " + charsetName); } @@ -88,7 +91,7 @@ public void configure(Configuration parameters) { if (charsetName.equalsIgnoreCase(StandardCharsets.US_ASCII.name())) { ascii = true; } - + this.decoder = Charset.forName(charsetName).newDecoder(); this.byteWrapper = ByteBuffer.allocate(1); } @@ -109,7 +112,7 @@ public StringValue readRecord(StringValue reuse, byte[] bytes, int offset, int n } byteWrapper.limit(offset + numBytes); byteWrapper.position(offset); - + try { CharBuffer result = this.decoder.decode(byteWrapper); reuse.setValue(result); @@ -126,9 +129,9 @@ public StringValue readRecord(StringValue reuse, byte[] bytes, int offset, int n } } } - + // -------------------------------------------------------------------------------------------- - + @Override public String toString() { return "TextValueInputFormat (" + getFilePath() + ") - " + this.charsetName + (this.skipInvalidLines ? "(skipping invalid lines)" : ""); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java index 6efd5665585f5..887620a92e156 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TupleCsvInputFormat.java @@ -15,15 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.api.java.io; import org.apache.flink.annotation.Internal; -import org.apache.flink.core.fs.Path; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.typeutils.TupleTypeInfoBase; import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase; +import org.apache.flink.core.fs.Path; +/** + * Input format that reads csv into tuples. + */ @Internal public class TupleCsvInputFormat extends CsvInputFormat { @@ -59,7 +62,7 @@ public TupleCsvInputFormat(Path filePath, String lineDelimiter, String fieldDeli super(filePath); configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask); } - + private void configure(String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase tupleTypeInfo, boolean[] includedFieldsMask) { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java index 81a142ee3d879..108448dd2fd9d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TypeSerializerOutputFormat.java @@ -36,16 +36,16 @@ public class TypeSerializerOutputFormat extends BinaryOutputFormat implements InputTypeConfigurable { private static final long serialVersionUID = -6653022644629315158L; - + private TypeSerializer serializer; @Override protected void serialize(T record, DataOutputView dataOutput) throws IOException { - if(serializer == null){ + if (serializer == null){ throw new RuntimeException("TypeSerializerOutputFormat requires a type serializer to " + "be defined."); } - + serializer.serialize(record, dataOutput); } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java index de57e5c547966..5622930fedcc4 100644 --- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java @@ -39,6 +39,7 @@ import org.apache.flink.types.ShortValue; import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; + import org.junit.Assert; import org.junit.Test; @@ -59,7 +60,7 @@ public void testIgnoreHeaderConfigure() { reader.ignoreFirstLine(); Assert.assertTrue(reader.skipFirstLineAsHeader); } - + @Test public void testIgnoreInvalidLinesConfigure() { CsvReader reader = getCsvReader(); @@ -67,7 +68,7 @@ public void testIgnoreInvalidLinesConfigure() { reader.ignoreInvalidLines(); Assert.assertTrue(reader.ignoreInvalidLines); } - + @Test public void testIgnoreComments() { CsvReader reader = getCsvReader(); @@ -89,38 +90,38 @@ public void testIncludeFieldsDense() { CsvReader reader = getCsvReader(); reader.includeFields(true, true, true); Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask)); - + reader = getCsvReader(); reader.includeFields("ttt"); Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask)); - + reader = getCsvReader(); reader.includeFields("TTT"); Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask)); - + reader = getCsvReader(); reader.includeFields("111"); Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask)); - + reader = getCsvReader(); reader.includeFields(0x7L); Assert.assertTrue(Arrays.equals(new boolean[] {true, true, true}, reader.includedMask)); } - + @Test public void testIncludeFieldsSparse() { CsvReader reader = getCsvReader(); reader.includeFields(false, true, true, false, false, true, false, false); Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask)); - + reader = getCsvReader(); reader.includeFields("fttfftff"); Assert.assertTrue(Arrays.equals(new boolean[] {false, 
true, true, false, false, true}, reader.includedMask)); - + reader = getCsvReader(); reader.includeFields("FTTFFTFF"); Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask)); - + reader = getCsvReader(); reader.includeFields("01100100"); Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask)); @@ -128,16 +129,16 @@ public void testIncludeFieldsSparse() { reader = getCsvReader(); reader.includeFields("0t1f0TFF"); Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask)); - + reader = getCsvReader(); reader.includeFields(0x26L); Assert.assertTrue(Arrays.equals(new boolean[] {false, true, true, false, false, true}, reader.includedMask)); } - + @Test public void testIllegalCharInStringMask() { CsvReader reader = getCsvReader(); - + try { reader.includeFields("1t0Tfht"); Assert.fail("Reader accepted an invalid mask string"); @@ -146,12 +147,11 @@ public void testIllegalCharInStringMask() { // expected } } - - + @Test public void testIncludeFieldsErrorWhenExcludingAll() { CsvReader reader = getCsvReader(); - + try { reader.includeFields(false, false, false, false, false, false); Assert.fail("The reader accepted a fields configuration that excludes all fields."); @@ -159,7 +159,7 @@ public void testIncludeFieldsErrorWhenExcludingAll() { catch (IllegalArgumentException e) { // all good } - + try { reader.includeFields(0); Assert.fail("The reader accepted a fields configuration that excludes all fields."); @@ -167,7 +167,7 @@ public void testIncludeFieldsErrorWhenExcludingAll() { catch (IllegalArgumentException e) { // all good } - + try { reader.includeFields("ffffffffffffff"); Assert.fail("The reader accepted a fields configuration that excludes all fields."); @@ -175,7 +175,7 @@ public void testIncludeFieldsErrorWhenExcludingAll() { catch (IllegalArgumentException e) { // all good } - + try { 
reader.includeFields("00000000000000000"); Assert.fail("The reader accepted a fields configuration that excludes all fields."); @@ -191,12 +191,12 @@ public void testReturnType() throws Exception { DataSource items = reader.tupleType(Item.class); Assert.assertTrue(items.getType().getTypeClass() == Item.class); } - + @Test public void testFieldTypes() throws Exception { CsvReader reader = getCsvReader(); DataSource items = reader.tupleType(Item.class); - + TypeInformation info = items.getType(); if (!info.isTupleType()) { Assert.fail(); @@ -208,44 +208,44 @@ public void testFieldTypes() throws Exception { Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(3)); } - + CsvInputFormat inputFormat = (CsvInputFormat) items.getInputFormat(); Assert.assertArrayEquals(new Class[]{Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes()); } - + @Test public void testSubClass() throws Exception { CsvReader reader = getCsvReader(); DataSource sitems = reader.tupleType(SubItem.class); TypeInformation info = sitems.getType(); - + Assert.assertEquals(true, info.isTupleType()); Assert.assertEquals(SubItem.class, info.getTypeClass()); - + @SuppressWarnings("unchecked") TupleTypeInfo tinfo = (TupleTypeInfo) info; - + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tinfo.getTypeAt(0)); Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(1)); Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tinfo.getTypeAt(2)); Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(3)); - + CsvInputFormat inputFormat = (CsvInputFormat) sitems.getInputFormat(); Assert.assertArrayEquals(new Class[]{Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes()); } - + @Test public void testSubClassWithPartialsInHierarchie() throws Exception { CsvReader reader = getCsvReader(); DataSource sitems = reader.tupleType(FinalItem.class); TypeInformation info = sitems.getType(); - + Assert.assertEquals(true, 
info.isTupleType()); Assert.assertEquals(FinalItem.class, info.getTypeClass()); - + @SuppressWarnings("unchecked") TupleTypeInfo tinfo = (TupleTypeInfo) info; - + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, tinfo.getTypeAt(0)); Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(1)); Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, tinfo.getTypeAt(2)); @@ -253,15 +253,15 @@ public void testSubClassWithPartialsInHierarchie() throws Exception { Assert.assertEquals(ValueTypeInfo.class, tinfo.getTypeAt(4).getClass()); Assert.assertEquals(StringValue.class, ((ValueTypeInfo) tinfo.getTypeAt(3)).getTypeClass()); Assert.assertEquals(LongValue.class, ((ValueTypeInfo) tinfo.getTypeAt(4)).getTypeClass()); - + CsvInputFormat inputFormat = (CsvInputFormat) sitems.getInputFormat(); Assert.assertArrayEquals(new Class[] {Integer.class, String.class, Double.class, StringValue.class, LongValue.class}, inputFormat.getFieldTypes()); } - + @Test public void testUnsupportedPartialitem() throws Exception { CsvReader reader = getCsvReader(); - + try { reader.tupleType(PartialItem.class); Assert.fail("tupleType() accepted an underspecified generic class."); @@ -295,32 +295,32 @@ public void testWithInvalidValueType2() throws Exception { // CsvReader doesn't support custom Value type reader.types(ValueItem.class); } - + private static CsvReader getCsvReader() { return new CsvReader("/some/none/existing/path", ExecutionEnvironment.createLocalEnvironment(1)); } - + // -------------------------------------------------------------------------------------------- // Custom types for testing // -------------------------------------------------------------------------------------------- - - public static class Item extends Tuple4 { + + private static class Item extends Tuple4 { private static final long serialVersionUID = -7444437337392053502L; } - - public static class SubItem extends Item { + + private static class SubItem extends Item { private static final long 
serialVersionUID = 1L; } - - public static class PartialItem extends Tuple5 { + + private static class PartialItem extends Tuple5 { private static final long serialVersionUID = 1L; } - - public static class FinalItem extends PartialItem { + + private static class FinalItem extends PartialItem { private static final long serialVersionUID = 1L; } - public static class ValueItem implements Value { + private static class ValueItem implements Value { private int v1; public int getV1() { diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java index 4dabaca3a14f3..26e94119ee7ca 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java @@ -18,11 +18,6 @@ package org.apache.flink.api.java.io; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -33,6 +28,7 @@ import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; + import org.junit.Test; import java.io.ByteArrayInputStream; @@ -47,9 +43,17 @@ import java.util.List; import java.util.Objects; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link CollectionInputFormat}. 
+ */ public class CollectionInputFormatTest { - - public static class ElementType { + + private static class ElementType { private final int id; public ElementType(){ @@ -73,7 +77,7 @@ public boolean equals(Object obj) { return false; } } - + @Override public int hashCode() { return id; @@ -90,7 +94,7 @@ public String toString() { @Test public void testSerializability() { try (ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - ObjectOutputStream out = new ObjectOutputStream(buffer)) { + ObjectOutputStream out = new ObjectOutputStream(buffer)) { Collection inputCollection = new ArrayList(); ElementType element1 = new ElementType(1); ElementType element2 = new ElementType(2); @@ -98,10 +102,10 @@ public void testSerializability() { inputCollection.add(element1); inputCollection.add(element2); inputCollection.add(element3); - + @SuppressWarnings("unchecked") TypeInformation info = (TypeInformation) TypeExtractor.createTypeInfo(ElementType.class); - + CollectionInputFormat inputFormat = new CollectionInputFormat(inputCollection, info.createSerializer(new ExecutionConfig())); @@ -121,23 +125,23 @@ public void testSerializability() { inputFormat.open(inputSplit); result.open(inputSplit); - while(!inputFormat.reachedEnd() && !result.reachedEnd()){ + while (!inputFormat.reachedEnd() && !result.reachedEnd()){ ElementType expectedElement = inputFormat.nextRecord(null); ElementType actualElement = result.nextRecord(null); assertEquals(expectedElement, actualElement); } } - catch(Exception e) { + catch (Exception e) { e.printStackTrace(); fail(e.toString()); } } - + @Test public void testSerializabilityStrings() { - + final String[] data = new String[] { "To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", @@ -175,33 +179,33 @@ public void testSerializabilityStrings() { "The fair Ophelia!--Nymph, in thy orisons", "Be all my sins remember'd." 
}; - + try { List inputCollection = Arrays.asList(data); CollectionInputFormat inputFormat = new CollectionInputFormat(inputCollection, BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())); - + // serialize ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(inputFormat); oos.close(); - + // deserialize ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); ObjectInputStream ois = new ObjectInputStream(bais); Object result = ois.readObject(); - + assertTrue(result instanceof CollectionInputFormat); - + int i = 0; @SuppressWarnings("unchecked") CollectionInputFormat in = (CollectionInputFormat) result; in.open(new GenericInputSplit(0, 1)); - + while (!in.reachedEnd()) { assertEquals(data[i++], in.nextRecord("")); } - + assertEquals(data.length, i); } catch (Exception e) { @@ -209,7 +213,7 @@ public void testSerializabilityStrings() { fail(e.getMessage()); } } - + @Test public void testSerializationFailure() { try (ByteArrayOutputStream buffer = new ByteArrayOutputStream(); @@ -217,7 +221,7 @@ public void testSerializationFailure() { // a mock serializer that fails when writing CollectionInputFormat inFormat = new CollectionInputFormat( Collections.singleton(new ElementType()), new TestSerializer(false, true)); - + try { out.writeObject(inFormat); fail("should throw an exception"); @@ -234,21 +238,21 @@ public void testSerializationFailure() { fail(e.getMessage()); } } - + @Test public void testDeserializationFailure() { try (ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - ObjectOutputStream out = new ObjectOutputStream(buffer)) { + ObjectOutputStream out = new ObjectOutputStream(buffer)) { // a mock serializer that fails when writing CollectionInputFormat inFormat = new CollectionInputFormat( Collections.singleton(new ElementType()), new TestSerializer(true, false)); out.writeObject(inFormat); out.close(); - + ByteArrayInputStream bais = 
new ByteArrayInputStream(buffer.toByteArray()); ObjectInputStream in = new ObjectInputStream(bais); - + try { in.readObject(); fail("should throw an exception"); @@ -296,14 +300,14 @@ public void testToStringOnBigCollection() { private static class TestException extends IOException{ private static final long serialVersionUID = 1L; } - + private static class TestSerializer extends TypeSerializer { private static final long serialVersionUID = 1L; - + private final boolean failOnRead; private final boolean failOnWrite; - + public TestSerializer(boolean failOnRead, boolean failOnWrite) { this.failOnRead = failOnRead; this.failOnWrite = failOnWrite; diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java index d047aa6bcb639..8276ba1f46ca9 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java @@ -16,11 +16,14 @@ * limitations under the License. */ - package org.apache.flink.api.java.io; import org.apache.flink.api.common.io.ParseException; -import org.apache.flink.api.java.tuple.*; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple6; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -30,6 +33,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.types.parser.FieldParser; import org.apache.flink.types.parser.StringParser; + import org.junit.Assert; import org.junit.Test; @@ -49,13 +53,16 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +/** + * Tests for {@link CsvInputFormat}. 
+ */ public class CsvInputFormatTest { - + private static final Path PATH = new Path("an/ignored/file/"); - + //Static variables for testing the removal of \r\n to \n private static final String FIRST_PART = "That is the first part"; - + private static final String SECOND_PART = "That is the second part"; @Test @@ -70,7 +77,7 @@ public void testSplitCsvInputStreamInSmallBuffer() throws Exception { private void testSplitCsvInputStream(int bufferSize, boolean failAtStart) throws Exception { final String fileContent = - "this is|1|2.0|\n"+ + "this is|1|2.0|\n" + "a test|3|4.0|\n" + "#next|5|6.0|\n" + "asdadas|5|30.0|\n"; @@ -173,9 +180,9 @@ public void ignoreInvalidLinesAndGetOffsetInSmallBuffer() { private void ignoreInvalidLines(int bufferSize) { try { - final String fileContent = "#description of the data\n" + - "header1|header2|header3|\n"+ - "this is|1|2.0|\n"+ + final String fileContent = "#description of the data\n" + + "header1|header2|header3|\n" + + "this is|1|2.0|\n" + "//a comment\n" + "a test|3|4.0|\n" + "#next|5|6.0|\n" + @@ -191,8 +198,7 @@ private void ignoreInvalidLines(int bufferSize) { final Configuration parameters = new Configuration(); format.configure(parameters); format.open(split); - - + Tuple3 result = new Tuple3(); result = format.nextRecord(result); assertNotNull(result); @@ -224,34 +230,31 @@ private void ignoreInvalidLines(int bufferSize) { fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); } } - + @Test public void ignoreSingleCharPrefixComments() { try { final String fileContent = "#description of the data\n" + - "#successive commented line\n" + - "this is|1|2.0|\n" + - "a test|3|4.0|\n" + - "#next|5|6.0|\n"; - + "#successive commented line\n" + "this is|1|2.0|\n" + "a test|3|4.0|\n" + "#next|5|6.0|\n"; + final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class); final CsvInputFormat> format = new 
TupleCsvInputFormat>(PATH, "\n", "|", typeInfo); format.setCommentPrefix("#"); - + final Configuration parameters = new Configuration(); format.configure(parameters); format.open(split); - + Tuple3 result = new Tuple3(); - + result = format.nextRecord(result); assertNotNull(result); assertEquals("this is", result.f0); assertEquals(Integer.valueOf(1), result.f1); assertEquals(new Double(2.0), result.f2); - + result = format.nextRecord(result); assertNotNull(result); assertEquals("a test", result.f0); @@ -266,42 +269,41 @@ public void ignoreSingleCharPrefixComments() { fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); } } - + @Test public void ignoreMultiCharPrefixComments() { try { - - + final String fileContent = "//description of the data\n" + - "//successive commented line\n" + - "this is|1|2.0|\n"+ - "a test|3|4.0|\n" + - "//next|5|6.0|\n"; - + "//successive commented line\n" + + "this is|1|2.0|\n" + + "a test|3|4.0|\n" + + "//next|5|6.0|\n"; + final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Double.class); final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, "\n", "|", typeInfo); format.setCommentPrefix("//"); - + final Configuration parameters = new Configuration(); format.configure(parameters); format.open(split); - + Tuple3 result = new Tuple3(); - + result = format.nextRecord(result); assertNotNull(result); assertEquals("this is", result.f0); assertEquals(Integer.valueOf(1), result.f1); assertEquals(new Double(2.0), result.f2); - + result = format.nextRecord(result); assertNotNull(result); assertEquals("a test", result.f0); assertEquals(Integer.valueOf(3), result.f1); assertEquals(new Double(4.0), result.f2); - + result = format.nextRecord(result); assertNull(result); } @@ -323,27 +325,27 @@ public void readStringFields() { final Configuration parameters = new Configuration(); format.configure(parameters); 
format.open(split); - + Tuple3 result = new Tuple3(); - + result = format.nextRecord(result); assertNotNull(result); assertEquals("abc", result.f0); assertEquals("def", result.f1); assertEquals("ghijk", result.f2); - + result = format.nextRecord(result); assertNotNull(result); assertEquals("abc", result.f0); assertEquals("", result.f1); assertEquals("hhg", result.f2); - + result = format.nextRecord(result); assertNotNull(result); assertEquals("", result.f0); assertEquals("", result.f1); assertEquals("", result.f2); - + result = format.nextRecord(result); assertNull(result); assertTrue(format.reachedEnd()); @@ -397,7 +399,7 @@ public void readMixedQuotedStringFields() { fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); } } - + @Test public void readStringFieldsWithTrailingDelimiters() { try { @@ -406,26 +408,26 @@ public void readStringFieldsWithTrailingDelimiters() { final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class, String.class); final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, typeInfo); - + format.setFieldDelimiter("|-"); format.configure(new Configuration()); format.open(split); Tuple3 result = new Tuple3(); - + result = format.nextRecord(result); assertNotNull(result); assertEquals("abc", result.f0); assertEquals("def", result.f1); assertEquals("ghijk", result.f2); - + result = format.nextRecord(result); assertNotNull(result); assertEquals("abc", result.f0); assertEquals("", result.f1); assertEquals("hhg", result.f2); - + result = format.nextRecord(result); assertNotNull(result); assertEquals("", result.f0); @@ -594,19 +596,19 @@ public void testEmptyFields() throws IOException { public void testDoubleFields() throws IOException { try { final String fileContent = "11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n"; - final FileInputSplit split = createTempFile(fileContent); + final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo> typeInfo = 
TupleTypeInfo.getBasicTupleTypeInfo(Double.class, Double.class, Double.class, Double.class, Double.class); final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, typeInfo); - + format.setFieldDelimiter("|"); format.configure(new Configuration()); format.open(split); - + Tuple5 result = new Tuple5(); - + result = format.nextRecord(result); assertNotNull(result); assertEquals(Double.valueOf(11.1), result.f0); @@ -614,7 +616,7 @@ public void testDoubleFields() throws IOException { assertEquals(Double.valueOf(33.3), result.f2); assertEquals(Double.valueOf(44.4), result.f3); assertEquals(Double.valueOf(55.5), result.f4); - + result = format.nextRecord(result); assertNotNull(result); assertEquals(Double.valueOf(66.6), result.f0); @@ -622,7 +624,7 @@ public void testDoubleFields() throws IOException { assertEquals(Double.valueOf(88.8), result.f2); assertEquals(Double.valueOf(99.9), result.f3); assertEquals(Double.valueOf(00.0), result.f4); - + result = format.nextRecord(result); assertNull(result); assertTrue(format.reachedEnd()); @@ -631,7 +633,7 @@ public void testDoubleFields() throws IOException { fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); } } - + @Test public void testReadFirstN() throws IOException { try { @@ -640,24 +642,24 @@ public void testReadFirstN() throws IOException { final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class); final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, typeInfo); - + format.setFieldDelimiter("|"); format.configure(new Configuration()); format.open(split); - + Tuple2 result = new Tuple2(); - + result = format.nextRecord(result); assertNotNull(result); assertEquals(Integer.valueOf(111), result.f0); assertEquals(Integer.valueOf(222), result.f1); - + result = format.nextRecord(result); assertNotNull(result); assertEquals(Integer.valueOf(666), result.f0); assertEquals(Integer.valueOf(777), result.f1); - + result = format.nextRecord(result); 
assertNull(result); assertTrue(format.reachedEnd()); @@ -665,38 +667,38 @@ public void testReadFirstN() throws IOException { catch (Exception ex) { fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); } - + } - + @Test public void testReadSparseWithNullFieldsForTypes() throws IOException { try { final String fileContent = "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" + "000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|"; - final FileInputSplit split = createTempFile(fileContent); + final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class); final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, typeInfo, new boolean[]{true, false, false, true, false, false, false, true}); - + format.setFieldDelimiter("|x|"); - + format.configure(new Configuration()); format.open(split); - + Tuple3 result = new Tuple3(); - + result = format.nextRecord(result); assertNotNull(result); assertEquals(Integer.valueOf(111), result.f0); assertEquals(Integer.valueOf(444), result.f1); assertEquals(Integer.valueOf(888), result.f2); - + result = format.nextRecord(result); assertNotNull(result); assertEquals(Integer.valueOf(000), result.f0); assertEquals(Integer.valueOf(777), result.f1); assertEquals(Integer.valueOf(333), result.f2); - + result = format.nextRecord(result); assertNull(result); assertTrue(format.reachedEnd()); @@ -705,35 +707,35 @@ public void testReadSparseWithNullFieldsForTypes() throws IOException { fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); } } - + @Test public void testReadSparseWithPositionSetter() throws IOException { try { final String fileContent = "111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|"; - final FileInputSplit split = createTempFile(fileContent); + final FileInputSplit split = createTempFile(fileContent); final 
TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class); final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, typeInfo, new int[]{0, 3, 7}); - + format.setFieldDelimiter("|"); - + format.configure(new Configuration()); format.open(split); - + Tuple3 result = new Tuple3(); - + result = format.nextRecord(result); assertNotNull(result); assertEquals(Integer.valueOf(111), result.f0); assertEquals(Integer.valueOf(444), result.f1); assertEquals(Integer.valueOf(888), result.f2); - + result = format.nextRecord(result); assertNotNull(result); assertEquals(Integer.valueOf(000), result.f0); assertEquals(Integer.valueOf(777), result.f1); assertEquals(Integer.valueOf(333), result.f2); - + result = format.nextRecord(result); assertNull(result); assertTrue(format.reachedEnd()); @@ -748,30 +750,30 @@ public void testReadSparseWithMask() throws IOException { try { final String fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" + "000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&"; - final FileInputSplit split = createTempFile(fileContent); + final FileInputSplit split = createTempFile(fileContent); final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class, Integer.class); final CsvInputFormat> format = new TupleCsvInputFormat>(PATH, typeInfo, new boolean[]{true, false, false, true, false, false, false, true}); - + format.setFieldDelimiter("&&"); - + format.configure(new Configuration()); format.open(split); - + Tuple3 result = new Tuple3(); - + result = format.nextRecord(result); assertNotNull(result); assertEquals(Integer.valueOf(111), result.f0); assertEquals(Integer.valueOf(444), result.f1); assertEquals(Integer.valueOf(888), result.f2); - + result = format.nextRecord(result); assertNotNull(result); assertEquals(Integer.valueOf(000), result.f0); assertEquals(Integer.valueOf(777), result.f1); assertEquals(Integer.valueOf(333), result.f2); - + result = 
format.nextRecord(result); assertNull(result); assertTrue(format.reachedEnd()); @@ -784,7 +786,7 @@ public void testReadSparseWithMask() throws IOException { @Test public void testParseStringErrors() throws Exception { StringParser stringParser = new StringParser(); - stringParser.enableQuotedStringParsing((byte)'"'); + stringParser.enableQuotedStringParsing((byte) '"'); Object[][] failures = { {"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING}, @@ -801,7 +803,6 @@ public void testParseStringErrors() throws Exception { assertThat(stringParser.getErrorState(), is(failure[1])); } - } // Test disabled becase we do not support double-quote escaped quotes right now. @@ -836,7 +837,7 @@ public void testParserCorrectness() throws Exception { new Tuple5(1997, "Ford", "E350", "ac, abs, moon", 3000.0), new Tuple5(1999, "Chevy", "Venture \"Extended Edition\"", "", 4900.0), new Tuple5(1996, "Jeep", "Grand Cherokee", "MUST SELL! air, moon roof, loaded", 4799.00), - new Tuple5(1999, "Chevy", "Venture \"Extended Edition, Very Large\"", "", 5000.00 ), + new Tuple5(1999, "Chevy", "Venture \"Extended Edition, Very Large\"", "", 5000.00), new Tuple5(0, "", "Venture \"Extended Edition\"", "", 4900.0) }; @@ -864,37 +865,37 @@ private FileInputSplit createTempFile(String content) throws IOException { ); wrt.write(content); wrt.close(); - + return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] {"localhost"}); } - + @Test public void testWindowsLineEndRemoval() { - + //Check typical use case -- linux file is correct and it is set up to linuc(\n) this.testRemovingTrailingCR("\n", "\n"); - + //Check typical windows case -- windows file endings and file has windows file endings set up this.testRemovingTrailingCR("\r\n", "\r\n"); - + //Check problematic case windows file -- windows file endings(\r\n) but linux line endings (\n) set up this.testRemovingTrailingCR("\r\n", "\n"); - + //Check problematic case 
linux file -- linux file endings (\n) but windows file endings set up (\r\n) //Specific setup for windows line endings will expect \r\n because it has to be set up and is not standard. } - + private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreakerSetup) { - File tempFile=null; - + File tempFile = null; + String fileContent = CsvInputFormatTest.FIRST_PART + lineBreakerInFile + CsvInputFormatTest.SECOND_PART + lineBreakerInFile; - + try { // create input file tempFile = File.createTempFile("CsvInputFormatTest", "tmp"); tempFile.deleteOnExit(); tempFile.setWritable(true); - + OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); wrt.write(fileContent); wrt.close(); @@ -902,28 +903,26 @@ private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreaker final TupleTypeInfo> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class); final CsvInputFormat> inputFormat = new TupleCsvInputFormat>(new Path(tempFile.toURI().toString()), typeInfo); - Configuration parameters = new Configuration(); + Configuration parameters = new Configuration(); inputFormat.configure(parameters); - + inputFormat.setDelimiter(lineBreakerSetup); - + FileInputSplit[] splits = inputFormat.createInputSplits(1); - + inputFormat.open(splits[0]); - + Tuple1 result = inputFormat.nextRecord(new Tuple1()); - + assertNotNull("Expecting to not return null", result); - - - + assertEquals(FIRST_PART, result.f0); - + result = inputFormat.nextRecord(result); - + assertNotNull("Expecting to not return null", result); assertEquals(SECOND_PART, result.f0); - + } catch (Throwable t) { System.err.println("test failed with exception: " + t.getMessage()); @@ -1219,7 +1218,7 @@ public void testPojoSubclassType() throws Exception { writer.close(); @SuppressWarnings("unchecked") - PojoTypeInfo typeInfo = (PojoTypeInfo)TypeExtractor.createTypeInfo(TwitterPOJO.class); + PojoTypeInfo typeInfo = (PojoTypeInfo) 
TypeExtractor.createTypeInfo(TwitterPOJO.class); CsvInputFormat inputFormat = new PojoCsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo); inputFormat.configure(new Configuration()); @@ -1238,7 +1237,7 @@ public void testPojoSubclassType() throws Exception { TwitterPOJO pojo; - while((pojo = inputFormat.nextRecord(new TwitterPOJO())) != null) { + while ((pojo = inputFormat.nextRecord(new TwitterPOJO())) != null) { actual.add(pojo); } @@ -1249,6 +1248,9 @@ public void testPojoSubclassType() throws Exception { // Custom types for testing // -------------------------------------------------------------------------------------------- + /** + * Sample test pojo. + */ public static class PojoItem { public int field1; public String field2; @@ -1256,6 +1258,9 @@ public static class PojoItem { public String field4; } + /** + * Sample test pojo with private fields. + */ public static class PrivatePojoItem { private int field1; private String field2; @@ -1295,6 +1300,9 @@ public void setField4(String field4) { } } + /** + * Sample test pojo. + */ public static class POJO { public String table; public String time; @@ -1319,6 +1327,9 @@ public boolean equals(Object obj) { } } + /** + * Sample test pojo representing tweets. + */ public static class TwitterPOJO extends POJO { public String tweet; diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java index a8ce495a2a6da..a244306c9dbcd 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvOutputFormatTest.java @@ -37,13 +37,16 @@ import static org.junit.Assert.fail; +/** + * Tests for {@link CsvOutputFormat}. 
+ */ public class CsvOutputFormatTest { private String path = null; @Before public void createFile() throws Exception { - path = File.createTempFile("csv_output_test_file",".csv").getAbsolutePath(); + path = File.createTempFile("csv_output_test_file", ".csv").getAbsolutePath(); } @Test @@ -80,7 +83,7 @@ public void testNullDisallowOnDefault() throws Exception { } catch (RuntimeException e) { // expected } - + } finally { csvOutputFormat.close(); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java index 2f403aae61b67..88882d9d0851a 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java @@ -18,8 +18,12 @@ package org.apache.flink.api.java.io; import org.apache.flink.api.java.ExecutionEnvironment; + import org.junit.Test; +/** + * Tests for {@link ExecutionEnvironment::fromElements}. 
+ */ public class FromElementsTest { @Test @@ -34,7 +38,7 @@ public void fromElementsWithBaseTypeTest2() { executionEnvironment.fromElements(SubType.class, new SubType(1, "Java"), new ParentType(1, "hello")); } - public static class ParentType { + private static class ParentType { int num; String string; public ParentType(int num, String string) { @@ -43,7 +47,7 @@ public ParentType(int num, String string) { } } - public static class SubType extends ParentType{ + private static class SubType extends ParentType{ public SubType(int num, String string) { super(num, string); } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java index f9dc28ac99691..d90d6573bd2eb 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/PrimitiveInputFormatTest.java @@ -18,26 +18,28 @@ package org.apache.flink.api.java.io; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; + +import org.junit.Test; import java.io.File; import java.io.FileWriter; import java.io.IOException; -import org.junit.Test; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +/** + * Tests for {@link PrimitiveInputFormat}. 
+ */ public class PrimitiveInputFormatTest { private static final Path PATH = new Path("an/ignored/file/"); - @Test public void testStringInput() { try { @@ -71,15 +73,13 @@ public void testStringInput() { } } - - @Test public void testIntegerInput() throws IOException { try { final String fileContent = "111|222|"; final FileInputSplit split = createInputSplit(fileContent); - final PrimitiveInputFormat format = new PrimitiveInputFormat(PATH,"|", Integer.class); + final PrimitiveInputFormat format = new PrimitiveInputFormat(PATH, "|", Integer.class); format.configure(new Configuration()); format.open(split); @@ -99,7 +99,6 @@ public void testIntegerInput() throws IOException { fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); } } - @Test public void testDoubleInputLinewise() throws IOException { @@ -136,7 +135,7 @@ public void testRemovingTrailingCR() { String fileContent = first + "\r\n" + second + "\r\n"; final FileInputSplit split = createInputSplit(fileContent); - final PrimitiveInputFormat format = new PrimitiveInputFormat(PATH ,String.class); + final PrimitiveInputFormat format = new PrimitiveInputFormat(PATH, String.class); format.configure(new Configuration()); format.open(split); @@ -153,14 +152,14 @@ public void testRemovingTrailingCR() { fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); } } - + @Test(expected = IOException.class) public void testFailingInput() throws IOException { - + final String fileContent = "111|222|asdf|17"; final FileInputSplit split = createInputSplit(fileContent); - final PrimitiveInputFormat format = new PrimitiveInputFormat(PATH,"|", Integer.class); + final PrimitiveInputFormat format = new PrimitiveInputFormat(PATH, "|", Integer.class); format.configure(new Configuration()); format.open(split); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java index 943db36efa7be..f44d5bfa1a505 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; @@ -30,6 +29,7 @@ import org.apache.flink.types.Row; import org.apache.flink.types.parser.FieldParser; import org.apache.flink.types.parser.StringParser; + import org.junit.Ignore; import org.junit.Test; @@ -45,18 +45,21 @@ import java.util.Map; import static junit.framework.TestCase.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +/** + * Tests for {@link RowCsvInputFormat}. 
+ */ public class RowCsvInputFormatTest { - private static Path PATH = new Path("an/ignored/file/"); + private static final Path PATH = new Path("an/ignored/file/"); // static variables for testing the removal of \r\n to \n - private static String FIRST_PART = "That is the first part"; - private static String SECOND_PART = "That is the second part"; + private static final String FIRST_PART = "That is the first part"; + private static final String SECOND_PART = "That is the second part"; @Test public void ignoreInvalidLines() throws Exception { @@ -588,7 +591,7 @@ public void testReadSparseWithNullFieldsForTypes() throws Exception { RowCsvInputFormat format = new RowCsvInputFormat( PATH, fieldTypes, - new int[]{0,3,7}); + new int[]{0, 3, 7}); format.setFieldDelimiter("|x|"); format.configure(new Configuration()); format.open(split); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java index 6bff9dbc7e05c..e78232ac1e51c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java @@ -16,15 +16,14 @@ * limitations under the License. 
*/ - package org.apache.flink.api.java.io; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; + +import org.junit.Test; import java.io.File; import java.io.FileOutputStream; @@ -34,51 +33,55 @@ import java.util.Collections; import java.util.List; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +/** + * Tests for {@link TextInputFormat}. 
+ */ public class TextInputFormatTest { @Test public void testSimpleRead() { - final String FIRST = "First line"; - final String SECOND = "Second line"; - + final String first = "First line"; + final String second = "Second line"; + try { // create input file File tempFile = File.createTempFile("TextInputFormatTest", "tmp"); tempFile.deleteOnExit(); tempFile.setWritable(true); - + PrintStream ps = new PrintStream(tempFile); - ps.println(FIRST); - ps.println(SECOND); + ps.println(first); + ps.println(second); ps.close(); - + TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString())); - - Configuration parameters = new Configuration(); + + Configuration parameters = new Configuration(); inputFormat.configure(parameters); - + FileInputSplit[] splits = inputFormat.createInputSplits(1); assertTrue("expected at least one input split", splits.length >= 1); - + inputFormat.open(splits[0]); - + String result = ""; - + assertFalse(inputFormat.reachedEnd()); result = inputFormat.nextRecord(""); assertNotNull("Expecting first record here", result); - assertEquals(FIRST, result); - + assertEquals(first, result); + assertFalse(inputFormat.reachedEnd()); result = inputFormat.nextRecord(result); assertNotNull("Expecting second record here", result); - assertEquals(SECOND, result); - + assertEquals(second, result); + assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result)); } catch (Throwable t) { @@ -142,70 +145,68 @@ public void testNestedFileRead() { } /** - * This tests cases when line ends with \r\n and \n is used as delimiter, the last \r should be removed + * This tests cases when line ends with \r\n and \n is used as delimiter, the last \r should be removed. 
*/ @Test public void testRemovingTrailingCR() { - - testRemovingTrailingCR("\n","\n"); - testRemovingTrailingCR("\r\n","\n"); - - testRemovingTrailingCR("|","|"); - testRemovingTrailingCR("|","\n"); + + testRemovingTrailingCR("\n", "\n"); + testRemovingTrailingCR("\r\n", "\n"); + + testRemovingTrailingCR("|", "|"); + testRemovingTrailingCR("|", "\n"); } - - private void testRemovingTrailingCR(String lineBreaker,String delimiter) { - File tempFile=null; - - String FIRST = "First line"; - String SECOND = "Second line"; - String CONTENT = FIRST + lineBreaker + SECOND + lineBreaker; - + + private void testRemovingTrailingCR(String lineBreaker, String delimiter) { + File tempFile = null; + + String first = "First line"; + String second = "Second line"; + String content = first + lineBreaker + second + lineBreaker; + try { // create input file tempFile = File.createTempFile("TextInputFormatTest", "tmp"); tempFile.deleteOnExit(); tempFile.setWritable(true); - + OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); - wrt.write(CONTENT); + wrt.write(content); wrt.close(); - + TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString())); inputFormat.setFilePath(tempFile.toURI().toString()); - - Configuration parameters = new Configuration(); + + Configuration parameters = new Configuration(); inputFormat.configure(parameters); - + inputFormat.setDelimiter(delimiter); - + FileInputSplit[] splits = inputFormat.createInputSplits(1); - + inputFormat.open(splits[0]); - String result = ""; - if ( (delimiter.equals("\n") && (lineBreaker.equals("\n") || lineBreaker.equals("\r\n") ) ) - || (lineBreaker.equals(delimiter)) ){ - + if ((delimiter.equals("\n") && (lineBreaker.equals("\n") || lineBreaker.equals("\r\n"))) + || (lineBreaker.equals(delimiter))){ + result = inputFormat.nextRecord(""); assertNotNull("Expecting first record here", result); - assertEquals(FIRST, result); - + assertEquals(first, result); + result = 
inputFormat.nextRecord(result); assertNotNull("Expecting second record here", result); - assertEquals(SECOND, result); - + assertEquals(second, result); + result = inputFormat.nextRecord(result); assertNull("The input file is over", result); - + } else { result = inputFormat.nextRecord(""); assertNotNull("Expecting first record here", result); - assertEquals(CONTENT, result); + assertEquals(content, result); } - - + } catch (Throwable t) { System.err.println("test failed with exception: " + t.getMessage()); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java index a119d59bde9af..676a7c4252b20 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TypeSerializerFormatTest.java @@ -31,6 +31,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.DataOutputView; + import org.junit.Assert; import org.junit.Before; import org.junit.runner.RunWith; @@ -38,6 +39,9 @@ import java.io.IOException; +/** + * Tests for type serialization format. 
+ */ @RunWith(Parameterized.class) public class TypeSerializerFormatTest extends SequentialFormatTestBase> { @@ -50,7 +54,7 @@ public class TypeSerializerFormatTest extends SequentialFormatTestBase Date: Sat, 15 Jul 2017 11:04:55 +0200 Subject: [PATCH 2/4] comments --- .../apache/flink/api/java/io/CsvInputFormat.java | 2 +- .../flink/api/java/io/PrintingOutputFormat.java | 2 +- .../apache/flink/api/java/io/TextOutputFormat.java | 6 ++++-- .../api/java/io/CollectionInputFormatTest.java | 3 ++- .../flink/api/java/io/CsvInputFormatTest.java | 13 ++++++++----- .../apache/flink/api/java/io/FromElementsTest.java | 2 +- 6 files changed, 17 insertions(+), 11 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java index 62e988b18d2e4..0bd4e693c5f94 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java @@ -29,7 +29,7 @@ import java.io.IOException; /** - * InputFormat thar reads csv files. + * InputFormat that reads csv files. * * @param */ diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java index cc95690f3eebf..0ab1abb2efbd7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java @@ -25,7 +25,7 @@ import java.io.PrintStream; /** - * Output format that prints results into underlying {@link PrintStream}. + * Output format that prints results into either stdout or stderr. 
* @param */ @PublicEvolving diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java index a07c7f199ac62..006b5710346e5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextOutputFormat.java @@ -29,7 +29,9 @@ import java.nio.charset.UnsupportedCharsetException; /** - * {@link FileOutputFormat} that stores values by calling {@link Object#toString()} method. + * A {@link FileOutputFormat} that writes objects to a text file. + * + *

Objects are converted to Strings using either {@link Object#toString()} or a {@link TextFormatter}. * @param type of elements */ @PublicEvolving @@ -47,7 +49,7 @@ public class TextOutputFormat extends FileOutputFormat { /** - * Formatter that transforms values into its {@link String} representations. + * Formatter that transforms values into their {@link String} representations. * @param type of input elements */ public interface TextFormatter extends Serializable { diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java index 26e94119ee7ca..77945cc7dbc02 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java @@ -94,7 +94,8 @@ public String toString() { @Test public void testSerializability() { try (ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - ObjectOutputStream out = new ObjectOutputStream(buffer)) { + ObjectOutputStream out = new ObjectOutputStream(buffer)) { + Collection inputCollection = new ArrayList(); ElementType element1 = new ElementType(1); ElementType element2 = new ElementType(2); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java index 8276ba1f46ca9..8939c5a8ba07c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java @@ -235,7 +235,10 @@ private void ignoreInvalidLines(int bufferSize) { public void ignoreSingleCharPrefixComments() { try { final String fileContent = "#description of the data\n" + - "#successive commented line\n" + "this is|1|2.0|\n" + "a test|3|4.0|\n" + "#next|5|6.0|\n"; + "#successive commented line\n" + + "this 
is|1|2.0|\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n"; final FileInputSplit split = createTempFile(fileContent); @@ -275,10 +278,10 @@ public void ignoreMultiCharPrefixComments() { try { final String fileContent = "//description of the data\n" + - "//successive commented line\n" + - "this is|1|2.0|\n" + - "a test|3|4.0|\n" + - "//next|5|6.0|\n"; + "//successive commented line\n" + + "this is|1|2.0|\n" + + "a test|3|4.0|\n" + + "//next|5|6.0|\n"; final FileInputSplit split = createTempFile(fileContent); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java index 88882d9d0851a..4815b72597747 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java @@ -22,7 +22,7 @@ import org.junit.Test; /** - * Tests for {@link ExecutionEnvironment::fromElements}. + * Tests for {@link ExecutionEnvironment#fromElements}}. */ public class FromElementsTest { From 1e754dc604ffc346e5177db8fd3b9024ceced52e Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Tue, 25 Jul 2017 10:04:24 +0200 Subject: [PATCH 3/4] Removed appropriate suppression --- tools/maven/suppressions-java.xml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/tools/maven/suppressions-java.xml b/tools/maven/suppressions-java.xml index 3b7d60ba697bb..3b32506197a81 100644 --- a/tools/maven/suppressions-java.xml +++ b/tools/maven/suppressions-java.xml @@ -23,14 +23,6 @@ under the License. 
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd"> - - - - From e2de4295d1ee49b8d65050cb2c0bec4aed899be4 Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Mon, 31 Jul 2017 14:57:01 +0200 Subject: [PATCH 4/4] comments --- .../apache/flink/api/java/io/CsvReader.java | 29 ++----------------- .../flink/api/java/io/FromElementsTest.java | 2 +- 2 files changed, 3 insertions(+), 28 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java index 47f71740fa91f..a56330fb00af1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java @@ -22,33 +22,8 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.Utils; -import org.apache.flink.api.java.operators.DataSource; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple10; -import org.apache.flink.api.java.tuple.Tuple11; -import org.apache.flink.api.java.tuple.Tuple12; -import org.apache.flink.api.java.tuple.Tuple13; -import org.apache.flink.api.java.tuple.Tuple14; -import org.apache.flink.api.java.tuple.Tuple15; -import org.apache.flink.api.java.tuple.Tuple16; -import org.apache.flink.api.java.tuple.Tuple17; -import org.apache.flink.api.java.tuple.Tuple18; -import org.apache.flink.api.java.tuple.Tuple19; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple20; -import org.apache.flink.api.java.tuple.Tuple21; -import org.apache.flink.api.java.tuple.Tuple22; -import org.apache.flink.api.java.tuple.Tuple23; -import org.apache.flink.api.java.tuple.Tuple24; -import org.apache.flink.api.java.tuple.Tuple25; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.api.java.tuple.Tuple4; -import 
org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.api.java.tuple.Tuple6; -import org.apache.flink.api.java.tuple.Tuple7; -import org.apache.flink.api.java.tuple.Tuple8; -import org.apache.flink.api.java.tuple.Tuple9; +import org.apache.flink.api.java.operators.DataSource; //CHECKSTYLE OFF: AvoidStarImport +import org.apache.flink.api.java.tuple.*; //CHECKSTYLE ON: AvoidStarImport import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java index 4815b72597747..928000983634c 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java @@ -22,7 +22,7 @@ import org.junit.Test; /** - * Tests for {@link ExecutionEnvironment#fromElements}}. + * Tests for {@link ExecutionEnvironment#fromElements}. */ public class FromElementsTest {