Cosmos Spark: Added option to override the MicroBatchPayloadSize in bytes #35379

Merged
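For context before the diff: a minimal sketch of how a Spark job could opt into the new setting. The option key `spark.cosmos.write.bulk.targetedPayloadSizeInBytes` is the one added by this PR; the account endpoint, key, database, container, and `dataFrame` are placeholder assumptions, not part of the change.

```scala
// Sketch: bulk write with a reduced micro-batch payload target (~1 MB).
val cosmosWriteOptions = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.accountKey" -> "<account-key>",
  "spark.cosmos.database" -> "myDatabase",          // placeholder
  "spark.cosmos.container" -> "myContainer",        // placeholder
  "spark.cosmos.write.strategy" -> "ItemOverwrite",
  "spark.cosmos.write.bulk.enabled" -> "true",
  // New in this PR: flush a micro batch once its serialized payload exceeds ~1 MB
  "spark.cosmos.write.bulk.targetedPayloadSizeInBytes" -> "1000000"
)

dataFrame
  .write
  .format("cosmos.oltp")
  .options(cosmosWriteOptions)
  .mode("Append")
  .save()
```

Lowering the target mainly helps workloads with large documents, where the default target plus one more document could otherwise exceed the service's request size limit.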
@@ -73,6 +73,7 @@ class BulkWriter(container: CosmosAsyncContainer,
private val closed = new AtomicBoolean(false)
private val lock = new ReentrantLock
private val pendingTasksCompleted = lock.newCondition
private val pendingRetries = new AtomicLong(0)
private val activeTasks = new AtomicInteger(0)
private val errorCaptureFirstException = new AtomicReference[Throwable]()
private val bulkInputEmitter: Sinks.Many[models.CosmosItemOperation] = Sinks.many().unicast().onBackpressureBuffer()
@@ -351,12 +352,15 @@ class BulkWriter(container: CosmosAsyncContainer,
s"attemptNumber=${context.attemptNumber}, exceptionMessage=${exceptionMessage}, " +
s"Context: {${operationContext.toString}} ${getThreadInfo}")

this.pendingRetries.incrementAndGet()

// this is to ensure the submission will happen on a different thread in background
// and doesn't block the active thread
val deferredRetryMono = SMono.defer(() => {
scheduleWriteInternal(itemOperation.getPartitionKeyValue,
itemOperation.getItem.asInstanceOf[ObjectNode],
OperationContext(context.itemId, context.partitionKeyValue, context.eTag, context.attemptNumber + 1))
this.pendingRetries.decrementAndGet()
SMono.empty
})
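The ordering in this retry path matters: `pendingRetries` is incremented on the calling thread before the deferred mono is created, and decremented only after the retry has been re-submitted inside it. A standalone sketch of that pattern, using a plain executor instead of Reactor (all names are illustrative, not the connector's):

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong

object PendingRetryPattern {
  private val pendingRetries = new AtomicLong(0)
  private val background = Executors.newSingleThreadExecutor()

  // Increment eagerly on the calling thread BEFORE handing off; decrement only
  // once the retry has been re-submitted. A concurrent flush therefore never
  // observes "no active tasks and no pending retries" while a retry is still
  // in flight between the two states.
  def scheduleRetry(resubmit: () => Unit): Unit = {
    pendingRetries.incrementAndGet()
    background.submit(new Runnable {
      override def run(): Unit =
        try resubmit()
        finally pendingRetries.decrementAndGet()
    })
  }

  def quiesced(activeTasks: Long): Boolean =
    activeTasks == 0 && pendingRetries.get() == 0
}
```

This is exactly the gap the changed flush loop below guards against by also checking `pendingRetriesSnapshot`.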

@@ -487,10 +491,13 @@ class BulkWriter(container: CosmosAsyncContainer,
try {
val numberOfIntervalsWithIdenticalActiveOperationSnapshots = new AtomicLong(0)
var activeTasksSnapshot = activeTasks.get()
- while (activeTasksSnapshot > 0 && errorCaptureFirstException.get == null) {
+ var pendingRetriesSnapshot = pendingRetries.get()
+ while ((pendingRetriesSnapshot > 0 || activeTasksSnapshot > 0)
+ && errorCaptureFirstException.get == null) {

log.logInfo(
s"Waiting for pending activeTasks $activeTasksSnapshot, " +
s"Context: ${operationContext.toString} ${getThreadInfo}")
s"Waiting for pending activeTasks $activeTasksSnapshot and/or pendingRetries " +
s"$pendingRetriesSnapshot, Context: ${operationContext.toString} ${getThreadInfo}")
val activeOperationsSnapshot = activeOperations.clone()
val awaitCompleted = pendingTasksCompleted.await(1, TimeUnit.MINUTES)
if (!awaitCompleted) {
@@ -501,20 +508,21 @@
)
}
activeTasksSnapshot = activeTasks.get()
pendingRetriesSnapshot = pendingRetries.get()
val semaphoreAvailablePermitsSnapshot = semaphore.availablePermits()

if (awaitCompleted) {
log.logInfo(s"Waiting completed for pending activeTasks $activeTasksSnapshot, " +
s"Context: ${operationContext.toString} ${getThreadInfo}")
log.logInfo(s"Waiting completed for pending activeTasks $activeTasksSnapshot, pendingRetries " +
s"$pendingRetriesSnapshot Context: ${operationContext.toString} ${getThreadInfo}")
} else {
log.logInfo(s"Waiting interrupted for pending activeTasks $activeTasksSnapshot - " +
s"available permits ${semaphoreAvailablePermitsSnapshot}, " +
log.logInfo(s"Waiting interrupted for pending activeTasks $activeTasksSnapshot , pendingRetries " +
s"$pendingRetriesSnapshot - available permits ${semaphoreAvailablePermitsSnapshot}, " +
s"Context: ${operationContext.toString} ${getThreadInfo}")
}
}

log.logInfo(s"Waiting completed for pending activeTasks $activeTasksSnapshot, " +
s"Context: ${operationContext.toString} ${getThreadInfo}")
log.logInfo(s"Waiting completed for pending activeTasks $activeTasksSnapshot, pendingRetries " +
s"$pendingRetriesSnapshot Context: ${operationContext.toString} ${getThreadInfo}")
} finally {
lock.unlock()
}
@@ -4,6 +4,7 @@
package com.azure.cosmos.spark

import com.azure.core.management.AzureEnvironment
import com.azure.cosmos.implementation.batch.BatchRequestResponseConstants
import com.azure.cosmos.implementation.routing.LocationHelper
import com.azure.cosmos.implementation.{SparkBridgeImplementationInternal, Strings}
import com.azure.cosmos.models.{CosmosChangeFeedRequestOptions, CosmosParameterizedQuery, DedicatedGatewayRequestOptions, FeedRange}
@@ -80,6 +81,7 @@ private[spark] object CosmosConfigNames {
val WriteBulkEnabled = "spark.cosmos.write.bulk.enabled"
val WriteBulkMaxPendingOperations = "spark.cosmos.write.bulk.maxPendingOperations"
val WriteBulkMaxConcurrentPartitions = "spark.cosmos.write.bulk.maxConcurrentCosmosPartitions"
val WriteBulkPayloadSizeInBytes = "spark.cosmos.write.bulk.targetedPayloadSizeInBytes"
val WritePointMaxConcurrency = "spark.cosmos.write.point.maxConcurrency"
val WritePatchDefaultOperationType = "spark.cosmos.write.patch.defaultOperationType"
val WritePatchColumnConfigs = "spark.cosmos.write.patch.columnConfigs"
@@ -163,6 +165,7 @@ private[spark] object CosmosConfigNames {
WriteBulkEnabled,
WriteBulkMaxPendingOperations,
WriteBulkMaxConcurrentPartitions,
WriteBulkPayloadSizeInBytes,
WritePointMaxConcurrency,
WritePatchDefaultOperationType,
WritePatchColumnConfigs,
@@ -812,7 +815,8 @@ private case class CosmosWriteConfig(itemWriteStrategy: ItemWriteStrategy,
pointMaxConcurrency: Option[Int] = None,
maxConcurrentCosmosPartitions: Option[Int] = None,
patchConfigs: Option[CosmosPatchConfigs] = None,
- throughputControlConfig: Option[CosmosThroughputControlConfig] = None)
+ throughputControlConfig: Option[CosmosThroughputControlConfig] = None,
+ maxMicroBatchPayloadSizeInBytes: Option[Int] = None)

private object CosmosWriteConfig {
private val DefaultMaxRetryCount = 10
@@ -824,6 +828,15 @@ private object CosmosWriteConfig {
parseFromStringFunction = bulkEnabledAsString => bulkEnabledAsString.toBoolean,
helpMessage = "Cosmos DB Item Write bulk enabled")

private val microBatchPayloadSizeInBytes = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkPayloadSizeInBytes,
defaultValue = Option.apply(BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES),
mandatory = false,
parseFromStringFunction = payloadSizeInBytesString => payloadSizeInBytesString.toInt,
helpMessage = "Cosmos DB target bulk micro batch size in bytes - a micro batch will be flushed to the backend " +
"when its payload size exceeds this value. For best efficiency its value should be low enough to leave enough " +
"room for one document - to avoid that the request size exceeds the Cosmos DB maximum of 2 MB too often " +
"which would result in retries and having to transmit large network payloads multiple times.")

private val bulkMaxPendingOperations = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkMaxPendingOperations,
mandatory = false,
parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt,
@@ -963,6 +976,7 @@ private object CosmosWriteConfig {
val bulkEnabledOpt = CosmosConfigEntry.parse(cfg, bulkEnabled)
var patchConfigsOpt = Option.empty[CosmosPatchConfigs]
val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg)
val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes)

assert(bulkEnabledOpt.isDefined)

@@ -987,7 +1001,8 @@
pointMaxConcurrency = CosmosConfigEntry.parse(cfg, pointWriteConcurrency),
maxConcurrentCosmosPartitions = CosmosConfigEntry.parse(cfg, bulkMaxConcurrentPartitions),
patchConfigs = patchConfigsOpt,
- throughputControlConfig = throughputControlConfigOpt)
+ throughputControlConfig = throughputControlConfigOpt,
+ maxMicroBatchPayloadSizeInBytes = microBatchPayloadSizeInBytesOpt)
}

def parsePatchColumnConfigs(cfg: Map[String, String], inputSchema: StructType): TrieMap[String, CosmosPatchColumnConfig] = {
@@ -3,6 +3,7 @@
package com.azure.cosmos.spark

import com.azure.core.management.AzureEnvironment
import com.azure.cosmos.implementation.batch.BatchRequestResponseConstants
import com.azure.cosmos.spark.CosmosPatchOperationTypes.Increment
import com.azure.cosmos.spark.utils.CosmosPatchTestHelper
import org.apache.spark.sql.types.{NumericType, StructType}
@@ -722,6 +723,31 @@ class CosmosConfigSpec extends UnitSpec {
)
}

"Customizing MaxBulKPayloadSizeInBytes" should "be possible" in {
val schema = CosmosPatchTestHelper.getPatchConfigTestSchema()
var userConfig = Map(
"spark.cosmos.write.strategy" -> "ItemOverwrite",
"spark.cosmos.write.bulk.enabled" -> "True",
)
var writeConfig: CosmosWriteConfig = CosmosWriteConfig.parseWriteConfig(userConfig, schema)
writeConfig should not be null
writeConfig.maxMicroBatchPayloadSizeInBytes should not be null
writeConfig.maxMicroBatchPayloadSizeInBytes.isDefined shouldEqual true
writeConfig.maxMicroBatchPayloadSizeInBytes.get shouldEqual BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES

userConfig = Map(
"spark.cosmos.write.strategy" -> "ItemOverwrite",
"spark.cosmos.write.bulk.enabled" -> "True",
"spark.cosmos.write.bulk.targetedPayloadSizeInBytes" -> "1000000",
)

writeConfig = CosmosWriteConfig.parseWriteConfig(userConfig, schema)
writeConfig should not be null
writeConfig.maxMicroBatchPayloadSizeInBytes should not be null
writeConfig.maxMicroBatchPayloadSizeInBytes.isDefined shouldEqual true
writeConfig.maxMicroBatchPayloadSizeInBytes.get shouldEqual 1000000
}

"Config Parser" should "validate default operation types for patch configs" in {
val schema = CosmosPatchTestHelper.getPatchConfigTestSchema()

@@ -21,7 +21,7 @@
import java.util.List;
import java.util.UUID;

- import static com.azure.cosmos.implementation.batch.BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES;
+ import static com.azure.cosmos.implementation.batch.BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES;
import static com.azure.cosmos.implementation.batch.BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST;
import static org.assertj.core.api.Assertions.assertThat;

@@ -331,7 +331,7 @@ public void batchWithTooManyOperationsTest() {
@Test(groups = {"simple"}, timeOut = TIMEOUT * 10)
public void batchLargerThanServerRequest() {
int operationCount = 20;
- int appxDocSize = (MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES * 11) / operationCount;
+ int appxDocSize = (DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES * 11) / operationCount;

// Increase the doc size by a bit so all docs won't fit in one server request.
appxDocSize = (int)(appxDocSize * 1.05);
@@ -48,7 +48,7 @@ public void validateAllSetValuesInCosmosBulkItemResponse() {
ServerOperationBatchRequest serverOperationBatchRequest = PartitionKeyRangeServerBatchRequest.createBatchRequest(
PartitionKey.NONE.toString(),
Arrays.asList(arrayOperations),
- BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
+ BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);

// Create dummy result
@@ -135,7 +135,7 @@ public void validateEmptyHeaderInCosmosBulkItemResponse() {
ServerOperationBatchRequest serverOperationBatchRequest = PartitionKeyRangeServerBatchRequest.createBatchRequest(
PartitionKey.NONE.toString(),
Arrays.asList(arrayOperations),
- BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
+ BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);


@@ -417,6 +417,10 @@ CosmosBulkExecutionOptions setTargetedMicroBatchRetryRate(

CosmosBulkExecutionOptions setMaxMicroBatchSize(CosmosBulkExecutionOptions options, int maxMicroBatchSize);

int getMaxMicroBatchPayloadSizeInBytes(CosmosBulkExecutionOptions options);

CosmosBulkExecutionOptions setMaxMicroBatchPayloadSizeInBytes(CosmosBulkExecutionOptions options, int maxMicroBatchPayloadSizeInBytes);

int getMaxMicroBatchConcurrency(CosmosBulkExecutionOptions options);

Integer getMaxConcurrentCosmosPartitions(CosmosBulkExecutionOptions options);
@@ -10,7 +10,7 @@
public final class BatchRequestResponseConstants {

// Size limits:
- public static final int MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES = 220201;
+ public static final int DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES = 220201;
public static final int MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST = 100;

public static final int DEFAULT_MAX_MICRO_BATCH_INTERVAL_IN_MILLISECONDS = 1000;
@@ -86,6 +86,7 @@ public final class BulkExecutor<TContext> implements Disposable {
ImplementationBridgeHelpers.CosmosAsyncClientHelper.getCosmosAsyncClientAccessor();

private final CosmosAsyncContainer container;
private final int maxMicroBatchPayloadSizeInBytes;
private final AsyncDocumentClient docClientWrapper;
private final String operationContextText;
private final OperationContextAndListenerTuple operationListener;
@@ -94,6 +95,7 @@ public final class BulkExecutor<TContext> implements Disposable {

// Options for bulk execution.
private final Long maxMicroBatchIntervalInMs;

private final TContext batchContext;
private final ConcurrentMap<String, PartitionScopeThresholds> partitionScopeThresholds;
private final CosmosBulkExecutionOptions cosmosBulkExecutionOptions;
@@ -120,6 +122,9 @@ public BulkExecutor(CosmosAsyncContainer container,
checkNotNull(inputOperations, "expected non-null inputOperations");
checkNotNull(cosmosBulkOptions, "expected non-null bulkOptions");

this.maxMicroBatchPayloadSizeInBytes = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getMaxMicroBatchPayloadSizeInBytes(cosmosBulkOptions);
this.cosmosBulkExecutionOptions = cosmosBulkOptions;
this.container = container;
this.bulkSpanName = "nonTransactionalBatch." + this.container.getId();
@@ -489,7 +494,7 @@ private Flux<CosmosBulkOperationResponse<TContext>> executePartitionedGroup(

if (batchSize >= thresholds.getTargetMicroBatchSizeSnapshot() ||
age >= this.maxMicroBatchIntervalInMs ||
- totalSerializedLength >= BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES) {
+ totalSerializedLength >= this.maxMicroBatchPayloadSizeInBytes) {

logger.debug(
"BufferUntil - Flushing PKRange {} due to BatchSize ({}), payload size ({}) or age ({}), " +
@@ -558,7 +563,7 @@ private Flux<CosmosBulkOperationResponse<TContext>> executeOperations(

String pkRange = thresholds.getPartitionKeyRangeId();
ServerOperationBatchRequest serverOperationBatchRequest =
- BulkExecutorUtil.createBatchRequest(operations, pkRange);
+ BulkExecutorUtil.createBatchRequest(operations, pkRange, this.maxMicroBatchPayloadSizeInBytes);
if (serverOperationBatchRequest.getBatchPendingOperations().size() > 0) {
serverOperationBatchRequest.getBatchPendingOperations().forEach(groupSink::next);
}
@@ -36,12 +36,12 @@

final class BulkExecutorUtil {

- static ServerOperationBatchRequest createBatchRequest(List<CosmosItemOperation> operations, String partitionKeyRangeId) {
+ static ServerOperationBatchRequest createBatchRequest(List<CosmosItemOperation> operations, String partitionKeyRangeId, int maxMicroBatchPayloadSizeInBytes) {

return PartitionKeyRangeServerBatchRequest.createBatchRequest(
partitionKeyRangeId,
operations,
- BatchRequestResponseConstants.MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES,
+ maxMicroBatchPayloadSizeInBytes,
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);
}

@@ -22,6 +22,8 @@ public final class CosmosBulkExecutionOptions {
private int maxMicroBatchConcurrency = BatchRequestResponseConstants.DEFAULT_MAX_MICRO_BATCH_CONCURRENCY;
private double maxMicroBatchRetryRate = BatchRequestResponseConstants.DEFAULT_MAX_MICRO_BATCH_RETRY_RATE;
private double minMicroBatchRetryRate = BatchRequestResponseConstants.DEFAULT_MIN_MICRO_BATCH_RETRY_RATE;

private int maxMicroBatchPayloadSizeInBytes = BatchRequestResponseConstants.DEFAULT_MAX_DIRECT_MODE_BATCH_REQUEST_BODY_SIZE_IN_BYTES;
private Duration maxMicroBatchInterval = Duration.ofMillis(
BatchRequestResponseConstants.DEFAULT_MAX_MICRO_BATCH_INTERVAL_IN_MILLISECONDS);
private final Object legacyBatchScopedContext;
@@ -102,6 +104,28 @@ CosmosBulkExecutionOptions setMaxMicroBatchSize(int maxMicroBatchSize) {
return this;
}

/**
* The maximum batching request payload size in bytes for bulk operations.
*
* @return maximum micro batch payload size in bytes
*/
int getMaxMicroBatchPayloadSizeInBytes() {
return maxMicroBatchPayloadSizeInBytes;
}

/**
* The maximum batching payload size in bytes for bulk operations. Once the queued documents exceed this value,
* the micro batch will be flushed to the wire.
*
* @param maxMicroBatchPayloadSizeInBytes maximum payload size of a micro batch in bytes.
*
* @return the bulk processing options.
*/
CosmosBulkExecutionOptions setMaxMicroBatchPayloadSizeInBytes(int maxMicroBatchPayloadSizeInBytes) {
this.maxMicroBatchPayloadSizeInBytes = maxMicroBatchPayloadSizeInBytes;
return this;
}

Integer getMaxConcurrentCosmosPartitions() {
return this.maxConcurrentCosmosPartitions;
}
@@ -315,6 +339,19 @@ public CosmosBulkExecutionOptions setMaxMicroBatchSize(
return options.setMaxMicroBatchSize(maxMicroBatchSize);
}

@Override
public int getMaxMicroBatchPayloadSizeInBytes(CosmosBulkExecutionOptions options) {
return options.getMaxMicroBatchPayloadSizeInBytes();
}

@Override
public CosmosBulkExecutionOptions setMaxMicroBatchPayloadSizeInBytes(
CosmosBulkExecutionOptions options,
int maxMicroBatchPayloadSizeInBytes) {

return options.setMaxMicroBatchPayloadSizeInBytes(maxMicroBatchPayloadSizeInBytes);
}

@Override
public int getMaxMicroBatchConcurrency(CosmosBulkExecutionOptions options) {
return options.getMaxMicroBatchConcurrency();
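Because both new methods remain package-private on `CosmosBulkExecutionOptions`, internal callers such as the Spark connector's `BulkWriter` have to go through the bridge accessor. A hedged sketch of that plumbing (the 1000000-byte value is illustrative):

```scala
import com.azure.cosmos.implementation.ImplementationBridgeHelpers
import com.azure.cosmos.models.CosmosBulkExecutionOptions

// Sketch: plumb a configured payload target into the bulk options via the
// accessor methods added in this PR.
val bulkOptions = new CosmosBulkExecutionOptions()
val accessor = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
  .getCosmosBulkExecutionOptionsAccessor()

accessor.setMaxMicroBatchPayloadSizeInBytes(bulkOptions, 1000000)
assert(accessor.getMaxMicroBatchPayloadSizeInBytes(bulkOptions) == 1000000)
```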