
Add limit pushdown rule #55

Merged: 10 commits into delta-io:main on Sep 1, 2021
Conversation

@FX196 (Collaborator) commented on Aug 24, 2021

This PR adds a custom Catalyst rule for doing limit pushdown for Delta Sharing queries. When the rule is applied, the LIMIT in a SQL query (e.g. SELECT * FROM table LIMIT 100) is pushed down and sent to the server as a limitHint.

Added tests.
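
The rule described above can be sketched roughly as follows. This is a hedged illustration, not the PR's exact code: the pattern-match shape and the `withLimitHint` helper on `RemoteDeltaFileIndex` are assumptions made for the sketch.

```scala
// Hedged sketch of a Catalyst limit-pushdown rule: match a LocalLimit sitting
// directly above a Delta Sharing scan and record the limit on the file index
// so the client can send it to the server as a limitHint. `withLimitHint` is
// a hypothetical helper, not necessarily the PR's actual API.
import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

object DeltaSharingLimitPushDown extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case limit @ LocalLimit(IntegerLiteral(n),
        scan @ LogicalRelation(
          rel @ HadoopFsRelation(index: RemoteDeltaFileIndex, _, _, _, _, _), _, _, _)) =>
      // Rewrite only the scan; LocalLimit stays in place so the exact row
      // count is still enforced locally even if the server returns more rows.
      val hinted = rel.copy(location = index.withLimitHint(n))(rel.sparkSession)
      limit.copy(child = scan.copy(relation = hinted))
  }
}
```

The limit is a hint rather than a guarantee: the server may use it to return fewer files, while the retained LocalLimit keeps query results correct.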

@zsxwing (Member) left a comment

This is awesome. Left some minor comments.

```scala
val tablePath = testProfileFile.getCanonicalPath + "#share2.default.table2"
withTable("delta_sharing_test") {
  sql(s"CREATE TABLE delta_sharing_test USING deltaSharing LOCATION '$tablePath'")
  sql(s"SELECT * FROM delta_sharing_test LIMIT 2").show()
```

@zsxwing (Member):

Could you also test that selecting specific columns works, such as `SELECT c1, c2 FROM delta_sharing_test LIMIT 2`?

```scala
}

def clear(): Unit = {
  TestDeltaSharingClient.limits = TestDeltaSharingClient.limits.take(0)
```

@zsxwing (Member):

nit: `TestDeltaSharingClient.limits = Nil`

```scala
override def getTableVersion(table: Table): Long = 0

override def getFiles(
  table: Table,
```

@zsxwing (Member):

nit: 4 spaces

```scala
  | "endpoint": "https://localhost:12345/delta-sharing",
  | "bearerToken": "mock"
  |}""".stripMargin, UTF_8)
SparkSession.active.sessionState.conf.setConfString(
```

@zsxwing (Member):

Use `withSQLConf("spark.delta.sharing.client.class" -> "io.delta.sharing.spark.TestDeltaSharingClient") { ... }` so that it resets the config after the test and won't impact any tests that run after this one.
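
The reviewer's point is the standard save/set/run/restore pattern behind `withSQLConf`. A minimal sketch of that pattern, using a plain mutable map instead of Spark's SQLConf (the helper name `withTempSettings` is made up for illustration):

```scala
import scala.collection.mutable

// Save current values, apply the overrides, run the body, then restore the
// originals, so the override cannot leak into tests that run afterwards.
def withTempSettings[T](conf: mutable.Map[String, String])
    (pairs: (String, String)*)(body: => T): T = {
  val saved = pairs.map { case (k, _) => k -> conf.get(k) }
  pairs.foreach { case (k, v) => conf(k) = v }
  try body
  finally saved.foreach {
    case (k, Some(old)) => conf(k) = old   // key existed before: restore it
    case (k, None)      => conf.remove(k)  // key was unset before: remove it
  }
}
```

The `finally` block runs even if the body throws, which is what makes this safe for test isolation.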

```scala
import io.delta.sharing.spark.model.{DeltaTableFiles, DeltaTableMetadata, Metadata, Protocol, Table}

class TestDeltaSharingClient(
  profileProvider: DeltaSharingProfileProvider = null,
```

@zsxwing (Member):

nit: 4 spaces. We don't need the default values since we always create the client through reflection.

@FX196 (Collaborator, Author):

I kept the default value since we still make a client without the profile provider in the unit tests.

```
@@ -123,8 +125,21 @@ private[sharing] object RemoteDeltaLog {
    timeoutInSeconds.toInt
  }

  val client =
    new DeltaSharingRestClient(profileProvider, timeoutInSeconds, numRetries, sslTrustAll)
  SparkSession.active.experimental.extraOptimizations ++= Seq(DeltaSharingLimitPushDown)
```

@zsxwing (Member):

We need to verify whether we have already added it. It's also better to add a new config to turn this off: `spark.delta.sharing.limitPushdown.enabled`. Such as:

```scala
object DeltaSharingLimitPushDown {
  def setup(spark: SparkSession): Unit = synchronized {
    if (!spark.experimental.extraOptimizations.contains(DeltaSharingLimitPushDown)) {
      spark.experimental.extraOptimizations += DeltaSharingLimitPushDown
    }
  }
}
```

And it's better to call this in `createRelation` so that you can get the `spark` reference.

```
@@ -295,6 +311,31 @@ private[sharing] class RemoteDeltaFileIndex(
  }
}

object DeltaSharingLimitPushDown extends Rule[LogicalPlan] {
```

@zsxwing (Member):

nit: it's better to put this in a separate package such as `io.delta.sharing.spark.perf` so that it's easier to find performance optimizations if we add more rules in the future.

@FX196 (Collaborator, Author) commented on Aug 26, 2021

@zsxwing I kept the default parameter for TestDeltaSharingClient since it's used here; hope that's okay.

@FX196 requested a review from zsxwing on August 26, 2021 at 20:56
```diff
@@ -22,7 +22,7 @@ import scala.util.Random

 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{QueryTest, Row, SparkSession}
```

@zsxwing (Member):

nit: unused change

```diff
-val fileIndex = new RemoteDeltaFileIndex(spark, this, path, snapshotToUse)
+val fileIndex = new RemoteDeltaFileIndex(spark, this, path, snapshotToUse, None)
+if (spark.sessionState.conf.getConfString(
+    "spark.delta.sharing.limitPushdown.enabled", "false").toBoolean) {
```

@zsxwing (Member):

this should be enabled by default

```scala
  }

  def apply(p: LogicalPlan): LogicalPlan = {
    p transform {
```

@zsxwing (Member):

Let's also check `plan.conf.getConfString("spark.delta.sharing.limitPushdown.enabled", "true")`. If it's not enabled, we just return `p` directly.
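
Following the suggestion, the rule's `apply` could guard the transform like this. This is a sketch only; the actual pushdown cases are elided to an identity case for illustration.

```scala
def apply(p: LogicalPlan): LogicalPlan = {
  // If pushdown is disabled via config, return the plan untouched.
  if (!p.conf.getConfString("spark.delta.sharing.limitPushdown.enabled", "true").toBoolean) {
    p
  } else {
    p transform {
      // ... the LocalLimit pushdown cases go here ...
      case other => other
    }
  }
}
```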

@FX196 requested a review from zsxwing on August 31, 2021 at 04:59
@zsxwing (Member) left a comment

One minor comment. Otherwise, LGTM.

```diff
@@ -14,4 +14,4 @@
 # limitations under the License.
 #

-__version__ = "0.3.0.dev0"
+__version__ = ""
```

@zsxwing (Member):

unnecessary change

@FX196 (Collaborator, Author):

Sorry, I don't know how that got in there. Reverted.

@zsxwing merged commit b11ff68 into delta-io:main on Sep 1, 2021
@FX196 deleted the limit-pushdown branch on September 1, 2021 at 17:01
@zsxwing added this to the 0.3.0 milestone on Sep 15, 2021
2 participants