
Add delete_records to the admin client #967

Closed
vmaurin opened this issue Jan 22, 2024 · 5 comments · Fixed by #969
Comments

vmaurin (Contributor) commented Jan 22, 2024

Describe the solution you'd like

We should have a method on the admin client to delete records: API 21 https://kafka.apache.org/protocol#The_Messages_DeleteRecords

It is useful when doing stream processing that has logic to clean up repartition/shuffle topics.

Additional context

It was added to the Java client a while ago; see https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API
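
To make the request concrete, here is a minimal usage sketch of what such a method could look like. The method name delete_records and its argument shape (a mapping from TopicPartition to the offset before which records are deleted) are assumptions modelled on the Java AdminClient's deleteRecords, not an existing aiokafka API.

import asyncio

from aiokafka import TopicPartition
from aiokafka.admin import AIOKafkaAdminClient


async def purge_repartition_topic():
    admin = AIOKafkaAdminClient(bootstrap_servers="localhost:9092")
    await admin.start()
    try:
        # Hypothetical call: delete all records before offset 1000 in
        # partition 0 of the repartition topic (mirrors the Java
        # AdminClient.deleteRecords(Map<TopicPartition, RecordsToDelete>)).
        await admin.delete_records({TopicPartition("my-app-repartition", 0): 1000})
    finally:
        await admin.close()


asyncio.run(purge_repartition_topic())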

vmaurin pushed a commit to vmaurin/aiokafka that referenced this issue Jan 25, 2024
When doing stream processing, it is convenient to use "transient" topics:
* retention time is infinite
* records get deleted once consumed

The Java Kafka Streams client uses the admin client's deleteRecords to
perform this operation; it is missing from aiokafka.

The KIP reference https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API

refs aio-libs#967
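
For context, a rough sketch of the "delete once consumed" pattern the commit message describes, using the hypothetical delete_records method from the sketch above: after processing and committing a record, everything up to the committed position can be deleted, so the topic stays small even with infinite retention.

import asyncio

from aiokafka import AIOKafkaConsumer, TopicPartition
from aiokafka.admin import AIOKafkaAdminClient


async def consume_and_purge():
    consumer = AIOKafkaConsumer(
        "my-app-repartition",
        bootstrap_servers="localhost:9092",
        group_id="my-app",
        enable_auto_commit=False,
    )
    admin = AIOKafkaAdminClient(bootstrap_servers="localhost:9092")
    await consumer.start()
    await admin.start()
    try:
        async for msg in consumer:
            ...  # process the record
            await consumer.commit()
            tp = TopicPartition(msg.topic, msg.partition)
            # Hypothetical call: drop everything the group has already
            # processed (offsets strictly below msg.offset + 1).
            await admin.delete_records({tp: msg.offset + 1})
    finally:
        await admin.close()
        await consumer.stop()


asyncio.run(consume_and_purge())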
vmaurin (Contributor, Author) commented Jan 25, 2024

@ods I gave it a shot, but I can't get it working. I am a bit lost when it comes to debugging protocol issues (which seems to be what I am facing here). If you have a few tips, I would be happy to rework my copy.

#969

ods (Collaborator) commented Jan 29, 2024

@vmaurin I checked: your PR works with minimal modifications for v0 and v1, but fails with v2. It looks like something is wrong with tagged-field support in aiokafka. As a first step, we could add it without v2 for now and keep debugging the problems with tagged fields.
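
For illustration, a rough sketch of what the v0 wire definitions could look like. It assumes aiokafka's vendored kafka-python protocol classes (Request, Response, Schema and the basic field types) with their usual API_KEY/API_VERSION/SCHEMA layout; the import paths and field names here are assumptions, and the actual definitions live in the PR.

from aiokafka.protocol.api import Request, Response
from aiokafka.protocol.types import Array, Int16, Int32, Int64, Schema, String


class DeleteRecordsResponse_v0(Response):
    API_KEY = 21
    API_VERSION = 0
    SCHEMA = Schema(
        ("throttle_time_ms", Int32),
        ("topics", Array(
            ("name", String("utf-8")),
            ("partitions", Array(
                ("partition_index", Int32),
                ("low_watermark", Int64),
                ("error_code", Int16))))),
    )


class DeleteRecordsRequest_v0(Request):
    API_KEY = 21  # DeleteRecords
    API_VERSION = 0
    RESPONSE_TYPE = DeleteRecordsResponse_v0
    SCHEMA = Schema(
        ("topics", Array(
            ("name", String("utf-8")),
            ("partitions", Array(
                ("partition_index", Int32),
                ("offset", Int64))))),
        ("timeout_ms", Int32),
    )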

ods (Collaborator) commented Jan 29, 2024

For the record, the error in the log when sending DeleteRecordsRequest_v2 is:

[2024-01-29 15:53:59,551] ERROR Exception while processing request from 172.17.0.2:57466-192.168.65.1:21560-1 (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error parsing request header. Our best guess of the apiKey is: 21
Caused by: java.nio.BufferUnderflowException
        at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:155)
        at java.nio.ByteBuffer.get(ByteBuffer.java:723)
        at org.apache.kafka.common.protocol.ByteBufferAccessor.readArray(ByteBufferAccessor.java:58)
        at org.apache.kafka.common.protocol.Readable.readUnknownTaggedField(Readable.java:52)
        at org.apache.kafka.common.message.RequestHeaderData.read(RequestHeaderData.java:135)
        at org.apache.kafka.common.message.RequestHeaderData.<init>(RequestHeaderData.java:84)
        at org.apache.kafka.common.requests.RequestHeader.parse(RequestHeader.java:95)
        at kafka.network.Processor.parseRequestHeader(SocketServer.scala:999)
        at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:1012)
        at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
        at kafka.network.Processor.processCompletedReceives(SocketServer.scala:1008)
        at kafka.network.Processor.run(SocketServer.scala:893)
        at java.lang.Thread.run(Thread.java:750)

vmaurin (Contributor, Author) commented Jan 29, 2024

@ods Good catch. I copy-pasted the TaggedFields from other definitions, but it doesn't seem very usable. In the Java version, they just mark the "flexibleVersions" and then appear to use a different serializer.

It doesn't seem to work without declaring TaggedFields either.

ods (Collaborator) commented Jan 29, 2024

What I've found so far: although aiokafka declares v1 headers, it doesn't actually use them. After fixing the request header, the broker doesn't complain anymore, but I'm still struggling to fix the response parsing.
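
For reference, the difference the broker trips over: flexible request versions (DeleteRecords v2 and later) must be framed with request header v2, which appends a tagged-fields section after client_id; with no tagged fields that is a single 0x00 byte. The hand-rolled encoding below only illustrates the byte layout from the Kafka protocol spec; it is not aiokafka's actual serializer.

import struct


def request_header_v1(api_key, api_version, correlation_id, client_id):
    # api_key INT16, api_version INT16, correlation_id INT32,
    # client_id NULLABLE_STRING (INT16 length followed by the bytes).
    cid = client_id.encode("utf-8")
    return struct.pack(">hhih", api_key, api_version, correlation_id, len(cid)) + cid


def request_header_v2(api_key, api_version, correlation_id, client_id):
    # Same fields as v1 (client_id stays a legacy nullable string even in
    # v2), plus the tagged-fields section: an unsigned varint count of
    # tagged fields.  With no tagged fields that is the single 0x00 byte
    # the broker is looking for in the traceback above.
    return request_header_v1(api_key, api_version, correlation_id, client_id) + b"\x00"


# DeleteRecords is API key 21; v2 of the request is flexible, so it has
# to be sent with header v2.
print(request_header_v2(21, 2, 1, "aiokafka").hex())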

ods closed this as completed in #969 on Jan 29, 2024
ods added a commit that referenced this issue Jan 29, 2024
* Implement KIP-204: DeleteRecords API

When doing stream processing, it is convenient to use "transient" topics:
* retention time is infinite
* records get deleted once consumed

The Java Kafka Streams client uses the admin client's deleteRecords to
perform this operation; it is missing from aiokafka.

The KIP reference https://cwiki.apache.org/confluence/display/KAFKA/KIP-204+%3A+Adding+records+deletion+operation+to+the+new+Admin+Client+API

refs #967

* Use common method to get metadata

* Explain the unpacking catch-all

* Remove usage of TaggedFields

TaggedFields don't seem to work properly at the moment. Maybe they
should be replaced by an implementation closer to the Java client's
"flexibleVersions" approach.

* Fix linting errors (format)

* Add change log

---------

Co-authored-by: Vincent Maurin <vincent.maurin@checkstep.com>
Co-authored-by: Denis Otkidach <denis.otkidach@gmail.com>