Skip to content

Commit

Permalink
feat(migrations): implement info command (#7145)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia committed Mar 3, 2021
1 parent 7007729 commit 956e799
Show file tree
Hide file tree
Showing 16 changed files with 1,060 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import com.github.rvesse.airline.annotations.restrictions.RequireOnlyOne;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.tools.migrations.Migration;
import io.confluent.ksql.tools.migrations.MigrationConfig;
import io.confluent.ksql.tools.migrations.MigrationException;
import io.confluent.ksql.tools.migrations.util.MetadataUtil;
import io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState;
import io.confluent.ksql.tools.migrations.util.MigrationFile;
import io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil;
import io.confluent.ksql.tools.migrations.util.MigrationsUtil;
import io.confluent.ksql.util.KsqlException;
Expand Down Expand Up @@ -165,7 +165,7 @@ private boolean apply(
: Integer.parseInt(previous) + 1;

LOGGER.info("Loading migration files");
final List<Migration> migrations;
final List<MigrationFile> migrations;
try {
migrations = loadMigrationsToApply(migrationsDir, minimumVersion);
} catch (MigrationException e) {
Expand All @@ -179,7 +179,7 @@ private boolean apply(
LOGGER.info(migrations.size() + " migration file(s) loaded.");
}

for (Migration migration : migrations) {
for (MigrationFile migration : migrations) {
if (!applyMigration(config, ksqlClient, migration, clock, previous)) {
return false;
}
Expand All @@ -189,20 +189,20 @@ private boolean apply(
return true;
}

private List<Migration> loadMigrationsToApply(
private List<MigrationFile> loadMigrationsToApply(
final String migrationsDir,
final int minimumVersion
) {
if (version > 0) {
final Optional<Migration> migration =
final Optional<MigrationFile> migration =
getMigrationForVersion(String.valueOf(version), migrationsDir);
if (!migration.isPresent()) {
throw new MigrationException("No migration file with version " + version + " exists.");
}
return Collections.singletonList(migration.get());
}

final List<Migration> migrations = getAllMigrations(migrationsDir).stream()
final List<MigrationFile> migrations = getAllMigrations(migrationsDir).stream()
.filter(migration -> {
if (migration.getVersion() < minimumVersion) {
return false;
Expand All @@ -228,14 +228,14 @@ private List<Migration> loadMigrationsToApply(
private boolean applyMigration(
final MigrationConfig config,
final Client ksqlClient,
final Migration migration,
final MigrationFile migration,
final Clock clock,
final String previous
) {
LOGGER.info("Applying migration version {}: {}", migration.getVersion(), migration.getName());
final String migrationFileContent =
MigrationsDirectoryUtil.getFileContentsForName(migration.getFilepath());
LOGGER.info("Migration file contents:\n{}", migrationFileContent);
LOGGER.info("MigrationFile file contents:\n{}", migrationFileContent);

if (dryRun) {
LOGGER.info("Dry run complete. No migrations were actually applied.");
Expand All @@ -251,26 +251,31 @@ private boolean applyMigration(

if (
!updateState(config, ksqlClient, MigrationState.RUNNING,
executionStart, migration, clock, previous)
executionStart, migration, clock, previous, Optional.empty())
) {
return false;
}

try {
final List<String> commands = Arrays.stream(migrationFileContent.split(";"))
.filter(s -> s.length() > 1)
.collect(Collectors.toList());
for (final String command : commands) {
ksqlClient.executeStatement(command + ";").get();
final List<String> commands = Arrays.stream(migrationFileContent.split(";"))
.map(String::trim)
.filter(s -> !s.isEmpty())
.map(s -> s + ";")
.collect(Collectors.toList());
for (final String command : commands) {
try {
ksqlClient.executeStatement(command).get();
} catch (InterruptedException | ExecutionException e) {
final String errorMsg = String.format(
"Failed to execute sql: %s. Error: %s", command, e.getMessage());
LOGGER.error(errorMsg);
updateState(config, ksqlClient, MigrationState.ERROR,
executionStart, migration, clock, previous, Optional.of(errorMsg));
return false;
}
} catch (InterruptedException | ExecutionException e) {
LOGGER.error(e.getMessage());
updateState(config, ksqlClient, MigrationState.ERROR,
executionStart, migration, clock, previous);
return false;
}

updateState(config, ksqlClient, MigrationState.MIGRATED,
executionStart, migration, clock, previous);
executionStart, migration, clock, previous, Optional.empty());
LOGGER.info("Successfully migrated");
return true;
}
Expand Down Expand Up @@ -311,9 +316,10 @@ private boolean updateState(
final Client ksqlClient,
final MigrationState state,
final String executionStart,
final Migration migration,
final MigrationFile migration,
final Clock clock,
final String previous
final String previous,
final Optional<String> errorReason
) {
final String executionEnd = (state == MigrationState.MIGRATED || state == MigrationState.ERROR)
? Long.toString(clock.millis())
Expand All @@ -329,7 +335,8 @@ private boolean updateState(
executionEnd,
migration,
previous,
checksum
checksum,
errorReason
).get();
MetadataUtil.writeRow(
config,
Expand All @@ -340,7 +347,8 @@ private boolean updateState(
executionEnd,
migration,
previous,
checksum
checksum,
errorReason
).get();
return true;
} catch (InterruptedException | ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import com.github.rvesse.airline.annotations.help.Examples;
import com.github.rvesse.airline.annotations.restrictions.Required;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.tools.migrations.Migration;
import io.confluent.ksql.tools.migrations.MigrationException;
import io.confluent.ksql.tools.migrations.util.MigrationFile;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
Expand Down Expand Up @@ -104,7 +104,7 @@ private boolean validateVersionDoesNotAlreadyExist(final String migrationsDir) {
return true;
}

final Optional<Migration> existingMigration;
final Optional<MigrationFile> existingMigration;
try {
existingMigration = getMigrationForVersion(String.valueOf(version), migrationsDir);
} catch (MigrationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ private String createEventStream(final String name, final String topic, final in
+ " checksum STRING,\n"
+ " started_on STRING,\n"
+ " completed_on STRING,\n"
+ " previous STRING\n"
+ " previous STRING,\n"
+ " error_reason STRING\n"
+ ") WITH ( \n"
+ " KAFKA_TOPIC='" + topic + "',\n"
+ " VALUE_FORMAT='JSON',\n"
Expand All @@ -70,7 +71,8 @@ private String createVersionTable(final String name, final String topic) {
+ " latest_by_offset(checksum) AS checksum, \n"
+ " latest_by_offset(started_on) AS started_on, \n"
+ " latest_by_offset(completed_on) AS completed_on, \n"
+ " latest_by_offset(previous) AS previous\n"
+ " latest_by_offset(previous) AS previous, \n"
+ " latest_by_offset(error_reason) AS error_reason \n"
+ " FROM migration_events \n"
+ " GROUP BY version_key;\n";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,28 @@

package io.confluent.ksql.tools.migrations.commands;

import static io.confluent.ksql.tools.migrations.util.MetadataUtil.getOptionalInfoForVersions;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile;
import static io.confluent.ksql.tools.migrations.util.ServerVersionUtil.getServerInfo;
import static io.confluent.ksql.tools.migrations.util.ServerVersionUtil.versionSupportsMultiKeyPullQuery;

import com.github.rvesse.airline.annotations.Command;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ServerInfo;
import io.confluent.ksql.tools.migrations.MigrationConfig;
import io.confluent.ksql.tools.migrations.MigrationException;
import io.confluent.ksql.tools.migrations.util.MetadataUtil;
import io.confluent.ksql.tools.migrations.util.MigrationFile;
import io.confluent.ksql.tools.migrations.util.MigrationVersionInfo;
import io.confluent.ksql.tools.migrations.util.MigrationVersionInfoFormatter;
import io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil;
import io.confluent.ksql.tools.migrations.util.MigrationsUtil;
import io.confluent.ksql.util.KsqlException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

@Command(
name = "info",
Expand Down Expand Up @@ -80,21 +86,68 @@ int command(
return 1;
}

throw new NotImplementedException();
boolean success;
try {
printCurrentVersion(config, ksqlClient);
printVersionInfoTable(config, ksqlClient, migrationsDir);

success = true;
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
success = false;
} finally {
ksqlClient.close();
}

return success ? 0 : 1;
}

@Override
protected Logger getLogger() {
return LOGGER;
}

private static boolean serverSupportsMultiKeyPullQuery(
private void printCurrentVersion(
final MigrationConfig config,
final Client ksqlClient
) {
final String currentVersion = MetadataUtil.getCurrentVersion(config, ksqlClient);
LOGGER.info("Current migration version: {}", currentVersion);
}

private void printVersionInfoTable(
final MigrationConfig config,
final Client ksqlClient,
final MigrationConfig config
final String migrationsDir
) {
final String ksqlServerUrl = config.getString(MigrationConfig.KSQL_SERVER_URL);
final ServerInfo serverInfo = getServerInfo(ksqlClient, ksqlServerUrl);
return versionSupportsMultiKeyPullQuery(serverInfo.getServerVersion());
final List<MigrationFile> allMigrations =
MigrationsDirectoryUtil.getAllMigrations(migrationsDir);
final List<Integer> allVersions = allMigrations.stream()
.map(MigrationFile::getVersion)
.collect(Collectors.toList());

if (allMigrations.size() != 0) {
final Map<Integer, Optional<MigrationVersionInfo>> versionInfos =
getOptionalInfoForVersions(allVersions, config, ksqlClient);

printAsTable(allMigrations, versionInfos);
} else {
LOGGER.info("No migrations files found");
}
}

private static void printAsTable(
final List<MigrationFile> allMigrations,
final Map<Integer, Optional<MigrationVersionInfo>> versionInfos
) {
final MigrationVersionInfoFormatter formatter = new MigrationVersionInfoFormatter();

for (final MigrationFile migration : allMigrations) {
final MigrationVersionInfo versionInfo = versionInfos.get(migration.getVersion()).orElse(
MigrationVersionInfo.pendingMigration(migration.getVersion(), migration.getName()));
formatter.addVersionInfo(versionInfo);
}

LOGGER.info(formatter.getFormatted());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import io.confluent.ksql.tools.migrations.MigrationException;
import io.confluent.ksql.tools.migrations.util.MetadataUtil;
import io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState;
import io.confluent.ksql.tools.migrations.util.MetadataUtil.VersionInfo;
import io.confluent.ksql.tools.migrations.util.MigrationVersionInfo;
import io.confluent.ksql.tools.migrations.util.MigrationsUtil;
import io.confluent.ksql.util.KsqlException;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -127,7 +127,7 @@ static boolean validate(
String version = getLatestMigratedVersion(config, ksqlClient);
String nextVersion = null;
while (!version.equals(MetadataUtil.NONE_VERSION)) {
final VersionInfo versionInfo = getInfoForVersion(version, config, ksqlClient);
final MigrationVersionInfo versionInfo = getInfoForVersion(version, config, ksqlClient);
if (nextVersion != null) {
validateVersionIsMigrated(version, versionInfo, nextVersion);
}
Expand Down
Loading

0 comments on commit 956e799

Please sign in to comment.