diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index fc3440555b..687b408175 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -21,6 +21,7 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import java.util.stream.Stream; import marquez.common.Utils; import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; @@ -654,7 +655,6 @@ default DatasetRecord upsertLineageDataset( return new DatasetRecord(datasetRow, datasetVersionRow, datasetNamespace, columnLineageRows); } - // TODO: write more extensive tests to upsert column lineage private List upsertColumnLineage( UUID runUuid, Dataset ds, @@ -676,8 +676,15 @@ private List upsertColumnLineage( Optional outputField = datasetFields.stream() .filter(dfr -> dfr.getName().equals(outputColumn.getName())) - .findAny(); // TODO: get rid of optional, break flow if output column not - // present + .findAny(); + + if (outputField.isEmpty()) { + Logger log = LoggerFactory.getLogger(OpenLineageDao.class); + log.error( + "Cannot produce column lineage for missing output field in output dataset: {}", + outputColumn.getName()); + return Stream.empty(); + } // get field uuids of input columns related to this run List inputFields = diff --git a/api/src/test/java/marquez/db/ColumnLevelLineageDaoTest.java b/api/src/test/java/marquez/db/ColumnLevelLineageDaoTest.java index 1d102157a2..34117b0932 100644 --- a/api/src/test/java/marquez/db/ColumnLevelLineageDaoTest.java +++ b/api/src/test/java/marquez/db/ColumnLevelLineageDaoTest.java @@ -127,8 +127,8 @@ void testUpsertMultipleColumns() { assertEquals(outputDatasetFieldUuid, rows.get(0).getOutputDatasetFieldUuid()); assertEquals(transformationDescription, rows.get(0).getTransformationDescription()); assertEquals(transformationType, rows.get(0).getTransformationType()); - assertEquals(now, 
rows.get(0).getCreatedAt()); - assertEquals(now, rows.get(0).getUpdatedAt()); + assertEquals(now.getEpochSecond(), rows.get(0).getCreatedAt().getEpochSecond()); + assertEquals(now.getEpochSecond(), rows.get(0).getUpdatedAt().getEpochSecond()); } @Test @@ -169,6 +169,6 @@ void testUpsertOnUpdatePreventsDuplicates() { // make sure there is one row with updatedAt modified assertEquals(1, rows.size()); - assertEquals(now.plusSeconds(1000), rows.get(0).getUpdatedAt()); + assertEquals(now.plusSeconds(1000).getEpochSecond(), rows.get(0).getUpdatedAt().getEpochSecond()); } } diff --git a/api/src/test/java/marquez/db/OpenLineageDaoTest.java b/api/src/test/java/marquez/db/OpenLineageDaoTest.java index 2ab9c37668..68ca33bf20 100644 --- a/api/src/test/java/marquez/db/OpenLineageDaoTest.java +++ b/api/src/test/java/marquez/db/OpenLineageDaoTest.java @@ -8,6 +8,7 @@ import static marquez.db.LineageTestUtils.PRODUCER_URL; import static marquez.db.LineageTestUtils.SCHEMA_URL; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.Arrays; import java.util.Collections; @@ -36,6 +37,13 @@ class OpenLineageDaoTest { public static final String READ_JOB_NAME = "readJobName"; public static final String DATASET_NAME = "theDataset"; + public static final String OUTPUT_COLUMN = "output_column"; + public static final String INPUT_NAMESPACE = "input_namespace"; + public static final String INPUT_DATASET = "input_dataset"; + public static final String INPUT_FIELD_NAME = "input_field_name"; + public static final String TRANSFORMATION_TYPE = "transformation_type"; + public static final String TRANSFORMATION_DESCRIPTION = "transformation_description"; + private static OpenLineageDao dao; private static DatasetFieldDao datasetFieldDao; private final DatasetFacets datasetFacets = @@ -101,49 +109,6 @@ void testUpdateMarquezModelLifecycleStateChangeFacet() { @Test void 
testUpdateMarquezModelDatasetWithColumnLineageFacet() { - final String OUTPUT_COLUMN = "output_column"; - final String INPUT_NAMESPACE = "input_namespace"; - final String INPUT_DATASET = "input_dataset"; - final String INPUT_FIELD_NAME = "input_field_name"; - final String TRANSFORMATION_TYPE = "transformation_type"; - final String TRANSFORMATION_DESCRIPTION = "transformation_description"; - - Dataset inputDataset = - new Dataset( - INPUT_NAMESPACE, - INPUT_DATASET, - LineageEvent.DatasetFacets.builder() - .schema( - new SchemaDatasetFacet( - PRODUCER_URL, - SCHEMA_URL, - Arrays.asList(new SchemaField(INPUT_FIELD_NAME, "STRING", "my name")))) - .build()); - - Dataset outputDataset = - new Dataset( - LineageTestUtils.NAMESPACE, - DATASET_NAME, - LineageEvent.DatasetFacets.builder() - .schema( - new SchemaDatasetFacet( - PRODUCER_URL, - SCHEMA_URL, - Arrays.asList(new SchemaField(OUTPUT_COLUMN, "STRING", "my name")))) - .columnLineage( - new LineageEvent.ColumnLineageFacet( - PRODUCER_URL, - SCHEMA_URL, - Collections.singletonList( - new LineageEvent.ColumnLineageOutputColumn( - OUTPUT_COLUMN, - Collections.singletonList( - new LineageEvent.ColumnLineageInputField( - INPUT_NAMESPACE, INPUT_DATASET, INPUT_FIELD_NAME)), - TRANSFORMATION_DESCRIPTION, - TRANSFORMATION_TYPE)))) - .build()); - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( @@ -151,8 +116,8 @@ void testUpdateMarquezModelDatasetWithColumnLineageFacet() { WRITE_JOB_NAME, "COMPLETE", jobFacet, - Arrays.asList(inputDataset), - Arrays.asList(outputDataset)); + Arrays.asList(getInputDataset()), + Arrays.asList(getOutputDatasetWithColumnLineage())); UUID inputDatasetVersion = writeJob.getInputs().get().get(0).getDatasetVersionRow().getUuid(); UUID outputDatasetVersion = writeJob.getOutputs().get().get(0).getDatasetVersionRow().getUuid(); @@ -193,42 +158,28 @@ void testUpdateMarquezModelDatasetWithColumnLineageFacet() { } 
@Test - /** - * When trying to insert new column level lineage data, do not create additional row if triad - * (dataset_version_uuid, output_column_name and input_field) is the same. Upsert instead. - */ - void testUpsertColumnLineageData() { - final String OUTPUT_COLUMN = "output_column"; - final String INPUT_NAMESPACE = "input_namespace"; - final String INPUT_DATASET = "input_dataset"; - final String INPUT_FIELD_NAME = "input_field_name"; - final String TRANSFORMATION_TYPE = "transformation_type"; - final String UPDATED_TRANSFORMATION_TYPE = "transformation_type"; - final String TRANSFORMATION_DESCRIPTION = "transformation_description"; - final String UPDATED_TRANSFORMATION_DESCRIPTION = "updated_transformation_description"; + void testUpdateMarquezModelDatasetWithColumnLineageFacetWhenInputFieldDoesNotExist() { + JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + UpdateLineageRow writeJob = + LineageTestUtils.createLineageRow( + dao, + WRITE_JOB_NAME, + "COMPLETE", + jobFacet, + Collections.emptyList(), + Arrays.asList(getOutputDatasetWithColumnLineage())); - Dataset inputDataset = - new Dataset( - INPUT_NAMESPACE, - INPUT_DATASET, - LineageEvent.DatasetFacets.builder() - .schema( - new SchemaDatasetFacet( - PRODUCER_URL, - SCHEMA_URL, - Arrays.asList(new SchemaField(INPUT_FIELD_NAME, "STRING", "my name")))) - .build()); + // make sure no column lineage was written + assertEquals(0, writeJob.getOutputs().get().get(0).getColumnLineageRows().size()); + } - Dataset dataset = + @Test + void testUpdateMarquezModelDatasetWithColumnLineageFacetWhenOutputFieldDoesNotExist() { + Dataset outputDatasetWithoutOutputFieldSchema = new Dataset( LineageTestUtils.NAMESPACE, DATASET_NAME, - LineageEvent.DatasetFacets.builder() - .schema( - new SchemaDatasetFacet( - PRODUCER_URL, - SCHEMA_URL, - Arrays.asList(new SchemaField(OUTPUT_COLUMN, "STRING", "my name")))) + LineageEvent.DatasetFacets.builder() // schema is missing .columnLineage( new 
LineageEvent.ColumnLineageFacet( PRODUCER_URL, @@ -243,6 +194,32 @@ void testUpsertColumnLineageData() { TRANSFORMATION_TYPE)))) .build()); + JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + UpdateLineageRow writeJob = + LineageTestUtils.createLineageRow( + dao, + WRITE_JOB_NAME, + "COMPLETE", + jobFacet, + Arrays.asList(getInputDataset()), + Arrays.asList(outputDatasetWithoutOutputFieldSchema)); + + // make sure no column lineage was written + assertEquals(0, writeJob.getOutputs().get().get(0).getColumnLineageRows().size()); + } + + @Test + /** + * When trying to insert new column level lineage data, do not create additional row if triad + * (dataset_version_uuid, output_column_name and input_field) is the same. Upsert instead. + */ + void testUpsertColumnLineageData() { + final String UPDATED_TRANSFORMATION_TYPE = "transformation_type"; + final String UPDATED_TRANSFORMATION_DESCRIPTION = "updated_transformation_description"; + + Dataset inputDataset = getInputDataset(); + Dataset dataset = getOutputDatasetWithColumnLineage(); + Dataset updateDataset = new Dataset( LineageTestUtils.NAMESPACE, @@ -471,4 +448,42 @@ void testGetOpenLineageEvents() { .extracting("namespace", "name") .contains(LineageTestUtils.NAMESPACE, WRITE_JOB_NAME); } + + private Dataset getInputDataset() { + return new Dataset( + INPUT_NAMESPACE, + INPUT_DATASET, + LineageEvent.DatasetFacets.builder() + .schema( + new SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList(new SchemaField(INPUT_FIELD_NAME, "STRING", "my name")))) + .build()); + } + + private Dataset getOutputDatasetWithColumnLineage() { + return new Dataset( + LineageTestUtils.NAMESPACE, + DATASET_NAME, + LineageEvent.DatasetFacets.builder() + .schema( + new SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList(new SchemaField(OUTPUT_COLUMN, "STRING", "my name")))) + .columnLineage( + new LineageEvent.ColumnLineageFacet( + PRODUCER_URL, + SCHEMA_URL, + 
Collections.singletonList( + new LineageEvent.ColumnLineageOutputColumn( + OUTPUT_COLUMN, + Collections.singletonList( + new LineageEvent.ColumnLineageInputField( + INPUT_NAMESPACE, INPUT_DATASET, INPUT_FIELD_NAME)), + TRANSFORMATION_DESCRIPTION, + TRANSFORMATION_TYPE)))) + .build()); + } }