Skip to content

Commit

Permalink
[FLINK-34123][core][type] Only support non-raw use of collection inte…
Browse files Browse the repository at this point in the history
…rface types and update UT & doc
  • Loading branch information
X-czh committed May 23, 2024
1 parent 63bf63f commit 3c9e2fe
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,17 @@ Flink supports all Java and Scala primitive types such as `Integer`, `String`, a

#### Common Collection Types

Flink supports common Java collection types (currently only `Map` and `List`). Note that Flink will not
preserve the underlying implementation types. For example, Flink will deserialize any map data into a
`HashMap` and any list data into an `ArrayList`. In order to avoid this, a custom serializer is required.
Flink comes with dedicated serialization support for common Java collection types, which is more efficient than going
through a general-purpose serialization framework. Currently, only `Map`, `List`, and its super interface `Collection`
are supported. To benefit from this support, you need to declare the collection type with:

1. Concrete type arguments: e.g. `List<String>` but not `List`, `List<T>`, or `List<?>`, as Flink needs them to dispatch
serialization of the element types.
2. Interface types: e.g. `List<String>` but not `LinkedList<String>`, as Flink does not preserve the underlying
implementation types across serialization.

Collection types that do not meet both requirements are handled by Flink as general class types. If the underlying
implementation type must also be preserved, you need to register a custom serializer for it.

#### General Class Types

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1971,19 +1972,30 @@ private <OUT, IN1, IN2> TypeInformation<OUT> privateGetForClass(
return new EnumTypeInfo(clazz);
}

// check for parameterized Collections
// check for parameterized Collections, requirement:
// 1. Interface types: the underlying implementation types are not preserved across
// serialization
// 2. Concrete type arguments: Flink needs them to dispatch serialization of element types
// Example:
// - OK: List<String>, Collection<String>
// - not OK: LinkedList<String> (implementation type), List (raw type), List<T> (generic
// type argument), or List<?> (wildcard type argument)
if (parameterizedType != null) {
Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
if (Map.class.isAssignableFrom(clazz)) {
Class<?> keyClass = (Class<?>) actualTypeArguments[0];
Class<?> valueClass = (Class<?>) actualTypeArguments[1];
TypeInformation<?> keyTypeInfo = createTypeInfo(keyClass);
TypeInformation<?> valueTypeInfo = createTypeInfo(valueClass);
return (TypeInformation<OUT>) Types.MAP(keyTypeInfo, valueTypeInfo);
} else if (List.class.isAssignableFrom(clazz)) {
Class<?> elementClass = (Class<?>) actualTypeArguments[0];
TypeInformation<?> elementTypeInfo = createTypeInfo(elementClass);
return (TypeInformation<OUT>) Types.LIST(elementTypeInfo);
boolean allTypeArgumentsConcrete =
Arrays.stream(actualTypeArguments).allMatch(arg -> arg instanceof Class<?>);
if (allTypeArgumentsConcrete) {
if (clazz.isAssignableFrom(Map.class)) {
Class<?> keyClass = (Class<?>) actualTypeArguments[0];
Class<?> valueClass = (Class<?>) actualTypeArguments[1];
TypeInformation<?> keyTypeInfo = createTypeInfo(keyClass);
TypeInformation<?> valueTypeInfo = createTypeInfo(valueClass);
return (TypeInformation<OUT>) Types.MAP(keyTypeInfo, valueTypeInfo);
} else if (clazz.isAssignableFrom(List.class)) {
Class<?> elementClass = (Class<?>) actualTypeArguments[0];
TypeInformation<?> elementTypeInfo = createTypeInfo(elementClass);
return (TypeInformation<OUT>) Types.LIST(elementTypeInfo);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ private void checkWCPojoAsserts(TypeInformation<?> typeInfo) {
fail("already seen");
}
collectionSeen = true;
assertThat(field.getTypeInformation()).isEqualTo(new GenericTypeInfo(List.class));
assertThat(field.getTypeInformation()).isEqualTo(new ListTypeInfo<>(String.class));

} else {
fail("Unexpected field " + field);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -2317,23 +2320,23 @@ void testEitherFromObjectException() {
@SuppressWarnings({"unchecked", "rawtypes"})
@Test
void testGenericTypeWithSubclassInput() {
Map<String, Object> inputMap = new HashMap<>();
HashMap<String, Object> inputMap = new LinkedHashMap<>();
inputMap.put("a", "b");
TypeInformation<?> inputType = TypeExtractor.createTypeInfo(inputMap.getClass());

MapFunction<?, ?> function =
new MapFunction<Map<String, Object>, Map<String, Object>>() {
new MapFunction<HashMap<String, Object>, HashMap<String, Object>>() {

@Override
public Map<String, Object> map(Map<String, Object> stringObjectMap)
public HashMap<String, Object> map(HashMap<String, Object> stringObjectMap)
throws Exception {
return stringObjectMap;
}
};

TypeInformation<?> ti =
TypeExtractor.getMapReturnTypes(function, (TypeInformation) inputType);
TypeInformation<?> expected = TypeExtractor.createTypeInfo(Map.class);
TypeInformation<?> expected = TypeExtractor.createTypeInfo(HashMap.class);
assertThat(ti).isEqualTo(expected);
}

Expand All @@ -2345,7 +2348,9 @@ void testGenericTypeWithSuperclassInput() {
TypeInformation<?> inputType = TypeExtractor.createTypeInfo(Map.class);

MapFunction<?, ?> function =
(MapFunction<HashMap<String, Object>, Map<String, Object>>)
(MapFunction<
LinkedHashMap<String, Object>,
Map<String, Object>>)
stringObjectMap -> stringObjectMap;
TypeExtractor.getMapReturnTypes(function, (TypeInformation) inputType);
})
Expand Down Expand Up @@ -2507,58 +2512,80 @@ public Tuple3<Date, Time, Timestamp> map(Tuple3<Date, Time, Timestamp> value)
.isEqualTo(SqlTimeTypeInfo.TIMESTAMP);
}

public static class PojoWithCollections {
@SuppressWarnings({"rawtypes"})
public static class PojoWithCollections<T> {
// Supported collection types with concrete type arguments, expected built-in serialization
// support
public Map<String, Integer> mapVal = new HashMap<>();
public List<String> listVal = new ArrayList<>();
public Collection<String> collectionVal = new ArrayList<>();

// Collection fields with unsupported collection types, treated as generic types
public LinkedList<String> linkedListVal = new LinkedList<>();

// Collection fields with raw type, treated as generic types
public List rawListVal = new ArrayList<>();

// Collection fields with generic type arguments, treated as generic types
public List<T> genericListVal = new ArrayList<>();

// Collection fields with wildcard type arguments, treated as generic types
public List<?> wildcardListVal = new ArrayList<>();
}

@SuppressWarnings({"unchecked", "rawtypes"})
@Test
public void testCollectionTypes() {
public <T> void testCollectionTypes() {
MapFunction<?, ?> function =
new MapFunction<PojoWithCollections, PojoWithCollections>() {
new MapFunction<PojoWithCollections<T>, PojoWithCollections<T>>() {
@Override
public PojoWithCollections map(PojoWithCollections value) {
public PojoWithCollections map(PojoWithCollections<T> value) {
return null;
}
};
TypeInformation<?> ti =
TypeExtractor.getMapReturnTypes(
function,
(TypeInformation)
TypeInformation.of(new TypeHint<PojoWithCollections>() {}));
Assert.assertTrue(ti instanceof PojoTypeInfo);
PojoTypeInfo<?> pojoTi = (PojoTypeInfo<?>) ti;
Assert.assertTrue(
pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("mapVal")).getTypeInformation()
instanceof MapTypeInfo);
Assert.assertTrue(
pojoTi.getPojoFieldAt(pojoTi.getFieldIndex("listVal")).getTypeInformation()
instanceof ListTypeInfo);
TypeInformation.of(new TypeHint<PojoWithCollections<T>>() {}));
assertThat(ti).isInstanceOf(PojoTypeInfo.class);
testCollectionTypesInternal(ti);

// use getForClass()
TypeInformation<?> ti2 = TypeExtractor.getForClass(PojoWithCollections.class);
Assert.assertTrue(ti2 instanceof PojoTypeInfo);
PojoTypeInfo<?> pojoTi2 =
(PojoTypeInfo<?>) TypeExtractor.getForClass(PojoWithCollections.class);
Assert.assertTrue(
pojoTi2.getPojoFieldAt(pojoTi2.getFieldIndex("mapVal")).getTypeInformation()
instanceof MapTypeInfo);
Assert.assertTrue(
pojoTi2.getPojoFieldAt(pojoTi2.getFieldIndex("listVal")).getTypeInformation()
instanceof ListTypeInfo);
assertThat(ti2).isInstanceOf(PojoTypeInfo.class);
testCollectionTypesInternal(ti2);

// use getForObject()
PojoWithCollections t = new PojoWithCollections();
PojoWithCollections<T> t = new PojoWithCollections<>();
TypeInformation<?> ti3 = TypeExtractor.getForObject(t);
Assert.assertTrue(ti3 instanceof PojoTypeInfo);
PojoTypeInfo<?> pojoTi3 =
(PojoTypeInfo<?>) TypeExtractor.getForClass(PojoWithCollections.class);
Assert.assertTrue(
pojoTi3.getPojoFieldAt(pojoTi3.getFieldIndex("mapVal")).getTypeInformation()
instanceof MapTypeInfo);
Assert.assertTrue(
pojoTi3.getPojoFieldAt(pojoTi3.getFieldIndex("listVal")).getTypeInformation()
instanceof ListTypeInfo);
assertThat(ti3).isInstanceOf(PojoTypeInfo.class);
testCollectionTypesInternal(ti3);
}

/**
 * Checks the type information extracted for every field of {@code PojoWithCollections}.
 *
 * <p>{@code mapVal} must be a {@link MapTypeInfo}, {@code listVal} and {@code collectionVal} must
 * be {@link ListTypeInfo}, and all remaining collection fields must be {@link GenericTypeInfo}.
 *
 * @param ti the extracted type information; it is cast to {@link PojoTypeInfo} without a check
 */
private void testCollectionTypesInternal(TypeInformation<?> ti) {
    final PojoTypeInfo<?> pojoInfo = (PojoTypeInfo<?>) ti;

    // Map declared through the interface with concrete type arguments.
    final int mapFieldPosition = pojoInfo.getFieldIndex("mapVal");
    assertThat(pojoInfo.getPojoFieldAt(mapFieldPosition).getTypeInformation())
            .isInstanceOf(MapTypeInfo.class);

    // List and Collection declared through the interface with concrete type arguments.
    for (String listBackedField : new String[] {"listVal", "collectionVal"}) {
        final int position = pojoInfo.getFieldIndex(listBackedField);
        assertThat(pojoInfo.getPojoFieldAt(position).getTypeInformation())
                .isInstanceOf(ListTypeInfo.class);
    }

    // Implementation type, raw type, type-variable argument and wildcard argument.
    final String[] genericFields = {
        "linkedListVal", "rawListVal", "genericListVal", "wildcardListVal"
    };
    for (String genericField : genericFields) {
        final int position = pojoInfo.getFieldIndex(genericField);
        assertThat(pojoInfo.getPojoFieldAt(position).getTypeInformation())
                .isInstanceOf(GenericTypeInfo.class);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ public static class Pojo {
}

public static class PojoRequiringKryo {
public List<Integer> x;
public List<?> x;
}
}

0 comments on commit 3c9e2fe

Please sign in to comment.