
Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type:software.amazon.msk.auth.iam.internals.AWSCredentialsCallback] #36

Closed
lauroawsps opened this issue Aug 5, 2021 · 23 comments


@lauroawsps

Hello,

Using the aws-msk-iam-auth library to authenticate Zeppelin and Flink processes against an MSK cluster with IAM authentication enabled, the following error is thrown when trying to run more than one task on the same TaskManager:

Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type:software.amazon.msk.auth.iam.internals.AWSCredentialsCallback]) occurred when evaluating SASL token received from the Kafka Broker

Library version 1.1.0 has been imported from an S3 bucket into the Studio notebook and added as a dependency. The following configuration is used:

"bootstrap.servers" -> "<MSKBootstrapServers>",
"security.protocol" -> "SASL_SSL",
"sasl.mechanism" -> "AWS_MSK_IAM",
"sasl.jaas.config" -> "software.amazon.msk.auth.iam.IAMLoginModule required;",
"sasl.client.callback.handler.class" -> "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
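
For reference, the same client configuration expressed as Java Properties (a minimal sketch; the class name `MskIamConfig` is illustrative and the bootstrap servers value is a placeholder you supply):

```java
import java.util.Properties;

public class MskIamConfig {
    // Builds the MSK IAM client properties listed above.
    public static Properties mskIamProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "AWS_MSK_IAM");
        props.setProperty("sasl.jaas.config",
                "software.amazon.msk.auth.iam.IAMLoginModule required;");
        props.setProperty("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        return props;
    }
}
```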

Since the default artifact included in the notebook (flink-sql-connector-kafka_2.12) shades the Kafka client dependencies, the following Gradle build was used to relocate the Kafka packages inside aws-msk-iam-auth to match:

apply plugin: 'com.github.johnrengelman.shadow'

ext {
    projectName = "aws-msk-iam-auth-flink-shaded"
    projectDescription = "Shades Kafka client dependencies in aws-msk-iam-auth library so that they can match Kafka Client dependency in the shaded flink-kafka-connector"
    projectArtifactName = "aws-msk-iam-auth-flink-shaded"
}

dependencies {
    implementation group: 'software.amazon.msk', name: 'aws-msk-iam-auth', version: '1.1.0'
}

jar {
    enabled = false
    dependsOn(shadowJar)
}

shadowJar {
    archiveClassifier.set('')
    archiveName("${projectArtifactName}.jar")
    relocate('org.apache.kafka', 'org.apache.flink.kafka.shaded.org.apache.kafka')
    mergeServiceFiles()
}

The setup works and authenticates with MSK fine when a single task is executed by the TaskManager. However, as mentioned above, when trying to run more than one task on the same TaskManager, the following exception is thrown:

java.io.IOException: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b8fc1c328b567c9aab180e1c7af33983)
	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:538)
	at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInsertInto(FlinkStreamSqlInterpreter.java:97)
	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:273)
	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
	at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
	at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
	at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
	at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b8fc1c328b567c9aab180e1c7af33983)
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1719)
	at org.apache.flink.table.planner.delegation.ExecutorBase.execute(ExecutorBase.java:52)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1214)
	at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:532)
	... 14 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: b8fc1c328b567c9aab180e1c7af33983)
	at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:116)
	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
	at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:602)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:309)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
	at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
	... 3 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
	at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$getJobExecutionResult$6(ClusterClientJobClientAdapter.java:114)
	... 18 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:198)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:191)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:185)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:520)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:394)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type:software.amazon.msk.auth.iam.internals.AWSCredentialsCallback]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
Caused by: javax.security.sasl.SaslException: Exception while evaluating challenge
	at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:113)
	at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:474)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
	at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:474)
	at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:381)
	at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:293)
	at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:233)
	at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
	at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
	at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:483)
	at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
	at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
	at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
	at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
	at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:368)
	at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1926)
	at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1894)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:553)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:474)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:470)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:529)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:724)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:549)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type:software.amazon.msk.auth.iam.internals.AWSCredentialsCallback
	at software.amazon.msk.auth.iam.IAMClientCallbackHandler.handle(IAMClientCallbackHandler.java:80)
	at software.amazon.msk.auth.iam.internals.IAMSaslClient.generateClientMessage(IAMSaslClient.java:138)
	at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:95)
	... 30 more

Could you please help with the issue?

Steps to reproduce:

  1. Create a Studio notebook with parallelism of 5 and parallelism per KPU also 5.
  2. Add the required shaded dependencies as described above.
  3. Make the notebook process some data with Flink Streaming SQL.
  4. Run another task from the notebook.
@dannycranmer
Contributor

Hello @lauroawsps, just to confirm that we (Kinesis Data Analytics) have also run into this issue and are currently working on a fix. I will keep you posted. Sorry for the inconvenience.

@dannycranmer
Contributor

Hello @lauroawsps, we have now addressed this issue for KDA Studio. We have added support for the non-relocated Kafka connector and MSK IAM as service-managed dependencies, which means you will no longer need to rebuild and provide the connector. This change is not yet available on the console; if you would like to test it now, you can update your application using the CLI. Here is an example request (note that you must provide the entire list of custom artifacts here; any additional JARs in S3 you want to include should be added to the list (reference)):

aws kinesisanalyticsv2 update-application \
    --application-name <application-name> \
    --current-application-version-id <current-application-version-id> \
    --application-configuration-update '{
       "ZeppelinApplicationConfigurationUpdate": {
         "CustomArtifactsConfigurationUpdate": [ 
            { 
               "ArtifactType": "DEPENDENCY_JAR",
               "MavenReference": { 
                  "GroupId": "org.apache.flink",
                  "ArtifactId": "flink-connector-kafka_2.12",
                  "Version": "1.11.1"
               }
            },
            { 
               "ArtifactType": "DEPENDENCY_JAR",
               "MavenReference": { 
                  "GroupId": "software.amazon.msk",
                  "ArtifactId": "aws-msk-iam-auth",
                  "Version": "1.1.0"
               }
            }
         ]
       }
     }' 

@lauroawsps
Author

Brilliant @dannycranmer! Thank you very much!

@sayantacC
Contributor

Closing, since the original issue seems to have been resolved.

@irisgve

irisgve commented Dec 3, 2021

@dannycranmer does the fix also apply to KDA applications, not just KDA Studio? We are encountering the same error in one of our KDA applications using Flink to consume from MSK with IAM access:

2021-12-03 14:43:09
java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
Caused by: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException]
	at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:113)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:474)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:474)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:381)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:293)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:233)
	at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:788)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:750)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2338)
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1725)
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:348)
	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:235)
	at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:49)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: javax.security.auth.callback.UnsupportedCallbackException
	at software.amazon.msk.auth.iam.IAMClientCallbackHandler.handle(IAMClientCallbackHandler.java:68)
	at software.amazon.msk.auth.iam.internals.IAMSaslClient.generateClientMessage(IAMSaslClient.java:138)
	at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:95)
	... 30 more

Curiously, this occurs in just one of our accounts, with the same MSK + IAM + KDA setup as the other accounts.

@dannycranmer
Contributor

@irisgve in our testing this issue did not impact KDA Flink applications. The issue surfaces once a second job is created, and KDA Flink applications only ever create one job. Unless there is an edge case we did not consider? Can you please provide more details? Does the job consume records initially? Does the job fail over/restart before hitting this issue?

@irisgve

irisgve commented Dec 10, 2021

Yes, the job consumes records initially. The error actually causes the job to fail over/restart, so we had to fork the Flink Kafka source to retry on MSK IAM errors, because otherwise it just kills the job.

@vmohanan1

@dannycranmer I am seeing this error too in a KDA Flink application (not Studio). The issue is that it causes KDA to restart, which increases the source latency.

@dannycranmer
Contributor

We are working on a fix. In the meantime, if you want to move forward, you can patch the library yourself. This issue occurs when the IAMSaslClientProvider is created by classloader A but used by classloader B. Since Amazon Kinesis Data Analytics for Flink runs a single job at a time, we can fix this as follows (link to code):

    public static void initialize() {
-        Security.addProvider(new IAMSaslClientProvider());
+        Security.insertProviderAt(new IAMSaslClientProvider(), 1);
    }

This ensures that the most recently registered IAMSaslClientProvider is used. Since only a single job runs at a time, the latest one should always be the right one.
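
The classloader mismatch behind this can be reproduced in isolation. The sketch below (hypothetical demo code, not the library's source; `ClassLoaderDemo`, `IsolatingLoader`, and `Payload` are illustrative names) loads the same class through two parent-last classloaders, mimicking Flink's ChildFirstClassLoader. The two resulting Class objects are distinct, which is why a provider registered by one job cannot handle callback objects created by another:

```java
import java.io.InputStream;

public class ClassLoaderDemo {

    // Stand-in for a class like AWSCredentialsCallback.
    public static class Payload {}

    // Parent-last loader: defines the target class itself instead of
    // delegating, mimicking Flink's ChildFirstClassLoader.
    static class IsolatingLoader extends ClassLoader {
        private final String target;
        IsolatingLoader(String target) { this.target = target; }

        @Override
        public Class<?> loadClass(String name) throws ClassNotFoundException {
            if (!name.equals(target)) {
                return super.loadClass(name); // delegate everything else
            }
            try (InputStream in = getResourceAsStream(name.replace('.', '/') + ".class")) {
                byte[] bytes = in.readAllBytes();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (Exception e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    // Loads Payload through two independent loaders and reports whether
    // the JVM treats them as the same class (it does not).
    public static boolean sameClassAcrossLoaders() throws Exception {
        String name = ClassLoaderDemo.class.getName() + "$Payload";
        Class<?> a = new IsolatingLoader(name).loadClass(name);
        Class<?> b = new IsolatingLoader(name).loadClass(name);
        // Same bytes, same name, different defining loader => different class,
        // so instanceof/casts across the two loaders fail.
        return a == b && a.isInstance(b.getDeclaredConstructor().newInstance());
    }
}
```

Because each loader defines its own Class object, `instanceof` checks and casts across them fail; the globally registered security provider keeps a reference to classes from the old classloader, hence the UnsupportedCallbackException after a second job or a restart.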

@sayantacC would you mind re-opening this ticket?

@vmohanan1

vmohanan1 commented Mar 10, 2022

@dannycranmer Is there any way for me to catch this error from my application and retry, instead of patching the library? I am having a hard time validating, since our servers with IAM auth enabled are in higher environments with a long release process.

I am wondering if something like this would work:

while (retryAttemptCount < RETRY_ATTEMPT_LIMIT) {
    try {
        LOGGER.info("Trying to create Flink Kafka consumer.");
        return new FlinkKafkaConsumer<T>(
                topics,
                deserializationSchema,
                properties);
    } catch (Exception e) {
        LOGGER.error(
                "Failed to create consumer. Attempt: " + retryAttemptCount + " out of " + RETRY_ATTEMPT_LIMIT);
        LOGGER.error("Failure message: " + e.getMessage());
        retryAttemptCount++;
    }
}

But since this class is actually invoked in a callback, I doubt this will catch the error when it is thrown.

@sayantacC
Contributor

@dannycranmer Reopened as requested.

sayantacC reopened this Mar 10, 2022
@dannycranmer
Contributor

dannycranmer commented Mar 10, 2022

@vmohanan1 you could try re-creating the provider under this failure mode; example below. Disclaimer: I have not tested this. One concern is that if UnsupportedCallbackException is not specific enough, or you have high operator parallelism, you might end up inserting many providers unnecessarily.

    ...
            } catch (Exception e) {
                if (ExceptionUtils.hasCause(e, UnsupportedCallbackException.class)) {
                    Security.insertProviderAt(new IAMSaslClientProvider(), 1);
                }

                ...
            }

Note: ExceptionUtils is from Apache Commons

@vmohanan1

Thanks, I will try this. I am currently on version 1.1.1 and wondering whether upgrading to 1.1.3 will let it retry without failing the pipeline. So I will try both.

@vmohanan1

@dannycranmer Just noticed that new IAMSaslClientProvider() has a protected constructor. Any ETA on when the fix might be available?

@dannycranmer
Contributor

dannycranmer commented Mar 17, 2022

Apologies I cannot provide an ETA at this point.

Some supplementary information:

The block diagram attached to the original comment illustrates the problem (image not reproduced here).

We have confirmed the issue also exists for other Kafka plugins, for example SCRAM. However, when using SCRAM, the majority of callbacks are still usable because they are loaded from the parent classloader (their package prefix is javax):

  • javax.security.auth.callback.NameCallback
  • javax.security.auth.callback.PasswordCallback

If you try to define any extensions via org.apache.kafka.common.security.scram.ScramExtensionsCallback, they are rejected for the same reason as MSK IAM. The Kafka client simply swallows the exception with a debug message.
@vmohanan1

When you say Job A and Job B, you mean individual parallel units in a single job, correct?

@dannycranmer
Contributor

When you say Job A and Job B, you mean individual parallel units in a single job, correct?

No, I am referring to two parallel Flink jobs here. However, the same problem applies to a single job stop/start (failover).

@Excaleo

Excaleo commented Apr 27, 2022

@dannycranmer we have forked the library and applied the patch that uses Security.insertProviderAt(..., 1) instead of Security.addProvider(...); however, we encountered the exception again when a single KDA Flink job stopped and restarted after running for a few days.

java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:199)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:146)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: org.apache.flink.util.ChildFirstClassLoader@1b3a8cdb from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: org.apache.flink.util.ChildFirstClassLoader@13d765c3]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
Caused by: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: org.apache.flink.util.ChildFirstClassLoader@1b3a8cdb from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: org.apache.flink.util.ChildFirstClassLoader@13d765c3]
	at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:113)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:474)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:474)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:381)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:293)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:233)
	at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:484)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
	at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:104)
	at com.atlassian.obsvs.kda.common.kafka.KafkaPartitionSplitReaderWithRetry.fetch(KafkaPartitionSplitReaderWithRetry.java:36)
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:56)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:138)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:101)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: org.apache.flink.util.ChildFirstClassLoader@1b3a8cdb from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: org.apache.flink.util.ChildFirstClassLoader@13d765c3
	at software.amazon.msk.auth.iam.IAMClientCallbackHandler.handle(IAMClientCallbackHandler.java:85)
	at software.amazon.msk.auth.iam.internals.IAMSaslClient.generateClientMessage(IAMSaslClient.java:138)
	at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:95)
	... 29 more

The classloader references are shown in the stack trace. Do you have any insights into what's happening, and whether this can be manually patched?
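The two distinct `ChildFirstClassLoader@...` references in the trace are the core of the problem: the same class name loaded by two different classloaders produces two incompatible `Class` objects, so the callback handler's `instanceof AWSCredentialsCallback` check fails. The following self-contained sketch reproduces that effect with a hypothetical `ChildFirst` loader that mimics Flink's child-first delegation (class and loader names here are illustrative, not Flink's actual implementation):

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

// Toy child-first loader: re-defines matching classes from the parent's
// class files instead of delegating, similar in spirit to Flink's
// ChildFirstClassLoader.
class ChildFirst extends ClassLoader {
    ChildFirst(ClassLoader parent) { super(parent); }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        if (!name.startsWith("CallbackDemo")) {   // delegate JDK classes to the parent
            return super.loadClass(name, resolve);
        }
        try (InputStream in = getParent().getResourceAsStream(name.replace('.', '/') + ".class")) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) > 0) out.write(buf, 0, n);
            byte[] bytes = out.toByteArray();
            return defineClass(name, bytes, 0, bytes.length);  // child-first: define locally
        } catch (Exception e) {
            throw new ClassNotFoundException(name, e);
        }
    }
}

public class CallbackDemo {
    public static class MyCallback {}  // stands in for AWSCredentialsCallback

    public static void main(String[] args) throws Exception {
        ClassLoader parent = CallbackDemo.class.getClassLoader();
        // Two loaders each define their own copy of the "same" class.
        Class<?> a = new ChildFirst(parent).loadClass("CallbackDemo$MyCallback");
        Class<?> b = new ChildFirst(parent).loadClass("CallbackDemo$MyCallback");
        Object instance = a.getDeclaredConstructor().newInstance();
        // Same fully-qualified name, different defining loaders:
        System.out.println(a.isInstance(instance));  // true
        System.out.println(b.isInstance(instance));  // false -> UnsupportedCallbackException path
    }
}
```

This is why `IAMClientCallbackHandler.handle` rejects the callback even though its printed class name matches: JVM type identity is (name, defining loader), not name alone.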

@catrider

catrider commented Jun 23, 2022

@dannycranmer I'm running into the same issue. I have a Flink job with a single task running in Kubernetes: 2 jobmanagers (for HA) and 3 task managers. If I kill the leader jobmanager and let it restart, I start to see the UnsupportedCallbackException.

I forked the library and wasn't able to get this suggestion working

public static void initialize() {
-        Security.addProvider(new IAMSaslClientProvider());
+        Security.insertProviderAt(new IAMSaslClientProvider(), 1);
    }

but I tweaked it slightly and this seems to fix the problem for me

        IAMSaslClientProvider iamSaslClientProvider = new IAMSaslClientProvider();
        Security.removeProvider(iamSaslClientProvider.getName());
        Security.addProvider(iamSaslClientProvider);

Security.insertProviderAt (which is also used by Security.addProvider under the covers) stipulates that "A provider cannot be added if it is already installed", so doesn't the provider need to be removed first if it already exists? Security.removeProvider is a no-op if it doesn't exist. Let me know if this makes sense to you.
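To illustrate the remove-then-add behavior described above, here is a minimal sketch using a hypothetical `DemoSaslProvider` standing in for `IAMSaslClientProvider` (the name and `reinstall` helper are mine, not from the library):

```java
import java.security.Provider;
import java.security.Security;

// Hypothetical provider standing in for IAMSaslClientProvider.
class DemoSaslProvider extends Provider {
    DemoSaslProvider() { super("DemoSasl", 1.0, "demo SASL provider"); }
}

public class ProviderReinstall {
    // addProvider returns -1 (a no-op) when a provider with the same name is
    // already installed, so remove any stale registration first. This mirrors
    // the remove-then-add workaround from the comment above.
    public static void reinstall(Provider p) {
        Security.removeProvider(p.getName()); // no-op if absent
        Security.addProvider(p);              // now installs this instance
    }

    public static void main(String[] args) {
        Security.addProvider(new DemoSaslProvider());
        Provider fresh = new DemoSaslProvider();
        // Without removal, the stale instance stays registered:
        System.out.println(Security.addProvider(fresh)); // -1
        reinstall(fresh);
        System.out.println(Security.getProvider("DemoSasl") == fresh); // true
    }
}
```

The key point: `Security.addProvider` silently keeps the old instance when the name collides, which is exactly why a stale provider from a torn-down classloader can linger after a job restart.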

@james4388

james4388 commented Aug 17, 2022

I have the same issue with Flink (running on EKS) using a service role and MSK IAM.
The job is simple:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(bootstrapServers)
            .setTopics(inputTopic)
            .setGroupId("my-group")

            .setProperty("security.protocol", "SASL_SSL")
            .setProperty("sasl.mechanism", "AWS_MSK_IAM")
            .setProperty("sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
            .setProperty("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")

            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

DataStream<String> text = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

and a similar error:

2022-08-17 15:52:11
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:101)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:322)
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.lambda$failJob$0(OperatorCoordinatorHolder.java:574)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443)
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: Kafka Source -> Sink: Print to Std. Out' (operator cbc357ccb763df2852fee8c4fc7d55f2).
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:556)
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:231)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:316)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:329)
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to 
	at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234)
	at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83)
	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
	... 6 more
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [my-topic].
	at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:47)
	at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52)
	at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219)
	at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80)
	... 6 more
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: org.apache.flink.util.ChildFirstClassLoader@50b3ecb9 from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: org.apache.flink.util.ChildFirstClassLoader@5ca5bed1]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
	... 9 more
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: org.apache.flink.util.ChildFirstClassLoader@50b3ecb9 from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: org.apache.flink.util.ChildFirstClassLoader@5ca5bed1]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
Caused by: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: org.apache.flink.util.ChildFirstClassLoader@50b3ecb9 from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: org.apache.flink.util.ChildFirstClassLoader@5ca5bed1]
	at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:113)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at java.base/javax.security.auth.Subject.doAs(Unknown Source)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)
	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)
	at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1333)
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1264)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: org.apache.flink.util.ChildFirstClassLoader@50b3ecb9 from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: org.apache.flink.util.ChildFirstClassLoader@5ca5bed1
	at software.amazon.msk.auth.iam.IAMClientCallbackHandler.handle(IAMClientCallbackHandler.java:85)
	at software.amazon.msk.auth.iam.internals.IAMSaslClient.generateClientMessage(IAMSaslClient.java:138)
	at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:95)
	... 14 more

Versions
org.apache.kafka:kafka-clients:2.8.1
software.amazon.msk:aws-msk-iam-auth:1.1.4
flink-connector-kafka:1.15.1
flink:1.15.1

@jaya-ananthram-ft

jaya-ananthram-ft commented Sep 2, 2022

@james4388 Did you try the workaround mentioned here? It requires, of course, that you have control over the Flink cluster so you can place the libs in /opt/flink/lib/. I tried the same versions (Flink 1.15.1 with aws-msk-iam-auth 1.1.4), moved the libs to /opt/flink/lib/, and it works as expected on EKS through the Kubernetes operator's application mode (at least in my case). This is a workaround until this issue is fixed.
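A sketch of that workaround as a build step. Note the exact artifact names, versions, and Maven URLs below are assumptions for illustration, not taken from this thread; substitute the jars you actually use:

```shell
# Illustrative only: place the Kafka connector, kafka-clients, and the MSK IAM
# auth uber-jar on Flink's parent classpath (/opt/flink/lib/) so they are
# loaded once by the system classloader rather than per-job by a
# ChildFirstClassLoader. Versions/URLs are example values.
wget -P /opt/flink/lib/ \
  https://repo1.maven.org/maven2/software/amazon/msk/aws-msk-iam-auth/1.1.4/aws-msk-iam-auth-1.1.4-all.jar
wget -P /opt/flink/lib/ \
  https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.15.1/flink-connector-kafka-1.15.1.jar
wget -P /opt/flink/lib/ \
  https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.1/kafka-clients-2.8.1.jar
```

If you bake this into a custom Flink image, also mark the same artifacts as `provided` in your job jar so only one copy exists at runtime.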

Just for clarity, here is the list of jars (in the image below) that I added to the lib location.
[Screenshot 2022-09-02: jars placed in the lib location]

dannycranmer added a commit to dannycranmer/aws-msk-iam-auth that referenced this issue Sep 18, 2022
dannycranmer added a commit to dannycranmer/aws-msk-iam-auth that referenced this issue Oct 10, 2022
dannycranmer added a commit to dannycranmer/aws-msk-iam-auth that referenced this issue Oct 10, 2022
dannycranmer added a commit to dannycranmer/aws-msk-iam-auth that referenced this issue Oct 14, 2022
sayantacC pushed a commit that referenced this issue Oct 14, 2022
@strokyl

strokyl commented Oct 31, 2022

For the record, I hit the same issue trying to produce a record to MSK with the GlueSchemaRegistryKafkaSerializer from Scala:

  • Deps:
      "software.amazon.glue" % "schema-registry-serde" % "1.1.14",
      "software.amazon.msk"  % "aws-msk-iam-auth" % "1.1.42",
      "org.apache.kafka" % "kafka-clients" % "3.3.1"
  • Code:
 val props = new Properties()
 props.put(
   ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
   "...",
 )
 props.put("security.protocol", "SASL_SSL")
 props.put("sasl.mechanism", "AWS_MSK_IAM")
 props.put("sasl.jaas.config", """software.amazon.msk.auth.iam.IAMLoginModule required;""")
 props.put("sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
 props.put(ProducerConfig.ACKS_CONFIG, "all")
 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[GlueSchemaRegistryKafkaSerializer].getName())
 props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.JSON.name());
 props.put(AWSSchemaRegistryConstants.AWS_REGION, "eu-west-1");
 props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "default-registry");
 props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "GsrBlogSchema");

 val jsonSchema  = ...
 val jsonPayload = ...


 val jsonSchemaWithData = JsonDataWithSchema.builder(jsonSchema, jsonPayload).build()

 val producer = new KafkaProducer[String, JsonDataWithSchema](props)
 val record = new ProducerRecord[String, JsonDataWithSchema]("glue-json", "toulouse", jsonSchemaWithData)
 producer.send(record).get()
 producer.flush()
 producer.close()
  • Stacktrace:
[error] java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: sbt.internal.LayeredClassLoader@321db28e from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: sbt.internal.LayeredClassLoader@2ad32589]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
[error] 	at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1429)
[error] 	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1071)
[error] 	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:949)
[error] 	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:834)
[error] 	at Main$.delayedEndpoint$Main$1(Main.scala:85)
[error] 	at Main$delayedInit$body.apply(Main.scala:12)
[error] 	at scala.Function0.apply$mcV$sp(Function0.scala:42)
[error] 	at scala.Function0.apply$mcV$sp$(Function0.scala:42)
[error] 	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
[error] 	at scala.App.$anonfun$main$1(App.scala:98)
[error] 	at scala.App.$anonfun$main$1$adapted(App.scala:98)
[error] 	at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575)
[error] 	at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573)
[error] 	at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
[error] 	at scala.App.main(App.scala:98)
[error] 	at scala.App.main$(App.scala:96)
[error] 	at Main$.main(Main.scala:12)
[error] 	at Main.main(Main.scala)
[error] 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] 	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
[error] 	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] 	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
[error] Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: sbt.internal.LayeredClassLoader@321db28e from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: sbt.internal.LayeredClassLoader@2ad32589]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
[error] Caused by: javax.security.sasl.SaslException: Exception while evaluating challenge [Caused by javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: sbt.internal.LayeredClassLoader@321db28e from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: sbt.internal.LayeredClassLoader@2ad32589]
[error] 	at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:113)
[error] 	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)
[error] 	at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
[error] 	at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
[error] 	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)
[error] 	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)
[error] 	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)
[error] 	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)
[error] 	at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
[error] 	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
[error] 	at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
[error] 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
[error] 	at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73)
[error] 	at org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:534)
[error] 	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:455)
[error] 	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
[error] 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
[error] 	at java.base/java.lang.Thread.run(Thread.java:833)
[error] Caused by: javax.security.auth.callback.UnsupportedCallbackException: Unsupported callback type: class: software.amazon.msk.auth.iam.internals.AWSCredentialsCallback classloader: sbt.internal.LayeredClassLoader@321db28e from class: software.amazon.msk.auth.iam.IAMClientCallbackHandler classloader: sbt.internal.LayeredClassLoader@2ad32589
[error] 	at software.amazon.msk.auth.iam.IAMClientCallbackHandler.handle(IAMClientCallbackHandler.java:85)
[error] 	at software.amazon.msk.auth.iam.internals.IAMSaslClient.generateClientMessage(IAMSaslClient.java:138)
[error] 	at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:95)
[error] 	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)
[error] 	at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
[error] 	at java.base/javax.security.auth.Subject.doAs(Subject.java:439)
[error] 	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)
[error] 	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)
[error] 	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)
[error] 	at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)
[error] 	at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)
[error] 	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
[error] 	at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
[error] 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
[error] 	at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73)
[error] 	at org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:534)
[error] 	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:455)
[error] 	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316)
[error] 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
[error] 	at java.base/java.lang.Thread.run(Thread.java:833)

Converting the exact same code to Java makes it work; I don't know why.
Also, for Scala, using a locally published version of aws-msk-iam-auth solves the issue.
Do you know when you will do a new release?

@sayantacC
Contributor

Fixed in Release 1.1.5.
