Skip to content
This repository was archived by the owner on Mar 13, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions api/PclusterApiHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,69 @@ def submit_job():
return resp if type(resp) == tuple else {"success": "true"}


def translate_job(request_body, user):
# format job parameters so that they are accepted by Slurm API
job_properties_dict = {
"job-name": "name",
"chdir": "current_working_directory",
"mem": "memory_per_node"
}

translated_job = {
"job": {
"environment": {
"PATH": "/bin:/usr/bin/:/usr/local/bin/:/opt/slurm/bin/"
},
"name": ""
}
}

request_body["chdir"] = f"/home/{user}"
for key in request_body:
if key == "command":
translated_job["script"] = request_body[key]
else:
translated_job["job"][job_properties_dict.get(key, key)] = request_body[key]

return translated_job


def post_job_slurm_api(body, user, token, ip):
body_data = json.dumps(translate_job(body, user))

url = "https://"+ip+"/slurm/v0.0.36/job/submit"
headers = {
"Content-Type": "application/json",
"X-SLURM-USER-NAME": user,
"X-SLURM-USER-TOKEN": token
}

resp = requests.post(url=url, data=body_data, headers=headers, verify=False)
print("POST /slurm/v0.0.36/job/submit", resp.status_code, resp.reason, '\n')

error_dicts = resp.json().get('errors')
errors_text = '\n'.join([error_data.get("error") for error_data in error_dicts])

return {"errors": errors_text, "status_code": resp.status_code, "reason": resp.reason}


def submit_job_script():
body = request.json
cluster_name = request.args.get("cluster_name")
instance_id = request.args.get("instance_id")
region = request.args.get("region")
user = request.args.get("user", "ec2-user")

ec2 = boto3.resource("ec2", region_name=region)
instance = ec2.Instance(instance_id)
ip = instance.public_dns_name

client = boto3.client("secretsmanager")
jwt_token = client.get_secret_value(SecretId="slurm_token_"+cluster_name)["SecretString"]

return post_job_slurm_api(body, user, jwt_token, ip)


def _price_estimate(cluster_name, region, queue_name):
config_text = get_cluster_config_text(cluster_name, region)
config_data = yaml.safe_load(config_text)
Expand Down
85 changes: 85 additions & 0 deletions api/tests/test_submit_job_script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import json
from unittest import mock
from requests import Response
from api.PclusterApiHandler import post_job_slurm_api, translate_job


@mock.patch("api.PclusterApiHandler.requests.post")
def test_post_to_slurm_api_with_correct_request(mock_post):
"""
Given a function that posts jobs to the Slurm API
When a job body, user, token, and ip are inputted
It should call request.post with the correct url, data, and headers
"""
post_job_slurm_api(
{'job-name': 'test'},
'test-user', 'slurm-token123', '123.456.7.8'
)

mock_post.assert_called_once_with(
url='https://123.456.7.8/slurm/v0.0.36/job/submit',
data=json.dumps({
"job": {
"environment": {
"PATH": "/bin:/usr/bin/:/usr/local/bin/:/opt/slurm/bin/"
},
"name": "test",
"current_working_directory": "/home/test-user"
}
}),
headers={
'Content-Type': 'application/json',
'X-SLURM-USER-NAME': 'test-user',
'X-SLURM-USER-TOKEN': 'slurm-token123'
},
verify=False
)


@mock.patch("api.PclusterApiHandler.requests.post")
def test_post_to_slurm_api_returns_errors(mock_post):
"""
Given a function that posts jobs to the Slurm API
When the response contains errors
It should return the error messages separated by new lines
"""
resp = Response()
resp.status_code = 400
resp.reason = 'BAD REQUEST'
resp._content = b'{"errors": [ \
{"error": "test", "error_code": -1}, \
{"error": "testing", "error_code": 9001}]}'

mock_post.return_value = resp

ret = post_job_slurm_api(
{'job-name': 'test'},
'test-user', 'slurm-token123', '123.456.7.8'
)

assert ret.get('errors') == 'test\ntesting'
assert ret.get('status_code') == 400
assert ret.get('reason') == 'BAD REQUEST'


def test_translate_job():
"""
Given a function that translates a job
When a dict of job data is inputted
It should format the job to be accepted by the Slurm API
"""
request_body = {'job-name': 'test', 'nodes': 1, 'command': 'test command'}

translation = translate_job(request_body, 'test-user')

assert translation == {
"job": {
"name": "test",
"current_working_directory": "/home/test-user",
"nodes": 1,
"environment": {
"PATH": "/bin:/usr/bin/:/usr/local/bin/:/opt/slurm/bin/"
}
},
"script": 'test command'
}
6 changes: 6 additions & 0 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
scontrol_job,
set_user_role,
submit_job,
submit_job_script
)


Expand Down Expand Up @@ -168,6 +169,11 @@ def submit_job_():
def sacct_():
return sacct()

@app.route("/manager/submit_job_script", methods=["POST"])
@authenticated()
def submit_job_script_():
return submit_job_script()

@app.route("/manager/scontrol_job")
@authenticated()
def scontrol_job_():
Expand Down
55 changes: 54 additions & 1 deletion frontend/locales/en/strings.json
Original file line number Diff line number Diff line change
Expand Up @@ -365,9 +365,62 @@
}
},
"JobSubmitDialog": {
"requiredMemory": {
"header": "Submit Job",
"cancel": "Cancel",
"submit": "Submit",
"job-name": {
"header": "Job Name",
"description": "Please choose an identifier for this job.",
"placeholder": "job-name"
},
"chdir": {
"header": "Working Directory",
"description": "Please choose a working directory for the job [optional]"
},
"nodes": {
"header": "Nodes",
"description": "Number of nodes for job [optional]"
},
"ntasks": {
"header": "Number of tasks",
"description": "Number of tasks for a job [optional]"
},
"mem": {
"header": "Required memory",
"description": "Real memory required per node, in MB. A memory size specification of zero is treated as a special case and grants the job access to all of the memory on each node. [optional]"
},
"jobTypeCommand": {
"header": "Command",
"description": "The command to run as a part of this job.",
"radioGroup": "Run a command"
},
"jobTypeFile": {
"header": "Script Path",
"description": "Path to the script to run",
"radioGroup": "Run a script on the head node"
},
"jobTypeScript": {
"radioGroup": "Enter sbatch script manually",
"radioGroupDisabledDescription": "Requires Slurm REST API"
},
"queue": {
"header": "Queue",
"description": "Queue where the job will run."
},
"costEstimate": {
"header": "Cost estimate",
"alertHeader": "Experimental!",
"alertContent": "This provides a basic cost estimate based on the expected job run-time, the number of nodes and their instance type. Actual costs will vary based on node uptime, storage, and other factors. Please refer to Cost Explorer for actual cluster costs.",
"timeEstimateHeader": "Your estimate of the total runtime of the job (in Hours).",
"button": "Estimate",
"estimatedCost": "Estimated job cost:",
"formula": "Price ($/h) * Time (h) * NodeCount"
},
"errors": {
"mustSelectQueue": "Error: You must select a queue.",
"mustSelectNodes": "Error: You must select a node count.",
"mustSelectRuntime": "Error: You must select a job runtime.",
"emptyJob": "Error: Job is empty"
}
}
}
40 changes: 25 additions & 15 deletions frontend/src/model.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import identityFn from 'lodash/identity';
import { getAppConfig } from './app-config';

// Types
import { Job } from './types/jobs';
type Callback = (arg?: any) => void;

const axiosInstance = axios.create({
Expand Down Expand Up @@ -680,25 +681,33 @@ function CancelJob(instanceId: any, user: any, jobId: any, callback?: Callback)
})
}

function SubmitJob(instanceId: any, user: any, job: any, successCallback?: Callback, failureCallback?: Callback) {
async function SubmitJob(instanceId: string, user: string, job: Job): Promise<string> {
const region = getState(['app', 'selectedRegion']) || getState(['aws', 'region']);
let url = `manager/submit_job?instance_id=${instanceId}&user=${user || 'ec2-user'}&region=${region}`
request('post', url, job).then((response: any) => {
if(response.status === 200) {
console.log(response.data)
successCallback && successCallback(response.data)
}
}).catch((error: any) => {
if(error.response)
{
failureCallback && failureCallback(error.response.data.message)
console.log(error.response)
try {
const { data } = await request('post', url, job);
return data?.message || "";
} catch (error: any) {
if(error.response) {
notify(`Error: ${error.response.data.message}`, 'error');
}
console.log(error)
})
throw error;
}
}

async function SubmitJobScript(clusterName: string, instanceId: string, user: string, job: Job): Promise<string> {
const region = getState(['app', 'selectedRegion']) || getState(['aws', 'region']);
let url = `manager/submit_job_script?cluster_name=${clusterName}&instance_id=${instanceId}&user=${user || 'ec2-user'}&region=${region}`
try {
const { data } = await request('post', url, job);
return data?.errors || "";
} catch (error: any) {
if(error.response) {
notify(`Error: ${error.response.data.message}`, 'error');
}
throw error;
}
}

function JobInfo(instanceId: any, user: any, jobId: any, successCallback?: Callback, failureCallback?: Callback) {
const region = getState(['app', 'selectedRegion']) || getState(['aws', 'region']);
Expand Down Expand Up @@ -823,5 +832,6 @@ export {CreateCluster, UpdateCluster, ListClusters, DescribeCluster,
BuildImage, GetCustomImageStackEvents, ListCustomImageLogStreams,
GetCustomImageLogEvents, ListOfficialImages, LoadInitialState,
Ec2Action,LoadAwsConfig, GetDcvSession, QueueStatus, CancelJob, SubmitJob,
PriceEstimate, SlurmAccounting, JobInfo, ListUsers, SetUserRole, notify,
CreateUser, DeleteUser}
SubmitJobScript, PriceEstimate, SlurmAccounting, JobInfo, ListUsers,
SetUserRole, notify, CreateUser, DeleteUser}

Loading