Add local override #56

Merged
merged 5 commits into from
May 12, 2020
Changes from all commits
6 changes: 5 additions & 1 deletion README.md
@@ -133,6 +133,8 @@ As described above, the adapter is responsible for parsing the raw, user-provide
The pipeline script is the heart of the pipeline. This is the actual bash script which will be run. The `script` key can either be a filepath to a bash script to run, or a list of strings, each of which is a command to run.
Either way, the script gets executed by each job of the pipeline.

**NOTE:** During setup, every job will configure a `$CANINE_DOCKER_ARGS` environment variable. We recommend that you expand this variable inside the argument list of `docker run` commands to enable the container to properly interact with the canine environment.
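
For illustration, a minimal sketch of a pipeline `script` that expands the variable (the image name and command are placeholders, not part of canine):

```yaml
script:
  - sudo docker pull my-tool:latest
  # expand $CANINE_DOCKER_ARGS unquoted so it splits into separate docker arguments
  - sudo docker run --rm $CANINE_DOCKER_ARGS my-tool:latest ./run_analysis.sh
```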

### overrides

Localization overrides, defined in `localization.overrides`, allow the user to change the localizer's default handling for a specific input; a configuration sketch appears after the list below.
@@ -144,8 +146,10 @@ The overrides section should be a dictionary mapping input names, to a string de
* Any file or Google Storage object which appears as an input to multiple jobs is considered `common` and will be localized once to a common directory, visible to all jobs
* If any input to any arbitrary job is a list, the contents of the list are interpreted using the same rules
* `Common`: Inputs marked as common will be considered common to all jobs and localized once, to a directory which is visible to all jobs. Inputs marked as common which cannot be interpreted as a filepath or a Google Cloud Storage object are ignored and treated as strings
* `Stream`: Inputs marked as `Stream` will be streamed into a FIFO pipe, and the path to the pipe will be exported to the job. The `Stream` override is ignored for inputs which are not Google Cloud Storage objects, causing those inputs to be localized under default rules. Jobs which are requeued due to node failure will always restart the stream. Streams are created in a temporary directory on the local disk of the compute node
* `Delayed`: Inputs marked as `Delayed` will be downloaded by the job once it starts, instead of upfront during localization. The `Delayed` override is ignored for inputs which are not Google Cloud Storage objects, causing those inputs to be localized under default rules. Jobs which are requeued due to node failures will only re-download delayed inputs if the job failed before the download completed
* `Local`: Similar to `Delayed`. Inputs marked as `Local` will be downloaded by the job once it starts. The difference is that for `Local` inputs, a new disk is provisioned and mounted on the worker node, and the downloads are saved there. The disk is automatically sized to fit all files marked as `Local`, plus a small safety margin. **Warning:** Do not create or unzip files in the local download directory. The local download disks are sized automatically to fit the downloaded files and will likely run out of space if additional files are created or unpacked there
* `Localize`: Inputs marked as `Localize` will be treated as files and localized to job-specific input directories. This can be used to force files which would be handled as common, to be localized for each job. The `Localize` override is ignored for inputs which are not valid filepaths or Google Cloud Storage objects, causing those inputs to be treated as strings
* `Null` or `None`: Inputs marked this way are treated as strings, and no localization will be applied.
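
As a sketch with hypothetical input names, an overrides section might look like this in a pipeline configuration:

```yaml
localization:
  overrides:
    reference_fasta: Common   # localized once to a directory visible to all jobs
    reads_bam: Stream         # streamed through a FIFO pipe on the compute node
    annotation_db: Local      # downloaded onto the auto-provisioned local disk
    sample_name: null         # treated as a plain string; no localization
```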

95 changes: 87 additions & 8 deletions canine/localization/base.py
@@ -20,7 +20,7 @@
import pandas as pd

Localization = namedtuple("Localization", ['type', 'path'])
# types: stream, download, local, None
# indicates what kind of action needs to be taken during job startup

PathType = namedtuple(
@@ -37,13 +37,18 @@ class AbstractLocalizer(abc.ABC):
def __init__(
self, backend: AbstractSlurmBackend, transfer_bucket: typing.Optional[str] = None,
common: bool = True, staging_dir: str = None,
project: typing.Optional[str] = None, temporary_disk_type: str = 'standard',
local_download_dir: typing.Optional[str] = None, **kwargs
):
"""
Initializes the Localizer using the given transport.
Localizer assumes that the SLURMBackend is connected and functional during
the localizer's entire life cycle.
If staging_dir is not provided, a random directory is chosen.
local_download_dir: Where `local` overrides should be saved. Default: /mnt/canine-local-downloads/(random id).
temporary_disk_type: "standard" or "ssd". Default "standard".
NOTE: If temporary_disk_type is explicitly set to None, no disks will be created; files will be downloaded
to local_download_dir without mounting a disk there, and the directory will not be created automatically in that case
"""
self.transfer_bucket = transfer_bucket
if transfer_bucket is not None and self.transfer_bucket.startswith('gs://'):
@@ -63,6 +68,10 @@ def __init__(
self.inputs = {} # {jobId: {inputName: (handle type, handle value)}}
self.clean_on_exit = True
self.project = project if project is not None else get_default_gcp_project()
self.local_download_size = {} # {jobId: size}
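# random hex id used to namespace this localizer's temporary disks and default download directory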
self.disk_key = os.urandom(4).hex()
self.local_download_dir = local_download_dir if local_download_dir is not None else '/mnt/canine-local-downloads/{}'.format(self.disk_key)
self.temporary_disk_type = temporary_disk_type
self.requester_pays = {}

def get_requester_pays(self, path: str) -> bool:
@@ -98,6 +107,19 @@ def get_requester_pays(self, path: str) -> bool:
raise subprocess.CalledProcessError(rc, command)
return bucket in self.requester_pays and self.requester_pays[bucket]

def get_object_size(self, path: str) -> int:
"""
Returns the total number of bytes of the given gsutil object.
If a directory is given, this will return the total space used by all objects in the directory
"""
cmd = 'gsutil {} du -s {}'.format(
'-u {}'.format(self.project) if self.get_requester_pays(path) else '',
path
)
rc, sout, serr = self.backend.invoke(cmd)
check_call(cmd, rc, sout, serr)
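# gsutil du -s prints "<total bytes>  <path>"; the first field is the size in bytes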
return int(sout.read().split()[0])

@contextmanager
def transport_context(self, transport: typing.Optional[AbstractTransport] = None) -> typing.ContextManager[AbstractTransport]:
"""
@@ -461,7 +483,7 @@ def prepare_job_inputs(self, jobId: str, job_inputs: typing.Dict[str, str], comm
'stream',
value
)
elif mode in {'localize', 'symlink'}:
self.inputs[jobId][arg] = Localization(
None,
self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(value)))
@@ -471,7 +493,7 @@ def prepare_job_inputs(self, jobId: str, job_inputs: typing.Dict[str, str], comm
self.inputs[jobId][arg].path,
transport=transport
)
elif mode in {'delayed', 'local'}:
if not value.startswith('gs://'):
print("Ignoring 'delayed' override for", arg, "with value", value, "and localizing now", file=sys.stderr)
self.inputs[jobId][arg] = Localization(
@@ -483,6 +505,12 @@ def prepare_job_inputs(self, jobId: str, job_inputs: typing.Dict[str, str], comm
self.inputs[jobId][arg].path,
transport=transport
)
elif mode == 'local':
self.local_download_size[jobId] = self.local_download_size.get(jobId, 0) + self.get_object_size(value)
self.inputs[jobId][arg] = Localization(
'local',
value
)
else:
self.inputs[jobId][arg] = Localization(
'download',
@@ -521,6 +549,47 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ
localization_tasks = [
'if [[ -d $CANINE_JOB_INPUTS ]]; then cd $CANINE_JOB_INPUTS; fi'
]
local_download_size = self.local_download_size.get(jobId, 0)
disk_name = None
if local_download_size > 0 and self.temporary_disk_type is not None:
local_download_size = max(
10,
1+int(local_download_size / 1022611260) # bytes -> GiB with a 5% safety margin (2**30 / 1.05 ≈ 1022611260)
)
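# GCE persistent disks are limited to 64 TB (65,536 GB)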
if local_download_size > 65535:
raise ValueError("Cannot provision {} GB disk for job {}".format(local_download_size, jobId))
disk_name = 'canine-{}-{}-{}'.format(self.disk_key, os.urandom(4).hex(), jobId)
device_name = 'cn{}{}'.format(os.urandom(2).hex(), jobId)
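# once attached, the disk will appear as /dev/disk/by-id/google-<device_name>, used by mkfs/mount below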
exports += [
'export CANINE_LOCAL_DISK_SIZE={}GB'.format(local_download_size),
'export CANINE_LOCAL_DISK_TYPE={}'.format(self.temporary_disk_type),
'export CANINE_NODE_NAME=$(curl -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/name)',
'export CANINE_NODE_ZONE=$(basename $(curl -H "Metadata-Flavor: Google" http://metadata.google.internal/computeMetadata/v1/instance/zone))',
'export CANINE_LOCAL_DISK_DIR={}/{}'.format(self.local_download_dir, disk_name),
]
localization_tasks += [
'sudo mkdir -p $CANINE_LOCAL_DISK_DIR',
'if [[ -z "$CANINE_NODE_NAME" ]]; then echo "Unable to provision disk (not running on GCE instance). Attempting download to directory on boot disk" > /dev/stderr; else',
'echo Provisioning and mounting temporary disk {}'.format(disk_name),
'gcloud compute disks create {} --size {} --type pd-{} --zone $CANINE_NODE_ZONE'.format(
disk_name,
local_download_size,
self.temporary_disk_type
),
'gcloud compute instances attach-disk $CANINE_NODE_NAME --zone $CANINE_NODE_ZONE --disk {} --device-name {}'.format(
disk_name,
device_name
),
'gcloud compute instances set-disk-auto-delete $CANINE_NODE_NAME --zone $CANINE_NODE_ZONE --disk {}'.format(disk_name),
'sudo mkfs.ext4 -m 0 -E lazy_itable_init=0,lazy_journal_init=0,discard /dev/disk/by-id/google-{}'.format(device_name),

'sudo mount -o discard,defaults /dev/disk/by-id/google-{} $CANINE_LOCAL_DISK_DIR'.format(
device_name,
),
'sudo chmod -R a+rwX {}'.format(self.local_download_dir),
'fi'
]
docker_args.append('-v $CANINE_LOCAL_DISK_DIR:$CANINE_LOCAL_DISK_DIR')
compute_env = self.environment('remote')
stream_dir_ready = False
for key, val in self.inputs[jobId].items():
@@ -545,9 +614,13 @@ def job_setup_teardown(self, jobId: str, patterns: typing.Dict[str, str]) -> typ
key,
dest
))
elif val.type in {'download', 'local'}:
job_vars.append(shlex.quote(key))
if val.type == 'download':
dest = self.reserve_path('jobs', jobId, 'inputs', os.path.basename(os.path.abspath(val.path)))
else:
# Local and controller paths not needed on this object
dest = PathType(None, os.path.join(self.local_download_dir, disk_name, os.path.basename(val.path)))
localization_tasks += [
"if [[ ! -e {2}.fin ]]; then gsutil {0} -o GSUtil:check_hashes=if_fast_else_skip cp {1} {2} && touch {2}.fin; fi".format(
'-u {}'.format(shlex.quote(self.project)) if self.get_requester_pays(val.path) else '',
@@ -606,7 +679,13 @@
)
),
'if [[ -n "$CANINE_STREAM_DIR" ]]; then rm -rf $CANINE_STREAM_DIR; fi'
] + (
[
'sudo umount {}/{}'.format(self.local_download_dir, disk_name),
'gcloud compute instances detach-disk $CANINE_NODE_NAME --zone $CANINE_NODE_ZONE --disk {}'.format(disk_name),
'gcloud compute disks delete {} --zone $CANINE_NODE_ZONE'.format(disk_name)
] if disk_name is not None else []
)
)
return setup_script, localization_script, teardown_script

19 changes: 18 additions & 1 deletion canine/localization/delocalization.py
@@ -3,12 +3,28 @@
import glob
import os
import shutil
import subprocess
import shlex

"""
This is not actually part of the canine package
This is helper code which is run in the backend
"""

def same_volume(a, b):
"""
From NFSLocalizer
Check whether files a and b reside on the same device
"""
vols = subprocess.check_output(
"df {} {} | awk 'NR > 1 {{ print $1 }}'".format(
shlex.quote(a), # quote paths in case they contain spaces or shell metacharacters
shlex.quote(b)
),
shell = True
)
return len(set(vols.decode("utf-8").rstrip().split("\n"))) == 1

def main(output_dir, jobId, patterns, copy):
jobdir = os.path.join(output_dir, str(jobId))
if not os.path.isdir(jobdir):
@@ -24,7 +40,8 @@ def main(output_dir, jobId, patterns, copy):
if not os.path.isdir(os.path.dirname(dest)):
os.makedirs(os.path.dirname(dest))
if os.path.isfile(target):
# Same volume check catches outputs from outside the workspace
if copy or not same_volume(target, jobdir):
shutil.copyfile(os.path.abspath(target), dest)
elif os.stat(target).st_dev == os.stat(os.path.dirname(dest)).st_dev:
os.symlink(os.path.relpath(target, os.path.dirname(dest)), dest)
7 changes: 6 additions & 1 deletion canine/localization/nfs.py
@@ -30,7 +30,8 @@ class NFSLocalizer(BatchedLocalizer):
def __init__(
self, backend: AbstractSlurmBackend, transfer_bucket: typing.Optional[str] = None,
common: bool = True, staging_dir: str = None,
project: typing.Optional[str] = None, temporary_disk_type: str = 'standard',
local_download_dir: typing.Optional[str] = None, **kwargs
):
"""
Initializes the Localizer using the given transport.
@@ -56,6 +57,10 @@ def __init__(
self.inputs = {} # {jobId: {inputName: (handle type, handle value)}}
self.clean_on_exit = True
self.project = project if project is not None else get_default_gcp_project()
self.local_download_size = {} # {jobId: size}
self.disk_key = os.urandom(4).hex()
self.local_download_dir = local_download_dir if local_download_dir is not None else '/mnt/canine-local-downloads/{}'.format(self.disk_key)
self.temporary_disk_type = temporary_disk_type
self.requester_pays = {}

def localize_file(self, src: str, dest: PathType, transport: typing.Optional[AbstractTransport] = None):