Cosmos Spark: Added option to override the MicroBatchPayloadSize in bytes #35379

Merged
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
@@ -4,7 +4,8 @@

#### Features Added
* Added support for priority based throttling - See [PR 35238](https://github.com/Azure/azure-sdk-for-java/pull/35238)
* Added new configuration parameter `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` to allow increasing the micro batch payload size for better efficiency when documents are often above 110 KB. - See [PR 35379](https://github.com/Azure/azure-sdk-for-java/pull/35379)

#### Breaking Changes

#### Bugs Fixed
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
@@ -4,6 +4,7 @@

#### Features Added
* Added support for priority based throttling - See [PR 35238](https://github.com/Azure/azure-sdk-for-java/pull/35238)
* Added new configuration parameter `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` to allow increasing the micro batch payload size for better efficiency when documents are often above 110 KB. - See [PR 35379](https://github.com/Azure/azure-sdk-for-java/pull/35379)

#### Breaking Changes

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -4,6 +4,7 @@

#### Features Added
* Added support for priority based throttling - See [PR 35238](https://github.com/Azure/azure-sdk-for-java/pull/35238)
* Added new configuration parameter `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` to allow increasing the micro batch payload size for better efficiency when documents are often above 110 KB. - See [PR 35379](https://github.com/Azure/azure-sdk-for-java/pull/35379)

#### Breaking Changes

@@ -39,6 +39,7 @@
| `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size |
| `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimation of `1000 x Cores` |
| `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled |
| `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` | `220201` | When the targeted payload size is reached for buffered documents, the request is sent to the backend. The default value is optimized for small documents (<= 10 KB); when documents often exceed 110 KB, increasing this value up to about `1500000` can improve efficiency (it must still stay below the 2 MB request limit). |

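For illustration, a minimal Spark write sketch applying the new setting (the `cosmos.oltp` format and the `spark.cosmos.*` option names come from the connector's configuration reference; the endpoint, key, database, container, and the `df` DataFrame are placeholders):

```scala
// Hedged sketch - endpoint/key/database/container values are placeholders.
df.write
  .format("cosmos.oltp")
  .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/")
  .option("spark.cosmos.accountKey", "<account-key>")
  .option("spark.cosmos.database", "<database>")
  .option("spark.cosmos.container", "<container>")
  .option("spark.cosmos.write.strategy", "ItemOverwrite")
  .option("spark.cosmos.write.bulk.enabled", "true")
  // Documents in this workload often exceed 110 KB, so raise the micro batch
  // payload target while staying below the 2 MB request limit.
  .option("spark.cosmos.write.bulk.targetedPayloadSizeInBytes", "1500000")
  .mode("Append")
  .save()
```
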
#### Patch Config
| Config Property Name | Default | Description |
@@ -73,6 +73,7 @@ class BulkWriter(container: CosmosAsyncContainer,
private val closed = new AtomicBoolean(false)
private val lock = new ReentrantLock
private val pendingTasksCompleted = lock.newCondition
private val pendingRetries = new AtomicLong(0)
private val activeTasks = new AtomicInteger(0)
private val errorCaptureFirstException = new AtomicReference[Throwable]()
private val bulkInputEmitter: Sinks.Many[models.CosmosItemOperation] = Sinks.many().unicast().onBackpressureBuffer()
@@ -351,12 +352,15 @@
s"attemptNumber=${context.attemptNumber}, exceptionMessage=${exceptionMessage}, " +
s"Context: {${operationContext.toString}} ${getThreadInfo}")

this.pendingRetries.incrementAndGet()

// this is to ensure the submission will happen on a different thread in background
// and doesn't block the active thread
val deferredRetryMono = SMono.defer(() => {
scheduleWriteInternal(itemOperation.getPartitionKeyValue,
itemOperation.getItem.asInstanceOf[ObjectNode],
OperationContext(context.itemId, context.partitionKeyValue, context.eTag, context.attemptNumber + 1))
this.pendingRetries.decrementAndGet()
SMono.empty
})

@@ -487,10 +491,13 @@
try {
val numberOfIntervalsWithIdenticalActiveOperationSnapshots = new AtomicLong(0)
var activeTasksSnapshot = activeTasks.get()
while (activeTasksSnapshot > 0 && errorCaptureFirstException.get == null) {
var pendingRetriesSnapshot = pendingRetries.get()
while ((pendingRetriesSnapshot > 0 || activeTasksSnapshot > 0)
&& errorCaptureFirstException.get == null) {

log.logInfo(
s"Waiting for pending activeTasks $activeTasksSnapshot, " +
s"Context: ${operationContext.toString} ${getThreadInfo}")
s"Waiting for pending activeTasks $activeTasksSnapshot and/or pendingRetries " +
s"$pendingRetriesSnapshot, Context: ${operationContext.toString} ${getThreadInfo}")
val activeOperationsSnapshot = activeOperations.clone()
val awaitCompleted = pendingTasksCompleted.await(1, TimeUnit.MINUTES)
if (!awaitCompleted) {
@@ -501,20 +508,21 @@
)
}
activeTasksSnapshot = activeTasks.get()
pendingRetriesSnapshot = pendingRetries.get()
val semaphoreAvailablePermitsSnapshot = semaphore.availablePermits()

if (awaitCompleted) {
log.logInfo(s"Waiting completed for pending activeTasks $activeTasksSnapshot, " +
s"Context: ${operationContext.toString} ${getThreadInfo}")
log.logInfo(s"Waiting completed for pending activeTasks $activeTasksSnapshot, pendingRetries " +
s"$pendingRetriesSnapshot Context: ${operationContext.toString} ${getThreadInfo}")
} else {
log.logInfo(s"Waiting interrupted for pending activeTasks $activeTasksSnapshot - " +
s"available permits ${semaphoreAvailablePermitsSnapshot}, " +
log.logInfo(s"Waiting interrupted for pending activeTasks $activeTasksSnapshot , pendingRetries " +
s"$pendingRetriesSnapshot - available permits ${semaphoreAvailablePermitsSnapshot}, " +
s"Context: ${operationContext.toString} ${getThreadInfo}")
}
}

log.logInfo(s"Waiting completed for pending activeTasks $activeTasksSnapshot, " +
s"Context: ${operationContext.toString} ${getThreadInfo}")
log.logInfo(s"Waiting completed for pending activeTasks $activeTasksSnapshot, pendingRetries " +
s"$pendingRetriesSnapshot Context: ${operationContext.toString} ${getThreadInfo}")
} finally {
lock.unlock()
}
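The new counter closes a shutdown race: a retry is re-submitted on a background thread, so between the failed operation completing and the retry being scheduled, `activeTasks` can briefly read zero. A simplified sketch of the resulting drain condition, using the same concurrency primitives as `BulkWriter` (class and method names here are hypothetical):

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.locks.ReentrantLock

// Simplified sketch of BulkWriter's drain logic (hypothetical names).
class DrainSketch {
  private val activeTasks = new AtomicInteger(0)
  private val pendingRetries = new AtomicLong(0)
  private val lock = new ReentrantLock
  private val pendingTasksCompleted = lock.newCondition

  // Blocks until no operations are in flight AND no retries are still
  // waiting to be re-scheduled - both counters must reach zero.
  def awaitDrained(): Unit = {
    lock.lock()
    try {
      while (activeTasks.get() > 0 || pendingRetries.get() > 0) {
        pendingTasksCompleted.await(1, TimeUnit.MINUTES)
      }
    } finally {
      lock.unlock()
    }
  }
}
```
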
@@ -4,6 +4,7 @@
package com.azure.cosmos.spark

import com.azure.core.management.AzureEnvironment
import com.azure.cosmos.implementation.batch.BatchRequestResponseConstants
import com.azure.cosmos.implementation.routing.LocationHelper
import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, Strings}
import com.azure.cosmos.models.{CosmosChangeFeedRequestOptions, CosmosParameterizedQuery, DedicatedGatewayRequestOptions, FeedRange}
@@ -80,6 +81,7 @@ private[spark] object CosmosConfigNames {
val WriteBulkEnabled = "spark.cosmos.write.bulk.enabled"
val WriteBulkMaxPendingOperations = "spark.cosmos.write.bulk.maxPendingOperations"
val WriteBulkMaxConcurrentPartitions = "spark.cosmos.write.bulk.maxConcurrentCosmosPartitions"
val WriteBulkPayloadSizeInBytes = "spark.cosmos.write.bulk.targetedPayloadSizeInBytes"
val WritePointMaxConcurrency = "spark.cosmos.write.point.maxConcurrency"
val WritePatchDefaultOperationType = "spark.cosmos.write.patch.defaultOperationType"
val WritePatchColumnConfigs = "spark.cosmos.write.patch.columnConfigs"
@@ -163,6 +165,7 @@
WriteBulkEnabled,
WriteBulkMaxPendingOperations,
WriteBulkMaxConcurrentPartitions,
WriteBulkPayloadSizeInBytes,
WritePointMaxConcurrency,
WritePatchDefaultOperationType,
WritePatchColumnConfigs,
@@ -812,7 +815,8 @@ private case class CosmosWriteConfig(itemWriteStrategy: ItemWriteStrategy,
pointMaxConcurrency: Option[Int] = None,
maxConcurrentCosmosPartitions: Option[Int] = None,
patchConfigs: Option[CosmosPatchConfigs] = None,
throughputControlConfig: Option[CosmosThroughputControlConfig] = None)
throughputControlConfig: Option[CosmosThroughputControlConfig] = None,
maxMicroBatchPayloadSizeInBytes: Option[Int] = None)

private object CosmosWriteConfig {
private val DefaultMaxRetryCount = 10
@@ -824,6 +828,15 @@
parseFromStringFunction = bulkEnabledAsString => bulkEnabledAsString.toBoolean,
helpMessage = "Cosmos DB Item Write bulk enabled")

private val microBatchPayloadSizeInBytes = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkPayloadSizeInBytes,
defaultValue = Option.apply(BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES),
mandatory = false,
parseFromStringFunction = payloadSizeInBytesString => payloadSizeInBytesString.toInt,
helpMessage = "Cosmos DB target bulk micro batch size in bytes - a micro batch will be flushed to the backend " +
"when its payload size exceeds this value. For best efficiency its value should be low enough to leave enough " +
"room for one document - to avoid that the request size exceeds the Cosmos DB maximum of 2 MB too often " +
"which would result in retries and having to transmit large network payloads multiple times.")

private val bulkMaxPendingOperations = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkMaxPendingOperations,
mandatory = false,
parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt,
@@ -963,6 +976,7 @@
val bulkEnabledOpt = CosmosConfigEntry.parse(cfg, bulkEnabled)
var patchConfigsOpt = Option.empty[CosmosPatchConfigs]
val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg)
val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes)

assert(bulkEnabledOpt.isDefined)

@@ -987,7 +1001,8 @@
pointMaxConcurrency = CosmosConfigEntry.parse(cfg, pointWriteConcurrency),
maxConcurrentCosmosPartitions = CosmosConfigEntry.parse(cfg, bulkMaxConcurrentPartitions),
patchConfigs = patchConfigsOpt,
throughputControlConfig = throughputControlConfigOpt)
throughputControlConfig = throughputControlConfigOpt,
maxMicroBatchPayloadSizeInBytes = microBatchPayloadSizeInBytesOpt)
}

def parsePatchColumnConfigs(cfg: Map[String, String], inputSchema: StructType): TrieMap[String, CosmosPatchColumnConfig] = {
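The sizing guidance in the help message can be sanity-checked with quick arithmetic; the ~120 KB average document size below is an assumed example, not a number from this PR:

```scala
// Assumed example workload: documents averaging ~120 KB each.
val targetedPayloadSizeInBytes = 1500000
val avgDocumentSizeInBytes = 120 * 1024                                      // 122880
val docsPerMicroBatch = targetedPayloadSizeInBytes / avgDocumentSizeInBytes  // ~12

// The threshold is checked after buffering, so one more document can land
// on top of the target before the batch is flushed:
val worstCaseRequestBytes = targetedPayloadSizeInBytes + avgDocumentSizeInBytes
// 1622880 bytes, roughly 1.55 MB - still below the 2 MB service limit.
```
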
@@ -3,6 +3,7 @@
package com.azure.cosmos.spark

import com.azure.core.management.AzureEnvironment
import com.azure.cosmos.implementation.batch.BatchRequestResponseConstants
import com.azure.cosmos.spark.CosmosPatchOperationTypes.Increment
import com.azure.cosmos.spark.utils.CosmosPatchTestHelper
import org.apache.spark.sql.types.{NumericType, StructType}
@@ -722,6 +723,31 @@ class CosmosConfigSpec extends UnitSpec {
)
}

"Customizing MaxBulKPayloadSizeInBytes" should "be possible" in {
val schema = CosmosPatchTestHelper.getPatchConfigTestSchema()
var userConfig = Map(
"spark.cosmos.write.strategy" -> "ItemOverwrite",
"spark.cosmos.write.bulk.enabled" -> "True",
)
var writeConfig: CosmosWriteConfig = CosmosWriteConfig.parseWriteConfig(userConfig, schema)
writeConfig should not be null
writeConfig.maxMicroBatchPayloadSizeInBytes should not be null
writeConfig.maxMicroBatchPayloadSizeInBytes.isDefined shouldEqual true
writeConfig.maxMicroBatchPayloadSizeInBytes.get shouldEqual BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES

userConfig = Map(
"spark.cosmos.write.strategy" -> "ItemOverwrite",
"spark.cosmos.write.bulk.enabled" -> "True",
"spark.cosmos.write.bulk.targetedPayloadSizeInBytes" -> "1000000",
)

writeConfig = CosmosWriteConfig.parseWriteConfig(userConfig, schema)
writeConfig should not be null
writeConfig.maxMicroBatchPayloadSizeInBytes should not be null
writeConfig.maxMicroBatchPayloadSizeInBytes.isDefined shouldEqual true
writeConfig.maxMicroBatchPayloadSizeInBytes.get shouldEqual 1000000
}

"Config Parser" should "validate default operation types for patch configs" in {
val schema = CosmosPatchTestHelper.getPatchConfigTestSchema()

@@ -21,7 +21,7 @@
import java.util.List;
import java.util.UUID;

import static com.azure.cosmos.implementation.batch.BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES;
import static com.azure.cosmos.implementation.batch.BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES;
import static com.azure.cosmos.implementation.batch.BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST;
import static org.assertj.core.api.Assertions.assertThat;

@@ -331,7 +331,7 @@ public void batchWithTooManyOperationsTest() {
@Test(groups = {"simple"}, timeOut = TIMEOUT * 10)
public void batchLargerThanServerRequest() {
int operationCount = 20;
int appxDocSize = (MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES * 11) / operationCount;
int appxDocSize = (DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES * 11) / operationCount;

// Increase the doc size by a bit so all docs won't fit in one server request.
appxDocSize = (int)(appxDocSize * 1.05);
@@ -48,7 +48,7 @@ public void validateAllSetValuesInCosmosBulkItemResponse() {
ServerOperationBatchRequest serverOperationBatchRequest = PartitionKeyRangeServerBatchRequest.createBatchRequest(
PartitionKey.NONE.toString(),
Arrays.asList(arrayOperations),
BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);

// Create dummy result
@@ -135,7 +135,7 @@ public void validateEmptyHeaderInCosmosBulkItemResponse() {
ServerOperationBatchRequest serverOperationBatchRequest = PartitionKeyRangeServerBatchRequest.createBatchRequest(
PartitionKey.NONE.toString(),
Arrays.asList(arrayOperations),
BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);


@@ -417,6 +417,10 @@ CosmosBulkExecutionOptions setTargetedMicroBatchRetryRate(

CosmosBulkExecutionOptions setMaxMicroBatchSize(CosmosBulkExecutionOptions options, int maxMicroBatchSize);

int getMaxMicroBatchPayloadSizeInBytes(CosmosBulkExecutionOptions options);

CosmosBulkExecutionOptions setMaxMicroBatchPayloadSizeInBytes(CosmosBulkExecutionOptions options, int maxMicroBatchPayloadSizeInBytes);

int getMaxMicroBatchConcurrency(CosmosBulkExecutionOptions options);

Integer getMaxConcurrentCosmosPartitions(CosmosBulkExecutionOptions options);
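A sketch of how a caller inside the SDK could flow a parsed value into the options through this accessor; this is internal wiring (the helper lives in the `implementation` package, not the public API), and the wrapping function is hypothetical:

```scala
import com.azure.cosmos.implementation.ImplementationBridgeHelpers
import com.azure.cosmos.models.CosmosBulkExecutionOptions

// Hypothetical helper: maxMicroBatchPayloadSizeInBytes is the Option[Int]
// parsed from spark.cosmos.write.bulk.targetedPayloadSizeInBytes.
def applyPayloadSizeTarget(options: CosmosBulkExecutionOptions,
                           maxMicroBatchPayloadSizeInBytes: Option[Int]): CosmosBulkExecutionOptions = {
  maxMicroBatchPayloadSizeInBytes match {
    case Some(size) =>
      ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
        .getCosmosBulkExecutionOptionsAccessor()
        .setMaxMicroBatchPayloadSizeInBytes(options, size)
    case None => options
  }
}
```
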
@@ -10,7 +10,7 @@
public final class BatchRequestResponseConstants {

// Size limits:
public static final int MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES = 220201;
public static final int DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES = 220201;
public static final int MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST = 100;

public static final int DEFAULT_MAX_MICRO_BATCH_INTERVAL_IN_MILLISECONDS = 1000;
@@ -86,6 +86,7 @@ public final class BulkExecutor<TContext> implements Disposable {
ImplementationBridgeHelpers.CosmosAsyncClientHelper.getCosmosAsyncClientAccessor();

private final CosmosAsyncContainer container;
private final int maxMicroBatchPayloadSizeInBytes;
private final AsyncDocumentClient docClientWrapper;
private final String operationContextText;
private final OperationContextAndListenerTuple operationListener;
@@ -94,6 +95,7 @@

// Options for bulk execution.
private final Long maxMicroBatchIntervalInMs;

private final TContext batchContext;
private final ConcurrentMap<String, PartitionScopeThresholds> partitionScopeThresholds;
private final CosmosBulkExecutionOptions cosmosBulkExecutionOptions;
@@ -120,6 +122,9 @@ public BulkExecutor(CosmosAsyncContainer container,
checkNotNull(inputOperations, "expected non-null inputOperations");
checkNotNull(cosmosBulkOptions, "expected non-null bulkOptions");

this.maxMicroBatchPayloadSizeInBytes = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getMaxMicroBatchPayloadSizeInBytes(cosmosBulkOptions);
this.cosmosBulkExecutionOptions = cosmosBulkOptions;
this.container = container;
this.bulkSpanName = "nonTransactionalBatch." + this.container.getId();
@@ -489,7 +494,7 @@ private Flux<CosmosBulkOperationResponse<TContext>> executePartitionedGroup(

if (batchSize >= thresholds.getTargetMicroBatchSizeSnapshot() ||
age >= this.maxMicroBatchIntervalInMs ||
totalSerializedLength >= BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES) {
totalSerializedLength >= this.maxMicroBatchPayloadSizeInBytes) {

logger.debug(
"BufferUntil - Flushing PKRange {} due to BatchSize ({}), payload size ({}) or age ({}), " +
@@ -558,7 +563,7 @@ private Flux<CosmosBulkOperationResponse<TContext>> executeOperations(

String pkRange = thresholds.getPartitionKeyRangeId();
ServerOperationBatchRequest serverOperationBatchRequest =
BulkExecutorUtil.createBatchRequest(operations, pkRange);
BulkExecutorUtil.createBatchRequest(operations, pkRange, this.maxMicroBatchPayloadSizeInBytes);
if (serverOperationBatchRequest.getBatchPendingOperations().size() > 0) {
serverOperationBatchRequest.getBatchPendingOperations().forEach(groupSink::next);
}
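The flush decision above reduces to a three-way OR over count, age, and payload size; a minimal sketch of the predicate (parameter names are illustrative):

```scala
// Simplified sketch of BulkExecutor's bufferUntil condition: a micro batch
// is flushed when any one of count, age, or payload size crosses its target.
def shouldFlush(batchSize: Int,
                ageInMs: Long,
                totalSerializedLengthInBytes: Int,
                targetMicroBatchSize: Int,
                maxMicroBatchIntervalInMs: Long,
                maxMicroBatchPayloadSizeInBytes: Int): Boolean =
  batchSize >= targetMicroBatchSize ||
    ageInMs >= maxMicroBatchIntervalInMs ||
    totalSerializedLengthInBytes >= maxMicroBatchPayloadSizeInBytes
```
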
@@ -36,12 +36,12 @@

final class BulkExecutorUtil {

static ServerOperationBatchRequest createBatchRequest(List<CosmosItemOperation> operations, String partitionKeyRangeId) {
static ServerOperationBatchRequest createBatchRequest(List<CosmosItemOperation> operations, String partitionKeyRangeId, int maxMicroBatchPayloadSizeInBytes) {

return PartitionKeyRangeServerBatchRequest.createBatchRequest(
partitionKeyRangeId,
operations,
BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
maxMicroBatchPayloadSizeInBytes,
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);
}
