Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/bigquery): Handle null values from usage aggregation #7827

Merged
merged 3 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,9 @@ def usage_statistics(self, top_n: int) -> Iterator[UsageStatistic]:
timestamp=row["timestamp"],
resource=row["resource"],
query_count=row["query_count"],
query_freq=json.loads(row["query_freq"]),
user_freq=json.loads(row["user_freq"]),
column_freq=json.loads(row["column_freq"]),
query_freq=json.loads(row["query_freq"] or "[]"),
user_freq=json.loads(row["user_freq"] or "[]"),
column_freq=json.loads(row["column_freq"] or "[]"),
)


Expand Down Expand Up @@ -444,7 +444,7 @@ def _store_usage_event(
):
resource = event.read_event.resource
if str(resource) not in table_refs:
logger.info(f"Skipping non-existent {resource} from usage")
logger.debug(f"Skipping non-existent {resource} from usage")
self.report.num_usage_resources_dropped += 1
self.report.report_dropped(str(resource))
return False
Expand Down
95 changes: 94 additions & 1 deletion metadata-ingestion/tests/unit/test_bigquery_usage.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import random
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
Expand All @@ -8,7 +9,13 @@
from datahub.configuration.time_window_config import BucketDuration
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigQueryTableRef
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
AuditEvent,
BigqueryTableIdentifier,
BigQueryTableRef,
QueryEvent,
ReadEvent,
)
from datahub.ingestion.source.bigquery_v2.bigquery_config import (
BigQueryUsageConfig,
BigQueryV2Config,
Expand Down Expand Up @@ -492,6 +499,92 @@ def test_usage_counts_multiple_buckets_and_resources(
]


def test_usage_counts_no_query_event(
    caplog: pytest.LogCaptureFixture,
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """A lone ReadEvent with no matching QueryEvent yields no workunits and logs no warnings."""
    with caplog.at_level(logging.WARNING):
        table_ref = BigQueryTableRef(
            BigqueryTableIdentifier("project", "dataset", "table")
        )
        read_event = ReadEvent(
            jobName="job_name",
            timestamp=TS_1,
            actor_email=ACTOR_1,
            resource=table_ref,
            fieldsRead=["id", "name", "total"],
            readReason="JOB",
            payload=None,
        )
        audit_event = AuditEvent.create(read_event)
        produced = list(usage_extractor._run([audit_event], [str(table_ref)]))
        # No query event means no usage statistics can be aggregated for the read.
        assert produced == []
        # The extractor should handle this quietly -- no warning-level log output.
        assert not caplog.records


def test_usage_counts_no_columns(
    caplog: pytest.LogCaptureFixture,
    usage_extractor: BigQueryUsageExtractor,
    config: BigQueryV2Config,
) -> None:
    """A read with an empty fieldsRead list still produces usage stats, just with empty field counts."""
    job_name = "job_name"
    table_ref = BigQueryTableRef(
        BigqueryTableIdentifier(PROJECT_1, DATABASE_1.name, TABLE_1.name)
    )
    # One read event paired (via job_name) with one query event, but no columns read.
    read_event = ReadEvent(
        jobName=job_name,
        timestamp=TS_1,
        actor_email=ACTOR_1,
        resource=table_ref,
        fieldsRead=[],
        readReason="JOB",
        payload=None,
    )
    query_event = QueryEvent(
        job_name=job_name,
        timestamp=TS_1,
        actor_email=ACTOR_1,
        query="SELECT * FROM table_1",
        statementType="SELECT",
        project_id=PROJECT_1,
        destinationTable=None,
        referencedTables=[table_ref],
        referencedViews=[],
        payload=None,
    )
    events = [AuditEvent.create(read_event), AuditEvent.create(query_event)]
    expected_statistics = DatasetUsageStatisticsClass(
        timestampMillis=int(TS_1.timestamp() * 1000),
        eventGranularity=TimeWindowSizeClass(unit=BucketDuration.DAY, multiple=1),
        totalSqlQueries=1,
        topSqlQueries=["SELECT * FROM table_1"],
        uniqueUserCount=1,
        userCounts=[
            DatasetUserUsageCountsClass(
                user=ACTOR_1_URN,
                count=1,
                userEmail=ACTOR_1,
            ),
        ],
        # Empty fieldsRead aggregates to an empty (not null/missing) field count list.
        fieldCounts=[],
    )
    with caplog.at_level(logging.WARNING):
        produced = list(usage_extractor._run(events, TABLE_REFS.values()))
        assert produced == [
            make_usage_workunit(
                table=TABLE_1,
                dataset_usage_statistics=expected_statistics,
            )
        ]
        # Processing a column-less read must not emit any warning-level logs.
        assert not caplog.records


@freeze_time(FROZEN_TIME)
@patch.object(BigQueryUsageExtractor, "_generate_usage_workunits")
def test_operational_stats(
Expand Down