Skip to content

Commit

Permalink
fix acquire pending for Access job
Browse files Browse the repository at this point in the history
  • Loading branch information
terrywbrady committed Jun 11, 2024
1 parent ec42f63 commit 2e00cd6
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/main/ruby/lib/merritt_zk_access.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def self.acquire_pending_assembly(zk, queue_name)
zk.children(Access.dir(queue_name)).sort.each do |cp|
a = Access.new(queue_name, cp)
a.load(zk)
next unless a.status == AccessState::Pending

begin
return a if a.lock(zk)
rescue ZK::Exceptions::NodeExists
Expand Down
62 changes: 62 additions & 0 deletions src/main/ruby/spec/zk_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,68 @@ def make_batch_json(s = 'bar', u = 'bid-uuid')
arr = MerrittZK::Access.list_jobs_as_json(@zk)
expect(arr.length).to eq(1)
end

it :access_happy_path_del do |_x|
q = MerrittZK::Access::SMALL
a = MerrittZK::Access.create_assembly(@zk, q, { token: 'abc' })
@remap['qid0'] = a.id
aa = MerrittZK::Access.acquire_pending_assembly(@zk, q)
expect(aa).to_not be_nil
expect(a.id).to eq(aa.id)
expect(aa.status.status).to eq(:Pending)
aa2 = MerrittZK::Access.acquire_pending_assembly(@zk, q)
expect(aa2).to be_nil
aa.set_status(@zk, aa.status.state_change(:Processing))
expect(aa.status.status).to eq(:Processing)
aa.unlock(@zk)

aaa = MerrittZK::Access.new(q, a.id)
aaa.load(@zk)
expect(a.id).to eq(aaa.id)

aaa.set_status(@zk, aaa.status.success)

expect(aaa.status.status).to eq(:Completed)
expect(aaa.status.deletable?).to be(true)
aaa.delete(@zk)

# Only for Ruby interface
arr = MerrittZK::Access.list_jobs_as_json(@zk)
expect(arr.length).to eq(0)
end

it :access_fail_path do |_x|
q = MerrittZK::Access::SMALL
a = MerrittZK::Access.create_assembly(@zk, q, { token: 'abc' })
@remap['qid0'] = a.id
aa = MerrittZK::Access.acquire_pending_assembly(@zk, q)
expect(aa).to_not be_nil
expect(a.id).to eq(aa.id)
expect(aa.status.status).to eq(:Pending)
aa2 = MerrittZK::Access.acquire_pending_assembly(@zk, q)
expect(aa2).to be_nil
aa.set_status(@zk, aa.status.state_change(:Processing))
expect(aa.status.status).to eq(:Processing)
aa.unlock(@zk)

aaa = MerrittZK::Access.new(q, a.id)
aaa.load(@zk)
expect(a.id).to eq(aaa.id)

aaa.set_status(@zk, aaa.status.fail)

expect(aaa.status.status).to eq(:Failed)
aa2 = MerrittZK::Access.acquire_pending_assembly(@zk, q)
expect(aa2).to be_nil

aaa.set_status(@zk, aaa.status.state_change(:Deleted))

expect(aaa.status.status).to eq(:Deleted)
aa2 = MerrittZK::Access.acquire_pending_assembly(@zk, q)
expect(aa2).to be_nil

expect(aaa.status.deletable?).to be(true)
end
end

describe 'Find Batch by UUID' do
Expand Down
72 changes: 72 additions & 0 deletions src/test/java/org/cdlib/mrt/zk/ZKTestIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ enum Tests {
lock_store,
lock_inventory,
access_happy_path,
access_happy_path_del,
access_fail_path,
find_batch_by_uuid;
}

Expand Down Expand Up @@ -1323,6 +1325,76 @@ public void accessHappyPath() throws KeeperException, InterruptedException, Merr
assertEquals(jobs.size(), 0);
}

@Test
public void accessHappyPathDel() throws KeeperException, InterruptedException, MerrittZKNodeInvalid, MerrittStateError{
load(Tests.access_happy_path_del);
Access a = Access.createAssembly(zk, Access.Queues.small, token("abc"));
remap.put("qid0", a.id());
Access aa = Access.acquirePendingAssembly(zk, Access.Queues.small);
assertNotNull(aa);
assertEquals(a.id(), aa.id());
assertEquals(aa.status(), AccessState.Pending);
Access aa2 = Access.acquirePendingAssembly(zk, Access.Queues.small);
assertNull(aa2);
aa.setStatus(zk, AccessState.Processing);
assertEquals(aa.status(), AccessState.Processing);
aa.unlock(zk);

Access aaa = new Access(Access.Queues.small, a.id());
aaa.load(zk);
assertEquals(a.id(), aaa.id());
aaa.setStatus(zk, aaa.status().success());
assertEquals(aaa.status(), AccessState.Completed);
assertTrue(aaa.status().isDeletable());
aaa.unlock(zk);
aa2 = Access.acquirePendingAssembly(zk, Access.Queues.small);
assertNull(aa2);

aaa.delete(zk);
Access aa3 = Access.acquirePendingAssembly(zk, Access.Queues.small);
assertNull(aa3);
List<Access> jobs = Access.listJobs(zk, Access.Queues.small, null);
assertEquals(jobs.size(), 0);
jobs = Access.listJobs(zk, Access.Queues.large, null);
assertEquals(jobs.size(), 0);
}

@Test
public void accessFailPath() throws KeeperException, InterruptedException, MerrittZKNodeInvalid{
load(Tests.access_fail_path);
Access a = Access.createAssembly(zk, Access.Queues.small, token("abc"));
remap.put("qid0", a.id());
Access aa = Access.acquirePendingAssembly(zk, Access.Queues.small);
assertNotNull(aa);
assertEquals(a.id(), aa.id());
assertEquals(aa.status(), AccessState.Pending);
Access aa2 = Access.acquirePendingAssembly(zk, Access.Queues.small);
assertNull(aa2);
aa.setStatus(zk, AccessState.Processing);
assertEquals(aa.status(), AccessState.Processing);
aa.unlock(zk);

Access aaa = new Access(Access.Queues.small, a.id());
aaa.load(zk);
assertEquals(a.id(), aaa.id());
aaa.setStatus(zk, aaa.status().fail());
assertEquals(aaa.status(), AccessState.Failed);
Access aa3 = Access.acquirePendingAssembly(zk, Access.Queues.small);
assertNull(aa3);
aaa.setStatus(zk, AccessState.Deleted);

assertTrue(aaa.status().isDeletable());
aaa.unlock(zk);
Access aa4 = Access.acquirePendingAssembly(zk, Access.Queues.small);
assertNull(aa4);
List<Access> jobs = Access.listJobs(zk, Access.Queues.small, null);
assertEquals(jobs.size(), 1);
jobs = Access.listJobs(zk, Access.Queues.small, AccessState.Pending);
assertEquals(jobs.size(), 0);
jobs = Access.listJobs(zk, Access.Queues.large, null);
assertEquals(jobs.size(), 0);
}

@Test
public void findByUuid() throws KeeperException, InterruptedException, MerrittZKNodeInvalid{
load(Tests.create_batch);
Expand Down
11 changes: 11 additions & 0 deletions test-cases.yml
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,17 @@ access_happy_path:
/access/small/qid0/status:
status: Completed
last_modified: now
access_happy_path_del:
input:
output:
access_fail_path:
input:
output:
/access/small/qid0/token:
token: abc
/access/small/qid0/status:
status: Deleted
last_modified: now
find_batch_by_uuid:
input:
output:
Expand Down

0 comments on commit 2e00cd6

Please sign in to comment.