Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix symlink display on marquez #2736

5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
* Web: minor UI enhancements [`#2727`](https://github.com/MarquezProject/marquez/pull/2727) [@phixMe](https://github.com/phixMe)
*Hygienic cleanup of project as a follow-up to [`#2725`](https://github.com/MarquezProject/marquez/pull/2725), including a fix for [`#2747`](https://github.com/MarquezProject/marquez/issues/2747).*

### Fixed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updating the changelog 💯


* bug: marquez dataset symlinks facet create empty namespace: [`#2645`](https://github.com/MarquezProject/marquez/pull/2645) [@sophiely](https://github.com/sophiely)
Display symlink dataset in the previously empty namespace and link the symlink dataset lineage to the main dataset.

## [0.44.0](https://github.com/MarquezProject/marquez/compare/0.43.1...0.44.0) - 2024-01-22

### Added
Expand Down
11 changes: 8 additions & 3 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,21 @@ INNER JOIN jobs_view j ON (j.uuid=l2.job_uuid OR j.uuid=l2.job_symlink_target_uu
"""
SELECT ds.*, dv.fields, dv.lifecycle_state
FROM datasets_view ds
LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
WHERE ds.uuid IN (<dsUuids>)""")
LEFT JOIN dataset_versions dv ON dv.uuid = ds.current_version_uuid
LEFT JOIN dataset_symlinks dsym ON dsym.namespace_uuid = ds.namespace_uuid and dsym.name = ds.name
WHERE dsym.is_primary = true
AND ds.uuid IN (<dsUuids>)""")
Comment on lines +105 to +108

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here since the view datasets_views can have several rows with the same uuid we choose the one flagged as primary.

Set<DatasetData> getDatasetData(@BindList Set<UUID> dsUuids);

@SqlQuery(
"""
SELECT ds.*, dv.fields, dv.lifecycle_state
FROM datasets_view ds
LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
WHERE ds.name = :datasetName AND ds.namespace_name = :namespaceName""")
LEFT JOIN dataset_symlinks dsym ON dsym.namespace_uuid = ds.namespace_uuid and dsym.name = ds.name
INNER JOIN datasets_view AS d ON d.uuid = ds.uuid
WHERE dsym.is_primary is true
AND CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)""")
DatasetData getDatasetData(String namespaceName, String datasetName);

@SqlQuery(
Expand Down
21 changes: 13 additions & 8 deletions api/src/main/java/marquez/service/LineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,20 @@ public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
if (!datasetIds.isEmpty()) {
datasets.addAll(this.getDatasetData(datasetIds));
}
if (nodeId.isDatasetType()
&& datasets.stream().noneMatch(n -> n.getId().equals(nodeId.asDatasetId()))) {
log.warn(
"Found jobs {} which no longer share lineage with dataset '{}' - discarding",
jobData.stream().map(JobData::getId).toList(),
nodeId.getValue());
return toLineageWithOrphanDataset(nodeId.asDatasetId());
}

if (nodeId.isDatasetType()) {
DatasetId datasetId = nodeId.asDatasetId();
DatasetData datasetData =
this.getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue());

if (!datasetIds.contains(datasetData.getUuid())) {
log.warn(
"Found jobs {} which no longer share lineage with dataset '{}' - discarding",
jobData.stream().map(JobData::getId).toList(),
nodeId.getValue());
return toLineageWithOrphanDataset(nodeId.asDatasetId());
}
}
Comment on lines +118 to +130

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we check if the uuid of the node and not the namespace+name

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Thanks for adding the warn log 💯

return toLineage(jobData, datasets);
}

Expand Down
52 changes: 31 additions & 21 deletions api/src/main/resources/marquez/db/migration/R__3_Datasets_view.sql
Original file line number Diff line number Diff line change
@@ -1,23 +1,33 @@
DROP VIEW IF EXISTS datasets_view;
CREATE VIEW datasets_view
AS
CREATE VIEW datasets_view AS
SELECT d.uuid,
d.type,
d.created_at,
d.updated_at,
d.namespace_uuid,
d.source_uuid,
d.name,
array_agg(CAST((namespaces.name, symlinks.name) AS DATASET_NAME)) AS dataset_symlinks,
d.physical_name,
d.description,
d.current_version_uuid,
d.last_modified_at,
d.namespace_name,
d.source_name,
d.is_deleted
FROM datasets d
JOIN dataset_symlinks symlinks ON d.uuid = symlinks.dataset_uuid
INNER JOIN namespaces ON symlinks.namespace_uuid = namespaces.uuid
WHERE d.is_hidden IS FALSE
GROUP BY d.uuid;
d.type,
d.created_at,
d.updated_at,
CASE
WHEN (d.namespace_name = namespaces.name AND d.name = symlinks.name) THEN d.namespace_uuid
ELSE namespaces.uuid
END
AS namespace_uuid ,
d.source_uuid,
CASE
WHEN (d.namespace_name = namespaces.name and d.name = symlinks.name) THEN d.name
ELSE symlinks.name
END
AS name,
array(SELECT ROW(namespaces.name::character varying(255), symlinks.name::character varying(255))::dataset_name) AS dataset_symlinks,
d.physical_name,
d.description,
d.current_version_uuid,
d.last_modified_at,
CASE
WHEN (d.namespace_name = namespaces.name AND d.name = symlinks.name) THEN d.namespace_name
ELSE namespaces.name
END
AS namespace_name,
d.source_name,
d.is_deleted
FROM datasets d
JOIN dataset_symlinks symlinks ON d.uuid = symlinks.dataset_uuid
JOIN namespaces ON symlinks.namespace_uuid = namespaces.uuid
WHERE d.is_hidden is false;
62 changes: 62 additions & 0 deletions api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -588,4 +590,64 @@ public void testLineageForOrphanedDataset() {
private boolean jobNameEquals(Node node, String writeJob) {
return node.getId().asJobId().getName().getValue().equals(writeJob);
}

@Test
public void testSymlinkDatasetLineage() {
// (1) Create symlink facet for our main dataset
Map<String, Object> symlink = new HashMap<>();
Map<String, Object> symlinkInfo = new HashMap<>();
Map<String, Object> symlinkIdentifiers = new HashMap<>();
symlinkIdentifiers.put("name", "symlinkDataset");
symlinkIdentifiers.put("namespace", NAMESPACE);
symlinkIdentifiers.put("type", "DB_TABLE");
symlinkInfo.put("producer", "https://github.com/OpenLineage/producer/");
symlinkInfo.put("schemaURL", "https://openlineage.io/schema/url/");
symlinkInfo.put("identifiers", symlinkIdentifiers);
symlink.put("symlinks", symlinkInfo);

// (2) Create main dataset with a symlink
Dataset mainDataset =
new Dataset(
NAMESPACE,
"mainDataset",
newDatasetFacet(symlink, new SchemaField("firstname", "string", "the first name")));

// (3) Create the symlink dataset
Dataset symlinkDataset =
new Dataset(
NAMESPACE,
"symlinkDataset",
newDatasetFacet(new SchemaField("firstname", "string", "the first name")));

// (3) Create a job with the main dataset
UpdateLineageRow firstJob =
LineageTestUtils.createLineageRow(
openLineageDao,
"firstJob",
"COMPLETE",
jobFacet,
Arrays.asList(mainDataset),
Arrays.asList());

// (4) Create a job with the symlink dataset
UpdateLineageRow secondJob =
LineageTestUtils.createLineageRow(
openLineageDao,
"secondJob",
"COMPLETE",
jobFacet,
Arrays.asList(symlinkDataset),
Arrays.asList());

// (5) We expect the first and second job linked together because the main
// and symlink dataset are in fact the same dataset
Lineage lineage =
lineageService.lineage(
NodeId.of(
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("symlinkDataset"))),
5,
true);

assertThat(lineage.getGraph()).hasSize(2);
}
}
Loading