Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-4541: Support for delegation token mechanism #3616

Closed
wants to merge 4 commits into from

Conversation

omkreddy
Copy link
Contributor

@omkreddy omkreddy commented Aug 3, 2017

  • Add capability to create delegation token
  • Add authentication based on delegation token.
  • Add capability to renew/expire delegation tokens.
  • Add units tests and integration tests

@omkreddy
Copy link
Contributor Author

omkreddy commented Aug 3, 2017

This is PR is a working version of delegation token implementation.

KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+Kafka

work done:

  1. capability to create/renew/expire delegation tokens
  2. zookeeper based token Storage and token caching
  3. token Based authentication using scram.
  4. command-line to manage tokens. This will be migrated to new Admin API ( will raise a KIP)

Minor deviations from the KIP:

  1. tokenId field changed to string from int.
  2. added owner, maxtime fields to CreateTokenResponse. This change is to make the response format similar to DescribeResponse
  3. In KIP, We proposed to store scram credentials to ZK. But this is not required. Since server can create token hmac anytime, scram credentials can be created online during token creation and token retrieval from ZK.

Ongoing Work:

  1. Support for token management in Admin API (will raise a KIP)
  2. System test

@junrao @harshach @ijuma @rajinisivaram
Pls take a look whenever you get a chance.

@asfgit
Copy link

asfgit commented Aug 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6518/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit
Copy link

asfgit commented Aug 3, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6504/
Test PASSed (JDK 8 and Scala 2.12).

@asfgit
Copy link

asfgit commented Aug 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6569/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit
Copy link

asfgit commented Aug 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6554/
Test PASSed (JDK 8 and Scala 2.12).

@asfgit
Copy link

asfgit commented Aug 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6585/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit
Copy link

asfgit commented Aug 4, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6570/
Test PASSed (JDK 8 and Scala 2.12).

@asfgit
Copy link

asfgit commented Aug 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6597/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit
Copy link

asfgit commented Aug 6, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6582/
Test PASSed (JDK 8 and Scala 2.12).

@omkreddy
Copy link
Contributor Author

omkreddy commented Sep 6, 2017

@junrao @rajinisivaram it will be great if you can give your initial thoughts on implementation and approach.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@omkreddy : Thanks for the patch. Some comments below. Also, (1) could you add the new admin methods to org.apache.kafka.clients.admin.AdminClient since we will be phasing out kafka.admin.AdminClient? (2) could you add all the new ZK accesses to use the new ZookeeperClient?

//Token Create request
public static final Schema TOKEN_CREATE_REQUEST_V0 = new Schema(
new Field("renewers", new ArrayOf(STRING),
"An array of token renewers. Renewer is an Kafka PrincipalType+name string, who is allowed to renew this token before the max lifetime expires."),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should PrincipalType+name be in one field or two separate fields?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add two separate fields. updated the PR.

}

val isTokenAuthenticated = request.session.principal.tokenAuthenticated
if (request.context.securityProtocol == SecurityProtocol.PLAINTEXT || isTokenAuthenticated)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, SSL can be used for just one-way authentication. In that case, the client is not authenticated by the server. So, we probably want to disallow token creation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok..I have excluded SSL authentication also. we can review and enable if required in future


val isTokenAuthenticated = request.session.principal.tokenAuthenticated
if (request.context.securityProtocol == SecurityProtocol.PLAINTEXT || isTokenAuthenticated)
sendResponseCallback( Errors.TOKEN_REQUEST_NOT_ALLOWED, -1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be useful to define a meaningful constant for -1.

def startup() = {
if (config.tokenAuthEnabled) {
zkUtils.makeSurePersistentPathExists(ZkUtils.TokenInfoPath)
loadCache
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ZkNodeChangeNotificationListener already has the logic to read all existing data from ZK during init(). So, calling loadCache() seems unnecessary here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ZkNodeChangeNotificationListener only loads the change notifications. So we need loadCache to build tokenCache


import TokenManager._

type createResponseCallback = CreateTokenResult => Unit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capitalize the type name?

//Cache to hold all the tokens
tokenCache = credentialCache.createCache(TOKEN_CACHE, TokenInformation.class);
//Cache to hold tokenId->hmac mapping. This is required for renew, expire requests
hmacIDCache = credentialCache.createCache(TOKEN_HMAC_ID_CACHE, String.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CredentialCache is really used for caching different Scram mechanisms? So, it's a bit weird to reuse that for storing tokens and hmacID? Could we just create two specific HashMap, one for token and the other for hmacID?

//Update Scram Credentials
val scramConfig = new Properties
prepareScramCredentials(base64Pwd, scramConfig)
tokenCache.updateCredentials(tokenInfo.tokenId, scramConfig)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit weird for scramConfig to use Properties here. Could we just use a HashMap?

tokenCache.updateCredentials(tokenInfo.tokenId, scramConfig)

//Update hmac-id cache
tokenCache.addHmacIdMapping(base64Pwd, tokenInfo.tokenId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to move all the three updates to inside TokenCache?


public static RenewTokenRequest parse(ByteBuffer buffer, short version) {
return new RenewTokenRequest(ApiKeys.RENEW_TOKEN.parseRequest(version, buffer),
(short) version);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(short) is unnecessary.

@@ -38,6 +38,7 @@ object ConfigType {
val Client = "clients"
val User = "users"
val Broker = "brokers"
val Token = "tokens"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Token doesn't seem to used. Is anything related to token configurable?

@asfgit
Copy link

asfgit commented Nov 16, 2017

FAILURE
No test results found.
--none--

@asfgit
Copy link

asfgit commented Nov 16, 2017

FAILURE
No test results found.
--none--

@omkreddy
Copy link
Contributor Author

@junrao pinging for review

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@omkreddy : Thanks for updated patch. Made another pass and the following are some comments.

def allowTokenRequests(request: RequestChannel.Request) : Boolean = {
if (request.session.principal.tokenAuthenticated ||
request.context.securityProtocol == SecurityProtocol.PLAINTEXT ||
request.context.securityProtocol == SecurityProtocol.SSL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually useful to support 2-way SSL since configuring client side certificates in a shared environment such as Storm is still painful. Actually, in the case of 1-way SSL, the authenticated user will be ANONYMOUS. So, we can simply remove this check and allow tokens to be created for ANONYMOUS user.


private static final Schema TOKEN_CREATE_RESPONSE_V0 = new Schema(
ERROR_CODE,
new Field("owner", STRING, "Token owner."),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be consistent, this probably should be 2 separate fields of type and name like what's in CreateTokenRequest. Actually, is there a need to return owner? The client should know itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking it will be easy to construct DelegationToken object from response. I am not able to find easy way to get user principal on client side. For now, added 2 separate fields for owner field.

}

val tokenMaxLifetime: Long = config.delegationTokenMaxLifeMs
val tokenRenewInterval: Long = config.delegationTokenExpiryTimeMs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes we use time and some other times we use interval. It would be useful to make them consistent throughout the patch.

val tokenInfo = new TokenInformation(tokenId, owner, renewers.asJava)
tokenInfo.setIssueTimestamp(issueTimeStamp)
tokenInfo.setMaxTimestamp(maxLifeTimeStamp)
tokenInfo.setExpiryTimestamp(expiryTimeStamp)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just pass in those 3 timestamps through the constructor?

} catch {
case e: Throwable =>
error("Exception while creating token", e)
responseCallback(CreateTokenResult(-1, -1, -1, "", Array[Byte](), Errors.forException(e)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this for handling unexpected errors? If so, they are already handled in KafkaApis.handle().

def expireToken(adminClient: AdminClient, opts: DelegationTokenCommandOptions) = {
val hmac = opts.options.valueOf(opts.hmacOpt)
val expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt).longValue()
println("Calling token operation with hmac :" + hmac +" , expire-time-period : "+ expiryTimePeriodMs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Calling token operation" => "Calling expire token operation"

# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh kafka.admin.DelegationTokenCommand "$@"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add the same script under bin/windows too?

for (_ <- 0 until producerCount)
producers += createNewProducer
for (_ <- 0 until consumerCount) {
consumers += createNewConsumer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a test method that uses producers/consumers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. TokenEndToEndAuthorizationTest.scala extends SaslEndToEndAuthorizationTest. All the tests run with token authenticated clients.

@@ -534,3 +562,4 @@ object ZkData {
} else ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unneeded new line

@@ -68,6 +68,7 @@ object ZkUtils {
val ConfigUsersPath = s"$ConfigPath/users"
val ProducerIdBlockPath = "/latest_producer_id_block"


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unneeded new line

Copy link
Contributor Author

@omkreddy omkreddy Jan 11, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ZKUtils changes are not required. reverted the changes.

@omkreddy
Copy link
Contributor Author

@junrao Thanks for the review. Since adding new methods to org.apache.kafka.clients.admin.KafkaAdminClient requires KIP, I planning to do it separate PR. I will raise KIP and PR by next week. Updated the PR. Pls take a look.

@omkreddy omkreddy force-pushed the KAFKA-4541 branch 2 times, most recently from 6a1a6e4 to f96e493 Compare January 11, 2018 20:08
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@omkreddy Thanks for the latest patch. Just a few more minor comments.

override def initClients(producerSecurityProps: Properties, consumerSecurityProps: Properties) = {
}

def initTokenClients(token: DelegationToken) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of overriding initTokenClients, would it be simpler to override clientSecurityProps() and put the additionally needed properties there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modified the code to minimize the changes. Pls take a look

@@ -72,14 +72,14 @@ object JaasTestUtils {

case class ScramLoginModule(username: String,
password: String,
debug: Boolean = false) extends JaasModule {
debug: Boolean = false, tokenProps: Map[String, String] = Map.empty) extends JaasModule {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we put tokenProps in a new line?


def name = "org.apache.kafka.common.security.scram.ScramLoginModule"

def entries: Map[String, String] = Map(
"username" -> username,
"password" -> password
)
) ++ tokenProps.map { case (user, pass) => user -> pass }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the names user, pass appropriate? It seems that they are property name and value.


<suppress checks="JavaNCSS"
files="AbstractRequest.java"/>
files="Protocol.java|AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Protocol.java no longer exists.

@@ -158,11 +158,11 @@ object TestUtils extends Logging {
enableSaslPlaintext: Boolean = false,
enableSaslSsl: Boolean = false,
rackInfo: Map[Int, String] = Map(),
logDirCount: Int = 1): Seq[Properties] = {
logDirCount: Int = 1, enableToken: Boolean = false): Seq[Properties] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we put enableToken in a new line?

def expireToken(principal: KafkaPrincipal,
hmac: ByteBuffer,
expireLifeTimeMs: Long,
expireResponseCallback: RenewResponseCallback) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it's clearer to create a separate ExpireResponseCallback.

assertEquals(Errors.DELEGATION_TOKEN_EXPIRED, error)
}

def verifyCreateTokenResult(expected: CreateTokenResult): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it's better to define equal() in CreateTokenResult?

assert(expected.hmac sameElements createTokenResult.hmac)
}

def renewResponseCallback(ret: Errors, timeStamp: Long): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this and a few other non-test methods be private?

val owner1 = SecurityUtils.parseKafkaPrincipal("User:owner1")
val owner2 = SecurityUtils.parseKafkaPrincipal("User:owner2")
val owner3 = SecurityUtils.parseKafkaPrincipal("User:owner2")
val owner4 = SecurityUtils.parseKafkaPrincipal("User:owner2")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above 2 lines should use owner3 and owner4?

List()
}
else {
def eligible(token: TokenInformation): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to avoid duplicating eligible()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm..not found elegant way to do this. will try to check any other approach.

@omkreddy omkreddy force-pushed the KAFKA-4541 branch 2 times, most recently from e967dba to 86a05a2 Compare January 12, 2018 12:22
@omkreddy
Copy link
Contributor Author

@junrao Thanks for the review. Updated the PR with review comments and some cleanups. Pls check.

}

def allowTokenRequests(request: RequestChannel.Request) : Boolean = {
if (request.session.principal.tokenAuthenticated || request.context.securityProtocol == SecurityProtocol.PLAINTEXT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we extend this to disallow requests on SSL listeners that don't enforce presenting a client certificate? Or rather, SSL endpoints with a user principal of ANONYMOUS - otherwise even authenticated users could't request a token on such an endpoint.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inconsistency is that we disallow token creation on ANONYMOUS if security protocol is PLAINTEXT, but allow it if security protocol is SSL. We could probably also disable token creation if security protocol is SSL and user is ANONYMOUS.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@omkreddy : Thanks for updated patch. A few more minor comments below.

tokenId: String,
hmac: Array[Byte],
error: Errors) {
override def equals(other: Any): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we override equals(), we would want to override hashcode() as well.

super.setUp()
}

def getDelegationToken(): DelegationToken = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make this private? Also, perhaps it's more intuitive to name this createDelegationToken().

adminClient.close()

//wait for token to reach all the brokers
Thread.sleep(100)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably not very reliable. Could we wait until the token is in every broker's token cache by poking into each server object?

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package kafka.security.token
package unit.kafka.security.token.delegation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, it seems we shouldn't include unit in package name.

}
else {
val owners = if (describeTokenRequest.owners == null) null else describeTokenRequest.owners.asScala
def eligible(token: TokenInformation) : Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we put eligible() in Object KafkaApis so that we can use it here and in the test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao Thanks for the review. made few changes to avoid the duplicate code.
Updated thePR. Pls take a look.

}

def allowTokenRequests(request: RequestChannel.Request) : Boolean = {
if (request.session.principal.tokenAuthenticated || request.context.securityProtocol == SecurityProtocol.PLAINTEXT)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inconsistency is that we disallow token creation on ANONYMOUS if security protocol is PLAINTEXT, but allow it if security protocol is SSL. We could probably also disable token creation if security protocol is SSL and user is ANONYMOUS.

@omkreddy omkreddy force-pushed the KAFKA-4541 branch 3 times, most recently from d560b22 to a464102 Compare January 13, 2018 03:14
@omkreddy
Copy link
Contributor Author

KIP for admin client changes: https://cwiki.apache.org/confluence/display/KAFKA/KIP-249%3A+Add+Delegation+Token+Operations+to+KafkaAdminClient
PR is also ready is here. I planning to start discuss thread after merging this PR.

@junrao
Copy link
Contributor

junrao commented Jan 16, 2018

@omkreddy : Thanks for the PR. LGTM.

@junrao
Copy link
Contributor

junrao commented Jan 16, 2018

@omkreddy : Merged the patch to trunk. Be aware that the deadline for 1.1.0 KIP freeze is 01/23. So, ideally, it would be useful to get the admin client KIP in before that.

@junrao junrao closed this in 27a8d0f Jan 16, 2018
@omkreddy
Copy link
Contributor Author

@junrao Thanks for your detailed review. I start discussion on admin client changes.

@omkreddy omkreddy deleted the KAFKA-4541 branch July 3, 2018 15:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants