
Plans to support upcoming Kafka v0.11? #901

Open
dvsekhvalnov opened this Issue Jun 23, 2017 · 27 comments

@dvsekhvalnov
Contributor

dvsekhvalnov commented Jun 23, 2017

Any plans so far? 0.11 is about to be released fairly soon, adding support for message headers.

Thanks.

@eapache


Contributor

eapache commented Jun 23, 2017

I would like to, but I have no time to work on it in the next little while. I'm happy to accept pull requests though.

@kanekv


kanekv commented Jul 11, 2017

We plan to add support for that. Should we agree on the transaction API before any work starts on this?

@tomsapps


tomsapps commented Jul 15, 2017

It'd be awesome if we could start using the projects tab to make a 0.11 roadmap. I'm sure that'd bring more people together to work on v0.11 support.

@17twenty


17twenty commented Sep 18, 2017

What's the extent of the work on this? Is there anything outlined? I'd be happy to pick it up if there's some low-hanging fruit.

@eapache


Contributor

eapache commented Sep 18, 2017

There's a bunch of work that can be done here, and a lot of the initial stuff is pretty straightforward low-hanging fruit.

The first batch of work is all foundational:

  • Add all the new errors (codes 45-55) to errors.go (easy).
  • Implement all the new request/response objects (keys 19-33) with the appropriate encode/decode methods and tests (easy). Each request/response pair can be done separately/in-parallel.

With that in place there's then a bunch of other improvements we can make to expose new functionality using the new request/response objects. Some of this should be pretty easy (e.g. a Client.AddTopic method) and some of it will be quite complicated (e.g. transaction support in the producer).

The other piece of work for 0.11 is to adapt to the new Message and MessageSet structures. The pure wire format changes shouldn't be too hard (though doing them without breaking old versions won't be entirely trivial). However, the switch to a non-recursive compression structure and varint encoding looks really messy if we want to maintain backwards compatibility. I haven't had time to really dig into what would be required here.

@wladh


Contributor

wladh commented Sep 19, 2017

I have already started (and almost finished) the work on using the new wire format for messages (now called Record and RecordBatch), but of course without all the transactional support. We mostly need it for the Headers extension. The way I implemented it for now was to add a separate field to the FetchResponseBlock for RecordBatch, and I split consumer.parseResponse into separate functions dealing with old and new versions (based on the API version). I'm implementing a similar thing for the producer path.
I thought I replied to this issue before, but it seems I didn't.
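The split described here (dispatching on API version between the legacy MessageSet path and the new RecordBatch path) can be sketched roughly as below. The type and function names are illustrative stand-ins, not the actual Sarama ones, and v4 stands in for the first fetch version carrying record batches:

```go
package main

import "fmt"

// Illustrative stand-in; the real Sarama FetchResponseBlock differs.
type FetchResponseBlock struct {
	MsgSet      []string // legacy MessageSet wire format
	RecordBatch []string // new RecordBatch format (Kafka 0.11+)
}

// parseBlock picks a decode path based on the fetch API version,
// mirroring the consumer.parseResponse split described above.
func parseBlock(version int16, b *FetchResponseBlock) []string {
	if version >= 4 {
		return b.RecordBatch
	}
	return b.MsgSet
}

func main() {
	b := &FetchResponseBlock{
		MsgSet:      []string{"old-format"},
		RecordBatch: []string{"new-format"},
	}
	fmt.Println(parseBlock(3, b)[0], parseBlock(4, b)[0])
}
```

Keeping the two decode paths in separate functions avoids interleaving version checks through one large parser, which is what makes supporting both formats side by side tractable.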

@dvsekhvalnov


Contributor

dvsekhvalnov commented Oct 12, 2017

Just curious: has anybody got the Headers part working, or is a pull request still planned?

Thanks.

@wladh


Contributor

wladh commented Oct 12, 2017

Yes. I have it working and I just need to add a couple more tests and will post it tomorrow, hopefully.

@wladh


Contributor

wladh commented Nov 3, 2017

After merging the PRs above, we support the new Kafka 0.11 message format, including the ability to set headers.
There's no support for transactions or idempotent messages. I'm not planning to work on supporting these as I don't have an immediate use case for them.
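For readers landing here: setting a header on a produced message looks roughly like the sketch below. The types are minimal local stand-ins with the same shape as the merged header API (byte-slice key/value pairs attached to the message), not the real definitions from the sarama package:

```go
package main

import "fmt"

// Minimal stand-ins mirroring the shape of the merged header API;
// the real types live in the sarama package.
type RecordHeader struct {
	Key   []byte
	Value []byte
}

type ProducerMessage struct {
	Topic   string
	Value   string
	Headers []RecordHeader
}

func main() {
	msg := &ProducerMessage{
		Topic: "events",
		Value: "hello",
		Headers: []RecordHeader{
			{Key: []byte("trace-id"), Value: []byte("abc123")},
		},
	}
	for _, h := range msg.Headers {
		fmt.Printf("%s=%s\n", h.Key, h.Value)
	}
}
```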

@mastersingh24


mastersingh24 commented Nov 5, 2017

@wladh - does this also mean that you support Kafka v1.0.0 as well (with the same exceptions as above)?

@wladh


Contributor

wladh commented Nov 5, 2017

@mastersingh24 - I didn't test it, but looking at the code, it seems Kafka 1.0.0 uses the same record format version as Kafka 0.11, so it should work.

@wladh


Contributor

wladh commented Nov 9, 2017

@eapache - are you planning to release a new version anytime soon?

@eapache


Contributor

eapache commented Nov 10, 2017

Yes, sorry, thanks for reminding me. Monday, probably.

@eapache


Contributor

eapache commented Nov 13, 2017

v1.14 released with record-batch support.

@kjelle


kjelle commented Dec 4, 2017

Hello. Is there a plan (any other issue or PR) to support the newly added Kafka transactions?

@eapache


Contributor

eapache commented Dec 4, 2017

I am not working on it, since we don’t need it at the moment. I don’t know if anybody else is planning to submit a patch.

@buyology

Contributor

buyology commented Dec 4, 2017

I have a working version of both consuming and producing using the transactional features that I'll try to finish off soon.

Would love some input on the API for producing, which currently looks like this:

type TransactionalProducer interface {
    AsyncProducer

    Begin() error
    CommitOffsets(groupID string, offsets map[string][]*PartitionOffsetMetadata) error
    Commit() error
    Abort() error
}

So producing looks something like this (as usual, errors ignored for brevity):

addr := []string{"localhost:9092"}

conf := NewConfig()
transactionID := "transaction_id"
conf.Producer.Transaction.ID = &transactionID
conf.Producer.Transaction.Timeout = 1 * time.Second
conf.Version = V1_0_0_0

// Embeds an `AsyncProducer`, and enforces necessary config constraints.
// Sends off the `InitProducerID`-request with the `TransactionID`.
producer, _ := NewTransactionalProducer(addr, conf)

go func() {
    for err := range producer.Errors() {
        // If we encounter a producer error the whole transaction
        // is aborted. Any messages sent to `Input()` will be ignored
        // and returned on this error channel. 
        // Any calls to `Abort()`/`Commit()` will be no-ops
        // and a new transaction has to be started.
    }
}()

_ = producer.Begin()

// Adds sequence numbers and sends `AddPartitionsToTxn` request when encountering
// previously unseen topic-partitions.
producer.Input() <- &ProducerMessage{Topic: topic, Value: StringEncoder("hello")}

// Commit offsets through producer (`AddOffsetsToTxn` + `TxnOffsetCommit`)
_ = producer.CommitOffsets("groupid", map[string][]*PartitionOffsetMetadata{
    "topic": {
        {
            Offset:    11,
            Partition: 0,
        },
    },
})

// Will drain the messages and then send `EndTxn`.
_ = producer.Abort()
// or
_ = producer.Commit()

@eapache — what do you think would be the best way to submit this?

Thinking of splitting it into:
a) the request/response pairs (InitProducerID, AddPartitionsToTxn, AddOffsetsToTxn, EndTxn, TxnOffsetCommit)
b) the idempotent producer
c) the transactional producer + consumer (simplifies test design to submit these together).

@eapache


Contributor

eapache commented Dec 4, 2017

Re. API my biggest question/concern is whether it even makes sense to add txn support to the async producer, or whether it should just be a feature of the sync producer? In the example code you provide, there is no guarantee that those messages have even been produced yet when you call Commit; you would have to watch for the appropriate Success/Error channel response which seems like a lot of work and easy to get wrong.

Re. submission, that sounds like a reasonable split but I can't really say for sure without seeing the code :)

@buyology


Contributor

buyology commented Dec 5, 2017

Yes, I agree — it probably makes more sense to base it off the SyncProducer for the reasons you mention. And it makes error handling significantly easier. The only reason for going the async route is that the Java producer seems to do everything async (including transactions), and I thought this was the API people were more or less expecting.

@tcrayford


Contributor

tcrayford commented Dec 5, 2017

As a note, this is mostly due to previous API decisions made in Sarama by the async producer (the channel-based mechanism for waiting for Success and Error). The Java producer is async by default, but can trivially wait for messages per send. I don't think using the sync producer for transactional messaging counts as "full support", as the sync producer is so slow as to make it unusable except at very low throughput. In our testing, adding transactions to the JVM producer doesn't change performance very much at all if the transaction batches are set large enough.

@eapache


Contributor

eapache commented Dec 6, 2017

I'm not entirely clear on how the java producer works, but in principle if you don't wait for messages then transactions don't provide any value at all because you don't know whether a message is present in the transaction or not when you commit?

@tcrayford I'm curious what kind of code you would expect to be able to write with transactions in the async producer?

FWIW, the sync producer is a tiny wrapper on the async producer, so it can provide exactly the same performance if you use SendMessages with appropriately large batches, or just call SendMessage from multiple goroutines.

@kjelle


kjelle commented Jan 3, 2018

@buyology - do you have an estimate on when it would be possible to read your pull request for transactions?

@buyology


Contributor

buyology commented Jan 3, 2018

@kjelle — no concrete estimate, but I've got the gist of it working, so I need to wrap up each part in due order. I'm pushing it forward a bit now by PR-ing the related protocol messages needed to implement transactional producing in #1016 (and modifications to FindCoordinator in #1010).

Re: the API discussion above, to move things forward, I guess it would make sense to just do it bit by bit: first implement the sync-producer version, and if we find a way for it to make sense, later add async as well.

@savaki


savaki commented Jan 18, 2018

Late to this conversation, but I believe one of the expressed goals of the transaction api was to allow for async transactions. In fact, the "read uncommitted" transaction mode was motivated to support this need. Requiring the sync producer to use transactions would dramatically reduce the system throughput.

@buyology


Contributor

buyology commented Jan 19, 2018

@savaki — The purpose of this is not to require the sync producer to use transactions but rather build the transactional producer on top of the sync producer.

Records sent in a transaction need to be flushed before a commit request can be issued, and with the sync producer it's easier to signal to the user that a new transaction cannot be begun while another transaction is in flight. (Rather, if you want to initiate a second transaction concurrently, you'd have to instantiate a second producer.)

@samv


samv commented Feb 22, 2018

I'm not entirely clear on how the java producer works, but in principle if you don't wait for messages then transactions don't provide any value at all because you don't know whether a message is present in the transaction or not when you commit?

If you check an example (e.g. here), you can see that you explicitly add the messages produced and the offsets committed to the transaction. So you do know whether a message is in the transaction: you added it.

IMHO this provides a lot of value. I'd call it the milestone where streaming systems become as general-purpose a business-rule paradigm as an RDBMS (assuming, that is, you are using the "local state" paradigm, i.e. a rocksdb/leveldb backing to a Kafka topic), just without global mutable state, and able to support much higher scales of throughput even with complex transactions.

@Destidom


Destidom commented Sep 19, 2018

Hi, are there any updates on transactions or idempotent messages? Any ETA?
