NIFI-7522 bumping from Apache Kafka 2.5 to Apache Kafka 2.6
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4462.
joewitt authored and pvillard31 committed Aug 10, 2020
1 parent 0f18c59 commit 58e324e
Showing 40 changed files with 165 additions and 165 deletions.
2 changes: 1 addition & 1 deletion nifi-assembly/pom.xml
@@ -213,7 +213,7 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-kafka-2-5-nar</artifactId>
+ <artifactId>nifi-kafka-2-6-nar</artifactId>
<version>1.12.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
@@ -19,7 +19,7 @@
<artifactId>nifi-kafka-bundle</artifactId>
<version>1.12.0-SNAPSHOT</version>
</parent>
- <artifactId>nifi-kafka-2-5-nar</artifactId>
+ <artifactId>nifi-kafka-2-6-nar</artifactId>
<packaging>nar</packaging>
<description>NiFi NAR for interacting with Apache Kafka 2.5</description>
<properties>
@@ -29,7 +29,7 @@
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-kafka-2-5-processors</artifactId>
+ <artifactId>nifi-kafka-2-6-processors</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -19,7 +19,7 @@
<version>1.12.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>nifi-kafka-2-5-processors</artifactId>
<artifactId>nifi-kafka-2-6-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
@@ -61,12 +61,12 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
- <version>${kafka2.5.version}</version>
+ <version>${kafka2.6.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.12</artifactId>
- <version>${kafka2.5.version}</version>
+ <artifactId>kafka_2.13</artifactId>
+ <version>${kafka2.6.version}</version>
<scope>test</scope>
<exclusions>
<!-- Transitive dependencies excluded because they are located
@@ -57,14 +57,14 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.5 Consumer API. "
+ "The complementary NiFi processor for sending messages is PublishKafkaRecord_2_5. Please note that, at this time, the Processor assumes that "
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.6 Consumer API. "
+ "The complementary NiFi processor for sending messages is PublishKafkaRecord_2_6. Please note that, at this time, the Processor assumes that "
+ "all records that are retrieved from a given partition have the same schema. If any of the Kafka messages are pulled but cannot be parsed or written with the "
+ "configured Record Reader or Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
+ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual messages within the single FlowFile. "
+ "A 'record.count' attribute is added to indicate how many messages are contained in the FlowFile. No two Kafka messages will be placed into the same FlowFile if they "
+ "have different schemas, or if they have different values for a message header that is included by the <Headers to Add as Attributes> property.")
@Tags({"Kafka", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "2.5"})
@Tags({"Kafka", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "2.6"})
@WritesAttributes({
@WritesAttribute(attribute = "record.count", description = "The number of records received"),
@WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided by the configured Record Writer"),
@@ -78,8 +78,8 @@
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
- @SeeAlso({ConsumeKafka_2_5.class, PublishKafka_2_5.class, PublishKafkaRecord_2_5.class})
- public class ConsumeKafkaRecord_2_5 extends AbstractProcessor {
+ @SeeAlso({ConsumeKafka_2_6.class, PublishKafka_2_6.class, PublishKafkaRecord_2_6.class})
+ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {

static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
@@ -310,8 +310,8 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
- final String topicListing = context.getProperty(ConsumeKafkaRecord_2_5.TOPICS).evaluateAttributeExpressions().getValue();
- final String topicType = context.getProperty(ConsumeKafkaRecord_2_5.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
+ final String topicListing = context.getProperty(ConsumeKafkaRecord_2_6.TOPICS).evaluateAttributeExpressions().getValue();
+ final String topicType = context.getProperty(ConsumeKafkaRecord_2_6.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
final List<String> topics = new ArrayList<>();
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
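For reference, the consumer configuration assembled in createConsumerPool maps onto the plain kafka-clients API roughly as follows. This is an illustrative sketch rather than code from this commit; the broker address, group id, and topic names are placeholders, and the commented pattern subscription corresponds to the processor's "pattern" Topic Type.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerSubscriptionSketch {
    public static void main(String[] args) {
        // Mirrors the properties built above: offsets are committed manually,
        // and payloads stay raw bytes so the Record Reader can parse them later.
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder group id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // "names" Topic Type: subscribe to an explicit list of topics.
            consumer.subscribe(Arrays.asList("topic-a", "topic-b"));
            // "pattern" Topic Type would instead use:
            // consumer.subscribe(Pattern.compile("topic-.*"));
        }
    }
}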
@@ -58,9 +58,9 @@
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;

@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.5 Consumer API. "
+ "The complementary NiFi processor for sending messages is PublishKafka_2_5.")
@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "2.5"})
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.6 Consumer API. "
+ "The complementary NiFi processor for sending messages is PublishKafka_2_6.")
@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "2.6"})
@WritesAttributes({
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
@@ -76,7 +76,7 @@
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
- public class ConsumeKafka_2_5 extends AbstractProcessor {
+ public class ConsumeKafka_2_6 extends AbstractProcessor {

static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");

@@ -290,17 +290,17 @@ private synchronized ConsumerPool getConsumerPool(final ProcessContext context)
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
final int maxLeases = context.getMaxConcurrentTasks();
final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
- final byte[] demarcator = context.getProperty(ConsumeKafka_2_5.MESSAGE_DEMARCATOR).isSet()
- ? context.getProperty(ConsumeKafka_2_5.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
+ final byte[] demarcator = context.getProperty(ConsumeKafka_2_6.MESSAGE_DEMARCATOR).isSet()
+ ? context.getProperty(ConsumeKafka_2_6.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
: null;
final Map<String, Object> props = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

- final String topicListing = context.getProperty(ConsumeKafka_2_5.TOPICS).evaluateAttributeExpressions().getValue();
- final String topicType = context.getProperty(ConsumeKafka_2_5.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
+ final String topicListing = context.getProperty(ConsumeKafka_2_6.TOPICS).evaluateAttributeExpressions().getValue();
+ final String topicType = context.getProperty(ConsumeKafka_2_6.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
final List<String> topics = new ArrayList<>();
final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
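The Message Demarcator property handled above determines how multiple consumed messages are joined into a single FlowFile. A minimal sketch of that joining behavior, assuming UTF-8 demarcator bytes as in the processor code; this illustrates the output shape only and is not the processor's actual implementation:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

public class DemarcatorJoinSketch {
    // Concatenates raw message values, writing the demarcator bytes between
    // consecutive messages, which is the content layout of a multi-message FlowFile.
    static byte[] join(final List<byte[]> messageValues, final byte[] demarcator) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < messageValues.size(); i++) {
            if (i > 0) {
                out.write(demarcator);
            }
            out.write(messageValues.get(i));
        }
        return out.toByteArray();
    }
}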
@@ -58,8 +58,8 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

- import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_5.REL_PARSE_FAILURE;
- import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_5.REL_SUCCESS;
+ import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6.REL_PARSE_FAILURE;
+ import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6.REL_SUCCESS;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;

@@ -83,7 +83,7 @@
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "2.5"})
@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 2.5 Producer API. "
+ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
+ "The complementary NiFi processor for fetching messages is ConsumeKafkaRecord_2_5.")
+ "The complementary NiFi processor for fetching messages is ConsumeKafkaRecord_2_6.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
@@ -92,8 +92,8 @@
expressionLanguageScope = VARIABLE_REGISTRY)
@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
+ "FlowFiles that are routed to success.")
- @SeeAlso({PublishKafka_2_5.class, ConsumeKafka_2_5.class, ConsumeKafkaRecord_2_5.class})
- public class PublishKafkaRecord_2_5 extends AbstractProcessor {
+ @SeeAlso({PublishKafka_2_6.class, ConsumeKafka_2_6.class, ConsumeKafkaRecord_2_6.class})
+ public class PublishKafkaRecord_2_6 extends AbstractProcessor {
protected static final String MSG_COUNT = "msg.count";

static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
@@ -69,7 +69,7 @@
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 2.5 Producer API."
+ "The messages to send may be individual FlowFiles or may be delimited, using a "
+ "user-specified delimiter, such as a new-line. "
+ "The complementary NiFi processor for fetching messages is ConsumeKafka_2_5.")
+ "The complementary NiFi processor for fetching messages is ConsumeKafka_2_6.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
@@ -79,7 +79,7 @@
@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
+ "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may "
+ "be greater than 1.")
- public class PublishKafka_2_5 extends AbstractProcessor {
+ public class PublishKafka_2_6 extends AbstractProcessor {
protected static final String MSG_COUNT = "msg.count";

static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
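The DELIVERY_REPLICATED value declared in these publisher classes maps the "Guarantee Replicated Delivery" choice onto the Kafka producer's acks setting. A minimal sketch, assuming only the standard kafka-clients ProducerConfig keys:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class DeliveryGuaranteeSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        // acks=all: the send is acknowledged only after the message has been
        // replicated to the in-sync replicas, matching "Guarantee Replicated Delivery".
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // acks=1 would wait for the leader only; acks=0 is fire-and-forget.
    }
}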
@@ -69,8 +69,8 @@


@Tags({"kafka", "record", "sink"})
@CapabilityDescription("Provides a service to write records to a Kafka 2.x topic.")
public class KafkaRecordSink_2_5 extends AbstractControllerService implements RecordSinkService {
@CapabilityDescription("Provides a service to write records to a Kafka 2.6+ topic.")
public class KafkaRecordSink_2_6 extends AbstractControllerService implements RecordSinkService {

static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
"Records are considered 'transmitted unsuccessfully' unless the message is replicated to the appropriate "
@@ -12,4 +12,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
- org.apache.nifi.record.sink.kafka.KafkaRecordSink_2_5
+ org.apache.nifi.record.sink.kafka.KafkaRecordSink_2_6
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
- org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_5
- org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_5
- org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_5
- org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_5
+ org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_6
+ org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6
+ org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6
+ org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6
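These META-INF/services entries are standard Java ServiceLoader manifests, which is why the class renames must be mirrored here: each listed class is discovered and instantiated by its service interface name. A minimal sketch of the underlying mechanism (NiFi wraps this in its own NAR-aware class loading):

import java.util.ServiceLoader;

import org.apache.nifi.processor.Processor;

public class ProcessorDiscoverySketch {
    public static void main(String[] args) {
        // Reads META-INF/services/org.apache.nifi.processor.Processor from the
        // classpath and instantiates every class listed in it, so a stale
        // 2_5 entry would fail to load once the class is renamed to 2_6.
        for (final Processor processor : ServiceLoader.load(Processor.class)) {
            System.out.println(processor.getClass().getName());
        }
    }
}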
@@ -24,7 +24,7 @@
<h2>Description</h2>
<p>
This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
- for data using KafkaConsumer API available with Kafka 2.5. When a message is received
+ for data using KafkaConsumer API available with Kafka 2.6. When a message is received
from Kafka, the message will be deserialized using the configured Record Reader, and then
written to a FlowFile by serializing the message with the configured Record Writer.
</p>
@@ -24,7 +24,7 @@
<h2>Description</h2>
<p>
This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
- for data using KafkaConsumer API available with Kafka 2.5. When a message is received
+ for data using KafkaConsumer API available with Kafka 2.6. When a message is received
from Kafka, this Processor emits a FlowFile where the content of the FlowFile is the value
of the Kafka message.
</p>
@@ -25,7 +25,7 @@ <h2>Description</h2>
<p>
This Processor puts the contents of a FlowFile to a Topic in
<a href="http://kafka.apache.org/">Apache Kafka</a> using KafkaProducer API available
- with Kafka 2.5 API. The contents of the incoming FlowFile will be read using the
+ with Kafka 2.6 API. The contents of the incoming FlowFile will be read using the
configured Record Reader. Each record will then be serialized using the configured
Record Writer, and this serialized form will be the content of a Kafka message.
This message is optionally assigned a key by using the &lt;Kafka Key&gt; Property.
@@ -25,7 +25,7 @@ <h2>Description</h2>
<p>
This Processor puts the contents of a FlowFile to a Topic in
<a href="http://kafka.apache.org/">Apache Kafka</a> using KafkaProducer API available
- with Kafka 2.5 API. The content of a FlowFile becomes the contents of a Kafka message.
+ with Kafka 2.6 API. The content of a FlowFile becomes the contents of a Kafka message.
This message is optionally assigned a key by using the &lt;Kafka Key&gt; Property.
</p>
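The KafkaProducer usage these additional-details pages describe looks roughly like the following against the 2.6 client API. An illustrative sketch with placeholder broker address, topic, and key:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class ProducerSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            final byte[] key = "example-key".getBytes(StandardCharsets.UTF_8);       // maps to the <Kafka Key> property
            final byte[] value = "example-content".getBytes(StandardCharsets.UTF_8); // the FlowFile content
            producer.send(new ProducerRecord<>("example-topic", key, value));
        }
    }
}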

(Remaining changed files in this commit are not shown.)