Optimized the partitioning strategy implementation details to avoid unnecessarily high RU usage #39438

Merged
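This PR touches two kinds of files: the connector's partition planner, and samples that pin `spark.cosmos.read.partitioning.strategy`. The strategy values that appear in the diffs are `Default`, `Restrictive`, `Custom`, and `Aggressive`; every sample below moves to `Restrictive`. For orientation, a minimal Scala batch read using that option might look like the sketch below. It is assembled from the option keys visible in the diffs, not taken from this PR, and the endpoint, key, database, and container values are placeholders.

```scala
import org.apache.spark.sql.SparkSession

object RestrictiveReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("cosmos-read").getOrCreate()

    // Placeholder credentials; substitute your account's endpoint and key.
    val readCfg = Map(
      "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
      "spark.cosmos.accountKey" -> "<key>",
      "spark.cosmos.database" -> "SampleDatabase",
      "spark.cosmos.container" -> "GreenTaxiRecords",
      // Restrictive keeps partition planning cheap, which is the point of this PR.
      "spark.cosmos.read.partitioning.strategy" -> "Restrictive",
      "spark.cosmos.read.inferSchema.enabled" -> "false"
    )

    val df = spark.read.format("cosmos.oltp").options(readCfg).load()
    println(df.count())
  }
}
```

Judging from the planner change at the end of this diff, `Restrictive` avoids the storage-aligned planning path (and its per-partition metadata lookups), which is where the unnecessary RU usage came from.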
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-1_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#### Bugs Fixed

#### Other Changes
* Optimized the partitioning strategy implementation details to avoid unnecessarily high RU usage. - See [PR 39438](https://github.com/Azure/azure-sdk-for-java/pull/39438)

### 4.28.4 (2024-03-18)

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-2_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#### Bugs Fixed

#### Other Changes
* Optimized the partitioning strategy implementation details to avoid unnecessarily high RU usage. - See [PR 39438](https://github.com/Azure/azure-sdk-for-java/pull/39438)

### 4.28.4 (2024-03-18)

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#### Bugs Fixed

#### Other Changes
* Optimized the partitioning strategy implementation details to avoid unnecessarily high RU usage. - See [PR 39438](https://github.com/Azure/azure-sdk-for-java/pull/39438)

### 4.28.4 (2024-03-18)

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#### Bugs Fixed

#### Other Changes
* Optimized the partitioning strategy implementation details to avoid unnecessarily high RU usage. - See [PR 39438](https://github.com/Azure/azure-sdk-for-java/pull/39438)

### 4.28.4 (2024-03-18)

Expand Down
Sample notebook, C#-style configuration cells (file path not captured):

```diff
@@ -380,7 +380,7 @@
 "    { \"spark.cosmos.accountKey\", cosmosMasterKey },\r\n",
 "    { \"spark.cosmos.database\", \"SampleDatabase\" },\r\n",
 "    { \"spark.cosmos.container\", \"GreenTaxiRecords\" },\r\n",
-"    { \"spark.cosmos.read.partitioning.strategy\", \"Default\" }, \r\n",
+"    { \"spark.cosmos.read.partitioning.strategy\", \"Restrictive\" }, \r\n",
 "    { \"spark.cosmos.read.inferSchema.enabled\", \"false\" },\r\n",
 "    { \"spark.cosmos.changeFeed.startFrom\", \"Beginning\" },\r\n",
 "    { \"spark.cosmos.changeFeed.mode\", \"Incremental\" }\r\n",
@@ -437,7 +437,7 @@
 "    { \"spark.cosmos.accountKey\", cosmosMasterKey },\r\n",
 "    { \"spark.cosmos.database\", \"SampleDatabase\" },\r\n",
 "    { \"spark.cosmos.container\", \"GreenTaxiRecords\" },\r\n",
-"    { \"spark.cosmos.read.partitioning.strategy\", \"Default\" }, \r\n",
+"    { \"spark.cosmos.read.partitioning.strategy\", \"Restrictive\" }, \r\n",
 "    { \"spark.cosmos.read.inferSchema.enabled\", \"false\" }\r\n",
 "};\r\n",
 "\r\n",
```
Scala sample (file path not captured):

```diff
@@ -48,7 +48,7 @@ def createCosmosView(cosmosDatabaseName: String, cosmosContainerName: String, co
     spark.cosmos.database = '$cosmosDatabaseName',
     spark.cosmos.container = '$cosmosContainerName',
     spark.cosmos.read.inferSchema.enabled = 'False',
-    spark.cosmos.read.partitioning.strategy = 'Default'
+    spark.cosmos.read.partitioning.strategy = 'Restrictive'
   );
 """
 println("Executing create View...")
```
Scala sample (file path not captured):

```diff
@@ -72,7 +72,7 @@ spark.sql(createTargetResources)
   "spark.cosmos.accountKey" -> cosmosSourceMasterKey,
   "spark.cosmos.database" -> cosmosSourceDatabaseName,
   "spark.cosmos.container" -> cosmosSourceContainerName,
-  "spark.cosmos.read.partitioning.strategy" -> "Default",
+  "spark.cosmos.read.partitioning.strategy" -> "Restrictive",
   "spark.cosmos.read.inferSchema.enabled" -> "false",
   "spark.cosmos.changeFeed.startFrom" -> "Beginning",
   "spark.cosmos.changeFeed.mode" -> "Incremental",
```
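Several samples apply the same switch to change feed reads. Assembled from the options in the hunk above and the `cosmos.oltp.changeFeed` format used in the notebooks further down, a complete change feed read might look like this sketch; `spark` is assumed to be the ambient session of a notebook or spark-shell, and the endpoint and key are placeholders.

```scala
// Sketch: read a container's change feed from the beginning with the
// Restrictive partitioning strategy, mirroring the sample configs in this PR.
val changeFeedCfg = Map(
  "spark.cosmos.accountEndpoint" -> "https://<account>.documents.azure.com:443/",
  "spark.cosmos.accountKey" -> "<key>",
  "spark.cosmos.database" -> "SampleDatabase",
  "spark.cosmos.container" -> "GreenTaxiRecords",
  "spark.cosmos.read.partitioning.strategy" -> "Restrictive",
  "spark.cosmos.read.inferSchema.enabled" -> "false",
  "spark.cosmos.changeFeed.startFrom" -> "Beginning",
  "spark.cosmos.changeFeed.mode" -> "Incremental"
)

val changeFeedDf = spark.read
  .format("cosmos.oltp.changeFeed") // change feed source, as used in the notebooks below
  .options(changeFeedCfg)
  .load()

println(changeFeedDf.count())
```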
Scala sample with embedded Spark SQL (file path not captured):

```diff
@@ -52,7 +52,7 @@ OPTIONS (
   spark.cosmos.database = '${cosmosSourceDatabaseName}', -- source database
   spark.cosmos.container = '${cosmosSourceContainerName}', -- source container
   spark.cosmos.read.inferSchema.enabled = 'False',
-  spark.cosmos.read.partitioning.strategy = 'Default');
+  spark.cosmos.read.partitioning.strategy = 'Restrictive');
 """

 var selectView = s"""
```
Python sample notebook, raw Databricks JSON source (file path not captured):

```diff
@@ -271,7 +271,7 @@
 {
 "cell_type": "code",
 "source": [
-"print(\"Starting validation via change feed: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\nchangeFeedCfg = {\n  \"spark.cosmos.accountEndpoint\": cosmosEndpoint,\n  \"spark.cosmos.accountKey\": cosmosMasterKey,\n  \"spark.cosmos.database\": \"SampleDatabase\",\n  \"spark.cosmos.container\": \"GreenTaxiRecords\",\n  \"spark.cosmos.read.partitioning.strategy\": \"Default\",\n  \"spark.cosmos.read.inferSchema.enabled\" : \"false\",\n  \"spark.cosmos.changeFeed.startFrom\" : \"Beginning\",\n  \"spark.cosmos.changeFeed.mode\" : \"Incremental\"\n}\nchangeFeed_df = spark.read.format(\"cosmos.oltp.changeFeed\").options(**changeFeedCfg).load()\ncount_changeFeed = changeFeed_df.count()\nprint(\"Number of records retrieved via change feed: \", count_changeFeed) \nprint(\"Finished validation via change feed: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nassert count_source == count_changeFeed"
+"print(\"Starting validation via change feed: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\nchangeFeedCfg = {\n  \"spark.cosmos.accountEndpoint\": cosmosEndpoint,\n  \"spark.cosmos.accountKey\": cosmosMasterKey,\n  \"spark.cosmos.database\": \"SampleDatabase\",\n  \"spark.cosmos.container\": \"GreenTaxiRecords\",\n  \"spark.cosmos.read.partitioning.strategy\": \"Restrictive\",\n  \"spark.cosmos.read.inferSchema.enabled\" : \"false\",\n  \"spark.cosmos.changeFeed.startFrom\" : \"Beginning\",\n  \"spark.cosmos.changeFeed.mode\" : \"Incremental\"\n}\nchangeFeed_df = spark.read.format(\"cosmos.oltp.changeFeed\").options(**changeFeedCfg).load()\ncount_changeFeed = changeFeed_df.count()\nprint(\"Number of records retrieved via change feed: \", count_changeFeed) \nprint(\"Finished validation via change feed: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nassert count_source == count_changeFeed"
 ],
 "metadata": {
 "application/vnd.databricks.v1+cell": {
@@ -301,7 +301,7 @@
 {
 "cell_type": "code",
 "source": [
-"import math\n\nprint(\"Starting to identify to be deleted documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\nreadCfg = {\n  \"spark.cosmos.accountEndpoint\": cosmosEndpoint,\n  \"spark.cosmos.accountKey\": cosmosMasterKey,\n  \"spark.cosmos.database\": \"SampleDatabase\",\n  \"spark.cosmos.container\": \"GreenTaxiRecords\",\n  \"spark.cosmos.read.partitioning.strategy\": \"Default\",\n  \"spark.cosmos.read.inferSchema.enabled\" : \"false\",\n}\n\ntoBeDeleted_df = spark.read.format(\"cosmos.oltp\").options(**readCfg).load().limit(100_000)\nprint(\"Number of records to be deleted: \", toBeDeleted_df.count()) \n\nprint(\"Starting to bulk delete documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\ndeleteCfg = writeCfg.copy()\ndeleteCfg[\"spark.cosmos.write.strategy\"] = \"ItemDelete\"\ntoBeDeleted_df \\\n  .write \\\n  .format(\"cosmos.oltp\") \\\n  .mode(\"Append\") \\\n  .options(**deleteCfg) \\\n  .save()\nprint(\"Finished deleting documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nprint(\"Starting count validation via query: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\ncount_query_schema=StructType(fields=[StructField(\"Count\", LongType(), True)])\nreadCfg[\"spark.cosmos.read.customQuery\"] = \"SELECT COUNT(0) AS Count FROM c\"\nquery_df = spark.read.format(\"cosmos.oltp\").schema(count_query_schema).options(**readCfg).load()\ncount_query = query_df.select(F.sum(\"Count\").alias(\"TotalCount\")).first()[\"TotalCount\"]\nprint(\"Number of records retrieved via query: \", count_query) \nprint(\"Finished count validation via query: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nassert max(0, count_source - 100_000) == count_query"
+"import math\n\nprint(\"Starting to identify to be deleted documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\nreadCfg = {\n  \"spark.cosmos.accountEndpoint\": cosmosEndpoint,\n  \"spark.cosmos.accountKey\": cosmosMasterKey,\n  \"spark.cosmos.database\": \"SampleDatabase\",\n  \"spark.cosmos.container\": \"GreenTaxiRecords\",\n  \"spark.cosmos.read.partitioning.strategy\": \"Restrictive\",\n  \"spark.cosmos.read.inferSchema.enabled\" : \"false\",\n}\n\ntoBeDeleted_df = spark.read.format(\"cosmos.oltp\").options(**readCfg).load().limit(100_000)\nprint(\"Number of records to be deleted: \", toBeDeleted_df.count()) \n\nprint(\"Starting to bulk delete documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\ndeleteCfg = writeCfg.copy()\ndeleteCfg[\"spark.cosmos.write.strategy\"] = \"ItemDelete\"\ntoBeDeleted_df \\\n  .write \\\n  .format(\"cosmos.oltp\") \\\n  .mode(\"Append\") \\\n  .options(**deleteCfg) \\\n  .save()\nprint(\"Finished deleting documents: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nprint(\"Starting count validation via query: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\ncount_query_schema=StructType(fields=[StructField(\"Count\", LongType(), True)])\nreadCfg[\"spark.cosmos.read.customQuery\"] = \"SELECT COUNT(0) AS Count FROM c\"\nquery_df = spark.read.format(\"cosmos.oltp\").schema(count_query_schema).options(**readCfg).load()\ncount_query = query_df.select(F.sum(\"Count\").alias(\"TotalCount\")).first()[\"TotalCount\"]\nprint(\"Number of records retrieved via query: \", count_query) \nprint(\"Finished count validation via query: \", datetime.datetime.utcnow().strftime(\"%Y-%m-%d %H:%M:%S.%f\"))\n\nassert max(0, count_source - 100_000) == count_query"
 ],
 "metadata": {
 "application/vnd.databricks.v1+cell": {
@@ -421,7 +421,7 @@
 {
 "cell_type": "code",
 "source": [
-"%sql\nCREATE TABLE cosmosCatalog.SampleDatabase.GreenTaxiRecordsView \n  (id STRING, _ts TIMESTAMP, vendorID INT, totalAmount DOUBLE)\nUSING cosmos.oltp\nTBLPROPERTIES(isCosmosView = 'True')\nOPTIONS (\n  spark.cosmos.database = 'SampleDatabase',\n  spark.cosmos.container = 'GreenTaxiRecords',\n  spark.cosmos.read.inferSchema.enabled = 'False',\n  spark.cosmos.read.inferSchema.includeSystemProperties = 'True',\n  spark.cosmos.read.partitioning.strategy = 'Aggressive');\n\nSELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsView LIMIT 10"
+"%sql\nCREATE TABLE cosmosCatalog.SampleDatabase.GreenTaxiRecordsView \n  (id STRING, _ts TIMESTAMP, vendorID INT, totalAmount DOUBLE)\nUSING cosmos.oltp\nTBLPROPERTIES(isCosmosView = 'True')\nOPTIONS (\n  spark.cosmos.database = 'SampleDatabase',\n  spark.cosmos.container = 'GreenTaxiRecords',\n  spark.cosmos.read.inferSchema.enabled = 'False',\n  spark.cosmos.read.inferSchema.includeSystemProperties = 'True',\n  spark.cosmos.read.partitioning.strategy = 'Restrictive');\n\nSELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsView LIMIT 10"
 ],
 "metadata": {
 "application/vnd.databricks.v1+cell": {
```
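The last cell above also shows the catalog-based view DDL the samples use (note `isCosmosView = 'True'` and the switch from `Aggressive` to `Restrictive`). The same DDL can be issued from Scala; this sketch assumes a session whose `cosmosCatalog` catalog is already wired to the Cosmos account, as in the sample notebooks.

```scala
// Sketch: the view DDL from the notebook cell above, issued via spark.sql.
// Assumes `spark` is an ambient session with the cosmosCatalog catalog configured.
spark.sql(
  """
    |CREATE TABLE cosmosCatalog.SampleDatabase.GreenTaxiRecordsView
    |  (id STRING, _ts TIMESTAMP, vendorID INT, totalAmount DOUBLE)
    |USING cosmos.oltp
    |TBLPROPERTIES(isCosmosView = 'True')
    |OPTIONS (
    |  spark.cosmos.database = 'SampleDatabase',
    |  spark.cosmos.container = 'GreenTaxiRecords',
    |  spark.cosmos.read.inferSchema.enabled = 'False',
    |  spark.cosmos.read.inferSchema.includeSystemProperties = 'True',
    |  spark.cosmos.read.partitioning.strategy = 'Restrictive')
  """.stripMargin)

spark.sql("SELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsView LIMIT 10").show()
```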
Sample notebook, raw JSON source (file path not captured):

```diff
@@ -242,7 +242,7 @@
 "  \"spark.cosmos.accountKey\": cosmosMasterKey,\n",
 "  \"spark.cosmos.database\": \"SampleDatabase\",\n",
 "  \"spark.cosmos.container\": \"GreenTaxiRecords\",\n",
-"  \"spark.cosmos.read.partitioning.strategy\": \"Default\",\n",
+"  \"spark.cosmos.read.partitioning.strategy\": \"Restrictive\",\n",
 "  \"spark.cosmos.read.inferSchema.enabled\" : \"true\",\n",
 "  \"spark.cosmos.read.inferSchema.forceNullableProperties\" : \"true\",\n",
 "  \"spark.cosmos.changeFeed.startFrom\" : \"Beginning\",\n",
@@ -338,7 +338,7 @@
 "  spark.cosmos.database = 'SampleDatabase',\n",
 "  spark.cosmos.container = 'GreenTaxiRecordsCFSink',\n",
 "  spark.cosmos.read.inferSchema.enabled = 'False',\n",
-"  spark.cosmos.read.partitioning.strategy = 'Default');\n",
+"  spark.cosmos.read.partitioning.strategy = 'Restrictive');\n",
 "\n",
 "SELECT COUNT(*) FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsCFSinkView"
 ]
```
Scala sample (file path not captured):

```diff
@@ -228,7 +228,7 @@ val changeFeedCfg = Map(
   "spark.cosmos.accountKey" -> cosmosMasterKey,
   "spark.cosmos.database" -> "SampleDatabase",
   "spark.cosmos.container" -> "GreenTaxiRecords",
-  "spark.cosmos.read.partitioning.strategy" -> "Default",
+  "spark.cosmos.read.partitioning.strategy" -> "Restrictive",
   "spark.cosmos.read.inferSchema.enabled" -> "false",
   "spark.cosmos.changeFeed.startFrom" -> "Beginning",
   "spark.cosmos.changeFeed.mode" -> "Incremental"
@@ -255,7 +255,7 @@ val readCfg = Map(
   "spark.cosmos.accountKey" -> cosmosMasterKey,
   "spark.cosmos.database" -> "SampleDatabase",
   "spark.cosmos.container" -> "GreenTaxiRecords",
-  "spark.cosmos.read.partitioning.strategy" -> "Default",
+  "spark.cosmos.read.partitioning.strategy" -> "Restrictive",
   "spark.cosmos.read.inferSchema.enabled" -> "false",
 )

@@ -329,7 +329,7 @@ assert(df_Tables.count() == 3)
 // MAGIC   spark.cosmos.container = 'GreenTaxiRecords',
 // MAGIC   spark.cosmos.read.inferSchema.enabled = 'False',
 // MAGIC   spark.cosmos.read.inferSchema.includeSystemProperties = 'True',
-// MAGIC   spark.cosmos.read.partitioning.strategy = 'Aggressive');
+// MAGIC   spark.cosmos.read.partitioning.strategy = 'Restrictive');
 // MAGIC
 // MAGIC SELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsView LIMIT 10
```
Scala sample using AAD authentication (file path not captured):

```diff
@@ -253,7 +253,7 @@ val changeFeedCfg = Map(
   "spark.cosmos.auth.aad.clientSecret" -> clientSecret,
   "spark.cosmos.database" -> "SampleDatabase",
   "spark.cosmos.container" -> "GreenTaxiRecords",
-  "spark.cosmos.read.partitioning.strategy" -> "Default",
+  "spark.cosmos.read.partitioning.strategy" -> "Restrictive",
   "spark.cosmos.read.inferSchema.enabled" -> "false",
   "spark.cosmos.changeFeed.startFrom" -> "Beginning",
   "spark.cosmos.changeFeed.mode" -> "Incremental"
@@ -284,7 +284,7 @@ val readCfg = Map(
   "spark.cosmos.auth.aad.clientId" -> clientId,
   "spark.cosmos.auth.aad.clientSecret" -> clientSecret, "spark.cosmos.database" -> "SampleDatabase",
   "spark.cosmos.container" -> "GreenTaxiRecords",
-  "spark.cosmos.read.partitioning.strategy" -> "Default",
+  "spark.cosmos.read.partitioning.strategy" -> "Restrictive",
   "spark.cosmos.read.inferSchema.enabled" -> "false",
 )

@@ -358,7 +358,7 @@ assert(df_Tables.count() == 3)
 // MAGIC   spark.cosmos.container = 'GreenTaxiRecords',
 // MAGIC   spark.cosmos.read.inferSchema.enabled = 'False',
 // MAGIC   spark.cosmos.read.inferSchema.includeSystemProperties = 'True',
-// MAGIC   spark.cosmos.read.partitioning.strategy = 'Aggressive');
+// MAGIC   spark.cosmos.read.partitioning.strategy = 'Restrictive');
 // MAGIC
 // MAGIC SELECT * FROM cosmosCatalog.SampleDatabase.GreenTaxiRecordsView LIMIT 10
```
Connector source, CosmosPartitionPlanner (file path not captured; the +/- markers below are restored from the hunk counts, which the rendered page dropped):

```diff
@@ -76,24 +76,18 @@ private object CosmosPartitionPlanner extends BasicLoggingTrait {

     cosmosPartitioningConfig.partitioningStrategy match {
       case PartitioningStrategies.Restrictive =>
+      case PartitioningStrategies.Default =>
         applyRestrictiveStrategy(planningInfo)
       case PartitioningStrategies.Custom =>
         applyCustomStrategy(
           container,
           planningInfo,
           cosmosPartitioningConfig.targetedPartitionCount.get)
-      case PartitioningStrategies.Default =>
-        applyStorageAlignedStrategy(
-          container,
-          planningInfo,
-          1 / defaultMaxPartitionSizeInMB.toDouble,
-          defaultMinimalPartitionCount
-        )
       case PartitioningStrategies.Aggressive =>
         applyStorageAlignedStrategy(
           container,
           planningInfo,
-          5 / defaultMaxPartitionSizeInMB.toDouble,
+          1 / defaultMaxPartitionSizeInMB.toDouble,
           defaultMinimalPartitionCount
         )
     }
```
Expand Down