-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Issue 905][api][pulsar-client] add serializable token class #10574
Conversation
@atezs82 I'd rather not introduce a new auth provider here duplicating most of the code. Is there a way that this can be fixed/enhanced within the context of the existing |
// Read token from a file | ||
URI filePath = URI.create(encodedAuthParamString); | ||
try { | ||
this.token = new String(Files.readAllBytes(Paths.get(filePath)), Charsets.UTF_8).trim(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has a problem that it only reads the content of the file once, at startup. When the token file gets refreshed, the client will still be operating with the old credentials, and the broker will disconnect it when the token expires
@merlimat thanks for the review, I'll try to modify this accordingly. |
@merlimat I pushed a slightly different version without new classes (just modified Note that in theory it is still possible to provide a not-serializable token supplier, which would still give back that cryptic NullPointerException above. |
@@ -107,4 +101,38 @@ public void start() throws PulsarClientException { | |||
// noop | |||
} | |||
|
|||
public static class SerializableURITokenSupplier implements Supplier<String>, Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public static class SerializableURITokenSupplier implements Supplier<String>, Serializable { | |
private static class SerializableURITokenSupplier implements Supplier<String>, Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
public static class SerializableURITokenSupplier implements Supplier<String>, Serializable { | ||
|
||
private static final long serialVersionUID = 3160666668166028760L; | ||
private URI uri; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private URI uri; | |
private final URI uri; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
public static class SerializableTokenSupplier implements Supplier<String>, Serializable { | ||
|
||
private static final long serialVersionUID = 5095234161799506913L; | ||
private String token; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private String token; | |
private final String token; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@atezs82 Change LGTM, just left few minor suggestions. |
@merlimat Thanks, I'll fix these and push the changes. |
@atezs82 There seems to be an issue reported by spotbugs tool:
|
@merlimat Please excuse my limited knowledge on the @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Using custom serializer which Findbugs can't detect")
private Supplier<ByteArrayInputStream> certStreamProvider, keyStreamProvider, trustStoreStreamProvider; Based on this, I also added a similar suppress annotation to Let me know if this approach shall be OK or shall I modify the custom deserializer logic instead. Thanks! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great work
+1
Regarding the CI failure it seems that a unit test for On the other hand, I managed to execute just the problematic unit test locally by running
Not exactly sure on how to proceed with this. Shall these test be executed locally, or do they need some kind of CI environment to run? Given that I could execute this locally, I would re-run the workflow on the CI but I think I lack privileges for doing so. |
) ### Motivation When trying to authenticate Pulsar in distributed systems (a very good example of this is the Spark connector, see here: https://pulsar.apache.org/docs/en/adaptors-spark/), the following exception is thrown (this is taken from the log of one of the Spark executors): ``` 21/05/13 11:59:34 WARN PulsarClientImpl: [topic: persistent://tenant/namespace/topic] Could not get connection while getPartitionedTopicMetadata -- Will try again in 380 ms 21/05/13 11:59:34 INFO ConnectionPool: [[id: 0x4d13ed61, L:/1.2.3.4:43624 - R:broker.svc.cluster.local/1.2.3.4:6650]] Connected to server 21/05/13 11:59:34 WARN ClientCnx: [broker.svc.cluster.local/1.2.3.4:6650] Got exception java.lang.RuntimeException: failed to get client token at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getToken(AuthenticationDataToken.java:62) at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getCommandData(AuthenticationDataToken.java:55) at org.apache.pulsar.client.api.AuthenticationDataProvider.authenticate(AuthenticationDataProvider.java:133) at org.apache.pulsar.client.impl.ClientCnx.newConnectCommand(ClientCnx.java:218) at org.apache.pulsar.client.impl.ClientCnx.channelActive(ClientCnx.java:199) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209) at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216) at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895) at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.fulfillConnectPromise(AbstractEpollChannel.java:620) at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:653) at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529) at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465) at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NullPointerException at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getToken(AuthenticationDataToken.java:60) ... 20 more ``` This happens since the token supplier function become null when it gets transferred to eg. a Spark executor. Refer to this issue (which is roughly the same): https://github.com/streamnative/pulsar/issues/905 . ### Modifications Added a new token storage class, `SerializableAuthenticationToken`, which can be serialized properly (this is tested in its corresponding unit test as well). On the other hand, this class does not use supplier functions to fetch token data, it stores the token as a plain string instead. For use-cases requiring a token supplier, this cannot be used, but can load its configuration from file (using the `file://` URL prefix), like the original `AuthenticationToken` class. Corresponding unit test were added to the class. Will also add proper documentation prior to merging once the change can be accepted - if that is required.
) ### Motivation When trying to authenticate Pulsar in distributed systems (a very good example of this is the Spark connector, see here: https://pulsar.apache.org/docs/en/adaptors-spark/), the following exception is thrown (this is taken from the log of one of the Spark executors): ``` 21/05/13 11:59:34 WARN PulsarClientImpl: [topic: persistent://tenant/namespace/topic] Could not get connection while getPartitionedTopicMetadata -- Will try again in 380 ms 21/05/13 11:59:34 INFO ConnectionPool: [[id: 0x4d13ed61, L:/1.2.3.4:43624 - R:broker.svc.cluster.local/1.2.3.4:6650]] Connected to server 21/05/13 11:59:34 WARN ClientCnx: [broker.svc.cluster.local/1.2.3.4:6650] Got exception java.lang.RuntimeException: failed to get client token at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getToken(AuthenticationDataToken.java:62) at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getCommandData(AuthenticationDataToken.java:55) at org.apache.pulsar.client.api.AuthenticationDataProvider.authenticate(AuthenticationDataProvider.java:133) at org.apache.pulsar.client.impl.ClientCnx.newConnectCommand(ClientCnx.java:218) at org.apache.pulsar.client.impl.ClientCnx.channelActive(ClientCnx.java:199) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:209) at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1398) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:230) at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:216) at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:895) at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.fulfillConnectPromise(AbstractEpollChannel.java:620) at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:653) at org.apache.pulsar.shade.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529) at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465) at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NullPointerException at org.apache.pulsar.client.impl.auth.AuthenticationDataToken.getToken(AuthenticationDataToken.java:60) ... 20 more ``` This happens since the token supplier function become null when it gets transferred to eg. a Spark executor. Refer to this issue (which is roughly the same): https://github.com/streamnative/pulsar/issues/905 . ### Modifications Added a new token storage class, `SerializableAuthenticationToken`, which can be serialized properly (this is tested in its corresponding unit test as well). On the other hand, this class does not use supplier functions to fetch token data, it stores the token as a plain string instead. For use-cases requiring a token supplier, this cannot be used, but can load its configuration from file (using the `file://` URL prefix), like the original `AuthenticationToken` class. Corresponding unit test were added to the class. Will also add proper documentation prior to merging once the change can be accepted - if that is required.
Motivation
When trying to authenticate Pulsar in distributed systems (a very good example of this is the Spark connector, see here: https://pulsar.apache.org/docs/en/adaptors-spark/), the following exception is thrown (this is taken from the log of one of the Spark executors):
This happens since the token supplier function become null when it gets transferred to eg. a Spark executor. Refer to this issue (which is roughly the same): https://github.com/streamnative/pulsar/issues/905 .
Modifications
Added a new token storage class,
SerializableAuthenticationToken
, which can be serialized properly (this is tested in its corresponding unit test as well). On the other hand, this class does not use supplier functions to fetch token data, it stores the token as a plain string instead. For use-cases requiring a token supplier, this cannot be used, but can load its configuration from file (using thefile://
URL prefix), like the originalAuthenticationToken
class.Corresponding unit test were added to the class. Will also add proper documentation prior to merging once the change can be accepted - if that is required.
Verifying this change
This change added tests and can be verified as follows:
AuthenticationToken
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesSerializableAuthenticationToken
upon connecting as well, apart from the existing authentication methods)Documentation