From bbc88bf1cf5bc00d535e52ec1cc222ac895d7827 Mon Sep 17 00:00:00 2001 From: Chiwan Park Date: Tue, 25 Aug 2015 17:53:43 +0900 Subject: [PATCH] [FLINK-2569] [core] Add CsvReader support for Value types - Extend ValueTypeInfo to check whether the type is basic value or not - Extend TupleTypeInfo to support Value types - Add three unit tests and an integration test - Rename CsvReaderWithPOJOITCase to CsvReaderITCase - Refactor CsvReaderITCase to use collect method --- .../apache/flink/api/java/io/CsvReader.java | 50 ++++++------- .../flink/api/java/tuple/TupleGenerator.java | 2 +- .../api/java/typeutils/TupleTypeInfo.java | 37 +++++++++- .../api/java/typeutils/ValueTypeInfo.java | 16 +++++ .../flink/api/java/io/CSVReaderTest.java | 63 ++++++++++++++++- ...thPOJOITCase.java => CsvReaderITCase.java} | 70 +++++++++---------- 6 files changed, 172 insertions(+), 66 deletions(-) rename flink-tests/src/test/java/org/apache/flink/test/io/{CsvReaderWithPOJOITCase.java => CsvReaderITCase.java} (68%) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java index c5d6cb5e9287c..052f960f71668 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java @@ -373,7 +373,7 @@ private void configureInputFormat(CsvInputFormat format, Class... types) { * @return The {@link org.apache.flink.api.java.DataSet} representing the parsed CSV data. */ public DataSource> types(Class type0) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -390,7 +390,7 @@ public DataSource> types(Class type0) { * @return The {@link org.apache.flink.api.java.DataSet} representing the parsed CSV data. */ public DataSource> types(Class type0, Class type1) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -408,7 +408,7 @@ public DataSource> types(Class type0, Class type * @return The {@link org.apache.flink.api.java.DataSet} representing the parsed CSV data. */ public DataSource> types(Class type0, Class type1, Class type2) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -427,7 +427,7 @@ public DataSource> types(Class type0, Class< * @return The {@link org.apache.flink.api.java.DataSet} representing the parsed CSV data. */ public DataSource> types(Class type0, Class type1, Class type2, Class type3) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -447,7 +447,7 @@ public DataSource> types(Class type0 * @return The {@link org.apache.flink.api.java.DataSet} representing the parsed CSV data. */ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -468,7 +468,7 @@ public DataSource> types(Class DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -490,7 +490,7 @@ public DataSource> types * @return The {@link org.apache.flink.api.java.DataSet} representing the parsed CSV data. */ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -513,7 +513,7 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -537,7 +537,7 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -562,7 +562,7 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -588,7 +588,7 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -615,7 +615,7 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -643,7 +643,7 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -672,7 +672,7 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -702,7 +702,7 @@ public DataSource DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -733,7 +733,7 @@ public DataSou * @return The {@link org.apache.flink.api.java.DataSet} representing the parsed CSV data. */ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -765,7 +765,7 @@ public Da * @return The {@link org.apache.flink.api.java.DataSet} representing the parsed CSV data. */ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -798,7 +798,7 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -832,7 +832,7 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17, Class type18) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -867,7 +867,7 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17, Class type18, Class type19) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -903,7 +903,7 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17, Class type18, Class type19, Class type20) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -940,7 +940,7 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17, Class type18, Class type19, Class type20, Class type21) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -978,7 +978,7 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17, Class type18, Class type19, Class type20, Class type21, Class type22) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -1017,7 +1017,7 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17, Class type18, Class type19, Class type20, Class type21, Class type22, Class type23) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); @@ -1057,7 +1057,7 @@ public DataSource> types(Class type0, Class type1, Class type2, Class type3, Class type4, Class type5, Class type6, Class type7, Class type8, Class type9, Class type10, Class type11, Class type12, Class type13, Class type14, Class type15, Class type16, Class type17, Class type18, Class type19, Class type20, Class type21, Class type22, Class type23, Class type24) { - TupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24); + TupleTypeInfo> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24); CsvInputFormat> inputFormat = new CsvInputFormat>(path, types); configureInputFormat(inputFormat, type0, type1, type2, type3, type4, type5, type6, type7, type8, type9, type10, type11, type12, type13, type14, type15, type16, type17, type18, type19, type20, type21, type22, type23, type24); return new DataSource>(executionContext, inputFormat, types, Utils.getCallLocationName()); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java index 6ab02e4535fc9..f306fe0d7c331 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java @@ -457,7 +457,7 @@ private static void modifyCsvReader(File root) throws IOException { // get TupleTypeInfo sb.append("\t\tTupleTypeInfo> types = TupleTypeInfo.getBasicTupleTypeInfo("); + sb.append(">> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo("); for (int i = 0; i < numFields; i++) { if (i > 0) { sb.append(", "); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java index a671bd4462f2c..618b190aa1f39 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java @@ -21,6 +21,7 @@ import java.util.Arrays; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator @@ -31,6 +32,7 @@ //CHECKSTYLE.ON: AvoidStarImport import org.apache.flink.api.java.typeutils.runtime.TupleComparator; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.types.Value; /** * A {@link TypeInformation} for the tuple types of the Java API. @@ -159,7 +161,7 @@ public String toString() { } // -------------------------------------------------------------------------------------------- - + public static TupleTypeInfo getBasicTupleTypeInfo(Class... basicTypes) { if (basicTypes == null || basicTypes.length == 0) { throw new IllegalArgumentException(); @@ -178,9 +180,40 @@ public static TupleTypeInfo getBasicTupleTypeInfo(Class. } infos[i] = info; } - + @SuppressWarnings("unchecked") TupleTypeInfo tupleInfo = (TupleTypeInfo) new TupleTypeInfo(infos); return tupleInfo; } + + @SuppressWarnings("unchecked") + public static TupleTypeInfo getBasicAndBasicValueTupleTypeInfo(Class... basicTypes) { + if (basicTypes == null || basicTypes.length == 0) { + throw new IllegalArgumentException(); + } + + TypeInformation[] infos = new TypeInformation[basicTypes.length]; + for (int i = 0; i < infos.length; i++) { + Class type = basicTypes[i]; + if (type == null) { + throw new IllegalArgumentException("Type at position " + i + " is null."); + } + + TypeInformation info = BasicTypeInfo.getInfoFor(type); + if (info == null) { + try { + info = ValueTypeInfo.getValueTypeInfo((Class) type); + if (!((ValueTypeInfo) info).isBasicValueType()) { + throw new IllegalArgumentException("Type at position " + i + " is not a basic or value type."); + } + } catch (ClassCastException | InvalidTypesException e) { + throw new IllegalArgumentException("Type at position " + i + " is not a basic or value type.", e); + } + } + infos[i] = info; + } + + + return (TupleTypeInfo) new TupleTypeInfo<>(infos); + } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java index 8288b453e7570..e61acd898b23c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java @@ -28,7 +28,17 @@ import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer; import org.apache.flink.api.java.typeutils.runtime.ValueComparator; import org.apache.flink.api.java.typeutils.runtime.ValueSerializer; +import org.apache.flink.types.BooleanValue; +import org.apache.flink.types.ByteValue; +import org.apache.flink.types.CharValue; import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.types.FloatValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.apache.flink.types.ShortValue; +import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; /** @@ -75,6 +85,12 @@ public boolean isBasicType() { return false; } + public boolean isBasicValueType() { + return type.equals(StringValue.class) || type.equals(ByteValue.class) || type.equals(ShortValue.class) || type.equals(CharValue.class) || + type.equals(DoubleValue.class) || type.equals(FloatValue.class) || type.equals(IntValue.class) || type.equals(LongValue.class) || + type.equals(NullValue.class) || type.equals(BooleanValue.class); + } + @Override public boolean isTupleType() { return false; diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java index 6676cd1c3a8c7..8b12315d6c025 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CSVReaderTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import java.io.IOException; import java.util.Arrays; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -28,10 +29,21 @@ import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple8; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.types.BooleanValue; +import org.apache.flink.types.ByteValue; +import org.apache.flink.types.CharValue; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.types.FloatValue; +import org.apache.flink.types.IntValue; import org.apache.flink.types.LongValue; +import org.apache.flink.types.ShortValue; import org.apache.flink.types.StringValue; +import org.apache.flink.types.Value; import org.junit.Assert; import org.junit.Test; import org.apache.flink.api.java.ExecutionEnvironment; @@ -190,7 +202,7 @@ public void testFieldTypes() throws Exception { } CsvInputFormat inputFormat = (CsvInputFormat) items.getInputFormat(); - Assert.assertArrayEquals(new Class[] {Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes()); + Assert.assertArrayEquals(new Class[]{Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes()); } @Test @@ -211,7 +223,7 @@ public void testSubClass() throws Exception { Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tinfo.getTypeAt(3)); CsvInputFormat inputFormat = (CsvInputFormat) sitems.getInputFormat(); - Assert.assertArrayEquals(new Class[] {Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes()); + Assert.assertArrayEquals(new Class[]{Integer.class, String.class, Double.class, String.class}, inputFormat.getFieldTypes()); } @Test @@ -250,6 +262,31 @@ public void testUnsupportedPartialitem() throws Exception { // okay. } } + + @Test + public void testWithValueType() throws Exception { + CsvReader reader = getCsvReader(); + DataSource> items = + reader.types(StringValue.class, BooleanValue.class, ByteValue.class, ShortValue.class, IntValue.class, LongValue.class, FloatValue.class, DoubleValue.class); + TypeInformation info = items.getType(); + + Assert.assertEquals(true, info.isTupleType()); + Assert.assertEquals(Tuple8.class, info.getTypeClass()); + } + + @Test(expected = IllegalArgumentException.class) + public void testWithInvalidValueType1() throws Exception { + CsvReader reader = getCsvReader(); + // CsvReader doesn't support CharValue + reader.types(CharValue.class); + } + + @Test(expected = IllegalArgumentException.class) + public void testWithInvalidValueType2() throws Exception { + CsvReader reader = getCsvReader(); + // CsvReader doesn't support custom Value type + reader.types(ValueItem.class); + } private static CsvReader getCsvReader() { return new CsvReader("/some/none/existing/path", ExecutionEnvironment.createLocalEnvironment(1)); @@ -274,4 +311,26 @@ public static class PartialItem extends Tuple5 { private static final long serialVersionUID = 1L; } + + public static class ValueItem implements Value { + private int v1; + + public int getV1() { + return v1; + } + + public void setV1(int v1) { + this.v1 = v1; + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(v1); + } + + @Override + public void read(DataInputView in) throws IOException { + v1 = in.readInt(); + } + } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderWithPOJOITCase.java b/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java similarity index 68% rename from flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderWithPOJOITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java index 6a614e98405c9..4d20b54d7925d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderWithPOJOITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderITCase.java @@ -22,9 +22,16 @@ import com.google.common.io.Files; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple8; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; +import org.apache.flink.types.BooleanValue; +import org.apache.flink.types.ByteValue; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.types.FloatValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.ShortValue; +import org.apache.flink.types.StringValue; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -32,31 +39,19 @@ import org.junit.runners.Parameterized; import java.io.File; - -import static org.junit.Assert.fail; +import java.util.List; @RunWith(Parameterized.class) -public class CsvReaderWithPOJOITCase extends MultipleProgramsTestBase { - private String resultPath; +public class CsvReaderITCase extends MultipleProgramsTestBase { private String expected; @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); - public CsvReaderWithPOJOITCase(TestExecutionMode mode) { + public CsvReaderITCase(TestExecutionMode mode) { super(mode); } - @Before - public void before() throws Exception { - resultPath = tempFolder.newFile("result").toURI().toString(); - } - - @After - public void after() throws Exception { - compareResultsByLinesInMemory(expected, resultPath); - } - private String createInputData(String data) throws Exception { File file = tempFolder.newFile("input"); Files.write(data, file, Charsets.UTF_8); @@ -71,11 +66,10 @@ public void testPOJOType() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet data = env.readCsvFile(dataPath).pojoType(POJOItem.class, new String[]{"f1", "f3", "f2"}); - data.writeAsText(resultPath); - - env.execute(); + List result = data.collect(); expected = "ABC,3,2.20\nDEF,5,5.10\nDEF,1,3.30\nGHI,10,3.30"; + compareResultAsText(result, expected); } @Test @@ -85,28 +79,19 @@ public void testPOJOTypeWithFieldsOrder() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet data = env.readCsvFile(dataPath).pojoType(POJOItem.class, new String[]{"f3", "f1", "f2"}); - data.writeAsText(resultPath); - - env.execute(); + List result = data.collect(); expected = "ABC,3,2.20\nDEF,5,5.10\nDEF,1,3.30\nGHI,10,3.30"; + compareResultAsText(result, expected); } - @Test + @Test(expected = NullPointerException.class) public void testPOJOTypeWithoutFieldsOrder() throws Exception { final String inputData = ""; final String dataPath = createInputData(inputData); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - try { - env.readCsvFile(dataPath).pojoType(POJOItem.class, null); - fail("POJO type without fields order must raise NullPointerException!"); - } catch (NullPointerException e) { - // success - } - - expected = ""; - resultPath = dataPath; + env.readCsvFile(dataPath).pojoType(POJOItem.class, null); } @Test @@ -116,11 +101,24 @@ public void testPOJOTypeWithFieldsOrderAndFieldsSelection() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet data = env.readCsvFile(dataPath).includeFields(true, false, true).pojoType(POJOItem.class, new String[]{"f2", "f1"}); - data.writeAsText(resultPath); - - env.execute(); + List result = data.collect(); expected = "ABC,3,0.00\nDEF,5,0.00\nDEF,1,0.00\nGHI,10,0.00"; + compareResultAsText(result, expected); + } + + @Test + public void testValueTypes() throws Exception { + final String inputData = "ABC,true,1,2,3,4,5.0,6.0\nBCD,false,1,2,3,4,5.0,6.0"; + final String dataPath = createInputData(inputData); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet> data = + env.readCsvFile(dataPath).types(StringValue.class, BooleanValue.class, ByteValue.class, ShortValue.class, IntValue.class, LongValue.class, FloatValue.class, DoubleValue.class); + List> result = data.collect(); + + expected = inputData; + compareResultAsTuples(result, expected); } public static class POJOItem {