Job queue system
Apr 14, 2019
import copy
import logging
import os
import pathlib
import queue
import subprocess
import threading
import tempfile
import time
import yaml

import click

# Root directory under which each job's output directory is created.
OUT_ROOT_DIR = '/home/clemens/Dropbox/artifacts/DeepCodeCraft'

# Log lines look like "2019-04-14 12:00:00 [INFO] message"; INFO and above are shown.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

class JobQueue:
    """Directory-backed job queue that runs queued jobs in worker threads.

    Files appearing in ``queue_dir`` are parsed as YAML job descriptions,
    expanded into one job per parameter combination, and executed with at most
    ``concurrency`` jobs running at the same time. A job file is deleted from
    ``queue_dir`` once every job expanded from it has finished.
    """

    # Path, relative to the checked-out repository, that is handed to python3.
    # With the empty default, python3 is pointed at the checkout directory
    # itself (which makes it look for a ``__main__.py`` there).
    # NOTE(review): the empty name may be extraction damage of a script name
    # such as "main.py" -- confirm against the repository being executed.
    ENTRY_POINT = ""

    def __init__(self, queue_dir, concurrency):
        """Create an idle queue.

        Args:
            queue_dir: Directory that is polled for job files.
            concurrency: Maximum number of jobs running at the same time.
        """
        self.queue_dir = queue_dir
        self.concurrency = concurrency
        # Maps job file name -> number of jobs from that file not yet finished.
        self.known_jobs = {}
        self.queue = queue.Queue()
        self.active_jobs = 0
        # Guards ``active_jobs`` and ``known_jobs``; both are updated by worker
        # threads as well as by the polling loop.
        self.lock = threading.Lock()

    def run(self, poll_interval=0.1):
        """Poll ``queue_dir`` forever, starting jobs as capacity allows.

        Args:
            poll_interval: Seconds to sleep between directory scans, so the
                loop does not spin at full CPU while idle.
        """
        logging.info(f"Watching {self.queue_dir} for new jobs...")

        while True:
            for job_file in os.listdir(self.queue_dir):
                with self.lock:
                    already_known = job_file in self.known_jobs
                if already_known:
                    continue
                logging.info(f"Found new job file {job_file}")
                try:
                    self.process_job_file(job_file)
                except FileNotFoundError:
                    # A worker finishing the file's last job can delete it
                    # between listdir() and open(); nothing left to enqueue.
                    continue

            while self.queue.qsize() > 0:
                with self.lock:
                    if self.active_jobs >= self.concurrency:
                        break
                    self.active_jobs += 1
                # Only this loop consumes from the queue, so get() cannot block.
                job = self.queue.get()
                threading.Thread(target=self.run_job, args=(job,)).start()
                logging.info(f"In queue: {self.queue.qsize()} Running: {self.active_jobs}")

            time.sleep(poll_interval)

    def run_job(self, job):
        """Execute one job and release its concurrency slot afterwards.

        Any exception raised while preparing or running the job is logged
        rather than propagated, and the bookkeeping always runs, so a failing
        job cannot permanently occupy a slot.

        Args:
            job: Object with ``repo_path``, ``revision``, ``params`` and
                ``handle`` attributes.
        """
        try:
            self._execute_job(job)
        except Exception:
            logging.exception(f"Job {job.repo_path} at {job.revision} with {job.params} failed")
        finally:
            self._finish_job(job)

    def _execute_job(self, job):
        """Check out the job's revision into a temp dir and run it with python3."""
        with tempfile.TemporaryDirectory() as checkout:

            def git(args, workdir=checkout):
                # Run a git command silently; ``workdir=None`` omits ``-C``.
                cmd = ["git"]
                if workdir is not None:
                    cmd.extend(["-C", workdir])
                cmd.extend(args)
                subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)

            git(["clone", job.repo_path, checkout], workdir=None)
            git(["reset", "--hard", "HEAD"])
            git(["clean", "-fd"])
            git(["checkout", job.revision])
            revision = subprocess.check_output(
                ["git", "-C", checkout, "describe", "--tags", "--always", "--dirty"]
            ).decode("UTF-8").strip()

            # Output directory name: timestamp, revision, then every parameter.
            timestamp = time.strftime("%Y-%m-%d~%H:%M:%S")
            out_dir = os.path.join(OUT_ROOT_DIR, f"{timestamp}-{revision}")
            for name, value in job.params.items():
                out_dir += f"-{name}{value}"
            pathlib.Path(out_dir).mkdir(parents=True, exist_ok=True)

            job_desc = f"{job.repo_path} at {job.revision} with {job.params}"
            cli_args = [f"--{name}={value}" for name, value in job.params.items()]
            logpath = os.path.join(out_dir, "out.txt")
            logging.info(f"Running {job_desc}")
            logging.info(f"Output in {logpath}")
            with open(logpath, "w+") as outfile:
                retcode = subprocess.call(
                    ["python3", os.path.join(checkout, self.ENTRY_POINT), "--out-dir", out_dir] + cli_args,
                    stdout=outfile, stderr=outfile)
            if retcode != 0:
                logging.warning(f"Command {job_desc} returned non-zero exit status {retcode}. Logs: {logpath}")
            else:
                logging.info(f"Success: {job_desc}")

    def _finish_job(self, job):
        """Free the job's slot; delete its job file once no sibling jobs remain."""
        with self.lock:
            self.active_jobs -= 1
            self.known_jobs[job.handle] -= 1
            if self.known_jobs[job.handle] == 0:
                # Remove the file before forgetting it, so the polling loop
                # never sees an unknown file that is about to disappear.
                os.remove(os.path.join(self.queue_dir, job.handle))
                del self.known_jobs[job.handle]

    def process_job_file(self, job_file):
        """Parse a job file and enqueue one job per parameter combination.

        The file is read as YAML and is expected to contain the keys
        ``repo-path``, ``revision`` and ``params``; ``params`` is iterated as
        a sequence of parameter dicts, each expanded by ``all_combinations``.

        Args:
            job_file: Name of the file inside ``queue_dir``.
        """
        with open(os.path.join(self.queue_dir, job_file), "r") as f:
            job = yaml.safe_load(f)

        param_sets = []
        for param_set in job["params"]:
            param_sets.extend(self.all_combinations(param_set))
        logging.info(f"Enqueuing {len(param_sets)} jobs")
        with self.lock:
            self.known_jobs[job_file] = len(param_sets)

        for param_set in param_sets:
            self.queue.put(Job(job["repo-path"], job["revision"], param_set, job_file))

    def all_combinations(self, params_dict):
        """Expand list-valued parameters into every combination of values.

        Args:
            params_dict: Maps parameter name to either a single value or a
                list of alternative values.

        Returns:
            A list of dicts, one per combination. Non-list values are copied
            into every combination unchanged; an empty ``params_dict`` yields
            ``[{}]`` and an empty list value yields ``[]``.
        """
        param_sets = [{}]
        for name, values in params_dict.items():
            if isinstance(values, list):
                # Cross every combination built so far with each alternative.
                param_sets = [
                    {**copy.deepcopy(param_set), name: value}
                    for value in values
                    for param_set in param_sets
                ]
            else:
                for param_set in param_sets:
                    param_set[name] = values

        return param_sets

class Job:
    """A single run of a repository revision with one set of parameters."""

    def __init__(self, repo_path, revision, params, handle):
        """
        Args:
            repo_path: Path of the git repository to run.
            revision: Git revision to check out.
            params: Dict of parameter name -> value for this run.
            handle: Name of the job file this job was expanded from.
        """
        self.repo_path = repo_path
        self.revision = revision
        self.params = params
        self.handle = handle

    def __repr__(self):
        return (
            f"{type(self).__name__}(repo_path={self.repo_path!r}, revision={self.revision!r}, "
            f"params={self.params!r}, handle={self.handle!r})"
        )

@click.command()
@click.option("--concurrency", default=2, help="Maximum number of jobs running at the same time.")
def main(concurrency):
    """Watch the job queue directory and run submitted jobs."""
    job_queue = JobQueue("/home/clemens/xprun/queue", concurrency)
    # Blocks forever: polls the queue directory and dispatches jobs.
    job_queue.run()


if __name__ == "__main__":
    main()

import os
import pathlib
from shutil import copyfile
import subprocess
import sys
import tempfile
import time

import click
import yaml

# Directory into which submitted job description files are placed.
QUEUE_DIR = "/home/clemens/xprun/queue"

@click.command()
@click.option("--repo-path", default=".", help="Path to git code repository to execute.")
@click.option("--revision", default="HEAD", help="Git revision to execute.")
@click.option("--params-file", default="params.yaml", help="Path to parameter file.")
def main(repo_path, revision, params_file):
    """Submit a job description to the queue directory.

    The revision is resolved to a commit hash in the given repository, the
    parameter file is loaded as YAML, and both are written, together with the
    absolute repository path, to a new ``<unix-timestamp>.yaml`` file in
    ``QUEUE_DIR``.
    """
    pathlib.Path(QUEUE_DIR).mkdir(parents=True, exist_ok=True)
    repo_path = os.path.abspath(repo_path)
    # Resolve the revision inside the target repository, not whatever
    # repository the current working directory happens to be in.
    commit = subprocess.check_output(
        ["git", "-C", repo_path, "rev-parse", revision]
    ).decode("UTF-8").strip()

    with open(params_file, "r") as f:
        params = yaml.safe_load(f)
    job = {
        "repo-path": repo_path,
        "revision": commit,
        "params": params,
    }

    # Stage the file next to (not inside) the queue directory: os.rename() can
    # fail when source and destination are on different filesystems, which is
    # common for the default temp dir, and staging outside QUEUE_DIR keeps a
    # half-written file from appearing in the queue.
    staging_dir = str(pathlib.Path(QUEUE_DIR).parent)
    fd, path = tempfile.mkstemp(dir=staging_dir)
    try:
        with open(fd, 'w') as f:
            yaml.dump(job, f)
        # Publish only after the file is fully written and closed.
        os.rename(path, os.path.join(QUEUE_DIR, f"{int(time.time())}.yaml"))
    except BaseException:
        # Do not leave a stray staged file behind if writing or renaming fails.
        if os.path.exists(path):
            os.remove(path)
        raise


if __name__ == "__main__":
    main()

