Adding MaxMicroBatchSize config and switching BulkWriter to use bufferUntil (instead of bufferTimeout, which has issues when backpressure happens)
FabianMeiswinkel committed Oct 6, 2023
1 parent 24e5c2b commit 686daa5
Showing 7 changed files with 303 additions and 95 deletions.

Large diffs are not rendered by default.
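The BulkWriter change itself is in the large diff above, which is not rendered. Per the commit message, batching moved from Reactor's bufferTimeout to bufferUntil. A minimal sketch of the idea - not the connector's actual code, and the size-based flush predicate here is only illustrative:

    import java.util.concurrent.atomic.AtomicLong
    import reactor.core.publisher.Flux

    // bufferTimeout(maxSize, maxTime) emits on a timer regardless of downstream
    // demand, which can overflow when backpressure kicks in; bufferUntil closes
    // each buffer via a predicate, so demand stays bounded.
    val enqueued = new AtomicLong(0)
    Flux.range(1, 1000)
      .bufferUntil((_: Integer) => enqueued.incrementAndGet() % 100 == 0) // flush every 100 items
      .subscribe(batch => println(s"flushing micro batch of ${batch.size()} operations"))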

@@ -80,6 +80,7 @@ private[spark] object CosmosConfigNames {
val ClientTelemetryEndpoint = "spark.cosmos.clientTelemetry.endpoint"
val WriteBulkEnabled = "spark.cosmos.write.bulk.enabled"
val WriteBulkMaxPendingOperations = "spark.cosmos.write.bulk.maxPendingOperations"
val WriteBulkMaxBatchSize = "spark.cosmos.write.bulk.maxBatchSize"
val WriteBulkMaxConcurrentPartitions = "spark.cosmos.write.bulk.maxConcurrentCosmosPartitions"
val WriteBulkPayloadSizeInBytes = "spark.cosmos.write.bulk.targetedPayloadSizeInBytes"
val WriteBulkInitialBatchSize = "spark.cosmos.write.bulk.initialBatchSize"
@@ -169,6 +170,7 @@ private[spark] object CosmosConfigNames {
WriteBulkMaxConcurrentPartitions,
WriteBulkPayloadSizeInBytes,
WriteBulkInitialBatchSize,
WriteBulkMaxBatchSize,
WritePointMaxConcurrency,
WritePatchDefaultOperationType,
WritePatchColumnConfigs,
@@ -823,7 +825,8 @@ private case class CosmosWriteConfig(itemWriteStrategy: ItemWriteStrategy,
patchConfigs: Option[CosmosPatchConfigs] = None,
throughputControlConfig: Option[CosmosThroughputControlConfig] = None,
maxMicroBatchPayloadSizeInBytes: Option[Int] = None,
initialMicroBatchSize: Option[Int] = None)
initialMicroBatchSize: Option[Int] = None,
maxMicroBatchSize: Option[Int] = None)

private object CosmosWriteConfig {
private val DefaultMaxRetryCount = 10
@@ -854,6 +857,17 @@ private object CosmosWriteConfig {
"initial micro batch size is 100. Reduce this when you want to avoid that the first few requests consume " +
"too many RUs.")

private val maxMicroBatchSize = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkMaxBatchSize,
defaultValue = Option.apply(BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST),
mandatory = false,
parseFromStringFunction = maxBatchSizeString => Math.min(maxBatchSizeString.toInt, BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST),
helpMessage = "Cosmos DB max bulk micro batch size - a micro batch will be flushed to the backend " +
"when the number of documents enqueued exceeds this size - or the target payload size is met. The micro batch " +
"size is getting automatically tuned based on the throttling rate. By default the " +
"max micro batch size is 100. Reduce this when you want to avoid that requests consume " +
"too many RUs and you cannot enable thoughput control. NOTE: using throuhgput control is preferred and will." +
"result in better throughput while still limiting the RU/s used.")

private val bulkMaxPendingOperations = CosmosConfigEntry[Int](key = CosmosConfigNames.WriteBulkMaxPendingOperations,
mandatory = false,
parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt,
@@ -1066,6 +1080,7 @@ private object CosmosWriteConfig {
val throughputControlConfigOpt = CosmosThroughputControlConfig.parseThroughputControlConfig(cfg)
val microBatchPayloadSizeInBytesOpt = CosmosConfigEntry.parse(cfg, microBatchPayloadSizeInBytes)
val initialBatchSizeOpt = CosmosConfigEntry.parse(cfg, initialMicroBatchSize)
val maxBatchSizeOpt = CosmosConfigEntry.parse(cfg, maxMicroBatchSize)

assert(bulkEnabledOpt.isDefined)

@@ -1095,7 +1110,8 @@ private object CosmosWriteConfig {
patchConfigs = patchConfigsOpt,
throughputControlConfig = throughputControlConfigOpt,
maxMicroBatchPayloadSizeInBytes = microBatchPayloadSizeInBytesOpt,
initialMicroBatchSize = initialBatchSizeOpt)
initialMicroBatchSize = initialBatchSizeOpt,
maxMicroBatchSize = maxBatchSizeOpt)
}

def parsePatchColumnConfigs(cfg: Map[String, String], inputSchema: StructType): TrieMap[String, CosmosPatchColumnConfig] = {
@@ -25,44 +25,35 @@ class SparkE2EWriteITest
//scalastyle:off magic.number
//scalastyle:off null

private case class UpsertParameterTest(bulkEnabled: Boolean, itemWriteStrategy: ItemWriteStrategy, hasId: Boolean = true, initialBatchSize: Option[Int] = None)
private case class UpsertParameterTest(
bulkEnabled: Boolean,
itemWriteStrategy: ItemWriteStrategy,
hasId: Boolean = true,
initialBatchSize: Option[Int] = None,
maxBatchSize: Option[Int] = None)

private val upsertParameterTest = Seq(
UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None),
UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1)),
UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None),
UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemAppend, initialBatchSize = None)
UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None, maxBatchSize = None),
UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = None),
UpsertParameterTest(bulkEnabled = true, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = Some(1), maxBatchSize = Some(5)),
UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemOverwrite, initialBatchSize = None, maxBatchSize = None),
UpsertParameterTest(bulkEnabled = false, itemWriteStrategy = ItemWriteStrategy.ItemAppend, initialBatchSize = None, maxBatchSize = None)
)

for (UpsertParameterTest(bulkEnabled, itemWriteStrategy, hasId, initialBatchSize) <- upsertParameterTest) {
it should s"support upserts with bulkEnabled = $bulkEnabled itemWriteStrategy = $itemWriteStrategy hasId = $hasId initialBatchSize = $initialBatchSize" in {
for (UpsertParameterTest(bulkEnabled, itemWriteStrategy, hasId, initialBatchSize, maxBatchSize) <- upsertParameterTest) {
it should s"support upserts with bulkEnabled = $bulkEnabled itemWriteStrategy = $itemWriteStrategy hasId = $hasId initialBatchSize = $initialBatchSize, maxBatchSize = $maxBatchSize" in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY

val cfg = {

initialBatchSize match {
case Some(customInitialBatchSize) =>
Map(
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.serialization.inclusionMode" -> "NonDefault",
"spark.cosmos.write.bulk.initialBatchSize" -> customInitialBatchSize.toString,
)
case None =>
Map (
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.serialization.inclusionMode" -> "NonDefault"
)
}
}
val configMapBuilder = scala.collection.mutable.Map(
"spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
"spark.cosmos.serialization.inclusionMode" -> "NonDefault"
)

val cfgOverwrite = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
val configOverrideMapBuilder = scala.collection.mutable.Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabase,
"spark.cosmos.container" -> cosmosContainer,
@@ -71,6 +62,34 @@
"spark.cosmos.serialization.inclusionMode" -> "NonDefault"
)

initialBatchSize match {
case Some(customInitialBatchSize) =>
configMapBuilder += (
"spark.cosmos.write.bulk.initialBatchSize" -> customInitialBatchSize.toString,
)

configOverrideMapBuilder += (
"spark.cosmos.write.bulk.initialBatchSize" -> customInitialBatchSize.toString,
)
case None =>
}

maxBatchSize match {
case Some(customMaxBatchSize) =>
configMapBuilder += (
"spark.cosmos.write.bulk.maxBatchSize" -> customMaxBatchSize.toString,
)

configOverrideMapBuilder += (
"spark.cosmos.write.bulk.maxBatchSize" -> customMaxBatchSize.toString,
)
case None =>
}

val cfg = configMapBuilder.toMap

val cfgOverwrite = configOverrideMapBuilder.toMap

val newSpark = getSpark

// scalastyle:off underscore.import
@@ -452,6 +452,9 @@ CosmosBulkExecutionOptions setHeader(CosmosBulkExecutionOptions cosmosBulkExecut

Map<String, String> getCustomOptions(CosmosBulkExecutionOptions cosmosBulkExecutionOptions);
List<String> getExcludeRegions(CosmosBulkExecutionOptions cosmosBulkExecutionOptions);
int getMaxMicroBatchSize(CosmosBulkExecutionOptions cosmosBulkExecutionOptions);

void setMaxMicroBatchSize(CosmosBulkExecutionOptions cosmosBulkExecutionOptions, int maxMicroBatchSize);
}
}

@@ -42,7 +42,7 @@ static ServerOperationBatchRequest createBatchRequest(List<CosmosItemOperation>
partitionKeyRangeId,
operations,
maxMicroBatchPayloadSizeInBytes,
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);
Math.min(operations.size(), BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST));
}
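The net effect of the new second argument is that a batch request never advertises more operation slots than it actually has operations; a tiny arithmetic sketch, assuming the constant is 100 as the config help above states:

    Math.min(5, 100)   // -> 5: a 5-operation flush is sized to its own length
    Math.min(250, 100) // -> 100: the direct-mode hard cap still applies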

static void setRetryPolicyForBulk(
@@ -28,6 +28,7 @@ public class PartitionScopeThresholds {
private final double minRetryRate;
private final double maxRetryRate;
private final double avgRetryRate;
private final int maxMicroBatchSize;

public PartitionScopeThresholds(String pkRangeId, CosmosBulkExecutionOptions options) {
checkNotNull(pkRangeId, "expected non-null pkRangeId");
@@ -46,6 +47,11 @@ public PartitionScopeThresholds(String pkRangeId, CosmosBulkExecutionOpt
.getCosmosBulkExecutionOptionsAccessor()
.getMaxTargetedMicroBatchRetryRate(options);
this.avgRetryRate = ((this.maxRetryRate + this.minRetryRate)/2);
this.maxMicroBatchSize = Math.min(
ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.getMaxMicroBatchSize(options),
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);
}

public String getPartitionKeyRangeId() {
@@ -103,12 +109,12 @@ private void reevaluateThresholds(
int microBatchSizeBefore = this.targetMicroBatchSize.get();
int microBatchSizeAfter = microBatchSizeBefore;

if (retryRate < this.minRetryRate && microBatchSizeBefore < BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST) {
if (retryRate < this.minRetryRate && microBatchSizeBefore < maxMicroBatchSize) {
int targetedNewBatchSize = Math.min(
Math.min(
microBatchSizeBefore * 2,
microBatchSizeBefore + (int)(BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST * this.avgRetryRate)),
BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST);
microBatchSizeBefore + (int)(maxMicroBatchSize * this.avgRetryRate)),
maxMicroBatchSize);
if (this.targetMicroBatchSize.compareAndSet(microBatchSizeBefore, targetedNewBatchSize)) {
microBatchSizeAfter = targetedNewBatchSize;
}
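A worked instance of the tuning rule above, with assumed values (the real retry-rate bounds come from CosmosBulkExecutionOptions):

    val maxMicroBatchSize = 100 // the new configurable ceiling
    val avgRetryRate = 0.2      // assumed midpoint of the min/max retry rates
    val before = 10             // current targetMicroBatchSize

    val after = math.min(
      math.min(before * 2, before + (maxMicroBatchSize * avgRetryRate).toInt),
      maxMicroBatchSize)
    // = min(min(20, 30), 100) = 20: the batch size doubles while small, grows
    // additively once doubling would overshoot, and never exceeds the ceiling.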
@@ -23,6 +23,8 @@
public final class CosmosBulkExecutionOptions {
private int initialMicroBatchSize = BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST;
private int maxMicroBatchConcurrency = BatchRequestResponseConstants.DEFAULT_MAX_MICRO_BATCH_CONCURRENCY;

private int maxMicroBatchSize = BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST;
private double maxMicroBatchRetryRate = BatchRequestResponseConstants.DEFAULT_MAX_MICRO_BATCH_RETRY_RATE;
private double minMicroBatchRetryRate = BatchRequestResponseConstants.DEFAULT_MIN_MICRO_BATCH_RETRY_RATE;

@@ -123,6 +125,15 @@ CosmosBulkExecutionOptions setMaxMicroBatchPayloadSizeInBytes(int maxMicroBatchP
return this;
}

int getMaxMicroBatchSize() {
return maxMicroBatchSize;
}

CosmosBulkExecutionOptions setMaxMicroBatchSize(int maxMicroBatchSize) {
this.maxMicroBatchSize = maxMicroBatchSize;
return this;
}

Integer getMaxConcurrentCosmosPartitions() {
return this.maxConcurrentCosmosPartitions;
}
@@ -412,6 +423,20 @@ public List<String> getExcludeRegions(CosmosBulkExecutionOptions cosmosBulkExecu
return cosmosBulkExecutionOptions.excludeRegions;
}

@Override
public int getMaxMicroBatchSize(CosmosBulkExecutionOptions cosmosBulkExecutionOptions) {
if (cosmosBulkExecutionOptions == null) {
return BatchRequestResponseConstants.MAX_OPERATIONS_IN_DIRECT_MODE_BATCH_REQUEST;
}

return cosmosBulkExecutionOptions.getMaxMicroBatchSize();
}

@Override
public void setMaxMicroBatchSize(CosmosBulkExecutionOptions cosmosBulkExecutionOptions, int maxMicroBatchSize) {
cosmosBulkExecutionOptions.setMaxMicroBatchSize(maxMicroBatchSize);
}

});
}
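Both getMaxMicroBatchSize and setMaxMicroBatchSize on CosmosBulkExecutionOptions are package-private, so callers outside the package - such as the Spark connector - go through the bridge accessor registered above. A hypothetical wiring sketch (writeConfig mirrors the CosmosWriteConfig shown earlier; this exact glue code is not part of the rendered diff):

    import com.azure.cosmos.implementation.ImplementationBridgeHelpers
    import com.azure.cosmos.models.CosmosBulkExecutionOptions

    val bulkOptions = new CosmosBulkExecutionOptions()
    writeConfig.maxMicroBatchSize.foreach { size =>
      ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
        .getCosmosBulkExecutionOptionsAccessor()
        .setMaxMicroBatchSize(bulkOptions, size)
    }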
