
fix(ingest/bigquery): Fix for table cache was not cleared #7323

Merged: 5 commits into datahub-project:master on Feb 13, 2023

Conversation

treff7es (Contributor):

  • Fix for not clearing the BigQuery table cache, which caused tables to carry over from one project to another (a minimal sketch of the bug pattern follows this list)
  • Fixing range partition profiling
  • Fixing peak memory report
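
For reference, a minimal sketch of the cache bug pattern, using hypothetical names (db_tables/db_views are the containers mentioned in the review below; the loop body is illustrative, not the actual DataHub code):

```python
from typing import Dict, List

# Per-project caches; reusing them across projects without a reset is the bug.
db_tables: Dict[str, List[str]] = {}
db_views: Dict[str, List[str]] = {}

def process_project(project_id: str) -> None:
    # Without these resets, entries from the previous project are still
    # present and get emitted again under the wrong project.
    db_tables.clear()
    db_views.clear()
    # ... populate db_tables / db_views for this project ...

for project_id in ["project-a", "project-b"]:
    process_project(project_id)
```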

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added, a Usage Guide has been added for it.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

Fix for not clearing BigQuery table cache, which caused tables to carry over from one project to the other

- Fixing range partition profiling
- Fixing peak memory report
github-actions bot added the ingestion label (PR or Issue related to the ingestion of metadata) on Feb 12, 2023
@jjoyce0510 (Collaborator) left a comment:

Overall this looks okay to me, but I am definitely nervous about the time partitioning changes, given that we've had to change them a few times now.

@@ -123,15 +123,15 @@ class CliReport(Report):
py_version: str = sys.version
py_exec_path: str = sys.executable
os_details: str = platform.platform()
-    _peek_memory_usage: int = 0
+    _peak_memory_usage: int = 0
jjoyce0510 (Collaborator):

Nice :)
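
For context, one common way to populate a field like _peak_memory_usage is to sample the process RSS from a background thread and keep the maximum. A minimal sketch, assuming psutil is available (the actual DataHub mechanism may differ):

```python
import threading
import time

import psutil  # assumed dependency; DataHub's actual approach may differ

class PeakMemoryTracker:
    """Samples the process RSS in a background thread and records the peak."""

    def __init__(self, interval: float = 0.5) -> None:
        self._process = psutil.Process()  # the current process
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._poll, daemon=True)
        self.peak_memory_usage: int = 0  # bytes

    def _poll(self) -> None:
        while not self._stop.is_set():
            rss = self._process.memory_info().rss
            self.peak_memory_usage = max(self.peak_memory_usage, rss)
            time.sleep(self._interval)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()
```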

@@ -572,7 +568,10 @@ def _process_project(
self.report.report_dropped(f"{bigquery_dataset.name}.*")
continue
try:
yield from self._process_schema(conn, project_id, bigquery_dataset)
# db_tables and db_views are populated in this method
jjoyce0510 (Collaborator):

Thanks for adding this comment

@@ -768,12 +789,12 @@ def _process_table(
)

# If table has time partitioning, set the data type of the partitioning field
- if table.time_partitioning:
-     table.time_partitioning.column = next(
+ if table.partition_info:
jjoyce0510 (Collaborator):

Curious -- Why was the previous code wrong??

treff7es (Contributor, Author):

It only supported time-partitioned columns, not range-partitioned ones.

# When table is none, we use dataset_name as table_name
table_name = table_identifier.get_table_name().split(".")[-1]
assert stored_shard
if stored_shard < shard:
jjoyce0510 (Collaborator):

This condition makes very little sense to me as an unfamiliar reader. Consider adding a comment to explain
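
A plausible reading of the condition, as a self-contained sketch with hypothetical names: date-sharded BigQuery tables share a base name plus a YYYYMMDD suffix, so comparing shard suffixes as strings orders them chronologically, and the code keeps only the newest shard per base name.

```python
# Hypothetical sketch of the shard bookkeeping, not the actual source:
# map each base table name to the newest shard suffix seen so far.
sharded_tables: dict = {}

def record_shard(table_name: str, shard: str) -> None:
    stored_shard = sharded_tables.get(table_name)
    # Shard suffixes are YYYYMMDD strings, so lexicographic order matches
    # chronological order: "20230213" > "20230210".
    if stored_shard is None or stored_shard < shard:
        sharded_tables[table_name] = shard

record_shard("events", "20230210")
record_shard("events", "20230213")
assert sharded_tables["events"] == "20230213"
```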

if stored_shard < shard:
sharded_tables[table_name] = table
continue
else:
jjoyce0510 (Collaborator):

Hard to follow all of these nested if/else conditions. It would maybe be easier to read if we broke this into small, well-named functions.

continue
else:
table_count = table_count + 1
table_items[table.table_id] = table
jjoyce0510 (Collaborator):

I have no clue from reading the name of this data structure what is inside of it. 'Table items' is a super generic name. Wondering if we can make it clearer

table_items,
with_data_read_permission=self.config.profiling.enabled,
)
)
jjoyce0510 (Collaborator):

So, having read through this section of the code, I can say I have little idea what the implications of making these changes will be.

- tables[table.table_name].time_partitioning
- )
- if tables and tables[table.table_name].time_partitioning
+ partition_info=PartitionInfo.from_table_info(tables[table.table_name])
jjoyce0510 (Collaborator):

So we previously assumed all Partitioned Tables were time partitioned? And now we know there's another default case of range partitioning that we are handling?

treff7es (Contributor, Author):

In the BigQuery API, if you call the list_tables API, it returns TableInfo objects with a TimePartitioning property that is filled in if the table is time-partitioned.
For a range-partitioned table this is not filled in, and the only way to detect it is to check whether there is a rangePartitioning key in its _properties.
It is odd that this is not exposed as a first-class property, but it is what it is.
Earlier I used only the TimePartitioning field, and that way we skipped the range-partitioned tables.
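
To illustrate the asymmetry described above, a minimal sketch assuming the google-cloud-bigquery Python client, where list_tables yields TableListItem objects (project and dataset names are placeholders):

```python
from google.cloud import bigquery

client = bigquery.Client()

for item in client.list_tables("my-project.my_dataset"):
    if item.time_partitioning:
        # Time partitioning is exposed as a first-class property.
        print(item.table_id, "time-partitioned on", item.time_partitioning.field)
    elif "rangePartitioning" in item._properties:
        # Range partitioning only shows up in the raw API payload.
        range_info = item._properties["rangePartitioning"]
        print(item.table_id, "range-partitioned on", range_info["field"])
```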

jjoyce0510 (Collaborator):

got it - makes sense

from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler


def test_not_generate_partition_profiler_query_if_not_partitioned_sharded_table():
jjoyce0510 (Collaborator):

Nice test!

conn=client_mock, project_id="test-project", dataset_name="test-dataset"
)

assert data_dictionary_mock.call_count == 1
jjoyce0510 (Collaborator):

Thank you for adding these!
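
The assertion pattern used here, reproduced as a self-contained sketch with illustrative names (not the actual DataHub test): mock the client, exercise the caching path twice, and verify the underlying API is hit exactly once.

```python
from unittest.mock import MagicMock

_cache: dict = {}

def get_tables_cached(conn, project_id: str, dataset_name: str):
    # Hypothetical helper standing in for the data-dictionary lookup.
    key = (project_id, dataset_name)
    if key not in _cache:
        _cache[key] = conn.list_tables(project_id, dataset_name)
    return _cache[key]

client_mock = MagicMock()
get_tables_cached(client_mock, "test-project", "test-dataset")
get_tables_cached(client_mock, "test-project", "test-dataset")

# The second lookup is served from the cache, so the API is called once.
assert client_mock.list_tables.call_count == 1
```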

@jjoyce0510 (Collaborator) left a comment:

Thanks for addressing comments!

treff7es merged commit b34e4fe into datahub-project:master on Feb 13, 2023