Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34123][core][type] Introduce built-in serialization support for map and lists #24634

Merged
merged 2 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,16 @@ Flink places some restrictions on the type of elements that can be in a DataStre
The reason for this is that the system analyzes the types to determine
efficient execution strategies.

There are seven different categories of data types:
There are eight different categories of data types:

1. **Java Tuples** and **Scala Case Classes**
2. **Java POJOs**
3. **Primitive Types**
4. **Regular Classes**
5. **Values**
6. **Hadoop Writables**
7. **Special Types**
4. **Common Collection Types**
X-czh marked this conversation as resolved.
Show resolved Hide resolved
5. **Regular Classes**
6. **Values**
7. **Hadoop Writables**
8. **Special Types**

#### Tuples and Case Classes

Expand Down Expand Up @@ -167,6 +168,20 @@ input.keyBy(_.word)

Flink supports all Java and Scala primitive types such as `Integer`, `String`, and `Double`.

#### Common Collection Types

Flink comes with dedicated serialization support for common Java collection types, which is more efficient than going
through a general-purpose serialization framework. Currently, only `Map`, `List` and its super interface `Collection`
are supported. To use it, you need to declare the collection type with:

1. Concrete type arguments: e.g. `List<String>` but not `List`, `List<T>`, or `List<?>`, as Flink needs them to dispatch
serialization of the element types.
2. Interface types: e.g. `List<String>` but not `LinkedList<String>`, as Flink does not preserve the underlying
implementation types across serialization.

Collection types that do not meet both requirements are handled by Flink as general class types. If the implementation
type must also be preserved, you need to register the type with a custom serializer.

#### General Class Types

Flink supports most Java and Scala classes (API and custom).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
Expand All @@ -66,6 +67,7 @@
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1970,6 +1972,33 @@ private <OUT, IN1, IN2> TypeInformation<OUT> privateGetForClass(
return new EnumTypeInfo(clazz);
}

// check for parameterized Collections, requirement:
// 1. Interface types: the underlying implementation types are not preserved across
// serialization
// 2. Concrete type arguments: Flink needs them to dispatch serialization of element types
// Example:
// - OK: List<String>, Collection<String>
// - not OK: LinkedList<String> (implementation type), List (raw type), List<T> (generic
// type argument), or List<?> (wildcard type argument)
if (parameterizedType != null) {
Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
boolean allTypeArgumentsConcrete =
Arrays.stream(actualTypeArguments).allMatch(arg -> arg instanceof Class<?>);
if (allTypeArgumentsConcrete) {
if (clazz.isAssignableFrom(Map.class)) {
Class<?> keyClass = (Class<?>) actualTypeArguments[0];
Class<?> valueClass = (Class<?>) actualTypeArguments[1];
TypeInformation<?> keyTypeInfo = createTypeInfo(keyClass);
TypeInformation<?> valueTypeInfo = createTypeInfo(valueClass);
return (TypeInformation<OUT>) Types.MAP(keyTypeInfo, valueTypeInfo);
} else if (clazz.isAssignableFrom(List.class)) {
Class<?> elementClass = (Class<?>) actualTypeArguments[0];
TypeInformation<?> elementTypeInfo = createTypeInfo(elementClass);
return (TypeInformation<OUT>) Types.LIST(elementTypeInfo);
}
}
}

// special case for POJOs generated by Avro.
if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ private void checkWCPojoAsserts(TypeInformation<?> typeInfo) {
fail("already seen");
}
collectionSeen = true;
assertThat(field.getTypeInformation()).isEqualTo(new GenericTypeInfo(List.class));
assertThat(field.getTypeInformation()).isEqualTo(new ListTypeInfo<>(String.class));

} else {
fail("Unexpected field " + field);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -2317,23 +2320,23 @@ void testEitherFromObjectException() {
@SuppressWarnings({"unchecked", "rawtypes"})
@Test
void testGenericTypeWithSubclassInput() {
Map<String, Object> inputMap = new HashMap<>();
HashMap<String, Object> inputMap = new LinkedHashMap<>();
inputMap.put("a", "b");
TypeInformation<?> inputType = TypeExtractor.createTypeInfo(inputMap.getClass());

MapFunction<?, ?> function =
new MapFunction<Map<String, Object>, Map<String, Object>>() {
new MapFunction<HashMap<String, Object>, HashMap<String, Object>>() {

@Override
public Map<String, Object> map(Map<String, Object> stringObjectMap)
public HashMap<String, Object> map(HashMap<String, Object> stringObjectMap)
throws Exception {
return stringObjectMap;
}
};

TypeInformation<?> ti =
TypeExtractor.getMapReturnTypes(function, (TypeInformation) inputType);
TypeInformation<?> expected = TypeExtractor.createTypeInfo(Map.class);
TypeInformation<?> expected = TypeExtractor.createTypeInfo(HashMap.class);
assertThat(ti).isEqualTo(expected);
}

Expand All @@ -2345,7 +2348,9 @@ void testGenericTypeWithSuperclassInput() {
TypeInformation<?> inputType = TypeExtractor.createTypeInfo(Map.class);

MapFunction<?, ?> function =
(MapFunction<HashMap<String, Object>, Map<String, Object>>)
(MapFunction<
LinkedHashMap<String, Object>,
Map<String, Object>>)
stringObjectMap -> stringObjectMap;
TypeExtractor.getMapReturnTypes(function, (TypeInformation) inputType);
})
Expand Down Expand Up @@ -2506,4 +2511,81 @@ public Tuple3<Date, Time, Timestamp> map(Tuple3<Date, Time, Timestamp> value)
assertThat(TypeExtractor.getForObject(Timestamp.valueOf("1998-12-12 12:37:45")))
.isEqualTo(SqlTimeTypeInfo.TIMESTAMP);
}

/**
 * POJO fixture for {@code testCollectionTypes}. Each public field declares a collection in a
 * different way; {@code testCollectionTypesInternal} asserts which kind of type information is
 * extracted for every one of them.
 *
 * @param <T> unbound type variable, used only to declare {@link #genericListVal}
 */
@SuppressWarnings("rawtypes")
public static class PojoWithCollections<T> {

    /** {@code Map} interface with concrete type arguments; expected to map to MapTypeInfo. */
    public Map<String, Integer> mapVal = new HashMap<>();

    /** {@code List} interface with a concrete type argument; expected to map to ListTypeInfo. */
    public List<String> listVal = new ArrayList<>();

    /** {@code Collection} interface with a concrete type argument; expected ListTypeInfo. */
    public Collection<String> collectionVal = new ArrayList<>();

    /** Declared as an implementation class, so it is expected to be a generic type. */
    public LinkedList<String> linkedListVal = new LinkedList<>();

    /** Raw declaration without type arguments, so it is expected to be a generic type. */
    public List rawListVal = new ArrayList<>();

    /** Type-variable argument, so it is expected to be a generic type. */
    public List<T> genericListVal = new ArrayList<>();

    /** Wildcard argument, so it is expected to be a generic type. */
    public List<?> wildcardListVal = new ArrayList<>();
}

/**
 * Extracts type information for {@link PojoWithCollections} through three entry points (the
 * return type of a map function, {@code getForClass} and {@code getForObject}) and checks that
 * each result is a POJO type whose collection fields pass {@code testCollectionTypesInternal}.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
@Test
public <T> void testCollectionTypes() {
    // Entry point 1: return type of a user function, input type supplied through a TypeHint.
    MapFunction<?, ?> pojoMapper =
            new MapFunction<PojoWithCollections<T>, PojoWithCollections<T>>() {
                @Override
                public PojoWithCollections map(PojoWithCollections<T> value) {
                    return null;
                }
            };
    TypeInformation hintedInputType =
            TypeInformation.of(new TypeHint<PojoWithCollections<T>>() {});
    TypeInformation<?> fromMapFunction =
            TypeExtractor.getMapReturnTypes(pojoMapper, hintedInputType);
    assertThat(fromMapFunction).isInstanceOf(PojoTypeInfo.class);
    testCollectionTypesInternal(fromMapFunction);

    // Entry point 2: extraction from the class object.
    TypeInformation<?> fromClass = TypeExtractor.getForClass(PojoWithCollections.class);
    assertThat(fromClass).isInstanceOf(PojoTypeInfo.class);
    testCollectionTypesInternal(fromClass);

    // Entry point 3: extraction from a live instance.
    PojoWithCollections<T> instance = new PojoWithCollections<>();
    TypeInformation<?> fromObject = TypeExtractor.getForObject(instance);
    assertThat(fromObject).isInstanceOf(PojoTypeInfo.class);
    testCollectionTypesInternal(fromObject);
}

/**
 * Asserts the type information extracted for every field of {@link PojoWithCollections}.
 *
 * @param ti type information of {@link PojoWithCollections}; it is cast to {@code PojoTypeInfo}
 */
private void testCollectionTypesInternal(TypeInformation<?> ti) {
    final PojoTypeInfo<?> pojoTypeInfo = (PojoTypeInfo<?>) ti;

    // Parallel tables: fieldNames[i] must resolve to an instance of expectedTypeInfos[i].
    final String[] fieldNames = {
        "mapVal",
        "listVal",
        "collectionVal",
        "linkedListVal",
        "rawListVal",
        "genericListVal",
        "wildcardListVal"
    };
    final Class<?>[] expectedTypeInfos = {
        MapTypeInfo.class,
        ListTypeInfo.class,
        ListTypeInfo.class,
        GenericTypeInfo.class,
        GenericTypeInfo.class,
        GenericTypeInfo.class,
        GenericTypeInfo.class
    };

    for (int i = 0; i < fieldNames.length; i++) {
        final int fieldIndex = pojoTypeInfo.getFieldIndex(fieldNames[i]);
        assertThat(pojoTypeInfo.getPojoFieldAt(fieldIndex).getTypeInformation())
                .isInstanceOf(expectedTypeInfos[i]);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ public static class Pojo {
}

public static class PojoRequiringKryo {
public List<Integer> x;
public List<?> x;
}
}