This repository has been archived by the owner on May 26, 2018. It is now read-only.

Commit b80a871: fix issues with job start/pause/stop
ikit committed May 19, 2017 (1 parent: 03a4adc)
Showing 5 changed files with 76 additions and 44 deletions.
44 changes: 27 additions & 17 deletions pirus/api_rest/handlers.py
@@ -70,10 +70,13 @@ def rest_error(message:str="Unknow", code:str="0", error_id:str=""):



def rest_notify_all(src, msg):
def rest_notify_all(msg=None, src=None):
for ws in WebsocketHandler.socket_list:
if src != ws[1]:
ws[0].send_str(msg)
if msg:
ws[0].send_str(msg)
else:
print("rest_notify_all no message...", msg, src)

# Give to the core the delegate to call to notify all via websockets
core.notify_all = rest_notify_all
@@ -168,26 +171,26 @@ def format_pipeline_json(pipe_json):


def format_job_json(job_json, include_log_tail=False):
if job_json["pipeline"]:
if "pipeline" in job_json.keys():
job_json["pipeline"] = format_pipeline_json(job_json["pipeline"])

if job_json["logs"]:
if "logs" in job_json.keys():
if include_log_tail: result.update({"logs_tails": {}})
result.update({"logs": []})
for log in job_json["logs"]:
result["logs"].append("http://{}/dl/job/{}/logs/{}".format(HOST_P, os.path.basename(job_json["path"]), log.name))
result["logs_tails"][log.name] = log.tail()
if result["inputs"]:
if "inputs" in job_json.keys():
inputs = []
for file in result["inputs"]:
for file in job_json["inputs"]:
inputs.append(format_file_json(file))
result["inputs"] = inputs
if result["outputs"]:
job_json["inputs"] = inputs
if "outputs" in job_json.keys():
outputs = []
for file in result["outputs"]:
for file in job_json["outputs"]:
outputs.append(format_file_json(file))
result["outputs"] = outputs
return result
job_json["outputs"] = outputs
return job_json



@@ -685,12 +688,19 @@ async def new(self, request):
data = json.loads(data)
except Exception as ex:
return rest_error("Error occured when retriving json data to start new job. {}".format(ex))
missing = []
for k in ["pipeline_id", "name", "config", "inputs"]:
if k not in data.keys(): missing.append(k)
if len(missing) > 0:
return rest_error("Following informations are missing to create a new job : {}".format(", ".join(missing)))

pipe_id = data["pipeline_id"]
name = data["name"]
config = data["config"]
inputs = data["inputs"]
# Create the job
try:
job = core.jobs.new(pipe_id, config, inputs, asynch=True)
job = core.jobs.new(pipe_id, name, config, inputs, asynch=True)
except Exception as ex:
return rest_error("Error occured when initializing the new job. {}".format(ex))
if job is None:
@@ -701,7 +711,7 @@ async def new(self, request):
def pause(self, request):
job_id = request.match_info.get('job_id', -1)
try:
core.jobs.pause(job_id)
core.jobs.pause(job_id, False)
except Exception as ex:
return rest_error("Unable to pause the job {}. {}".format(job.id, ex))
return rest_success()
@@ -710,7 +720,7 @@ def pause(self, request):
def start(self, request):
job_id = request.match_info.get('job_id', -1)
try:
core.jobs.play(job_id)
core.jobs.start(job_id, False)
except Exception as ex:
return rest_error("Unable to start the job {}. {}".format(job.id, ex))
return rest_success()
@@ -719,7 +729,7 @@ def start(self, request):
def cancel(self, request):
job_id = request.match_info.get('job_id', -1)
try:
core.jobs.stop(job_id)
core.jobs.stop(job_id, False)
except Exception as ex:
return rest_error("Unable to stop the job {}. {}".format(job.id, ex))
return rest_success()
@@ -742,7 +752,7 @@ def finalize(self, request):
core.jobs.finalize(job_id, False)
except Exception as ex:
return rest_error("Unable to finalize the job {}. {}".format(job_id, ex))
return rest_success(format_job_json(Job.from_id(job_id)))
return rest_success(format_job_json(Job.from_id(job_id).to_json()))



@@ -770,7 +780,7 @@ async def get(self, request):
print('WS connection open by', ws_id)
WebsocketHandler.socket_list.append((ws, ws_id))
msg = '{"action":"online_user", "data" : [' + ','.join(['"' + _ws[1] + '"' for _ws in WebsocketHandler.socket_list]) + ']}'
rest_notify_all(None, msg)
core.notify_all(msg=msg)

try:
async for msg in ws:
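For context on the format_job_json changes above: the guards moved from truthiness checks such as job_json["logs"] to key-presence checks, because indexing a missing key raises instead of skipping the block. A minimal sketch with hypothetical data (not project code):

# Hypothetical job dict for illustration only: it has no "logs", "inputs" or "outputs" keys.
job_json = {"pipeline": {"id": 1}}

# New style used by format_job_json: the block is simply skipped when the key is absent.
if "logs" in job_json:
    print(job_json["logs"])

# Old style: indexing a missing key raises KeyError instead of skipping.
try:
    print(job_json["logs"])
except KeyError as ex:
    print("missing key:", ex)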
48 changes: 36 additions & 12 deletions pirus/api_rest/templates/api_test.html
@@ -41,8 +41,8 @@

/* Loading indicator */
.loader {
border: 16px solid #f3f3f3; /* Light grey */
border-top: 16px solid #3498db; /* Blue */
border: 10px solid #f3f3f3; /* Light grey */
border-top: 10px solid #3498db; /* Blue */
border-radius: 50%;
width: 50px;
height: 50px;
@@ -668,26 +668,50 @@ <h1>Job REST api</h1>
<i>Create a new job with the provided json configuration in the body of the POST request.</i>
</div>
</div>
contentType:
<div class="row form-group">
<div class="col-sm-2"><b>ContentType</b></div>
<div class="col-sm-10">
<code>'application/json'</code><br/>
</div>
</div>
<div class="row form-group">
<div class="col-sm-2"><b>Body</b></div>
<div class="col-sm-10">
<code>{ "pipeline_id" : &lt;pipe_id&gt;, "config" : &lt;json&gt;, "inputs" : [&lt;file_id&gt;,...]}</code><br/>
<i> <ul><li>&lt;pipe_id&gt; must refer to a pipeline "ready" to be used on the Pirus server;</li>
<li>&lt;json&gt; is the json data that will be provided to the pipeline to configure the job. The only mandatory key in this json
is the <code>"name"</code> to set the name of the job in pirus HMI;</li>
<li>[&lt;file_id&gt;,...] : a list of Pirus file's file id. Note that if one or several file are not ready to be used (still uploading by
<i>As a json request, the body is a json dictionary with the following keys:</i><br/>
<code>"pipeline_id" : int</code><br/>
<i>Refers to the id of a pipeline that is "ready" to be used on the Pirus server.</i><br/><br/>
<code>"name" : str</code><br/>
<i>The name of the job.</i><br/><br/>

<code>"config" : {json}</code><br/>
<i>The json data that will be provided to the pipeline to configure the job. You can retrieve the json form used to generate this config
by looking at the form.json document of the pipeline (<code>/pipeline/{pipeline_id}</code>). If the pipeline does not provide a form.json file,
you can leave this entry empty.<br/>
IMPORTANT: form.json describes the form used to build the config json, so you do not have to fill the config with all the information described in the
form.json file. The config shall be a simple key-value dictionary.</i><br/><br/>

<code>"inputs" : [int,...]</code><br/>
<i>A list of Pirus file ids. Note that if one or several files are not ready to be used (still uploading, for
example), the job will be in "waiting" status and will automatically start when all input files are ready. Of course, if a file is deleted
or its upload never ends, the job will never start, and you will have to stop or delete it yourself.</li>
</ul>
</i>
or its upload never ends, the job will never start, and you will have to stop or delete it yourself.</i>

</div>
</div>
<div class="row form-group">
<div class="col-sm-2"><b>Test</b></div>
<div class="col-sm-10">
Main query : <br/>
<input id="job_api_03_query" type="text" placeholder="/job" value="/job" readonly="readonly"/>
Body : <br/>
<textarea id="job_api_03_body" cols="40" rows="6">{ "pipeline_id" : 1, "config" : {"name" : "my test", "inputs" : []}</textarea><br/>
Body : to set config, retrieve the form.json document of the pipeline you want to run (<code>/pipeline/{pipeline_id}</code>)<br/>
<textarea id="job_api_03_body" cols="60" rows="10">{
"pipeline_id" : 1,
"name" : "My Test Job",
"config" : {
"Prop1" : "value 1",
"HasProp2" : true,
"inputs" : []
}</textarea><br/>
<button onclick="generic_exec_body('job_api_03_query', 'job_api_03_body', 'job_result');" class="btn btn-primary">Test it</button>
</div>
</div>
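The POST /job body documented in api_test.html above can also be exercised from a script. A hedged sketch with Python requests; the host and port are assumptions and the ids are placeholders, not values taken from this commit:

# Sketch only: adjust the host/port to your Pirus instance; ids are placeholders.
import requests

body = {
    "pipeline_id": 1,          # id of a pipeline in "ready" state
    "name": "My Test Job",     # now required at the top level of the body
    "config": {                # simple key-value dict matching the pipeline's form.json (may be empty)
        "Prop1": "value 1",
        "HasProp2": True
    },
    "inputs": []               # ids of already uploaded Pirus files
}

response = requests.post("http://localhost:8080/job", json=body)   # json= sets the application/json content type
print(response.status_code, response.json())

If any of pipeline_id, name, config or inputs is missing, the handler now answers with the explicit "missing information" error added in handlers.py above.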
2 changes: 1 addition & 1 deletion pirus/core/core.py
@@ -28,7 +28,7 @@
# CORE OBJECT
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def notify_all_print(msg):
def notify_all_print(msg=None, src=None):
"""
Default delegate used by the core for notification.
"""
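The keyword-only signature above keeps the default delegate compatible with the keyword-style calls now used elsewhere (core.notify_all(msg=...)). A minimal sketch of the delegate pattern with a hypothetical Core class, not the project's:

# Hypothetical minimal Core: the core ships a printing fallback delegate and the
# REST layer later swaps in the websocket broadcaster (core.notify_all = rest_notify_all).
def notify_all_print(msg=None, src=None):
    print("NOTIFY:", msg, "from", src)

class Core:
    def __init__(self):
        self.notify_all = notify_all_print   # default delegate

core = Core()
core.notify_all(msg={"action": "job_updated", "data": []})   # uses the printing fallback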
5 changes: 2 additions & 3 deletions pirus/core/pirus/container_managers/lxd_manager.py
@@ -274,10 +274,9 @@ def init_job(self, job, asynch=False, auto_notify=True):
exec_cmd(["lxc", "start", lxd_container])
lxd_job_file = os.path.join("/", os.path.basename(job_file))
exec_cmd(["lxc", "file", "push", job_file, lxd_container + lxd_job_file])
exec_cmd(["lxc", "exec", "--mode=non-interactive", lxd_container, "--", "chmod", "+x", lxd_job_file])

cmd = ["lxc", "exec", lxd_container, lxd_job_file]
exec_cmd(cmd, asynch=True)
cmd = ["lxc", "exec", "--mode=non-interactive", lxd_container, "--", "chmod", "+x", lxd_job_file]
r, o, e = exec_cmd(cmd, True)
# TODO : keep future callback and catch error if start command failed

# if not asynch:
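For orientation, the container preparation above corresponds roughly to the following standalone lxc sequence. This sketch uses subprocess instead of the project's exec_cmd helper (whose exact signature is not shown in this diff), and the container and script names are placeholders:

# Placeholder names; requires the lxc CLI to be installed.
import subprocess

container = "pirus-job-42"     # placeholder container name
job_script = "/run_job.sh"     # placeholder path of the job script inside the container

subprocess.run(["lxc", "start", container], check=True)
subprocess.run(["lxc", "file", "push", "run_job.sh", container + job_script], check=True)
subprocess.run(["lxc", "exec", "--mode=non-interactive", container, "--",
                "chmod", "+x", job_script], check=True)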
21 changes: 10 additions & 11 deletions pirus/core/pirus/job_manager.py
@@ -51,7 +51,7 @@ def get(self, fields=None, query=None, order=None, offset=None, limit=None, dept



def new(self, pipeline_id, config, inputs_ids=[], asynch=True, auto_notify=True):
def new(self, pipeline_id:int, name:str, config:dict, inputs_ids=[], asynch=True, auto_notify=True):
"""
Create a new job for the specified pipeline (pipeline_id), with the provided config and input file ids
"""
@@ -60,12 +60,12 @@ def new(self, pipeline_id, config, inputs_ids=[], asynch=True, auto_notify=True)
raise RegovarException("Pipeline not found (id={}).".format(pipeline_id))
if pipeline.status != "ready":
raise RegovarException("Pipeline status ({}) is not \"ready\". Cannot create a job.".format(pipeline.status))
if not isinstance(config, dict) and "name" not in config.keys():
if not name:
raise RegovarException("A name must be provided to create new job")
# Init model
job = Job.new()
job.status = "initializing"
job.name = config["name"]
job.name = name
job.config = json.dumps(config, sort_keys=True, indent=4)
job.progress_value = 0
job.pipeline_id = pipeline_id
@@ -92,7 +92,7 @@ def new(self, pipeline_id, config, inputs_ids=[], asynch=True, auto_notify=True)
# Set job's config in the inputs directory of the job
config_path = os.path.join(inputs_path, "config.json")
job_config = {
"pirus" : {"notify_url" : NOTIFY_URL.format(job.id)},
"pirus" : {"notify_url" : NOTIFY_URL.format(job.id), "job_name" : job.name},
"job" : config
}
with open(config_path, 'w') as f:
@@ -231,8 +231,6 @@ def finalize(self, job_id, asynch=True):
job = Job.from_id(job_id)
if not job:
raise RegovarException("Job not found (id={}).".format(job_id))
if job.status not in ["waiting", "running", "pause"]:
raise RegovarException("Job status is \"{}\". Cannot proceed to the finalization of the job.".format(job.status ))
# Register outputs files
outputs_path = os.path.join(job.path, "outputs")
logs_path = os.path.join(job.path, "logs")
@@ -303,7 +301,7 @@ def set_status(self, job, new_status, notify=True, asynch=True):
self.finalize(job.id, asynch)
# Push notification
if notify:
core.notify_all({"action": "job_updated", "data" : [job.to_json()]})
core.notify_all(msg={"action": "job_updated", "data" : [job.to_json()]})


def __init_job(self, job_id, asynch, auto_notify):
@@ -370,8 +368,7 @@ def __start_job(self, job_id, asynch):
if core.container_managers[job.pipeline.type].start_job(job):
self.set_status(job, "running", asynch)
return True
else:
return False
return False



@@ -393,7 +390,7 @@ def __pause_job(self, job_id, asynch):
return False
self.set_status(job, "pause", asynch)
return True

return False


def __stop_job(self, job_id, asynch):
@@ -410,8 +407,9 @@ def __stop_job(self, job_id, asynch):
# Log error
self.set_status(job, "error", asynch)
return False
self.set_status(job, "waiting", asynch)
self.set_status(job, "canceled", asynch)
return True
return False



@@ -432,3 +430,4 @@ def __finalize_job(self, job_id, asynch):
else:
self.set_status(job, "error", asynch)
return False
return False
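Tying the handler and manager changes together, a hedged usage sketch of the calls touched by this commit. The ids are placeholders, and the second positional argument passed by the REST handlers is assumed to be the asynch flag, as it is for finalize:

# Sketch only: assumes core is an initialized Pirus core and that new() returns the created Job.
job = core.jobs.new(pipeline_id=1, name="My Test Job", config={}, inputs_ids=[], asynch=True)

core.jobs.pause(job.id, False)   # __pause_job -> status "pause"
core.jobs.start(job.id, False)   # __start_job -> status "running"
core.jobs.stop(job.id, False)    # __stop_job  -> status "canceled" (was left as "waiting" before this fix)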
