New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK] improve Cassandra lineage metadata #2479
[FLINK] improve Cassandra lineage metadata #2479
Conversation
4a0c705
to
cd699b5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great improvement. Couple of minor comments added.
integration/flink/app/src/test/resources/events/expected_cassandra.json
Outdated
Show resolved
Hide resolved
@@ -1,6 +1,8 @@ | |||
# Changelog | |||
|
|||
## [Unreleased](https://github.com/OpenLineage/OpenLineage/compare/1.9.1...HEAD) | |||
* **Flink: improve Cassandra lineage metadata** (https://github.com/OpenLineage/OpenLineage/pull/2479) [@HuangZhenQiu](https://github.com/HuangZhenQiu) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be worth adding Cassandra naming convention to:
- https://openlineage.io/docs/spec/naming
- https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
Sorry for having same definition in both places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated in https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md. But I am not able to find the source doc file for https://openlineage.io/docs/spec/naming.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
private static Optional<String> convertToNamespace(Optional<List<Object>> endpointsOpt) { | ||
if (endpointsOpt.isPresent() && !endpointsOpt.isEmpty()) { | ||
return Optional.of("cassandra://" + endpointsOpt.get().get(0).toString().split("/")[1]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can still fail on endpointsOpt.get().get(0)
being null or endpointsOpt.get().get(0).toString().split("/")
having single element.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Added more strict pattern verification.
cd699b5
to
5f18794
Compare
858deb0
to
34d01c6
Compare
Signed-off-by: Zhenqiu Huang <huangzhenqiu0825@gmail.com>
34d01c6
to
036c608
Compare
Signed-off-by: Zhenqiu Huang <huangzhenqiu0825@gmail.com> Signed-off-by: Ruihua Wang <ruihuawang@microsoft.com>
One-line summary:
Following the namespace definition, we should use cassandra://host:port as format, so that users can find which Cassandra cluster is used. At the same time, Cassandra keyspace and table name can be combined as name.
SPDX-License-Identifier: Apache-2.0
Copyright 2018-2023 contributors to the OpenLineage project