-
Notifications
You must be signed in to change notification settings - Fork 2.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest/datahub): Improvements, bug fixes, and docs #8735
feat(ingest/datahub): Improvements, bug fixes, and docs #8735
Conversation
@@ -55,7 +55,6 @@ def assert_metadata_files_equal( | |||
output = load_json_file(output_path) | |||
|
|||
if update_golden and not golden_exists: | |||
golden = load_json_file(output_path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused
@@ -27,7 +27,7 @@ def _assert_checkpoint_deserialization( | |||
) -> Checkpoint: | |||
# Serialize a checkpoint aspect with the previous state. | |||
checkpoint_aspect = DatahubIngestionCheckpointClass( | |||
timestampMillis=int(datetime.now().timestamp() * 1000), | |||
timestampMillis=int(datetime.now(tz=timezone.utc).timestamp() * 1000), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As requested in previous PR
if self.config.database_connection is None: | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is repetitive but I prefer to (i) an assertion or (ii) passing database_connection
as an argument
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm actually fine with an assertion here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really like assertions because if we ever make code changes where the assertion is no longer valid, I think it's a really bad error to display to the user. + it bypasses the type system
@@ -27,24 +28,26 @@ class DataHubKafkaReader(Closeable): | |||
def __init__( | |||
self, | |||
config: DataHubSourceConfig, | |||
connection_config: KafkaConsumerConnectionConfig, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Passed in separately so that it can be non-optional, avoiding assertions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we only passed in one thing, but then did this in init
assert self.config.kafka_connection
self.connection_config = self.config.kafka_connection
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same thing here, in general I don't like assertions -- this requires callers to pass in a non-optional consumer connection, rather than requiring them to know that the class assumes it exists
* If you are migrating large amounts of data, consider scaling consumer replicas. | ||
- Increase the number of gms pods to add redundancy and increase resilience to node evictions | ||
* If you are migrating large amounts of data, consider increasing elasticsearch's | ||
thread count via the `ELASTICSEARCH_THREAD_COUNT` environment variable. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This setting primarily helps with read traffic on the ElasticSearch side, writes should already be pretty efficient with the bulkProcessor (especially for non-deletes), but it shouldn't hurt to have this in there.
metadata-ingestion/src/datahub/ingestion/source/datahub/config.py
Outdated
Show resolved
Hide resolved
enabled: true | ||
ignore_old_state: false | ||
extractor_config: | ||
set_system_metadata: false # Replicate system metadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eventually I'd like to move these to the "flags" section that we added
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I thought the flags section would be for relatively temporary flags. You're thinking we'd store permanent configs in there as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's talk about it more later - we do need a home for some of these things, but not sure where that should be
@@ -27,24 +28,26 @@ class DataHubKafkaReader(Closeable): | |||
def __init__( | |||
self, | |||
config: DataHubSourceConfig, | |||
connection_config: KafkaConsumerConnectionConfig, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we only passed in one thing, but then did this in init
assert self.config.kafka_connection
self.connection_config = self.config.kafka_connection
if self.config.database_connection is None: | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm actually fine with an assertion here
enabled: true | ||
ignore_old_state: false | ||
extractor_config: | ||
set_system_metadata: false # Replicate system metadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's talk about it more later - we do need a home for some of these things, but not sure where that should be
Makes the following changes:
createdon
for the same urn and aspect. In general, we rely oncreated
to impart the correct ordering, not versiondatabase_connection
andkafka_connection
to be None, if you only want to ingest from one of themstop_time
in reportStill need to test with postgres and in general, I'd like to set up an integration test to make development of this source safer
Checklist