Skip to content

Commit

Permalink
Remove references to simple_name as job redirects handle redirecting …
Browse files Browse the repository at this point in the history
…simple name to fqn

added unit test to verify

Signed-off-by: Michael Collado <collado.mike@gmail.com>
  • Loading branch information
collado-mike committed May 6, 2022
1 parent fbda91b commit a45885b
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 17 deletions.
4 changes: 0 additions & 4 deletions api/src/main/java/marquez/api/JobResource.java
Expand Up @@ -175,10 +175,6 @@ public Response createRun(
JobRow job =
jobService
.findJobByNameAsRow(namespaceName.getValue(), jobName.getValue())
.or(
() ->
jobService.findJobBySimpleNameAsRow(
namespaceName.getValue(), jobName.getValue()))
.orElseThrow(
() ->
new IllegalArgumentException(
Expand Down
18 changes: 5 additions & 13 deletions api/src/main/java/marquez/db/JobDao.java
Expand Up @@ -91,29 +91,21 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {
@SqlQuery(
"""
WITH RECURSIVE job_ids AS (
SELECT uuid, symlink_target_uuid
FROM jobs j
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
FROM jobs_view j
WHERE j.namespace_name=:namespaceName AND j.name=:jobName
UNION
SELECT j.uuid, j.symlink_target_uuid
FROM jobs j
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, n.name AS namespace_name
FROM jobs AS j
INNER JOIN job_ids jn ON jn.uuid=j.uuid AND jn.symlink_target_uuid IS NULL
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
""")
Optional<JobRow> findJobByNameAsRow(String namespaceName, String jobName);

@SqlQuery(
"""
SELECT j.* FROM jobs AS j
WHERE j.namespace_name = :namespaceName AND
j.name = :jobName
""")
Optional<JobRow> findJobBySimpleNameAsRow(String namespaceName, String jobName);

@SqlQuery(
"SELECT j.*, jc.context, f.facets\n"
+ " FROM jobs_view AS j\n"
Expand Down
87 changes: 87 additions & 0 deletions api/src/test/java/marquez/MarquezAppIntegrationTest.java
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import marquez.client.models.Dataset;
import marquez.client.models.DatasetId;
import marquez.client.models.DbTable;
Expand Down Expand Up @@ -700,4 +701,90 @@ public void testApp_getJob() throws SQLException {
.hasFieldOrPropertyWithValue("namespace", NAMESPACE_NAME)
.hasFieldOrPropertyWithValue("name", targetJobName);
}

@Test
public void testApp_getJobWithFQNFromParent() throws SQLException {
Jdbi jdbi =
Jdbi.create(POSTGRES.getJdbcUrl(), POSTGRES.getUsername(), POSTGRES.getPassword())
.installPlugin(new SqlObjectPlugin())
.installPlugin(new PostgresPlugin());
createNamespace(NAMESPACE_NAME);

// Create job
String jobName = newJobName().getValue();
final JobMeta jobMeta =
JobMeta.builder()
.type(JOB_TYPE)
.inputs(ImmutableSet.of())
.outputs(ImmutableSet.of())
.location(JOB_LOCATION)
.context(JOB_CONTEXT)
.description(JOB_DESCRIPTION)
.build();
final Job originalJob = client.createJob(NAMESPACE_NAME, jobName, jobMeta);

String parentJobName = newJobName().getValue();
final JobMeta parentJobMeta =
JobMeta.builder()
.type(JOB_TYPE)
.inputs(ImmutableSet.of())
.outputs(ImmutableSet.of())
.location(JOB_LOCATION)
.context(JOB_CONTEXT)
.description(JOB_DESCRIPTION)
.build();
final Job parentJob = client.createJob(NAMESPACE_NAME, parentJobName, parentJobMeta);

JobDao jobDao = jdbi.onDemand(JobDao.class);
NamespaceDao namespaceDao = jdbi.onDemand(NamespaceDao.class);
Optional<NamespaceRow> namespaceRow = namespaceDao.findNamespaceByName(NAMESPACE_NAME);
if (namespaceRow.isEmpty()) {
throw new AssertionError("Couldn't find expected namespace row");
}
Optional<JobRow> originalJobRow = jobDao.findJobByNameAsRow(NAMESPACE_NAME, jobName);
if (originalJobRow.isEmpty()) {
throw new AssertionError("Couldn't find job row we just inserted");
}
Optional<JobRow> parentJobRow = jobDao.findJobByNameAsRow(NAMESPACE_NAME, parentJobName);
if (parentJobRow.isEmpty()) {
throw new AssertionError("Couldn't find parent job we just inserted");
}
PGobject inputs = new PGobject();
inputs.setType("json");
inputs.setValue("[]");
JobRow jobRow = originalJobRow.get();
JobRow targetJobRow =
jobDao.upsertJob(
UUID.randomUUID(),
parentJobRow.get().getUuid(),
JobType.valueOf(jobRow.getType()),
Instant.now(),
namespaceRow.get().getUuid(),
namespaceRow.get().getName(),
jobRow.getName(),
jobRow.getDescription().orElse(null),
jobRow.getJobContextUuid().orElse(null),
jobRow.getLocation(),
null,
inputs);
// symlink the old job to point to the new one that has a parent uuid
jobDao.upsertJob(
jobRow.getUuid(),
JobType.valueOf(JOB_TYPE.name()),
Instant.now(),
namespaceRow.get().getUuid(),
NAMESPACE_NAME,
jobName,
JOB_DESCRIPTION,
jobRow.getJobContextUuid().orElse(null),
JOB_LOCATION.toString(),
targetJobRow.getUuid(),
inputs);

Job job = client.getJob(NAMESPACE_NAME, jobName);
assertThat(job)
.isNotNull()
.hasFieldOrPropertyWithValue("namespace", NAMESPACE_NAME)
.hasFieldOrPropertyWithValue("name", parentJobName + "." + jobName);
}
}

0 comments on commit a45885b

Please sign in to comment.