feat(ingest): Add DataHub source #8561
Conversation
```diff
@@ -314,7 +314,7 @@ def auto_empty_dataset_usage_statistics(
     logger.warning(
         f"Usage statistics with unexpected timestamps, bucket_duration={config.bucket_duration}:\n"
         ", ".join(
-            str(datetime.fromtimestamp(ts, tz=timezone.utc))
+            str(datetime.fromtimestamp(ts / 1000, tz=timezone.utc))
```
yikes - kinda bad that we missed this
It's just in a warning that shouldn't get hit right now, but yeah, not great, because I believe this will raise an out of bounds exception
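For context: `datetime.fromtimestamp` expects seconds, so feeding it epoch milliseconds lands tens of thousands of years past `datetime`'s supported range. A minimal sketch of the failure and the fix, with an illustrative timestamp:

```python
from datetime import datetime, timezone

ts = 1_690_000_000_000  # epoch milliseconds (illustrative value)

# Milliseconds passed as seconds puts the date past year 9999, so this raises
# ValueError/OverflowError rather than printing a bogus date:
# datetime.fromtimestamp(ts, tz=timezone.utc)

# Converting to seconds first gives the intended result:
print(datetime.fromtimestamp(ts / 1000, tz=timezone.utc))
# 2023-07-22 04:26:40+00:00
```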
metadata-ingestion/setup.py
```diff
@@ -255,6 +255,10 @@ def get_long_description():
     "requests",
 }

+mysql = sql_common | {"pymysql>=1.0.2"}
+
+kafka = kafka_common | kafka_protobuf
```
do we need kafka_protobuf for the datahub source?
we should only need kafka_common right?
Ah, I don't know; I just took everything. I'll try with just kafka_common.
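For reference, a self-contained sketch of the trimmed extra (the requirement sets and version pins here are stand-ins for the ones defined earlier in `metadata-ingestion/setup.py`):

```python
from typing import Dict, Set

# Stand-ins for the requirement sets defined earlier in setup.py.
sql_common: Set[str] = {"sqlalchemy>=1.4"}
kafka_common: Set[str] = {"confluent-kafka>=1.5.0"}

mysql = sql_common | {"pymysql>=1.0.2"}

# Per the review: the DataHub source should only need kafka_common,
# not the kafka_protobuf extras.
plugins: Dict[str, Set[str]] = {
    "datahub": mysql | kafka_common,
}
```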
```python
class DataHubSourceConfig(StatefulIngestionConfigBase):
    mysql_connection: MySQLConfig = Field(
        # TODO: Check, do these defaults make sense?
        default=MySQLConfig(username="datahub", password="datahub", database="datahub"),
```
MySQLConfig also has table_pattern, view_pattern, domain, etc
seems like we might need something separate here
Overall - we should always have a split of "connection config" vs "source config", where the latter inherits/embeds the former
also probably doesn't make sense to have a default here
Yup, I'll remove defaults. I guess I can start disentangling our configs here
Changes made, which didn't really make things simpler unfortunately:
- Renamed `SQLAlchemyConfig` -> `SQLCommonConfig`
- Split out the connection parts (i.e. all of them) of `BasicSQLAlchemyConfig` into `SQLAlchemyConnectionConfig`
- Created `MySQLConnectionConfig` off of `SQLAlchemyConnectionConfig`, and `MySQLConfig` now also inherits from `MySQLConnectionConfig`
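A rough sketch of the resulting hierarchy (abbreviated; `ConfigModel` stubbed in for datahub's pydantic base, field lists illustrative):

```python
from pydantic import BaseModel


class ConfigModel(BaseModel):
    """Stub for datahub's ConfigModel base class."""


class SQLAlchemyConnectionConfig(ConfigModel):
    # Pure connection settings, split out of the old BasicSQLAlchemyConfig.
    username: str = ""
    password: str = ""
    host_port: str = "localhost"
    database: str = ""


class MySQLConnectionConfig(SQLAlchemyConnectionConfig):
    # MySQL-specific connection defaults.
    scheme: str = "mysql+pymysql"


class SQLCommonConfig(ConfigModel):
    # Source-level options (table_pattern, view_pattern, domain, ...),
    # formerly SQLAlchemyConfig; abbreviated here.
    pass


class MySQLConfig(MySQLConnectionConfig, SQLCommonConfig):
    # Full MySQL source config: connection settings plus source options.
    pass
```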
```python
    )

    kafka_topic_name: str = Field(
        default="MetadataChangeLog_Timeseries_v1",
```
let's extract these to constants
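i.e. something along these lines (constant name illustrative; `BaseModel` stands in for the real config base):

```python
from pydantic import BaseModel, Field

# Hoist the literal into a module-level constant (name is illustrative).
DEFAULT_DATAHUB_KAFKA_TOPIC = "MetadataChangeLog_Timeseries_v1"


class DataHubSourceConfig(BaseModel):
    kafka_topic_name: str = Field(
        default=DEFAULT_DATAHUB_KAFKA_TOPIC,
        description="Kafka topic to read timeseries MCLs from.",
    )
```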
```python
            yield mcl, msg.offset()

        self.consumer.unassign()
```
should this be in a `finally:` block, or does it not really matter?
Probably best to do so, yeah
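A minimal sketch of why `finally:` helps in a generator: the cleanup runs on normal exhaustion and also when the generator is closed early (the consumer class here is a stand-in for the confluent-kafka Consumer):

```python
from typing import Iterator, Tuple


class StubConsumer:
    # Stand-in for the Kafka consumer held by DataHubKafkaReader.
    def unassign(self) -> None:
        print("consumer unassigned")


class Reader:
    def __init__(self) -> None:
        self.consumer = StubConsumer()

    def get_mcls(self) -> Iterator[Tuple[str, int]]:
        try:
            for offset, mcl in enumerate(["mcl-a", "mcl-b", "mcl-c"]):
                yield mcl, offset
        finally:
            # Runs when the loop finishes and also when the generator is
            # closed early (caller breaks out, or an exception propagates).
            self.consumer.unassign()


for mcl, offset in Reader().get_mcls():
    print(mcl, offset)
    break  # early exit: the finally block still releases the consumer
```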
```python
    @property
    def query(self) -> str:
        return f"""
        SELECT urn, aspect, metadata, createdon
```
we probably also want to copy system metadata
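e.g. something like the following, assuming the standard `metadata_aspect_v2` columns (the exact table and filter details in the PR may differ):

```python
class DataHubDatabaseReader:
    @property
    def query(self) -> str:
        # Also select systemmetadata so it can be re-emitted alongside each
        # aspect instead of being dropped.
        return """
        SELECT urn, aspect, metadata, systemmetadata, createdon
        FROM metadata_aspect_v2
        WHERE version = 0
        ORDER BY createdon
        """


print(DataHubDatabaseReader().query)
```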
```python
        return MetadataChangeProposalWrapper(
            entityUrn=row.urn,
            # TODO: Get rid of deserialization -- create MCPC?
            aspect=ASPECT_MAP[row.aspect].from_obj(json.loads(row.metadata)),
```
needs the post_json_transform here
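That is, run the stored JSON through `post_json_transform` (which undoes the `pre_json_transform` applied when aspects are persisted, e.g. re-shaping unions) before handing it to `from_obj`. A sketch with a trimmed-down `ASPECT_MAP`:

```python
import json

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.serialization_helper import post_json_transform
from datahub.metadata.schema_classes import DatasetPropertiesClass

# Trimmed illustration of the real ASPECT_MAP (aspect name -> aspect class).
ASPECT_MAP = {"datasetProperties": DatasetPropertiesClass}


def row_to_mcp(row) -> MetadataChangeProposalWrapper:
    # row is a DB result row with .urn, .aspect, and .metadata columns.
    return MetadataChangeProposalWrapper(
        entityUrn=row.urn,
        aspect=ASPECT_MAP[row.aspect].from_obj(
            post_json_transform(json.loads(row.metadata))
        ),
    )
```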
```python
    def commit_checkpoint(self) -> None:
        if self.state_provider.ingestion_checkpointing_state_provider:
            self.state_provider.prepare_for_commit()
            self.state_provider.ingestion_checkpointing_state_provider.commit()
```
wow this just reminds me of how much I dislike our stateful ingestion implementation
Yeah, I kinda had to hack this in because it's not really built to be committed individually -- the only interface is registering committables and committing them all at once. Being able to commit individually is pretty useful; would be nice to add at some point.
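Purely hypothetical, but an interface that allows committing one committable at a time might look like this:

```python
from typing import Dict, Protocol


class Committable(Protocol):
    def commit(self) -> None: ...


class CheckpointingStateProvider:
    def __init__(self) -> None:
        self._committables: Dict[str, Committable] = {}

    def register(self, key: str, committable: Committable) -> None:
        self._committables[key] = committable

    def commit_all(self) -> None:
        # Today's model: flush everything that was registered.
        for committable in self._committables.values():
            committable.commit()

    def commit_one(self, key: str) -> None:
        # The missing piece: commit a single registered committable
        # (e.g. just the kafka state) without flushing the rest.
        self._committables[key].commit()
```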
Some minor questions but overall looking good
```diff
         on_interval = (
             i
             and self.config.commit_state_interval
             and i % self.config.commit_state_interval == 0
         )

-        if not has_errors and (i is None or on_interval):
+        if i is None or on_interval:
```
so commits happen regardless of `commit_with_parse_errors`, while updating the state only happens conditionally?
Yeah, I swapped it because this logic seemed simpler -- it allows me to update the kafka state if there are only mysql errors, and vice versa.
makes sense
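In other words, something like this (names illustrative; the real code tracks the mysql and kafka errors separately):

```python
from typing import Optional


def maybe_commit(
    i: Optional[int],
    commit_state_interval: Optional[int],
    mysql_ok: bool,
    kafka_ok: bool,
) -> None:
    # i is None at the end of ingestion; otherwise commit every N records.
    on_interval = bool(
        i and commit_state_interval and i % commit_state_interval == 0
    )
    if i is None or on_interval:
        # The commit itself always happens; each side's state only advances
        # if that side had no parse errors.
        if mysql_ok:
            print("advance mysql createdon watermark")
        if kafka_ok:
            print("advance kafka offsets")
        print("commit checkpoint")


# Kafka had parse errors, so only the mysql watermark moves forward:
maybe_commit(i=None, commit_state_interval=1000, mysql_ok=True, kafka_ok=False)
```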
```python
        with DataHubKafkaReader(self.config, self.report, self.ctx) as reader:
            mcls = reader.get_mcls(from_offsets=from_offsets, stop_time=stop_time)
            for i, (mcl, offset) in enumerate(mcls):
                mcp = MetadataChangeProposalWrapper.try_from_mcl(mcl)
```
should we add some logging around this - call out that changeType=DELETE is not supported yet?
Yeah, might as well. So if changeType=DELETE, I think I should just drop the MCP?
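Something like this, perhaps (a sketch; `try_from_mcl` is from this PR, and the exact log wording is illustrative):

```python
import logging
from typing import Optional

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import ChangeTypeClass, MetadataChangeLogClass

logger = logging.getLogger(__name__)


def convert_mcl(mcl: MetadataChangeLogClass) -> Optional[MetadataChangeProposalWrapper]:
    if mcl.changeType == ChangeTypeClass.DELETE:
        # Deletes aren't supported by this source yet; drop with a warning.
        logger.warning(f"Skipping unsupported changeType=DELETE for {mcl.entityUrn}")
        return None
    return MetadataChangeProposalWrapper.try_from_mcl(mcl)
```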
```diff
@@ -91,7 +91,7 @@ def get_sql_alchemy_url(self):
         pass


-class BasicSQLAlchemyConfig(SQLAlchemyConfig):
+class SQLAlchemyConnectionConfig(ConfigModel):
```
we should map out what we want the full hierarchy to look like in the future
Yeah...
```diff
@@ -27,7 +27,7 @@ def _assert_checkpoint_deserialization(
 ) -> Checkpoint:
     # Serialize a checkpoint aspect with the previous state.
     checkpoint_aspect = DatahubIngestionCheckpointClass(
-        timestampMillis=int(datetime.utcnow().timestamp() * 1000),
+        timestampMillis=int(datetime.now().timestamp() * 1000),
```
missed tz=utc here
Will add in followup
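For reference, the timezone-aware form is a one-liner:

```python
from datetime import datetime, timezone

# Timezone-aware UTC "now"; avoids the ambiguity of a naive datetime.now().
timestamp_millis = int(datetime.now(tz=timezone.utc).timestamp() * 1000)
```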
Some remaining questions on deserialization and workunit ids -- see TODOs. Also needs to be more thoroughly tested; I've only run it on my local machine against a file sink. Already got one error from that test, which makes me think I should probably not try to deserialize into MCPWs.