Skip to content

Commit

Permalink
[FLINK-2636] [streaming] Create common type StreamElement for StreamR…
Browse files Browse the repository at this point in the history
…ecord and Watermark
  • Loading branch information
StephanEwen committed Sep 8, 2015
1 parent 655a891 commit c09d14a
Show file tree
Hide file tree
Showing 16 changed files with 173 additions and 126 deletions.
Expand Up @@ -16,7 +16,6 @@
* limitations under the License. * limitations under the License.
*/ */



package org.apache.flink.api.common.typeutils; package org.apache.flink.api.common.typeutils;


import java.io.IOException; import java.io.IOException;
Expand Down
Expand Up @@ -17,6 +17,8 @@
*/ */
package org.apache.flink.streaming.api.watermark; package org.apache.flink.streaming.api.watermark;


import org.apache.flink.streaming.runtime.streamrecord.StreamElement;

/** /**
* A Watermark tells operators that receive it that no elements with a timestamp older or equal * A Watermark tells operators that receive it that no elements with a timestamp older or equal
* to the watermark timestamp should arrive at the operator. Watermarks are emitted at the * to the watermark timestamp should arrive at the operator. Watermarks are emitted at the
Expand All @@ -31,11 +33,11 @@
* In some cases a watermark is only a heuristic and operators should be able to deal with * In some cases a watermark is only a heuristic and operators should be able to deal with
* late elements. They can either discard those or update the result and emit updates/retractions * late elements. They can either discard those or update the result and emit updates/retractions
* to downstream operations. * to downstream operations.
*
*/ */
public class Watermark { public class Watermark extends StreamElement {


private long timestamp; /** The timestamp of the watermark */
private final long timestamp;


/** /**
* Creates a new watermark with the given timestamp. * Creates a new watermark with the given timestamp.
Expand All @@ -53,16 +55,8 @@ public long getTimestamp() {


@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) { return this == o ||
return true; o != null && o.getClass() == Watermark.class && ((Watermark) o).timestamp == this.timestamp;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

Watermark watermark = (Watermark) o;

return timestamp == watermark.timestamp;
} }


@Override @Override
Expand All @@ -72,8 +66,6 @@ public int hashCode() {


@Override @Override
public String toString() { public String toString() {
return "Watermark{" + return "Watermark @ " + timestamp;
"timestamp=" + timestamp +
'}';
} }
} }
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
Expand All @@ -36,9 +37,9 @@
*/ */
public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> { public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {


private StreamRecordWriter<SerializationDelegate<Object>> recordWriter; private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;


private SerializationDelegate<Object> serializationDelegate; private SerializationDelegate<StreamElement> serializationDelegate;




@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Expand All @@ -51,19 +52,19 @@ public RecordWriterOutput(


// generic hack: cast the writer to generic Object type so we can use it // generic hack: cast the writer to generic Object type so we can use it
// with multiplexed records and watermarks // with multiplexed records and watermarks
this.recordWriter = (StreamRecordWriter<SerializationDelegate<Object>>) this.recordWriter = (StreamRecordWriter<SerializationDelegate<StreamElement>>)
(StreamRecordWriter<?>) recordWriter; (StreamRecordWriter<?>) recordWriter;


TypeSerializer<Object> outRecordSerializer; TypeSerializer<StreamElement> outRecordSerializer;
if (enableWatermarkMultiplexing) { if (enableWatermarkMultiplexing) {
outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer); outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
} else { } else {
outRecordSerializer = (TypeSerializer<Object>) outRecordSerializer = (TypeSerializer<StreamElement>)
(TypeSerializer<?>) new StreamRecordSerializer<OUT>(outSerializer); (TypeSerializer<?>) new StreamRecordSerializer<OUT>(outSerializer);
} }


if (outSerializer != null) { if (outSerializer != null) {
serializationDelegate = new SerializationDelegate<Object>(outRecordSerializer); serializationDelegate = new SerializationDelegate<StreamElement>(outRecordSerializer);
} }
} }


Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
Expand All @@ -53,9 +54,9 @@
*/ */
public class StreamInputProcessor<IN> extends AbstractReader implements StreamingReader { public class StreamInputProcessor<IN> extends AbstractReader implements StreamingReader {


private final RecordDeserializer<DeserializationDelegate<Object>>[] recordDeserializers; private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;


private RecordDeserializer<DeserializationDelegate<Object>> currentRecordDeserializer; private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;


// We need to keep track of the channel from which a buffer came, so that we can // We need to keep track of the channel from which a buffer came, so that we can
// appropriately map the watermarks to input channels // appropriately map the watermarks to input channels
Expand All @@ -68,9 +69,9 @@ public class StreamInputProcessor<IN> extends AbstractReader implements Streamin
private final long[] watermarks; private final long[] watermarks;
private long lastEmittedWatermark; private long lastEmittedWatermark;


private final DeserializationDelegate<Object> deserializationDelegate; private final DeserializationDelegate<StreamElement> deserializationDelegate;


@SuppressWarnings("unchecked") @SuppressWarnings({"unchecked", "rawtypes"})
public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
EventListener<CheckpointBarrier> checkpointListener, EventListener<CheckpointBarrier> checkpointListener,
CheckpointingMode checkpointMode, CheckpointingMode checkpointMode,
Expand All @@ -95,18 +96,18 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {


if (enableWatermarkMultiplexing) { if (enableWatermarkMultiplexing) {
MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer); MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
this.deserializationDelegate = new NonReusingDeserializationDelegate<Object>(ser); this.deserializationDelegate = new NonReusingDeserializationDelegate<StreamElement>(ser);
} else { } else {
StreamRecordSerializer<IN> ser = new StreamRecordSerializer<IN>(inputSerializer); StreamRecordSerializer<IN> ser = new StreamRecordSerializer<IN>(inputSerializer);
this.deserializationDelegate = (NonReusingDeserializationDelegate<Object>) this.deserializationDelegate = (NonReusingDeserializationDelegate<StreamElement>)
(NonReusingDeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN>>(ser); (NonReusingDeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN>>(ser);
} }


// Initialize one deserializer per input channel // Initialize one deserializer per input channel
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];


for (int i = 0; i < recordDeserializers.length; i++) { for (int i = 0; i < recordDeserializers.length; i++) {
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>(); recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>();
} }


watermarks = new long[inputGate.getNumberOfInputChannels()]; watermarks = new long[inputGate.getNumberOfInputChannels()];
Expand All @@ -132,18 +133,15 @@ public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator) throws
} }


if (result.isFullRecord()) { if (result.isFullRecord()) {
Object recordOrWatermark = deserializationDelegate.getInstance(); StreamElement recordOrWatermark = deserializationDelegate.getInstance();


if (recordOrWatermark instanceof Watermark) { if (recordOrWatermark.isWatermark()) {
Watermark mark = (Watermark) recordOrWatermark; long watermarkMillis = recordOrWatermark.asWatermark().getTimestamp();
long watermarkMillis = mark.getTimestamp();
if (watermarkMillis > watermarks[currentChannel]) { if (watermarkMillis > watermarks[currentChannel]) {
watermarks[currentChannel] = watermarkMillis; watermarks[currentChannel] = watermarkMillis;
long newMinWatermark = Long.MAX_VALUE; long newMinWatermark = Long.MAX_VALUE;
for (long watermark : watermarks) { for (long watermark : watermarks) {
if (watermark < newMinWatermark) { newMinWatermark = Math.min(watermark, newMinWatermark);
newMinWatermark = watermark;
}
} }
if (newMinWatermark > lastEmittedWatermark) { if (newMinWatermark > lastEmittedWatermark) {
lastEmittedWatermark = newMinWatermark; lastEmittedWatermark = newMinWatermark;
Expand All @@ -154,8 +152,7 @@ public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator) throws
} }
else { else {
// now we can do the actual processing // now we can do the actual processing
@SuppressWarnings("unchecked") StreamRecord<IN> record = recordOrWatermark.asRecord();
StreamRecord<IN> record = (StreamRecord<IN>) deserializationDelegate.getInstance();
StreamingRuntimeContext ctx = streamOperator.getRuntimeContext(); StreamingRuntimeContext ctx = streamOperator.getRuntimeContext();
if (ctx != null) { if (ctx != null) {
ctx.setNextInput(record); ctx.setNextInput(record);
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer; import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
Expand All @@ -44,6 +45,7 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;


/** /**
Expand All @@ -61,9 +63,9 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
@SuppressWarnings("unused") @SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class); private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);


private final RecordDeserializer<DeserializationDelegate<Object>>[] recordDeserializers; private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;


private RecordDeserializer<DeserializationDelegate<Object>> currentRecordDeserializer; private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;


// We need to keep track of the channel from which a buffer came, so that we can // We need to keep track of the channel from which a buffer came, so that we can
// appropriately map the watermarks to input channels // appropriately map the watermarks to input channels
Expand All @@ -81,8 +83,8 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements


private final int numInputChannels1; private final int numInputChannels1;


private final DeserializationDelegate<Object> deserializationDelegate1; private final DeserializationDelegate<StreamElement> deserializationDelegate1;
private final DeserializationDelegate<Object> deserializationDelegate2; private final DeserializationDelegate<StreamElement> deserializationDelegate2;


@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public StreamTwoInputProcessor( public StreamTwoInputProcessor(
Expand Down Expand Up @@ -113,29 +115,29 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {


if (enableWatermarkMultiplexing) { if (enableWatermarkMultiplexing) {
MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1); MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
this.deserializationDelegate1 = new NonReusingDeserializationDelegate<Object>(ser); this.deserializationDelegate1 = new NonReusingDeserializationDelegate<StreamElement>(ser);
} }
else { else {
StreamRecordSerializer<IN1> ser = new StreamRecordSerializer<IN1>(inputSerializer1); StreamRecordSerializer<IN1> ser = new StreamRecordSerializer<IN1>(inputSerializer1);
this.deserializationDelegate1 = (DeserializationDelegate<Object>) this.deserializationDelegate1 = (DeserializationDelegate<StreamElement>)
(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN1>>(ser); (DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN1>>(ser);
} }


if (enableWatermarkMultiplexing) { if (enableWatermarkMultiplexing) {
MultiplexingStreamRecordSerializer<IN2> ser = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2); MultiplexingStreamRecordSerializer<IN2> ser = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
this.deserializationDelegate2 = new NonReusingDeserializationDelegate<Object>(ser); this.deserializationDelegate2 = new NonReusingDeserializationDelegate<StreamElement>(ser);
} }
else { else {
StreamRecordSerializer<IN2> ser = new StreamRecordSerializer<IN2>(inputSerializer2); StreamRecordSerializer<IN2> ser = new StreamRecordSerializer<IN2>(inputSerializer2);
this.deserializationDelegate2 = (DeserializationDelegate<Object>) this.deserializationDelegate2 = (DeserializationDelegate<StreamElement>)
(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN2>>(ser); (DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN2>>(ser);
} }


// Initialize one deserializer per input channel // Initialize one deserializer per input channel
this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];


for (int i = 0; i < recordDeserializers.length; i++) { for (int i = 0; i < recordDeserializers.length; i++) {
recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>(); recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>();
} }


// determine which unioned channels belong to input 1 and which belong to input 2 // determine which unioned channels belong to input 1 and which belong to input 2
Expand All @@ -148,15 +150,11 @@ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
int numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1; int numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;


watermarks1 = new long[numInputChannels1]; watermarks1 = new long[numInputChannels1];
for (int i = 0; i < numInputChannels1; i++) { Arrays.fill(watermarks1, Long.MIN_VALUE);
watermarks1[i] = Long.MIN_VALUE;
}
lastEmittedWatermark1 = Long.MIN_VALUE; lastEmittedWatermark1 = Long.MIN_VALUE;


watermarks2 = new long[numInputChannels2]; watermarks2 = new long[numInputChannels2];
for (int i = 0; i < numInputChannels2; i++) { Arrays.fill(watermarks2, Long.MIN_VALUE);
watermarks2[i] = Long.MIN_VALUE;
}
lastEmittedWatermark2 = Long.MIN_VALUE; lastEmittedWatermark2 = Long.MIN_VALUE;
} }


Expand All @@ -182,22 +180,25 @@ public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator)


if (result.isFullRecord()) { if (result.isFullRecord()) {
if (currentChannel < numInputChannels1) { if (currentChannel < numInputChannels1) {
Object recordOrWatermark = deserializationDelegate1.getInstance(); StreamElement recordOrWatermark = deserializationDelegate1.getInstance();
if (recordOrWatermark instanceof Watermark) { if (recordOrWatermark.isWatermark()) {
handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel); handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel);
continue; continue;
} else { }
streamOperator.processElement1((StreamRecord<IN1>) deserializationDelegate1.getInstance()); else {
streamOperator.processElement1(recordOrWatermark.<IN1>asRecord());
return true; return true;


} }
} else { }
Object recordOrWatermark = deserializationDelegate2.getInstance(); else {
if (recordOrWatermark instanceof Watermark) { StreamElement recordOrWatermark = deserializationDelegate2.getInstance();
handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel); if (recordOrWatermark.isWatermark()) {
handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel);
continue; continue;
} else { }
streamOperator.processElement2((StreamRecord<IN2>) deserializationDelegate2.getInstance()); else {
streamOperator.processElement2(recordOrWatermark.<IN2>asRecord());
return true; return true;
} }
} }
Expand Down Expand Up @@ -234,10 +235,8 @@ private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> operator, Water
if (watermarkMillis > watermarks1[channelIndex]) { if (watermarkMillis > watermarks1[channelIndex]) {
watermarks1[channelIndex] = watermarkMillis; watermarks1[channelIndex] = watermarkMillis;
long newMinWatermark = Long.MAX_VALUE; long newMinWatermark = Long.MAX_VALUE;
for (long aWatermarks1 : watermarks1) { for (long wm : watermarks1) {
if (aWatermarks1 < newMinWatermark) { newMinWatermark = Math.min(wm, newMinWatermark);
newMinWatermark = aWatermarks1;
}
} }
if (newMinWatermark > lastEmittedWatermark1) { if (newMinWatermark > lastEmittedWatermark1) {
lastEmittedWatermark1 = newMinWatermark; lastEmittedWatermark1 = newMinWatermark;
Expand All @@ -250,10 +249,8 @@ private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> operator, Water
if (watermarkMillis > watermarks2[channelIndex]) { if (watermarkMillis > watermarks2[channelIndex]) {
watermarks2[channelIndex] = watermarkMillis; watermarks2[channelIndex] = watermarkMillis;
long newMinWatermark = Long.MAX_VALUE; long newMinWatermark = Long.MAX_VALUE;
for (long aWatermarks2 : watermarks2) { for (long wm : watermarks2) {
if (aWatermarks2 < newMinWatermark) { newMinWatermark = Math.min(wm, newMinWatermark);
newMinWatermark = aWatermarks2;
}
} }
if (newMinWatermark > lastEmittedWatermark2) { if (newMinWatermark > lastEmittedWatermark2) {
lastEmittedWatermark2 = newMinWatermark; lastEmittedWatermark2 = newMinWatermark;
Expand Down

0 comments on commit c09d14a

Please sign in to comment.