Skip to content

Commit

Permalink
Enhance supported and unsupported base_objects_accessed.
Browse files (browse the repository at this point in the history)
  • Loading branch information
rslanka committed Nov 22, 2021
1 parent dc0d63a commit 711867d
Showing 1 changed file with 17 additions and 7 deletions.
Expand Up @@ -3,7 +3,7 @@
import json
import logging
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional
from typing import Any, Dict, Iterable, List, Optional

import pydantic
import pydantic.dataclasses
Expand Down Expand Up @@ -70,10 +70,11 @@ class Config:


# Pydantic-style model for one accessed-object entry; instances are consumed
# as `object` in the aggregation code, which reads `object.columns`.
# NOTE(review): this is a rendered diff hunk with indentation stripped; the
# annotation lines below belong to the class body.
class SnowflakeObjectAccessEntry(PermissiveModel):
# NOTE(review): the next two lines look like the removed and added sides of
# one change: `columns` goes from a required list to Optional. If both were
# kept in real code, the later (Optional) annotation would take effect.
columns: List[SnowflakeColumnReference]
# Optional because the aggregation code in this diff guards
# `object.columns is not None` before iterating.
columns: Optional[List[SnowflakeColumnReference]]
objectDomain: str
objectId: int
objectName: str
# NOTE(review): appears to be a field added by this commit; presumably only
# present for stage-type objects — confirm against Snowflake's access-history
# schema.
stageKind: Optional[str]


class SnowflakeJoinedAccessEvent(PermissiveModel):
Expand Down Expand Up @@ -160,17 +161,24 @@ def _get_snowflake_history(self) -> Iterable[SnowflakeJoinedAccessEvent]:
if event_dict["query_text"] is None:
continue

event_dict["base_objects_accessed"] = json.loads(
event_dict["base_objects_accessed"]
)
def is_unsupported_base_object_accessed(obj: Dict[str, Any]) -> bool:
    """Return True when *obj* carries a key the usage parser cannot handle.

    An entry counts as unsupported if at least one of the unsupported keys
    is present with a non-None value. A missing key, or a key explicitly
    set to None, does not disqualify the entry.
    """
    unsupported_keys = ("locations",)
    # A generator expression lets any() short-circuit on the first match
    # instead of materializing an intermediate list.
    return any(obj.get(key) is not None for key in unsupported_keys)

event_dict["base_objects_accessed"] = [
obj
for obj in json.loads(event_dict["base_objects_accessed"])
if not is_unsupported_base_object_accessed(obj)
]
event_dict["query_start_time"] = (
event_dict["query_start_time"]
).astimezone(tz=timezone.utc)

try: # big hammer try block to ensure we don't fail on parsing events
event = SnowflakeJoinedAccessEvent(**event_dict)
yield event
except Exception:
except Exception as e:
logger.warning(f"Failed to parse usage line {event_dict}", e)
self.report.report_warning(
"usage", f"Failed to parse usage line {event_dict}"
)
Expand All @@ -197,7 +205,9 @@ def _aggregate_access_events(
agg_bucket.add_read_entry(
event.email,
event.query_text,
[colRef.columnName.lower() for colRef in object.columns],
[colRef.columnName.lower() for colRef in object.columns]
if object.columns is not None
else [],
)

return datasets
Expand Down

0 comments on commit 711867d

Please sign in to comment.