feat(ingestion) Allow for ingestion to read files remotely #7552
Conversation
Excited for this! Support for s3, gcs would be great to add as well.

I will try to finish the missing bits soon!

After working on it yesterday, I think I can finish it sometime this weekend!

Let me work on the failed tests first.
@shirshanka I could do with some advice on the failed assert on the auditStamp time between the golden and generated JSON files.
```python
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_remote_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
```
Hey @xiphl, the `mock_time` fixture you're using here also mocks `time.time()`, which is what generates audit stamps. I think this test will pass if you remove the fixture!

Nevermind, seems like that's not the issue, although I think that mock should no longer be necessary. I believe the issue is that the file-based lineage source defines:
```python
auditStamp = models.AuditStampClass(
    time=get_sys_time(), actor="urn:li:corpUser:pythonEmitter"
)
```
at the top of `datahub/ingestion/source/metadata/lineage.py`, which gets run when the file is imported. That import happens when `tests/unit/test_file_lineage_source.py` is imported, before `freeze_time` does its monkeypatching for this test.

We should move the definition of that audit stamp elsewhere, perhaps into `get_lineage_metadata_change_event_proposal`, or just create the audit stamps on demand.
Yup, that was the cause indeed!
Codecov Report

```diff
@@            Coverage Diff             @@
##           master    #7552      +/-   ##
==========================================
+ Coverage   74.87%   74.92%   +0.04%
==========================================
  Files         353      353
  Lines       35385    35429      +44
==========================================
+ Hits        26495    26544      +49
+ Misses       8890     8885       -5
==========================================
```

... and 2 files with indirect coverage changes.
This looks great! Thanks so much for working on this and contributing further to datahub. I left a few docs / style comments, mostly for my own benefit; I think this is good to go.
```python
    pathlib.Path(self.config.filename), mode="r", encoding="utf-8-sig"
) as f:
    rows = csv.DictReader(f, delimiter=self.config.delimiter)
    keep_rows = [row for row in rows]
```
This can be written more succinctly as `list(rows)`. Also, can you add a quick note to this source's docstring that it will not work with very large CSV files that do not fit into memory?
```python
for wu in self.get_resource_workunits(
    entity_urn=entity_urn,
    term_associations=term_associations,
    tag_associations=tag_associations,
    owners=owners,
    domain=domain,
    description=description,
):
    yield wu
```
I know you didn't write this, but it can be expressed more concisely as `yield from self.get_resource_workunits(...)`.
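A minimal sketch of the `yield from` delegation pattern being suggested, with a stand-in generator instead of the real `get_resource_workunits` method:

```python
from typing import Iterable


def get_resource_workunits(entity_urn: str) -> Iterable[str]:
    # Stand-in for the real method; yields two hypothetical work units.
    yield f"{entity_urn}:wu1"
    yield f"{entity_urn}:wu2"


def get_workunits(entity_urn: str) -> Iterable[str]:
    # "yield from" delegates directly to the sub-generator, replacing
    # the explicit "for wu in ...: yield wu" loop.
    yield from get_resource_workunits(entity_urn)
```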
```python
for x in list(
    self.config.path.glob(f"*{self.config.file_extension}")
)
```
Unnecessary `list` call: `glob` already returns an iterable, so you can loop over it directly.
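To show the point concretely, a small sketch using a temporary directory (the directory setup is purely illustrative):

```python
import pathlib
import tempfile

# Set up a throwaway directory with two matching files.
tmp = pathlib.Path(tempfile.mkdtemp())
(tmp / "a.json").write_text("{}")
(tmp / "b.json").write_text("{}")

# Path.glob returns a generator, so wrapping it in list() before a
# for loop only adds an extra pass over the results; iterating it
# directly is equivalent.
found = sorted(p.name for p in tmp.glob("*.json"))
```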
```python
if self.config.path.is_file():
    path_parsed = parse.urlparse(str(self.config.path))
    if path_parsed.scheme in ("file", ""):
        self.config.path = pathlib.Path(self.config.path)
```
It seems `self.config.path` is only used in this method. It might be cleaner to always have `self.config.path: str` and store the `Path` object in a separate variable.
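One way the suggestion could look, sketched as a hypothetical helper (`resolve_local_path` is not an existing datahub function; the config would keep `path` as a plain `str` and the `Path` object would live in a local variable):

```python
import pathlib
from typing import Optional
from urllib import parse


def resolve_local_path(path: str) -> Optional[pathlib.Path]:
    # The config keeps `path` as a str; the Path object is a local
    # value returned to the caller rather than written back into the
    # config object.
    parsed = parse.urlparse(path)
    if parsed.scheme in ("file", ""):
        return pathlib.Path(parsed.path or path)
    # Remote URL (http, https, s3, ...): no local Path to return.
    return None
```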
```diff
@@ -160,7 +155,6 @@ def _get_entity_urn(entity_config: EntityConfig) -> Optional[str]:
     new_upstream = models.UpstreamClass(
         dataset=upstream_entity_urn,
         type=models.DatasetLineageTypeClass.TRANSFORMED,
-        auditStamp=auditStamp,
```
Can you keep this, but use an auditStamp created in this function?
Co-authored-by: xiphl <xiphlerl9@gmail.com>

This PR allows the CsvEnricher, BusinessGlossary, File, and LineageFile sources to read from URLs (especially git repos) instead of being constrained to local files.

Still a work in progress (especially updating the tests).