Skip to content
Merged
9 changes: 8 additions & 1 deletion backfill-cli/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ dependencies {
implementation "com.datastax.oss:dsbulk-connectors-csv:${dsbulkVersion}"
implementation "com.datastax.oss:dsbulk-executor-reactor:${dsbulkVersion}"
implementation "com.datastax.oss:dsbulk-batcher-reactor:${dsbulkVersion}"
implementation "com.datastax.oss:dsbulk-codecs-api:${dsbulkVersion}"
implementation "com.datastax.oss:dsbulk-codecs-text:${dsbulkVersion}"
implementation "com.google.guava:guava:${guavaVersion}"

implementation "info.picocli:picocli:4.7.1"
Expand All @@ -97,6 +99,8 @@ dependencies {

// Custom nar task that wraps the shadowJar for pulsar admin extension purposes.
tasks.register('nar', Zip) {
dependsOn shadowJar

// bundle the shadow jar as is
from(shadowJar.archivePath) {
into "META-INF/bundled-dependencies"
Expand All @@ -111,6 +115,9 @@ tasks.register('nar', Zip) {
}
}

// explicitly include all files in the resources folder, otherwise the shadowJar will not include them
sourceSets.main.resources.include("**/*")

archiveName "pulsar-cassandra-admin-${project.version}-nar.nar"
destinationDir file("${buildDir}/libs")
}
Expand All @@ -127,7 +134,7 @@ task e2eTest(type: Test) {
testLogging.showStandardStreams = true

dependsOn project(':connector').assemble // couldn't take dependency on nar directly
dependsOn shadowJar
dependsOn nar

useJUnitPlatform()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.datastax.oss.cdc.backfill.admin;

import com.amazonaws.transform.MapEntry;
import com.datastax.oss.cdc.backfill.BackfillCLI;
import org.apache.pulsar.admin.cli.extensions.CommandExecutionContext;
import org.apache.pulsar.admin.cli.extensions.CustomCommand;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright DataStax, Inc 2021.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.datastax.oss.cdc.backfill.factory;

import com.datastax.oss.dsbulk.codecs.api.ConvertingCodecFactory;

import javax.annotation.concurrent.NotThreadSafe;

/**
* Class loader aware converting codec factory.
*/
@NotThreadSafe
public class CodecFactory {
    /**
     * Builds a {@link ConvertingCodecFactory} while the supplied class loader is temporarily installed as the
     * calling thread's context class loader.
     *
     * <p>This works around a limitation of the dsbulk codec factory, which uses the class-loader unaware
     * ServiceLoader.load() API that defaults to Thread.currentThread().getContextClassLoader(). Without this
     * swap, class loading issues occur when running as a NAR archive.
     *
     * @param classLoader the class loader to expose as the thread context class loader during construction
     * @return a newly constructed converting codec factory
     */
    public ConvertingCodecFactory newCodecFactory(ClassLoader classLoader) {
        final Thread caller = Thread.currentThread();
        final ClassLoader previous = caller.getContextClassLoader();
        try {
            caller.setContextClassLoader(classLoader);
            return new ConvertingCodecFactory();
        } finally {
            // Put the caller's original context class loader back, whether or not construction succeeded.
            caller.setContextClassLoader(previous);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.datastax.oss.cdc.agent.exceptions.CassandraConnectorSchemaException;
import com.datastax.oss.cdc.backfill.ExitStatus;
import com.datastax.oss.cdc.backfill.exporter.ExportedTable;
import com.datastax.oss.cdc.backfill.factory.CodecFactory;
import com.datastax.oss.cdc.backfill.factory.ConnectorFactory;
import com.datastax.oss.cdc.backfill.factory.PulsarMutationSenderFactory;
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
Expand Down Expand Up @@ -99,7 +100,8 @@ public class PulsarImporter {
* node. Doesn't apply for CDC back-filling.
*/
private final static UUID MUTATION_NODE = null;
private final static ConvertingCodecFactory codecFactory = new ConvertingCodecFactory();
private final static ConvertingCodecFactory codecFactory =
new CodecFactory().newCodecFactory(PulsarImporter.class.getClassLoader());

/**
* The maximum number of in-flight pulsar messages currently being imported
Expand All @@ -110,10 +112,11 @@ public class PulsarImporter {
private final AtomicInteger sentMutations = new AtomicInteger(0);
private final AtomicInteger sentErrors = new AtomicInteger(0);

public PulsarImporter(ConnectorFactory connectorFactory, ExportedTable exportedTable, PulsarMutationSenderFactory factory) {
public PulsarImporter(ConnectorFactory connectorFactory, ExportedTable exportedTable,
PulsarMutationSenderFactory mutationSenderFactory) {
this.connectorFactory = connectorFactory;
this.exportedTable = exportedTable;
this.mutationSender = factory.newPulsarMutationSender();
this.mutationSender = mutationSenderFactory.newPulsarMutationSender();
this.inflightPulsarMessages = new Semaphore(MAX_INFLIGHT_MESSAGES_PER_TASK_SETTING);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import org.apache.cassandra.db.marshal.IntegerType;
import org.apache.cassandra.db.marshal.SimpleDateType;
import org.apache.cassandra.db.marshal.TimeType;
import org.apache.cassandra.db.marshal.TimestampType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -63,6 +65,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -155,6 +158,10 @@ public void testImportPartitionAndClusteringKeys() {
new ColumnIdentifier("xdate", true);
ColumnIdentifier xblobIdentifier =
new ColumnIdentifier("xblob", true);
ColumnIdentifier xtimestampIdentifier =
new ColumnIdentifier("xtimestamp", true);
ColumnIdentifier xuuidIdentifier =
new ColumnIdentifier("xuuid", true);
ColumnMetadata xintColumnMetadata =
new ColumnMetadata("ks1", "xint", xintIdentifier, IntegerType.instance, 2, ColumnMetadata.Kind.CLUSTERING);
ColumnMetadata xtimeColumnMetadata =
Expand All @@ -163,12 +170,18 @@ public void testImportPartitionAndClusteringKeys() {
new ColumnMetadata("ks1", "xdate", xdateIdentifier, SimpleDateType.instance, 4, ColumnMetadata.Kind.CLUSTERING);
ColumnMetadata xblobColumnMetadata =
new ColumnMetadata("ks1", "xblob", xblobIdentifier, BytesType.instance, 5, ColumnMetadata.Kind.CLUSTERING);
ColumnMetadata xtimestampColumnMetadata =
new ColumnMetadata("ks1", "xtimestamp", xtimestampIdentifier, TimestampType.instance, 6, ColumnMetadata.Kind.CLUSTERING);
ColumnMetadata xuuidColumnMetadata =
new ColumnMetadata("ks1", "xuuid", xuuidIdentifier, UUIDType.instance, 6, ColumnMetadata.Kind.CLUSTERING);
cassandraColumns.add(xtextColumnMetadata);
cassandraColumns.add(xbooleanColumnMetadata);
cassandraColumns.add(xintColumnMetadata);
cassandraColumns.add(xtimeColumnMetadata);
cassandraColumns.add(xdateColumnMetadata);
cassandraColumns.add(xblobColumnMetadata);
cassandraColumns.add(xtimestampColumnMetadata);
cassandraColumns.add(xuuidColumnMetadata);

Mockito.when(tableMetadata.primaryKeyColumns()).thenReturn(cassandraColumns);

Expand All @@ -180,6 +193,8 @@ public void testImportPartitionAndClusteringKeys() {
columns.add(new DefaultColumnMetadata(CqlIdentifier.fromInternal("ks1"), CqlIdentifier.fromInternal("table1"), CqlIdentifier.fromInternal("xtime"), DataTypes.TIME, false));
columns.add(new DefaultColumnMetadata(CqlIdentifier.fromInternal("ks1"), CqlIdentifier.fromInternal("table1"), CqlIdentifier.fromInternal("xdate"), DataTypes.DATE, false));
columns.add(new DefaultColumnMetadata(CqlIdentifier.fromInternal("ks1"), CqlIdentifier.fromInternal("table1"), CqlIdentifier.fromInternal("xblob"), DataTypes.BLOB, false));
columns.add(new DefaultColumnMetadata(CqlIdentifier.fromInternal("ks1"), CqlIdentifier.fromInternal("table1"), CqlIdentifier.fromInternal("xtimestamp"), DataTypes.TIMESTAMP, false));
columns.add(new DefaultColumnMetadata(CqlIdentifier.fromInternal("ks1"), CqlIdentifier.fromInternal("table1"), CqlIdentifier.fromInternal("xuuid"), DataTypes.UUID, false));
Mockito.when(exportedTable.getPrimaryKey()).thenReturn(columns);

// when
Expand All @@ -192,10 +207,12 @@ public void testImportPartitionAndClusteringKeys() {
assertEquals(2, pkValues.size());
List<Object>[] allPkValues = pkValues.stream().map(v-> v.getPkValues()).map(Arrays::asList).toArray(List[]::new);
assertThat(allPkValues[0], containsInRelativeOrder("vtext", true, 2, LocalTime.of(1, 2, 3).toNanoOfDay(),
ByteBuffer.wrap(new byte[]{0x00, 0x01})));
ByteBuffer.wrap(new byte[]{0x00, 0x01}), Instant.parse("2023-03-22T18:16:20.808Z"),
UUID.fromString("3920dd7d-dcbf-4c2e-bbe5-f300b720ae0d")));
assertEquals(LocalDate.of(2023, 3, 2), cqlSimpleDateToLocalDate((Integer) allPkValues[0].get(4)));
assertThat(allPkValues[1], containsInRelativeOrder("v2text", false, 3, LocalTime.of(1, 2, 4).toNanoOfDay(),
ByteBuffer.wrap(new byte[]{0x01})));
ByteBuffer.wrap(new byte[]{0x01}), Instant.parse("2022-02-21T18:16:20.807Z"),
UUID.fromString("19296adf-fa87-4ba2-bad8-ae86d2769ee6")));
assertEquals(LocalDate.of(2023, 3, 1), cqlSimpleDateToLocalDate((Integer) allPkValues[1].get(4)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.datastax.oss.cdc.CassandraSourceConnectorConfig;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.Version;
import com.datastax.oss.driver.api.core.data.CqlDuration;
import com.datastax.oss.dsbulk.tests.utils.FileUtils;
import com.datastax.testcontainers.PulsarContainer;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
Expand All @@ -63,6 +65,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -101,25 +104,33 @@ public static void initBeforeClass() throws Exception {
testNetwork = Network.newNetwork();

String connectorBuildDir = System.getProperty("connectorBuildDir");
String cdcBackfillBuildDir = System.getProperty("cdcBackfillBuildDir");
String projectVersion = System.getProperty("projectVersion");
String connectorJarFile = String.format(Locale.ROOT, "pulsar-cassandra-source-%s.nar", projectVersion);
String backfillNarFile = String.format(Locale.ROOT, "pulsar-cassandra-admin-%s-nar.nar", projectVersion);
pulsarContainer = new PulsarContainer<>(PULSAR_IMAGE)
.withNetwork(testNetwork)
.withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd.withName("pulsar"))
.withFunctionsWorker()
.withFileSystemBind(
String.format(Locale.ROOT, "%s/libs/%s", connectorBuildDir, connectorJarFile),
String.format(Locale.ROOT, "/pulsar/connectors/%s", connectorJarFile))
.withFileSystemBind(
String.format(Locale.ROOT, "%s/libs/%s", cdcBackfillBuildDir, backfillNarFile),
String.format(Locale.ROOT, "/pulsar/cliextensions/%s", backfillNarFile))
.withClasspathResourceMapping("client.conf",
"/pulsar/conf/client.conf",
BindMode.READ_ONLY)
.withStartupTimeout(Duration.ofSeconds(60));
pulsarContainer.start();

// ./pulsar-admin namespaces set-auto-topic-creation public/default --enable --type partitioned --num-partitions 1
Container.ExecResult result = pulsarContainer.execInContainer(
"/pulsar/bin/pulsar-admin", "namespaces", "set-auto-topic-creation", "public/default", "--enable");
assertEquals(0, result.getExitCode());
assertEquals(0, result.getExitCode(), "Failed to set auto topic create " + result.getStdout() + " " + result.getStderr());
result = pulsarContainer.execInContainer(
"/pulsar/bin/pulsar-admin", "namespaces", "set-is-allow-auto-update-schema", "public/default", "--enable");
assertEquals(0, result.getExitCode());
assertEquals(0, result.getExitCode(), result.getStdout());

String pulsarServiceUrl = "pulsar://pulsar:" + pulsarContainer.BROKER_PORT;
String cassandraFamily = System.getProperty("cassandraFamily");
Expand Down Expand Up @@ -367,7 +378,57 @@ public void testBackfillCLIFullSchema(String ksName) throws InterruptedException
}
}

/**
* The backfill command can be run either as a Pulsar Admin extension or as a standalone JAR. Pulsar Admin
* extensions are supported as of Pulsar 2.11 and LS 2.10_3.4. However, Pulsar 2.11 requires a Java 17 runtime to
* run the CLI, which the backfill command does not yet support. For now, the e2e test runs the backfill command as
* a CLI extension only if the image is LS 2.10_3.4 or later.
*/
private boolean runAsPulsarAdminExtension() {
    final String repository = PULSAR_IMAGE.getUnversionedPart();
    final String tag = PULSAR_IMAGE.getVersionPart();

    // Only the Luna Streaming image is eligible; any other image falls back to the JAR path.
    if (!"datastax/lunastreaming".equals(repository)) {
        return false;
    }
    // Eligible only when the image tag is at least the minimum LS version.
    return compareLSImageVersion(tag, "2.10_3.4") >= 0;
}

/**
* Piggyback on {@link com.datastax.oss.driver.api.core.Version} to compare the version of the LS image.
*/
private int compareLSImageVersion(String version1, String version2) {
    // Returns the result of Version#compareTo: negative, zero or positive when version1 is respectively
    // lower than, equal to or higher than version2.
    // LS image tags use '_' (e.g. "2.10_3.4"); it is swapped for '-' before handing the string to the driver's
    // Version parser. NOTE(review): presumably because Version.parse does not accept '_' - confirm.
    final String adaptedVersion1 = version1.replace("_", "-");
    // Fix: this was previously derived from version1, so the method compared version1 with itself and
    // always returned 0.
    final String adaptedVersion2 = version2.replace("_", "-");
    return Version.parse(adaptedVersion1).compareTo(Version.parse(adaptedVersion2));
}

private void runBackfillAsync(String ksName, String tableName) {
    // Dispatch to the JAR-based runner unless the image supports running backfill as a pulsar-admin extension.
    if (!runAsPulsarAdminExtension()) {
        runBackfillAsJARAsync(ksName, tableName);
        return;
    }
    runBackfillAsPulsarAdminExtensionAsync(ksName, tableName);
}

private void runBackfillAsPulsarAdminExtensionAsync(String ksName, String tableName) {
    // Runs "pulsar-admin cassandra-cdc backfill" inside the Pulsar container on a new thread, so the caller
    // does not block while the command executes.
    // NOTE(review): an assertion failure raised here is thrown on the spawned thread, not on the thread that
    // called this method - confirm the test observes backfill failures some other way.
    new Thread(() -> {
        try {
            String[] backfillCommand = new String[] {
                "/pulsar/bin/pulsar-admin", "cassandra-cdc", "backfill", "--data-dir", dataDir.toString(),
                "--export-host", "cassandra-1", "--keyspace", ksName, "--table",
                tableName, "--export-consistency", "LOCAL_QUORUM"
            };
            log.info("Running backfill command: {} ", Arrays.toString(backfillCommand));
            Container.ExecResult result = pulsarContainer.execInContainer(backfillCommand);
            // Include stderr as well as stdout so the failure message carries the command's error output.
            assertEquals(0, result.getExitCode(),
                    "backfill command failed:" + result.getStdout() + " " + result.getStderr());
            log.info(result.getStdout());
            log.info("backfill command finished successfully");
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating so the interruption is not lost.
            Thread.currentThread().interrupt();
            log.error("Failed to run backfilling", e);
            throw new RuntimeException(e);
        } catch (IOException e) {
            log.error("Failed to run backfilling", e);
            throw new RuntimeException(e);
        }
    }).start();
}

private void runBackfillAsJARAsync(String ksName, String tableName) {
new Thread(() -> {
try {
String cdcBackfillBuildDir = System.getProperty("cdcBackfillBuildDir");
Expand Down
15 changes: 15 additions & 0 deletions backfill-cli/src/test/resources/client.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
webServiceUrl=http://localhost:8080/
brokerServiceUrl=pulsar://localhost:6650/
authPlugin=
authParams=
tlsAllowInsecureConnection=false
tlsEnableHostnameVerification=false
tlsTrustCertsFilePath=
useKeyStoreTls=false
tlsTrustStoreType=JKS
tlsTrustStorePath=
tlsTrustStorePassword=
webserviceTlsProvider=

# Pulsar Admin Custom Commands
customCommandFactories=cassandra-cdc
6 changes: 3 additions & 3 deletions backfill-cli/src/test/resources/sample-002.csv
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
xtext,xboolean,xint,xtime,xdate,xblob
vtext,1,2,01:02:03,2023-03-02,AAE=
v2text,0,3,01:02:04,2023-03-01,AQ==
xtext,xboolean,xint,xtime,xdate,xblob,xtimestamp,xuuid
vtext,1,2,01:02:03,2023-03-02,AAE=,2023-03-22T18:16:20.808Z,3920dd7d-dcbf-4c2e-bbe5-f300b720ae0d
v2text,0,3,01:02:04,2023-03-01,AQ==,2022-02-21T18:16:20.807Z,19296adf-fa87-4ba2-bad8-ae86d2769ee6