Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve BQ point-in-time joining scalability #3429

Merged

Conversation

sudohainguyen
Copy link
Collaborator

@sudohainguyen sudohainguyen commented Jan 3, 2023

Signed-off-by: Hai Nguyen quanghai.ng1512@gmail.com

What this PR does / why we need it: point-in-time joining in BQ offline store can be more scalable

Which issue(s) this PR fixes:

Fixes #3426
Fixes #3003

@sudohainguyen sudohainguyen changed the title fix: improve BQ point-in-time joining scalability fix: Improve BQ point-in-time joining scalability Jan 3, 2023
@sudohainguyen
Copy link
Collaborator Author

/assign @mavysavydav

@@ -917,7 +931,7 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
,created_timestamp
{% endif %}
)
){% if loop.last %}{% else %}, {% endif %}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

haven't taken a close look, but seems like you removed this line even though there's a loop. Was that intended?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need to add comma when using temp table. Treat it as a single script separately using semicolon instead

@@ -488,10 +488,24 @@ def to_bigquery(
return str(job_config.destination)

with self._query_generator() as query:
self._execute_query(query, job_config, timeout)
dest = job_config.destination
# because setting destination for scripts is not valid
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this just not working before? Seems like it was using the job_config.destination before?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when we run multiple scripts separated by semicolon, BigQuery just doesn't accept passing destination explicitly, will throw error
google.api_core.exceptions.BadRequest: 400 configuration.query.destinationTable cannot be set for scripts

that's why I add some workaround here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

simply retrieve BQ-generated temporary table after scripts being executed (should look like this <project_id>._08dc845331ibb01824ea0f3c3e079e93b3a076.anon933abd7cb6950c3c488af61fe94ad1713b6820cd1115fddd0c09cde32ec12823 )

then we persist it as our naming convention with prefix historical_

temp_dest_table = f"{tmp_dest['projectId']}.{tmp_dest['datasetId']}.{tmp_dest['tableId']}"

# persist temp table
sql = f"CREATE TABLE {dest} AS SELECT * FROM {temp_dest_table}"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similarly, confused why there is a new need to do this when it seemed like maybe it worked before?

@@ -777,7 +791,7 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
Compute a deterministic hash for the `left_table_query_string` that will be used throughout
all the logic as the field to GROUP BY the data
*/
WITH entity_dataframe AS (
CREATE TEMP TABLE entity_dataframe AS (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the main change here to create temp tables for everything? (and everything else got indented)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, create temp table for entity dataframe and every joining feature views.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and the temp tables only exist within the current execution session

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you able to confirm what the scope of the session is? Does each invocation of self._execute_query create a new session? At a cursory glance, I see that we're not explicitly passing create_session=True in the QueryJobConfig instance. I think it would be very helpful to have a confirmation that there isn't going to be any cross contamination between concurrent historical fetches.

Copy link
Collaborator Author

@sudohainguyen sudohainguyen Jan 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does each invocation of self._execute_query create a new session? yes it does

I see that we're not explicitly passing create_session=True in the QueryJobConfig instance. we only need to declare this config when we need to reuse the session in a different client code scope. In this case, we gather all of feature views in a collection of scripts to execute once. I can confirm after the scripts are executed, temp tables do not exist anywhere else.

@@ -488,10 +488,24 @@ def to_bigquery(
return str(job_config.destination)

with self._query_generator() as query:
self._execute_query(query, job_config, timeout)
dest = job_config.destination
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conceptually, the PR seems reasonable (once any doubts about temp table cross-contamination are put to rest). But in terms of code readability, I can't help but notice that if this method is called without job_config, we first create it containing only the destination at the start of the method, then we remove the destination here. Perhaps this could use a bit of a cleanup where the job_config management is all in one spot?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

strongly agree! but in this PR I'm trying to not break any other flows. Perhaps some users would prefer passing some of their own job configurations to to_bigquery method, that's why I only deal with the destination attribute.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we shouldn't allow users to pass the destination, there would be some changes needed but let's deal with them in a separate PR

@adchia adchia assigned chhabrakadabra and unassigned mavysavydav Jan 20, 2023
@sudohainguyen
Copy link
Collaborator Author

@sudohainguyen sudohainguyen force-pushed the fix/bq_point_in_time_scalable branch 2 times, most recently from 5ea5698 to 676395b Compare January 30, 2023 08:34
@adchia adchia force-pushed the fix/bq_point_in_time_scalable branch from 676395b to 48b0865 Compare January 30, 2023 22:51
Copy link
Collaborator

@chhabrakadabra chhabrakadabra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I apologize for not getting back to this PR sooner. I think I need to fix some github notification settings so it doesn't happen again.

@feast-ci-bot
Copy link
Collaborator

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: chhabrakadabra, sudohainguyen

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

Signed-off-by: Danny Chiao <danny@tecton.ai>
@adchia adchia force-pushed the fix/bq_point_in_time_scalable branch from c813305 to 3c233d9 Compare February 4, 2023 01:27
@adchia
Copy link
Collaborator

adchia commented Feb 4, 2023

/lgtm

@feast-ci-bot feast-ci-bot merged commit ff66784 into feast-dev:master Feb 4, 2023
kevjumba pushed a commit that referenced this pull request Mar 17, 2023
# [0.30.0](v0.29.0...v0.30.0) (2023-03-17)

### Bug Fixes

* Add description attribute to the Field.from_proto method ([#3469](#3469)) ([473f8d9](473f8d9))
* Add filesystem kwargs when read prev_table on FileRetrievalJob (… ([#3491](#3491)) ([dca4745](dca4745)), closes [#3490](#3490)
* Feature view `entities` from_proto type ([#3524](#3524)) ([57bbb61](57bbb61))
* Fix missing requests requirement after GCP requirement removed. Make BigQuerySource not require gcp extra ([2c85421](2c85421))
* Fix SQL Registry cache miss ([#3482](#3482)) ([3249b97](3249b97))
* Fixed path inside quickstart notebook ([#3456](#3456)) ([66edc32](66edc32))
* Improve BQ point-in-time joining scalability ([#3429](#3429)) ([ff66784](ff66784))
* Pin typeguard to 2.13.3 which is what we are currently using. ([#3542](#3542)) ([61f6fb0](61f6fb0))
* Protobuf lower bound to 3.20 to alert that Feast is incompatible with tensorflow ([#3476](#3476)) ([9ca59e3](9ca59e3))
* Spark kafka processor sorting ([#3479](#3479)) ([f2cbf43](f2cbf43))
* UI working behind base url ([#3514](#3514)) ([9a3fd98](9a3fd98))
* Update go dependencies ([#3512](#3512)) ([bada97c](bada97c))

### Features

* Add Rockset as an OnlineStore ([#3405](#3405)) ([fd91cda](fd91cda))
* Add Snowflake Registry ([#3363](#3363)) ([ec1e61d](ec1e61d))
* Adding query timeout to `to_df` and `to_arrow` retrieval methods ([#3505](#3505)) ([bab6644](bab6644))
* adds k8s config options to Bytewax materialization engine ([#3518](#3518)) ([1883f55](1883f55))
achals pushed a commit that referenced this pull request Mar 24, 2023
# [0.30.0](v0.29.0...v0.30.0) (2023-03-24)

### Bug Fixes

* Add description attribute to the Field.from_proto method ([#3469](#3469)) ([473f8d9](473f8d9))
* Add filesystem kwargs when read prev_table on FileRetrievalJob (… ([#3491](#3491)) ([dca4745](dca4745)), closes [#3490](#3490)
* Bytewax image pull secret config ([#3547](#3547)) ([d2d13b1](d2d13b1))
* Clean up Rockset Online Store for use ([#3549](#3549)) ([a76c6d0](a76c6d0))
* Feature view `entities` from_proto type ([#3524](#3524)) ([57bbb61](57bbb61))
* Fix missing requests requirement after GCP requirement removed. Make BigQuerySource not require gcp extra ([2c85421](2c85421))
* Fix SQL Registry cache miss ([#3482](#3482)) ([3249b97](3249b97))
* Fixed path inside quickstart notebook ([#3456](#3456)) ([66edc32](66edc32))
* Improve BQ point-in-time joining scalability ([#3429](#3429)) ([ff66784](ff66784))
* Pin typeguard to 2.13.3 which is what we are currently using. ([#3542](#3542)) ([61f6fb0](61f6fb0))
* Protobuf lower bound to 3.20 to alert that Feast is incompatible with tensorflow ([#3476](#3476)) ([9ca59e3](9ca59e3))
* Spark kafka processor sorting ([#3479](#3479)) ([f2cbf43](f2cbf43))
* UI working behind base url ([#3514](#3514)) ([9a3fd98](9a3fd98))
* Update go dependencies ([#3512](#3512)) ([bada97c](bada97c))

### Features

* Add Rockset as an OnlineStore ([#3405](#3405)) ([fd91cda](fd91cda))
* Add Snowflake Registry ([#3363](#3363)) ([ec1e61d](ec1e61d))
* Added SnowflakeConnection caching ([#3531](#3531)) ([f9f8df2](f9f8df2))
* Adding query timeout to `to_df` and `to_arrow` retrieval methods ([#3505](#3505)) ([bab6644](bab6644))
* adds k8s config options to Bytewax materialization engine ([#3518](#3518)) ([1883f55](1883f55))
@sudohainguyen sudohainguyen deleted the fix/bq_point_in_time_scalable branch March 28, 2023 05:54
@RadionBik
Copy link

hey @adchia , @sudohainguyen ! It appears I have started having problems with BQ offline queries after I bumped feast version from 0.28.0 to 0.30.2.
I had a test that checks results of a point-in-time view with several requests querying in row the same entity, but with different event_timestamps, e.g. datetime(2022, 5, 26, 0, 0, 0), then datetime(2022, 5, 25, 0, 0, 0). The second query now yields to identical feature values with the first one, although it can't be / wasn't the case before.

I think it has something to do with the temp table mechanism, likely temp tables are not properly re-created / replaced/ removed. I am not sure how to solve the problem, maybe adding a random suffix to a temp table name upon each new request could help to avoid the collisions.

@sudohainguyen
Copy link
Collaborator Author

sudohainguyen commented Apr 12, 2023

hmm, did you check your data? In case your feature view ttl is 1, and there were no records of that entity on 2022-05-26 but 25th, the results are expected to be the same.

FYI: temp table only be reused when the query is exactly the same.

@RadionBik
Copy link

RadionBik commented Apr 13, 2023

@sudohainguyen , the problem has gone when I downgraded feast to 0.29.0

TTL = 0.

so here is the data-snippet I spot the problem on:
image

related_deactivated_at is the timestamp_field.

here is the test code:

@pytest.mark.parametrize(
    'entity_name,entities,model_name,results,event_timestamps',
    [
        (
            'alias',
            ['Pfi9FE1Qbcb022z1D317', 'Pfi9FE1Qbcb022z1D317'],
            ModelName.onboarding_model,
            [
                {
                    fields.bad_full_name_matches.name: 1,
                    fields.bad_mobile_matches.name: 0,
                    fields.bad_address_matches.name: 0,
                    fields.bad_nob_matches.name: 2,
                },
                {
                    fields.bad_full_name_matches.name: 1,
                    fields.bad_mobile_matches.name: 1,
                    fields.bad_address_matches.name: 1,
                    fields.bad_nob_matches.name: 3,
                },
            ],
            [datetime(2022, 5, 25, 0, 0, 0), datetime(2022, 5, 26, 0, 0, 0)],
        ),
    ]
)
def test_model_offline_features(entity_name, entities, model_name, results, event_timestamps, feature_store_client):
    offline_result = feature_store_client.get_historical_features(
        entity_name=entity_name,
        entities=entities,
        model_name=model_name,
        event_timestamps=event_timestamps
    )
    offline_dicts = offline_result.to_dict(orient='records')
    for offline_dict, result in zip(offline_dicts, results):
        offline_dict.pop(entity_name)
        assert offline_dict == result

it passed from the first attempt, but when I ran it the second time -> it failed, with all entries for the datetime(2022, 5, 25, 0, 0, 0) timestamp being equal to the second test-case (i.e. 1,1,1,3). That's why I assumed that temp tables were not properly re-created.

I am not sure where the problem lies, but it occurs only in feast 0.30.2, and I think it is related to this merged PR, because I didn't find others that could have changed historical feature querying.

Do you have any ideas how to fix the problem?

@sudohainguyen
Copy link
Collaborator Author

sudohainguyen commented Apr 13, 2023

weird, I didn't experience the same, my TTL is 1 btw

@sudohainguyen
Copy link
Collaborator Author

I conducted retrieval with 5 entities and 2 different timestamps (2023/03/24 and 2023/03/25) in the same notebook session and they are not identical :(

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants