
fix(ingest/bigquery): Increase batch size in metadata extraction if no partitioned table involved #7252

Merged: 9 commits into datahub-project:master on Feb 17, 2023

Conversation

@treff7es (Contributor) commented on Feb 3, 2023

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added, a Usage Guide has been added for it.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@github-actions github-actions bot added the ingestion PR or Issue related to the ingestion of metadata label Feb 3, 2023
@@ -78,9 +78,15 @@ class BigQueryV2Config(
)

number_of_datasets_process_in_batch: int = Field(
default=2000,
Collaborator
Can we try 500?

# Conflicts:
#	metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
- The new query returns at most max_column_size + 1 columns per table
- Only returns the columns for the latest shard, not for the rest
@@ -171,6 +179,12 @@ class BigQueryV2Config(
description="Useful for debugging lineage information. Set to True to see the raw lineage created internally.",
)

run_optimized_column_query: bool = Field(
hidden_from_schema=True,
default=True,
Collaborator
Can we default this to False?

@jjoyce0510 (Collaborator) left a comment

This LGTM

@asikowitz (Collaborator) left a comment

Didn't have time to finish this review, just a quick comment before you merge

Comment on lines 1194 to 1196
partitioned_table_count_in_this_batch = (
partitioned_table_count_in_this_batch + 1
)
@asikowitz (Collaborator) commented on Feb 15, 2023
Can this be replaced with a += 1? Also, do we want to increment this even if we skip the table based on prefix?

@asikowitz (Collaborator) left a comment

I may be misunderstanding the logic flow here, but I have one major concern about how we drop ingestion of tables based on the table_identifier prefix. I also think the counting logic could be simplified.

description as comment,
c.is_hidden as is_hidden,
c.is_partitioning_column as is_partitioning_column,
-- We count the colums to be able limit it later
Collaborator
Typo colums -> columns

-- We count the colums to be able limit it later
row_number() over (partition by c.table_catalog, c.table_schema, c.table_name order by c.ordinal_position asc, c.data_type DESC) as column_num,
-- Getting the maximum shard for each table
row_number() over (partition by c.table_catalog, c.table_schema, ifnull(REGEXP_EXTRACT(c.table_name, r'(.*)_\\d{{8}}$'), c.table_name), cfp.field_path order by c.table_catalog, c.table_schema asc, c.table_name desc) as shard_num
Collaborator
Just checking my knowledge: the sort by c.table_name desc is what makes it so that shard_num = 1 is the most recent shard?

@treff7es (Contributor, Author)

Yes, a sharded table has a suffix like table_name_YYYYMMDD, so if we order by table name in descending order, the latest shard comes first.
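The shard-selection idea can be sketched in plain Python (the regex mirrors the REGEXP_EXTRACT in the query above; the function names here are illustrative, not DataHub's API):

```python
import re

# Regex mirroring the REGEXP_EXTRACT in the query: a sharded table name
# ends in an 8-digit date suffix, e.g. events_20230215.
SHARD_SUFFIX = re.compile(r"(.*)_\d{8}$")

def base_name(table_name: str) -> str:
    """Strip the _YYYYMMDD shard suffix if present (the ifnull(...) in SQL)."""
    match = SHARD_SUFFIX.match(table_name)
    return match.group(1) if match else table_name

def latest_shards(table_names: list) -> dict:
    """Group tables by base name and keep only the newest shard, the way
    shard_num = 1 selects the first row when sorted by table_name desc."""
    latest = {}
    for name in sorted(table_names, reverse=True):  # desc, like the query
        latest.setdefault(base_name(name), name)    # first seen = newest
    return latest

print(latest_shards(["events_20230214", "events_20230215", "users"]))
# {'users': 'users', 'events': 'events_20230215'}
```

Non-sharded tables pass through unchanged because the regex does not match them and each one is its own group.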

Comment on lines 1190 to 1196
if (
table.time_partitioning
or "range_partitioning" in table._properties
):
partitioned_table_count_in_this_batch += 1

table_items[table.table_id] = table
Collaborator
Should we be doing this after checking table_identifier? I feel like the if statement below is not actually dropping tables, because they have already been added to table_items. If that is the case, I think it makes sense to put this in an elif attached to the if shard block.
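A hypothetical, runnable toy of the control flow being suggested, where the prefix check runs before any counting or collecting (the skip_prefix filter and all names here are illustrative, not the actual DataHub source):

```python
# Filter first, so a skipped table is neither counted as partitioned
# nor added to table_items.
def collect_tables(tables, skip_prefix="tmp_"):
    table_items = {}
    partitioned_count = 0
    for name, is_partitioned in tables:
        if name.startswith(skip_prefix):
            continue  # dropped before counting or collecting
        if is_partitioned:
            partitioned_count += 1
        table_items[name] = (name, is_partitioned)
    return table_items, partitioned_count

items, count = collect_tables([("tmp_x", True), ("sales", True), ("users", False)])
print(sorted(items), count)  # ['sales', 'users'] 1
```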

Comment on lines 1207 to 1216
if (
table_count_in_this_batch
% self.config.number_of_datasets_process_in_batch
== 0
) or (
partitioned_table_count_in_this_batch > 0
and partitioned_table_count_in_this_batch
% self.config.number_of_partitioned_datasets_process_in_batch
== 0
):
Collaborator
Could we replace the counting with table_count_in_this_batch by just checking len(table_items)? They should match each other, unless we're getting duplicate table.table_id keys, and in any case that seems like the number we want to track when making the query.

Also, since we're resetting partitioned_table_count_in_this_batch = 0 each query, I think the second conditional can be simplified to partitioned_table_count_in_this_batch == self.config.number_of_partitioned_datasets_process_in_batch
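A minimal, self-contained sketch of the batching logic under discussion (batch_size and partitioned_batch_size stand in for the config values; the real source layout may differ). Because both counters reset on every flush, the modulo checks reduce to plain equality:

```python
def batch_tables(tables, batch_size=3, partitioned_batch_size=2):
    """Flush a batch when either the total-table or the partitioned-table
    threshold is hit; counters reset on each flush, so == suffices."""
    batches, current, partitioned = [], [], 0
    for name, is_partitioned in tables:
        current.append(name)
        if is_partitioned:
            partitioned += 1
        if len(current) == batch_size or partitioned == partitioned_batch_size:
            batches.append(current)
            current, partitioned = [], 0
    if current:  # flush the trailing partial batch
        batches.append(current)
    return batches

tables = [("a", True), ("b", True), ("c", False), ("d", False), ("e", False)]
print(batch_tables(tables))  # [['a', 'b'], ['c', 'd', 'e']]
```

Here the first flush is triggered by the partitioned-table threshold, the second by the total-table threshold.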

@asikowitz (Collaborator) left a comment

One comment but looks good to me!

if (
-    table_count_in_this_batch
-    % self.config.number_of_datasets_process_in_batch
+    len(table_items) % self.config.number_of_datasets_process_in_batch
Collaborator
I think this can also become a == because we clear table_items on each query.

@treff7es treff7es merged commit aa388f0 into datahub-project:master Feb 17, 2023
oleg-ruban pushed a commit to RChygir/datahub that referenced this pull request Feb 28, 2023
yoonhyejin pushed a commit that referenced this pull request Mar 3, 2023