
KAFKA-8013: Avoid underflow when reading a Struct from a partially correct buffer#6340

Merged
hachikuji merged 6 commits into apache:trunk from kkonstantine:kafka-8013 on Apr 8, 2019

Conversation

@kkonstantine
Contributor

Protocol compatibility can be facilitated if a Struct that has been defined as an extension of an older Struct, by adding fields at the end of the older version, can read a message of the older version by tolerating the absence of the missing new fields. Reading the missing fields must be allowed by the definition of those fields (they have to be nullable).

  • Tested by adding unit tests around Schema.read in both directions

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)
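As a minimal illustration of the compatibility scenario described in this PR, the following is a self-contained sketch; the names (FieldDef, read, TolerantReadSketch) and the fixed-size-int fields are hypothetical simplifications, not Kafka's actual Schema/Struct classes:

```java
import java.nio.ByteBuffer;

public class TolerantReadSketch {
    // Hypothetical field definition: a 4-byte int with an optional default.
    public record FieldDef(String name, Integer defaultValue) {}

    // Read all fields; when the buffer is exhausted before all fields are
    // consumed, trailing fields fall back to their defaults instead of
    // throwing BufferUnderflowException.
    public static Integer[] read(ByteBuffer buffer, FieldDef[] fields) {
        Integer[] values = new Integer[fields.length];
        for (int i = 0; i < fields.length; i++) {
            if (buffer.remaining() >= Integer.BYTES) {
                values[i] = buffer.getInt();
            } else if (fields[i].defaultValue() != null) {
                values[i] = fields[i].defaultValue(); // tolerate missing trailing field
            } else {
                throw new RuntimeException("Missing value for field '"
                        + fields[i].name() + "' which has no default value");
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // Old-version message: only one field was serialized.
        ByteBuffer old = ByteBuffer.allocate(4).putInt(42);
        old.flip();
        // New-version schema: two fields, the appended one has a default.
        FieldDef[] schema = {new FieldDef("id", null), new FieldDef("extra", -1)};
        Integer[] values = read(old, schema);
        System.out.println(values[0] + " " + values[1]); // 42 -1
    }
}
```

A field without a default that is missing still fails, but now with a descriptive exception rather than a raw underflow.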

@kkonstantine
Contributor Author

@hachikuji @ewencp @rhauch this is particularly required by KIP-415 that will upgrade Connect's protocol version. But since it's a general fix, I'd prefer to add it separately. Please let me know if you have any concerns and feel free to consider backporting if that makes sense.

Contributor Author

FYI: this is already supported and the test is added here for completeness. The code change does not affect this case.

@kkonstantine force-pushed the kafka-8013 branch 2 times, most recently from 3be9cdb to ccc2ce3 on February 28, 2019 14:47
Contributor

@rhauch left a comment


I'm less familiar with this code, so I'll defer an approval. I do have one question below, but otherwise looks pretty good.

Contributor

Did you consider adding buffer.canRead() to the for loop criteria?

Contributor Author

Ah! hasRemaining, of course 🤦‍♂️
I guess it reads more easily if I'm explicit about leaving this field null. But that's a good suggestion.

@kkonstantine
Contributor Author

retest this please

@kkonstantine
Contributor Author

Rebased on top of latest trunk, triggering another test run

Contributor

@rhauch left a comment


I'm going to approve, but because I'm less familiar with this area of the code I would prefer that @hachikuji, @ewencp, or another committer also review this.

@ijuma
Member

ijuma commented Mar 2, 2019

We should update some documentation to specify the new behaviour, right? Also, we may want to make this optional, it seems like the schema could define if missing fields are expected. Any reason not to do that?

@kkonstantine
Contributor Author

Good question regarding optionality @ijuma

I thought this was getting resolved in
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java#L55-L63
with

private Object getFieldOrDefault(BoundField field) { 
    Object value = this.values[field.index];
    if (value != null)
        return value;
    else if (field.def.hasDefaultValue)
        return field.def.defaultValue;
    else if (field.def.type.isNullable())
        return null;
    else
        throw new SchemaException("Missing value for field '" + field.def.name + "' which has no default value.");
} 

And it does, for all types except the nullable ones. I assumed a nullable type has an implicit default null but looking closer I now see this is not true here.

I changed the code to check for a default value in place and, if it's missing, throw an exception with the right message (which is the one above).

W.r.t. documentation, is javadoc what you have in mind? I extended the description at Schema.read. I didn't find anything relevant in https://kafka.apache.org/documentation/

@kkonstantine
Contributor Author

@ijuma @hachikuji @ewencp latest build was green. Let me know if this change looks correct and sufficiently tested now. I'm rebasing to keep this PR up-to-date as it will be used by KIP-415 changes too. Thanks

@rhauch
Contributor

rhauch commented Mar 25, 2019

@ewencp, @hachikuji: would one of you mind looking at these changes to the Schema class in the client protocol? As mentioned above, the changes look good to me, but I'd appreciate a review from someone with more familiarity with the serialization code.

Member

Are there other usages of Type#read which need to guard against underflow?

Contributor Author

Good question @mumrah ! I think I had checked when I added the fix, but I checked again.

A similar check for underflow does not apply to the other implementations of Type#read. Specifically:

  • ArrayOf is accompanied by the size of the array of objects that will be read, and having an array with some objects missing should not be supported.
  • All the primitive types have a fixed size, so the type is either there as a whole or underflow is a valid exception.
  • All the variable-length types, such as STRING, NULLABLE_STRING, BYTES, NULLABLE_BYTES, VARINT, and VARLONG, are length-prefixed, so once their length has been read the content is expected in full and underflow is again a valid exception.
  • Finally, RECORDS is just a wrapper around the byte buffer, so read just wraps the buffer and the actual reading is left to the classes that use this type.
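To illustrate the length-prefix point, here is a hypothetical, simplified reader (not Kafka's actual Type implementations): once a size has been read, a short buffer is a genuine error rather than a tolerable missing field.

```java
import java.nio.ByteBuffer;

public class LengthPrefixedSketch {
    // Hypothetical reader for a length-prefixed byte field:
    // a 2-byte length followed by that many bytes of content.
    public static byte[] readBytes(ByteBuffer buffer) {
        short size = buffer.getShort();
        if (size < 0)
            return null; // nullable encoding: a negative length means null
        if (buffer.remaining() < size)
            // The length promised more content than the buffer holds --
            // this is corruption, not a missing optional field.
            throw new RuntimeException("Underflow: expected " + size
                    + " bytes but only " + buffer.remaining() + " remain");
        byte[] value = new byte[size];
        buffer.get(value);
        return value;
    }

    public static void main(String[] args) {
        ByteBuffer ok = ByteBuffer.allocate(5).putShort((short) 3)
                .put(new byte[]{1, 2, 3});
        ok.flip();
        System.out.println(readBytes(ok).length); // 3
    }
}
```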

Member

Should we set the default value here, or let it be null and handle the defaults in Struct#getFieldOrDefault?

Contributor Author

@kkonstantine commented Apr 4, 2019

I agree with you, and I'd also like to have only one place where we set defaults. But it seems that the methods that call Struct#getFieldOrDefault are not the only way to read a Struct's field. In that case, we could set a nullable field that has a default to null even if its default is not null. I think it's safer to be proactive here and set the default right away, since we already know what the default value is in this method.

Member

Understood. Thanks for the clarification! Yea, that's a tricky aspect of nullable fields with defaults -- how to determine whether the value is explicitly null or has been defaulted to null :)

@kkonstantine
Contributor Author

Thanks @mumrah for taking a close look at these changes here as well. I replied to your two points and, unless I missed something, it seems that no code change is required.

@kkonstantine
Contributor Author

retest this please

@kkonstantine
Contributor Author

Rebased on top of recent trunk (there were no conflicts, just keeping the branch up-to-date). @hachikuji @ewencp you think this looks ready?

Contributor

@hachikuji left a comment


I think this seems reasonable. Since this logic is used for the request/response APIs, the implication is that some previously invalid requests would now be accepted if the final fields have default values. I am not sure this is desirable since request APIs already have their own versioning. Do you think it would make sense to have a flag in Schema which controls this behavior?

@kkonstantine
Copy link
Copy Markdown
Contributor Author

Thanks for the question @hachikuji !
No problem with being cautious here. However, a change in behaviour would only be observed on fields with a default (and thus optional fields). If an API request or response definitely requires a field, it would omit a default in its definition, and this error would now bubble up with the correct exception and error message (SchemaException).

Until now, such omissions became apparent only via a BufferUnderflowException, which led me to suspect that this was not intentional and we just hadn't encountered it until now.

One concern I have with adding a flag is whether this feature would be on or off by default. Having it off might result in errors more often than we would hope for in protocols that will depend on it being always on in order to function properly (for an example of how this is used by KIP-415 see: #6363)

Finally, the only places where I found explicit handling of BufferUnderflowException are in
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java and
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
but these classes don't use the Schema type, if that makes you feel a bit more confident.

Thoughts?

@hachikuji
Contributor

@kkonstantine Hmm.. The interpretation that having a default value implies the field is optional is new here, so I'm not sure the explanation follows. Previously default values were only used to fill in missing values during serialization.

That said, I guess we don't typically use default values in the request/response schema definitions, though I did find at least one case. Look at RenewDelegationTokenResponse. The last field is the throttle time, which provides a default value. So with this change, we would accept an incomplete response as valid where we wouldn't have previously. I'm not sure this is desirable. You could even say that it is changing the API. Possibly we could audit all the schemas so that we do not use default values except where optional fields should be allowed. I'd probably feel a bit safer, though, if it had to be explicitly specified.

@kkonstantine
Contributor Author

Thanks @hachikuji
I was also in the process of tracking down all the definitions of types with defaults. Not a lot directly, but there are quite a few for Int8, Int32 and Int64.

At the same time, I think I misread what you meant by "using a flag". I added a method with a boolean to allow us to tolerate missing fields when that's desirable. Let me know if you think it makes sense to apply defaults at least in this case, or whether missing fields should explicitly be represented by null. I'm not sure, because Struct#hasField uses comparison with null to say whether a field is missing. But as we've mentioned above, this presents an issue with nullable fields that don't have a default. In that case, null can be both an indication that the field is missing and a valid value.

I pushed an improvement. After this change, I wouldn't recommend backporting this PR (thus I used assertThrows in the tests, which is enabled in trunk).

WDYT?
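The ambiguity discussed above can be shown with a tiny sketch; hasField here mimics the null comparison described in the conversation (a hypothetical simplification, not Kafka's actual Struct):

```java
public class NullableAmbiguitySketch {
    // Mimics the discussed behavior: a field counts as "present" only if non-null.
    public static boolean hasField(Object[] values, int index) {
        return values[index] != null;
    }

    public static void main(String[] args) {
        Object[] explicitNull = {"group-id", null}; // nullable field deliberately set to null
        Object[] neverRead    = {"group-id", null}; // field absent from the serialized form
        // Both cases look identical: hasField reports "missing" for each.
        System.out.println(hasField(explicitNull, 1)); // false
        System.out.println(hasField(neverRead, 1));    // false
    }
}
```

This is why a nullable field without a non-null default cannot distinguish "explicitly null" from "never deserialized".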

Contributor

@hachikuji left a comment


Thanks for the update. Just a couple small comments.

* their default values if such fields are optional; otherwise a {@code SchemaException} is
* thrown to signify that mandatory fields are missing.
*/
public Struct read(ByteBuffer buffer, boolean tolerateMissingWithDefaults) {
Contributor

As an alternative, we could let this flag be part of the schema definition. In other words, the creator of the schema must decide whether to allow optional fields at the end of the schema. Did you have any reason to prefer leaving the decision to the parser?

Contributor Author

No particular reason. Added it in the constructor now. Has to go before the var-arg.

for (int i = 0; i < fields.length; i++) {
try {
objects[i] = fields[i].def.type.read(buffer);
if (tolerateMissingWithDefaults) {
Contributor

Would it make sense to push this down?

                    } else if (tolerateMissingWithDefaults && fields[i].def.hasDefaultValue) {
                        objects[i] = fields[i].def.defaultValue;
                    } else {

Contributor Author

Not sure I follow the partial snippet.
If tolerateMissingWithDefaults is false, I reinstated the previous state of the code, which will throw BufferUnderflowException if there are missing fields, and that will be wrapped in a SchemaException.

Only if tolerateMissingWithDefaults is true do I check whether the buffer has remaining bytes, and if not, try to assign a default if one exists; otherwise I throw a SchemaException that is again wrapped immediately in another SchemaException.

Would you prefer to have the
if (!tolerateMissingWithDefaults) first?

Contributor

To clarify, what I was suggesting is this:

                    if (buffer.hasRemaining()) {
                        objects[i] = fields[i].def.type.read(buffer);
                    } else if (tolerateMissingWithDefaults && fields[i].def.hasDefaultValue) {
                        objects[i] = fields[i].def.defaultValue;
                    } else {
                        throw new SchemaException("Missing value for field '" + fields[i].def.name +
                                "' which has no default value");
                    }

I was just trying to consolidate the logic, but it's not a big deal.

By the way, since you drew my attention to it, it might be worth catching the SchemaException that we throw here and raising it directly rather than wrapping it in another SchemaException.

Contributor Author

Ok. I thought we wanted to keep the behavior of throwing BufferUnderflowException. But you are right. It's wrapped anyways.

Contributor Author

Regarding keeping the original exception and throwing that instead of re-wrapping (which discards the stack trace), I feel that wrapping was intentional, because SchemaException might be thrown by Type#read as well. But then it might not have a message, as the catch block here indicates.

Should I keep the wrapping but also add the inner exception as a cause? (Will have to add a SchemaException(String message, Throwable cause) constructor for that)
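For reference, the constructor proposed above would presumably look like this minimal sketch (a hypothetical stand-in class, not the actual org.apache.kafka.common.protocol.types.SchemaException):

```java
// Sketch of adding a cause-carrying constructor so the inner exception's
// stack trace is preserved when re-wrapping.
public class SchemaExceptionSketch extends RuntimeException {
    public SchemaExceptionSketch(String message) {
        super(message);
    }

    public SchemaExceptionSketch(String message, Throwable cause) {
        super(message, cause); // keeps the original exception as the cause
    }
}
```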

Contributor

@hachikuji left a comment

LGTM. Thanks for the patch.

@hachikuji
Contributor

retest this please

@hachikuji hachikuji merged commit 1bf1e46 into apache:trunk Apr 8, 2019
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* apache/trunk:
  MINOR: Add security considerations for remote JMX in Kafka docs (apache#6544)
  MINOR: Remove redundant access specifiers from metrics interfaces (apache#6527)
  MINOR: Correct KStream documentation (apache#6552)
  KAFKA-8013; Avoid underflow when reading a Struct from a partially correct buffer (apache#6340)
  KAFKA-8058: Fix ConnectClusterStateImpl.connectors() method (apache#6384)
  MINOR: Move common consumer tests out of abstract consumer class (apache#6548)
  KAFKA-8168; Add a generated ApiMessageType class
  KAFKA-7893; Refactor ConsumerBounceTest to reuse functionality from BaseConsumerTest (apache#6238)
  MINOR: Tighten up metadata upgrade test (apache#6531)
  KAFKA-8190; Don't update keystore modification time during validation (apache#6539)
  MINOR: Fixed a few warning in core and connects (apache#6545)
  KAFKA-7904; Add AtMinIsr partition metric and TopicCommand option (KIP-427)
  MINOR: fix throttling and status in ConnectionStressWorker
  KAFKA-8090: Use automatic RPC generation in ControlledShutdown
  KAFKA-6399: Remove Streams max.poll.interval override (apache#6509)
  KAFKA-8126: Flaky Test org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask (apache#6475)
  HOTFIX: Update unit test for KIP-443
  KAFKA-7190: KIP-443; Remove streams overrides on repartition topics (apache#6511)
  KAFKA-8183: Add retries to WorkerUtils#verifyTopics (apache#6532)
  KAFKA-8181: Removed Avro topic from TOC on kafka (apache#6529)
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
KAFKA-8013: Avoid underflow when reading a Struct from a partially correct buffer (apache#6340)

Protocol compatibility can be facilitated if a Struct, that has been defined as an extension of a previous Struct by adding fields at the end of the older version, can read a message of an older version by ignoring the absence of the missing new fields. Reading the missing fields should be allowed by the definition of these fields (they have to be nullable) when supported by the schema.

Reviewers: David Arthur <mumrah@gmail.com>, Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>