Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: CREATE OR REPLACE TABLE on an existing query fails while initializing kafka streams #8130

Merged
merged 1 commit into from Sep 17, 2021

Conversation

spena
Copy link
Member

@spena spena commented Sep 14, 2021

Description

Fixes #8104

The issue in #8104 was caused by a persistentQuery.initialize() call in the QueryRegistryImpl class because it was attempting to initialize a new kafka streams with a state directory that was used by the old kafka streams. This old kafka streams is not stopped because it is run in a sandboxed environment, however, the state store is never unlinked from this old kafka streams, so the new query fails linking to it.

The fix is just to avoid initializing the new query in a sandboxed environment.

This issue started happening since ksqldB 0.17.0

Testing done

Describe the testing strategy. Unit and integration tests are expected for any behavior changes.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@spena spena requested review from agavra, Sullivan-Patrick and a team September 14, 2021 19:01
Copy link
Contributor

@Sullivan-Patrick Sullivan-Patrick left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left no feedback and a single question inline, but that's what happens when you ask the intern for a code review :)

One question I have is I recall @vcrfxia mentioning there was existing test coverage for this. I wasn't able to find it myself, but I'd like to bring her in to chime in on that. If there was existing coverage that wasn't functioning correctly, that's probably something to address.

Otherwise LGTM!

@@ -333,8 +333,13 @@ private void registerPersistentQuery(
unregisterQuery(oldQuery);
}

// Initialize the query before it's exposed to other threads via the map/sets.
persistentQuery.initialize();
// If the old query was sandboxed, then the stop() won't stop the streams and will cause
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, can you explain why stop() won't stop streams? Or maybe more broadly, why we were running these KStreams applications in a sandboxed environment? Just linking me to something would be sufficient, I wasn't able to piece it together myself easily.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sandboxed objects were implemented in KSQL to validate statements when some calls to Kafka were needed. The validation should not execute any real call (i.e. delete topics, create, etc). It's just a validation. Start looking at the RequesValidator.validate() class/method.

During a validation, the full execution path is tested too. For instance, in the case of persistent queries, the validation will check a query is removed from the query registry, new queries are being added to the registry, etc. To do that, the validation process creates a sandbox for the query registry to prevent real queries are indeed removed, stopped, etc. You can see the QueryRegistryImpl.createSandbox() method.

Before making the QueryRegistry copy, the createSandbox() method creates copies of each query as sandboxed persistent queries, which have dummy methods to start/stop the query. Look at SandboxedPersistentQueryMetadataImpl. So, when the registerPersistentQuery is called by a CREATE OR REPLACE statement, the old query to be stopped is not actually stopped. It just mimics the path execution, but nothing is altered in that streams. However, the initialize() is not sandboxed, so it attempts to initialize a new stream pointing to the state store from the old streams.

I don't understand too much about the sandboxed objects, though, so it took me time to figure out why new streams are started (in sandbox), but not stopped. I wanted to fix that, but I couldn't figure out an easy way to refactor the code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah okay, I've seen these validation code paths plenty of times before, but I didn't know what was going on in this detail. Thanks for the in-depth explanation!

Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for root causing @spena! Great find - but see below.

This old kafka streams is not stopped because it is run in a sandboxed environment, however, the state store is never unlinked from this old kafka streams, so the new query fails linking to it.

I don't totally understand this - it seems like we're not fixing the underlying bug here and just working around it. There shouldn't be ghost state stores left on disk in any situation, right? It seems to me that if a sandbox query is creating the streams directory, it should also be clearing it up regardless of whether or not it's being run in a sandbox. Either that or (a very likely option) I don't understand something.

@spena
Copy link
Member Author

spena commented Sep 15, 2021

@agavra I found the code a little confusing too. But my understanding was that queries in a sandboxed environment are started but also stopped/closed later. See https://github.com/confluentinc/ksql/pull/8130/files#diff-923409f4a144914013d8209b8f59e547f2f0740665f010318332d251627f771dR148, which closes any query that was started in the sandboxed QueryRegistry that does not sandobox the new query (confusing).

However, queries found in the non-sandbox QueryRegistry are sandboxed in the createSandbox(). When one of them must be removed (because the CREATE OR REPLACE), then it is also stopped. But being this an existing query in the system, the old query must not be actually stopped, just mimic that it was stopped. So, after that, the new query is initialized (which will be stopped/closed later in the above link I left), but there will be now 2 queries pointing to the same state store, and that's where it fails.

I feel we shouldn't even start a kafka streams in a sandboxed environment, but that's what is taking time to understand. The code has evolved that I got lost between different sandboxed classes and a new code about shared runtime vs dedicated runtimes that contains duplicated code.

So, my great idea was, let's skip the initialization of a query if the QueryRegistry is sandboxed 'cause that query will be stopped/closed later anyway, so why to care about initializing it?

@agavra
Copy link
Contributor

agavra commented Sep 15, 2021

@spena thanks - that really helps a lot with my understanding. That does bring up two more questions then:

  • why use oldQuery == null || !sandbox instead of just !sandbox? from your explanation, it would be feasible to never "initialize" a sandboxed query, right?
  • it seems to me like perhaps the "root" cause of the bug is that buildPersistentQueryInDedicatedRuntime returns a non-sandboxed QueryMetadata even if the QueryExecutor is !real

WDYT?

@agavra
Copy link
Contributor

agavra commented Sep 15, 2021

@spena - I'm going to be out tomorrow, but we probably want to get this into 0.22.x (fyi @mjsax). I'm happy if @vcrfxia or @rodesai review this and give their +1.

@spena
Copy link
Member Author

spena commented Sep 16, 2021

I just made a change to sandbox the persistent queries and implement the initialize() method as no-op. I initially had an issue with this but I forgot I fixed it by closing the kafka streams only when is not null in the SandboxedExecutionContext.

This actually causes other problems. I went back to the original idea of not initializing the query when the QueryRegistryImpl is in a sandbox. We should follow up in a better sandbox for kafka streams in other PRs.

@spena spena force-pushed the fix_create_or_replace branch 2 times, most recently from a7f68cf to 8d03a83 Compare September 16, 2021 22:09
@spena spena requested review from a team and vcrfxia September 17, 2021 14:47
Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, it looks like it's a little complicated but it's important to get this fix in to 0.22. Please backport and make sure we cut it appropriately :)

We should follow up in a better sandbox for kafka streams in other PRs.

Can you create a ticket to track this? I have a feeling that this code will be a bit brittle :)

@spena spena merged commit f03755e into master Sep 17, 2021
@spena spena deleted the fix_create_or_replace branch September 17, 2021 19:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

CREATE OR REPLACE TABLE AS does not work
4 participants