diff --git a/build.gradle b/build.gradle
index 8735bb137..5772b48c7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -17,6 +17,9 @@
  * under the License.
  */
 
+import java.nio.file.Files
+import java.nio.file.Paths
+
 buildscript {
     dependencies {
         classpath(group: 'com.github.jengelman.gradle.plugins', name: 'shadow', version: '6.1.0')
@@ -80,12 +83,23 @@ tasks.register('buildIgnoreRatList', Exec) {
 }
 
 rat {
+
+    doFirst {
+        def excludeFilePath = Paths.get("${buildDir}/.rat-excludes.txt")
+        def excludeLines = Files.readAllLines(excludeFilePath)
+        excludeLines.each { line ->
+            if (line.endsWith("/")) {
+                excludes.add("**/" + line + "**")
+            } else {
+                excludes.add(line)
+            }
+        }
+    }
+
     // List of Gradle exclude directives, defaults to ['**/.gradle/**']
     excludes.add("**/build/**")
-    excludes.add("**/.out/**")
     excludes.add("CHANGES.txt")
     excludes.add("**/org.apache.spark.sql.sources.DataSourceRegister")
-    excludes.add("**/resources/cassandra-analytics-build.properties")
 
     // Sidecar for build process
     excludes.add("**/cassandra-sidecar/**")
@@ -97,23 +111,16 @@ rat {
     excludes.add("gradlew")
     excludes.add("gradlew.bat")
 
-    // idea generated files
-    excludes.add("**/.idea/**")
-
     // resource files for test
     excludes.add("**/test**/resources/**")
 
-    // resources
-    excludes.add("**/resources/sidecar.version")
-
-    // Rat excludes file, one directive per line
-    excludeFile.set(layout.projectDirectory.file("build/.rat-excludes.txt"))
-
     // XML, TXT and HTML reports directory, defaults to 'build/reports/rat'
     reportDir.set(file("build/reports/rat"))
 }
 
-rat.dependsOn buildIgnoreRatList
+tasks.named('rat').configure {
+    dependsOn(buildIgnoreRatList)
+}
 
 subprojects {
     apply(plugin: 'java-library')
diff --git a/cassandra-analytics-core-example/README.md b/cassandra-analytics-core-example/README.md
index 3428995b2..89122c9e4 100644
--- a/cassandra-analytics-core-example/README.md
+++ b/cassandra-analytics-core-example/README.md
@@ -108,13 +108,6 @@ cassandra_instances:
 I have a 3 node setup, so I configure Sidecar for those 3 nodes. CCM creates the Cassandra cluster under
 `${HOME}/.ccm/test`, so I update my `data_dirs` and `staging_dir` configuration to use my local path.
 
-Next, create the `staging_dir` where Sidecar will stage SSTables coming from Cassandra Spark bulk writer.
-In my case, I have decided to keep the `sstable-staging` directory inside each of the node's directories.
-
-```shell
-mkdir -p ${HOME}/.ccm/test/node{1..3}/sstable-staging
-```
-
 Finally, run Cassandra Sidecar, we skip running integration tests because we need docker for integration tests.
 You can opt to run integration tests if you have docker running in your local environment.
 
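For context on the new `doFirst` block in the `rat` task above: `buildIgnoreRatList` is expected to write one directive per line into `build/.rat-excludes.txt`, and entries ending in `/` are expanded into directory globs. A minimal sketch of that mapping in plain Java, with hypothetical file contents (the sample entries below are illustrative, not taken from the build):

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

public class RatExcludesSketch
{
    public static void main(String[] args) throws Exception
    {
        // Hypothetical stand-in for build/.rat-excludes.txt, one directive per line
        Path excludeFile = Files.createTempFile("rat-excludes", ".txt");
        Files.write(excludeFile, List.of(".idea/", "gradlew", "docs/generated/"));

        // Same rule as the Gradle doFirst block: entries ending in "/" become "**/<entry>**"
        // globs, everything else is added to the RAT excludes list unchanged
        List<String> excludes = Files.readAllLines(excludeFile).stream()
                                     .map(line -> line.endsWith("/") ? "**/" + line + "**" : line)
                                     .collect(Collectors.toList());

        System.out.println(excludes); // [**/.idea/**, gradlew, **/docs/generated/**]
    }
}
```
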
diff --git a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
index 58c9827a2..b71c57b65 100644
--- a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
+++ b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
@@ -20,6 +20,7 @@
 package org.apache.cassandra.spark.example;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -28,19 +29,20 @@
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
-import org.apache.cassandra.spark.bulkwriter.TTLOption;
-import org.apache.cassandra.spark.bulkwriter.TimestampOption;
-import org.apache.cassandra.spark.bulkwriter.WriterOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.spark.KryoRegister;
 import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
+import org.apache.cassandra.spark.bulkwriter.TTLOption;
+import org.apache.cassandra.spark.bulkwriter.TimestampOption;
+import org.apache.cassandra.spark.bulkwriter.WriterOptions;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -128,26 +130,46 @@ private static Dataset<Row> write(long rowCount, SparkConf sparkConf, SQLContext
     {
         JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sc);
         int parallelism = sc.defaultParallelism();
-        boolean addTTLColumn = true;
-        boolean addTimestampColumn = true;
+        boolean addTTLColumn = false;
+        boolean addTimestampColumn = false;
         JavaRDD<Row> rows = genDataset(javaSparkContext, rowCount, parallelism, addTTLColumn, addTimestampColumn);
         Dataset<Row> df = sql.createDataFrame(rows, getWriteSchema(addTTLColumn, addTimestampColumn));
 
-        df.write()
-          .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
-          .option("sidecar_instances", "localhost,localhost2,localhost3")
-          .option("keyspace", "spark_test")
-          .option("table", "test")
-          .option("local_dc", "datacenter1")
-          .option("bulk_writer_cl", "LOCAL_QUORUM")
-          .option("number_splits", "-1")
-          // A constant timestamp and TTL can be used by setting the following options.
-          // .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000))
-          // .option(WriterOptions.TTL.name(), TTLOption.constant(20))
-          .option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"))
-          .option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"))
-          .mode("append")
-          .save();
+        DataFrameWriter<Row> dfWriter = df.write()
+                                          .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
+                                          .option("sidecar_instances", "localhost,localhost2,localhost3")
+                                          .option("keyspace", "spark_test")
+                                          .option("table", "test")
+                                          .option("local_dc", "datacenter1")
+                                          .option("bulk_writer_cl", "LOCAL_QUORUM")
+                                          .option("number_splits", "-1")
+                                          // A constant timestamp and TTL can be used by setting the following options.
+                                          // .option(WriterOptions.TTL.name(), TTLOption.constant(20))
+                                          // .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000))
+                                          .mode("append");
+
+        List<String> addedColumns = new ArrayList<>();
+        if (addTTLColumn)
+        {
+            addedColumns.add("ttl");
+            dfWriter = dfWriter
+                       .option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"));
+        }
+
+        if (addTimestampColumn)
+        {
+            addedColumns.add("timestamp");
+            dfWriter = dfWriter
+                       .option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"));
+        }
+
+        dfWriter.save();
+
+        if (!addedColumns.isEmpty())
+        {
+            df = df.drop(addedColumns.toArray(new String[0]));
+        }
+
         return df;
     }
 
@@ -183,9 +205,9 @@ private static Dataset<Row> read(int expectedRowCount, SparkConf sparkConf, SQLC
     private static StructType getWriteSchema(boolean addTTLColumn, boolean addTimestampColumn)
     {
         StructType schema = new StructType()
-                            .add("id", LongType, false)
-                            .add("course", BinaryType, false)
-                            .add("marks", LongType, false);
+                .add("id", LongType, false)
+                .add("course", BinaryType, false)
+                .add("marks", LongType, false);
         if (addTTLColumn)
         {
             schema = schema.add("ttl", IntegerType, false);
@@ -215,34 +237,34 @@ private static JavaRDD<Row> genDataset(JavaSparkContext sc, long records, Intege
         long recordsPerPartition = records / parallelism;
         long remainder = records - (recordsPerPartition * parallelism);
         List<Integer> seq = IntStream.range(0, parallelism).boxed().collect(Collectors.toList());
-        int ttl = 10;
+        int ttl = 120; // data will not be queryable in two minutes
         long timeStamp = System.currentTimeMillis() * 1000;
         JavaRDD<Row> dataset = sc.parallelize(seq, parallelism).mapPartitionsWithIndex(
-            (Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, integerIterator) -> {
-                long firstRecordNumber = index * recordsPerPartition;
-                long recordsToGenerate = index.equals(parallelism) ? remainder : recordsPerPartition;
-                java.util.Iterator<Row> rows = LongStream.range(0, recordsToGenerate).mapToObj(offset -> {
-                    long recordNumber = firstRecordNumber + offset;
-                    String courseNameString = String.valueOf(recordNumber);
-                    Integer courseNameStringLen = courseNameString.length();
-                    Integer courseNameMultiplier = 1000 / courseNameStringLen;
-                    byte[] courseName = dupStringAsBytes(courseNameString, courseNameMultiplier);
-                    if (addTTLColumn && addTimestampColumn)
-                    {
-                        return RowFactory.create(recordNumber, courseName, recordNumber, ttl, timeStamp);
-                    }
-                    if (addTTLColumn)
-                    {
-                        return RowFactory.create(recordNumber, courseName, recordNumber, ttl);
-                    }
-                    if (addTimestampColumn)
-                    {
-                        return RowFactory.create(recordNumber, courseName, recordNumber, timeStamp);
-                    }
-                    return RowFactory.create(recordNumber, courseName, recordNumber);
-                }).iterator();
-                return rows;
-            }, false);
+                (Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, integerIterator) -> {
+                    long firstRecordNumber = index * recordsPerPartition;
+                    long recordsToGenerate = index.equals(parallelism) ? remainder : recordsPerPartition;
+                    java.util.Iterator<Row> rows = LongStream.range(0, recordsToGenerate).mapToObj(offset -> {
+                        long recordNumber = firstRecordNumber + offset;
+                        String courseNameString = String.valueOf(recordNumber);
+                        Integer courseNameStringLen = courseNameString.length();
+                        Integer courseNameMultiplier = 1000 / courseNameStringLen;
+                        byte[] courseName = dupStringAsBytes(courseNameString, courseNameMultiplier);
+                        if (addTTLColumn && addTimestampColumn)
+                        {
+                            return RowFactory.create(recordNumber, courseName, recordNumber, ttl, timeStamp);
+                        }
+                        if (addTTLColumn)
+                        {
+                            return RowFactory.create(recordNumber, courseName, recordNumber, ttl);
+                        }
+                        if (addTimestampColumn)
+                        {
+                            return RowFactory.create(recordNumber, courseName, recordNumber, timeStamp);
+                        }
+                        return RowFactory.create(recordNumber, courseName, recordNumber);
+                    }).iterator();
+                    return rows;
+                }, false);
 
         return dataset;
     }
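The sample job above now leaves the TTL and timestamp helper columns off by default; when either flag is enabled, the per-row writer options point at those columns and the helper columns are dropped again before the DataFrame is returned. A condensed sketch of just the per-row wiring, reusing only option names that appear in the job and assuming `df` already carries the helper columns:

```java
// Assumes a Dataset<Row> `df` whose schema includes the data columns plus a "ttl" column
// (int, seconds) and a "timestamp" column (long, microseconds), as genDataset builds them.
df.write()
  .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
  .option("sidecar_instances", "localhost,localhost2,localhost3")
  .option("keyspace", "spark_test")
  .option("table", "test")
  .option("bulk_writer_cl", "LOCAL_QUORUM")
  // Per-row options name the DataFrame columns to read TTL/timestamp values from;
  // TTLOption.constant(...) / TimestampOption.constant(...) apply one value to every row instead.
  .option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"))
  .option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"))
  .mode("append")
  .save();
```
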
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
index 9721c32f2..0871ddda6 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/clients/Sidecar.java
@@ -81,32 +81,32 @@ public static SidecarClient from(SidecarInstancesProvid
         Vertx vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(true).setWorkerPoolSize(config.maxPoolSize()));
 
         HttpClientConfig.Builder builder = new HttpClientConfig.Builder<>()
-                                           .ssl(false)
-                                           .timeoutMillis(TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
-                                           .idleTimeoutMillis((int) TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
-                                           .receiveBufferSize((int) config.chunkBufferSize())
-                                           .maxChunkSize((int) config.maxBufferSize())
-                                           .userAgent(BuildInfo.READER_USER_AGENT);
+                .ssl(false)
+                .timeoutMillis(TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
+                .idleTimeoutMillis((int) TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
+                .receiveBufferSize((int) config.chunkBufferSize())
+                .maxChunkSize((int) config.maxBufferSize())
+                .userAgent(BuildInfo.READER_USER_AGENT);
 
         if (secretsProvider != null)
         {
             builder = builder
-                      .ssl(true)
-                      .keyStoreInputStream(secretsProvider.keyStoreInputStream())
-                      .keyStorePassword(String.valueOf(secretsProvider.keyStorePassword()))
-                      .keyStoreType(secretsProvider.keyStoreType())
-                      .trustStoreInputStream(secretsProvider.trustStoreInputStream())
-                      .trustStorePassword(String.valueOf(secretsProvider.trustStorePassword()))
-                      .trustStoreType(secretsProvider.trustStoreType());
+                    .ssl(true)
+                    .keyStoreInputStream(secretsProvider.keyStoreInputStream())
+                    .keyStorePassword(String.valueOf(secretsProvider.keyStorePassword()))
+                    .keyStoreType(secretsProvider.keyStoreType())
+                    .trustStoreInputStream(secretsProvider.trustStoreInputStream())
+                    .trustStorePassword(String.valueOf(secretsProvider.trustStorePassword()))
+                    .trustStoreType(secretsProvider.trustStoreType());
         }
 
         HttpClientConfig httpClientConfig = builder.build();
 
         SidecarConfig sidecarConfig = new SidecarConfig.Builder()
-                                      .maxRetries(config.maxRetries())
-                                      .retryDelayMillis(config.millisToSleep())
-                                      .maxRetryDelayMillis(config.maxMillisToSleep())
-                                      .build();
+                .maxRetries(config.maxRetries())
+                .retryDelayMillis(config.millisToSleep())
+                .maxRetryDelayMillis(config.maxMillisToSleep())
+                .build();
 
         return buildClient(sidecarConfig, vertx, httpClientConfig, sidecarInstancesProvider);
     }
@@ -117,23 +117,23 @@ public static SidecarClient from(SidecarInstancesProvid
                                          .setWorkerPoolSize(conf.getMaxHttpConnections()));
 
         HttpClientConfig httpClientConfig = new HttpClientConfig.Builder<>()
-                                            .timeoutMillis(conf.getHttpResponseTimeoutMs())
-                                            .idleTimeoutMillis(conf.getHttpConnectionTimeoutMs())
-                                            .userAgent(BuildInfo.WRITER_USER_AGENT)
-                                            .keyStoreInputStream(conf.getKeyStore())
-                                            .keyStorePassword(conf.getKeyStorePassword())
-                                            .keyStoreType(conf.getKeyStoreTypeOrDefault())
-                                            .trustStoreInputStream(conf.getTrustStore())
-                                            .trustStorePassword(conf.getTrustStorePasswordOrDefault())
-                                            .trustStoreType(conf.getTrustStoreTypeOrDefault())
-                                            .ssl(conf.hasKeystoreAndKeystorePassword())
-                                            .build();
+                .timeoutMillis(conf.getHttpResponseTimeoutMs())
+                .idleTimeoutMillis(conf.getHttpConnectionTimeoutMs())
+                .userAgent(BuildInfo.WRITER_USER_AGENT)
+                .keyStoreInputStream(conf.getKeyStore())
+                .keyStorePassword(conf.getKeyStorePassword())
+                .keyStoreType(conf.getKeyStoreTypeOrDefault())
+                .trustStoreInputStream(conf.getTrustStore())
+                .trustStorePassword(conf.getTrustStorePasswordOrDefault())
+                .trustStoreType(conf.getTrustStoreTypeOrDefault())
+                .ssl(conf.hasKeystoreAndKeystorePassword())
+                .build();
 
         SidecarConfig sidecarConfig = new SidecarConfig.Builder()
-                                      .maxRetries(conf.getSidecarRequestRetries())
-                                      .retryDelayMillis(TimeUnit.SECONDS.toMillis(conf.getSidecarRequestRetryDelayInSeconds()))
-                                      .maxRetryDelayMillis(TimeUnit.SECONDS.toMillis(conf.getSidecarRequestMaxRetryDelayInSeconds()))
-                                      .build();
+                .maxRetries(5)
+                .retryDelayMillis(200)
+                .maxRetryDelayMillis(500)
+                .build();
 
         return buildClient(sidecarConfig, vertx, httpClientConfig, sidecarInstancesProvider);
     }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index ee63df9e1..a719df713 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -80,7 +80,7 @@ public class BulkSparkConf implements Serializable
     public static final long DEFAULT_SIDECAR_REQUEST_MAX_RETRY_DELAY_SECONDS = 60L;
     public static final int DEFAULT_COMMIT_BATCH_SIZE = 10_000;
     public static final int DEFAULT_RING_RETRY_COUNT = 3;
-    public static final RowBufferMode DEFAULT_ROW_BUFFER_MODE = RowBufferMode.UNBUFFERRED;
+    public static final RowBufferMode DEFAULT_ROW_BUFFER_MODE = RowBufferMode.UNBUFFERED;
     public static final int DEFAULT_BATCH_SIZE_IN_ROWS = 1_000_000;
 
     // NOTE: All Cassandra Analytics setting names must start with "spark" in order to not be ignored by Spark,
@@ -141,7 +141,7 @@ public BulkSparkConf(SparkConf conf, Map<String, String> options)
         this.consistencyLevel = ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM"));
         this.localDC = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null);
         this.numberSplits = MapUtils.getInt(options, WriterOptions.NUMBER_SPLITS.name(), DEFAULT_NUM_SPLITS, "number of splits");
-        this.rowBufferMode = MapUtils.getEnumOption(options, WriterOptions.ROW_BUFFER_MODE.name(), DEFAULT_ROW_BUFFER_MODE, "row bufferring mode");
+        this.rowBufferMode = MapUtils.getEnumOption(options, WriterOptions.ROW_BUFFER_MODE.name(), DEFAULT_ROW_BUFFER_MODE, "row buffering mode");
         this.sstableDataSizeInMB = MapUtils.getInt(options, WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name(), 160, "sstable data size in MB");
         this.sstableBatchSize = MapUtils.getInt(options, WriterOptions.BATCH_SIZE.name(), 1_000_000, "sstable batch size");
         this.commitBatchSize = MapUtils.getInt(options, WriterOptions.COMMIT_BATCH_SIZE.name(), DEFAULT_COMMIT_BATCH_SIZE, "commit batch size");
@@ -185,15 +185,18 @@ protected void validateEnvironment() throws RuntimeException
     protected void validateTableWriterSettings()
     {
         boolean batchSizeIsZero = sstableBatchSize == 0;
-        if (rowBufferMode == RowBufferMode.BUFFERRED
-            && (!batchSizeIsZero && sstableBatchSize != DEFAULT_BATCH_SIZE_IN_ROWS))
+
+        if (rowBufferMode == RowBufferMode.UNBUFFERED)
+        {
+            Preconditions.checkArgument(!batchSizeIsZero,
+                                        "If writing in sorted order (ROW_BUFFER_MODE is UNBUFFERED) then BATCH_SIZE "
+                                        + "should be non zero, but it was set to 0 in writer options");
+        }
+        else if (!batchSizeIsZero && sstableBatchSize != DEFAULT_BATCH_SIZE_IN_ROWS)
         {
-            LOGGER.warn("BATCH_SIZE is set to a non-zero, non-default value ({}) but ROW_BUFFER_MODE is set to BUFFERRED."
-                        + " Ignoring BATCH_SIZE.", sstableBatchSize);
+            LOGGER.warn("BATCH_SIZE is set to a non-zero, non-default value ({}) but ROW_BUFFER_MODE is set to BUFFERED."
+                        + " Ignoring BATCH_SIZE.", sstableBatchSize);
         }
-        Preconditions.checkArgument(rowBufferMode == RowBufferMode.UNBUFFERRED && !batchSizeIsZero,
-                                    "If writing in sorted order (ROW_BUFFER_MODE is UNBUFFERRED) then BATCH_SIZE "
-                                    + "should be non zero, but it was set to 0 in writer options");
     }
 
     /*
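The reworked `validateTableWriterSettings()` above makes the contract explicit: `UNBUFFERED` (sorted-order writes, the default) requires a non-zero `BATCH_SIZE`, while `BUFFERED` ignores a non-default `BATCH_SIZE` and only logs the warning. A hedged sketch of how a job would opt into buffered writes through the same writer options used in the sample job (connection values are illustrative placeholders):

```java
// Sketch only: sidecar/keyspace/table values are placeholders, not part of this patch.
df.write()
  .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
  .option("sidecar_instances", "localhost")
  .option("keyspace", "spark_test")
  .option("table", "test")
  .option("bulk_writer_cl", "LOCAL_QUORUM")
  // BUFFERED: the SSTable writer buffers and sorts rows itself, so BATCH_SIZE is ignored
  // (a non-zero, non-default value only triggers the warning added above).
  // UNBUFFERED would instead expect token-sorted input and a non-zero BATCH_SIZE.
  .option(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED")
  .mode("append")
  .save();
```
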
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
index 8f2c0b55d..0a8378527 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
@@ -121,7 +121,6 @@ private void cancelJob(@NotNull CancelJobEvent cancelJobEvent)
         sparkContext.cancelJobGroup(writerContext.job().getId().toString());
     }
 
-    @SuppressWarnings("RedundantCast")
     private void persist(@NotNull JavaPairRDD sortedRDD, String[] columnNames)
     {
         writeValidator.setPhase("Environment Validation");
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index 2709b51bf..abe4ef807 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -53,7 +53,7 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial
     @NotNull
     private final BulkSparkConf conf;
     private final JobInfo jobInfo;
-    private final DataTransferApi dataTransferApi;
+    private transient DataTransferApi dataTransferApi;
     private final CassandraClusterInfo clusterInfo;
     private final SchemaInfo schemaInfo;
 
@@ -70,14 +70,12 @@ private CassandraBulkWriterContext(@NotNull BulkSparkConf conf,
         clusterInfo = new CassandraClusterInfo(conf);
         CassandraRing ring = clusterInfo.getRing(true);
         jobInfo = new CassandraJobInfo(conf,
-                                       new TokenPartitioner(ring, conf.numberSplits, sparkContext.defaultParallelism(), conf.getCores()));
+                new TokenPartitioner(ring, conf.numberSplits, sparkContext.defaultParallelism(), conf.getCores()));
         Preconditions.checkArgument(!conf.consistencyLevel.isLocal()
-                                    || (conf.localDC != null && ring.getReplicationFactor().getOptions().containsKey(conf.localDC)),
+                || (conf.localDC != null && ring.getReplicationFactor().getOptions().containsKey(conf.localDC)),
                                     String.format("Keyspace %s is not replicated on datacenter %s", conf.keyspace, conf.localDC));
 
-        dataTransferApi = new SidecarDataTransferApi(clusterInfo.getCassandraContext().getSidecarClient(), jobInfo, conf);
-
         String keyspace = conf.keyspace;
         String table = conf.table;
@@ -133,14 +131,14 @@ private static void configureRelocatedNetty()
     /**
      * Use the implementation of the KryoSerializable interface as a detection device to make sure the Spark Bulk
      * Writer's KryoRegistrator is properly in place.
-     *
+     * <p>
      * If this class is serialized by Kryo, it means we're not set up correctly, and therefore we log and fail.
      * This failure will occur early in the job and be very clear, so users can quickly fix their code and get up and
      * running again, rather than having a random NullPointerException further down the line.
      */
     public static final String KRYO_REGISTRATION_WARNING =
-    "Spark Bulk Writer Kryo Registrator (SbwKryoRegistrator) was not registered with Spark - "
-    + "please see the README.md file for more details on how to register the Spark Bulk Writer.";
+            "Spark Bulk Writer Kryo Registrator (SbwKryoRegistrator) was not registered with Spark - "
+            + "please see the README.md file for more details on how to register the Spark Bulk Writer.";
 
     @Override
     public void write(Kryo kryo, Output output)
@@ -187,8 +185,12 @@ public SchemaInfo schema()
 
     @Override
     @NotNull
-    public DataTransferApi transfer()
+    public synchronized DataTransferApi transfer()
     {
+        if (dataTransferApi == null)
+        {
+            dataTransferApi = new SidecarDataTransferApi(clusterInfo.getCassandraContext().getSidecarClient(), jobInfo, conf);
+        }
         return dataTransferApi;
     }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
index d54ce3f24..5d30ad817 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RowBufferMode.java
@@ -21,6 +21,6 @@
 
 public enum RowBufferMode
 {
-    BUFFERRED,
-    UNBUFFERRED
+    BUFFERED,
+    UNBUFFERED
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
index df6f1551f..9d2ebc947 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
@@ -74,7 +74,7 @@ public SSTableWriter(BulkWriterContext writerContext, Path outDir)
         LOGGER.info("Running with version " + packageVersion);
 
         TableSchema tableSchema = writerContext.schema().getTableSchema();
-        boolean sorted = writerContext.job().getRowBufferMode() == RowBufferMode.UNBUFFERRED;
+        boolean sorted = writerContext.job().getRowBufferMode() == RowBufferMode.UNBUFFERED;
         this.cqlSSTableWriter = SSTableWriterFactory.getSSTableWriter(
                 CassandraVersionFeatures.cassandraVersionFeaturesFromCassandraVersion(packageVersion),
                 this.outDir.toString(),
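On the `KRYO_REGISTRATION_WARNING` path above: the writer expects its Kryo registrator to be installed on the `SparkConf` before the context is created. A sketch of the driver-side setup, assuming the helpers imported by the sample job (`BulkSparkConf.setupSparkConf` and `KryoRegister`) keep the shapes used in this patch; the app name and master URL are placeholders:

```java
SparkConf sparkConf = new SparkConf()
        .setAppName("Sample Cassandra Bulk Writer Job")  // placeholder
        .setMaster("local[4]");                          // placeholder

// Appends SbwKryoRegistrator to "spark.kryo.registrator" (plus extra JVM options on
// newer JDKs), which is what BulkSparkConfTest asserts after setupSparkConf runs.
BulkSparkConf.setupSparkConf(sparkConf, true);

// Registers the analytics Kryo serializers; assumed to take the SparkConf as in the sample job.
KryoRegister.setup(sparkConf);

SparkContext sc = new SparkContext(sparkConf);
```
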
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index fdc51bcab..b4104bd44 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -389,9 +389,12 @@ protected void initInstanceMap()
         instanceMap = clusterConfig.stream().collect(Collectors.toMap(SidecarInstance::hostname, Function.identity()));
         try
         {
+            SslConfigSecretsProvider secretsProvider = sslConfig != null
+                                                       ? new SslConfigSecretsProvider(sslConfig)
+                                                       : null;
             sidecar = Sidecar.from(new SimpleSidecarInstancesProvider(new ArrayList<>(clusterConfig)),
                                    sidecarClientConfig,
-                                   new SslConfigSecretsProvider(sslConfig));
+                                   secretsProvider);
         }
         catch (IOException ioException)
         {
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index cbe6030fb..0877d2332 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -34,10 +34,11 @@
 import org.jetbrains.annotations.NotNull;
 
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.isEmptyString;
+import static org.hamcrest.Matchers.emptyString;
 import static org.hamcrest.core.Is.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -122,8 +123,8 @@ public void calculatesCoresCorrectlyForDynamicAllocation()
     @Test
     public void ensureSetupSparkConfAddsPerformsNecessaryTasks()
     {
-        assertThat(sparkConf.get("spark.kryo.registrator", ""), isEmptyString());
-        assertThat(sparkConf.get("spark.executor.extraJavaOptions", ""), isEmptyString());
+        assertThat(sparkConf.get("spark.kryo.registrator", ""), is(emptyString()));
+        assertThat(sparkConf.get("spark.executor.extraJavaOptions", ""), is(emptyString()));
         BulkSparkConf.setupSparkConf(sparkConf, true);
         assertEquals("," + SbwKryoRegistrator.class.getName(), sparkConf.get("spark.kryo.registrator", ""));
         if (BuildInfo.isAtLeastJava11(BuildInfo.javaSpecificationVersion()))
@@ -150,8 +151,8 @@ public void keystorePathRequiredIfBase64EncodedKeystoreNotSet()
 
         NullPointerException npe = assertThrows(NullPointerException.class, () -> new BulkSparkConf(sparkConf, options));
         assertEquals("Keystore password was set. But both keystore path and base64 encoded string are not set. "
-                   + "Please either set option " + WriterOptions.KEYSTORE_PATH
-                   + " or option " + WriterOptions.KEYSTORE_BASE64_ENCODED, npe.getMessage());
+                     + "Please either set option " + WriterOptions.KEYSTORE_PATH
+                     + " or option " + WriterOptions.KEYSTORE_BASE64_ENCODED, npe.getMessage());
     }
 
     @Test
@@ -206,7 +207,60 @@ public void testTrustStorePasswordSetPathNotSet()
 
         NullPointerException npe = assertThrows(NullPointerException.class, () -> new BulkSparkConf(sparkConf, options));
         assertEquals("Trust Store Path was provided, but password is missing. "
-                   + "Please provide option " + WriterOptions.TRUSTSTORE_PASSWORD, npe.getMessage());
+                     + "Please provide option " + WriterOptions.TRUSTSTORE_PASSWORD, npe.getMessage());
+    }
+
+    @Test
+    public void testUnbufferedRowBufferMode()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "UNBUFFERED");
+        BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+        assertNotNull(bulkSparkConf);
+        assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.UNBUFFERED);
+    }
+
+    @Test
+    public void testBufferedRowBufferMode()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED");
+        BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+        assertNotNull(bulkSparkConf);
+        assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED);
+    }
+
+    @Test
+    public void testInvalidRowBufferMode()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "invalid");
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+                                                           () -> new BulkSparkConf(sparkConf, options));
+        assertEquals("Key row buffering mode with value invalid is not a valid Enum of type class org.apache.cassandra.spark.bulkwriter.RowBufferMode.",
+                     exception.getMessage());
+    }
+
+    @Test
+    public void testBufferedRowBufferModeWithZeroBatchSize()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED");
+        options.put(WriterOptions.BATCH_SIZE.name(), "0");
+        BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+        assertNotNull(bulkSparkConf);
+        assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED);
+    }
+
+    @Test
+    public void testNonZeroBatchSizeIsIgnoredWithBufferedRowBufferMode()
+    {
+        Map<String, String> options = copyDefaultOptions();
+        options.put(WriterOptions.BATCH_SIZE.name(), "5");
+        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED");
+        BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+        assertNotNull(bulkSparkConf);
+        assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED);
     }
 
     private Map<String, String> copyDefaultOptions()
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index 4cf29ca06..7d1908bd2 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -59,7 +59,7 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, JobInfo, SchemaInfo, DataTransferApi
 {
     private static final long serialVersionUID = -2912371629236770646L;
-    private RowBufferMode rowBufferMode = RowBufferMode.UNBUFFERRED;
+    private RowBufferMode rowBufferMode = RowBufferMode.UNBUFFERED;
     private ConsistencyLevel.CL consistencyLevel;
 
     public interface CommitResultSupplier extends BiFunction<List<UploadResult>, String, RemoteCommitResult>