Skip to content

Commit

Permalink
[hotfix] Fix warnings introduced by recent Watermark Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Jul 24, 2015
1 parent efca79c commit 31c0c29
Show file tree
Hide file tree
Showing 30 changed files with 83 additions and 93 deletions.
Expand Up @@ -33,7 +33,7 @@ public BroadcastOutputSelectorWrapper() {
outputs = new ArrayList<Collector<StreamRecord<OUT>>>();
}

@SuppressWarnings("unchecked")
@SuppressWarnings("unchecked,rawtypes")
@Override
public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
Collector output1 = output;
Expand Down
Expand Up @@ -47,7 +47,7 @@ public DirectedOutputSelectorWrapper(List<OutputSelector<OUT>> outputSelectors)
this.outputMap = new HashMap<String, List<Collector<StreamRecord<OUT>>>>();
}

@SuppressWarnings("unchecked")
@SuppressWarnings("unchecked,rawtypes")
@Override
public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
Collector output1 = output;
Expand Down
Expand Up @@ -28,7 +28,7 @@ public class StreamFlatMap<IN, OUT>

private static final long serialVersionUID = 1L;

private TimestampedCollector<OUT> collector;
private transient TimestampedCollector<OUT> collector;

public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
super(flatMapper);
Expand All @@ -38,7 +38,7 @@ public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
collector = new TimestampedCollector(output);
collector = new TimestampedCollector<OUT>(output);
}

@Override
Expand Down
Expand Up @@ -132,7 +132,7 @@ protected synchronized void triggerOnFakeElement(Object input) {
* if not empty
*/
protected void emitWindow() {
output.collect(new StreamRecord(windowEvent.setTrigger()));
output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setTrigger()));
}

private void activeEvict(Object input) {
Expand All @@ -144,7 +144,7 @@ private void activeEvict(Object input) {
}

if (numToEvict > 0) {
output.collect(new StreamRecord(windowEvent.setEviction(numToEvict)));
output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setEviction(numToEvict)));
bufferSize -= numToEvict;
bufferSize = bufferSize >= 0 ? bufferSize : 0;
}
Expand All @@ -154,7 +154,7 @@ private void evict(IN input, boolean isTriggered) {
int numToEvict = evictionPolicy.notifyEviction(input, isTriggered, bufferSize);

if (numToEvict > 0) {
output.collect(new StreamRecord(windowEvent.setEviction(numToEvict)));
output.collect(new StreamRecord<WindowEvent<IN>>(windowEvent.setEviction(numToEvict)));
bufferSize -= numToEvict;
bufferSize = bufferSize >= 0 ? bufferSize : 0;
}
Expand Down
Expand Up @@ -38,7 +38,8 @@ public CollectorWrapper(OutputSelectorWrapper<OUT> outputSelectorWrapper) {
allOutputs = new ArrayList<Output<OUT>>();
}

public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
@SuppressWarnings("unchecked,rawtypes")
public void addCollector(Output<StreamRecord<?>> output, StreamEdge edge) {
outputSelectorWrapper.addCollector(output, edge);
allOutputs.add((Output) output);
}
Expand Down
Expand Up @@ -39,12 +39,12 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {

private static final Logger LOG = LoggerFactory.getLogger(RecordWriterOutput.class);

private RecordWriter<SerializationDelegate> recordWriter;
private SerializationDelegate serializationDelegate;
private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
private SerializationDelegate<StreamRecord<OUT>> serializationDelegate;

@SuppressWarnings("unchecked")
public RecordWriterOutput(
RecordWriter<SerializationDelegate> recordWriter,
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
TypeSerializer<OUT> outSerializer,
boolean enableWatermarkMultiplexing) {
Preconditions.checkNotNull(recordWriter);
Expand Down Expand Up @@ -79,9 +79,9 @@ public void collect(StreamRecord<OUT> record) {
}

@Override
@SuppressWarnings("unchecked")
@SuppressWarnings("unchecked,rawtypes")
public void emitWatermark(Watermark mark) {
serializationDelegate.setInstance(mark);
((SerializationDelegate)serializationDelegate).setInstance(mark);
try {
recordWriter.broadcastEmit(serializationDelegate);
} catch (Exception e) {
Expand All @@ -95,7 +95,7 @@ public void emitWatermark(Watermark mark) {
@Override
public void close() {
if (recordWriter instanceof StreamRecordWriter) {
((StreamRecordWriter) recordWriter).close();
((StreamRecordWriter<?>) recordWriter).close();
} else {
try {
recordWriter.flush();
Expand Down
Expand Up @@ -57,9 +57,9 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class);

private final RecordDeserializer<DeserializationDelegate>[] recordDeserializers;
private final RecordDeserializer<DeserializationDelegate<Object>>[] recordDeserializers;

private RecordDeserializer<DeserializationDelegate> currentRecordDeserializer;
private RecordDeserializer<DeserializationDelegate<Object>> currentRecordDeserializer;

// We need to keep track of the channel from which a buffer came, so that we can
// appropriately map the watermarks to input channels
Expand All @@ -72,7 +72,7 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
private long[] watermarks;
private long lastEmittedWatermark;

private DeserializationDelegate deserializationDelegate;
private DeserializationDelegate<Object> deserializationDelegate;

@SuppressWarnings("unchecked")
public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) {
Expand All @@ -86,12 +86,12 @@ public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSeri
} else {
inputRecordSerializer = new StreamRecordSerializer<IN>(inputSerializer);
}
this.deserializationDelegate = new NonReusingDeserializationDelegate(inputRecordSerializer);
this.deserializationDelegate = new NonReusingDeserializationDelegate<Object>(inputRecordSerializer);

// Initialize one deserializer per input channel
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
for (int i = 0; i < recordDeserializers.length; i++) {
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate>();
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
}

watermarks = new long[inputGate.getNumberOfInputChannels()];
Expand Down
Expand Up @@ -58,9 +58,9 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);

private final RecordDeserializer[] recordDeserializers;
private final RecordDeserializer<DeserializationDelegate<Object>>[] recordDeserializers;

private RecordDeserializer currentRecordDeserializer;
private RecordDeserializer<DeserializationDelegate<Object>> currentRecordDeserializer;

// We need to keep track of the channel from which a buffer came, so that we can
// appropriately map the watermarks to input channels
Expand All @@ -79,8 +79,8 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
private int numInputChannels1;
private int numInputChannels2;

private DeserializationDelegate deserializationDelegate1;
private DeserializationDelegate deserializationDelegate2;
private DeserializationDelegate<Object> deserializationDelegate1;
private DeserializationDelegate<Object> deserializationDelegate2;

@SuppressWarnings("unchecked")
public StreamTwoInputProcessor(
Expand Down
Expand Up @@ -103,7 +103,7 @@ public boolean equals(Object o) {
return false;
}

StreamRecord that = (StreamRecord) o;
StreamRecord<?> that = (StreamRecord<?>) o;

return value.equals(that.value) && timestamp == that.timestamp;
}
Expand Down
Expand Up @@ -58,7 +58,7 @@ public boolean isImmutableType() {

@Override
@SuppressWarnings("unchecked")
public TypeSerializer duplicate() {
public StreamRecordSerializer<T> duplicate() {
return this;
}

Expand Down
Expand Up @@ -44,7 +44,6 @@
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -146,18 +145,18 @@ private <X> Output<StreamRecord<X>> createChainedCollector(StreamConfig chainedT

// Create collectors for the network outputs
for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
Collector<?> outCollector = outputMap.get(outputEdge);
Output<?> output = outputMap.get(outputEdge);

wrapper.addCollector(outCollector, outputEdge);
wrapper.addCollector(output, outputEdge);
}

// Create collectors for the chained outputs
for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
Integer output = outputEdge.getTargetId();
Integer outputId = outputEdge.getTargetId();

Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output), accumulatorMap);
Output<?> output = createChainedCollector(chainedConfigs.get(outputId), accumulatorMap);

wrapper.addCollector(outCollector, outputEdge);
wrapper.addCollector(output, outputEdge);
}

if (chainedTaskConfig.isChainStart()) {
Expand Down Expand Up @@ -200,7 +199,7 @@ public Output<StreamRecord<OUT>> getOutput() {
* the configuration of its source task
*
* @param outputVertex
* Name of the output to which the streamoutput will be set up
* Name of the output to which the stream output will be set up
* @param upStreamConfig
* The config of upStream task
* @return The created StreamOutput
Expand All @@ -222,7 +221,7 @@ private <T> RecordWriterOutput<T> createStreamOutput(StreamEdge edge, Integer ou
output.setReporter(reporter);

@SuppressWarnings("unchecked")
RecordWriterOutput<T> streamOutput = new RecordWriterOutput<T>((RecordWriter) output, outSerializer, vertex.getExecutionConfig().areTimestampsEnabled());
RecordWriterOutput<T> streamOutput = new RecordWriterOutput<T>(output, outSerializer, vertex.getExecutionConfig().areTimestampsEnabled());

if (LOG.isTraceEnabled()) {
LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
Expand All @@ -245,9 +244,9 @@ public void clearWriters() {
}

private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
protected OneInputStreamOperator operator;
protected OneInputStreamOperator<T, ?> operator;

public ChainingOutput(OneInputStreamOperator<?, T> operator) {
public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
this.operator = operator;
}

Expand Down Expand Up @@ -292,7 +291,7 @@ public void close() {
private static class CopyingChainingOutput<T> extends ChainingOutput<T> {
private final TypeSerializer<StreamRecord<T>> serializer;

public CopyingChainingOutput(OneInputStreamOperator<?, T> operator,
public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator,
TypeSerializer<StreamRecord<T>> serializer) {
super(operator);
this.serializer = serializer;
Expand Down
Expand Up @@ -103,6 +103,7 @@ public OneKeySelector(TypeComparator<IN> comparator) {
}

@Override
@SuppressWarnings("unchecked")
public K getKey(IN value) throws Exception {
comparator.extractKeys(value, keyArray, 0);
key = (K) keyArray[0];
Expand Down
Expand Up @@ -62,6 +62,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {

private ConcurrentLinkedQueue<InputValue<Object>>[] inputQueues;

@SuppressWarnings("unchecked")
public StreamTestSingleInputGate(
int numInputChannels,
int bufferSize,
Expand All @@ -84,8 +85,8 @@ private void setupInputChannels() throws IOException, InterruptedException {

for (int i = 0; i < numInputChannels; i++) {
final int channelIndex = i;
final RecordSerializer<SerializationDelegate<StreamRecord<T>>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<StreamRecord<T>>>();
final SerializationDelegate delegate = new SerializationDelegate(new MultiplexingStreamRecordSerializer<T>(serializer));
final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
final SerializationDelegate<Object> delegate = new SerializationDelegate(new MultiplexingStreamRecordSerializer<T>(serializer));

inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
Expand Down
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.joda.time.Instant;
import org.junit.Test;

/**
Expand All @@ -43,7 +42,7 @@ public void testCount() throws Exception {
OneInputStreamOperatorTestHarness<String, Long> testHarness = new OneInputStreamOperatorTestHarness<String, Long>(operator);

long initialTime = 0L;
ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

testHarness.open();

Expand Down
Expand Up @@ -58,7 +58,7 @@ public void testFilter() throws Exception {
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);

long initialTime = 0L;
ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

testHarness.open();

Expand Down
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -62,7 +61,7 @@ public void testFlatMap() throws Exception {
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);

long initialTime = 0L;
ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

testHarness.open();

Expand Down
Expand Up @@ -76,7 +76,7 @@ public String getKey(Integer value) throws Exception {
OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);

long initialTime = 0L;
ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

testHarness.open();

Expand Down
Expand Up @@ -70,7 +70,7 @@ public Integer getKey(Integer value) throws Exception {
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);

long initialTime = 0L;
ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

testHarness.open();

Expand Down
Expand Up @@ -57,7 +57,7 @@ public void testMap() throws Exception {
OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);

long initialTime = 0L;
ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

testHarness.open();

Expand Down
Expand Up @@ -42,7 +42,6 @@
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.joda.time.Instant;
import org.junit.Test;

/**
Expand Down Expand Up @@ -75,7 +74,7 @@ public void testProject() throws Exception {
OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> testHarness = new OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(operator);

long initialTime = 0L;
ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();

testHarness.open();

Expand Down Expand Up @@ -110,13 +109,17 @@ public void APIWithoutTypesTest() {
StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);

env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
@Override
public Tuple3<Long, Character, Double> map(Long value) throws Exception {
return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
}
})
private static final long serialVersionUID = 1L;

@Override
public Tuple3<Long, Character, Double> map(Long value) throws Exception {
return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
}
})
.project(0, 2)
.addSink(new SinkFunction<Tuple>() {
private static final long serialVersionUID = 1L;

@Override
@SuppressWarnings("unchecked")
public void invoke(Tuple value) throws Exception {
Expand Down

0 comments on commit 31c0c29

Please sign in to comment.