Skip to content

Commit

Permalink
feat(migrations): add --define flag to apply command (#7401)
Browse files Browse the repository at this point in the history
* feat: enable variable substitution for migrations tool

* address review comments

* checkstyle

* address review comments

* address review comments

* feat(migrations): add --define flag to apply command

* address review comments

* split up unit test

* findbugs

* spotbugs

* checkstyle
  • Loading branch information
Zara Lim committed Apr 21, 2021
1 parent 6d708db commit 165e972
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 24 deletions.
15 changes: 15 additions & 0 deletions docs/operate-and-deploy/migrations-tool.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ ksql-migrations {-c | --config-file} <config-file> apply
[ {-n | --next} ]
[ {-u | --until} <untilVersion> ]
[ {-v | --version} <version> ]
[ {-d | --define} <variableName>=<variableValue> ]
[ --dry-run ]
```

Expand All @@ -239,6 +240,20 @@ to apply:
In addition to selecting a mode for `ksql-migrations apply`, you must also provide
the path to the config file of your migrations project as part of the command.

You can define variables by passing the `--define` flag followed by a string of the form
`name=value` any number of times. For example, the following command

```bash
$ ksql-migrations --config-file /my/migrations/project/ksql-migrations.properties apply --next -d foo=bar -d car=3
```

is equivalent to having the following lines at the begining of each migration file:

```
DEFINE foo='bar';
DEFINE car='3';
```

You can optionally use the `--dry-run` flag to see which migration file(s) the
command will apply before running the actual `ksql-migrations apply` command
to update your ksqlDB cluster. The dry run does not validate whether the ksqlDB
Expand Down
19 changes: 2 additions & 17 deletions ksqldb-cli/src/main/java/io/confluent/ksql/cli/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@
import com.github.rvesse.airline.annotations.restrictions.ranges.LongRange;
import com.github.rvesse.airline.help.Help;
import com.github.rvesse.airline.parser.errors.ParseException;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.cli.console.OutputFormat;
import io.confluent.ksql.parser.VariableParser;
import io.confluent.ksql.rest.client.BasicCredentials;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -233,20 +232,6 @@ public Optional<String> getScriptFile() {
}

public Map<String, String> getVariables() {
if (definedVars == null) {
return Collections.emptyMap();
}

final ImmutableMap.Builder<String, String> variables = ImmutableMap.builder();
for (String pair : definedVars) {
final String[] parts = pair.split("=");
if (parts.length != 2) {
throw new IllegalArgumentException("Variables must be defined using '=' (i.e. var=val).");
}

variables.put(parts[0], parts[1]);
}

return variables.build();
return VariableParser.getVariables(definedVars);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser;

import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public final class VariableParser {
private VariableParser() {
}

/**
* Parses a list of Strings of the form, var=val into a map of variables and values.
*/
public static Map<String, String> getVariables(final List<String> definedVars) {
if (definedVars == null) {
return Collections.emptyMap();
}

final ImmutableMap.Builder<String, String> variables = ImmutableMap.builder();
for (String pair : definedVars) {
final String[] parts = pair.split("=");
if (parts.length != 2) {
throw new IllegalArgumentException("Failed to parse argument " + pair
+ ": variables must be defined using '=' (i.e. var=val).");
}

variables.put(parts[0], parts[1]);
}

return variables.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.confluent.ksql.api.client.FieldInfo;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.parser.VariableParser;
import io.confluent.ksql.tools.migrations.MigrationConfig;
import io.confluent.ksql.tools.migrations.MigrationException;
import io.confluent.ksql.tools.migrations.util.CommandParser;
Expand Down Expand Up @@ -122,6 +123,14 @@ public class ApplyMigrationCommand extends BaseCommand {
@Once
private boolean dryRun = false;

@Option(
name = {"--define", "-d"},
description = "Define variables for the session. This is equivalent to including DEFINE "
+ "statements before each migration. The `--define` option should be followed by a "
+ "string of the form `name=value` and may be passed any number of times."
)
private List<String> definedVars = null;

@Override
protected int command() {
if (!validateConfigFilePresent()) {
Expand Down Expand Up @@ -337,7 +346,7 @@ private void executeCommands(
final String previous,
final boolean validateOnly
) {
cleanUpJavaClientVariables(ksqlClient);
setUpJavaClientVariables(ksqlClient);
final Map<String, Object> properties = new HashMap<>();
for (final String command : commands) {
try {
Expand All @@ -360,8 +369,13 @@ private void executeCommands(
}
}

private void cleanUpJavaClientVariables(final Client ksqlClient) {
private void setUpJavaClientVariables(final Client ksqlClient) {
ksqlClient.getVariables().forEach((k, v) -> ksqlClient.undefine(k));
try {
VariableParser.getVariables(definedVars).forEach((k, v) -> ksqlClient.define(k, v));
} catch (IllegalArgumentException e) {
throw new MigrationException(e.getMessage());
}
}

private void executeCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private void shouldApplyMigrations() throws Exception {
1,
"foo FOO fO0",
configFilePath,
"CREATE STREAM FOO (A STRING) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');\n" +
"CREATE STREAM ${streamName} (A STRING) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');\n" +
"-- let's create some connectors!!!\n" +
"CREATE SOURCE CONNECTOR C WITH ('connector.class'='org.apache.kafka.connect.tools.MockSourceConnector');\n" +
"CREATE SINK CONNECTOR D WITH ('connector.class'='org.apache.kafka.connect.tools.MockSinkConnector', 'topics'='d');\n" +
Expand All @@ -235,13 +235,13 @@ private void shouldApplyMigrations() throws Exception {
2,
"bar_bar_BAR",
configFilePath,
"CREATE OR REPLACE STREAM FOO (A STRING, B INT) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');"
+ "ALTER STREAM FOO ADD COLUMN C BIGINT;" +
"CREATE OR REPLACE STREAM ${streamName} (A STRING, B INT) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');"
+ "ALTER STREAM ${streamName} ADD COLUMN C BIGINT;" +
"/* add some '''data''' to FOO */" +
"DEFINE variable = '50';" +
"INSERT INTO FOO VALUES ('HELLO', ${variable}, -4325);" +
"INSERT INTO FOO (A) VALUES ('GOOD''BYE');" +
"INSERT INTO FOO (A) VALUES ('${onlyDefinedInFile1}--ha\nha');" +
"INSERT INTO ${streamName} (A) VALUES ('${onlyDefinedInFile1}--ha\nha');" +
"INSERT INTO FOO (A) VALUES ('');" +
"DEFINE variable = 'cool';" +
"SET 'ksql.output.topic.name.prefix' = '${variable}';" +
Expand All @@ -260,7 +260,7 @@ private void shouldApplyMigrations() throws Exception {
);

// When:
final int applyStatus = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply", "-a").runCommand();
final int applyStatus = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply", "-a", "-d", "streamName=FOO").runCommand();

// Then:
assertThat(applyStatus, is(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,70 @@ public void shouldResetVariablesBetweenMigrations() throws Exception {
inOrder.verifyNoMoreInteractions();
}

@Test
public void shouldApplyArgumentVariablesEveryMigration() throws Exception {
// Given:
command = PARSER.parse("-a", "-d", "name=tame", "-d", "dame=blame");
createMigrationFile(1, NAME, migrationsDir, "INSERT INTO FOO VALUES ('${name}');");
createMigrationFile(2, NAME, migrationsDir, "INSERT INTO FOO VALUES ('${dame}');");
when(versionQueryResult.get()).thenReturn(ImmutableList.of());
when(ksqlClient.getVariables()).thenReturn(
ImmutableMap.of("name", "tame", "dame", "blame")
);
givenAppliedMigration(1, NAME, MigrationState.MIGRATED);

// When:
final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed(
Instant.ofEpochMilli(1000), ZoneId.systemDefault()));

// Then:
assertThat(result, is(0));
final InOrder inOrder = inOrder(ksqlClient);
inOrder.verify(ksqlClient).insertInto("`FOO`", new KsqlObject(ImmutableMap.of("`A`", "tame")));
inOrder.verify(ksqlClient).insertInto("`FOO`", new KsqlObject(ImmutableMap.of("`A`", "blame")));
inOrder.verify(ksqlClient).close();
inOrder.verifyNoMoreInteractions();
}

@Test
public void defineStatementsShouldTakePrecedenceOverArgumentVariables() throws Exception {
// Given:
command = PARSER.parse("-a", "-d", "name=tame");
createMigrationFile(1, NAME, migrationsDir, "DEFINE name='flame'; INSERT INTO FOO VALUES ('${name}');");
when(versionQueryResult.get()).thenReturn(ImmutableList.of());
when(ksqlClient.getVariables()).thenReturn(
ImmutableMap.of("name", "flame")
);
givenAppliedMigration(1, NAME, MigrationState.MIGRATED);

// When:
final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed(
Instant.ofEpochMilli(1000), ZoneId.systemDefault()));

// Then:
assertThat(result, is(0));
final InOrder inOrder = inOrder(ksqlClient);
inOrder.verify(ksqlClient).define("name", "flame");
inOrder.verify(ksqlClient).insertInto("`FOO`", new KsqlObject(ImmutableMap.of("`A`", "flame")));
inOrder.verify(ksqlClient).close();
inOrder.verifyNoMoreInteractions();
}

@Test
public void shouldFailOnInvalidArgumentVariable() throws Exception {
// Given:
command = PARSER.parse("-a", "-d", "woooo");
createMigrationFile(1, NAME, migrationsDir, "INSERT INTO FOO VALUES ('${name}');");
when(versionQueryResult.get()).thenReturn(ImmutableList.of());

// When:
final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed(
Instant.ofEpochMilli(1000), ZoneId.systemDefault()));

// Then:
assertThat(result, is(1));
}

@Test
public void shouldResetPropertiesBetweenMigrations() throws Exception {
// Given:
Expand Down

0 comments on commit 165e972

Please sign in to comment.