Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Model and store column lineage in Marquez DB #2096

Merged
merged 30 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
9e2cd20
Create database representation, model classes
Aug 29, 2022
374bdd3
Implement ColumnLevelLineageDao
mzareba382 Aug 30, 2022
148836c
Instantiate ColumnLevelLineageDao in updateBaseMarquezModel
mzareba382 Aug 30, 2022
e5d9490
Merge branch 'main' into add-column-level-lineage
mzareba382 Aug 30, 2022
c28ea86
Merge branch 'main' into add-column-level-lineage
mzareba382 Aug 31, 2022
48b1ebf
Upsert ColumnLevelLineageRow to db, model representation in LineageEvent
mzareba382 Aug 31, 2022
3e6674e
Fix problems in OpenLineageDao, add a list of ColumnLevelLineageRow t…
mzareba382 Sep 1, 2022
b1eab3f
Change wildcard imports to single class imports
mzareba382 Sep 1, 2022
03e2963
Change wildcard imports to single class imports
mzareba382 Sep 1, 2022
2ad3bc4
Change wildcard imports to single class imports
mzareba382 Sep 1, 2022
4a5ec90
Apply spotless
mzareba382 Sep 1, 2022
d547fd6
Merge branch 'main' into add-column-level-lineage
mzareba382 Sep 8, 2022
153a19a
Merge branch 'main' into add-column-level-lineage
mzareba382 Sep 9, 2022
bce38ff
Check for ds.getFacets not null
mzareba382 Sep 9, 2022
a0251a0
Format fix
mzareba382 Sep 9, 2022
f17cdda
Update testUpdateMarquezModelDatasetWithColumnLineageFacet
mzareba382 Sep 9, 2022
dbabd08
Merge branch 'main' into add-column-level-lineage
mzareba382 Sep 12, 2022
2dff0f6
Test for column_level_lineage upsert.
mzareba382 Sep 12, 2022
b42a2ad
Apply spotless
mzareba382 Sep 12, 2022
95ba2e2
Merge branch 'main' into add-column-level-lineage
mzareba382 Sep 12, 2022
8e3fc65
switch to data field references
pawel-big-lebowski Sep 13, 2022
7ce339e
fix broken tests
pawel-big-lebowski Sep 14, 2022
bfd7555
test when dataset_field is missing
pawel-big-lebowski Sep 14, 2022
08539dd
add input_dataset_version_uuid field
pawel-big-lebowski Sep 16, 2022
6fce3df
Merge branch 'main' into add-column-level-lineage
pawel-big-lebowski Sep 16, 2022
bf8c84e
increase db file version
pawel-big-lebowski Sep 16, 2022
c816996
increase db file version
pawel-big-lebowski Sep 16, 2022
b6d37fe
Merge branch 'add-column-level-lineage' of github.com:MarquezProject/…
pawel-big-lebowski Sep 19, 2022
1aefca2
Merge branch 'main' into add-column-level-lineage
pawel-big-lebowski Sep 28, 2022
21dac22
rename ColumnLevelLineage -> ColumnLineage
pawel-big-lebowski Sep 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Changelog

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.26.0...HEAD)
* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)


## [0.26.0](https://github.com/MarquezProject/marquez/compare/0.25.0...0.26.0) - 2022-09-15

### Added
Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,7 @@ public interface BaseDao extends SqlObject {

@CreateSqlObject
OpenLineageDao createOpenLineageDao();

@CreateSqlObject
ColumnLevelLineageDao createColumnLevelLineageDao();
}
93 changes: 93 additions & 0 deletions api/src/main/java/marquez/db/ColumnLevelLineageDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import marquez.db.mappers.ColumnLevelLineageRowMapper;
import marquez.db.models.ColumnLevelLineageRow;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindBeanList;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(ColumnLevelLineageRowMapper.class)
public interface ColumnLevelLineageDao extends BaseDao {

default List<ColumnLevelLineageRow> upsertColumnLevelLineageRow(
UUID outputDatasetVersionUuid,
UUID outputDatasetFieldUuid,
List<Pair<UUID, UUID>> inputs,
String transformationDescription,
String transformationType,
Instant now) {

if (inputs.isEmpty()) {
return Collections.emptyList();
}

List<ColumnLevelLineageRow> rows =
inputs.stream()
.map(
input ->
new ColumnLevelLineageRow(
outputDatasetVersionUuid,
outputDatasetFieldUuid,
input.getLeft(), // input_dataset_version_uuid
input.getRight(), // input_dataset_field_uuid
transformationDescription,
transformationType,
now,
now))
.collect(Collectors.toList());
doUpsertColumnLevelLineageRow(rows.toArray(new ColumnLevelLineageRow[0]));
return findColumnLevelLineageByDatasetVersionColumnAndOutputDatasetField(
outputDatasetVersionUuid, outputDatasetFieldUuid);
}

@SqlQuery(
"SELECT * FROM column_level_lineage WHERE output_dataset_version_uuid = :datasetVersionUuid AND output_dataset_field_uuid = :outputDatasetFieldUuid")
List<ColumnLevelLineageRow> findColumnLevelLineageByDatasetVersionColumnAndOutputDatasetField(
UUID datasetVersionUuid, UUID outputDatasetFieldUuid);

@SqlUpdate(
"""
INSERT INTO column_level_lineage (
output_dataset_version_uuid,
output_dataset_field_uuid,
input_dataset_version_uuid,
input_dataset_field_uuid,
transformation_description,
transformation_type,
created_at,
updated_at
) VALUES <values>
ON CONFLICT (output_dataset_version_uuid, output_dataset_field_uuid, input_dataset_version_uuid, input_dataset_field_uuid)
DO UPDATE SET
transformation_description = EXCLUDED.transformation_description,
transformation_type = EXCLUDED.transformation_type,
updated_at = EXCLUDED.updated_at
RETURNING *
""")
void doUpsertColumnLevelLineageRow(
mobuchowski marked this conversation as resolved.
Show resolved Hide resolved
@BindBeanList(
propertyNames = {
"outputDatasetVersionUuid",
"outputDatasetFieldUuid",
"inputDatasetVersionUuid",
"inputDatasetFieldUuid",
"transformationDescription",
"transformationType",
"createdAt",
"updatedAt"
},
value = "values")
ColumnLevelLineageRow... rows);
pawel-big-lebowski marked this conversation as resolved.
Show resolved Hide resolved
}
9 changes: 9 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,15 @@ private Columns() {}
public static final String RUN_UUID = "run_uuid";
public static final String STATE = "state";

/* COLUMN LEVEL LINEAGE ROW COLUMNS */
public static final String FIELD_NAME = "field_name";
public static final String OUTPUT_DATASET_VERSION_UUID = "output_dataset_version_uuid";
public static final String OUTPUT_DATASET_FIELD_UUID = "output_dataset_field_uuid";
public static final String INPUT_DATASET_FIELD_UUID = "input_dataset_field_uuid";
public static final String INPUT_DATASET_VERSION_UUID = "input_dataset_version_uuid";
public static final String TRANSFORMATION_DESCRIPTION = "transformation_description";
public static final String TRANSFORMATION_TYPE = "transformation_type";

/* LINEAGE EVENT ROW COLUMNS */
public static final String EVENT = "event";

Expand Down
21 changes: 21 additions & 0 deletions api/src/main/java/marquez/db/DatasetFieldDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import marquez.common.models.TagName;
import marquez.db.mappers.DatasetFieldMapper;
import marquez.db.mappers.DatasetFieldRowMapper;
import marquez.db.mappers.FieldDataMapper;
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.InputFieldData;
import marquez.db.models.TagRow;
import marquez.service.models.Dataset;
import marquez.service.models.DatasetVersion;
Expand All @@ -29,6 +31,7 @@

@RegisterRowMapper(DatasetFieldRowMapper.class)
@RegisterRowMapper(DatasetFieldMapper.class)
@RegisterRowMapper(FieldDataMapper.class)
public interface DatasetFieldDao extends BaseDao {
@SqlQuery(
"SELECT EXISTS ("
Expand Down Expand Up @@ -101,6 +104,24 @@ default Dataset updateTags(
+ "WHERE fm.dataset_version_uuid = :datasetVersionUuid")
List<Field> find(UUID datasetVersionUuid);

@SqlQuery(
"""
SELECT
datasets_view.namespace_name as namespace_name,
datasets_view.name as dataset_name,
dataset_fields.name as field_name,
datasets_view.uuid as dataset_uuid,
dataset_versions.uuid as dataset_version_uuid,
dataset_fields.uuid as dataset_field_uuid
FROM dataset_fields
JOIN dataset_versions_field_mapping fm ON fm.dataset_field_uuid = dataset_fields.uuid
JOIN dataset_versions ON dataset_versions.uuid = fm.dataset_version_uuid
JOIN datasets_view ON datasets_view.uuid = dataset_versions.dataset_uuid
JOIN runs_input_mapping ON runs_input_mapping.dataset_version_uuid = dataset_versions.uuid
WHERE runs_input_mapping.run_uuid = :runUuid
""")
List<InputFieldData> findInputFieldsDataAssociatedWithRun(UUID runUuid);

@SqlQuery(
"INSERT INTO dataset_fields ("
+ "uuid, "
Expand Down
98 changes: 94 additions & 4 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import marquez.common.Utils;
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
Expand All @@ -31,9 +34,11 @@
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
import marquez.db.mappers.LineageEventMapper;
import marquez.db.models.ColumnLevelLineageRow;
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.InputFieldData;
import marquez.db.models.JobContextRow;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
Expand All @@ -55,6 +60,7 @@
import marquez.service.models.LineageEvent.RunFacet;
import marquez.service.models.LineageEvent.SchemaDatasetFacet;
import marquez.service.models.LineageEvent.SchemaField;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
Expand Down Expand Up @@ -129,6 +135,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
RunDao runDao = createRunDao();
RunArgsDao runArgsDao = createRunArgsDao();
RunStateDao runStateDao = createRunStateDao();
ColumnLevelLineageDao columnLevelLineageDao = createColumnLevelLineageDao();

Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();

Expand Down Expand Up @@ -320,7 +327,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
datasetDao,
datasetVersionDao,
datasetFieldDao,
runDao);
runDao,
columnLevelLineageDao);
datasetInputs.add(record);
}
}
Expand All @@ -341,7 +349,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
datasetDao,
datasetVersionDao,
datasetFieldDao,
runDao);
runDao,
columnLevelLineageDao);
datasetOutputs.add(record);
}
}
Expand Down Expand Up @@ -529,7 +538,8 @@ default DatasetRecord upsertLineageDataset(
DatasetDao datasetDao,
DatasetVersionDao datasetVersionDao,
DatasetFieldDao datasetFieldDao,
RunDao runDao) {
RunDao runDao,
ColumnLevelLineageDao columnLevelLineageDao) {
NamespaceRow dsNamespace =
namespaceDao.upsertNamespaceRow(
UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER);
Expand Down Expand Up @@ -621,6 +631,7 @@ default DatasetRecord upsertLineageDataset(
return row;
});
List<DatasetFieldMapping> datasetFieldMappings = new ArrayList<>();
List<DatasetFieldRow> datasetFields = new ArrayList<>();
if (fields != null) {
for (SchemaField field : fields) {
DatasetFieldRow datasetFieldRow =
Expand All @@ -631,6 +642,7 @@ default DatasetRecord upsertLineageDataset(
field.getType(),
field.getDescription(),
datasetRow.getUuid());
datasetFields.add(datasetFieldRow);
datasetFieldMappings.add(
new DatasetFieldMapping(datasetVersionRow.getUuid(), datasetFieldRow.getUuid()));
}
Expand All @@ -649,7 +661,85 @@ default DatasetRecord upsertLineageDataset(
}
}

return new DatasetRecord(datasetRow, datasetVersionRow, datasetNamespace);
List<ColumnLevelLineageRow> columnLineageRows = Collections.emptyList();
if (!isInput) {
columnLineageRows =
upsertColumnLineage(
runUuid,
ds,
now,
datasetFields,
columnLevelLineageDao,
datasetFieldDao,
datasetVersionRow);
}

return new DatasetRecord(datasetRow, datasetVersionRow, datasetNamespace, columnLineageRows);
}

private List<ColumnLevelLineageRow> upsertColumnLineage(
UUID runUuid,
Dataset ds,
Instant now,
List<DatasetFieldRow> datasetFields,
ColumnLevelLineageDao columnLevelLineageDao,
DatasetFieldDao datasetFieldDao,
DatasetVersionRow datasetVersionRow) {
// get all the fields related to this particular run
List<InputFieldData> runFields = datasetFieldDao.findInputFieldsDataAssociatedWithRun(runUuid);
mobuchowski marked this conversation as resolved.
Show resolved Hide resolved

return Optional.ofNullable(ds.getFacets())
.map(DatasetFacets::getColumnLineage)
.map(LineageEvent.ColumnLineageFacet::getOutputColumnsList)
.stream()
.flatMap(list -> list.stream())
.flatMap(
outputColumn -> {
Optional<DatasetFieldRow> outputField =
datasetFields.stream()
.filter(dfr -> dfr.getName().equals(outputColumn.getName()))
.findAny();

if (outputField.isEmpty()) {
Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
mobuchowski marked this conversation as resolved.
Show resolved Hide resolved
log.error(
"Cannot produce column lineage for missing output field in output dataset: %s",
pawel-big-lebowski marked this conversation as resolved.
Show resolved Hide resolved
outputColumn.getName());
return Stream.empty();
}

// get field uuids of input columns related to this run
List<Pair<UUID, UUID>> inputFields =
runFields.stream()
.filter(
fieldData ->
outputColumn.getInputFields().stream()
.filter(
of ->
of.getDatasetNamespace().equals(fieldData.getNamespace())
&& of.getDatasetName()
.equals(fieldData.getDatasetName())
&& of.getFieldName().equals(fieldData.getField()))
.findAny()
.isPresent())
.map(
fieldData ->
Pair.of(
fieldData.getDatasetVersionUuid(),
fieldData.getDatasetFieldUuid()))
.collect(Collectors.toList());

return columnLevelLineageDao
.upsertColumnLevelLineageRow(
datasetVersionRow.getUuid(),
outputField.get().getUuid(),
inputFields,
outputColumn.getTransformationDescription(),
outputColumn.getTransformationType(),
now)
.stream();
})
.collect(Collectors.toList());
}

default String formatDatasetName(String name) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION;
import static marquez.db.Columns.TRANSFORMATION_TYPE;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.timestampOrThrow;
import static marquez.db.Columns.uuidOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.NonNull;
import marquez.db.Columns;
import marquez.db.models.ColumnLevelLineageRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

public class ColumnLevelLineageRowMapper implements RowMapper<ColumnLevelLineageRow> {

@Override
public ColumnLevelLineageRow map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
return new ColumnLevelLineageRow(
uuidOrThrow(results, Columns.OUTPUT_DATASET_VERSION_UUID),
uuidOrThrow(results, Columns.OUTPUT_DATASET_FIELD_UUID),
uuidOrThrow(results, Columns.INPUT_DATASET_VERSION_UUID),
uuidOrThrow(results, Columns.INPUT_DATASET_FIELD_UUID),
stringOrThrow(results, TRANSFORMATION_DESCRIPTION),
stringOrThrow(results, TRANSFORMATION_TYPE),
timestampOrThrow(results, Columns.CREATED_AT),
timestampOrThrow(results, Columns.UPDATED_AT));
}
}
Loading