Skip to content

Commit

Permalink
Merge pull request #66 from MatterMiners/fix/simulationend-64
Browse files Browse the repository at this point in the history
Simulation awaits proper finishing of all jobs
  • Loading branch information
eileen-kuehn committed Nov 14, 2019
2 parents 213a69d + 5eccefd commit 5e10d37
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 8 deletions.
5 changes: 2 additions & 3 deletions lapis/drone.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,8 @@ async def _run_job(self, job: Job, kill: bool):
await instant
job_execution.cancel()
self.jobs -= 1
if not job.successful:
job.drone = None
await self.scheduler.retry_job(job)
job.drone = None
await self.scheduler.job_finished(job)
self._utilisation = self._allocation = None
self.scheduler.update_drone(self)
await sampling_required.put(self)
Expand Down
17 changes: 13 additions & 4 deletions lapis/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Dict
from usim import Scope, interval
from usim import Scope, interval, Resources

from lapis.drone import Drone
from lapis.monitor import sampling_required
Expand Down Expand Up @@ -32,6 +32,7 @@ def __init__(self, job_queue):
self.interval = 60
self.job_queue = JobQueue()
self._collecting = True
self._processing = Resources(jobs=0)

@property
def drone_list(self):
Expand Down Expand Up @@ -100,19 +101,27 @@ async def run(self):
for key, value in left_resources.items()
}
self._add_drone(best_match, left_resources)
if not self._collecting and not self.job_queue:
if (
not self._collecting
and not self.job_queue
and self._processing.levels.jobs == 0
):
break
await sampling_required.put(self)

async def _collect_jobs(self):
    """Drain the incoming job stream into the scheduler's job queue.

    Every job received from ``_stream_queue`` is appended to
    ``job_queue`` and registered in the ``_processing`` resource, so the
    main ``run`` loop does not terminate before all jobs have actually
    finished (the counter is decreased in ``job_finished`` on success).
    Once the stream is exhausted, ``_collecting`` is set to ``False`` to
    signal that no further jobs will arrive.
    """
    async for job in self._stream_queue:
        self.job_queue.append(job)
        # Mark the job as in-flight until it completes successfully.
        await self._processing.increase(jobs=1)
        # TODO: logging happens with each job
        await sampling_required.put(self.job_queue)
    self._collecting = False

async def retry_job(self, job):
await self._stream_queue.put(job)
async def job_finished(self, job):
    """Handle completion of a job reported by a drone.

    A successful job releases its slot in the ``_processing`` resource,
    allowing the scheduler's ``run`` loop to terminate once the queue is
    empty.  An unsuccessful job is put back on the stream queue to be
    rescheduled; its ``_processing`` count is intentionally kept, since
    the job is still considered in flight.
    """
    if job.successful:
        await self._processing.decrease(jobs=1)
    else:
        await self._stream_queue.put(job)

def _schedule_job(self, job) -> Drone:
priorities = {}
Expand Down
4 changes: 3 additions & 1 deletion lapis/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, seed=1234):
self.cost = 0
self._job_generators = []
self.monitoring = None
self.duration = None
self.enable_monitoring()

def enable_monitoring(self):
Expand Down Expand Up @@ -77,7 +78,8 @@ async def _simulate(self, end):
for controller in self.controllers:
while_running.do(controller.run(), volatile=True)
while_running.do(self.monitoring.run(), volatile=True)
print(f"Finished simulation at {time.now}")
self.duration = time.now
print(f"Finished simulation at {self.duration}")

async def _queue_jobs(self, job_input, job_reader):
await job_to_queue_scheduler(
Expand Down
40 changes: 40 additions & 0 deletions lapis_tests/test_simulator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from tempfile import NamedTemporaryFile

from lapis.job_io.htcondor import htcondor_job_reader
from lapis.pool import StaticPool
from lapis.pool_io.htcondor import htcondor_pool_reader
from lapis.scheduler import CondorJobScheduler
from lapis.simulator import Simulator


class TestSimulator(object):
    def test_simulation_exit(self):
        """End-to-end check that the simulation runs until all jobs finish.

        A single one-core drone receives one job whose recorded walltime
        (100s) exceeds its requested walltime (60s); the expected total
        simulation duration is 180.
        """
        simulator = Simulator()
        with NamedTemporaryFile(suffix=".csv") as machine_config, NamedTemporaryFile(
            suffix=".csv"
        ) as job_config:
            with open(machine_config.name, "w") as write_stream:
                write_stream.write(
                    "TotalSlotCPUs TotalSlotDisk TotalSlotMemory Count\n"
                    "1 44624348.0 8000 1"
                )
            with open(job_config.name, "w") as write_stream:
                write_stream.write(
                    "QDate RequestCpus RequestWalltime RequestMemory RequestDisk "
                    "RemoteWallClockTime MemoryUsage DiskUsage_RAW RemoteSysCpu "
                    "RemoteUserCpu\n"
                    "1567155456 1 60 2000 6000000 100.0 2867 41898 10.0 40.0"
                )
            # Previously these handles were opened without ever being closed
            # (resource leak); context managers ensure cleanup after the run.
            with open(job_config.name, "r+") as job_input, open(
                machine_config.name, "r+"
            ) as machine_input:
                simulator.create_job_generator(
                    job_input=job_input, job_reader=htcondor_job_reader
                )
                simulator.create_scheduler(scheduler_type=CondorJobScheduler)
                simulator.create_pools(
                    pool_input=machine_input,
                    pool_reader=htcondor_pool_reader,
                    pool_type=StaticPool,
                )
                simulator.run()
        assert 180 == simulator.duration

0 comments on commit 5e10d37

Please sign in to comment.