Skip to content

Commit

Permalink
dataset symlinks provided (#2087)
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Sep 28, 2022
1 parent bb3d163 commit 2909864
Show file tree
Hide file tree
Showing 16 changed files with 348 additions and 29 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.26.0...HEAD)
### Fixed
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
* Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

## [0.26.0](https://github.com/MarquezProject/marquez/compare/0.25.0...0.26.0) - 2022-09-15

### Added
Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public interface BaseDao extends SqlObject {
@CreateSqlObject
NamespaceDao createNamespaceDao();

@CreateSqlObject
DatasetSymlinkDao createDatasetSymlinkDao();

@CreateSqlObject
RunDao createRunDao();

Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ private Columns() {}
public static final String FIELD_UUIDS = "field_uuids";
public static final String LIFECYCLE_STATE = "lifecycle_state";

/* DATASET SYMLINK ROW COLUMNS */
public static final String IS_PRIMARY = "is_primary";

/* STREAM VERSION ROW COLUMNS */
public static final String SCHEMA_LOCATION = "schema_location";

Expand Down
26 changes: 16 additions & 10 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import marquez.db.mappers.DatasetMapper;
import marquez.db.mappers.DatasetRowMapper;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetSymlinkRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.SourceRow;
Expand Down Expand Up @@ -73,8 +74,7 @@ void updateLastModifiedAt(
WITH selected_datasets AS (
SELECT d.*
FROM datasets_view d
WHERE d.namespace_name = :namespaceName
AND d.name = :datasetName
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
FROM selected_datasets d
Expand Down Expand Up @@ -229,7 +229,7 @@ INSERT INTO datasets (
:description,
:isDeleted,
false
) ON CONFLICT (namespace_uuid, name)
) ON CONFLICT (uuid)
DO UPDATE SET
type = EXCLUDED.type,
updated_at = EXCLUDED.updated_at,
Expand Down Expand Up @@ -275,7 +275,7 @@ DatasetRow upsert(
+ ":sourceName, "
+ ":name, "
+ ":physicalName) "
+ "ON CONFLICT (namespace_uuid, name) "
+ "ON CONFLICT (uuid) "
+ "DO UPDATE SET "
+ "type = EXCLUDED.type, "
+ "updated_at = EXCLUDED.updated_at, "
Expand All @@ -296,8 +296,10 @@ DatasetRow upsert(
"""
UPDATE datasets
SET is_hidden = true
WHERE namespace_name = :namespaceName
AND name = :name
FROM dataset_symlinks, namespaces
WHERE dataset_symlinks.dataset_uuid = datasets.uuid
AND namespaces.uuid = dataset_symlinks.namespace_uuid
AND namespaces.name=:namespaceName AND dataset_symlinks.name=:name
RETURNING *
""")
Optional<DatasetRow> delete(String namespaceName, String name);
Expand All @@ -310,6 +312,10 @@ default Dataset upsertDatasetMeta(
createNamespaceDao()
.upsertNamespaceRow(
UUID.randomUUID(), now, namespaceName.getValue(), DEFAULT_NAMESPACE_OWNER);
DatasetSymlinkRow symlinkRow =
createDatasetSymlinkDao()
.upsertDatasetSymlinkRow(
UUID.randomUUID(), datasetName.getValue(), namespaceRow.getUuid(), true, null, now);
SourceRow sourceRow =
createSourceDao()
.upsertOrDefault(
Expand All @@ -318,13 +324,12 @@ default Dataset upsertDatasetMeta(
now,
datasetMeta.getSourceName().getValue(),
"");
UUID newDatasetUuid = UUID.randomUUID();
DatasetRow datasetRow;

if (datasetMeta.getDescription().isPresent()) {
datasetRow =
upsert(
newDatasetUuid,
symlinkRow.getUuid(),
datasetMeta.getType(),
now,
namespaceRow.getUuid(),
Expand All @@ -338,7 +343,7 @@ default Dataset upsertDatasetMeta(
} else {
datasetRow =
upsert(
newDatasetUuid,
symlinkRow.getUuid(),
datasetMeta.getType(),
now,
namespaceRow.getUuid(),
Expand All @@ -349,7 +354,8 @@ default Dataset upsertDatasetMeta(
datasetMeta.getPhysicalName().getValue());
}

updateDatasetMetric(namespaceName, datasetMeta.getType(), newDatasetUuid, datasetRow.getUuid());
updateDatasetMetric(
namespaceName, datasetMeta.getType(), symlinkRow.getUuid(), datasetRow.getUuid());

TagDao tagDao = createTagDao();
List<DatasetTagMapping> datasetTagMappings = new ArrayList<>();
Expand Down
51 changes: 51 additions & 0 deletions api/src/main/java/marquez/db/DatasetSymlinkDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import marquez.db.mappers.DatasetSymlinksRowMapper;
import marquez.db.models.DatasetSymlinkRow;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(DatasetSymlinksRowMapper.class)
public interface DatasetSymlinkDao extends BaseDao {

default DatasetSymlinkRow upsertDatasetSymlinkRow(
UUID uuid, String name, UUID namespaceUuid, boolean isPrimary, String type, Instant now) {
doUpsertDatasetSymlinkRow(uuid, name, namespaceUuid, isPrimary, type, now);
return findDatasetSymlinkByNamespaceUuidAndName(namespaceUuid, name).orElseThrow();
}

@SqlQuery("SELECT * FROM dataset_symlinks WHERE namespace_uuid = :namespaceUuid and name = :name")
Optional<DatasetSymlinkRow> findDatasetSymlinkByNamespaceUuidAndName(
UUID namespaceUuid, String name);

@SqlUpdate(
"""
INSERT INTO dataset_symlinks (
dataset_uuid,
name,
namespace_uuid,
is_primary,
type,
created_at,
updated_at
) VALUES (
:uuid,
:name,
:namespaceUuid,
:isPrimary,
:type,
:now,
:now)
ON CONFLICT (name, namespace_uuid) DO NOTHING""")
void doUpsertDatasetSymlinkRow(
UUID uuid, String name, UUID namespaceUuid, boolean isPrimary, String type, Instant now);
}
38 changes: 36 additions & 2 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import marquez.db.mappers.LineageEventMapper;
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetSymlinkRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.JobContextRow;
import marquez.db.models.JobRow;
Expand Down Expand Up @@ -120,6 +121,7 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map

default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper mapper) {
NamespaceDao namespaceDao = createNamespaceDao();
DatasetSymlinkDao datasetSymlinkDao = createDatasetSymlinkDao();
DatasetDao datasetDao = createDatasetDao();
SourceDao sourceDao = createSourceDao();
JobDao jobDao = createJobDao();
Expand Down Expand Up @@ -316,6 +318,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
runUuid,
true,
namespaceDao,
datasetSymlinkDao,
sourceDao,
datasetDao,
datasetVersionDao,
Expand All @@ -337,6 +340,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
runUuid,
false,
namespaceDao,
datasetSymlinkDao,
sourceDao,
datasetDao,
datasetVersionDao,
Expand Down Expand Up @@ -532,6 +536,7 @@ default DatasetRecord upsertLineageDataset(
UUID runUuid,
boolean isInput,
NamespaceDao namespaceDao,
DatasetSymlinkDao datasetSymlinkDao,
SourceDao sourceDao,
DatasetDao datasetDao,
DatasetVersionDao datasetVersionDao,
Expand Down Expand Up @@ -568,6 +573,35 @@ default DatasetRecord upsertLineageDataset(
formatNamespaceName(ds.getNamespace()),
DEFAULT_NAMESPACE_OWNER);

DatasetSymlinkRow symlink =
datasetSymlinkDao.upsertDatasetSymlinkRow(
UUID.randomUUID(),
formatDatasetName(ds.getName()),
dsNamespace.getUuid(),
true,
null,
now);

Optional.ofNullable(ds.getFacets())
.map(facets -> facets.getSymlinks())
.ifPresent(
el ->
el.getIdentifiers().stream()
.forEach(
id ->
datasetSymlinkDao.doUpsertDatasetSymlinkRow(
symlink.getUuid(),
id.getName(),
namespaceDao
.upsertNamespaceRow(
UUID.randomUUID(),
now,
id.getNamespace(),
DEFAULT_NAMESPACE_OWNER)
.getUuid(),
false,
id.getType(),
now)));
String dslifecycleState =
Optional.ofNullable(ds.getFacets())
.map(DatasetFacets::getLifecycleStateChange)
Expand All @@ -576,7 +610,7 @@ default DatasetRecord upsertLineageDataset(

DatasetRow datasetRow =
datasetDao.upsert(
UUID.randomUUID(),
symlink.getUuid(),
getDatasetType(ds),
now,
datasetNamespace.getUuid(),
Expand Down Expand Up @@ -609,7 +643,7 @@ default DatasetRecord upsertLineageDataset(
dsNamespace.getName(),
source.getName(),
dsRow.getPhysicalName(),
dsRow.getName(),
symlink.getName(),
dslifecycleState,
fields,
runUuid)
Expand Down
36 changes: 36 additions & 0 deletions api/src/main/java/marquez/db/mappers/DatasetSymlinksRowMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.booleanOrDefault;
import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.timestampOrThrow;
import static marquez.db.Columns.uuidOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.NonNull;
import marquez.db.Columns;
import marquez.db.models.DatasetSymlinkRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

public class DatasetSymlinksRowMapper implements RowMapper<DatasetSymlinkRow> {

@Override
public DatasetSymlinkRow map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
return new DatasetSymlinkRow(
uuidOrThrow(results, Columns.DATASET_UUID),
stringOrThrow(results, Columns.NAME),
uuidOrThrow(results, Columns.NAMESPACE_UUID),
stringOrNull(results, Columns.TYPE),
booleanOrDefault(results, Columns.IS_PRIMARY, false),
timestampOrThrow(results, Columns.CREATED_AT),
timestampOrThrow(results, Columns.UPDATED_AT));
}
}
33 changes: 33 additions & 0 deletions api/src/main/java/marquez/db/models/DatasetSymlinkRow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.models;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.Value;

@AllArgsConstructor
@EqualsAndHashCode
@Value
public class DatasetSymlinkRow {
@NonNull UUID uuid;
@NonNull String name;
@NonNull UUID namespaceUuid;
@Nullable String type;
@NonNull boolean isPrimary;
@Getter @NonNull private final Instant createdAt;
@Getter @NonNull private final Instant updatedAt;

public Optional<String> getType() {
return Optional.ofNullable(type);
}
}
Loading

0 comments on commit 2909864

Please sign in to comment.