Skip to content

Commit

Permalink
fix(3525): sET should only affect statements after it (confluentinc#3529
Browse files Browse the repository at this point in the history
)

Fixes confluentinc#3525

This commit fixes a regression that sees an old issue reappearing where by a SET statement affects not just statements that follow it, but also statements before it. The primary cause of this is the fact that `ConfiguredStatement` contains a reference to a mutable map containing the property overrides. This map is mutated by subsequent SET statements.  The fix is to make for `ConfiguredStatement` to take a immutable defensive copy, as is good programming practice.  This involves adding explicit handling of SET and UNSET statements, which can no long mutate the overrides in `ConfiguredStatement`.
  • Loading branch information
big-andy-coates committed Oct 11, 2019
1 parent 7fe8772 commit 5315f1e
Show file tree
Hide file tree
Showing 55 changed files with 431 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -123,23 +123,25 @@ public List<QueryMetadata> sql(final String sql) {
return sql(sql, Collections.emptyMap());
}

public List<QueryMetadata> sql(final String sql, final Map<String, Object> overriddenProperties) {
public List<QueryMetadata> sql(final String sql, final Map<String, ?> overriddenProperties) {
final List<ParsedStatement> statements = ksqlEngine.parse(sql);

final KsqlExecutionContext sandbox = ksqlEngine.createSandbox(ksqlEngine.getServiceContext());
final Map<String, Object> validationOverrides = new HashMap<>(overriddenProperties);
for (ParsedStatement stmt : statements) {
execute(
sandbox,
stmt,
ksqlConfig,
overriddenProperties,
validationOverrides,
injectorFactory.apply(sandbox, sandbox.getServiceContext()));
}

final List<QueryMetadata> queries = new ArrayList<>();
final Injector injector = injectorFactory.apply(ksqlEngine, serviceContext);
final Map<String, Object> executionOverrides = new HashMap<>(overriddenProperties);
for (final ParsedStatement parsed : statements) {
execute(ksqlEngine, parsed, ksqlConfig, overriddenProperties, injector)
execute(ksqlEngine, parsed, ksqlConfig, executionOverrides, injector)
.getQuery()
.ifPresent(queries::add);
}
Expand Down Expand Up @@ -181,32 +183,42 @@ private static ExecuteResult execute(
final KsqlExecutionContext executionContext,
final ParsedStatement stmt,
final KsqlConfig ksqlConfig,
final Map<String, Object> overriddenProperties,
final Injector injector) {
final Map<String, Object> mutableSessionPropertyOverrides,
final Injector injector
) {
final PreparedStatement<?> prepared = executionContext.prepare(stmt);
final ConfiguredStatement<?> configured =
injector.inject(ConfiguredStatement.of(prepared, overriddenProperties, ksqlConfig));

final ConfiguredStatement<?> configured = injector.inject(ConfiguredStatement.of(
prepared,
mutableSessionPropertyOverrides,
ksqlConfig
));

final CustomExecutor executor =
CustomExecutors.EXECUTOR_MAP.getOrDefault(
configured.getStatement().getClass(),
executionContext::execute);
(s, props) -> executionContext.execute(s));

return executor.apply(configured);
return executor.apply(configured, mutableSessionPropertyOverrides);
}

@FunctionalInterface
private interface CustomExecutor extends Function<ConfiguredStatement<?>, ExecuteResult> { }
private interface CustomExecutor {
ExecuteResult apply(
ConfiguredStatement<?> statement,
Map<String, Object> mutableSessionPropertyOverrides
);
}

@SuppressWarnings("unchecked")
private enum CustomExecutors {

SET_PROPERTY(SetProperty.class, stmt -> {
PropertyOverrider.set((ConfiguredStatement<SetProperty>) stmt);
SET_PROPERTY(SetProperty.class, (stmt, props) -> {
PropertyOverrider.set((ConfiguredStatement<SetProperty>) stmt, props);
return ExecuteResult.of("Successfully executed " + stmt.getStatement());
}),
UNSET_PROPERTY(UnsetProperty.class, stmt -> {
PropertyOverrider.unset((ConfiguredStatement<UnsetProperty>) stmt);
UNSET_PROPERTY(UnsetProperty.class, (stmt, props) -> {
PropertyOverrider.unset((ConfiguredStatement<UnsetProperty>) stmt, props);
return ExecuteResult.of("Successfully executed " + stmt.getStatement());
})
;
Expand Down Expand Up @@ -238,9 +250,11 @@ private CustomExecutor getExecutor() {
return this::execute;
}

public ExecuteResult execute(final ConfiguredStatement<?> statement) {
return executor.apply(statement);
public ExecuteResult execute(
final ConfiguredStatement<?> statement,
final Map<String, Object> mutableSessionPropertyOverrides
) {
return executor.apply(statement, mutableSessionPropertyOverrides);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ private InsertValuesExecutor(

public void execute(
final ConfiguredStatement<InsertValues> statement,
final Map<String, ?> sessionProperties,
final KsqlExecutionContext executionContext,
final ServiceContext serviceContext
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,29 @@
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlStatementException;
import java.util.Map;

public final class PropertyOverrider {

private PropertyOverrider() { }

public static void set(final ConfiguredStatement<SetProperty> statement) {
public static void set(
final ConfiguredStatement<SetProperty> statement,
final Map<String, Object> mutableProperties
) {
final SetProperty setProperty = statement.getStatement();
throwIfInvalidProperty(setProperty.getPropertyName(), statement.getStatementText());
throwIfInvalidPropertyValues(setProperty, statement);
statement.getOverrides().put(setProperty.getPropertyName(), setProperty.getPropertyValue());
mutableProperties.put(setProperty.getPropertyName(), setProperty.getPropertyValue());
}

public static void unset(final ConfiguredStatement<UnsetProperty> statement) {
public static void unset(
final ConfiguredStatement<UnsetProperty> statement,
final Map<String, Object> mutableProperties
) {
final UnsetProperty unsetProperty = statement.getStatement();
throwIfInvalidProperty(unsetProperty.getPropertyName(), statement.getStatementText());
statement.getOverrides().remove(unsetProperty.getPropertyName());
mutableProperties.remove(unsetProperty.getPropertyName());
}

@SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_INFERRED") // clone has side-effects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

package io.confluent.ksql.statement;

import static java.util.Objects.requireNonNull;

import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.util.KsqlConfig;
Expand All @@ -25,6 +29,7 @@
* A prepared statement paired with the configurations needed to fully
* execute it.
*/
@Immutable
public final class ConfiguredStatement<T extends Statement> {

private final PreparedStatement<T> statement;
Expand All @@ -33,20 +38,20 @@ public final class ConfiguredStatement<T extends Statement> {

public static <S extends Statement> ConfiguredStatement<S> of(
final PreparedStatement<S> statement,
final Map<String, Object> overrides,
final Map<String, ?> overrides,
final KsqlConfig config
) {
return new ConfiguredStatement<>(statement, overrides, config);
}

private ConfiguredStatement(
final PreparedStatement<T> statement,
final Map<String, Object> overrides,
final Map<String, ?> overrides,
final KsqlConfig config
) {
this.statement = Objects.requireNonNull(statement, "statement");
this.overrides = Objects.requireNonNull(overrides, "overrides");
this.config = Objects.requireNonNull(config, "config");
this.statement = requireNonNull(statement, "statement");
this.overrides = ImmutableMap.copyOf(requireNonNull(overrides, "overrides"));
this.config = requireNonNull(config, "config");
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@

package io.confluent.ksql.embedded;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
Expand Down Expand Up @@ -48,7 +45,6 @@
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand Down Expand Up @@ -364,43 +360,81 @@ public void shouldInferTopicAfterInferringSchema() {
ksqlContext.sql("Some SQL", SOME_PROPERTIES);

// Then:
verify(ksqlEngine).execute(eq(STMT_1_WITH_TOPIC));
verify(ksqlEngine).execute(STMT_1_WITH_TOPIC);
}

@SuppressWarnings("unchecked")
@Test
public void shouldSetProperty() {
// Given:
final Map<String, Object> properties = new HashMap<>();
when(ksqlEngine.parse(any())).thenReturn(ImmutableList.of(PARSED_STMT_0, PARSED_STMT_0));

final PreparedStatement<SetProperty> set = PreparedStatement.of(
"SET SOMETHING",
new SetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
);

when(ksqlEngine.prepare(any()))
.thenReturn(
(PreparedStatement) PreparedStatement.of(
"SET SOMETHING",
new SetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")));
.thenReturn((PreparedStatement) set)
.thenReturn(PREPARED_STMT_0);

// When:
ksqlContext.sql("SQL;", properties);
ksqlContext.sql("SQL;", ImmutableMap.of());

// Then:
verify(ksqlEngine).execute(ConfiguredStatement.of(
PREPARED_STMT_0, ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
SOME_CONFIG
));
}

@SuppressWarnings("unchecked")
@Test
public void shouldSetPropertyOnlyOnCommandsFollowingTheSetStatement() {
// Given:
when(ksqlEngine.parse(any())).thenReturn(ImmutableList.of(PARSED_STMT_0, PARSED_STMT_0));

final PreparedStatement<SetProperty> set = PreparedStatement.of(
"SET SOMETHING",
new SetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
);

when(ksqlEngine.prepare(any()))
.thenReturn((PreparedStatement) PREPARED_STMT_0)
.thenReturn(set);

// When:
ksqlContext.sql("SQL;", ImmutableMap.of());

// Then:
assertThat(properties, hasEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
verify(ksqlEngine).execute(ConfiguredStatement.of(
PREPARED_STMT_0, ImmutableMap.of(), SOME_CONFIG
));
}

@SuppressWarnings("unchecked")
@Test
public void shouldUnsetProperty() {
// Given:
final Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
when(ksqlEngine.parse(any())).thenReturn(ImmutableList.of(PARSED_STMT_0, PARSED_STMT_0));

final Map<String, Object> properties = ImmutableMap
.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

final PreparedStatement<UnsetProperty> unset = PreparedStatement.of(
"UNSET SOMETHING",
new UnsetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));

when(ksqlEngine.prepare(any()))
.thenReturn(
(PreparedStatement) PreparedStatement.of(
"UNSET SOMETHING",
new UnsetProperty(Optional.empty(), ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)));
.thenReturn((PreparedStatement) unset)
.thenReturn(PREPARED_STMT_0);

// When:
ksqlContext.sql("SQL;", properties);

// Then:
assertThat(properties, not(hasEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")));
verify(ksqlEngine).execute(ConfiguredStatement.of(
PREPARED_STMT_0, ImmutableMap.of(), SOME_CONFIG
));
}
}
Loading

0 comments on commit 5315f1e

Please sign in to comment.