@frankgh (Contributor) commented Jul 25, 2023

When setting the `BUFFERED` `RowBufferMode` as part of the `WriterOptions`, `org.apache.cassandra.spark.bulkwriter.RecordWriter` ignores that configuration and instead uses the batch size to determine when to finalize an SSTable and start writing a new one, if more rows are available.

In this commit, we fix `org.apache.cassandra.spark.bulkwriter.RecordWriter#checkBatchSize` to take the configured `RowBufferMode` into account. Specifically, for the `UNBUFFERED` `RowBufferMode` we check the batch size of the SSTable during writes, while for `BUFFERED` the check has no effect.

Co-authored-by: Doug Rohrer <doug@therohrers.org>
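
For reference, a minimal sketch of the shape of this fix. The method signature, the `JobInfo` accessor names, and the bookkeeping fields are assumptions for illustration; only `checkBatchSize`, `finalizeSSTable`, and `RowBufferMode` come from the change itself:

```java
// Sketch only: field and parameter names are illustrative, not the PR's code.
private void checkBatchSize(StreamSession streamSession, int partitionId, JobInfo job)
{
    // BUFFERED mode flushes by buffer size (withBufferSizeInMB), so the
    // per-batch row-count check must not finalize the SSTable early.
    if (job.getRowBufferMode() == RowBufferMode.BUFFERED)
    {
        return;
    }

    // UNBUFFERED mode: once the configured batch size is reached, finalize
    // the current SSTable and start a new one for any remaining rows.
    if (batchSize >= job.getSstableBatchSize())
    {
        finalizeSSTable(streamSession, partitionId, sstableWriter, batchNumber, batchSize);
        sstableWriter = null; // a new writer is created for the next batch
        batchNumber++;
        batchSize = 0;
    }
}
```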
@dineshjoshi (Member) commented

+1 LGTM.

@yifan-c (Contributor) left a comment

some nits. Looks good to me.

```java
if (sstableWriter != null)
{
    finalizeSSTable(streamSession, partitionId, sstableWriter, batchNumber, batchSize);
}
```

nit: reset `sstableWriter` to null after `finalizeSSTable`


Given this is essentially the last step before we're done with the `sstableWriter`, we don't really need to set it to null here. This `RecordWriter` instance should be collected as soon as the upload/commits finish, and we close the `sstableWriter` on finalize, so we should be good without nulling it out.

```java
}
else if (rowBufferMode == RowBufferMode.BUFFERED)
{
    builder.withBufferSizeInMB(bufferSizeMB);
}
```

nit: maybe decide on a valid value range for `bufferSizeMB` and validate it; `CQLSSTableWriter` accepts whatever int value it is given.
For the upper bound, I think 1/2 of `spark.executor.memory` is a good limit.
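
A sketch of what that validation could look like. Everything here (the helper class, using `SparkConf.getSizeAsMb` to read the executor memory, and the exact bound) is illustrative rather than code from this PR:

```java
import org.apache.spark.SparkConf;

// Illustrative only: validate bufferSizeMB against half of spark.executor.memory,
// as suggested above. Neither the helper nor the bound exists in the PR.
final class BufferSizeValidation
{
    static void validate(int bufferSizeMB, SparkConf conf)
    {
        if (bufferSizeMB <= 0)
        {
            throw new IllegalArgumentException("bufferSizeMB must be positive: " + bufferSizeMB);
        }
        // getSizeAsMb parses size strings like "4g" into mebibytes
        long executorMemoryMB = conf.getSizeAsMb("spark.executor.memory", "1g");
        if (bufferSizeMB > executorMemoryMB / 2)
        {
            throw new IllegalArgumentException(String.format(
                "bufferSizeMB (%d) exceeds half of spark.executor.memory (%d MB)",
                bufferSizeMB, executorMemoryMB / 2));
        }
    }
}
```

As the reply below notes, a real bound would likely need to account for more than `spark.executor.memory` alone.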


I think, for now, we leave this as just a configuration option... Validating it against the Spark environment and picking a reasonable upper bound would take some experimentation (and would likely involve more than just the `executor.memory` setting, as there are other config settings that deal with memory, like memory overhead).

@yifan-c closed this Aug 9, 2023
@frankgh deleted the CASSANDRA-18692 branch February 14, 2024 19:50