[FLINK-1539] [streaming] Remove calls to uninitialized RuntimeContexts
gyfora authored and mbalassi committed Feb 16, 2015
1 parent bb5dc7e commit 4470207
Showing 16 changed files with 92 additions and 46 deletions.
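
The theme of the commit: several streaming operators and aggregator factories called getRuntimeContext().getExecutionConfig() while they were still being constructed on the client, but a RichFunction's RuntimeContext is only injected when the task is opened on a TaskManager, so those calls hit an uninitialized context. The fix threads the ExecutionConfig in explicitly, or defers the lookup until the task is running. A minimal sketch of the before/after pattern, assuming the 0.9-era AbstractRichFunction; the class names are illustrative, not from this commit:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AbstractRichFunction;

class ConfigTimingSketch {

    // Before: fails at graph-construction time, because no RuntimeContext
    // has been set yet (getRuntimeContext() throws IllegalStateException).
    static class BrokenAggregator extends AbstractRichFunction {
        private final ExecutionConfig config;

        BrokenAggregator() {
            this.config = getRuntimeContext().getExecutionConfig();
        }
    }

    // After: the caller, which already holds an ExecutionConfig (for example
    // via DataStream#getExecutionConfig()), passes it in explicitly.
    static class FixedAggregator extends AbstractRichFunction {
        private final ExecutionConfig config;

        FixedAggregator(ExecutionConfig config) {
            this.config = config;
        }
    }
}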
@@ -636,7 +636,8 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)
      * @return The transformed DataStream.
      */
     public SingleOutputStreamOperator<OUT, ?> sum(String field) {
-        return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType()));
+        return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType(),
+                getExecutionConfig()));
     }
 
     /**
@@ -667,7 +668,7 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)
      */
     public SingleOutputStreamOperator<OUT, ?> min(String field) {
         return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN,
-                false));
+                false, getExecutionConfig()));
     }
 
     /**
@@ -698,7 +699,7 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)
      */
     public SingleOutputStreamOperator<OUT, ?> max(String field) {
         return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX,
-                false));
+                false, getExecutionConfig()));
     }
 
     /**
@@ -718,7 +719,7 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)
      */
     public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
         return aggregate(ComparableAggregator.getAggregator(field, getType(),
-                AggregationType.MINBY, first));
+                AggregationType.MINBY, first, getExecutionConfig()));
     }
 
     /**
@@ -738,7 +739,7 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)
      */
     public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
         return aggregate(ComparableAggregator.getAggregator(field, getType(),
-                AggregationType.MAXBY, first));
+                AggregationType.MAXBY, first, getExecutionConfig()));
     }
 
     /**
@@ -381,7 +381,8 @@ public WindowedDataStream<OUT> sum(int positionToSum) {
      * @return The transformed DataStream.
      */
     public WindowedDataStream<OUT> sum(String field) {
-        return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType()));
+        return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType(),
+                getExecutionConfig()));
     }
 
     /**
@@ -411,7 +412,7 @@ public WindowedDataStream<OUT> min(int positionToMin) {
      */
     public WindowedDataStream<OUT> min(String field) {
         return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN,
-                false));
+                false, getExecutionConfig()));
     }
 
     /**
@@ -475,7 +476,7 @@ public WindowedDataStream<OUT> minBy(int positionToMinBy, boolean first) {
      */
     public WindowedDataStream<OUT> minBy(String field, boolean first) {
         return aggregate(ComparableAggregator.getAggregator(field, getType(),
-                AggregationType.MINBY, first));
+                AggregationType.MINBY, first, getExecutionConfig()));
     }
 
     /**
@@ -505,7 +506,7 @@ public WindowedDataStream<OUT> max(int positionToMax) {
      */
     public WindowedDataStream<OUT> max(String field) {
         return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX,
-                false));
+                false, getExecutionConfig()));
     }
 
     /**
@@ -569,11 +570,10 @@ public WindowedDataStream<OUT> maxBy(int positionToMaxBy, boolean first) {
      */
     public WindowedDataStream<OUT> maxBy(String field, boolean first) {
         return aggregate(ComparableAggregator.getAggregator(field, getType(),
-                AggregationType.MAXBY, first));
+                AggregationType.MAXBY, first, getExecutionConfig()));
     }
 
     private WindowedDataStream<OUT> aggregate(AggregationFunction<OUT> aggregator) {
-
         return reduceWindow(aggregator);
     }

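Net effect of these first two files: every String-field aggregation on DataStream and WindowedDataStream now hands the stream's own ExecutionConfig to the aggregator factory, while the public API is unchanged. A usage sketch, assuming the 0.9-era package layout and an illustrative MyEvent POJO:

import org.apache.flink.streaming.api.datastream.DataStream;

class AggregationUsageSketch {

    public static class MyEvent {
        public String id;
        public int count;
        public MyEvent() {}
    }

    static void aggregate(DataStream<MyEvent> events) {
        // Same calls as before the commit; building these operators no longer
        // touches an uninitialized RuntimeContext.
        events.sum("count");
        events.min("count");
        events.maxBy("count", true);
    }
}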
@@ -21,6 +21,7 @@
 import java.lang.reflect.Field;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -65,9 +66,10 @@ public static <R> AggregationFunction<R> getAggregator(int positionToAggregate,
     }
 
     public static <R> AggregationFunction<R> getAggregator(String field,
-            TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first) {
+            TypeInformation<R> typeInfo, AggregationType aggregationType, boolean first,
+            ExecutionConfig config) {
 
-        return new PojoComparableAggregator<R>(field, typeInfo, aggregationType, first);
+        return new PojoComparableAggregator<R>(field, typeInfo, aggregationType, first, config);
     }
 
     private static class TupleComparableAggregator<T> extends ComparableAggregator<T> {
@@ -177,7 +179,7 @@ private static class PojoComparableAggregator<T> extends ComparableAggregator<T> {
         PojoComparator<T> pojoComparator;
 
         public PojoComparableAggregator(String field, TypeInformation<?> typeInfo,
-                AggregationType aggregationType, boolean first) {
+                AggregationType aggregationType, boolean first, ExecutionConfig config) {
             super(0, aggregationType, first);
             if (!(typeInfo instanceof CompositeType<?>)) {
                 throw new IllegalArgumentException(
@@ -193,7 +195,7 @@ public PojoComparableAggregator(String field, TypeInformation<?> typeInfo,
 
             if (cType instanceof PojoTypeInfo) {
                 pojoComparator = (PojoComparator<T>) cType.createComparator(
-                        new int[] { logicalKeyPosition }, new boolean[] { false }, 0, getRuntimeContext().getExecutionConfig());
+                        new int[] { logicalKeyPosition }, new boolean[] { false }, 0, config);
             } else {
                 throw new IllegalArgumentException(
                         "Key expressions are only supported on POJO types. "
@@ -225,8 +227,8 @@ public T reduce(T value1, T value2) throws Exception {
             } else {
                 if (c == 1) {
                     keyFields[0].set(value2, field1);
                 }
             }
-
+
             return value2;
         }
     }
@@ -21,6 +21,7 @@
 import java.lang.reflect.Field;
 import java.util.List;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
@@ -47,9 +48,10 @@ public static <T> ReduceFunction<T> getSumFunction(int pos, Class<?> clazz,
 
     }
 
-    public static <T> ReduceFunction<T> getSumFunction(String field, TypeInformation<T> typeInfo) {
+    public static <T> ReduceFunction<T> getSumFunction(String field, TypeInformation<T> typeInfo,
+            ExecutionConfig config) {
 
-        return new PojoSumAggregator<T>(field, typeInfo);
+        return new PojoSumAggregator<T>(field, typeInfo, config);
     }
 
     private static class TupleSumAggregator<T> extends AggregationFunction<T> {
@@ -126,7 +128,7 @@ private static class PojoSumAggregator<T> extends AggregationFunction<T> {
         SumFunction adder;
         PojoComparator<T> comparator;
 
-        public PojoSumAggregator(String field, TypeInformation<?> type) {
+        public PojoSumAggregator(String field, TypeInformation<?> type, ExecutionConfig config) {
             super(0);
             if (!(type instanceof CompositeType<?>)) {
                 throw new IllegalArgumentException(
@@ -146,7 +148,7 @@ public PojoSumAggregator(String field, TypeInformation<?> type) {
 
             if (cType instanceof PojoTypeInfo) {
                 comparator = (PojoComparator<T>) cType.createComparator(
-                        new int[] { logicalKeyPosition }, new boolean[] { false }, 0, getRuntimeContext().getExecutionConfig());
+                        new int[] { logicalKeyPosition }, new boolean[] { false }, 0, config);
             } else {
                 throw new IllegalArgumentException(
                         "Key expressions are only supported on POJO types. "
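Both aggregator classes need the config eagerly for the same reason: the key-field PojoComparator is built in the constructor, and CompositeType#createComparator requires an ExecutionConfig, as the hunks above show. A standalone sketch of that call, assuming the aggregated field sits at flat position 0 (the real code computes logicalKeyPosition from the field expression):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.PojoComparator;

class ComparatorSketch {

    public static class MyEvent {
        public int count;
        public MyEvent() {}
    }

    static PojoComparator<MyEvent> comparatorFor(ExecutionConfig config) {
        CompositeType<MyEvent> type =
                (CompositeType<MyEvent>) TypeExtractor.getForClass(MyEvent.class);
        // Mirrors the createComparator(...) call in the diff: the config is a
        // plain argument, not fetched from a runtime context.
        return (PojoComparator<MyEvent>) type.createComparator(
                new int[] { 0 }, new boolean[] { false }, 0, config);
    }
}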
@@ -23,8 +23,6 @@
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -38,17 +36,11 @@ public class FileSourceFunction extends RichSourceFunction<String> {
 
     private InputFormat<String, ?> inputFormat;
 
-    private TypeSerializerFactory<String> serializerFactory;
+    private TypeInformation<String> typeInfo;
 
     public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
         this.inputFormat = format;
-        this.serializerFactory = createSerializer(typeInfo);
-    }
-
-    private TypeSerializerFactory<String> createSerializer(TypeInformation<String> typeInfo) {
-        TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
-
-        return new RuntimeSerializerFactory<String>(serializer, typeInfo.getTypeClass());
+        this.typeInfo = typeInfo;
     }
 
     @Override
@@ -60,7 +52,8 @@ public void open(Configuration parameters) throws Exception {
 
     @Override
     public void invoke(Collector<String> collector) throws Exception {
-        final TypeSerializer<String> serializer = serializerFactory.getSerializer();
+        final TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext()
+                .getExecutionConfig());
         final Iterator<InputSplit> splitIterator = getInputSplits();
         @SuppressWarnings("unchecked")
         final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat;
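FileSourceFunction shows the second repair strategy: instead of threading the config through the constructor, keep the TypeInformation and defer serializer creation to invoke(), where getRuntimeContext() is valid. The same pattern reduced to a self-contained sketch, assuming the 0.9-era RichSourceFunction seen in this file; names are illustrative:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.function.source.RichSourceFunction;
import org.apache.flink.util.Collector;

class DeferredSerializerSource extends RichSourceFunction<String> {

    private final TypeInformation<String> typeInfo;

    public DeferredSerializerSource(TypeInformation<String> typeInfo) {
        // Safe at construction time: nothing here needs the RuntimeContext.
        this.typeInfo = typeInfo;
    }

    @Override
    public void invoke(Collector<String> collector) throws Exception {
        // Safe now: by the time invoke() runs, the context has been injected.
        TypeSerializer<String> serializer =
                typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
        collector.collect(serializer.createInstance());
    }
}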
@@ -73,9 +73,8 @@ public StreamInvokable(Function userFunction) {
      *
      * @param taskContext
      *            StreamTaskContext representing the vertex
-     * @param executionConfig
      */
-    public void setup(StreamTaskContext<OUT> taskContext, ExecutionConfig executionConfig) {
+    public void setup(StreamTaskContext<OUT> taskContext) {
         this.collector = taskContext.getOutputCollector();
         this.recordIterator = taskContext.getIndexedInput(0);
         this.inSerializer = taskContext.getInputSerializer(0);
@@ -84,7 +83,7 @@ public void setup(StreamTaskContext<OUT> taskContext, ExecutionConfig executionConfig) {
             this.objectSerializer = inSerializer.getObjectSerializer();
         }
         this.taskContext = taskContext;
-        this.executionConfig = executionConfig;
+        this.executionConfig = taskContext.getExecutionConfig();
     }
 
     /**
@@ -28,14 +28,15 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN, OUT> {
 
     transient OUT outTuple;
     TypeSerializer<OUT> outTypeSerializer;
+    TypeInformation<OUT> outTypeInformation;
     int[] fields;
     int numFields;
 
     public ProjectInvokable(int[] fields, TypeInformation<OUT> outTypeInformation) {
         super(null);
         this.fields = fields;
         this.numFields = this.fields.length;
-        this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
+        this.outTypeInformation = outTypeInformation;
     }
 
     @Override
@@ -56,6 +57,7 @@ protected void callUserFunction() throws Exception {
     @Override
     public void open(Configuration config) throws Exception {
         super.open(config);
+        this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
         outTuple = outTypeSerializer.createInstance();
     }
 }
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -47,7 +46,7 @@ public CoInvokable(Function userFunction) {
     protected TypeSerializer<IN2> serializer2;
 
     @Override
-    public void setup(StreamTaskContext<OUT> taskContext, ExecutionConfig executionConfig) {
+    public void setup(StreamTaskContext<OUT> taskContext) {
         this.collector = taskContext.getOutputCollector();
 
         this.recordIterator = taskContext.getCoReader();
@@ -69,7 +69,7 @@ public void setInputsOutputs() {
     @Override
     protected void setInvokable() {
         userInvokable = configuration.getUserInvokable(userClassLoader);
-        userInvokable.setup(this, getExecutionConfig());
+        userInvokable.setup(this);
     }
 
     protected void setConfigInputs() throws StreamVertexException {
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.streamvertex;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.CoReaderIterator;
@@ -32,12 +33,14 @@ public interface StreamTaskContext<OUT> {
     ClassLoader getUserCodeClassLoader();
 
     <X> MutableObjectIterator<X> getInput(int index);
 
     <X> IndexedReaderIterator<X> getIndexedInput(int index);
 
     <X> StreamRecordSerializer<X> getInputSerializer(int index);
 
     Collector<OUT> getOutputCollector();
 
     <X, Y> CoReaderIterator<X, Y> getCoReader();
+
+    ExecutionConfig getExecutionConfig();
 }
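This interface addition is what makes the setup() signature change possible: the vertex that calls setup() is itself a StreamTaskContext and already knows the job's ExecutionConfig, so invokables can pull the config from the context instead of receiving it as an extra argument. The consumer side, as a sketch grounded in the hunks above:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;

class ConfigFromContextSketch {

    // Mirrors StreamInvokable#setup(StreamTaskContext): anything built during
    // setup gets its ExecutionConfig from the task context.
    static ExecutionConfig configOf(StreamTaskContext<?> taskContext) {
        return taskContext.getExecutionConfig();
    }
}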
@@ -98,7 +98,7 @@ public void setInputsOutputs() {
 
     protected void setInvokable() {
         userInvokable = configuration.getUserInvokable(userClassLoader);
-        userInvokable.setup(this, getExecutionConfig());
+        userInvokable.setup(this);
     }
 
     public String getName() {
@@ -22,7 +22,7 @@
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.util.Collector;
 
-public interface WindowBuffer<T> extends Serializable {
+public interface WindowBuffer<T> extends Serializable, Cloneable {
 
     public void store(T element) throws Exception;
 
@@ -31,7 +31,7 @@ public interface WindowBuffer<T> extends Serializable {
     public boolean emitWindow(Collector<StreamWindow<T>> collector);
 
     public int size();
 
-
+    public WindowBuffer<T> clone();
 
 }
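WindowBuffer implementations now have to be copyable through a covariant clone(). What such an implementation typically looks like in plain Java (illustrative only; no WindowBuffer implementation is part of this diff):

class CopyableBuffer implements Cloneable {

    private int[] data = new int[16];

    @Override
    public CopyableBuffer clone() {
        try {
            CopyableBuffer copy = (CopyableBuffer) super.clone();
            copy.data = data.clone(); // deep-copy mutable state
            return copy;
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // unreachable: we implement Cloneable
        }
    }
}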
@@ -156,7 +156,7 @@ public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> getIterator() {
     public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable,
             List<IN1> input1, List<IN2> input2) {
         MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2);
-        invokable.setup(mockContext, new ExecutionConfig());
+        invokable.setup(mockContext);
 
         try {
             invokable.open(null);
@@ -222,4 +222,9 @@ public <X> IndexedReaderIterator<X> getIndexedInput(int index) {
                 "Indexed iterator is currently unsupported for connected streams.");
     }
 
+    @Override
+    public ExecutionConfig getExecutionConfig() {
+        return new ExecutionConfig();
+    }
+
 }
@@ -112,7 +112,7 @@ public MutableObjectIterator<StreamRecord<IN>> getIterator() {
     public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable,
             List<IN> inputs) {
         MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
-        invokable.setup(mockContext, new ExecutionConfig());
+        invokable.setup(mockContext);
         try {
             invokable.open(null);
             invokable.invoke();
@@ -170,4 +170,9 @@ public <X> IndexedReaderIterator<X> getIndexedInput(int index) {
         return (IndexedReaderIterator<X>) iterator;
     }
 
+    @Override
+    public ExecutionConfig getExecutionConfig() {
+        return new ExecutionConfig();
+    }
+
 }
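With the mocks answering getExecutionConfig() themselves, invokables can be unit-tested without a task runtime. A sketch of the test pattern, assuming the 0.9-era MapInvokable operator (not part of this diff) and placement in the same package as MockContext:

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;

class MockContextUsageSketch {

    static List<Integer> doubled() {
        MapInvokable<Integer, Integer> invokable = new MapInvokable<Integer, Integer>(
                new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                });
        // MockContext supplies the ExecutionConfig via the override above.
        return MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3));
    }
}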
@@ -16,4 +16,12 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
