Skip to content

Commit

Permalink
feat: support variable substitution in SQL statements (#6504)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Oct 27, 2020
1 parent addd07a commit e185c1f
Show file tree
Hide file tree
Showing 22 changed files with 815 additions and 29 deletions.
26 changes: 25 additions & 1 deletion ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.confluent.ksql.parser.SqlBaseParser.StatementContext;
import io.confluent.ksql.parser.SqlBaseParser.UndefineVariableContext;
import io.confluent.ksql.parser.SqlBaseParser.UnsetPropertyContext;
import io.confluent.ksql.parser.VariableSubstitutor;
import io.confluent.ksql.reactive.BaseSubscriber;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.client.KsqlRestClient;
Expand All @@ -52,6 +53,7 @@
import io.confluent.ksql.util.HandlerMaps;
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMap2;
import io.confluent.ksql.util.HandlerMaps.Handler2;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.ParserUtil;
import io.confluent.ksql.util.WelcomeMsgUtils;
import io.vertx.core.Context;
Expand Down Expand Up @@ -341,8 +343,30 @@ private String nextNonCliCommand() {
}
}

private boolean isVariableSubstitutionEnabled() {
final Object substitutionEnabled
= restClient.getProperty(KsqlConfig.KSQL_VARIABLE_SUBSTITUTION_ENABLE);

if (substitutionEnabled != null && substitutionEnabled instanceof Boolean) {
return (boolean) substitutionEnabled;
}

return KsqlConfig.KSQL_VARIABLE_SUBSTITUTION_ENABLE_DEFAULT;
}

private List<ParsedStatement> substituteVariables(final List<ParsedStatement> statements) {
if (isVariableSubstitutionEnabled()) {
return statements.stream()
.map(stmt -> VariableSubstitutor.substitute(stmt, sessionVariables))
.flatMap(replacedSql -> KSQL_PARSER.parse(replacedSql).stream())
.collect(Collectors.toList());
} else {
return statements;
}
}

private void handleStatements(final String line) {
final List<ParsedStatement> statements = KSQL_PARSER.parse(line);
final List<ParsedStatement> statements = substituteVariables(KSQL_PARSER.parse(line));

final StringBuilder consecutiveStatements = new StringBuilder();
for (final ParsedStatement parsed : statements) {
Expand Down
36 changes: 36 additions & 0 deletions ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,42 @@ public void shouldPrintTopicWithDelimitedValue() {
containsString(", key: ITEM_1, value: home cinema"));
}

@Test
public void testDisableVariableSubstitution() {
// Given:
assertRunCommand(
"set '" + KsqlConfig.KSQL_VARIABLE_SUBSTITUTION_ENABLE + "' = 'false';", is(EMPTY_RESULT));
assertRunCommand("define topicName = '" + DELIMITED_TOPIC + "';", is(EMPTY_RESULT));

// When:
run("PRINT ${topicName} FROM BEGINNING INTERVAL 1 LIMIT 2;", localCli);

// Then:
assertThatEventually(() -> terminal.getOutputString(),
containsString("Failed to Describe Kafka Topic(s): [${topicName}]"));
assertThatEventually(() -> terminal.getOutputString(),
containsString("Caused by: This server does not host this topic-partition."));
}

@Test
public void testVariableSubstitution() {
// Given:
assertRunCommand(
"set '" + KsqlConfig.KSQL_VARIABLE_SUBSTITUTION_ENABLE + "' = 'true';", is(EMPTY_RESULT));
assertRunCommand("define topicName = '" + DELIMITED_TOPIC + "';", is(EMPTY_RESULT));

// When:
run("PRINT ${topicName} FROM BEGINNING INTERVAL 1 LIMIT 2;", localCli);

// Then:
assertThatEventually(() -> terminal.getOutputString(),
containsString("Value format: KAFKA_STRING"));
assertThat(terminal.getOutputString(), containsString("Key format: KAFKA_STRING"));
assertThat(terminal.getOutputString(), containsString(", key: <null>, value: <null>"));
assertThat(terminal.getOutputString(),
containsString(", key: ITEM_1, value: home cinema"));
}

@Test
public void testVariableDefineUndefine() {
assertRunCommand("define var1 = '1';", is(EMPTY_RESULT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ public Object unset(final String property) {
return props.remove(property);
}

/**
* Get a property value.
*
* @param property the name of the property
* @return the current value for the property, or {@code null} if it was not set.
*/
public Object get(final String property) {
return props.get(property);
}

/**
* @return an immutable Map of the currently set properties.
*/
Expand Down
13 changes: 13 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,12 @@ public class KsqlConfig extends AbstractConfig {
private static final String KSQL_PROPERTIES_OVERRIDES_DENYLIST_DOC = "Comma-separated list of "
+ "properties that KSQL users cannot override.";

public static final String KSQL_VARIABLE_SUBSTITUTION_ENABLE
= "ksql.variable.substitution.enable";
public static final boolean KSQL_VARIABLE_SUBSTITUTION_ENABLE_DEFAULT = true;
public static final String KSQL_VARIABLE_SUBSTITUTION_ENABLE_DOC
= "Enable variable substitution on SQL statements.";

private enum ConfigGeneration {
LEGACY,
CURRENT
Expand Down Expand Up @@ -829,6 +835,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_STATUS_RUNNING_THRESHOLD_SECS_DOC
)
.define(
KSQL_VARIABLE_SUBSTITUTION_ENABLE,
Type.BOOLEAN,
KSQL_VARIABLE_SUBSTITUTION_ENABLE_DEFAULT,
Importance.LOW,
KSQL_VARIABLE_SUBSTITUTION_ENABLE_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ public void shouldReturnOnlyKnownKeys() {
assertThat(propsWithMockParser.toMap().keySet(), containsInAnyOrder("prop-1", "prop-2"));
}

@Test
public void shouldGetCurrentValue() {
assertThat(propsWithMockParser.get("prop-1"), is("parsed-initial-val-1"));
}

@Test
public void shouldSetNewValue() {
// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.TransientQueryMetadata;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

Expand Down Expand Up @@ -100,10 +103,18 @@ public interface KsqlExecutionContext {
* <p>This provides some level of validation as well, e.g. ensuring sources and topics exist
* in the metastore, etc.
*
* <p>If variables are used in the statement, they will be replaced with the values found in
* {@code variablesMap}.
*
* @param stmt the parsed statement.
* @param variablesMap a list of values for SQL variable substitution
* @return the prepared statement.
*/
PreparedStatement<?> prepare(ParsedStatement stmt);
PreparedStatement<?> prepare(ParsedStatement stmt, Map<String, String> variablesMap);

default PreparedStatement<?> prepare(ParsedStatement stmt) {
return prepare(stmt, Collections.emptyMap());
}

/**
* Executes a query using the supplied service context.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.VariableSubstitutor;
import io.confluent.ksql.parser.tree.ExecutableDdlStatement;
import io.confluent.ksql.query.QueryExecutor;
import io.confluent.ksql.query.QueryId;
Expand Down Expand Up @@ -158,9 +159,19 @@ List<QueryMetadata> getAllLiveQueries() {
return ImmutableList.copyOf(allLiveQueries);
}

PreparedStatement<?> prepare(final ParsedStatement stmt) {
private ParsedStatement substituteVariables(
final ParsedStatement stmt,
final Map<String, String> variablesMap
) {
return (!variablesMap.isEmpty())
? parse(VariableSubstitutor.substitute(stmt, variablesMap)).get(0)
: stmt ;
}

PreparedStatement<?> prepare(final ParsedStatement stmt, final Map<String, String> variablesMap) {
try {
final PreparedStatement<?> preparedStatement = parser.prepare(stmt, metaStore);
final PreparedStatement<?> preparedStatement =
parser.prepare(substituteVariables(stmt, variablesMap), metaStore);
return PreparedStatement.of(
preparedStatement.getStatementText(),
AstSanitizer.sanitize(preparedStatement.getStatement(), metaStore)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.confluent.ksql.util.TransientQueryMetadata;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -177,8 +178,11 @@ public List<ParsedStatement> parse(final String sql) {
}

@Override
public PreparedStatement<?> prepare(final ParsedStatement stmt) {
return primaryContext.prepare(stmt);
public PreparedStatement<?> prepare(
final ParsedStatement stmt,
final Map<String, String> variablesMap
) {
return primaryContext.prepare(stmt, variablesMap);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.confluent.ksql.util.Sandbox;
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
Expand Down Expand Up @@ -91,8 +92,11 @@ public List<ParsedStatement> parse(final String sql) {
}

@Override
public PreparedStatement<?> prepare(final ParsedStatement stmt) {
return engineContext.prepare(stmt);
public PreparedStatement<?> prepare(
final ParsedStatement stmt,
final Map<String, String> variablesMap
) {
return engineContext.prepare(stmt, variablesMap);
}

@Override
Expand Down
5 changes: 5 additions & 0 deletions ksqldb-parser/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
<artifactId>commons-lang3</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>

<!-- Required for running tests -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,8 @@ whenClause
;

identifier
: IDENTIFIER #unquotedIdentifier
: VARIABLE #variableIdentifier
| IDENTIFIER #unquotedIdentifier
| QUOTED_IDENTIFIER #quotedIdentifierAlternative
| nonReserved #unquotedIdentifier
| BACKQUOTED_IDENTIFIER #backQuotedIdentifier
Expand Down Expand Up @@ -364,6 +365,7 @@ literal
| number #numericLiteral
| booleanValue #booleanLiteral
| STRING #stringLiteral
| VARIABLE #variableLiteral
;

nonReserved
Expand Down

0 comments on commit e185c1f

Please sign in to comment.