From eb99dc3402e1f4885c1a8a356af55c1669346d0d Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Thu, 17 May 2018 17:35:55 +0200 Subject: [PATCH] [FLINK-6909] [types] Fix error message in CsvReader for wrong type class --- .../org/apache/flink/api/java/io/CsvReader.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java index df90d54be3fc8..6c4d4b708ea56 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.operators.DataSource; @@ -316,14 +317,18 @@ public DataSource pojoType(Class pojoType, String... pojoFields) { Preconditions.checkNotNull(pojoType, "The POJO type class must not be null."); Preconditions.checkNotNull(pojoFields, "POJO fields must be specified (not null) if output type is a POJO."); - @SuppressWarnings("unchecked") - PojoTypeInfo typeInfo = (PojoTypeInfo) TypeExtractor.createTypeInfo(pojoType); + final TypeInformation ti = TypeExtractor.createTypeInfo(pojoType); + if (!(ti instanceof PojoTypeInfo)) { + throw new IllegalArgumentException( + "The specified class is not a POJO. The type class must meet the POJO requirements. Found: " + ti); + } + final PojoTypeInfo pti = (PojoTypeInfo) ti; - CsvInputFormat inputFormat = new PojoCsvInputFormat(path, this.lineDelimiter, this.fieldDelimiter, typeInfo, pojoFields, this.includedMask); + CsvInputFormat inputFormat = new PojoCsvInputFormat(path, this.lineDelimiter, this.fieldDelimiter, pti, pojoFields, this.includedMask); configureInputFormat(inputFormat); - return new DataSource(executionContext, inputFormat, typeInfo, Utils.getCallLocationName()); + return new DataSource(executionContext, inputFormat, pti, Utils.getCallLocationName()); } /**