Skip to content

Commit

Permalink
[FLINK-7553] Use new SinkFunction interface in FlinkKafkaProducer010
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Sep 21, 2017
1 parent e7996b0 commit 6886f63
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 202 deletions.

Large diffs are not rendered by default.

Expand Up @@ -276,7 +276,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
* The incoming data * The incoming data
*/ */
@Override @Override
public void invoke(IN next) throws Exception { public void invoke(IN next, Context context) throws Exception {
// propagate asynchronous errors // propagate asynchronous errors
checkErroneous(); checkErroneous();


Expand Down
Expand Up @@ -215,6 +215,9 @@ public void testOneToOneAtLeastOnceCustomOperator() throws Exception {
* This test sets KafkaProducer so that it will not automatically flush the data and * This test sets KafkaProducer so that it will not automatically flush the data and
* simulate network failure between Flink and Kafka to check whether FlinkKafkaProducer * simulate network failure between Flink and Kafka to check whether FlinkKafkaProducer
* flushed records manually on snapshotState. * flushed records manually on snapshotState.
*
* <p>Due to legacy reasons there are two different ways of instantiating a Kafka 0.10 sink. The
* parameter controls which method is used.
*/ */
protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception { protected void testOneToOneAtLeastOnce(boolean regularSink) throws Exception {
final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator"; final String topic = regularSink ? "oneToOneTopicRegularSink" : "oneToOneTopicCustomOperator";
Expand Down
Expand Up @@ -43,14 +43,9 @@ public long currentWatermark() {
} }


@Override @Override
public long timestamp() { public Long timestamp() {
return timestamp; return timestamp;
} }

@Override
public boolean hasTimestamp() {
return true;
}
}; };
} }
} }
Expand Up @@ -31,22 +31,22 @@
public interface SinkFunction<IN> extends Function, Serializable { public interface SinkFunction<IN> extends Function, Serializable {


/** /**
* Function for standard sink behaviour. This function is called for every record.
*
* @param value The input record.
* @throws Exception
* @deprecated Use {@link #invoke(Object, Context)}. * @deprecated Use {@link #invoke(Object, Context)}.
*/ */
@Deprecated @Deprecated
default void invoke(IN value) throws Exception { default void invoke(IN value) throws Exception {}
}


/** /**
* Writes the given value to the sink. This function is called for every record. * Writes the given value to the sink. This function is called for every record.
* *
* <p>You have to override this method when implementing a {@code SinkFunction}, this is a
* {@code default} method for backward compatibility with the old-style method only.
*
* @param value The input record. * @param value The input record.
* @param context Additional context about the input record. * @param context Additional context about the input record.
* @throws Exception *
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/ */
default void invoke(IN value, Context context) throws Exception { default void invoke(IN value, Context context) throws Exception {
invoke(value); invoke(value);
Expand All @@ -72,15 +72,9 @@ interface Context<T> {
long currentWatermark(); long currentWatermark();


/** /**
* Returns the timestamp of the current input record. * Returns the timestamp of the current input record or {@code null} if the element does not
*/ * have an assigned timestamp.
long timestamp();

/**
* Checks whether this record has a timestamp.
*
* @return True if the record has a timestamp, false if not.
*/ */
boolean hasTimestamp(); Long timestamp();
} }
} }
Expand Up @@ -91,19 +91,11 @@ public long currentWatermark() {
} }


@Override @Override
public long timestamp() { public Long timestamp() {
if (!element.hasTimestamp()) { if (element.hasTimestamp()) {
throw new IllegalStateException( return element.getTimestamp();
"Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " +
"did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");

} }
return element.getTimestamp(); return null;
}

public boolean hasTimestamp() {
return element.hasTimestamp();
} }

} }
} }
Expand Up @@ -96,7 +96,8 @@ public BufferingQueryingSink() {
@Override @Override
public void invoke( public void invoke(
T value, Context context) throws Exception { T value, Context context) throws Exception {
if (context.hasTimestamp()) { Long timestamp = context.timestamp();
if (timestamp != null) {
data.add( data.add(
new Tuple4<>( new Tuple4<>(
context.currentWatermark(), context.currentWatermark(),
Expand Down

0 comments on commit 6886f63

Please sign in to comment.