Skip to content
This repository has been archived by the owner on Mar 2, 2020. It is now read-only.

Commit

Permalink
Allow aetros start to choose config path.
Browse files Browse the repository at this point in the history
Better debug output for failed api responses.
Set in JobImage `pos` to current time if not set.
Only print `successfully reconnected` for main channel.
Fixed race condition in sending git object packs and setting ref head.
Read resources from job.json in start method.
  • Loading branch information
marcj committed Feb 8, 2018
1 parent d82d006 commit 657fb50
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 45 deletions.
11 changes: 10 additions & 1 deletion aetros/JobModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,23 @@ def get_model_node(self, name):

@property
def resources(self):
if 'resources' in self.config:
if 'resources' in self.config and isinstance(self.config['resources'], dict):
return self.config['resources']

return {}

def has_dpu(self):
return 'dpu' in self.resources and self.resources['dpu'] > 0

def get_cpu(self):
return self.resources.get('cpu', 1)

def get_memory(self):
return self.resources.get('memory', 1)

def get_gpu(self):
return self.resources.get('gpu', 1)

def get_batch_size(self):
return self.job['config']['batchSize']

Expand Down
9 changes: 6 additions & 3 deletions aetros/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ def raise_response_exception(message, response):


def parse_json(content):
a = simplejson.loads(content)
try:
a = simplejson.loads(content)
except simplejson.errors.JSONDecodeError as e:
raise ApiError("Could not decode response from server: %s, response: %s\n" % (str(e), content))

if isinstance(a, dict) and 'error' in a:
raise ApiError('API request failed %s: %s.' % (a['error'], a['message']), a['error'])
Expand All @@ -145,11 +148,11 @@ def user():
return parse_json(request('user'))


def create_job(model_name, local=False, parameters=None, dataset_id=None, config=None):
def create_job(model_name, config_path, local=False, parameters=None, dataset_id=None, config=None):
content = request(
'job',
{'modelId': model_name},
{'parameters': parameters, 'datasetId': dataset_id, 'config': config, 'local': local},
{'parameters': parameters, 'configPath': config_path, 'datasetId': dataset_id, 'config': config, 'local': local},
'put'
)
return parse_json(content)
Expand Down
3 changes: 3 additions & 0 deletions aetros/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ def __init__(self, name, pil_image, label=None, pos=None):
self.label = label
self.pos = pos

if self.pos is None:
self.pos = time.time()


class JobChannel:
NUMBER = 'number'
Expand Down
4 changes: 2 additions & 2 deletions aetros/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ def connect(self, channel):
self.registered[channel] = self.on_connect(self.was_connected_once[channel], channel)
self.connected_since[channel] = time.time()

if self.registered[channel] and self.was_connected_once[channel]:
self.logger.info("[%s] successfully reconnected." % (channel, ))
if channel == '' and self.registered[channel] and self.was_connected_once[channel]:
self.logger.info("Successfully reconnected.")

if not self.registered[channel]:
# make sure to close channel and connection first
Expand Down
1 change: 1 addition & 0 deletions aetros/commands/RunCommand.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ def main(self, args):
"NVIDIA-Docker2 is only supported on Linux.")

local_max_resources = {'cpu': cpu['count'], 'memory': ceil(mem / 1024 / 1024 / 1024), 'gpu': gpu}

if create_info['config']['image']:
# read max hardware within Docker
out = docker_call(['run', 'alpine', 'sh', '-c', 'nproc && cat /proc/meminfo | grep MemTotal'])
Expand Down
63 changes: 31 additions & 32 deletions aetros/commands/StartCommand.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import sys

from aetros import api

from aetros.utils import read_config, unpack_full_job_id, read_home_config

from aetros.utils import read_home_config
import aetros.const
import os


class StartCommand:
def __init__(self, logger):
Expand All @@ -16,12 +14,13 @@ def __init__(self, logger):
def main(self, args):
from aetros.starter import start
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter, prog=aetros.const.__prog__ + ' start')
parser.add_argument('name', nargs='?', help='the model name, e.g. aetros/mnist-network to start new job, or job id, e.g. user/modelname/0db75a64acb74c27bd72c22e359de7a4c44a20e5 to start a pre-created job.')
parser.add_argument('name', help='the model name, e.g. aetros/mnist-network to start new job, or job id, e.g. user/modelname/0db75a64acb74c27bd72c22e359de7a4c44a20e5 to start a pre-created job.')

parser.add_argument('-i', '--image', help="Which Docker image to use for the command. Default read in aetros.yml. If not specified, command is executed on the host.")
parser.add_argument('-l', '--local', action='store_true', help="Start the job immediately on the current machine.")
parser.add_argument('-s', '--server', action='append', help="Limits the server pool to this server. Default not limitation or read in aetros.yml. Multiple --server allowed.")
parser.add_argument('-b', '--branch', help="This overwrites the Git branch used when new job should be started.")
parser.add_argument('-c', '--config', help="Default /aetros.yml in Git root.")
parser.add_argument('--priority', help="Increases or decreases priority. Default is 0.")

parser.add_argument('--cpu', help="How many CPU cores should be assigned to job. Docker only.")
Expand All @@ -31,7 +30,7 @@ def main(self, args):

parser.add_argument('--rebuild-image', action='store_true', help="Makes sure the Docker image is re-built without cache.")

parser.add_argument('--gpu-device', action='append', help="Which device id should be mapped into the NVIDIA docker container.")
parser.add_argument('--gpu-device', action='append', help="Which GPU device id should be mapped into the Docker container. Only with --local.")

parser.add_argument('--max-time', help="Limit execution time in seconds. Sends SIGINT to the process group when reached.")
parser.add_argument('--max-epochs', help="Limit execution epochs. Sends SIGINT to the process group when reached.")
Expand All @@ -43,21 +42,24 @@ def main(self, args):

parsed_args = parser.parse_args(args)

home_config = read_home_config()
model_name = parsed_args.name
if not parsed_args.name:
print("fatal: no model defined. 'aetros start user/model-name'.")
sys.exit(2)

if model_name.count('/') > 1:
# start a concret job, by server command
if parsed_args.name and parsed_args.name.count('/') > 1:
# start a concrete job, used by server command
gpu_devices = []
if parsed_args.gpu_device:
gpu_devices = [int(x) for x in parsed_args.gpu_device]

start(self.logger, model_name, cpus=int(parsed_args.cpu), memory=int(parsed_args.memory),
start(self.logger, parsed_args.name, cpus=int(parsed_args.cpu), memory=int(parsed_args.memory),
gpu_devices=gpu_devices)
return

# create a new job
home_config = read_home_config()
model_name = parsed_args.name

# create a new job
hyperparameter = {}
if parsed_args.param:
for param in parsed_args.param:
Expand Down Expand Up @@ -88,24 +90,30 @@ def main(self, args):
if parsed_args.rebuild_image:
job_config['config']['rebuild_image'] = True

if 'resources' not in job_config:
job_config['resources'] = {}

if parsed_args.server:
job_config['servers'] = []
for name in parsed_args.server:
job_config['servers'].append(name)

job_config['resources'] = job_config.get('resources', {})
resources = job_config['resources']
resources['cpu'] = int(parsed_args.cpu or resources.get('cpu', 1))
resources['memory'] = int(parsed_args.memory or resources.get('memory', 1))
resources['gpu'] = int(parsed_args.gpu or resources.get('gpu', 0))
resources['gpu_memory'] = int(parsed_args.gpu_memory or resources.get('gpu_memory', 0))
job_config['resources'] = {}

if parsed_args.cpu:
job_config['resources']['cpu'] = int(parsed_args.cpu)

if parsed_args.memory:
job_config['resources']['memory'] = int(parsed_args.memory)

if parsed_args.gpu:
job_config['resources']['gpu'] = int(parsed_args.gpu)

if parsed_args.gpu_memory:
job_config['resources']['gpu_memory'] = int(parsed_args.gpu_memory)

config_path = parsed_args.config or 'aetros.yml'

try:
self.logger.debug("Create job ...")
created = api.create_job(model_name, parsed_args.local, hyperparameter, parsed_args.dataset, config=job_config)
created = api.create_job(model_name, config_path, parsed_args.local, hyperparameter, parsed_args.dataset, config=job_config)
except api.ApiError as e:
if 'Connection refused' in e.error:
self.logger.error("You are offline")
Expand All @@ -115,15 +123,6 @@ def main(self, args):
self.logger.info("Job %s/%s created." % (model_name, created['id']))

if parsed_args.local:
cpus = job_config['resources']['cpu']
memory = job_config['resources']['memory']

if not parsed_args.gpu_device and job_config['resources']['gpu'] > 0:
# if requested 2 GPUs and we have 3 GPUs with id [0,1,2], gpus should be [0,1]
parsed_args.gpu_device = []
for i in range(0, job_config['resources']['gpu']):
parsed_args.gpu_device.append(i)

start(self.logger, model_name + '/' + created['id'], cpus=cpus, memory=memory, gpu_devices=parsed_args.gpu_device)
start(self.logger, model_name + '/' + created['id'], gpu_devices=parsed_args.gpu_device)
else:
print("Open http://%s/model/%s/job/%s to monitor it." % (home_config['host'], model_name, created['id']))
12 changes: 6 additions & 6 deletions aetros/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,17 +719,17 @@ def commit_file(self, message, path, content):
:param content: str
:return:
"""
if not self.git_batch_commit:
if self.git_batch_commit:
self.add_file(path, content)
self.git_batch_commit_messages.append(message)
else:
with self.lock_write():
if self.job_id:
self.read_tree(self.ref_head)

self.add_file(path, content)

return self.commit_index(message)
else:
self.add_file(path, content)
self.git_batch_commit_messages.append(message)

def diff_objects(self, latest_commit_sha):
"""
Expand Down Expand Up @@ -892,7 +892,7 @@ def push(self):
message = {
'type': 'git-unpack-objects',
'ref': self.ref_head,
'commit': self.get_head_commit(),
'commit': commit_sha,
'pack': pack_content,
'objects': summary,
}
Expand Down Expand Up @@ -920,7 +920,7 @@ def thread_push(self):
while self.active_thread:
try:
head = self.get_head_commit()
if last_synced_head != self.get_head_commit():
if last_synced_head != head:
self.logger.debug("Git head moved from %s to %s" % (last_synced_head, head))
if self.push() is not False:
last_synced_head = head
Expand Down
16 changes: 15 additions & 1 deletion aetros/starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class GitCommandException(Exception):
cmd = None


def start(logger, full_id, fetch=True, env=None, volumes=None, cpus=1, memory=1, gpu_devices=None, offline=False):
def start(logger, full_id, fetch=True, env=None, volumes=None, cpus=None, memory=None, gpu_devices=None, offline=False):
"""
Starts the job with all logging of a job_id
"""
Expand All @@ -44,6 +44,20 @@ def start(logger, full_id, fetch=True, env=None, volumes=None, cpus=1, memory=1,
job_backend.start(collect_system=False, offline=offline)
job_backend.set_status('PREPARE', add_section=False)

job = job_backend.get_job_model()

if not cpus:
cpus = job.get_cpu()

if not memory:
memory = job.get_memory()

if not gpu_devices and job.get_gpu():
# if requested 2 GPUs and we have 3 GPUs with id [0,1,2], gpus should be [0,1]
gpu_devices = []
for i in range(0, job.get_gpu()):
gpu_devices.append(i)

start_command(logger, job_backend, env, volumes, cpus=cpus, memory=memory, gpu_devices=gpu_devices, offline=offline)


Expand Down

0 comments on commit 657fb50

Please sign in to comment.