Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DGS-7289 Adding CustomBearerAuthCredentialProvider #2635

Merged
merged 5 commits into from Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -65,6 +65,10 @@ public class SchemaRegistryClientConfig {
"bearer.auth.cache.expiry.buffer.seconds";
public static final short BEARER_AUTH_CACHE_EXPIRY_BUFFER_SECONDS_DEFAULT = 300;

//Custom bearer Auth related
public static final String BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
"bearer.auth.custom.provider.class";


public static void withClientSslSupport(ConfigDef configDef, String namespace) {
org.apache.kafka.common.config.ConfigDef sslConfigDef = new org.apache.kafka.common.config
Expand Down
Expand Up @@ -21,8 +21,19 @@

public interface BearerAuthCredentialProvider extends Configurable {

String alias();
/*
Making alias() default method as custom implementation loaded using
CustomBearerAuthCredentialProvider via config bearer.auth.custom.provider.class don't need to
Implement it.
*/
default String alias() {
return null;
}

/*
This getBearerToken method should Ideally return a token from a cache. The cache should be
responsible for refreshing the token.
*/
String getBearerToken(URL url);

default String getTargetSchemaRegistry() {
Expand Down
@@ -0,0 +1,77 @@
/*
* Copyright 2023 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.kafka.schemaregistry.client.security.bearerauth;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import java.net.URL;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.oauthbearer.secured.ConfigurationUtils;

public class CustomBearerAuthCredentialProvider implements BearerAuthCredentialProvider {

BearerAuthCredentialProvider customBearerAuthCredentialProvider;
private String targetSchemaRegistry;
private String targetIdentityPoolId;

@Override
public String alias() {
return "CUSTOM";
}

@Override
public String getBearerToken(URL url) {
return this.customBearerAuthCredentialProvider.getBearerToken(url);
}

@Override
public String getTargetSchemaRegistry() {
return this.customBearerAuthCredentialProvider.getTargetSchemaRegistry() != null
? this.customBearerAuthCredentialProvider.getTargetSchemaRegistry()
: this.targetSchemaRegistry;
}

@Override
public String getTargetIdentityPoolId() {
return this.customBearerAuthCredentialProvider.getTargetIdentityPoolId() != null
? this.customBearerAuthCredentialProvider.getTargetIdentityPoolId()
: this.targetIdentityPoolId;
}

@Override
public void configure(Map<String, ?> map) {
ConfigurationUtils cu = new ConfigurationUtils(map);
String className = cu.validateString(
SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS);
try {
this.customBearerAuthCredentialProvider =
(BearerAuthCredentialProvider) Class.forName(className)
.getDeclaredConstructor()
.newInstance();
} catch (Exception e) {
throw new ConfigException(String.format(
"Unable to instantiate an object of class %s, failed with exception: ",
SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS
) + e.getMessage());
}
targetSchemaRegistry = cu.validateString(
SchemaRegistryClientConfig.BEARER_AUTH_LOGICAL_CLUSTER, false);
targetIdentityPoolId = cu.validateString(
SchemaRegistryClientConfig.BEARER_AUTH_IDENTITY_POOL_ID, false);
this.customBearerAuthCredentialProvider.configure(map);
}
}
@@ -1,3 +1,4 @@
io.confluent.kafka.schemaregistry.client.security.bearerauth.StaticTokenCredentialProvider
io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.OauthCredentialProvider
io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.SaslOauthCredentialProvider
io.confluent.kafka.schemaregistry.client.security.bearerauth.oauth.SaslOauthCredentialProvider
io.confluent.kafka.schemaregistry.client.security.bearerauth.CustomBearerAuthCredentialProvider
Expand Up @@ -56,6 +56,19 @@ public void testOAuthCredentialProvider() {
"OAUTHBEARER", CONFIG_MAP), OauthCredentialProvider.class);
}

@Test
public void testCustomBearerAuthCredentialProvider() {
Map<String, String> CONFIG_MAP = new HashMap<>();
CONFIG_MAP.put(SchemaRegistryClientConfig.BEARER_AUTH_LOGICAL_CLUSTER, "lsrc-dummy");
CONFIG_MAP.put(SchemaRegistryClientConfig.BEARER_AUTH_IDENTITY_POOL_ID, "my-pool-id");
CONFIG_MAP.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS,
StaticTokenCredentialProvider.class.getName());
CONFIG_MAP.put(SchemaRegistryClientConfig.BEARER_AUTH_TOKEN_CONFIG, "custom-token");

assertInstance(BearerAuthCredentialProviderFactory.getBearerAuthCredentialProvider(
"CUSTOM", CONFIG_MAP), CustomBearerAuthCredentialProvider.class);
}

@Test
public void testUnknownProvider() {
Assert.assertNull(BearerAuthCredentialProviderFactory.getBearerAuthCredentialProvider(
Expand Down
@@ -0,0 +1,60 @@
package io.confluent.kafka.schemaregistry.client.security.bearerauth;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;


public class CustomBearerAuthCredentialProviderTest {

private final String LSRC_ID = "lsrc-dummy";
private final String POOL_ID = "my-pool-id";

@Test
public void testWithStaticTokenProvider() throws MalformedURLException {
Map<String, String> CONFIG_MAP = new HashMap<>();
CONFIG_MAP.put(SchemaRegistryClientConfig.BEARER_AUTH_LOGICAL_CLUSTER, LSRC_ID);
CONFIG_MAP.put(SchemaRegistryClientConfig.BEARER_AUTH_IDENTITY_POOL_ID, POOL_ID);
CONFIG_MAP.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS,
StaticTokenCredentialProvider.class.getName());
CONFIG_MAP.put(SchemaRegistryClientConfig.BEARER_AUTH_TOKEN_CONFIG, "custom-token");

BearerAuthCredentialProvider provider = new CustomBearerAuthCredentialProvider();
provider.configure(CONFIG_MAP);
Assert.assertEquals("custom-token", provider.getBearerToken(new URL("http://dummy")));
Assert.assertEquals(LSRC_ID, provider.getTargetSchemaRegistry());
Assert.assertEquals(POOL_ID, provider.getTargetIdentityPoolId());
}

@Test
public void testWithMyFileTokenProvider() throws IOException {
String token = "my_custom_file_token";
Path tempFile = Files.createTempFile("MyTokenFile", "txt");
Files.write(tempFile, token.getBytes(StandardCharsets.UTF_8));

Map<String, String> CONFIG_MAP = new HashMap<>();
CONFIG_MAP.put(SchemaRegistryClientConfig.BEARER_AUTH_LOGICAL_CLUSTER, LSRC_ID);
CONFIG_MAP.put(SchemaRegistryClientConfig.BEARER_AUTH_IDENTITY_POOL_ID, POOL_ID);
CONFIG_MAP.put(SchemaRegistryClientConfig.BEARER_AUTH_ISSUER_ENDPOINT_URL, tempFile.toString());
CONFIG_MAP.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS,
"io.confluent.kafka.schemaregistry.client.security.bearerauth.Resources.MyFileTokenProvider");

BearerAuthCredentialProvider provider = new CustomBearerAuthCredentialProvider();
provider.configure(CONFIG_MAP);

Assert.assertEquals(token, provider.getBearerToken(new URL("http://dummy")));
Assert.assertEquals(LSRC_ID, provider.getTargetSchemaRegistry());
Assert.assertEquals(POOL_ID, provider.getTargetIdentityPoolId());

}


}
@@ -0,0 +1,45 @@
package io.confluent.kafka.schemaregistry.client.security.bearerauth.Resources;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import io.confluent.kafka.schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;

/**
* <code>MyFileTokenProvider</code> is a <code>BearerAuthCredentialProvider</code>
* Indented only for testing purpose. The <code>MyFileTokenProvider</code> can loaded using
* <code>CustomTokenCredentialProvider<code/>
*/
public class MyFileTokenProvider implements BearerAuthCredentialProvider {

private String token;

@Override
public String getBearerToken(URL url) {
// Ideally this might be fetching a cache. And cache should hold the mechanism to refresh.
return this.token;
}

@Override
public void configure(Map<String, ?> map) {
Path path = Paths.get(
(String) map.get(SchemaRegistryClientConfig.BEARER_AUTH_ISSUER_ENDPOINT_URL));
try {
this.token = Files
.lines(path, StandardCharsets.UTF_8)
.collect(Collectors.joining(System.lineSeparator()));
} catch (IOException e) {
throw new ConfigException(String.format(
"Not to read file at given location %s", path.toString()
));
}
}

}
Expand Up @@ -187,6 +187,12 @@ public class AbstractKafkaSchemaSerDeConfig extends AbstractConfig {
+ "if no value is specified. This value is ignored if it exceeds the remaining lifetime "
+ "of a token from the moment it is retrieved into schema registry.";

public static final String BEARER_AUTH_CUSTOM_PROVIDER_CLASS = SchemaRegistryClientConfig
.BEARER_AUTH_CUSTOM_PROVIDER_CLASS;
public static final String BEARER_AUTH_CUSTOM_PROVIDER_CLASS_D0C =
"Custom class which will provide the token credential. Needs to implement io.confluent.kafka"
+ ".schemaregistry.client.security.bearerauth.BearerAuthCredentialProvider interface";

public static final String CONTEXT_NAME_STRATEGY = "context.name.strategy";
public static final String CONTEXT_NAME_STRATEGY_DEFAULT =
NullContextNameStrategy.class.getName();
Expand Down Expand Up @@ -274,6 +280,8 @@ public static ConfigDef baseConfigDef() {
Type.SHORT, BEARER_AUTH_CACHE_EXPIRY_BUFFER_SECONDS_DEFAULT, Range.between(0, 3600),
Importance.LOW,
BEARER_AUTH_CACHE_EXPIRY_BUFFER_SECONDS_DOC)
.define(BEARER_AUTH_CUSTOM_PROVIDER_CLASS, Type.STRING,null, Importance.MEDIUM,
BEARER_AUTH_CUSTOM_PROVIDER_CLASS_D0C)
.define(CONTEXT_NAME_STRATEGY, Type.CLASS, CONTEXT_NAME_STRATEGY_DEFAULT,
Importance.MEDIUM, CONTEXT_NAME_STRATEGY_DOC)
.define(KEY_SUBJECT_NAME_STRATEGY, Type.CLASS, KEY_SUBJECT_NAME_STRATEGY_DEFAULT,
Expand Down