diff --git a/api/src/main/java/marquez/common/models/RunDatasetFacets.java b/api/src/main/java/marquez/common/models/RunDatasetFacets.java new file mode 100644 index 0000000000..67c0471bdd --- /dev/null +++ b/api/src/main/java/marquez/common/models/RunDatasetFacets.java @@ -0,0 +1,34 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.common.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; + +/** + * Class used to store `inputFacets` and `outputFacets` which are assigned to datasets within + * OpenLineage spec, but are exposed within Marquez api as a part of {@link + * marquez.service.models.Run} + */ +@EqualsAndHashCode +@ToString +@Getter +public class RunDatasetFacets { + + private final DatasetVersionId datasetVersionId; + private final ImmutableMap facets; + + public RunDatasetFacets( + @JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId, + @JsonProperty("facets") @NonNull ImmutableMap facets) { + this.datasetVersionId = datasetVersionId; + this.facets = facets; + } +} diff --git a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java index 3a3ddeb40d..b71d701aba 100644 --- a/api/src/main/java/marquez/db/Columns.java +++ b/api/src/main/java/marquez/db/Columns.java @@ -55,6 +55,7 @@ private Columns() {} public static final String NAMESPACE_NAME = "namespace_name"; public static final String DATASET_NAME = "dataset_name"; public static final String FACETS = "facets"; + public static final String DATASET_FACETS = "dataset_facets"; public static final String TAGS = "tags"; public static final String IS_HIDDEN = "is_hidden"; diff --git a/api/src/main/java/marquez/db/DatasetDao.java b/api/src/main/java/marquez/db/DatasetDao.java index 7439813d63..26241d08a5 100644 --- a/api/src/main/java/marquez/db/DatasetDao.java +++ b/api/src/main/java/marquez/db/DatasetDao.java @@ -81,7 +81,7 @@ WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symli INNER JOIN dataset_versions AS dv ON dv.uuid = d.current_version_uuid LEFT JOIN LATERAL ( SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view - WHERE dataset_uuid = dv.dataset_uuid + WHERE dataset_uuid = dv.dataset_uuid AND (type ILIKE 'dataset' OR type ILIKE 'unknown') ) df ON df.run_uuid = dv.run_uuid UNION SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet @@ -90,7 +90,7 @@ LEFT JOIN LATERAL ( LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid LEFT JOIN LATERAL ( SELECT dataset_uuid, run_uuid, lineage_event_time, facet FROM dataset_facets_view - WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid + WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid AND (type ILIKE 'dataset' OR type ILIKE 'unknown') ) df ON df.run_uuid = rim.run_uuid ) SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets @@ -149,7 +149,7 @@ WITH selected_datasets AS ( INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid LEFT JOIN LATERAL ( SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view - WHERE dataset_uuid = dv.dataset_uuid + WHERE dataset_uuid = dv.dataset_uuid AND (type ILIKE 'dataset' OR type ILIKE 'unknown') ) df ON df.run_uuid = dv.run_uuid UNION SELECT d.uuid, d.name, d.namespace_name, 
rim.run_uuid, lifecycle_state, lineage_event_time, facet @@ -158,7 +158,7 @@ LEFT JOIN LATERAL ( LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid LEFT JOIN LATERAL ( SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view - WHERE dataset_uuid = dv.dataset_uuid + WHERE dataset_uuid = dv.dataset_uuid AND (type ILIKE 'dataset' OR type ILIKE 'unknown') ) df ON df.run_uuid = rim.run_uuid ) SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets diff --git a/api/src/main/java/marquez/db/DatasetFacetsDao.java b/api/src/main/java/marquez/db/DatasetFacetsDao.java index 28361aaa70..679a9bfaa3 100644 --- a/api/src/main/java/marquez/db/DatasetFacetsDao.java +++ b/api/src/main/java/marquez/db/DatasetFacetsDao.java @@ -149,6 +149,58 @@ default void insertDatasetFacetsFor( FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName)))); } + default void insertInputDatasetFacetsFor( + @NonNull UUID datasetUuid, + @NonNull UUID datasetVersionUuid, + @NonNull UUID runUuid, + @NonNull Instant lineageEventTime, + @NonNull String lineageEventType, + @NonNull LineageEvent.InputDatasetFacets inputFacets) { + final Instant now = Instant.now(); + + JsonNode jsonNode = Utils.getMapper().valueToTree(inputFacets); + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false) + .forEach( + fieldName -> + insertDatasetFacet( + now, + datasetUuid, + datasetVersionUuid, + runUuid, + lineageEventTime, + lineageEventType, + Type.INPUT, + fieldName, + FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName)))); + } + + default void insertOutputDatasetFacetsFor( + @NonNull UUID datasetUuid, + @NonNull UUID datasetVersionUuid, + @NonNull UUID runUuid, + @NonNull Instant lineageEventTime, + @NonNull String lineageEventType, + @NonNull LineageEvent.OutputDatasetFacets outputFacets) { + final Instant now = Instant.now(); + + JsonNode jsonNode = Utils.getMapper().valueToTree(outputFacets); + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false) + .forEach( + fieldName -> + insertDatasetFacet( + now, + datasetUuid, + datasetVersionUuid, + runUuid, + lineageEventTime, + lineageEventType, + Type.OUTPUT, + fieldName, + FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName)))); + } + record DatasetFacetRow( Instant createdAt, UUID datasetUuid, diff --git a/api/src/main/java/marquez/db/DatasetVersionDao.java b/api/src/main/java/marquez/db/DatasetVersionDao.java index 00f26eb0b6..b6efe272cf 100644 --- a/api/src/main/java/marquez/db/DatasetVersionDao.java +++ b/api/src/main/java/marquez/db/DatasetVersionDao.java @@ -163,7 +163,7 @@ WITH selected_dataset_versions AS ( ), selected_dataset_version_facets AS ( SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet FROM selected_dataset_versions dv - LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid + LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown') ) SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location, @@ -194,7 +194,7 @@ WITH selected_dataset_versions AS ( ), selected_dataset_version_facets AS ( SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet FROM selected_dataset_versions dv - LEFT JOIN 
dataset_facets_view df ON df.dataset_version_uuid = dv.uuid + LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown') ) SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location, @@ -254,7 +254,7 @@ WITH selected_dataset_versions AS ( ), selected_dataset_version_facets AS ( SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet FROM selected_dataset_versions dv - LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid + LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid AND (df.type ILIKE 'dataset' OR df.type ILIKE 'unknown') ) SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state, dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location, diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index c9ec8afa41..8fe41a2d09 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -301,6 +301,18 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper now, event.getEventType(), facets)); + + // InputFacets ... + Optional.ofNullable(dataset.getInputFacets()) + .ifPresent( + facets -> + datasetFacetsDao.insertInputDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + event.getEventType(), + facets)); } } bag.setInputs(Optional.ofNullable(datasetInputs)); @@ -336,6 +348,18 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper now, event.getEventType(), facets)); + + // OutputFacets ... 
+ Optional.ofNullable(dataset.getOutputFacets()) + .ifPresent( + facets -> + datasetFacetsDao.insertOutputDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + event.getEventType(), + facets)); } } diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index 13527f93be..d942c60968 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -74,34 +74,52 @@ public interface RunDao extends BaseDao { void updateEndState(UUID rowUuid, Instant transitionedAt, UUID endRunStateUuid); String BASE_FIND_RUN_SQL = - "SELECT r.*, ra.args, ctx.context, f.facets,\n" - + "jv.version AS job_version,\n" - + "ri.input_versions, ro.output_versions\n" - + "FROM runs_view AS r\n" - + "LEFT OUTER JOIN\n" - + "(\n" - + " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n" - + " FROM run_facets_view rf\n" - + " GROUP BY rf.run_uuid\n" - + ") AS f ON r.uuid=f.run_uuid\n" - + "LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n" - + "LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n" - + "LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n" - + "LEFT OUTER JOIN (\n" - + " SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,\n" - + " 'name', dv.dataset_name,\n" - + " 'version', dv.version)) AS input_versions\n" - + " FROM runs_input_mapping im\n" - + " INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid\n" - + " GROUP BY im.run_uuid\n" - + ") ri ON ri.run_uuid=r.uuid\n" - + "LEFT OUTER JOIN (\n" - + " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n" - + " 'name', dataset_name,\n" - + " 'version', version)) AS output_versions\n" - + " FROM dataset_versions\n" - + " GROUP BY run_uuid\n" - + ") ro ON ro.run_uuid=r.uuid\n"; + """ + SELECT r.*, ra.args, ctx.context, f.facets, + jv.version AS job_version, + ri.input_versions, ro.output_versions, df.dataset_facets + FROM runs_view AS r + LEFT OUTER JOIN + ( + SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets + FROM run_facets_view rf + GROUP BY rf.run_uuid + ) AS f ON r.uuid=f.run_uuid + LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid + LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid + LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid + LEFT OUTER JOIN ( + SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name, + 'name', dv.dataset_name, + 'version', dv.version, + 'dataset_version_uuid', uuid)) AS input_versions + FROM runs_input_mapping im + INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid + GROUP BY im.run_uuid + ) ri ON ri.run_uuid=r.uuid + LEFT OUTER JOIN ( + SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name, + 'name', dataset_name, + 'version', version, + 'dataset_version_uuid', uuid + )) AS output_versions + FROM dataset_versions + GROUP BY run_uuid + ) ro ON ro.run_uuid=r.uuid + LEFT OUTER JOIN ( + SELECT + run_uuid, + JSON_AGG(json_build_object( + 'dataset_version_uuid', dataset_version_uuid, + 'name', name, + 'type', type, + 'facet', facet + ) ORDER BY created_at ASC) as dataset_facets + FROM dataset_facets_view + WHERE (type ILIKE 'output' OR type ILIKE 'input') + GROUP BY run_uuid + ) AS df ON r.uuid = df.run_uuid + """; @SqlQuery(BASE_FIND_RUN_SQL + "WHERE r.uuid = :runUuid") Optional findRunByUuid(UUID runUuid); @@ -124,7 +142,7 
@@ public interface RunDao extends BaseDao { """ SELECT r.*, ra.args, ctx.context, f.facets, j.namespace_name, j.name, jv.version AS job_version, - ri.input_versions, ro.output_versions + ri.input_versions, ro.output_versions, df.dataset_facets FROM runs_view AS r INNER JOIN jobs_view j ON r.job_uuid=j.uuid LEFT JOIN LATERAL @@ -140,7 +158,9 @@ SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS fac LEFT OUTER JOIN ( SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name, 'name', dv.dataset_name, - 'version', dv.version)) AS input_versions + 'version', dv.version, + 'dataset_version_uuid', uuid + )) AS input_versions FROM runs_input_mapping im INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid GROUP BY im.run_uuid @@ -148,10 +168,25 @@ SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name, LEFT OUTER JOIN ( SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name, 'name', dataset_name, - 'version', version)) AS output_versions + 'version', version, + 'dataset_version_uuid', uuid + )) AS output_versions FROM dataset_versions GROUP BY run_uuid ) ro ON ro.run_uuid=r.uuid + LEFT OUTER JOIN ( + SELECT + run_uuid, + JSON_AGG(json_build_object( + 'dataset_version_uuid', dataset_version_uuid, + 'name', name, + 'type', type, + 'facet', facet + ) ORDER BY created_at ASC) as dataset_facets + FROM dataset_facets_view + WHERE (type ILIKE 'output' OR type ILIKE 'input') + GROUP BY run_uuid + ) AS df ON r.uuid = df.run_uuid WHERE j.namespace_name=:namespace AND (j.name=:jobName OR :jobName = ANY(j.aliases)) ORDER BY STARTED_AT DESC NULLS LAST LIMIT :limit OFFSET :offset diff --git a/api/src/main/java/marquez/db/mappers/RunMapper.java b/api/src/main/java/marquez/db/mappers/RunMapper.java index e44e0dba39..f184bef522 100644 --- a/api/src/main/java/marquez/db/mappers/RunMapper.java +++ b/api/src/main/java/marquez/db/mappers/RunMapper.java @@ -6,6 +6,8 @@ package marquez.db.mappers; import static java.time.temporal.ChronoUnit.MILLIS; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toList; import static marquez.common.models.RunState.NEW; import static marquez.db.Columns.stringOrNull; import static marquez.db.Columns.stringOrThrow; @@ -15,30 +17,44 @@ import static marquez.db.Columns.uuidOrThrow; import static marquez.db.mappers.MapperUtils.toFacetsOrNull; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; import marquez.common.Utils; +import marquez.common.models.DatasetName; import marquez.common.models.DatasetVersionId; +import marquez.common.models.NamespaceName; +import marquez.common.models.RunDatasetFacets; import marquez.common.models.RunId; import marquez.common.models.RunState; import marquez.db.Columns; import marquez.service.models.Run; import org.jdbi.v3.core.mapper.RowMapper; import org.jdbi.v3.core.statement.StatementContext; +import 
org.postgresql.util.PGobject; +@Slf4j public final class RunMapper implements RowMapper { private final String columnPrefix; + private static final ObjectMapper MAPPER = Utils.getMapper(); + public RunMapper() { this(""); } @@ -56,6 +72,14 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context) Optional durationMs = Optional.ofNullable(timestampOrNull(results, columnPrefix + Columns.ENDED_AT)) .flatMap(endedAt -> startedAt.map(s -> s.until(endedAt, MILLIS))); + List inputVersions = + columnNames.contains(columnPrefix + Columns.INPUT_VERSIONS) + ? toQueryDatasetVersion(results, columnPrefix + Columns.INPUT_VERSIONS) + : ImmutableList.of(); + List outputVersions = + columnNames.contains(columnPrefix + Columns.OUTPUT_VERSIONS) + ? toQueryDatasetVersion(results, columnPrefix + Columns.OUTPUT_VERSIONS) + : ImmutableList.of(); return new Run( RunId.of(uuidOrThrow(results, columnPrefix + Columns.ROW_UUID)), timestampOrThrow(results, columnPrefix + Columns.CREATED_AT), @@ -77,24 +101,23 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context) stringOrThrow(results, columnPrefix + Columns.JOB_NAME), uuidOrNull(results, columnPrefix + Columns.JOB_VERSION), stringOrNull(results, columnPrefix + Columns.LOCATION), - columnNames.contains(columnPrefix + Columns.INPUT_VERSIONS) - ? toDatasetVersion(results, columnPrefix + Columns.INPUT_VERSIONS) - : ImmutableList.of(), - columnNames.contains(columnPrefix + Columns.OUTPUT_VERSIONS) - ? toDatasetVersion(results, columnPrefix + Columns.OUTPUT_VERSIONS) - : ImmutableList.of(), + inputVersions.stream().map(QueryDatasetVersion::toDatasetVersionId).collect(toList()), + outputVersions.stream().map(QueryDatasetVersion::toDatasetVersionId).collect(toList()), columnNames.contains(columnPrefix + Columns.CONTEXT) ? 
JobMapper.toContext(results, columnPrefix + Columns.CONTEXT) : null, - toFacetsOrNull(results, columnPrefix + Columns.FACETS)); + toFacetsOrNull(results, columnPrefix + Columns.FACETS), + toRunDatasetFacets(results, inputVersions, true), + toRunDatasetFacets(results, outputVersions, false)); } - private List toDatasetVersion(ResultSet rs, String column) throws SQLException { + private List toQueryDatasetVersion(ResultSet rs, String column) + throws SQLException { String dsString = rs.getString(column); if (dsString == null) { return Collections.emptyList(); } - return Utils.fromJson(dsString, new TypeReference>() {}); + return Utils.fromJson(dsString, new TypeReference>() {}); } private Map toArgsOrNull(ResultSet results, String argsColumn) @@ -108,4 +131,79 @@ private Map toArgsOrNull(ResultSet results, String argsColumn) } return Utils.fromJson(args, new TypeReference>() {}); } + + private List toRunDatasetFacets( + ResultSet resultSet, List datasetVersionIds, boolean input) + throws SQLException { + String column = columnPrefix + Columns.DATASET_FACETS; + if (!Columns.exists(resultSet, column) || resultSet.getObject(column) == null) { + return Collections.emptyList(); + } + + ImmutableList runDatasetFacets; + PGobject pgObject = (PGobject) resultSet.getObject(column); + try { + runDatasetFacets = + MAPPER.readValue( + pgObject.getValue(), new TypeReference>() {}); + } catch (JsonProcessingException e) { + log.error(String.format("Could not read dataset from job row %s", column), e); + runDatasetFacets = ImmutableList.of(); + } + + Map datasetVersionIdMap = new HashMap<>(); + datasetVersionIds.forEach(dv -> datasetVersionIdMap.put(dv.datasetVersionUUID, dv)); + + try { + return runDatasetFacets.stream() + .filter(df -> df.type.equalsIgnoreCase(input ? 
"input" : "output")) + .collect(groupingBy(QueryDatasetFacet::datasetVersionUUID)) + .entrySet() + .stream() + .map( + entry -> + new RunDatasetFacets( + datasetVersionIdMap.get(entry.getKey()).toDatasetVersionId(), + ImmutableMap.copyOf( + entry.getValue().stream() + .collect( + Collectors.toMap( + QueryDatasetFacet::name, + facet -> + Utils.getMapper() + .convertValue( + Utils.getMapper() + .valueToTree(facet.facet) + .get(facet.name), + Object.class), + (a1, a2) -> a2 // in case of duplicates, choose more recent + ))))) + .collect(toList()); + } catch (IllegalStateException e) { + return Collections.emptyList(); + } + } + + record QueryDatasetFacet( + @JsonProperty("dataset_version_uuid") String datasetVersionUUID, + String name, + String type, + Object facet) {} + + record QueryDatasetVersion( + String namespace, + String name, + UUID version, + @JsonProperty("dataset_version_uuid") + String + datasetVersionUUID // field required to merge input versions with input dataset facets + ) { + public DatasetVersionId toDatasetVersionId() { + return DatasetVersionId.builder() + .name(DatasetName.of(name)) + .namespace(NamespaceName.of(namespace)) + .version(version) + .build(); + } + } } diff --git a/api/src/main/java/marquez/service/models/LineageEvent.java b/api/src/main/java/marquez/service/models/LineageEvent.java index 6ae0b2419c..9478a7bdbd 100644 --- a/api/src/main/java/marquez/service/models/LineageEvent.java +++ b/api/src/main/java/marquez/service/models/LineageEvent.java @@ -309,6 +309,22 @@ public static class Dataset extends BaseJsonModel { @NotNull private String namespace; @NotNull private String name; @Valid private DatasetFacets facets; + @Valid private InputDatasetFacets inputFacets; + @Valid private OutputDatasetFacets outputFacets; + + /** + * Constructor with three args added manually to support dozens of existing usages created + * before adding inputFacets and outputFacets, as Lombok does not provide SomeArgsConstructor. 
+ * + * @param namespace + * @param name + * @param facets + */ + public Dataset(String namespace, String name, DatasetFacets facets) { + this.namespace = namespace; + this.name = name; + this.facets = facets; + } } @Builder @@ -561,4 +577,48 @@ public static class ColumnLineageInputField extends BaseJsonModel { @NotNull private String name; @NotNull private String field; } + + @Builder + @AllArgsConstructor + @NoArgsConstructor + @Setter + @Getter + @Valid + @ToString + public static class InputDatasetFacets { + + @Builder.Default @JsonIgnore private Map additional = new LinkedHashMap<>(); + + @JsonAnySetter + public void setInputFacet(String key, Object value) { + additional.put(key, value); + } + + @JsonAnyGetter + public Map getAdditionalFacets() { + return additional; + } + } + + @Builder + @AllArgsConstructor + @NoArgsConstructor + @Setter + @Getter + @Valid + @ToString + public static class OutputDatasetFacets { + + @Builder.Default @JsonIgnore private Map additional = new LinkedHashMap<>(); + + @JsonAnySetter + public void setOutputFacet(String key, Object value) { + additional.put(key, value); + } + + @JsonAnyGetter + public Map getAdditionalFacets() { + return additional; + } + } } diff --git a/api/src/main/java/marquez/service/models/Run.java b/api/src/main/java/marquez/service/models/Run.java index 20af5cce48..e52103a154 100644 --- a/api/src/main/java/marquez/service/models/Run.java +++ b/api/src/main/java/marquez/service/models/Run.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; import com.google.common.collect.ImmutableMap; import java.time.Instant; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -29,6 +30,7 @@ import marquez.common.models.JobName; import marquez.common.models.JobVersionId; import marquez.common.models.NamespaceName; +import marquez.common.models.RunDatasetFacets; import marquez.common.models.RunId; import marquez.common.models.RunState; @@ -62,6 +64,8 @@ public final class Run { @Getter private final List outputVersions; @Getter private final Map context; @Getter private final ImmutableMap facets; + @Getter private final List inputFacets; + @Getter private final List outputFacets; public Run( @NonNull final RunId id, @@ -81,7 +85,9 @@ public Run( List inputVersions, List outputVersions, Map context, - @Nullable final ImmutableMap facets) { + @Nullable final ImmutableMap facets, + @Nullable final List inputFacets, + @Nullable final List outputFacets) { this.id = id; this.createdAt = createdAt; this.updatedAt = updatedAt; @@ -100,6 +106,8 @@ public Run( this.outputVersions = outputVersions; this.context = context; this.facets = (facets == null) ? ImmutableMap.of() : facets; + this.inputFacets = (inputFacets == null) ? Collections.emptyList() : inputFacets; + this.outputFacets = (outputFacets == null) ? 
Collections.emptyList() : outputFacets; } public Optional getNominalStartTime() { @@ -171,6 +179,12 @@ public static class Builder { @JsonInclude(JsonInclude.Include.NON_NULL) private ImmutableMap facets; + @JsonInclude(JsonInclude.Include.NON_NULL) + private List inputFacets; + + @JsonInclude(JsonInclude.Include.NON_NULL) + private List outputFacets; + public Run build() { return new Run( id, @@ -190,7 +204,9 @@ public Run build() { inputVersions, outputVersions, context, - facets); + facets, + inputFacets, + outputFacets); } } } diff --git a/api/src/test/java/marquez/RunIntegrationTest.java b/api/src/test/java/marquez/RunIntegrationTest.java index f1ed560666..6fa0205920 100644 --- a/api/src/test/java/marquez/RunIntegrationTest.java +++ b/api/src/test/java/marquez/RunIntegrationTest.java @@ -13,6 +13,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import java.time.Instant; +import java.util.Collections; import java.util.UUID; import marquez.client.models.DbTableMeta; import marquez.client.models.JobMeta; @@ -111,7 +112,9 @@ public void testSerialization() throws JsonProcessingException { ImmutableList.of(), ImmutableList.of(), ImmutableMap.of(), - ImmutableMap.of()); + ImmutableMap.of(), + Collections.emptyList(), + Collections.emptyList()); ObjectMapper objectMapper = Utils.newObjectMapper(); String json = objectMapper.writeValueAsString(run); marquez.service.models.Run deser = diff --git a/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java b/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java index fd408a2477..2b615b2d8b 100644 --- a/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java +++ b/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java @@ -18,6 +18,7 @@ import marquez.db.models.UpdateLineageRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.Dataset; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -25,6 +26,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.postgresql.util.PGobject; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @ExtendWith(MarquezJdbiExternalPostgresExtension.class) public class DatasetFacetsDaoTest { @@ -304,6 +306,70 @@ public void testInsertDatasetFacetsForUnknownTypeFacet() { assertThat(facet.facet().toString()).isEqualTo("{\"custom-output\": \"{whatever}\"}"); } + @Test + public void testInsertOutputDatasetFacetsFor() { + LineageEvent.JobFacet jobFacet = + new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job_" + UUID.randomUUID(), + "COMPLETE", + jobFacet, + Collections.emptyList(), + Arrays.asList( + new Dataset( + "namespace", + "dataset_output", + null, + null, + LineageEvent.OutputDatasetFacets.builder() + .additional( + ImmutableMap.of( + "outputFacet1", "{some-facet1}", + "outputFacet2", "{some-facet2}")) + .build())), + null); + + assertThat(getDatasetFacet(lineageRow, "outputFacet1").facet().toString()) + .isEqualTo("{\"outputFacet1\": \"{some-facet1}\"}"); + assertThat(getDatasetFacet(lineageRow, "outputFacet2").facet().toString()) + .isEqualTo("{\"outputFacet2\": \"{some-facet2}\"}"); + } + + @Test + public void testInsertInputDatasetFacetsFor() { + LineageEvent.JobFacet jobFacet = + new LineageEvent.JobFacet(null, null, null, 
LineageTestUtils.EMPTY_MAP); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job_" + UUID.randomUUID(), + "COMPLETE", + jobFacet, + Arrays.asList( + new Dataset( + "namespace", + "dataset_output", + null, + LineageEvent.InputDatasetFacets.builder() + .additional( + ImmutableMap.of( + "inputFacet1", "{some-facet1}", + "inputFacet2", "{some-facet2}")) + .build(), + null)), + Collections.emptyList(), + null); + + assertThat(getDatasetFacet(lineageRow, "inputFacet1").facet().toString()) + .isEqualTo("{\"inputFacet1\": \"{some-facet1}\"}"); + assertThat(getDatasetFacet(lineageRow, "inputFacet2").facet().toString()) + .isEqualTo("{\"inputFacet2\": \"{some-facet2}\"}"); + } + private UpdateLineageRow createLineageRowWithInputDataset( LineageEvent.DatasetFacets.DatasetFacetsBuilder inputDatasetFacetsbuilder) { LineageEvent.JobFacet jobFacet = diff --git a/api/src/test/java/marquez/db/OpenLineageDaoTest.java b/api/src/test/java/marquez/db/OpenLineageDaoTest.java index fd68d1ec9a..026eb3b4d4 100644 --- a/api/src/test/java/marquez/db/OpenLineageDaoTest.java +++ b/api/src/test/java/marquez/db/OpenLineageDaoTest.java @@ -14,6 +14,9 @@ import java.util.Collections; import java.util.List; import java.util.UUID; +import marquez.common.models.DatasetName; +import marquez.common.models.DatasetVersionId; +import marquez.common.models.NamespaceName; import marquez.db.models.UpdateLineageRow; import marquez.db.models.UpdateLineageRow.DatasetRecord; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; @@ -23,12 +26,14 @@ import marquez.service.models.LineageEvent.JobFacet; import marquez.service.models.LineageEvent.SchemaDatasetFacet; import marquez.service.models.LineageEvent.SchemaField; +import marquez.service.models.Run; import org.assertj.core.api.InstanceOfAssertFactories; import org.assertj.core.groups.Tuple; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @ExtendWith(MarquezJdbiExternalPostgresExtension.class) class OpenLineageDaoTest { @@ -48,6 +53,7 @@ class OpenLineageDaoTest { private static DatasetSymlinkDao symlinkDao; private static NamespaceDao namespaceDao; private static DatasetFieldDao datasetFieldDao; + private static RunDao runDao; private final DatasetFacets datasetFacets = LineageTestUtils.newDatasetFacet( new SchemaField("name", "STRING", "my name"), new SchemaField("age", "INT", "my age")); @@ -58,6 +64,7 @@ public static void setUpOnce(Jdbi jdbi) { symlinkDao = jdbi.onDemand(DatasetSymlinkDao.class); namespaceDao = jdbi.onDemand(NamespaceDao.class); datasetFieldDao = jdbi.onDemand(DatasetFieldDao.class); + runDao = jdbi.onDemand(RunDao.class); } /** When reading a dataset, the version is assumed to be the version last written */ @@ -508,6 +515,69 @@ void testGetOpenLineageEvents() { .contains(LineageTestUtils.NAMESPACE, WRITE_JOB_NAME); } + @Test + void testInputOutputDatasetFacets() { + JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + dao, + WRITE_JOB_NAME, + "COMPLETE", + jobFacet, + Arrays.asList( + new Dataset( + "namespace", + "dataset_input", + null, + LineageEvent.InputDatasetFacets.builder() + .additional( + ImmutableMap.of( + "inputFacet1", "{some-facet1}", + "inputFacet2", "{some-facet2}")) + .build(), + null)), + Arrays.asList( + new Dataset( + 
"namespace", + "dataset_output", + null, + null, + LineageEvent.OutputDatasetFacets.builder() + .additional( + ImmutableMap.of( + "outputFacet1", "{some-facet1}", + "outputFacet2", "{some-facet2}")) + .build()))); + + Run run = runDao.findRunByUuid(lineageRow.getRun().getUuid()).get(); + + assertThat(run.getInputFacets()).hasSize(1); + assertThat(run.getInputFacets().get(0).getDatasetVersionId()) + .isEqualTo( + new DatasetVersionId( + NamespaceName.of("namespace"), + DatasetName.of("dataset_input"), + lineageRow.getInputs().get().get(0).getDatasetVersionRow().getVersion())); + assertThat(run.getInputFacets().get(0).getFacets()) + .containsAllEntriesOf( + ImmutableMap.of( + "inputFacet1", "{some-facet1}", + "inputFacet2", "{some-facet2}")); + + assertThat(run.getOutputFacets()).hasSize(1); + assertThat(run.getOutputFacets().get(0).getDatasetVersionId()) + .isEqualTo( + new DatasetVersionId( + NamespaceName.of("namespace"), + DatasetName.of("dataset_output"), + lineageRow.getOutputs().get().get(0).getDatasetVersionRow().getVersion())); + assertThat(run.getOutputFacets().get(0).getFacets()) + .containsAllEntriesOf( + ImmutableMap.of( + "outputFacet1", "{some-facet1}", + "outputFacet2", "{some-facet2}")); + } + private Dataset getInputDataset() { return new Dataset( INPUT_NAMESPACE, diff --git a/clients/java/src/main/java/marquez/client/models/Run.java b/clients/java/src/main/java/marquez/client/models/Run.java index e2e8d43ec8..23ca1d3657 100644 --- a/clients/java/src/main/java/marquez/client/models/Run.java +++ b/clients/java/src/main/java/marquez/client/models/Run.java @@ -8,6 +8,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableMap; import java.time.Instant; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; @@ -27,6 +29,8 @@ public final class Run extends RunMeta { @Nullable private final Long durationMs; @Nullable private final Instant endedAt; @Getter private final Map facets; + @Getter private final List inputFacets; + @Getter private final List outputFacets; public Run( @NonNull final String id, @@ -39,7 +43,9 @@ public Run( @Nullable final Instant endedAt, @Nullable final Long durationMs, @Nullable final Map args, - @Nullable final Map facets) { + @Nullable final Map facets, + @Nullable final List inputFacets, + @Nullable final List outputFacets) { super(id, nominalStartTime, nominalEndTime, args); this.createdAt = createdAt; this.updatedAt = updatedAt; @@ -48,6 +54,8 @@ public Run( this.durationMs = durationMs; this.endedAt = endedAt; this.facets = (facets == null) ? ImmutableMap.of() : ImmutableMap.copyOf(facets); + this.inputFacets = (inputFacets == null) ? Collections.emptyList() : inputFacets; + this.outputFacets = (outputFacets == null) ? 
Collections.emptyList() : outputFacets; } public Optional getStartedAt() { diff --git a/clients/java/src/main/java/marquez/client/models/RunDatasetFacets.java b/clients/java/src/main/java/marquez/client/models/RunDatasetFacets.java new file mode 100644 index 0000000000..1be3a08f48 --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/RunDatasetFacets.java @@ -0,0 +1,26 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import lombok.NonNull; +import lombok.Value; + +/** Class to contain inputFacets and outputFacets. */ +@Value +public class RunDatasetFacets { + + private final DatasetVersionId datasetVersionId; + private final ImmutableMap facets; + + public RunDatasetFacets( + @JsonProperty("datasetVersionId") @NonNull DatasetVersionId datasetVersionId, + @JsonProperty("facets") @NonNull ImmutableMap facets) { + this.datasetVersionId = datasetVersionId; + this.facets = facets; + } +} diff --git a/clients/java/src/test/java/marquez/client/MarquezClientTest.java b/clients/java/src/test/java/marquez/client/MarquezClientTest.java index 926ed342a7..b7edec3665 100644 --- a/clients/java/src/test/java/marquez/client/MarquezClientTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezClientTest.java @@ -23,6 +23,7 @@ import static marquez.client.models.ModelGenerator.newOutputs; import static marquez.client.models.ModelGenerator.newOwnerName; import static marquez.client.models.ModelGenerator.newRunArgs; +import static marquez.client.models.ModelGenerator.newRunDatasetFacets; import static marquez.client.models.ModelGenerator.newRunId; import static marquez.client.models.ModelGenerator.newSchemaLocation; import static marquez.client.models.ModelGenerator.newSourceName; @@ -91,6 +92,7 @@ import marquez.client.models.NodeId; import marquez.client.models.NodeType; import marquez.client.models.Run; +import marquez.client.models.RunDatasetFacets; import marquez.client.models.RunMeta; import marquez.client.models.RunState; import marquez.client.models.Source; @@ -264,6 +266,13 @@ public class MarquezClientTest { private static final Instant ENDED_AT = START_AT.plusMillis(1000L); private static final long DURATION = START_AT.until(ENDED_AT, MILLIS); private static final Map RUN_ARGS = newRunArgs(); + + private static final List INPUT_FACETS = + Collections.singletonList(newRunDatasetFacets()); + + private static final List OUTPUT_FACETS = + Collections.singletonList(newRunDatasetFacets()); + private static final Run NEW = new Run( newRunId(), @@ -276,7 +285,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + null, + INPUT_FACETS, + OUTPUT_FACETS); private static final Run RUNNING = new Run( newRunId(), @@ -289,7 +300,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + null, + INPUT_FACETS, + OUTPUT_FACETS); private static final Run COMPLETED = new Run( newRunId(), @@ -302,7 +315,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + null, + INPUT_FACETS, + OUTPUT_FACETS); private static final Run ABORTED = new Run( newRunId(), @@ -315,7 +330,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + null, + INPUT_FACETS, + OUTPUT_FACETS); private static final Run FAILED = new Run( newRunId(), @@ -328,7 +345,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null); + 
null, + INPUT_FACETS, + OUTPUT_FACETS); private static final String RUN_ID = newRunId(); private static final Job JOB_WITH_LATEST_RUN = @@ -357,7 +376,9 @@ public class MarquezClientTest { ENDED_AT, DURATION, RUN_ARGS, - null), + null, + INPUT_FACETS, + OUTPUT_FACETS), null, null); diff --git a/clients/java/src/test/java/marquez/client/models/JsonGenerator.java b/clients/java/src/test/java/marquez/client/models/JsonGenerator.java index 7b2803bf16..03f73cb5da 100644 --- a/clients/java/src/test/java/marquez/client/models/JsonGenerator.java +++ b/clients/java/src/test/java/marquez/client/models/JsonGenerator.java @@ -316,12 +316,17 @@ private static ObjectNode toObj(final Run run) { .put("id", run.getId()) .put("createdAt", ISO_INSTANT.format(run.getCreatedAt())) .put("updatedAt", ISO_INSTANT.format(run.getUpdatedAt())); + final ArrayNode inputFacets = MAPPER.valueToTree(run.getInputFacets()); + final ArrayNode outputFacets = MAPPER.valueToTree(run.getOutputFacets()); + obj.put("nominalStartTime", run.getNominalStartTime().map(ISO_INSTANT::format).orElse(null)); obj.put("nominalEndTime", run.getNominalEndTime().map(ISO_INSTANT::format).orElse(null)); obj.put("state", run.getState().name()); obj.put("startedAt", run.getStartedAt().map(ISO_INSTANT::format).orElse(null)); obj.put("endedAt", run.getEndedAt().map(ISO_INSTANT::format).orElse(null)); obj.put("durationMs", run.getDurationMs().orElse(null)); + obj.putArray("inputFacets").addAll(inputFacets); + obj.putArray("outputFacets").addAll(outputFacets); final ObjectNode runArgs = MAPPER.createObjectNode(); run.getArgs().forEach(runArgs::put); diff --git a/clients/java/src/test/java/marquez/client/models/ModelGenerator.java b/clients/java/src/test/java/marquez/client/models/ModelGenerator.java index 0226e744d3..159a6d045c 100644 --- a/clients/java/src/test/java/marquez/client/models/ModelGenerator.java +++ b/clients/java/src/test/java/marquez/client/models/ModelGenerator.java @@ -240,7 +240,19 @@ public static List newRuns(final int limit) { public static Run newRun() { final Instant now = newTimestamp(); return new Run( - newRunId(), now, now, now, now, RunState.NEW, null, null, null, newRunArgs(), null); + newRunId(), + now, + now, + now, + now, + RunState.NEW, + null, + null, + null, + newRunArgs(), + null, + null, + null); } public static String newOwnerName() { @@ -401,4 +413,10 @@ public static Map.Entry newFacetProducer() { public static Map.Entry newFacetSchemaURL() { return new AbstractMap.SimpleImmutableEntry<>("_schemaURL", "test_schemaURL" + newId()); } + + public static RunDatasetFacets newRunDatasetFacets() { + return new RunDatasetFacets( + new DatasetVersionId(newNamespaceName(), newDatasetName(), UUID.randomUUID()), + ImmutableMap.of("datasetFacet", "{some-facet1}")); + } }
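
Illustrative note, not part of the patch: a minimal sketch of the event-side shape this change accepts. The package and class name below are invented for the example, the facet name and value are placeholders, and only classes touched by this diff are used.

package marquez.examples; // hypothetical package, for illustration only

import com.google.common.collect.ImmutableMap;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.Dataset;

/** Sketch: a dataset entry carrying run-level input facets. */
public class RunDatasetFacetsIngestionSketch {

  public static Dataset inputDatasetWithRunFacets() {
    return new Dataset(
        "my_namespace", // dataset namespace (placeholder)
        "my_input_dataset", // dataset name (placeholder)
        null, // dataset-level facets, unchanged by this patch
        LineageEvent.InputDatasetFacets.builder()
            .additional(ImmutableMap.of("inputStatistics", "{\"rowCount\": 42}"))
            .build(),
        null); // outputFacets: not an output dataset here
  }
}

When such a dataset arrives on a lineage event, OpenLineageDao.updateBaseMarquezModel hands the inputFacets (or outputFacets) to the new DatasetFacetsDao.insertInputDatasetFacetsFor / insertOutputDatasetFacetsFor methods, which persist each facet with Type.INPUT or Type.OUTPUT; the extended RunDao queries aggregate those rows back per run, and RunMapper surfaces them as Run#getInputFacets() / Run#getOutputFacets().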
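
Illustrative note, not part of the patch: the dataset_facets column built by JSON_AGG(json_build_object(...)) in RunDao is a JSON array that RunMapper decodes and groups per dataset version. Below is a standalone sketch of that shape and grouping, with an invented sample payload and a local mirror of the package-private QueryDatasetFacet record (uuids and facet values are made up).

package marquez.examples; // hypothetical package, for illustration only

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DatasetFacetsColumnSketch {

  // Mirrors RunMapper.QueryDatasetFacet, redeclared here only so the sample parses in isolation.
  record QueryDatasetFacet(
      @JsonProperty("dataset_version_uuid") String datasetVersionUuid,
      String name,
      String type,
      Object facet) {}

  public static void main(String[] args) throws Exception {
    // Invented sample of what the aggregation returns for one run.
    String datasetFacetsColumn =
        """
        [
          {"dataset_version_uuid": "00000000-0000-0000-0000-000000000001",
           "name": "inputFacet1", "type": "INPUT",
           "facet": {"inputFacet1": {"rowCount": 42}}},
          {"dataset_version_uuid": "00000000-0000-0000-0000-000000000002",
           "name": "outputFacet1", "type": "OUTPUT",
           "facet": {"outputFacet1": {"bytes": 1024}}}
        ]
        """;

    List<QueryDatasetFacet> rows =
        new ObjectMapper()
            .readValue(datasetFacetsColumn, new TypeReference<List<QueryDatasetFacet>>() {});

    // As in RunMapper: keep only one direction, then group per dataset version
    // before each group is turned into a RunDatasetFacets entry.
    Map<String, List<QueryDatasetFacet>> inputsByVersion =
        rows.stream()
            .filter(row -> row.type().equalsIgnoreCase("input"))
            .collect(Collectors.groupingBy(QueryDatasetFacet::datasetVersionUuid));
    System.out.println(inputsByVersion);
  }
}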
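
Illustrative note, not part of the patch: reading the new fields through the Java client. The sketch assumes the usual MarquezClient.builder().baseUrl(...).build() setup against a local Marquez instance; the base URL and run id are placeholders.

package marquez.examples; // hypothetical package, for illustration only

import marquez.client.MarquezClient;
import marquez.client.models.Run;
import marquez.client.models.RunDatasetFacets;

public class ReadRunDatasetFacetsSketch {

  public static void main(String[] args) {
    MarquezClient client = MarquezClient.builder().baseUrl("http://localhost:5000").build();
    Run run = client.getRun("00000000-0000-0000-0000-000000000000"); // placeholder run id

    // inputFacets/outputFacets default to empty lists when the API returns none.
    for (RunDatasetFacets inputFacets : run.getInputFacets()) {
      System.out.println(inputFacets.getDatasetVersionId() + " -> " + inputFacets.getFacets());
    }
    for (RunDatasetFacets outputFacets : run.getOutputFacets()) {
      System.out.println(outputFacets.getDatasetVersionId() + " -> " + outputFacets.getFacets());
    }
  }
}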