Skip to content

Commit

Permalink
fix: skip adding invalid if not exists to cmd topic (#8206)
Browse files Browse the repository at this point in the history
* fix: skip adding invalid if not exists to cmd topic

(cherry picked from commit 540a449)
  • Loading branch information
wcarlson5 committed Oct 4, 2021
1 parent 4434fe2 commit e164b18
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.WarningEntity;
import io.confluent.ksql.rest.server.execution.StatementExecutorResponse;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.security.KsqlSecurityContext;
Expand Down Expand Up @@ -91,6 +97,45 @@ public DistributingExecutor(
Objects.requireNonNull(commandRunnerWarning, "commandRunnerWarning");
}

// CHECKSTYLE_RULES.OFF: CyclomaticComplexity
private Optional<StatementExecutorResponse> checkIfNotExistsResponse(
final KsqlExecutionContext executionContext,
final ConfiguredStatement<?> statement
) {
SourceName sourceName = null;
String type = "";
if (statement.getStatement() instanceof CreateStream
&& ((CreateStream) statement.getStatement()).isNotExists()) {
type = "stream";
sourceName = ((CreateStream) statement.getStatement()).getName();
} else if (statement.getStatement() instanceof CreateTable
&& ((CreateTable) statement.getStatement()).isNotExists()) {
type = "table";
sourceName = ((CreateTable) statement.getStatement()).getName();
} else if (statement.getStatement() instanceof CreateTableAsSelect
&& ((CreateTableAsSelect) statement.getStatement()).isNotExists()) {
type = "table";
sourceName = ((CreateTableAsSelect) statement.getStatement()).getName();
} else if (statement.getStatement() instanceof CreateStreamAsSelect
&& ((CreateStreamAsSelect) statement.getStatement()).isNotExists()) {
type = "stream";
sourceName = ((CreateStreamAsSelect) statement.getStatement()).getName();
}
if (sourceName != null
&& executionContext.getMetaStore().getSource(sourceName) != null) {
return Optional.of(StatementExecutorResponse.handled(Optional.of(
new WarningEntity(statement.getStatementText(),
String.format("Cannot add %s %s: A %s with the same name already exists.",
type,
sourceName,
type)
))));
} else {
return Optional.empty();
}
}


/**
* The transactional protocol for sending a command to the command topic is to
* initTransaction(), beginTransaction(), wait for commandRunner to finish processing all previous
Expand All @@ -101,6 +146,7 @@ public DistributingExecutor(
* If a new transactional producer is initialized while the current transaction is incomplete,
* the old producer will be fenced off and unable to continue with its transaction.
*/
// CHECKSTYLE_RULES.OFF: NPathComplexity
public StatementExecutorResponse execute(
final ConfiguredStatement<? extends Statement> statement,
final KsqlExecutionContext executionContext,
Expand All @@ -123,6 +169,15 @@ public StatementExecutorResponse execute(
);
}

final Optional<StatementExecutorResponse> response = checkIfNotExistsResponse(
executionContext,
statement
);

if (response.isPresent()) {
return response.get();
}

checkAuthorization(injected, securityContext, executionContext);

final Producer<CommandId, Command> transactionalProducer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -56,6 +57,8 @@
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.entity.CommandStatus.Status;
import io.confluent.ksql.rest.entity.CommandStatusEntity;
import io.confluent.ksql.rest.entity.WarningEntity;
import io.confluent.ksql.rest.server.execution.StatementExecutorResponse;
import io.confluent.ksql.security.KsqlAuthorizationValidator;
import io.confluent.ksql.security.KsqlSecurityContext;
import io.confluent.ksql.services.SandboxedServiceContext;
Expand Down Expand Up @@ -436,4 +439,33 @@ public void shouldAbortOnError_Exception() {
// Then:
verify(queue).abortCommand(IDGEN.getCommandId(CONFIGURED_STATEMENT.getStatement()));
}

@Test
public void shouldNotEnqueueRedundantIfNotExists() {
// Given:
final PreparedStatement<Statement> preparedStatement =
PreparedStatement.of("", new CreateStream(
SourceName.of("TEST"),
TableElements.of(),
false,
true,
CreateSourceProperties.from(ImmutableMap.of(
CommonCreateConfigs.KAFKA_TOPIC_NAME_PROPERTY, new StringLiteral("topic"),
CommonCreateConfigs.VALUE_FORMAT_PROPERTY, new StringLiteral("json")
)),
false
));
final ConfiguredStatement<Statement> configured =
ConfiguredStatement.of(preparedStatement, SessionConfig.of(KSQL_CONFIG, ImmutableMap.of())
);
final DataSource dataSource = mock(DataSource.class);
doReturn(dataSource).when(metaStore).getSource(SourceName.of("TEST"));

// When:
final StatementExecutorResponse response = distributor.execute(configured, executionContext, securityContext);

// Then:
assertThat("Should be present", response.getEntity().isPresent());
assertThat(((WarningEntity) response.getEntity().get()).getMessage(), containsString(""));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,16 @@ public void shouldRecoverInsertIntosRecreates() {
shouldRecover(commands);
}

@Test
public void shouldRecoverIfNotExists() {
server1.submitCommands(
"CREATE STREAM A (COLUMN STRING) WITH (KAFKA_TOPIC='A', VALUE_FORMAT='JSON');",
"CREATE STREAM IF NOT EXISTS B AS SELECT * FROM A;",
"CREATE STREAM IF NOT EXISTS B AS SELECT * FROM A;"
);
shouldRecover(commands);
}

@Test
public void shouldRecoverTerminates() {
server1.submitCommands(
Expand Down

0 comments on commit e164b18

Please sign in to comment.