From 686daa54d909f3aae1206f4e26078c0497ad032a Mon Sep 17 00:00:00 2001
From: Fabian Meiswinkel
Date: Fri, 6 Oct 2023 13:15:55 +0000
Subject: [PATCH] Adding MaxMicroBatchSize config and switching BulkWriter to
 use bufferUntil (instead of bufferTimeout, which has issues when backpressure
 happens)

---
 .../com/azure/cosmos/spark/BulkWriter.scala   | 257 ++++++++++++++----
 .../com/azure/cosmos/spark/CosmosConfig.scala |  20 +-
 .../cosmos/spark/SparkE2EWriteITest.scala     |  79 ++++--
 .../ImplementationBridgeHelpers.java          |   3 +
 .../batch/BulkExecutorUtil.java               |   2 +-
 .../batch/PartitionScopeThresholds.java       |  12 +-
 .../models/CosmosBulkExecutionOptions.java    |  25 ++
 7 files changed, 303 insertions(+), 95 deletions(-)

diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala
index 16518e4ba93f..6e654a249c84 100644
--- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala
@@ -3,6 +3,7 @@
 package com.azure.cosmos.spark
 
 // scalastyle:off underscore.import
+import com.azure.cosmos.implementation.CosmosDaemonThreadFactory
 import com.azure.cosmos.{BridgeInternal, CosmosAsyncContainer, CosmosException}
 import com.azure.cosmos.implementation.apachecommons.lang.StringUtils
 import com.azure.cosmos.implementation.batch.{BatchRequestResponseConstants, ItemBulkOperation}
@@ -13,6 +14,7 @@ import reactor.core.publisher.Mono
 import reactor.core.scheduler.Scheduler
 
 import java.util
+import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor}
 import scala.collection.concurrent.TrieMap
 import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
 import scala.collection.mutable
@@ -75,6 +77,19 @@ class BulkWriter(container: CosmosAsyncContainer,
     s"BulkWriter instantiated (Host CPU count: $cpuCount, maxPendingOperations: $maxPendingOperations, " +
     s"maxConcurrentPartitions: $maxConcurrentPartitions ...")
 
+  // Artificial operation used to signal to the bufferUntil operator that
+  // the buffer should be flushed. A timer-based scheduler will publish this
+  // dummy operation every batchIntervalInMs milliseconds.
This operation + // is filtered out and will never be flushed to the backend + private val readManyFlushOperationSingleton = new ReadManyOperation( + new CosmosItemIdentity( + new PartitionKey("ReadManyOperation.FlushSingleton"), + "ReadManyOperation.FlushSingleton" + ), + null, + null + ) + private val closed = new AtomicBoolean(false) private val lock = new ReentrantLock private val pendingTasksCompleted = lock.newCondition @@ -117,6 +132,20 @@ class BulkWriter(container: CosmosAsyncContainer, case None => } + writeConfig.maxMicroBatchSize match { + case Some(customMaxMicroBatchSize) => + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor + .setMaxMicroBatchSize( + cosmosBulkExecutionOptions, + Math.max( + 1, + Math.min(customMaxMicroBatchSize, BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST) + ) + ) + case None => + } + private val operationContext = initializeOperationContext() private val cosmosPatchHelperOpt = writeConfig.itemWriteStrategy match { case ItemWriteStrategy.ItemPatch | ItemWriteStrategy.ItemBulkUpdate => @@ -130,6 +159,35 @@ class BulkWriter(container: CosmosAsyncContainer, } } + private val batchIntervalInMs = ImplementationBridgeHelpers + .CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor + .getMaxMicroBatchInterval(cosmosBulkExecutionOptions) + .toMillis + + private[this] val flushExecutorHolder: Option[Tuple2[ScheduledThreadPoolExecutor, ScheduledFuture[_]]] = { + writeConfig.itemWriteStrategy match { + case ItemWriteStrategy.ItemBulkUpdate => { + val executor = new ScheduledThreadPoolExecutor( + 1, + new CosmosDaemonThreadFactory( + "BulkWriterReadManyFlush" + UUID.randomUUID() + )) + executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false) + executor.setRemoveOnCancelPolicy(true) + + val future:ScheduledFuture[_] = executor.scheduleWithFixedDelay( + () => this.onFlushReadMany(), + batchIntervalInMs, + batchIntervalInMs, + TimeUnit.MILLISECONDS) + + Some(executor, future) + } + case _ => None + } + } + private def initializeOperationContext(): SparkTaskContext = { val taskContext = TaskContext.get @@ -160,6 +218,24 @@ class BulkWriter(container: CosmosAsyncContainer, } } + private def onFlushReadMany(): Unit = { + if (this.readManyInputEmitterOpt.isEmpty) { + throw new IllegalStateException("Callback onFlushReadMany should only be scheduled for bulk update.") + } + try { + this.readManyInputEmitterOpt.get.tryEmitNext(readManyFlushOperationSingleton) match { + case EmitResult.OK => log.logInfo("onFlushReadMany Successfully emitted flush") + case faultEmitResult => { + log.logError(s"Callback invocation 'onFlush' failed with result: $faultEmitResult.") + } + } + } + catch { + case t: Throwable => + log.logError("Callback invocation 'onFlush' failed.", t) + } + } + private val readManySubscriptionDisposableOpt: Option[Disposable] = { writeConfig.itemWriteStrategy match { case ItemWriteStrategy.ItemBulkUpdate => Some(createReadManySubscriptionDisposable()) @@ -168,29 +244,64 @@ class BulkWriter(container: CosmosAsyncContainer, } private def createReadManySubscriptionDisposable(): Disposable = { - log.logTrace(s"readManySubscriptionDisposable, Context: ${operationContext.toString} ${getThreadInfo}") + log.logTrace(s"readManySubscriptionDisposable, Context: ${operationContext.toString} $getThreadInfo") // We start from using the bulk batch size and interval and concurrency // If in the future, there is a need to separate the configuration, can re-consider val 
bulkBatchSize = BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST - val batchInterval = ImplementationBridgeHelpers - .CosmosBulkExecutionOptionsHelper - .getCosmosBulkExecutionOptionsAccessor - .getMaxMicroBatchInterval(cosmosBulkExecutionOptions) val batchConcurrency = ImplementationBridgeHelpers .CosmosBulkExecutionOptionsHelper .getCosmosBulkExecutionOptionsAccessor .getMaxMicroBatchConcurrency(cosmosBulkExecutionOptions) + val firstRecordTimeStamp = new AtomicLong(-1) + val currentMicroBatchSize = new AtomicLong(0) + readManyInputEmitterOpt .get .asFlux() .publishOn(readManyBoundedElastic) - .bufferTimeout(bulkBatchSize, batchInterval) + .timestamp + .bufferUntil(timestampReadManyOperationTuple => { + val timestamp = timestampReadManyOperationTuple.getT1 + val readManyOperation = timestampReadManyOperationTuple.getT2 + + if (readManyOperation eq readManyFlushOperationSingleton) { + log.logTrace(s"FlushSingletonReceived, Context: ${operationContext.toString}") + val currentMicroBatchSizeSnapshot = currentMicroBatchSize.get + if (currentMicroBatchSizeSnapshot > 0) { + firstRecordTimeStamp.set(-1) + currentMicroBatchSize.set(0) + log.logTrace(s"FlushSingletonReceived - flushing batch, Context: ${operationContext.toString}") + true + } else { + // avoid counting flush operations for the micro batch size calculation + log.logTrace(s"FlushSingletonReceived - empty buffer, nothing to flush, Context: ${operationContext.toString}") + false + } + } else { + + firstRecordTimeStamp.compareAndSet(-1, timestamp) + val age = timestamp - firstRecordTimeStamp.get + val batchSize = currentMicroBatchSize.incrementAndGet + + if (batchSize >= bulkBatchSize || age >= batchIntervalInMs) { + log.logTrace(s"BatchIntervalExpired - flushing batch, Context: ${operationContext.toString}") + firstRecordTimeStamp.set(-1) + currentMicroBatchSize.set(0) + true + } else { + false + } + } + }) .subscribeOn(readManyBoundedElastic) .asScala - .flatMap(readManyOperations => { + .flatMap(timestampReadManyOperationTuples => { + val readManyOperations = timestampReadManyOperationTuples + .filter(candidate => !candidate.getT2.equals(readManyFlushOperationSingleton)) + .map(tuple => tuple.getT2) if (readManyOperations.isEmpty) { Mono.empty() @@ -276,7 +387,7 @@ class BulkWriter(container: CosmosAsyncContainer, Mono.empty() }) - .doFinally(signalType => { + .doFinally(_ => { for (readManyOperation <- readManyOperations) { activeReadManyOperations.remove(readManyOperation) // for ItemBulkUpdate strategy, each active task includes two stages: ReadMany + BulkWrite @@ -297,7 +408,7 @@ class BulkWriter(container: CosmosAsyncContainer, log.logWarning(s"for itemId=[${requestOperationContext.itemId}], partitionKeyValue=[${requestOperationContext.partitionKeyValue}], " + s"encountered status code '${e.getStatusCode}:${e.getSubStatusCode}' in read many, will retry! 
" + s"attemptNumber=${requestOperationContext.attemptNumber}, exceptionMessage=${e.getMessage}, " + - s"Context: {${operationContext.toString}} ${getThreadInfo}") + s"Context: {${operationContext.toString}} $getThreadInfo") // the task will be re-queued at the beginning of the flow, so mark it complete here markTaskCompletion() @@ -312,9 +423,9 @@ class BulkWriter(container: CosmosAsyncContainer, // Non-retryable exception or has exceeded the max retry count val requestOperationContext = ReadManyOperation.operationContext log.logError(s"for itemId=[${requestOperationContext.itemId}], partitionKeyValue=[${requestOperationContext.partitionKeyValue}], " + - s"encountered status code '${e.getStatusCode}:${e.getSubStatusCode()}', all retries exhausted! " + + s"encountered status code '${e.getStatusCode}:${e.getSubStatusCode}', all retries exhausted! " + s"attemptNumber=${requestOperationContext.attemptNumber}, exceptionMessage=${e.getMessage}, " + - s"Context: {${operationContext.toString} ${getThreadInfo}") + s"Context: {${operationContext.toString} $getThreadInfo") val message = s"All retries exhausted for readMany - " + s"statusCode=[${e.getStatusCode}:${e.getSubStatusCode}] " + @@ -327,7 +438,7 @@ class BulkWriter(container: CosmosAsyncContainer, } case _ => // handle non cosmos exceptions log.logError(s"Unexpected failure code path in Bulk ingestion readMany stage, " + - s"Context: ${operationContext.toString} ${getThreadInfo}", throwable) + s"Context: ${operationContext.toString} $getThreadInfo", throwable) captureIfFirstFailure(throwable) cancelWork() markTaskCompletion() @@ -339,7 +450,7 @@ class BulkWriter(container: CosmosAsyncContainer, objectNode: ObjectNode, operationContext: OperationContext, statusCode: Int): Unit = { - this.pendingRetries.incrementAndGet(); + this.pendingRetries.incrementAndGet() // this is to ensure the submission will happen on a different thread in background // and doesn't block the active thread val deferredRetryMono = SMono.defer(() => { @@ -375,7 +486,7 @@ class BulkWriter(container: CosmosAsyncContainer, } private val subscriptionDisposable: Disposable = { - log.logTrace(s"subscriptionDisposable, Context: ${operationContext.toString} ${getThreadInfo}") + log.logTrace(s"subscriptionDisposable, Context: ${operationContext.toString} $getThreadInfo") val bulkOperationResponseFlux: SFlux[CosmosBulkOperationResponse[Object]] = container @@ -405,7 +516,7 @@ class BulkWriter(container: CosmosAsyncContainer, s"unexpected failure: itemId=[${context.itemId}], partitionKeyValue=[" + s"${context.partitionKeyValue}], encountered , attemptNumber=${context.attemptNumber}, " + s"exceptionMessage=${resp.getException.getMessage}, " + - s"Context: ${operationContext.toString} ${getThreadInfo}", resp.getException) + s"Context: ${operationContext.toString} $getThreadInfo", resp.getException) captureIfFirstFailure(resp.getException) cancelWork() } @@ -427,7 +538,7 @@ class BulkWriter(container: CosmosAsyncContainer, errorConsumer = Option.apply( ex => { log.logError(s"Unexpected failure code path in Bulk ingestion, " + - s"Context: ${operationContext.toString} ${getThreadInfo}", ex) + s"Context: ${operationContext.toString} $getThreadInfo", ex) // if there is any failure this closes the bulk. 
// at this point bulk api doesn't allow any retrying // we don't know the list of failed item-operations @@ -447,7 +558,6 @@ class BulkWriter(container: CosmosAsyncContainer, Preconditions.checkState(!closed.get()) throwIfCapturedExceptionExists() - var acquisitionAttempt = 0 val activeTasksSemaphoreTimeout = 10 val operationContext = OperationContext(getId(objectNode), partitionKeyValue, getETag(objectNode), 1) val numberOfIntervalsWithIdenticalActiveOperationSnapshots = new AtomicLong(0) @@ -459,9 +569,9 @@ class BulkWriter(container: CosmosAsyncContainer, var activeOperationsSnapshot = mutable.Set.empty[CosmosItemOperation] var activeReadManyOperationsSnapshot = mutable.Set.empty[ReadManyOperation] log.logTrace( - s"Before TryAcquire ${totalScheduledMetrics.get}, Context: ${operationContext.toString} ${getThreadInfo}") + s"Before TryAcquire ${totalScheduledMetrics.get}, Context: ${operationContext.toString} $getThreadInfo") while (!semaphore.tryAcquire(activeTasksSemaphoreTimeout, TimeUnit.MINUTES)) { - log.logDebug(s"Not able to acquire semaphore, Context: ${operationContext.toString} ${getThreadInfo}") + log.logDebug(s"Not able to acquire semaphore, Context: ${operationContext.toString} $getThreadInfo") if (subscriptionDisposable.isDisposed || (readManySubscriptionDisposableOpt.isDefined && readManySubscriptionDisposableOpt.get.isDisposed)) { captureIfFirstFailure( @@ -479,7 +589,7 @@ class BulkWriter(container: CosmosAsyncContainer, } val cnt = totalScheduledMetrics.getAndIncrement() - log.logTrace(s"total scheduled $cnt, Context: ${operationContext.toString} ${getThreadInfo}") + log.logTrace(s"total scheduled $cnt, Context: ${operationContext.toString} $getThreadInfo") scheduleWriteInternal(partitionKeyValue, objectNode, operationContext) } @@ -490,7 +600,7 @@ class BulkWriter(container: CosmosAsyncContainer, activeTasks.incrementAndGet() if (operationContext.attemptNumber > 1) { log.logInfo(s"bulk scheduleWrite attemptCnt: ${operationContext.attemptNumber}, " + - s"Context: ${operationContext.toString} ${getThreadInfo}") + s"Context: ${operationContext.toString} $getThreadInfo") } // The handling will make sure that during retry: @@ -612,8 +722,8 @@ class BulkWriter(container: CosmosAsyncContainer, log.logDebug(s"encountered item operation response with status code " + s"$effectiveStatusCode:$effectiveSubStatusCode, " + - s"Context: ${operationContext.toString} ${getThreadInfo}") - if (shouldIgnore(effectiveStatusCode, effectiveSubStatusCode, context)) { + s"Context: ${operationContext.toString} $getThreadInfo") + if (shouldIgnore(effectiveStatusCode, effectiveSubStatusCode)) { log.logDebug(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " + s"ignored encountered status code '$effectiveStatusCode:$effectiveSubStatusCode', " + s"Context: ${operationContext.toString}") @@ -623,8 +733,8 @@ class BulkWriter(container: CosmosAsyncContainer, // requeue log.logWarning(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " + s"encountered status code '$effectiveStatusCode:$effectiveSubStatusCode', will retry! 
" + - s"attemptNumber=${context.attemptNumber}, exceptionMessage=${exceptionMessage}, " + - s"Context: {${operationContext.toString}} ${getThreadInfo}") + s"attemptNumber=${context.attemptNumber}, exceptionMessage=$exceptionMessage, " + + s"Context: {${operationContext.toString}} $getThreadInfo") // If the write strategy is patchBulkUpdate, the OperationContext.sourceItem will not be the original objectNode, // It is computed through read item from cosmosdb, and then patch the item locally. @@ -643,8 +753,8 @@ class BulkWriter(container: CosmosAsyncContainer, } else { log.logError(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " + s"encountered status code '$effectiveStatusCode:$effectiveSubStatusCode', all retries exhausted! " + - s"attemptNumber=${context.attemptNumber}, exceptionMessage=${exceptionMessage}, " + - s"Context: {${operationContext.toString} ${getThreadInfo}") + s"attemptNumber=${context.attemptNumber}, exceptionMessage=$exceptionMessage, " + + s"Context: {${operationContext.toString} $getThreadInfo") val message = s"All retries exhausted for '${itemOperation.getOperationType}' bulk operation - " + s"statusCode=[$effectiveStatusCode:$effectiveSubStatusCode] " + @@ -668,7 +778,7 @@ class BulkWriter(container: CosmosAsyncContainer, val errorSnapshot = errorCaptureFirstException.get() if (errorSnapshot != null) { log.logError(s"throw captured error ${errorSnapshot.getMessage}, " + - s"Context: ${operationContext.toString} ${getThreadInfo}") + s"Context: ${operationContext.toString} $getThreadInfo") throw errorSnapshot } } @@ -714,7 +824,7 @@ class BulkWriter(container: CosmosAsyncContainer, activeOperationsSnapshot: mutable.Set[CosmosItemOperation], activeReadManyOperationsSnapshot: mutable.Set[ReadManyOperation], numberOfIntervalsWithIdenticalActiveOperationSnapshots: AtomicLong - ) = { + ): Unit = { val operationsLog = getActiveOperationsLog(activeOperationsSnapshot, activeReadManyOperationsSnapshot) @@ -722,15 +832,15 @@ class BulkWriter(container: CosmosAsyncContainer, && activeReadManyOperationsSnapshot.equals(activeReadManyOperations)) { numberOfIntervalsWithIdenticalActiveOperationSnapshots.incrementAndGet() log.logWarning( - s"${operationName} has been waiting ${numberOfIntervalsWithIdenticalActiveOperationSnapshots} " + - s"times for identical set of operations: ${operationsLog} " + - s"Context: ${operationContext.toString} ${getThreadInfo}" + s"$operationName has been waiting $numberOfIntervalsWithIdenticalActiveOperationSnapshots " + + s"times for identical set of operations: $operationsLog " + + s"Context: ${operationContext.toString} $getThreadInfo" ) } else { numberOfIntervalsWithIdenticalActiveOperationSnapshots.set(0) log.logInfo( - s"${operationName} is waiting for active bulkWrite operations: ${operationsLog} " + - s"Context: ${operationContext.toString} ${getThreadInfo}" + s"$operationName is waiting for active bulkWrite operations: $operationsLog " + + s"Context: ${operationContext.toString} $getThreadInfo" ) } @@ -738,7 +848,7 @@ class BulkWriter(container: CosmosAsyncContainer, captureIfFirstFailure( new IllegalStateException( - s"Stale bulk ingestion identified in ${operationName} - the following active operations have not been " + + s"Stale bulk ingestion identified in $operationName - the following active operations have not been " + s"completed (first ${BulkWriter.maxItemOperationsToShowInErrorMessage} shown) or progressed after " + s"${BulkWriter.maxAllowedMinutesWithoutAnyProgress} minutes: $operationsLog" )) @@ 
-755,10 +865,10 @@ class BulkWriter(container: CosmosAsyncContainer, this.synchronized { try { if (!closed.get()) { - log.logInfo(s"flushAndClose invoked, Context: ${operationContext.toString} ${getThreadInfo}") + log.logInfo(s"flushAndClose invoked, Context: ${operationContext.toString} $getThreadInfo") log.logInfo(s"completed so far ${totalSuccessfulIngestionMetrics.get()}, " + s"pending bulkWrite asks ${activeBulkWriteOperations.size}, pending readMany tasks ${activeReadManyOperations.size}," + - s" Context: ${operationContext.toString} ${getThreadInfo}") + s" Context: ${operationContext.toString} $getThreadInfo") // error handling, if there is any error and the subscription is cancelled // the remaining tasks will not be processed hence we never reach activeTasks = 0 @@ -773,7 +883,7 @@ class BulkWriter(container: CosmosAsyncContainer, log.logInfo( s"Waiting for pending activeTasks $activeTasksSnapshot and/or pendingRetries " + - s"$pendingRetriesSnapshot, Context: ${operationContext.toString} ${getThreadInfo}") + s"$pendingRetriesSnapshot, Context: ${operationContext.toString} $getThreadInfo") val activeOperationsSnapshot = activeBulkWriteOperations.clone() val activeReadManyOperationsSnapshot = activeReadManyOperations.clone() val awaitCompleted = pendingTasksCompleted.await(1, TimeUnit.MINUTES) @@ -791,42 +901,42 @@ class BulkWriter(container: CosmosAsyncContainer, if (awaitCompleted) { log.logInfo(s"Waiting completed for pending activeTasks $activeTasksSnapshot, pendingRetries " + - s"$pendingRetriesSnapshot Context: ${operationContext.toString} ${getThreadInfo}") + s"$pendingRetriesSnapshot Context: ${operationContext.toString} $getThreadInfo") } else { log.logInfo(s"Waiting interrupted for pending activeTasks $activeTasksSnapshot , pendingRetries " + - s"$pendingRetriesSnapshot - available permits ${semaphoreAvailablePermitsSnapshot}, " + - s"Context: ${operationContext.toString} ${getThreadInfo}") + s"$pendingRetriesSnapshot - available permits $semaphoreAvailablePermitsSnapshot, " + + s"Context: ${operationContext.toString} $getThreadInfo") } } log.logInfo(s"Waiting completed for pending activeTasks $activeTasksSnapshot, pendingRetries " + - s"$pendingRetriesSnapshot Context: ${operationContext.toString} ${getThreadInfo}") + s"$pendingRetriesSnapshot Context: ${operationContext.toString} $getThreadInfo") } finally { lock.unlock() } - log.logInfo(s"invoking bulkInputEmitter.onComplete(), Context: ${operationContext.toString} ${getThreadInfo}") + log.logInfo(s"invoking bulkInputEmitter.onComplete(), Context: ${operationContext.toString} $getThreadInfo") semaphore.release(activeTasks.get()) val completeBulkWriteEmitResult = bulkInputEmitter.tryEmitComplete() if (completeBulkWriteEmitResult eq Sinks.EmitResult.OK) { - log.logDebug(s"bulkInputEmitter sink completed, Context: ${operationContext.toString} ${getThreadInfo}") + log.logDebug(s"bulkInputEmitter sink completed, Context: ${operationContext.toString} $getThreadInfo") } else { log.logInfo( s"bulkInputEmitter sink completion failed. 
EmitResult: $completeBulkWriteEmitResult +" + - s"Context: ${operationContext.toString} ${getThreadInfo}") + s"Context: ${operationContext.toString} $getThreadInfo") } // complete readManyInputEmitter if (readManyInputEmitterOpt.isDefined) { val completeReadManyEmitResult = readManyInputEmitterOpt.get.tryEmitComplete() if (completeReadManyEmitResult eq Sinks.EmitResult.OK) { - log.logDebug(s"bulkInputEmitter sink completed, Context: ${operationContext.toString} ${getThreadInfo}") + log.logDebug(s"bulkInputEmitter sink completed, Context: ${operationContext.toString} $getThreadInfo") } else { log.logInfo( s"bulkInputEmitter sink completion failed. EmitResult: $completeReadManyEmitResult +" + - s"Context: ${operationContext.toString} ${getThreadInfo}") + s"Context: ${operationContext.toString} $getThreadInfo") } } @@ -838,15 +948,44 @@ class BulkWriter(container: CosmosAsyncContainer, assume(semaphore.availablePermits() == maxPendingOperations) log.logInfo(s"flushAndClose completed with no error. " + s"totalSuccessfulIngestionMetrics=${totalSuccessfulIngestionMetrics.get()}, " + - s"totalScheduled=$totalScheduledMetrics, Context: ${operationContext.toString} ${getThreadInfo}") + s"totalScheduled=$totalScheduledMetrics, Context: ${operationContext.toString} $getThreadInfo") assume(totalScheduledMetrics.get() == totalSuccessfulIngestionMetrics.get) } } finally { subscriptionDisposable.dispose() readManySubscriptionDisposableOpt match { - case Some(readManySubscriptionDisposable) => readManySubscriptionDisposable.dispose() + case Some(readManySubscriptionDisposable) => { + + + readManySubscriptionDisposable.dispose() + } case _ => } + + flushExecutorHolder match { + case Some(executorAndFutureTuple) => { + val executor: ScheduledThreadPoolExecutor = executorAndFutureTuple._1 + val future: ScheduledFuture[_] = executorAndFutureTuple._2 + + try { + future.cancel(true) + log.logDebug(s"Cancelled all future scheduled tasks $getThreadInfo, Context: ${operationContext.toString}") + } catch { + case e: Exception => + log.logWarning(s"Failed to cancel scheduled tasks $getThreadInfo, Context: ${operationContext.toString}", e) + } + + try { + log.logDebug(s"Shutting down the executor service, Context: ${operationContext.toString}") + executor.shutdownNow + log.logDebug(s"Successfully shut down the executor service, Context: ${operationContext.toString}") + } catch { + case e: Exception => + log.logWarning(s"Failed to shut down the executor service, Context: ${operationContext.toString}", e) + } + } + case _ => + } closed.set(true) } } @@ -860,7 +999,7 @@ class BulkWriter(container: CosmosAsyncContainer, val activeTasksLeftSnapshot = activeTasks.decrementAndGet() val exceptionSnapshot = errorCaptureFirstException.get() log.logTrace(s"markTaskCompletion, Active tasks left: $activeTasksLeftSnapshot, " + - s"error: $exceptionSnapshot, Context: ${operationContext.toString} ${getThreadInfo}") + s"error: $exceptionSnapshot, Context: ${operationContext.toString} $getThreadInfo") if (activeTasksLeftSnapshot == 0 || exceptionSnapshot != null) { pendingTasksCompleted.signal() } @@ -870,7 +1009,7 @@ class BulkWriter(container: CosmosAsyncContainer, } private def captureIfFirstFailure(throwable: Throwable): Unit = { - log.logError(s"capture failure, Context: {${operationContext.toString}} ${getThreadInfo}", throwable) + log.logError(s"capture failure, Context: {${operationContext.toString}} $getThreadInfo", throwable) lock.lock() try { errorCaptureFirstException.compareAndSet(null, throwable) @@ -890,7 +1029,7 @@ 
class BulkWriter(container: CosmosAsyncContainer, } } - private def shouldIgnore(statusCode: Int, subStatusCode: Int, operationContext: OperationContext): Boolean = { + private def shouldIgnore(statusCode: Int, subStatusCode: Int): Boolean = { val returnValue = writeConfig.itemWriteStrategy match { case ItemWriteStrategy.ItemAppend => Exceptions.isResourceExistsException(statusCode) case ItemWriteStrategy.ItemDelete => Exceptions.isNotFoundExceptionCore(statusCode, subStatusCode) @@ -917,7 +1056,7 @@ class BulkWriter(container: CosmosAsyncContainer, } log.logDebug(s"Should retry statusCode '$statusCode:$subStatusCode' -> $returnValue, " + - s"Context: ${operationContext.toString} ${getThreadInfo}") + s"Context: ${operationContext.toString} $getThreadInfo") returnValue } @@ -953,7 +1092,7 @@ class BulkWriter(container: CosmosAsyncContainer, * Should not throw any exceptions */ override def abort(): Unit = { - log.logError(s"Abort, Context: ${operationContext.toString} ${getThreadInfo}") + log.logError(s"Abort, Context: ${operationContext.toString} $getThreadInfo") // signal an exception that will be thrown for any pending work/flushAndClose if no other exception has // been registered captureIfFirstFailure( @@ -965,9 +1104,9 @@ class BulkWriter(container: CosmosAsyncContainer, private object BulkWriter { private val log = new DefaultDiagnostics().getLogger(this.getClass) //scalastyle:off magic.number - val maxDelayOn408RequestTimeoutInMs = 10000 - val minDelayOn408RequestTimeoutInMs = 1000 - val maxItemOperationsToShowInErrorMessage = 10 + private val maxDelayOn408RequestTimeoutInMs = 10000 + private val minDelayOn408RequestTimeoutInMs = 1000 + private val maxItemOperationsToShowInErrorMessage = 10 private val BULK_WRITER_BOUNDED_ELASTIC_THREAD_NAME = "bulk-writer-bounded-elastic" private val READ_MANY_BOUNDED_ELASTIC_THREAD_NAME = "read-many-bounded-elastic" private val TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60 // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS @@ -981,7 +1120,7 @@ private object BulkWriter { // intervals are around 2 hours. So I need to increase this threshold for now again - will move it // to 45 minutes - and when I am back from vacation will drive an investigation to improve the // end-to-end behavior on 429/3088 with the backend and monitoring teams. - val maxAllowedMinutesWithoutAnyProgress = 45 + private val maxAllowedMinutesWithoutAnyProgress = 45 //scalastyle:on magic.number // let's say the spark executor VM has 16 CPU cores. @@ -1008,7 +1147,7 @@ private object BulkWriter { } } - val bulkProcessingThresholds = new CosmosBulkExecutionThresholdsState() + private val bulkProcessingThresholds = new CosmosBulkExecutionThresholdsState() // Custom bounded elastic scheduler to switch off IO thread to process response. 
 val bulkWriterBoundedElastic: Scheduler = Schedulers.newBoundedElastic(
diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
index a843cab03d3f..08ec687370cd 100644
--- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala
@@ -80,6 +80,7 @@ private[spark] object CosmosConfigNames {
   val ClientTelemetryEndpoint = "spark.cosmos.clientTelemetry.endpoint"
   val WriteBulkEnabled = "spark.cosmos.write.bulk.enabled"
   val WriteBulkMaxPendingOperations = "spark.cosmos.write.bulk.maxPendingOperations"
+  val WriteBulkMaxBatchSize = "spark.cosmos.write.bulk.maxBatchSize"
   val WriteBulkMaxConcurrentPartitions = "spark.cosmos.write.bulk.maxConcurrentCosmosPartitions"
   val WriteBulkPayloadSizeInBytes = "spark.cosmos.write.bulk.targetedPayloadSizeInBytes"
   val WriteBulkInitialBatchSize = "spark.cosmos.write.bulk.initialBatchSize"
@@ -169,6 +170,7 @@ private[spark] object CosmosConfigNames {
     WriteBulkMaxConcurrentPartitions,
     WriteBulkPayloadSizeInBytes,
     WriteBulkInitialBatchSize,
+    WriteBulkMaxBatchSize,
     WritePointMaxConcurrency,
     WritePatchDefaultOperationType,
     WritePatchColumnConfigs,
@@ -823,7 +825,8 @@ private case class CosmosWriteConfig(itemWriteStrategy: ItemWriteStrategy,
                                      patchConfigs: Option[CosmosPatchConfigs] = None,
                                      throughputControlConfig: Option[CosmosThroughputControlConfig] = None,
                                      maxMicroBatchPayloadSizeInBytes: Option[Int] = None,
-                                     initialMicroBatchSize: Option[Int] = None)
+                                     initialMicroBatchSize: Option[Int] = None,
+                                     maxMicroBatchSize: Option[Int] = None)
 
 private object CosmosWriteConfig {
   private val DefaultMaxRetryCount = 10
@@ -854,6 +857,17 @@ private object CosmosWriteConfig {
     "initial micro batch size is 100. Reduce this when you want to avoid that the first few requests consume " +
     "too many RUs.")
 
+  private val maxMicroBatchSize = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkMaxBatchSize,
+    defaultValue = Option.apply(BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST),
+    mandatory = false,
+    parseFromStringFunction = maxBatchSizeString => Math.min(maxBatchSizeString.toInt, BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST),
+    helpMessage = "Cosmos DB max bulk micro batch size - a micro batch will be flushed to the backend " +
+      "when the number of documents enqueued exceeds this size - or when the target payload size is met. The micro batch " +
+      "size is automatically tuned based on the throttling rate. By default the " +
+      "max micro batch size is 100. Reduce this only when you need to avoid that requests consume " +
+      "too many RUs and you cannot enable throughput control. NOTE: using throughput control is preferred and will " +
+ + "result in better throughput while still limiting the RU/s used.") + private val bulkMaxPendingOperations = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkMaxPendingOperations, mandatory = false, parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt, @@ -1066,6 +1080,7 @@ private object CosmosWriteConfig { val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg) val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes) val initialBatchSizeOpt = CosmosConfigEntry.parse(cfg, initialMicroBatchSize) + val maxBatchSizeOpt = CosmosConfigEntry.parse(cfg, maxMicroBatchSize) assert(bulkEnabledOpt.isDefined) @@ -1095,7 +1110,8 @@ private object CosmosWriteConfig { patchConfigs = patchConfigsOpt, throughputControlConfig = throughputControlConfigOpt, maxMicroBatchPayloadSizeInBytes = microBatchPayloadSizeInBytesOpt, - initialMicroBatchSize = initialBatchSizeOpt) + initialMicroBatchSize = initialBatchSizeOpt, + maxMicroBatchSize = maxBatchSizeOpt) } def parsePatchColumnConfigs(cfg: Map[String, String], inputSchema: StructType): TrieMap[String, CosmosPatchColumnConfig] = { diff --git a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala index 9313f7fab0a3..24bf767ad9c7 100644 --- a/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala @@ -25,44 +25,35 @@ class SparkE2EWriteITest //scalastyle:off magic.number //scalastyle:off null - private case class UpsertParameterTest(bulkEnabled: Boolean, itemWriteStrategy: ItemWriteStrategy, hasId: Boolean = true, initialBatchSize: Option[Int] = None) + private case class UpsertParameterTest( + bulkEnabled: Boolean, + itemWriteStrategy: ItemWriteStrategy, + hasId: Boolean = true, + initialBatchSize: Option[Int] = None, + maxBatchSize: Option[Int] = None) private val upsertParameterTest = Seq( - UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None), - UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1)), - UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None), - UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemAppend, initialBatchSize = None) + UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None, maxBatchSize = None), + UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = None), + UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = Some(5)), + UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None, maxBatchSize = None), + UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemAppend, initialBatchSize = None, maxBatchSize = None) ) - for (UpsertParameterTest(bulkEnabled, itemWriteStrategy, hasId, initialBatchSize) <- upsertParameterTest) { - it should s"support upserts with bulkEnabled = $bulkEnabled 
itemWriteStrategy = $itemWriteStrategy hasId = $hasId initialBatchSize = $initialBatchSize" in { + for (UpsertParameterTest(bulkEnabled, itemWriteStrategy, hasId, initialBatchSize, maxBatchSize) <- upsertParameterTest) { + it should s"support upserts with bulkEnabled = $bulkEnabled itemWriteStrategy = $itemWriteStrategy hasId = $hasId initialBatchSize = $initialBatchSize, maxBatchSize = $maxBatchSize" in { val cosmosEndpoint = TestConfigurations.HOST val cosmosMasterKey = TestConfigurations.MASTER_KEY - val cfg = { - - initialBatchSize match { - case Some(customInitialBatchSize) => - Map( - "spark.cosmos.accountEndpoint" -> cosmosEndpoint, - "spark.cosmos.accountKey" -> cosmosMasterKey, - "spark.cosmos.database" -> cosmosDatabase, - "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.serialization.inclusionMode" -> "NonDefault", - "spark.cosmos.write.bulk.initialBatchSize" -> customInitialBatchSize.toString, - ) - case None => - Map ( - "spark.cosmos.accountEndpoint" -> cosmosEndpoint, - "spark.cosmos.accountKey" -> cosmosMasterKey, - "spark.cosmos.database" -> cosmosDatabase, - "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.serialization.inclusionMode" -> "NonDefault" - ) - } - } + val configMapBuilder = scala.collection.mutable.Map( + "spark.cosmos.accountEndpoint" -> cosmosEndpoint, + "spark.cosmos.accountKey" -> cosmosMasterKey, + "spark.cosmos.database" -> cosmosDatabase, + "spark.cosmos.container" -> cosmosContainer, + "spark.cosmos.serialization.inclusionMode" -> "NonDefault" + ) - val cfgOverwrite = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint, + val configOverrideMapBuilder = scala.collection.mutable.Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint, "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, @@ -71,6 +62,34 @@ class SparkE2EWriteITest "spark.cosmos.serialization.inclusionMode" -> "NonDefault" ) + initialBatchSize match { + case Some(customInitialBatchSize) => + configMapBuilder += ( + "spark.cosmos.write.bulk.initialBatchSize" -> customInitialBatchSize.toString, + ) + + configOverrideMapBuilder += ( + "spark.cosmos.write.bulk.initialBatchSize" -> customInitialBatchSize.toString, + ) + case None => + } + + maxBatchSize match { + case Some(customMaxBatchSize) => + configMapBuilder += ( + "spark.cosmos.write.bulk.maxBatchSize" -> customMaxBatchSize.toString, + ) + + configOverrideMapBuilder += ( + "spark.cosmos.write.bulk.maxBatchSize" -> customMaxBatchSize.toString, + ) + case None => + } + + val cfg = configMapBuilder.toMap + + val cfgOverwrite = configOverrideMapBuilder.toMap + val newSpark = getSpark // scalastyle:off underscore.import diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index fc7cc712604a..82bb8c3b9e00 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -452,6 +452,9 @@ CosmosBulkExecutionOptions setHeader(CosmosBulkExecutionOptions cosmosBulkExecut Map getCustomOptions(CosmosBulkExecutionOptions cosmosBulkExecutionOptions); List getExcludeRegions(CosmosBulkExecutionOptions cosmosBulkExecutionOptions); + int getMaxMicroBatchSize(CosmosBulkExecutionOptions cosmosBulkExecutionOptions); + + void 
setMaxMicroBatchSize(CosmosBulkExecutionOptions cosmosBulkExecutionOptions, int maxMicroBatchSize); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutorUtil.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutorUtil.java index 7dc1bd4a3ae1..71f0c48b0f38 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutorUtil.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutorUtil.java @@ -42,7 +42,7 @@ static ServerOperationBatchRequest createBatchRequest(List partitionKeyRangeId, operations, maxMicroBatchPayloadSizeInBytes, - BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST); + Math.min(operations.size(), BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST)); } static void setRetryPolicyForBulk( diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholds.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholds.java index 32b0e2071973..38d46d7544e4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholds.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/PartitionScopeThresholds.java @@ -28,6 +28,7 @@ public class PartitionScopeThresholds { private final double minRetryRate; private final double maxRetryRate; private final double avgRetryRate; + private final int maxMicroBatchSize; public PartitionScopeThresholds(String pkRangeId, CosmosBulkExecutionOptions options) { checkNotNull(pkRangeId, "expected non-null pkRangeId"); @@ -46,6 +47,11 @@ public PartitionScopeThresholds(String pkRangeId, CosmosBulkExecutionOptions opt .getCosmosBulkExecutionOptionsAccessor() .getMaxTargetedMicroBatchRetryRate(options); this.avgRetryRate = ((this.maxRetryRate + this.minRetryRate)/2); + this.maxMicroBatchSize = Math.min( + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getMaxMicroBatchSize(options), + BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST); } public String getPartitionKeyRangeId() { @@ -103,12 +109,12 @@ private void reevaluateThresholds( int microBatchSizeBefore = this.targetMicroBatchSize.get(); int microBatchSizeAfter = microBatchSizeBefore; - if (retryRate < this.minRetryRate && microBatchSizeBefore < BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST) { + if (retryRate < this.minRetryRate && microBatchSizeBefore < maxMicroBatchSize) { int targetedNewBatchSize = Math.min( Math.min( microBatchSizeBefore * 2, - microBatchSizeBefore + (int)(BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST * this.avgRetryRate)), - BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST); + microBatchSizeBefore + (int)(maxMicroBatchSize * this.avgRetryRate)), + maxMicroBatchSize); if (this.targetMicroBatchSize.compareAndSet(microBatchSizeBefore, targetedNewBatchSize)) { microBatchSizeAfter = targetedNewBatchSize; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkExecutionOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkExecutionOptions.java index a8f3d4711a1e..e5635ed3b8e8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkExecutionOptions.java 
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkExecutionOptions.java @@ -23,6 +23,8 @@ public final class CosmosBulkExecutionOptions { private int initialMicroBatchSize = BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST; private int maxMicroBatchConcurrency = BatchRequestResponseConstants.DEFAULT_MAX_MICRO_BATCH_CONCURRENCY; + + private int maxMicroBatchSize = BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST; private double maxMicroBatchRetryRate = BatchRequestResponseConstants.DEFAULT_MAX_MICRO_BATCH_RETRY_RATE; private double minMicroBatchRetryRate = BatchRequestResponseConstants.DEFAULT_MIN_MICRO_BATCH_RETRY_RATE; @@ -123,6 +125,15 @@ CosmosBulkExecutionOptions setMaxMicroBatchPayloadSizeInBytes(int maxMicroBatchP return this; } + int getMaxMicroBatchSize() { + return maxMicroBatchSize; + } + + CosmosBulkExecutionOptions setMaxMicroBatchSize(int maxMicroBatchSize) { + this.maxMicroBatchSize = maxMicroBatchSize; + return this; + } + Integer getMaxConcurrentCosmosPartitions() { return this.maxConcurrentCosmosPartitions; } @@ -412,6 +423,20 @@ public List getExcludeRegions(CosmosBulkExecutionOptions cosmosBulkExecu return cosmosBulkExecutionOptions.excludeRegions; } + @Override + public int getMaxMicroBatchSize(CosmosBulkExecutionOptions cosmosBulkExecutionOptions) { + if (cosmosBulkExecutionOptions == null) { + return BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST; + } + + return cosmosBulkExecutionOptions.getMaxMicroBatchSize(); + } + + @Override + public void setMaxMicroBatchSize(CosmosBulkExecutionOptions cosmosBulkExecutionOptions, int maxMicroBatchSize) { + cosmosBulkExecutionOptions.setMaxMicroBatchSize(maxMicroBatchSize); + } + }); }
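
Example usage (illustrative, not part of the patch): a minimal sketch of how a Spark job
could opt into the new "spark.cosmos.write.bulk.maxBatchSize" option introduced above,
alongside the existing bulk write settings. The account endpoint, key, database, and
container values below are placeholders, and the sample DataFrame is made up for
illustration only.

import org.apache.spark.sql.{SaveMode, SparkSession}

object BulkMaxBatchSizeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cosmos-bulk-max-batch-size-example")
      .getOrCreate()

    import spark.implicits._

    // Placeholder connection settings - replace with a real Cosmos DB account.
    val writeCfg = Map(
      "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
      "spark.cosmos.accountKey" -> "<account-key>",
      "spark.cosmos.database" -> "<database>",
      "spark.cosmos.container" -> "<container>",
      "spark.cosmos.write.bulk.enabled" -> "true",
      // New in this patch: caps the bulk micro batch size; values above
      // MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST (100) are clamped down.
      "spark.cosmos.write.bulk.maxBatchSize" -> "20"
    )

    // Sample data with the "id" column required by the Cosmos DB data source.
    val df = Seq(
      ("id-1", "alpha"),
      ("id-2", "beta")
    ).toDF("id", "name")

    df.write
      .format("cosmos.oltp")
      .options(writeCfg)
      .mode(SaveMode.Append)
      .save()

    spark.stop()
  }
}

As the new config's help message notes, throughput control remains the preferred way to
limit RU/s consumption; lowering maxBatchSize is mainly a fallback when throughput control
cannot be enabled.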