diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java index 81e6ea805b0c..2b0e84e81ebc 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java @@ -24,11 +24,11 @@ import com.github.rvesse.airline.annotations.restrictions.RequireOnlyOne; import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.api.client.Client; -import io.confluent.ksql.tools.migrations.Migration; import io.confluent.ksql.tools.migrations.MigrationConfig; import io.confluent.ksql.tools.migrations.MigrationException; import io.confluent.ksql.tools.migrations.util.MetadataUtil; import io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState; +import io.confluent.ksql.tools.migrations.util.MigrationFile; import io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil; import io.confluent.ksql.tools.migrations.util.MigrationsUtil; import io.confluent.ksql.util.KsqlException; @@ -165,7 +165,7 @@ private boolean apply( : Integer.parseInt(previous) + 1; LOGGER.info("Loading migration files"); - final List migrations; + final List migrations; try { migrations = loadMigrationsToApply(migrationsDir, minimumVersion); } catch (MigrationException e) { @@ -179,7 +179,7 @@ private boolean apply( LOGGER.info(migrations.size() + " migration file(s) loaded."); } - for (Migration migration : migrations) { + for (MigrationFile migration : migrations) { if (!applyMigration(config, ksqlClient, migration, clock, previous)) { return false; } @@ -189,12 +189,12 @@ private boolean apply( return true; } - private List loadMigrationsToApply( + private List loadMigrationsToApply( final String migrationsDir, final int minimumVersion ) { if (version > 0) { - final Optional migration = + final Optional migration = getMigrationForVersion(String.valueOf(version), migrationsDir); if (!migration.isPresent()) { throw new MigrationException("No migration file with version " + version + " exists."); @@ -202,7 +202,7 @@ private List loadMigrationsToApply( return Collections.singletonList(migration.get()); } - final List migrations = getAllMigrations(migrationsDir).stream() + final List migrations = getAllMigrations(migrationsDir).stream() .filter(migration -> { if (migration.getVersion() < minimumVersion) { return false; @@ -228,14 +228,14 @@ private List loadMigrationsToApply( private boolean applyMigration( final MigrationConfig config, final Client ksqlClient, - final Migration migration, + final MigrationFile migration, final Clock clock, final String previous ) { LOGGER.info("Applying migration version {}: {}", migration.getVersion(), migration.getName()); final String migrationFileContent = MigrationsDirectoryUtil.getFileContentsForName(migration.getFilepath()); - LOGGER.info("Migration file contents:\n{}", migrationFileContent); + LOGGER.info("MigrationFile file contents:\n{}", migrationFileContent); if (dryRun) { LOGGER.info("Dry run complete. No migrations were actually applied."); @@ -251,26 +251,31 @@ private boolean applyMigration( if ( !updateState(config, ksqlClient, MigrationState.RUNNING, - executionStart, migration, clock, previous) + executionStart, migration, clock, previous, Optional.empty()) ) { return false; } - try { - final List commands = Arrays.stream(migrationFileContent.split(";")) - .filter(s -> s.length() > 1) - .collect(Collectors.toList()); - for (final String command : commands) { - ksqlClient.executeStatement(command + ";").get(); + final List commands = Arrays.stream(migrationFileContent.split(";")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .map(s -> s + ";") + .collect(Collectors.toList()); + for (final String command : commands) { + try { + ksqlClient.executeStatement(command).get(); + } catch (InterruptedException | ExecutionException e) { + final String errorMsg = String.format( + "Failed to execute sql: %s. Error: %s", command, e.getMessage()); + LOGGER.error(errorMsg); + updateState(config, ksqlClient, MigrationState.ERROR, + executionStart, migration, clock, previous, Optional.of(errorMsg)); + return false; } - } catch (InterruptedException | ExecutionException e) { - LOGGER.error(e.getMessage()); - updateState(config, ksqlClient, MigrationState.ERROR, - executionStart, migration, clock, previous); - return false; } + updateState(config, ksqlClient, MigrationState.MIGRATED, - executionStart, migration, clock, previous); + executionStart, migration, clock, previous, Optional.empty()); LOGGER.info("Successfully migrated"); return true; } @@ -311,9 +316,10 @@ private boolean updateState( final Client ksqlClient, final MigrationState state, final String executionStart, - final Migration migration, + final MigrationFile migration, final Clock clock, - final String previous + final String previous, + final Optional errorReason ) { final String executionEnd = (state == MigrationState.MIGRATED || state == MigrationState.ERROR) ? Long.toString(clock.millis()) @@ -329,7 +335,8 @@ private boolean updateState( executionEnd, migration, previous, - checksum + checksum, + errorReason ).get(); MetadataUtil.writeRow( config, @@ -340,7 +347,8 @@ private boolean updateState( executionEnd, migration, previous, - checksum + checksum, + errorReason ).get(); return true; } catch (InterruptedException | ExecutionException e) { diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/CreateMigrationCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/CreateMigrationCommand.java index ba1bd76e2187..7574f40249c2 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/CreateMigrationCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/CreateMigrationCommand.java @@ -26,8 +26,8 @@ import com.github.rvesse.airline.annotations.help.Examples; import com.github.rvesse.airline.annotations.restrictions.Required; import com.google.common.annotations.VisibleForTesting; -import io.confluent.ksql.tools.migrations.Migration; import io.confluent.ksql.tools.migrations.MigrationException; +import io.confluent.ksql.tools.migrations.util.MigrationFile; import java.io.File; import java.io.IOException; import java.nio.file.Paths; @@ -104,7 +104,7 @@ private boolean validateVersionDoesNotAlreadyExist(final String migrationsDir) { return true; } - final Optional existingMigration; + final Optional existingMigration; try { existingMigration = getMigrationForVersion(String.valueOf(version), migrationsDir); } catch (MigrationException e) { diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommand.java index 8ee061db1a83..6ff1c1dc6c33 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommand.java @@ -48,7 +48,8 @@ private String createEventStream(final String name, final String topic, final in + " checksum STRING,\n" + " started_on STRING,\n" + " completed_on STRING,\n" - + " previous STRING\n" + + " previous STRING,\n" + + " error_reason STRING\n" + ") WITH ( \n" + " KAFKA_TOPIC='" + topic + "',\n" + " VALUE_FORMAT='JSON',\n" @@ -70,7 +71,8 @@ private String createVersionTable(final String name, final String topic) { + " latest_by_offset(checksum) AS checksum, \n" + " latest_by_offset(started_on) AS started_on, \n" + " latest_by_offset(completed_on) AS completed_on, \n" - + " latest_by_offset(previous) AS previous\n" + + " latest_by_offset(previous) AS previous, \n" + + " latest_by_offset(error_reason) AS error_reason \n" + " FROM migration_events \n" + " GROUP BY version_key;\n"; } diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/MigrationInfoCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/MigrationInfoCommand.java index 4bc23ba2709f..357e3a4dc46a 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/MigrationInfoCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/MigrationInfoCommand.java @@ -15,22 +15,28 @@ package io.confluent.ksql.tools.migrations.commands; +import static io.confluent.ksql.tools.migrations.util.MetadataUtil.getOptionalInfoForVersions; import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile; -import static io.confluent.ksql.tools.migrations.util.ServerVersionUtil.getServerInfo; -import static io.confluent.ksql.tools.migrations.util.ServerVersionUtil.versionSupportsMultiKeyPullQuery; import com.github.rvesse.airline.annotations.Command; import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.api.client.Client; -import io.confluent.ksql.api.client.ServerInfo; import io.confluent.ksql.tools.migrations.MigrationConfig; import io.confluent.ksql.tools.migrations.MigrationException; +import io.confluent.ksql.tools.migrations.util.MetadataUtil; +import io.confluent.ksql.tools.migrations.util.MigrationFile; +import io.confluent.ksql.tools.migrations.util.MigrationVersionInfo; +import io.confluent.ksql.tools.migrations.util.MigrationVersionInfoFormatter; +import io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil; import io.confluent.ksql.tools.migrations.util.MigrationsUtil; import io.confluent.ksql.util.KsqlException; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.function.Function; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; @Command( name = "info", @@ -80,7 +86,20 @@ int command( return 1; } - throw new NotImplementedException(); + boolean success; + try { + printCurrentVersion(config, ksqlClient); + printVersionInfoTable(config, ksqlClient, migrationsDir); + + success = true; + } catch (MigrationException e) { + LOGGER.error(e.getMessage()); + success = false; + } finally { + ksqlClient.close(); + } + + return success ? 0 : 1; } @Override @@ -88,13 +107,47 @@ protected Logger getLogger() { return LOGGER; } - private static boolean serverSupportsMultiKeyPullQuery( + private void printCurrentVersion( + final MigrationConfig config, + final Client ksqlClient + ) { + final String currentVersion = MetadataUtil.getCurrentVersion(config, ksqlClient); + LOGGER.info("Current migration version: {}", currentVersion); + } + + private void printVersionInfoTable( + final MigrationConfig config, final Client ksqlClient, - final MigrationConfig config + final String migrationsDir ) { - final String ksqlServerUrl = config.getString(MigrationConfig.KSQL_SERVER_URL); - final ServerInfo serverInfo = getServerInfo(ksqlClient, ksqlServerUrl); - return versionSupportsMultiKeyPullQuery(serverInfo.getServerVersion()); + final List allMigrations = + MigrationsDirectoryUtil.getAllMigrations(migrationsDir); + final List allVersions = allMigrations.stream() + .map(MigrationFile::getVersion) + .collect(Collectors.toList()); + + if (allMigrations.size() != 0) { + final Map> versionInfos = + getOptionalInfoForVersions(allVersions, config, ksqlClient); + + printAsTable(allMigrations, versionInfos); + } else { + LOGGER.info("No migrations files found"); + } } + private static void printAsTable( + final List allMigrations, + final Map> versionInfos + ) { + final MigrationVersionInfoFormatter formatter = new MigrationVersionInfoFormatter(); + + for (final MigrationFile migration : allMigrations) { + final MigrationVersionInfo versionInfo = versionInfos.get(migration.getVersion()).orElse( + MigrationVersionInfo.pendingMigration(migration.getVersion(), migration.getName())); + formatter.addVersionInfo(versionInfo); + } + + LOGGER.info(formatter.getFormatted()); + } } diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommand.java index fda0bcc27b9a..beebf644c62f 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommand.java @@ -30,7 +30,7 @@ import io.confluent.ksql.tools.migrations.MigrationException; import io.confluent.ksql.tools.migrations.util.MetadataUtil; import io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState; -import io.confluent.ksql.tools.migrations.util.MetadataUtil.VersionInfo; +import io.confluent.ksql.tools.migrations.util.MigrationVersionInfo; import io.confluent.ksql.tools.migrations.util.MigrationsUtil; import io.confluent.ksql.util.KsqlException; import java.util.NoSuchElementException; @@ -127,7 +127,7 @@ static boolean validate( String version = getLatestMigratedVersion(config, ksqlClient); String nextVersion = null; while (!version.equals(MetadataUtil.NONE_VERSION)) { - final VersionInfo versionInfo = getInfoForVersion(version, config, ksqlClient); + final MigrationVersionInfo versionInfo = getInfoForVersion(version, config, ksqlClient); if (nextVersion != null) { validateVersionIsMigrated(version, versionInfo, nextVersion); } diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MetadataUtil.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MetadataUtil.java index d33706dda5e1..272ed2a87087 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MetadataUtil.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MetadataUtil.java @@ -15,27 +15,33 @@ package io.confluent.ksql.tools.migrations.util; +import static io.confluent.ksql.tools.migrations.util.ServerVersionUtil.getServerInfo; +import static io.confluent.ksql.tools.migrations.util.ServerVersionUtil.versionSupportsMultiKeyPullQuery; + import com.google.common.collect.ImmutableList; import io.confluent.ksql.api.client.BatchedQueryResult; import io.confluent.ksql.api.client.Client; import io.confluent.ksql.api.client.KsqlArray; import io.confluent.ksql.api.client.KsqlObject; import io.confluent.ksql.api.client.Row; -import io.confluent.ksql.tools.migrations.Migration; +import io.confluent.ksql.api.client.ServerInfo; import io.confluent.ksql.tools.migrations.MigrationConfig; import io.confluent.ksql.tools.migrations.MigrationException; import java.util.List; -import java.util.Objects; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; public final class MetadataUtil { public static final String NONE_VERSION = ""; public static final String CURRENT_VERSION_KEY = "CURRENT"; + public static final String EMPTY_ERROR_REASON = "N/A"; private static final List KEYS = ImmutableList.of( "VERSION_KEY", "VERSION", "NAME", "STATE", - "CHECKSUM", "STARTED_ON", "COMPLETED_ON", "PREVIOUS" + "CHECKSUM", "STARTED_ON", "COMPLETED_ON", "PREVIOUS", "ERROR_REASON" ); public enum MigrationState { @@ -72,9 +78,10 @@ public static CompletableFuture writeRow( final String state, final String startOn, final String completedOn, - final Migration migration, + final MigrationFile migration, final String previous, - final String checksum + final String checksum, + final Optional errorReason ) { final String migrationStreamName = config.getString(MigrationConfig.KSQL_MIGRATIONS_STREAM_NAME); @@ -86,7 +93,8 @@ public static CompletableFuture writeRow( checksum, startOn, completedOn, - previous + previous, + errorReason.orElse(EMPTY_ERROR_REASON) ); return client.insertInto( migrationStreamName, @@ -103,43 +111,56 @@ public static String getLatestMigratedVersion( return currentVersion; } - final VersionInfo currentVersionInfo = getInfoForVersion(currentVersion, config, ksqlClient); - if (currentVersionInfo.state == MigrationState.MIGRATED) { + final MigrationVersionInfo currentVersionInfo = + getInfoForVersion(currentVersion, config, ksqlClient); + if (currentVersionInfo.getState() == MigrationState.MIGRATED) { return currentVersion; } - if (currentVersionInfo.prevVersion.equals(MetadataUtil.NONE_VERSION)) { + if (currentVersionInfo.getPrevVersion().equals(MetadataUtil.NONE_VERSION)) { return MetadataUtil.NONE_VERSION; } - final VersionInfo prevVersionInfo = getInfoForVersion( - currentVersionInfo.prevVersion, + final MigrationVersionInfo prevVersionInfo = getInfoForVersion( + currentVersionInfo.getPrevVersion(), config, ksqlClient ); - validateVersionIsMigrated(currentVersionInfo.prevVersion, prevVersionInfo, currentVersion); + validateVersionIsMigrated(currentVersionInfo.getPrevVersion(), prevVersionInfo, currentVersion); - return currentVersionInfo.prevVersion; + return currentVersionInfo.getPrevVersion(); } public static void validateVersionIsMigrated( final String version, - final VersionInfo versionInfo, + final MigrationVersionInfo versionInfo, final String nextVersion ) { - if (versionInfo.state != MigrationState.MIGRATED) { + if (versionInfo.getState() != MigrationState.MIGRATED) { throw new MigrationException(String.format( "Discovered version with previous version that does not have status %s. " + "Version: %s. Previous version: %s. Previous version status: %s", MigrationState.MIGRATED, nextVersion, version, - versionInfo.state + versionInfo.getState() )); } } - public static VersionInfo getInfoForVersion( + public static MigrationVersionInfo getInfoForVersion( + final String version, + final MigrationConfig config, + final Client ksqlClient + ) { + final Optional maybeInfo = + getOptionalInfoForVersion(version, config, ksqlClient); + return maybeInfo.orElseThrow(() -> new MigrationException( + "Failed to query state for migration with version " + version + + ": no such migration is present in the migrations metadata table")); + } + + public static Optional getOptionalInfoForVersion( final String version, final MigrationConfig config, final Client ksqlClient @@ -147,51 +168,66 @@ public static VersionInfo getInfoForVersion( final String migrationTableName = config .getString(MigrationConfig.KSQL_MIGRATIONS_TABLE_NAME); final BatchedQueryResult result = ksqlClient.executeQuery( - "SELECT checksum, previous, state FROM " + migrationTableName - + " WHERE version_key = '" + version + "';"); + "SELECT version, checksum, previous, state, name, started_on, completed_on, error_reason " + + "FROM " + migrationTableName + " WHERE version_key = '" + version + "';"); - final String expectedHash; - final String prevVersion; - final String state; + final Row resultRow; try { final List resultRows = result.get(); if (resultRows.size() == 0) { - throw new MigrationException( - "Failed to query state for migration with version " + version - + ": no such migration is present in the migrations metadata table"); + return Optional.empty(); } - expectedHash = resultRows.get(0).getString(1); - prevVersion = resultRows.get(0).getString(2); - state = resultRows.get(0).getString(3); + resultRow = resultRows.get(0); } catch (InterruptedException | ExecutionException e) { throw new MigrationException(String.format( "Failed to query state for migration with version %s: %s", version, e.getMessage())); } - return new VersionInfo(expectedHash, prevVersion, state); + return Optional.of(MigrationVersionInfo.fromResultRow(resultRow)); } - public static class VersionInfo { - private final String expectedHash; - private final String prevVersion; - private final MigrationState state; - - VersionInfo(final String expectedHash, final String prevVersion, final String state) { - this.expectedHash = Objects.requireNonNull(expectedHash, "expectedHash"); - this.prevVersion = Objects.requireNonNull(prevVersion, "prevVersion"); - this.state = MigrationState.valueOf(Objects.requireNonNull(state, "state")); - } - - public String getExpectedHash() { - return expectedHash; - } + public static Map> getOptionalInfoForVersions( + final List versions, + final MigrationConfig config, + final Client ksqlClient + ) { + if (serverSupportsMultiKeyPullQuery(ksqlClient, config)) { + // issue a single, multi-key pull query + final String migrationTableName = config + .getString(MigrationConfig.KSQL_MIGRATIONS_TABLE_NAME); + final BatchedQueryResult result = ksqlClient.executeQuery( + "SELECT version, checksum, previous, state, name, started_on, completed_on, error_reason " + + "FROM " + migrationTableName + " WHERE version_key IN ('" + + versions.stream().map(String::valueOf).collect(Collectors.joining("', '")) + + "');"); + + final Map resultSet; + try { + resultSet = result.get().stream() + .map(MigrationVersionInfo::fromResultRow) + .collect(Collectors.toMap(MigrationVersionInfo::getVersion, vInfo -> vInfo)); + } catch (InterruptedException | ExecutionException e) { + throw new MigrationException(String.format( + "Failed to query state for migration with versions %s: %s", versions, e.getMessage())); + } - public String getPrevVersion() { - return prevVersion; + return versions.stream() + .collect(Collectors.toMap(v -> v, v -> Optional.ofNullable(resultSet.get(v)))); + } else { + // issue multiple, single-key pull queries + return versions.stream() + .collect(Collectors.toMap( + v -> v, + v -> getOptionalInfoForVersion(String.valueOf(v), config, ksqlClient))); } + } - public MigrationState getState() { - return state; - } + private static boolean serverSupportsMultiKeyPullQuery( + final Client ksqlClient, + final MigrationConfig config + ) { + final String ksqlServerUrl = config.getString(MigrationConfig.KSQL_SERVER_URL); + final ServerInfo serverInfo = getServerInfo(ksqlClient, ksqlServerUrl); + return versionSupportsMultiKeyPullQuery(serverInfo.getServerVersion()); } } diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/Migration.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationFile.java similarity index 87% rename from ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/Migration.java rename to ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationFile.java index 04d819fc822e..31b9cf31b9d1 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/Migration.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationFile.java @@ -13,16 +13,17 @@ * specific language governing permissions and limitations under the License. */ -package io.confluent.ksql.tools.migrations; +package io.confluent.ksql.tools.migrations.util; +import io.confluent.ksql.tools.migrations.MigrationException; import java.util.Objects; -public class Migration { +public class MigrationFile { private final int version; private final String name; private final String filepath; - public Migration( + public MigrationFile( final int version, final String name, final String filepath diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationVersionInfo.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationVersionInfo.java new file mode 100644 index 000000000000..89aa3345a94e --- /dev/null +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationVersionInfo.java @@ -0,0 +1,130 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.tools.migrations.util; + +import static io.confluent.ksql.tools.migrations.util.MetadataUtil.EMPTY_ERROR_REASON; + +import com.google.common.annotations.VisibleForTesting; +import io.confluent.ksql.api.client.Row; +import io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Objects; + +public final class MigrationVersionInfo { + + private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS z"; + private static final String EMPTY_MIGRATION_TIMESTAMP = "N/A"; + + private final int version; + private final String expectedHash; + private final String prevVersion; + private final MigrationState state; + private final String name; + private final String startedOn; + private final String completedOn; + private final String errorReason; + + public static MigrationVersionInfo fromResultRow(final Row row) { + return new MigrationVersionInfo( + Integer.parseInt(row.getString(1)), + row.getString(2), + row.getString(3), + row.getString(4), + row.getString(5), + row.getString(6), + row.getString(7), + row.getString(8) + ); + } + + public static MigrationVersionInfo pendingMigration(final int version, final String name) { + return new MigrationVersionInfo( + version, + "N/A", + "N/A", + MigrationState.PENDING.toString(), + name, + EMPTY_MIGRATION_TIMESTAMP, + EMPTY_MIGRATION_TIMESTAMP, + EMPTY_ERROR_REASON + ); + } + + @VisibleForTesting + MigrationVersionInfo( + final int version, + final String expectedHash, + final String prevVersion, + final String state, + final String name, + final String startedOn, + final String completedOn, + final String errorReason + ) { + this.version = version; + this.expectedHash = Objects.requireNonNull(expectedHash, "expectedHash"); + this.prevVersion = Objects.requireNonNull(prevVersion, "prevVersion"); + this.state = MigrationState.valueOf(Objects.requireNonNull(state, "state")); + this.name = Objects.requireNonNull(name, "name"); + this.errorReason = Objects.requireNonNull(errorReason, "errorReason"); + + Objects.requireNonNull(startedOn, "startedOn"); + Objects.requireNonNull(completedOn, "completedOn"); + this.startedOn = formatTimestamp(startedOn); + this.completedOn = formatTimestamp(completedOn); + } + + public int getVersion() { + return version; + } + + public String getExpectedHash() { + return expectedHash; + } + + public String getPrevVersion() { + return prevVersion; + } + + public MigrationState getState() { + return state; + } + + public String getName() { + return name; + } + + public String getStartedOn() { + return startedOn; + } + + public String getCompletedOn() { + return completedOn; + } + + public String getErrorReason() { + return errorReason; + } + + private static String formatTimestamp(final String epochTime) { + if (epochTime.equals("") || epochTime.equals(EMPTY_MIGRATION_TIMESTAMP)) { + return epochTime; + } + + return new SimpleDateFormat(DATE_FORMAT).format(new Date(Long.parseLong(epochTime))); + } +} diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationVersionInfoFormatter.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationVersionInfoFormatter.java new file mode 100644 index 000000000000..fc1cb27ffea1 --- /dev/null +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationVersionInfoFormatter.java @@ -0,0 +1,118 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.tools.migrations.util; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; + +public class MigrationVersionInfoFormatter { + + private static final List VERSION_INFO_FIELDS = ImmutableList.of( + new VersionInfoField("Version", vInfo -> String.valueOf(vInfo.getVersion())), + new VersionInfoField("Name", MigrationVersionInfo::getName), + new VersionInfoField("State", vInfo -> vInfo.getState().toString()), + new VersionInfoField("Previous Version", MigrationVersionInfo::getPrevVersion), + new VersionInfoField("Started On", MigrationVersionInfo::getStartedOn), + new VersionInfoField("Completed On", MigrationVersionInfo::getCompletedOn), + new VersionInfoField("Error Reason", MigrationVersionInfo::getErrorReason) + ); + + private final List versionInfos; + + public MigrationVersionInfoFormatter() { + versionInfos = new ArrayList<>(); + } + + public void addVersionInfo(final MigrationVersionInfo versionInfo) { + versionInfos.add(versionInfo); + } + + public String getFormatted() { + final List columnLengths = VERSION_INFO_FIELDS.stream() + .map(field -> { + final int maxLength = Math.max( + field.header.length(), + versionInfos.stream() + .map(field.extractor) + .map(String::length) + .max(Integer::compare) + .orElse(0)); + return maxLength; + }) + .collect(Collectors.toList()); + final String rowFormatString = constructRowFormatString(columnLengths); + + final StringBuilder builder = new StringBuilder(); + + // format header + builder.append(String.format(rowFormatString, + VERSION_INFO_FIELDS.stream() + .map(f -> f.header) + .toArray())); + + // format divider + final int totalColLength = columnLengths.stream() + .reduce(Integer::sum) + .orElseThrow(IllegalStateException::new); + final int dividerLength = totalColLength + 3 * VERSION_INFO_FIELDS.size() - 1; + final String divider = StringUtils.repeat("-", dividerLength); + builder.append(divider); + builder.append("\n"); + + // format version info rows + for (final MigrationVersionInfo result : versionInfos) { + builder.append(String.format(rowFormatString, + VERSION_INFO_FIELDS.stream() + .map(f -> f.extractor.apply(result)) + .toArray())); + } + + // format footer + builder.append(divider); + builder.append("\n"); + + return builder.toString(); + } + + private static String constructRowFormatString(final List lengths) { + final List columnFormatStrings = lengths.stream() + .map(MigrationVersionInfoFormatter::constructSingleColumnFormatString) + .collect(Collectors.toList()); + return String.format(" %s %n", String.join(" | ", columnFormatStrings)); + } + + private static String constructSingleColumnFormatString(final Integer length) { + return String.format("%%%ds", (-1 * length)); + } + + private static class VersionInfoField { + final String header; + final Function extractor; + + VersionInfoField( + final String header, + final Function extractor + ) { + this.header = header; + this.extractor = extractor; + } + } +} + diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsDirectoryUtil.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsDirectoryUtil.java index 399750c22735..8b52ea3c749e 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsDirectoryUtil.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/util/MigrationsDirectoryUtil.java @@ -15,7 +15,6 @@ package io.confluent.ksql.tools.migrations.util; -import io.confluent.ksql.tools.migrations.Migration; import io.confluent.ksql.tools.migrations.MigrationException; import java.io.File; import java.io.IOException; @@ -60,7 +59,7 @@ public static String getFilePrefixForVersion(final String version) { return "V" + StringUtils.leftPad(version, 6, "0"); } - public static Optional getMigrationForVersion( + public static Optional getMigrationForVersion( final String version, final String migrationsDir ) { @@ -76,7 +75,7 @@ public static Optional getMigrationForVersion( throw new MigrationException("Failed to retrieve files from " + migrationsDir); } - final List matches = Arrays.stream(names) + final List matches = Arrays.stream(names) .filter(name -> name.startsWith(prefix)) .map(name -> getMigrationFromFilename(migrationsDir, name)) .filter(Optional::isPresent) @@ -112,14 +111,14 @@ public static String computeHashForFile(final String filename) { */ public static List getAllVersions(final String migrationsDir) { return getAllMigrations(migrationsDir).stream() - .map(Migration::getVersion) + .map(MigrationFile::getVersion) .collect(Collectors.toList()); } /** * @return all migration files in sorted order */ - public static List getAllMigrations(final String migrationsDir) { + public static List getAllMigrations(final String migrationsDir) { final File directory = new File(migrationsDir); if (!directory.isDirectory()) { throw new MigrationException(migrationsDir + " is not a directory."); @@ -140,7 +139,7 @@ public static List getAllMigrations(final String migrationsDir) { .filter(name -> !new File(name).isDirectory()) .collect(Collectors.toList()); - final List migrations = filenames.stream() + final List migrations = filenames.stream() .map(name -> getMigrationFromFilename(migrationsDir, name)) .filter(Optional::isPresent) .map(Optional::get) @@ -151,7 +150,7 @@ public static List getAllMigrations(final String migrationsDir) { return migrations; } - private static Optional getMigrationFromFilename( + private static Optional getMigrationFromFilename( final String migrationsDir, final String filename ) { @@ -165,24 +164,24 @@ private static Optional getMigrationFromFilename( final int version = Integer.parseInt(matcher.group(1)); if (version <= 0) { throw new MigrationException( - "Migration file versions must be positive. Found: " + filename); + "MigrationFile file versions must be positive. Found: " + filename); } final String description = matcher.group(2).replace('_', ' '); - return Optional.of(new Migration( + return Optional.of(new MigrationFile( version, description, Paths.get(migrationsDir, filename).toString() )); } - private static void validateMigrationVersionsUnique(final List migrations) { + private static void validateMigrationVersionsUnique(final List migrations) { if (migrations.size() == 0) { return; } - Migration previous = migrations.get(0); + MigrationFile previous = migrations.get(0); for (int i = 1; i < migrations.size(); i++) { if (migrations.get(i).getVersion() == previous.getVersion()) { throw new MigrationException(String.format( diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java index cf524274afb0..78e390fb9c76 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java @@ -19,9 +19,13 @@ import static io.confluent.ksql.util.KsqlConfig.KSQL_STREAMS_PREFIX; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.verify; import com.github.rvesse.airline.Cli; import io.confluent.common.utils.IntegrationTest; @@ -45,9 +49,13 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import kafka.zookeeper.ZooKeeperClientException; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.test.TestUtils; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; @@ -57,7 +65,13 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.RuleChain; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.class) @Category({IntegrationTest.class}) public class MigrationsTest { @@ -76,6 +90,11 @@ public class MigrationsTest { private static final Cli MIGRATIONS_CLI = new Cli<>(Migrations.class); + @Mock + private AppenderSkeleton logAppender; + @Captor + private ArgumentCaptor logCaptor; + private static String configFilePath; @BeforeClass @@ -85,6 +104,8 @@ public static void setUpClass() throws Exception { configFilePath = Paths.get(testDir, MigrationsDirectoryUtil.MIGRATIONS_CONFIG_FILE).toString(); initializeAndVerifyMetadataStreamAndTable(configFilePath); + + waitForMetadataTableReady(); } @AfterClass @@ -93,8 +114,13 @@ public static void classTearDown() { } @Test - public void testApply() throws IOException { - // Migration file + public void shouldApplyMigrationsAndDisplayInfo() throws Exception { + shouldApplyMigrations(); + shouldDisplayInfo(); + } + + private void shouldApplyMigrations() throws Exception { + // Given: createMigrationFile( 1, "foo FOO fO0", @@ -108,42 +134,74 @@ public void testApply() throws IOException { "CREATE STREAM BAR (A STRING) WITH (KAFKA_TOPIC='BAR', PARTITIONS=1, VALUE_FORMAT='DELIMITED');" ); - // This is needed to make sure that the table is fully done being created. - // It's a similar situation to https://github.com/confluentinc/ksql/issues/6249 - assertThatEventually( - () -> makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='CURRENT';").size(), - is(1) - ); - final int status = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply", "-a").run(); - assertThat(status, is(0)); + // When: + final int applyStatus = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply", "-a").run(); + + // Then: + assertThat(applyStatus, is(0)); + + verifyMigrationsApplied(); + } + + private void shouldDisplayInfo() { + // Given: + Logger.getRootLogger().addAppender(logAppender); + + try { + // When: + final int infoStatus = MIGRATIONS_CLI.parse("--config-file", configFilePath, "info").run(); + + // Then: + assertThat(infoStatus, is(0)); + + verify(logAppender, atLeastOnce()).doAppend(logCaptor.capture()); + final List logMessages = logCaptor.getAllValues().stream() + .map(LoggingEvent::getRenderedMessage) + .collect(Collectors.toList()); + assertThat(logMessages, hasItem(containsString("Current migration version: 2"))); + assertThat(logMessages, hasItem(matchesRegex( + " Version \\| Name \\| State \\| Previous Version \\| Started On\\s+\\| Completed On\\s+\\| Error Reason \n" + + "-+\n" + + " 1 \\| foo FOO fO0 \\| MIGRATED \\| \\| \\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3} \\S+ \\| \\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3} \\S+ \\| N/A \n" + + " 2 \\| bar bar BAR \\| MIGRATED \\| 1 \\| \\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3} \\S+ \\| \\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3} \\S+ \\| N/A \n" + + "-+\n" + ))); + } finally { + Logger.getRootLogger().removeAppender(logAppender); + } + } + private static void verifyMigrationsApplied() { // verify FOO and BAR were registered describeSource("FOO"); describeSource("BAR"); - // verify current - final List current = makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='CURRENT';"); - assertThatEventually(() -> current.size(), is(2)); - assertThatEventually(() -> current.get(1).getRow().get().getColumns().get(1), is("2")); - assertThatEventually(() -> current.get(1).getRow().get().getColumns().get(2), is("bar bar BAR")); - assertThatEventually(() -> current.get(1).getRow().get().getColumns().get(3), is("MIGRATED")); - assertThatEventually(() -> current.get(1).getRow().get().getColumns().get(7), is("1")); - // verify version 1 - final List version1 = makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='1';"); - assertThatEventually(() -> version1.size(), is(2)); - assertThatEventually(() -> version1.get(1).getRow().get().getColumns().get(1), is("1")); - assertThatEventually(() -> version1.get(1).getRow().get().getColumns().get(2), is("foo FOO fO0")); - assertThatEventually(() -> version1.get(1).getRow().get().getColumns().get(3), is("MIGRATED")); - assertThatEventually(() -> version1.get(1).getRow().get().getColumns().get(7), is("")); + final List version1 = assertThatEventually( + () -> makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='1';"), + hasSize(2)); + assertThat(version1.get(1).getRow().get().getColumns().get(1), is("1")); + assertThat(version1.get(1).getRow().get().getColumns().get(2), is("foo FOO fO0")); + assertThat(version1.get(1).getRow().get().getColumns().get(3), is("MIGRATED")); + assertThat(version1.get(1).getRow().get().getColumns().get(7), is("")); // verify version 2 - final List version2 = makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='CURRENT';"); - assertThatEventually(() -> version2.size(), is(2)); - assertThatEventually(() -> version2.get(1).getRow().get().getColumns().get(1), is("2")); - assertThatEventually(() -> version2.get(1).getRow().get().getColumns().get(2), is("bar bar BAR")); - assertThatEventually(() -> version2.get(1).getRow().get().getColumns().get(3), is("MIGRATED")); - assertThatEventually(() -> version2.get(1).getRow().get().getColumns().get(7), is("1")); + final List version2 = assertThatEventually( + () -> makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='CURRENT';"), + hasSize(2)); + assertThat(version2.get(1).getRow().get().getColumns().get(1), is("2")); + assertThat(version2.get(1).getRow().get().getColumns().get(2), is("bar bar BAR")); + assertThat(version2.get(1).getRow().get().getColumns().get(3), is("MIGRATED")); + assertThat(version2.get(1).getRow().get().getColumns().get(7), is("1")); + + // verify current + final List current = + makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='CURRENT';"); + assertThat(current.size(), is(2)); + assertThat(current.get(1).getRow().get().getColumns().get(1), is("2")); + assertThat(current.get(1).getRow().get().getColumns().get(2), is("bar bar BAR")); + assertThat(current.get(1).getRow().get().getColumns().get(3), is("MIGRATED")); + assertThat(current.get(1).getRow().get().getColumns().get(7), is("1")); } private static void createAndVerifyDirectoryStructure(final String testDir) throws Exception { @@ -179,13 +237,13 @@ private static void initializeAndVerifyMetadataStreamAndTable(final String confi // verify metadata stream final SourceDescription streamDesc = describeSource("migration_events"); - assertThatEventually(() -> streamDesc.getType(), is("STREAM")); - assertThatEventually(() -> streamDesc.getTopic(), is("default_ksql_migration_events")); - assertThatEventually(() -> streamDesc.getKeyFormat(), is("KAFKA")); - assertThatEventually(() -> streamDesc.getValueFormat(), is("JSON")); - assertThatEventually(() -> streamDesc.getPartitions(), is(1)); - assertThatEventually(() -> streamDesc.getReplication(), is(1)); - assertThatEventually(() -> streamDesc.getFields(), containsInAnyOrder( + assertThat(streamDesc.getType(), is("STREAM")); + assertThat(streamDesc.getTopic(), is("default_ksql_migration_events")); + assertThat(streamDesc.getKeyFormat(), is("KAFKA")); + assertThat(streamDesc.getValueFormat(), is("JSON")); + assertThat(streamDesc.getPartitions(), is(1)); + assertThat(streamDesc.getReplication(), is(1)); + assertThat(streamDesc.getFields(), containsInAnyOrder( fieldInfo("VERSION_KEY", "STRING", true), fieldInfo("VERSION", "STRING", false), fieldInfo("NAME", "STRING", false), @@ -193,18 +251,19 @@ private static void initializeAndVerifyMetadataStreamAndTable(final String confi fieldInfo("CHECKSUM", "STRING", false), fieldInfo("STARTED_ON", "STRING", false), fieldInfo("COMPLETED_ON", "STRING", false), - fieldInfo("PREVIOUS", "STRING", false) + fieldInfo("PREVIOUS", "STRING", false), + fieldInfo("ERROR_REASON", "STRING", false) )); // verify metadata table final SourceDescription tableDesc = describeSource("migration_schema_versions"); - assertThatEventually(() -> tableDesc.getType(), is("TABLE")); - assertThatEventually(() -> tableDesc.getTopic(), is("default_ksql_migration_schema_versions")); - assertThatEventually(() -> tableDesc.getKeyFormat(), is("KAFKA")); - assertThatEventually(() -> tableDesc.getValueFormat(), is("JSON")); - assertThatEventually(() -> tableDesc.getPartitions(), is(1)); - assertThatEventually(() -> tableDesc.getReplication(), is(1)); - assertThatEventually(() -> tableDesc.getFields(), containsInAnyOrder( + assertThat(tableDesc.getType(), is("TABLE")); + assertThat(tableDesc.getTopic(), is("default_ksql_migration_schema_versions")); + assertThat(tableDesc.getKeyFormat(), is("KAFKA")); + assertThat(tableDesc.getValueFormat(), is("JSON")); + assertThat(tableDesc.getPartitions(), is(1)); + assertThat(tableDesc.getReplication(), is(1)); + assertThat(tableDesc.getFields(), containsInAnyOrder( fieldInfo("VERSION_KEY", "STRING", true), fieldInfo("VERSION", "STRING", false), fieldInfo("NAME", "STRING", false), @@ -212,15 +271,17 @@ private static void initializeAndVerifyMetadataStreamAndTable(final String confi fieldInfo("CHECKSUM", "STRING", false), fieldInfo("STARTED_ON", "STRING", false), fieldInfo("COMPLETED_ON", "STRING", false), - fieldInfo("PREVIOUS", "STRING", false) + fieldInfo("PREVIOUS", "STRING", false), + fieldInfo("ERROR_REASON", "STRING", false) )); } private static SourceDescription describeSource(final String name) { - final List entities = makeKsqlRequest("DESCRIBE " + name + ";"); + final List entities = assertThatEventually( + () -> makeKsqlRequest("DESCRIBE " + name + ";"), + hasSize(1)); - assertThatEventually(() -> entities, hasSize(1)); - assertThatEventually(() -> entities.get(0), instanceOf(SourceDescriptionEntity.class)); + assertThat(entities.get(0), instanceOf(SourceDescriptionEntity.class)); SourceDescriptionEntity entity = (SourceDescriptionEntity) entities.get(0); return entity.getSourceDescription(); @@ -234,6 +295,22 @@ private static List makeKsqlQuery(final String sql) { return RestIntegrationTestUtil.makeQueryRequest(REST_APP, sql, Optional.empty()); } + private static Matcher matchesRegex(final String regex) { + return new TypeSafeDiagnosingMatcher() { + @Override + protected boolean matchesSafely( + final String actual, + final Description mismatchDescription) { + return actual.matches(regex); + } + + @Override + public void describeTo(final Description description) { + description.appendText("matches regex: " + regex); + } + }; + } + private static Matcher fieldInfo( final String name, final String type, @@ -293,4 +370,13 @@ private static void createMigrationFile( out.println(content); } } + + private static void waitForMetadataTableReady() { + // This is needed to make sure that the table is fully done being created. + // It's a similar situation to https://github.com/confluentinc/ksql/issues/6249 + assertThatEventually( + () -> makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='CURRENT';").size(), + is(1) + ); + } } \ No newline at end of file diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java index 05cefa38c688..9c9c2095ed42 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.github.rvesse.airline.SingleCommand; @@ -46,6 +47,7 @@ import java.time.Instant; import java.time.ZoneId; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.junit.Before; @@ -104,7 +106,9 @@ public void setUp() throws ExecutionException, InterruptedException { when(ksqlClient.executeStatement(any())).thenReturn(statementResultCf); when(ksqlClient.executeQuery("SELECT VERSION FROM " + MIGRATIONS_TABLE + " WHERE version_key = 'CURRENT';")) .thenReturn(versionQueryResult); - when(ksqlClient.executeQuery("SELECT checksum, previous, state FROM " + MIGRATIONS_TABLE + " WHERE version_key = '1';")) + when(ksqlClient.executeQuery( + "SELECT version, checksum, previous, state, name, started_on, completed_on, error_reason FROM " + + MIGRATIONS_TABLE + " WHERE version_key = '1';")) .thenReturn(infoQueryResult); when(ksqlClient.describeSource(MIGRATIONS_STREAM)).thenReturn(sourceDescriptionCf); when(ksqlClient.describeSource(MIGRATIONS_TABLE)).thenReturn(sourceDescriptionCf); @@ -141,8 +145,8 @@ public void shouldApplySecondMigration() throws Exception { command = PARSER.parse("-n"); createMigrationFile(1, NAME, migrationsDir, COMMAND); createMigrationFile(3, NAME, migrationsDir, COMMAND); - when(versionQueryResult.get()).thenReturn(ImmutableList.of(createVersionRow("1"))); - when(infoQueryResult.get()).thenReturn(ImmutableList.of(createInfoRow(1, NAME, MigrationState.MIGRATED))); + givenCurrentMigrationVersion("1"); + givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( @@ -163,7 +167,7 @@ public void shouldApplyMultipleMigrations() throws Exception { createMigrationFile(1, NAME, migrationsDir, COMMAND); createMigrationFile(2, NAME, migrationsDir, COMMAND); when(versionQueryResult.get()).thenReturn(ImmutableList.of()); - when(infoQueryResult.get()).thenReturn(ImmutableList.of(createInfoRow(1, NAME, MigrationState.MIGRATED))); + givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( @@ -187,7 +191,7 @@ public void shouldApplyUntilVersion() throws Exception { // extra migration to ensure only the first two are applied createMigrationFile(3, NAME, migrationsDir, COMMAND); when(versionQueryResult.get()).thenReturn(ImmutableList.of()); - when(infoQueryResult.get()).thenReturn(ImmutableList.of(createInfoRow(1, NAME, MigrationState.MIGRATED))); + givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( @@ -208,8 +212,8 @@ public void shouldApplySpecificMigration() throws Exception { command = PARSER.parse("-v", "3"); createMigrationFile(1, NAME, migrationsDir, COMMAND); createMigrationFile(3, NAME, migrationsDir, COMMAND); - when(versionQueryResult.get()).thenReturn(ImmutableList.of(createVersionRow("1"))); - when(infoQueryResult.get()).thenReturn(ImmutableList.of(createInfoRow(1, NAME, MigrationState.MIGRATED))); + givenCurrentMigrationVersion("1"); + givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( @@ -230,7 +234,7 @@ public void shouldNotApplyMigrationIfPreviousNotFinished() throws Exception { createMigrationFile(1, NAME, migrationsDir, COMMAND); createMigrationFile(2, NAME, migrationsDir, COMMAND); when(versionQueryResult.get()).thenReturn(ImmutableList.of()); - when(infoQueryResult.get()).thenReturn(ImmutableList.of(createInfoRow(1, NAME, MigrationState.RUNNING))); + givenAppliedMigration(1, NAME, MigrationState.RUNNING); // When: final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( @@ -250,7 +254,7 @@ public void shouldLogErrorStateIfMigrationFails() throws Exception { command = PARSER.parse("-n"); createMigrationFile(1, NAME, migrationsDir, COMMAND); when(versionQueryResult.get()).thenReturn(ImmutableList.of()); - when(statementResultCf.get()).thenThrow(new InterruptedException()); + when(statementResultCf.get()).thenThrow(new ExecutionException("sql rejected", new RuntimeException())); // When: final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( @@ -259,7 +263,9 @@ public void shouldLogErrorStateIfMigrationFails() throws Exception { // Then: assertThat(result, is(1)); final InOrder inOrder = inOrder(ksqlClient); - verifyMigratedVersion(inOrder, 1, "", MigrationState.ERROR); + verifyMigratedVersion( + inOrder, 1, "", MigrationState.ERROR, + Optional.of("Failed to execute sql: " + COMMAND + ". Error: sql rejected")); inOrder.verify(ksqlClient).close(); inOrder.verifyNoMoreInteractions(); } @@ -270,8 +276,8 @@ public void shouldSkipApplyIfValidateFails() throws Exception { command = PARSER.parse("-n"); createMigrationFile(1, NAME, migrationsDir, COMMAND); createMigrationFile(1, "anotherone", migrationsDir, COMMAND); - when(versionQueryResult.get()).thenReturn(ImmutableList.of(createVersionRow("1"))); - when(infoQueryResult.get()).thenReturn(ImmutableList.of(createInfoRow(1, NAME, MigrationState.MIGRATED))); + givenCurrentMigrationVersion("1"); + givenAppliedMigration(1, NAME, MigrationState.MIGRATED); // When: final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( @@ -354,11 +360,12 @@ private KsqlObject createKsqlObject( final MigrationState state, final String startOn, final String completedOn, - final String previous + final String previous, + final Optional errorReason ) { final List KEYS = ImmutableList.of( "VERSION_KEY", "VERSION", "NAME", "STATE", - "CHECKSUM", "STARTED_ON", "COMPLETED_ON", "PREVIOUS" + "CHECKSUM", "STARTED_ON", "COMPLETED_ON", "PREVIOUS", "ERROR_REASON" ); final List values = ImmutableList.of( @@ -369,49 +376,76 @@ private KsqlObject createKsqlObject( MigrationsDirectoryUtil.computeHashForFile(getMigrationFilePath(version, name, migrationsDir)), startOn, completedOn, - previous + previous, + errorReason.orElse("N/A") ); return KsqlObject.fromArray(KEYS, new KsqlArray(values)); } - private Row createVersionRow(final String version) { - return new RowImpl( - ImmutableList.of("VERSION"), - RowUtil.columnTypesFromStrings(ImmutableList.of("STRING")), - new JsonArray(ImmutableList.of(version)), - ImmutableMap.of("VERSION", 1) - ); + private void givenCurrentMigrationVersion(final String version) throws Exception { + final Row row = mock(Row.class); + when(row.getString("VERSION")).thenReturn(version); + when(versionQueryResult.get()).thenReturn(ImmutableList.of(row)); } - private Row createInfoRow(final int version, final String name, final MigrationState state) { + private void givenAppliedMigration( + final int version, + final String name, + final MigrationState state + ) throws Exception { final String checksum = MigrationsDirectoryUtil.computeHashForFile(getMigrationFilePath(version, name, migrationsDir)); final String previous = version == 1 ? MetadataUtil.NONE_VERSION : Integer.toString(version - 1); - return new RowImpl( - ImmutableList.of("CHECKSUM", "PREVIOUS", "STATE"), - RowUtil.columnTypesFromStrings(ImmutableList.of("STRING", "STRING", "STRING")), - new JsonArray(ImmutableList.of(checksum, previous, state.toString())), - ImmutableMap.of("CHECKSUM", 1, "PREVIOUS", 2, "STATE", 3) - ); + + final Row row = mock(Row.class); + when(row.getString(1)).thenReturn(String.valueOf(version)); + when(row.getString(2)).thenReturn(checksum); + when(row.getString(3)).thenReturn(previous); + when(row.getString(4)).thenReturn(state.toString()); + when(row.getString(5)).thenReturn("name"); + when(row.getString(6)).thenReturn("N/A"); + when(row.getString(7)).thenReturn("N/A"); + when(row.getString(8)).thenReturn("no_error"); + + when(infoQueryResult.get()).thenReturn(ImmutableList.of(row)); } - private void verifyMigratedVersion(final InOrder inOrder, final int version, final String previous, final MigrationState finalState) { + private void verifyMigratedVersion( + final InOrder inOrder, + final int version, + final String previous, + final MigrationState finalState + ) { + verifyMigratedVersion(inOrder, version, previous, finalState, Optional.empty()); + } + + private void verifyMigratedVersion( + final InOrder inOrder, + final int version, + final String previous, + final MigrationState finalState, + final Optional errorReason + ) { inOrder.verify(ksqlClient).insertInto( MIGRATIONS_STREAM, - createKsqlObject(MetadataUtil.CURRENT_VERSION_KEY, version, NAME, MigrationState.RUNNING, "1000", "", previous) + createKsqlObject(MetadataUtil.CURRENT_VERSION_KEY, version, NAME, MigrationState.RUNNING, + "1000", "", previous, Optional.empty()) ); inOrder.verify(ksqlClient).insertInto( MIGRATIONS_STREAM, - createKsqlObject(Integer.toString(version), version, NAME, MigrationState.RUNNING, "1000", "", previous) + createKsqlObject(Integer.toString(version), version, NAME, MigrationState.RUNNING, + "1000", "", previous, Optional.empty()) ); inOrder.verify(ksqlClient).executeStatement(COMMAND); inOrder.verify(ksqlClient).insertInto( MIGRATIONS_STREAM, - createKsqlObject(MetadataUtil.CURRENT_VERSION_KEY, version, NAME, finalState, "1000", "1000", previous) + createKsqlObject(MetadataUtil.CURRENT_VERSION_KEY, version, NAME, finalState, + "1000", "1000", previous, errorReason) ); inOrder.verify(ksqlClient).insertInto( MIGRATIONS_STREAM, - createKsqlObject(Integer.toString(version), version, NAME, finalState, "1000", "1000", previous) + createKsqlObject(Integer.toString(version), version, NAME, finalState, + "1000", "1000", previous, errorReason) ); } } diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommandTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommandTest.java index aaa9a8ddd82b..bd48f4cfba27 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommandTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/InitializeMigrationCommandTest.java @@ -56,7 +56,8 @@ public class InitializeMigrationCommandTest { + " checksum STRING,\n" + " started_on STRING,\n" + " completed_on STRING,\n" - + " previous STRING\n" + + " previous STRING,\n" + + " error_reason STRING\n" + ") WITH ( \n" + " KAFKA_TOPIC='" + MIGRATIONS_STREAM_TOPIC + "',\n" + " VALUE_FORMAT='JSON',\n" @@ -76,7 +77,8 @@ public class InitializeMigrationCommandTest { + " latest_by_offset(checksum) AS checksum, \n" + " latest_by_offset(started_on) AS started_on, \n" + " latest_by_offset(completed_on) AS completed_on, \n" - + " latest_by_offset(previous) AS previous\n" + + " latest_by_offset(previous) AS previous, \n" + + " latest_by_offset(error_reason) AS error_reason \n" + " FROM migration_events \n" + " GROUP BY version_key;\n"; diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/MigrationInfoCommandTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/MigrationInfoCommandTest.java new file mode 100644 index 000000000000..542c88762030 --- /dev/null +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/MigrationInfoCommandTest.java @@ -0,0 +1,335 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.tools.migrations.commands; + +import static io.confluent.ksql.tools.migrations.util.MetadataUtil.CURRENT_VERSION_KEY; +import static io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState.ERROR; +import static io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState.MIGRATED; +import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePrefixForVersion; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.github.rvesse.airline.SingleCommand; +import com.google.common.collect.ImmutableList; +import io.confluent.ksql.api.client.BatchedQueryResult; +import io.confluent.ksql.api.client.Client; +import io.confluent.ksql.api.client.Row; +import io.confluent.ksql.api.client.ServerInfo; +import io.confluent.ksql.api.client.SourceDescription; +import io.confluent.ksql.tools.migrations.MigrationConfig; +import io.confluent.ksql.tools.migrations.util.MetadataUtil; +import io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState; +import java.io.File; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class MigrationInfoCommandTest { + + private static final SingleCommand PARSER = + SingleCommand.singleCommand(MigrationInfoCommand.class); + + private static final String MIGRATIONS_STREAM = "migrations_stream"; + private static final String MIGRATIONS_TABLE = "migrations_table"; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Mock + private MigrationConfig config; + @Mock + private Client ksqlClient; + @Mock + private CompletableFuture sourceDescriptionCf; + @Mock + private CompletableFuture serverInfoCf; + @Mock + private SourceDescription sourceDescription; + @Mock + private ServerInfo serverInfo; + + @Mock + private AppenderSkeleton logAppender; + @Captor + private ArgumentCaptor logCaptor; + + private String migrationsDir; + private MigrationInfoCommand command; + + @Before + public void setUp() throws Exception { + when(config.getString(MigrationConfig.KSQL_MIGRATIONS_STREAM_NAME)).thenReturn(MIGRATIONS_STREAM); + when(config.getString(MigrationConfig.KSQL_MIGRATIONS_TABLE_NAME)).thenReturn(MIGRATIONS_TABLE); + when(ksqlClient.describeSource(MIGRATIONS_STREAM)).thenReturn(sourceDescriptionCf); + when(ksqlClient.describeSource(MIGRATIONS_TABLE)).thenReturn(sourceDescriptionCf); + when(ksqlClient.serverInfo()).thenReturn(serverInfoCf); + when(sourceDescriptionCf.get()).thenReturn(sourceDescription); + when(serverInfoCf.get()).thenReturn(serverInfo); + when(serverInfo.getServerVersion()).thenReturn("v0.14.0"); + + migrationsDir = folder.getRoot().getPath(); + command = PARSER.parse(); + + Logger.getRootLogger().addAppender(logAppender); + } + + @After + public void tearDown() { + Logger.getRootLogger().removeAppender(logAppender); + } + + @Test + public void shouldPrintInfo() throws Exception { + // Given: + givenMigrations( + ImmutableList.of("1", "3"), + ImmutableList.of(MIGRATED, ERROR), + ImmutableList.of("N/A", "error reason"), + ImmutableList.of("4")); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir); + + // Then: + assertThat(result, is(0)); + + verify(logAppender, atLeastOnce()).doAppend(logCaptor.capture()); + final List logMessages = logCaptor.getAllValues().stream() + .map(LoggingEvent::getRenderedMessage) + .collect(Collectors.toList()); + assertThat(logMessages, hasItem(containsString("Current migration version: 3"))); + assertThat(logMessages, hasItem(containsString( + " Version | Name | State | Previous Version | Started On | Completed On | Error Reason \n" + + "------------------------------------------------------------------------------------------------\n" + + " 1 | some_name_1 | MIGRATED | | N/A | N/A | N/A \n" + + " 3 | some_name_3 | ERROR | 1 | N/A | N/A | error reason \n" + + " 4 | some name 4 | PENDING | N/A | N/A | N/A | N/A \n" + + "------------------------------------------------------------------------------------------------" + ))); + } + + @Test + public void shouldPrintInfoForEmptyMigrationsDir() throws Exception { + // Given: + givenMigrations(ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), ImmutableList.of()); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir); + + // Then: + assertThat(result, is(0)); + + verify(logAppender, atLeastOnce()).doAppend(logCaptor.capture()); + final List logMessages = logCaptor.getAllValues().stream() + .map(LoggingEvent::getRenderedMessage) + .collect(Collectors.toList()); + assertThat(logMessages, hasItem(containsString("Current migration version: "))); + assertThat(logMessages, hasItem(containsString("No migrations files found"))); + } + + @Test + public void shouldFailIfMetadataNotInitialized() throws Exception { + // Given: + givenMigrations(ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), ImmutableList.of()); + + when(sourceDescriptionCf.get()) + .thenThrow(new ExecutionException("Source not found", new RuntimeException())); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir); + + // Then: + assertThat(result, is(1)); + } + + @Test + public void shouldIssueMultiKeyPullQueryIfSupported() throws Exception { + // Given: + givenMigrations( + ImmutableList.of("1", "3"), + ImmutableList.of(MIGRATED, MIGRATED), + ImmutableList.of("N/A", "N/A"), + ImmutableList.of()); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir); + + // Then: + assertThat(result, is(0)); + + verify(ksqlClient).executeQuery( + "SELECT version, checksum, previous, state, name, started_on, completed_on, error_reason " + + "FROM " + MIGRATIONS_TABLE + " WHERE version_key IN ('1', '3');"); + } + + @Test + public void shouldFallBackToSingleKeyPullQueriesIfNeeded() throws Exception { + // Given: + givenMigrations( + ImmutableList.of("1", "3"), + ImmutableList.of(MIGRATED, MIGRATED), + ImmutableList.of("N/A", "N/A"), + ImmutableList.of(), + false); + + when(serverInfo.getServerVersion()).thenReturn("v0.13.0"); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir); + + // Then: + assertThat(result, is(0)); + + verify(ksqlClient).executeQuery( + "SELECT version, checksum, previous, state, name, started_on, completed_on, error_reason " + + "FROM " + MIGRATIONS_TABLE + " WHERE version_key = '1';"); + verify(ksqlClient).executeQuery( + "SELECT version, checksum, previous, state, name, started_on, completed_on, error_reason " + + "FROM " + MIGRATIONS_TABLE + " WHERE version_key = '3';"); + } + + /** + * @param versions versions, in the order they were applied + * @param states corresponding migration states (ordered according to {@code versions}) + * @param errorReasons corresponding error reasons (ordered according to {@code versions}) + * @param unappliedVersions (additional) existing versions, that have not been applied + */ + private void givenMigrations( + final List versions, + final List states, + final List errorReasons, + final List unappliedVersions + ) throws Exception { + givenMigrations(versions, states, errorReasons, unappliedVersions, true); + } + + /** + * @param appliedVersions applied versions, in the order they were applied + * @param states corresponding migration states (ordered according to {@code versions}) + * @param errorReasons corresponding error reasons (ordered according to {@code versions}) + * @param unappliedVersions (additional) existing versions, that have not been applied + * @param multiKeyPullQuerySupported whether the server version supports multi-key pull queries + */ + private void givenMigrations( + final List appliedVersions, + final List states, + final List errorReasons, + final List unappliedVersions, + final boolean multiKeyPullQuerySupported + ) throws Exception { + givenExistingMigrationFiles(appliedVersions); + givenExistingMigrationFiles(unappliedVersions); + givenCurrentMigrationVersion( + appliedVersions.size() > 0 + ? appliedVersions.get(appliedVersions.size() - 1) + : MetadataUtil.NONE_VERSION); + + final List appliedRows = new ArrayList<>(); + for (int i = 0; i < appliedVersions.size(); i++) { + String version = appliedVersions.get(i); + String prevVersion = i > 0 ? appliedVersions.get(i-1) : MetadataUtil.NONE_VERSION; + + Row row = mock(Row.class); + when(row.getString(1)).thenReturn(version); + when(row.getString(2)).thenReturn("checksum"); + when(row.getString(3)).thenReturn(prevVersion); + when(row.getString(4)).thenReturn(states.get(i).toString()); + when(row.getString(5)).thenReturn(fileDescriptionForVersion(version)); + when(row.getString(6)).thenReturn("N/A"); + when(row.getString(7)).thenReturn("N/A"); + when(row.getString(8)).thenReturn(errorReasons.get(i)); + appliedRows.add(row); + } + + if (multiKeyPullQuerySupported) { + BatchedQueryResult queryResult = mock(BatchedQueryResult.class); + when(ksqlClient.executeQuery( + "SELECT version, checksum, previous, state, name, started_on, completed_on, error_reason " + + "FROM " + MIGRATIONS_TABLE + " WHERE version_key IN ('" + + Stream.concat(appliedVersions.stream(), unappliedVersions.stream()).collect(Collectors.joining("', '")) + + "');")) + .thenReturn(queryResult); + when(queryResult.get()).thenReturn(appliedRows); + } else { + for (int i = 0; i < appliedVersions.size(); i++) { + BatchedQueryResult queryResult = mock(BatchedQueryResult.class); + when(ksqlClient.executeQuery( + "SELECT version, checksum, previous, state, name, started_on, completed_on, error_reason FROM " + + MIGRATIONS_TABLE + " WHERE version_key = '" + appliedVersions.get(i) + "';")) + .thenReturn(queryResult); + when(queryResult.get()).thenReturn(ImmutableList.of(appliedRows.get(i))); + } + for (String version : unappliedVersions) { + BatchedQueryResult queryResult = mock(BatchedQueryResult.class); + when(ksqlClient.executeQuery( + "SELECT version, checksum, previous, state, name, started_on, completed_on, error_reason FROM " + + MIGRATIONS_TABLE + " WHERE version_key = '" + version + "';")) + .thenReturn(queryResult); + when(queryResult.get()).thenReturn(ImmutableList.of()); + } + } + } + + private void givenExistingMigrationFiles(final List versions) throws Exception { + for (final String version : versions) { + final String filename = filePathForVersion(version); + assertThat(new File(filename).createNewFile(), is(true)); + } + } + + private void givenCurrentMigrationVersion(final String version) throws Exception { + Row row = mock(Row.class); + BatchedQueryResult queryResult = mock(BatchedQueryResult.class); + when(ksqlClient.executeQuery("SELECT VERSION FROM " + MIGRATIONS_TABLE + + " WHERE version_key = '" + CURRENT_VERSION_KEY + "';")) + .thenReturn(queryResult); + when(queryResult.get()).thenReturn(ImmutableList.of(row)); + when(row.getString("VERSION")).thenReturn(version); + } + + private String fileDescriptionForVersion(final String version) { + return "some_name_" + version; + } + + private String filePathForVersion(final String version) { + final String prefix = getFilePrefixForVersion(version); + return Paths.get(migrationsDir, prefix + "__" + fileDescriptionForVersion(version) + ".sql").toString(); + } +} \ No newline at end of file diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommandTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommandTest.java index 4cde89586474..56a189588e99 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommandTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ValidateMigrationsCommandTest.java @@ -268,7 +268,7 @@ private void givenAppliedMigrations( /** * @param versions versions, in the order they were applied * @param checksums corresponding checksums (ordered according to {@code versions}) - * @parma states corresponding migration states (ordered according to {@code versions}) + * @param states corresponding migration states (ordered according to {@code versions}) */ private void givenAppliedMigrations( final List versions, @@ -293,13 +293,19 @@ private void givenAppliedMigrations( row = mock(Row.class); queryResult = mock(BatchedQueryResult.class); - when(ksqlClient.executeQuery("SELECT checksum, previous, state FROM " + MIGRATIONS_TABLE - + " WHERE version_key = '" + version + "';")) + when(ksqlClient.executeQuery( + "SELECT version, checksum, previous, state, name, started_on, completed_on, error_reason FROM " + + MIGRATIONS_TABLE + " WHERE version_key = '" + version + "';")) .thenReturn(queryResult); when(queryResult.get()).thenReturn(ImmutableList.of(row)); - when(row.getString(1)).thenReturn(checksums.get(i)); - when(row.getString(2)).thenReturn(prevVersion); - when(row.getString(3)).thenReturn(states.get(i).toString()); + when(row.getString(1)).thenReturn(version); + when(row.getString(2)).thenReturn(checksums.get(i)); + when(row.getString(3)).thenReturn(prevVersion); + when(row.getString(4)).thenReturn(states.get(i).toString()); + when(row.getString(5)).thenReturn("name"); + when(row.getString(6)).thenReturn("N/A"); + when(row.getString(7)).thenReturn("N/A"); + when(row.getString(8)).thenReturn("no_error"); } } @@ -331,8 +337,8 @@ private void verifyClientCallsForVersions( for (int i = versions.size() - 1; i >= 0; i--) { final int expectedTimes = versions.get(i).equals(latestMigratedVersion) ? 2 : 1; inOrder.verify(ksqlClient, times(expectedTimes)).executeQuery( - "SELECT checksum, previous, state FROM " + MIGRATIONS_TABLE - + " WHERE version_key = '" + versions.get(i) + "';"); + "SELECT version, checksum, previous, state, name, started_on, completed_on, error_reason FROM " + + MIGRATIONS_TABLE + " WHERE version_key = '" + versions.get(i) + "';"); } // close the client diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationVersionInfoFormatterTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationVersionInfoFormatterTest.java new file mode 100644 index 000000000000..bb0604775be9 --- /dev/null +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/util/MigrationVersionInfoFormatterTest.java @@ -0,0 +1,54 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.tools.migrations.util; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import org.junit.Before; +import org.junit.Test; + +public class MigrationVersionInfoFormatterTest { + + private MigrationVersionInfoFormatter formatter; + + @Before + public void setUp() { + formatter = new MigrationVersionInfoFormatter(); + } + + @Test + public void shouldFormatVersionInfo() { + // Given: + formatter.addVersionInfo(new MigrationVersionInfo( + 1, "hash", "", "MIGRATED", + "name", "N/A", "N/A", "N/A")); + formatter.addVersionInfo(new MigrationVersionInfo( + 2, "other_hash", "N/A", "PENDING", + "other_name", "N/A", "N/A", "N/A")); + + // When: + final String formatted = formatter.getFormatted(); + + // Then: + assertThat(formatted, is( + " Version | Name | State | Previous Version | Started On | Completed On | Error Reason \n" + + "-----------------------------------------------------------------------------------------------\n" + + " 1 | name | MIGRATED | | N/A | N/A | N/A \n" + + " 2 | other_name | PENDING | N/A | N/A | N/A | N/A \n" + + "-----------------------------------------------------------------------------------------------\n")); + } +} \ No newline at end of file