[FLINK-3303] [core] Move all type utilities to flink-core
StephanEwen committed Feb 2, 2016
1 parent 7081836 commit 21a7158
Showing 168 changed files with 503 additions and 612 deletions.
47 changes: 41 additions & 6 deletions flink-core/pom.xml
@@ -40,12 +40,6 @@ under the License.
<artifactId>flink-annotations</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${shading-artifact.name}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>commons-collections</groupId>
@@ -56,20 +50,61 @@ under the License.
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<!-- managed version -->
</dependency>

<!-- Avro is needed for the interoperability with Avro types for serialization -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<!-- managed version -->
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Hadoop is only needed here for serialization interoperability with the Writable type -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${shading-artifact.name}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<!-- test dependencies -->

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.5</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.joda</groupId>
<artifactId>joda-convert</artifactId>
<version>1.7</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
@@ -16,30 +16,20 @@
* limitations under the License.
*/

package org.apache.flink.api.java.operators;
package org.apache.flink.api.common.operators;

import java.util.ArrayList;
import java.util.List;

import com.google.common.base.Joiner;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import com.google.common.base.Preconditions;
@@ -177,87 +167,6 @@ public <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInform
}
}

@SuppressWarnings("unchecked")
public static <T, K> Operator<Tuple2<K, T>> appendKeyExtractor(
Operator<T> input,
SelectorFunctionKeys<T, K> key)
{

TypeInformation<T> inputType = key.getInputType();
TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper(key.getKeyExtractor());

MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> mapper =
new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(
extractor,
new UnaryOperatorInformation(inputType, typeInfoWithKey),
"Key Extractor"
);

mapper.setInput(input);
mapper.setParallelism(input.getParallelism());

return mapper;
}

@SuppressWarnings("unchecked")
public static <T, K1, K2> Operator<Tuple3<K1, K2, T>> appendKeyExtractor(
Operator<T> input,
SelectorFunctionKeys<T, K1> key1,
SelectorFunctionKeys<T, K2> key2)
{

TypeInformation<T> inputType = key1.getInputType();
TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = createTypeWithKey(key1, key2);
TwoKeyExtractingMapper<T, K1, K2> extractor =
new TwoKeyExtractingMapper<>(key1.getKeyExtractor(), key2.getKeyExtractor());

MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>> mapper =
new MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>>(
extractor,
new UnaryOperatorInformation<>(inputType, typeInfoWithKey),
"Key Extractor"
);

mapper.setInput(input);
mapper.setParallelism(input.getParallelism());

return mapper;
}

public static <T, K> org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> appendKeyRemover(
Operator<Tuple2<K, T>> inputWithKey,
SelectorFunctionKeys<T, K> key)
{

TypeInformation<T> inputType = key.getInputType();
TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);

MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> mapper =
new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(
new KeyRemovingMapper<T, K>(),
new UnaryOperatorInformation<>(typeInfoWithKey, inputType),
"Key Remover"
);
mapper.setInput(inputWithKey);
mapper.setParallelism(inputWithKey.getParallelism());

return mapper;
}

public static <T, K> TypeInformation<Tuple2<K, T>> createTypeWithKey(
SelectorFunctionKeys<T, K> key)
{
return new TupleTypeInfo<>(key.getKeyType(), key.getInputType());
}

public static <T, K1, K2> TypeInformation<Tuple3<K1, K2, T>> createTypeWithKey(
SelectorFunctionKeys<T, K1> key1,
SelectorFunctionKeys<T, K2> key2)
{
return new TupleTypeInfo<>(key1.getKeyType(), key2.getKeyType(), key1.getInputType());
}

@Override
public String toString() {
return "Key function (Type: " + keyType + ")";
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
import org.apache.flink.types.Either;

/**
* A {@link TypeInformation} for the {@link Either} type of the Java API.
@@ -28,15 +28,16 @@
import java.util.regex.Pattern;

import com.google.common.base.Preconditions;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;

import com.google.common.base.Joiner;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
@@ -24,7 +24,7 @@
* This interface can be implemented by functions and input formats to tell the framework
* about their produced data type. This method acts as an alternative to the reflection analysis
* that is otherwise performed and is useful in situations where the produced data type may vary
* depending on parameterization.
* depending on parametrization.
*/
public interface ResultTypeQueryable<T> {

@@ -24,9 +24,10 @@
import java.util.regex.Pattern;

import com.google.common.base.Preconditions;

import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;

public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {

@@ -35,7 +36,7 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
private final static String REGEX_FIELD = "(f?)([0-9]+)";
private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?";
private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS
+"|\\"+ExpressionKeys.SELECT_ALL_CHAR
+"|\\"+ ExpressionKeys.SELECT_ALL_CHAR
+"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;

private static final Pattern PATTERN_FIELD = Pattern.compile(REGEX_FIELD);
@@ -30,7 +30,9 @@
import java.util.List;

import org.apache.avro.specific.SpecificRecordBase;

import org.apache.commons.lang3.ClassUtils;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -54,9 +56,12 @@
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.types.Either;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;

import org.apache.hadoop.io.Writable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -18,13 +18,13 @@

package org.apache.flink.api.java.typeutils.runtime;

import static org.apache.flink.api.java.typeutils.Either.Left;
import static org.apache.flink.api.java.typeutils.Either.Right;
import static org.apache.flink.types.Either.Left;
import static org.apache.flink.types.Either.Right;

import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.Either;
import org.apache.flink.types.Either;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

@@ -23,13 +23,14 @@
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.CollectionSerializer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificRecordBase;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -64,7 +65,7 @@ public static void recursivelyRegisterType(TypeInformation<?> typeInfo, Executio
}
else if (typeInfo instanceof CompositeType) {
List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<>();
Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, genericTypesInComposite);
getContainedGenericTypes((CompositeType<?>)typeInfo, genericTypesInComposite);
for (GenericTypeInfo<?> gt : genericTypesInComposite) {
Serializers.recursivelyRegisterType(gt.getTypeClass(), config, alreadySeen);
}
@@ -127,41 +128,38 @@ else if (fieldType instanceof Class) {
}
}

private static void checkAndAddSerializerForTypeAvro(ExecutionConfig reg, Class<?> type) {
if (GenericData.Record.class.isAssignableFrom(type)) {
registerGenericAvro(reg);
}
if (SpecificRecordBase.class.isAssignableFrom(type)) {
@SuppressWarnings("unchecked")
Class<? extends SpecificRecordBase> specRecordClass = (Class<? extends SpecificRecordBase>) type;
registerSpecificAvro(reg, specRecordClass);
}
}

/**
* Register these serializers for using Avro's {@link GenericData.Record} and classes
* implementing {@link org.apache.avro.specific.SpecificRecordBase}
* Returns all GenericTypeInfos contained in a composite type.
*
* @param typeInfo {@link CompositeType}
*/
private static void registerGenericAvro(ExecutionConfig reg) {
// Avro POJOs contain java.util.List which have GenericData.Array as their runtime type
// because Kryo is not able to serialize them properly, we use this serializer for them
reg.registerTypeWithKryoSerializer(GenericData.Array.class, SpecificInstanceCollectionSerializerForArrayList.class);

// We register this serializer for users who want to use untyped Avro records (GenericData.Record).
// Kryo is able to serialize everything in there, except for the Schema.
// This serializer is very slow, but using the GenericData.Records of Kryo is in general a bad idea.
// we add the serializer as a default serializer because Avro is using a private sub-type at runtime.
reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
private static void getContainedGenericTypes(CompositeType<?> typeInfo, List<GenericTypeInfo<?>> target) {
for (int i = 0; i < typeInfo.getArity(); i++) {
TypeInformation<?> type = typeInfo.getTypeAt(i);
if (type instanceof CompositeType) {
getContainedGenericTypes((CompositeType<?>) type, target);
} else if (type instanceof GenericTypeInfo) {
if (!target.contains(type)) {
target.add((GenericTypeInfo<?>) type);
}
}
}
}

// ------------------------------------------------------------------------

private static void checkAndAddSerializerForTypeAvro(ExecutionConfig reg, Class<?> type) {
if (GenericData.Record.class.isAssignableFrom(type) || SpecificRecordBase.class.isAssignableFrom(type)) {
// Avro POJOs contain java.util.List which have GenericData.Array as their runtime type
// because Kryo is not able to serialize them properly, we use this serializer for them
reg.registerTypeWithKryoSerializer(GenericData.Array.class, SpecificInstanceCollectionSerializerForArrayList.class);

private static void registerSpecificAvro(ExecutionConfig reg, Class<? extends SpecificRecordBase> avroType) {
registerGenericAvro(reg);
// This rule only applies if users explicitly use the GenericTypeInformation for the avro types
// usually, we are able to handle Avro POJOs with the POJO serializer.
// (However only if the GenericData.Array type is registered!)

// ClassTag<SpecificRecordBase> tag = scala.reflect.ClassTag$.MODULE$.apply(avroType);
// reg.registerTypeWithKryoSerializer(avroType, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag));
// We register this serializer for users who want to use untyped Avro records (GenericData.Record).
// Kryo is able to serialize everything in there, except for the Schema.
// This serializer is very slow, but using the GenericData.Records of Kryo is in general a bad idea.
// we add the serializer as a default serializer because Avro is using a private sub-type at runtime.
reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
}
}

// --------------------------------------------------------------------------------------------

6 comments on commit 21a7158

@alexeyegorov

Has this commit been added to the current 1.0-SNAPSHOT? I ran into a problem while building my project and found that org.apache.flink.streaming.api.datastream.DataStream still imports org.apache.flink.api.java.operators.Keys, while in this commit I see that it has been moved from package ...java.operators to package ...common.operators. I hope you can fix it asap.
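
For reference, a minimal sketch of what the package move means for code that uses the Keys utility. Only the two import locations are taken from this commit; the example class, the ExpressionKeys constructor call, and the printed output are assumptions for illustration.

// Old location (before this commit, in flink-java):
// import org.apache.flink.api.java.operators.Keys;

// New location (after this commit, in flink-core):
import org.apache.flink.api.common.operators.Keys;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

import java.util.Arrays;

// Hypothetical example class, not part of the commit.
public class KeysImportExample {

    public static void main(String[] args) {
        // Type information for a Tuple2<String, Integer>.
        TupleTypeInfo<Tuple2<String, Integer>> type = new TupleTypeInfo<>(
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

        // Resolve the field expression "f0" against that type; ExpressionKeys
        // now lives in flink-core together with the other type utilities.
        Keys.ExpressionKeys<Tuple2<String, Integer>> keys =
                new Keys.ExpressionKeys<>(new String[] {"f0"}, type);

        System.out.println("logical key positions: "
                + Arrays.toString(keys.computeLogicalKeyPositions()));
    }
}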

@uce (Contributor) commented on Feb 5, 2016

Please consult this page and use the new Maven artifacts (with Scala suffixes): https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version

Does it resolve the issue?
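
To illustrate the convention the linked page describes, here is a sketch of suffixed dependency declarations. The _2.10 suffix, the 1.0-SNAPSHOT version, and the choice of modules are assumptions; match them to your own Scala version and Flink release.

<!-- Sketch of Scala-suffixed Flink dependencies (suffix, version, and module
     selection are assumptions for illustration). -->
<dependencies>
  <!-- Pure-Java module: no Scala suffix. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.0-SNAPSHOT</version>
  </dependency>
  <!-- Scala-dependent module: the artifact name carries the Scala version suffix. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.0-SNAPSHOT</version>
  </dependency>
</dependencies>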

@alexeyegorov

@uce yes, I've already added _2.10 to my dependencies that are Scala-dependent... but I still get the build problem.

@StephanEwen (Contributor, Author)

In the current master, the DataStream class imports the correct o.a.f.api.common.operators.Keys.

https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L43

Is it possible that your Maven cache has mixed versions?

@alexeyegorov

@StephanEwen ok, I'm sorry. I was able to build everything only after removing .m2/repository and clearing all caches. Seems to have been my own problem. ;)

@StephanEwen (Contributor, Author)

No problem, it's quite common for Maven caches to become inconsistent.
