Skip to content

Commit

Permalink
feat(migrations): implement validate command (#7087)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Feb 24, 2021
1 parent 68d6cfa commit d4cf400
Show file tree
Hide file tree
Showing 8 changed files with 593 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

package io.confluent.ksql.tools.migrations.commands;

import static io.confluent.ksql.tools.migrations.util.MigrationsUtil.MIGRATIONS_CONFIG_FILE;
import static io.confluent.ksql.tools.migrations.util.MigrationsUtil.MIGRATIONS_DIR;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.MIGRATIONS_CONFIG_FILE;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.MIGRATIONS_DIR;

import com.github.rvesse.airline.annotations.Arguments;
import com.github.rvesse.airline.annotations.Command;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,26 @@

package io.confluent.ksql.tools.migrations.commands;

import static io.confluent.ksql.tools.migrations.util.MetadataUtil.getInfoForVersion;
import static io.confluent.ksql.tools.migrations.util.MetadataUtil.getLatestMigratedVersion;
import static io.confluent.ksql.tools.migrations.util.MetadataUtil.validateVersionIsMigrated;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.computeHashForFile;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePathForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile;

import com.github.rvesse.airline.annotations.Command;
import com.github.rvesse.airline.annotations.help.Discussion;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.tools.migrations.MigrationConfig;
import io.confluent.ksql.tools.migrations.MigrationException;
import io.confluent.ksql.tools.migrations.util.MetadataUtil;
import io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState;
import io.confluent.ksql.tools.migrations.util.MetadataUtil.VersionInfo;
import io.confluent.ksql.tools.migrations.util.MigrationsUtil;
import io.confluent.ksql.util.KsqlException;
import java.util.NoSuchElementException;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,12 +55,102 @@ public class ValidateMigrationsCommand extends BaseCommand {

@Override
protected int command() {
throw new UnsupportedOperationException();
if (!validateConfigFilePresent()) {
return 1;
}

final MigrationConfig config;
try {
config = MigrationConfig.load(configFile);
} catch (KsqlException | MigrationException e) {
LOGGER.error(e.getMessage());
return 1;
}

return command(
config,
MigrationsUtil::getKsqlClient,
getMigrationsDirFromConfigFile(configFile)
);
}

@VisibleForTesting
int command(
final MigrationConfig config,
final Function<MigrationConfig, Client> clientSupplier,
final String migrationsDir
) {
final Client ksqlClient;
try {
ksqlClient = clientSupplier.apply(config);
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
return 1;
}

final boolean success;
try {
success = validate(config, migrationsDir, ksqlClient);
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
return 1;
}

if (success) {
LOGGER.info("Successfully validated checksums for migrations that have already been applied");
ksqlClient.close();
} else {
ksqlClient.close();
return 1;
}

return 0;
}

@Override
protected Logger getLogger() {
return null;
}

/**
* @return true if validation passes, else false.
*/
static boolean validate(
final MigrationConfig config,
final String migrationsDir,
final Client ksqlClient
) {
String version = getLatestMigratedVersion(config, ksqlClient);
String nextVersion = null;
while (!version.equals(MetadataUtil.NONE_VERSION)) {
final VersionInfo versionInfo = getInfoForVersion(version, config, ksqlClient);
if (nextVersion != null) {
validateVersionIsMigrated(version, versionInfo, nextVersion);
}

final String filename;
try {
filename = getFilePathForVersion(version, migrationsDir).get();
} catch (MigrationException | NoSuchElementException e) {
LOGGER.error("No migrations file found for version with status {}. Version: {}",
MigrationState.MIGRATED, version);
return false;
}

final String hash = computeHashForFile(filename);
final String expectedHash = versionInfo.getExpectedHash();
if (!expectedHash.equals(hash)) {
LOGGER.error("Migrations file found for version {} does not match the checksum saved "
+ "for this version. Expected checksum: {}. Actual checksum: {}. File name: {}",
version, expectedHash, hash, filename);
return false;
}

nextVersion = version;
version = versionInfo.getPrevVersion();
}

return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,21 @@
import io.confluent.ksql.tools.migrations.MigrationConfig;
import io.confluent.ksql.tools.migrations.MigrationException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

public final class MetadataUtil {

public static final String NONE_VERSION = "<none>";
public static final String CURRENT_VERSION_KEY = "CURRENT";

public enum MigrationState {
PENDING,
RUNNING,
MIGRATED,
ERROR
}

private MetadataUtil() {
}

Expand All @@ -47,4 +55,105 @@ public static String getCurrentVersion(final MigrationConfig config, final Clien
String.format("Could not query %s: %s", migrationTableName, e.getMessage()));
}
}

public static String getLatestMigratedVersion(
final MigrationConfig config,
final Client ksqlClient
) {
final String currentVersion = MetadataUtil.getCurrentVersion(config, ksqlClient);
if (currentVersion.equals(MetadataUtil.NONE_VERSION)) {
return currentVersion;
}

final VersionInfo currentVersionInfo = getInfoForVersion(currentVersion, config, ksqlClient);
if (currentVersionInfo.state == MigrationState.MIGRATED) {
return currentVersion;
}

if (currentVersionInfo.prevVersion.equals(MetadataUtil.NONE_VERSION)) {
return MetadataUtil.NONE_VERSION;
}

final VersionInfo prevVersionInfo = getInfoForVersion(
currentVersionInfo.prevVersion,
config,
ksqlClient
);
validateVersionIsMigrated(currentVersionInfo.prevVersion, prevVersionInfo, currentVersion);

return currentVersionInfo.prevVersion;
}

public static void validateVersionIsMigrated(
final String version,
final VersionInfo versionInfo,
final String nextVersion
) {
if (versionInfo.state != MigrationState.MIGRATED) {
throw new MigrationException(String.format(
"Discovered version with previous version that does not have status {}. "
+ "Version: {}. Previous version: {}. Previous version status: {}",
MigrationState.MIGRATED,
nextVersion,
version,
versionInfo.state
));
}
}

public static VersionInfo getInfoForVersion(
final String version,
final MigrationConfig config,
final Client ksqlClient
) {
final String migrationTableName = config
.getString(MigrationConfig.KSQL_MIGRATIONS_TABLE_NAME);
final BatchedQueryResult result = ksqlClient.executeQuery(
"SELECT checksum, previous, state FROM " + migrationTableName
+ " WHERE version_key = '" + version + "';");

final String expectedHash;
final String prevVersion;
final String state;
try {
final List<Row> resultRows = result.get();
if (resultRows.size() == 0) {
throw new MigrationException(
"Failed to query state for migration with version " + version
+ ": no such migration is present in the migrations metadata table");
}
expectedHash = resultRows.get(0).getString(0);
prevVersion = resultRows.get(0).getString(1);
state = resultRows.get(0).getString(2);
} catch (InterruptedException | ExecutionException e) {
throw new MigrationException(String.format(
"Failed to query state for migration with version %s: %s", version, e.getMessage()));
}

return new VersionInfo(expectedHash, prevVersion, state);
}

public static class VersionInfo {
private final String expectedHash;
private final String prevVersion;
private final MigrationState state;

VersionInfo(final String expectedHash, final String prevVersion, final String state) {
this.expectedHash = Objects.requireNonNull(expectedHash, "expectedHash");
this.prevVersion = Objects.requireNonNull(prevVersion, "prevVersion");
this.state = MigrationState.valueOf(Objects.requireNonNull(state, "state"));
}

public String getExpectedHash() {
return expectedHash;
}

public String getPrevVersion() {
return prevVersion;
}

public MigrationState getState() {
return state;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,63 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

public final class MigrationsDirectoryUtil {

public static final String MIGRATIONS_DIR = "migrations";
public static final String MIGRATIONS_CONFIG_FILE = "ksql-migrations.properties";

private MigrationsDirectoryUtil() {
}

public static Optional<String> getFileNameForVersion(
public static String getMigrationsDirFromConfigFile(final String configFilePath) {
final Path parentDir = Paths.get(configFilePath).getParent();
if (parentDir == null) {
throw new MigrationException("Could not find parent directory for config file '"
+ configFilePath + "': no parent dir exists.");
}
return parentDir.resolve(MIGRATIONS_DIR).toString();
}

public static Optional<String> getFilePathForVersion(
final String version,
final String migrationsDir
) {
final String prefix = "V" + StringUtils.leftPad(version, 6, "0");

final File directory = new File(migrationsDir);
if (!directory.isDirectory()) {
throw new MigrationException(migrationsDir + " is not a directory.");
}

final String[] names = directory.list();
if (names == null) {
throw new MigrationException("Failed to retrieve files from " + migrationsDir);
}
for (String name : names) {
if (name.startsWith(prefix)) {
return Optional.of(name);
}

final List<String> matches = Arrays.stream(names)
.filter(name -> name.startsWith(prefix))
.collect(Collectors.toList());
if (matches.size() == 1) {
return Optional.of(Paths.get(migrationsDir, matches.get(0)).toString());
} else if (matches.size() == 0) {
return Optional.empty();
} else {
throw new MigrationException("Found multiple migration files for version " + version);
}
return Optional.empty();
}

public static String getFileContentsForVersion(final String version, final String migrationsDir) {
final Optional<String> filename = getFileNameForVersion(version, migrationsDir);
final Optional<String> filename = getFilePathForVersion(version, migrationsDir);
if (!filename.isPresent()) {
throw new MigrationException("Cannot find migration file with version "
+ version + " in " + migrationsDir);
Expand All @@ -63,4 +89,14 @@ public static String getFileContentsForVersion(final String version, final Strin
String.format("Failed to read %s: %s", filepath, e.getMessage()));
}
}

public static String computeHashForFile(final String filename) {
try {
final byte[] bytes = Files.readAllBytes(Paths.get(filename));
return new String(MessageDigest.getInstance("MD5").digest(bytes), StandardCharsets.UTF_8);
} catch (NoSuchAlgorithmException | IOException e) {
throw new MigrationException(String.format(
"Could not compute hash for file '%s': %s", filename, e.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ private MigrationsUtil() {

public static final String MIGRATIONS_COMMAND = "ksql-migrations";

public static final String MIGRATIONS_DIR = "migrations";
public static final String MIGRATIONS_CONFIG_FILE = "ksql-migrations.properties";

public static Client getKsqlClient(final MigrationConfig config) throws MigrationException {
final String ksqlServerUrl = config.getString(MigrationConfig.KSQL_SERVER_URL);
return getKsqlClient(ksqlServerUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.confluent.ksql.rest.integration.RestIntegrationTestUtil;
import io.confluent.ksql.rest.server.TestKsqlRestApp;
import io.confluent.ksql.tools.migrations.commands.BaseCommand;
import io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil;
import io.confluent.ksql.tools.migrations.util.MigrationsUtil;
import java.io.File;
import java.nio.file.Files;
Expand Down Expand Up @@ -77,7 +78,7 @@ public static void setUpClass() throws Exception {
final String testDir = Paths.get(TestUtils.tempDirectory().getAbsolutePath(), "migrations_integ_test").toString();
createAndVerifyDirectoryStructure(testDir);

configFilePath = Paths.get(testDir, MigrationsUtil.MIGRATIONS_CONFIG_FILE).toString();
configFilePath = Paths.get(testDir, MigrationsDirectoryUtil.MIGRATIONS_CONFIG_FILE).toString();
initializeAndVerifyMetadataStreamAndTable(configFilePath);
}

Expand All @@ -102,12 +103,12 @@ private static void createAndVerifyDirectoryStructure(final String testDir) thro
assertThat(rootDir.isDirectory(), is(true));

// verify migrations directory
final File migrationsDir = new File(Paths.get(testDir, MigrationsUtil.MIGRATIONS_DIR).toString());
final File migrationsDir = new File(Paths.get(testDir, MigrationsDirectoryUtil.MIGRATIONS_DIR).toString());
assertThat(migrationsDir.exists(), is(true));
assertThat(migrationsDir.isDirectory(), is(true));

// verify config file
final File configFile = new File(Paths.get(testDir, MigrationsUtil.MIGRATIONS_CONFIG_FILE).toString());
final File configFile = new File(Paths.get(testDir, MigrationsDirectoryUtil.MIGRATIONS_CONFIG_FILE).toString());
assertThat(configFile.exists(), is(true));
assertThat(configFile.isDirectory(), is(false));

Expand Down
Loading

0 comments on commit d4cf400

Please sign in to comment.