31 changes: 19 additions & 12 deletions build.gradle
@@ -17,6 +17,9 @@
* under the License.
*/

import java.nio.file.Files
import java.nio.file.Paths

buildscript {
dependencies {
classpath(group: 'com.github.jengelman.gradle.plugins', name: 'shadow', version: '6.1.0')
@@ -80,12 +83,23 @@ tasks.register('buildIgnoreRatList', Exec) {
}

rat {

doFirst {
def excludeFilePath = Paths.get("${buildDir}/.rat-excludes.txt")
def excludeLines = Files.readAllLines(excludeFilePath)
excludeLines.each { line ->
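// Directory entries (trailing '/') are wrapped as '**/<dir>/**' globs so RAT skips everything beneath them; file entries are excluded as-is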
if (line.endsWith("/")) {
excludes.add("**/" + line + "**")
} else {
excludes.add(line)
}
}
}

// List of Gradle exclude directives, defaults to ['**/.gradle/**']
excludes.add("**/build/**")
excludes.add("**/.out/**")
excludes.add("CHANGES.txt")
excludes.add("**/org.apache.spark.sql.sources.DataSourceRegister")
excludes.add("**/resources/cassandra-analytics-build.properties")

// Sidecar for build process
excludes.add("**/cassandra-sidecar/**")
@@ -97,23 +111,16 @@ rat {
excludes.add("gradlew")
excludes.add("gradlew.bat")

// idea generated files
excludes.add("**/.idea/**")

// resource files for test
excludes.add("**/test**/resources/**")

// resources
excludes.add("**/resources/sidecar.version")

// Rat excludes file, one directive per line
excludeFile.set(layout.projectDirectory.file("build/.rat-excludes.txt"))

// XML, TXT and HTML reports directory, defaults to 'build/reports/rat'
reportDir.set(file("build/reports/rat"))
}

rat.dependsOn buildIgnoreRatList
tasks.named('rat').configure {
dependsOn(buildIgnoreRatList)
}

subprojects {
apply(plugin: 'java-library')
7 changes: 0 additions & 7 deletions cassandra-analytics-core-example/README.md
@@ -108,13 +108,6 @@ cassandra_instances:
I have a 3 node setup, so I configure Sidecar for those 3 nodes. CCM creates the Cassandra cluster under
`${HOME}/.ccm/test`, so I update my `data_dirs` and `staging_dir` configuration to use my local path.

Next, create the `staging_dir` where Sidecar will stage SSTables coming from Cassandra Spark bulk writer.
In my case, I have decided to keep the `sstable-staging` directory inside each of the node's directories.

```shell
mkdir -p ${HOME}/.ccm/test/node{1..3}/sstable-staging
```

Finally, run Cassandra Sidecar. We skip the integration tests here because they require Docker; you can opt to run them if Docker is available in your local environment.

@@ -20,6 +20,7 @@
package org.apache.cassandra.spark.example;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -28,19 +29,20 @@
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import org.apache.cassandra.spark.bulkwriter.TTLOption;
import org.apache.cassandra.spark.bulkwriter.TimestampOption;
import org.apache.cassandra.spark.bulkwriter.WriterOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.spark.KryoRegister;
import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
import org.apache.cassandra.spark.bulkwriter.TTLOption;
import org.apache.cassandra.spark.bulkwriter.TimestampOption;
import org.apache.cassandra.spark.bulkwriter.WriterOptions;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -128,26 +130,46 @@ private static Dataset<Row> write(long rowCount, SparkConf sparkConf, SQLContext
{
JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sc);
int parallelism = sc.defaultParallelism();
boolean addTTLColumn = true;
boolean addTimestampColumn = true;
boolean addTTLColumn = false;
boolean addTimestampColumn = false;
JavaRDD<Row> rows = genDataset(javaSparkContext, rowCount, parallelism, addTTLColumn, addTimestampColumn);
Dataset<Row> df = sql.createDataFrame(rows, getWriteSchema(addTTLColumn, addTimestampColumn));

df.write()
.format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
.option("sidecar_instances", "localhost,localhost2,localhost3")
.option("keyspace", "spark_test")
.option("table", "test")
.option("local_dc", "datacenter1")
.option("bulk_writer_cl", "LOCAL_QUORUM")
.option("number_splits", "-1")
// A constant timestamp and TTL can be used by setting the following options.
// .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000))
// .option(WriterOptions.TTL.name(), TTLOption.constant(20))
.option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"))
.option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"))
.mode("append")
.save();
DataFrameWriter<Row> dfWriter = df.write()
.format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
.option("sidecar_instances", "localhost,localhost2,localhost3")
.option("keyspace", "spark_test")
.option("table", "test")
.option("local_dc", "datacenter1")
.option("bulk_writer_cl", "LOCAL_QUORUM")
.option("number_splits", "-1")
// A constant timestamp and TTL can be used by setting the following options.
// .option(WriterOptions.TTL.name(), TTLOption.constant(20))
// .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000))
.mode("append");

List<String> addedColumns = new ArrayList<>();
if (addTTLColumn)
{
addedColumns.add("ttl");
dfWriter = dfWriter
.option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"));
}

if (addTimestampColumn)
{
addedColumns.add("timestamp");
dfWriter = dfWriter
.option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"));
}

dfWriter.save();

if (!addedColumns.isEmpty())
{
df = df.drop(addedColumns.toArray(new String[0]));
Contributor Author: remove the columns from the schema, so we can validate.

}

return df;
}
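The per-row TTL and timestamp writer options are now attached only when the matching columns exist in the DataFrame, and those helper columns are dropped again before the method returns so the result can be validated against what read() queries back. As a rough sketch of the write schema when both flags are enabled (the class name here is made up for illustration; the sample builds the real schema in getWriteSchema, and LongType for the timestamp column is assumed from the microsecond values genDataset produces):

```java
import static org.apache.spark.sql.types.DataTypes.BinaryType;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.LongType;

import org.apache.spark.sql.types.StructType;

final class PerRowOptionsSchemaSketch
{
    // Sketch only: column names must match TTLOption.perRow("ttl") and TimestampOption.perRow("timestamp") above.
    static StructType schemaWithPerRowOptions()
    {
        return new StructType()
                .add("id", LongType, false)
                .add("course", BinaryType, false)
                .add("marks", LongType, false)
                .add("ttl", IntegerType, false)       // TTL in seconds, applied per row
                .add("timestamp", LongType, false);   // write timestamp in microseconds, applied per row
    }
}
```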

@@ -183,9 +205,9 @@ private static Dataset<Row> read(int expectedRowCount, SparkConf sparkConf, SQLC
private static StructType getWriteSchema(boolean addTTLColumn, boolean addTimestampColumn)
{
StructType schema = new StructType()
.add("id", LongType, false)
.add("course", BinaryType, false)
.add("marks", LongType, false);
.add("id", LongType, false)
.add("course", BinaryType, false)
.add("marks", LongType, false);
if (addTTLColumn)
{
schema = schema.add("ttl", IntegerType, false);
@@ -215,34 +237,34 @@ private static JavaRDD<Row> genDataset(JavaSparkContext sc, long records, Intege
long recordsPerPartition = records / parallelism;
long remainder = records - (recordsPerPartition * parallelism);
List<Integer> seq = IntStream.range(0, parallelism).boxed().collect(Collectors.toList());
int ttl = 10;
int ttl = 120; // data will not be queryable after two minutes
long timeStamp = System.currentTimeMillis() * 1000;
JavaRDD<Row> dataset = sc.parallelize(seq, parallelism).mapPartitionsWithIndex(
(Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, integerIterator) -> {
long firstRecordNumber = index * recordsPerPartition;
long recordsToGenerate = index.equals(parallelism) ? remainder : recordsPerPartition;
java.util.Iterator<Row> rows = LongStream.range(0, recordsToGenerate).mapToObj(offset -> {
long recordNumber = firstRecordNumber + offset;
String courseNameString = String.valueOf(recordNumber);
Integer courseNameStringLen = courseNameString.length();
Integer courseNameMultiplier = 1000 / courseNameStringLen;
byte[] courseName = dupStringAsBytes(courseNameString, courseNameMultiplier);
if (addTTLColumn && addTimestampColumn)
{
return RowFactory.create(recordNumber, courseName, recordNumber, ttl, timeStamp);
}
if (addTTLColumn)
{
return RowFactory.create(recordNumber, courseName, recordNumber, ttl);
}
if (addTimestampColumn)
{
return RowFactory.create(recordNumber, courseName, recordNumber, timeStamp);
}
return RowFactory.create(recordNumber, courseName, recordNumber);
}).iterator();
return rows;
}, false);
(Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, integerIterator) -> {
long firstRecordNumber = index * recordsPerPartition;
long recordsToGenerate = index.equals(parallelism) ? remainder : recordsPerPartition;
java.util.Iterator<Row> rows = LongStream.range(0, recordsToGenerate).mapToObj(offset -> {
long recordNumber = firstRecordNumber + offset;
String courseNameString = String.valueOf(recordNumber);
Integer courseNameStringLen = courseNameString.length();
Integer courseNameMultiplier = 1000 / courseNameStringLen;
byte[] courseName = dupStringAsBytes(courseNameString, courseNameMultiplier);
if (addTTLColumn && addTimestampColumn)
{
return RowFactory.create(recordNumber, courseName, recordNumber, ttl, timeStamp);
}
if (addTTLColumn)
{
return RowFactory.create(recordNumber, courseName, recordNumber, ttl);
}
if (addTimestampColumn)
{
return RowFactory.create(recordNumber, courseName, recordNumber, timeStamp);
}
return RowFactory.create(recordNumber, courseName, recordNumber);
}).iterator();
return rows;
}, false);
return dataset;
}

@@ -81,32 +81,32 @@ public static SidecarClient from(SidecarInstancesProvider sidecarInstancesProvid
Vertx vertx = Vertx.vertx(new VertxOptions().setUseDaemonThread(true).setWorkerPoolSize(config.maxPoolSize()));

HttpClientConfig.Builder<?> builder = new HttpClientConfig.Builder<>()
.ssl(false)
.timeoutMillis(TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
.idleTimeoutMillis((int) TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
.receiveBufferSize((int) config.chunkBufferSize())
.maxChunkSize((int) config.maxBufferSize())
.userAgent(BuildInfo.READER_USER_AGENT);
.ssl(false)
.timeoutMillis(TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
.idleTimeoutMillis((int) TimeUnit.SECONDS.toMillis(config.timeoutSeconds()))
.receiveBufferSize((int) config.chunkBufferSize())
.maxChunkSize((int) config.maxBufferSize())
.userAgent(BuildInfo.READER_USER_AGENT);

if (secretsProvider != null)
{
builder = builder
.ssl(true)
.keyStoreInputStream(secretsProvider.keyStoreInputStream())
.keyStorePassword(String.valueOf(secretsProvider.keyStorePassword()))
.keyStoreType(secretsProvider.keyStoreType())
.trustStoreInputStream(secretsProvider.trustStoreInputStream())
.trustStorePassword(String.valueOf(secretsProvider.trustStorePassword()))
.trustStoreType(secretsProvider.trustStoreType());
.ssl(true)
.keyStoreInputStream(secretsProvider.keyStoreInputStream())
.keyStorePassword(String.valueOf(secretsProvider.keyStorePassword()))
.keyStoreType(secretsProvider.keyStoreType())
.trustStoreInputStream(secretsProvider.trustStoreInputStream())
.trustStorePassword(String.valueOf(secretsProvider.trustStorePassword()))
.trustStoreType(secretsProvider.trustStoreType());
}

HttpClientConfig httpClientConfig = builder.build();

SidecarConfig sidecarConfig = new SidecarConfig.Builder()
.maxRetries(config.maxRetries())
.retryDelayMillis(config.millisToSleep())
.maxRetryDelayMillis(config.maxMillisToSleep())
.build();
.maxRetries(config.maxRetries())
.retryDelayMillis(config.millisToSleep())
.maxRetryDelayMillis(config.maxMillisToSleep())
.build();

return buildClient(sidecarConfig, vertx, httpClientConfig, sidecarInstancesProvider);
}
@@ -117,23 +117,23 @@ public static SidecarClient from(SidecarInstancesProvid
.setWorkerPoolSize(conf.getMaxHttpConnections()));

HttpClientConfig httpClientConfig = new HttpClientConfig.Builder<>()
.timeoutMillis(conf.getHttpResponseTimeoutMs())
.idleTimeoutMillis(conf.getHttpConnectionTimeoutMs())
.userAgent(BuildInfo.WRITER_USER_AGENT)
.keyStoreInputStream(conf.getKeyStore())
.keyStorePassword(conf.getKeyStorePassword())
.keyStoreType(conf.getKeyStoreTypeOrDefault())
.trustStoreInputStream(conf.getTrustStore())
.trustStorePassword(conf.getTrustStorePasswordOrDefault())
.trustStoreType(conf.getTrustStoreTypeOrDefault())
.ssl(conf.hasKeystoreAndKeystorePassword())
.build();
.timeoutMillis(conf.getHttpResponseTimeoutMs())
.idleTimeoutMillis(conf.getHttpConnectionTimeoutMs())
.userAgent(BuildInfo.WRITER_USER_AGENT)
.keyStoreInputStream(conf.getKeyStore())
.keyStorePassword(conf.getKeyStorePassword())
.keyStoreType(conf.getKeyStoreTypeOrDefault())
.trustStoreInputStream(conf.getTrustStore())
.trustStorePassword(conf.getTrustStorePasswordOrDefault())
.trustStoreType(conf.getTrustStoreTypeOrDefault())
.ssl(conf.hasKeystoreAndKeystorePassword())
.build();

SidecarConfig sidecarConfig = new SidecarConfig.Builder()
.maxRetries(conf.getSidecarRequestRetries())
.retryDelayMillis(TimeUnit.SECONDS.toMillis(conf.getSidecarRequestRetryDelayInSeconds()))
.maxRetryDelayMillis(TimeUnit.SECONDS.toMillis(conf.getSidecarRequestMaxRetryDelayInSeconds()))
.build();
.maxRetries(5)
.retryDelayMillis(200)
.maxRetryDelayMillis(500)
.build();

return buildClient(sidecarConfig, vertx, httpClientConfig, sidecarInstancesProvider);
}
@@ -80,7 +80,7 @@ public class BulkSparkConf implements Serializable
public static final long DEFAULT_SIDECAR_REQUEST_MAX_RETRY_DELAY_SECONDS = 60L;
public static final int DEFAULT_COMMIT_BATCH_SIZE = 10_000;
public static final int DEFAULT_RING_RETRY_COUNT = 3;
public static final RowBufferMode DEFAULT_ROW_BUFFER_MODE = RowBufferMode.UNBUFFERRED;
public static final RowBufferMode DEFAULT_ROW_BUFFER_MODE = RowBufferMode.UNBUFFERED;
public static final int DEFAULT_BATCH_SIZE_IN_ROWS = 1_000_000;

// NOTE: All Cassandra Analytics setting names must start with "spark" in order to not be ignored by Spark,
@@ -141,7 +141,7 @@ public BulkSparkConf(SparkConf conf, Map<String, String> options)
this.consistencyLevel = ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM"));
this.localDC = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null);
this.numberSplits = MapUtils.getInt(options, WriterOptions.NUMBER_SPLITS.name(), DEFAULT_NUM_SPLITS, "number of splits");
this.rowBufferMode = MapUtils.getEnumOption(options, WriterOptions.ROW_BUFFER_MODE.name(), DEFAULT_ROW_BUFFER_MODE, "row bufferring mode");
this.rowBufferMode = MapUtils.getEnumOption(options, WriterOptions.ROW_BUFFER_MODE.name(), DEFAULT_ROW_BUFFER_MODE, "row buffering mode");
this.sstableDataSizeInMB = MapUtils.getInt(options, WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name(), 160, "sstable data size in MB");
this.sstableBatchSize = MapUtils.getInt(options, WriterOptions.BATCH_SIZE.name(), 1_000_000, "sstable batch size");
this.commitBatchSize = MapUtils.getInt(options, WriterOptions.COMMIT_BATCH_SIZE.name(), DEFAULT_COMMIT_BATCH_SIZE, "commit batch size");
@@ -185,15 +185,18 @@ protected void validateEnvironment() throws RuntimeException
protected void validateTableWriterSettings()
{
boolean batchSizeIsZero = sstableBatchSize == 0;
if (rowBufferMode == RowBufferMode.BUFFERRED
&& (!batchSizeIsZero && sstableBatchSize != DEFAULT_BATCH_SIZE_IN_ROWS))

if (rowBufferMode == RowBufferMode.UNBUFFERED)
{
Preconditions.checkArgument(!batchSizeIsZero,
"If writing in sorted order (ROW_BUFFER_MODE is UNBUFFERED) then BATCH_SIZE "
+ "should be non zero, but it was set to 0 in writer options");
}
else if (!batchSizeIsZero && sstableBatchSize != DEFAULT_BATCH_SIZE_IN_ROWS)
{
LOGGER.warn("BATCH_SIZE is set to a non-zero, non-default value ({}) but ROW_BUFFER_MODE is set to BUFFERRED."
+ " Ignoring BATCH_SIZE.", sstableBatchSize);
LOGGER.warn("BATCH_SIZE is set to a non-zero, non-default value ({}) but ROW_BUFFER_MODE is set to BUFFERED."
+ " Ignoring BATCH_SIZE.", sstableBatchSize);
}
Preconditions.checkArgument(rowBufferMode == RowBufferMode.UNBUFFERRED && !batchSizeIsZero,
"If writing in sorted order (ROW_BUFFER_MODE is UNBUFFERRED) then BATCH_SIZE "
+ "should be non zero, but it was set to 0 in writer options");
}
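The reworked validation requires a non-zero BATCH_SIZE for a sorted-order (UNBUFFERED) write and only warns when a BUFFERED write carries a non-default BATCH_SIZE. Below is a hedged sketch of how a job would pass these settings, reusing the option keys from the example job and the ROW_BUFFER_MODE/BATCH_SIZE writer options referenced in this class; the UNBUFFERED spelling follows the rename in this change, and the class name is illustrative only:

```java
import org.apache.cassandra.spark.bulkwriter.WriterOptions;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final class RowBufferModeUsageSketch
{
    static void writeSorted(Dataset<Row> df)
    {
        DataFrameWriter<Row> writer = df.write()
                .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
                .option("sidecar_instances", "localhost,localhost2,localhost3")
                .option("keyspace", "spark_test")
                .option("table", "test")
                .option("bulk_writer_cl", "LOCAL_QUORUM")
                // Sorted-order write: validateTableWriterSettings() rejects a zero BATCH_SIZE in this mode
                .option(WriterOptions.ROW_BUFFER_MODE.name(), "UNBUFFERED")
                .option(WriterOptions.BATCH_SIZE.name(), "1000000")
                .mode("append");
        writer.save();
    }
}
```

With ROW_BUFFER_MODE set to BUFFERED instead, BATCH_SIZE is ignored and only the warning above is logged when it differs from the default.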

/*