Skip to content

Commit

Permalink
fix: backport fixes from query close (#4662)
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Feb 27, 2020
1 parent ac6d18d commit 8168002
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public ExecuteResult execute(

@Override
public void close() {
allLiveQueries.forEach(QueryMetadata::close);
allLiveQueries.forEach(QueryMetadata::stop);
engineMetrics.close();
aggregateMetricsCollector.shutdown();
}
Expand All @@ -201,8 +201,10 @@ private void unregisterQuery(final ServiceContext serviceContext, final QueryMet
final String applicationId = query.getQueryApplicationId();

if (!query.getState().equalsIgnoreCase("NOT_RUNNING")) {
throw new IllegalStateException("query not stopped."
+ " id " + applicationId + ", state: " + query.getState());
log.warn(
"Unregistering query that has not terminated. "
+ "This may happen when streams threads are hung. State: " + query.getState()
);
}

if (!allLiveQueries.remove(query)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,9 @@ public Optional<Materialization> getMaterialization(
) {
return materializationProvider.map(builder -> builder.build(queryId, contextStacker));
}

@Override
public void stop() {
doClose(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueryMetadata {
public abstract class QueryMetadata {

private static final Logger LOG = LoggerFactory.getLogger(QueryMetadata.class);

Expand Down Expand Up @@ -140,14 +140,39 @@ public boolean hasEverBeenStarted() {
return everStarted;
}


/**
* Stops the query without cleaning up the external resources
* so that it can be resumed when we call {@link #start()}.
*
* <p>NOTE: {@link TransientQueryMetadata} overrides this method
* since any time a transient query is stopped the external resources
* should be cleaned up.</p>
*
* @see #close()
*/
public abstract void stop();

/**
* Closes the {@code QueryMetadata} and cleans up any of
* the resources associated with it (e.g. internal topics,
* schemas, etc...).
*
* @see QueryMetadata#stop()
*/
public void close() {
doClose(true);
closeCallback.accept(this);
}

protected void doClose(final boolean cleanUp) {
kafkaStreams.close();

kafkaStreams.cleanUp();
if (cleanUp) {
kafkaStreams.cleanUp();
}

queryStateListener.ifPresent(QueryStateListener::close);

closeCallback.accept(this);
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,18 @@ public void setLimitHandler(final LimitHandler limitHandler) {
}

@Override
public void close() {
public void stop() {
close();
}

@Override
protected void doClose(final boolean cleanUp) {
// To avoid deadlock, close the queue first to ensure producer side isn't blocked trying to
// write to the blocking queue, otherwise super.close call can deadlock:
rowQueue.close();

// Now safe to close:
super.close();
super.doClose(cleanUp);
isRunning.set(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

Expand Down Expand Up @@ -675,6 +676,63 @@ public void shouldCleanUpInternalTopicsOnClose() {
verify(topicClient).deleteInternalTopics(query.getQueryApplicationId());
}

@Test
public void shouldCleanUpInternalTopicsOnEngineCloseForTransientQueries() {
// Given:
final List<QueryMetadata> query = KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"select * from test1 EMIT CHANGES;",
KSQL_CONFIG, Collections.emptyMap()
);

query.get(0).start();

// When:
ksqlEngine.close();

// Then:
verify(topicClient).deleteInternalTopics(query.get(0).getQueryApplicationId());
}

@Test
public void shouldNotCleanUpInternalTopicsOnEngineCloseForPersistentQueries() {
// Given:
final List<QueryMetadata> query = KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create stream persistent as select * from test1 EMIT CHANGES;",
KSQL_CONFIG, Collections.emptyMap()
);

query.get(0).start();

// When:
ksqlEngine.close();

// Then (there are no transient queries, so no internal topics should be deleted):
verify(topicClient, never()).deleteInternalTopics(any());
}

@Test
public void shouldCleanUpInternalTopicsOnQueryCloseForPersistentQueries() {
// Given:
final List<QueryMetadata> query = KsqlEngineTestUtil.execute(
serviceContext,
ksqlEngine,
"create stream persistent as select * from test1 EMIT CHANGES;",
KSQL_CONFIG, Collections.emptyMap()
);

query.get(0).start();

// When:
query.get(0).close();

// Then (there are no transient queries, so no internal topics should be deleted):
verify(topicClient).deleteInternalTopics(query.get(0).getQueryApplicationId());
}

@Test
public void shouldNotCleanUpInternalTopicsOnCloseIfQueryNeverStarted() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@

package io.confluent.ksql.util;

import static io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -59,9 +60,11 @@ public class QueryMetadataTest {
@Mock
private Consumer<QueryMetadata> closeCallback;
private QueryMetadata query;
private boolean cleanUp;

@Before
public void setup() {
cleanUp = false;
query = new QueryMetadata(
"foo",
kafkaStreams,
Expand All @@ -72,8 +75,12 @@ public void setup() {
topoplogy,
Collections.emptyMap(),
Collections.emptyMap(),
closeCallback
);
closeCallback) {
@Override
public void stop() {
doClose(cleanUp);
}
};
}

@Test
Expand Down Expand Up @@ -135,6 +142,24 @@ public void shouldCloseKStreamsAppOnCloseThenCloseCallback() {
inOrder.verify(closeCallback).accept(query);
}

@Test
public void shouldNotCallCloseCallbackOnStop() {
// When:
query.stop();

// Then:
verifyNoMoreInteractions(closeCallback);
}

@Test
public void shouldCallKafkaStreamsCloseOnStop() {
// When:
query.stop();

// Then:
verify(kafkaStreams).close();
}

@Test
public void shouldCleanUpKStreamsAppAfterCloseOnClose() {
// When:
Expand All @@ -146,6 +171,27 @@ public void shouldCleanUpKStreamsAppAfterCloseOnClose() {
inOrder.verify(kafkaStreams).cleanUp();
}

@Test
public void shouldNotCleanUpKStreamsAppOnStop() {
// When:
query.stop();

// Then:
verify(kafkaStreams, never()).cleanUp();
}

@Test
public void shouldCallCleanupOnStopIfCleanup() {
// Given:
cleanUp = true;

// When:
query.stop();

// Then:
verify(kafkaStreams).cleanUp();
}

@Test
public void shouldReturnSources() {
assertThat(query.getSourceNames(), is(SOME_SOURCES));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.util;

import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.verify;

import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.BlockingRowQueue;
Expand Down Expand Up @@ -84,4 +85,17 @@ public void shouldCloseQueueBeforeTopologyToAvoidDeadLock() {
inOrder.verify(rowQueue).close();
inOrder.verify(kafkaStreams).close();
}

@Test
public void shouldCallCloseOnStop() {
// When:
query.stop();

// Then:
final InOrder inOrder = inOrder(rowQueue, kafkaStreams, closeCallback);
inOrder.verify(rowQueue).close();
inOrder.verify(kafkaStreams).close();
inOrder.verify(kafkaStreams).cleanUp();
inOrder.verify(closeCallback).accept(query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,8 @@ private KsqlConfig buildMergedConfig(final Command command) {
private void terminateQuery(final PreparedStatement<TerminateQuery> terminateQuery) {
final QueryId queryId = terminateQuery.getStatement().getQueryId();

ksqlEngine.getPersistentQuery(queryId)
.orElseThrow(() ->
new KsqlException(String.format("No running query with id %s was found", queryId)))
.close();
final Optional<PersistentQueryMetadata> query = ksqlEngine.getPersistentQuery(queryId);
query.ifPresent(PersistentQueryMetadata::close);
}

private void maybeTerminateQueryForLegacyDropCommand(
Expand Down

0 comments on commit 8168002

Please sign in to comment.