From b6544ec865950e1b0366e278384200172750f79a Mon Sep 17 00:00:00 2001 From: mzareba382 <77925576+mzareba382@users.noreply.github.com> Date: Fri, 30 Sep 2022 11:23:16 +0200 Subject: [PATCH] Model and store column lineage in Marquez DB (#2096) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Create database representation, model classes Signed-off-by: mzareba * Implement ColumnLevelLineageDao Signed-off-by: mzareba * Instantiate ColumnLevelLineageDao in updateBaseMarquezModel Signed-off-by: mzareba * Upsert ColumnLevelLineageRow to db, model representation in LineageEvent Signed-off-by: mzareba * Fix problems in OpenLineageDao, add a list of ColumnLevelLineageRow to DatasetRecord, write test for createLineageRow() invocation Signed-off-by: mzareba * Change wildcard imports to single class imports Signed-off-by: mzareba * Change wildcard imports to single class imports Signed-off-by: mzareba * Change wildcard imports to single class imports Signed-off-by: mzareba * Apply spotless Signed-off-by: mzareba * Check for ds.getFacets not null Signed-off-by: mzareba * Format fix Signed-off-by: mzareba * Update testUpdateMarquezModelDatasetWithColumnLineageFacet Signed-off-by: mzareba * Test for column_level_lineage upsert. 
Signed-off-by: mzareba * Apply spotless Signed-off-by: mzareba * switch to data field references Signed-off-by: Pawel Leszczynski * fix broken tests Signed-off-by: Pawel Leszczynski * test when dataset_field is missing Signed-off-by: Pawel Leszczynski * add input_dataset_version_uuid field Signed-off-by: Pawel Leszczynski * increase db file version Signed-off-by: Pawel Leszczynski * increase db file version Signed-off-by: Pawel Leszczynski * rename ColumnLevelLineage -> ColumnLineage Signed-off-by: Pawel Leszczynski Signed-off-by: mzareba Signed-off-by: Pawel Leszczynski Co-authored-by: Mariusz Zaręba Co-authored-by: Pawel Leszczynski --- CHANGELOG.md | 6 +- api/src/main/java/marquez/db/BaseDao.java | 3 + .../java/marquez/db/ColumnLineageDao.java | 91 +++++++ api/src/main/java/marquez/db/Columns.java | 9 + .../main/java/marquez/db/DatasetFieldDao.java | 21 ++ .../main/java/marquez/db/OpenLineageDao.java | 98 +++++++- .../db/mappers/ColumnLineageRowMapper.java | 37 +++ .../marquez/db/mappers/FieldDataMapper.java | 31 +++ .../marquez/db/models/ColumnLineageRow.java | 28 +++ .../marquez/db/models/InputFieldData.java | 22 ++ .../marquez/db/models/UpdateLineageRow.java | 1 + .../marquez/service/models/LineageEvent.java | 54 +++++ .../db/migration/V49__column_lineage.sql | 13 + .../java/marquez/db/ColumnLineageDaoTest.java | 222 +++++++++++++++++ .../java/marquez/db/OpenLineageDaoTest.java | 225 ++++++++++++++++++ 15 files changed, 856 insertions(+), 5 deletions(-) create mode 100644 api/src/main/java/marquez/db/ColumnLineageDao.java create mode 100644 api/src/main/java/marquez/db/mappers/ColumnLineageRowMapper.java create mode 100644 api/src/main/java/marquez/db/mappers/FieldDataMapper.java create mode 100644 api/src/main/java/marquez/db/models/ColumnLineageRow.java create mode 100644 api/src/main/java/marquez/db/models/InputFieldData.java create mode 100644 api/src/main/resources/marquez/db/migration/V49__column_lineage.sql create mode 100644 
api/src/test/java/marquez/db/ColumnLineageDaoTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 11d8cc1057..ecdec36bb5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,13 @@ # Changelog ## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.26.0...HEAD) + +### Added +* Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) +* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) + ### Fixed * Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike) -* Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) ## [0.26.0](https://github.com/MarquezProject/marquez/compare/0.25.0...0.26.0) - 2022-09-15 diff --git a/api/src/main/java/marquez/db/BaseDao.java b/api/src/main/java/marquez/db/BaseDao.java index f21a7c67f3..f35357bf08 100644 --- a/api/src/main/java/marquez/db/BaseDao.java +++ b/api/src/main/java/marquez/db/BaseDao.java @@ -53,4 +53,7 @@ public interface BaseDao extends SqlObject { @CreateSqlObject OpenLineageDao createOpenLineageDao(); + + @CreateSqlObject + ColumnLineageDao createColumnLineageDao(); } diff --git a/api/src/main/java/marquez/db/ColumnLineageDao.java b/api/src/main/java/marquez/db/ColumnLineageDao.java new file mode 100644 index 0000000000..3bc9410a2d --- /dev/null +++ b/api/src/main/java/marquez/db/ColumnLineageDao.java @@ -0,0 +1,91 @@ +/* + * Copyright 2018-2022 contributors to 
the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db; + +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import marquez.db.mappers.ColumnLineageRowMapper; +import marquez.db.models.ColumnLineageRow; +import org.apache.commons.lang3.tuple.Pair; +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; +import org.jdbi.v3.sqlobject.customizer.BindBeanList; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import org.jdbi.v3.sqlobject.statement.SqlUpdate; + +@RegisterRowMapper(ColumnLineageRowMapper.class) +public interface ColumnLineageDao extends BaseDao { + + default List upsertColumnLineageRow( + UUID outputDatasetVersionUuid, + UUID outputDatasetFieldUuid, + List> inputs, + String transformationDescription, + String transformationType, + Instant now) { + + if (inputs.isEmpty()) { + return Collections.emptyList(); + } + + doUpsertColumnLineageRow( + inputs.stream() + .map( + input -> + new ColumnLineageRow( + outputDatasetVersionUuid, + outputDatasetFieldUuid, + input.getLeft(), // input_dataset_version_uuid + input.getRight(), // input_dataset_field_uuid + transformationDescription, + transformationType, + now, + now)) + .collect(Collectors.toList())); + return findColumnLineageByDatasetVersionColumnAndOutputDatasetField( + outputDatasetVersionUuid, outputDatasetFieldUuid); + } + + @SqlQuery( + "SELECT * FROM column_lineage WHERE output_dataset_version_uuid = :datasetVersionUuid AND output_dataset_field_uuid = :outputDatasetFieldUuid") + List findColumnLineageByDatasetVersionColumnAndOutputDatasetField( + UUID datasetVersionUuid, UUID outputDatasetFieldUuid); + + @SqlUpdate( + """ + INSERT INTO column_lineage ( + output_dataset_version_uuid, + output_dataset_field_uuid, + input_dataset_version_uuid, + input_dataset_field_uuid, + transformation_description, + transformation_type, + created_at, + updated_at + ) VALUES + ON CONFLICT 
(output_dataset_version_uuid, output_dataset_field_uuid, input_dataset_version_uuid, input_dataset_field_uuid) + DO UPDATE SET + transformation_description = EXCLUDED.transformation_description, + transformation_type = EXCLUDED.transformation_type, + updated_at = EXCLUDED.updated_at + """) + void doUpsertColumnLineageRow( + @BindBeanList( + propertyNames = { + "outputDatasetVersionUuid", + "outputDatasetFieldUuid", + "inputDatasetVersionUuid", + "inputDatasetFieldUuid", + "transformationDescription", + "transformationType", + "createdAt", + "updatedAt" + }, + value = "values") + List rows); +} diff --git a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java index f1642c1295..b01f9f33d0 100644 --- a/api/src/main/java/marquez/db/Columns.java +++ b/api/src/main/java/marquez/db/Columns.java @@ -129,6 +129,15 @@ private Columns() {} public static final String RUN_UUID = "run_uuid"; public static final String STATE = "state"; + /* COLUMN LEVEL LINEAGE ROW COLUMNS */ + public static final String FIELD_NAME = "field_name"; + public static final String OUTPUT_DATASET_VERSION_UUID = "output_dataset_version_uuid"; + public static final String OUTPUT_DATASET_FIELD_UUID = "output_dataset_field_uuid"; + public static final String INPUT_DATASET_FIELD_UUID = "input_dataset_field_uuid"; + public static final String INPUT_DATASET_VERSION_UUID = "input_dataset_version_uuid"; + public static final String TRANSFORMATION_DESCRIPTION = "transformation_description"; + public static final String TRANSFORMATION_TYPE = "transformation_type"; + /* LINEAGE EVENT ROW COLUMNS */ public static final String EVENT = "event"; diff --git a/api/src/main/java/marquez/db/DatasetFieldDao.java b/api/src/main/java/marquez/db/DatasetFieldDao.java index bc5fc86bfd..99a2630016 100644 --- a/api/src/main/java/marquez/db/DatasetFieldDao.java +++ b/api/src/main/java/marquez/db/DatasetFieldDao.java @@ -16,8 +16,10 @@ import marquez.common.models.TagName; import 
marquez.db.mappers.DatasetFieldMapper; import marquez.db.mappers.DatasetFieldRowMapper; +import marquez.db.mappers.FieldDataMapper; import marquez.db.models.DatasetFieldRow; import marquez.db.models.DatasetRow; +import marquez.db.models.InputFieldData; import marquez.db.models.TagRow; import marquez.service.models.Dataset; import marquez.service.models.DatasetVersion; @@ -29,6 +31,7 @@ @RegisterRowMapper(DatasetFieldRowMapper.class) @RegisterRowMapper(DatasetFieldMapper.class) +@RegisterRowMapper(FieldDataMapper.class) public interface DatasetFieldDao extends BaseDao { @SqlQuery( "SELECT EXISTS (" @@ -101,6 +104,24 @@ default Dataset updateTags( + "WHERE fm.dataset_version_uuid = :datasetVersionUuid") List find(UUID datasetVersionUuid); + @SqlQuery( + """ + SELECT + datasets_view.namespace_name as namespace_name, + datasets_view.name as dataset_name, + dataset_fields.name as field_name, + datasets_view.uuid as dataset_uuid, + dataset_versions.uuid as dataset_version_uuid, + dataset_fields.uuid as dataset_field_uuid + FROM dataset_fields + JOIN dataset_versions_field_mapping fm ON fm.dataset_field_uuid = dataset_fields.uuid + JOIN dataset_versions ON dataset_versions.uuid = fm.dataset_version_uuid + JOIN datasets_view ON datasets_view.uuid = dataset_versions.dataset_uuid + JOIN runs_input_mapping ON runs_input_mapping.dataset_version_uuid = dataset_versions.uuid + WHERE runs_input_mapping.run_uuid = :runUuid + """) + List findInputFieldsDataAssociatedWithRun(UUID runUuid); + @SqlQuery( "INSERT INTO dataset_fields (" + "uuid, " diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index c05fe71146..4e94ea1748 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -13,6 +13,7 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import 
java.util.LinkedHashMap; import java.util.List; @@ -20,6 +21,8 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; import marquez.common.Utils; import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; @@ -31,10 +34,12 @@ import marquez.db.DatasetFieldDao.DatasetFieldMapping; import marquez.db.JobVersionDao.BagOfJobVersionInfo; import marquez.db.mappers.LineageEventMapper; +import marquez.db.models.ColumnLineageRow; import marquez.db.models.DatasetFieldRow; import marquez.db.models.DatasetRow; import marquez.db.models.DatasetSymlinkRow; import marquez.db.models.DatasetVersionRow; +import marquez.db.models.InputFieldData; import marquez.db.models.JobContextRow; import marquez.db.models.JobRow; import marquez.db.models.NamespaceRow; @@ -56,6 +61,7 @@ import marquez.service.models.LineageEvent.RunFacet; import marquez.service.models.LineageEvent.SchemaDatasetFacet; import marquez.service.models.LineageEvent.SchemaField; +import org.apache.commons.lang3.tuple.Pair; import org.jdbi.v3.sqlobject.config.RegisterRowMapper; import org.jdbi.v3.sqlobject.statement.SqlQuery; import org.jdbi.v3.sqlobject.statement.SqlUpdate; @@ -131,6 +137,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper RunDao runDao = createRunDao(); RunArgsDao runArgsDao = createRunArgsDao(); RunStateDao runStateDao = createRunStateDao(); + ColumnLineageDao columnLineageDao = createColumnLineageDao(); Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(); @@ -323,7 +330,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper datasetDao, datasetVersionDao, datasetFieldDao, - runDao); + runDao, + columnLineageDao); datasetInputs.add(record); } } @@ -345,7 +353,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper datasetDao, datasetVersionDao, datasetFieldDao, - 
runDao); + runDao, + columnLineageDao); datasetOutputs.add(record); } } @@ -541,7 +550,8 @@ default DatasetRecord upsertLineageDataset( DatasetDao datasetDao, DatasetVersionDao datasetVersionDao, DatasetFieldDao datasetFieldDao, - RunDao runDao) { + RunDao runDao, + ColumnLineageDao columnLineageDao) { NamespaceRow dsNamespace = namespaceDao.upsertNamespaceRow( UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER); @@ -662,6 +672,7 @@ default DatasetRecord upsertLineageDataset( return row; }); List datasetFieldMappings = new ArrayList<>(); + List datasetFields = new ArrayList<>(); if (fields != null) { for (SchemaField field : fields) { DatasetFieldRow datasetFieldRow = @@ -672,6 +683,7 @@ default DatasetRecord upsertLineageDataset( field.getType(), field.getDescription(), datasetRow.getUuid()); + datasetFields.add(datasetFieldRow); datasetFieldMappings.add( new DatasetFieldMapping(datasetVersionRow.getUuid(), datasetFieldRow.getUuid())); } @@ -690,7 +702,85 @@ default DatasetRecord upsertLineageDataset( } } - return new DatasetRecord(datasetRow, datasetVersionRow, datasetNamespace); + List columnLineageRows = Collections.emptyList(); + if (!isInput) { + columnLineageRows = + upsertColumnLineage( + runUuid, + ds, + now, + datasetFields, + columnLineageDao, + datasetFieldDao, + datasetVersionRow); + } + + return new DatasetRecord(datasetRow, datasetVersionRow, datasetNamespace, columnLineageRows); + } + + private List upsertColumnLineage( + UUID runUuid, + Dataset ds, + Instant now, + List datasetFields, + ColumnLineageDao columnLineageDao, + DatasetFieldDao datasetFieldDao, + DatasetVersionRow datasetVersionRow) { + // get all the fields related to this particular run + List runFields = datasetFieldDao.findInputFieldsDataAssociatedWithRun(runUuid); + + return Optional.ofNullable(ds.getFacets()) + .map(DatasetFacets::getColumnLineage) + .map(LineageEvent.ColumnLineageFacet::getOutputColumnsList) + .stream() + .flatMap(list -> list.stream()) + .flatMap( 
+ outputColumn -> { + Optional outputField = + datasetFields.stream() + .filter(dfr -> dfr.getName().equals(outputColumn.getName())) + .findAny(); + + if (outputField.isEmpty()) { + Logger log = LoggerFactory.getLogger(OpenLineageDao.class); + log.error( + "Cannot produce column lineage for missing output field in output dataset: {}", + outputColumn.getName()); + return Stream.empty(); + } + + // get field uuids of input columns related to this run + List> inputFields = + runFields.stream() + .filter( + fieldData -> + outputColumn.getInputFields().stream() + .filter( + of -> + of.getDatasetNamespace().equals(fieldData.getNamespace()) + && of.getDatasetName() + .equals(fieldData.getDatasetName()) + && of.getFieldName().equals(fieldData.getField())) + .findAny() + .isPresent()) + .map( + fieldData -> + Pair.of( + fieldData.getDatasetVersionUuid(), + fieldData.getDatasetFieldUuid())) + .collect(Collectors.toList()); + + return columnLineageDao + .upsertColumnLineageRow( + datasetVersionRow.getUuid(), + outputField.get().getUuid(), + inputFields, + outputColumn.getTransformationDescription(), + outputColumn.getTransformationType(), + now) + .stream(); + }) + .collect(Collectors.toList()); } default String formatDatasetName(String name) { diff --git a/api/src/main/java/marquez/db/mappers/ColumnLineageRowMapper.java b/api/src/main/java/marquez/db/mappers/ColumnLineageRowMapper.java new file mode 100644 index 0000000000..6b6c3de099 --- /dev/null +++ b/api/src/main/java/marquez/db/mappers/ColumnLineageRowMapper.java @@ -0,0 +1,37 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.mappers; + +import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION; +import static marquez.db.Columns.TRANSFORMATION_TYPE; +import static marquez.db.Columns.stringOrThrow; +import static marquez.db.Columns.timestampOrThrow; +import static marquez.db.Columns.uuidOrThrow; + +import java.sql.ResultSet; +import 
java.sql.SQLException; +import lombok.NonNull; +import marquez.db.Columns; +import marquez.db.models.ColumnLineageRow; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; + +public class ColumnLineageRowMapper implements RowMapper { + + @Override + public ColumnLineageRow map(@NonNull ResultSet results, @NonNull StatementContext context) + throws SQLException { + return new ColumnLineageRow( + uuidOrThrow(results, Columns.OUTPUT_DATASET_VERSION_UUID), + uuidOrThrow(results, Columns.OUTPUT_DATASET_FIELD_UUID), + uuidOrThrow(results, Columns.INPUT_DATASET_VERSION_UUID), + uuidOrThrow(results, Columns.INPUT_DATASET_FIELD_UUID), + stringOrThrow(results, TRANSFORMATION_DESCRIPTION), + stringOrThrow(results, TRANSFORMATION_TYPE), + timestampOrThrow(results, Columns.CREATED_AT), + timestampOrThrow(results, Columns.UPDATED_AT)); + } +} diff --git a/api/src/main/java/marquez/db/mappers/FieldDataMapper.java b/api/src/main/java/marquez/db/mappers/FieldDataMapper.java new file mode 100644 index 0000000000..4093d0c90c --- /dev/null +++ b/api/src/main/java/marquez/db/mappers/FieldDataMapper.java @@ -0,0 +1,31 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.mappers; + +import static marquez.db.Columns.stringOrNull; +import static marquez.db.Columns.uuidOrThrow; + +import java.sql.ResultSet; +import java.sql.SQLException; +import lombok.NonNull; +import marquez.db.Columns; +import marquez.db.models.InputFieldData; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; + +public final class FieldDataMapper implements RowMapper { + @Override + public InputFieldData map(@NonNull ResultSet results, @NonNull StatementContext context) + throws SQLException { + return new InputFieldData( + stringOrNull(results, Columns.NAMESPACE_NAME), + stringOrNull(results, Columns.DATASET_NAME), + stringOrNull(results, 
Columns.FIELD_NAME), + uuidOrThrow(results, Columns.DATASET_UUID), + uuidOrThrow(results, Columns.DATASET_FIELD_UUID), + uuidOrThrow(results, Columns.DATASET_VERSION_UUID)); + } +} diff --git a/api/src/main/java/marquez/db/models/ColumnLineageRow.java b/api/src/main/java/marquez/db/models/ColumnLineageRow.java new file mode 100644 index 0000000000..8ccd47c580 --- /dev/null +++ b/api/src/main/java/marquez/db/models/ColumnLineageRow.java @@ -0,0 +1,28 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.models; + +import java.time.Instant; +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.ToString; + +@AllArgsConstructor +@EqualsAndHashCode +@ToString +public class ColumnLineageRow { + @Getter @NonNull private final UUID outputDatasetVersionUuid; + @Getter @NonNull private final UUID outputDatasetFieldUuid; + @Getter @NonNull private final UUID inputDatasetVersionUuid; + @Getter @NonNull private final UUID inputDatasetFieldUuid; + @Getter @NonNull private final String transformationDescription; + @Getter @NonNull private final String transformationType; + @Getter @NonNull private final Instant createdAt; + @Getter @NonNull private Instant updatedAt; +} diff --git a/api/src/main/java/marquez/db/models/InputFieldData.java b/api/src/main/java/marquez/db/models/InputFieldData.java new file mode 100644 index 0000000000..cfc9175644 --- /dev/null +++ b/api/src/main/java/marquez/db/models/InputFieldData.java @@ -0,0 +1,22 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.models; + +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NonNull; + +@Getter +@AllArgsConstructor +public class InputFieldData { + @NonNull String namespace; + @NonNull String datasetName; + 
@NonNull String field; + @NonNull UUID datasetUuid; + @NonNull UUID datasetFieldUuid; + @NonNull UUID datasetVersionUuid; +} diff --git a/api/src/main/java/marquez/db/models/UpdateLineageRow.java b/api/src/main/java/marquez/db/models/UpdateLineageRow.java index 694fbf5277..755fe93b3a 100644 --- a/api/src/main/java/marquez/db/models/UpdateLineageRow.java +++ b/api/src/main/java/marquez/db/models/UpdateLineageRow.java @@ -30,5 +30,6 @@ public static class DatasetRecord { DatasetRow datasetRow; DatasetVersionRow datasetVersionRow; NamespaceRow namespaceRow; + List columnLineageRows; } } diff --git a/api/src/main/java/marquez/service/models/LineageEvent.java b/api/src/main/java/marquez/service/models/LineageEvent.java index 5065ad7ffa..8202028944 100644 --- a/api/src/main/java/marquez/service/models/LineageEvent.java +++ b/api/src/main/java/marquez/service/models/LineageEvent.java @@ -323,6 +323,7 @@ public static class Dataset extends BaseJsonModel { "dataSource", "description", "lifecycleStateChange", + "columnLineage", "symlinks" }) public static class DatasetFacets { @@ -331,6 +332,7 @@ public static class DatasetFacets { @Valid private SchemaDatasetFacet schema; @Valid private LifecycleStateChangeFacet lifecycleStateChange; @Valid private DatasourceDatasetFacet dataSource; + @Valid private ColumnLineageFacet columnLineage; @Valid private DatasetSymlinkFacet symlinks; private String description; @Builder.Default @JsonIgnore private Map additional = new LinkedHashMap<>(); @@ -365,6 +367,10 @@ public DatasourceDatasetFacet getDataSource() { return dataSource; } + public ColumnLineageFacet getColumnLineage() { + return columnLineage; + } + public String getDescription() { return description; } @@ -483,4 +489,52 @@ public LifecycleStateChangeFacet( this.lifecycleStateChange = lifecycleStateChange; } } + + @NoArgsConstructor + @Getter + @Setter + @Valid + @ToString + public static class ColumnLineageFacet extends BaseFacet { + + private List outputColumnsList; + + 
@Builder + public ColumnLineageFacet( + @NotNull URI _producer, + @NotNull URI _schemaURL, + List outputColumnsList) { + super(_producer, _schemaURL); + this.outputColumnsList = outputColumnsList; + } + } + + @Builder + @AllArgsConstructor + @NoArgsConstructor + @Setter + @Getter + @Valid + @ToString + public static class ColumnLineageOutputColumn extends BaseJsonModel { + + @NotNull private String name; + @NotNull private List inputFields; + @NotNull private String transformationDescription; + @NotNull private String transformationType; + } + + @Builder + @AllArgsConstructor + @NoArgsConstructor + @Setter + @Getter + @Valid + @ToString + public static class ColumnLineageInputField extends BaseJsonModel { + + @NotNull private String datasetNamespace; + @NotNull private String datasetName; + @NotNull private String fieldName; + } } diff --git a/api/src/main/resources/marquez/db/migration/V49__column_lineage.sql b/api/src/main/resources/marquez/db/migration/V49__column_lineage.sql new file mode 100644 index 0000000000..bced4eff50 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V49__column_lineage.sql @@ -0,0 +1,13 @@ +/* SPDX-License-Identifier: Apache-2.0 */ + +CREATE TABLE column_lineage ( + output_dataset_version_uuid uuid REFERENCES dataset_versions(uuid), -- allows join to run_id + output_dataset_field_uuid uuid REFERENCES dataset_fields(uuid), + input_dataset_version_uuid uuid REFERENCES dataset_versions(uuid), -- speed up column lineage graph traversal + input_dataset_field_uuid uuid REFERENCES dataset_fields(uuid), + transformation_description VARCHAR(255) NOT NULL, + transformation_type VARCHAR(255) NOT NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + UNIQUE (output_dataset_version_uuid, output_dataset_field_uuid, input_dataset_version_uuid, input_dataset_field_uuid) +); \ No newline at end of file diff --git a/api/src/test/java/marquez/db/ColumnLineageDaoTest.java 
b/api/src/test/java/marquez/db/ColumnLineageDaoTest.java new file mode 100644 index 0000000000..6a8e7e2a1c --- /dev/null +++ b/api/src/test/java/marquez/db/ColumnLineageDaoTest.java @@ -0,0 +1,222 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db; + +import static marquez.db.OpenLineageDao.DEFAULT_NAMESPACE_OWNER; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import marquez.common.models.DatasetType; +import marquez.db.models.ColumnLineageRow; +import marquez.db.models.DatasetRow; +import marquez.db.models.DatasetVersionRow; +import marquez.db.models.NamespaceRow; +import marquez.db.models.SourceRow; +import marquez.jdbi.MarquezJdbiExternalPostgresExtension; +import org.apache.commons.lang3.tuple.Pair; +import org.jdbi.v3.core.Jdbi; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(MarquezJdbiExternalPostgresExtension.class) +public class ColumnLineageDaoTest { + + private static ColumnLineageDao dao; + private static DatasetFieldDao fieldDao; + private static DatasetDao datasetDao; + private static NamespaceDao namespaceDao; + private static SourceDao sourceDao; + private static DatasetVersionDao datasetVersionDao; + + private UUID outputDatasetFieldUuid = UUID.randomUUID(); + private String transformationDescription = "some-description"; + private String transformationType = "some-type"; + private Instant now = Instant.now(); + private DatasetRow inputDatasetRow; + private DatasetRow outputDatasetRow; + private DatasetVersionRow inputDatasetVersionRow; + private DatasetVersionRow 
outputDatasetVersionRow; + + @BeforeAll + public static void setUpOnce(Jdbi jdbi) { + dao = jdbi.onDemand(ColumnLineageDao.class); + fieldDao = jdbi.onDemand(DatasetFieldDao.class); + datasetDao = jdbi.onDemand(DatasetDao.class); + namespaceDao = jdbi.onDemand(NamespaceDao.class); + sourceDao = jdbi.onDemand(SourceDao.class); + datasetVersionDao = jdbi.onDemand(DatasetVersionDao.class); + } + + @BeforeEach + public void setup() { + // setup some dataset + NamespaceRow namespaceRow = + namespaceDao.upsertNamespaceRow(UUID.randomUUID(), now, "", DEFAULT_NAMESPACE_OWNER); + SourceRow sourceRow = sourceDao.upsertOrDefault(UUID.randomUUID(), "", now, "", ""); + inputDatasetRow = + datasetDao.upsert( + UUID.randomUUID(), + DatasetType.DB_TABLE, + now, + namespaceRow.getUuid(), + "", + sourceRow.getUuid(), + "", + "inputDataset", + "", + "", + false); + outputDatasetRow = + datasetDao.upsert( + UUID.randomUUID(), + DatasetType.DB_TABLE, + now, + namespaceRow.getUuid(), + "", + sourceRow.getUuid(), + "", + "outputDataset", + "", + "", + false); + + inputDatasetVersionRow = + datasetVersionDao.upsert( + UUID.randomUUID(), + now, + inputDatasetRow.getUuid(), + UUID.randomUUID(), + UUID.randomUUID(), + null, + "", + "", + ""); + outputDatasetVersionRow = + datasetVersionDao.upsert( + UUID.randomUUID(), + now, + outputDatasetRow.getUuid(), + UUID.randomUUID(), + UUID.randomUUID(), + null, + "", + "", + ""); + + inputDatasetVersionRow = + datasetVersionDao.upsert( + UUID.randomUUID(), + now, + inputDatasetRow.getUuid(), + UUID.randomUUID(), + UUID.randomUUID(), + null, + "", + "", + ""); + + // insert output dataset field + fieldDao.upsert( + outputDatasetFieldUuid, now, "output-field", "string", "desc", outputDatasetRow.getUuid()); + } + + @AfterEach + public void tearDown(Jdbi jdbi) { + jdbi.inTransaction( + handle -> { + handle.execute("DELETE FROM column_lineage"); + handle.execute("DELETE FROM dataset_versions"); + handle.execute("DELETE FROM dataset_fields"); + 
handle.execute("DELETE FROM datasets");
+          handle.execute("DELETE FROM sources");
+          handle.execute("DELETE FROM namespaces");
+          return null;
+        });
+  }
+
+  @Test
+  void testUpsertMultipleColumns() {
+    UUID inputFieldUuid1 = UUID.randomUUID();
+    UUID inputFieldUuid2 = UUID.randomUUID();
+
+    // insert the two input dataset fields the output column is derived from
+    fieldDao.upsert(inputFieldUuid1, now, "a", "string", "desc", inputDatasetRow.getUuid());
+    fieldDao.upsert(inputFieldUuid2, now, "b", "string", "desc", inputDatasetRow.getUuid());
+
+    List<ColumnLineageRow> rows =
+        dao.upsertColumnLineageRow(
+            outputDatasetVersionRow.getUuid(),
+            outputDatasetFieldUuid,
+            Arrays.asList(
+                Pair.of(inputDatasetVersionRow.getUuid(), inputFieldUuid1),
+                Pair.of(inputDatasetVersionRow.getUuid(), inputFieldUuid2)),
+            transformationDescription,
+            transformationType,
+            now);
+
+    assertEquals(2, rows.size());
+    assertEquals(inputDatasetVersionRow.getUuid(), rows.get(0).getInputDatasetVersionUuid());
+    assertEquals(outputDatasetVersionRow.getUuid(), rows.get(0).getOutputDatasetVersionUuid());
+    assertEquals(outputDatasetFieldUuid, rows.get(0).getOutputDatasetFieldUuid());
+    assertTrue(
+        Arrays.asList(inputFieldUuid1, inputFieldUuid2)
+            .contains(rows.get(0).getInputDatasetFieldUuid())); // ordering may differ per run
+    assertEquals(transformationDescription, rows.get(0).getTransformationDescription());
+    assertEquals(transformationType, rows.get(0).getTransformationType());
+    assertEquals(now.getEpochSecond(), rows.get(0).getCreatedAt().getEpochSecond());
+    assertEquals(now.getEpochSecond(), rows.get(0).getUpdatedAt().getEpochSecond());
+  }
+
+  @Test
+  void testUpsertEmptyList() {
+    List<ColumnLineageRow> rows =
+        dao.upsertColumnLineageRow(
+            UUID.randomUUID(),
+            outputDatasetFieldUuid,
+            Collections.emptyList(), // no input fields, so nothing should be returned
+            transformationDescription,
+            transformationType,
+            now);
+
+    assertEquals(0, rows.size());
+  }
+
+  @Test
+  void testUpsertOnUpdatePreventsDuplicates() {
+    // insert the input dataset field referenced by both upserts below
+    UUID inputFieldUuid = UUID.randomUUID();
+    fieldDao.upsert(inputFieldUuid, now, "a", "string", "desc", inputDatasetRow.getUuid());
+
+    dao.upsertColumnLineageRow(
+        outputDatasetVersionRow.getUuid(),
+        outputDatasetFieldUuid,
+        Arrays.asList(Pair.of(inputDatasetVersionRow.getUuid(), inputFieldUuid)),
+        transformationDescription,
+        transformationType,
+        now);
+    List<ColumnLineageRow> rows =
+        dao.upsertColumnLineageRow(
+            outputDatasetVersionRow.getUuid(),
+            outputDatasetFieldUuid,
+            Arrays.asList(Pair.of(inputDatasetVersionRow.getUuid(), inputFieldUuid)),
+            transformationDescription,
+            transformationType,
+            now.plusSeconds(1000));
+
+    // same arguments twice: expect a single row whose updatedAt comes from the second call
+    assertEquals(1, rows.size());
+    assertEquals(
+        now.plusSeconds(1000).getEpochSecond(), rows.get(0).getUpdatedAt().getEpochSecond());
+  }
+}
diff --git a/api/src/test/java/marquez/db/OpenLineageDaoTest.java b/api/src/test/java/marquez/db/OpenLineageDaoTest.java
index 46c4254f1e..e6993e7656 100644
--- a/api/src/test/java/marquez/db/OpenLineageDaoTest.java
+++ b/api/src/test/java/marquez/db/OpenLineageDaoTest.java
@@ -8,10 +8,12 @@
 import static marquez.db.LineageTestUtils.PRODUCER_URL;
 import static marquez.db.LineageTestUtils.SCHEMA_URL;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import marquez.db.models.UpdateLineageRow;
 import marquez.db.models.UpdateLineageRow.DatasetRecord;
 import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
@@ -22,6 +24,7 @@
 import marquez.service.models.LineageEvent.SchemaDatasetFacet;
 import marquez.service.models.LineageEvent.SchemaField;
 import org.assertj.core.api.InstanceOfAssertFactories;
+import org.assertj.core.groups.Tuple;
 import org.jdbi.v3.core.Jdbi;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -34,9 +37,17 @@ class OpenLineageDaoTest {
   public static final String READ_JOB_NAME = "readJobName";
   public static final String DATASET_NAME = "theDataset";
 
+  public static final String OUTPUT_COLUMN = "output_column";
+  public static final String INPUT_NAMESPACE = "input_namespace";
+  public static final String INPUT_DATASET = "input_dataset";
+  public static final String INPUT_FIELD_NAME = "input_field_name";
+  public static final String TRANSFORMATION_TYPE = "transformation_type";
+  public static final String TRANSFORMATION_DESCRIPTION = "transformation_description";
+
   private static OpenLineageDao dao;
   private static DatasetSymlinkDao symlinkDao;
   private static NamespaceDao namespaceDao;
+  private static DatasetFieldDao datasetFieldDao;
   private final DatasetFacets datasetFacets =
       LineageTestUtils.newDatasetFacet(
           new SchemaField("name", "STRING", "my name"), new SchemaField("age", "INT", "my age"));
@@ -46,6 +57,7 @@ public static void setUpOnce(Jdbi jdbi) {
     dao = jdbi.onDemand(OpenLineageDao.class);
     symlinkDao = jdbi.onDemand(DatasetSymlinkDao.class);
     namespaceDao = jdbi.onDemand(NamespaceDao.class);
+    datasetFieldDao = jdbi.onDemand(DatasetFieldDao.class);
   }
 
   /** When reading a dataset, the version is assumed to be the version last written */
@@ -99,6 +111,180 @@ void testUpdateMarquezModelLifecycleStateChangeFacet() {
         .isEqualTo("TRUNCATE");
   }
 
+  @Test
+  void testUpdateMarquezModelDatasetWithColumnLineageFacet() {
+    JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
+    UpdateLineageRow writeJob =
+        LineageTestUtils.createLineageRow(
+            dao,
+            WRITE_JOB_NAME,
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(getInputDataset()),
+            Arrays.asList(getOutputDatasetWithColumnLineage()));
+
+    UUID inputDatasetVersion = writeJob.getInputs().get().get(0).getDatasetVersionRow().getUuid();
+    UUID outputDatasetVersion = writeJob.getOutputs().get().get(0).getDatasetVersionRow().getUuid();
+
+    assertThat(
+            writeJob.getOutputs().get().stream()
+                .findAny()
+                .orElseThrow()
+                .getColumnLineageRows())
+        .size()
+        .isEqualTo(1);
+
+    assertThat(
+            writeJob.getOutputs().get().stream()
+                .findAny()
+                .orElseThrow()
+                .getColumnLineageRows())
+        .extracting(
+            (ds) -> ds.getInputDatasetFieldUuid(),
+            (ds) -> ds.getInputDatasetVersionUuid(),
+            (ds) -> ds.getOutputDatasetFieldUuid(),
+            (ds) -> ds.getOutputDatasetVersionUuid(),
+            (ds) -> ds.getTransformationDescription(),
+            (ds) -> ds.getTransformationType())
+        .containsExactly(
+            Tuple.tuple(
+                datasetFieldDao
+                    .findUuid(
+                        writeJob.getInputs().get().get(0).getDatasetRow().getUuid(),
+                        INPUT_FIELD_NAME)
+                    .get(),
+                inputDatasetVersion,
+                datasetFieldDao
+                    .findUuid(
+                        writeJob.getOutputs().get().get(0).getDatasetRow().getUuid(), OUTPUT_COLUMN)
+                    .get(),
+                outputDatasetVersion,
+                TRANSFORMATION_DESCRIPTION,
+                TRANSFORMATION_TYPE));
+  }
+
+  @Test
+  void testUpdateMarquezModelDatasetWithColumnLineageFacetWhenInputFieldDoesNotExist() {
+    JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
+    UpdateLineageRow writeJob =
+        LineageTestUtils.createLineageRow(
+            dao,
+            WRITE_JOB_NAME,
+            "COMPLETE",
+            jobFacet,
+            Collections.emptyList(),
+            Arrays.asList(getOutputDatasetWithColumnLineage()));
+
+    // no input dataset was sent with the event: make sure no column lineage was written
+    assertEquals(0, writeJob.getOutputs().get().get(0).getColumnLineageRows().size());
+  }
+
+  @Test
+  void testUpdateMarquezModelDatasetWithColumnLineageFacetWhenOutputFieldDoesNotExist() {
+    Dataset outputDatasetWithoutOutputFieldSchema =
+        new Dataset(
+            LineageTestUtils.NAMESPACE,
+            DATASET_NAME,
+            LineageEvent.DatasetFacets.builder() // schema is missing
+                .columnLineage(
+                    new LineageEvent.ColumnLineageFacet(
+                        PRODUCER_URL,
+                        SCHEMA_URL,
+                        Collections.singletonList(
+                            new LineageEvent.ColumnLineageOutputColumn(
+                                OUTPUT_COLUMN,
+                                Collections.singletonList(
+                                    new LineageEvent.ColumnLineageInputField(
+                                        INPUT_NAMESPACE, INPUT_DATASET, INPUT_FIELD_NAME)),
+                                TRANSFORMATION_DESCRIPTION,
+                                TRANSFORMATION_TYPE))))
+                .build());
+
+    JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
+    UpdateLineageRow writeJob =
+        LineageTestUtils.createLineageRow(
+            dao,
+            WRITE_JOB_NAME,
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(getInputDataset()),
+            Arrays.asList(outputDatasetWithoutOutputFieldSchema));
+
+    // output dataset carries no schema facet: make sure no column lineage was written
+    assertEquals(0, writeJob.getOutputs().get().get(0).getColumnLineageRows().size());
+  }
+
+  /**
+   * When trying to insert new column level lineage data, do not create additional row if triad
+   * (dataset_version_uuid, output_column_name and input_field) is the same. Upsert instead.
+   */
+  @Test
+  void testUpsertColumnLineageData() {
+    final String UPDATED_TRANSFORMATION_TYPE = "updated_transformation_type";
+    final String UPDATED_TRANSFORMATION_DESCRIPTION = "updated_transformation_description";
+
+    Dataset inputDataset = getInputDataset();
+    Dataset dataset = getOutputDatasetWithColumnLineage();
+
+    Dataset updateDataset =
+        new Dataset(
+            LineageTestUtils.NAMESPACE,
+            DATASET_NAME,
+            LineageEvent.DatasetFacets.builder()
+                .schema(
+                    new SchemaDatasetFacet(
+                        PRODUCER_URL,
+                        SCHEMA_URL,
+                        Arrays.asList(new SchemaField(OUTPUT_COLUMN, "STRING", "my name"))))
+                .columnLineage(
+                    new LineageEvent.ColumnLineageFacet(
+                        PRODUCER_URL,
+                        SCHEMA_URL,
+                        Collections.singletonList(
+                            new LineageEvent.ColumnLineageOutputColumn(
+                                OUTPUT_COLUMN,
+                                Collections.singletonList(
+                                    new LineageEvent.ColumnLineageInputField(
+                                        INPUT_NAMESPACE, INPUT_DATASET, INPUT_FIELD_NAME)),
+                                UPDATED_TRANSFORMATION_DESCRIPTION,
+                                UPDATED_TRANSFORMATION_TYPE))))
+                .build());
+
+    JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
+    UpdateLineageRow writeJob1 =
+        LineageTestUtils.createLineageRow(
+            dao,
+            WRITE_JOB_NAME,
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(inputDataset),
+            Arrays.asList(dataset));
+
+    UpdateLineageRow writeJob2 =
+        LineageTestUtils.createLineageRow(
+            dao,
+            WRITE_JOB_NAME,
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(inputDataset),
+            Arrays.asList(updateDataset));
+
+    // read the dataset written by writeJob1/writeJob2 as a job input and check that size=1
+    UpdateLineageRow readJob2 =
+        LineageTestUtils.createLineageRow(
+            dao, WRITE_JOB_NAME, "COMPLETE", jobFacet, Arrays.asList(dataset), Arrays.asList());
+
+    // only 1 row should be present (no multiple Optional candidates)
+    assertThat(readJob2.getInputs()).isPresent().get().asList().size().isEqualTo(1);
+
+    // finally, the version read must be the one produced by the second write, not the first
+    assertThat(readJob2.getInputs().get().get(0).getDatasetVersionRow())
+        .isNotEqualTo(writeJob1.getOutputs().get().get(0).getDatasetVersionRow());
+
+    assertThat(readJob2.getInputs().get().get(0).getDatasetVersionRow())
+        .isEqualTo(writeJob2.getOutputs().get().get(0).getDatasetVersionRow());
+  }
+
   @Test
   void testUpdateMarquezModelDatasetWithSymlinks() {
     Dataset dataset =
@@ -201,6 +387,7 @@ void testUpdateMarquezModelWithNonMatchingReadSchema() {
                     new SchemaField("eyeColor", "STRING", "my eye color"))),
             this.datasetFacets.getLifecycleStateChange(),
             this.datasetFacets.getDataSource(),
+            this.datasetFacets.getColumnLineage(),
             null,
             this.datasetFacets.getDescription(),
             this.datasetFacets.getAdditionalFacets());
@@ -318,4 +505,42 @@ void testGetOpenLineageEvents() {
         .extracting("namespace", "name")
         .contains(LineageTestUtils.NAMESPACE, WRITE_JOB_NAME);
   }
+
+  private Dataset getInputDataset() {
+    return new Dataset(
+        INPUT_NAMESPACE,
+        INPUT_DATASET,
+        LineageEvent.DatasetFacets.builder()
+            .schema(
+                new SchemaDatasetFacet(
+                    PRODUCER_URL,
+                    SCHEMA_URL,
+                    Arrays.asList(new SchemaField(INPUT_FIELD_NAME, "STRING", "my name"))))
+            .build());
+  }
+
+  private Dataset getOutputDatasetWithColumnLineage() {
+    return new Dataset(
+        LineageTestUtils.NAMESPACE,
+        DATASET_NAME,
+        LineageEvent.DatasetFacets.builder()
+            .schema(
+                new SchemaDatasetFacet(
+                    PRODUCER_URL,
+                    SCHEMA_URL,
+                    Arrays.asList(new SchemaField(OUTPUT_COLUMN, "STRING", "my name"))))
+            .columnLineage(
+                new LineageEvent.ColumnLineageFacet(
+                    PRODUCER_URL,
+                    SCHEMA_URL,
+                    Collections.singletonList(
+                        new LineageEvent.ColumnLineageOutputColumn(
+                            OUTPUT_COLUMN,
+                            Collections.singletonList(
+                                new LineageEvent.ColumnLineageInputField(
+                                    INPUT_NAMESPACE, INPUT_DATASET, INPUT_FIELD_NAME)),
+                            TRANSFORMATION_DESCRIPTION,
+                            TRANSFORMATION_TYPE))))
+            .build());
+  }
 }