
Cosmos DB Spark Connector - query optimizations for queries targeting logical partitions #25889

Conversation

@FabianMeiswinkel (Member) commented on Dec 8, 2021

Description

This PR contains the remaining fixes for a perf regression in a specific query scenario.

The impacted scenario
At a high level (and simplified), a customer wanted to query data from about 2.7 million logical partitions in a relatively large container (on the order of TBs, with >100 physical partitions) via Spark as quickly as possible.

The approach in Spark 2.4
Splitting the 2.7 million logical partition key values into mini-batches of 15,000 partition key values each, then using a pySpark ExecutorService to run individual queries for each of the mini-batches concurrently, and afterwards creating a union across them.
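For illustration, a minimal Scala sketch of this mini-batching pattern (the customer code used pySpark; here `spark`, the connector config map `cfg`, and a collection `pkValues` holding the ~2.7 million partition key values are assumed):

import org.apache.spark.sql.DataFrame

// Split the partition key values into mini-batches of 15,000 each
val batches: Seq[Seq[String]] = pkValues.grouped(15000).toSeq

// One query (DataFrame) per mini-batch - with the connector, each of these
// fans out into as many Spark tasks as there are physical partitions
val perBatch: Seq[DataFrame] = batches.map { batch =>
  val inList = batch.map(pk => s"'$pk'").mkString(", ")
  spark.read.format("cosmos.oltp").options(cfg).load()
    .where(s"id IN ($inList)")
}

// Union across the mini-batches; the underlying Cosmos queries run
// concurrently once an action is executed on the result
val unioned: DataFrame = perBatch.reduce(_ union _)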

The regression
Execution time in Spark 2.4 with about 40 cores was about 2 minutes. With the Spark 3 connector it was close to 9 minutes.

The fix
Perf investigation showed a couple of optimizations where I/O calls could be reduced or removed. They play a much bigger role than usual in this case because ultimately the scenario above executes 180 Spark queries (2,700,000 / 15,000 = 180), each creating as many Spark tasks as there are physical partitions, and for each Spark task a separate Cosmos query is executed - about 18,000 Cosmos queries in total.
The following optimizations were found:

  • Avoid query plan retrieval (an I/O call to the Gateway) in Spark. It can safely be suppressed for all actual queries, and for schema inference as long as no custom query for schema inference is used. This has already been merged in a previous PR.
  • Avoid FeedRanges retrieval (an I/O call to the Backend) for Spark partitioning. This would reduce "only" 180 I/O calls, a couple of ms each. This change is included in this PR.
  • Allow reducing the number of Spark partitions being created for queries that have a "FeedRangeFilter" in the config. A query for a certain pk-value would then only create one Spark partition (and as such one Spark task) instead of as many Spark partitions as there are physical partitions in Cosmos, or more. This change is included in this PR (see the sample below).
  • Allow customers to determine the FeedRange for a certain partition key value. This allows identifying co-located documents, which can be beneficial in certain scenarios - for example when providing a feed range filter in a query. This change is included in this PR.

Design summary
The main decision was how to expose this new capability to customers in the public surface area. At a high level there were two options. The first was to allow customers to provide a "filter" in queries based on pk-values; at query execution we would then have calculated the feed ranges and filtered partitions on them. The problem is that by hiding the feed range concept, it would be impossible to avoid that most batches of 15,000 pk-values span the vast majority of the roughly 100 physical partitions one way or the other. So the better alternative is to allow customers to retrieve the feed range and use it to identify co-located documents (for example by getting the feed range and partitioning/ordering the DataFrame by it). Feed ranges still hide the actual physical partitioning, which is important to stay robust against partition splits etc.
It also provides benefits in other scenarios. For example, for bulk ingestion into containers with a large number of partitions it can be useful to partition the input DataFrame in a way that each Spark partition only contains data for a subset of the Cosmos DB physical partitions (because that lowers the CPU pressure for each Spark task).
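A minimal sketch of that idea, assuming the UDF from this PR has already been registered (see the sample below) and that `inputDf` carries the partition key in its `id` column; the column name `feedRange` is just an illustration:

import org.apache.spark.sql.functions.{callUDF, col, lit}

val pkDefinition = "{\"paths\":[\"/id\"],\"kind\":\"Hash\"}"

// Tag each row with the feed range of its partition key value (no I/O needed),
// then repartition so each Spark partition only covers a subset of the
// Cosmos DB physical partitions during bulk ingestion
val repartitionedInput = inputDf
  .withColumn("feedRange", callUDF("GetFeedRangeForPartitionKey", lit(pkDefinition), col("id")))
  .repartition(col("feedRange"))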
The functionality to retrieve the feed range of a pk-value in this PR is exposed via a UDF. This has the benefit that it works efficiently both from Scala and Python. To determine the FeedRange, either a reference to the container or the PK-Definition is needed. For performance reasons (and for an easier-to-use API) the UDF expects the PK-Definition as an input parameter; this way no I/O calls will ever be necessary within the UDF. This means we also had to expose a mechanism to retrieve the PK-Definition via Spark. This has been done by exposing the PK-Definition in the TBLPROPERTIES when using the DESCRIBE TABLE EXTENDED SQL command. So the first step is to retrieve the PK-Definition; subsequently the FeedRange can be determined very efficiently and without any I/O calls.
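A sketch of that first step (the catalog name `cosmosCatalog` is an assumption, and the exact TBLPROPERTIES key holding the PK-Definition isn't spelled out in this description, so the properties are just printed here for inspection):

// DESCRIBE TABLE EXTENDED returns rows of (col_name, data_type, comment);
// the table properties - including the PK-Definition - appear in the
// "Table Properties" row
val tableProperties = spark
  .sql(s"DESCRIBE TABLE EXTENDED cosmosCatalog.$cosmosDatabase.$cosmosContainer")
  .where("col_name = 'Table Properties'")
  .collect()(0)
  .getAs[String]("data_type")

println(tableProperties)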

**Sample**
Registering the UDF so that it can be used in Spark SQL:
spark.udf.register("GetFeedRangeForPartitionKey", new GetFeedRangeForPartitionKeyValue(), StringType)

Calculating the FeedRange:

val pkDefinition = "{\"paths\":[\"/id\"],\"kind\":\"Hash\"}"
val dummyDf = spark.sql(s"SELECT GetFeedRangeForPartitionKey('$pkDefinition', '$lastId')")

val feedRange = dummyDf
  .collect()(0)
  .getAs[String](0)

Running a query filtered to a single logical partition:

val cfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
  "spark.cosmos.accountKey" -> cosmosMasterKey,
  "spark.cosmos.database" -> cosmosDatabase,
  "spark.cosmos.container" -> cosmosContainer,
  "spark.cosmos.read.partitioning.strategy" -> "Restrictive",
  "spark.cosmos.partitioning.feedRangeFilter" -> feedRange
)

val df = spark.read.schema(customSchema).format("cosmos.oltp").options(cfg).load()
val rowsArray = df.where(s"id = '$lastId'").collect()
rowsArray should have size 1
rowsArray(0).getAs[String]("id") shouldEqual lastId
df.rdd.getNumPartitions shouldEqual 1

All SDK Contribution checklist:

  • The pull request does not introduce breaking changes.

General Guidelines and Best Practices

  • Title of the pull request is clear and informative.

Testing Guidelines

  • Pull request includes test coverage for the included changes.

Allowing the following new options when creating new containers:
- analyticalStorageTttlInSeconds - this is required to make the Catalog integration useful in Synapse
- partitionKeyVersion - previously it wasn't possible to create containers with Hash V2 (which is needed to support long partition keys and is a requirement for later supporting hierarchical PK)

Also allows retrieving more container properties, mapped into TBLPROPERTIES, when using the DESCRIBE TABLE EXTENDED command.
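For illustration, a hedged sketch of what creating a container with these options could look like via the catalog API; the catalog name, the throughput value, and the exact option key spellings (e.g. `analyticalStoreTtlInSeconds` vs the name above, and the `partitionKeyVersion` value format) are assumptions here:

// Create a container through the Cosmos catalog; option keys follow the
// connector's catalog API conventions and may be spelled differently
spark.sql(
  s"""CREATE TABLE IF NOT EXISTS cosmosCatalog.$cosmosDatabase.targetContainer
     |USING cosmos.oltp
     |TBLPROPERTIES(
     |  partitionKeyPath = '/id',
     |  manualThroughput = '1100',
     |  analyticalStoreTtlInSeconds = '-1',
     |  partitionKeyVersion = 'V2'
     |)""".stripMargin)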
@xinlian12 (Member) left a comment
LGTM. Thanks

@FabianMeiswinkel merged commit 84779c9 into Azure:main on Dec 8, 2021