Skip to content

Commit

Permalink
fix: re fetch streams for each materializationProviderBuilder (#9036)
Browse files Browse the repository at this point in the history
* fix: re fetch streams for each materializationProviderBuilder

* feat: update topology

* feat: fetch new factory as well

* include error message if the locations are not found
  • Loading branch information
wcarlson5 committed May 6, 2022
1 parent 4b450ec commit 4340dfa
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 41 deletions.
Expand Up @@ -444,16 +444,7 @@ PersistentQueryMetadata buildPersistentQueryInSharedRuntime(
final Object result = buildQueryImplementation(physicalPlan, runtimeBuildContext);
final NamedTopology topology = namedTopologyBuilder.build();

final Optional<MaterializationProviderBuilderFactory.MaterializationProviderBuilder>
materializationProviderBuilder = getMaterializationInfo(result).map(info ->
materializationProviderBuilderFactory.materializationProviderBuilder(
info,
querySchema,
keyFormat,
queryOverrides,
applicationId,
queryId.toString()
));
final Optional<MaterializationInfo> materializationInfo = getMaterializationInfo(result);

final Optional<ScalablePushRegistry> scalablePushRegistry = applyScalablePushProcessor(
querySchema.logicalSchema(),
Expand All @@ -479,20 +470,21 @@ PersistentQueryMetadata buildPersistentQueryInSharedRuntime(
runtimeBuildContext.getSchemas(),
config.getOverrides(),
queryId,
materializationProviderBuilder,
materializationInfo,
materializationProviderBuilderFactory,
physicalPlan,
getUncaughtExceptionProcessingLogger(queryId),
sinkDataSource,
listener,
queryOverrides,
scalablePushRegistry,
scalablePushRegistry,
(streamsRuntime) -> getNamedTopology(
streamsRuntime,
queryId,
applicationId,
queryOverrides,
physicalPlan
)
),
keyFormat
);
if (real) {
return binPackedPersistentQueryMetadata;
Expand Down
Expand Up @@ -24,10 +24,10 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.materialization.MaterializationInfo;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.scalablepush.ScalablePushRegistry;
import io.confluent.ksql.execution.streams.materialization.Materialization;
import io.confluent.ksql.execution.streams.materialization.MaterializationProvider;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
Expand All @@ -38,6 +38,7 @@
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.query.QuerySchemas;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.util.QueryMetadataImpl.TimeBoundedQueue;
import java.util.Collection;
import java.util.List;
Expand All @@ -64,7 +65,9 @@ public class BinPackedPersistentQueryMetadataImpl implements PersistentQueryMeta
private final String statementString;
private final String executionPlan;
private final String applicationId;
private final NamedTopology topology;
private final Optional<MaterializationInfo> materializationInfo;
private final KeyFormat keyFormat;
private NamedTopology topology;
private final SharedKafkaStreamsRuntime sharedKafkaStreamsRuntime;
private final QuerySchemas schemas;
private final ImmutableMap<String, Object> overriddenProperties;
Expand All @@ -77,10 +80,8 @@ public class BinPackedPersistentQueryMetadataImpl implements PersistentQueryMeta
private final Listener listener;
private final Function<SharedKafkaStreamsRuntime, NamedTopology> namedTopologyBuilder;
private final TimeBoundedQueue queryErrors;

private final Optional<MaterializationProviderBuilderFactory.MaterializationProviderBuilder>
materializationProviderBuilder;
private final Optional<MaterializationProvider> materializationProvider;
private final MaterializationProviderBuilderFactory
materializationProviderBuilderFactory;
private final Optional<ScalablePushRegistry> scalablePushRegistry;
public boolean everStarted = false;
private boolean corruptionCommandTopic = false;
Expand All @@ -100,15 +101,15 @@ public BinPackedPersistentQueryMetadataImpl(
final QuerySchemas schemas,
final Map<String, Object> overriddenProperties,
final QueryId queryId,
final Optional<MaterializationProviderBuilderFactory.MaterializationProviderBuilder>
materializationProviderBuilder,
final Optional<MaterializationInfo> materializationInfo,
final MaterializationProviderBuilderFactory materializationProviderBuilderFactory,
final ExecutionStep<?> physicalPlan,
final ProcessingLogger processingLogger,
final Optional<DataSource> sinkDataSource,
final Listener listener,
final Map<String, Object> streamsProperties,
final Optional<ScalablePushRegistry> scalablePushRegistry,
final Function<SharedKafkaStreamsRuntime, NamedTopology> namedTopologyBuilder) {
final Function<SharedKafkaStreamsRuntime, NamedTopology> namedTopologyBuilder,
final KeyFormat keyFormat) {
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
this.persistentQueryType = Objects.requireNonNull(persistentQueryType, "persistentQueryType");
this.statementString = Objects.requireNonNull(statementString, "statementString");
Expand All @@ -127,17 +128,14 @@ public BinPackedPersistentQueryMetadataImpl(
this.processingLogger = requireNonNull(processingLogger, "processingLogger");
this.physicalPlan = requireNonNull(physicalPlan, "physicalPlan");
this.resultSchema = requireNonNull(schema, "schema");
this.materializationProviderBuilder =
requireNonNull(materializationProviderBuilder, "materializationProviderBuilder");
this.materializationProviderBuilderFactory = requireNonNull(
materializationProviderBuilderFactory, "materializationProviderBuilderFactory");
this.materializationInfo = requireNonNull(materializationInfo, "materializationInfo");
this.listener = new QueryListenerWrapper(listener, scalablePushRegistry);
this.namedTopologyBuilder = requireNonNull(namedTopologyBuilder, "namedTopologyBuilder");
this.queryErrors = sharedKafkaStreamsRuntime.getNewQueryErrorQueue();
this.materializationProvider = materializationProviderBuilder
.flatMap(builder -> builder.apply(
this.sharedKafkaStreamsRuntime.getKafkaStreams(),
topology
));
this.scalablePushRegistry = requireNonNull(scalablePushRegistry, "scalablePushRegistry");
this.keyFormat = requireNonNull(keyFormat, "keyFormat");
}

// for creating sandbox instances
Expand All @@ -160,12 +158,13 @@ public BinPackedPersistentQueryMetadataImpl(
this.processingLogger = original.processingLogger;
this.physicalPlan = original.getPhysicalPlan();
this.resultSchema = original.resultSchema;
this.materializationProviderBuilder = original.materializationProviderBuilder;
this.materializationProviderBuilderFactory = original.materializationProviderBuilderFactory;
this.materializationInfo = original.materializationInfo;
this.listener = requireNonNull(listener, "listener");
this.queryErrors = sharedKafkaStreamsRuntime.getNewQueryErrorQueue();
this.materializationProvider = original.materializationProvider;
this.scalablePushRegistry = original.scalablePushRegistry;
this.namedTopologyBuilder = original.namedTopologyBuilder;
this.keyFormat = original.keyFormat;
}

@Override
Expand Down Expand Up @@ -217,7 +216,19 @@ public ProcessingLogger getProcessingLogger() {
public Optional<Materialization> getMaterialization(
final QueryId queryId,
final QueryContext.Stacker contextStacker) {
return materializationProvider.map(builder -> builder.build(queryId, contextStacker));
return this.materializationInfo.map(info ->
materializationProviderBuilderFactory.materializationProviderBuilder(
info,
resultSchema,
keyFormat,
getStreamsProperties(),
applicationId,
this.queryId.toString()
)
).flatMap(builder -> builder.apply(
sharedKafkaStreamsRuntime.getKafkaStreams(),
topology)
).map(builder -> builder.build(queryId, contextStacker));
}

@Override
Expand Down Expand Up @@ -299,6 +310,10 @@ public NamedTopology getTopologyCopy(final SharedKafkaStreamsRuntime builder) {
return namedTopologyBuilder.apply(builder);
}

public void updateTopology(final NamedTopology topology) {
this.topology = topology;
}

@Override
public Map<String, Map<Integer, LagInfo>> getAllLocalStorePartitionLags() {
return sharedKafkaStreamsRuntime.getAllLocalStorePartitionLagsForQuery(queryId);
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.StateListener;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.processor.TaskId;
Expand Down Expand Up @@ -247,6 +248,9 @@ public void start(final QueryId queryId) {

@Override
public void overrideStreamsProperties(final Map<String, Object> newProperties) {
newProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
streamsProperties.get(StreamsConfig.APPLICATION_SERVER_CONFIG));
//The application server should not be overwritten
streamsProperties = ImmutableMap.copyOf(newProperties);
}

Expand All @@ -262,7 +266,8 @@ public void restartStreamsRuntime() {
for (final NamedTopology topology : liveTopologies) {
final BinPackedPersistentQueryMetadataImpl query = collocatedQueries
.get(new QueryId(topology.name()));
kafkaStreamsNamedTopologyWrapper.addNamedTopology(query.getTopologyCopy(this));
query.updateTopology(query.getTopologyCopy(this));
kafkaStreamsNamedTopologyWrapper.addNamedTopology(query.getTopology());
}
setupAndStartKafkaStreams(kafkaStreamsNamedTopologyWrapper);
}
Expand Down
Expand Up @@ -15,22 +15,33 @@

package io.confluent.ksql.util;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.materialization.MaterializationInfo;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.execution.scalablepush.ScalablePushRegistry;
import io.confluent.ksql.query.MaterializationProviderBuilderFactory;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.query.QuerySchemas;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.util.QueryMetadata.Listener;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -68,6 +79,21 @@ public class BinPackedPersistentQueryMetadataImplTest {
private Map<String, Object> streamsProperties;
@Mock
private Optional<ScalablePushRegistry> scalablePushRegistry;
@Mock
private MaterializationProviderBuilderFactory
materializationProviderBuilderFactory;
@Mock
private KafkaStreamsNamedTopologyWrapper kafkaStreamsNamedTopologyWrapper;
@Mock
private KafkaStreamsNamedTopologyWrapper kafkaStreamsNamedTopologyWrapper2;
@Mock
private QueryContext.Stacker stacker;
@Mock
private KeyFormat keyFormat;
@Mock
private MaterializationInfo materializationInfo;
@Mock
private MaterializationProviderBuilderFactory.MaterializationProviderBuilder materializationProviderBuilder;

private PersistentQueryMetadata query;

Expand All @@ -85,16 +111,33 @@ public void setUp() {
schemas,
overrides,
QUERY_ID,
Optional.empty(),
Optional.of(materializationInfo),
materializationProviderBuilderFactory,
physicalPlan,
processingLogger,
Optional.of(sinkDataSource),
listener,
streamsProperties,
scalablePushRegistry,
(runtime) -> topology);
(runtime) -> topology,
keyFormat);

query.initialize();
when(materializationProviderBuilderFactory.materializationProviderBuilder(
any(), any(), any(), any(), any(), any()))
.thenReturn(materializationProviderBuilder);
}

@Test
public void shouldGetStreamsFreshForMaterialization() {
when(sharedKafkaStreamsRuntimeImpl.getKafkaStreams())
.thenReturn(kafkaStreamsNamedTopologyWrapper)
.thenReturn(kafkaStreamsNamedTopologyWrapper2);
when(materializationProviderBuilder.apply(any(KafkaStreams.class), any())).thenReturn(Optional.empty());
query.getMaterialization(query.getQueryId(), stacker);
query.getMaterialization(query.getQueryId(), stacker);

verify(materializationProviderBuilder).apply(eq(kafkaStreamsNamedTopologyWrapper), any());
verify(materializationProviderBuilder).apply(eq(kafkaStreamsNamedTopologyWrapper2), any());
}

@Test
Expand Down
Expand Up @@ -87,6 +87,7 @@ public class SharedKafkaStreamsRuntimeImplTest {
public void setUp() throws Exception {
when(kafkaStreamsBuilder.buildNamedTopologyWrapper(any())).thenReturn(kafkaStreamsNamedTopologyWrapper).thenReturn(kafkaStreamsNamedTopologyWrapper2);
streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "runtime");
streamProps.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "old");
sharedKafkaStreamsRuntimeImpl = new SharedKafkaStreamsRuntimeImpl(
kafkaStreamsBuilder,
queryErrorClassifier,
Expand All @@ -112,14 +113,14 @@ public void setUp() throws Exception {
public void overrideStreamsPropertiesShouldReplaceProperties() {
// Given:
final Map<String, Object> newProps = new HashMap<>();
newProps.put("Test", "Test");
newProps.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "notused");

// When:
sharedKafkaStreamsRuntimeImpl.overrideStreamsProperties(newProps);

// Then:
final Map<String, Object> properties = sharedKafkaStreamsRuntimeImpl.streamsProperties;
assertThat(properties.get("Test"), equalTo("Test"));
assertThat(properties.get(StreamsConfig.APPLICATION_SERVER_CONFIG), equalTo("old"));
assertThat(properties.size(), equalTo(1));
}

Expand Down Expand Up @@ -232,7 +233,7 @@ public void shouldRestart() {

//Then:
verify(kafkaStreamsNamedTopologyWrapper).close();
verify(kafkaStreamsNamedTopologyWrapper2).addNamedTopology(namedTopology);
verify(kafkaStreamsNamedTopologyWrapper2).addNamedTopology(any());
verify(kafkaStreamsNamedTopologyWrapper2).start();
verify(kafkaStreamsNamedTopologyWrapper2).setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) any());
}
Expand Down
Expand Up @@ -128,6 +128,15 @@ public List<KsqlPartitionLocation> locate(
metadata = getMetadataForAllPartitions(filterPartitions, keySet);
}

if (metadata.isEmpty()) {
final MaterializationException materializationException = new MaterializationException(
"Cannot determine which host contains the required partitions to serve the pull query. \n"
+ "The underlying persistent query may be restarting (e.g. as a result of "
+ "ALTER SYSTEM) view the status of your by issuing <DESCRIBE foo>.");
LOG.debug(materializationException.getMessage());
throw materializationException;
}

// Go through the metadata and group them by partition.
for (PartitionMetadata partitionMetadata : metadata) {
LOG.debug("Handling pull query for partition {} of state store {}.",
Expand Down
Expand Up @@ -226,6 +226,29 @@ public void shouldThrowIfMetadataNotAvailable() {
"Materialized data for key [1] is not available yet. Please try again later."));
}

@Test
public void shouldThrowIfMetadataIsEmpty() {
// Given:
getActiveAndStandbyMetadata();
when(topology.describe()).thenReturn(description);
when(description.subtopologies()).thenReturn(ImmutableSet.of(sub1));
when(sub1.nodes()).thenReturn(ImmutableSet.of(source, processor));
when(source.topicSet()).thenReturn(ImmutableSet.of(TOPIC_NAME));
when(processor.stores()).thenReturn(ImmutableSet.of(STORE_NAME));

// When:
final Exception e = assertThrows(
MaterializationException.class,
() -> locator.locate(Collections.emptyList(), routingOptions, routingFilterFactoryActive, false)
);

// Then:
assertThat(e.getMessage(), is(
"Cannot determine which host contains the required partitions to serve the pull query. \n" +
"The underlying persistent query may be restarting (e.g. as a result of" +
" ALTER SYSTEM) view the status of your by issuing <DESCRIBE foo>."));
}

@Test
public void shouldThrowIfRangeScanAndKeysEmpty() {
// Given:
Expand Down

0 comments on commit 4340dfa

Please sign in to comment.