Skip to content

Commit

Permalink
Merge branch 'RESTAPI-902-internal-transfer-constraint' into 'master'
Browse files Browse the repository at this point in the history
Add constraint in internal transfer jobs

See merge request firecrest/firecrest!241
  • Loading branch information
ekouts committed Aug 18, 2023
2 parents 35e98a1 + 35de059 commit b7f1d26
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 16 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.14.0]

### Added

- Add constraint in xfer-internal job script when provided by the configuration. The associated environment variable is `F7T_XFER_CONSTRAINT` and can be empty, when no machine needs it. Otherwise, the different constraints should be separated by `;`.

## [1.13.1]

Expand Down
2 changes: 2 additions & 0 deletions deploy/demo/common/common.env
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ F7T_STORAGE_MAX_FILE_SIZE=5120
F7T_OBJECT_STORAGE='s3v4'
# partition for internal transfer
F7T_XFER_PARTITION=xfer
# constraint for internal transfer
F7T_XFER_CONSTRAINT=''
# set if account is needed for SLURM job submission
F7T_USE_SCHED_PROJECT=True
#-------
Expand Down
1 change: 1 addition & 0 deletions deploy/k8s/config/templates/cm.common.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ data:
F7T_UTILITIES_TIMEOUT: "{{ .Values.common.F7T_UTILITIES_TIMEOUT }}"
F7T_UTILITIES_URL: "{{ .Values.common.F7T_UTILITIES_URL }}"
F7T_XFER_PARTITION: "{{ .Values.common.F7T_XFER_PARTITION }}"
F7T_XFER_CONSTRAINT: "{{ .Values.common.F7T_XFER_CONSTRAINT }}"
kind: ConfigMap
metadata:
name: common-env-file
Expand Down
1 change: 1 addition & 0 deletions deploy/test-build/environment/storage.env
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ F7T_OS_PROJECT_ID=

# xfer partition (scheduler partition where internal data movement is done)
F7T_XFER_PARTITION=xfer
F7T_XFER_CONSTRAINT=''

# Storage polling interval for uploads in seconds
F7T_STORAGE_POLLING_INTERVAL=60
Expand Down
1 change: 1 addition & 0 deletions src/common/common.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ F7T_STORAGE_MAX_FILE_SIZE=5368709120
F7T_OBJECT_STORAGE='s3v4'
# partition for internal transfer
F7T_XFER_PARTITION=xfer
F7T_XFER_CONSTRAINT=''
#-------
# STATUS: microservices & systems to pool:
F7T_STATUS_SERVICES='certificator;utilities;compute;tasks;storage'
Expand Down
10 changes: 9 additions & 1 deletion src/common/schedulers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,22 @@ class JobScript:
"""

def __init__(
self, name, time, partition, command, dependency_id=None, account=None
self,
name,
time,
partition,
command,
dependency_id=None,
account=None,
constraint=None
):
self.name = name
self.time = time
self.partition = partition
self.command = command
self.dependency_id = dependency_id
self.account = account
self.constraint = constraint


class JobScheduler(abc.ABC):
Expand Down
3 changes: 3 additions & 0 deletions src/common/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ def script_template(self, id, script_spec):
f"#SBATCH --ntasks=1\n"
f"#SBATCH --partition={script_spec.partition}\n"
)
if script_spec.constraint:
script += f"#SBATCH --constraint='{script_spec.constraint}'\n"

if script_spec.dependency_id:
script += f"#SBATCH --dependency=afterok:{script_spec.dependency_id}\n"

Expand Down
39 changes: 24 additions & 15 deletions src/storage/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@
# Scheduler partition used for internal transfers, one per public system
XFER_PARTITION = os.environ.get("F7T_XFER_PARTITION", "").strip('\'"').split(";")

# Scheduler constraint used for internal transfers, one per public system
XFER_CONSTRAINT = os.environ.get("F7T_XFER_CONSTRAINT", "").strip('\'"').split(";")

# use project account in submission
# F7T_USE_SLURM_ACCOUNT is deprecated so we use it only in case F7T_USE_SCHED_PROJECT is not set
USE_SCHED_PROJECT = get_boolean_var(
Expand Down Expand Up @@ -435,13 +438,13 @@ def os_to_fs(task_id):
cert_list = [f"{td}/user-key-cert.pub", f"{td}/user-key.pub", f"{td}/user-key", td]

# start download from OS to FS
update_task(task_id, headers, async_task.ST_DWN_BEG,
update_task(task_id, headers, async_task.ST_DWN_BEG,
msg={
"source": objectname,
"target": upl_file["target"],
"source": objectname,
"target": upl_file["target"],
"system_name": system_name,
"system_addr": system_addr
},
},
is_json=True)

# execute download
Expand All @@ -451,11 +454,11 @@ def os_to_fs(task_id):
if result["error"] == 0:
update_task(task_id, headers, async_task.ST_DWN_END,
msg={
"source": objectname,
"target": upl_file["target"],
"source": objectname,
"target": upl_file["target"],
"system_name": system_name,
"system_addr": system_addr
},
},
is_json=True)

# No need to delete the dictionary, it will be cleaned on next iteration
Expand Down Expand Up @@ -505,7 +508,7 @@ def download_task(headers, system_name, system_addr, sourcePath, task_id):
app.logger.error(f"Couldn't extract username from JWT token: {is_username_ok['reason']}")
msg = {"error": is_username_ok['reason'], "source": sourcePath, "system_name": system_name}
update_task(task_id, headers, async_task.ERROR, msg, is_json=True)

container_name = is_username_ok["username"]

if not staging.is_container_created(container_name):
Expand Down Expand Up @@ -534,7 +537,7 @@ def download_task(headers, system_name, system_addr, sourcePath, task_id):
# if upload to SWIFT fails:
if res["error"] != 0:
error_msg = f"Upload to Staging area has failed. Object: {object_name}"


error_str = res["msg"]
if in_str(error_str,"OPENSSH"):
Expand Down Expand Up @@ -791,7 +794,7 @@ def upload_request():
system_name = request.headers["X-Machine-Name"]
except KeyError as e:
return jsonify(description="No machine name given"), 400

# public endpoints from Kong to users
if system_name not in SYSTEMS_PUBLIC:
header = {"X-Machine-Does-Not-Exists": "Machine does not exists"}
Expand All @@ -810,14 +813,14 @@ def upload_request():
v = validate_input(sourcePath)
if v != "":
return jsonify(description="Failed to upload file", error=f"'sourcePath' {v}"), 400

[headers, ID] = get_tracing_headers(request)
# checks if the targetPath is a directory (targetPath = "/path/to/dir") that can be accessed by the user
check_is_dir = is_valid_dir(targetPath, headers, system_name, system_addr)

# if so, then the actual targetPath has to include the name extracted from sourcePath
_targetPath = f"{targetPath}/{sourcePath.split('/')[-1]}"

if not check_is_dir["result"]:

# check if targetPath is a file path by extracting last part of the path
Expand All @@ -828,7 +831,7 @@ def upload_request():
return jsonify(description="Failed to upload file", error="'targetPath' directory not allowed"), 400, check_is_dir["headers"]
# if targetPath is a file, then no changes need to be done
_targetPath = targetPath

# obtain new task from Tasks microservice
task_id = create_task(headers, service="storage", system=system_name, init_data={"source":sourcePath, "target": _targetPath})

Expand Down Expand Up @@ -873,13 +876,19 @@ def exec_internal_command(headers, system_idx, command, jobName, jobTime, stageO
try:
td = tempfile.mkdtemp(prefix="job")
ID = headers.get(TRACER_HEADER, '')
if XFER_CONSTRAINT == []:
constraint = None
else:
constraint = XFER_CONSTRAINT[system_idx]

script_spec = JobScript(
name=jobName,
time=jobTime,
partition=XFER_PARTITION[system_idx],
command=command,
dependency_id=stageOutJobId,
account=account
account=account,
constraint=constraint
)

with open(td + "/sbatch-job.sh", "w") as sbatch_file:
Expand Down

0 comments on commit b7f1d26

Please sign in to comment.