Skip to content

Commit

Permalink
[FLINK-1625] [streaming] [api-breaking] Added proper cancellation to …
Browse files Browse the repository at this point in the history
…StreamInvokables + Sink- and SourceFunction interfaces extended with cancel method
  • Loading branch information
gyfora authored and mbalassi committed Mar 4, 2015
1 parent 3abd6c8 commit 8436e9c
Show file tree
Hide file tree
Showing 51 changed files with 481 additions and 178 deletions.
Expand Up @@ -133,6 +133,13 @@ public void close() {
client.client.close(); client.client.close();
} }


@Override
public void cancel() {
if (client != null) {
client.client.close();
}
}

@Override @Override
public void open(Configuration config) { public void open(Configuration config) {
client = new FlinkRpcClientFacade(); client = new FlinkRpcClientFacade();
Expand Down
Expand Up @@ -130,12 +130,16 @@ public void configureAvroSource(Collector<OUT> collector) {
* The Collector for sending data to the datastream * The Collector for sending data to the datastream
*/ */
@Override @Override
public void invoke(Collector<OUT> collector) throws Exception { public void run(Collector<OUT> collector) throws Exception {
configureAvroSource(collector); configureAvroSource(collector);
avroSource.start(); avroSource.start();
while (!finished) { while (!finished) {
this.wait(); this.wait();
} }
} }


@Override
public void cancel() {
}

} }
Expand Up @@ -41,14 +41,20 @@ public static void main(String[] args) throws Exception {
@SuppressWarnings({ "unused", "serial" }) @SuppressWarnings({ "unused", "serial" })
DataStream<String> stream1 = env.addSource(new SourceFunction<String>() { DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
@Override @Override
public void invoke(Collector<String> collector) throws Exception { public void run(Collector<String> collector) throws Exception {
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
collector.collect("message #" + i); collector.collect("message #" + i);
Thread.sleep(100L); Thread.sleep(100L);
} }


collector.collect(new String("q")); collector.collect(new String("q"));
} }

@Override
public void cancel() {
}


}).addSink( }).addSink(
new KafkaSink<String>(topic, host + ":" + port, new JavaDefaultStringSchema()) new KafkaSink<String>(topic, host + ":" + port, new JavaDefaultStringSchema())
) )
Expand Down
Expand Up @@ -33,9 +33,9 @@


/** /**
* Sink that emits its inputs to a Kafka topic. * Sink that emits its inputs to a Kafka topic.
* *
* @param <IN> * @param <IN>
* Type of the sink input * Type of the sink input
*/ */
public class KafkaSink<IN> extends RichSinkFunction<IN> { public class KafkaSink<IN> extends RichSinkFunction<IN> {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
Expand All @@ -49,31 +49,33 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
private KafkaPartitioner<IN> partitioner; private KafkaPartitioner<IN> partitioner;


/** /**
* Creates a KafkaSink for a given topic. The partitioner distributes the messages between the partitions of the topics. * Creates a KafkaSink for a given topic. The partitioner distributes the
* * messages between the partitions of the topics.
*
* @param topicId * @param topicId
* ID of the Kafka topic. * ID of the Kafka topic.
* @param brokerAddr * @param brokerAddr
* Address of the Kafka broker (with port number). * Address of the Kafka broker (with port number).
* @param serializationSchema * @param serializationSchema
* User defined serialization schema. * User defined serialization schema.
*/ */
public KafkaSink(String topicId, String brokerAddr, public KafkaSink(String topicId, String brokerAddr,
SerializationSchema<IN, byte[]> serializationSchema) { SerializationSchema<IN, byte[]> serializationSchema) {
this(topicId, brokerAddr, serializationSchema, new KafkaDistributePartitioner<IN>()); this(topicId, brokerAddr, serializationSchema, new KafkaDistributePartitioner<IN>());
} }


/** /**
* Creates a KafkaSink for a given topic. The sink produces its input into the topic. * Creates a KafkaSink for a given topic. The sink produces its input into
* * the topic.
*
* @param topicId * @param topicId
* ID of the Kafka topic. * ID of the Kafka topic.
* @param brokerAddr * @param brokerAddr
* Address of the Kafka broker (with port number). * Address of the Kafka broker (with port number).
* @param serializationSchema * @param serializationSchema
* User defined serialization schema. * User defined serialization schema.
* @param partitioner * @param partitioner
* User defined partitioner. * User defined partitioner.
*/ */
public KafkaSink(String topicId, String brokerAddr, public KafkaSink(String topicId, String brokerAddr,
SerializationSchema<IN, byte[]> serializationSchema, KafkaPartitioner<IN> partitioner) { SerializationSchema<IN, byte[]> serializationSchema, KafkaPartitioner<IN> partitioner) {
Expand Down Expand Up @@ -111,9 +113,9 @@ public void initialize() {


/** /**
* Called when new data arrives to the sink, and forwards it to Kafka. * Called when new data arrives to the sink, and forwards it to Kafka.
* *
* @param next * @param next
* The incoming data * The incoming data
*/ */
@Override @Override
public void invoke(IN next) { public void invoke(IN next) {
Expand All @@ -132,4 +134,9 @@ public void close() {
} }
} }


@Override
public void cancel() {
close();
}

} }
Expand Up @@ -53,6 +53,8 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
private long zookeeperSyncTimeMillis; private long zookeeperSyncTimeMillis;
private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200; private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;


private volatile boolean isRunning = false;

/** /**
* Creates a KafkaSource that consumes a topic. * Creates a KafkaSource that consumes a topic.
* *
Expand Down Expand Up @@ -107,21 +109,31 @@ private void initializeConnection() {
* The Collector for sending data to the dataStream * The Collector for sending data to the dataStream
*/ */
@Override @Override
public void invoke(Collector<OUT> collector) throws Exception { public void run(Collector<OUT> collector) throws Exception {

isRunning = true;
while (consumerIterator.hasNext()) { try {
OUT out = schema.deserialize(consumerIterator.next().message()); while (isRunning && consumerIterator.hasNext()) {
if (schema.isEndOfStream(out)) { OUT out = schema.deserialize(consumerIterator.next().message());
break; if (schema.isEndOfStream(out)) {
break;
}
collector.collect(out);
} }
collector.collect(out); } finally {
consumer.shutdown();
} }
consumer.shutdown();

} }


@Override @Override
public void open(Configuration config) { public void open(Configuration config) throws Exception {
initializeConnection(); initializeConnection();
} }

@Override
public void cancel() {
isRunning = false;
if (consumer != null) {
consumer.shutdown();
}
}
} }
Expand Up @@ -108,4 +108,9 @@ public void close() {
closeChannel(); closeChannel();
} }


@Override
public void cancel() {
close();
}

} }
Expand Up @@ -46,6 +46,8 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
private transient QueueingConsumer consumer; private transient QueueingConsumer consumer;
private transient QueueingConsumer.Delivery delivery; private transient QueueingConsumer.Delivery delivery;


private volatile boolean isRunning = false;

OUT out; OUT out;


public RMQSource(String HOST_NAME, String QUEUE_NAME, public RMQSource(String HOST_NAME, String QUEUE_NAME,
Expand Down Expand Up @@ -80,42 +82,46 @@ private void initializeConnection() {
* The Collector for sending data to the dataStream * The Collector for sending data to the dataStream
*/ */
@Override @Override
public void invoke(Collector<OUT> collector) throws Exception { public void run(Collector<OUT> collector) throws Exception {

isRunning = true;
while (true) { try {

while (isRunning) {
try {
delivery = consumer.nextDelivery(); try {
} catch (Exception e) { delivery = consumer.nextDelivery();
if (LOG.isErrorEnabled()) { } catch (Exception e) {
LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME); if (LOG.isErrorEnabled()) {
LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
}
} }
}


out = schema.deserialize(delivery.getBody()); out = schema.deserialize(delivery.getBody());
if (schema.isEndOfStream(out)) { if (schema.isEndOfStream(out)) {
break; break;
} else { } else {
collector.collect(out); collector.collect(out);
}
} }
} finally {
connection.close();
} }


} }


@Override @Override
public void open(Configuration config) { public void open(Configuration config) throws Exception {
initializeConnection(); initializeConnection();
} }


@Override @Override
public void close() { public void cancel() {
isRunning = false;
try { try {
connection.close(); connection.close();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
+ " at " + HOST_NAME, e); + " at " + HOST_NAME, e);
} }

} }


} }
Expand Up @@ -57,6 +57,8 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
private boolean streaming; private boolean streaming;
private int numberOfTweets; private int numberOfTweets;


private volatile boolean isRunning = false;

/** /**
* Create {@link TwitterSource} for streaming * Create {@link TwitterSource} for streaming
* *
Expand Down Expand Up @@ -90,20 +92,20 @@ public void open(Configuration parameters) throws Exception {
} }


@Override @Override
public void invoke(Collector<String> collector) throws Exception { public void run(Collector<String> collector) throws Exception {

isRunning = true;
if (streaming) { try {
collectMessages(collector); if (streaming) {
} else { collectMessages(collector);
collectFiniteMessages(collector); } else {
collectFiniteMessages(collector);
}
} finally {
closeConnection();
isRunning = false;
} }
} }


@Override
public void close() throws Exception {
closeConnection();
}

/** /**
* Initialize Hosebird Client to be able to consume Twitter's Streaming API * Initialize Hosebird Client to be able to consume Twitter's Streaming API
*/ */
Expand Down Expand Up @@ -196,7 +198,7 @@ protected void collectMessages(Collector<String> collector) {
LOG.info("Tweet-stream begins"); LOG.info("Tweet-stream begins");
} }


while (true) { while (isRunning) {
collectOneMessage(collector); collectOneMessage(collector);
} }
} }
Expand Down Expand Up @@ -246,7 +248,8 @@ private void closeConnection() {
/** /**
* Get the size of the queue in which the tweets are contained temporarily. * Get the size of the queue in which the tweets are contained temporarily.
* *
* @return the size of the queue in which the tweets are contained temporarily * @return the size of the queue in which the tweets are contained
* temporarily
*/ */
public int getQueueSize() { public int getQueueSize() {
return queueSize; return queueSize;
Expand Down Expand Up @@ -280,4 +283,10 @@ public int getWaitSec() {
public void setWaitSec(int waitSec) { public void setWaitSec(int waitSec) {
this.waitSec = waitSec; this.waitSec = waitSec;
} }

@Override
public void cancel() {
isRunning = false;
closeConnection();
}
} }
Expand Up @@ -47,6 +47,10 @@ public void invoke(Tuple5<Long, Integer, String, String, String> tuple) {
System.out.println(""); System.out.println("");
} }


@Override
public void cancel() {
}

} }


public static class SelectDataFlatMap extends public static class SelectDataFlatMap extends
Expand Down
Expand Up @@ -115,4 +115,13 @@ protected void flush() {
*/ */
protected abstract void resetParameters(); protected abstract void resetParameters();


@Override
public void cancel() {
try {
close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

} }
Expand Up @@ -84,15 +84,19 @@ public void invoke(IN record) {
} }


@Override @Override
public void close() throws Exception { public void close() {
this.stream = null; this.stream = null;
this.prefix = null; this.prefix = null;
super.close();
} }


@Override @Override
public String toString() { public String toString() {
return "Print to " + (target == STD_OUT ? "System.out" : "System.err"); return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
} }


@Override
public void cancel() {
close();
}

} }
Expand Up @@ -23,6 +23,8 @@


public interface SinkFunction<IN> extends Function, Serializable { public interface SinkFunction<IN> extends Function, Serializable {


public abstract void invoke(IN value) throws Exception; public void invoke(IN value) throws Exception;

public void cancel();


} }

0 comments on commit 8436e9c

Please sign in to comment.