
Not able to apply push down filtering to ingestion time partitioned table #1005

Closed
pricemg opened this issue Jun 22, 2023 · 7 comments

pricemg commented Jun 22, 2023

In trying to understand how pushdown filters work with the BigQuery connector, I've realised I don't actually understand how filters can be applied to a table to yield more performant reads. The only documentation I can find is here, but I'm not having much luck with those examples.

The particular example I'm considering is a table partitioned by ingestion time (by month, so it has the pseudo column _PARTITIONTIME [side note: I don't understand why only day-partitioned tables also get a _PARTITIONDATE pseudo column, but I think that's a general BigQuery thing]), which also has some clustering applied.

Some example code to create a table would be

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("materializationDataset", 'test_space')
    spark.conf.set('temporaryGcsBucket', staging_bucket)
    spark.conf.set("intermediateFormat", "orc")

    # Create table:
    # +-------+-------+-------+
    # |grp_col|str_col|int_col|
    # +-------+-------+-------+
    # |     zz|      a|      1|
    # |     zz|      b|      2|
    # |     zz|      c|      3|
    # |     zz|      d|      4|
    # |     zz|      a|      5|
    # |     zz|      b|      6|
    # +-------+-------+-------+

    def output_df(df):
        """Save table with ingestion time (month) partition and cluster."""
        (
            df
            .write
            .format("bigquery")
            .option("partitionType", 'MONTH')
            .option("clusteredFields", 'grp_col,str_col')
            .mode('append')
            .save('test_space.cluster_partition_test')
        )

    output_df(spark.createDataFrame([
            ('zz', 'a', 1),
            ('zz', 'b', 2),
        ],
        schema=('grp_col', 'str_col', 'int_col')
    ))

    output_df(spark.createDataFrame([
            ('zz', 'c', 3),
            ('zz', 'd', 4),
        ],
        schema=('grp_col', 'str_col', 'int_col')
    ))

    output_df(spark.createDataFrame([
            ('zz', 'a', 5),
            ('zz', 'b', 6),
        ],
        schema=('grp_col', 'str_col', 'int_col')
    ))
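For reference, the partitioning and clustering metadata this produces can be checked with the BigQuery Python client (a minimal sketch, assuming default project credentials and the table name above):

    from google.cloud import bigquery

    client = bigquery.Client()
    table = client.get_table("test_space.cluster_partition_test")

    # For an ingestion-time partitioned table, time_partitioning.type_ should be
    # 'MONTH' and time_partitioning.field should be None (no explicit column).
    print(table.time_partitioning)
    print(table.clustering_fields)  # expected: ['grp_col', 'str_col']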

I have then been running variations of

    df = (
        spark
        .read
        ...
    )
    df.show()
    df.explain(mode='extended')

to explore whether I'm understanding the push-down filters correctly.

e.g.

== Physical Plan ==
*(1) Scan com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation@4a0f934d [grp_col#51,str_col#52,int_col#53L] PushedFilters: [], ReadSchema: struct<grp_col:string,str_col:string,int_col:bigint>

vs

== Physical Plan ==
*(1) Scan com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation@6a834097 [grp_col#89,str_col#90,int_col#91L] PushedFilters: [*IsNotNull(str_col), *EqualTo(str_col,a)], ReadSchema: struct<grp_col:string,str_col:string,int_col:bigint>

which show, for a given read, whether or not the filters have been pushed down.

For example, I can write either of these queries:

    df = (
        spark
        .read
        .format('bigquery')
        .load('test_space.cluster_partition_test',)
        .select('grp_col', 'str_col', 'int_col')
        .where('str_col = "a"')
    )

or

    df = (
        spark
        .read
        .format('bigquery')
        .load('test_space.cluster_partition_test',)
        .select('grp_col', 'str_col', 'int_col')
        .where('int_col = 1')
    )

and in both cases the execution plan shows the filter being pushed down as desired.

However, I am struggling to filter on the partition pseudo column. This query against the table generated above:

    df = (
        spark
        .read
        .format('bigquery')
        .option("filter", "_PARTITIONTIME = '2023-06-01'")
        .load('test_space.cluster_partition_test',)
    )

yields the typical long Spark error, the crux of which is:

23/06/22 14:46:20 INFO DirectBigQueryRelation: |Querying table my-project.test_space.cluster_partition_test, parameters sent from Spark:|requiredColumns=[grp_col,str_col,int_col,_PARTITIONTIME,_PARTITIONDATE],|filters=[]
Traceback (most recent call last):

...

py4j.protocol.Py4JJavaError: An error occurred while calling o144.showString.
: com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.InvalidArgumentException: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: request failed: Row filter for my-project.test_space.cluster_partition_test is invalid. Filter is '(_PARTITIONTIME = '2023-06-01')'

...

Caused by: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: request failed: Row filter for my-project.test_space.cluster_partition_test is invalid. Filter is '(_PARTITIONTIME = '2023-06-01')'

Similarly, trying a filter like '(_PARTITIONTIME > "2023-05-21")' yields the same errors as above.
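A variant that might be worth trying (a sketch only, not verified here) is making the literal an explicit TIMESTAMP in the pushed filter:

    df = (
        spark
        .read
        .format('bigquery')
        # hypothetical: wrap the literal in TIMESTAMP() so the row filter
        # compares like types, rather than relying on a bare string
        .option("filter", "_PARTITIONTIME = TIMESTAMP('2023-06-01')")
        .load('test_space.cluster_partition_test')
    )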

The only way I've been able to read a partition is using

    df = (
        spark
        .read
        .load('''
            SELECT *
            FROM test_space.cluster_partition_test
            WHERE _PARTITIONTIME = "2023-06-01"
            ''',
            format="bigquery"
        )
    )

but the execution plan doesn't show any push-down filtering occurring:

== Physical Plan ==
*(1) Scan com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation@b51086a8 [grp_col#54,str_col#55,int_col#56L] PushedFilters: [], ReadSchema: struct<grp_col:string,str_col:string,int_col:bigint>

However, am I perhaps misunderstanding how reading via a full SQL query works: in this case, is the optimisation of reading only the relevant partition still happening, just entirely on the BigQuery side rather than being visible to Spark?
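For reference, the connector's documented query-option form of the same read would presumably look like this (a sketch; query reads require viewsEnabled and a materializationDataset, which I've assumed here):

    df = (
        spark
        .read
        .format('bigquery')
        .option("viewsEnabled", "true")
        .option("materializationDataset", "test_space")
        .option("query", """
            SELECT *
            FROM test_space.cluster_partition_test
            WHERE _PARTITIONTIME = "2023-06-01"
            """)
        .load()
    )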

davidrabinowitz (Member) commented:

Can you please specify the Spark and connector versions, and which connector it is (spark-bigquery-with-dependencies_* or spark-X.Y-bigquery)?

pricemg commented Jun 22, 2023

Sorry, I knew I'd missed some information 🤦‍♂️

Spark: v3.3.1
connector: gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.31.0.jar

davidrabinowitz (Member) commented:

Try the following filter: _PARTITIONDATE = DATE('2023-06-01')
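i.e. in the read above, something along these lines (a sketch using the same table):

    df = (
        spark
        .read
        .format('bigquery')
        .option("filter", "_PARTITIONDATE = DATE('2023-06-01')")
        .load('test_space.cluster_partition_test')
    )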

pricemg commented Jun 29, 2023

Because I'm partitioning by month, the table doesn't have the pseudo column _PARTITIONDATE; it only has _PARTITIONTIME.

Running the read with that filter gives a long error including:

{
  "code": 400,
  "errors": [
    {
      "domain": "global",
      "location": "q",
      "locationType": "parameter",
      "message": "Unrecognized name: _PARTITIONDATE; Did you mean _PARTITIONTIME? at [1:61]",
      "reason": "invalidQuery"
    }
  ],
  "message": "Unrecognized name: _PARTITIONDATE; Did you mean _PARTITIONTIME? at [1:61]",
  "status": "INVALID_ARGUMENT"
}

pricemg commented Jul 12, 2023

@davidrabinowitz can you offer any more insight on this? I've now hit a further problem: I cannot read from tables where require_partition_filter = True at all, because I cannot specify a filter on the ingestion-time pseudo column.

dht7 commented Jul 17, 2023

Hi @pricemg, I was facing a similar issue recently: I had set up a Spark Thrift Server to query BigQuery tables (I needed to join tables from different sources), and queries with filters on date/timestamp columns were failing with an InvalidArgumentException. Looking at the source code, I noticed that the compileValue method, which parses the filters to be pushed down, handles DATE and TIMESTAMP values by wrapping them in the required string as follows:

if (value instanceof Date) {
      return "DATE '" + value + "'";
}
if (value instanceof Timestamp) {
      return "TIMESTAMP '" + value + "'";
}

In my case the objects passed in were of type java.time.LocalDate and java.time.Instant, while the code above expects java.sql.Date and java.sql.Timestamp objects. For me, setting this flag did the trick:

set spark.sql.datetime.java8API.enabled=false;
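In a non-Thrift PySpark session the equivalent would be setting the same flag on the session builder (a minimal sketch):

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        # With the Java 8 datetime API disabled, Spark passes java.sql.Date /
        # java.sql.Timestamp values to data source filters instead of
        # java.time.LocalDate / java.time.Instant.
        .config("spark.sql.datetime.java8API.enabled", "false")
        .getOrCreate()
    )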

Can you try this once and let me know if it works? If not, can you please share the logs containing details of the requiredColumns and filters?

vishalkarve15 (Contributor) commented:

Tried reproducing in 0.39.0; this issue is no longer present.

    df = (
        spark
        .read
        .format('bigquery')
        .option("filter", "_PARTITIONTIME = '2024-06-01'")
        .load('xxx.cluster_partition_test')
    )
    df.show()
+-------+-------+-------+-------------------+
|grp_col|str_col|int_col|     _PARTITIONTIME|
+-------+-------+-------+-------------------+
|     zz|      b|      6|2024-06-01 00:00:00|
|     zz|      a|      5|2024-06-01 00:00:00|
|     zz|      b|      2|2024-06-01 00:00:00|
|     zz|      a|      1|2024-06-01 00:00:00|
|     zz|      c|      3|2024-06-01 00:00:00|
|     zz|      d|      4|2024-06-01 00:00:00|
+-------+-------+-------+-------------------+
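So upgrading the connector should resolve this. A sketch of pinning the newer jar, assuming the 0.39.0 path follows the same naming pattern as the 0.31.0 jar quoted earlier in the thread:

    spark = (
        SparkSession.builder
        # assumed jar location, mirroring the 0.31.0 path above; not verified
        .config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.39.0.jar")
        .getOrCreate()
    )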
