Skip to content
This repository has been archived by the owner on Mar 2, 2020. It is now read-only.

Commit

Permalink
fixed ignore path parsing and check resources for aetros run
Browse files Browse the repository at this point in the history
improved performance by not always calling git.push every second, but
checking the previous head correctly.

Made the built Docker image's tag name more unique, depending on the actual
configuration.
fixed embedding parsing of word2vec format.
ensure docker is installed for certain commands.
Better error reporting for resource assignments.
  • Loading branch information
marcj committed Jan 31, 2018
1 parent 6c8580b commit 25af7dc
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 26 deletions.
12 changes: 6 additions & 6 deletions aetros/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1648,7 +1648,7 @@ def add_embedding_word2vec(self, x, path, dimensions=None, header_with_dimension
if not os.path.exists(path):
raise Exception("Given word2vec file does not exist: " + path)

f = open(path, 'rb')
f = open(path, 'r')

if not header_with_dimensions and not dimensions:
raise Exception('Either the word2vec file should contain the dimensions as header or it needs to be'
Expand All @@ -1661,21 +1661,21 @@ def add_embedding_word2vec(self, x, path, dimensions=None, header_with_dimension
dimensions = np.fromstring(line, dtype=np.uint, sep=' ').tolist()

vectors = b''
labels = b''
labels = ''

line_pos = 1 if header_with_dimensions else 0

# while '' != (line = f.readline())
for line in iter(f.readline, b''):
for line in iter(f.readline, ''):
line_pos += 1
space_pos = line.find(b' ')
space_pos = line.find(' ')
if -1 == space_pos:
message = 'Given word2vec does not have correct format in line ' + str(line_pos)
message += '\nGot: ' + str(line)
raise Exception(message)

labels += line[:space_pos] + b'\n'
vectors += np.fromstring(str(line[space_pos+1:]), dtype=np.float32, sep=' ').tobytes()
labels += line[:space_pos] + '\n'
vectors += np.fromstring(line[space_pos+1:], dtype=np.float32, sep=' ').tobytes()
else:
raise Exception("Given word2vec is not a .txt file. Other file formats are not supported.")

Expand Down
2 changes: 1 addition & 1 deletion aetros/commands/InitCommand.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def main(self, args):
prog=aetros.const.__prog__ + ' init')
parser.add_argument('name', help="Model name")
parser.add_argument('directory', nargs='?', help="Directory, default in current.")
parser.add_argument('--organisation', '-o', help="Create the model in given space. If space does not exist, create it.")
parser.add_argument('--organisation', '-o', help="Create the model in the organisation instead of the user account.")
parser.add_argument('--space', '-s', help="Create the model in given space. If space does not exist, create it.")
parser.add_argument('--private', action='store_true', help="Make the model private. Example: aetros init my-model --private")
parser.add_argument('--force', '-f', action='store_true', help="Force overwriting of already existing configuration file.")
Expand Down
64 changes: 59 additions & 5 deletions aetros/commands/RunCommand.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,23 @@

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
import argparse

import sys
import os
from math import ceil

import psutil
from cpuinfo import cpuinfo

import aetros.utils.git
from aetros.cuda_gpu import get_ordered_devices, CudaNotImplementedException
from aetros.starter import start_command

from aetros.backend import JobBackend
from aetros.utils import human_size, lose_parameters_to_full, extract_parameters, find_config, \
loading_text, read_home_config
from aetros.utils import human_size, lose_parameters_to_full, extract_parameters, find_config, loading_text, \
read_home_config, ensure_docker_installed, docker_call


class RunCommand:
Expand All @@ -29,7 +36,7 @@ def main(self, args):
parser.add_argument('command', nargs='?', help="The command to run. Default read in configuration file")
parser.add_argument('-i', '--image', help="Which Docker image to use for the command. Default read in configuration file. If not specified, command is executed on the host.")
parser.add_argument('--no-image', action='store_true', help="Forces not to use docker, even when image is defined in the configuration file.")

parser.add_argument('-s', '--server', action='append', help="Limits the server pool to this server. Default not limitation or read in configuration file. Multiple --server allowed.")
parser.add_argument('-m', '--model', help="Under which model this job should be listed. Default read in configuration file")
parser.add_argument('-l', '--local', action='store_true', help="Start the job immediately on the current machine.")
Expand All @@ -41,7 +48,7 @@ def main(self, args):
parser.add_argument('--gpu', help="How many GPU cards should be assigned to job. Docker only.")
parser.add_argument('--gpu_memory', help="Memory requirement for the GPU. Docker only.")

parser.add_argument('--offline', help="Whether the execution should happen offline.")
parser.add_argument('--offline', '-o', action='store_true', help="Whether the execution should happen offline.")

parser.add_argument('--rebuild-image', action='store_true', help="Makes sure the Docker image is re-built without cache.")

Expand Down Expand Up @@ -79,6 +86,9 @@ def main(self, args):
print("fatal: the priority can only be set for jobs in the cluster.")
sys.exit(1)

if config['image']:
ensure_docker_installed(self.logger)

env = {}
if parsed_args.e:
for item in parsed_args.e:
Expand All @@ -105,7 +115,6 @@ def main(self, args):
adding_files("done with %d file%s added (%s)."
% (files_added, 's' if files_added != 1 else '', human_size(size_added, 2)))

creating_git_job = loading_text("- Create job in local Git ... ")
create_info = {
'type': 'custom',
'config': config
Expand Down Expand Up @@ -165,6 +174,7 @@ def main(self, args):

create_info['config']['resources'] = create_info['config'].get('resources', {})
resources = create_info['config']['resources']

resources['cpu'] = int(parsed_args.cpu or resources.get('cpu', 1))
resources['memory'] = int(parsed_args.memory or resources.get('memory', 1))
resources['gpu'] = int(parsed_args.gpu or resources.get('gpu', 0))
Expand All @@ -173,11 +183,55 @@ def main(self, args):
if parsed_args.local:
create_info['server'] = 'local'

# make sure we do not limit the resources to something that is not available on the local machine
warning = []
cpu = cpuinfo.get_cpu_info()
mem = psutil.virtual_memory().total
gpu = 0
try:
gpu = len(get_ordered_devices())
except CudaNotImplementedException: pass

if not create_info['config']['image'] and not all([x == 0 for x in resources]):
self.logger.warning("! No Docker virtualization since no `image` defined, resources limitation ignored.")

if create_info['config']['image'] and resources['gpu'] > 0:
if not (sys.platform == "linux" or sys.platform == "linux2"):
self.logger.warning("! Your operating system does not support GPU allocation for "
"Docker virtualization. "
"NVIDIA-Docker2 is only supported on Linux.")

local_max_resources = {'cpu': cpu['count'], 'memory': ceil(mem / 1024 / 1024 / 1024), 'gpu': gpu}
if create_info['config']['image']:
# read max hardware within Docker
out = docker_call(['run', 'alpine', 'sh', '-c', 'nproc && cat /proc/meminfo | grep MemTotal'])
cpus, memory = out.decode('utf-8').strip().split('\n')
local_max_resources['cpu'] = int(cpus)

memory = memory.replace('MemTotal:', '').replace('kB', '').strip()
local_max_resources['memory'] = ceil(int(memory) / 1024 / 1024)

if local_max_resources['cpu'] < resources['cpu']:
warning.append('CPU cores %d -> %d' % (resources['cpu'], local_max_resources['cpu']))
resources['cpu'] = local_max_resources['cpu']

if local_max_resources['memory'] < resources['memory']:
warning.append('memory %dGB -> %dGB' % (resources['memory'], local_max_resources['memory']))
resources['memory'] = local_max_resources['memory']

if local_max_resources['gpu'] < resources['gpu']:
warning.append('GPU cards %d -> %d' % (resources['gpu'], local_max_resources['gpu']))
resources['gpu'] = local_max_resources['gpu']

if warning:
self.logger.warning("! Resources downgrade due to missing hardware: %s." % (', '.join(warning),))

if parsed_args.config and not create_info['config']['configPath']:
create_info['config']['configPath'] = parsed_args.config

create_info['config']['sourcesAttached'] = True

creating_git_job = loading_text("- Create job in local Git ... ")
if aetros.utils.git.get_current_commit_hash():
create_info['origin_git_source'] = {
'origin': aetros.utils.git.get_current_remote_url(),
Expand Down
2 changes: 1 addition & 1 deletion aetros/const.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '0.14.3'
__version__ = '0.14.4'
__prog__ = "aetros"

class bcolors:
Expand Down
11 changes: 7 additions & 4 deletions aetros/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def command_exec(self, command, inputdata=None, allowed_to_fail=False, show_outp
interrupted = False

if inputdata is not None and not isinstance(inputdata, six.binary_type):
inputdata = inputdata.encode("utf-8", 'replace')
inputdata = inputdata.encode('utf-8')

if command[0] != 'git':
base_command = ['git', '--bare', '--git-dir', self.git_path]
Expand Down Expand Up @@ -794,7 +794,7 @@ def collect_files_from_commit(commit):
collect_files_from_commit(commit_sha)

shas_to_check = object_shas
self.logger.debug("shas_to_check: %s " % (str(shas_to_check),))
self.logger.debug("shas_to_check %d: %s " % (len(shas_to_check), str(shas_to_check),))

if not shas_to_check:
return [], summary
Expand Down Expand Up @@ -826,7 +826,7 @@ def readall(c):
channel.close()
ssh_stream.close()

# make sure we have in objects only SHAs we actually will sync
# make sure we have in summary only SHAs we actually will sync
for type in six.iterkeys(summary):
ids = summary[type][:]
for sha in ids:
Expand Down Expand Up @@ -884,8 +884,11 @@ def thread_push(self):

while self.active_thread:
try:
head = self.get_head_commit()
if last_synced_head != self.get_head_commit():
self.push()
self.logger.debug("Git head moved from %s to %s" % (last_synced_head, head))
if self.push() is not False:
last_synced_head = head

time.sleep(0.5)
except (SystemExit, KeyboardInterrupt):
Expand Down
10 changes: 5 additions & 5 deletions aetros/starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,16 +561,16 @@ def docker_build_image(logger, home_config, job_backend, rebuild_image=False):
image += '_' + job_config['category'].lower()

if 'configPath' in job_config and job_config['configPath']:
configPath = job_config['configPath']\
config_path = job_config['configPath']\
.replace('aetros.yml', '')\
.replace('aetros-', '')\
.replace('.yml', '')

if configPath:
image += '_' + configPath.lower()
if config_path:
image += '_' + config_path.lower()

image = image.strip('/')
image = re.sub('[^A-Z_/\-a-z0-9]+', '-', image)
image = re.sub('[^A-Z_/\-a-z0-9]+', '-', image).strip('-')

m = hashlib.md5()
m.update(simplejson.dumps([job_config['image'], job_config['install']]).encode('utf-8'))
Expand All @@ -583,8 +583,8 @@ def docker_build_image(logger, home_config, job_backend, rebuild_image=False):

docker_build += ['-t', image, '-f', dockerfile, '.', ]

logger.info("Prepare docker image: $ " + (' '.join(docker_build)))
job_backend.set_status('IMAGE BUILD')
logger.info("Prepare docker image: $ " + (' '.join(docker_build)))
p = execute_command(args=docker_build, bufsize=1, stderr=subprocess.PIPE, stdout=subprocess.PIPE)

if p.returncode:
Expand Down
9 changes: 7 additions & 2 deletions aetros/tests/aetros_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ def test_ignore(self):
self.assertNotIgnored('script.py', '')
self.assertNotIgnored('script.py', '!script.py')

self.assertNotIgnored('script.py', 'script')

self.assertIgnored('script.py', 'script.py')
self.assertIgnored('script.py', '*.py')
self.assertNotIgnored('script.py', 'script.py\n!*.py')
Expand Down Expand Up @@ -141,7 +143,8 @@ def test_ignore(self):

self.assertNotIgnored('folder/script.py', '/*er2/script.py')
self.assertIgnored('folder2/script.py', '/*er2/script.py')
self.assertIgnored('folder/script.py', '/*er2/script.py\nfolder')
self.assertNotIgnored('folder/script.py', '/*er2/script.py\nfolder')
self.assertIgnored('folder/script.py', '/*er/script.py\nfolder')

self.assertIgnored('very/deep/datasets/dataset.zip', '*.zip')
self.assertIgnored('very/deep/datasets/dataset.zip', 'datasets/*')
Expand All @@ -151,6 +154,8 @@ def test_ignore(self):
self.assertIgnored('very/deep/datasets/dataset.zip', '/very/**/*.zip')
self.assertNotIgnored('very/deep/dataset.zip', '/very/**/*.zip')
self.assertIgnored('very/deep/dataset.zip', '/very/*/*.zip')
self.assertIgnored('very/deep/dataset.zip', '/very/')

self.assertNotIgnored('very/deep/dataset.zip', '/very/')
self.assertIgnored('very/deep/dataset.zip', '/very/**')


20 changes: 18 additions & 2 deletions aetros/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,22 @@ def get_logger(name='', debug=None, format=None):
return logger


def ensure_docker_installed(logger, home_config=None):
    """
    Verify that the configured Docker binary can be executed.

    Runs ``<docker> -v``, where ``<docker>`` is ``home_config['docker']``.

    :param logger: object providing an ``error(message)`` method; used to
                   report the failure before exiting.
    :param home_config: mapping with a ``docker`` entry. When it is None or
                        empty, it is obtained from ``read_home_config()``.
    :return: True when the command ran successfully. On any failure the
             error is logged and the process exits with status 2 instead
             of returning.
    """
    home_config = home_config or read_home_config()
    try:
        # Only success or failure matters here; the version output is not used.
        subprocess.check_output([home_config['docker'], '-v'])
    except Exception as e:
        # Deliberately broad: a missing binary, a non-zero exit code or a
        # missing 'docker' config entry all end the command the same way.
        logger.error('Docker is not installed: ' + (str(e)))
        sys.exit(2)

    return True


def docker_call(args, home_config=None):
    """
    Run the configured Docker binary with the given arguments.

    :param args: list of command line arguments appended after the binary.
    :param home_config: mapping with a ``docker`` entry. When it is None or
                        empty, it is obtained from ``read_home_config()``.
    :return: whatever ``subprocess.check_output`` returns for the command
             (its captured standard output).
    """
    if not home_config:
        home_config = read_home_config()

    docker_binary = home_config['docker']
    command = [docker_binary] + args

    return subprocess.check_output(command)


def loading_text(label='Loading ... '):
import itertools, sys
spinner = itertools.cycle(['-', '/', '|', '\\'])
Expand Down Expand Up @@ -588,9 +604,9 @@ def is_ignored(path, ignore_patters):
regex = regex.replace('\\*', '[^/\\\\]+')
regex = '(' + regex + ')'
if pattern[0] == '/' or (pattern[0] == '!' and pattern[1] == '/'):
regex = '^' + regex
regex = '^' + regex + '$'
else:
regex = '^.*' + regex
regex = '^.*' + regex + '$'

reobj = re.compile(regex)
ignore_pattern_cache[pattern] = reobj
Expand Down

0 comments on commit 25af7dc

Please sign in to comment.