Split Store and implementations from schema-registry #422

Closed
wants to merge 1 commit

Conversation

@Noddy76 commented Sep 28, 2016

This pull request splits Store&lt;K, V&gt; and its implementations out of schema-registry into their own module for reuse elsewhere.

Background

Since the implementation of cleanup.policy = compact in Kafka, you can use a topic as a distributed key-value store. Schema Registry does this to hold schemas. When looking for a client library that implemented this functionality, we found the implementation within schema-registry, but it was coupled to the rest of the code and could not be used on its own.
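
For illustration, the pattern described here boils down to replaying a compacted topic into a local map. Below is a minimal sketch using the plain kafka-clients consumer API; the topic name and settings are hypothetical, and this is not the KafkaStore API itself.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CompactedTopicView {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "compacted-topic-view");    // should be unique per instance
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");       // replay the log from the start

        Map<String, String> store = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-compacted-topic")); // hypothetical topic
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    if (record.value() == null) {
                        store.remove(record.key());              // tombstone: key deleted
                    } else {
                        store.put(record.key(), record.value()); // last write wins
                    }
                }
            }
        }
    }
}
```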

@ConfluentJenkins (Collaborator)

Can one of the admins verify this patch?

@ghost commented Sep 28, 2016

It looks like @Noddy76 hasn't signed our Contributor License Agreement, yet.

Appreciation of efforts,

clabot

@Noddy76 (Author) commented Oct 4, 2016

I have now signed the CLA at http://clabot.confluent.io/cla

@ewencp (Contributor) commented Oct 11, 2016

ok to test

@ewencp left a comment

@Noddy76 This looks like a good direction. We'll want to be careful about what we're considering public API since this is now a library. Also, it seems like we ended up with a bit more duplication than is probably necessary.

</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>

If this is purely a library rather than a service then normally you'd only use slf4j-api since you can't assume the library user will be using log4j.
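
A minimal sketch of what that looks like in practice (class name hypothetical): the library codes against the slf4j-api facade only, and the embedding application supplies the concrete binding (log4j, logback, etc.).

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StoreExample {
    // Only slf4j-api is needed at compile time; no binding is assumed.
    private static final Logger log = LoggerFactory.getLogger(StoreExample.class);

    public void init() {
        log.info("Initializing store");
    }
}
```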

<!-- To prevent a UniqueResourceException due the same resource existing in both
org.apache.directory.api/api-all and org.apache.directory.api/api-ldap-schema-data, api-ldap-schema-data
needs to be excluded from all directory-related maven dependencies. -->
<dependency>

It'd be good to try to prune down dependencies in core/pom.xml since a bunch of stuff is included here as well. In particular, these bits stand out -- I think they are only needed for the SASL utility class that got moved, so probably the equivalent sections of core/pom.xml don't need all this duplicated Maven config.

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.protocol.SecurityProtocol;

public class KafkaStoreConfig extends AbstractConfig {

Since it looks like SchemaRegistryConfig didn't change, I assume the obvious drawback here is lots of duplication. I'm guessing there wasn't a clean way to combine the configs since you now want to inherit from both RestConfig and KafkaStoreConfig in SchemaRegistryConfig? I'm just worried about these things falling out of sync, especially as it relates to documentation since we auto generate the configuration docs from the code.
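
One hedged way to avoid that drift (a sketch, not what this PR does) would be for KafkaStoreConfig to expose its ConfigDef so SchemaRegistryConfig can compose it rather than redefine the same keys; only one option is shown, and the doc string is illustrative.

```java
import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class KafkaStoreConfig extends AbstractConfig {
    public static final String KAFKASTORE_CONNECTION_URL_CONFIG = "kafkastore.connection.url";

    // Shared definitions live in one place, so the auto-generated docs stay in sync.
    public static ConfigDef baseConfigDef() {
        return new ConfigDef()
                .define(KAFKASTORE_CONNECTION_URL_CONFIG, Type.STRING, Importance.HIGH,
                        "ZooKeeper URL for the Kafka cluster");
    }

    public KafkaStoreConfig(Map<?, ?> props) {
        super(baseConfigDef(), props);
    }
}

// SchemaRegistryConfig could then build on the same definitions via ConfigDef's
// copy constructor, e.g. new ConfigDef(KafkaStoreConfig.baseConfigDef()),
// and add only the REST-level options on top.
```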

prop.put(SchemaRegistryConfig.KAFKASTORE_TOPIC_CONFIG, kafkaTopic);
prop.setProperty(KafkaStoreConfig.KAFKASTORE_CONFIG_PREFIX + KafkaStoreConfig.KAFKASTORE_CONNECTION_URL_CONFIG, zkConnect);
prop.put(KafkaStoreConfig.KAFKASTORE_CONFIG_PREFIX + KafkaStoreConfig.KAFKASTORE_TOPIC_CONFIG, kafkaTopic);
prop.setProperty(KafkaStoreConfig.KAFKASTORE_CONFIG_PREFIX + KafkaStoreConfig.KAFKASTORE_KAFKA_GROUPID, "test-group");
@ewencp commented Oct 11, 2016

This group ID seems to be replacing some setup we previously did in KafkaStore, but it doesn't match the value previously used by the schema registry service. This would be a compatibility issue since that ID is used to uniquely identify the instance. See the comment later on for more details.

config.getInt(SchemaRegistryConfig.KAFKASTORE_TOPIC_REPLICATION_FACTOR_CONFIG);
int port = KafkaSchemaRegistry.getPortForIdentity(config.getInt(SchemaRegistryConfig.PORT_CONFIG),
config.getList(RestConfig.LISTENERS_CONFIG));
this.groupId = String.format("schema-registry-%s-%d",

This is where we seem to have lost some unique information about the schema registry instance. Since this is used as the consumer group ID, just using "test-group" in the other part of the patch isn't ideal. In practice, you can probably get away with this because we use the new consumer in "simple" consumer mode and don't commit offsets. However, this makes some logs much harder to use since they don't have unique identifying info. We probably want to maintain the group ID if possible.

StoreUpdateHandler<K, V> storeUpdateHandler,
Serializer<K, V> serializer,
Store<K, V> localStore,
K noopKey) {
K noopKey,
boolean isZkSecurityEnabled) {

Why make this a constructor flag rather than a config given that everything else is passed in via config?

More generally, as this becomes truly public, supported API rather than internal implementation detail, we'll need to be careful about compatibility so we'd like to make sure these interfaces, especially the constructors and most used methods, are just what we want.
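
A sketch of the config-based alternative; the key name zookeeper.set.acl is an assumption here, not taken from this PR.

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class ZkSecurityFlagExample {
    public static final String ZOOKEEPER_SET_ACL_CONFIG = "zookeeper.set.acl"; // assumed key name

    // Defined alongside the other store options instead of widening the constructor.
    static ConfigDef withZkSecurityFlag(ConfigDef def) {
        return def.define(ZOOKEEPER_SET_ACL_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM,
                "Whether to set ACLs when connecting to a secured ZooKeeper");
    }

    // KafkaStore's constructor could then read it like any other option:
    //   boolean isZkSecurityEnabled = config.getBoolean(ZOOKEEPER_SET_ACL_CONFIG);
}
```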

protected List<KafkaServer> servers = null;
protected String brokerList = null;

//protected RestApp restApp = null;

Can we avoid a lot of code duplication here with a bit of inheritance and overrides? It seems like very little changed aside from removing a few lines related to instantiating and starting the rest app?
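
A hedged sketch of that inheritance approach (class and hook names hypothetical, JUnit 4 annotations assumed):

```java
import org.junit.After;
import org.junit.Before;

public abstract class ClusterTestHarnessExample {
    @Before
    public void setUp() throws Exception {
        // start ZooKeeper and the Kafka brokers here ...
        setUpApps(); // no-op by default
    }

    // Store-level tests use the default; schema-registry tests override this
    // to instantiate and start the RestApp.
    protected void setUpApps() throws Exception {
    }

    @After
    public void tearDown() throws Exception {
        // stop the brokers and ZooKeeper ...
    }
}
```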

@@ -0,0 +1,31 @@
/*

It seems this could move from core rather than being copied. Doesn't look like it is used anymore since all the tests it was used in have moved.

@@ -0,0 +1,73 @@
/**

It seems this could move from core rather than being copied. Doesn't look like it is used anymore since all the tests it was used in have moved.

this.serializer = serializer;
this.defaultCompatibilityLevel = config.compatibilityType();
this.guidToSchemaKey = new HashMap<Integer, SchemaKey>();
this.schemaHashToGuid = new HashMap<MD5, SchemaIdAndSubjects>();
this.zkAclsEnabled = checkZkAclConfig(config);

KafkaStoreConfig kafkaStoreConfig = new KafkaStoreConfig(config.originalsWithPrefix(KafkaStoreConfig.KAFKASTORE_CONFIG_PREFIX));

This isn't quite enough because you have at least the groupid that is now required for the KafkaStore but will not be included in the SchemaRegistryConfig. This ties back to the test-group issue -- this is where you would need to get the identity info that got dropped from KafkaStore.

Easy way to test this -- try to run schema registry with a simple default config :) Currently it will crash immediately because of this. (Generally, a good sanity check when making substantial changes like this is to just execute the entire quickstart.)
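
One possible shape of the fix, sketched as a fragment of the constructor shown above: inject the derived identity into the prefixed originals before building KafkaStoreConfig (putIfAbsent needs Java 8; this is not the PR's code).

```java
// Inside KafkaSchemaRegistry's constructor, after this.groupId has been computed:
Map<String, Object> storeProps =
        config.originalsWithPrefix(KafkaStoreConfig.KAFKASTORE_CONFIG_PREFIX);
// Fall back to the per-instance identity when the user hasn't set one explicitly.
storeProps.putIfAbsent(KafkaStoreConfig.KAFKASTORE_KAFKA_GROUPID, this.groupId);
KafkaStoreConfig kafkaStoreConfig = new KafkaStoreConfig(storeProps);
```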

@massdosage

Hey @ewencp, thanks for the above feedback. Based on our e-mail discussion it sounds like there is similar code in the Kafka Connect code base that we could base this on. How do you recommend we proceed? Continue fixing this according to your comments, or take a look at the Kafka Connect code and modify this to use that approach instead? Also, if this "Kafka as a key-value store" functionality is being used from two Kafka-related modules, plus us having our own need for something like this, wouldn't it make sense to make it a top-level module under the main Kafka project and change the other uses to use this common "library"? I think that would be the desired end state, but I appreciate we might want to take a phased approach to get there. Thoughts?

@ewencp (Contributor) commented Oct 26, 2016

@massdosage Good question. Honestly, the best path will depend a lot on your needs. The fact that two modules have something related to your need (interactive queries, previously known as queryable state, in Streams, and KafkaBasedLog in Connect) definitely adds more data points indicating some sort of common need.

My major concern with suggesting we focus on those two libraries is that adding public API to Kafka requires KIPs, and we've got multiple candidates with substantially different APIs, very different use cases, and, I think, pretty different requirements. That said, both of the ones in Kafka are more modern and better implementations than KafkaStore here, which has to jump through some weird hoops to figure out that you have reached the end of the log (writing (null, null) messages to discover the log end offset). Also, assumptions differ between the three implementations. KafkaStore assumes you are already externally ensuring there is only one writer. KafkaBasedLog doesn't necessarily assume the same, but effectively assumes a single writer per key if you use its public methods to try to ensure you have the latest value for each key. The Streams interactive queries API doesn't provide a way to ensure you've read to the end of the log, so you're always just querying the state up to the point the streams job happens to have read (and there are no restrictions on writers).

It's definitely easier to start with a (potentially unstable) API externally and maybe promote it eventually into a first class Kafka API. That said, I'm not sure what the reception to that will be given that folks are just starting to scratch the surface wrt using Kafka this way, i.e. I'm uncertain how comfortable people will be committing to supporting an interface when we have only a couple of examples of how it would be used. Take a look at how much discussion went into the new consumer API and behavior to get an idea for how involved this can be :) We've also been pretty conservative in adding APIs. For example, we've brought up the idea of a producer that logs locally to disk (for better durability in the event of connection failures combined with potential failures) because quite a few people want this, but this hasn't been added yet and we're not sure we want it as a core Kafka API.

Out of curiosity, if you're using Kafka as a key-value store, do you have requirements on having a single writer per key? If so, how are you guaranteeing this? And do you have consistency requirements if you have multiple nodes serving requests? Do you need read-after-write semantics? There are a lot of details that need to be handled on top of what Kafka provides, so I'm curious about your exact requirements.
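
For reference, a hedged sketch of the "read to the end" approach KafkaBasedLog takes, which avoids KafkaStore's (null, null) marker writes: snapshot the end offsets, rewind, and poll until every partition's position reaches its snapshot. It assumes the consumer has already been manually assigned the partitions.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

public class ReadToEndExample {
    public static <K, V> void readToEnd(Consumer<K, V> consumer, List<TopicPartition> partitions) {
        consumer.seekToEnd(partitions);
        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        for (TopicPartition tp : partitions) {
            endOffsets.put(tp, consumer.position(tp)); // log-end offset at this instant
        }
        consumer.seekToBeginning(partitions);

        while (!caughtUp(consumer, endOffsets)) {
            for (ConsumerRecord<K, V> record : consumer.poll(Duration.ofMillis(500))) {
                // apply(record): update the local store with the latest value per key
            }
        }
    }

    private static boolean caughtUp(Consumer<?, ?> consumer, Map<TopicPartition, Long> endOffsets) {
        for (Map.Entry<TopicPartition, Long> e : endOffsets.entrySet()) {
            if (consumer.position(e.getKey()) < e.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```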

@mageshn (Member) commented Nov 9, 2018

@Noddy76 Based on the comments and the activity on the PR, my assumption is that this is no longer needed. With that assumption, I'm closing this PR. If you think otherwise, please feel free to reopen and we can work it through.

@mageshn closed this Nov 9, 2018