storage/sink/kafka: Allow configuring compatibility levels for schema registry subjects #27641
jkosh44
left a comment
Adapter code LGTM (I didn't take the time to actually understand the feature though).
    if err.status() == Some(reqwest::StatusCode::NOT_FOUND) {
        GetSubjectConfigError::SubjectNotFound
    } else {
        GetSubjectConfigError::Transport(err)
    }
Are you sure about this? I don't think 4xx errors come through as transport errors. I would expect this to look more like the other From<UnhandledError> implementations where you need to match on UnhandledError::Api.code.
Also note that the error I'm seeing when a subject does not exist has code 40408 and message "Subject does not have subject-level compatibility configured". I think the CSR doesn't distinguish between "subject does not exist" and "subject does not have compatibility configured".
There was a problem hiding this comment.
For this one it appears this does come through as a 404 http status code, at least according to their docs: https://docs.confluent.io/platform/7.6/schema-registry/develop/api.html#get--config-(string-%20subject)
> Also note that the error I'm seeing when a subject does not exist has code 40408 and message "Subject does not have subject-level compatibility configured".
Where do you see this? Is this on this same GET /subject/<subject> API?
There was a problem hiding this comment.
You are totally right - this returns a 40408 when the subject level is not yet set. Adding the tests caught this. Undocumented behavior!
There was a problem hiding this comment.
Ah, it's not undocumented, it's that reqwest by default won't return an error on a 4xx or 5xx status code. You need to call error_for_status() if you want that behavior. In this case, the code is structured so that we don't want that behavior, because send_request is already doing all the right things and pulling out the more specific error code (40408) from the response body, rather than the less specific 404 status code of the HTTP response itself.
There was a problem hiding this comment.
Oh sorry I see, I misunderstood! Yes, huh, that does seem to be undocumented.
It looks like you've still got the 404 check in the branch above—I think we should just remove that since we'll never create a transport error with status 404!
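A minimal sketch of the shape suggested in this thread: match on the API error code from the response body instead of checking the HTTP status on a transport error. The `UnhandledError` and `GetSubjectConfigError` definitions below are simplified, hypothetical stand-ins (the real ones in the client crate may differ), the `Server` variant is invented for illustration, and the use of 40401 for "subject not found" is an assumption; only 40408 is confirmed by the testing described above.

```rust
// Hypothetical, simplified stand-ins for the client's error types.
#[derive(Debug, PartialEq)]
enum UnhandledError {
    /// Connection-level failure (reduced to a message for this sketch).
    Transport(String),
    /// Error decoded from the registry's JSON response body.
    Api { code: i32, message: String },
}

#[derive(Debug, PartialEq)]
enum GetSubjectConfigError {
    /// The requested subject does not exist.
    SubjectNotFound,
    /// The compatibility level for the subject has not been set.
    SubjectLevelCompatibilityNotSet,
    /// Connection-level failure.
    Transport(String),
    /// Any other API error (hypothetical catch-all variant).
    Server { code: i32, message: String },
}

impl From<UnhandledError> for GetSubjectConfigError {
    fn from(err: UnhandledError) -> Self {
        match err {
            // Assumption: 40401 is the registry's "subject not found" code.
            UnhandledError::Api { code: 40401, .. } => GetSubjectConfigError::SubjectNotFound,
            // Observed in this thread: 40408 means no subject-level
            // compatibility is configured.
            UnhandledError::Api { code: 40408, .. } => {
                GetSubjectConfigError::SubjectLevelCompatibilityNotSet
            }
            UnhandledError::Api { code, message } => {
                GetSubjectConfigError::Server { code, message }
            }
            // No status check here: a 404 never arrives as a transport error.
            UnhandledError::Transport(e) => GetSubjectConfigError::Transport(e),
        }
    }
}
```

With this shape there is no branch that inspects `err.status()`, which matches the suggestion to drop the 404 check.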
Dropped in some substantive feedback in line comments. Can you add docs for this one too?
benesch
left a comment
LGTM modulo last two comments!
        }
        Ok(())
    }
    Err(GetSubjectConfigError::SubjectLevelCompatibilityNotSet) => ccsr
I think we might also want to set the subject compatibility level if we see a SubjectNotFound error? In my testing the CSR would never actually return "subject not found" rather than "subject compatibility level not set", but probably not ideal to rely on that behavior.
There was a problem hiding this comment.
sounds good, will set on both
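A rough sketch of "set on both", using an in-memory stand-in for the registry client. `FakeRegistry`, `ensure_compatibility_level`, and the method names are hypothetical and not the PR's actual API; leaving an already configured level untouched is also an assumption made for this example.

```rust
use std::collections::HashMap;

#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum CompatibilityLevel {
    Backward,
    Forward,
    Full,
    None,
}

#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum GetSubjectConfigError {
    SubjectNotFound,
    SubjectLevelCompatibilityNotSet,
}

/// Hypothetical in-memory stand-in for the schema registry client.
#[derive(Default)]
struct FakeRegistry {
    levels: HashMap<String, CompatibilityLevel>,
}

impl FakeRegistry {
    fn get_subject_config(
        &self,
        subject: &str,
    ) -> Result<CompatibilityLevel, GetSubjectConfigError> {
        // Mirrors the behavior observed in testing: a missing subject is
        // reported as "compatibility level not set", never "not found".
        self.levels
            .get(subject)
            .copied()
            .ok_or(GetSubjectConfigError::SubjectLevelCompatibilityNotSet)
    }

    fn set_subject_compatibility_level(&mut self, subject: &str, level: CompatibilityLevel) {
        self.levels.insert(subject.to_string(), level);
    }
}

/// Sets `desired` when the subject has no level yet, treating both error
/// variants the same way. Returns the level in effect afterwards.
fn ensure_compatibility_level(
    registry: &mut FakeRegistry,
    subject: &str,
    desired: CompatibilityLevel,
) -> CompatibilityLevel {
    match registry.get_subject_config(subject) {
        // Assumption: an existing level is left as-is.
        Ok(existing) => existing,
        Err(GetSubjectConfigError::SubjectNotFound)
        | Err(GetSubjectConfigError::SubjectLevelCompatibilityNotSet) => {
            registry.set_subject_compatibility_level(subject, desired);
            desired
        }
    }
}
```

Handling both variants in one arm means the behavior does not depend on which of the two errors the registry happens to return.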
    /// The requested subject does not exist.
    SubjectNotFound,
    /// The compatibility level for the subject has not been set.
    SubjectLevelCompatibilityNotSet,
I just realized this probably wants to be SubjectCompatibilityLevelNotSet! 😀
benesch
left a comment
(Sorry for just noticing one last thing in the docs. 🙈)

Motivation
This implements the second part of the linked issue, allowing users to specify new options as part of the `CSR CONNECTION` in a `CREATE SINK` statement. The options use the CSR config API to set these values: https://docs.confluent.io/platform/7.6/schema-registry/develop/api.html#config