From 7aa9992e681761e255f3a24492f202354e77ab2e Mon Sep 17 00:00:00 2001 From: tonycox Date: Fri, 16 Dec 2016 20:55:40 +0400 Subject: [PATCH 1/3] [FLINK-5358] add RowTypeInfo exctraction in TypeExtractor --- .../flink/api/java/typeutils/TypeExtractor.java | 17 +++++++++++++++++ .../api/java/typeutils/TypeExtractorTest.java | 17 ++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 08f8c5317142c..72cd567555504 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -72,6 +72,7 @@ import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.sameTypeVars; import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClass; import org.apache.flink.types.Either; +import org.apache.flink.types.Row; import org.apache.flink.types.Value; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; @@ -1968,6 +1969,22 @@ else if (value instanceof Either) { + "Please specify the types directly."); } } + else if (value instanceof Row) { + Row row = (Row) value; + int arity = row.getArity(); + for (int i = 0; i < arity; i++) { + if (row.getField(i) == null) { + LOG.warn("cannot implicitly fully define RowTypeInfo, because of Row field[" + i + "] is null. " + + "Should define RowTypeInfo explicitly"); + return privateGetForClass((Class) value.getClass(), new ArrayList()); + } + } + TypeInformation[] typeArray = new TypeInformation[arity]; + for (int i = 0; i < arity; i++) { + typeArray[i] = TypeExtractor.getForObject(row.getField(i)); + } + return (TypeInformation) new RowTypeInfo(typeArray); + } else { return privateGetForClass((Class) value.getClass(), new ArrayList()); } diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java index 55cd42debfb66..fa069ccb03f66 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java @@ -61,6 +61,7 @@ import org.apache.flink.types.IntValue; import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; +import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import org.junit.Assert; @@ -345,8 +346,22 @@ public CustomType cross(CustomType first, Integer second) throws Exception { Assert.assertFalse(TypeExtractor.getForClass(PojoWithNonPublicDefaultCtor.class) instanceof PojoTypeInfo); } - + @Test + public void testRow() { + Row row = new Row(2); + row.setField(0, "string"); + row.setField(1, 15); + TypeInformation rowInfo = TypeExtractor.getForObject(row); + Assert.assertEquals(rowInfo.getClass(), RowTypeInfo.class); + Assert.assertEquals(2, rowInfo.getArity()); + + Row nullRow = new Row(2); + TypeInformation genericRowInfo = TypeExtractor.getForObject(nullRow); + Assert.assertEquals(genericRowInfo, new GenericTypeInfo<>(Row.class)); + int arity = genericRowInfo.getArity(); + genericRowInfo.hashCode(); + } public static class CustomType { public String myField1; From 2dafa6bb92d36f683f7c0872058799fa7b731c5e Mon Sep 17 00:00:00 2001 From: tonycox Date: Tue, 20 Dec 2016 09:14:31 +0400 Subject: [PATCH 2/3] test nested type info --- .../apache/flink/api/java/typeutils/TypeExtractorTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java index fa069ccb03f66..ff936fe80b728 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java @@ -359,8 +359,11 @@ public void testRow() { Row nullRow = new Row(2); TypeInformation genericRowInfo = TypeExtractor.getForObject(nullRow); Assert.assertEquals(genericRowInfo, new GenericTypeInfo<>(Row.class)); - int arity = genericRowInfo.getArity(); - genericRowInfo.hashCode(); + Assert.assertEquals( + new RowTypeInfo( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO), + rowInfo); } public static class CustomType { From adf41ccce11d3494b5b5a48bf353ac85865e151c Mon Sep 17 00:00:00 2001 From: tonycox Date: Mon, 26 Dec 2016 11:43:21 +0400 Subject: [PATCH 3/3] move assert in testRow method --- .../flink/api/java/typeutils/TypeExtractorTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java index ff936fe80b728..804cf88ae47c5 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java @@ -355,15 +355,15 @@ public void testRow() { TypeInformation rowInfo = TypeExtractor.getForObject(row); Assert.assertEquals(rowInfo.getClass(), RowTypeInfo.class); Assert.assertEquals(2, rowInfo.getArity()); - - Row nullRow = new Row(2); - TypeInformation genericRowInfo = TypeExtractor.getForObject(nullRow); - Assert.assertEquals(genericRowInfo, new GenericTypeInfo<>(Row.class)); Assert.assertEquals( new RowTypeInfo( BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), rowInfo); + + Row nullRow = new Row(2); + TypeInformation genericRowInfo = TypeExtractor.getForObject(nullRow); + Assert.assertEquals(genericRowInfo, new GenericTypeInfo<>(Row.class)); } public static class CustomType {