updated cosmos spark configs (#21004)
moderakh committed Apr 28, 2021
1 parent 67c297e commit 79b2c9b
Showing 19 changed files with 130 additions and 103 deletions.
@@ -31,7 +31,7 @@
" \"spark.cosmos.accountKey\" : cosmosMasterKey,\n",
" \"spark.cosmos.database\" : cosmosDatabaseName,\n",
" \"spark.cosmos.container\" : cosmosContainerName,\n",
" \"spark.cosmos.read.inferSchemaEnabled\" : \"true\" \n",
" \"spark.cosmos.read.inferSchema.enabled\" : \"true\" \n",
"}"
]
},
@@ -236,8 +236,8 @@
" \"spark.cosmos.database\": \"SampleDatabase\",\n",
" \"spark.cosmos.container\": \"GreenTaxiRecords\",\n",
" \"spark.cosmos.write.strategy\": \"ItemOverwrite\",\n",
" \"spark.cosmos.write.bulkEnabled\": \"true\",\n",
" \"spark.cosmos.throughputControlEnabled\": \"true\",\n",
" \"spark.cosmos.write.bulk.enabled\": \"true\",\n",
" \"spark.cosmos.throughputControl.enabled\": \"true\",\n",
" \"spark.cosmos.throughputControl.name\": \"NYCGreenTaxiDataIngestion\",\n",
" \"spark.cosmos.throughputControl.targetThroughputThreshold\": \"0.95\",\n",
" \"spark.cosmos.throughputControl.globalControl.database\": \"SampleDatabase\",\n",
@@ -321,7 +321,7 @@
" \"spark.cosmos.database\": \"SampleDatabase\",\n",
" \"spark.cosmos.container\": \"GreenTaxiRecords\",\n",
" \"spark.cosmos.partitioning.strategy\": \"Default\",\n",
" \"spark.cosmos.read.inferSchemaEnabled\" : \"false\"\n",
" \"spark.cosmos.read.inferSchema.enabled\" : \"false\"\n",
"}\n",
"\n",
"query_df = spark.read.format(\"cosmos.items\").options(**readCfg).load()\n",
@@ -366,7 +366,7 @@
" \"spark.cosmos.database\": \"SampleDatabase\",\n",
" \"spark.cosmos.container\": \"GreenTaxiRecords\",\n",
" \"spark.cosmos.partitioning.strategy\": \"Default\",\n",
" \"spark.cosmos.read.inferSchemaEnabled\" : \"false\",\n",
" \"spark.cosmos.read.inferSchema.enabled\" : \"false\",\n",
" \"spark.cosmos.changeFeed.startFrom\" : \"Beginning\",\n",
" \"spark.cosmos.changeFeed.mode\" : \"Incremental\"\n",
"}\n",
@@ -506,7 +506,7 @@
"OPTIONS (\n",
" spark.cosmos.database = 'SampleDatabase',\n",
" spark.cosmos.container = 'GreenTaxiRecords',\n",
" spark.cosmos.read.inferSchemaEnabled = 'False',\n",
" spark.cosmos.read.inferSchema.enabled = 'False',\n",
" spark.cosmos.read.inferSchemaIncludeSystemProperties = 'True',\n",
" spark.cosmos.partitioning.strategy = 'Aggressive');\n",
"\n",
@@ -547,7 +547,7 @@
"OPTIONS (\n",
" spark.cosmos.database = 'SampleDatabase',\n",
" spark.cosmos.container = 'GreenTaxiRecords',\n",
" spark.cosmos.read.inferSchemaEnabled = 'True',\n",
" spark.cosmos.read.inferSchema.enabled = 'True',\n",
" spark.cosmos.read.inferSchemaIncludeSystemProperties = 'False',\n",
" spark.cosmos.partitioning.strategy = 'Restrictive');\n",
"\n",
@@ -243,8 +243,8 @@
" \"spark.cosmos.database\": \"SampleDatabase\",\n",
" \"spark.cosmos.container\": \"GreenTaxiRecords\",\n",
" \"spark.cosmos.partitioning.strategy\": \"Default\",\n",
" \"spark.cosmos.read.inferSchemaEnabled\" : \"true\",\n",
" \"spark.cosmos.read.inferSchemaForceNullableProperties\" : \"true\",\n",
" \"spark.cosmos.read.inferSchema.enabled\" : \"true\",\n",
" \"spark.cosmos.read.inferSchema.forceNullableProperties\" : \"true\",\n",
" \"spark.cosmos.changeFeed.startFrom\" : \"Beginning\",\n",
" \"spark.cosmos.changeFeed.mode\" : \"Incremental\"\n",
" #\"spark.cosmos.changeFeed.maxItemCountPerTriggerHint\" : \"50000\"\n",
@@ -256,7 +256,7 @@
" \"spark.cosmos.database\": \"SampleDatabase\",\n",
" \"spark.cosmos.container\": \"GreenTaxiRecordsCFSink\",\n",
" \"spark.cosmos.write.strategy\": \"ItemOverwrite\",\n",
" \"spark.cosmos.write.bulkEnabled\": \"true\",\n",
" \"spark.cosmos.write.bulk.enabled\": \"true\",\n",
" \"checkpointLocation\": \"/tmp/\" + runId + \"/\"\n",
"}\n",
"\n",
@@ -337,7 +337,7 @@
"OPTIONS (\n",
" spark.cosmos.database = 'SampleDatabase',\n",
" spark.cosmos.container = 'GreenTaxiRecordsCFSink',\n",
" spark.cosmos.read.inferSchemaEnabled = 'False',\n",
" spark.cosmos.read.inferSchema.enabled = 'False',\n",
" spark.cosmos.partitioning.strategy = 'Default');\n",
"\n",
"SELECT COUNT(*) FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsCFSinkView"
@@ -15,7 +15,7 @@ val cfgWithAutoSchemaInference = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
"spark.cosmos.accountKey" -> cosmosMasterKey,
"spark.cosmos.database" -> cosmosDatabaseName,
"spark.cosmos.container" -> cosmosContainerName,
"spark.cosmos.read.inferSchemaEnabled" -> "true"
"spark.cosmos.read.inferSchema.enabled" -> "true"
)

// COMMAND ----------
@@ -20,16 +20,17 @@ Configuration Reference:
| `spark.cosmos.useGatewayMode` | `false` | Use gateway mode for the client operations |
| `spark.cosmos.read.forceEventualConsistency` | `true` | Makes the client use Eventual consistency for read operations instead of using the default account level consistency |
| `spark.cosmos.applicationName` | None | Application name |
| `spark.cosmos.preferredRegionsList` | None | Preferred regions list to be used for a multi region Cosmos DB account. This is a comma separated value (e.g., `[eastus,westus]`) provided preferred regions will be used as hint. You should use a collocated spark cluster with your Cosmos DB account and pass the spark cluster region as preferred region. See list of azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet) |
| `spark.cosmos.preferredRegionsList` | None | Preferred regions list to be used for a multi-region Cosmos DB account. This is a comma-separated list (e.g., `[East US, West US]` or `East US, West US`); the provided preferred regions will be used as a hint. You should use a Spark cluster collocated with your Cosmos DB account and pass the Spark cluster's region as the preferred region. See the list of Azure regions [here](https://docs.microsoft.com/dotnet/api/microsoft.azure.documents.locationnames?view=azure-dotnet) |
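
For illustration, a minimal PySpark read sketch exercising the client options above — assuming an active `spark` session; the endpoint, key, database, and container values are placeholders:

```python
# Hypothetical client configuration; all values below are placeholders.
clientCfg = {
    "spark.cosmos.accountEndpoint": "https://<your-account>.documents.azure.com:443/",
    "spark.cosmos.accountKey": "<your-account-key>",
    "spark.cosmos.database": "SampleDatabase",
    "spark.cosmos.container": "GreenTaxiRecords",
    # Hint: list the Spark cluster's (collocated) region first.
    "spark.cosmos.preferredRegionsList": "East US, West US",
}

df = spark.read.format("cosmos.items").options(**clientCfg).load()
```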

### Write Config

| Config Property Name | Default | Description |
| :--- | :---- | :--- |
| `spark.cosmos.write.strategy` | `ItemOverwrite` | Cosmos DB Item write Strategy: ItemOverwrite (using upsert), ItemAppend (using create, ignore Conflicts) |
| `spark.cosmos.write.maxRetryCount` | `3` | Cosmos DB Write Max Retry Attempts on failure |
| `spark.cosmos.write.maxConcurrency` | None | Cosmos DB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size |
| `spark.cosmos.write.bulkEnabled` | `true` | Cosmos DB Item Write bulk enabled |
| `spark.cosmos.write.strategy` | `ItemOverwrite` | Cosmos DB item write strategy: `ItemOverwrite` (using upsert) or `ItemAppend` (using create, ignoring pre-existing items, i.e., conflicts) |
| `spark.cosmos.write.maxRetryCount` | `10` | Cosmos DB write max retry attempts on retriable failures (e.g., connection errors) |
| `spark.cosmos.write.point.maxConcurrency` | None | Cosmos DB item write max concurrency for point writes. If not specified, it will be determined based on the Spark executor VM size |
| `spark.cosmos.write.bulk.maxPendingOperations` | None | Cosmos DB item write max pending operations for bulk writes. If not specified, it will be determined based on the Spark executor VM size |
| `spark.cosmos.write.bulk.enabled` | `true` | Whether bulk writes are enabled for Cosmos DB item writes |
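
As a hedged end-to-end sketch of the write options above — assuming an active `spark` session, an existing DataFrame `df`, and `cosmosEndpoint`/`cosmosMasterKey` defined as in the notebooks earlier in this commit; database and container names are placeholders:

```python
# Minimal bulk-write sketch; `df`, `cosmosEndpoint`, and `cosmosMasterKey`
# are assumed to exist already.
writeCfg = {
    "spark.cosmos.accountEndpoint": cosmosEndpoint,
    "spark.cosmos.accountKey": cosmosMasterKey,
    "spark.cosmos.database": "SampleDatabase",
    "spark.cosmos.container": "GreenTaxiRecords",
    "spark.cosmos.write.strategy": "ItemOverwrite",  # upsert semantics
    "spark.cosmos.write.bulk.enabled": "true",
}

df.write.format("cosmos.items").mode("append").options(**writeCfg).save()
```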

### Query Config

@@ -39,19 +40,19 @@ When doing read operations, users can specify a custom schema or allow the connector to infer it

| Config Property Name | Default | Description |
| :--- | :---- | :--- |
| `spark.cosmos.read.inferSchemaEnabled` | `true` | When schema inference is disabled and user is not providing a schema, raw json will be returned. |
| `spark.cosmos.read.inferSchemaQuery` | `SELECT * FROM r` | When schema inference is enabled, used as custom query to infer it. For example, if you store multiple entities with different schemas within a container and you want to ensure inference only looks at certain document types or you want to project only particular columns. |
| `spark.cosmos.read.inferSchemaSamplingSize` | `1000` | Sampling size to use when inferring schema and not using a query. |
| `spark.cosmos.read.inferSchemaIncludeSystemProperties` | `false` | When schema inference is enabled, whether the resulting schema will include all [Cosmos DB system properties](https://docs.microsoft.com/azure/cosmos-db/account-databases-containers-items#properties-of-an-item). |
| `spark.cosmos.read.inferSchemaIncludeTimestamp` | `false` | When schema inference is enabled, whether the resulting schema will include the document Timestamp (`_ts`). Not required if `spark.cosmos.read.inferSchemaIncludeSystemProperties` is enabled, as it will already include all system properties. |
| `spark.cosmos.read.inferSchema.enabled` | `true` | Whether schema inference is enabled. When inference is disabled and the user does not provide a schema, raw JSON will be returned. |
| `spark.cosmos.read.inferSchema.query` | `SELECT * FROM r` | When schema inference is enabled, used as the custom query to infer the schema. For example, if you store multiple entities with different schemas within a container, you can use a query to ensure inference only looks at certain document types, or to project only particular columns. |
| `spark.cosmos.read.inferSchema.samplingSize` | `1000` | Sampling size to use when inferring schema and not using a query. |
| `spark.cosmos.read.inferSchema.includeSystemProperties` | `false` | When schema inference is enabled, whether the resulting schema will include all [Cosmos DB system properties](https://docs.microsoft.com/azure/cosmos-db/account-databases-containers-items#properties-of-an-item). |
| `spark.cosmos.read.inferSchema.includeTimestamp` | `false` | When schema inference is enabled, whether the resulting schema will include the document Timestamp (`_ts`). Not required if `spark.cosmos.read.inferSchema.includeSystemProperties` is enabled, as it will already include all system properties. |
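
A sketch of how these inference options combine — `cfg` is a hypothetical dict holding the account settings, and the inference query below is an example, not a required shape:

```python
# Infer the schema from a projection of one document type only.
df = spark.read.format("cosmos.items").options(**cfg)\
    .option("spark.cosmos.read.inferSchema.enabled", "true")\
    .option("spark.cosmos.read.inferSchema.query",
            "SELECT r.id, r.type, r.isAlive FROM r WHERE r.type = 'person'")\
    .option("spark.cosmos.read.inferSchema.samplingSize", "1000")\
    .load()

df.printSchema()
```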

#### Json conversion configuration

When reading json documents, if a document contains an attribute that does not map to the schema type, the user can decide whether to use a `null` value (Relaxed) or an exception (Strict).

| Config Property Name | Default | Description |
| :--- | :---- | :--- |
| `spark.cosmos.read.schemaConversionMode` | `Relaxed` | The schema conversion behavior (Relaxed, Strict) |
| `spark.cosmos.read.schemaConversionMode` | `Relaxed` | The schema conversion behavior (`Relaxed`, `Strict`). When reading JSON documents, if a document contains an attribute that does not map to the schema type, the user can decide whether to use a `null` value (`Relaxed`) or fail with an exception (`Strict`). |
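
For example (a sketch reusing the hypothetical `cfg` dict from above), switching to `Strict` makes a mismatched attribute fail the read instead of surfacing as `null`:

```python
df = spark.read.format("cosmos.items").options(**cfg)\
    .option("spark.cosmos.read.schemaConversionMode", "Strict")\
    .load()
```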

#### Partitioning Strategy Config

@@ -65,7 +66,7 @@

| Config Property Name | Default | Description |
| :--- | :---- | :--- |
| `spark.cosmos.throughputControlEnabled` | `false` | Whether throughput control is enabled |
| `spark.cosmos.throughputControl.enabled` | `false` | Whether throughput control is enabled |
| `spark.cosmos.throughputControl.name` | None | Throughput control group name |
| `spark.cosmos.throughputControl.targetThroughput` | None | Throughput control group target throughput |
| `spark.cosmos.throughputControl.targetThroughputThreshold` | None | Throughput control group target throughput threshold |
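
Putting these together — a sketch mirroring the NYC taxi ingestion notebook earlier in this commit; the group name and threshold are examples, and the `globalControl.container` key and its value are assumed counterparts of the `globalControl.database` setting shown there:

```python
writeCfg = {
    "spark.cosmos.accountEndpoint": cosmosEndpoint,   # assumed to be defined
    "spark.cosmos.accountKey": cosmosMasterKey,       # assumed to be defined
    "spark.cosmos.database": "SampleDatabase",
    "spark.cosmos.container": "GreenTaxiRecords",
    "spark.cosmos.write.strategy": "ItemOverwrite",
    "spark.cosmos.write.bulk.enabled": "true",
    "spark.cosmos.throughputControl.enabled": "true",
    "spark.cosmos.throughputControl.name": "NYCGreenTaxiDataIngestion",
    # Cap this job at 95% of the container's provisioned throughput.
    "spark.cosmos.throughputControl.targetThroughputThreshold": "0.95",
    "spark.cosmos.throughputControl.globalControl.database": "SampleDatabase",
    "spark.cosmos.throughputControl.globalControl.container": "ThroughputControl",  # assumed name
}
```
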
6 changes: 3 additions & 3 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/quick-start.md
@@ -102,7 +102,7 @@ see [Write Configuration](https://github.com/Azure/azure-sdk-for-java/blob/maste
from pyspark.sql.functions import col

df = spark.read.format("cosmos.items").options(**cfg)\
.option("spark.cosmos.read.inferSchemaEnabled", "true")\
.option("spark.cosmos.read.inferSchema.enabled", "true")\
.load()

df.filter(col("isAlive") == True)\
@@ -112,7 +112,7 @@ df.filter(col("isAlive") == True)\
see [Query Configuration](https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md#query-config) for more detail.

Note that when running queries, unless you are interested in getting back the raw JSON payload,
we recommend setting `spark.cosmos.read.inferSchemaEnabled` to be `true`.
we recommend setting `spark.cosmos.read.inferSchema.enabled` to `true`.

see [Schema Inference Configuration](https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/cosmos/azure-cosmos-spark_3-1_2-12/docs/configuration-reference.md#schema-inference-config) for more detail.

@@ -122,7 +122,7 @@ see [Schema Inference Configuration](https://github.com/Azure/azure-sdk-for-java
```python
# Show the inferred schema from Cosmos DB
df = spark.read.format("cosmos.items").options(**cfg)\
.option("spark.cosmos.read.inferSchemaEnabled", "true")\
.option("spark.cosmos.read.inferSchema.enabled", "true")\
.load()

df.printSchema()
@@ -4,7 +4,7 @@ package com.azure.cosmos.spark

import com.azure.cosmos.implementation.guava25.base.Preconditions
import com.azure.cosmos.models.PartitionKey
import com.azure.cosmos.spark.BulkWriter.MaxNumberOfThreadsPerCPUCore
import com.azure.cosmos.spark.BulkWriter.DefaultMaxPendingOperationPerCore
import com.azure.cosmos.{BulkOperations, CosmosAsyncContainer, CosmosBulkOperationResponse, CosmosException, CosmosItemOperation}
import com.fasterxml.jackson.databind.node.ObjectNode
import reactor.core.Disposable
@@ -34,8 +34,8 @@ class BulkWriter(container: CosmosAsyncContainer,

// TODO: moderakh this requires tuning.
// TODO: moderakh should we do a max on the max memory to ensure we don't run out of memory?
private val maxConcurrency = writeConfig.maxConcurrencyOpt
.getOrElse(SparkUtils.getNumberOfHostCPUCores * MaxNumberOfThreadsPerCPUCore)
private val maxPendingOperations = writeConfig.bulkMaxPendingOperations
.getOrElse(SparkUtils.getNumberOfHostCPUCores * DefaultMaxPendingOperationPerCore)

private val closed = new AtomicBoolean(false)
private val lock = new ReentrantLock
@@ -57,7 +57,7 @@
// TODO: moderakh once that is added in the core SDK, drop activeOperations and rely on the core SDK
// context passing for bulk
private val activeOperations = new TrieMap[CosmosItemOperation, OperationContext]()
private val semaphore = new Semaphore(maxConcurrency)
private val semaphore = new Semaphore(maxPendingOperations)

private val totalScheduledMetrics = new AtomicLong(0)
private val totalSuccessfulIngestionMetrics = new AtomicLong(0)
@@ -220,7 +220,7 @@

assume(activeTasks.get() == 0)
assume(activeOperations.isEmpty)
assume(semaphore.availablePermits() == maxConcurrency)
assume(semaphore.availablePermits() == maxPendingOperations)

logInfo(s"flushAndClose completed with no error. " +
s"totalSuccessfulIngestionMetrics=${totalSuccessfulIngestionMetrics.get()}, totalScheduled=${totalScheduledMetrics}")
@@ -286,7 +286,7 @@ private object BulkWriter {
// hence we want 2MB/ 1KB items per partition to be buffered
// 2 * 1024 * 167 items should get buffered on a 16 CPU core VM
// so per CPU core we want (2 * 1024 * 167 / 16) max items to be buffered
val MaxNumberOfThreadsPerCPUCore = 2 * 1024 * 167 / 16
val DefaultMaxPendingOperationPerCore = 2 * 1024 * 167 / 16

val emitFailureHandler: EmitFailureHandler =
(signalType, emitResult) => if (emitResult.equals(EmitResult.FAIL_NON_SERIALIZED)) true else false
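
The semaphore-based backpressure that `BulkWriter` applies above can be sketched in isolation. The following is an illustrative Python analogue — not the connector's code — of gating scheduled writes on a fixed pool of pending-operation permits:

```python
import threading

# At most `max_pending` operations may be in flight: scheduling a write
# acquires a permit, and its completion callback releases it.
# Mirrors DefaultMaxPendingOperationPerCore; the writer multiplies this
# per-core value by the host CPU core count.
max_pending = 2 * 1024 * 167 // 16
semaphore = threading.Semaphore(max_pending)

def schedule_write(item, submit_async, on_complete):
    semaphore.acquire()  # blocks the scheduler once too many writes are pending
    def _done(result):
        try:
            on_complete(result)
        finally:
            semaphore.release()  # free a permit so the next write can start
    submit_async(item, _done)
```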
