
Create or Replace command to modify KSQL query in place #2440

Closed
DW-Pete opened this issue Feb 14, 2019 · 9 comments
Labels: operability (Issues pertaining to running and operating KSQL, notably in production), P0 (Denotes must-have for a given milestone)

DW-Pete commented Feb 14, 2019

My customer has a set of Kafka topics sourced from a mixture of REST API processes and Oracle GoldenGate. These processes cannot readily be suspended (well, they can, but it is not trivial given the number of moving parts). They also have some KSQL code processing and transforming data as it passes to another system. This uses multiple KSQL tables and streams.

When they need to modify the KSQL code they need to terminate the running queries, drop and replace the KSQL Streams and Tables (CTAS and CSAS).

The problem they have is that the created queries run from the current offset of the source topic, so any data streamed between the terminate and the create is lost. They had hoped that, since the KSQL topic names were the same, the previously read offset would be reused from the underlying topic metadata. However, it seems that each new query uses its own uniquely named consumer group. The alternative of setting the offset to earliest is not viable, as this puts a large amount of already-processed data on the output streams.

Is there a way that we can associate a query to a specific start offset?

stewartbryson commented Feb 14, 2019

Here's a little bit more insight on this one. Currently, the consumer group for a KSQL persistent query is ksql.service.id plus the queryId of the statement. The queryId is an auto-incrementing value based on the object name, suffixed with _N. Therefore, when a query is terminated and recreated (as part of a release, a deployment, etc.), the persistent query gets a brand-new consumer group. Any messages that come into the topic between TERMINATE and CREATE... evaporate into the ether.
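To make the failure mode concrete, here is a sketch of the release sequence being described (the stream and query names are hypothetical; the exact queryId format varies by KSQL version):

```sql
-- First deployment: KSQL assigns an auto-incremented queryId,
-- e.g. CSAS_ENRICHED_0, which becomes part of the consumer group name.
CREATE STREAM enriched AS
  SELECT * FROM source_stream;

-- Release: tear down the old query and object...
TERMINATE CSAS_ENRICHED_0;
DROP STREAM enriched;

-- ...and recreate it. The new query gets a new queryId (CSAS_ENRICHED_1),
-- hence a brand-new consumer group with no committed offsets. Anything
-- produced to source_stream between TERMINATE and CREATE is skipped.
CREATE STREAM enriched AS
  SELECT * FROM source_stream;
```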

One approach is to set ksql.streams.auto.offset.reset to earliest, but that just means that events will be reprocessed, and we need logic for handling that. That simply ignores the main reason for using Kafka... offset management.

A proposed solution is to allow setting the Streams application_id as part of the KSQL (with...) clause. This is exactly how Streams applications work--using defined application ids--and should be workable here.
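Something like the following hypothetical syntax (not implemented; the application_id property name is illustrative only) would let a recreated query reuse the previous consumer group and resume from its committed offsets:

```sql
-- Hypothetical: pin the Kafka Streams application id so that dropping
-- and recreating the stream reuses the same consumer group.
CREATE STREAM enriched
  WITH (application_id = 'my-pipeline-enriched') AS
  SELECT * FROM source_stream;
```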

@rmoff rmoff added the operability Issues pertaining to running and operating KSQL, notably in production label Mar 7, 2019
dhanasgit commented

This KSQL behavior (generating a brand-new consumer group for a recreated persistent query) also has other side effects.
During our development process we recreated the KSQL queries many times, and it ended up creating a lot of orphaned consumer groups and changelog topics. Can these be cleaned up when the query is terminated, or should the TERMINATE command have an option to delete topics?
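For the topic side of the cleanup, KSQL's DROP statement does accept a DELETE TOPIC clause (the object names below are hypothetical):

```sql
-- Terminate the running query first, then drop the object.
-- DELETE TOPIC removes the backing Kafka topic along with the stream;
-- without it, the topic is left behind after the drop.
TERMINATE CSAS_ENRICHED_0;
DROP STREAM enriched DELETE TOPIC;
```

This does not, however, remove the orphaned consumer groups or internal changelog/repartition topics, which is the gap being raised here.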

nbyrnes-acv commented

We are building pipelines through KSQL to feed our analytics warehouse. Our policy for minor schema version changes is that only new fields can be added. It would be great if we could recreate the pipeline of streams without setting the offset to earliest: instead, we would pass a starting offset to the first stream in the pipeline, recreate all our streams, and pick up right where we left off rather than starting from the beginning.

stewartbryson commented

@nbyrnes-acv Read this blog and see if you think it can help. If you have any questions, feel free to reach out.

https://www.confluent.io/blog/deploying-kafka-streams-and-ksql-with-gradle-part-2-managing-ksql-implementations

stewartbryson commented

I'm really surprised this issue is not rated higher. I must be missing something... are other people working around this somehow?

apurvam (Contributor) commented Mar 4, 2020

cc @agavra. Worth tracking this as part of the new work to support query upgrades. This could be low-hanging fruit we could tackle in an early milestone. The ability to resume could be something we fix early in the process.

agavra (Contributor) commented Mar 4, 2020

Yes - I believe there is someone already working on a KLIP for this (cc @eshepelyuk), but I totally agree this is something we should implement: #4622

@AlanConfluent AlanConfluent added this to the 0.12.0 milestone Jul 24, 2020
@AlanConfluent AlanConfluent added the P0 Denotes must-have for a given milestone label Jul 24, 2020
@AlanConfluent AlanConfluent changed the title Replacing a KSQL query does not inherit the previous query offset Create or Replace command to modify KSQL query in-place Jul 25, 2020
@AlanConfluent AlanConfluent changed the title Create or Replace command to modify KSQL query in-place Create or Replace command to modify KSQL query in place Jul 25, 2020
@agavra agavra self-assigned this Jul 29, 2020
@big-andy-coates big-andy-coates added this to To do in Language Features via automation Aug 7, 2020
@big-andy-coates big-andy-coates moved this from To do to In progress in Language Features Aug 7, 2020
agavra (Contributor) commented Aug 26, 2020

This will be available (in some fashion, see documentation here: https://github.com/confluentinc/ksql/blob/master/docs/concepts/upgrades.md) in the next release (0.12.0)!
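The resulting feature keeps the existing queryId, and therefore the consumer group and its committed offsets, when a query is upgraded. A minimal sketch (stream and column names hypothetical; ksqlDB restricts which changes are valid in-place upgrades, such as adding columns to the projection):

```sql
-- Original persistent query.
CREATE STREAM enriched AS
  SELECT id, name
  FROM source_stream
  EMIT CHANGES;

-- Upgrade in place: no TERMINATE/DROP needed. The query keeps its
-- consumer group, so processing resumes from the committed offsets.
CREATE OR REPLACE STREAM enriched AS
  SELECT id, name, UCASE(name) AS name_upper
  FROM source_stream
  EMIT CHANGES;
```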

@agavra agavra closed this as completed Aug 26, 2020
Language Features automation moved this from In progress to Done Aug 26, 2020
kgoutham commented May 1, 2023

I am not able to access this link: https://github.com/confluentinc/ksql/blob/master/docs/concepts/upgrades.md.

Instead of CREATE OR REPLACE, will dropping and recreating a KSQL stream resume processing from the offset of the previously terminated stream/query?

We are planning to update our KSQL streams in production, and we want to be mindful of any chance of data loss that would affect the downstream systems.

Please advise.

Thank you.
