cdc-backfill-client/build.gradle (2 additions, 0 deletions)
@@ -48,6 +48,8 @@ dependencies {
implementation "com.datastax.dse:dse-db:${dse4Version}"
implementation "${pulsarGroup}:pulsar-client:${pulsarVersion}"
testImplementation "org.junit.jupiter:junit-jupiter-api:5.8.1"
testImplementation "org.mockito:mockito-core:3.11.1"
testImplementation "com.datastax.oss:dsbulk-tests:${dsbulkVersion}"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:5.8.1"
}

@@ -22,6 +22,9 @@
import java.nio.file.Paths;
import java.util.regex.Pattern;

/**
* Defines settings for the backfill operation; these will translate to DSBulk or Pulsar client settings as appropriate.
*/
public class BackFillSettings {

@CommandLine.Option(
@@ -37,7 +40,7 @@ public class BackFillSettings {
"The directory where data will be exported to and imported from."
+ "The default is a 'data' subdirectory in the current working directory. "
+ "The data directory will be created if it does not exist. "
+ "Tables will be exported and imported in subdirectories of the data directory specified here; "
+ "Tables will be exported in subdirectories of the data directory specified here; "
+ "there will be one subdirectory per keyspace inside the data directory, "
+ "then one subdirectory per table inside each keyspace directory.",
defaultValue = "data")
@@ -47,7 +50,7 @@
names = {"-k", "--keyspace"},
required = true,
description =
"The name of the keyspace where the table to be exported exist")
"The name of the keyspace where the table to be exported exists")
public String keyspace;

@CommandLine.Option(
@@ -57,20 +60,9 @@
"The name of the table to export data from for cdc back filling")
public String table;


@CommandLine.ArgGroup(exclusive = false, multiplicity = "1")
public ExportSettings exportSettings = new ExportSettings();

@CommandLine.Option(
names = {"-c", "--dsbulk-cmd"},
paramLabel = "CMD",
description =
"The external DSBulk command to use. Ignored if the embedded DSBulk is being used. "
+ "The default is simply 'dsbulk', assuming that the command is available through the "
+ "PATH variable contents.",
defaultValue = "dsbulk")
public String dsbulkCmd = "dsbulk";

@CommandLine.Option(
names = {"-l", "--dsbulk-log-dir"},
paramLabel = "PATH",
@@ -83,22 +75,11 @@
public Path dsbulkLogDir = Paths.get("logs");

@CommandLine.Option(
names = {"-w", "--dsbulk-working-dir"},
names = {"--max-rows-per-second"},
paramLabel = "PATH",
description =
"The directory where DSBulk should be executed. "
+ "Ignored if the embedded DSBulk is being used. "
+ "If unspecified, it defaults to the current working directory.")
public Path dsbulkWorkingDir;

@CommandLine.Option(
names = "--max-concurrent-ops",
paramLabel = "NUM",
description =
"The maximum number of concurrent operations (exports and imports) to carry. Default is 1. "
+ "Set this to higher values to allow exports and imports to occur concurrently; "
+ "e.g. with a value of 2, each table will be imported as soon as it is exported, "
+ "while the next table is being exported.",
defaultValue = "1")
public int maxConcurrentOps = 1;
"The maximum number of rows per second to read from the Cassandra table. "
+ "Setting this option to any negative value or zero will disable it. The default is -1.",
defaultValue ="-1")
public int maxRowsPerSecond = -1;
}
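For illustration, a minimal sketch (hypothetical values) of how the new setting sits next to the existing required fields; in a real run picocli populates these public fields from the options declared above, and the required ExportSettings argument group (not shown in this diff) must also be provided.

// Minimal sketch, hypothetical values: picocli normally fills these public fields
// from the command-line options declared above; the required ExportSettings arg
// group (not shown in this diff) would also have to be supplied.
BackFillSettings settings = new BackFillSettings();
settings.keyspace = "my_keyspace";   // -k / --keyspace (required)
settings.table = "my_table";         // table to back-fill (required)
settings.maxRowsPerSecond = 500;     // new option; zero or a negative value disables the cap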
@@ -17,7 +17,9 @@
package com.datastax.oss.cdc.backfill;


import com.datastax.oss.cdc.backfill.factory.BackfillFactory;
import com.datastax.oss.cdc.backfill.util.LoggingUtils;
import com.datastax.oss.dsbulk.connectors.api.Connector;
import picocli.CommandLine;
import picocli.CommandLine.ArgGroup;
import picocli.CommandLine.Command;
@@ -26,8 +28,7 @@
@Command(
name = "BackfillCLI",
description =
"A tool for read historical data from a Cassandra table and sending equivalent mutation records " +
"to the events topic.",
"A tool for back-filling the CDC data topic with historical data from that source Cassandra table.",
versionProvider = VersionProvider.class,
sortOptions = false,
usageHelpWidth = 100)
@@ -59,7 +60,13 @@ public static void main(String[] args) {
usageHelpWidth = 100)
private int backfill(
@ArgGroup(exclusive = false, multiplicity = "1") BackFillSettings settings) {
CassandraToPulsarMigrator migrator = new CassandraToPulsarMigrator(settings);
// Bootstrap the backfill dependencies
final BackfillFactory factory = new BackfillFactory(settings);
final TableExporter exporter = factory.createTableExporter();
final Connector connector = factory.createCVSConnector(exporter.tableDataDir);
final PulsarImporter importer = factory.createPulsarImporter(connector, exporter.getExportedTable());
final CassandraToPulsarMigrator migrator = new CassandraToPulsarMigrator(exporter, importer);

return migrator.migrate().exitCode();
}
}
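For context, a minimal sketch of what the new BackfillFactory might look like, assuming it simply re-hosts the wiring that this PR removes from the CassandraToPulsarMigrator constructor below; the real class is added elsewhere in this pull request and is not shown in this diff, and the package names of the imported backfill classes as well as the Path type of tableDataDir are assumptions.

package com.datastax.oss.cdc.backfill.factory;

import com.datastax.oss.cdc.backfill.BackFillSettings;
import com.datastax.oss.cdc.backfill.ExportedTable;
import com.datastax.oss.cdc.backfill.PulsarImporter;
import com.datastax.oss.cdc.backfill.TableExporter;
import com.datastax.oss.cdc.backfill.util.ConnectorUtils;
import com.datastax.oss.cdc.backfill.util.ModelUtils;
import com.datastax.oss.dsbulk.connectors.api.Connector;
import com.datastax.oss.dsbulk.connectors.csv.CSVConnector;
import com.typesafe.config.Config;

import java.nio.file.Path;

// Sketch only: assumes the factory hosts the construction logic removed from the
// CassandraToPulsarMigrator constructor in this PR (see the diff below).
public class BackfillFactory {

    private final BackFillSettings settings;
    private final ExportedTable exportedTable;

    public BackfillFactory(BackFillSettings settings) {
        this.settings = settings;
        this.exportedTable = ModelUtils.buildExportedTable(
                settings.exportSettings.clusterInfo, settings.exportSettings.credentials, settings);
    }

    public TableExporter createTableExporter() {
        return new TableExporter(exportedTable, settings);
    }

    public Connector createCVSConnector(Path tableDataDir) {
        // Same CSV connector configuration as the removed constructor code.
        CSVConnector connector = new CSVConnector();
        Config config = ConnectorUtils.createConfig(
                "dsbulk.connector.csv",
                "url", tableDataDir,
                "recursive", true,
                "fileNamePattern", "\"**/output-*\"");
        connector.configure(config, true, true);
        return connector;
    }

    public PulsarImporter createPulsarImporter(Connector connector, ExportedTable exportedTable) {
        return new PulsarImporter(connector, exportedTable);
    }
}

Moving this wiring behind a factory keeps the CLI entry point free of DSBulk connector details and makes the exporter and importer easier to test in isolation, which lines up with the mockito and dsbulk-tests test dependencies added in build.gradle above.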
@@ -16,165 +16,32 @@

package com.datastax.oss.cdc.backfill;

import com.datastax.oss.cdc.backfill.util.ConnectorUtils;
import com.datastax.oss.cdc.backfill.util.ModelUtils;
import com.datastax.oss.dsbulk.connectors.csv.CSVConnector;
import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
* Orchestrates exporting the Cassandra table to disk and importing it from disk into the Pulsar topic.
*/
public class CassandraToPulsarMigrator {

private static final Logger LOGGER = LoggerFactory.getLogger(CassandraToPulsarMigrator.class);

private static final Object POISON_PILL = new Object();

private final BackFillSettings settings;
private final BlockingQueue<TableExporter> exportQueue;
private final BlockingQueue<Object> importQueue;
private final ExecutorService pool;

private final CopyOnWriteArrayList<TableExportReport> successful =
new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<TableExportReport> failed = new CopyOnWriteArrayList<>();

private final TableExporter exporter;

private final PulsarImporter importer;

public CassandraToPulsarMigrator(BackFillSettings settings) {
this.settings = settings;
exportQueue = new LinkedBlockingQueue<>();
importQueue = new LinkedBlockingQueue<>();
pool = Executors.newFixedThreadPool(settings.maxConcurrentOps);
final ExportedTable table = ModelUtils.buildExportedTable(
settings.exportSettings.clusterInfo, settings.exportSettings.credentials, settings);
// export from C* table to disk
exporter = new TableExporter(table, settings);
CSVConnector connector = new CSVConnector();
Config connectorConfig =
ConnectorUtils.createConfig(
"dsbulk.connector.csv",
"url",
exporter.tableDataDir,
"recursive",
true,
"fileNamePattern",
"\"**/output-*\"");
connector.configure(connectorConfig, true, true);

// import from disk to Pulsar topic
importer = new PulsarImporter(connector, table);
public CassandraToPulsarMigrator(TableExporter exporter, PulsarImporter importer) {
this.importer = importer;
this.exporter = exporter;
}

public ExitStatus migrate() {
try {
migrateTables();
exportQueue.clear();
importQueue.clear();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
pool.shutdownNow();
LOGGER.info(
"Migration finished with {} successfully migrated tables, {} failed tables.",
successful.size(),
failed.size());
for (TableExportReport report : successful) {
LOGGER.info("Table {} migrated successfully.", report.getMigrator().getExportedTable());
}
for (TableExportReport report : failed) {
LOGGER.error(
"Table {} could not be {}: operation {} exited with {}.",
report.getMigrator().getExportedTable(),
report.isExport() ? "exported" : "imported",
report.getOperationId(),
report.getStatus());
}
}
if (failed.isEmpty()) {
return ExitStatus.STATUS_OK;
}
if (successful.isEmpty()) {
return ExitStatus.STATUS_ABORTED_TOO_MANY_ERRORS;
}
return ExitStatus.STATUS_COMPLETED_WITH_ERRORS;
}

private void migrateTables() {
exportQueue.add(exporter);
List<CompletableFuture<?>> futures = new ArrayList<>();
futures.add(CompletableFuture.runAsync(this::exportTables, pool));
futures.add(CompletableFuture.runAsync(this::importTables, pool));
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
}

private void exportTables() {
TableExporter migrator;
while ((migrator = exportQueue.poll()) != null) {
TableExportReport report;
try {
report = migrator.exportTable();
} catch (Exception e) {
LOGGER.error(
"Table "
+ migrator.getExportedTable()
+ ": unexpected error when exporting data, aborting",
e);
report =
new TableExportReport(migrator, ExitStatus.STATUS_ABORTED_FATAL_ERROR, null, true);
}
if (report.getStatus() == ExitStatus.STATUS_OK) {
importQueue.add(migrator);
} else {
failed.add(report);
}
}
importQueue.add(POISON_PILL);
}
ExitStatus status = this.exporter.exportTable();
if (status == ExitStatus.STATUS_OK) {
LOGGER.info("Sending table records from disk to pulsar.");
status = this.importer.importTable();

private void importTables() {
// Output useful logs
while (true) {
TableExporter migrator;
try {
Object o = importQueue.take();
if (o == POISON_PILL) {
break;
}
migrator = (TableExporter) o;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
TableExportReport report;
try {
report = importer.importTable(migrator); // TODO: Decouple status reports between Exporter & Importer
} catch (Exception e) {
LOGGER.error(
"Table "
+ migrator.getExportedTable()
+ ": unexpected error when importing data, aborting",
e);
report =
new TableExportReport(migrator, ExitStatus.STATUS_ABORTED_FATAL_ERROR, null, false);
}
if (report.getStatus() == ExitStatus.STATUS_OK) {
successful.add(report);
} else {
failed.add(report);
}
} else {
LOGGER.error("Failed to export tables. Sending to Pulsar will be skipped.");
}
return status;
}
}
@@ -34,7 +34,8 @@ public ExportedTable(
this.table = table;
this.keyspace = keyspace;
this.columns = columns;
fullyQualifiedName = TableUtils.getFullyQualifiedTableName(table);
this.fullyQualifiedName =
String.format("%s.%s", table.getKeyspace().asCql(true), table.getName().asCql(true));
}

@Override
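For reference, a small illustration of what the new formatting produces, assuming the DataStax Java driver's CqlIdentifier.asCql(true) behavior of double-quoting an identifier only when necessary; the keyspace and table names here are hypothetical.

import com.datastax.oss.driver.api.core.CqlIdentifier;

public class AsCqlExample {
    public static void main(String[] args) {
        // asCql(true) ("pretty" mode) quotes an identifier only when required.
        CqlIdentifier keyspace = CqlIdentifier.fromInternal("ks1");   // all lower-case: no quoting
        CqlIdentifier table = CqlIdentifier.fromInternal("MyTable");  // mixed case: must be quoted
        System.out.println(String.format("%s.%s", keyspace.asCql(true), table.asCql(true)));
        // prints: ks1."MyTable"
    }
}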
@@ -49,13 +49,12 @@ public PulsarImporter(Connector connector, ExportedTable exportedTable) {
this.exportedTable = exportedTable;
}

public TableExportReport importTable(TableExporter migrator) {
public ExitStatus importTable() {
try {
connector.init();
} catch (Exception e) {
throw new RuntimeException("Failed to init connector!", e);
}
String operationId = "OperationID"; // TODO: Retrieve operation ID retrieveImportOperationId();
Map<String, Object> tenantInfo = new HashMap<>();
tenantInfo.put(PULSAR_SERVICE_URL, "pulsar://localhost:6650/");
//tenantInfo.put(PULSAR_AUTH_PLUGIN_CLASS_NAME, "MyAuthPlugin");
@@ -82,7 +81,7 @@ public TableExportReport importTable(TableExporter migrator) {

LOGGER.info("sent {} records to Pulsar", c);
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
return new TableExportReport(migrator, ExitStatus.STATUS_OK, operationId, true);
return ExitStatus.STATUS_OK;
}

}

This file was deleted.
