-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support SASL SCRAM authentication #126
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excellent progress!
.../java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaClientSaslHandshaker.java
Show resolved
Hide resolved
|
||
encodeProgress = authenticateRequest.limit(); | ||
if (sasl.mechanism != null && ScramMechanism.isScram(sasl.mechanism.toUpperCase())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's separate out doEncodeSaslAuthenticateRequest
into doEncodeSaslPlainAuthenticateRequest
and doEncodeSaslScramFirstAuthenticateRequest
, then rename doEncodeSaslScramAuthenticateRequest
to doEncodeSaslScramFinalAuthenticateRequest
.
Then we can add a @FunctionalInterface
inside KafkaClientSaslHandshaker
perhaps called KafkaClientSaslAuthenticateEncoder
which has a single method matching the signature of doEncodeSaslPlainAuthenticateRequest
, doEncodeSaslScramFirstAuthenticateRequest
and doEncodeSaslScramFinalAuthenticateRequest
.
Then we can add an encodeSaslAuthenticate
field inside KafkaClientSaslHandshaker
of type KafkaClientSaslAuthenticateEncoder
that is initialized to one of doEncodeSaslPlainAuthenticateRequest
or doEncodeSaslScramFirstAuthenticateRequest
upon decoding the handshake response and determining the mechanism being used.
The implementation of doEncodeSaslAuthenticateRequest
would delegate to encodeSaslAuthenticate.encode(traceId, budgetId)
, so it will pick plain
or scram
(first message) initially.
After decoding the scram first response, we can update the encoder to point to doEncodeSaslScramFinalAuthenticateRequest
.
This will allow us to remove the SCRAM-specific variants of abstract methods on KafkaSaslClient
and keep it in terms of the Kafka protocol only, i.e. handshake
and authenticate
, not plain
or scram
.
if (sasl.mechanism != null && ScramMechanism.isScram(sasl.mechanism.toUpperCase())) | ||
{ | ||
//clientNonce = new BigInteger(130, new SecureRandom()).toString(Character.MAX_RADIX); | ||
clientNonce = "fyko+d2lbbFgONRv9qkxdawL"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is currently hardcoded for the benefit of the integration tests, but we need to make it random again, without losing the ability to specify it for the integration test.
In the example above we have base64.encode(byte[18]) == byte[24]
with ascii value fyko+d2lbbFgONRv9qkxdawL
, so we are really generating a random byte[18]
which is directly supported by Random::nextBytes(byte[])
.
This can be modeled as a Consumer<byte[]>
with the ability to override via configuration.
One example we have that is similar shows up in engine
configuration for DNS
host resolution, where we override the implementation to use a static method instead of the default logic to hard code returned IP addresses without being dependent on external DNS name server resolution.
zilla/runtime/engine/src/main/java/io/aklivity/zilla/runtime/engine/EngineConfiguration.java
Line 76 in fc6d9a2
ENGINE_HOST_RESOLVER = config.property(HostResolver.class, "host.resolver", |
We can do something similar here in KafkaConfiguration
for randomBytes(byte[])
using the default implementation of new SecureRandom()::nextBytes
, but also allowing for a static method override in the test to hardcode to base64.decode(fyko+d2lbbFgONRv9qkxdawL)
to create a predictable "random" result for the integration test.
//clientNonce = new BigInteger(130, new SecureRandom()).toString(Character.MAX_RADIX); | ||
clientNonce = "fyko+d2lbbFgONRv9qkxdawL"; | ||
DirectBuffer clientFirstMessage = new String8FW(String.format("n,,n=%s,r=%s", username, clientNonce)) | ||
.value(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
KafkaClientSaslHandshaker static constants shared by all threads.
private static final byte[] SASL_SCRAM_CHANNEL_BINDING = "n,,".getBytes(StandardCharsets.US_ASCII);
private static final byte[] SASL_SCRAM_USERNAME = "n=".getBytes(StandardCharsets.US_ASCII);
private static final byte[] SASL_SCRAM_RANDOM = ",r=".getBytes(StandardCharsets.US_ASCII);
KafkaClientSaslHandshaker instance fields at shared by all clients on same thread.
private final MutableDirectBuffer scramBuffer = new UnsafeBuffer(new byte[1024]);
private final byte[] scramClientNonce = new byte[18];
Optimized implementation to generate client first message without allocating new objects.
Consumer<byte[]> randomBytes = ...;
randomBytes.accept(scramClientNonce);
int scramBytes = 0;
scramBuffer.putBytes(scramBytes, SASL_SCRAM_CHANNEL_BINDING);
scramBytes += SASL_SCRAM_CHANNEL_BINDING.length;
scramBuffer.putBytes(scramBytes, SASL_SCRAM_USERNAME);
scramBytes += SASL_SCRAM_USERNAME.length;
scramBytes += scramBuffer.putStringWithoutLengthUtf8(username);
scramBuffer.putBytes(scramBytes, SASL_SCRAM_RANDOM);
scramBytes += SASL_SCRAM_RANDOM.length;
scramBuffer.putBytes(scramBytes, randomBytes);
scramBytes += randomBytes.length;
final SaslAuthenticateRequestFW authenticateRequest =
saslAuthenticateRequestRW.wrap(encodeBuffer, encodeProgress, encodeLimit)
.authBytes(scramBuffer, 0, scramBytes)
.build();
byte[] clientProof = formatter.clientProof(saltedPassword, authMessage); | ||
|
||
String clientProofStr = Base64.getEncoder().encodeToString(clientProof); | ||
String clientFinalMessage = String.format("%s,p=%s", clientFinalMessageWithoutProof, clientProofStr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Try following a similar approach to that proposed above for clientFirstMessage
to eliminate object allocation while working out the clientFinalMessage
.
while (serverFinalResponseMatcher.find()) | ||
{ | ||
serverFinalMessage = serverFinalResponseMatcher.group(1); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this really a while
loop? or should it be an if
with .matches()
instead?
For example, is it legitimate to have multiple v=...
in the same server final message?
Matcher serverFinalResponseMatcher = SERVER_FINAL_MESSAGE.matcher(serverFinalMessage); | ||
while (serverFinalResponseMatcher.find()) | ||
{ | ||
serverFinalMessage = serverFinalResponseMatcher.group(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using named matcher groups (instead of numerical) for improved readability.
In the SASL SCRAM spec this is called v
for verifier
.
serverSignature)) | ||
{ | ||
client.onDecodeSaslResponse(traceId); | ||
client.onDecodeSaslAuthenticateResponse(traceId, authorization, 58); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid hardcoded values like 58
and instead create a static constant for ERROR_SASL_AUTHENTICATION_FAILED
.
Please make sure we have a negative test in the scripts that sends an invalid verifier value from the server to confirm we behave correctly.
{ | ||
"sasl": | ||
{ | ||
"mechanism": "scram-sha-1", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Configuration is called scram-256
but mechanism is scram-sha-1
, please make them consistent, and create a new one only if we need both.
@@ -75,7 +75,7 @@ | |||
{ | |||
"title": "Mechanism", | |||
"type": "string", | |||
"enum": [ "plain" ] | |||
"enum": [ "plain", "scram-sha-256", "scram-sha-1" ] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need scram-sha-512
as well?
No description provided.