Skip to content

Commit

Permalink
Model and store column lineage in Marquez DB (#2096)
Browse files Browse the repository at this point in the history
* Create database representation, model classes

Signed-off-by: mzareba <mzareba382@gmail.com>

* Implement ColumnLevelLineageDao

Signed-off-by: mzareba <mzareba382@gmail.com>

* Instantiate ColumnLevelLineageDao in updateBaseMarquezModel

Signed-off-by: mzareba <mzareba382@gmail.com>

* Upsert ColumnLevelLineageRow to db, model representation in LineageEvent

Signed-off-by: mzareba <mzareba382@gmail.com>

* Fix problems in OpenLineageDao, add a list of ColumnLevelLineageRow to DatasetRecord, write test for createLineageRow() invocation

Signed-off-by: mzareba <mzareba382@gmail.com>

* Change wildcard imports to single class imports

Signed-off-by: mzareba <mzareba382@gmail.com>

* Change wildcard imports to single class imports

Signed-off-by: mzareba <mzareba382@gmail.com>

* Change wildcard imports to single class imports

Signed-off-by: mzareba <mzareba382@gmail.com>

* Apply spotless

Signed-off-by: mzareba <mzareba382@gmail.com>

* Check for ds.getFacets not null

Signed-off-by: mzareba <mzareba382@gmail.com>

* Format fix

Signed-off-by: mzareba <mzareba382@gmail.com>

* Update testUpdateMarquezModelDatasetWithColumnLineageFacet

Signed-off-by: mzareba <mzareba382@gmail.com>

* Test for column_level_lineage upsert.

Signed-off-by: mzareba <mzareba382@gmail.com>

* Apply spotless

Signed-off-by: mzareba <mzareba382@gmail.com>

* switch to data field references

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

* fix broken tests

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

* test when dataset_field is missing

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

* add input_dataset_version_uuid field

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

* increase db file version

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

* increase db file version

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

* rename ColumnLevelLineage -> ColumnLineage

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

Signed-off-by: mzareba <mzareba382@gmail.com>
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
Co-authored-by: Mariusz Zaręba <mzareba382@getindata.com>
Co-authored-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
3 people committed Sep 30, 2022
1 parent 2909864 commit b6544ec
Show file tree
Hide file tree
Showing 15 changed files with 856 additions and 5 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# Changelog

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.26.0...HEAD)

### Added
* Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

### Fixed
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
* Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

## [0.26.0](https://github.com/MarquezProject/marquez/compare/0.25.0...0.26.0) - 2022-09-15

Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,7 @@ public interface BaseDao extends SqlObject {

/**
 * Returns an {@link OpenLineageDao} obtained through this DAO.
 *
 * <p>NOTE(review): relies on Jdbi's {@code @CreateSqlObject} to supply the instance; presumably
 * the returned DAO shares this object's handle/transaction — confirm against the Jdbi docs.
 */
@CreateSqlObject
OpenLineageDao createOpenLineageDao();

/**
 * Returns a {@link ColumnLineageDao} obtained through this DAO.
 *
 * <p>NOTE(review): relies on Jdbi's {@code @CreateSqlObject} to supply the instance; presumably
 * the returned DAO shares this object's handle/transaction — confirm against the Jdbi docs.
 */
@CreateSqlObject
ColumnLineageDao createColumnLineageDao();
}
91 changes: 91 additions & 0 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import marquez.db.mappers.ColumnLineageRowMapper;
import marquez.db.models.ColumnLineageRow;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindBeanList;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

/**
 * Data access object for the {@code column_lineage} table.
 *
 * <p>Each row links one output dataset field (within an output dataset version) to one input
 * dataset field (within an input dataset version) together with the transformation description
 * and type.
 */
@RegisterRowMapper(ColumnLineageRowMapper.class)
public interface ColumnLineageDao extends BaseDao {

  /**
   * Upserts one {@code column_lineage} row per input reference and returns the rows stored for the
   * given output dataset version and output field.
   *
   * <p>The returned list is the result of a follow-up query on the output version/field pair, so it
   * contains every row stored for that pair, not only the ones written by this call.
   *
   * @param outputDatasetVersionUuid uuid of the output dataset version
   * @param outputDatasetFieldUuid uuid of the output dataset field
   * @param inputs pairs of (input dataset version uuid, input dataset field uuid)
   * @param transformationDescription transformation description stored on every row
   * @param transformationType transformation type stored on every row
   * @param now timestamp used for both {@code created_at} and {@code updated_at}
   * @return an empty list when {@code inputs} is empty (nothing is written or queried), otherwise
   *     the rows found for the output version/field pair after the upsert
   */
  default List<ColumnLineageRow> upsertColumnLineageRow(
      UUID outputDatasetVersionUuid,
      UUID outputDatasetFieldUuid,
      List<Pair<UUID, UUID>> inputs,
      String transformationDescription,
      String transformationType,
      Instant now) {

    // Nothing to link: skip both the insert and the follow-up query.
    if (inputs.isEmpty()) {
      return Collections.emptyList();
    }

    List<ColumnLineageRow> rowsToUpsert =
        inputs.stream()
            .map(
                versionAndField ->
                    new ColumnLineageRow(
                        outputDatasetVersionUuid,
                        outputDatasetFieldUuid,
                        versionAndField.getLeft(), // input_dataset_version_uuid
                        versionAndField.getRight(), // input_dataset_field_uuid
                        transformationDescription,
                        transformationType,
                        now,
                        now))
            .collect(Collectors.toList());
    doUpsertColumnLineageRow(rowsToUpsert);

    return findColumnLineageByDatasetVersionColumnAndOutputDatasetField(
        outputDatasetVersionUuid, outputDatasetFieldUuid);
  }

  /**
   * Finds all column lineage rows of an output dataset field within an output dataset version.
   *
   * @param datasetVersionUuid uuid matched against {@code output_dataset_version_uuid}
   * @param outputDatasetFieldUuid uuid matched against {@code output_dataset_field_uuid}
   * @return the matching rows
   */
  @SqlQuery(
      "SELECT * FROM column_lineage WHERE output_dataset_version_uuid = :datasetVersionUuid AND output_dataset_field_uuid = :outputDatasetFieldUuid")
  List<ColumnLineageRow> findColumnLineageByDatasetVersionColumnAndOutputDatasetField(
      UUID datasetVersionUuid, UUID outputDatasetFieldUuid);

  /**
   * Inserts the given rows in a single statement; on a conflict on the (output version, output
   * field, input version, input field) key the transformation description, transformation type and
   * {@code updated_at} of the existing row are overwritten.
   *
   * @param rows rows to write; the listed bean properties are bound into {@code <values>}
   */
  @SqlUpdate(
      """
      INSERT INTO column_lineage (
      output_dataset_version_uuid,
      output_dataset_field_uuid,
      input_dataset_version_uuid,
      input_dataset_field_uuid,
      transformation_description,
      transformation_type,
      created_at,
      updated_at
      ) VALUES <values>
      ON CONFLICT (output_dataset_version_uuid, output_dataset_field_uuid, input_dataset_version_uuid, input_dataset_field_uuid)
      DO UPDATE SET
      transformation_description = EXCLUDED.transformation_description,
      transformation_type = EXCLUDED.transformation_type,
      updated_at = EXCLUDED.updated_at
      """)
  void doUpsertColumnLineageRow(
      @BindBeanList(
              value = "values",
              propertyNames = {
                "outputDatasetVersionUuid",
                "outputDatasetFieldUuid",
                "inputDatasetVersionUuid",
                "inputDatasetFieldUuid",
                "transformationDescription",
                "transformationType",
                "createdAt",
                "updatedAt"
              })
          List<ColumnLineageRow> rows);
}
9 changes: 9 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,15 @@ private Columns() {}
public static final String RUN_UUID = "run_uuid";
public static final String STATE = "state";

/* COLUMN LINEAGE ROW COLUMNS */
// NOTE(review): field_name is not one of the column_lineage table columns; it matches the
// "field_name" alias selected by DatasetFieldDao's input-field query — confirm where it is read.
public static final String FIELD_NAME = "field_name";
public static final String OUTPUT_DATASET_VERSION_UUID = "output_dataset_version_uuid";
public static final String OUTPUT_DATASET_FIELD_UUID = "output_dataset_field_uuid";
public static final String INPUT_DATASET_FIELD_UUID = "input_dataset_field_uuid";
public static final String INPUT_DATASET_VERSION_UUID = "input_dataset_version_uuid";
public static final String TRANSFORMATION_DESCRIPTION = "transformation_description";
public static final String TRANSFORMATION_TYPE = "transformation_type";

/* LINEAGE EVENT ROW COLUMNS */
public static final String EVENT = "event";

Expand Down
21 changes: 21 additions & 0 deletions api/src/main/java/marquez/db/DatasetFieldDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import marquez.common.models.TagName;
import marquez.db.mappers.DatasetFieldMapper;
import marquez.db.mappers.DatasetFieldRowMapper;
import marquez.db.mappers.FieldDataMapper;
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.InputFieldData;
import marquez.db.models.TagRow;
import marquez.service.models.Dataset;
import marquez.service.models.DatasetVersion;
Expand All @@ -29,6 +31,7 @@

@RegisterRowMapper(DatasetFieldRowMapper.class)
@RegisterRowMapper(DatasetFieldMapper.class)
@RegisterRowMapper(FieldDataMapper.class)
public interface DatasetFieldDao extends BaseDao {
@SqlQuery(
"SELECT EXISTS ("
Expand Down Expand Up @@ -101,6 +104,24 @@ default Dataset updateTags(
+ "WHERE fm.dataset_version_uuid = :datasetVersionUuid")
List<Field> find(UUID datasetVersionUuid);

/**
 * Lists the fields of every dataset version that {@code runs_input_mapping} associates with the
 * given run.
 *
 * <p>Each result carries the namespace name, dataset name and field name together with the uuids
 * of the dataset, the dataset version and the dataset field. Only fields attached to a version
 * through {@code dataset_versions_field_mapping} are returned.
 *
 * <p>NOTE(review): rows are presumably mapped by the {@code FieldDataMapper} registered on this
 * DAO — confirm.
 *
 * @param runUuid uuid of the run whose input dataset fields are requested
 * @return one entry per (input dataset version, field) pair of the run
 */
@SqlQuery(
"""
SELECT
datasets_view.namespace_name as namespace_name,
datasets_view.name as dataset_name,
dataset_fields.name as field_name,
datasets_view.uuid as dataset_uuid,
dataset_versions.uuid as dataset_version_uuid,
dataset_fields.uuid as dataset_field_uuid
FROM dataset_fields
JOIN dataset_versions_field_mapping fm ON fm.dataset_field_uuid = dataset_fields.uuid
JOIN dataset_versions ON dataset_versions.uuid = fm.dataset_version_uuid
JOIN datasets_view ON datasets_view.uuid = dataset_versions.dataset_uuid
JOIN runs_input_mapping ON runs_input_mapping.dataset_version_uuid = dataset_versions.uuid
WHERE runs_input_mapping.run_uuid = :runUuid
""")
List<InputFieldData> findInputFieldsDataAssociatedWithRun(UUID runUuid);

@SqlQuery(
"INSERT INTO dataset_fields ("
+ "uuid, "
Expand Down
98 changes: 94 additions & 4 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import marquez.common.Utils;
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
Expand All @@ -31,10 +34,12 @@
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
import marquez.db.mappers.LineageEventMapper;
import marquez.db.models.ColumnLineageRow;
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetSymlinkRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.InputFieldData;
import marquez.db.models.JobContextRow;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
Expand All @@ -56,6 +61,7 @@
import marquez.service.models.LineageEvent.RunFacet;
import marquez.service.models.LineageEvent.SchemaDatasetFacet;
import marquez.service.models.LineageEvent.SchemaField;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
Expand Down Expand Up @@ -131,6 +137,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
RunDao runDao = createRunDao();
RunArgsDao runArgsDao = createRunArgsDao();
RunStateDao runStateDao = createRunStateDao();
ColumnLineageDao columnLineageDao = createColumnLineageDao();

Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();

Expand Down Expand Up @@ -323,7 +330,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
datasetDao,
datasetVersionDao,
datasetFieldDao,
runDao);
runDao,
columnLineageDao);
datasetInputs.add(record);
}
}
Expand All @@ -345,7 +353,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
datasetDao,
datasetVersionDao,
datasetFieldDao,
runDao);
runDao,
columnLineageDao);
datasetOutputs.add(record);
}
}
Expand Down Expand Up @@ -541,7 +550,8 @@ default DatasetRecord upsertLineageDataset(
DatasetDao datasetDao,
DatasetVersionDao datasetVersionDao,
DatasetFieldDao datasetFieldDao,
RunDao runDao) {
RunDao runDao,
ColumnLineageDao columnLineageDao) {
NamespaceRow dsNamespace =
namespaceDao.upsertNamespaceRow(
UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER);
Expand Down Expand Up @@ -662,6 +672,7 @@ default DatasetRecord upsertLineageDataset(
return row;
});
List<DatasetFieldMapping> datasetFieldMappings = new ArrayList<>();
List<DatasetFieldRow> datasetFields = new ArrayList<>();
if (fields != null) {
for (SchemaField field : fields) {
DatasetFieldRow datasetFieldRow =
Expand All @@ -672,6 +683,7 @@ default DatasetRecord upsertLineageDataset(
field.getType(),
field.getDescription(),
datasetRow.getUuid());
datasetFields.add(datasetFieldRow);
datasetFieldMappings.add(
new DatasetFieldMapping(datasetVersionRow.getUuid(), datasetFieldRow.getUuid()));
}
Expand All @@ -690,7 +702,85 @@ default DatasetRecord upsertLineageDataset(
}
}

return new DatasetRecord(datasetRow, datasetVersionRow, datasetNamespace);
List<ColumnLineageRow> columnLineageRows = Collections.emptyList();
if (!isInput) {
columnLineageRows =
upsertColumnLineage(
runUuid,
ds,
now,
datasetFields,
columnLineageDao,
datasetFieldDao,
datasetVersionRow);
}

return new DatasetRecord(datasetRow, datasetVersionRow, datasetNamespace, columnLineageRows);
}

/**
 * Persists the column lineage declared in the facets of an output dataset.
 *
 * <p>For each output column of the column lineage facet, the dataset field with the same name is
 * looked up in {@code datasetFields}. The column's declared input fields are then matched, by
 * namespace, dataset name and field name, against the fields of the run's input dataset versions,
 * and the resulting (input version uuid, input field uuid) pairs are handed to {@link
 * ColumnLineageDao#upsertColumnLineageRow}. Output columns without a matching dataset field are
 * logged as an error and skipped.
 *
 * @param runUuid run whose input dataset fields are the candidates for lineage inputs
 * @param ds dataset from the lineage event; its facets may be {@code null}
 * @param now timestamp passed on to the upsert
 * @param datasetFields field rows of the output dataset, searched by name
 * @param columnLineageDao DAO performing the upsert
 * @param datasetFieldDao DAO used to load the input fields of the run
 * @param datasetVersionRow output dataset version the lineage rows are attached to
 * @return the rows returned by the DAO for all processed output columns; empty when the dataset
 *     has no facets, no column lineage facet or no output columns list
 */
private List<ColumnLineageRow> upsertColumnLineage(
    UUID runUuid,
    Dataset ds,
    Instant now,
    List<DatasetFieldRow> datasetFields,
    ColumnLineageDao columnLineageDao,
    DatasetFieldDao datasetFieldDao,
    DatasetVersionRow datasetVersionRow) {
  // Loaded up front, before the facets are inspected: all fields related to this particular run.
  List<InputFieldData> inputFieldsOfRun =
      datasetFieldDao.findInputFieldsDataAssociatedWithRun(runUuid);

  return Optional.ofNullable(ds.getFacets())
      .map(DatasetFacets::getColumnLineage)
      .map(LineageEvent.ColumnLineageFacet::getOutputColumnsList)
      .stream()
      .flatMap(outputColumns -> outputColumns.stream())
      .flatMap(
          outputColumn -> {
            Optional<DatasetFieldRow> matchedOutputField =
                datasetFields.stream()
                    .filter(candidate -> candidate.getName().equals(outputColumn.getName()))
                    .findAny();

            if (matchedOutputField.isPresent()) {
              // Keep only the run's input fields that the output column declares as inputs.
              List<Pair<UUID, UUID>> inputReferences =
                  inputFieldsOfRun.stream()
                      .filter(
                          candidate ->
                              outputColumn.getInputFields().stream()
                                  .anyMatch(
                                      declared ->
                                          declared
                                                  .getDatasetNamespace()
                                                  .equals(candidate.getNamespace())
                                              && declared
                                                  .getDatasetName()
                                                  .equals(candidate.getDatasetName())
                                              && declared
                                                  .getFieldName()
                                                  .equals(candidate.getField())))
                      .map(
                          candidate ->
                              Pair.of(
                                  candidate.getDatasetVersionUuid(),
                                  candidate.getDatasetFieldUuid()))
                      .collect(Collectors.toList());

              return columnLineageDao
                  .upsertColumnLineageRow(
                      datasetVersionRow.getUuid(),
                      matchedOutputField.get().getUuid(),
                      inputReferences,
                      outputColumn.getTransformationDescription(),
                      outputColumn.getTransformationType(),
                      now)
                  .stream();
            }

            // The facet names an output column that is not among the dataset's fields.
            LoggerFactory.getLogger(OpenLineageDao.class)
                .error(
                    "Cannot produce column lineage for missing output field in output dataset: {}",
                    outputColumn.getName());
            return Stream.empty();
          })
      .collect(Collectors.toList());
}

default String formatDatasetName(String name) {
Expand Down
37 changes: 37 additions & 0 deletions api/src/main/java/marquez/db/mappers/ColumnLineageRowMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION;
import static marquez.db.Columns.TRANSFORMATION_TYPE;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.timestampOrThrow;
import static marquez.db.Columns.uuidOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.NonNull;
import marquez.db.Columns;
import marquez.db.models.ColumnLineageRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

/**
 * Row mapper that builds a {@link ColumnLineageRow} from the output/input dataset version and
 * field uuids, the transformation description and type, and the created/updated timestamps of a
 * result-set row.
 */
public class ColumnLineageRowMapper implements RowMapper<ColumnLineageRow> {

  /**
   * Reads every column through the {@code ...OrThrow} helpers of {@link Columns} and assembles the
   * row.
   *
   * @param results result set positioned on the row to map
   * @param context statement context (not used by this mapper)
   * @return the mapped row
   * @throws SQLException declared by the mapper contract
   */
  @Override
  public ColumnLineageRow map(@NonNull ResultSet results, @NonNull StatementContext context)
      throws SQLException {
    // Columns are read in constructor-argument order.
    var outputDatasetVersionUuid = uuidOrThrow(results, Columns.OUTPUT_DATASET_VERSION_UUID);
    var outputDatasetFieldUuid = uuidOrThrow(results, Columns.OUTPUT_DATASET_FIELD_UUID);
    var inputDatasetVersionUuid = uuidOrThrow(results, Columns.INPUT_DATASET_VERSION_UUID);
    var inputDatasetFieldUuid = uuidOrThrow(results, Columns.INPUT_DATASET_FIELD_UUID);
    var transformationDescription = stringOrThrow(results, TRANSFORMATION_DESCRIPTION);
    var transformationType = stringOrThrow(results, TRANSFORMATION_TYPE);
    var createdAt = timestampOrThrow(results, Columns.CREATED_AT);
    var updatedAt = timestampOrThrow(results, Columns.UPDATED_AT);

    return new ColumnLineageRow(
        outputDatasetVersionUuid,
        outputDatasetFieldUuid,
        inputDatasetVersionUuid,
        inputDatasetFieldUuid,
        transformationDescription,
        transformationType,
        createdAt,
        updatedAt);
  }
}
Loading

0 comments on commit b6544ec

Please sign in to comment.