KAFKA-2366: Copycat #99

Closed
wants to merge 29 commits

Conversation

6 participants
@ewencp
Contributor

commented Jul 27, 2015

This is an initial patch implementing the basics of Copycat for KIP-26.

The intent here is to start a review of the key pieces of the core API and get a reasonably functional, baseline, non-distributed implementation of Copycat in place to get things rolling. The current patch has a number of known issues that need to be addressed before a final version:

  • Some build-related issues. Specifically, it requires some locally-installed dependencies (see below), ignores checkstyle for the runtime data library (because it's lifted from Avro currently and likely won't last in its current form), and has some Gradle task dependencies that aren't quite right because I haven't gotten rid of the dependency on core (which should now be an easy patch since new consumer groups are in a much better state).
  • This patch currently depends on some Confluent trunk code because I prototyped with our Avro serializers w/ schema-registry support. We need to figure out what we want to provide as an example built-in set of serializers. Unlike core Kafka where we could ignore the issue, providing only ByteArray or String serializers, this is pretty central to how Copycat works.
  • This patch uses a hacked up version of Avro as its runtime data format. Not sure if we want to go through the entire API discussion just to get some basic code committed, so I filed KAFKA-2367 to handle that separately. The core connector APIs and the runtime data APIs are entirely orthogonal.
  • This patch needs some updates to get aligned with recent new consumer changes (specifically, I'm aware of the ConcurrentModificationException issue on exit). More generally, the new consumer is in flux but Copycat depends on it, so there are likely to be some negative interactions.
  • The layout feels a bit awkward to me right now because I ported it from a Maven layout. We don't have nearly the same level of granularity in Kafka currently (core and clients, plus the mostly ignored examples, log4j-appender, and a couple of contribs). We might want to reorganize, although keeping data+api separate from runtime and connector plugins is useful for minimizing dependencies.
  • There are a variety of other things (e.g., I'm not happy with the exception hierarchy/how exceptions are currently handled, TopicPartition doesn't really need to be duplicated unless we want Copycat entirely isolated from the Kafka APIs, etc.), but I expect we'll cover those in the review.

Before commenting on the patch, it's probably worth reviewing https://issues.apache.org/jira/browse/KAFKA-2365 and https://issues.apache.org/jira/browse/KAFKA-2366 to get an idea of what I had in mind for a) what we ultimately want with all the Copycat patches and b) what we aim to cover in this initial patch. My hope is that we can use a WIP patch (after the current obvious deficiencies are addressed) while recognizing that we want to make iterative progress with a bunch of subsequent PRs.

@asfbot

commented Jul 27, 2015

kafka-trunk-git-pr #52 FAILURE
Looks like there's a problem with this pull request

@ijuma

commented on build.gradle in 11981d2 Jul 27, 2015

Can we use variables to avoid duplication of version numbers for slf4j and junit?

@ijuma

The class doesn't actually have type parameters. Were they removed?

Owner Author

replied Jul 31, 2015

See the other note about how they kept ending up as Object anyway. I've removed this javadoc for now.

@ijuma

General comment, not specific to this PR: we should consider adding some helpers for hashCode, equals and toString (if they don't already exist) to reduce the boilerplate for classes such as this.
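
For reference, the kind of boilerplate in question, which shared helpers could collapse, looks roughly like this (class and field names are illustrative, not from the patch):

import java.util.Objects;

public class RecordExample {
    private final String topic;
    private final Integer partition;

    public RecordExample(String topic, Integer partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        RecordExample that = (RecordExample) o;
        return Objects.equals(topic, that.topic) && Objects.equals(partition, that.partition);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return "RecordExample{topic=" + topic + ", partition=" + partition + "}";
    }
}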

@ijuma

I'd prefer a static factory method called parse.

Owner Author

replied Jul 31, 2015

Removed this entire class as it was redundant anyway. Previously it only existed so the API didn't rely on the Kafka jar when I had this code in a separate prototype, but now that we're in the Kafka code anyway it doesn't matter.

@ijuma

Do we want to allow this? Additional information tends to be useful. CopycatException doesn't include a no-args constructor.

@ijuma

The superclass does this check too.

@ijuma

I think that in Kafka we don't use braces for single-line statements, right? There are quite a number of cases of this.

@ijuma

I'm a bit uneasy at the fact that we have a concrete class inheriting from another concrete class.

Owner Author

replied Jul 31, 2015

Changed CopycatRecord to be abstract. It doesn't have any abstract methods, but that addresses this concern. There should be no need to instantiate CopycatRecord directly; it's really just there to share some code and make it clear which data is common to both import and export records.
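
A rough sketch of the resulting shape, with illustrative field names (the real class carries more data than shown):

// Abstract so it cannot be instantiated directly; it has no abstract methods
// and exists only to share the fields common to source and sink records.
public abstract class CopycatRecord {
    private final String topic;
    private final Integer partition;
    private final Object key;
    private final Object value;

    protected CopycatRecord(String topic, Integer partition, Object key, Object value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }

    public String topic() { return topic; }
    public Integer partition() { return partition; }
    public Object key() { return key; }
    public Object value() { return value; }
}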


replied Jul 31, 2015

Good. That way we control instantiations so we have more flexibility if/when we need to change it.

@ijuma

offsets = new HashMap<>();

@ijuma

Typically this would be Converter<S, T> or something like that. Has it been established that there is no benefit in having type parameters here?

Owner Author

replied Jul 31, 2015

I think the question of type parameters came up elsewhere too. I originally had them all over the place. The reason I moved away from them was that I ended up just using Object everywhere anyway, so the type parameters just added noise. I think this ends up being the case no matter what underlying serialization format you use, as long as you support using primitive types directly. For example, with Confluent's Avro serializers you might think you could use IndexedRecord or GenericRecord, but if you want to be able to serialize (or in this case convert) plain integers or strings, you end up using Object so you can pass in Integer, String, GenericRecord, etc.

However, maybe this isn't always the case. For example, I replaced the Avro implementation with JSON so we could have something built-in without extra dependencies. In that case, I am always decoding to Jackson's tree model and plain primitive objects aren't used; they always get wrapped in a JsonNode representation. There, having the type parameter might provide a bit more information for the compiler to check.


replied Jul 31, 2015

Yes, I understand that they didn't provide much value for Avro and other serialisers that support unwrapped primitive types. So the question is: do we want this to be generic or not?

There are a number of serialisation libraries that restrict the message type: Jackson's TreeModel (JsonNode) as you pointed out, Protobuf (Message), Thrift (TBase), any Scala Json library, any Scala type-class based serialisation library (which is the standard these days), etc. We could provide a CastingConverter to be used for libraries like Avro, but it seems to me that it would be better to be more principled in the top-level Converter (if this is meant to be used with various serialisation libraries).
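
To make the trade-off concrete, here is a hedged sketch of the single-type-parameter variant under discussion; the method names are assumptions, and the Copycat-side type stays Object pending KAFKA-2367:

// T is the serialization library's native type: JsonNode for Jackson's tree
// model, Message for Protobuf, TBase for Thrift, and so on.
public interface Converter<T> {
    T fromCopycatData(Object value);   // Copycat runtime type -> library type
    Object toCopycatData(T value);     // library type -> Copycat runtime type
}

A Jackson-based implementation would then pin the parameter, as in public class JsonConverter implements Converter<JsonNode>, matching the class that appears later in this review.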

Owner Author

replied Aug 1, 2015

Hmm, so this raises an interesting point. We can definitely add a type parameter to Converter. It probably only makes sense to add one (for the serialization library's type). The second type should always be fixed since it's the Copycat data type -- that's Object for now, but might change once KAFKA-2367 is resolved.

However, thinking through the impact of this made a couple of things apparent:

  • We probably need key.converter and value.converter settings. Since we allow setting key.serializer and value.serializer differently, it's possible that we need different converters.
  • If there's any variation in which serializers are used between topics, we need to be able to configure all of these parameters on a per-connector basis. For example, a sink connector A that wants to ingest an existing topic with raw string keys (via StringSerializer) and complex values (via an Avro serializer) will need to use StringDeserializer for keys and an Avro deserializer for values. But presumably any normal Copycat source connector S would use an Avro serializer and deserializer, which would imply downstream Copycat sink connector B would also need to use those. A and B have mismatching key deserializers, suggesting these need to be configured per connector (or have a default and be overridable).
  • If some connectors should be able to use raw primitive types, e.g. use a StringSerializer to get the keys to be raw strings, then we'd need to bring offset.key.serializer and offset.value.serializer back. Currently we use the same serializer for data and offsets, but some serializers could be too restrictive for the source system's offsets. (Actually, I think we'd catch this at the Converter since we'd have to have a StringConverter to match StringSerializer and the type checking would fail there).

I think some of these are not real issues if you already standardize on a general purpose serialization format for everything (i.e. never use raw strings or other primitive types for keys, always use something like Avro for all keys and values). However, this may not be the case for everyone. I'd imagine for something much more free-form and schemaless like JSON, it might be common to use strings as keys.
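
A sketch of the "worker default plus per-connector override" lookup this would imply; the property names key.converter and value.converter come from the discussion above, while the helper itself is hypothetical:

import java.util.Properties;

public final class ConverterConfig {
    private ConverterConfig() {}

    // Returns the connector-level setting when present, otherwise the
    // worker-wide default, so converters are overridable per connector.
    public static String resolve(Properties connectorProps, Properties workerProps, String key) {
        String override = connectorProps.getProperty(key);
        return override != null ? override : workerProps.getProperty(key);
    }
}

// e.g. resolve(connectorProps, workerProps, "key.converter") lets sink
// connector A use a String-based converter while B keeps the worker default.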

@ijuma

Typo "an set" -> "a set".


replied Jul 27, 2015

Also, maybe all occurrences of set should be replaced with list.

@ijuma

Scala has a similar method and it's called grouped. The difference is that it takes the number of elements per group instead of the number of groups.

Owner Author

replied Jul 31, 2015

It's easy to switch between those two, but in this case I think the number of groups is probably more intuitive, since this is meant to be usable for the most common variant of assigning partitions to tasks, where you have a list of partitions and are given a number of tasks to split them up into.
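
For reference, a minimal sketch of the number-of-groups variant described here (the utility in the patch may differ in name and edge-case handling):

import java.util.ArrayList;
import java.util.List;

public final class PartitionGrouping {
    private PartitionGrouping() {}

    // Splits elements into at most numGroups groups of near-equal size,
    // e.g. dividing a list of source partitions among a fixed number of tasks.
    public static <T> List<List<T>> groupPartitions(List<T> elements, int numGroups) {
        if (numGroups <= 0)
            throw new IllegalArgumentException("numGroups must be positive");
        List<List<T>> result = new ArrayList<>(numGroups);
        int size = elements.size();
        for (int i = 0; i < numGroups; i++) {
            int start = i * size / numGroups;
            int end = (i + 1) * size / numGroups;
            result.add(new ArrayList<>(elements.subList(start, end)));
        }
        return result;
    }
}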

@ijuma

StringJoiner was introduced in Java 8, so strictly speaking it's only missing up to Java 7.

@ijuma

Do we need some javadoc here?

@ijuma

Contributor

commented Jul 27, 2015

Great to see this!

I did a partial review of the first commit. I didn't continue because I am a bit unsure about which parts are meant to be discussed separately as part of KAFKA-2367 and also the level of review we want to do at this stage (e.g. do we care about code convention issues).

I understand the desire to get some initial code merged and to iterate in subsequent PRs. At the same time, it's a significant chunk of code. Once it's merged, the focus will shift and it's unclear to me exactly which parts will be rewritten and which parts are here to stay.

Thoughts?

@ewencp

Contributor Author

commented Jul 27, 2015

Yeah, hard to say how we should balance just getting some code in place vs. actually being happy with the state of it all. At a minimum, definitely point out code convention stuff since those are trivial to fix anyway. This code was originally under Confluent's code styling and I reformatted, so there are likely a bunch of places I missed cleaning up. One thing I've noticed consistently is that since we use shorter lines, formatting often ends up a bit messier and spread across multiple lines and automatic reformatting won't fix that.

For KAFKA-2367, the intent in that JIRA was to focus on the API currently in copycat-data, which is currently just a stripped-down Avro with all the serialization code removed. I had initially left out "data" from the title of the JIRA, which made it sound a bit different, despite the description being clearer. It'd be a good idea to come to some agreement on the APIs in copycat-api in this JIRA since they affect many of the other JIRAs (and the runtime model for connectors).

}

@Override
public List<Properties> getTaskConfigs(int maxTasks) {

@gwenshap

gwenshap Aug 12, 2015

Contributor

maxTasks was in the connector configuration file, but in this case anything more than a single task will completely break the source. Should this really be something that users can modify?

I keep bumping into the idea that there should be a few logical "layers" of configurations. Will think about it a bit more.

@ewencp

ewencp Aug 12, 2015

Author Contributor

I don't think we can avoid letting the user configure this -- they need to be able to specify the maximum level of parallelism they want since each task gets its own thread. But maxTasks is meant to be just a suggestion; connectors can always ignore it if they simply cannot generate more tasks.

We could potentially require connectors to specify how many tasks they can generate, but I think this is only useful for a certain class of connectors that simply do not support parallelism at all (i.e. 1 partition in the source data).

@gwenshap

gwenshap Aug 12, 2015

Contributor

Makes sense. Basically allowing users to throttle connectors.
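
As a concrete illustration of maxTasks acting as an upper bound rather than a requirement, a sketch of how a connector with a fixed number of natural partitions might respond (everything below, including the task.id key, is hypothetical):

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class FixedPartitionSourceConnector {
    private static final int NUM_PARTITIONS = 4; // e.g. discovered from the source system

    // The connector never creates more tasks than it has partitions to
    // assign, regardless of how many tasks maxTasks would allow.
    public List<Properties> getTaskConfigs(int maxTasks) {
        int numTasks = Math.min(maxTasks, NUM_PARTITIONS);
        List<Properties> configs = new ArrayList<>(numTasks);
        for (int i = 0; i < numTasks; i++) {
            Properties taskProps = new Properties();
            taskProps.setProperty("task.id", Integer.toString(i)); // hypothetical key
            configs.add(taskProps);
        }
        return configs;
    }
}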

/**
* Implementation of Converter that uses JSON to store schemas and objects.
*/
public class JsonConverter implements Converter<JsonNode> {

@gwenshap

gwenshap Aug 12, 2015

Contributor

This has the benefit of discouraging anyone who wants to implement their own converter ;)

* limitations under the License.
**/

package org.apache.kafka.copycat.cli;

@gwenshap

gwenshap Aug 12, 2015

Contributor

I think we will want a CLI wrapper for the REST API at some point. Perhaps choose a name that will make it easier to understand the difference?

@ewencp

ewencp Aug 12, 2015

Author Contributor

Not sure I understand the problem here. Couldn't the CLI for running standalone/distributed workers and for executing REST API calls live under this namespace? Or is it just that cli is not clear?

@gwenshap

gwenshap Aug 12, 2015

Contributor

Yeah, same namespace is fine.

But right now "copycat --stuff" basically starts a single copycat flow and a single worker.
In the future "copycat --stuff" would do the same, but also submit jobs to the cluster, maybe report on cluster status, etc.

It made sense to me to separate the "run something here and now" and "control cluster" CLIs into two different commands.

* in workers. The assignment and tracking of those tasks in workers is also managed by the
* coordinator.
*/
public interface Coordinator {

@gwenshap

gwenshap Aug 12, 2015

Contributor

This seems like a central component, but it's completely missing from KIP-26?

@ewencp

ewencp Aug 12, 2015

Author Contributor

KIP-26 omitted a lot of implementation details. To be honest, I'm not certain how large this interface will get yet. I think it will have to contain most of the functionality that is exposed via the REST API, since that implementation will essentially just be invoking methods on this class, but it's unclear how large that will end up being.

Originally the thought for this class was that the DistributedCoordinator would be a significantly more complex implementation than the standalone version. However, if we are able to leverage the consumer coordinator code, then it may not even be that complex -- mostly just handling forwarding to the current leader (so, e.g., connector configs are only written by a single worker at any given time).

@gwenshap

gwenshap Aug 12, 2015

Contributor

I meant something a bit different - it is pretty unclear what the coordinator actually does (especially in stand-alone mode).

I'd like a more detailed explanation, either here or in the KIP.

@ewencp

ewencp Aug 13, 2015

Author Contributor

Ok, that makes sense. I've tried to clarify it a bit, see if the updated version makes more sense to you.

Another thing to think about -- coordinator is obviously a confusing name since there is already the consumer coordinator in Kafka. I would love to rename this class to something unique within Kafka, but am not coming up with anything better right now. Let me know if you have any ideas.

@asfbot

commented Aug 12, 2015

kafka-trunk-git-pr #130 SUCCESS
This pull request looks good

@@ -204,20 +209,25 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
}
}

tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7', 'clients:jar', 'examples:jar', 'contrib:hadoop-consumer:jar', 'contrib:hadoop-producer:jar', 'log4j-appender:jar', 'tools:jar']) {
def copycatPkgs = ['copycat-data', 'copycat-api', 'copycat-runtime', 'copycat-json', 'copycat-file']

@junrao

junrao Aug 12, 2015

Contributor

A quick comment on packaging. Could we put all copycat projects under a single directory (like what we did with contrib/)? Also, is there a need to have different jars for api, data and runtime? It seems that they all need to be used together.

@ewencp

ewencp Aug 12, 2015

Author Contributor

data + api can definitely be combined. I personally like using this separation to enforce clean layering, but there's not much risk of messing that up with those two.

The reason for splitting api and runtime is that connector developers should only need the api jar. Again, this is a way to really force proper layering, be clear about what we want to expose as public API to connector developers, etc.

private Converter<K> keyConverter;
private Converter<V> valueConverter;
private OffsetBackingStore offsetBackingStore;
private Serializer<K> offsetKeySerializer;

@gwenshap

gwenshap Aug 12, 2015

Contributor

This will go on the mailing list, but I still think this is too much complexity too soon:
Offsets can be persisted with the data ser/de (if we think users care) or with our own format (if we think they don't). Separate pluggable serializers add complexity that doesn't buy much.

key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer

offset.storage.class=org.apache.kafka.copycat.storage.FileOffsetBackingStore

@gwenshap

gwenshap Aug 12, 2015

Contributor

Maybe an enum instead of a class name... I don't know if we want to encourage people to write their own offset stores.

@ewencp

ewencp Aug 13, 2015

Author Contributor

Yeah, this was more to provide an easy way to switch between standalone mode (local, file-based offset storage) and distributed mode (stored in Kafka). But maybe we don't need a config for this since the implementation is fixed in each mode.

@gwenshap

gwenshap Aug 14, 2015

Contributor

Thank you!

* behaves similarly to a real backing store, operations are executed asynchronously on a
* background thread.
*/
public class MemoryOffsetBackingStore implements OffsetBackingStore {

@gwenshap

gwenshap Aug 12, 2015

Contributor

I assume this is just for testing?
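
A trimmed-down sketch of the shape that javadoc implies: an in-memory map guarded by a single-threaded executor, so get and set complete asynchronously as they would against a real backing store (the actual class implements OffsetBackingStore and handles batched keys):

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MemoryOffsetStoreSketch {
    private final Map<ByteBuffer, ByteBuffer> data = new HashMap<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Reads go through the executor so callers see the same Future-based
    // asynchronous behavior as a store backed by Kafka or a file.
    public Future<ByteBuffer> get(final ByteBuffer key) {
        return executor.submit(() -> data.get(key));
    }

    public Future<Void> set(final ByteBuffer key, final ByteBuffer value) {
        return executor.submit(() -> {
            data.put(key, value);
            return null;
        });
    }

    public void stop() {
        executor.shutdown();
    }
}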

@ewencp

Contributor Author

commented Aug 13, 2015

@junrao I reorganized everything under a copycat directory, let me know if the new organization makes more sense. On merging data + api, given the discussion in https://issues.apache.org/jira/browse/KAFKA-2367, we may just drop the data package altogether. I personally prefer keeping the api in a separate jar from the runtime, which I think ensures very clear boundaries for connector developers.

@gwenshap Followed up on a bunch of your comments. I think I've addressed all your comments that weren't going to be discussed more on the mailing list.

I also marked a bunch of public classes in copycat as unstable since we now have that annotation. I didn't mark everything, however; I just focused on the classes that are really public and "entry points" to the API. We can do this more comprehensively, but I wasn't sure it really makes sense (e.g. will marking WorkerSinkTaskThread as unstable really help anyone anyway?). Let me know if you think we should mark every class.

Finally, any thoughts on the exception hierarchy? Shall I get rid of the checked vs. runtime exception distinction? And shall I make CopycatException inherit from KafkaException? I'm thinking this may become important for embedded mode, where in some cases it would be nice to be able to catch any Copycat/Kstreams/Kafka client exception with a single catch block.

@asfbot

commented Aug 13, 2015

kafka-trunk-git-pr #136 SUCCESS
This pull request looks good

Make Copycat CLI specific to standalone mode, clean up some config and get rid of config storage in standalone mode.
@asfbot


commented Aug 13, 2015

kafka-trunk-git-pr #139 SUCCESS
This pull request looks good

@gwenshap

Contributor

commented Aug 14, 2015

I agree there's no need to mark the stability of the internals. The whole point is to set expectations for external users.

Regarding exceptions:

  • I don't have strong opinions on whether CopycatException should be KafkaException. I think my imagination failed on how Copycat will be used in embedded mode.
  • Yeah, I think the checked vs. runtime is a bit unclear right now. Either document the specific distinctions with some guidelines on when to use each, or get rid of it for now.

And thank you for getting rid of the storage class config. I've found that it's easier to add configuration later than to remove it.

@ijuma

Contributor

commented Aug 14, 2015

@ewencp, inheriting from KafkaException makes sense to me as I don't see any particular downsides. Do you?

Regarding checked versus unchecked exceptions, I think checked exceptions tend to be awkward as they don't compose (it's classified as a failed experiment by many). Unless the benefit is clear and we have clear guidelines on when we use checked versus when we use unchecked, I'd just drop the distinction.

@ewencp

Contributor Author

commented Aug 14, 2015

Ok, I want to just drop the checked exceptions and I'll inherit from KafkaException since it unifies things and I think has little consequence. @gwenshap hold off on the merge until I have a chance to update these, please!

And thanks for the feedback everyone!
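
For what the change amounts to, a sketch under the assumption that only the standard constructor set is needed:

import org.apache.kafka.common.KafkaException;

// Unchecked and part of the Kafka exception hierarchy, so embedded users can
// catch Copycat, streams, and client errors with a single catch block.
public class CopycatException extends KafkaException {
    public CopycatException(String message) {
        super(message);
    }

    public CopycatException(String message, Throwable cause) {
        super(message, cause);
    }

    public CopycatException(Throwable cause) {
        super(cause);
    }
}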

@asfbot

commented Aug 14, 2015

kafka-trunk-git-pr #144 SUCCESS
This pull request looks good

@asfgit closed this in f6acfb0 Aug 14, 2015

@becketqin

Contributor

commented Aug 26, 2015

Just finished going through the code for a first pass. Well-structured code. A late comment: in which case would the runtime call Connector.reconfigure() instead of calling ConnectorContext.requestTaskReconfiguration()? Also, would it be worth having Initializable/Configurable/Startable/Shutdownable interfaces?

@ewencp

Contributor Author

commented Aug 26, 2015

@becketqin ConnectorContext.requestTaskReconfiguration is for Connector implementations to call to indicate that some event happened that requires updating the tasks. For example, a JDBC connector would invoke it if it noticed a table was added since that would require work to be redistributed among tasks. Connector.reconfigure is called by the framework to update the configuration; for example, if a user submitted a modified connector configuration (say, added another connection URL and credentials to start copying another database, or added a table to the whitelist of tables), then the framework would invoke this to notify the connector of changes. The default implementation just stops and restarts the connector -- you only need to override reconfigure if stopping and restarting is too expensive. To be honest, I'm not sure this is valuable -- we may end up wanting to remove this just to keep things simpler, especially since we can add it with the default implementation in a later version if we want to.

On the interfaces, I considered it. The class hierarchy is already a bit more complex than I'd like, and I didn't actually have a use for those interfaces. Since adding implements to a class is backwards compatible, I figured it's better to only add them once we see a use for them. The only drawback to not adding them now is that if we end up with inconsistent method signatures, we won't be able to add the interfaces in the future.
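
To pin down the reconfigure behavior described above, a sketch of the default (method names follow the discussion; the surrounding class is simplified):

import java.util.Properties;

public abstract class ConnectorSketch {
    public abstract void start(Properties props);
    public abstract void stop();

    // Default: a full stop/start cycle with the new configuration. Connectors
    // override this only when restarting is too expensive.
    public void reconfigure(Properties props) {
        stop();
        start(props);
    }
}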

stanislavkozlovski pushed a commit to stanislavkozlovski/kafka that referenced this pull request Jan 20, 2019

CONFLUENT: Make min.insync.replicas visible but read-only in topic describe (apache#99)