Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-20194] Add support for partition pruning to in-memory catalog #17510

Closed
wants to merge 3 commits into from

Conversation

adrian-ionescu
Copy link
Contributor

What changes were proposed in this pull request?

This patch implements listPartitionsByFilter() for InMemoryCatalog and thus resolves an outstanding TODO causing the PruneFileSourcePartitions optimizer rule not to apply when "spark.sql.catalogImplementation" is set to "in-memory" (which is the default).

The change is straightforward: it extracts the code for further filtering of the list of partitions returned by the metastore's getPartitionsByFilter() out from HiveExternalCatalog into ExternalCatalogUtils and calls this new function from InMemoryCatalog on the whole list of partitions.

Now that this method is implemented we can always pass the CatalogTable to the DataSource in FindDataSourceTable, so that the latter is resolved to a relation with a CatalogFileIndex, which is what the PruneFileSourcePartitions rule matches for.

How was this patch tested?

Ran existing tests and added new test for listPartitionsByFilter in ExternalCatalogSuite, which is subclassed by both InMemoryCatalogSuite and HiveExternalCatalogSuite.

@gatorsmile
Copy link
Member

ok to test

_.references.map(_.name).toSet.subsetOf(partitionColumnNames)
}
if (nonPartitionPruningPredicates.nonEmpty) {
sys.error("Expected only partition pruning predicates: " + nonPartitionPruningPredicates)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Throwing an AnalysisException is preferred.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add the negative test cases in your newly added test cases?

@@ -436,6 +438,37 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown"))).isEmpty)
}

test("list partitions by filter") {
val tz = TimeZone.getDefault().getID()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: val tz = TimeZone.getDefault.getID

@gatorsmile
Copy link
Member

Not related to this PR. It sounds like we have a bug in HiveTableScans. The predicate orders matter. We should not prune the partitions if there exists non-deterministic predicates that place before the partition-pruning predicates. cc @cloud-fan @hvanhovell

@SparkQA
Copy link

SparkQA commented Apr 2, 2017

Test build #75461 has finished for PR 17510 at commit 3b031c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 2, 2017

Test build #75464 has finished for PR 17510 at commit 3ad0327.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member

gatorsmile commented Apr 3, 2017

LGTM cc @cloud-fan @hvanhovell @ericl

if (predicates.nonEmpty) {
val clientPrunedPartitions = client.getPartitionsByFilter(rawTable, predicates).map { part =>
val clientPrunedPartitions =
client.getPartitionsByFilter(rawTable, predicates).map { part =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if predicates.isEmpty, the previous code will run client.getPartitions. Can you double check there is no performance regression?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A similar optimization is done in the function itself: client.getPartitionsByFilter(), while Hive Shim_v0_12 delegates to getAllPartitions anyway.

I've now made the only piece of non-trivial code along that path lazy, so I think we're good.

val catalog = newBasicCatalog()

def checkAnswer(table: CatalogTable, filters: Seq[Expression],
expected: Set[CatalogTablePartition]): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: code style

def checkAnswer(
    param1: XX, param2: XX, ...

@cloud-fan
Copy link
Contributor

LGTM except some minor comments

@SparkQA
Copy link

SparkQA commented Apr 3, 2017

Test build #75479 has finished for PR 17510 at commit d6bdcf4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Copy link
Member

Thanks! Merging to master.

@asfgit asfgit closed this in 703c42c Apr 3, 2017
@adrian-ionescu
Copy link
Contributor Author

Thanks for moving so fast!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants