
fix(ingest/bigquery): Querying table metadata details in batch properly #7429

Conversation

treff7es (Contributor) commented:

Even though we already queried table details in batches, there were multiple problems with that approach:
Unfortunately, the INFORMATION_SCHEMA.PARTITIONS view (which we query to get table stats for the table profile and to find the latest partition to profile) contains non-partitioned tables as well, so even if there are no partitioned tables at all, filtering on tables in that view can still raise the "too many tables were queried" exception.

In this fix I changed the logic to use a higher batch size (200) if profiling is not enabled and a lower batch size (80) if it is enabled.
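In other words, the batch size is now picked based on whether profiling is enabled, roughly like this (a minimal sketch; the function name and flag are illustrative, not the actual config fields):

def pick_metadata_batch_size(profiling_enabled: bool) -> int:
    # Profiling queries INFORMATION_SCHEMA.PARTITIONS, which is stricter about
    # how many tables a single query may reference, so use a smaller batch.
    if profiling_enabled:
        return 80
    # Without profiling only the less restrictive metadata views are hit, so a
    # larger batch is fine (per the discussion below, the configured default in
    # the diff is 500, not 200).
    return 200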

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

github-actions bot added the ingestion label (PR or Issue related to the ingestion of metadata) on Feb 24, 2023
asikowitz (Collaborator) left a comment:

I like the cleaner logic but am not yet understanding how this reduces the number of tables hit in the query

@@ -80,10 +80,10 @@ class BigQueryV2Config(
    number_of_datasets_process_in_batch: int = Field(
        hidden_from_schema=True,
        default=500,
asikowitz (Collaborator):

In the description you said 200 without profiling, did you mean 500?

treff7es (Contributor, Author):

ahh, yes, sorry

stored_table_identifier.raw_table_name()
)
# When table is None, we use dataset_name as the table_name
assert stored_shard
asikowitz (Collaborator):

Do we have exception handling for this assert? What does this statement do to the execution flow of ingestion?

treff7es (Contributor, Author):

This should not happen; the assert is only there for mypy.

asikowitz (Collaborator):

I see. I guess if we wanted to be more type-safe we could make sharded_tables store a mapping of table_name -> (table, shard), but that doesn't seem necessary.

asikowitz (Collaborator):

Perhaps we should wrap this in a try/except and log an error, just in case a later code change adds a non-sharded table to this dict.
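That would look roughly like this (a minimal sketch of the suggestion; the logger and the surrounding loop are assumed, not the actual code):

if not stored_shard:
    # Should be unreachable: only sharded tables are added to sharded_tables.
    # Log instead of crashing if a future change ever breaks that invariant.
    logger.error(
        "Table %s is in sharded_tables but has no shard suffix; skipping it",
        table_name,
    )
    continue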

) -> Dict[str, TableListItem]:
table_items: Dict[str, TableListItem] = {}
# Dict to store sharded table and the last seen max shard id
sharded_tables: Dict[str, TableListItem] = defaultdict()
asikowitz (Collaborator):

What does making this a defaultdict give us? The only default usage I see is sharded_tables[table_name].table_id, but I don't see how that would work because the default_factory is None, right? It just seems to complicate the logic, because we have to check
if not sharded_tables.get(table_name): rather than if table_name not in sharded_tables

treff7es (Contributor, Author):

makes sense, I can change it
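For reference, a defaultdict created without a default_factory behaves exactly like a plain dict for missing keys, so it buys nothing here (a quick illustration; table_name and table come from the surrounding loop):

from collections import defaultdict

d = defaultdict()   # default_factory is None
"missing" in d      # False; membership tests behave the same as for a plain dict
# d["missing"]      # would raise KeyError, exactly like a plain dict

# So the simpler, clearer form is:
sharded_tables = {}
if table_name not in sharded_tables:
    sharded_tables[table_name] = table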

table_items = self.get_core_table_details(conn, dataset_name, project_id)

items_to_get: Dict[str, TableListItem] = {}
for table_item in table_items.keys():
asikowitz (Collaborator):

So ultimately we're querying INFORMATION_SCHEMA.TABLES (perhaps joined with INFORMATION_SCHEMA.PARTITIONS) with some filter and t.table_name in .... Is there any way to order the tables so we're fetching data most efficiently, or not really?

I guess I'm a bit confused how accessing the PARTITIONS table causes us to query more tables, unless each partition counts as its own table (or maybe a % of a table? lol)

treff7es (Contributor, Author):

Only the PARTITIONS table has this limitation; the other INFORMATION_SCHEMA views don't, which is why only this table is affected. We don't query more tables here than in any other INFORMATION_SCHEMA view.
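For context, the profiling path runs a query along these lines against the PARTITIONS view, filtered to the current batch of table names (an illustrative sketch, not the connector's actual SQL; batch_of_table_names, project_id and dataset_name are assumed to be in scope):

table_filter = ", ".join(f"'{name}'" for name in batch_of_table_names)
query = f"""
    SELECT table_name, SUM(total_rows) AS row_count, MAX(partition_id) AS max_partition_id
    FROM `{project_id}.{dataset_name}.INFORMATION_SCHEMA.PARTITIONS`
    WHERE table_name IN ({table_filter})
    GROUP BY table_name
"""
# Because BigQuery limits how many tables one PARTITIONS query may reference,
# the batch above has to stay small enough, which is what the batch-size change addresses.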

asikowitz (Collaborator):

Is this documented on https://cloud.google.com/bigquery/docs/information-schema-partitions? Or just something you have to find out lol

Comment on lines -1240 to -1242
if (
    len(table_items) % self.config.number_of_datasets_process_in_batch
    == 0
asikowitz (Collaborator):

We were already batching this even if there was no partitioning right? What is this PR changing that will fix the issue?

treff7es (Contributor, Author):

Earlier, we added the list of sharded tables to the last batch, and because of this the last batch could end up much larger than the max batch size. That is now fixed.
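Conceptually, the batching now works like this (a sketch; only number_of_datasets_process_in_batch and table_items appear in the diff, the rest is illustrative):

max_batch = self.config.number_of_datasets_process_in_batch
table_names = list(table_items.keys())

# Every slice, including the one that contains the sharded tables, is capped
# at max_batch, so the final batch can no longer exceed the limit.
for start in range(0, len(table_names), max_batch):
    batch = table_names[start : start + max_batch]
    # ... fetch metadata / partition info for exactly this batch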

asikowitz (Collaborator) left a comment:

LGTM

# For example some_dataset.20220110 will be turned into some_dataset.some_dataset
# It seems some BigQuery users use this non-standard way of sharding tables.
if shard:
if table_name in sharded_tables:
asikowitz (Collaborator):

Think you need to change this to not in !!
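That is, the intended logic is roughly (a sketch; the shard comparison is an assumed detail, the real code may track the latest shard differently):

if shard:
    if table_name not in sharded_tables:
        # first shard seen for this logical table
        sharded_tables[table_name] = table
    elif shard > stored_shard:
        # a newer shard was found, keep only the latest one
        sharded_tables[table_name] = table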

@treff7es treff7es merged commit 14a6604 into datahub-project:master Feb 27, 2023
oleg-ruban pushed a commit to RChygir/datahub that referenced this pull request Feb 28, 2023