Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DGS-7243 Add initialized method to determine extension readiness #2632

Merged
merged 5 commits into from May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -48,7 +48,6 @@ public class SchemaRegistryRestApplication extends Application<SchemaRegistryCon

private static final Logger log = LoggerFactory.getLogger(SchemaRegistryRestApplication.class);
private KafkaSchemaRegistry schemaRegistry = null;
private List<SchemaRegistryResourceExtension> schemaRegistryResourceExtensions = null;

public SchemaRegistryRestApplication(Properties props) throws RestConfigException {
this(new SchemaRegistryConfig(props));
Expand Down Expand Up @@ -101,11 +100,6 @@ public void configureBaseApplication(

@Override
public void setupResources(Configurable<?> config, SchemaRegistryConfig schemaRegistryConfig) {
schemaRegistryResourceExtensions =
schemaRegistryConfig.getConfiguredInstances(
schemaRegistryConfig.definedResourceExtensionConfigName(),
SchemaRegistryResourceExtension.class);

config.register(RootResource.class);
config.register(new ConfigResource(schemaRegistry));
config.register(new ContextsResource(schemaRegistry));
Expand All @@ -120,6 +114,8 @@ public void setupResources(Configurable<?> config, SchemaRegistryConfig schemaRe
schemaRegistry.getMetricsContainer().getApiCallsSuccess(),
schemaRegistry.getMetricsContainer().getApiCallsFailure()));

List<SchemaRegistryResourceExtension> schemaRegistryResourceExtensions =
schemaRegistry.getResourceExtensions();
if (schemaRegistryResourceExtensions != null) {
try {
for (SchemaRegistryResourceExtension
Expand Down Expand Up @@ -148,11 +144,14 @@ protected ResourceCollection getStaticResources() {

@Override
public void onShutdown() {

if (schemaRegistry != null) {
schemaRegistry.close();
if (schemaRegistry == null) {
return;
}

schemaRegistry.close();

List<SchemaRegistryResourceExtension> schemaRegistryResourceExtensions =
schemaRegistry.getResourceExtensions();
if (schemaRegistryResourceExtensions != null) {
for (SchemaRegistryResourceExtension
schemaRegistryResourceExtension : schemaRegistryResourceExtensions) {
Expand Down
Expand Up @@ -31,4 +31,8 @@ void register(
SchemaRegistry schemaRegistry
) throws SchemaRegistryException;


default boolean initialized() {
return true;
}
}
Expand Up @@ -56,6 +56,7 @@
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.VersionId;
import io.confluent.kafka.schemaregistry.rest.extensions.SchemaRegistryResourceExtension;
import io.confluent.kafka.schemaregistry.storage.encoder.MetadataEncoderService;
import io.confluent.kafka.schemaregistry.storage.exceptions.EntryTooLargeException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
Expand All @@ -80,6 +81,7 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.avro.reflect.Nullable;
Expand Down Expand Up @@ -113,6 +115,7 @@ public class KafkaSchemaRegistry implements SchemaRegistry, LeaderAwareSchemaReg
private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class);

private final SchemaRegistryConfig config;
private final List<SchemaRegistryResourceExtension> resourceExtensions;
private final Map<String, Object> props;
private final LoadingCache<RawSchema, ParsedSchema> schemaCache;
private final LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache;
Expand Down Expand Up @@ -142,6 +145,7 @@ public class KafkaSchemaRegistry implements SchemaRegistry, LeaderAwareSchemaReg
private final String kafkaClusterId;
private final String groupId;
private final List<Consumer<Boolean>> leaderChangeListeners = new CopyOnWriteArrayList<>();
private final AtomicBoolean initialized = new AtomicBoolean(false);

public KafkaSchemaRegistry(SchemaRegistryConfig config,
Serializer<SchemaRegistryKey, SchemaRegistryValue> serializer)
Expand All @@ -150,6 +154,9 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config,
throw new SchemaRegistryException("Schema registry configuration is null");
}
this.config = config;
this.resourceExtensions = config.getConfiguredInstances(
config.definedResourceExtensionConfigName(),
SchemaRegistryResourceExtension.class);
this.props = new ConcurrentHashMap<>();
Boolean leaderEligibility = config.getBoolean(SchemaRegistryConfig.MASTER_ELIGIBILITY);
if (leaderEligibility == null) {
Expand Down Expand Up @@ -267,6 +274,10 @@ protected SchemaUpdateHandler getSchemaUpdateHandler(SchemaRegistryConfig config
return new CompositeSchemaUpdateHandler(customSchemaHandlers);
}

public List<SchemaRegistryResourceExtension> getResourceExtensions() {
return resourceExtensions;
}

protected LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache() {
return new InMemoryCache<SchemaRegistryKey, SchemaRegistryValue>(serializer);
}
Expand Down Expand Up @@ -369,6 +380,7 @@ public void postInit() throws SchemaRegistryException {
if (delayLeaderElection) {
electLeader();
}
initialized.set(true);
}

private void electLeader() throws SchemaRegistryException {
Expand All @@ -389,7 +401,7 @@ public void waitForInit() throws InterruptedException {
}

public boolean initialized() {
return kafkaStore.initialized();
return kafkaStore.initialized() && initialized.get();
}

/**
Expand Down