Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dataset symlinks provided #2087

Merged
merged 1 commit into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.26.0...HEAD)
### Fixed
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
* Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

## [0.26.0](https://github.com/MarquezProject/marquez/compare/0.25.0...0.26.0) - 2022-09-15

### Added
Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public interface BaseDao extends SqlObject {
@CreateSqlObject
NamespaceDao createNamespaceDao();

@CreateSqlObject
DatasetSymlinkDao createDatasetSymlinkDao();

@CreateSqlObject
RunDao createRunDao();

Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ private Columns() {}
public static final String FIELD_UUIDS = "field_uuids";
public static final String LIFECYCLE_STATE = "lifecycle_state";

/* DATASET SYMLINK ROW COLUMNS */
public static final String IS_PRIMARY = "is_primary";
wslulciuc marked this conversation as resolved.
Show resolved Hide resolved

/* STREAM VERSION ROW COLUMNS */
public static final String SCHEMA_LOCATION = "schema_location";

Expand Down
26 changes: 16 additions & 10 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import marquez.db.mappers.DatasetMapper;
import marquez.db.mappers.DatasetRowMapper;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetSymlinkRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.SourceRow;
Expand Down Expand Up @@ -73,8 +74,7 @@ void updateLastModifiedAt(
WITH selected_datasets AS (
SELECT d.*
FROM datasets_view d
WHERE d.namespace_name = :namespaceName
AND d.name = :datasetName
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
FROM selected_datasets d
Expand Down Expand Up @@ -229,7 +229,7 @@ INSERT INTO datasets (
:description,
:isDeleted,
false
) ON CONFLICT (namespace_uuid, name)
) ON CONFLICT (uuid)
DO UPDATE SET
type = EXCLUDED.type,
updated_at = EXCLUDED.updated_at,
Expand Down Expand Up @@ -275,7 +275,7 @@ DatasetRow upsert(
+ ":sourceName, "
+ ":name, "
+ ":physicalName) "
+ "ON CONFLICT (namespace_uuid, name) "
+ "ON CONFLICT (uuid) "
+ "DO UPDATE SET "
+ "type = EXCLUDED.type, "
+ "updated_at = EXCLUDED.updated_at, "
Expand All @@ -296,8 +296,10 @@ DatasetRow upsert(
"""
UPDATE datasets
SET is_hidden = true
WHERE namespace_name = :namespaceName
AND name = :name
FROM dataset_symlinks, namespaces
WHERE dataset_symlinks.dataset_uuid = datasets.uuid
AND namespaces.uuid = dataset_symlinks.namespace_uuid
AND namespaces.name=:namespaceName AND dataset_symlinks.name=:name
RETURNING *
""")
Optional<DatasetRow> delete(String namespaceName, String name);
Expand All @@ -310,6 +312,10 @@ default Dataset upsertDatasetMeta(
createNamespaceDao()
.upsertNamespaceRow(
UUID.randomUUID(), now, namespaceName.getValue(), DEFAULT_NAMESPACE_OWNER);
DatasetSymlinkRow symlinkRow =
createDatasetSymlinkDao()
.upsertDatasetSymlinkRow(
UUID.randomUUID(), datasetName.getValue(), namespaceRow.getUuid(), true, null, now);
SourceRow sourceRow =
createSourceDao()
.upsertOrDefault(
Expand All @@ -318,13 +324,12 @@ default Dataset upsertDatasetMeta(
now,
datasetMeta.getSourceName().getValue(),
"");
UUID newDatasetUuid = UUID.randomUUID();
DatasetRow datasetRow;

if (datasetMeta.getDescription().isPresent()) {
datasetRow =
upsert(
newDatasetUuid,
symlinkRow.getUuid(),
datasetMeta.getType(),
now,
namespaceRow.getUuid(),
Expand All @@ -338,7 +343,7 @@ default Dataset upsertDatasetMeta(
} else {
datasetRow =
upsert(
newDatasetUuid,
symlinkRow.getUuid(),
datasetMeta.getType(),
now,
namespaceRow.getUuid(),
Expand All @@ -349,7 +354,8 @@ default Dataset upsertDatasetMeta(
datasetMeta.getPhysicalName().getValue());
}

updateDatasetMetric(namespaceName, datasetMeta.getType(), newDatasetUuid, datasetRow.getUuid());
updateDatasetMetric(
namespaceName, datasetMeta.getType(), symlinkRow.getUuid(), datasetRow.getUuid());

TagDao tagDao = createTagDao();
List<DatasetTagMapping> datasetTagMappings = new ArrayList<>();
Expand Down
51 changes: 51 additions & 0 deletions api/src/main/java/marquez/db/DatasetSymlinkDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import marquez.db.mappers.DatasetSymlinksRowMapper;
import marquez.db.models.DatasetSymlinkRow;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(DatasetSymlinksRowMapper.class)
public interface DatasetSymlinkDao extends BaseDao {

default DatasetSymlinkRow upsertDatasetSymlinkRow(
UUID uuid, String name, UUID namespaceUuid, boolean isPrimary, String type, Instant now) {
doUpsertDatasetSymlinkRow(uuid, name, namespaceUuid, isPrimary, type, now);
return findDatasetSymlinkByNamespaceUuidAndName(namespaceUuid, name).orElseThrow();
}

@SqlQuery("SELECT * FROM dataset_symlinks WHERE namespace_uuid = :namespaceUuid and name = :name")
Optional<DatasetSymlinkRow> findDatasetSymlinkByNamespaceUuidAndName(
UUID namespaceUuid, String name);

@SqlUpdate(
"""
INSERT INTO dataset_symlinks (
dataset_uuid,
name,
namespace_uuid,
is_primary,
type,
created_at,
updated_at
) VALUES (
:uuid,
:name,
:namespaceUuid,
:isPrimary,
:type,
:now,
:now)
ON CONFLICT (name, namespace_uuid) DO NOTHING""")
void doUpsertDatasetSymlinkRow(
UUID uuid, String name, UUID namespaceUuid, boolean isPrimary, String type, Instant now);
}
38 changes: 36 additions & 2 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import marquez.db.mappers.LineageEventMapper;
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetSymlinkRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.JobContextRow;
import marquez.db.models.JobRow;
Expand Down Expand Up @@ -120,6 +121,7 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map

default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper mapper) {
NamespaceDao namespaceDao = createNamespaceDao();
DatasetSymlinkDao datasetSymlinkDao = createDatasetSymlinkDao();
DatasetDao datasetDao = createDatasetDao();
SourceDao sourceDao = createSourceDao();
JobDao jobDao = createJobDao();
Expand Down Expand Up @@ -316,6 +318,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
runUuid,
true,
namespaceDao,
datasetSymlinkDao,
sourceDao,
datasetDao,
datasetVersionDao,
Expand All @@ -337,6 +340,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
runUuid,
false,
namespaceDao,
datasetSymlinkDao,
sourceDao,
datasetDao,
datasetVersionDao,
Expand Down Expand Up @@ -532,6 +536,7 @@ default DatasetRecord upsertLineageDataset(
UUID runUuid,
boolean isInput,
NamespaceDao namespaceDao,
DatasetSymlinkDao datasetSymlinkDao,
SourceDao sourceDao,
DatasetDao datasetDao,
DatasetVersionDao datasetVersionDao,
Expand Down Expand Up @@ -568,6 +573,35 @@ default DatasetRecord upsertLineageDataset(
formatNamespaceName(ds.getNamespace()),
DEFAULT_NAMESPACE_OWNER);

DatasetSymlinkRow symlink =
datasetSymlinkDao.upsertDatasetSymlinkRow(
UUID.randomUUID(),
formatDatasetName(ds.getName()),
dsNamespace.getUuid(),
true,
null,
now);

Optional.ofNullable(ds.getFacets())
.map(facets -> facets.getSymlinks())
.ifPresent(
el ->
el.getIdentifiers().stream()
.forEach(
id ->
datasetSymlinkDao.doUpsertDatasetSymlinkRow(
symlink.getUuid(),
id.getName(),
namespaceDao
.upsertNamespaceRow(
UUID.randomUUID(),
now,
id.getNamespace(),
DEFAULT_NAMESPACE_OWNER)
.getUuid(),
false,
id.getType(),
now)));
String dslifecycleState =
Optional.ofNullable(ds.getFacets())
.map(DatasetFacets::getLifecycleStateChange)
Expand All @@ -576,7 +610,7 @@ default DatasetRecord upsertLineageDataset(

DatasetRow datasetRow =
datasetDao.upsert(
UUID.randomUUID(),
symlink.getUuid(),
getDatasetType(ds),
now,
datasetNamespace.getUuid(),
Expand Down Expand Up @@ -609,7 +643,7 @@ default DatasetRecord upsertLineageDataset(
dsNamespace.getName(),
source.getName(),
dsRow.getPhysicalName(),
dsRow.getName(),
symlink.getName(),
dslifecycleState,
fields,
runUuid)
Expand Down
36 changes: 36 additions & 0 deletions api/src/main/java/marquez/db/mappers/DatasetSymlinksRowMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.booleanOrDefault;
import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.timestampOrThrow;
import static marquez.db.Columns.uuidOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.NonNull;
import marquez.db.Columns;
import marquez.db.models.DatasetSymlinkRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

public class DatasetSymlinksRowMapper implements RowMapper<DatasetSymlinkRow> {

@Override
public DatasetSymlinkRow map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
return new DatasetSymlinkRow(
uuidOrThrow(results, Columns.DATASET_UUID),
stringOrThrow(results, Columns.NAME),
uuidOrThrow(results, Columns.NAMESPACE_UUID),
stringOrNull(results, Columns.TYPE),
booleanOrDefault(results, Columns.IS_PRIMARY, false),
timestampOrThrow(results, Columns.CREATED_AT),
timestampOrThrow(results, Columns.UPDATED_AT));
}
}
33 changes: 33 additions & 0 deletions api/src/main/java/marquez/db/models/DatasetSymlinkRow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.models;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.Value;

@AllArgsConstructor
@EqualsAndHashCode
@Value
public class DatasetSymlinkRow {
@NonNull UUID uuid;
@NonNull String name;
@NonNull UUID namespaceUuid;
@Nullable String type;
@NonNull boolean isPrimary;
@Getter @NonNull private final Instant createdAt;
@Getter @NonNull private final Instant updatedAt;

public Optional<String> getType() {
return Optional.ofNullable(type);
}
}
Loading