[FLINK-6711] Activate strict checkstyle for flink-connector-kafka*
zentol committed May 27, 2017
1 parent c20b396 commit 28e8043
Showing 127 changed files with 1,261 additions and 1,233 deletions.
4 changes: 2 additions & 2 deletions flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -130,7 +130,7 @@ under the License.
 <version>${kafka.version}</version>
 <scope>test</scope>
 </dependency>

 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-tests_${scala.binary.version}</artifactId>
@@ -209,5 +209,5 @@ under the License.
 </plugin>
 </plugins>
 </build>

 </project>
@@ -29,21 +29,21 @@
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.util.PropertiesUtil;
 import org.apache.flink.util.SerializedValue;

 import org.apache.kafka.clients.consumer.ConsumerConfig;

 import java.util.Collections;
-import java.util.Map;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;

 /**
 * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
 * Apache Kafka 0.10.x. The consumer can run in multiple parallel instances, each of which will pull
 * data from one or more Kafka partitions.
 *
 * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
 * during a failure, and that the computation processes elements "exactly once".
 * (Note: These guarantees naturally assume that Kafka itself does not loose any data.)</p>
 *
 * <p>Please note that Flink snapshots the offsets internally as part of its distributed checkpoints. The offsets
@@ -62,11 +62,10 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {

 private static final long serialVersionUID = 2324564345203409112L;

 // ------------------------------------------------------------------------

 /**
- * Creates a new Kafka streaming source consumer for Kafka 0.10.x
+ * Creates a new Kafka streaming source consumer for Kafka 0.10.x.
 *
 * @param topic
 * The name of the topic that should be consumed.
@@ -82,7 +81,7 @@ public FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeseria
 /**
 * Creates a new Kafka streaming source consumer for Kafka 0.10.x
 *
- * This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
+ * <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
 * pairs, offsets, and topic names from Kafka.
 *
 * @param topic
@@ -99,7 +98,7 @@ public FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema<T> deseria
 /**
 * Creates a new Kafka streaming source consumer for Kafka 0.10.x
 *
- * This constructor allows passing multiple topics to the consumer.
+ * <p>This constructor allows passing multiple topics to the consumer.
 *
 * @param topics
 * The Kafka topics to read from.
@@ -115,7 +114,7 @@ public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deser
 /**
 * Creates a new Kafka streaming source consumer for Kafka 0.10.x
 *
- * This constructor allows passing multiple topics and a key/value deserialization schema.
+ * <p>This constructor allows passing multiple topics and a key/value deserialization schema.
 *
 * @param topics
 * The Kafka topics to read from.
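Illustrative usage sketch for the consumer documented above: a minimal job built against the Flink 1.3-era API shown in this diff. The broker address, group id, topic name, and class name are placeholders, and SimpleStringSchema is assumed to be available from the same org.apache.flink.streaming.util.serialization package as the schemas imported above.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class Kafka010ConsumerExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing so the consumer's offsets are snapshotted with each checkpoint,
        // which is what backs the "exactly once" guarantee described in the Javadoc above.
        env.enableCheckpointing(5000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "example-group");           // placeholder group id

        // Single-topic constructor with a plain DeserializationSchema, as shown in the diff;
        // the List<String> constructor can be used to subscribe to multiple topics instead.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer010<>("example-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("Kafka 0.10 consumer example");
    }
}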
@@ -17,8 +17,6 @@

 package org.apache.flink.streaming.connectors.kafka;

-import java.util.Properties;
-
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -37,34 +35,35 @@
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;

 import org.apache.kafka.clients.producer.ProducerRecord;

+import java.util.Properties;

 import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPartitionsByTopic;
 import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;

 /**
 * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x
 *
- * Implementation note: This producer is a hybrid between a regular regular sink function (a)
+ * <p>Implementation note: This producer is a hybrid between a regular regular sink function (a)
 * and a custom operator (b).
 *
- * For (a), the class implements the SinkFunction and RichFunction interfaces.
+ * <p>For (a), the class implements the SinkFunction and RichFunction interfaces.
 * For (b), it extends the StreamTask class.
 *
- * Details about approach (a):
+ * <p>Details about approach (a):
+ *
 * Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the
 * DataStream.addSink() method.
 * Since the APIs exposed in that variant do not allow accessing the the timestamp attached to the record
 * the Kafka 0.10 producer has a second invocation option, approach (b).
 *
- * Details about approach (b):
+ * <p>Details about approach (b):
 * Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
 * FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
 * can access the internal record timestamp of the record and write it to Kafka.
 *
- * All methods and constructors in this class are marked with the approach they are needed for.
+ * <p>All methods and constructors in this class are marked with the approach they are needed for.
 */
 public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction {
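Illustrative sketch of the two invocation styles described in the class Javadoc above, built against the Flink 1.3-era API in this file. The topic name, broker address, and class name are placeholders, and SimpleStringSchema is assumed to implement SerializationSchema in the org.apache.flink.streaming.util.serialization package.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class Kafka010ProducerApproaches {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("a", "b", "c");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address

        // Approach (a): regular sink function added via addSink(); record timestamps are not written to Kafka.
        stream.addSink(new FlinkKafkaProducer010<>("example-topic", new SimpleStringSchema(), props));

        // Approach (b): attach the producer as a custom operator via the static helper so it can
        // see each record's timestamp; the returned configuration handle controls timestamp writing.
        FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<String> config =
                FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, "example-topic", new SimpleStringSchema(), props);
        config.setWriteTimestampToKafka(true);

        env.execute("Kafka 0.10 producer example");
    }
}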


@@ -79,7 +78,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct
 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 * the topic.
 *
- * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+ * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
 *
 * @param inStream The stream to write to Kafka
 * @param topicId ID of the Kafka topic.
@@ -93,12 +92,11 @@ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestam
 return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
 }

 /**
 * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to
 * the topic.
 *
- * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+ * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
 *
 * @param inStream The stream to write to Kafka
 * @param topicId ID of the Kafka topic.
@@ -116,7 +114,7 @@ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestam
 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 * the topic.
 *
- * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+ * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
 *
 * @param inStream The stream to write to Kafka
 * @param topicId The name of the target topic
@@ -212,11 +210,11 @@ public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializati
 public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
 this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
 }

 /**
- * Create Kafka producer
+ * Create Kafka producer.
 *
- * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+ * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
 */
 public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) {
 // We create a Kafka 09 producer instance here and only "override" (by intercepting) the
@@ -230,7 +228,7 @@ public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> seriali
 * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
 * the topic.
 *
- * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
+ * <p>This constructor allows writing timestamps to Kafka, it follow approach (b) (see above)
 *
 * @param inStream The stream to write to Kafka
 * @param topicId The name of the target topic
@@ -275,9 +273,9 @@ public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializatio
 }

 /**
- * Create Kafka producer
+ * Create Kafka producer.
 *
- * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
+ * <p>This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above)
 *
 * @deprecated This is a deprecated constructor that does not correctly handle partitioning when
 * producing to multiple topics. Use
@@ -306,13 +304,13 @@ private void invokeInternal(T next, long elementTimestamp) throws Exception {
 }

 Long timestamp = null;
-if(this.writeTimestampToKafka) {
+if (this.writeTimestampToKafka) {
 timestamp = elementTimestamp;
 }

 ProducerRecord<byte[], byte[]> record;
 int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic);
-if(null == partitions) {
+if (null == partitions) {
 partitions = getPartitionsByTopic(targetTopic, internalProducer.producer);
 internalProducer.topicPartitionsMap.put(targetTopic, partitions);
 }
@@ -329,10 +327,8 @@ record = new ProducerRecord<>(targetTopic, internalProducer.flinkKafkaPartitione
 internalProducer.producer.send(record, internalProducer.callback);
 }

 // ----------------- Helper methods implementing methods from SinkFunction and RichFunction (Approach (a)) ----

 // ---- Configuration setters

 /**
@@ -341,7 +337,7 @@ record = new ProducerRecord<>(targetTopic, internalProducer.flinkKafkaPartitione
 * exceptions will be eventually thrown and cause the streaming program to
 * fail (and enter recovery).
 *
- * Method is only accessible for approach (a) (see above)
+ * <p>Method is only accessible for approach (a) (see above)
 *
 * @param logFailuresOnly The flag to indicate logging-only on exceptions.
 */
@@ -355,7 +351,7 @@ public void setLogFailuresOnly(boolean logFailuresOnly) {
 * to be acknowledged by the Kafka producer on a checkpoint.
 * This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
 *
- * Method is only accessible for approach (a) (see above)
+ * <p>Method is only accessible for approach (a) (see above)
 *
 * @param flush Flag indicating the flushing mode (true = flush on checkpoint)
 */
@@ -365,8 +361,7 @@ public void setFlushOnCheckpoint(boolean flush) {
 }

 /**
- * This method is used for approach (a) (see above)
- *
+ * This method is used for approach (a) (see above).
 */
 @Override
 public void open(Configuration parameters) throws Exception {
@@ -375,7 +370,7 @@ public void open(Configuration parameters) throws Exception {
 }

 /**
- * This method is used for approach (a) (see above)
+ * This method is used for approach (a) (see above).
 */
 @Override
 public IterationRuntimeContext getIterationRuntimeContext() {
@@ -384,7 +379,7 @@ public IterationRuntimeContext getIterationRuntimeContext() {
 }

 /**
- * This method is used for approach (a) (see above)
+ * This method is used for approach (a) (see above).
 */
 @Override
 public void setRuntimeContext(RuntimeContext t) {
@@ -395,7 +390,7 @@ public void setRuntimeContext(RuntimeContext t) {
 /**
 * Invoke method for using the Sink as DataStream.addSink() sink.
 *
- * This method is used for approach (a) (see above)
+ * <p>This method is used for approach (a) (see above)
 *
 * @param value The input record.
 */
@@ -404,14 +399,12 @@ public void invoke(T value) throws Exception {
 invokeInternal(value, Long.MAX_VALUE);
 }

 // ----------------- Helper methods and classes implementing methods from StreamSink (Approach (b)) ----

 /**
 * Process method for using the sink with timestamp support.
 *
- * This method is used for approach (b) (see above)
+ * <p>This method is used for approach (b) (see above)
 */
 @Override
 public void processElement(StreamRecord<T> element) throws Exception {
@@ -467,5 +460,4 @@ public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
 }
 }

 }
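Illustrative sketch of the approach (a) reliability setters documented above (setLogFailuresOnly and setFlushOnCheckpoint), which are called on the producer instance before it is added as a sink. The topic, method, and class names below are placeholders, and SimpleStringSchema is again an assumption rather than part of this diff.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class Kafka010ProducerReliabilityExample {

    public static void attachSink(DataStream<String> stream, Properties props) {
        // Approach (a): configure the producer itself, then add it via DataStream.addSink().
        FlinkKafkaProducer010<String> producer =
                new FlinkKafkaProducer010<>("example-topic", new SimpleStringSchema(), props);
        producer.setLogFailuresOnly(false);  // surface send errors as failures instead of only logging them
        producer.setFlushOnCheckpoint(true); // wait for pending records to be acknowledged on each checkpoint
        stream.addSink(producer);
    }
}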
@@ -18,13 +18,15 @@

 package org.apache.flink.streaming.connectors.kafka;

-import java.util.Properties;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;

+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+
+import java.util.Properties;
+
 /**
 * Kafka {@link StreamTableSource} for Kafka 0.10.
 */
@@ -19,9 +19,9 @@
 package org.apache.flink.streaming.connectors.kafka;

 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;

 import java.util.Properties;
@@ -19,9 +19,9 @@
 package org.apache.flink.streaming.connectors.kafka;

 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.types.Row;

 import java.util.Properties;
@@ -36,10 +36,10 @@

 /**
 * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API.
 *
 * <p>This fetcher re-uses basically all functionality of the 0.9 fetcher. It only additionally
 * takes the KafkaRecord-attached timestamp and attaches it to the Flink records.
 *
 * @param <T> The type of elements produced by the fetcher.
 */
 public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
@@ -57,8 +57,7 @@ public Kafka010Fetcher(
 KeyedDeserializationSchema<T> deserializer,
 Properties kafkaProperties,
 long pollTimeout,
-boolean useMetrics) throws Exception
-{
+boolean useMetrics) throws Exception {
 super(
 sourceContext,
 assignedPartitionsWithInitialOffsets,
@@ -88,7 +87,7 @@ protected void emitRecord(

 /**
 * This method needs to be overridden because Kafka broke binary compatibility between 0.9 and 0.10,
- * changing binary signatures
+ * changing binary signatures.
 */
 @Override
 protected KafkaConsumerCallBridge010 createCallBridge() {
@@ -26,11 +26,11 @@

 /**
 * The ConsumerCallBridge simply calls the {@link KafkaConsumer#assign(java.util.Collection)} method.
 *
- * This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
+ * <p>This indirection is necessary, because Kafka broke binary compatibility between 0.9 and 0.10,
 * changing {@code assign(List)} to {@code assign(Collection)}.
 *
- * Because of that, we need two versions whose compiled code goes against different method signatures.
+ * <p>Because of that, we need two versions whose compiled code goes against different method signatures.
 */
 public class KafkaConsumerCallBridge010 extends KafkaConsumerCallBridge {
