
Plans to support upcoming Kafka v0.11 ? #901

Closed
dvsekhvalnov opened this issue Jun 23, 2017 · 32 comments

Comments

@dvsekhvalnov
Contributor

Any plans so far? 0.11 is due to be released fairly soon, and adds support for message headers.

Thanks.

@eapache
Contributor

eapache commented Jun 23, 2017

I would like to, but I have no time to work on it in the next little while. I'm happy to accept pull requests though.

@kanekv

kanekv commented Jul 11, 2017

We plan to add support for that. Should we agree on the transaction API before any work starts on this?

@tomsapps

It'd be awesome if we could start using the Projects tab to make a 0.11 roadmap. I'm sure that'd bring more people together to work on v0.11 support.

@17twenty

What's the extent of the work on this? Is there anything outlined? I'd be happy to pick it up if there's some low-hanging fruit.

@eapache
Contributor

eapache commented Sep 18, 2017

There's a bunch of work that can be done here, and a lot of the initial stuff is pretty straightforward low-hanging fruit.

The first batch of work is all foundational:

  • Add all the new errors (codes 45-55) to errors.go (easy).
  • Implement all the new request/response objects (keys 19-33) with the appropriate encode/decode methods and tests (easy). Each request/response pair can be done separately and in parallel; see the sketch at the end of this comment.

With that in place there's then a bunch of other improvements we can make to expose new functionality using the new request/response objects. Some of this should be pretty easy (e.g. a Client.AddTopic method) and some of it will be quite complicated (e.g. transaction support in the producer).

The other piece of work for 0.11 is to adapt to the new Message and MessageSet structures. The pure wire format changes shouldn't be too hard (though doing them without breaking old versions won't be entirely trivial). However, the switch to a non-recursive compression structure and varint encoding looks really messy if we want to maintain backwards compatibility. I haven't had time to really dig into what would be required here.
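
For anyone picking up the request/response work, here's a rough sketch of the shape each pair takes. The encoder/decoder interfaces below are simplified stand-ins for Sarama's internal packetEncoder/packetDecoder (the real ones carry many more put/get methods), and DeleteRecordsRequest is just one hypothetical example from the key 19-33 range:

package sarama

// Simplified stand-ins for the internal packetEncoder/packetDecoder
// interfaces that all request/response objects encode against.
type packetEncoder interface {
    putString(in string) error
}

type packetDecoder interface {
    getString() (string, error)
}

// One struct per API key, with matching encode/decode methods and a
// round-trip test alongside, following the existing pattern.
type DeleteRecordsRequest struct {
    Topic string
}

func (r *DeleteRecordsRequest) encode(pe packetEncoder) error {
    return pe.putString(r.Topic)
}

func (r *DeleteRecordsRequest) decode(pd packetDecoder, version int16) (err error) {
    r.Topic, err = pd.getString()
    return
}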

@wladh
Contributor

wladh commented Sep 19, 2017

I have already started (and almost finished) the work on using the new wire format for messages (now called Record and RecordBatch), but of course without all the transactional support. We mostly need it for the Headers extension. The way I implemented it for now was to add a separate field to the FetchResponseBlock for the RecordBatch, and I split consumer.parseResponse into separate functions dealing with old and new versions (based on the API version). I'm implementing a similar thing for the producer path.
I thought I replied to this issue before, but it seems I didn't.
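
Roughly, the split looks like this (heavily simplified, with hypothetical field and function names; the real types carry far more fields):

package sarama

// Hypothetical, simplified shapes; the real types carry far more fields.
type Message struct{ Value []byte }
type Record struct{ Value []byte }

type FetchResponseBlock struct {
    MsgSet  []Message // legacy message-set format (pre-0.11)
    Records []Record  // new RecordBatch format (0.11+)
}

// Dispatch on the fetch API version: v4 and later return record batches,
// older versions return classic message sets.
func parseBlock(block *FetchResponseBlock, apiVersion int16) [][]byte {
    if apiVersion >= 4 {
        return parseRecordBatch(block.Records)
    }
    return parseMessageSet(block.MsgSet)
}

func parseMessageSet(msgs []Message) [][]byte {
    out := make([][]byte, 0, len(msgs))
    for _, m := range msgs {
        out = append(out, m.Value)
    }
    return out
}

func parseRecordBatch(recs []Record) [][]byte {
    out := make([][]byte, 0, len(recs))
    for _, r := range recs {
        out = append(out, r.Value)
    }
    return out
}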

@dvsekhvalnov
Contributor Author

Just curious, has anybody got the Headers part working? Or is someone still planning to submit a patch?

Thanks.

@wladh
Contributor

wladh commented Oct 12, 2017

Yes. I have it working and I just need to add a couple more tests and will post it tomorrow, hopefully.

@wladh
Contributor

wladh commented Nov 3, 2017

After merging the PRs above, we support the new Kafka 0.11 message format, including the ability to set headers.
There's no support for transactions or idempotent messages. I'm not planning to work on supporting these as I don't have an immediate use case for them.
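
For reference, producing a message with headers now looks roughly like this (assuming a broker on localhost:9092; Config.Version must be at least V0_11_0_0 or the headers won't be transmitted):

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V0_11_0_0       // headers need the 0.11 record format
    config.Producer.Return.Successes = true // required by the sync producer

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic: "example-topic",
        Value: sarama.StringEncoder("hello"),
        Headers: []sarama.RecordHeader{
            {Key: []byte("trace-id"), Value: []byte("abc123")},
        },
    }
    if _, _, err := producer.SendMessage(msg); err != nil {
        log.Fatal(err)
    }
}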

@mastersingh24

@wladh - does this also mean that you support Kafka v1.0.0 as well (with the same exceptions as above)?

@wladh
Contributor

wladh commented Nov 5, 2017

@mastersingh24 - I didn't test it, but looking at the code, it seems Kafka 1.0.0 uses the same record format version as Kafka 0.11, so it should work.

@wladh
Contributor

wladh commented Nov 9, 2017

@eapache - are you planning to release a new version anytime soon?

@eapache
Contributor

eapache commented Nov 10, 2017

Yes, sorry, thanks for reminding me. Monday, probably.

@eapache
Contributor

eapache commented Nov 13, 2017

v1.14 released with record-batch support.

@kjelle
Contributor

kjelle commented Dec 4, 2017

Hello. Is there a plan (any other issue or PR) to support the newly added Kafka transactions?

@eapache
Contributor

eapache commented Dec 4, 2017

I am not working on it, since we don’t need it at the moment. I don’t know if anybody else is planning to submit a patch.

@buyology

buyology commented Dec 4, 2017

I have a working version of both consuming and producing using the transactional features that I'll try to finish off soon.

Would love some input on the API for producing, which currently looks like this:

type TransactionalProducer interface {
    AsyncProducer

    Begin() error
    CommitOffsets(groupID string, offsets map[string][]*PartitionOffsetMetadata) error
    Commit() error
    Abort() error
}

So producing looks something like this (as usual, errors ignored for brevity):

addr := []string{"localhost:9092"}

conf := NewConfig()
transactionID := "transaction_id"
conf.Producer.Transaction.ID = &transactionID
conf.Producer.Transaction.Timeout = 1 * time.Second
conf.Version = V1_0_0_0

// Embeds an `AsyncProducer`, and enforces necessary config constraints.
// Sends off the `InitProducerID`-request with the `TransactionID`.
producer, _ := NewTransactionalProducer(addr, conf)

go func() {
    for err := range producer.Errors() {
        // If we encounter a producer error the whole transaction
        // is aborted. Any messages sent to `Input()` will be ignored
        // and returned on this error channel. 
        // Any calls to `Abort()`/`Commit()` will be no-ops
        // and a new transaction has to be started.
    }
}()

_ = producer.Begin()

// Adds sequence numbers and sends `AddPartitionsToTxn` request when encountering
// previously unseen topic-partitions.
producer.Input() <- &ProducerMessage{Topic: topic, Value: StringEncoder("hello")}

// Commit offsets through producer (`AddOffsetsToTxn` + `TxnOffsetCommit`)
_ = producer.CommitOffsets("groupid", map[string][]*PartitionOffsetMetadata{
    "topic": []*PartitionOffsetMetadata{
        {
            Offset:    11,
            Partition: 0,
        },
    },
})

// Will drain the messages and then send `EndTxn`.
_ = producer.Abort()
// or
_ = producer.Commit()

@eapache — what do you think would be the best way to submit this?

Thinking of splitting it into:
a) the request/response pairs (InitProducerID, AddPartitionsToTxn, AddOffsetsToTxn, EndTxn, TxnOffsetCommit)
b) the idempotent producer
c) the transactional producer + consumer (simplifies test design to submit these together).

@eapache
Contributor

eapache commented Dec 4, 2017

Re. API my biggest question/concern is whether it even makes sense to add txn support to the async producer, or whether it should just be a feature of the sync producer? In the example code you provide, there is no guarantee that those messages have even been produced yet when you call Commit; you would have to watch for the appropriate Success/Error channel response which seems like a lot of work and easy to get wrong.

Re. submission, that sounds like a reasonable split but I can't really say for sure without seeing the code :)
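
To illustrate the concern, the bookkeeping would look something like this (a sketch only; the final Commit is the hypothetical transactional call under discussion):

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true

    producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    inFlight := 0
    for _, v := range []string{"a", "b", "c"} {
        producer.Input() <- &sarama.ProducerMessage{Topic: "t", Value: sarama.StringEncoder(v)}
        inFlight++
    }

    // Only after every message has come back on Successes (or Errors) do we
    // actually know what the transaction would contain; committing earlier
    // could commit a transaction that is missing some of the messages.
    for inFlight > 0 {
        select {
        case <-producer.Successes():
            inFlight--
        case err := <-producer.Errors():
            log.Printf("produce failed, transaction must abort: %v", err)
            inFlight--
        }
    }
    // ...only now would it be safe to call a hypothetical Commit().
}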

@buyology

buyology commented Dec 5, 2017

Yes, I agree — it probably makes more sense to base it off the SyncProducer for the reasons you mention. And it makes error handling significantly easier. The only reason for going the async route is that the Java producer seems to do everything async (incl. transactions) and I thought this was the API people were more or less expecting.

@tcrayford
Contributor

As a note, this is mostly due to previous API decisions made in Sarama's async producer (the channel-based mechanism for waiting for Success and Error). The Java producer is async by default, but can trivially wait on individual sends. I don't think using the sync producer for transactional messaging counts as "full support", as the sync producer is so slow as to be unusable except at very low throughput. In our testing, adding transactions to the JVM producer barely changes performance at all, provided the transaction batches are set large enough.

@eapache
Contributor

eapache commented Dec 6, 2017

I'm not entirely clear on how the Java producer works, but in principle, if you don't wait for messages then transactions don't provide any value at all, because you don't know whether a message is present in the transaction or not when you commit?

@tcrayford I'm curious what kind of code you would expect to be able to write with transactions in the async producer?

FWIW, the sync producer is a tiny wrapper on the async producer, so it can provide exactly the same performance if you use SendMessages with appropriately large batches, or just call SendMessage from multiple goroutines.
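
For example, a batched sync send looks like this:

package main

import (
    "fmt"
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // Build a large batch and send it in one call; the messages flow through
    // the underlying async producer and SendMessages blocks until all are acked.
    batch := make([]*sarama.ProducerMessage, 0, 1000)
    for i := 0; i < 1000; i++ {
        batch = append(batch, &sarama.ProducerMessage{
            Topic: "t",
            Value: sarama.StringEncoder(fmt.Sprintf("msg-%d", i)),
        })
    }
    if err := producer.SendMessages(batch); err != nil {
        log.Fatal(err)
    }
}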

@kjelle
Contributor

kjelle commented Jan 3, 2018

@buyology - do you have an estimate on when it would be possible to read your pull request for transactions?

@buyology

buyology commented Jan 3, 2018

@kjelle — no concrete estimate, but I've got the gist of it working, so I need to wrap up each part in due order. Pushing it forward a bit now by PR:ing the related protocol messages needed to implement transactional producing in #1016 (and modifications to FindCoordinator in #1010).

Re: the API discussion above, to move things forward, I guess it would make sense to just do it bit by bit: first implement the sync-producer version and, if we find a way for it to make sense, later add async as well.

@savaki

savaki commented Jan 18, 2018

Late to this conversation, but I believe one of the expressed goals of the transaction API was to allow for async transactions. In fact, the "read uncommitted" transaction mode was motivated by this need. Requiring transactions to go through the sync producer would dramatically reduce system throughput.

@buyology

@savaki — The purpose is not to require the sync producer in order to use transactions, but rather to build the transactional producer on top of the sync producer.

Records sent in a transaction need to be flushed before a commit request can be issued, and with the sync producer it's easier to signal to the user that a new transaction cannot be started while another transaction is in flight. (Instead, if you want to initiate a second transaction concurrently, you'd have to instantiate a second producer.)
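
A rough sketch of that guard (a hypothetical type, just to illustrate; not part of the actual proposal):

package sarama

import (
    "errors"
    "sync"
)

// Hypothetical wrapper, only illustrating how a sync-producer-based
// transactional producer can reject a second concurrent transaction.
type txnProducer struct {
    mu   sync.Mutex
    open bool
}

func (p *txnProducer) Begin() error {
    p.mu.Lock()
    defer p.mu.Unlock()
    if p.open {
        return errors.New("a transaction is already in flight on this producer")
    }
    p.open = true
    return nil
}

func (p *txnProducer) Commit() error {
    p.mu.Lock()
    defer p.mu.Unlock()
    if !p.open {
        return errors.New("no open transaction")
    }
    // By this point every SendMessage call has already returned, i.e. all
    // records are flushed, so EndTxn(commit=true) could be issued here.
    p.open = false
    return nil
}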

@samv

samv commented Feb 22, 2018

I'm not entirely clear on how the Java producer works, but in principle, if you don't wait for messages then transactions don't provide any value at all, because you don't know whether a message is present in the transaction or not when you commit?

If you check an example, e.g. here, you can see that you explicitly add the messages produced and the offsets committed to the transaction. So you do know whether a message is in the transaction: you added it.

IMHO this provides a lot of value - I'd call it the milestone where streaming systems become as general-purpose a business-rule paradigm as an RDBMS (assuming, that is, you are using the "local state" paradigm, i.e. a rocksdb/leveldb backing to a Kafka topic). Just without global mutable state, and able to support much higher throughput even with complex transactions.

@Destidom

Hi, are there any updates on transactions or idempotent messages? Any ETA?

@rabbitstack

I'm also interested in the transactional features. Any plans to deliver them in the near future?

@pragmaticpi

Hi, I was going through the sarama godoc as I am trying to implement exactly-once semantics.
So, is using Config.Producer.Idempotent to enable idempotency correct?

Also, has the transactional API been enabled in the library? I am seeing multiple transaction-related variables.

@pmalekn

pmalekn commented Oct 31, 2019

@Destidom Idempotent producer support was merged in Oct 2018 in #1152 (not sure how complete that is though).

What are the outstanding features of Kafka v0.11 that we're missing?
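
For anyone landing here, enabling the idempotent producer from #1152 looks roughly like this (as I understand it, Config.Validate enforces the accompanying settings; check it for the authoritative list):

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V0_11_0_0                // idempotence needs the 0.11 protocol
    config.Producer.Idempotent = true                // the flag added by #1152
    config.Producer.RequiredAcks = sarama.WaitForAll // acks=all is required
    config.Net.MaxOpenRequests = 1                   // required to preserve ordering

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()
}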

@ervitis

ervitis commented Jan 14, 2020

What about the transactional producer and exactly-once semantics support? Is there any plan for that feature?
For reference, librdkafka is working on it and the Java client already supports the transactional producer: confluentinc/librdkafka#2605

@dnwe
Collaborator

dnwe commented Apr 7, 2020

This catch-all issue seems like it has been superseded. Please open new issues for any functionality you believe is still missing from Sarama.
