-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest/teradata): view parsing #9005
Conversation
…ort Teradata properly
Addresssing pr review comments
# Conflicts: # metadata-ingestion/docs/sources/teradata/teradata_pre.md # metadata-ingestion/docs/sources/teradata/teradata_recipe.yml # metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM overall. One actual change requested, rest are style
@classmethod | ||
def create(cls, config_dict, ctx): | ||
config = TeradataConfig.parse_obj(config_dict) | ||
return cls(config, ctx) | ||
|
||
def get_view_lineage(self) -> Iterable[MetadataWorkUnit]: | ||
for key in self._view_definition_cache.keys(): | ||
view_definition = self._view_definition_cache[key] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we want to be safe we should truncate this before putting in the cache, in case it's absurdly large... but I think this is fine for now
Actually, teradata has that query size cap right? Should be safe then
@@ -221,8 +247,34 @@ def get_metadata_engine(self) -> Engine: | |||
return create_engine(url, **self.config.options) | |||
|
|||
def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: | |||
yield from super().get_workunits_internal() | |||
# Add all schemas to the schema resolver | |||
for wu in super().get_workunits_internal(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can prob make a helper for this in the future
if isinstance(wu.metadata, MetadataChangeEventClass): | ||
if wu.metadata.proposedSnapshot: | ||
for aspect in wu.metadata.proposedSnapshot.aspects: | ||
if isinstance(aspect, SchemaMetadataClass): | ||
self.schema_resolver.add_schema_metadata( | ||
wu.metadata.proposedSnapshot.urn, | ||
aspect, | ||
) | ||
break | ||
if isinstance(wu.metadata, MetadataChangeProposalWrapper): | ||
if ( | ||
wu.metadata.entityUrn | ||
and isinstance(wu.metadata.aspect, ViewPropertiesClass) | ||
and wu.metadata.aspect.viewLogic | ||
): | ||
self._view_definition_cache[ | ||
wu.metadata.entityUrn | ||
] = wu.metadata.aspect.viewLogic |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a method wu.get_aspect_of_type
that we can use in the future
result = sqlglot_lineage( | ||
sql=query, | ||
schema_resolver=self.schema_resolver, | ||
default_db=None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should set this right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nvm, seems good
Checklist