Skip to content

Commit

Permalink
DGS-7797: Introduce a new optional host.port config for inter insta…
Browse files Browse the repository at this point in the history
…nce communication
  • Loading branch information
akhileshm1 committed Jul 27, 2023
1 parent 455a6ba commit 4729671
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 5 deletions.
Expand Up @@ -156,6 +156,8 @@ public class SchemaRegistryConfig extends RestConfig {
*/
public static final String HOST_NAME_CONFIG = "host.name";

public static final String HOST_PORT_CONFIG = "host.port";

public static final String SCHEMA_PROVIDERS_CONFIG = "schema.providers";

/**
Expand Down Expand Up @@ -315,6 +317,8 @@ public class SchemaRegistryConfig extends RestConfig {
"The host name. Make sure to set this if running SchemaRegistry "
+ "with multiple nodes. This name is also used in the endpoint for inter instance "
+ "communication";
protected static final String HOST_PORT_DOC =
"The host port. This port is used in the endpoint for inter instance communication";
protected static final String SCHEMA_PROVIDERS_DOC =
" A list of classes to use as SchemaProvider. Implementing the interface "
+ "<code>SchemaProvider</code> allows you to add custom schema types to Schema Registry.";
Expand Down Expand Up @@ -511,6 +515,9 @@ DEFAULT_KAFKASTORE_WRITE_MAX_RETRIES, atLeast(0),
.define(HOST_NAME_CONFIG, ConfigDef.Type.STRING, getDefaultHost(),
ConfigDef.Importance.HIGH, HOST_DOC
)
.define(HOST_PORT_CONFIG, ConfigDef.Type.INT, SCHEMAREGISTRY_PORT_DEFAULT,
ConfigDef.Importance.MEDIUM, HOST_PORT_DOC
)
.define(SCHEMA_PROVIDERS_CONFIG, ConfigDef.Type.LIST, "",
ConfigDef.Importance.LOW, SCHEMA_PROVIDERS_DOC
)
Expand Down
Expand Up @@ -15,6 +15,7 @@

package io.confluent.kafka.schemaregistry.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -178,11 +179,8 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config,
interInstanceListenerNameConfig,
config.interInstanceProtocol());
log.info("Found internal listener: {}", internalListener.toString());
SchemeAndPort schemeAndPort = new SchemeAndPort(internalListener.getUri().getScheme(),
internalListener.getUri().getPort());
String host = config.getString(SchemaRegistryConfig.HOST_NAME_CONFIG);
this.myIdentity = new SchemaRegistryIdentity(host, schemeAndPort.port,
isEligibleForLeaderElector, schemeAndPort.scheme);

this.myIdentity = getMyIdentity(internalListener, isEligibleForLeaderElector, config);
log.info("Setting my identity to {}", myIdentity);

Map<String, Object> sslConfig = config.getOverriddenSslConfigs(internalListener);
Expand Down Expand Up @@ -222,6 +220,20 @@ public ParsedSchema load(RawSchema s) throws Exception {
this.ruleSetHandler = new RuleSetHandler();
}

@VisibleForTesting
static SchemaRegistryIdentity getMyIdentity(NamedURI internalListener,
boolean isEligibleForLeaderElector, SchemaRegistryConfig config) {
SchemeAndPort schemeAndPort = new SchemeAndPort(internalListener.getUri().getScheme(),
// default value of 8081 is always set for `host.port`. only consider `host.port` if the
// original properties has it. otherwise, use the port from the listener.
config.originals().containsKey(SchemaRegistryConfig.HOST_PORT_CONFIG) ?
config.getInt(SchemaRegistryConfig.HOST_PORT_CONFIG) :
internalListener.getUri().getPort());
String host = config.getString(SchemaRegistryConfig.HOST_NAME_CONFIG);
return new SchemaRegistryIdentity(host, schemeAndPort.port, isEligibleForLeaderElector,
schemeAndPort.scheme);
}

private Map<String, SchemaProvider> initProviders(SchemaRegistryConfig config) {
Map<String, Object> schemaProviderConfigs =
config.originalsWithPrefix(SchemaRegistryConfig.SCHEMA_PROVIDERS_CONFIG + ".");
Expand Down
Expand Up @@ -102,4 +102,43 @@ public void testGetNamedInternalListener() throws SchemaRegistryException, RestC
assertEquals("Expected internal listener's name to be returned", "bob", listener.getName());
assertEquals("Expected Scheme match", SchemaRegistryConfig.HTTP, listener.getUri().getScheme());
}

@Test
public void testMyIdentityWithoutPortOverride() throws RestConfigException, SchemaRegistryException {
String listeners = "bob://localhost:123, http://localhost:456";
String listenerProtocolMap = "bob:https";
Properties props = new Properties();
props.setProperty(SchemaRegistryConfig.HOST_NAME_CONFIG, "schema.registry-0.example.com");
props.setProperty(RestConfig.LISTENERS_CONFIG, listeners);
props.setProperty(RestConfig.LISTENER_PROTOCOL_MAP_CONFIG, listenerProtocolMap);
props.setProperty(SchemaRegistryConfig.INTER_INSTANCE_LISTENER_NAME_CONFIG, "bob");
SchemaRegistryConfig config = new SchemaRegistryConfig(props);
SchemaRegistryIdentity schemaRegistryIdentity = new
SchemaRegistryIdentity("schema.registry-0.example.com", 123, true, "https");
NamedURI internalListener = KafkaSchemaRegistry.getInterInstanceListener(config.getListeners(),
config.interInstanceListenerName(), SchemaRegistryConfig.HTTP);

assertEquals(schemaRegistryIdentity,
KafkaSchemaRegistry.getMyIdentity(internalListener, true, config));
}

@Test
public void testMyIdentityWithPortOverride() throws RestConfigException, SchemaRegistryException {
String listeners = "bob://localhost:123, http://localhost:456";
String listenerProtocolMap = "bob:https";
Properties props = new Properties();
props.setProperty(SchemaRegistryConfig.HOST_NAME_CONFIG, "schema.registry-0.example.com");
props.setProperty(SchemaRegistryConfig.HOST_PORT_CONFIG, "443");
props.setProperty(RestConfig.LISTENERS_CONFIG, listeners);
props.setProperty(RestConfig.LISTENER_PROTOCOL_MAP_CONFIG, listenerProtocolMap);
props.setProperty(SchemaRegistryConfig.INTER_INSTANCE_LISTENER_NAME_CONFIG, "bob");
SchemaRegistryConfig config = new SchemaRegistryConfig(props);
SchemaRegistryIdentity schemaRegistryIdentity = new
SchemaRegistryIdentity("schema.registry-0.example.com", 443, true, "https");
NamedURI internalListener = KafkaSchemaRegistry.getInterInstanceListener(config.getListeners(),
config.interInstanceListenerName(), SchemaRegistryConfig.HTTP);

assertEquals(schemaRegistryIdentity,
KafkaSchemaRegistry.getMyIdentity(internalListener, true, config));
}
}

0 comments on commit 4729671

Please sign in to comment.