diff --git a/kafka-plugins-0.10/src/main/java/io/cdap/plugin/alertpublisher/KafkaAlertPublisher.java b/kafka-plugins-0.10/src/main/java/io/cdap/plugin/alertpublisher/KafkaAlertPublisher.java
index 0287894..38e6da7 100644
--- a/kafka-plugins-0.10/src/main/java/io/cdap/plugin/alertpublisher/KafkaAlertPublisher.java
+++ b/kafka-plugins-0.10/src/main/java/io/cdap/plugin/alertpublisher/KafkaAlertPublisher.java
@@ -27,6 +27,7 @@
 import io.cdap.cdap.etl.api.Alert;
 import io.cdap.cdap.etl.api.AlertPublisher;
 import io.cdap.cdap.etl.api.AlertPublisherContext;
+import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.PipelineConfigurer;
 import io.cdap.plugin.common.KafkaHelpers;
 import io.cdap.plugin.common.KeyValueListParser;
@@ -62,13 +63,14 @@ public KafkaAlertPublisher(Config config) {
 
   @Override
   public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
-    config.validate();
+    config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
   }
 
   @Override
   public void initialize(AlertPublisherContext context) throws Exception {
     super.initialize(context);
-    config.validate();
+    config.validate(context.getFailureCollector());
+    context.getFailureCollector().getOrThrowException();
     Properties props = new Properties();
     // Add client id property with stage name as value.
     props.put(ProducerConfig.CLIENT_ID_CONFIG, context.getStageName());
@@ -112,6 +114,7 @@ public void destroy() {
    */
   public static class Config extends PluginConfig {
 
+    public static final String TOPIC = "topic";
     @Name("brokers")
     @Description("Specifies the connection string where Producer can find one or more brokers to " +
       "determine the leader for each topic.")
@@ -157,7 +160,7 @@ private Map getProducerProperties() {
       return producerProps;
     }
 
-    private void validate() {
+    private void validate(FailureCollector collector) {
       // If the topic or brokers are macros they would not be available at config time. So do not perform
       // validations yet.
       if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(brokers)) {
@@ -167,11 +170,13 @@ private void validate() {
       try {
         Topic.validate(topic);
       } catch (InvalidTopicException e) {
-        throw new IllegalArgumentException(String.format("Topic name %s is not a valid kafka topic. Please provide " +
-          "valid kafka topic name. %s", topic, e.getMessage()));
+        collector.addFailure(String.format(
+          "Topic name %s is not a valid kafka topic. Please provide valid kafka topic name. %s", topic,
+          e.getMessage()), null)
+          .withConfigProperty(TOPIC);
       }
 
-      KafkaHelpers.validateKerberosSetting(principal, keytabLocation);
+      KafkaHelpers.validateKerberosSetting(principal, keytabLocation, collector);
     }
   }
 }
diff --git a/kafka-plugins-0.10/src/main/java/io/cdap/plugin/batch/source/KafkaBatchSource.java b/kafka-plugins-0.10/src/main/java/io/cdap/plugin/batch/source/KafkaBatchSource.java
index 870452d..1c32851 100644
--- a/kafka-plugins-0.10/src/main/java/io/cdap/plugin/batch/source/KafkaBatchSource.java
+++ b/kafka-plugins-0.10/src/main/java/io/cdap/plugin/batch/source/KafkaBatchSource.java
@@ -29,7 +29,9 @@
 import io.cdap.cdap.api.dataset.lib.KeyValue;
 import io.cdap.cdap.common.io.ByteBuffers;
 import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.StageConfigurer;
 import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
 import io.cdap.cdap.etl.api.batch.BatchSource;
 import io.cdap.cdap.etl.api.batch.BatchSourceContext;
@@ -53,6 +55,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -122,9 +125,9 @@ public Map getKafkaProperties() {
       return conf;
     }
 
-    public void validate() {
-      super.validate();
-      KafkaHelpers.validateKerberosSetting(principal, keytabLocation);
+    public void validate(FailureCollector collector) {
+      super.validate(collector);
+      KafkaHelpers.validateKerberosSetting(principal, keytabLocation, collector);
     }
   }
@@ -134,8 +137,11 @@ public KafkaBatchSource(Kafka10BatchConfig config) {
 
   @Override
   public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
-    config.validate();
-    pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
+    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
+    FailureCollector failureCollector = stageConfigurer.getFailureCollector();
+    config.validate(failureCollector);
+    stageConfigurer.setOutputSchema(config.getSchema(failureCollector));
+    failureCollector.getOrThrowException();
   }
 
   @Override
@@ -143,7 +149,11 @@ public void prepareRun(BatchSourceContext context) throws Exception {
     Job job = JobUtils.createInstance();
     Configuration conf = job.getConfiguration();
 
-    KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets();
+    FailureCollector failureCollector = context.getFailureCollector();
+    KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets(failureCollector);
+    Schema schema = config.getSchema(failureCollector);
+    Set partitions = config.getPartitions(failureCollector);
+    failureCollector.getOrThrowException();
 
     // If the offset directory is provided, try to load the file
     if (!context.isPreviewEnabled() && config.getOffsetDir() != null) {
@@ -167,10 +177,11 @@ public void prepareRun(BatchSourceContext context) throws Exception {
     KafkaHelpers.setupKerberosLogin(kafkaConf, config.getPrincipal(), config.getKeytabLocation());
     kafkaConf.putAll(config.getKafkaProperties());
     kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), kafkaConf,
-                                                       config.getPartitions(), config.getMaxNumberRecords(),
+                                                       partitions,
+                                                       config.getMaxNumberRecords(),
                                                        partitionOffsets);
     LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName);
-    Schema schema = config.getSchema();
+
     if (schema != null) {
       lineageRecorder.createExternalDataset(schema);
       if (schema.getFields() != null && !schema.getFields().isEmpty()) {
@@ -203,8 +214,11 @@ public void onRunFinish(boolean succeeded, BatchSourceContext context) {
   public void initialize(BatchRuntimeContext context) throws Exception {
     super.initialize(context);
 
-    schema = config.getSchema();
-    Schema messageSchema = config.getMessageSchema();
+    schema = config.getSchema(context.getFailureCollector());
+    Schema messageSchema = config.getMessageSchema(context.getFailureCollector());
+    if (schema == null || messageSchema == null) {
+      return;
+    }
     for (Schema.Field field : schema.getFields()) {
       String name = field.getName();
       if (!name.equals(config.getKeyField()) && !name.equals(config.getPartitionField()) &&
diff --git a/kafka-plugins-0.10/src/main/java/io/cdap/plugin/common/KafkaHelpers.java b/kafka-plugins-0.10/src/main/java/io/cdap/plugin/common/KafkaHelpers.java
index 84a159b..d9f152c 100644
--- a/kafka-plugins-0.10/src/main/java/io/cdap/plugin/common/KafkaHelpers.java
+++ b/kafka-plugins-0.10/src/main/java/io/cdap/plugin/common/KafkaHelpers.java
@@ -17,6 +17,7 @@
 package io.cdap.plugin.common;
 
 import com.google.common.base.Strings;
+import io.cdap.cdap.etl.api.FailureCollector;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
@@ -33,6 +34,8 @@ public final class KafkaHelpers {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaHelpers.class);
 
   public static final String SASL_JAAS_CONFIG = "sasl.jaas.config";
+  public static final String PRINCIPAL = "principal";
+  public static final String KEYTAB = "keytab";
 
   // This class cannot be instantiated
   private KafkaHelpers() {
@@ -113,11 +116,28 @@ public static void setupKerberosLogin(Map conf, @Nullabl
    */
   public static void validateKerberosSetting(@Nullable String principal, @Nullable String keytab) {
     if (Strings.isNullOrEmpty(principal) != Strings.isNullOrEmpty(keytab)) {
-      String emptyField = Strings.isNullOrEmpty(principal) ? "principal" : "keytab";
+      String emptyField = Strings.isNullOrEmpty(principal) ? PRINCIPAL : KEYTAB;
       String message = emptyField + " is empty. When Kerberos security is enabled for Kafka, " +
         "then both the principal and the keytab have " +
         "to be specified. If Kerberos is not enabled, then both should be empty.";
       throw new IllegalArgumentException(message);
     }
   }
+
+  /**
+   * Validates whether the principal and keytab are both set or both of them are null/empty.
+   * Stores the result in the provided failureCollector.
+   *
+   * @param principal Kerberos principal
+   * @param keytab Kerberos keytab for the principal
+   * @param collector input failureCollector into which the error will be added if present
+   */
+  public static void validateKerberosSetting(@Nullable String principal, @Nullable String keytab,
+                                             FailureCollector collector) {
+    if (Strings.isNullOrEmpty(principal) != Strings.isNullOrEmpty(keytab)) {
+      String emptyField = Strings.isNullOrEmpty(principal) ? PRINCIPAL : KEYTAB;
+      String message = "Field " + emptyField + " must be specified.";
+      collector.addFailure(message, null).withConfigProperty(emptyField);
+    }
+  }
 }
diff --git a/kafka-plugins-0.10/src/main/java/io/cdap/plugin/sink/KafkaBatchSink.java b/kafka-plugins-0.10/src/main/java/io/cdap/plugin/sink/KafkaBatchSink.java
index 1c7cb7c..a38f671 100644
--- a/kafka-plugins-0.10/src/main/java/io/cdap/plugin/sink/KafkaBatchSink.java
+++ b/kafka-plugins-0.10/src/main/java/io/cdap/plugin/sink/KafkaBatchSink.java
@@ -28,6 +28,7 @@
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.api.dataset.lib.KeyValue;
 import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.PipelineConfigurer;
 import io.cdap.cdap.etl.api.batch.BatchSink;
 import io.cdap.cdap.etl.api.batch.BatchSinkContext;
@@ -58,6 +59,8 @@
 @Description("KafkaSink to write events to kafka")
 public class KafkaBatchSink extends ReferenceBatchSink {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchSink.class);
+  private static final String ASYNC = "async";
+  private static final String TOPIC = "topic";
 
   // Configuration for the plugin.
   private final Config producerConfig;
@@ -77,7 +80,8 @@ public KafkaBatchSink(Config producerConfig) {
   public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
     super.configurePipeline(pipelineConfigurer);
 
-    producerConfig.validate();
+    producerConfig.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
+    pipelineConfigurer.getStageConfigurer().getFailureCollector().getOrThrowException();
   }
 
   @Override
@@ -135,35 +139,42 @@ public void destroy() {
    */
   public static class Config extends ReferencePluginConfig {
 
-    @Name("brokers")
+    private static final String BROKERS = "brokers";
+    private static final String KEY = "key";
+    private static final String TOPIC = KafkaBatchSink.TOPIC;
+    private static final String FORMAT = "format";
+    private static final String KAFKA_PROPERTIES = "kafkaProperties";
+    private static final String COMPRESSION_TYPE = "compressionType";
+
+    @Name(BROKERS)
     @Description("Specifies the connection string where Producer can find one or more brokers to " +
       "determine the leader for each topic")
     @Macro
     private String brokers;
 
-    @Name("async")
+    @Name(ASYNC)
     @Description("Specifies whether an acknowledgment is required from broker that message was received. " +
       "Default is FALSE")
     @Macro
     private String async;
 
-    @Name("key")
+    @Name(KEY)
    @Description("Specify the key field to be used in the message. Only String Partitioner is supported.")
     @Macro
     @Nullable
     private String key;
 
-    @Name("topic")
+    @Name(TOPIC)
     @Description("Topic to which message needs to be published")
     @Macro
     private String topic;
 
-    @Name("format")
+    @Name(FORMAT)
     @Description("Format a structured record should be converted to")
     @Macro
     private String format;
 
-    @Name("kafkaProperties")
+    @Name(KAFKA_PROPERTIES)
     @Description("Additional kafka producer properties to set")
     @Macro
     @Nullable
@@ -179,7 +190,7 @@ public static class Config extends ReferencePluginConfig {
     @Nullable
     private String keytabLocation;
 
-    @Name("compressionType")
+    @Name(COMPRESSION_TYPE)
     @Description("Compression type to be applied on message")
     @Macro
     private String compressionType;
@@ -196,21 +207,23 @@ public Config(String brokers, String async, String key, String topic, String for
       this.compressionType = compressionType;
     }
 
-    private void validate() {
+    private void validate(FailureCollector collector) {
       if (!async.equalsIgnoreCase("true") && !async.equalsIgnoreCase("false")) {
-        throw new IllegalArgumentException("Async flag has to be either TRUE or FALSE.");
+        collector.addFailure("Async flag has to be either TRUE or FALSE.", null)
+          .withConfigProperty(ASYNC);
       }
 
-      KafkaHelpers.validateKerberosSetting(principal, keytabLocation);
+      KafkaHelpers.validateKerberosSetting(principal, keytabLocation, collector);
     }
   }
 
   private static class KafkaOutputFormatProvider implements OutputFormatProvider {
+    public static final String HAS_KEY = "hasKey";
     private final Map conf;
 
     KafkaOutputFormatProvider(Config kafkaSinkConfig) {
       this.conf = new HashMap<>();
-      conf.put("topic", kafkaSinkConfig.topic);
+      conf.put(TOPIC, kafkaSinkConfig.topic);
       conf.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.brokers);
       conf.put("compression.type", kafkaSinkConfig.compressionType);
@@ -220,13 +233,13 @@ private static class KafkaOutputFormatProvider implements OutputFormatProvider {
       KafkaHelpers.setupKerberosLogin(conf, kafkaSinkConfig.principal, kafkaSinkConfig.keytabLocation);
       addKafkaProperties(kafkaSinkConfig.kafkaProperties);
 
-      conf.put("async", kafkaSinkConfig.async);
+      conf.put(ASYNC, kafkaSinkConfig.async);
       if (kafkaSinkConfig.async.equalsIgnoreCase("true")) {
         conf.put(ACKS_REQUIRED, "1");
       }
 
       if (!Strings.isNullOrEmpty(kafkaSinkConfig.key)) {
-        conf.put("hasKey", kafkaSinkConfig.key);
+        conf.put(HAS_KEY, kafkaSinkConfig.key);
       }
     }
diff --git a/kafka-plugins-0.8/src/main/java/io/cdap/plugin/alertpublisher/KafkaAlertPublisher.java b/kafka-plugins-0.8/src/main/java/io/cdap/plugin/alertpublisher/KafkaAlertPublisher.java
index 5bcb0f4..8e8401e 100644
--- a/kafka-plugins-0.8/src/main/java/io/cdap/plugin/alertpublisher/KafkaAlertPublisher.java
+++ b/kafka-plugins-0.8/src/main/java/io/cdap/plugin/alertpublisher/KafkaAlertPublisher.java
@@ -27,13 +27,14 @@
 import io.cdap.cdap.etl.api.Alert;
 import io.cdap.cdap.etl.api.AlertPublisher;
 import io.cdap.cdap.etl.api.AlertPublisherContext;
+import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.PipelineConfigurer;
 import io.cdap.plugin.common.KeyValueListParser;
-import kafka.common.InvalidTopicException;
 import kafka.common.Topic;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.InvalidTopicException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,13 +62,13 @@ public KafkaAlertPublisher(Config config) {
 
   @Override
   public void configurePipeline(PipelineConfigurer pipelineConfigurer) throws IllegalArgumentException {
-    config.validate();
+    config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
   }
 
   @Override
   public void initialize(AlertPublisherContext context) throws Exception {
     super.initialize(context);
-    config.validate();
+    config.validate(context.getFailureCollector());
     Properties props = new Properties();
     // Add client id property with stage name as value.
     props.put(ProducerConfig.CLIENT_ID_CONFIG, context.getStageName());
@@ -111,18 +112,22 @@ public void destroy() {
    */
   public static class Config extends PluginConfig {
 
-    @Name("brokers")
+    private static final String TOPIC = "topic";
+    private static final String BROKERS = "brokers";
+    private static final String PRODUCER_PROPERTIES = "producerProperties";
+
+    @Name(BROKERS)
     @Description("Specifies the connection string where Producer can find one or more brokers to " +
       "determine the leader for each topic.")
     @Macro
     private String brokers;
 
-    @Name("topic")
+    @Name(TOPIC)
     @Description("Topic to which message needs to be published. The topic should already exist on kafka.")
     @Macro
     private String topic;
 
-    @Name("producerProperties")
+    @Name(PRODUCER_PROPERTIES)
     @Nullable
     @Description("Additional kafka producer properties to set.")
     private String producerProperties;
@@ -146,7 +151,7 @@ private Map getProducerProperties() {
       return producerProps;
     }
 
-    private void validate() {
+    private void validate(FailureCollector collector) {
       // If the topic or brokers are macros they would not be available at config time. So do not perform
       // validations yet.
       if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(brokers)) {
@@ -156,8 +161,9 @@ private void validate() {
       try {
         Topic.validate(topic);
       } catch (InvalidTopicException e) {
-        throw new IllegalArgumentException(String.format("Topic name %s is not a valid kafka topic. Please provide " +
-          "valid kafka topic name. %s", topic, e.getMessage()));
+        collector.addFailure(String.format("Topic name %s is not a valid kafka topic. Please provide valid kafka " +
+          "topic name. %s", topic, e.getMessage()), null)
+          .withConfigProperty(TOPIC);
       }
     }
   }
diff --git a/kafka-plugins-0.8/src/main/java/io/cdap/plugin/batch/source/Kafka08Reader.java b/kafka-plugins-0.8/src/main/java/io/cdap/plugin/batch/source/Kafka08Reader.java
index db1321b..a108e95 100644
--- a/kafka-plugins-0.8/src/main/java/io/cdap/plugin/batch/source/Kafka08Reader.java
+++ b/kafka-plugins-0.8/src/main/java/io/cdap/plugin/batch/source/Kafka08Reader.java
@@ -65,7 +65,8 @@ final class Kafka08Reader implements KafkaReader {
 
     // read data from queue
     Map conf = request.getConf();
-    Broker leader = getLeader(KafkaBatchConfig.parseBrokerMap(conf.get(KafkaInputFormat.KAFKA_BROKERS)),
+    // no failureCollector is available here
+    Broker leader = getLeader(KafkaBatchConfig.parseBrokerMap(conf.get(KafkaInputFormat.KAFKA_BROKERS), null),
                               request.getTopic(), request.getPartition());
     this.simpleConsumer = new SimpleConsumer(leader.host(), leader.port(), 20 * 1000, fetchBufferSize, "client");
   }
diff --git a/kafka-plugins-0.8/src/main/java/io/cdap/plugin/batch/source/KafkaBatchSource.java b/kafka-plugins-0.8/src/main/java/io/cdap/plugin/batch/source/KafkaBatchSource.java
index 6ec5c38..e00f171 100644
--- a/kafka-plugins-0.8/src/main/java/io/cdap/plugin/batch/source/KafkaBatchSource.java
+++ b/kafka-plugins-0.8/src/main/java/io/cdap/plugin/batch/source/KafkaBatchSource.java
@@ -27,7 +27,9 @@
 import io.cdap.cdap.api.dataset.lib.KeyValue;
 import io.cdap.cdap.common.io.ByteBuffers;
 import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.StageConfigurer;
 import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
 import io.cdap.cdap.etl.api.batch.BatchSource;
 import io.cdap.cdap.etl.api.batch.BatchSourceContext;
@@ -47,6 +49,8 @@
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -73,8 +77,10 @@ public KafkaBatchSource(KafkaBatchConfig config) {
 
   @Override
   public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
-    config.validate();
-    pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
+    StageConfigurer stageConfigurer = pipelineConfigurer.getStageConfigurer();
+    config.validate(stageConfigurer.getFailureCollector());
+    stageConfigurer.setOutputSchema(config.getSchema(stageConfigurer.getFailureCollector()));
+    stageConfigurer.getFailureCollector().getOrThrowException();
   }
 
   @Override
@@ -82,7 +88,12 @@ public void prepareRun(BatchSourceContext context) throws Exception {
     Job job = JobUtils.createInstance();
     Configuration conf = job.getConfiguration();
 
-    KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets();
+    FailureCollector failureCollector = context.getFailureCollector();
+    KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets(failureCollector);
+    Map brokerMap = config.getBrokerMap(failureCollector);
+    Set partitions = config.getPartitions(failureCollector);
+    Schema schema = config.getSchema(failureCollector);
+    failureCollector.getOrThrowException();
 
     // If the offset directory is provided, try to load the file
     if (!context.isPreviewEnabled() && config.getOffsetDir() != null) {
@@ -98,12 +109,9 @@ public void prepareRun(BatchSourceContext context) throws Exception {
       // Load the offset from the offset file
       partitionOffsets = KafkaPartitionOffsets.load(fileContext, offsetsFile);
     }
-
-    kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), config.getBrokerMap(),
-                                                       config.getPartitions(), config.getMaxNumberRecords(),
-                                                       partitionOffsets);
+    kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), brokerMap, partitions,
+                                                       config.getMaxNumberRecords(), partitionOffsets);
     LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName);
-    Schema schema = config.getSchema();
     if (schema != null) {
       lineageRecorder.createExternalDataset(schema);
       if (schema.getFields() != null && !schema.getFields().isEmpty()) {
@@ -135,8 +143,11 @@ public void onRunFinish(boolean succeeded, BatchSourceContext context) {
   @Override
   public void initialize(BatchRuntimeContext context) throws Exception {
     super.initialize(context);
-    schema = config.getSchema();
-    Schema messageSchema = config.getMessageSchema();
+    schema = config.getSchema(context.getFailureCollector());
+    Schema messageSchema = config.getMessageSchema(context.getFailureCollector());
+    if (schema == null || messageSchema == null) {
+      return;
+    }
     for (Schema.Field field : schema.getFields()) {
       String name = field.getName();
       if (!name.equals(config.getKeyField()) && !name.equals(config.getPartitionField()) &&
diff --git a/kafka-plugins-common/src/main/java/io/cdap/plugin/batch/source/KafkaBatchConfig.java b/kafka-plugins-common/src/main/java/io/cdap/plugin/batch/source/KafkaBatchConfig.java
index d15c88d..f3bd717 100644
--- a/kafka-plugins-common/src/main/java/io/cdap/plugin/batch/source/KafkaBatchConfig.java
+++ b/kafka-plugins-common/src/main/java/io/cdap/plugin/batch/source/KafkaBatchConfig.java
@@ -8,6 +8,7 @@
 import io.cdap.cdap.api.data.format.FormatSpecification;
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.api.dataset.lib.KeyValue;
+import io.cdap.cdap.etl.api.FailureCollector;
 import io.cdap.cdap.format.RecordFormats;
 import io.cdap.plugin.common.KeyValueListParser;
 import io.cdap.plugin.common.ReferencePluginConfig;
@@ -29,6 +30,14 @@
  */
 public class KafkaBatchConfig extends ReferencePluginConfig {
 
+  public static final String KEY_FIELD = "keyField";
+  public static final String PARTITION_FIELD = "partitionField";
+  public static final String OFFSET_FIELD = "offsetField";
+  public static final String SCHEMA = "schema";
+  public static final String INITIAL_PARTITION_OFFSETS = "initialPartitionOffsets";
+  public static final String FORMAT = "format";
+  public static final String KAFKA_BROKERS = "kafkaBrokers";
+
   @Description("Kafka topic to read from.")
   @Macro
   private String topic;
@@ -121,7 +130,7 @@ public String getOffsetDir() {
     return offsetDir;
   }
 
-  public Set getPartitions() {
+  public Set getPartitions(FailureCollector collector) {
     Set partitionSet = new HashSet<>();
     if (partitions == null) {
       return partitionSet;
@@ -130,8 +139,8 @@ public Set getPartitions() {
       try {
         partitionSet.add(Integer.parseInt(partition));
       } catch (NumberFormatException e) {
-        throw new IllegalArgumentException(
-          String.format("Invalid partition '%s'. Partitions must be integers.", partition));
+        collector.addFailure(String.format("Invalid partition '%s'. Partitions must be integers.", partition), null)
+          .withConfigElement("partitions", partition);
       }
     }
     return partitionSet;
@@ -162,11 +171,12 @@ public String getFormat() {
   }
 
   @Nullable
-  public Schema getSchema() {
+  public Schema getSchema(FailureCollector collector) {
     try {
       return Strings.isNullOrEmpty(schema) ? null : Schema.parseJson(schema);
     } catch (IOException e) {
-      throw new IllegalArgumentException("Unable to parse schema: " + e.getMessage());
+      collector.addFailure("Unable to parse schema: " + e.getMessage(), null).withConfigProperty(SCHEMA);
+      return null;
     }
   }
 
@@ -174,8 +184,11 @@ public Schema getSchema() {
    * Gets the message schema from the schema field.
    * If the time, key, partition, or offset fields are in the configured schema, they will be removed.
    */
-  public Schema getMessageSchema() {
-    Schema schema = getSchema();
+  public Schema getMessageSchema(FailureCollector collector) {
+    Schema schema = getSchema(collector);
+    if (schema == null) {
+      return null;
+    }
     List messageFields = new ArrayList<>();
     boolean keyFieldExists = false;
     boolean partitionFieldExists = false;
@@ -190,17 +203,23 @@ public Schema getMessageSchema() {
       // if the field is not the time field and not the key field, it is a message field.
       if (fieldName.equals(keyField)) {
         if (fieldType != Schema.Type.BYTES) {
-          throw new IllegalArgumentException("The key field must be of type bytes or nullable bytes.");
+          collector.addFailure("The key field must be of type bytes or nullable bytes.", null)
+            .withConfigProperty(KEY_FIELD)
+            .withOutputSchemaField(keyField);
         }
         keyFieldExists = true;
       } else if (fieldName.equals(partitionField)) {
         if (fieldType != Schema.Type.INT) {
-          throw new IllegalArgumentException("The partition field must be of type int.");
+          collector.addFailure("The partition field must be of type int.", null)
+            .withConfigProperty(PARTITION_FIELD)
+            .withOutputSchemaField(partitionField);
         }
         partitionFieldExists = true;
       } else if (fieldName.equals(offsetField)) {
         if (fieldType != Schema.Type.LONG) {
-          throw new IllegalArgumentException("The offset field must be of type long.");
+          collector.addFailure("The offset field must be of type long.", null)
+            .withConfigProperty(OFFSET_FIELD)
+            .withOutputSchemaField(offsetField);
         }
         offsetFieldExists = true;
       } else {
@@ -208,20 +227,23 @@ public Schema getMessageSchema() {
       }
     }
     if (messageFields.isEmpty()) {
-      throw new IllegalArgumentException(
-        "Schema must contain at least one other field besides the time and key fields.");
+      collector.addFailure("Schema must contain at least one other field besides the time and key fields.", null)
+        .withConfigProperty(SCHEMA);
     }
 
     if (getKeyField() != null && !keyFieldExists) {
-      throw new IllegalArgumentException(String.format(
-        "keyField '%s' does not exist in the schema. Please add it to the schema.", keyField));
+      collector.addFailure(String.format(
+        "keyField '%s' does not exist in the schema. Please add it to the schema.", keyField), null)
+        .withConfigProperty(KEY_FIELD);
     }
     if (getPartitionField() != null && !partitionFieldExists) {
-      throw new IllegalArgumentException(String.format(
-        "partitionField '%s' does not exist in the schema. Please add it to the schema.", partitionField));
+      collector.addFailure(String.format(
+        "partitionField '%s' does not exist in the schema. Please add it to the schema.", partitionField), null)
+        .withConfigProperty(PARTITION_FIELD);
     }
     if (getOffsetField() != null && !offsetFieldExists) {
-      throw new IllegalArgumentException(String.format(
-        "offsetField '%s' does not exist in the schema. Please add it to the schema.", offsetField));
+      collector.addFailure(String.format(
+        "offsetField '%s' does not exist in the schema. Please add it to the schema.", offsetField), null)
+        .withConfigProperty(OFFSET_FIELD);
     }
     return Schema.recordOf("kafka.message", messageFields);
   }
@@ -229,14 +251,14 @@ public Schema getMessageSchema() {
   /**
    * @return broker host to broker port mapping.
    */
-  public Map getBrokerMap() {
-    return parseBrokerMap(kafkaBrokers);
+  public Map getBrokerMap(FailureCollector collector) {
+    return parseBrokerMap(kafkaBrokers, collector);
   }
 
   /**
    * Parses a given Kafka broker string, which is in comma separate host:port format, into a Map of host to port.
   */
-  public static Map parseBrokerMap(String kafkaBrokers) {
+  public static Map parseBrokerMap(String kafkaBrokers, @Nullable FailureCollector collector) {
    Map brokerMap = new HashMap<>();
    for (KeyValue hostAndPort : KeyValueListParser.DEFAULT.parse(kafkaBrokers)) {
      String host = hostAndPort.getKey();
@@ -244,12 +266,20 @@ public static Map parseBrokerMap(String kafkaBrokers) {
      try {
        brokerMap.put(host, Integer.parseInt(portStr));
      } catch (NumberFormatException e) {
-        throw new IllegalArgumentException(String.format(
-          "Invalid port '%s' for host '%s'.", portStr, host));
+        String errorMessage = String.format("Invalid port '%s' for host '%s'.", portStr, host);
+        if (collector != null) {
+          collector.addFailure(errorMessage, null).withConfigElement(KAFKA_BROKERS, host + ":" + portStr);
+        } else {
+          throw new IllegalArgumentException(errorMessage);
+        }
      }
    }
    if (brokerMap.isEmpty()) {
-      throw new IllegalArgumentException("Must specify kafka brokers.");
+      if (collector != null) {
+        collector.addFailure("Must specify kafka brokers.", null).withConfigProperty(KAFKA_BROKERS);
+      } else {
+        throw new IllegalArgumentException("Must specify kafka brokers.");
+      }
    }
    return brokerMap;
  }
@@ -257,7 +287,7 @@ public static Map parseBrokerMap(String kafkaBrokers) {
  /**
   * Gets the partition offsets as specified by the {@link #initialPartitionOffsets} field.
   */
-  public KafkaPartitionOffsets getInitialPartitionOffsets() {
+  public KafkaPartitionOffsets getInitialPartitionOffsets(FailureCollector collector) {
    KafkaPartitionOffsets partitionOffsets = new KafkaPartitionOffsets(Collections.emptyMap());
 
    if (initialPartitionOffsets == null) {
@@ -272,16 +302,19 @@ public KafkaPartitionOffsets getInitialPartitionOffsets() {
      try {
        partition = Integer.parseInt(partitionStr);
      } catch (NumberFormatException e) {
-        throw new IllegalArgumentException(String.format(
-          "Invalid partition '%s' in initialPartitionOffsets.", partitionStr));
+        collector.addFailure(String.format("Invalid partition '%s' in initialPartitionOffsets.", partitionStr), null)
+          .withConfigElement(INITIAL_PARTITION_OFFSETS, partitionStr + ":" + offsetStr);
+        continue;
      }
      long offset;
      try {
        offset = Long.parseLong(offsetStr);
      } catch (NumberFormatException e) {
-        throw new IllegalArgumentException(String.format(
-          "Invalid offset '%s' in initialPartitionOffsets for partition %d.", partitionStr, partition));
+        collector.addFailure(String.format("Invalid offset '%s' in initialPartitionOffsets for partition %d.",
+                                           offsetStr, partition), null)
+          .withConfigElement(INITIAL_PARTITION_OFFSETS, partitionStr + ":" + offsetStr);
+        continue;
      }
 
      partitionOffsets.setPartitionOffset(partition, offset);
@@ -290,24 +323,28 @@
     return partitionOffsets;
   }
 
-  public void validate() {
+  public void validate(FailureCollector collector) {
     // brokers can be null since it is macro enabled.
     if (kafkaBrokers != null) {
-      getBrokerMap();
+      getBrokerMap(collector);
     }
-    getPartitions();
-    getInitialPartitionOffsets();
+    getPartitions(collector);
+    getInitialPartitionOffsets(collector);
 
-    Schema messageSchema = getMessageSchema();
+    Schema messageSchema = getMessageSchema(collector);
+    if (messageSchema == null) {
+      return; // since parsing error would have already been added to collector
+    }
     // if format is empty, there must be just a single message field of type bytes or nullable types.
     if (Strings.isNullOrEmpty(format)) {
       List messageFields = messageSchema.getFields();
       if (messageFields.size() > 1) {
         String fieldNames = messageFields.stream().map(Schema.Field::getName).collect(Collectors.joining(","));
-        throw new IllegalArgumentException(String.format(
-          "Without a format, the schema must contain just a single message field of type bytes or nullable bytes. " +
-          "Found %s message fields (%s).", messageFields.size(), fieldNames));
+        collector.addFailure(String.format(
+          "Without a format, the schema must contain just a single message field of " +
+            "type bytes or nullable bytes. Found %s message fields (%s).", messageFields.size(), fieldNames), null)
+          .withConfigProperty(FORMAT);
       }
 
       Schema.Field messageField = messageFields.get(0);
@@ -315,9 +352,11 @@
       Schema messageFieldSchema = messageField.getSchema();
       Schema.Type messageFieldType = messageFieldSchema.isNullable() ?
         messageFieldSchema.getNonNullable().getType() : messageFieldSchema.getType();
       if (messageFieldType != Schema.Type.BYTES) {
-        throw new IllegalArgumentException(String.format(
-          "Without a format, the message field must be of type bytes or nullable bytes, but field %s is of type %s.",
-          messageField.getName(), messageField.getSchema()));
+        collector.addFailure(String.format(
+          "Without a format, the message field must be of type bytes or nullable " +
+            "bytes, but field %s is of type %s.", messageField.getName(), messageField.getSchema()), null)
+          .withOutputSchemaField(messageField.getName())
+          .withConfigProperty(FORMAT);
       }
     } else {
       // otherwise, if there is a format, make sure we can instantiate it.
@@ -326,9 +365,9 @@
       try {
         RecordFormats.createInitializedFormat(formatSpec);
       } catch (Exception e) {
-        throw new IllegalArgumentException(String.format(
-          "Unable to instantiate a message parser from format '%s' and message schema '%s': %s",
-          format, messageSchema, e.getMessage()), e);
+        collector.addFailure(String.format("Unable to instantiate a message parser from format '%s' and message " +
+          "schema '%s': %s", format, messageSchema, e.getMessage()), null)
+          .withConfigProperty(FORMAT);
       }
     }
   }
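
The patch above replaces thrown IllegalArgumentExceptions with failures registered on a FailureCollector, so a single validation pass can report every invalid field at once and only fails when getOrThrowException() is called. A minimal unit-test sketch of how the new validate(FailureCollector) entry point might be exercised follows. It is not part of the patch: it assumes the MockFailureCollector helper from CDAP's hydrator-test artifact, and buildConfig(...) is a hypothetical factory standing in for however the test would construct a KafkaBatchConfig with an invalid partition list and a bad broker port.

package io.cdap.plugin.batch.source;

import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;

public class KafkaBatchConfigValidationTest {

  @Test
  public void testAllInvalidFieldsAreCollected() {
    // buildConfig(...) is a hypothetical helper that returns a KafkaBatchConfig
    // with partitions = "0,abc" and kafkaBrokers = "host1:notAPort".
    KafkaBatchConfig config = buildConfig("0,abc", "host1:notAPort");

    // MockFailureCollector (hydrator-test) records failures instead of throwing.
    MockFailureCollector collector = new MockFailureCollector("kafkaSource");
    config.validate(collector);

    // Unlike the old throw-on-first-error behaviour, every failure added via
    // collector.addFailure(...) during the single validate() call is available here.
    List<ValidationFailure> failures = collector.getValidationFailures();
    Assert.assertFalse(failures.isEmpty());
  }
}

In pipeline code the same collector is obtained from the StageConfigurer at configure time or from the context at runtime, and getOrThrowException() converts any accumulated failures into one aggregated validation error, which is the pattern applied consistently across the files in this diff.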