Skip to content

Commit

Permalink
Changed RunDao to use simple RunRow rather than ExtendedRunRow where …
Browse files Browse the repository at this point in the history
…possible

Signed-off-by: Michael Collado <collado.mike@gmail.com>
  • Loading branch information
collado-mike committed Jul 14, 2022
1 parent bd6db24 commit 1faf1bb
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 56 deletions.
58 changes: 29 additions & 29 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.JobContextRow;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
Expand Down Expand Up @@ -161,10 +160,9 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
PGobject inputs = new PGobject();
inputs.setType("json");
inputs.setValue("[]");
Optional<ExtendedRunRow> parentRunRow = runDao.findRunByUuidAsRow(uuid);
JobRow parentJobRow =
parentRunRow
.flatMap(run -> jobDao.findJobByUuidAsRow(run.getJobUuid()))
runDao
.findJobRowByRunUuid(uuid)
.orElseGet(
() -> {
JobRow newParentJobRow =
Expand All @@ -181,34 +179,36 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
null,
inputs);
log.info("Created new parent job record {}", newParentJobRow);

RunArgsRow argsRow =
runArgsDao.upsertRunArgs(
UUID.randomUUID(),
now,
"{}",
Utils.checksumFor(ImmutableMap.of()));
RunRow newRow =
runDao.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
newParentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
Optional.ofNullable(event.getEventType())
.map(this::getRunState)
.orElse(null),
now,
namespace.getName(),
newParentJobRow.getName(),
newParentJobRow.getLocation(),
newParentJobRow.getJobContextUuid().orElse(null));
log.info("Created new parent run record {}", newRow);
return newParentJobRow;
});
log.debug("Found parent job record {}", parentJobRow);
if (parentRunRow.isEmpty()) {
RunArgsRow argsRow =
runArgsDao.upsertRunArgs(
UUID.randomUUID(), now, "{}", Utils.checksumFor(ImmutableMap.of()));
ExtendedRunRow newRow =
runDao.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
parentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
Optional.ofNullable(event.getEventType())
.map(this::getRunState)
.orElse(null),
now,
namespace.getName(),
parentJobRow.getName(),
parentJobRow.getLocation(),
parentJobRow.getJobContextUuid().orElse(null));
log.info("Created new parent run record {}", newRow);
}
return parentJobRow;
} catch (Exception e) {
throw new RuntimeException("Unable to insert parent run", e);
Expand Down
21 changes: 18 additions & 3 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import marquez.common.models.RunId;
import marquez.common.models.RunState;
import marquez.db.mappers.ExtendedRunRowMapper;
import marquez.db.mappers.JobRowMapper;
import marquez.db.mappers.RunMapper;
import marquez.db.mappers.RunRowMapper;
import marquez.db.models.DatasetRow;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.JobRow;
Expand All @@ -40,7 +42,9 @@
import org.jdbi.v3.sqlobject.transaction.Transaction;

@RegisterRowMapper(ExtendedRunRowMapper.class)
@RegisterRowMapper(RunRowMapper.class)
@RegisterRowMapper(RunMapper.class)
@RegisterRowMapper(JobRowMapper.class)
public interface RunDao extends BaseDao {
@SqlQuery("SELECT EXISTS (SELECT 1 FROM runs WHERE uuid = :rowUuid)")
boolean exists(UUID rowUuid);
Expand Down Expand Up @@ -103,7 +107,18 @@ public interface RunDao extends BaseDao {
Optional<Run> findRunByUuid(UUID runUuid);

@SqlQuery(BASE_FIND_RUN_SQL + "WHERE r.uuid = :runUuid")
Optional<ExtendedRunRow> findRunByUuidAsRow(UUID runUuid);
Optional<ExtendedRunRow> findRunByUuidAsExtendedRow(UUID runUuid);

@SqlQuery("SELECT * FROM runs_view r WHERE r.uuid = :runUuid")
Optional<RunRow> findRunByUuidAsRow(UUID runUuid);

@SqlQuery(
"""
SELECT j.* FROM jobs_view j
INNER JOIN runs_view r ON r.job_uuid=j.uuid
WHERE r.uuid=:uuid
""")
Optional<JobRow> findJobRowByRunUuid(UUID uuid);

@SqlQuery(
"""
Expand Down Expand Up @@ -211,7 +226,7 @@ UUID upsertWithRunState(
String location,
UUID jobContextUuid);

default ExtendedRunRow upsert(
default RunRow upsert(
UUID runUuid,
UUID parentRunUuid,
String externalId,
Expand Down Expand Up @@ -299,7 +314,7 @@ UUID upsertWithoutRunState(
String location,
UUID jobContextUuid);

default ExtendedRunRow upsert(
default RunRow upsert(
UUID runUuid,
UUID parentRunUuid,
String externalId,
Expand Down
63 changes: 63 additions & 0 deletions api/src/main/java/marquez/db/mappers/RunRowMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.timestampOrNull;
import static marquez.db.Columns.timestampOrThrow;
import static marquez.db.Columns.uuidOrNull;
import static marquez.db.Columns.uuidOrThrow;

import com.fasterxml.jackson.core.type.TypeReference;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import lombok.NonNull;
import marquez.common.Utils;
import marquez.common.models.DatasetVersionId;
import marquez.db.Columns;
import marquez.db.models.RunRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

public final class RunRowMapper implements RowMapper<RunRow> {
@Override
public RunRow map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
Set<String> columnNames = MapperUtils.getColumnNames(results.getMetaData());

return new RunRow(
uuidOrThrow(results, Columns.ROW_UUID),
timestampOrThrow(results, Columns.CREATED_AT),
timestampOrThrow(results, Columns.UPDATED_AT),
uuidOrNull(results, Columns.JOB_UUID),
uuidOrNull(results, Columns.JOB_VERSION_UUID),
uuidOrNull(results, Columns.PARENT_RUN_UUID),
uuidOrThrow(results, Columns.RUN_ARGS_UUID),
timestampOrNull(results, Columns.NOMINAL_START_TIME),
timestampOrNull(results, Columns.NOMINAL_END_TIME),
stringOrNull(results, Columns.CURRENT_RUN_STATE),
columnNames.contains(Columns.STARTED_AT)
? timestampOrNull(results, Columns.STARTED_AT)
: null,
uuidOrNull(results, Columns.START_RUN_STATE_UUID),
columnNames.contains(Columns.ENDED_AT) ? timestampOrNull(results, Columns.ENDED_AT) : null,
uuidOrNull(results, Columns.END_RUN_STATE_UUID),
stringOrThrow(results, Columns.NAMESPACE_NAME),
stringOrThrow(results, Columns.JOB_NAME));
}

private List<DatasetVersionId> toDatasetVersion(ResultSet rs, String column) throws SQLException {
String dsString = rs.getString(column);
if (dsString == null) {
return Collections.emptyList();
}
return Utils.fromJson(dsString, new TypeReference<List<DatasetVersionId>>() {});
}
}
10 changes: 8 additions & 2 deletions api/src/main/java/marquez/db/models/ExtendedRunRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class ExtendedRunRow extends RunRow {
@Getter @NonNull private final List<DatasetVersionId> inputVersions;
@Getter @NonNull private final List<DatasetVersionId> outputVersions;
@Getter private final String args;

public ExtendedRunRow(
Expand Down Expand Up @@ -48,8 +50,6 @@ public ExtendedRunRow(
jobVersionUuid,
parentRunUuid,
runArgsUuid,
inputVersions,
outputVersions,
nominalStartTime,
nominalEndTime,
currentRunState,
Expand All @@ -59,6 +59,12 @@ public ExtendedRunRow(
endRunStateUuid,
namespaceName,
jobName);
this.inputVersions = inputVersions;
this.outputVersions = outputVersions;
this.args = args;
}

public boolean hasInputVersionUuids() {
return !inputVersions.isEmpty();
}
}
10 changes: 1 addition & 9 deletions api/src/main/java/marquez/db/models/RunRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package marquez.db.models;

import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;
Expand All @@ -15,7 +14,6 @@
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import marquez.common.models.DatasetVersionId;

@AllArgsConstructor
@EqualsAndHashCode
Expand All @@ -24,12 +22,10 @@ public class RunRow {
@Getter @NonNull private final UUID uuid;
@Getter @NonNull private final Instant createdAt;
@Getter @NonNull private final Instant updatedAt;
@Getter @NonNull private final UUID jobUuid;
@Getter private final UUID jobUuid;
@Nullable private final UUID jobVersionUuid;
@Nullable private final UUID parentRunUuid;
@Getter @NonNull private final UUID runArgsUuid;
@Getter @NonNull private final List<DatasetVersionId> inputVersions;
@Getter @NonNull private final List<DatasetVersionId> outputVersions;
@Nullable private final Instant nominalStartTime;
@Nullable private final Instant nominalEndTime;
@Nullable private final String currentRunState;
Expand All @@ -40,10 +36,6 @@ public class RunRow {
@Getter private final String namespaceName;
@Getter private final String jobName;

public boolean hasInputVersionUuids() {
return !inputVersions.isEmpty();
}

public Optional<UUID> getParentRunUuid() {
return Optional.ofNullable(parentRunUuid);
}
Expand Down
4 changes: 2 additions & 2 deletions api/src/main/java/marquez/service/DatasetService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import marquez.db.DatasetVersionDao;
import marquez.db.RunDao;
import marquez.db.models.ExtendedDatasetVersionRow;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.RunRow;
import marquez.service.RunTransitionListener.JobOutputUpdate;
import marquez.service.models.Dataset;
import marquez.service.models.DatasetMeta;
Expand Down Expand Up @@ -62,7 +62,7 @@ public Dataset createOrUpdate(
@NonNull DatasetMeta datasetMeta) {
if (datasetMeta.getRunId().isPresent()) {
UUID runUuid = datasetMeta.getRunId().get().getValue();
ExtendedRunRow runRow = runDao.findRunByUuidAsRow(runUuid).get();
RunRow runRow = runDao.findRunByUuidAsRow(runUuid).get();

List<ExtendedDatasetVersionRow> outputs =
datasetVersionDao.findOutputDatasetVersionsFor(runUuid);
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/service/JobService.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Job createOrUpdate(
if (jobMeta.getRunId().isPresent()) {
UUID runUuid = jobMeta.getRunId().get().getValue();
runDao.notifyJobChange(runUuid, jobRow, jobMeta);
ExtendedRunRow runRow = runDao.findRunByUuidAsRow(runUuid).get();
ExtendedRunRow runRow = runDao.findRunByUuidAsExtendedRow(runUuid).get();

List<ExtendedDatasetVersionRow> inputs =
datasetVersionDao.findInputDatasetVersionsFor(runUuid);
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/service/RunService.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void markRunAs(
if (transitionedAt == null) {
transitionedAt = Instant.now();
}
ExtendedRunRow runRow = findRunByUuidAsRow(runId.getValue()).get();
ExtendedRunRow runRow = findRunByUuidAsExtendedRow(runId.getValue()).get();
runStateDao.updateRunStateFor(runId.getValue(), runState, transitionedAt);

if (runState.isDone()) {
Expand Down
6 changes: 3 additions & 3 deletions api/src/test/java/marquez/db/BackfillTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import java.util.Optional;
import java.util.UUID;
import marquez.common.Utils;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.RunArgsRow;
import marquez.db.models.RunRow;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.JobFacet;
import marquez.service.models.LineageEvent.JobLink;
Expand All @@ -35,7 +35,7 @@
public class BackfillTestUtils {
public static final String COMPLETE = "COMPLETE";

public static ExtendedRunRow writeNewEvent(
public static RunRow writeNewEvent(
Jdbi jdbi,
String jobName,
Instant now,
Expand All @@ -52,7 +52,7 @@ public static ExtendedRunRow writeNewEvent(
runArgsDao.upsertRunArgs(
UUID.randomUUID(), now, "{}", Utils.checksumFor(ImmutableMap.of()));
UUID runUuid = UUID.randomUUID();
ExtendedRunRow runRow =
RunRow runRow =
runDao.upsert(
runUuid,
null,
Expand Down
3 changes: 1 addition & 2 deletions api/src/test/java/marquez/db/DbTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import marquez.common.models.RunState;
import marquez.db.models.DatasetRow;
import marquez.db.models.ExtendedJobVersionRow;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.JobContextRow;
import marquez.db.models.JobRow;
import marquez.db.models.JobVersionRow;
Expand Down Expand Up @@ -256,7 +255,7 @@ static RunRow newRun(final Jdbi jdbi, JobRow jobRow) {
}

/** Adds a new {@link RunRow} object to the {@code runs} table. */
static ExtendedRunRow newRun(
static RunRow newRun(
final Jdbi jdbi,
final UUID jobUuid,
final UUID jobVersionUuid,
Expand Down
3 changes: 1 addition & 2 deletions api/src/test/java/marquez/db/JobVersionDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import marquez.db.models.DatasetRow;
import marquez.db.models.ExtendedDatasetVersionRow;
import marquez.db.models.ExtendedJobVersionRow;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.RunArgsRow;
Expand Down Expand Up @@ -128,7 +127,7 @@ public void testUpdateLatestRunFor() {

// (2) Add a new run.
final RunArgsRow runArgsRow = DbTestUtils.newRunArgs(jdbiForTesting);
final ExtendedRunRow runRow =
final RunRow runRow =
DbTestUtils.newRun(
jdbiForTesting,
jobVersionRow.getJobUuid(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import java.util.UUID;
import marquez.db.NamespaceDao;
import marquez.db.OpenLineageDao;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.RunRow;
import marquez.jdbi.JdbiExternalPostgresExtension.FlywayTarget;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import org.flywaydb.core.api.configuration.Configuration;
Expand Down Expand Up @@ -48,7 +48,7 @@ public void testBackfill() throws SQLException, JsonProcessingException {
NamespaceRow namespace =
namespaceDao.upsertNamespaceRow(UUID.randomUUID(), now, NAMESPACE, "me");
String parentName = "parentJob";
ExtendedRunRow parentRun = writeNewEvent(jdbi, parentName, now, namespace, null, null);
RunRow parentRun = writeNewEvent(jdbi, parentName, now, namespace, null, null);

String task1Name = "task1";
writeNewEvent(jdbi, task1Name, now, namespace, parentRun.getUuid().toString(), parentName);
Expand Down

0 comments on commit 1faf1bb

Please sign in to comment.