Merged
File: KafkaAlertPublisher.java
@@ -27,6 +27,7 @@
import io.cdap.cdap.etl.api.Alert;
import io.cdap.cdap.etl.api.AlertPublisher;
import io.cdap.cdap.etl.api.AlertPublisherContext;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.plugin.common.KafkaHelpers;
import io.cdap.plugin.common.KeyValueListParser;
@@ -62,13 +63,14 @@ public KafkaAlertPublisher(Config config) {

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
config.validate();
config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
}

@Override
public void initialize(AlertPublisherContext context) throws Exception {
super.initialize(context);
config.validate();
config.validate(context.getFailureCollector());
context.getFailureCollector().getOrThrowException();
Properties props = new Properties();
// Add client id property with stage name as value.
props.put(ProducerConfig.CLIENT_ID_CONFIG, context.getStageName());
@@ -112,6 +114,7 @@ public void destroy() {
*/
public static class Config extends PluginConfig {

public static final String TOPIC = "topic";
@Name("brokers")
@Description("Specifies the connection string where Producer can find one or more brokers to " +
"determine the leader for each topic.")
@@ -157,7 +160,7 @@ private Map<String, String> getProducerProperties() {
return producerProps;
}

private void validate() {
private void validate(FailureCollector collector) {
// If the topic or brokers are macros they would not be available at config time. So do not perform
// validations yet.
if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(brokers)) {
@@ -167,11 +170,13 @@ private void validate() {
try {
Topic.validate(topic);
} catch (InvalidTopicException e) {
throw new IllegalArgumentException(String.format("Topic name %s is not a valid kafka topic. Please provide " +
"valid kafka topic name. %s", topic, e.getMessage()));
collector.addFailure(String.format(
"Topic name %s is not a valid kafka topic. Please provide valid kafka topic name. %s", topic,
e.getMessage()), null)
.withConfigProperty(TOPIC);
}

KafkaHelpers.validateKerberosSetting(principal, keytabLocation);
KafkaHelpers.validateKerberosSetting(principal, keytabLocation, collector);
}
}
}
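
Note: the changes in this file follow the pattern used throughout the PR: validate(FailureCollector) records each problem against the offending config property instead of throwing, and the caller decides when to throw. A minimal self-contained sketch of that pattern (ExampleConfig and its space check are illustrative, not code from this PR):

    import com.google.common.base.Strings;
    import io.cdap.cdap.api.annotation.Macro;
    import io.cdap.cdap.api.annotation.Name;
    import io.cdap.cdap.api.plugin.PluginConfig;
    import io.cdap.cdap.etl.api.FailureCollector;

    // Illustrative config, not part of this PR.
    public class ExampleConfig extends PluginConfig {
      public static final String TOPIC = "topic";

      @Name(TOPIC)
      @Macro
      private String topic;

      void validate(FailureCollector collector) {
        // Macro values are unresolved at configure time, so skip the check until runtime.
        if (containsMacro(TOPIC) || Strings.isNullOrEmpty(topic)) {
          return;
        }
        if (topic.contains(" ")) {
          // Record the failure and tag the property so the UI can highlight the field.
          collector.addFailure(String.format("Topic name '%s' must not contain spaces.", topic),
                               "Provide a topic name without spaces.")
            .withConfigProperty(TOPIC);
        }
      }
    }

The caller (configurePipeline at deploy time, initialize at runtime) then invokes validate(collector) and finishes with collector.getOrThrowException(), so the user sees every collected failure at once instead of one exception per attempt.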
File: KafkaBatchSource.java
@@ -29,7 +29,9 @@
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.common.io.ByteBuffers;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
@@ -53,6 +55,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

@@ -122,9 +125,9 @@ public Map<String, String> getKafkaProperties() {
return conf;
}

public void validate() {
super.validate();
KafkaHelpers.validateKerberosSetting(principal, keytabLocation);
public void validate(FailureCollector collector) {
super.validate(collector);
KafkaHelpers.validateKerberosSetting(principal, keytabLocation, collector);
}
}

@@ -134,16 +137,23 @@ public KafkaBatchSource(Kafka10BatchConfig config) {

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
config.validate();
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
FailureCollector failureCollector = stageConfigurer.getFailureCollector();
config.validate(failureCollector);
stageConfigurer.setOutputSchema(config.getSchema(failureCollector));
failureCollector.getOrThrowException();
}

@Override
public void prepareRun(BatchSourceContext context) throws Exception {
Job job = JobUtils.createInstance();
Configuration conf = job.getConfiguration();

KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets();
FailureCollector failureCollector = context.getFailureCollector();
KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets(failureCollector);

Contributor:
as mentioned before, do all the validation here before anything else happens. This should be:

    FailureCollector failureCollector = context.getFailureCollector();
    KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets(failureCollector);
    Schema schema = config.getSchema(failureCollector);
    ... partitions = config.getPartitions(failureCollector);
    failureCollector.getOrThrowException();

Collaborator (Author):
Ok, but I previously moved FailureCollector later in the function, since I didn't think the if block or the saveKafkaRequests call needed to be done after getOrThrowException.

Schema schema = config.getSchema(failureCollector);
Set<Integer> partitions = config.getPartitions(failureCollector);
failureCollector.getOrThrowException();

// If the offset directory is provided, try to load the file
if (!context.isPreviewEnabled() && config.getOffsetDir() != null) {
@@ -167,10 +177,11 @@ public void prepareRun(BatchSourceContext context) throws Exception {
KafkaHelpers.setupKerberosLogin(kafkaConf, config.getPrincipal(), config.getKeytabLocation());
kafkaConf.putAll(config.getKafkaProperties());
kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), kafkaConf,
config.getPartitions(), config.getMaxNumberRecords(),
partitions,
config.getMaxNumberRecords(),
partitionOffsets);
LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName);
Schema schema = config.getSchema();

if (schema != null) {
lineageRecorder.createExternalDataset(schema);
if (schema.getFields() != null && !schema.getFields().isEmpty()) {
@@ -203,8 +214,11 @@ public void onRunFinish(boolean succeeded, BatchSourceContext context) {
public void initialize(BatchRuntimeContext context) throws Exception {
super.initialize(context);

schema = config.getSchema();
Schema messageSchema = config.getMessageSchema();
schema = config.getSchema(context.getFailureCollector());
Schema messageSchema = config.getMessageSchema(context.getFailureCollector());
if (schema == null || messageSchema == null) {
return;
}
for (Schema.Field field : schema.getFields()) {
String name = field.getName();
if (!name.equals(config.getKeyField()) && !name.equals(config.getPartitionField()) &&
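
Note: the thread above converges on collecting every config-derived value before throwing, which is what the merged diff does. A trimmed sketch of that prepareRun ordering (the elided part is the offset-file loading and job setup shown in the diff):

    @Override
    public void prepareRun(BatchSourceContext context) throws Exception {
      // Pull every value whose getter can register validation failures first...
      FailureCollector failureCollector = context.getFailureCollector();
      KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets(failureCollector);
      Schema schema = config.getSchema(failureCollector);
      Set<Integer> partitions = config.getPartitions(failureCollector);
      // ...then throw once, carrying every failure collected above, so a user
      // sees all config problems in a single deploy instead of one at a time.
      failureCollector.getOrThrowException();

      // Only after validation passes: load the offset file, save the Kafka
      // requests, and record lineage, as in the diff above.
    }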
File: KafkaHelpers.java
@@ -17,6 +17,7 @@
package io.cdap.plugin.common;

import com.google.common.base.Strings;
import io.cdap.cdap.etl.api.FailureCollector;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
@@ -33,6 +34,8 @@
public final class KafkaHelpers {
private static final Logger LOG = LoggerFactory.getLogger(KafkaHelpers.class);
public static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
public static final String PRINCIPAL = "principal";
public static final String KEYTAB = "keytab";

// This class cannot be instantiated
private KafkaHelpers() {
@@ -113,11 +116,28 @@ public static void setupKerberosLogin(Map<String, ? super String> conf, @Nullabl
*/
public static void validateKerberosSetting(@Nullable String principal, @Nullable String keytab) {
if (Strings.isNullOrEmpty(principal) != Strings.isNullOrEmpty(keytab)) {
String emptyField = Strings.isNullOrEmpty(principal) ? "principal" : "keytab";
String emptyField = Strings.isNullOrEmpty(principal) ? PRINCIPAL : KEYTAB;
String message = emptyField + " is empty. When Kerberos security is enabled for Kafka, " +
"then both the principal and the keytab have " +
"to be specified. If Kerberos is not enabled, then both should be empty.";
throw new IllegalArgumentException(message);
}
}

/**
* Validates whether the principal and keytab are both set or both of them are null/empty.
* Stores the result in the provided failureCollector.
*
* @param principal Kerberos principal
* @param keytab Kerberos keytab for the principal
* @param collector input failureCollector into which the error will be added if present
*/
public static void validateKerberosSetting(@Nullable String principal, @Nullable String keytab,
FailureCollector collector) {
if (Strings.isNullOrEmpty(principal) != Strings.isNullOrEmpty(keytab)) {
String emptyField = Strings.isNullOrEmpty(principal) ? PRINCIPAL : KEYTAB;
String message = "Field " + emptyField + " must be specified.";
collector.addFailure(message, null).withConfigProperty(emptyField);
}
}
}
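
Note: a short usage sketch of the new overload's both-or-neither contract, assuming a FailureCollector named collector is already in scope (the principal and keytab values are made up):

    // Both set, or both empty: nothing is added to the collector.
    KafkaHelpers.validateKerberosSetting("user@EXAMPLE.COM", "/etc/security/user.keytab", collector);
    KafkaHelpers.validateKerberosSetting(null, null, collector);

    // Exactly one set: a failure tagged with the empty property ("keytab" here)
    // is recorded, but execution continues so later checks can add failures too.
    KafkaHelpers.validateKerberosSetting("user@EXAMPLE.COM", null, collector);

    // The caller throws all accumulated failures at once.
    collector.getOrThrowException();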
File: KafkaBatchSink.java
@@ -28,6 +28,7 @@
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
@@ -58,6 +59,8 @@
@Description("KafkaSink to write events to kafka")
public class KafkaBatchSink extends ReferenceBatchSink<StructuredRecord, Text, Text> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchSink.class);
private static final String ASYNC = "async";
private static final String TOPIC = "topic";

// Configuration for the plugin.
private final Config producerConfig;
@@ -77,7 +80,8 @@ public KafkaBatchSink(Config producerConfig) {
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
super.configurePipeline(pipelineConfigurer);

producerConfig.validate();
producerConfig.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
pipelineConfigurer.getStageConfigurer().getFailureCollector().getOrThrowException();
}

@Override
@@ -135,35 +139,42 @@ public void destroy() {
*/
public static class Config extends ReferencePluginConfig {

@Name("brokers")
private static final String BROKERS = "brokers";
private static final String KEY = "key";
private static final String TOPIC = KafkaBatchSink.TOPIC;
private static final String FORMAT = "format";
private static final String KAFKA_PROPERTIES = "kafkaProperties";
private static final String COMPRESSION_TYPE = "compressionType";

@Name(BROKERS)
@Description("Specifies the connection string where Producer can find one or more brokers to " +
"determine the leader for each topic")
@Macro
private String brokers;

@Name("async")
@Name(ASYNC)
@Description("Specifies whether an acknowledgment is required from broker that message was received. " +
"Default is FALSE")
@Macro
private String async;

@Name("key")
@Name(KEY)
@Description("Specify the key field to be used in the message. Only String Partitioner is supported.")
@Macro
@Nullable
private String key;

@Name("topic")
@Name(TOPIC)
@Description("Topic to which message needs to be published")
@Macro
private String topic;

@Name("format")
@Name(FORMAT)
@Description("Format a structured record should be converted to")
@Macro
private String format;

@Name("kafkaProperties")
@Name(KAFKA_PROPERTIES)
@Description("Additional kafka producer properties to set")
@Macro
@Nullable
@@ -179,7 +190,7 @@ public static class Config extends ReferencePluginConfig {
@Nullable
private String keytabLocation;

@Name("compressionType")
@Name(COMPRESSION_TYPE)
@Description("Compression type to be applied on message")
@Macro
private String compressionType;
@@ -196,21 +207,23 @@ public Config(String brokers, String async, String key, String topic, String for
this.compressionType = compressionType;
}

private void validate() {
private void validate(FailureCollector collector) {
if (!async.equalsIgnoreCase("true") && !async.equalsIgnoreCase("false")) {
throw new IllegalArgumentException("Async flag has to be either TRUE or FALSE.");
collector.addFailure("Async flag has to be either TRUE or FALSE.", null)
.withConfigProperty(ASYNC);
}

KafkaHelpers.validateKerberosSetting(principal, keytabLocation);
KafkaHelpers.validateKerberosSetting(principal, keytabLocation, collector);
}
}

private static class KafkaOutputFormatProvider implements OutputFormatProvider {
public static final String HAS_KEY = "hasKey";
private final Map<String, String> conf;

KafkaOutputFormatProvider(Config kafkaSinkConfig) {
this.conf = new HashMap<>();
conf.put("topic", kafkaSinkConfig.topic);
conf.put(TOPIC, kafkaSinkConfig.topic);

conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.brokers);
conf.put("compression.type", kafkaSinkConfig.compressionType);
@@ -220,13 +233,13 @@ private static class KafkaOutputFormatProvider implements OutputFormatProvider {
KafkaHelpers.setupKerberosLogin(conf, kafkaSinkConfig.principal, kafkaSinkConfig.keytabLocation);
addKafkaProperties(kafkaSinkConfig.kafkaProperties);

conf.put("async", kafkaSinkConfig.async);
conf.put(ASYNC, kafkaSinkConfig.async);
if (kafkaSinkConfig.async.equalsIgnoreCase("true")) {
conf.put(ACKS_REQUIRED, "1");
}

if (!Strings.isNullOrEmpty(kafkaSinkConfig.key)) {
conf.put("hasKey", kafkaSinkConfig.key);
conf.put(HAS_KEY, kafkaSinkConfig.key);
}
}

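
Note: the constants introduced in this file keep one string aligned across three places. A hypothetical trimmed-down config showing why (not code from this PR):

    import io.cdap.cdap.api.annotation.Macro;
    import io.cdap.cdap.api.annotation.Name;
    import io.cdap.cdap.api.plugin.PluginConfig;
    import io.cdap.cdap.etl.api.FailureCollector;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sink config, not part of this PR.
    public class ExampleSinkConfig extends PluginConfig {
      private static final String ASYNC = "async";

      @Name(ASYNC)          // 1. the property name exposed to the pipeline UI
      @Macro
      private String async;

      void validate(FailureCollector collector) {
        if (!"true".equalsIgnoreCase(async) && !"false".equalsIgnoreCase(async)) {
          collector.addFailure("Async flag has to be either TRUE or FALSE.", null)
            .withConfigProperty(ASYNC);   // 2. the failure tag for the same field
        }
      }

      Map<String, String> toOutputFormatConf() {
        Map<String, String> conf = new HashMap<>();
        conf.put(ASYNC, async);           // 3. the key handed to the output format
        return conf;
      }
    }

A rename now touches one constant instead of three string literals that can silently drift apart.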
File: KafkaAlertPublisher.java (a second copy of the plugin in another module)
@@ -27,13 +27,14 @@
import io.cdap.cdap.etl.api.Alert;
import io.cdap.cdap.etl.api.AlertPublisher;
import io.cdap.cdap.etl.api.AlertPublisherContext;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.plugin.common.KeyValueListParser;
import kafka.common.InvalidTopicException;
import kafka.common.Topic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -61,13 +62,13 @@ public KafkaAlertPublisher(Config config) {

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
config.validate();
config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
}

@Override
public void initialize(AlertPublisherContext context) throws Exception {
super.initialize(context);
config.validate();
config.validate(context.getFailureCollector());
Properties props = new Properties();
// Add client id property with stage name as value.
props.put(ProducerConfig.CLIENT_ID_CONFIG, context.getStageName());
@@ -111,18 +112,22 @@ public void destroy() {
*/
public static class Config extends PluginConfig {

@Name("brokers")
private static final String TOPIC = "topic";
private static final String BROKERS = "brokers";
private static final String PRODUCER_PROPERTIES = "producerProperties";

@Name(BROKERS)
@Description("Specifies the connection string where Producer can find one or more brokers to " +
"determine the leader for each topic.")
@Macro
private String brokers;

@Name("topic")
@Name(TOPIC)
@Description("Topic to which message needs to be published. The topic should already exist on kafka.")
@Macro
private String topic;

@Name("producerProperties")
@Name(PRODUCER_PROPERTIES)
@Nullable
@Description("Additional kafka producer properties to set.")
private String producerProperties;
@@ -146,7 +151,7 @@ private Map<String, String> getProducerProperties() {
return producerProps;
}

private void validate() {
private void validate(FailureCollector collector) {
// If the topic or brokers are macros they would not be available at config time. So do not perform
// validations yet.
if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(brokers)) {
@@ -156,8 +161,9 @@ private void validate() {
try {
Topic.validate(topic);
} catch (InvalidTopicException e) {
throw new IllegalArgumentException(String.format("Topic name %s is not a valid kafka topic. Please provide " +
"valid kafka topic name. %s", topic, e.getMessage()));
collector.addFailure(String.format("Topic name %s is not a valid kafka topic. Please provide valid kafka" +
"topic name. %s", topic, e.getMessage()), null)
.withConfigProperty(TOPIC);

Contributor:
missing space between 'kafka' and 'topic' on the next line
}
}
}
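
Note: with the space the reviewer asked for, the failure message would be built as follows (a follow-up fix, not part of this diff):

    collector.addFailure(String.format("Topic name %s is not a valid kafka topic. Please provide valid kafka " +
                                         "topic name. %s", topic, e.getMessage()), null)
      .withConfigProperty(TOPIC);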