Skip to content

Commit

Permalink
test when dataset_field is missing
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Sep 14, 2022
1 parent 7ce339e commit bfd7555
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 84 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.25.0...HEAD)
* Add job/dataset soft delete API [`#2032`](https://github.com/MarquezProject/marquez/pull/2032)
* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

## [0.25.0](https://github.com/MarquezProject/marquez/compare/0.24.0...0.25.0) - 2022-08-08

Expand Down
4 changes: 2 additions & 2 deletions api/src/main/java/marquez/db/ColumnLevelLineageDao.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package marquez.db;

/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
Expand Down
13 changes: 10 additions & 3 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import marquez.common.Utils;
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
Expand Down Expand Up @@ -654,7 +655,6 @@ default DatasetRecord upsertLineageDataset(
return new DatasetRecord(datasetRow, datasetVersionRow, datasetNamespace, columnLineageRows);
}

// TODO: write more extensive tests to upsert column lineage
private List<ColumnLevelLineageRow> upsertColumnLineage(
UUID runUuid,
Dataset ds,
Expand All @@ -676,8 +676,15 @@ private List<ColumnLevelLineageRow> upsertColumnLineage(
Optional<DatasetFieldRow> outputField =
datasetFields.stream()
.filter(dfr -> dfr.getName().equals(outputColumn.getName()))
.findAny(); // TODO: get rid of optional, break flow if output column not
// present
.findAny();

if (outputField.isEmpty()) {
Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
log.error(
"Cannot produce column lineage for missing output field in output dataset: %s",
outputColumn.getName());
return Stream.empty();
}

// get field uuids of input columns related to this run
List<UUID> inputFields =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.models;

import java.time.Instant;
Expand Down
6 changes: 3 additions & 3 deletions api/src/test/java/marquez/db/ColumnLevelLineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ void testUpsertMultipleColumns() {
assertEquals(outputDatasetFieldUuid, rows.get(0).getOutputDatasetFieldUuid());
assertEquals(transformationDescription, rows.get(0).getTransformationDescription());
assertEquals(transformationType, rows.get(0).getTransformationType());
assertEquals(now, rows.get(0).getCreatedAt());
assertEquals(now, rows.get(0).getUpdatedAt());
assertEquals(now.getEpochSecond(), rows.get(0).getCreatedAt().getEpochSecond());
assertEquals(now.getEpochSecond(), rows.get(0).getUpdatedAt().getEpochSecond());
}

@Test
Expand Down Expand Up @@ -169,6 +169,6 @@ void testUpsertOnUpdatePreventsDuplicates() {

// make sure there is one row with updatedAt modified
assertEquals(1, rows.size());
assertEquals(now.plusSeconds(1000), rows.get(0).getUpdatedAt());
assertEquals(now.plusSeconds(1000).getEpochSecond(), rows.get(0).getUpdatedAt().getEpochSecond());
}
}
167 changes: 91 additions & 76 deletions api/src/test/java/marquez/db/OpenLineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static marquez.db.LineageTestUtils.PRODUCER_URL;
import static marquez.db.LineageTestUtils.SCHEMA_URL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -36,6 +37,13 @@ class OpenLineageDaoTest {
public static final String READ_JOB_NAME = "readJobName";
public static final String DATASET_NAME = "theDataset";

public static final String OUTPUT_COLUMN = "output_column";
public static final String INPUT_NAMESPACE = "input_namespace";
public static final String INPUT_DATASET = "input_dataset";
public static final String INPUT_FIELD_NAME = "input_field_name";
public static final String TRANSFORMATION_TYPE = "transformation_type";
public static final String TRANSFORMATION_DESCRIPTION = "transformation_description";

private static OpenLineageDao dao;
private static DatasetFieldDao datasetFieldDao;
private final DatasetFacets datasetFacets =
Expand Down Expand Up @@ -101,58 +109,15 @@ void testUpdateMarquezModelLifecycleStateChangeFacet() {

@Test
void testUpdateMarquezModelDatasetWithColumnLineageFacet() {
final String OUTPUT_COLUMN = "output_column";
final String INPUT_NAMESPACE = "input_namespace";
final String INPUT_DATASET = "input_dataset";
final String INPUT_FIELD_NAME = "input_field_name";
final String TRANSFORMATION_TYPE = "transformation_type";
final String TRANSFORMATION_DESCRIPTION = "transformation_description";

Dataset inputDataset =
new Dataset(
INPUT_NAMESPACE,
INPUT_DATASET,
LineageEvent.DatasetFacets.builder()
.schema(
new SchemaDatasetFacet(
PRODUCER_URL,
SCHEMA_URL,
Arrays.asList(new SchemaField(INPUT_FIELD_NAME, "STRING", "my name"))))
.build());

Dataset outputDataset =
new Dataset(
LineageTestUtils.NAMESPACE,
DATASET_NAME,
LineageEvent.DatasetFacets.builder()
.schema(
new SchemaDatasetFacet(
PRODUCER_URL,
SCHEMA_URL,
Arrays.asList(new SchemaField(OUTPUT_COLUMN, "STRING", "my name"))))
.columnLineage(
new LineageEvent.ColumnLineageFacet(
PRODUCER_URL,
SCHEMA_URL,
Collections.singletonList(
new LineageEvent.ColumnLineageOutputColumn(
OUTPUT_COLUMN,
Collections.singletonList(
new LineageEvent.ColumnLineageInputField(
INPUT_NAMESPACE, INPUT_DATASET, INPUT_FIELD_NAME)),
TRANSFORMATION_DESCRIPTION,
TRANSFORMATION_TYPE))))
.build());

JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
UpdateLineageRow writeJob =
LineageTestUtils.createLineageRow(
dao,
WRITE_JOB_NAME,
"COMPLETE",
jobFacet,
Arrays.asList(inputDataset),
Arrays.asList(outputDataset));
Arrays.asList(getInputDataset()),
Arrays.asList(getOutputDatasetWithColumnLineage()));

UUID inputDatasetVersion = writeJob.getInputs().get().get(0).getDatasetVersionRow().getUuid();
UUID outputDatasetVersion = writeJob.getOutputs().get().get(0).getDatasetVersionRow().getUuid();
Expand Down Expand Up @@ -193,42 +158,28 @@ void testUpdateMarquezModelDatasetWithColumnLineageFacet() {
}

@Test
/**
* When trying to insert new column level lineage data, do not create additional row if triad
* (dataset_version_uuid, output_column_name and input_field) is the same. Upsert instead.
*/
void testUpsertColumnLineageData() {
final String OUTPUT_COLUMN = "output_column";
final String INPUT_NAMESPACE = "input_namespace";
final String INPUT_DATASET = "input_dataset";
final String INPUT_FIELD_NAME = "input_field_name";
final String TRANSFORMATION_TYPE = "transformation_type";
final String UPDATED_TRANSFORMATION_TYPE = "transformation_type";
final String TRANSFORMATION_DESCRIPTION = "transformation_description";
final String UPDATED_TRANSFORMATION_DESCRIPTION = "updated_transformation_description";
void testUpdateMarquezModelDatasetWithColumnLineageFacetWhenInputFieldDoesNotExist() {
JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
UpdateLineageRow writeJob =
LineageTestUtils.createLineageRow(
dao,
WRITE_JOB_NAME,
"COMPLETE",
jobFacet,
Collections.emptyList(),
Arrays.asList(getOutputDatasetWithColumnLineage()));

Dataset inputDataset =
new Dataset(
INPUT_NAMESPACE,
INPUT_DATASET,
LineageEvent.DatasetFacets.builder()
.schema(
new SchemaDatasetFacet(
PRODUCER_URL,
SCHEMA_URL,
Arrays.asList(new SchemaField(INPUT_FIELD_NAME, "STRING", "my name"))))
.build());
// make sure no column lineage was written
assertEquals(0, writeJob.getOutputs().get().get(0).getColumnLineageRows().size());
}

Dataset dataset =
@Test
void testUpdateMarquezModelDatasetWithColumnLineageFacetWhenOutputFieldDoesNotExist() {
Dataset outputDatasetWithoutOutputFieldSchema =
new Dataset(
LineageTestUtils.NAMESPACE,
DATASET_NAME,
LineageEvent.DatasetFacets.builder()
.schema(
new SchemaDatasetFacet(
PRODUCER_URL,
SCHEMA_URL,
Arrays.asList(new SchemaField(OUTPUT_COLUMN, "STRING", "my name"))))
LineageEvent.DatasetFacets.builder() // schema is missing
.columnLineage(
new LineageEvent.ColumnLineageFacet(
PRODUCER_URL,
Expand All @@ -243,6 +194,32 @@ void testUpsertColumnLineageData() {
TRANSFORMATION_TYPE))))
.build());

JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
UpdateLineageRow writeJob =
LineageTestUtils.createLineageRow(
dao,
WRITE_JOB_NAME,
"COMPLETE",
jobFacet,
Arrays.asList(getInputDataset()),
Arrays.asList(outputDatasetWithoutOutputFieldSchema));

// make sure no column lineage was written
assertEquals(0, writeJob.getOutputs().get().get(0).getColumnLineageRows().size());
}

@Test
/**
* When trying to insert new column level lineage data, do not create additional row if triad
* (dataset_version_uuid, output_column_name and input_field) is the same. Upsert instead.
*/
void testUpsertColumnLineageData() {
final String UPDATED_TRANSFORMATION_TYPE = "transformation_type";
final String UPDATED_TRANSFORMATION_DESCRIPTION = "updated_transformation_description";

Dataset inputDataset = getInputDataset();
Dataset dataset = getOutputDatasetWithColumnLineage();

Dataset updateDataset =
new Dataset(
LineageTestUtils.NAMESPACE,
Expand Down Expand Up @@ -471,4 +448,42 @@ void testGetOpenLineageEvents() {
.extracting("namespace", "name")
.contains(LineageTestUtils.NAMESPACE, WRITE_JOB_NAME);
}

private Dataset getInputDataset() {
return new Dataset(
INPUT_NAMESPACE,
INPUT_DATASET,
LineageEvent.DatasetFacets.builder()
.schema(
new SchemaDatasetFacet(
PRODUCER_URL,
SCHEMA_URL,
Arrays.asList(new SchemaField(INPUT_FIELD_NAME, "STRING", "my name"))))
.build());
}

private Dataset getOutputDatasetWithColumnLineage() {
return new Dataset(
LineageTestUtils.NAMESPACE,
DATASET_NAME,
LineageEvent.DatasetFacets.builder()
.schema(
new SchemaDatasetFacet(
PRODUCER_URL,
SCHEMA_URL,
Arrays.asList(new SchemaField(OUTPUT_COLUMN, "STRING", "my name"))))
.columnLineage(
new LineageEvent.ColumnLineageFacet(
PRODUCER_URL,
SCHEMA_URL,
Collections.singletonList(
new LineageEvent.ColumnLineageOutputColumn(
OUTPUT_COLUMN,
Collections.singletonList(
new LineageEvent.ColumnLineageInputField(
INPUT_NAMESPACE, INPUT_DATASET, INPUT_FIELD_NAME)),
TRANSFORMATION_DESCRIPTION,
TRANSFORMATION_TYPE))))
.build());
}
}

0 comments on commit bfd7555

Please sign in to comment.