From 20d2b3e74259fc68cc39f37b6f1044480baa63eb Mon Sep 17 00:00:00 2001 From: Ayman Khalil <13960949+aymkhalil@users.noreply.github.com> Date: Wed, 15 Mar 2023 17:18:09 -0700 Subject: [PATCH 1/4] Parse extra backfill configs from client.conf and CLI --- .../cdc/backfill/admin/BackfillCommand.java | 193 +++++++++++++++++- .../cdc/backfill/exporter/ExportSettings.java | 8 +- 2 files changed, 196 insertions(+), 5 deletions(-) diff --git a/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java b/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java index e95fb874..e4554623 100644 --- a/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java +++ b/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java @@ -17,6 +17,7 @@ package com.datastax.oss.cdc.backfill.admin; import com.datastax.oss.cdc.backfill.BackfillCLI; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; import org.apache.pulsar.admin.cli.extensions.CommandExecutionContext; import org.apache.pulsar.admin.cli.extensions.CustomCommand; import org.apache.pulsar.admin.cli.extensions.ParameterDescriptor; @@ -26,12 +27,17 @@ import java.io.File; import java.net.URL; import java.net.URLClassLoader; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.stream.Collectors; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + public class BackfillCommand implements CustomCommand { @Override public String name() { @@ -56,12 +62,32 @@ public List parameters() { parameters.add( ParameterDescriptor.builder() .description( - "The host name or IP and, optionally, the port of a node from the origin cluster. " + + "The host name or IP and, optionally, the port of a node from the Cassandra cluster. " + "If the port is not specified, it will default to 9042.") .type(ParameterType.STRING) .names(Arrays.asList("--export-host")) .required(true) .build()); + parameters.add( + ParameterDescriptor.builder() + .description( + "The path to a secure connect bundle to connect to the Cassandra cluster, " + + "if that cluster is a DataStax Astra cluster. " + + "Options --export-host and --export-bundle are mutually exclusive.") + .type(ParameterType.STRING) + .names(Arrays.asList("--export-bundle")) + .required(false) + .build()); + parameters.add( + ParameterDescriptor.builder() + .description( + "The protocol version to use to connect to the Cassandra cluster, e.g. 'V4'. " + + "If not specified, the driver will negotiate the highest version supported by both " + + "the client and the server.") + .type(ParameterType.STRING) + .names(Arrays.asList("--export-protocol-version")) + .required(false) + .build()); parameters.add( ParameterDescriptor.builder() .description( @@ -78,6 +104,53 @@ public List parameters() { .names(Arrays.asList("--export-password")) .required(true) .build()); + parameters.add( + ParameterDescriptor.builder() + .description( + "The consistency level to use when exporting data. The default is LOCAL_QUORUM.") + .type(ParameterType.STRING) + .names(Arrays.asList("--export-consistency")) + .required(false) + .build()); + parameters.add( + ParameterDescriptor.builder() + .description( + "The maximum number of records to export from the table. Must be a positive number or -1. " + + "The default is -1 (export the entire table).") + .type(ParameterType.INTEGER) + .names(Arrays.asList("--export-max-records")) + .required(false) + .build()); + parameters.add( + ParameterDescriptor.builder() + .description( + "The maximum number of concurrent files to write to. " + + "Must be a positive number or the special value AUTO. The default is AUTO.") + .type(ParameterType.STRING) + .names(Arrays.asList("--export-max-concurrent-files")) + .required(false) + .build()); + parameters.add( + ParameterDescriptor.builder() + .description( + "The maximum number of concurrent queries to execute. " + + "Must be a positive number or the special value AUTO. The default is AUTO.") + .type(ParameterType.STRING) + .names(Arrays.asList("--export-max-concurrent-queries")) + .required(false) + .build()); + parameters.add( + ParameterDescriptor.builder() + .description( + "An extra DSBulk option to use when exporting. " + + "Any valid DSBulk option can be specified here, and it will passed as is to the DSBulk process. " + + "DSBulk options, including driver options, must be passed as '--long.option.name='. " + + "Short options are not supported. ") + .type(ParameterType.STRING) + .names(Arrays.asList("--export-dsbulk-option")) + .required(false) + .build()); + parameters.add( ParameterDescriptor.builder() .description("The name of the keyspace where the table to be exported exists") @@ -92,6 +165,23 @@ public List parameters() { .names(Arrays.asList("--table", "-t")) .required(true) .build()); + parameters.add( + ParameterDescriptor.builder() + .description("The event topic name prefix. The `.` is appended to that prefix to build the topic name. " + + "The default value is `events-`.") + .type(ParameterType.STRING) + .names(Arrays.asList("--events-topic-prefix")) + .required(false) + .build()); + parameters.add( + ParameterDescriptor.builder() + .description( "The maximum number of rows per second to read from the Cassandra table. " + + "Setting this option to any negative value or zero will disable it. The default is -1.") + .type(ParameterType.INTEGER) + .names(Arrays.asList("--max-rows-per-second")) + .required(false) + .build()); + return parameters; } @@ -99,10 +189,111 @@ public List parameters() { public boolean execute(Map parameters, CommandExecutionContext context) { CommandLine commandLine = new CommandLine(new BackfillCLI()); List args = parameters.entrySet().stream() + .filter(e -> e.getValue() != null) .map(e -> e.getKey() + "=" + e.getValue()) .collect(Collectors.toList()); + PulsarClientParams params = parseClientConfRootParams(context.getConfiguration()); + populateClientConfRootParams(args, params); args.add(0, "backfill"); int exitCode = commandLine.execute(args.toArray(new String[0])); return exitCode == 0; } + + private void populateClientConfRootParams(List args, PulsarClientParams params) { + if (isNotBlank(params.serviceURL)) { + args.add("--pulsar-url=" + params.serviceURL); + } + if (isNotBlank(params.authParams)) { + args.add("--pulsar-auth-params=" + params.authParams); + } + if (isNotBlank(params.tlsProvider)) { + args.add("--pulsar-ssl-provider=" + params.tlsProvider); + } + if (isNotBlank(params.tlsTrustStorePath)) { + args.add("--sslTruststorePath=" + params.tlsTrustStorePath); + } + if (isNotBlank(params.tlsTrustStorePassword)) { + args.add("--pulsar-ssl-truststore-password=" + params.tlsTrustStorePassword); + } + if (isNotBlank(params.tlsTrustStoreType)) { + args.add("--pulsar-ssl-truststore-type=" + params.tlsTrustStoreType); + } + if (isNotBlank(params.tlsKeyStorePath)) { + args.add("--pulsar-ssl-keystore-path=" + params.tlsKeyStorePath); + } + if (isNotBlank(params.tlsKeyStorePassword)) { + args.add("--pulsar-ssl-keystore-password=" + params.tlsKeyStorePassword); + } + // TODO: tls cipher suites are available on the pulsar client builder but not pulsar admin, +// if (isNotBlank(params.tlsCipherSuites)) { +// args.add("--pulsar-ssl-cipher-suites=" + params.tlsCipherSuites); +// } + // TODO: enabled protocols are available on the pulsar client builder but not pulsar admin, +// if (isNotBlank(params.tlsEnabledProtocols)) { +// args.add("--pulsar-ssl-enabled-protocols=" + params.tlsEnabledProtocols); +// } + args.add("--pulsar-ssl-allow-insecure-connections=" + params.tlsAllowInsecureConnection); + args.add("--pulsar-ssl-enable-hostname-verification=" + params.tlsEnableHostnameVerification); + if(isNotBlank(params.tlsTrustCertsFilePath)) { + args.add("--pulsar-ssl-tls-trust-certs-path=" + params.tlsTrustCertsFilePath); + } + args.add("--pulsar-ssl-use-key-store-tls=" + params.useKeyStoreTls); + if(isNotBlank(params.authPluginClassName)) { + args.add("--pulsar-auth-plugin-class-name=" + params.authPluginClassName); + } + } + + /** + * Wraps the configs that are used to initialize the Pulsar client. + */ + private static class PulsarClientParams { + String serviceURL; + String authPluginClassName; + String authParams; + String tlsProvider; + boolean useKeyStoreTls; + String tlsTrustStoreType; + String tlsTrustStorePath; + String tlsTrustStorePassword; + String tlsKeyStoreType; + String tlsKeyStorePath; + String tlsKeyStorePassword; + String tlsKeyFilePath; + String tlsCertificateFilePath; + boolean tlsAllowInsecureConnection; + boolean tlsEnableHostnameVerification; + String tlsTrustCertsFilePath; + } + + private static PulsarClientParams parseClientConfRootParams(Properties properties) { + PulsarClientParams params = new PulsarClientParams(); + params.serviceURL = isNotBlank(properties.getProperty("brokerServiceUrl")) + ? properties.getProperty("brokerServiceUrl") : properties.getProperty("webServiceUrl"); + // fallback to previous-version serviceUrl property to maintain backward-compatibility + if (isBlank(params.serviceURL)) { + params.serviceURL = properties.getProperty("serviceUrl"); + } + params.authPluginClassName = properties.getProperty("authPlugin"); + params.authParams = properties.getProperty("authParams"); + params.tlsProvider = properties.getProperty("webserviceTlsProvider"); + + params.useKeyStoreTls = Boolean + .parseBoolean(properties.getProperty("useKeyStoreTls", "false")); + params.tlsTrustStoreType = properties.getProperty("tlsTrustStoreType", "JKS"); + params.tlsTrustStorePath = properties.getProperty("tlsTrustStorePath"); + params.tlsTrustStorePassword = properties.getProperty("tlsTrustStorePassword"); + params.tlsKeyStoreType = properties.getProperty("tlsKeyStoreType", "JKS"); + params.tlsKeyStorePath = properties.getProperty("tlsKeyStorePath"); + params.tlsKeyStorePassword = properties.getProperty("tlsKeyStorePassword"); + params.tlsKeyFilePath = properties.getProperty("tlsKeyFilePath"); + params.tlsCertificateFilePath = properties.getProperty("tlsCertificateFilePath"); + + params.tlsAllowInsecureConnection = Boolean.parseBoolean(properties + .getProperty("tlsAllowInsecureConnection", "false")); + + params.tlsEnableHostnameVerification = Boolean.parseBoolean(properties + .getProperty("tlsEnableHostnameVerification", "false")); + params.tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath"); + return params; + } } diff --git a/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/exporter/ExportSettings.java b/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/exporter/ExportSettings.java index b9aedccd..a4efca6d 100644 --- a/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/exporter/ExportSettings.java +++ b/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/exporter/ExportSettings.java @@ -37,7 +37,7 @@ public static class ExportClusterInfo implements ClusterInfo { names = "--export-host", paramLabel = "HOST[:PORT]", description = - "The host name or IP and, optionally, the port of a node from the origin cluster. " + "The host name or IP and, optionally, the port of a node from the Cassandra cluster. " + "If the port is not specified, it will default to 9042. " + "This option can be specified multiple times. " + "Options --export-host and --export-bundle are mutually exclusive.", @@ -49,7 +49,7 @@ public static class ExportClusterInfo implements ClusterInfo { names = "--export-bundle", paramLabel = "PATH", description = - "The path to a secure connect bundle to connect to the origin cluster, " + "The path to a secure connect bundle to connect to the Cassandra cluster, " + "if that cluster is a DataStax Astra cluster. " + "Options --export-host and --export-bundle are mutually exclusive.", required = true) @@ -59,7 +59,7 @@ public static class ExportClusterInfo implements ClusterInfo { names = "--export-protocol-version", paramLabel = "VERSION", description = - "The protocol version to use to connect to the origin cluster, e.g. 'V4'. " + "The protocol version to use to connect to the Cassandra cluster, e.g. 'V4'. " + "If not specified, the driver will negotiate the highest version supported by both " + "the client and the server.") public String protocolVersion; @@ -142,7 +142,7 @@ public char[] getPassword() { names = "--export-max-records", paramLabel = "NUM", description = - "The maximum number of records to export for each table. Must be a positive number or -1. " + "The maximum number of records to export from the table. Must be a positive number or -1. " + "The default is -1 (export the entire table).", defaultValue = "-1") public int maxRecords = -1; From f8ff6d20083e7c942f8634823228d43073f2510a Mon Sep 17 00:00:00 2001 From: Ayman Khalil <13960949+aymkhalil@users.noreply.github.com> Date: Wed, 15 Mar 2023 17:23:48 -0700 Subject: [PATCH 2/4] Optimize imports --- .../com/datastax/oss/cdc/backfill/admin/BackfillCommand.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java b/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java index e4554623..1ecbe3a4 100644 --- a/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java +++ b/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java @@ -17,17 +17,12 @@ package com.datastax.oss.cdc.backfill.admin; import com.datastax.oss.cdc.backfill.BackfillCLI; -import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; import org.apache.pulsar.admin.cli.extensions.CommandExecutionContext; import org.apache.pulsar.admin.cli.extensions.CustomCommand; import org.apache.pulsar.admin.cli.extensions.ParameterDescriptor; import org.apache.pulsar.admin.cli.extensions.ParameterType; import picocli.CommandLine; -import java.io.File; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.List; From c26c26162f5ddc4680021c511603cfbd37632ac3 Mon Sep 17 00:00:00 2001 From: Ayman Khalil <13960949+aymkhalil@users.noreply.github.com> Date: Wed, 15 Mar 2023 17:33:39 -0700 Subject: [PATCH 3/4] Add tlsCiphers & tlsProtocols --- .../cdc/backfill/admin/BackfillCommand.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java b/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java index 1ecbe3a4..df2ae633 100644 --- a/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java +++ b/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java @@ -219,14 +219,12 @@ private void populateClientConfRootParams(List args, PulsarClientParams if (isNotBlank(params.tlsKeyStorePassword)) { args.add("--pulsar-ssl-keystore-password=" + params.tlsKeyStorePassword); } - // TODO: tls cipher suites are available on the pulsar client builder but not pulsar admin, -// if (isNotBlank(params.tlsCipherSuites)) { -// args.add("--pulsar-ssl-cipher-suites=" + params.tlsCipherSuites); -// } - // TODO: enabled protocols are available on the pulsar client builder but not pulsar admin, -// if (isNotBlank(params.tlsEnabledProtocols)) { -// args.add("--pulsar-ssl-enabled-protocols=" + params.tlsEnabledProtocols); -// } + if (isNotBlank(params.tlsCiphers)) { + args.add("--pulsar-ssl-cipher-suites=" + params.tlsCiphers); + } + if (isNotBlank(params.tlsProtocols)) { + args.add("--pulsar-ssl-enabled-protocols=" + params.tlsProtocols); + } args.add("--pulsar-ssl-allow-insecure-connections=" + params.tlsAllowInsecureConnection); args.add("--pulsar-ssl-enable-hostname-verification=" + params.tlsEnableHostnameVerification); if(isNotBlank(params.tlsTrustCertsFilePath)) { @@ -253,6 +251,8 @@ private static class PulsarClientParams { String tlsKeyStoreType; String tlsKeyStorePath; String tlsKeyStorePassword; + String tlsCiphers; + String tlsProtocols; String tlsKeyFilePath; String tlsCertificateFilePath; boolean tlsAllowInsecureConnection; @@ -280,6 +280,8 @@ private static PulsarClientParams parseClientConfRootParams(Properties propertie params.tlsKeyStoreType = properties.getProperty("tlsKeyStoreType", "JKS"); params.tlsKeyStorePath = properties.getProperty("tlsKeyStorePath"); params.tlsKeyStorePassword = properties.getProperty("tlsKeyStorePassword"); + params.tlsCiphers = properties.getProperty("tlsCiphers"); + params.tlsProtocols = properties.getProperty("tlsProtocols"); params.tlsKeyFilePath = properties.getProperty("tlsKeyFilePath"); params.tlsCertificateFilePath = properties.getProperty("tlsCertificateFilePath"); From c4145b5aa3cc83fe6cd3279133565399ce32d8ac Mon Sep 17 00:00:00 2001 From: Ayman Khalil <13960949+aymkhalil@users.noreply.github.com> Date: Thu, 16 Mar 2023 16:28:08 -0700 Subject: [PATCH 4/4] picocli improvements + support for list options on the puslar admin side --- backfill-cli/build.gradle | 2 +- .../oss/cdc/backfill/BackfillCLI.java | 23 +++--- .../oss/cdc/backfill/BackfillSettings.java | 6 -- .../cdc/backfill/admin/BackfillCommand.java | 81 ++++++++++++++----- .../cdc/backfill/e2e/BackfillCLIE2ETests.java | 2 +- 5 files changed, 72 insertions(+), 42 deletions(-) diff --git a/backfill-cli/build.gradle b/backfill-cli/build.gradle index b1fbf990..e3ea8971 100644 --- a/backfill-cli/build.gradle +++ b/backfill-cli/build.gradle @@ -75,7 +75,7 @@ dependencies { implementation "com.datastax.oss:dsbulk-batcher-reactor:${dsbulkVersion}" implementation "com.google.guava:guava:${guavaVersion}" - implementation "info.picocli:picocli:4.6.3" + implementation "info.picocli:picocli:4.7.1" implementation "org.slf4j:slf4j-api:1.7.36" implementation "ch.qos.logback:logback-classic:1.2.11" implementation "org.apache.cassandra:cassandra-all:${cassandra4Version}" diff --git a/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/BackfillCLI.java b/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/BackfillCLI.java index e3ea2757..e6adea24 100644 --- a/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/BackfillCLI.java +++ b/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/BackfillCLI.java @@ -26,17 +26,17 @@ import picocli.CommandLine.Command; import picocli.CommandLine.Option; -import java.io.IOException; -import java.net.URISyntaxException; +import java.util.concurrent.Callable; @Command( - name = "BackfillCLI", + name = "backfill", description = "A tool for back-filling the CDC data topic with historical data from that source Cassandra table.", versionProvider = VersionProvider.class, sortOptions = false, + abbreviateSynopsis = true, usageHelpWidth = 100) -public class BackfillCLI { +public class BackfillCLI implements Callable { @Option( names = {"-h", "--help"}, @@ -50,20 +50,17 @@ public class BackfillCLI { description = "Displays version info.") boolean versionInfoRequested; + @ArgGroup(exclusive = false, multiplicity = "1") + BackfillSettings settings; + public static void main(String[] args) { LoggingUtils.configureLogging(LoggingUtils.MIGRATOR_CONFIGURATION_FILE); - CommandLine commandLine = new CommandLine(new BackfillCLI()); - int exitCode = commandLine.execute(args); + int exitCode = new CommandLine(new BackfillCLI()).execute(args); System.exit(exitCode); } - @Command( - name = "backfill", - optionListHeading = "Available options:%n", - abbreviateSynopsis = true, - usageHelpWidth = 100) - private int backfill( - @ArgGroup(exclusive = false, multiplicity = "1") BackfillSettings settings) throws URISyntaxException, IOException { + @Override + public Integer call() { // Bootstrap the backfill dependencies final BackfillFactory factory = new BackfillFactory(settings); final TableExporter exporter = factory.newTableExporter(); diff --git a/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/BackfillSettings.java b/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/BackfillSettings.java index 283c690c..84883ff8 100644 --- a/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/BackfillSettings.java +++ b/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/BackfillSettings.java @@ -28,12 +28,6 @@ */ public class BackfillSettings { - @CommandLine.Option( - names = {"-h", "--help"}, - usageHelp = true, - description = "Displays this help message.") - boolean usageHelpRequested; - @CommandLine.Option( names = {"-d", "--data-dir"}, paramLabel = "PATH", diff --git a/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java b/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java index df2ae633..52c142c7 100644 --- a/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java +++ b/backfill-cli/src/main/java/com/datastax/oss/cdc/backfill/admin/BackfillCommand.java @@ -16,6 +16,7 @@ package com.datastax.oss.cdc.backfill.admin; +import com.amazonaws.transform.MapEntry; import com.datastax.oss.cdc.backfill.BackfillCLI; import org.apache.pulsar.admin.cli.extensions.CommandExecutionContext; import org.apache.pulsar.admin.cli.extensions.CustomCommand; @@ -23,12 +24,17 @@ import org.apache.pulsar.admin.cli.extensions.ParameterType; import picocli.CommandLine; +import java.io.PrintWriter; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -44,6 +50,11 @@ public String description() { return "Backfills the CDC data topic with historical data from that source Cassandra table."; } + /** + * Delegate to the picocli library to validate the command line arguments so all parameters are marked as optional. + * Commands have to be defined here to be accepted by the Pulsar CLI extension but the idea is to make them + * as thin as possible + */ @Override public List parameters() { List parameters = new ArrayList<>(); @@ -52,7 +63,6 @@ public List parameters() { .description("The directory where data will be exported to and imported from") .type(ParameterType.STRING) .names(Arrays.asList("--data-dir", "-d")) - .required(false) .build()); parameters.add( ParameterDescriptor.builder() @@ -61,7 +71,6 @@ public List parameters() { "If the port is not specified, it will default to 9042.") .type(ParameterType.STRING) .names(Arrays.asList("--export-host")) - .required(true) .build()); parameters.add( ParameterDescriptor.builder() @@ -71,7 +80,6 @@ public List parameters() { + "Options --export-host and --export-bundle are mutually exclusive.") .type(ParameterType.STRING) .names(Arrays.asList("--export-bundle")) - .required(false) .build()); parameters.add( ParameterDescriptor.builder() @@ -81,7 +89,6 @@ public List parameters() { + "the client and the server.") .type(ParameterType.STRING) .names(Arrays.asList("--export-protocol-version")) - .required(false) .build()); parameters.add( ParameterDescriptor.builder() @@ -89,7 +96,6 @@ public List parameters() { "The username to use to authenticate against the origin cluster.") .type(ParameterType.STRING) .names(Arrays.asList("--export-username")) - .required(true) .build()); parameters.add( ParameterDescriptor.builder() @@ -97,7 +103,6 @@ public List parameters() { "The password to use to authenticate against the origin cluster.") .type(ParameterType.STRING) .names(Arrays.asList("--export-password")) - .required(true) .build()); parameters.add( ParameterDescriptor.builder() @@ -105,7 +110,6 @@ public List parameters() { "The consistency level to use when exporting data. The default is LOCAL_QUORUM.") .type(ParameterType.STRING) .names(Arrays.asList("--export-consistency")) - .required(false) .build()); parameters.add( ParameterDescriptor.builder() @@ -114,7 +118,6 @@ public List parameters() { + "The default is -1 (export the entire table).") .type(ParameterType.INTEGER) .names(Arrays.asList("--export-max-records")) - .required(false) .build()); parameters.add( ParameterDescriptor.builder() @@ -123,7 +126,6 @@ public List parameters() { + "Must be a positive number or the special value AUTO. The default is AUTO.") .type(ParameterType.STRING) .names(Arrays.asList("--export-max-concurrent-files")) - .required(false) .build()); parameters.add( ParameterDescriptor.builder() @@ -132,18 +134,16 @@ public List parameters() { + "Must be a positive number or the special value AUTO. The default is AUTO.") .type(ParameterType.STRING) .names(Arrays.asList("--export-max-concurrent-queries")) - .required(false) .build()); parameters.add( ParameterDescriptor.builder() .description( "An extra DSBulk option to use when exporting. " + "Any valid DSBulk option can be specified here, and it will passed as is to the DSBulk process. " - + "DSBulk options, including driver options, must be passed as '--long.option.name='. " + + "DSBulk options, including driver options, must be passed as '--long.option1.name=|--long.option2.name='. " + "Short options are not supported. ") .type(ParameterType.STRING) .names(Arrays.asList("--export-dsbulk-option")) - .required(false) .build()); parameters.add( @@ -151,14 +151,12 @@ public List parameters() { .description("The name of the keyspace where the table to be exported exists") .type(ParameterType.STRING) .names(Arrays.asList("--keyspace", "-k")) - .required(true) .build()); parameters.add( ParameterDescriptor.builder() .description("The name of the table to export data from for cdc back filling") .type(ParameterType.STRING) .names(Arrays.asList("--table", "-t")) - .required(true) .build()); parameters.add( ParameterDescriptor.builder() @@ -166,7 +164,6 @@ public List parameters() { + "The default value is `events-`.") .type(ParameterType.STRING) .names(Arrays.asList("--events-topic-prefix")) - .required(false) .build()); parameters.add( ParameterDescriptor.builder() @@ -174,7 +171,6 @@ public List parameters() { + "Setting this option to any negative value or zero will disable it. The default is -1.") .type(ParameterType.INTEGER) .names(Arrays.asList("--max-rows-per-second")) - .required(false) .build()); return parameters; @@ -182,16 +178,36 @@ public List parameters() { @Override public boolean execute(Map parameters, CommandExecutionContext context) { - CommandLine commandLine = new CommandLine(new BackfillCLI()); + List args = parseParameters(parameters, context); + int exitCode = new CommandLine(new BackfillCLI()) + .setParameterExceptionHandler(new ShortErrorMessageHandler()) + .execute(args.toArray(new String[0])); + return exitCode == 0; + } + + private List parseParameters(Map parameters, CommandExecutionContext context) { List args = parameters.entrySet().stream() .filter(e -> e.getValue() != null) + .flatMap(e -> explodeListOptions(e)) .map(e -> e.getKey() + "=" + e.getValue()) - .collect(Collectors.toList()); + .collect(Collectors.toList()); PulsarClientParams params = parseClientConfRootParams(context.getConfiguration()); populateClientConfRootParams(args, params); - args.add(0, "backfill"); - int exitCode = commandLine.execute(args.toArray(new String[0])); - return exitCode == 0; + return args; + } + + private static Set listOptions = new HashSet<>(Arrays.asList("--export-dsbulk-option")); + private Stream> explodeListOptions(Map.Entry entry) { + List> entries = new ArrayList<>(); + if (listOptions.contains(entry.getKey())) { + String[] options = entry.getValue().toString().split("\\|"); + for (String option : options) { + entries.add(new AbstractMap.SimpleEntry<>(entry.getKey(), option)); + } + } else { + entries.add(entry); + } + return entries.stream(); } private void populateClientConfRootParams(List args, PulsarClientParams params) { @@ -293,4 +309,27 @@ private static PulsarClientParams parseClientConfRootParams(Properties propertie params.tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath"); return params; } + + + /** + * Shorten the error message by excluding the usage help message. This makes it more consistent with pulsar admin + * when invalid options are used. + */ + static class ShortErrorMessageHandler implements CommandLine.IParameterExceptionHandler { + public int handleParseException(CommandLine.ParameterException ex, String[] args) { + CommandLine cmd = ex.getCommandLine(); + PrintWriter writer = cmd.getErr(); + + writer.println(ex.getMessage()); + CommandLine.UnmatchedArgumentException.printSuggestions(ex, writer); + writer.print(cmd.getHelp().fullSynopsis()); + + CommandLine.Model.CommandSpec spec = cmd.getCommandSpec(); + writer.printf("Try '%s' for more information.%n", "cassandra-cdc"); + + return cmd.getExitCodeExceptionMapper() != null + ? cmd.getExitCodeExceptionMapper().getExitCode(ex) + : spec.exitCodeOnInvalidInput(); + } + } } diff --git a/backfill-cli/src/test/java/com/datastax/oss/cdc/backfill/e2e/BackfillCLIE2ETests.java b/backfill-cli/src/test/java/com/datastax/oss/cdc/backfill/e2e/BackfillCLIE2ETests.java index f6fd92ef..28fd268e 100644 --- a/backfill-cli/src/test/java/com/datastax/oss/cdc/backfill/e2e/BackfillCLIE2ETests.java +++ b/backfill-cli/src/test/java/com/datastax/oss/cdc/backfill/e2e/BackfillCLIE2ETests.java @@ -375,7 +375,7 @@ private void runBackfillAsync(String ksName, String tableName) { String cdcBackfillJarFile = String.format(Locale.ROOT, "backfill-cli-%s-all.jar", projectVersion); String cdcBackfillFullJarPath = String.format(Locale.ROOT, "%s/libs/%s", cdcBackfillBuildDir, cdcBackfillJarFile); - ProcessBuilder pb = new ProcessBuilder("java", "-jar", cdcBackfillFullJarPath, "backfill", + ProcessBuilder pb = new ProcessBuilder("java", "-jar", cdcBackfillFullJarPath, "--data-dir", dataDir.toString(), "--dsbulk-log-dir", logsDir.toString(), "--export-host", cassandraContainer1.getCqlHostAddress(), "--keyspace", ksName, "--table", tableName, "--export-consistency", "LOCAL_QUORUM", "--pulsar-url", pulsarContainer.getPulsarBrokerUrl());