
Conversation

@aymkhalil
Contributor

Initial skeleton for the CDC backfill CLI tool:

  • The CLI takes a dependency on DSBulk to export the data to disk and starts a CSV connector to read the exported data back (see the sketch after this list).
  • This PR shows the end-to-end skeleton to get early feedback on the project skeleton and overall approach; unit tests/refactoring are coming up.
  • With local DSE and Pulsar, the following command should run e2e:
./gradlew cdc-backfill-client:assemble
java  -jar cdc-backfill-client/build/libs/cdc-backfill-client-2.2.3-all.jar backfill --data-dir=target/export --export-host=127.0.0.1:9042 --export-username=cassandra --export-password=cassandra --keyspace ks1 --table sample1
  • Lots of settings are imported as-is from DSBulk - will reduce those to the bare minimum.
  • Parts of the code assume multiple tables can be migrated concurrently; this could be simplified in the first iteration.
  • Checkpointing comes baked in from DSBulk.
  • Integ tests/Pulsar admin CLI integration will be covered in a separate series of PRs once the skeleton gets into a reasonable state.
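
For context, a minimal sketch of the intended flow (export with DSBulk to disk, then read the exported rows back and publish them to the events topic). The paths, CSV handling, and class name are illustrative assumptions; the actual CLI reuses DSBulk's connector and the CDC agent's mutation sender rather than the raw Pulsar producer shown here.

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

import java.io.BufferedReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public final class BackfillFlowSketch {
    public static void main(String[] args) throws Exception {
        // Step 1 (not shown): DSBulk unloads ks1.sample1 to the local data dir as CSV.
        Path export = Path.of("target/export/ks1/sample1.csv"); // illustrative path

        // Step 2: read the exported rows back and publish them to the events topic.
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl("pulsar://localhost:6650")
                     .build();
             Producer<byte[]> producer = client.newProducer()
                     .topic("events-ks1.sample1")
                     .create();
             BufferedReader reader = Files.newBufferedReader(export)) {

            String row;
            while ((row = reader.readLine()) != null) {
                String pk = row.split(",")[0]; // naive CSV parsing; first column assumed to be the PK
                producer.newMessage()
                        .key(pk)               // the real tool builds an Avro key from the table metadata
                        .value(row.getBytes(StandardCharsets.UTF_8))
                        .send();
            }
        }
    }
}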

@aymkhalil
Contributor Author

Sample run

16:59:28.931 INFO  Contacting origin cluster...
16:59:30.372 WARN  [] 127.0.0.1:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
16:59:30.668 WARN  [] 127.0.0.1:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
16:59:30.928 INFO  Successfully contacted origin cluster
16:59:30.928 INFO  Tables to migrate:
16:59:30.931 INFO  - ks1.sample1 (regular)
16:59:33.226 INFO  Exporting ks1.sample1...
16:59:33.527 INFO  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] Username and password provided but auth provider not specified, inferring PlainTextAuthProvider
16:59:33.535 INFO  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] Operation directory: /Users/ayman.khalil/workplace/datastax/cdc-apache-cassandra/logs/EXPORT_ks1_sample1_20230214_005933_227
16:59:33.601 WARN  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] [driver] /127.0.0.1:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
16:59:33.805 WARN  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] [driver] /127.0.0.1:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
16:59:33.802 WARN  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] [driver] /127.0.0.1:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
16:59:33.810 WARN  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] [driver] /127.0.0.1:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
16:59:33.802 WARN  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] [driver] /127.0.0.1:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
16:59:33.804 WARN  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] [driver] /127.0.0.1:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
16:59:33.810 WARN  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] [driver] /127.0.0.1:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
16:59:33.831 WARN  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] [driver] /127.0.0.1:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
16:59:33.831 WARN  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] [driver] /127.0.0.1:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
16:59:34.439 WARN  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] Continuous paging is not available, read performance will not be optimal. Check your remote DSE cluster configuration, and ensure that the configured consistency level is either ONE or LOCAL_ONE.
16:59:34.831 INFO  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] Operation EXPORT_ks1_sample1_20230214_005933_227 completed successfully in less than one second.
16:59:37.014 INFO  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] Checkpoints for the current operation were written to checkpoint.csv.
16:59:37.014 INFO  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] To resume the current operation, re-run it with the same settings, and add the following command line flag:
16:59:37.014 INFO  [DSBULK-EXPORT_ks1_sample1_20230214_005933_227] --dsbulk.log.checkpoint.file=/Users/ayman.khalil/workplace/datastax/cdc-apache-cassandra/logs/EXPORT_ks1_sample1_20230214_005933_227/checkpoint.csv
16:59:37.036 INFO  Export of ks1.sample1 finished with STATUS_OK
16:59:37.099 INFO  errorCommitLogReprocessEnabled=false, pulsarServiceUrl=pulsar://localhost:6650/, pulsarAuthParams=null, cdcConcurrentProcessors=-1, sslKeystorePath=null, sslTruststorePath=null, tlsTrustCertsFilePath=null, sslTruststoreType=JKS, sslEnabledProtocols=TLSv1.2,TLSv1.1,TLSv1, sslCipherSuites=null, sslKeystorePassword=null, pulsarBatchDelayInMs=-1, cdcPollIntervalMs=60000, cdcWorkingDir=null/cdc, pulsarAuthPluginClassName=null, pulsarMaxPendingMessagesAcrossPartitions=50000, sslHostnameVerificationEnable=false, topicPrefix=events-, maxInflightMessagesPerTask=16384, sslAllowInsecureConnection=false, useKeyStoreTls=false, sslProvider=null, sslTruststorePassword=null, pulsarKeyBasedBatcher=false, pulsarMaxPendingMessages=1000
16:59:37.708 WARN  Can not find org.apache.pulsar.shade.io.netty.resolver.dns.macos.MacOSDnsServerAddressStreamProvider in the classpath, fallback to system defaults. This may result in incorrect DNS resolutions on MacOS.
16:59:37.807 INFO  Pulsar client connected
16:59:37.918 INFO  InetAddress.getLocalHost() was used to resolve listen_address to akhalil-rmbp16/192.168.1.64, double check this is correct. Please check your node's config and set the listen_address in cassandra.yaml accordingly if applicable.
16:59:38.143 INFO  Pulsar producer name=cdc-producer-null-ks1.sample1 created with batching delay=-1ms
16:59:38.263 INFO  [[id: 0xbd7ca279, L:/127.0.0.1:63104 - R:localhost/127.0.0.1:6650]] Connected to server
16:59:38.384 INFO  Starting Pulsar producer perf with config: {
  "topicName" : "events-ks1.sample1",
  "producerName" : "cdc-producer-null-ks1.sample1",
  "sendTimeoutMs" : 0,
  "blockIfQueueFull" : true,
  "maxPendingMessages" : 1000,
  "maxPendingMessagesAcrossPartitions" : 50000,
  "messageRoutingMode" : "CustomPartition",
  "hashingScheme" : "Murmur3_32Hash",
  "cryptoFailureAction" : "FAIL",
  "batchingMaxPublishDelayMicros" : 1000,
  "batchingPartitionSwitchFrequencyByPublishDelay" : 10,
  "batchingMaxMessages" : 1000,
  "batchingMaxBytes" : 131072,
  "batchingEnabled" : false,
  "chunkingEnabled" : false,
  "compressionType" : "NONE",
  "initialSequenceId" : null,
  "autoUpdatePartitions" : true,
  "autoUpdatePartitionsIntervalSeconds" : 60,
  "multiSchema" : true,
  "accessMode" : "Shared",
  "properties" : { }
}
16:59:38.408 INFO  Pulsar client config: {
  "serviceUrl" : "pulsar://localhost:6650/",
  "authPluginClassName" : null,
  "authParams" : null,
  "authParamMap" : null,
  "operationTimeoutMs" : 30000,
  "statsIntervalSeconds" : 60,
  "numIoThreads" : 1,
  "numListenerThreads" : 1,
  "connectionsPerBroker" : 1,
  "useTcpNoDelay" : false,
  "useTls" : false,
  "tlsTrustCertsFilePath" : "",
  "tlsAllowInsecureConnection" : false,
  "tlsHostnameVerificationEnable" : false,
  "concurrentLookupRequest" : 5000,
  "maxLookupRequest" : 50000,
  "maxLookupRedirects" : 20,
  "maxNumberOfRejectedRequestPerConnection" : 50,
  "keepAliveIntervalSeconds" : 30,
  "connectionTimeoutMs" : 10000,
  "requestTimeoutMs" : 60000,
  "initialBackoffIntervalNanos" : 100000000,
  "maxBackoffIntervalNanos" : 60000000000,
  "enableBusyWait" : false,
  "listenerName" : null,
  "useKeyStoreTls" : false,
  "sslProvider" : null,
  "tlsTrustStoreType" : "JKS",
  "tlsTrustStorePath" : null,
  "tlsTrustStorePassword" : null,
  "tlsCiphers" : [ ],
  "tlsProtocols" : [ ],
  "memoryLimitBytes" : 0,
  "proxyServiceUrl" : null,
  "proxyProtocol" : null,
  "enableTransaction" : false
}
16:59:38.429 INFO  [events-ks1.sample1] [cdc-producer-null-ks1.sample1] Creating producer on cnx [id: 0xbd7ca279, L:/127.0.0.1:63104 - R:localhost/127.0.0.1:6650]
16:59:38.494 INFO  [events-ks1.sample1] [cdc-producer-null-ks1.sample1] Created producer on cnx [id: 0xbd7ca279, L:/127.0.0.1:63104 - R:localhost/127.0.0.1:6650]
16:59:38.525 WARN  Failed to load Circe JNI library. Falling back to Java based CRC32c provider
16:59:38.575 INFO  sent 8 records to Pulsar
16:59:38.587 INFO  Migration finished with 1 successfully migrated tables, 0 failed tables.
16:59:38.588 INFO  Table ks1.sample1 migrated successfully.

Collaborator

@eolivelli eolivelli left a comment


Nice work.
I think we can commit this patch as an initial step.
I have a couple of questions.

OSS license

Can anyone use this tool or do they need a license for DSBulk?

Cassandra compatibility

We must support C* 3, C* 4 and DSE; IIRC you mentioned that we need different dependencies. That's not a big deal, but do you know if DSBulk will work with all the supported versions?

Dry run mode

We should add a "--dry-run" mode in which we read from Cassandra without writing to Pulsar

Disk space and retention

We should make it clear that we are storing locally some files, for two reasons:

  1. The local machine must have enough space
  2. The user MUST remember to delete everything, because it may contain sensitive data

I suggest adding a flag to auto-delete the data at the end of the procedure (and enabling the feature by default).

What happens if you run the tool on a non-empty directory that has data from other tables?

//String md5Digest = DigestUtils.md5Hex(dataOutputBuffer.getData());
TableMetadata.Builder builder = TableMetadata.builder(exportedTable.keyspace.getName().toString(), exportedTable.table.getName().toString());
exportedTable.table.getPartitionKey().forEach(k-> builder.addPartitionKeyColumn(k.getName().toString(), UTF8Type.instance));
TableMetadata t = builder.build();
Collaborator


We can cache this as follow-up work.

Contributor Author


+1. Will send a separate patch with Pulsar-ready support.
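
A minimal sketch of what that caching follow-up could look like (the generic class and the key format are illustrative; the supplier would wrap the TableMetadata.builder(...) construction shown in the snippet above):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Illustrative cache: build the per-table metadata once and reuse it for every
// exported record, instead of rebuilding it inside the per-record mapping.
final class TableMetadataCache<T> {
    private final Map<String, T> cache = new ConcurrentHashMap<>();

    // key would typically be "<keyspace>.<table>".
    T getOrBuild(String key, Supplier<T> builder) {
        return cache.computeIfAbsent(key, k -> builder.get());
    }
}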


AgentConfig config = AgentConfig.create(AgentConfig.Platform.PULSAR, tenantInfo);
PulsarMutationSender sender = new PulsarMutationSender(config, true);
List<CompletableFuture<?>> futures = new ArrayList<>();
Collaborator


Follow-up work:
we have to limit the size of this list and also limit the number of pending writes.

Contributor Author


+1. Will send a separate patch with Pulsar-ready support.
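
A minimal sketch of how that cap could look, assuming a simple semaphore-based bound on in-flight sends (the class and its wiring are illustrative, not the final design):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Illustrative back-pressure: at most maxPending sends are in flight at once,
// and completed futures are pruned so the tracking list cannot grow without bound.
// Intended to be called from the single thread that reads the exported records.
final class BoundedSender {
    private final Semaphore permits;
    private final List<CompletableFuture<?>> pending = new ArrayList<>();

    BoundedSender(int maxPending) {
        this.permits = new Semaphore(maxPending);
    }

    void send(Supplier<CompletableFuture<?>> sendOnce) throws InterruptedException {
        permits.acquire();                                 // block when maxPending writes are in flight
        CompletableFuture<?> future = sendOnce.get();
        future.whenComplete((r, e) -> permits.release());  // free a slot when the write completes
        pending.removeIf(CompletableFuture::isDone);       // drop finished futures from the list
        pending.add(future);
    }

    void awaitAll() {
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    }
}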

List<CompletableFuture<?>> futures = new ArrayList<>();
long c = Flux.from(connector.read()).flatMap(Resource::read).map(record -> {
//System.out.println("Sending to pulsar " + record.toString());
Object[] pkValues = new Object[]{record.getFieldValue(new DefaultIndexedField(0))};
Collaborator


It is not clear to me how we deal with complex PKs and UDTs.

Contributor Author


I'll verify; my assumption is that by reusing the CSVConnector from DSBulk, it handles those types.

Please note that it is discouraged to use a UDT as a PK, but I'll verify it is supported: https://www.datastax.com/blog/cql-improvements-cassandra-21
"Please note however that there is relatively little advantages to be gain in using a UDT on a PRIMARY KEY column, avoid abusing such possibility just because it's available."

Contributor Author


On second thought, I'll rely completely on the agent code for the conversion:

public class PulsarMutationSender extends AbstractPulsarMutationSender<TableMetadata> {

The input will be the CQL types as I read them from the TableMetadata of the source table (I have this info outside the CSV file on disk), and I'll use the agent to map them to Avro. I expect to add some adapter logic to map between the CQL types I get from the TableMetadata and what the agent logic gets from the commit log processor, but it should be feasible.
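
A minimal sketch of that adapter idea, assuming a small lookup from CQL type names (as read from the source table's metadata) to the server-side types the agent code works with; the covered types and the class name are illustrative, not exhaustive:

import java.util.Map;

import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.UUIDType;

// Illustrative adapter: translate CQL type names read from the source table's
// metadata into the server-side AbstractType instances the agent's mutation
// sender expects (it normally obtains these from the commit log processor).
final class CqlTypeAdapter {
    private static final Map<String, AbstractType<?>> TYPES = Map.<String, AbstractType<?>>of(
            "text", UTF8Type.instance,
            "varchar", UTF8Type.instance,
            "int", Int32Type.instance,
            "bigint", LongType.instance,
            "uuid", UUIDType.instance);

    static AbstractType<?> toCassandraType(String cqlTypeName) {
        AbstractType<?> type = TYPES.get(cqlTypeName.toLowerCase());
        if (type == null) {
            throw new IllegalArgumentException("Unsupported PK type for backfill: " + cqlTypeName);
        }
        return type;
    }
}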

@aymkhalil
Contributor Author

OSS license

Can anyone use this tool or do they need a license for DSBulk?

Yes. DSBulk switched to Apache License v2 a few years ago, which permits distribution of the software.

Cassandra compatibility

We must support C* 3, C* 4 and DSE; IIRC you mentioned that we need different dependencies. That's not a big deal, but do you know if DSBulk will work with all the supported versions?

Yes, DSBulk supports DSE and C* 2.1 and later. That covers the table export part. There is also C*/DSE version-dependent logic that translates the schema of the PK to Avro before sending records to the events topic. That part will be covered by reusing the agent code. In this patch I hardcoded it to use DSE. With some refactoring, we should be able to fully reuse the agent code, so the net result is that C* 3, C* 4, and DSE will be supported.

Dry run mode

We should add a "--dry-run" mode in which we read from Cassandra without writing to Pulsar

+1. I'll capture this requirement

Disk space and retention

We should make it clear that we are storing locally some files, for two reasons:

  1. The local machine must have enough space
  2. The user MUST remember to delete everything, because it may contain sensitive data

+1. Will add to the README. Please note that we can add a flag to automatically delete the local snapshot after the backfilling completes, but we need to highlight that enough disk space should be available. We can consider an optimization later on that sends data in batches, deletes them from disk, and then continues with other batches, which could be over-engineering for now.

I suggest adding a flag to auto-delete the data at the end of the procedure (and enabling the feature by default).

Yes!
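
A minimal sketch of what that cleanup could look like, assuming a recursive delete of the export directory after a successful run (the flag name and helper are hypothetical):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical cleanup backing a flag such as --auto-delete-export (enabled by
// default): remove the local snapshot bottom-up once the backfill succeeds.
final class ExportCleanup {
    static void deleteRecursively(Path exportDir) throws IOException {
        if (!Files.exists(exportDir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(exportDir)) {
            paths.sorted(Comparator.reverseOrder()) // delete children before their parent directories
                 .forEach(p -> p.toFile().delete());
        }
    }
}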

What happens if you run the tool on a non-empty directory that has data from other tables?

Each table will be stored in its own directory following a keyspace/tablename hierarchy. If the user has data in the directory with the same keyspace and table name, but it doesn't actually belong to the table, the tool will not know. If the PK mismatches, publishing to the events topic will fail, but otherwise the tool doesn't know that the data has been manipulated. One way to mitigate this would be to generate a random folder name and a new snapshot every time the tool is run, but that would complicate resuming work after the tool is paused because it would require a manifest file (and the user could still manipulate that file). Encryption is another thing we can consider, but for V0 I think the existing model suffices.
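
For illustration, a small sketch of the per-table layout described above, assuming a <data-dir>/<keyspace>/<table> convention (the helper name is hypothetical):

import java.nio.file.Path;

// Hypothetical helper: each table's snapshot lives under <data-dir>/<keyspace>/<table>,
// e.g. target/export/ks1/sample1 for the sample run above.
final class ExportLayout {
    static Path tableDir(Path dataDir, String keyspace, String table) {
        return dataDir.resolve(keyspace).resolve(table);
    }
}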

@aymkhalil aymkhalil marked this pull request as ready for review February 16, 2023 19:30
@aymkhalil aymkhalil merged commit 4aaf85a into master Feb 16, 2023