Conversation

@viirya (Member) commented Sep 26, 2015

JIRA: https://issues.apache.org/jira/browse/SPARK-10841

Currently we can't push down filters involving UDFs to Parquet. In practice, we have some usage of UDFs in filters, e.g.,

 SELECT * FROM table WHERE udf(customer_id) = "ABC"

In the above query, customer_id is a column storing customer IDs in an encoded form, and udf is a function that parses this column into a string value. Without pushing the filter down to Parquet, we fetch all data from many Parquet files and then perform the filtering in Spark.

Using Parquet's UserDefinedPredicate, we can push down these filters to Parquet.
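To illustrate the idea, here is a simplified pure-Scala model of what a user-defined predicate does: a per-record keep test plus a row-group-level canDrop test over column statistics. The names (SimpleStats, SimplePredicate, UdfEqPredicate) are illustrative only, not the parquet-mr or Spark API.

```scala
// Simplified model of Parquet's UserDefinedPredicate idea (names are illustrative).
case class SimpleStats[T](min: T, max: T)

trait SimplePredicate[T] {
  def keep(value: T): Boolean                  // per-record test
  def canDrop(stats: SimpleStats[T]): Boolean  // row-group-level test on statistics
}

// Pushing down udf(column) == literal: the UDF must be applied per value, and
// since an arbitrary UDF is not monotonic, statistics cannot safely be used to
// skip row groups; this sketch conservatively never drops one.
class UdfEqPredicate[T, U](udf: T => U, literal: U) extends SimplePredicate[T] {
  def keep(value: T): Boolean = udf(value) == literal
  def canDrop(stats: SimpleStats[T]): Boolean = false
}

// Example: a hypothetical UDF that formats a customer id, filtered against "C42".
val pred = new UdfEqPredicate[Int, String](id => s"C$id", "C42")
```

The win comes from evaluating keep inside the Parquet reader, before records are materialized into Spark rows.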

@SparkQA commented Sep 26, 2015

Test build #43048 has finished for PR 8922 at commit a673e78.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ParquetEqUDP[T <: Comparable[T], U <: Comparable[U]](udf: AnyRef, v: U)
    • case class UDFEqualTo(attribute: String, udf: ScalaUDF, value: Any) extends Filter

@SparkQA commented Sep 26, 2015

Test build #43049 has finished for PR 8922 at commit 3a235b2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ParquetEqUDP[T <: Comparable[T], U <: Comparable[U]](udf: AnyRef, v: U)
    • case class UDFEqualTo(attribute: String, udf: ScalaUDF, value: Any) extends Filter

@viirya (Member, Author) commented Sep 26, 2015

retest this please.

@SparkQA commented Sep 26, 2015

Test build #43050 has finished for PR 8922 at commit 3a235b2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ParquetEqUDP[T <: Comparable[T], U <: Comparable[U]](udf: AnyRef, v: U)
    • case class UDFEqualTo(attribute: String, udf: ScalaUDF, value: Any) extends Filter

@SparkQA commented Sep 28, 2015

Test build #43061 has finished for PR 8922 at commit f7a7b2d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait ParquetUDP
    • case class ParquetEqUDP[T <: Comparable[T], U <: Comparable[U]](
    • case class UDFEqualTo(attribute: String, udf: ScalaUDF, value: Any) extends Filter

@SparkQA commented Oct 1, 2015

Test build #43146 has finished for PR 8922 at commit dc1b544.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait ParquetUDP
    • case class ParquetPushDownUDP[T <: Comparable[T], U <: Comparable[U]](
    • case class UDFEqualTo(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFGreaterThan(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFGreaterThanOrEqual(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFLessThan(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFLessThanOrEqual(attribute: String, udf: ScalaUDF, value: Any) extends Filter

@SparkQA commented Oct 1, 2015

Test build #43147 has finished for PR 8922 at commit 388de88.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait ParquetUDP
    • case class ParquetPushDownUDP[T <: Comparable[T], U <: Comparable[U]](
    • case class UDFEqualTo(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFGreaterThan(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFGreaterThanOrEqual(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFLessThan(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFLessThanOrEqual(attribute: String, udf: ScalaUDF, value: Any) extends Filter

@viirya (Member, Author) commented Oct 1, 2015

ping @liancheng @marmbrus

@marmbrus (Contributor) commented Oct 1, 2015

I'm a little skeptical that this is worth the complexity. Do you have real workloads that this speeds up significantly?

@viirya (Member, Author) commented Oct 1, 2015

I will post performance comparison later.

@SparkQA commented Oct 2, 2015

Test build #43175 has finished for PR 8922 at commit 39d672d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait ParquetUDP
    • case class ParquetPushDownUDP[T <: Comparable[T], U <: Comparable[U]](
    • case class UDFEqualTo(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFGreaterThan(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFGreaterThanOrEqual(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFLessThan(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFLessThanOrEqual(attribute: String, udf: ScalaUDF, value: Any) extends Filter

@viirya (Member, Author) commented Oct 2, 2015

I've used a daily SQL query to test the performance difference. Roughly, based on Spark 1.4.1 backported with this patch, it shows about a 20% relative improvement. The actual improvement may vary depending on the number of columns filtered by and how many records actually pass the filter.

More importantly, I think this patch should be able to filter out unnecessary data in advance and thus reduce memory usage.

@SparkQA commented Oct 3, 2015

Test build #43214 has finished for PR 8922 at commit 51fb082.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait ParquetUDP
    • case class ParquetPushDownUDP[T <: Comparable[T], U <: Comparable[U]](
    • case class UDFEqualTo(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFGreaterThan(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFGreaterThanOrEqual(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFLessThan(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFLessThanOrEqual(attribute: String, udf: ScalaUDF, value: Any) extends Filter

@marmbrus (Contributor) commented Oct 5, 2015

"daily SQL query" is not sufficiently descriptive. Please post actual benchmark results with code when making pull requests that claim to improve performance. It would also be good to evaluate the cost in degenerate cases. For example, I think you are adding an object allocation per input tuple (for boxing) to any query that filters by a UDF in Parquet. Are you slowing down cases where the filter is not selective?

If we want to improve the set of things that we push down, I don't think specializing for just UDFs in comparison operations is worth it given how much you are widening the API. Could we just have a single Function filter:

case class FilterFunction(attribute: String, function: Any => Boolean)

or maybe some specialized variants:

case class IntegerFilter(attribute: String, function: Int => Boolean)
...
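As a minimal sketch of how a function-valued filter like the one suggested above could be evaluated (hypothetical names and a toy row representation, not Spark's actual data source API):

```scala
// Hypothetical, simplified version of the suggested filter shape;
// not the actual org.apache.spark.sql.sources API.
case class FilterFunction(attribute: String, function: Any => Boolean)

// Apply the filter to rows represented as attribute -> value maps:
// keep a row only if the attribute is present and the predicate holds.
def applyFilter(rows: Seq[Map[String, Any]], f: FilterFunction): Seq[Map[String, Any]] =
  rows.filter(row => row.get(f.attribute).exists(f.function))

val rows = Seq(Map[String, Any]("customer_id" -> 1), Map[String, Any]("customer_id" -> 42))
val keepEven = FilterFunction("customer_id", v => v.asInstanceOf[Int] % 2 == 0)
```

The appeal of this shape is that the data source only needs to call an opaque Any => Boolean, rather than understand each comparison operator.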

@viirya (Member, Author) commented Oct 6, 2015

Hmm, because the SQL query and data schema are sensitive company business information, I may not be able to post them publicly here. The data size is hundreds of GB to 1 TB, and the query roughly selects a dozen columns from the table, with a few filters involving UDFs, a lateral view, and a group by.

I just realized that we don't need to run CatalystTypeConverters on the UDF input here; removing it should reduce some of the boxing time. As for the degenerate cases: if you mean whether I tested on data that mostly does not satisfy the filter, no, I have not. For such cases, though, the filtering definitely adds some computation overhead, with or without this patch.

Your suggestion makes sense. In our case, however, the existing UDFs do not always have signatures such as Any => Boolean or Int => Boolean; our filtering conditions look like where udf(column1) = 'ABCDE...'. That is why I needed to widen the API and use a more general signature here; it can handle this common pushdown usage of UDFs. With the single Function filter or the specialized variants you suggest, these UDFs would need to be modified before they could be used.

That is the reason I designed the API as it is in this patch. If you still think this API is too general, I can update it as you suggest.

@SparkQA commented Oct 6, 2015

Test build #43263 has finished for PR 8922 at commit 89fe512.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait ParquetUDP
    • case class ParquetPushDownUDP[T <: Comparable[T], U <: Comparable[U]](
    • case class UDFEqualTo(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFGreaterThan(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFGreaterThanOrEqual(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFLessThan(attribute: String, udf: ScalaUDF, value: Any) extends Filter
    • case class UDFLessThanOrEqual(attribute: String, udf: ScalaUDF, value: Any) extends Filter

@marmbrus (Contributor) commented Oct 6, 2015

I'm not suggesting we specialize for UDFs that are Int => Boolean. I'm suggesting that we generate a function for any predicate that only takes a single attribute as input and then pass that generated function as a filter to the data sources. That way this will work for more than just simple comparisons with UDFs. We can do this for any predicates that are not matched by the other filter patterns.

@marmbrus (Contributor) commented Oct 6, 2015

Also, I understand you can't share your internal data/code, but you can create a similar benchmark on synthetic data.

@viirya (Member, Author) commented Oct 7, 2015

@marmbrus Thanks for clarifying that.

I quickly scanned the predicates in expressions. Actually, I can't find any predicate that takes only a single attribute as input and is not already matched for pushdown to Parquet.

Looks like UDFs are the only target we need to address?

@marmbrus (Contributor) commented Oct 7, 2015

The point I'm trying to make is that we should generalize this, so that we don't have to special case every possible udf(attr) <some comparison> literal but can instead push down any case where we are running some predicate that involves only a single attribute reference.

Your implementation, for example, doesn't handle a + 1 = 1 or udf(a) = udf(a + 1), and adding each of these individually is not going to scale. Since we are already resorting to pushing down a function, why not leverage the existing evaluation framework?

Here is a very rough sketch:

case class FilterFunction(func: Any => Boolean) extends Filter

protected[sql] def selectFilters(filters: Seq[Expression]) = {
  filters.flatMap {
    ...
    case e: Expression if e.references.size == 1 =>
      val boundExpression = BindReferences.bindReference(e, e.references.toSeq)

      Some(FilterFunction { (a: Any) =>
        val inputRow = new GenericInternalRow(Array[Any](a))
        boundExpression.eval(inputRow).asInstanceOf[Boolean]
      })
  }
}

There are a bunch of things that would need to be done before we could commit this, though:

  • data type conversions
  • evaluate the cost of boxing and decide if we should specialize
  • consider codegen for this function
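The core of the sketch above — binding a single-attribute expression and wrapping its evaluation in a closure — can be modeled in plain Scala without Spark internals. Everything here (the Expr tree, toFilterFunction) is an illustrative stand-in for Catalyst expressions, not Spark code:

```scala
// Tiny expression tree standing in for Catalyst expressions (illustrative only).
// The single attribute reference is modeled as "the input value itself".
sealed trait Expr { def eval(input: Any): Any }
case object AttrRef extends Expr { def eval(input: Any): Any = input }
case class Lit(v: Any) extends Expr { def eval(input: Any): Any = v }
case class Add(l: Expr, r: Expr) extends Expr {
  def eval(input: Any): Any =
    l.eval(input).asInstanceOf[Int] + r.eval(input).asInstanceOf[Int]
}
case class Eq(l: Expr, r: Expr) extends Expr {
  def eval(input: Any): Any = l.eval(input) == r.eval(input)
}

// Analogue of wrapping a bound expression into a pushed-down predicate function.
def toFilterFunction(e: Expr): Any => Boolean =
  (a: Any) => e.eval(a).asInstanceOf[Boolean]

// This handles shapes like `a + 1 = 1`, which per-operator UDF filters would miss.
val pred = toFilterFunction(Eq(Add(AttrRef, Lit(1)), Lit(1)))
```

Because any single-attribute expression tree collapses into one Any => Boolean closure, the data source API only grows by one filter type rather than one per operator.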

@viirya (Member, Author) commented Oct 7, 2015

This implementation only considers the use case of evaluating a single attribute with a UDF and comparing the result with a literal value. We consider only this case because, in the current implementation of selectFilters in DataSourceStrategy, only predicates involving an attribute and a literal value (e.g., col = 1, col2 > 2) are selected as candidates for pushdown. Besides, the form udf(column) = 'ABCDE...' is the most common and widely used one in our SQL queries that involve UDFs in filtering conditions.

Your proposal looks good and is very general. However, I am a little worried about the performance regression from creating a row for each input value and evaluating the expression on it. It would likely be slower than the built-in Parquet filters we are using currently.

This patch helps us reduce the memory footprint required when loading a lot of data from Parquet files. As for speed, the improvement is not dramatic, but it at least beats not pushing down.

I agree that this patch introduces additional complexity to the API. If you still think it is not worth it, I will close this PR for now.

Thanks for reviewing and suggestion.

@viirya viirya closed this Oct 7, 2015
@viirya viirya deleted the parquet-udf-pushdown branch December 27, 2023 18:32