Skip to content

Commit

Permalink
fix: back out post-3.1 changes to fix 7.1.x build (#8599)
Browse files Browse the repository at this point in the history
Remove/revert the changes to ksql that rely on Streams APIs which aren't present in the 7.1.0 branch of ccs-kafka that will ship with ksql 7.1
  • Loading branch information
A. Sophie Blee-Goldman committed Jan 18, 2022
1 parent 4347001 commit f42225d
Show file tree
Hide file tree
Showing 15 changed files with 37 additions and 808 deletions.
Expand Up @@ -92,7 +92,7 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder;

/**
* A builder for creating queries metadata.
Expand Down Expand Up @@ -429,19 +429,17 @@ PersistentQueryMetadata buildPersistentQueryInSharedRuntime(
valueFormat.getFeatures()
);

final NamedTopologyBuilder namedTopologyBuilder = sharedKafkaStreamsRuntime.getKafkaStreams()
.newNamedTopologyBuilder(
queryId.toString(),
PropertiesUtil.asProperties(queryOverrides)
);
final NamedTopologyStreamsBuilder namedTopologyBuilder =
new NamedTopologyStreamsBuilder(queryId.toString());

final RuntimeBuildContext runtimeBuildContext = buildContext(
applicationId,
queryId,
namedTopologyBuilder
);
final Object result = buildQueryImplementation(physicalPlan, runtimeBuildContext);
final NamedTopology topology = namedTopologyBuilder.build();
final NamedTopology topology =
namedTopologyBuilder.buildNamedTopology(PropertiesUtil.asProperties(queryOverrides));

final Optional<MaterializationProviderBuilderFactory.MaterializationProviderBuilder>
materializationProviderBuilder = getMaterializationInfo(result).map(info ->
Expand Down Expand Up @@ -507,19 +505,16 @@ public NamedTopology getNamedTopology(final SharedKafkaStreamsRuntime sharedRunt
final String applicationId,
final Map<String, Object> queryOverrides,
final ExecutionStep<?> physicalPlan) {
final NamedTopologyBuilder namedTopologyBuilder =
sharedRuntime.getKafkaStreams().newNamedTopologyBuilder(
queryId.toString(),
PropertiesUtil.asProperties(queryOverrides)
);
final NamedTopologyStreamsBuilder namedTopologyBuilder =
new NamedTopologyStreamsBuilder(queryId.toString());

final RuntimeBuildContext runtimeBuildContext = buildContext(
applicationId,
queryId,
namedTopologyBuilder
);
buildQueryImplementation(physicalPlan, runtimeBuildContext);
return namedTopologyBuilder.build();
return namedTopologyBuilder.buildNamedTopology(PropertiesUtil.asProperties(queryOverrides));
}

public static Map<String, Object> buildStreamsProperties(
Expand Down
Expand Up @@ -32,7 +32,6 @@
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -96,12 +95,8 @@ public Set<StreamsTaskMetadata> getTaskMetadata() {
}

public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags(final QueryId queryId) {
try {
return kafkaStreams.allLocalStorePartitionLagsForTopology(queryId.toString());
} catch (IllegalStateException | StreamsException e) {
log.error(e.getMessage());
return ImmutableMap.of();
}
throw new IllegalStateException("Shared runtimes have not been fully implemented in this"
+ " version and should not be used.");
}

@SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "streamsProperties is immutable")
Expand Down
Expand Up @@ -24,9 +24,6 @@
import io.confluent.ksql.util.QueryMetadataImpl.TimeBoundedQueue;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.StateListener;
import org.apache.kafka.streams.errors.StreamsException;
Expand Down Expand Up @@ -164,33 +161,8 @@ public TimeBoundedQueue getNewQueryErrorQueue() {

@Override
public void stop(final QueryId queryId, final boolean resetOffsets) {
log.info("Attempting to stop Query: " + queryId.toString());
if (collocatedQueries.containsKey(queryId)) {
if (kafkaStreams.state().isRunningOrRebalancing()) {
try {
kafkaStreams.removeNamedTopology(queryId.toString(), resetOffsets)
.all()
.get(shutdownTimeout, TimeUnit.SECONDS);
if (resetOffsets) {
kafkaStreams.cleanUpNamedTopology(queryId.toString());
}
} catch (final TimeoutException | ExecutionException | InterruptedException e) {
log.error("Failed to close query {} within the allotted timeout {} due to",
queryId,
shutdownTimeout,
e);
if (e instanceof TimeoutException) {
log.warn(
"query has not terminated even after trying to remove the topology. "
+ "This may happen when streams threads are hung. State: "
+ kafkaStreams.state());
}
}
} else {
throw new IllegalStateException("Streams in not running but is in state "
+ kafkaStreams.state());
}
}
throw new IllegalStateException("Shared runtimes have not been fully implemented in this"
+ " version and should not be used.");
}

@Override
Expand All @@ -201,27 +173,8 @@ public synchronized void close() {

@Override
public void start(final QueryId queryId) {
if (collocatedQueries.containsKey(queryId) && !collocatedQueries.get(queryId).everStarted) {
if (!kafkaStreams.getTopologyByName(queryId.toString()).isPresent()) {
try {
kafkaStreams.addNamedTopology(collocatedQueries.get(queryId).getTopology())
.all()
.get(shutdownTimeout, TimeUnit.SECONDS);
} catch (final TimeoutException | ExecutionException | InterruptedException e) {
log.error("Failed to start query {} within the allotted timeout {} due to",
queryId,
shutdownTimeout,
e);
throw new IllegalStateException(
"Encountered an error when trying to add query " + queryId + " to runtime: ",
e);
}
} else {
throw new IllegalArgumentException("not done removing query: " + queryId);
}
} else {
throw new IllegalArgumentException("query: " + queryId + " not added to runtime");
}
throw new UnsupportedOperationException("Shared runtimes have not been fully implemented"
+ " in this version and should not be used.");
}

@Override
Expand Down

0 comments on commit f42225d

Please sign in to comment.