From d72ae503434f455e65bd378b393c619de1ff4bcb Mon Sep 17 00:00:00 2001 From: Francisco Guerrero Date: Tue, 27 Jun 2023 10:03:56 -0700 Subject: [PATCH 1/4] CASSANDRA-18662: Fix cassandra-analytics-core-example This commit fixes the `SampleCassandraJob` available under the `cassandra-analytics-core-example` subproject. --- build.gradle | 34 ++--- cassandra-analytics-core-example/README.md | 7 - .../spark/example/SampleCassandraJob.java | 120 +++++++++++------- .../org/apache/cassandra/clients/Sidecar.java | 65 +++++----- .../CassandraBulkSourceRelation.java | 1 - .../spark/data/CassandraDataLayer.java | 5 +- 6 files changed, 127 insertions(+), 105 deletions(-) diff --git a/build.gradle b/build.gradle index 8735bb137..216de0e77 100644 --- a/build.gradle +++ b/build.gradle @@ -17,6 +17,9 @@ * under the License. */ +import java.nio.file.Files +import java.nio.file.Paths + buildscript { dependencies { classpath(group: 'com.github.jengelman.gradle.plugins', name: 'shadow', version: '6.1.0') @@ -80,15 +83,23 @@ tasks.register('buildIgnoreRatList', Exec) { } rat { + + doFirst { + def excludeFilePath = Paths.get("${buildDir}/.rat-excludes.txt") + def excludeLines = Files.readAllLines(excludeFilePath) + excludeLines.each { line -> + if (line.endsWith("/")) { + excludes.add("**/" + line + "**") + } else { + excludes.add(line) + } + } + } + // List of Gradle exclude directives, defaults to ['**/.gradle/**'] excludes.add("**/build/**") - excludes.add("**/.out/**") excludes.add("CHANGES.txt") excludes.add("**/org.apache.spark.sql.sources.DataSourceRegister") - excludes.add("**/resources/cassandra-analytics-build.properties") - - // Sidecar for build process - excludes.add("**/cassandra-sidecar/**") // Documentation files excludes.add("**/docs/src/**") @@ -97,23 +108,16 @@ rat { excludes.add("gradlew") excludes.add("gradlew.bat") - // idea generated files - excludes.add("**/.idea/**") - // resource files for test excludes.add("**/test**/resources/**") - // resources - excludes.add("**/resources/sidecar.version") - - // Rat excludes file, one directive per line - excludeFile.set(layout.projectDirectory.file("build/.rat-excludes.txt")) - // XML, TXT and HTML reports directory, defaults to 'build/reports/rat' reportDir.set(file("build/reports/rat")) } -rat.dependsOn buildIgnoreRatList +tasks.named('rat').configure { + dependsOn(buildIgnoreRatList) +} subprojects { apply(plugin: 'java-library') diff --git a/cassandra-analytics-core-example/README.md b/cassandra-analytics-core-example/README.md index 3428995b2..89122c9e4 100644 --- a/cassandra-analytics-core-example/README.md +++ b/cassandra-analytics-core-example/README.md @@ -108,13 +108,6 @@ cassandra_instances: I have a 3 node setup, so I configure Sidecar for those 3 nodes. CCM creates the Cassandra cluster under `${HOME}/.ccm/test`, so I update my `data_dirs` and `staging_dir` configuration to use my local path. -Next, create the `staging_dir` where Sidecar will stage SSTables coming from Cassandra Spark bulk writer. -In my case, I have decided to keep the `sstable-staging` directory inside each of the node's directories. - -```shell -mkdir -p ${HOME}/.ccm/test/node{1..3}/sstable-staging -``` - Finally, run Cassandra Sidecar, we skip running integration tests because we need docker for integration tests. You can opt to run integration tests if you have docker running in your local environment. 
diff --git a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java index 58c9827a2..b71c57b65 100644 --- a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java +++ b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java @@ -20,6 +20,7 @@ package org.apache.cassandra.spark.example; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -28,19 +29,20 @@ import java.util.stream.IntStream; import java.util.stream.LongStream; -import org.apache.cassandra.spark.bulkwriter.TTLOption; -import org.apache.cassandra.spark.bulkwriter.TimestampOption; -import org.apache.cassandra.spark.bulkwriter.WriterOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.spark.KryoRegister; import org.apache.cassandra.spark.bulkwriter.BulkSparkConf; +import org.apache.cassandra.spark.bulkwriter.TTLOption; +import org.apache.cassandra.spark.bulkwriter.TimestampOption; +import org.apache.cassandra.spark.bulkwriter.WriterOptions; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; +import org.apache.spark.sql.DataFrameWriter; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; @@ -128,26 +130,46 @@ private static Dataset write(long rowCount, SparkConf sparkConf, SQLContext { JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sc); int parallelism = sc.defaultParallelism(); - boolean addTTLColumn = true; - boolean addTimestampColumn = true; + boolean addTTLColumn = false; + boolean addTimestampColumn = false; JavaRDD rows = genDataset(javaSparkContext, rowCount, parallelism, addTTLColumn, addTimestampColumn); Dataset df = sql.createDataFrame(rows, getWriteSchema(addTTLColumn, addTimestampColumn)); - df.write() - .format("org.apache.cassandra.spark.sparksql.CassandraDataSink") - .option("sidecar_instances", "localhost,localhost2,localhost3") - .option("keyspace", "spark_test") - .option("table", "test") - .option("local_dc", "datacenter1") - .option("bulk_writer_cl", "LOCAL_QUORUM") - .option("number_splits", "-1") - // A constant timestamp and TTL can be used by setting the following options. - // .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000)) - // .option(WriterOptions.TTL.name(), TTLOption.constant(20)) - .option(WriterOptions.TTL.name(), TTLOption.perRow("ttl")) - .option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp")) - .mode("append") - .save(); + DataFrameWriter dfWriter = df.write() + .format("org.apache.cassandra.spark.sparksql.CassandraDataSink") + .option("sidecar_instances", "localhost,localhost2,localhost3") + .option("keyspace", "spark_test") + .option("table", "test") + .option("local_dc", "datacenter1") + .option("bulk_writer_cl", "LOCAL_QUORUM") + .option("number_splits", "-1") + // A constant timestamp and TTL can be used by setting the following options. 
+ // .option(WriterOptions.TTL.name(), TTLOption.constant(20)) + // .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000)) + .mode("append"); + + List addedColumns = new ArrayList<>(); + if (addTTLColumn) + { + addedColumns.add("ttl"); + dfWriter = dfWriter + .option(WriterOptions.TTL.name(), TTLOption.perRow("ttl")); + } + + if (addTimestampColumn) + { + addedColumns.add("timestamp"); + dfWriter = dfWriter + .option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp")); + } + + dfWriter.save(); + + if (!addedColumns.isEmpty()) + { + df = df.drop(addedColumns.toArray(new String[0])); + } + return df; } @@ -183,9 +205,9 @@ private static Dataset read(int expectedRowCount, SparkConf sparkConf, SQLC private static StructType getWriteSchema(boolean addTTLColumn, boolean addTimestampColumn) { StructType schema = new StructType() - .add("id", LongType, false) - .add("course", BinaryType, false) - .add("marks", LongType, false); + .add("id", LongType, false) + .add("course", BinaryType, false) + .add("marks", LongType, false); if (addTTLColumn) { schema = schema.add("ttl", IntegerType, false); @@ -215,34 +237,34 @@ private static JavaRDD genDataset(JavaSparkContext sc, long records, Intege long recordsPerPartition = records / parallelism; long remainder = records - (recordsPerPartition * parallelism); List seq = IntStream.range(0, parallelism).boxed().collect(Collectors.toList()); - int ttl = 10; + int ttl = 120; // data will not be queryable in two minutes long timeStamp = System.currentTimeMillis() * 1000; JavaRDD dataset = sc.parallelize(seq, parallelism).mapPartitionsWithIndex( - (Function2, Iterator>) (index, integerIterator) -> { - long firstRecordNumber = index * recordsPerPartition; - long recordsToGenerate = index.equals(parallelism) ? remainder : recordsPerPartition; - java.util.Iterator rows = LongStream.range(0, recordsToGenerate).mapToObj(offset -> { - long recordNumber = firstRecordNumber + offset; - String courseNameString = String.valueOf(recordNumber); - Integer courseNameStringLen = courseNameString.length(); - Integer courseNameMultiplier = 1000 / courseNameStringLen; - byte[] courseName = dupStringAsBytes(courseNameString, courseNameMultiplier); - if (addTTLColumn && addTimestampColumn) - { - return RowFactory.create(recordNumber, courseName, recordNumber, ttl, timeStamp); - } - if (addTTLColumn) - { - return RowFactory.create(recordNumber, courseName, recordNumber, ttl); - } - if (addTimestampColumn) - { - return RowFactory.create(recordNumber, courseName, recordNumber, timeStamp); - } - return RowFactory.create(recordNumber, courseName, recordNumber); - }).iterator(); - return rows; - }, false); + (Function2, Iterator>) (index, integerIterator) -> { + long firstRecordNumber = index * recordsPerPartition; + long recordsToGenerate = index.equals(parallelism) ? 
remainder : recordsPerPartition; + java.util.Iterator rows = LongStream.range(0, recordsToGenerate).mapToObj(offset -> { + long recordNumber = firstRecordNumber + offset; + String courseNameString = String.valueOf(recordNumber); + Integer courseNameStringLen = courseNameString.length(); + Integer courseNameMultiplier = 1000 / courseNameStringLen; + byte[] courseName = dupStringAsBytes(courseNameString, courseNameMultiplier); + if (addTTLColumn && addTimestampColumn) + { + return RowFactory.create(recordNumber, courseName, recordNumber, ttl, timeStamp); + } + if (addTTLColumn) + { + return RowFactory.create(recordNumber, courseName, recordNumber, ttl); + } + if (addTimestampColumn) + { + return RowFactory.create(recordNumber, courseName, recordNumber, timeStamp); + } + return RowFactory.create(recordNumber, courseName, recordNumber); + }).iterator(); + return rows; + }, false); return dataset; } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java index 9721c32f2..9a392a4c1 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java @@ -42,6 +42,7 @@ import org.apache.cassandra.sidecar.client.SidecarInstancesProvider; import org.apache.cassandra.sidecar.client.VertxHttpClient; import org.apache.cassandra.sidecar.client.VertxRequestExecutor; +import org.apache.cassandra.sidecar.client.retry.BasicRetryPolicy; import org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy; import org.apache.cassandra.sidecar.client.retry.RetryPolicy; import org.apache.cassandra.sidecar.common.NodeSettings; @@ -81,32 +82,32 @@ public static SidecarClient from(SidecarInstancesProvider sidecarInstancesProvid Vertx vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(true).setWorkerPoolSize(config.maxPoolSize())); HttpClientConfig.Builder builder = new HttpClientConfig.Builder<>() - .ssl(false) - .timeoutMillis(TimeUnit.SECONDS.toMillis(config.timeoutSeconds())) - .idleTimeoutMillis((int) TimeUnit.SECONDS.toMillis(config.timeoutSeconds())) - .receiveBufferSize((int) config.chunkBufferSize()) - .maxChunkSize((int) config.maxBufferSize()) - .userAgent(BuildInfo.READER_USER_AGENT); + .ssl(false) + .timeoutMillis(TimeUnit.SECONDS.toMillis(config.timeoutSeconds())) + .idleTimeoutMillis((int) TimeUnit.SECONDS.toMillis(config.timeoutSeconds())) + .receiveBufferSize((int) config.chunkBufferSize()) + .maxChunkSize((int) config.maxBufferSize()) + .userAgent(BuildInfo.READER_USER_AGENT); if (secretsProvider != null) { builder = builder - .ssl(true) - .keyStoreInputStream(secretsProvider.keyStoreInputStream()) - .keyStorePassword(String.valueOf(secretsProvider.keyStorePassword())) - .keyStoreType(secretsProvider.keyStoreType()) - .trustStoreInputStream(secretsProvider.trustStoreInputStream()) - .trustStorePassword(String.valueOf(secretsProvider.trustStorePassword())) - .trustStoreType(secretsProvider.trustStoreType()); + .ssl(true) + .keyStoreInputStream(secretsProvider.keyStoreInputStream()) + .keyStorePassword(String.valueOf(secretsProvider.keyStorePassword())) + .keyStoreType(secretsProvider.keyStoreType()) + .trustStoreInputStream(secretsProvider.trustStoreInputStream()) + .trustStorePassword(String.valueOf(secretsProvider.trustStorePassword())) + .trustStoreType(secretsProvider.trustStoreType()); } HttpClientConfig httpClientConfig = builder.build(); 
SidecarConfig sidecarConfig = new SidecarConfig.Builder() - .maxRetries(config.maxRetries()) - .retryDelayMillis(config.millisToSleep()) - .maxRetryDelayMillis(config.maxMillisToSleep()) - .build(); + .maxRetries(config.maxRetries()) + .retryDelayMillis(config.millisToSleep()) + .maxRetryDelayMillis(config.maxMillisToSleep()) + .build(); return buildClient(sidecarConfig, vertx, httpClientConfig, sidecarInstancesProvider); } @@ -117,23 +118,23 @@ public static SidecarClient from(SidecarInstancesProvider sidecarInstancesProvid .setWorkerPoolSize(conf.getMaxHttpConnections())); HttpClientConfig httpClientConfig = new HttpClientConfig.Builder<>() - .timeoutMillis(conf.getHttpResponseTimeoutMs()) - .idleTimeoutMillis(conf.getHttpConnectionTimeoutMs()) - .userAgent(BuildInfo.WRITER_USER_AGENT) - .keyStoreInputStream(conf.getKeyStore()) - .keyStorePassword(conf.getKeyStorePassword()) - .keyStoreType(conf.getKeyStoreTypeOrDefault()) - .trustStoreInputStream(conf.getTrustStore()) - .trustStorePassword(conf.getTrustStorePasswordOrDefault()) - .trustStoreType(conf.getTrustStoreTypeOrDefault()) - .ssl(conf.hasKeystoreAndKeystorePassword()) - .build(); + .timeoutMillis(conf.getHttpResponseTimeoutMs()) + .idleTimeoutMillis(conf.getHttpConnectionTimeoutMs()) + .userAgent(BuildInfo.WRITER_USER_AGENT) + .keyStoreInputStream(conf.getKeyStore()) + .keyStorePassword(conf.getKeyStorePassword()) + .keyStoreType(conf.getKeyStoreTypeOrDefault()) + .trustStoreInputStream(conf.getTrustStore()) + .trustStorePassword(conf.getTrustStorePasswordOrDefault()) + .trustStoreType(conf.getTrustStoreTypeOrDefault()) + .ssl(conf.hasKeystoreAndKeystorePassword()) + .build(); SidecarConfig sidecarConfig = new SidecarConfig.Builder() - .maxRetries(conf.getSidecarRequestRetries()) - .retryDelayMillis(TimeUnit.SECONDS.toMillis(conf.getSidecarRequestRetryDelayInSeconds())) - .maxRetryDelayMillis(TimeUnit.SECONDS.toMillis(conf.getSidecarRequestMaxRetryDelayInSeconds())) - .build(); + .maxRetries(5) + .retryDelayMillis(200) + .maxRetryDelayMillis(500) + .build(); return buildClient(sidecarConfig, vertx, httpClientConfig, sidecarInstancesProvider); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java index 8f2c0b55d..0a8378527 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java @@ -121,7 +121,6 @@ private void cancelJob(@NotNull CancelJobEvent cancelJobEvent) sparkContext.cancelJobGroup(writerContext.job().getId().toString()); } - @SuppressWarnings("RedundantCast") private void persist(@NotNull JavaPairRDD sortedRDD, String[] columnNames) { writeValidator.setPhase("Environment Validation"); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java index fdc51bcab..b4104bd44 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java @@ -389,9 +389,12 @@ protected void initInstanceMap() instanceMap = clusterConfig.stream().collect(Collectors.toMap(SidecarInstance::hostname, 
Function.identity())); try { + SslConfigSecretsProvider secretsProvider = sslConfig != null + ? new SslConfigSecretsProvider(sslConfig) + : null; sidecar = Sidecar.from(new SimpleSidecarInstancesProvider(new ArrayList<>(clusterConfig)), sidecarClientConfig, - new SslConfigSecretsProvider(sslConfig)); + secretsProvider); } catch (IOException ioException) { From bb4479e10564d19684f584b12d3b7a6dd6fd29c3 Mon Sep 17 00:00:00 2001 From: Francisco Guerrero Date: Tue, 11 Jul 2023 15:02:00 -0700 Subject: [PATCH 2/4] Fix checkstyle issues --- build.gradle | 23 +++++++++++-------- .../org/apache/cassandra/clients/Sidecar.java | 1 - 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/build.gradle b/build.gradle index 216de0e77..5772b48c7 100644 --- a/build.gradle +++ b/build.gradle @@ -84,23 +84,26 @@ tasks.register('buildIgnoreRatList', Exec) { rat { - doFirst { - def excludeFilePath = Paths.get("${buildDir}/.rat-excludes.txt") - def excludeLines = Files.readAllLines(excludeFilePath) - excludeLines.each { line -> - if (line.endsWith("/")) { - excludes.add("**/" + line + "**") - } else { - excludes.add(line) - } - } + doFirst { + def excludeFilePath = Paths.get("${buildDir}/.rat-excludes.txt") + def excludeLines = Files.readAllLines(excludeFilePath) + excludeLines.each { line -> + if (line.endsWith("/")) { + excludes.add("**/" + line + "**") + } else { + excludes.add(line) + } } + } // List of Gradle exclude directives, defaults to ['**/.gradle/**'] excludes.add("**/build/**") excludes.add("CHANGES.txt") excludes.add("**/org.apache.spark.sql.sources.DataSourceRegister") + // Sidecar for build process + excludes.add("**/cassandra-sidecar/**") + // Documentation files excludes.add("**/docs/src/**") // gradle files diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java index 9a392a4c1..0871ddda6 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java @@ -42,7 +42,6 @@ import org.apache.cassandra.sidecar.client.SidecarInstancesProvider; import org.apache.cassandra.sidecar.client.VertxHttpClient; import org.apache.cassandra.sidecar.client.VertxRequestExecutor; -import org.apache.cassandra.sidecar.client.retry.BasicRetryPolicy; import org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy; import org.apache.cassandra.sidecar.client.retry.RetryPolicy; import org.apache.cassandra.sidecar.common.NodeSettings; From 1f599573fd686f60ed536126071b3d1a9985e70b Mon Sep 17 00:00:00 2001 From: Francisco Guerrero Date: Thu, 13 Jul 2023 08:44:25 -0700 Subject: [PATCH 3/4] Fix serialization issue in SidecarDataTransferApi The `sidecarClient` field in `SidecarDataTransferApi` is declared as transient, which causes NPEs on the executors when they attempt an SSTable upload. This commit avoids serializing the `dataTransferApi` field in the `CassandraBulkWriterContext` altogether, and instead initializes it lazily during the `transfer()` method invocation. We restrict the initialization to a single thread by making the `transfer()` method synchronized. The `SidecarDataTransferApi` can be recreated when needed using the already serialized `clusterInfo`, `jobInfo`, and `conf` fields.
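For quick reference while reviewing, here is the pattern the diff below applies, condensed into one place. This is only a sketch of the transient field and its accessor; the rest of `CassandraBulkWriterContext` is elided.

```java
// Not serialized with the context; rebuilt on first use on the executor
private transient DataTransferApi dataTransferApi;

@Override
@NotNull
public synchronized DataTransferApi transfer()
{
    // clusterInfo, jobInfo and conf are serialized, so the Sidecar-backed transfer API
    // can be recreated lazily; synchronized keeps the initialization single-threaded
    if (dataTransferApi == null)
    {
        dataTransferApi = new SidecarDataTransferApi(clusterInfo.getCassandraContext().getSidecarClient(), jobInfo, conf);
    }
    return dataTransferApi;
}
```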
--- .../CassandraBulkWriterContext.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java index 2709b51bf..abe4ef807 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java @@ -53,7 +53,7 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial @NotNull private final BulkSparkConf conf; private final JobInfo jobInfo; - private final DataTransferApi dataTransferApi; + private transient DataTransferApi dataTransferApi; private final CassandraClusterInfo clusterInfo; private final SchemaInfo schemaInfo; @@ -70,14 +70,12 @@ private CassandraBulkWriterContext(@NotNull BulkSparkConf conf, clusterInfo = new CassandraClusterInfo(conf); CassandraRing ring = clusterInfo.getRing(true); jobInfo = new CassandraJobInfo(conf, - new TokenPartitioner(ring, conf.numberSplits, sparkContext.defaultParallelism(), conf.getCores())); + new TokenPartitioner(ring, conf.numberSplits, sparkContext.defaultParallelism(), conf.getCores())); Preconditions.checkArgument(!conf.consistencyLevel.isLocal() - || (conf.localDC != null && ring.getReplicationFactor().getOptions().containsKey(conf.localDC)), + || (conf.localDC != null && ring.getReplicationFactor().getOptions().containsKey(conf.localDC)), String.format("Keyspace %s is not replicated on datacenter %s", conf.keyspace, conf.localDC)); - dataTransferApi = new SidecarDataTransferApi(clusterInfo.getCassandraContext().getSidecarClient(), jobInfo, conf); - String keyspace = conf.keyspace; String table = conf.table; @@ -133,14 +131,14 @@ private static void configureRelocatedNetty() /** * Use the implementation of the KryoSerializable interface as a detection device to make sure the Spark Bulk * Writer's KryoRegistrator is properly in place. - * + *

* If this class is serialized by Kryo, it means we're not set up correctly, and therefore we log and fail. * This failure will occur early in the job and be very clear, so users can quickly fix their code and get up and * running again, rather than having a random NullPointerException further down the line. */ public static final String KRYO_REGISTRATION_WARNING = - "Spark Bulk Writer Kryo Registrator (SbwKryoRegistrator) was not registered with Spark - " - + "please see the README.md file for more details on how to register the Spark Bulk Writer."; + "Spark Bulk Writer Kryo Registrator (SbwKryoRegistrator) was not registered with Spark - " + + "please see the README.md file for more details on how to register the Spark Bulk Writer."; @Override public void write(Kryo kryo, Output output) @@ -187,8 +185,12 @@ public SchemaInfo schema() @Override @NotNull - public DataTransferApi transfer() + public synchronized DataTransferApi transfer() { + if (dataTransferApi == null) + { + dataTransferApi = new SidecarDataTransferApi(clusterInfo.getCassandraContext().getSidecarClient(), jobInfo, conf); + } return dataTransferApi; } } From 6e0c457614693dce74feba8a70f85e2a7eabb492 Mon Sep 17 00:00:00 2001 From: Francisco Guerrero Date: Fri, 14 Jul 2023 14:22:37 -0700 Subject: [PATCH 4/4] Fix setting ROW_BUFFER_MODE to BUFFERED --- .../spark/bulkwriter/BulkSparkConf.java | 21 +++--- .../spark/bulkwriter/RowBufferMode.java | 4 +- .../spark/bulkwriter/SSTableWriter.java | 2 +- .../spark/bulkwriter/BulkSparkConfTest.java | 66 +++++++++++++++++-- .../bulkwriter/MockBulkWriterContext.java | 2 +- 5 files changed, 76 insertions(+), 19 deletions(-) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java index ee63df9e1..a719df713 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java @@ -80,7 +80,7 @@ public class BulkSparkConf implements Serializable public static final long DEFAULT_SIDECAR_REQUEST_MAX_RETRY_DELAY_SECONDS = 60L; public static final int DEFAULT_COMMIT_BATCH_SIZE = 10_000; public static final int DEFAULT_RING_RETRY_COUNT = 3; - public static final RowBufferMode DEFAULT_ROW_BUFFER_MODE = RowBufferMode.UNBUFFERRED; + public static final RowBufferMode DEFAULT_ROW_BUFFER_MODE = RowBufferMode.UNBUFFERED; public static final int DEFAULT_BATCH_SIZE_IN_ROWS = 1_000_000; // NOTE: All Cassandra Analytics setting names must start with "spark" in order to not be ignored by Spark, @@ -141,7 +141,7 @@ public BulkSparkConf(SparkConf conf, Map options) this.consistencyLevel = ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM")); this.localDC = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null); this.numberSplits = MapUtils.getInt(options, WriterOptions.NUMBER_SPLITS.name(), DEFAULT_NUM_SPLITS, "number of splits"); - this.rowBufferMode = MapUtils.getEnumOption(options, WriterOptions.ROW_BUFFER_MODE.name(), DEFAULT_ROW_BUFFER_MODE, "row bufferring mode"); + this.rowBufferMode = MapUtils.getEnumOption(options, WriterOptions.ROW_BUFFER_MODE.name(), DEFAULT_ROW_BUFFER_MODE, "row buffering mode"); this.sstableDataSizeInMB = MapUtils.getInt(options, WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name(), 160, "sstable data size in MB"); 
this.sstableBatchSize = MapUtils.getInt(options, WriterOptions.BATCH_SIZE.name(), 1_000_000, "sstable batch size"); this.commitBatchSize = MapUtils.getInt(options, WriterOptions.COMMIT_BATCH_SIZE.name(), DEFAULT_COMMIT_BATCH_SIZE, "commit batch size"); @@ -185,15 +185,18 @@ protected void validateEnvironment() throws RuntimeException protected void validateTableWriterSettings() { boolean batchSizeIsZero = sstableBatchSize == 0; - if (rowBufferMode == RowBufferMode.BUFFERRED - && (!batchSizeIsZero && sstableBatchSize != DEFAULT_BATCH_SIZE_IN_ROWS)) + + if (rowBufferMode == RowBufferMode.UNBUFFERED) + { + Preconditions.checkArgument(!batchSizeIsZero, + "If writing in sorted order (ROW_BUFFER_MODE is UNBUFFERED) then BATCH_SIZE " + + "should be non zero, but it was set to 0 in writer options"); + } + else if (!batchSizeIsZero && sstableBatchSize != DEFAULT_BATCH_SIZE_IN_ROWS) { - LOGGER.warn("BATCH_SIZE is set to a non-zero, non-default value ({}) but ROW_BUFFER_MODE is set to BUFFERRED." - + " Ignoring BATCH_SIZE.", sstableBatchSize); + LOGGER.warn("BATCH_SIZE is set to a non-zero, non-default value ({}) but ROW_BUFFER_MODE is set to BUFFERED." + + " Ignoring BATCH_SIZE.", sstableBatchSize); } - Preconditions.checkArgument(rowBufferMode == RowBufferMode.UNBUFFERRED && !batchSizeIsZero, - "If writing in sorted order (ROW_BUFFER_MODE is UNBUFFERRED) then BATCH_SIZE " - + "should be non zero, but it was set to 0 in writer options"); } /* diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java index d54ce3f24..5d30ad817 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java @@ -21,6 +21,6 @@ public enum RowBufferMode { - BUFFERRED, - UNBUFFERRED + BUFFERED, + UNBUFFERED } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java index df6f1551f..9d2ebc947 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java @@ -74,7 +74,7 @@ public SSTableWriter(BulkWriterContext writerContext, Path outDir) LOGGER.info("Running with version " + packageVersion); TableSchema tableSchema = writerContext.schema().getTableSchema(); - boolean sorted = writerContext.job().getRowBufferMode() == RowBufferMode.UNBUFFERRED; + boolean sorted = writerContext.job().getRowBufferMode() == RowBufferMode.UNBUFFERED; this.cqlSSTableWriter = SSTableWriterFactory.getSSTableWriter( CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(packageVersion), this.outDir.toString(), diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java index cbe6030fb..0877d2332 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java @@ -34,10 +34,11 @@ import org.jetbrains.annotations.NotNull; import static 
org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.isEmptyString; +import static org.hamcrest.Matchers.emptyString; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -122,8 +123,8 @@ public void calculatesCoresCorrectlyForDynamicAllocation() @Test public void ensureSetupSparkConfAddsPerformsNecessaryTasks() { - assertThat(sparkConf.get("spark.kryo.registrator", ""), isEmptyString()); - assertThat(sparkConf.get("spark.executor.extraJavaOptions", ""), isEmptyString()); + assertThat(sparkConf.get("spark.kryo.registrator", ""), is(emptyString())); + assertThat(sparkConf.get("spark.executor.extraJavaOptions", ""), is(emptyString())); BulkSparkConf.setupSparkConf(sparkConf, true); assertEquals("," + SbwKryoRegistrator.class.getName(), sparkConf.get("spark.kryo.registrator", "")); if (BuildInfo.isAtLeastJava11(BuildInfo.javaSpecificationVersion())) @@ -150,8 +151,8 @@ public void keystorePathRequiredIfBase64EncodedKeystoreNotSet() NullPointerException npe = assertThrows(NullPointerException.class, () -> new BulkSparkConf(sparkConf, options)); assertEquals("Keystore password was set. But both keystore path and base64 encoded string are not set. " - + "Please either set option " + WriterOptions.KEYSTORE_PATH - + " or option " + WriterOptions.KEYSTORE_BASE64_ENCODED, npe.getMessage()); + + "Please either set option " + WriterOptions.KEYSTORE_PATH + + " or option " + WriterOptions.KEYSTORE_BASE64_ENCODED, npe.getMessage()); } @Test @@ -206,7 +207,60 @@ public void testTrustStorePasswordSetPathNotSet() NullPointerException npe = assertThrows(NullPointerException.class, () -> new BulkSparkConf(sparkConf, options)); assertEquals("Trust Store Path was provided, but password is missing. 
" - + "Please provide option " + WriterOptions.TRUSTSTORE_PASSWORD, npe.getMessage()); + + "Please provide option " + WriterOptions.TRUSTSTORE_PASSWORD, npe.getMessage()); + } + + @Test + public void testUnbufferedRowBufferMode() + { + Map options = copyDefaultOptions(); + options.put(WriterOptions.ROW_BUFFER_MODE.name(), "UNBUFFERED"); + BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options); + assertNotNull(bulkSparkConf); + assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.UNBUFFERED); + } + + @Test + public void testBufferedRowBufferMode() + { + Map options = copyDefaultOptions(); + options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED"); + BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options); + assertNotNull(bulkSparkConf); + assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED); + } + + @Test + public void testInvalidRowBufferMode() + { + Map options = copyDefaultOptions(); + options.put(WriterOptions.ROW_BUFFER_MODE.name(), "invalid"); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> new BulkSparkConf(sparkConf, options)); + assertEquals("Key row buffering mode with value invalid is not a valid Enum of type class org.apache.cassandra.spark.bulkwriter.RowBufferMode.", + exception.getMessage()); + } + + @Test + public void testBufferedRowBufferModeWithZeroBatchSize() + { + Map options = copyDefaultOptions(); + options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED"); + options.put(WriterOptions.BATCH_SIZE.name(), "0"); + BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options); + assertNotNull(bulkSparkConf); + assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED); + } + + @Test + public void testNonZeroBatchSizeIsIgnoredWithBufferedRowBufferMode() + { + Map options = copyDefaultOptions(); + options.put(WriterOptions.BATCH_SIZE.name(), "5"); + options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED"); + BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options); + assertNotNull(bulkSparkConf); + assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED); } private Map copyDefaultOptions() diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java index 4cf29ca06..7d1908bd2 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java @@ -59,7 +59,7 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, JobInfo, SchemaInfo, DataTransferApi { private static final long serialVersionUID = -2912371629236770646L; - private RowBufferMode rowBufferMode = RowBufferMode.UNBUFFERRED; + private RowBufferMode rowBufferMode = RowBufferMode.UNBUFFERED; private ConsistencyLevel.CL consistencyLevel; public interface CommitResultSupplier extends BiFunction, String, RemoteCommitResult>