Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Model and store column lineage in Marquez DB #2096

Merged
merged 30 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
9e2cd20
Create database representation, model classes
Aug 29, 2022
374bdd3
Implement ColumnLevelLineageDao
mzareba382 Aug 30, 2022
148836c
Instantiate ColumnLevelLineageDao in updateBaseMarquezModel
mzareba382 Aug 30, 2022
e5d9490
Merge branch 'main' into add-column-level-lineage
mzareba382 Aug 30, 2022
c28ea86
Merge branch 'main' into add-column-level-lineage
mzareba382 Aug 31, 2022
48b1ebf
Upsert ColumnLevelLineageRow to db, model representation in LineageEvent
mzareba382 Aug 31, 2022
3e6674e
Fix problems in OpenLineageDao, add a list of ColumnLevelLineageRow t…
mzareba382 Sep 1, 2022
b1eab3f
Change wildcard imports to single class imports
mzareba382 Sep 1, 2022
03e2963
Change wildcard imports to single class imports
mzareba382 Sep 1, 2022
2ad3bc4
Change wildcard imports to single class imports
mzareba382 Sep 1, 2022
4a5ec90
Apply spotless
mzareba382 Sep 1, 2022
d547fd6
Merge branch 'main' into add-column-level-lineage
mzareba382 Sep 8, 2022
153a19a
Merge branch 'main' into add-column-level-lineage
mzareba382 Sep 9, 2022
bce38ff
Check for ds.getFacets not null
mzareba382 Sep 9, 2022
a0251a0
Format fix
mzareba382 Sep 9, 2022
f17cdda
Update testUpdateMarquezModelDatasetWithColumnLineageFacet
mzareba382 Sep 9, 2022
dbabd08
Merge branch 'main' into add-column-level-lineage
mzareba382 Sep 12, 2022
2dff0f6
Test for column_level_lineage upsert.
mzareba382 Sep 12, 2022
b42a2ad
Apply spotless
mzareba382 Sep 12, 2022
95ba2e2
Merge branch 'main' into add-column-level-lineage
mzareba382 Sep 12, 2022
8e3fc65
switch to data field references
pawel-big-lebowski Sep 13, 2022
7ce339e
fix broken tests
pawel-big-lebowski Sep 14, 2022
bfd7555
test when dataset_field is missing
pawel-big-lebowski Sep 14, 2022
08539dd
add input_dataset_version_uuid field
pawel-big-lebowski Sep 16, 2022
6fce3df
Merge branch 'main' into add-column-level-lineage
pawel-big-lebowski Sep 16, 2022
bf8c84e
increase db file version
pawel-big-lebowski Sep 16, 2022
c816996
increase db file version
pawel-big-lebowski Sep 16, 2022
b6d37fe
Merge branch 'add-column-level-lineage' of github.com:MarquezProject/…
pawel-big-lebowski Sep 19, 2022
1aefca2
Merge branch 'main' into add-column-level-lineage
pawel-big-lebowski Sep 28, 2022
21dac22
rename ColumnLevelLineage -> ColumnLineage
pawel-big-lebowski Sep 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,7 @@ public interface BaseDao extends SqlObject {

@CreateSqlObject
OpenLineageDao createOpenLineageDao();

@CreateSqlObject
ColumnLevelLineageDao createColumnLevelLineageDao();
}
79 changes: 79 additions & 0 deletions api/src/main/java/marquez/db/ColumnLevelLineageDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package marquez.db;

/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import marquez.db.mappers.ColumnLevelLineageRowMapper;
import marquez.db.models.ColumnLevelLineageRow;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(ColumnLevelLineageRowMapper.class)
public interface ColumnLevelLineageDao extends BaseDao {

default ColumnLevelLineageRow upsertColumnLevelLineageRow(
UUID uuid,
UUID dataset_version_uuid,
String output_column_name,
String input_field,
String transformation_description,
String transformation_type,
Instant now) {
doUpsertColumnLevelLineageRow(
uuid,
dataset_version_uuid,
output_column_name,
input_field,
transformation_description,
transformation_type,
now);
return findColumnLevelLineageByDatasetVersionColumnAndInput(
dataset_version_uuid, output_column_name, input_field)
.orElseThrow();
}

@SqlQuery("SELECT * FROM column_level_lineage WHERE dataset_version_uuid = :datasetVersionUuid")
Optional<ColumnLevelLineageRow> findColumnLevelLineageByDatasetVersionColumnAndInput(
UUID datasetVersionUuid, String outputColumnName, String inputField);

@SqlUpdate(
"INSERT INTO column_level_lineage ("
mzareba382 marked this conversation as resolved.
Show resolved Hide resolved
+ "uuid, "
+ "dataset_version_uuid, "
+ "output_column_name, "
+ "input_field, "
+ "transformation_description, "
+ "transformation_type, "
+ "created_at, "
+ "updated_at"
+ ") VALUES ( "
+ ":uuid, "
+ ":dataset_version_uuid, "
+ ":output_column_name, "
+ ":input_field, "
+ ":transformation_description, "
+ ":transformation_type, "
+ ":now, "
+ ":now) "
+ "ON CONFLICT (dataset_version_uuid, output_column_name, input_field) "
+ "DO UPDATE SET "
+ "input_field = EXCLUDED.input_field, "
+ "transformation_description = EXCLUDED.transformation_description, "
+ "transformation_type = EXCLUDED.transformation_type, "
+ "updated_at = EXCLUDED.updated_at "
+ "RETURNING *")
void doUpsertColumnLevelLineageRow(
mobuchowski marked this conversation as resolved.
Show resolved Hide resolved
UUID uuid,
UUID dataset_version_uuid,
String output_column_name,
String input_field,
String transformation_description,
String transformation_type,
Instant now);
}
6 changes: 6 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ private Columns() {}
public static final String RUN_UUID = "run_uuid";
public static final String STATE = "state";

/* COLUMN LEVEL LINEAGE ROW COLUMNS */
public static final String OUTPUT_COLUMN_NAME = "output_column_name";
public static final String INPUT_FIELD = "input_field";
public static final String TRANSFORMATION_DESCRIPTION = "transformation_description";
public static final String TRANSFORMATION_TYPE = "transformation_type";

public static UUID uuidOrNull(final ResultSet results, final String column) throws SQLException {
if (results.getObject(column) == null) {
return null;
Expand Down
44 changes: 40 additions & 4 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
import marquez.db.mappers.LineageEventMapper;
import marquez.db.models.ColumnLevelLineageRow;
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetVersionRow;
Expand Down Expand Up @@ -108,6 +109,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
RunDao runDao = createRunDao();
RunArgsDao runArgsDao = createRunArgsDao();
RunStateDao runStateDao = createRunStateDao();
ColumnLevelLineageDao columnLevelLineageDao = createColumnLevelLineageDao();

Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();

Expand Down Expand Up @@ -349,7 +351,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
datasetDao,
datasetVersionDao,
datasetFieldDao,
runDao);
runDao,
columnLevelLineageDao);
datasetInputs.add(record);
}
}
Expand All @@ -370,7 +373,8 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
datasetDao,
datasetVersionDao,
datasetFieldDao,
runDao);
runDao,
columnLevelLineageDao);
datasetOutputs.add(record);
}
}
Expand Down Expand Up @@ -435,7 +439,8 @@ default DatasetRecord upsertLineageDataset(
DatasetDao datasetDao,
DatasetVersionDao datasetVersionDao,
DatasetFieldDao datasetFieldDao,
RunDao runDao) {
RunDao runDao,
ColumnLevelLineageDao columnLevelLineageDao) {
NamespaceRow dsNamespace =
namespaceDao.upsertNamespaceRow(
UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER);
Expand Down Expand Up @@ -543,6 +548,37 @@ default DatasetRecord upsertLineageDataset(
}
datasetFieldDao.updateFieldMapping(datasetFieldMappings);

List<ColumnLevelLineageRow> columnLineageRows = null;
if (ds.getFacets().getColumnLineage() != null) {
columnLineageRows = new ArrayList<>();
List<LineageEvent.ColumnLineageOutputColumn> columnLevelLineageOutputColumnsList =
Optional.ofNullable(ds.getFacets())
.map(DatasetFacets::getColumnLineage)
.map(LineageEvent.ColumnLineageFacet::getOutputColumnsList)
.orElse(null);

if (columnLevelLineageOutputColumnsList != null) {
for (LineageEvent.ColumnLineageOutputColumn outputColumn :
columnLevelLineageOutputColumnsList) {
for (LineageEvent.ColumnLineageInputField inputField : outputColumn.getInputFields()) {
columnLineageRows.add(
columnLevelLineageDao.upsertColumnLevelLineageRow(
UUID.randomUUID(),
datasetVersionRow.getUuid(),
outputColumn.getName(),
String.format(
"%s.%s.%s",
inputField.getDatasetNamespace(),
inputField.getDatasetName(),
inputField.getFieldName()),
outputColumn.getTransformationDescription(),
outputColumn.getTransformationType(),
now));
}
}
}
}

if (isInput) {
runDao.updateInputMapping(runUuid, datasetVersionRow.getUuid());

Expand All @@ -555,7 +591,7 @@ default DatasetRecord upsertLineageDataset(
}
}

return new DatasetRecord(datasetRow, datasetVersionRow, datasetNamespace);
return new DatasetRecord(datasetRow, datasetVersionRow, datasetNamespace, columnLineageRows);
}

default String formatDatasetName(String name) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package marquez.db.mappers;

import static marquez.db.Columns.INPUT_FIELD;
import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION;
import static marquez.db.Columns.TRANSFORMATION_TYPE;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.timestampOrThrow;
import static marquez.db.Columns.uuidOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.NonNull;
import marquez.db.Columns;
import marquez.db.models.ColumnLevelLineageRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

/**
 * JDBI row mapper that turns the current row of a {@link ResultSet} into a {@link
 * ColumnLevelLineageRow}. Every column is read with an {@code ...OrThrow} helper from {@link
 * Columns}.
 */
public class ColumnLevelLineageRowMapper implements RowMapper<ColumnLevelLineageRow> {

  /**
   * Reads the lineage columns from the current row and builds the row model.
   *
   * @param results result set positioned on the row to map
   * @param context statement context supplied by JDBI (not used by this mapper)
   * @return the mapped row
   * @throws SQLException if reading a column from the result set fails
   */
  @Override
  public ColumnLevelLineageRow map(@NonNull ResultSet results, @NonNull StatementContext context)
      throws SQLException {
    // Columns are read in constructor-argument order.
    final var rowUuid = uuidOrThrow(results, Columns.ROW_UUID);
    final var datasetVersionUuid = uuidOrThrow(results, Columns.DATASET_VERSION_UUID);
    final var outputColumnName = stringOrThrow(results, Columns.OUTPUT_COLUMN_NAME);
    final var inputField = stringOrThrow(results, INPUT_FIELD);
    final var transformationDescription = stringOrThrow(results, TRANSFORMATION_DESCRIPTION);
    final var transformationType = stringOrThrow(results, TRANSFORMATION_TYPE);
    final var createdAt = timestampOrThrow(results, Columns.CREATED_AT);
    final var updatedAt = timestampOrThrow(results, Columns.UPDATED_AT);

    return new ColumnLevelLineageRow(
        rowUuid,
        datasetVersionUuid,
        outputColumnName,
        inputField,
        transformationDescription,
        transformationType,
        createdAt,
        updatedAt);
  }
}
23 changes: 23 additions & 0 deletions api/src/main/java/marquez/db/models/ColumnLevelLineageRow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package marquez.db.models;

import java.time.Instant;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;

/**
 * Immutable model of one row of the {@code column_level_lineage} table.
 *
 * <p>All fields are required: the Lombok-generated all-args constructor rejects {@code null} for
 * every {@code @NonNull} field. Getters are generated for all fields by the class-level {@code
 * @Getter}; no setters exist.
 */
@AllArgsConstructor
@EqualsAndHashCode
@Getter
@ToString
public class ColumnLevelLineageRow {
  @NonNull private final UUID uuid;
  @NonNull private final UUID datasetVersionUuid;
  @NonNull private final String outputColumnName;
  @NonNull private final String inputField;
  @NonNull private final String transformationDescription;
  @NonNull private final String transformationType;
  @NonNull private final Instant createdAt;
  // Final like every other field: the row has no setters, so it is never reassigned.
  @NonNull private final Instant updatedAt;
}
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/models/UpdateLineageRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ public static class DatasetRecord {
DatasetRow datasetRow;
DatasetVersionRow datasetVersionRow;
NamespaceRow namespaceRow;
List<ColumnLevelLineageRow> columnLineageRows;
}
}
56 changes: 55 additions & 1 deletion api/src/main/java/marquez/service/models/LineageEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -317,14 +317,16 @@ public static class Dataset extends BaseJsonModel {
"schema",
"dataSource",
"description",
"lifecycleStateChange"
"lifecycleStateChange",
"columnLineage",
})
public static class DatasetFacets {

@Valid private DocumentationDatasetFacet documentation;
@Valid private SchemaDatasetFacet schema;
@Valid private LifecycleStateChangeFacet lifecycleStateChange;
@Valid private DatasourceDatasetFacet dataSource;
@Valid private ColumnLineageFacet columnLineage;
private String description;
@Builder.Default @JsonIgnore private Map<String, Object> additional = new LinkedHashMap<>();

Expand Down Expand Up @@ -354,6 +356,10 @@ public DatasourceDatasetFacet getDataSource() {
return dataSource;
}

/** Returns the column lineage facet of this dataset, or {@code null} if none was set. */
public ColumnLineageFacet getColumnLineage() {
  return this.columnLineage;
}

/** Returns the description stored in these facets, or {@code null} if none was set. */
public String getDescription() {
  return this.description;
}
Expand Down Expand Up @@ -442,4 +448,52 @@ public LifecycleStateChangeFacet(
this.lifecycleStateChange = lifecycleStateChange;
}
}

/**
 * Dataset facet carrying column-level lineage: a list of output columns, each of which names the
 * input fields it is derived from.
 */
@NoArgsConstructor
@Getter
@Setter
@Valid
@ToString
public static class ColumnLineageFacet extends BaseFacet {

  // @Valid cascades bean validation into the list elements; without it the @NotNull
  // constraints declared on ColumnLineageOutputColumn are not evaluated when this facet is
  // validated.
  @Valid private List<ColumnLineageOutputColumn> outputColumnsList;

  /**
   * Creates a facet with the given producer, schema URL and output columns.
   *
   * @param _producer producer URI, passed to the {@code BaseFacet} constructor
   * @param _schemaURL schema URL, passed to the {@code BaseFacet} constructor
   * @param outputColumnsList output columns with their input fields; stored as given
   */
  @Builder
  public ColumnLineageFacet(
      @NotNull URI _producer,
      @NotNull URI _schemaURL,
      List<ColumnLineageOutputColumn> outputColumnsList) {
    super(_producer, _schemaURL);
    this.outputColumnsList = outputColumnsList;
  }
}

/**
 * One output column of a column lineage facet: the column name, the input fields it is derived
 * from, and a description and type of the transformation applied.
 */
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Setter
@Getter
@Valid
@ToString
public static class ColumnLineageOutputColumn extends BaseJsonModel {

  @NotNull private String name;

  // @Valid cascades bean validation into each input field whenever this object is validated;
  // @NotNull alone only checks that the list itself is present.
  @Valid @NotNull private List<ColumnLineageInputField> inputFields;

  @NotNull private String transformationDescription;
  @NotNull private String transformationType;
}

/**
 * Reference to a single input field of a column lineage entry, identified by the namespace and
 * name of the dataset that holds it and by the field name. All three parts are declared
 * {@code @NotNull}.
 */
@Valid
@Getter
@Setter
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class ColumnLineageInputField extends BaseJsonModel {

  /** Namespace of the dataset that contains the input field. */
  @NotNull private String datasetNamespace;

  /** Name of the dataset that contains the input field. */
  @NotNull private String datasetName;

  /** Name of the input field within that dataset. */
  @NotNull private String fieldName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* SPDX-License-Identifier: Apache-2.0 */

-- DROP TABLE column_level_lineage;

-- Column-level lineage: one row per (dataset version, output column, input field).
-- The UNIQUE constraint below is the conflict target of the application's upsert.
CREATE TABLE column_level_lineage (
  uuid                        UUID PRIMARY KEY,
  -- NOT NULL: this column is part of the unique key, and NULLs are treated as distinct by a
  -- UNIQUE constraint, so a nullable column would let duplicate rows slip past the upsert.
  dataset_version_uuid        UUID NOT NULL REFERENCES dataset_versions(uuid),
  output_column_name          VARCHAR(255) NOT NULL,
  input_field                 VARCHAR(255) NOT NULL, -- reference dataset_fields.uuid
  transformation_description  VARCHAR(255) NOT NULL,
  transformation_type         VARCHAR(255) NOT NULL,
  created_at                  TIMESTAMP NOT NULL,
  updated_at                  TIMESTAMP NOT NULL,
  UNIQUE (dataset_version_uuid, output_column_name, input_field)
);

-- INSERT INTO column_level_lineage (uuid, dataset_version_uuid, output_column_name, input_field,
-- transformation_description, transformation_type, created_at,
-- updated_at)
-- VALUES (md5('whatever')::uuid, md5('dataset_version_uuid_example')::uuid, 'column_a', 'input_field_a', 'Identity transformation', 'IDENTITY', current_timestamp, current_timestamp);
--
-- INSERT INTO column_level_lineage (uuid, dataset_version_uuid, output_column_name, input_field,
-- transformation_description, transformation_type, created_at,
-- updated_at)
-- VALUES (md5('whatever')::uuid, md5('dataset_version_uuid_example')::uuid, 'column_a', 'input_field_b', 'Identity transformation', 'IDENTITY', current_timestamp, current_timestamp);
Loading