@vravish vravish commented Sep 19, 2019

Use the new FailureCollector to record every validation error instead of stopping after the first one.

@vravish vravish requested a review from albertshau September 19, 2019 04:59
if (fieldName.equals(keyField)) {
if (fieldType != Schema.Type.BYTES) {
throw new IllegalArgumentException("The key field must be of type bytes or nullable bytes.");
collector.addFailure("The key field must be of type bytes or nullable bytes.", null);

.withConfigProperty("keyField")
.withOutputSchemaField(keyField)

This should highlight both the keyField textbox and the corresponding field in the output schema.
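
To illustrate the chaining suggested here, a minimal self-contained sketch. `SketchFailure` and `SketchCollector` are hypothetical stand-ins written for this example, not the real CDAP classes, and the field type is passed as a plain string to keep it runnable on its own. Only the method names `addFailure`, `withConfigProperty` and `withOutputSchemaField` come from this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a validation failure (not the real CDAP class).
class SketchFailure {
  final String message;
  final List<String> causes = new ArrayList<>();

  SketchFailure(String message) {
    this.message = message;
  }

  // Records the config property (the textbox in the UI) as a cause.
  SketchFailure withConfigProperty(String property) {
    causes.add("configProperty=" + property);
    return this;
  }

  // Records a field of the output schema as a cause.
  SketchFailure withOutputSchemaField(String field) {
    causes.add("outputSchemaField=" + field);
    return this;
  }
}

// Hypothetical stand-in for a failure collector: it stores failures instead of throwing.
class SketchCollector {
  final List<SketchFailure> failures = new ArrayList<>();

  SketchFailure addFailure(String message, String correctiveAction) {
    SketchFailure failure = new SketchFailure(message);
    failures.add(failure);
    return failure;
  }
}

class KeyFieldCauseSketch {
  static final String KEY_FIELD = "keyField";

  // Mirrors the key-field check from the diff, attaching both causes to the failure.
  static void validateKeyField(String keyField, String fieldType, SketchCollector collector) {
    if (!"BYTES".equals(fieldType)) {
      collector.addFailure("The key field must be of type bytes or nullable bytes.", null)
        .withConfigProperty(KEY_FIELD)
        .withOutputSchemaField(keyField);
    }
  }

  public static void main(String[] args) {
    SketchCollector collector = new SketchCollector();
    validateKeyField("id", "STRING", collector);
    System.out.println(collector.failures.get(0).causes);
  }
}
```

Because each `with...` call returns the failure itself, one failure can carry both the property cause and the schema-field cause.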

} else if (fieldName.equals(partitionField)) {
if (fieldType != Schema.Type.INT) {
throw new IllegalArgumentException("The partition field must be of type int.");
collector.addFailure("The partition field must be of type int.", null);

.withConfigProperty("partitionField")
.withOutputSchemaField(partitionField)

} else if (fieldName.equals(offsetField)) {
if (fieldType != Schema.Type.LONG) {
throw new IllegalArgumentException("The offset field must be of type long.");
collector.addFailure("The offset field must be of type long.", null);

.withConfigProperty("offsetField")
.withOutputSchemaField(offsetField)

throw new IllegalArgumentException(String.format(
"keyField '%s' does not exist in the schema. Please add it to the schema.", keyField));
collector.addFailure(String.format(
"keyField '%s' does not exist in the schema. Please add it to the schema.", keyField), null);

.withConfigProperty("keyField")

should also create a constant, private static final String KEY_FIELD = "keyField";, and use it everywhere I wrote "keyField".

collector.addFailure(String.format(
"Unable to instantiate a message parser from format '%s' and message schema '%s': %s",
format, messageSchema, e.getMessage()), e);
format, messageSchema, e.getMessage()), null);

.withConfigProperty("format")

collector.addFailure(String.format(
"Without a format, the schema must contain just a single message field of type bytes or nullable bytes. " +
"Found %s message fields (%s).", messageFields.size(), fieldNames));
"Found %s message fields (%s).", messageFields.size(), fieldNames), null);

.withConfigProperty("format")

kafkaConf.putAll(config.getKafkaProperties());
kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), kafkaConf,
config.getPartitions(), config.getMaxNumberRecords(),
config.getPartitions(context.getFailureCollector()), config.getMaxNumberRecords(),

should perform all these gets at the start of the method, then call getOrThrowException on the failure collector.
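
A rough sketch of that ordering, runnable on its own. `CollectingSketch`, `checkTopic`, `parsePartition` and `prepare` are hypothetical names made up for this example, and the empty-topic message is illustrative. Only `getOrThrowException` and the partition message are taken from this PR, and the real collector may behave differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in collector: remembers every failure, throws only when asked.
class CollectingSketch {
  private final List<String> failures = new ArrayList<>();

  void addFailure(String message) {
    failures.add(message);
  }

  // Throws one exception that carries all recorded failures; does nothing if there are none.
  void getOrThrowException() {
    if (!failures.isEmpty()) {
      throw new IllegalArgumentException(
        failures.size() + " validation failure(s): " + String.join("; ", failures));
    }
  }
}

class ValidateFirstSketch {
  // Hypothetical getter: records a failure and returns null when the topic is blank.
  static String checkTopic(String topic, CollectingSketch collector) {
    if (topic.trim().isEmpty()) {
      collector.addFailure("Topic must be specified.");
      return null;
    }
    return topic;
  }

  // Hypothetical getter: records a failure and returns null when the partition is not an integer.
  static Integer parsePartition(String partition, CollectingSketch collector) {
    try {
      return Integer.valueOf(partition.trim());
    } catch (NumberFormatException e) {
      collector.addFailure(
        String.format("Invalid partition '%s'. Partitions must be integers.", partition));
      return null;
    }
  }

  static String prepare(String topic, String partition) {
    CollectingSketch collector = new CollectingSketch();
    // 1. Run every collector-aware getter first, so all problems are recorded.
    String validTopic = checkTopic(topic, collector);
    Integer validPartition = parsePartition(partition, collector);
    // 2. Stop here if anything was invalid.
    collector.getOrThrowException();
    // 3. From here on only validated values are used.
    return validTopic + "-" + validPartition;
  }

  public static void main(String[] args) {
    System.out.println(prepare("events", "3"));
  }
}
```

With this ordering an invalid topic and an invalid partition are reported together, and nothing after step 2 runs on bad input.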

config.validate();
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema(pipelineConfigurer.getStageConfigurer()

nit: this is pretty verbose, let's create a variable for the StageConfigurer so that it's easier to read.

kafkaConf.putAll(config.getKafkaProperties());
kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), kafkaConf,
config.getPartitions(), config.getMaxNumberRecords(),
config.getPartitions(context.getFailureCollector()),

move this to the start of the method so that all the validation happens once at the start and so that execution is stopped if anything is invalid. It should look something like:

FailureCollector failureCollector = context.getFailureCollector();
KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets(failureCollector);
Partitions partitions = config.getPartitions(failureCollector);
Schema schema = config.getSchema(failureCollector);
failureCollector.getOrThrowException();

...

* @param collector input failureCollector into which the error will be added if present
*/
public static void validateKerberosSetting(@Nullable String principal, @Nullable String keytab,
FailureCollector collector) {

nit: fix indentation

private void validate(FailureCollector collector) {
if (!async.equalsIgnoreCase("true") && !async.equalsIgnoreCase("false")) {
throw new IllegalArgumentException("Async flag has to be either TRUE or FALSE.");
collector.addFailure("Async flag has to be either TRUE or FALSE.", null);

withConfigProperty(ASYNC)

Use constants instead of hardcoded strings everywhere. This comment applies to the other classes as well. For example:

public static class Config extends ReferencePluginConfig {
  public static final String ASYNC = "async";

  @Name(ASYNC)
  @Description("Specifies whether an acknowledgment is required from broker that message was received. " +
      "Default is FALSE")
  @Macro
  private String async;

  ...
}

String.format("Invalid partition '%s'. Partitions must be integers.", partition));
collector.addFailure(
String.format("Invalid partition '%s'. Partitions must be integers.", partition), null)
.withConfigElement("partitions", partition);

indentation

public Schema getMessageSchema() {
Schema schema = getSchema();
public Schema getMessageSchema(FailureCollector collector) {
Schema schema = getSchema(collector);

if this has a problem, schema will be null and you'll get a NullPointerException below. Need to do a null check to avoid that.
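
One way the null check could look, as a self-contained sketch. The schema is modelled here as a comma-separated list of field names and the collector as a plain list of messages. Both are simplifications assumed for this example, and the "could not be parsed" message is illustrative rather than taken from the plugin.

```java
import java.util.ArrayList;
import java.util.List;

class NullSchemaSketch {
  // Hypothetical stand-in for getSchema(collector): records a failure and returns null on bad input.
  static List<String> getSchema(String rawSchema, List<String> failures) {
    if (rawSchema == null || rawSchema.trim().isEmpty()) {
      failures.add("Schema could not be parsed.");
      return null;
    }
    List<String> fields = new ArrayList<>();
    for (String name : rawSchema.split(",")) {
      fields.add(name.trim());
    }
    return fields;
  }

  static List<String> getMessageSchema(String rawSchema, String keyField, List<String> failures) {
    List<String> schema = getSchema(rawSchema, failures);
    if (schema == null) {
      // The failure is already recorded; return early instead of dereferencing null.
      return null;
    }
    List<String> messageFields = new ArrayList<>(schema);
    messageFields.remove(keyField);
    if (messageFields.isEmpty()) {
      failures.add("Schema must contain at least one other field besides the time and key fields.");
      return null;
    }
    return messageFields;
  }
}
```

Callers then have to tolerate a null result until `getOrThrowException` is called on the collector.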

throw new IllegalArgumentException(
"Schema must contain at least one other field besides the time and key fields.");
collector.addFailure("Schema must contain at least one other field besides the time and key fields.", null)
.withConfigProperty("schema");

indentation. Similar comment throughout the PR.

throw new IllegalArgumentException(String.format("Topic name %s is not a valid kafka topic. Please provide " +
"valid kafka topic name. %s", topic, e.getMessage()));
collector.addFailure(String.format("Topic name %s is not a valid kafka topic. Please provide " +
"valid kafka topic name. %s", topic, e.getMessage()), null)

indentation

String message = emptyField + " is empty. When Kerberos security is enabled for Kafka, " +
"then both the principal and the keytab have " +
"to be specified. If Kerberos is not enabled, then both should be empty.";
collector.addFailure(message, null);

this needs to add a cause so that whatever field(s) have problems are highlighted. Similarly, the message should be specific to what is wrong.

In other words, if the principal is defined and the keytab is not, the keytab should be added as a cause, with a simple message saying the keytab must be specified. If the keytab is defined and the principal is not, the principal should be added as a cause with a message saying the principal must be specified.

throw new IllegalArgumentException(String.format("Topic name %s is not a valid kafka topic. Please provide " +
"valid kafka topic name. %s", topic, e.getMessage()));
collector.addFailure(String.format(
"Topic name %s is not a valid kafka topic. Please provide valid kafka topic name. %s", topic,

indentation is still off in this PR when there is a new line. This should either line up with the arguments from the previous line or be indented 2 spaces if it's a method call.

    collector.addFailure(String.format(
      "Topic name %s is not a valid kafka topic. Please provide valid kafka topic name. %s", topic,
      e.getMessage()), null)
      .withConfigProperty(TOPIC);


KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets();
FailureCollector failureCollector = context.getFailureCollector();
KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets(failureCollector);

as mentioned before, do all the validation here before anything else happens. This should be:

    FailureCollector failureCollector = context.getFailureCollector();
    KafkaPartitionOffsets partitionOffsets = config.getInitialPartitionOffsets(failureCollector);
    Schema schema = config.getSchema(failureCollector);
    ... partitions = config.getPartitions(failureCollector);
    failureCollector.getOrThrowException();

Collaborator Author

Ok, but I previously moved FailureCollector later in the function, since I didn't think the if block or the saveKafkaRequests call needed to be done after getOrThrowException.

kafkaConf.putAll(config.getKafkaProperties());
kafkaRequests = KafkaInputFormat.saveKafkaRequests(conf, config.getTopic(), kafkaConf,
config.getPartitions(), config.getMaxNumberRecords(),
config.getPartitions(failureCollector),

this should be a variable that was calculated at the start of the method.

Collaborator Author

@vravish vravish Sep 24, 2019

Before or after the call to getOrThrowException? And which variable are you referring to?

Collaborator Author

Never mind, I just understood that you wanted me to move every reference to failureCollector to the beginning (including getPartitions), right before the call to getOrThrowException

partitionOffsets);
LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName);
Schema schema = config.getSchema();
Schema schema = config.getSchema(failureCollector);

this should be a variable that was calculated at the start of the method.

LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName);
Schema schema = config.getSchema();
Schema schema = config.getSchema(failureCollector);
failureCollector.getOrThrowException();

this should be at the top, as mentioned above

FailureCollector collector) {
if (Strings.isNullOrEmpty(principal) != Strings.isNullOrEmpty(keytab)) {
String emptyField = Strings.isNullOrEmpty(principal) ? "principal" : "keytab";
String message = emptyField + " is empty. When Kerberos security is enabled for Kafka, then both the principal " +

If the principal is defined but the keytab is not, this should be a simple message: "A keytab must be specified."

If the keytab is defined but the principal is not, there should be a simple message: "A principal must be specified."

No need for a corrective action.
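
A small sketch of what that could look like, kept self-contained. The collector is modelled as a plain list of "property: message" strings, which is an assumption made so the example runs on its own, and the constant names `PRINCIPAL` and `KEYTAB` are hypothetical. In the plugin the property would presumably be attached as a cause via `withConfigProperty` instead of being put into the message.

```java
import java.util.ArrayList;
import java.util.List;

class KerberosSketch {
  // Hypothetical constants for the two config property names.
  static final String PRINCIPAL = "principal";
  static final String KEYTAB = "keytab";

  // Adds one specific failure, tagged with the property that is missing.
  static void validateKerberosSetting(String principal, String keytab, List<String> failures) {
    boolean hasPrincipal = principal != null && !principal.isEmpty();
    boolean hasKeytab = keytab != null && !keytab.isEmpty();
    if (hasPrincipal && !hasKeytab) {
      failures.add(KEYTAB + ": A keytab must be specified.");
    } else if (hasKeytab && !hasPrincipal) {
      failures.add(PRINCIPAL + ": A principal must be specified.");
    }
    // Both set or both empty: nothing to report.
  }

  public static void main(String[] args) {
    List<String> failures = new ArrayList<>();
    validateKerberosSetting("kafka/host@EXAMPLE.COM", null, failures);
    System.out.println(failures);
  }
}
```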

public static void validateKerberosSetting(@Nullable String principal, @Nullable String keytab,
FailureCollector collector) {
if (Strings.isNullOrEmpty(principal) != Strings.isNullOrEmpty(keytab)) {
String emptyField = Strings.isNullOrEmpty(principal) ? "principal" : "keytab";

principal and keytab should also be constants, not hardcoded strings

@albertshau albertshau left a comment

one minor comment. Please fix then squash commits and we'll merge. Thanks for your contribution!

} catch (InvalidTopicException e) {
throw new IllegalArgumentException(String.format("Topic name %s is not a valid kafka topic. Please provide " +
"valid kafka topic name. %s", topic, e.getMessage()));
collector.addFailure(String.format("Topic name %s is not a valid kafka topic. Please provide valid kafka" +

missing space between 'kafka' and 'topic' on the next line

@albertshau albertshau force-pushed the feature_release/add-validation branch from 91af377 to 58641ed Compare September 25, 2019 22:22
@albertshau

due to timing, fixed indentation and squashed commits.

@albertshau albertshau merged commit 9d0afe2 into release/2.2 Sep 25, 2019
@albertshau albertshau deleted the feature_release/add-validation branch September 25, 2019 22:23