Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix(ingestion): Enhance supported and unsupported base_objects_accessed for Snowflake Usage #3608

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -3,7 +3,7 @@
import json
import logging
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional
from typing import Any, Dict, Iterable, List, Optional

import pydantic
import pydantic.dataclasses
Expand Down Expand Up @@ -70,10 +70,11 @@ class Config:


class SnowflakeObjectAccessEntry(PermissiveModel):
    """Describe one object referenced by a Snowflake access event.

    NOTE(review): presumably each instance is built from one element of the
    parsed ``base_objects_accessed`` JSON list -- confirm against
    ``SnowflakeJoinedAccessEvent``.
    """

    # Optional: the aggregation code checks ``columns is not None`` and falls
    # back to an empty column list, so entries without column information
    # are accepted.
    columns: Optional[List[SnowflakeColumnReference]]
    objectDomain: str
    objectId: int
    objectName: str
    # Optional stage kind; NOTE(review): presumably only populated for
    # stage objects -- confirm against the Snowflake access-history schema.
    stageKind: Optional[str]


class SnowflakeJoinedAccessEvent(PermissiveModel):
Expand Down Expand Up @@ -160,17 +161,24 @@ def _get_snowflake_history(self) -> Iterable[SnowflakeJoinedAccessEvent]:
if event_dict["query_text"] is None:
continue

event_dict["base_objects_accessed"] = json.loads(
event_dict["base_objects_accessed"]
)
def is_unsupported_base_object_accessed(obj: Dict[str, Any]) -> bool:
    """Return True when an accessed-object entry carries an unsupported key.

    An entry counts as unsupported when any key listed in
    ``unsupported_keys`` is present with a value other than ``None``.
    A missing key, or a key explicitly set to ``None``, does not count.

    Args:
        obj: One decoded entry of the ``base_objects_accessed`` JSON list.

    Returns:
        True if the entry should be filtered out, False otherwise.
    """
    unsupported_keys = ("locations",)
    # Generator rather than a list so any() can stop at the first match.
    return any(obj.get(key) is not None for key in unsupported_keys)

event_dict["base_objects_accessed"] = [
obj
for obj in json.loads(event_dict["base_objects_accessed"])
if not is_unsupported_base_object_accessed(obj)
]
event_dict["query_start_time"] = (
event_dict["query_start_time"]
).astimezone(tz=timezone.utc)

try: # big hammer try block to ensure we don't fail on parsing events
event = SnowflakeJoinedAccessEvent(**event_dict)
yield event
except Exception:
except Exception as e:
logger.warning(f"Failed to parse usage line {event_dict}", e)
self.report.report_warning(
"usage", f"Failed to parse usage line {event_dict}"
)
Expand All @@ -197,7 +205,9 @@ def _aggregate_access_events(
agg_bucket.add_read_entry(
event.email,
event.query_text,
[colRef.columnName.lower() for colRef in object.columns],
[colRef.columnName.lower() for colRef in object.columns]
if object.columns is not None
else [],
)

return datasets
Expand Down