Skip to content

Commit

Permalink
Merge pull request #63 from MatterMiners/fix/jobstart-60
Browse files Browse the repository at this point in the history
Runnings jobs independently from scheduler
  • Loading branch information
eileen-kuehn committed Nov 12, 2019
2 parents a698820 + a16b2aa commit 213a69d
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 14 deletions.
14 changes: 12 additions & 2 deletions lapis/drone.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from cobald import interfaces
from usim import time, Scope, instant, Capacities, ResourcesUnavailable
from usim import time, Scope, instant, Capacities, ResourcesUnavailable, Queue

from lapis.job import Job

Expand Down Expand Up @@ -44,6 +44,7 @@ def __init__(
self.jobs = 0
self._allocation = None
self._utilisation = None
self._job_queue = Queue()

@property
def theoretical_available_resources(self):
Expand All @@ -60,6 +61,9 @@ async def run(self):
self._supply = 1
self.scheduler.register_drone(self)
await sampling_required.put(self)
async with Scope() as scope:
async for job, kill in self._job_queue:
scope.do(self._run_job(job=job, kill=kill))

@property
def supply(self) -> float:
Expand Down Expand Up @@ -103,7 +107,10 @@ async def shutdown(self):
await sampling_required.put(self) # TODO: introduce state of drone
await (time + 1)

async def start_job(self, job: Job, kill: bool = False):
async def schedule_job(self, job: Job, kill: bool = False):
await self._job_queue.put((job, kill))

async def _run_job(self, job: Job, kill: bool):
"""
Method manages to start a job in the context of the given drone.
The job is started independent of available resources. If resources of
Expand Down Expand Up @@ -147,6 +154,9 @@ async def start_job(self, job: Job, kill: bool = False):
await instant
job_execution.cancel()
self.jobs -= 1
if not job.successful:
job.drone = None
await self.scheduler.retry_job(job)
self._utilisation = self._allocation = None
self.scheduler.update_drone(self)
await sampling_required.put(self)
Expand Down
11 changes: 7 additions & 4 deletions lapis/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,21 @@ def __init__(
assert init <= capacity
self.make_drone = make_drone
self._drones = []
self.init_pool(init=init)
self._demand = 1
self._level = init
self._capacity = capacity
self._name = name

def init_pool(self, init: int = 0):
async def init_pool(self, scope: Scope, init: int = 0):
"""
Initialisation of existing drones at creation time of pool.
:param init: Number of drones to create.
"""
for _ in range(init):
self._drones.append(self.make_drone(0))
drone = self.make_drone(0)
scope.do(drone.run())
self._drones.append(drone)

# TODO: the run method currently needs to be called manually
async def run(self):
Expand All @@ -52,6 +53,7 @@ async def run(self):
initialising new drones. Otherwise drones get removed.
"""
async with Scope() as scope:
await self.init_pool(scope=scope, init=self._level)
async for _ in interval(1):
drones_required = min(self._demand, self._capacity) - self._level
while drones_required > 0:
Expand Down Expand Up @@ -145,5 +147,6 @@ async def run(self):
"""
Pool runs forever and does not check if number of drones needs to be adapted.
"""
while True:
async with Scope() as scope:
await self.init_pool(scope=scope, init=self._level)
await eternity
5 changes: 4 additions & 1 deletion lapis/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async def run(self):
for job in self.job_queue:
best_match = self._schedule_job(job)
if best_match:
scope.do(best_match.start_job(job))
await best_match.schedule_job(job)
self.job_queue.remove(job)
await sampling_required.put(self.job_queue)
self.unregister_drone(best_match)
Expand All @@ -111,6 +111,9 @@ async def _collect_jobs(self):
await sampling_required.put(self.job_queue)
self._collecting = False

async def retry_job(self, job):
await self._stream_queue.put(job)

def _schedule_job(self, job) -> Drone:
priorities = {}
for cluster in self.drone_cluster:
Expand Down
18 changes: 11 additions & 7 deletions lapis_tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ async def test_job_in_drone(self):
pool_resources={"cores": 1, "memory": 1},
scheduling_duration=0,
)
await drone.run()
async with Scope() as scope:
scope.do(drone.start_job(job=job))
assert 10 == time
scope.do(drone.schedule_job(job=job))
assert 10 == time.now
assert 0 == job.waiting_time
assert job.successful

Expand All @@ -65,8 +66,9 @@ async def test_nonmatching_job_in_drone(self):
pool_resources={"cores": 1, "memory": 1},
scheduling_duration=0,
)
await drone.run()
async with Scope() as scope:
scope.do(drone.start_job(job=job))
scope.do(drone.schedule_job(job=job))
assert 0 == time
assert not job.successful
assert 0 == job.waiting_time
Expand All @@ -87,9 +89,10 @@ async def test_two_nonmatching_jobs(self):
pool_resources={"cores": 1, "memory": 1},
scheduling_duration=0,
)
await drone.run()
async with Scope() as scope:
scope.do(drone.start_job(job=job_one))
scope.do(drone.start_job(job=job_two))
scope.do(drone.schedule_job(job=job_one))
scope.do(drone.schedule_job(job=job_two))
assert 10 == time
assert job_one.successful
assert not job_two.successful
Expand All @@ -112,9 +115,10 @@ async def test_two_matching_jobs(self):
pool_resources={"cores": 2, "memory": 2},
scheduling_duration=0,
)
await drone.run()
async with Scope() as scope:
scope.do(drone.start_job(job=job_one))
scope.do(drone.start_job(job=job_two))
scope.do(drone.schedule_job(job=job_one))
scope.do(drone.schedule_job(job=job_two))
assert 10 == time
assert job_one.successful
assert job_two.successful
Expand Down

0 comments on commit 213a69d

Please sign in to comment.