
feat(ingest/snowflake): tables from snowflake shares as siblings #8531

Merged

Conversation

mayurinehate
Collaborator

@mayurinehate mayurinehate commented Jul 31, 2023

  1. Introduce new configurations inbound_shares_map and outbound_shares_map to declare databases included in and created from shares, along with the corresponding Snowflake platform instances.
  2. Emit upstreamLineage and siblings aspects for linked tables/views in such shared databases.
  3. Move allow/deny pattern filtering in the Snowflake source so that it is applied in only one place.
  4. Add documentation around Snowflake shares with a corresponding configuration example.
  5. Add unit tests for the Snowflake shares code.

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added, a Usage Guide has been added for it.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

1. Add config maps to accept inbound and outbound share details from the user.
2. Emit mirrored database tables as siblings, with tables from the share owner (producer) account as the primary sibling.
3. Push down allow/deny patterns into snowflake_schema.
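Point 2 could look roughly like the sketch below. Both the `Siblings` dataclass (a stand-in for DataHub's siblings aspect) and the urn builder are illustrative simplifications, not the actual DataHub API:

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Siblings:
    # Stand-in for DataHub's siblings aspect: whether this side is the
    # primary sibling, plus the urns of the other siblings.
    primary: bool
    siblings: List[str]

def make_dataset_urn(platform_instance: str, db: str, schema: str, table: str) -> str:
    # Simplified urn builder, for illustration only.
    return (
        "urn:li:dataset:(urn:li:dataPlatform:snowflake,"
        f"{platform_instance}.{db}.{schema}.{table},PROD)"
    )

# Producer account owns db1; a consumer account sees it mirrored as db1_from_x.
producer_urn = make_dataset_urn("instance1", "db1", "public", "orders")
consumer_urn = make_dataset_urn("instance2", "db1_from_x", "public", "orders")

# The producer table is the primary sibling; the mirrored table points back at it.
producer_aspect = Siblings(primary=True, siblings=[consumer_urn])
consumer_aspect = Siblings(primary=False, siblings=[producer_urn])
```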
@github-actions github-actions bot added the ingestion PR or Issue related to the ingestion of metadata label Jul 31, 2023
# 1. Attempt to list shares using `show shares` to identify the name of the share associated with this database (cache the query result).
# 2. If a corresponding share is listed, run `show grants to share <share_name>` to identify the exact tables and views included in the share.
# 3. Emit siblings only for the objects listed above.
# This works only if the configured role has accountadmin access OR owns the share.
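For illustration only, the three steps above might be sketched as follows. `ShareInspector`, `run_query`, and the assumed row shapes are all hypothetical, and as noted this approach needs accountadmin access or share ownership, so it is not what the PR implements:

```python
from functools import lru_cache
from typing import Callable, List, Sequence, Tuple

class ShareInspector:
    """Sketch of the three steps above; not what the PR ships."""

    def __init__(self, run_query: Callable[[str], Sequence[tuple]]):
        # run_query is a hypothetical callable that executes SQL and
        # returns rows as tuples (e.g. a thin wrapper over a cursor).
        self.run_query = run_query

    @lru_cache(maxsize=1)
    def _list_shares(self) -> Tuple[Tuple[str, str], ...]:
        # Step 1: run `SHOW SHARES` once and cache the result;
        # rows assumed to be (share_name, database_name) pairs.
        return tuple(self.run_query("SHOW SHARES"))

    def objects_in_share_for(self, database: str) -> List[str]:
        # Step 2: find the share backing this database, then list the
        # exact tables/views granted to it.
        for share_name, db_name in self._list_shares():
            if db_name == database:
                rows = self.run_query(f"SHOW GRANTS TO SHARE {share_name}")
                # Step 3: siblings would be emitted only for these objects.
                return [obj for (obj,) in rows]
        return []
```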
Collaborator Author

It is not advisable to use a role with "accountadmin" access, hence this is not done. Also, this PR takes care to hide ghost nodes in the siblings relation, so this is not required.

Collaborator

@asikowitz asikowitz left a comment

Overall LGTM, thanks for cranking this out. I have some minor naming/refactoring suggestions, but my main question is around how we should format the config. I'm not very familiar with shares, but the format seems a little unintuitive/cumbersome to me. Changing the format will require some code changes, though.

Also, I know there are some quirks with siblings, and I don't know how lineage will show up between siblings (which it seems like we're generating?), but if you've run this by Gabe or Harshal then that's probably sufficient.

@@ -42,6 +43,12 @@ class TagOption(str, Enum):
skip = "skip"


@dataclass(frozen=True)
class SnowflakeDatabaseDataHubId:
Collaborator

This name is pretty confusing to me... not sure on alternatives though

Collaborator Author

How about DatabaseId?

- platform_instance: instance2 # this is a list, as db1 can be shared with multiple snowflake accounts using X
database_name: db1_from_X
```
- In snowflake recipe of `account2` :
Collaborator

If you have a lot of snowflake recipes, I could see how this could get tiresome to set up for every ingestion pipeline. Thoughts on having a config that could be the same for every recipe, e.g.

shares:
    X:
        database: db1
        platform_instance: instance1
        outbounds:
            - database: db1_from_X
              platform_instance: instance2

Collaborator Author

This has crossed my mind too, and I'd really like "having a config that could be the same for every recipe". The only downside is that we require additional unused information, e.g. the share name "X", and it is possible that some of the shares config will not be relevant for some account recipes, making validation harder and errors harder to find. I feel the ease of using the same shares config outweighs the downsides, so let me think more on this and update.

Collaborator Author

I've updated the config to use a similar structure to your example, except using the term consumers instead of outbounds. "Outbound" is relative and can be a confusing term, whereas "consumer" has a specific meaning for Snowflake shares and hence is unambiguous.

Collaborator

If we don't want to have them specify the share name, we can also do:

  shares:
    - platform_instance: instance1
      database_name: db1
      consumers:
        - platform_instance: instance2 # this is a list, as db1 can be shared with multiple snowflake accounts using X
          database_name: db1_from_X
    - platform_instance: ...

but maybe that's more confusing

Collaborator Author

Yeah, I like the one with the share name better: more precise and readable. Also, shares do have unique names across accounts. This change primarily makes the shares configuration absolute and exhaustive for all accounts, so the configuration need not be thought of with respect to a particular account/recipe.
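The settled-on shape, keyed by share name, might be modeled roughly like this. Plain dataclasses are used as a stand-in for the actual pydantic ConfigModel, and all values are illustrative:

```python
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass(frozen=True)
class DatabaseId:
    # A database qualified by the DataHub platform instance it lives in.
    database: str
    platform_instance: str

@dataclass
class SnowflakeShareConfig:
    # Source side of the share (the producer account's database).
    database: str
    platform_instance: str
    # One entry per consumer account the database is shared with.
    consumers: List[DatabaseId] = field(default_factory=list)

# Keyed by share name, which is unique across accounts.
shares: Dict[str, SnowflakeShareConfig] = {
    "X": SnowflakeShareConfig(
        database="db1",
        platform_instance="instance1",
        consumers=[DatabaseId("db1_from_X", "instance2")],
    )
}
```

Because the map is keyed by share name and lists every consumer, the same block can be pasted into every account's recipe unchanged.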

Collaborator

@asikowitz asikowitz left a comment

Overall looks good to me. I have a refactor proposal to hopefully make snowflake_shares.py easier to understand; let me know what you think.

metadata-ingestion/docs/sources/snowflake/snowflake_pre.md (outdated)
databases_included_in_share: List[DatabaseId] = []
databases_created_from_share: List[DatabaseId] = []

for _, share_details in shares.items():
Collaborator

Suggested change
for _, share_details in shares.items():
for share_details in shares.values():

@@ -197,3 +226,41 @@ def get_sql_alchemy_url(
@property
def parse_view_ddl(self) -> bool:
return self.include_view_column_lineage

@validator("shares")
def validate_shares(
Collaborator

Generally we raise ValueError in validators rather than using assertions. Do you want to change that convention? For now at least, can you change this to stay consistent?

Collaborator Author

I believe we allow ValueError, AssertionError, TypeError as a convention, as also mentioned here - https://datahubproject.io/docs/metadata-ingestion/developing/#coding

Sometimes asserts are more readable/briefer, so I'd prefer them. In this case, I'm okay to change.

Collaborator

Oh nice. I like the assert syntax more; I think we're just hesitant because you can disable assertions with a certain interpreter flag. I don't feel strongly here, up to you.
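The flag in question is Python's `-O` switch, which strips `assert` statements entirely. A tiny illustration of why a ValueError keeps validation enforced (these helper names are hypothetical, not the PR's code):

```python
from typing import Dict

def validate_shares_assert(shares: Dict[str, object]) -> Dict[str, object]:
    # Silently skipped when Python runs with -O (assertions disabled).
    assert shares, "shares must not be empty"
    return shares

def validate_shares_raise(shares: Dict[str, object]) -> Dict[str, object]:
    # Always enforced, regardless of interpreter flags.
    if not shares:
        raise ValueError("shares must not be empty")
    return shares
```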

self, databases: List[SnowflakeDatabase]
) -> Iterable[MetadataWorkUnit]:
shared_databases = self._get_shared_databases(
self.config.shares or {}, self.config.platform_instance
Collaborator

self.config.shares can't be null here, right (otherwise the assert in _get_shared_databases could fail)? Perhaps instead of passing self.config into SnowflakeSharesHandler, we can pass the non-optional self.config.shares and self.config.platform_instance so we don't have to put any assertions in this class.

Collaborator Author

self.config is also used in other places in SnowflakeCommonMixin, primarily in deciding whether to lowercase the urn, hence keeping self.config and refactoring a bit to avoid asserts.

Comment on lines 34 to 37
created_from_share: bool

# This will have exactly one entry if created_from_share = True
shares: List[str]
Collaborator

The name created_from_share is confusing to me, because I feel like all of these are created from "shares" lol. Could we just call it primary or inbound or something similar? Although inbound also doesn't really make sense to me... I think of it more as is_share_source.

Collaborator Author

Well, technically some databases are created from a share while others are used to create a share, i.e. are included in a share. I am okay to use "primary/is_share_source" = not (created_from_share/secondary).

class SharedDatabase:
"""
Represents shared database from current platform instance
This is either created from an inbound share or included in an outbound share.
Collaborator

Can you mention that this relies on the invariant that a Snowflake database can't both be in a share and be the consumer of a share?

Collaborator Author

sure

Comment on lines 69 to 70
else:
shared_databases[share_details.database].shares.append(share_name)
Collaborator

Can the same platform instance and database really appear as inbound in shares multiple times?

Collaborator Author

A corner case, but yes.

self.logger = logger
self.dataset_urn_builder = dataset_urn_builder

def _get_shared_databases(
Collaborator

Overall, I thought the logic in this method, and in the users of the shared_databases dict it returns, was confusing: we basically have 2 different cases for inbound and outbound share info. I think this could be clearer if we separated the two.

I think this logic could be also simplified if we did some preprocessing first. What do you think about, in the config file:

class SnowflakeShareConfig(ConfigModel):
    # Add to this class
    @property
    def inbound(self) -> DatabaseId:
        return DatabaseId(self.database, self.platform_instance)

# For below, add to SnowflakeV2Config
@lru_cache(maxsize=1)  # would love to use @cached_property
def inbound_to_consumers(self) -> Dict[DatabaseId, Set[DatabaseId]]:
    d = defaultdict(set)
    for share in self.shares:
        d[share.inbound].update(share.consumers)
    return d

@lru_cache(maxsize=1)
def outbound_to_inbound(self) -> Dict[DatabaseId, DatabaseId]:
    d = {}
    for share in self.shares:
        for outbound in share.consumers:
            d[outbound] = share.inbound
    return d
Could def have better naming, but once you have these, then I think you can get rid of _get_shared_databases and do something like:

key = DatabaseId(db, self.platform_instance)
is_inbound = key in self.inbound_to_consumers()
is_outbound = key in self.outbound_to_inbound()
if not is_inbound and not is_outbound:
    continue
sibling_databases = (
    self.inbound_to_consumers()[key] if is_inbound else [self.outbound_to_inbound()[key]]
)

report_missing_databases will be a bit more complicated, but I don't think that's a dealbreaker.
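To sanity-check the proposal above, here is a self-contained, runnable version using free functions and a plain `Share` dataclass as a stand-in for SnowflakeShareConfig (all names are illustrative):

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Set

@dataclass(frozen=True)
class DatabaseId:
    database: str
    platform_instance: str

@dataclass
class Share:
    # Stand-in for SnowflakeShareConfig: one source database, many consumers.
    database: str
    platform_instance: str
    consumers: List[DatabaseId]

    @property
    def source(self) -> DatabaseId:
        return DatabaseId(self.database, self.platform_instance)

def inbound_to_consumers(shares: List[Share]) -> Dict[DatabaseId, Set[DatabaseId]]:
    # Share source -> all databases it is shared out to.
    d: Dict[DatabaseId, Set[DatabaseId]] = defaultdict(set)
    for share in shares:
        d[share.source].update(share.consumers)
    return d

def outbound_to_inbound(shares: List[Share]) -> Dict[DatabaseId, DatabaseId]:
    # Mirrored (consumer-side) database -> the source it was created from.
    d: Dict[DatabaseId, DatabaseId] = {}
    for share in shares:
        for outbound in share.consumers:
            d[outbound] = share.source
    return d
```

With these two maps, a database is inbound iff its DatabaseId is a key of the first map and outbound iff it is a key of the second, which replaces the two-case logic inside _get_shared_databases with two lookups.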

Collaborator Author

I see, let me check.

Collaborator

@asikowitz asikowitz left a comment

LGTM!

) -> None:
db_names = [db.name for db in databases]
missing_dbs = [db for db in shared_databases.keys() if db not in db_names]
missing_dbs = [db for db in inbounds + outbounds if db not in db_names]
Collaborator

Could alternatively not cast to list and do inbounds | outbounds
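The suggestion above in a tiny runnable form: with sets, `|` is union, so the missing-database check becomes a set difference (values are illustrative):

```python
# Databases actually present in this Snowflake account.
db_names = {"db1", "db2"}
# Share sources and share consumers configured for this platform instance.
inbounds = {"db1"}
outbounds = {"db3"}

# Union the configured databases, then keep those not found in the account.
missing_dbs = (inbounds | outbounds) - db_names
```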

return

logger.debug("Checking databases for inbound or outbound shares.")
for db in databases:
if db.name not in shared_databases:
db.name = db.name
Collaborator

Looks like a no-op

logger.debug(f"database {db.name} is not shared.")
continue

sibling_dbs = self.get_sibling_databases(shared_databases[db.name])
sibling_dbs = (
Collaborator

If this is typed as Collection or Iterable then you don't have to cast to a list. Doesn't matter, though.

@asikowitz asikowitz added the merge-pending-ci A PR that has passed review and should be merged once CI is green. label Aug 23, 2023
@asikowitz asikowitz merged commit e285da3 into datahub-project:master Aug 24, 2023
50 of 51 checks passed