From 79b2c9ba57510b1d334e39cf7d78e4bc5032e02e Mon Sep 17 00:00:00 2001 From: Mohammad Derakhshani Date: Wed, 28 Apr 2021 10:04:43 -0700 Subject: [PATCH] updated cosmos spark configs (#21004) updated cosmos spark configs --- ...park Sample.ipynb => Pyspark-Sample.ipynb} | 2 +- .../Python/NYC-Taxi-Data/01_Batch.ipynb | 12 ++--- .../02_StructuredStreaming.ipynb | 8 ++-- ...{Scala Sample.scala => Scala-Sample.scala} | 2 +- .../docs/configuration-reference.md | 27 ++++++----- .../docs/quick-start.md | 6 +-- .../com/azure/cosmos/spark/BulkWriter.scala | 12 ++--- .../com/azure/cosmos/spark/CosmosConfig.scala | 46 ++++++++++-------- .../com/azure/cosmos/spark/PointWriter.scala | 2 +- .../azure/cosmos/spark/BulkWriterITest.scala | 2 +- .../cosmos/spark/CosmosCatalogITest.scala | 14 +++--- .../azure/cosmos/spark/CosmosConfigSpec.scala | 48 +++++++++++++------ .../spark/SparkE2EChangeFeedITest.scala | 14 +++--- .../spark/SparkE2EConfigResolutionITest.scala | 2 +- .../cosmos/spark/SparkE2EQueryITest.scala | 24 +++++----- .../SparkE2EStructuredStreamingITest.scala | 4 +- .../SparkE2EThroughputControlITest.scala | 4 +- .../cosmos/spark/SparkE2EWriteITest.scala | 2 +- .../notebooks/basicScenario.scala | 2 +- 19 files changed, 130 insertions(+), 103 deletions(-) rename sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/{Pyspark Sample.ipynb => Pyspark-Sample.ipynb} (98%) rename sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/{Scala Sample.scala => Scala-Sample.scala} (96%) diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Pyspark Sample.ipynb b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Pyspark-Sample.ipynb similarity index 98% rename from sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Pyspark Sample.ipynb rename to sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Pyspark-Sample.ipynb index 4b4df1e7add6..84f4a417736c 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Pyspark Sample.ipynb +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Pyspark-Sample.ipynb @@ -31,7 +31,7 @@ " \"spark.cosmos.accountKey\" : cosmosMasterKey,\n", " \"spark.cosmos.database\" : cosmosDatabaseName,\n", " \"spark.cosmos.container\" : cosmosContainerName,\n", - " \"spark.cosmos.read.inferSchemaEnabled\" : \"true\" \n", + " \"spark.cosmos.read.inferSchema.enabled\" : \"true\" \n", "}" ] }, diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Python/NYC-Taxi-Data/01_Batch.ipynb b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Python/NYC-Taxi-Data/01_Batch.ipynb index 53f2f2fb8c98..5b06c2cbdf97 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Python/NYC-Taxi-Data/01_Batch.ipynb +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Python/NYC-Taxi-Data/01_Batch.ipynb @@ -236,8 +236,8 @@ " \"spark.cosmos.database\": \"SampleDatabase\",\n", " \"spark.cosmos.container\": \"GreenTaxiRecords\",\n", " \"spark.cosmos.write.strategy\": \"ItemOverwrite\",\n", - " \"spark.cosmos.write.bulkEnabled\": \"true\",\n", - " \"spark.cosmos.throughputControlEnabled\": \"true\",\n", + " \"spark.cosmos.write.bulk.enabled\": \"true\",\n", + " \"spark.cosmos.throughputControl.enabled\": \"true\",\n", " \"spark.cosmos.throughputControl.name\": \"NYCGreenTaxiDataIngestion\",\n", " \"spark.cosmos.throughputControl.targetThroughputThreshold\": \"0.95\",\n", " \"spark.cosmos.throughputControl.globalControl.database\": \"SampleDatabase\",\n", @@ -321,7 +321,7 @@ " \"spark.cosmos.database\": \"SampleDatabase\",\n", " \"spark.cosmos.container\": \"GreenTaxiRecords\",\n", " \"spark.cosmos.partitioning.strategy\": \"Default\",\n", 
- " \"spark.cosmos.read.inferSchemaEnabled\" : \"false\"\n", + " \"spark.cosmos.read.inferSchema.enabled\" : \"false\"\n", "}\n", "\n", "query_df = spark.read.format(\"cosmos.items\").options(**readCfg).load()\n", @@ -366,7 +366,7 @@ " \"spark.cosmos.database\": \"SampleDatabase\",\n", " \"spark.cosmos.container\": \"GreenTaxiRecords\",\n", " \"spark.cosmos.partitioning.strategy\": \"Default\",\n", - " \"spark.cosmos.read.inferSchemaEnabled\" : \"false\",\n", + " \"spark.cosmos.read.inferSchema.enabled\" : \"false\",\n", " \"spark.cosmos.changeFeed.startFrom\" : \"Beginning\",\n", " \"spark.cosmos.changeFeed.mode\" : \"Incremental\"\n", "}\n", @@ -506,7 +506,7 @@ "OPTIONS (\n", " spark.cosmos.database = 'SampleDatabase',\n", " spark.cosmos.container = 'GreenTaxiRecords',\n", - " spark.cosmos.read.inferSchemaEnabled = 'False',\n", + " spark.cosmos.read.inferSchema.enabled = 'False',\n", " spark.cosmos.read.inferSchemaIncludeSystemProperties = 'True',\n", " spark.cosmos.partitioning.strategy = 'Aggressive');\n", "\n", @@ -547,7 +547,7 @@ "OPTIONS (\n", " spark.cosmos.database = 'SampleDatabase',\n", " spark.cosmos.container = 'GreenTaxiRecords',\n", - " spark.cosmos.read.inferSchemaEnabled = 'True',\n", + " spark.cosmos.read.inferSchema.enabled = 'True',\n", " spark.cosmos.read.inferSchemaIncludeSystemProperties = 'False',\n", " spark.cosmos.partitioning.strategy = 'Restrictive');\n", "\n", diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Python/NYC-Taxi-Data/02_StructuredStreaming.ipynb b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Python/NYC-Taxi-Data/02_StructuredStreaming.ipynb index c0f9fdbaf30d..78b17606bba1 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Python/NYC-Taxi-Data/02_StructuredStreaming.ipynb +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Python/NYC-Taxi-Data/02_StructuredStreaming.ipynb @@ -243,8 +243,8 @@ " \"spark.cosmos.database\": \"SampleDatabase\",\n", " \"spark.cosmos.container\": \"GreenTaxiRecords\",\n", " \"spark.cosmos.partitioning.strategy\": \"Default\",\n", - " \"spark.cosmos.read.inferSchemaEnabled\" : \"true\",\n", - " \"spark.cosmos.read.inferSchemaForceNullableProperties\" : \"true\",\n", + " \"spark.cosmos.read.inferSchema.enabled\" : \"true\",\n", + " \"spark.cosmos.read.inferSchema.forceNullableProperties\" : \"true\",\n", " \"spark.cosmos.changeFeed.startFrom\" : \"Beginning\",\n", " \"spark.cosmos.changeFeed.mode\" : \"Incremental\"\n", " #\"spark.cosmos.changeFeed.maxItemCountPerTriggerHint\" : \"50000\"\n", @@ -256,7 +256,7 @@ " \"spark.cosmos.database\": \"SampleDatabase\",\n", " \"spark.cosmos.container\": \"GreenTaxiRecordsCFSink\",\n", " \"spark.cosmos.write.strategy\": \"ItemOverwrite\",\n", - " \"spark.cosmos.write.bulkEnabled\": \"true\",\n", + " \"spark.cosmos.write.bulk.enabled\": \"true\",\n", " \"checkpointLocation\": \"/tmp/\" + runId + \"/\"\n", "}\n", "\n", @@ -337,7 +337,7 @@ "OPTIONS (\n", " spark.cosmos.database = 'SampleDatabase',\n", " spark.cosmos.container = 'GreenTaxiRecordsCFSink',\n", - " spark.cosmos.read.inferSchemaEnabled = 'False',\n", + " spark.cosmos.read.inferSchema.enabled = 'False',\n", " spark.cosmos.partitioning.strategy = 'Default');\n", "\n", "SELECT COUNT(*) FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsCFSinkView" diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Scala Sample.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Scala-Sample.scala similarity index 96% rename from sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Scala Sample.scala rename to 
sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Scala-Sample.scala index 2537761e0d97..0c3d5ad1c12f 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Scala Sample.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/Samples/Scala-Sample.scala @@ -15,7 +15,7 @@ val cfgWithAutoSchemaInference = Map("spark.cosmos.accountEndpoint" -> cosmosEnd "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabaseName, "spark.cosmos.container" -> cosmosContainerName, - "spark.cosmos.read.inferSchemaEnabled" -> "true" + "spark.cosmos.read.inferSchema.enabled" -> "true" ) // COMMAND ---------- diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md index e679892d46af..e0817d3addaf 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md @@ -20,16 +20,17 @@ Configuration Reference: | `spark.cosmos.useGatewayMode` | `false` | Use gateway mode for the client operations | | `spark.cosmos.read.forceEventualConsistency` | `true` | Makes the client use Eventual consistency for read operations instead of using the default account level consistency | | `spark.cosmos.applicationName` | None | Application name | -| `spark.cosmos.preferredRegionsList` | None | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[eastus,westus]`) provided preferred regions will be used as hint. You should use a collocated spark cluster with your Cosmos DB account and pass the spark cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet) | +| `spark.cosmos.preferredRegionsList` | None | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated list (e.g., `[East US, West US]` or `East US, West US`); the provided preferred regions will be used as a hint. You should use a collocated spark cluster with your Cosmos DB account and pass the spark cluster region as preferred region. See the list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet) | ### Write Config | Config Property Name | Default | Description | | :--- | :---- | :--- | -| `spark.cosmos.write.strategy` | `ItemOverwrite` | Cosmos DB Item write Strategy: ItemOverwrite (using upsert), ItemAppend (using create, ignore Conflicts) | -| `spark.cosmos.write.maxRetryCount` | `3` | Cosmos DB Write Max Retry Attempts on failure | -| `spark.cosmos.write.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size | -| `spark.cosmos.write.bulkEnabled` | `true` | Cosmos DB Item Write bulk enabled | +| `spark.cosmos.write.strategy` | `ItemOverwrite` | Cosmos DB Item write Strategy: `ItemOverwrite` (using upsert), `ItemAppend` (using create, ignore pre-existing items i.e., Conflicts) | +| `spark.cosmos.write.maxRetryCount` | `10` | Cosmos DB Write Max Retry Attempts on retriable failures (e.g., connection errors) | +| `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size | +| `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB Item Write Max Pending Operations. If not specified it will be determined based on the Spark executor VM Size | +| `spark.cosmos.write.bulk.enabled` | `true` | Cosmos DB Item Write bulk enabled |
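(For orientation, a minimal Scala sketch of bulk ingestion wired up with the renamed write options above; the account endpoint, key, database, and container values are placeholders, not part of this patch.)

```scala
import org.apache.spark.sql.SparkSession

// Hedged sample: all account values below are placeholders.
val spark = SparkSession.builder().appName("cosmos-bulk-write-sample").getOrCreate()
import spark.implicits._

// A tiny DataFrame with the mandatory Cosmos DB "id" column.
val df = Seq(("id-1", "Quark", 42), ("id-2", "Odo", 40)).toDF("id", "name", "age")

df.write
  .format("cosmos.items")
  .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/")
  .option("spark.cosmos.accountKey", "<account-key>")
  .option("spark.cosmos.database", "SampleDatabase")
  .option("spark.cosmos.container", "SampleContainer")
  .option("spark.cosmos.write.strategy", "ItemOverwrite")         // upsert semantics
  .option("spark.cosmos.write.bulk.enabled", "true")              // renamed from spark.cosmos.write.bulkEnabled
  .option("spark.cosmos.write.bulk.maxPendingOperations", "1000") // optional cap; defaults based on executor VM size
  .mode("APPEND")
  .save()
```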
### Query Config @@ -39,19 +40,19 @@ When doing read operations, users can specify a custom schema or allow the conne | Config Property Name | Default | Description | | :--- | :---- | :--- | -| `spark.cosmos.read.inferSchemaEnabled` | `true` | When schema inference is disabled and user is not providing a schema, raw json will be returned. | -| `spark.cosmos.read.inferSchemaQuery` | `SELECT * FROM r` | When schema inference is enabled, used as custom query to infer it. For example, if you store multiple entities with different schemas within a container and you want to ensure inference only looks at certain document types or you want to project only particular columns. | -| `spark.cosmos.read.inferSchemaSamplingSize` | `1000` | Sampling size to use when inferring schema and not using a query. | -| `spark.cosmos.read.inferSchemaIncludeSystemProperties` | `false` | When schema inference is enabled, whether the resulting schema will include all [Cosmos DB system properties](https://docs.microsoft.com/azure/cosmos-db/account-databases-containers-items#properties-of-an-item). | -| `spark.cosmos.read.inferSchemaIncludeTimestamp` | `false` | When schema inference is enabled, whether the resulting schema will include the document Timestamp (`_ts`). Not required if `spark.cosmos.read.inferSchemaIncludeSystemProperties` is enabled, as it will already include all system properties. | +| `spark.cosmos.read.inferSchema.enabled` | `true` | Whether schema inference is enabled. When inference is disabled and the user does not provide a schema, raw json will be returned. | +| `spark.cosmos.read.inferSchema.query` | `SELECT * FROM r` | When schema inference is enabled, used as the custom query to infer it. For example, if you store multiple entities with different schemas within a container and you want to ensure inference only looks at certain document types or you want to project only particular columns. | +| `spark.cosmos.read.inferSchema.samplingSize` | `1000` | Sampling size to use when inferring schema and not using a query. | +| `spark.cosmos.read.inferSchema.includeSystemProperties` | `false` | When schema inference is enabled, whether the resulting schema will include all [Cosmos DB system properties](https://docs.microsoft.com/azure/cosmos-db/account-databases-containers-items#properties-of-an-item). | +| `spark.cosmos.read.inferSchema.includeTimestamp` | `false` | When schema inference is enabled, whether the resulting schema will include the document Timestamp (`_ts`). Not required if `spark.cosmos.read.inferSchema.includeSystemProperties` is enabled, as it will already include all system properties. | #### Json conversion configuration -When reading json documents, if a document contains an attribute that does not map to the schema type, the user can decide whether to use a `null` value (Relaxed) or an exception (Strict). | Config Property Name | Default | Description | | :--- | :---- | :--- | -| `spark.cosmos.read.schemaConversionMode` | `Relaxed` | The schema conversion behavior (Relaxed, Strict) | +| `spark.cosmos.read.schemaConversionMode` | `Relaxed` | The schema conversion behavior (`Relaxed`, `Strict`). When reading json documents, if a document contains an attribute that does not map to the schema type, the user can decide whether to use a `null` value (Relaxed) or an exception (Strict). |
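(Similarly, a hedged read sketch combining the renamed `inferSchema.*` options with `schemaConversionMode`; the account values are the same placeholders as above and the inference query is only illustrative.)

```scala
// Assumes an active SparkSession named `spark`, e.g., in a notebook.
val inferred = spark.read
  .format("cosmos.items")
  .option("spark.cosmos.accountEndpoint", "https://<account>.documents.azure.com:443/")
  .option("spark.cosmos.accountKey", "<account-key>")
  .option("spark.cosmos.database", "SampleDatabase")
  .option("spark.cosmos.container", "SampleContainer")
  .option("spark.cosmos.read.inferSchema.enabled", "true")                  // renamed from inferSchemaEnabled
  .option("spark.cosmos.read.inferSchema.samplingSize", "100")              // sample fewer documents than the default 1000
  .option("spark.cosmos.read.inferSchema.query", "SELECT TOP 100 * FROM c") // constrain what inference looks at
  .option("spark.cosmos.read.schemaConversionMode", "Relaxed")              // mismatched attributes become null instead of failing
  .load()

inferred.printSchema()
```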
#### Partitioning Strategy Config @@ -65,7 +66,7 @@ When reading json documents, if a document contains an attribute that does not m | Config Property Name | Default | Description | | :--- | :---- | :--- | -| `spark.cosmos.throughputControlEnabled` | `false` | Whether throughput control is enabled | +| `spark.cosmos.throughputControl.enabled` | `false` | Whether throughput control is enabled | | `spark.cosmos.throughputControl.name` | None | Throughput control group name | | `spark.cosmos.throughputControl.targetThroughput` | None | Throughput control group target throughput | | `spark.cosmos.throughputControl.targetThroughputThreshold` | None | Throughput control group target throughput threshold | diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/quick-start.md b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/quick-start.md index 8247840003bb..190f6748a642 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/quick-start.md +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/quick-start.md @@ -102,7 +102,7 @@ see [Write Configuration](https://github.com/Azure/azure-sdk-for-java/blob/maste from pyspark.sql.functions import col df = spark.read.format("cosmos.items").options(**cfg)\ - .option("spark.cosmos.read.inferSchemaEnabled", "true")\ + .option("spark.cosmos.read.inferSchema.enabled", "true")\ .load() df.filter(col("isAlive") == True)\ @@ -112,7 +112,7 @@ df.filter(col("isAlive") == True)\ see [Query Configuration](https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md#query-config) for more detail. Note that when running queries, unless you are interested in getting back the raw json payload, -we recommend setting `spark.cosmos.read.inferSchemaEnabled` to be `true`. +we recommend setting `spark.cosmos.read.inferSchema.enabled` to `true`. see [Schema Inference Configuration](https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md#schema-inference-config) for more detail. @@ -122,7 +122,7 @@ see [Schema Inference Configuration](https://github.com/Azure/azure-sdk-for-java ```python # Show the inferred schema from Cosmos DB df = spark.read.format("cosmos.items").options(**cfg)\ - .option("spark.cosmos.read.inferSchemaEnabled", "true")\ + .option("spark.cosmos.read.inferSchema.enabled", "true")\ .load() df.printSchema() diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala index 2e43bf1f500b..83f15f0e7d41 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala @@ -4,7 +4,7 @@ package com.azure.cosmos.spark import com.azure.cosmos.implementation.guava25.base.Preconditions import com.azure.cosmos.models.PartitionKey -import com.azure.cosmos.spark.BulkWriter.MaxNumberOfThreadsPerCPUCore +import com.azure.cosmos.spark.BulkWriter.DefaultMaxPendingOperationPerCore import com.azure.cosmos.{BulkOperations, CosmosAsyncContainer, CosmosBulkOperationResponse, CosmosException, CosmosItemOperation} import com.fasterxml.jackson.databind.node.ObjectNode import reactor.core.Disposable @@ -34,8 +34,8 @@ class BulkWriter(container: CosmosAsyncContainer, // TODO: moderakh this requires tuning.
// TODO: moderakh should we do a max on the max memory to ensure we don't run out of memory? - private val maxConcurrency = writeConfig.maxConcurrencyOpt - .getOrElse(SparkUtils.getNumberOfHostCPUCores * MaxNumberOfThreadsPerCPUCore) + private val maxPendingOperations = writeConfig.bulkMaxPendingOperations + .getOrElse(SparkUtils.getNumberOfHostCPUCores * DefaultMaxPendingOperationPerCore) private val closed = new AtomicBoolean(false) private val lock = new ReentrantLock @@ -57,7 +57,7 @@ class BulkWriter(container: CosmosAsyncContainer, // TODO: moderakh once that is added in the core SDK, drop activeOperations and rely on the core SDK // context passing for bulk private val activeOperations = new TrieMap[CosmosItemOperation, OperationContext]() - private val semaphore = new Semaphore(maxConcurrency) + private val semaphore = new Semaphore(maxPendingOperations) private val totalScheduledMetrics = new AtomicLong(0) private val totalSuccessfulIngestionMetrics = new AtomicLong(0) @@ -220,7 +220,7 @@ class BulkWriter(container: CosmosAsyncContainer, assume(activeTasks.get() == 0) assume(activeOperations.isEmpty) - assume(semaphore.availablePermits() == maxConcurrency) + assume(semaphore.availablePermits() == maxPendingOperations) logInfo(s"flushAndClose completed with no error. " + s"totalSuccessfulIngestionMetrics=${totalSuccessfulIngestionMetrics.get()}, totalScheduled=${totalScheduledMetrics}") @@ -286,7 +286,7 @@ private object BulkWriter { // hence we want 2MB/ 1KB items per partition to be buffered // 2 * 1024 * 167 items should get buffered on a 16 CPU core VM // so per CPU core we want (2 * 1024 * 167 / 16) max items to be buffered - val MaxNumberOfThreadsPerCPUCore = 2 * 1024 * 167 / 16 + val DefaultMaxPendingOperationPerCore = 2 * 1024 * 167 / 16 val emitFailureHandler: EmitFailureHandler = (signalType, emitResult) => if (emitResult.equals(EmitResult.FAIL_NON_SERIALIZED)) true else false diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala index 7220bff257a0..b44043d7db4a 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala @@ -208,8 +208,9 @@ private object CosmosReadConfig { mandatory = false, defaultValue = Some(DefaultSchemaConversionMode), parseFromStringFunction = value => CosmosConfigEntry.parseEnumeration(value, SchemaConversionModes), - helpMessage = "Defines whether to throw on inconsistencies between schema definition and json attribute " + - "types (Strict) or to return null values (Relaxed).") + helpMessage = "The schema conversion behavior (`Relaxed`, `Strict`)." 
+ + " When reading json documents, if a document contains an attribute that does not map to the schema type," + + " the user can decide whether to use a `null` value (Relaxed) or an exception (Strict).") def parseCosmosReadConfig(cfg: Map[String, String]): CosmosReadConfig = { val forceEventualConsistency = CosmosConfigEntry.parse(cfg, ForceEventualConsistency) @@ -290,18 +291,25 @@ private object ItemWriteStrategy extends Enumeration { private case class CosmosWriteConfig(itemWriteStrategy: ItemWriteStrategy, maxRetryCount: Int, bulkEnabled: Boolean, - maxConcurrencyOpt: Option[Int]) + bulkMaxPendingOperations: Option[Int] = Option.empty, + pointMaxConcurrency: Option[Int] = Option.empty) private object CosmosWriteConfig { - private val bulkEnabled = CosmosConfigEntry[Boolean](key = "spark.cosmos.write.bulkEnabled", + private val bulkEnabled = CosmosConfigEntry[Boolean](key = "spark.cosmos.write.bulk.enabled", defaultValue = Option.apply(true), mandatory = false, parseFromStringFunction = bulkEnabledAsString => bulkEnabledAsString.toBoolean, helpMessage = "Cosmos DB Item Write bulk enabled") - private val MaxRetryCount = 3 + private val DefaultMaxRetryCount = 10 - private val writeConcurrency = CosmosConfigEntry[Int](key = "spark.cosmos.write.maxConcurrency", + private val bulkMaxPendingOperations = CosmosConfigEntry[Int](key = "spark.cosmos.write.bulk.maxPendingOperations", + mandatory = false, + parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt, + helpMessage = s"Cosmos DB Item Write Max Pending Operations." + + s" If not specified it will be determined based on the Spark executor VM Size") + + private val pointWriteConcurrency = CosmosConfigEntry[Int](key = "spark.cosmos.write.point.maxConcurrency", mandatory = false, parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt, helpMessage = s"Cosmos DB Item Write Max concurrency." 
+ @@ -312,11 +320,11 @@ private object CosmosWriteConfig { mandatory = false, parseFromStringFunction = itemWriteStrategyAsString => CosmosConfigEntry.parseEnumeration(itemWriteStrategyAsString, ItemWriteStrategy), - helpMessage = "Cosmos DB Item write Strategy: ItemOverwrite (using upsert), ItemAppend (using create, ignore 409)") + helpMessage = "Cosmos DB Item write Strategy: `ItemOverwrite` (using upsert), `ItemAppend` (using create, ignore pre-existing items i.e., Conflicts)") private val maxRetryCount = CosmosConfigEntry[Int](key = "spark.cosmos.write.maxRetryCount", mandatory = false, - defaultValue = Option.apply(MaxRetryCount), + defaultValue = Option.apply(DefaultMaxRetryCount), parseFromStringFunction = maxRetryAttempt => { val cnt = maxRetryAttempt.toInt if (cnt < 0) { @@ -331,7 +339,6 @@ private object CosmosWriteConfig { val maxRetryCountOpt = CosmosConfigEntry.parse(cfg, maxRetryCount) val bulkEnabledOpt = CosmosConfigEntry.parse(cfg, bulkEnabled) assert(bulkEnabledOpt.isDefined) - val maxConcurrencyOpt = CosmosConfigEntry.parse(cfg, writeConcurrency) // parsing above already validated this assert(itemWriteStrategyOpt.isDefined) @@ -341,8 +348,9 @@ private object CosmosWriteConfig { CosmosWriteConfig( itemWriteStrategyOpt.get, maxRetryCountOpt.get, - bulkEnabledOpt.get, - maxConcurrencyOpt) + bulkEnabled = bulkEnabledOpt.get, + bulkMaxPendingOperations = CosmosConfigEntry.parse(cfg, bulkMaxPendingOperations), + pointMaxConcurrency = CosmosConfigEntry.parse(cfg, pointWriteConcurrency)) } } @@ -391,37 +399,37 @@ private case class CosmosSchemaInferenceConfig(inferSchemaSamplingSize: Int, private object CosmosSchemaInferenceConfig { private val DefaultSampleSize: Int = 1000 - private val inferSchemaSamplingSize = CosmosConfigEntry[Int](key = "spark.cosmos.read.inferSchemaSamplingSize", + private val inferSchemaSamplingSize = CosmosConfigEntry[Int](key = "spark.cosmos.read.inferSchema.samplingSize", mandatory = false, defaultValue = Some(DefaultSampleSize), parseFromStringFunction = size => size.toInt, helpMessage = "Sampling size to use when inferring schema") - private val inferSchemaEnabled = CosmosConfigEntry[Boolean](key = "spark.cosmos.read.inferSchemaEnabled", + private val inferSchemaEnabled = CosmosConfigEntry[Boolean](key = "spark.cosmos.read.inferSchema.enabled", mandatory = false, defaultValue = Some(true), parseFromStringFunction = enabled => enabled.toBoolean, helpMessage = "Whether schema inference is enabled or should return raw json") - private val inferSchemaIncludeSystemProperties = CosmosConfigEntry[Boolean](key = "spark.cosmos.read.inferSchemaIncludeSystemProperties", + private val inferSchemaIncludeSystemProperties = CosmosConfigEntry[Boolean](key = "spark.cosmos.read.inferSchema.includeSystemProperties", mandatory = false, defaultValue = Some(false), parseFromStringFunction = include => include.toBoolean, helpMessage = "Whether schema inference should include the system properties in the schema") - private val inferSchemaForceNullableProperties = CosmosConfigEntry[Boolean](key = "spark.cosmos.read.inferSchemaForceNullableProperties", + private val inferSchemaForceNullableProperties = CosmosConfigEntry[Boolean](key = "spark.cosmos.read.inferSchema.forceNullableProperties", mandatory = false, defaultValue = Some(false), parseFromStringFunction = include => include.toBoolean, helpMessage = "Whether schema inference should enforce inferred properties to be nullable - even when no null-values are contained in the sample set") - private val 
inferSchemaIncludeTimestamp = CosmosConfigEntry[Boolean](key = "spark.cosmos.read.inferSchemaIncludeTimestamp", + private val inferSchemaIncludeTimestamp = CosmosConfigEntry[Boolean](key = "spark.cosmos.read.inferSchema.includeTimestamp", mandatory = false, defaultValue = Some(false), parseFromStringFunction = include => include.toBoolean, helpMessage = "Whether schema inference should include the timestamp (_ts) property") - private val inferSchemaQuery = CosmosConfigEntry[String](key = "spark.cosmos.read.inferSchemaQuery", + private val inferSchemaQuery = CosmosConfigEntry[String](key = "spark.cosmos.read.inferSchema.query", mandatory = false, parseFromStringFunction = query => query, helpMessage = "When schema inference is enabled, used as custom query to infer it") @@ -575,7 +583,7 @@ private object CosmosChangeFeedConfig { helpMessage = "ChangeFeed mode (Incremental or FullFidelity)") private val maxItemCountPerTriggerHint = CosmosConfigEntry[Long]( - key = "spark.cosmos.changeFeed.maxItemCountPerTriggerHint", + key = "spark.cosmos.changeFeed.itemCountPerTriggerHint", mandatory = false, parseFromStringFunction = maxItemCount => maxItemCount.toInt, helpMessage = "Approximate maximum number of items read from change feed for each trigger") @@ -621,7 +629,7 @@ private case class CosmosThroughputControlConfig(groupName: String, private object CosmosThroughputControlConfig { private val throughputControlEnabledSupplier = CosmosConfigEntry[Boolean]( - key = "spark.cosmos.throughputControlEnabled", + key = "spark.cosmos.throughputControl.enabled", mandatory = false, defaultValue = Some(false), parseFromStringFunction = enableThroughputControl => enableThroughputControl.toBoolean, diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/PointWriter.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/PointWriter.scala index 516f14457125..e99993db2ebf 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/PointWriter.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/PointWriter.scala @@ -25,7 +25,7 @@ class PointWriter(container: CosmosAsyncContainer, cosmosWriteConfig: CosmosWrit extends AsyncItemWriter with CosmosLoggingTrait { - private val maxConcurrency = cosmosWriteConfig.maxConcurrencyOpt + private val maxConcurrency = cosmosWriteConfig.pointMaxConcurrency .getOrElse(SparkUtils.getNumberOfHostCPUCores * MaxNumberOfThreadsPerCPUCore) // TODO: moderakh do perf tuning on the maxConcurrency and also the thread pool config diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/BulkWriterITest.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/BulkWriterITest.scala index 768cc28797a5..053b63c95c04 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/BulkWriterITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/BulkWriterITest.scala @@ -25,7 +25,7 @@ class BulkWriterITest extends IntegrationSpec with CosmosClient with AutoCleanab "Bulk Writer" can "upsert item" in { val container = getContainer - val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, maxRetryCount = 5, bulkEnabled = true, Some(900)) + val writeConfig = CosmosWriteConfig(ItemWriteStrategy.ItemOverwrite, 5, bulkEnabled = true, bulkMaxPendingOperations = Some(900)) val bulkWriter = new BulkWriter(container, writeConfig) diff --git 
a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/CosmosCatalogITest.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/CosmosCatalogITest.scala index f2e2765d3080..3cee60bc1474 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/CosmosCatalogITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/CosmosCatalogITest.scala @@ -302,8 +302,8 @@ class CosmosCatalogITest extends IntegrationSpec with CosmosClient with CosmosLo s"OPTIONS (" + s"spark.cosmos.database = '$databaseName', " + s"spark.cosmos.container = '$containerName', " + - "spark.cosmos.read.inferSchemaEnabled = 'True', " + - "spark.cosmos.read.inferSchemaIncludeSystemProperties = 'True', " + + "spark.cosmos.read.inferSchema.enabled = 'True', " + + "spark.cosmos.read.inferSchema.includeSystemProperties = 'True', " + "spark.cosmos.partitioning.strategy = 'Restrictive');") val tables = spark.sql(s"SHOW TABLES in testCatalog.$databaseName;") @@ -379,7 +379,7 @@ class CosmosCatalogITest extends IntegrationSpec with CosmosClient with CosmosLo s"OPTIONS (" + s"spark.cosmos.database = '$databaseName', " + s"spark.cosmos.container = '$containerName', " + - "spark.cosmos.read.inferSchemaEnabled = 'False', " + + "spark.cosmos.read.inferSchema.enabled = 'False', " + "spark.cosmos.partitioning.strategy = 'Restrictive');") var tables = spark.sql(s"SHOW TABLES in testCatalog.$databaseName;") @@ -391,8 +391,8 @@ class CosmosCatalogITest extends IntegrationSpec with CosmosClient with CosmosLo s"OPTIONS (" + s"spark.cosmos.database = '$databaseName', " + s"spark.cosmos.container = '$containerName', " + - "spark.cosmos.read.inferSchemaEnabled = 'True', " + - "spark.cosmos.read.inferSchemaIncludeSystemProperties = 'False', " + + "spark.cosmos.read.inferSchema.enabled = 'True', " + + "spark.cosmos.read.inferSchema.includeSystemProperties = 'False', " + "spark.cosmos.partitioning.strategy = 'Restrictive');") tables = spark.sql(s"SHOW TABLES in testCatalog.$databaseName;") @@ -488,7 +488,7 @@ class CosmosCatalogITest extends IntegrationSpec with CosmosClient with CosmosLo s"OPTIONS (" + s"spark.cosmos.database = '$databaseName', " + s"spark.cosmos.container = '$containerName', " + - "spark.cosmos.read.inferSchemaEnabled = 'False', " + + "spark.cosmos.read.inferSchema.enabled = 'False', " + "spark.cosmos.partitioning.strategy = 'Restrictive');") fail("Expected IllegalArgumentException not thrown") @@ -539,7 +539,7 @@ class CosmosCatalogITest extends IntegrationSpec with CosmosClient with CosmosLo s"OPTIONS (" + s"spark.cosmos.database = '$databaseName', " + s"spark.cosmos.container = '$containerName', " + - "spark.cosmos.read.inferSchemaEnabled = 'False', " + + "spark.cosmos.read.inferSchema.enabled = 'False', " + "spark.cosmos.partitioning.strategy = 'Restrictive');") fail("Expected IllegalArgumentException not thrown") diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala index 3fe64b061458..9177facc5227 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/CosmosConfigSpec.scala @@ -197,11 +197,11 @@ class CosmosConfigSpec extends UnitSpec { it should "parse inference configuration" in { val customQuery = "select * 
from c" val userConfig = Map( - "spark.cosmos.read.inferSchemaSamplingSize" -> "50", - "spark.cosmos.read.inferSchemaEnabled" -> "false", - "spark.cosmos.read.inferSchemaIncludeSystemProperties" -> "true", - "spark.cosmos.read.inferSchemaIncludeTimestamp" -> "true", - "spark.cosmos.read.inferSchemaQuery" -> customQuery + "spark.cosmos.read.inferSchema.samplingSize" -> "50", + "spark.cosmos.read.inferSchema.enabled" -> "false", + "spark.cosmos.read.inferSchema.includeSystemProperties" -> "true", + "spark.cosmos.read.inferSchema.includeTimestamp" -> "true", + "spark.cosmos.read.inferSchema.query" -> customQuery ) val config = CosmosSchemaInferenceConfig.parseCosmosReadConfig(userConfig) @@ -229,17 +229,19 @@ class CosmosConfigSpec extends UnitSpec { val config = CosmosWriteConfig.parseWriteConfig(userConfig) config.itemWriteStrategy shouldEqual ItemWriteStrategy.ItemOverwrite - config.maxRetryCount shouldEqual 3 + config.maxRetryCount shouldEqual 10 config.bulkEnabled shouldEqual true - config.maxConcurrencyOpt.isDefined shouldEqual false + config.pointMaxConcurrency.isDefined shouldEqual false + config.bulkMaxPendingOperations.isDefined shouldEqual false + } - it should "parse write config" in { + it should "parse point write config" in { val userConfig = Map( "spark.cosmos.write.strategy" -> "ItemAppend", "spark.cosmos.write.maxRetryCount" -> "8", - "spark.cosmos.write.bulkEnabled" -> "false", - "spark.cosmos.write.maxConcurrency" -> "12" + "spark.cosmos.write.bulk.enabled" -> "false", + "spark.cosmos.write.point.maxConcurrency" -> "12" ) val config = CosmosWriteConfig.parseWriteConfig(userConfig) @@ -247,7 +249,23 @@ class CosmosConfigSpec extends UnitSpec { config.itemWriteStrategy shouldEqual ItemWriteStrategy.ItemAppend config.maxRetryCount shouldEqual 8 config.bulkEnabled shouldEqual false - config.maxConcurrencyOpt.get shouldEqual 12 + config.pointMaxConcurrency.get shouldEqual 12 + } + + it should "parse bulk write config" in { + val userConfig = Map( + "spark.cosmos.write.strategy" -> "ItemAppend", + "spark.cosmos.write.maxRetryCount" -> "8", + "spark.cosmos.write.bulk.enabled" -> "true", + "spark.cosmos.write.bulk.maxPendingOperations" -> "12" + ) + + val config = CosmosWriteConfig.parseWriteConfig(userConfig) + + config.itemWriteStrategy shouldEqual ItemWriteStrategy.ItemAppend + config.maxRetryCount shouldEqual 8 + config.bulkEnabled shouldEqual true + config.bulkMaxPendingOperations.get shouldEqual 12 } it should "parse partitioning config with custom Strategy" in { @@ -346,7 +364,7 @@ class CosmosConfigSpec extends UnitSpec { val changeFeedConfig = Map( "spark.cosmos.changeFeed.mode" -> "FULLFidelity", "spark.cosmos.changeFeed.STARTfrom" -> "NOW", - "spark.cosmos.changeFeed.maxItemCountPerTriggerHint" -> "54" + "spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "54" ) val config = CosmosChangeFeedConfig.parseCosmosChangeFeedConfig(changeFeedConfig) @@ -361,7 +379,7 @@ class CosmosConfigSpec extends UnitSpec { val changeFeedConfig = Map( "spark.cosmos.changeFeed.mode" -> "incremental", "spark.cosmos.changeFeed.STARTfrom" -> "2019-12-31T10:45:10Z", - "spark.cosmos.changeFeed.maxItemCountPerTriggerHint" -> "54" + "spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "54" ) val config = CosmosChangeFeedConfig.parseCosmosChangeFeedConfig(changeFeedConfig) @@ -414,7 +432,7 @@ class CosmosConfigSpec extends UnitSpec { val changeFeedConfig = Map( "spark.cosmos.changeFeed.mode" -> "Incremental", "spark.cosmos.changeFeed.STARTfrom" -> "2019-12-31T10:45:10Z", - 
"spark.cosmos.changeFeed.maxItemCountPerTriggerHint" -> "54OOrMore" + "spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "54OOrMore" ) try { @@ -422,7 +440,7 @@ class CosmosConfigSpec extends UnitSpec { fail("incorrect mode") } catch { case e: Exception => e.getMessage shouldEqual - "invalid configuration for spark.cosmos.changeFeed.maxItemCountPerTriggerHint:54OOrMore. " + + "invalid configuration for spark.cosmos.changeFeed.itemCountPerTriggerHint:54OOrMore. " + "Config description: Approximate maximum number of items read from change feed for each trigger" } } diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala index eded2231c1f6..7c36c01fcbe9 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EChangeFeedITest.scala @@ -39,7 +39,7 @@ class SparkE2EChangeFeedITest "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.read.inferSchemaEnabled" -> "false" + "spark.cosmos.read.inferSchema.enabled" -> "false" ) val df = spark.read.format("cosmos.changeFeed").options(cfg).load() @@ -53,7 +53,7 @@ class SparkE2EChangeFeedITest "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.read.inferSchemaEnabled" -> "false", + "spark.cosmos.read.inferSchema.enabled" -> "false", "spark.cosmos.changeFeed.mode" -> "Incremental" ) @@ -83,7 +83,7 @@ class SparkE2EChangeFeedITest "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.read.inferSchemaEnabled" -> "false" + "spark.cosmos.read.inferSchema.enabled" -> "false" ) val customSchema = StructType(Array( @@ -119,7 +119,7 @@ class SparkE2EChangeFeedITest "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.read.inferSchemaEnabled" -> "false", + "spark.cosmos.read.inferSchema.enabled" -> "false", "spark.cosmos.changeFeed.mode" -> "FullFidelity", "spark.cosmos.changeFeed.startFrom" -> "NOW" ) @@ -148,7 +148,7 @@ class SparkE2EChangeFeedITest "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.read.inferSchemaEnabled" -> "false", + "spark.cosmos.read.inferSchema.enabled" -> "false", "spark.cosmos.read.startFrom" -> "Beginning", "spark.cosmos.partitioning.strategy" -> "Restrictive" ) @@ -159,7 +159,7 @@ class SparkE2EChangeFeedITest "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> sinkContainerName, "spark.cosmos.write.strategy" -> "ItemOverwrite", - "spark.cosmos.write.bulkEnabled" -> "true" + "spark.cosmos.write.bulk.enabled" -> "true" ) val testId = UUID.randomUUID().toString.replace("-", "") @@ -198,7 +198,7 @@ class SparkE2EChangeFeedITest "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> sinkContainerName, - "spark.cosmos.read.inferSchemaEnabled" -> "false", + "spark.cosmos.read.inferSchema.enabled" -> "false", "spark.cosmos.partitioning.strategy" -> 
"Restrictive" ) diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EConfigResolutionITest.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EConfigResolutionITest.scala index 8e77f86a7769..90069112e522 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EConfigResolutionITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EConfigResolutionITest.scala @@ -83,7 +83,7 @@ class SparkE2EConfigResolutionITest extends IntegrationSpec with CosmosClient wi val options = Map( "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.read.inferSchemaEnabled" -> "true" + "spark.cosmos.read.inferSchema.enabled" -> "true" ) val df = spark.read.format("cosmos.items").options(options).load() diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala index f7b69b257322..abe82ccb2366 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EQueryITest.scala @@ -137,8 +137,8 @@ class SparkE2EQueryITest "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.read.inferSchemaEnabled" -> "true", - "spark.cosmos.read.inferSchemaIncludeSystemProperties" -> "true", + "spark.cosmos.read.inferSchema.enabled" -> "true", + "spark.cosmos.read.inferSchema.includeSystemProperties" -> "true", "spark.cosmos.partitioning.strategy" -> "Restrictive" ) @@ -180,8 +180,8 @@ class SparkE2EQueryITest "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.read.inferSchemaEnabled" -> "true", - "spark.cosmos.read.inferSchemaIncludeTimestamp" -> "true", + "spark.cosmos.read.inferSchema.enabled" -> "true", + "spark.cosmos.read.inferSchema.includeTimestamp" -> "true", "spark.cosmos.partitioning.strategy" -> "Restrictive" ) @@ -223,7 +223,7 @@ class SparkE2EQueryITest "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.read.inferSchemaEnabled" -> "true", + "spark.cosmos.read.inferSchema.enabled" -> "true", "spark.cosmos.partitioning.strategy" -> "Restrictive" ) @@ -265,8 +265,8 @@ class SparkE2EQueryITest "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.read.inferSchemaEnabled" -> "true", - "spark.cosmos.read.inferSchemaQuery" -> "select TOP 1 c.isAlive, c.type, c.age from c", + "spark.cosmos.read.inferSchema.enabled" -> "true", + "spark.cosmos.read.inferSchema.query" -> "select TOP 1 c.isAlive, c.type, c.age from c", "spark.cosmos.partitioning.strategy" -> "Restrictive" ) @@ -301,8 +301,8 @@ class SparkE2EQueryITest "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.read.inferSchemaEnabled" -> "true", - "spark.cosmos.read.inferSchemaQuery" -> "select TOP 1 c.type, c.age, c.isAlive, c._ts from c", + 
"spark.cosmos.read.inferSchema.enabled" -> "true", + "spark.cosmos.read.inferSchema.query" -> "select TOP 1 c.type, c.age, c.isAlive, c._ts from c", "spark.cosmos.partitioning.strategy" -> "Restrictive" ) @@ -354,9 +354,9 @@ class SparkE2EQueryITest "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.read.inferSchemaEnabled" -> "true", - "spark.cosmos.read.inferSchemaSamplingSize" -> samplingSize.toString, - "spark.cosmos.read.inferSchemaQuery" -> "SELECT * FROM c ORDER BY c._ts", + "spark.cosmos.read.inferSchema.enabled" -> "true", + "spark.cosmos.read.inferSchema.samplingSize" -> samplingSize.toString, + "spark.cosmos.read.inferSchema.query" -> "SELECT * FROM c ORDER BY c._ts", "spark.cosmos.partitioning.strategy" -> "Restrictive" ) diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EStructuredStreamingITest.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EStructuredStreamingITest.scala index 8af2529a09fe..7bd295b6c7fb 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EStructuredStreamingITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EStructuredStreamingITest.scala @@ -51,7 +51,7 @@ class SparkE2EStructuredStreamingITest "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.read.inferSchemaEnabled" -> "false" + "spark.cosmos.read.inferSchema.enabled" -> "false" ) val writeCfg = Map( @@ -60,7 +60,7 @@ class SparkE2EStructuredStreamingITest "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> targetContainer.getId, "spark.cosmos.write.strategy" -> "ItemOverwrite", - "spark.cosmos.write.bulkEnabled" -> "true", + "spark.cosmos.write.bulk.enabled" -> "true", "checkpointLocation" -> ("/tmp/" + testId + "/") ) diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EThroughputControlITest.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EThroughputControlITest.scala index 45f8827df610..8f8ee7b0a328 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EThroughputControlITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EThroughputControlITest.scala @@ -20,8 +20,8 @@ class SparkE2EThroughputControlITest extends IntegrationSpec with Spark with Cos "spark.cosmos.accountKey" -> TestConfigurations.MASTER_KEY, "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, - "spark.cosmos.read.inferSchemaEnabled" -> "true", - "spark.cosmos.throughputControlEnabled" -> "true", + "spark.cosmos.read.inferSchema.enabled" -> "true", + "spark.cosmos.throughputControl.enabled" -> "true", "spark.cosmos.throughputControl.name" -> "sparkTest", "spark.cosmos.throughputControl.targetThroughput" -> "6", "spark.cosmos.throughputControl.globalControl.database" -> throughputControlDatabaseId, diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala index e58b483d579b..f07f97b41999 100644 --- 
a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteITest.scala @@ -36,7 +36,7 @@ class SparkE2EWriteITest extends IntegrationSpec with Spark with CosmosClient wi "spark.cosmos.database" -> cosmosDatabase, "spark.cosmos.container" -> cosmosContainer, "spark.cosmos.write.strategy" -> itemWriteStrategy.toString, - "spark.cosmos.write.bulkEnabled" -> bulkEnabled.toString + "spark.cosmos.write.bulk.enabled" -> bulkEnabled.toString ) val newSpark = getSpark() diff --git a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/test-databricks/notebooks/basicScenario.scala b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/test-databricks/notebooks/basicScenario.scala index ada664001593..dcb5c5f9b58e 100644 --- a/sdk/cosmos/azure-cosmos-spark_3-1_2-12/test-databricks/notebooks/basicScenario.scala +++ b/sdk/cosmos/azure-cosmos-spark_3-1_2-12/test-databricks/notebooks/basicScenario.scala @@ -17,7 +17,7 @@ val cfgWithAutoSchemaInference = Map("spark.cosmos.accountEndpoint" -> cosmosEnd "spark.cosmos.accountKey" -> cosmosMasterKey, "spark.cosmos.database" -> cosmosDatabaseName, "spark.cosmos.container" -> cosmosContainerName, - "spark.cosmos.read.inferSchemaEnabled" -> "true" + "spark.cosmos.read.inferSchema.enabled" -> "true" ) // COMMAND ----------
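(Putting the renames together, a sketch mirroring the structured streaming samples in this patch: read the change feed with the renamed `changeFeed.*` keys and bulk-write into a sink container; account values, container names, and the checkpoint path are placeholders.)

```scala
// Assumes an active SparkSession named `spark`; all account values are placeholders.
val readCfg = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.accountKey" -> "<account-key>",
  "spark.cosmos.database" -> "SampleDatabase",
  "spark.cosmos.container" -> "SourceContainer",
  "spark.cosmos.read.inferSchema.enabled" -> "false",
  "spark.cosmos.changeFeed.mode" -> "Incremental",
  "spark.cosmos.changeFeed.startFrom" -> "Beginning",
  "spark.cosmos.changeFeed.itemCountPerTriggerHint" -> "50000" // renamed from maxItemCountPerTriggerHint
)

val writeCfg = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.accountKey" -> "<account-key>",
  "spark.cosmos.database" -> "SampleDatabase",
  "spark.cosmos.container" -> "SinkContainer",
  "spark.cosmos.write.strategy" -> "ItemOverwrite",
  "spark.cosmos.write.bulk.enabled" -> "true", // renamed from spark.cosmos.write.bulkEnabled
  "checkpointLocation" -> "/tmp/cosmos-changefeed-sample/"
)

spark.readStream
  .format("cosmos.changeFeed")
  .options(readCfg)
  .load()
  .writeStream
  .format("cosmos.items")
  .options(writeCfg)
  .outputMode("append")
  .start()
```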