From d4cf400cb77e7a9dd0d7810e7de53562197f32fa Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Wed, 24 Feb 2021 10:05:12 -0800 Subject: [PATCH] feat(migrations): implement `validate` command (#7087) --- .../commands/NewMigrationCommand.java | 4 +- .../commands/ValidateMigrationsCommand.java | 110 +++++- .../tools/migrations/util/MetadataUtil.java | 109 ++++++ .../util/MigrationsDirectoryUtil.java | 50 ++- .../tools/migrations/util/MigrationsUtil.java | 3 - .../ksql/tools/migrations/MigrationsTest.java | 7 +- .../commands/NewMigrationCommandTest.java | 4 +- .../ValidateMigrationsCommandTest.java | 324 ++++++++++++++++++ 8 files changed, 593 insertions(+), 18 deletions(-) create mode 100644 ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommandTest.java diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/NewMigrationCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/NewMigrationCommand.java index 16753efbe12d..0050080e84dc 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/NewMigrationCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/NewMigrationCommand.java @@ -15,8 +15,8 @@ package io.confluent.ksql.tools.migrations.commands; -import static io.confluent.ksql.tools.migrations.util.MigrationsUtil.MIGRATIONS_CONFIG_FILE; -import static io.confluent.ksql.tools.migrations.util.MigrationsUtil.MIGRATIONS_DIR; +import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.MIGRATIONS_CONFIG_FILE; +import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.MIGRATIONS_DIR; import com.github.rvesse.airline.annotations.Arguments; import com.github.rvesse.airline.annotations.Command; diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommand.java index b32e81cec178..696754076436 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommand.java @@ -15,8 +15,26 @@ package io.confluent.ksql.tools.migrations.commands; +import static io.confluent.ksql.tools.migrations.util.MetadataUtil.getInfoForVersion; +import static io.confluent.ksql.tools.migrations.util.MetadataUtil.getLatestMigratedVersion; +import static io.confluent.ksql.tools.migrations.util.MetadataUtil.validateVersionIsMigrated; +import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.computeHashForFile; +import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePathForVersion; +import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile; + import com.github.rvesse.airline.annotations.Command; import com.github.rvesse.airline.annotations.help.Discussion; +import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.api.client.Client; +import io.confluent.ksql.tools.migrations.MigrationConfig; +import io.confluent.ksql.tools.migrations.MigrationException; +import io.confluent.ksql.tools.migrations.util.MetadataUtil; +import io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState; +import io.confluent.ksql.tools.migrations.util.MetadataUtil.VersionInfo; +import io.confluent.ksql.tools.migrations.util.MigrationsUtil; +import io.confluent.ksql.util.KsqlException; +import java.util.NoSuchElementException; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +55,56 @@ public class ValidateMigrationsCommand extends BaseCommand { @Override protected int command() { - throw new UnsupportedOperationException(); + if (!validateConfigFilePresent()) { + return 1; + } + + final MigrationConfig config; + try { + config = MigrationConfig.load(configFile); + } catch (KsqlException | MigrationException e) { + LOGGER.error(e.getMessage()); + return 1; + } + + return command( + config, + MigrationsUtil::getKsqlClient, + getMigrationsDirFromConfigFile(configFile) + ); + } + + @VisibleForTesting + int command( + final MigrationConfig config, + final Function clientSupplier, + final String migrationsDir + ) { + final Client ksqlClient; + try { + ksqlClient = clientSupplier.apply(config); + } catch (MigrationException e) { + LOGGER.error(e.getMessage()); + return 1; + } + + final boolean success; + try { + success = validate(config, migrationsDir, ksqlClient); + } catch (MigrationException e) { + LOGGER.error(e.getMessage()); + return 1; + } + + if (success) { + LOGGER.info("Successfully validated checksums for migrations that have already been applied"); + ksqlClient.close(); + } else { + ksqlClient.close(); + return 1; + } + + return 0; } @Override @@ -45,4 +112,45 @@ protected Logger getLogger() { return null; } + /** + * @return true if validation passes, else false. + */ + static boolean validate( + final MigrationConfig config, + final String migrationsDir, + final Client ksqlClient + ) { + String version = getLatestMigratedVersion(config, ksqlClient); + String nextVersion = null; + while (!version.equals(MetadataUtil.NONE_VERSION)) { + final VersionInfo versionInfo = getInfoForVersion(version, config, ksqlClient); + if (nextVersion != null) { + validateVersionIsMigrated(version, versionInfo, nextVersion); + } + + final String filename; + try { + filename = getFilePathForVersion(version, migrationsDir).get(); + } catch (MigrationException | NoSuchElementException e) { + LOGGER.error("No migrations file found for version with status {}. Version: {}", + MigrationState.MIGRATED, version); + return false; + } + + final String hash = computeHashForFile(filename); + final String expectedHash = versionInfo.getExpectedHash(); + if (!expectedHash.equals(hash)) { + LOGGER.error("Migrations file found for version {} does not match the checksum saved " + + "for this version. Expected checksum: {}. Actual checksum: {}. File name: {}", + version, expectedHash, hash, filename); + return false; + } + + nextVersion = version; + version = versionInfo.getPrevVersion(); + } + + return true; + } + } diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MetadataUtil.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MetadataUtil.java index 836a5b292163..71c37e5489cd 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MetadataUtil.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MetadataUtil.java @@ -21,6 +21,7 @@ import io.confluent.ksql.tools.migrations.MigrationConfig; import io.confluent.ksql.tools.migrations.MigrationException; import java.util.List; +import java.util.Objects; import java.util.concurrent.ExecutionException; public final class MetadataUtil { @@ -28,6 +29,13 @@ public final class MetadataUtil { public static final String NONE_VERSION = ""; public static final String CURRENT_VERSION_KEY = "CURRENT"; + public enum MigrationState { + PENDING, + RUNNING, + MIGRATED, + ERROR + } + private MetadataUtil() { } @@ -47,4 +55,105 @@ public static String getCurrentVersion(final MigrationConfig config, final Clien String.format("Could not query %s: %s", migrationTableName, e.getMessage())); } } + + public static String getLatestMigratedVersion( + final MigrationConfig config, + final Client ksqlClient + ) { + final String currentVersion = MetadataUtil.getCurrentVersion(config, ksqlClient); + if (currentVersion.equals(MetadataUtil.NONE_VERSION)) { + return currentVersion; + } + + final VersionInfo currentVersionInfo = getInfoForVersion(currentVersion, config, ksqlClient); + if (currentVersionInfo.state == MigrationState.MIGRATED) { + return currentVersion; + } + + if (currentVersionInfo.prevVersion.equals(MetadataUtil.NONE_VERSION)) { + return MetadataUtil.NONE_VERSION; + } + + final VersionInfo prevVersionInfo = getInfoForVersion( + currentVersionInfo.prevVersion, + config, + ksqlClient + ); + validateVersionIsMigrated(currentVersionInfo.prevVersion, prevVersionInfo, currentVersion); + + return currentVersionInfo.prevVersion; + } + + public static void validateVersionIsMigrated( + final String version, + final VersionInfo versionInfo, + final String nextVersion + ) { + if (versionInfo.state != MigrationState.MIGRATED) { + throw new MigrationException(String.format( + "Discovered version with previous version that does not have status {}. " + + "Version: {}. Previous version: {}. Previous version status: {}", + MigrationState.MIGRATED, + nextVersion, + version, + versionInfo.state + )); + } + } + + public static VersionInfo getInfoForVersion( + final String version, + final MigrationConfig config, + final Client ksqlClient + ) { + final String migrationTableName = config + .getString(MigrationConfig.KSQL_MIGRATIONS_TABLE_NAME); + final BatchedQueryResult result = ksqlClient.executeQuery( + "SELECT checksum, previous, state FROM " + migrationTableName + + " WHERE version_key = '" + version + "';"); + + final String expectedHash; + final String prevVersion; + final String state; + try { + final List resultRows = result.get(); + if (resultRows.size() == 0) { + throw new MigrationException( + "Failed to query state for migration with version " + version + + ": no such migration is present in the migrations metadata table"); + } + expectedHash = resultRows.get(0).getString(0); + prevVersion = resultRows.get(0).getString(1); + state = resultRows.get(0).getString(2); + } catch (InterruptedException | ExecutionException e) { + throw new MigrationException(String.format( + "Failed to query state for migration with version %s: %s", version, e.getMessage())); + } + + return new VersionInfo(expectedHash, prevVersion, state); + } + + public static class VersionInfo { + private final String expectedHash; + private final String prevVersion; + private final MigrationState state; + + VersionInfo(final String expectedHash, final String prevVersion, final String state) { + this.expectedHash = Objects.requireNonNull(expectedHash, "expectedHash"); + this.prevVersion = Objects.requireNonNull(prevVersion, "prevVersion"); + this.state = MigrationState.valueOf(Objects.requireNonNull(state, "state")); + } + + public String getExpectedHash() { + return expectedHash; + } + + public String getPrevVersion() { + return prevVersion; + } + + public MigrationState getState() { + return state; + } + } } diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsDirectoryUtil.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsDirectoryUtil.java index fe3cb3ca9c77..952d37a64079 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsDirectoryUtil.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsDirectoryUtil.java @@ -20,37 +20,63 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; public final class MigrationsDirectoryUtil { + + public static final String MIGRATIONS_DIR = "migrations"; + public static final String MIGRATIONS_CONFIG_FILE = "ksql-migrations.properties"; + private MigrationsDirectoryUtil() { } - public static Optional getFileNameForVersion( + public static String getMigrationsDirFromConfigFile(final String configFilePath) { + final Path parentDir = Paths.get(configFilePath).getParent(); + if (parentDir == null) { + throw new MigrationException("Could not find parent directory for config file '" + + configFilePath + "': no parent dir exists."); + } + return parentDir.resolve(MIGRATIONS_DIR).toString(); + } + + public static Optional getFilePathForVersion( final String version, final String migrationsDir ) { final String prefix = "V" + StringUtils.leftPad(version, 6, "0"); + final File directory = new File(migrationsDir); if (!directory.isDirectory()) { throw new MigrationException(migrationsDir + " is not a directory."); } + final String[] names = directory.list(); if (names == null) { throw new MigrationException("Failed to retrieve files from " + migrationsDir); } - for (String name : names) { - if (name.startsWith(prefix)) { - return Optional.of(name); - } + + final List matches = Arrays.stream(names) + .filter(name -> name.startsWith(prefix)) + .collect(Collectors.toList()); + if (matches.size() == 1) { + return Optional.of(Paths.get(migrationsDir, matches.get(0)).toString()); + } else if (matches.size() == 0) { + return Optional.empty(); + } else { + throw new MigrationException("Found multiple migration files for version " + version); } - return Optional.empty(); } public static String getFileContentsForVersion(final String version, final String migrationsDir) { - final Optional filename = getFileNameForVersion(version, migrationsDir); + final Optional filename = getFilePathForVersion(version, migrationsDir); if (!filename.isPresent()) { throw new MigrationException("Cannot find migration file with version " + version + " in " + migrationsDir); @@ -63,4 +89,14 @@ public static String getFileContentsForVersion(final String version, final Strin String.format("Failed to read %s: %s", filepath, e.getMessage())); } } + + public static String computeHashForFile(final String filename) { + try { + final byte[] bytes = Files.readAllBytes(Paths.get(filename)); + return new String(MessageDigest.getInstance("MD5").digest(bytes), StandardCharsets.UTF_8); + } catch (NoSuchAlgorithmException | IOException e) { + throw new MigrationException(String.format( + "Could not compute hash for file '%s': %s", filename, e.getMessage())); + } + } } diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsUtil.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsUtil.java index d6e4dc70965d..13c358726615 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsUtil.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsUtil.java @@ -29,9 +29,6 @@ private MigrationsUtil() { public static final String MIGRATIONS_COMMAND = "ksql-migrations"; - public static final String MIGRATIONS_DIR = "migrations"; - public static final String MIGRATIONS_CONFIG_FILE = "ksql-migrations.properties"; - public static Client getKsqlClient(final MigrationConfig config) throws MigrationException { final String ksqlServerUrl = config.getString(MigrationConfig.KSQL_SERVER_URL); return getKsqlClient(ksqlServerUrl); diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java index 65cb4ecce5f5..cab94c7f5d47 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java @@ -33,6 +33,7 @@ import io.confluent.ksql.rest.integration.RestIntegrationTestUtil; import io.confluent.ksql.rest.server.TestKsqlRestApp; import io.confluent.ksql.tools.migrations.commands.BaseCommand; +import io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil; import io.confluent.ksql.tools.migrations.util.MigrationsUtil; import java.io.File; import java.nio.file.Files; @@ -77,7 +78,7 @@ public static void setUpClass() throws Exception { final String testDir = Paths.get(TestUtils.tempDirectory().getAbsolutePath(), "migrations_integ_test").toString(); createAndVerifyDirectoryStructure(testDir); - configFilePath = Paths.get(testDir, MigrationsUtil.MIGRATIONS_CONFIG_FILE).toString(); + configFilePath = Paths.get(testDir, MigrationsDirectoryUtil.MIGRATIONS_CONFIG_FILE).toString(); initializeAndVerifyMetadataStreamAndTable(configFilePath); } @@ -102,12 +103,12 @@ private static void createAndVerifyDirectoryStructure(final String testDir) thro assertThat(rootDir.isDirectory(), is(true)); // verify migrations directory - final File migrationsDir = new File(Paths.get(testDir, MigrationsUtil.MIGRATIONS_DIR).toString()); + final File migrationsDir = new File(Paths.get(testDir, MigrationsDirectoryUtil.MIGRATIONS_DIR).toString()); assertThat(migrationsDir.exists(), is(true)); assertThat(migrationsDir.isDirectory(), is(true)); // verify config file - final File configFile = new File(Paths.get(testDir, MigrationsUtil.MIGRATIONS_CONFIG_FILE).toString()); + final File configFile = new File(Paths.get(testDir, MigrationsDirectoryUtil.MIGRATIONS_CONFIG_FILE).toString()); assertThat(configFile.exists(), is(true)); assertThat(configFile.isDirectory(), is(false)); diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/NewMigrationCommandTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/NewMigrationCommandTest.java index 0cd220f84fcc..38a871883595 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/NewMigrationCommandTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/NewMigrationCommandTest.java @@ -15,8 +15,8 @@ package io.confluent.ksql.tools.migrations.commands; -import static io.confluent.ksql.tools.migrations.util.MigrationsUtil.MIGRATIONS_CONFIG_FILE; -import static io.confluent.ksql.tools.migrations.util.MigrationsUtil.MIGRATIONS_DIR; +import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.MIGRATIONS_CONFIG_FILE; +import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.MIGRATIONS_DIR; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommandTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommandTest.java new file mode 100644 index 000000000000..83bf48a7ec71 --- /dev/null +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommandTest.java @@ -0,0 +1,324 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.tools.migrations.commands; + +import static io.confluent.ksql.tools.migrations.util.MetadataUtil.CURRENT_VERSION_KEY; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +import com.github.rvesse.airline.SingleCommand; +import com.google.common.collect.ImmutableList; +import io.confluent.ksql.api.client.BatchedQueryResult; +import io.confluent.ksql.api.client.Client; +import io.confluent.ksql.api.client.Row; +import io.confluent.ksql.tools.migrations.MigrationConfig; +import io.confluent.ksql.tools.migrations.util.MetadataUtil; +import io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState; +import io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; +import java.nio.charset.Charset; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.commons.lang3.StringUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ValidateMigrationsCommandTest { + + private static final SingleCommand PARSER = + SingleCommand.singleCommand(ValidateMigrationsCommand.class); + + private static final String MIGRATIONS_TABLE = "migrations_table"; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Mock + private MigrationConfig config; + @Mock + private Client ksqlClient; + + private String migrationsDir; + private ValidateMigrationsCommand command; + + @Before + public void setUp() { + when(config.getString(MigrationConfig.KSQL_MIGRATIONS_TABLE_NAME)).thenReturn(MIGRATIONS_TABLE); + + migrationsDir = folder.getRoot().getPath(); + command = PARSER.parse(); + } + + @Test + public void shouldValidateSingleMigration() throws Exception { + // Given: + final List versions = ImmutableList.of("1"); + final List checksums = givenExistingMigrationFiles(versions); + givenAppliedMigrations(versions, checksums); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir); + + // Then: + assertThat(result, is(0)); + + verifyClientCallsForVersions(versions); + } + + @Test + public void shouldValidateMultipleMigrations() throws Exception { + // Given: + final List versions = ImmutableList.of("1", "2", "3"); + final List checksums = givenExistingMigrationFiles(versions); + givenAppliedMigrations(versions, checksums); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir); + + // Then: + assertThat(result, is(0)); + + verifyClientCallsForVersions(versions); + } + + @Test + public void shouldValidateNoMigrations() throws Exception { + // Given: + final List versions = ImmutableList.of(); + final List checksums = givenExistingMigrationFiles(versions); + givenAppliedMigrations(versions, checksums); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir); + + // Then: + assertThat(result, is(0)); + + verifyClientCallsForVersions(versions); + } + + @Test + public void shouldValidateWithExtraMigrationFiles() throws Exception { + // Given: + final List migratedVersions = ImmutableList.of("1", "2", "4"); + final List migrationFiles = ImmutableList.of("1", "2", "3", "4", "5"); + final List allChecksums = givenExistingMigrationFiles(migrationFiles); + givenAppliedMigrations(migratedVersions, ImmutableList.of(allChecksums.get(0), allChecksums.get(1), allChecksums.get(3))); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir); + + // Then: + assertThat(result, is(0)); + + verifyClientCallsForVersions(migratedVersions); + } + + @Test + public void shouldFailOnMissingMigrationFile() throws Exception { + // Given: + final List migratedVersions = ImmutableList.of("1", "2", "3"); + final List migrationFiles = ImmutableList.of("1", "3"); + final List checksums = givenExistingMigrationFiles(migrationFiles); + givenAppliedMigrations(migratedVersions, ImmutableList.of(checksums.get(0), "missing", checksums.get(1))); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir); + + // Then: + assertThat(result, is(1)); + + // verification stops on failure, so version "1" is never queried + verifyClientCallsForVersions(ImmutableList.of("2", "3")); + } + + @Test + public void shouldFailOnChecksumMismatch() throws Exception { + // Given: + final List versions = ImmutableList.of("1", "2", "3"); + final List checksums = givenExistingMigrationFiles(versions); + givenAppliedMigrations(versions, ImmutableList.of(checksums.get(0), "mismatched_checksum", checksums.get(2))); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir); + + // Then: + assertThat(result, is(1)); + + // verification stops on failure, so version "1" is never queried + verifyClientCallsForVersions(ImmutableList.of("2", "3")); + } + + @Test + public void shouldNotValidateCurrentVersionIfNotMigrated() throws Exception { + // Given: + final List versions = ImmutableList.of("1", "2", "3"); + final List checksums = givenExistingMigrationFiles(versions); + final List states = ImmutableList.of(MigrationState.MIGRATED, MigrationState.MIGRATED, MigrationState.ERROR); + givenAppliedMigrations(versions, checksums, states); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir); + + // Then: + assertThat(result, is(0)); + + // + verifyClientCallsForVersions(versions, "2"); + } + + /** + * @return checksums for the supplied versions + */ + private List givenExistingMigrationFiles(final List versions) throws Exception { + final List checksums = new ArrayList<>(); + + for (final String version : versions) { + final String filename = filePathForVersion(version); + final String fileContents = fileContentsForVersion(version); + + assertThat(new File(filename).createNewFile(), is(true)); + + try (PrintWriter out = new PrintWriter(filename, Charset.defaultCharset().name())) { + out.println(fileContents); + } catch (FileNotFoundException | UnsupportedEncodingException e) { + Assert.fail("Failed to write test file: " + filename); + } + + checksums.add(MigrationsDirectoryUtil.computeHashForFile(filename)); + } + + return checksums; + } + + /** + * @param versions versions, in the order they were applied + * @param checksums corresponding checksums (ordered according to {@code versions}) + */ + private void givenAppliedMigrations( + final List versions, + final List checksums + ) throws Exception { + givenAppliedMigrations( + versions, + checksums, + Collections.nCopies(versions.size(), MigrationState.MIGRATED) + ); + } + + /** + * @param versions versions, in the order they were applied + * @param checksums corresponding checksums (ordered according to {@code versions}) + * @parma states corresponding migration states (ordered according to {@code versions}) + */ + private void givenAppliedMigrations( + final List versions, + final List checksums, + final List states + ) throws Exception { + String version = versions.size() > 0 + ? versions.get(versions.size() - 1) + : MetadataUtil.NONE_VERSION; + + Row row = mock(Row.class); + BatchedQueryResult queryResult = mock(BatchedQueryResult.class); + when(ksqlClient.executeQuery("SELECT VERSION FROM " + MIGRATIONS_TABLE + + " WHERE version_key = '" + CURRENT_VERSION_KEY + "';")) + .thenReturn(queryResult); + when(queryResult.get()).thenReturn(ImmutableList.of(row)); + when(row.getString("VERSION")).thenReturn(version); + + for (int i = versions.size() - 1; i >= 0; i--) { + version = versions.get(i); + String prevVersion = i > 0 ? versions.get(i-1) : MetadataUtil.NONE_VERSION; + + row = mock(Row.class); + queryResult = mock(BatchedQueryResult.class); + when(ksqlClient.executeQuery("SELECT checksum, previous, state FROM " + MIGRATIONS_TABLE + + " WHERE version_key = '" + version + "';")) + .thenReturn(queryResult); + when(queryResult.get()).thenReturn(ImmutableList.of(row)); + when(row.getString(0)).thenReturn(checksums.get(i)); + when(row.getString(1)).thenReturn(prevVersion); + when(row.getString(2)).thenReturn(states.get(i).toString()); + } + } + + /** + * @param versions versions, in the order they were applied + */ + private void verifyClientCallsForVersions(final List versions) { + final String lastVersion = versions.size() > 0 ? versions.get(versions.size() - 1) : "N/A"; + verifyClientCallsForVersions(versions, lastVersion); + } + + /** + * @param versions versions, in the order they were applied + * @param latestMigratedVersion latest migrated version, always either the last version or the + * second-to-last version in {@code versions}. Info for this + * version is fetched twice by the algorithm for `validate`. + */ + private void verifyClientCallsForVersions( + final List versions, + final String latestMigratedVersion + ) { + final InOrder inOrder = inOrder(ksqlClient); + + // call to get latest version + inOrder.verify(ksqlClient).executeQuery("SELECT VERSION FROM " + MIGRATIONS_TABLE + + " WHERE version_key = '" + CURRENT_VERSION_KEY + "';"); + + // calls to get info for migrated versions + for (int i = versions.size() - 1; i >= 0; i--) { + final int expectedTimes = versions.get(i).equals(latestMigratedVersion) ? 2 : 1; + inOrder.verify(ksqlClient, times(expectedTimes)).executeQuery( + "SELECT checksum, previous, state FROM " + MIGRATIONS_TABLE + + " WHERE version_key = '" + versions.get(i) + "';"); + } + + // close the client + inOrder.verify(ksqlClient).close(); + + inOrder.verifyNoMoreInteractions(); + } + + private String filePathForVersion(final String version) { + final String prefix = "V" + StringUtils.leftPad(version, 6, "0"); + return Paths.get(migrationsDir, prefix + "_awesome_migration").toString(); + } + + private static String fileContentsForVersion(final String version) { + return "sql_" + version; + } + +} \ No newline at end of file