Skip to content

Commit

Permalink
implement inputFacets & outputFacets
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Feb 11, 2023
1 parent 9e0c84b commit a8e74f5
Show file tree
Hide file tree
Showing 18 changed files with 595 additions and 58 deletions.
34 changes: 34 additions & 0 deletions api/src/main/java/marquez/common/models/RunDatasetFacets.java
@@ -0,0 +1,34 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.common.models;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;

/**
* Value holder pairing a dataset version with a map of facets.
*
* <p>Used to store {@code inputFacets} and {@code outputFacets}, which the OpenLineage spec
* assigns to datasets but which the Marquez API exposes as part of {@link
* marquez.service.models.Run}.
*
* <p>Getters, {@code equals}/{@code hashCode} and {@code toString} are generated by the Lombok
* annotations on the class.
*/
@EqualsAndHashCode
@ToString
@Getter
public class RunDatasetFacets {

/** Dataset version the facets are associated with. */
private final DatasetVersionId datasetVersionId;
/** Facet payloads keyed by string (presumably the facet name; confirm against callers). */
private final ImmutableMap<String, Object> facets;

/**
* Creates a holder for the facets of one dataset version. The {@code @JsonProperty} names bind
* the arguments to the JSON properties {@code datasetVersionId} and {@code facets}.
*
* @param datasetVersionId dataset version the facets are associated with; a {@code null} value
*     is rejected by the Lombok-generated {@code @NonNull} check
* @param facets facet map stored as-is; a {@code null} value is rejected by the Lombok-generated
*     {@code @NonNull} check
*/
public RunDatasetFacets(
@JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId,
@JsonProperty("facets") @NonNull ImmutableMap<String, Object> facets) {
this.datasetVersionId = datasetVersionId;
this.facets = facets;
}
}
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/Columns.java
Expand Up @@ -55,6 +55,7 @@ private Columns() {}
public static final String NAMESPACE_NAME = "namespace_name";
public static final String DATASET_NAME = "dataset_name";
public static final String FACETS = "facets";
// Presumably names the "dataset_facets" result column (aggregated input/output dataset facets)
// selected by the run queries - TODO confirm against the row mapper that reads it.
public static final String DATASET_FACETS = "dataset_facets";
public static final String TAGS = "tags";
public static final String IS_HIDDEN = "is_hidden";

Expand Down
8 changes: 4 additions & 4 deletions api/src/main/java/marquez/db/DatasetDao.java
Expand Up @@ -81,7 +81,7 @@ WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symli
INNER JOIN dataset_versions AS dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
WHERE dataset_uuid = dv.dataset_uuid AND (type ILIKE 'dataset' OR type ILIKE 'unknown')
) df ON df.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
Expand All @@ -90,7 +90,7 @@ LEFT JOIN LATERAL (
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT dataset_uuid, run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid
WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid AND (type ILIKE 'dataset' OR type ILIKE 'unknown')
) df ON df.run_uuid = rim.run_uuid
)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
Expand Down Expand Up @@ -149,7 +149,7 @@ WITH selected_datasets AS (
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
WHERE dataset_uuid = dv.dataset_uuid AND (type ILIKE 'dataset' OR type ILIKE 'unknown')
) df ON df.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
Expand All @@ -158,7 +158,7 @@ LEFT JOIN LATERAL (
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
WHERE dataset_uuid = dv.dataset_uuid AND (type ILIKE 'dataset' OR type ILIKE 'unknown')
) df ON df.run_uuid = rim.run_uuid
)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
Expand Down
52 changes: 52 additions & 0 deletions api/src/main/java/marquez/db/DatasetFacetsDao.java
Expand Up @@ -149,6 +149,58 @@ default void insertDatasetFacetsFor(
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
}

/**
 * Stores every top-level field of a dataset's {@code inputFacets} as its own facet row of type
 * {@code Type.INPUT}.
 *
 * <p>The facets object is converted to a JSON tree with the shared mapper; each field name of
 * that tree becomes one call to {@code insertDatasetFacet}, with the field's value converted by
 * {@code FacetUtils.toPgObject}.
 *
 * @param datasetUuid passed through to {@code insertDatasetFacet} for every facet
 * @param datasetVersionUuid passed through to {@code insertDatasetFacet} for every facet
 * @param runUuid passed through to {@code insertDatasetFacet} for every facet
 * @param lineageEventTime passed through to {@code insertDatasetFacet} for every facet
 * @param lineageEventType passed through to {@code insertDatasetFacet} for every facet
 * @param inputFacets facets object whose serialized fields are inserted one by one
 */
default void insertInputDatasetFacetsFor(
    @NonNull UUID datasetUuid,
    @NonNull UUID datasetVersionUuid,
    @NonNull UUID runUuid,
    @NonNull Instant lineageEventTime,
    @NonNull String lineageEventType,
    @NonNull LineageEvent.InputDatasetFacets inputFacets) {
  // Captured once so that every facet written by this call receives the same timestamp.
  final Instant insertedAt = Instant.now();
  final JsonNode facetsTree = Utils.getMapper().valueToTree(inputFacets);

  facetsTree
      .fieldNames()
      .forEachRemaining(
          facetName ->
              insertDatasetFacet(
                  insertedAt,
                  datasetUuid,
                  datasetVersionUuid,
                  runUuid,
                  lineageEventTime,
                  lineageEventType,
                  Type.INPUT,
                  facetName,
                  FacetUtils.toPgObject(facetName, facetsTree.get(facetName))));
}

/**
 * Stores every top-level field of a dataset's {@code outputFacets} as its own facet row of type
 * {@code Type.OUTPUT}.
 *
 * <p>The facets object is converted to a JSON tree with the shared mapper; each field name of
 * that tree becomes one call to {@code insertDatasetFacet}, with the field's value converted by
 * {@code FacetUtils.toPgObject}.
 *
 * @param datasetUuid passed through to {@code insertDatasetFacet} for every facet
 * @param datasetVersionUuid passed through to {@code insertDatasetFacet} for every facet
 * @param runUuid passed through to {@code insertDatasetFacet} for every facet
 * @param lineageEventTime passed through to {@code insertDatasetFacet} for every facet
 * @param lineageEventType passed through to {@code insertDatasetFacet} for every facet
 * @param outputFacets facets object whose serialized fields are inserted one by one
 */
default void insertOutputDatasetFacetsFor(
    @NonNull UUID datasetUuid,
    @NonNull UUID datasetVersionUuid,
    @NonNull UUID runUuid,
    @NonNull Instant lineageEventTime,
    @NonNull String lineageEventType,
    @NonNull LineageEvent.OutputDatasetFacets outputFacets) {
  // Captured once so that every facet written by this call receives the same timestamp.
  final Instant insertedAt = Instant.now();
  final JsonNode facetsTree = Utils.getMapper().valueToTree(outputFacets);

  facetsTree
      .fieldNames()
      .forEachRemaining(
          facetName ->
              insertDatasetFacet(
                  insertedAt,
                  datasetUuid,
                  datasetVersionUuid,
                  runUuid,
                  lineageEventTime,
                  lineageEventType,
                  Type.OUTPUT,
                  facetName,
                  FacetUtils.toPgObject(facetName, facetsTree.get(facetName))));
}

record DatasetFacetRow(
Instant createdAt,
UUID datasetUuid,
Expand Down
6 changes: 3 additions & 3 deletions api/src/main/java/marquez/db/DatasetVersionDao.java
Expand Up @@ -163,7 +163,7 @@ WITH selected_dataset_versions AS (
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
Expand Down Expand Up @@ -194,7 +194,7 @@ WITH selected_dataset_versions AS (
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
Expand Down Expand Up @@ -254,7 +254,7 @@ WITH selected_dataset_versions AS (
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown')
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
Expand Down
24 changes: 24 additions & 0 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Expand Up @@ -301,6 +301,18 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
now,
event.getEventType(),
facets));

// InputFacets ...
Optional.ofNullable(dataset.getInputFacets())
.ifPresent(
facets ->
datasetFacetsDao.insertInputDatasetFacetsFor(
record.getDatasetRow().getUuid(),
record.getDatasetVersionRow().getUuid(),
runUuid,
now,
event.getEventType(),
facets));
}
}
bag.setInputs(Optional.ofNullable(datasetInputs));
Expand Down Expand Up @@ -336,6 +348,18 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
now,
event.getEventType(),
facets));

// OutputFacets ...
Optional.ofNullable(dataset.getOutputFacets())
.ifPresent(
facets ->
datasetFacetsDao.insertOutputDatasetFacetsFor(
record.getDatasetRow().getUuid(),
record.getDatasetVersionRow().getUuid(),
runUuid,
now,
event.getEventType(),
facets));
}
}

Expand Down
97 changes: 66 additions & 31 deletions api/src/main/java/marquez/db/RunDao.java
Expand Up @@ -74,34 +74,52 @@ public interface RunDao extends BaseDao {
void updateEndState(UUID rowUuid, Instant transitionedAt, UUID endRunStateUuid);

/**
* Base {@code SELECT} for loading runs. It has no {@code WHERE} clause of its own; queries
* append one, e.g. {@code BASE_FIND_RUN_SQL + "WHERE r.uuid = :runUuid"}.
*
* <p>Each row of {@code runs_view} is left-joined with: its run facets aggregated into a JSON
* array ordered by {@code lineage_event_time}; its run args; its job context; its job version;
* JSON arrays of its input and output dataset versions (namespace, name, version and
* {@code dataset_version_uuid}); and {@code dataset_facets}, a JSON array built from
* {@code dataset_facets_view} rows whose {@code type} matches 'input' or 'output'
* (case-insensitively), ordered by {@code created_at}.
*
* <p>NOTE(review): this span shows both the former string-concatenated query and the text-block
* query that replaces it; only the text block selects {@code df.dataset_facets}.
*/
String BASE_FIND_RUN_SQL =
"SELECT r.*, ra.args, ctx.context, f.facets,\n"
+ "jv.version AS job_version,\n"
+ "ri.input_versions, ro.output_versions\n"
+ "FROM runs_view AS r\n"
+ "LEFT OUTER JOIN\n"
+ "(\n"
+ " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n"
+ " FROM run_facets_view rf\n"
+ " GROUP BY rf.run_uuid\n"
+ ") AS f ON r.uuid=f.run_uuid\n"
+ "LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n"
+ "LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n"
+ "LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
+ "LEFT OUTER JOIN (\n"
+ " SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,\n"
+ " 'name', dv.dataset_name,\n"
+ " 'version', dv.version)) AS input_versions\n"
+ " FROM runs_input_mapping im\n"
+ " INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid\n"
+ " GROUP BY im.run_uuid\n"
+ ") ri ON ri.run_uuid=r.uuid\n"
+ "LEFT OUTER JOIN (\n"
+ " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n"
+ " 'name', dataset_name,\n"
+ " 'version', version)) AS output_versions\n"
+ " FROM dataset_versions\n"
+ " GROUP BY run_uuid\n"
+ ") ro ON ro.run_uuid=r.uuid\n";
"""
SELECT r.*, ra.args, ctx.context, f.facets,
jv.version AS job_version,
ri.input_versions, ro.output_versions, df.dataset_facets
FROM runs_view AS r
LEFT OUTER JOIN
(
SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets
FROM run_facets_view rf
GROUP BY rf.run_uuid
) AS f ON r.uuid=f.run_uuid
LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid
LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid
LEFT OUTER JOIN (
SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,
'name', dv.dataset_name,
'version', dv.version,
'dataset_version_uuid', uuid)) AS input_versions
FROM runs_input_mapping im
INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid
GROUP BY im.run_uuid
) ri ON ri.run_uuid=r.uuid
LEFT OUTER JOIN (
SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
'name', dataset_name,
'version', version,
'dataset_version_uuid', uuid
)) AS output_versions
FROM dataset_versions
GROUP BY run_uuid
) ro ON ro.run_uuid=r.uuid
LEFT OUTER JOIN (
SELECT
run_uuid,
JSON_AGG(json_build_object(
'dataset_version_uuid', dataset_version_uuid,
'name', name,
'type', type,
'facet', facet
) ORDER BY created_at ASC) as dataset_facets
FROM dataset_facets_view
WHERE (type ILIKE 'output' OR type ILIKE 'input')
GROUP BY run_uuid
) AS df ON r.uuid = df.run_uuid
""";

@SqlQuery(BASE_FIND_RUN_SQL + "WHERE r.uuid = :runUuid")
Optional<Run> findRunByUuid(UUID runUuid);
Expand All @@ -124,7 +142,7 @@ public interface RunDao extends BaseDao {
"""
SELECT r.*, ra.args, ctx.context, f.facets,
j.namespace_name, j.name, jv.version AS job_version,
ri.input_versions, ro.output_versions
ri.input_versions, ro.output_versions, df.dataset_facets
FROM runs_view AS r
INNER JOIN jobs_view j ON r.job_uuid=j.uuid
LEFT JOIN LATERAL
Expand All @@ -140,18 +158,35 @@ SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS fac
LEFT OUTER JOIN (
SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,
'name', dv.dataset_name,
'version', dv.version)) AS input_versions
'version', dv.version,
'dataset_version_uuid', uuid
)) AS input_versions
FROM runs_input_mapping im
INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid
GROUP BY im.run_uuid
) ri ON ri.run_uuid=r.uuid
LEFT OUTER JOIN (
SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
'name', dataset_name,
'version', version)) AS output_versions
'version', version,
'dataset_version_uuid', uuid
)) AS output_versions
FROM dataset_versions
GROUP BY run_uuid
) ro ON ro.run_uuid=r.uuid
LEFT OUTER JOIN (
SELECT
run_uuid,
JSON_AGG(json_build_object(
'dataset_version_uuid', dataset_version_uuid,
'name', name,
'type', type,
'facet', facet
) ORDER BY created_at ASC) as dataset_facets
FROM dataset_facets_view
WHERE (type ILIKE 'output' OR type ILIKE 'input')
GROUP BY run_uuid
) AS df ON r.uuid = df.run_uuid
WHERE j.namespace_name=:namespace AND (j.name=:jobName OR :jobName = ANY(j.aliases))
ORDER BY STARTED_AT DESC NULLS LAST
LIMIT :limit OFFSET :offset
Expand Down

0 comments on commit a8e74f5

Please sign in to comment.