-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
SchemaRegistryClientConfig.java
133 lines (108 loc) · 5.51 KB
/
SchemaRegistryClientConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.kafka.schemaregistry.client;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SaslConfigs;
public class SchemaRegistryClientConfig {
public static final String CLIENT_NAMESPACE = "schema.registry.";
public static final String BASIC_AUTH_CREDENTIALS_SOURCE = "basic.auth.credentials.source";
/**
* @deprecated use {@link #USER_INFO_CONFIG} instead
*/
@Deprecated
public static final String SCHEMA_REGISTRY_USER_INFO_CONFIG =
"schema.registry.basic.auth.user.info";
public static final String USER_INFO_CONFIG = "basic.auth.user.info";
public static final String BEARER_AUTH_CREDENTIALS_SOURCE = "bearer.auth.credentials.source";
public static final String BEARER_AUTH_TOKEN_CONFIG = "bearer.auth.token";
public static final String PROXY_HOST = "proxy.host";
public static final String PROXY_PORT = "proxy.port";
public static final String MISSING_CACHE_SIZE_CONFIG = "missing.cache.size";
public static final String MISSING_ID_CACHE_TTL_CONFIG = "missing.id.cache.ttl.sec";
public static final String MISSING_SCHEMA_CACHE_TTL_CONFIG = "missing.schema.cache.ttl.sec";
//OAuth AUTHORIZATION SERVER related configs
public static final String BEARER_AUTH_ISSUER_ENDPOINT_URL = "bearer.auth.issuer.endpoint.url";
public static final String BEARER_AUTH_CLIENT_ID = "bearer.auth.client.id";
public static final String BEARER_AUTH_CLIENT_SECRET = "bearer.auth.client.secret";
public static final String BEARER_AUTH_SCOPE = "bearer.auth.scope";
public static final String BEARER_AUTH_SCOPE_CLAIM_NAME = "bearer.auth.scope.claim.name";
public static final String BEARER_AUTH_SCOPE_CLAIM_NAME_DEFAULT =
SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME;
public static final String BEARER_AUTH_SUB_CLAIM_NAME = "bearer.auth.sub.claim.name";
public static final String BEARER_AUTH_SUB_CLAIM_NAME_DEFAULT =
SaslConfigs.DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME;
//OAuth configs required by SR
public static final String BEARER_AUTH_LOGICAL_CLUSTER = "bearer.auth.logical.cluster";
public static final String BEARER_AUTH_IDENTITY_POOL_ID = "bearer.auth.identity.pool.id";
//OAuth config related to token cache
public static final String BEARER_AUTH_CACHE_EXPIRY_BUFFER_SECONDS =
"bearer.auth.cache.expiry.buffer.seconds";
public static final short BEARER_AUTH_CACHE_EXPIRY_BUFFER_SECONDS_DEFAULT = 300;
//Custom bearer Auth related
public static final String BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
"bearer.auth.custom.provider.class";
public static void withClientSslSupport(ConfigDef configDef, String namespace) {
org.apache.kafka.common.config.ConfigDef sslConfigDef = new org.apache.kafka.common.config
.ConfigDef();
sslConfigDef.withClientSslSupport();
for (org.apache.kafka.common.config.ConfigDef.ConfigKey configKey
: sslConfigDef.configKeys().values()) {
configDef.define(namespace + configKey.name,
typeFor(configKey.type),
configKey.defaultValue,
importanceFor(configKey.importance),
configKey.documentation);
}
}
private static ConfigDef.Type typeFor(org.apache.kafka.common.config.ConfigDef.Type type) {
return ConfigDef.Type.valueOf(type.name());
}
private static ConfigDef.Importance importanceFor(
org.apache.kafka.common.config.ConfigDef.Importance importance) {
return ConfigDef.Importance.valueOf(importance.name());
}
public static long getMissingIdTTL(Map<String, ?> configs) {
return configs != null && configs.containsKey(MISSING_ID_CACHE_TTL_CONFIG)
? (Long) configs.get(MISSING_ID_CACHE_TTL_CONFIG)
: 0L;
}
public static long getMissingSchemaTTL(Map<String, ?> configs) {
return configs != null && configs.containsKey(MISSING_SCHEMA_CACHE_TTL_CONFIG)
? (Long) configs.get(MISSING_SCHEMA_CACHE_TTL_CONFIG)
: 0L;
}
public static int getMaxMissingCacheSize(Map<String, ?> configs) {
return configs != null && configs.containsKey(MISSING_CACHE_SIZE_CONFIG)
? (Integer) configs.get(MISSING_CACHE_SIZE_CONFIG)
: 10000;
}
public static short getBearerAuthCacheExpiryBufferSeconds(Map<String, ?> configs) {
return configs != null && configs.containsKey(BEARER_AUTH_CACHE_EXPIRY_BUFFER_SECONDS)
? (Short) configs.get(BEARER_AUTH_CACHE_EXPIRY_BUFFER_SECONDS)
: BEARER_AUTH_CACHE_EXPIRY_BUFFER_SECONDS_DEFAULT;
}
public static String getBearerAuthScopeClaimName(Map<String, ?> configs) {
return configs != null && configs.containsKey(BEARER_AUTH_SCOPE_CLAIM_NAME)
? (String) configs.get(BEARER_AUTH_SCOPE_CLAIM_NAME)
: BEARER_AUTH_SCOPE_CLAIM_NAME_DEFAULT;
}
public static String getBearerAuthSubClaimName(Map<String, ?> configs) {
return configs != null && configs.containsKey(BEARER_AUTH_SUB_CLAIM_NAME)
? (String) configs.get(BEARER_AUTH_SUB_CLAIM_NAME)
: BEARER_AUTH_SUB_CLAIM_NAME_DEFAULT;
}
}