Skip to content
This repository has been archived by the owner on May 26, 2018. It is now read-only.

Commit

Permalink
fix #29 + refactoring pipeline package installation
Browse files Browse the repository at this point in the history
  • Loading branch information
ikit committed Oct 19, 2016
1 parent 8d5dbf4 commit 7460891
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 86 deletions.
95 changes: 53 additions & 42 deletions pirus/api_v1/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,17 @@ async def post(self, request):
def delete(self, request):
# 1- Retrieve pirus pipeline from post request
pipe_id = request.match_info.get('pipe_id', -1)
# 2- Check that the user is allowed to remove the package (owner or admin)
# Todo
# 3- Check that the pipeline is not running
# Todo
# 4- Remove pipeline files
ipdb.set_trace()
if pipe_id == -1:
return rest_error("Unknow pipeline id " + str(pipe_id))

# 5- Remove pipeline informations in database
print ("DELETE pipeline/<id=" + str(pipe_id) + ">")
return rest_success("Uninstall of pipeline " + str(pipe_id) + " success.")
try:
pipeline = Pipeline.delete(pipe_id)
except Exception as error:
# TODO : manage error
return rest_error("Server Error : The following occure during installation of the pipeline. " + error.msg)

return rest_success("Pipeline " + str(pipe_id) + " deleted.")


def get_details(self, request):
Expand Down Expand Up @@ -297,6 +299,8 @@ async def post(self, request):
if pipeline is None:
return rest_error("Unknow pipeline id " + str(pipe_id))


config = { "run" : config, "pirus" : { "notify_url" : ""}}
# 3- Enqueue run of the pipeline with celery
try:
cw = run_pipeline.delay(pipeline.lxd_alias, config, inputs)
Expand All @@ -308,7 +312,7 @@ async def post(self, request):
run = Run()
run.import_data({
"pipe_id" : pipe_id,
"name" : config["name"],
"name" : config["run"]["name"],
"celery_id" : str(cw.id),
"start" : str(datetime.datetime.now().timestamp()),
"status" : "INIT",
Expand Down Expand Up @@ -384,49 +388,56 @@ def get_file(self, request):
filename = request.match_info.get('filename', "")
return self.download_file(run_id, filename)

async def up_progress(self, request):
# async def up_progress(self, request):
# # 1- Retrieve data from request
# data = await request.json()
# pipe_id = data["pipeline_id"]
# config = data["config"]
# inputs = data["inputs"]
# run_id = request.match_info.get('run_id', -1)
# complete = request.match_info.get('complete', None)
# print("RunHandler[up_progress] : taskid=" + run_id, complete)
# run = Run.from_celery_id(run_id)
# if run is not None:
# p = run.progress.copy()
# p.update({"value" : complete, "label" : str(complete) + " %"})
# # TODO FIXME : workaround to force the update of dynamic field "progress" - only updating progress dictionary doesn't work :(
# run.progress = 0
# run.save()
# run.progress = p
# run.save()
# msg = '{"action":"run_progress", "data" : ' + json.dumps(run.export_client_data()) + '}'
# print (msg)
# notify_all(None, msg)
# return web.Response()

# def up_status(self, request):
# run_id = request.match_info.get('run_id', -1)
# status = request.match_info.get('status', None)
# print("RunHandler[up_status] : taskid=" + run_id , status)
# run = Run.from_celery_id(run_id)
# if run is not None:
# run.status = status
# run.save()
# msg = '{"action":"run_progress", "data" : ' + json.dumps(last_runs()) + '}'
# notify_all(None, msg)
# return web.Response()

async def up_data(self, request):
# 1- Retrieve data from request
data = await request.json()
pipe_id = data["pipeline_id"]
config = data["config"]
inputs = data["inputs"]
run_id = request.match_info.get('run_id', -1)
complete = request.match_info.get('complete', None)
print("RunHandler[up_progress] : taskid=" + run_id, complete)
run = Run.from_celery_id(run_id)
if run is not None:
p = run.progress.copy()
p.update({"value" : complete, "label" : str(complete) + " %"})
# TODO FIXME : workaround to force the update of dynamic field "progress" - only updating progress dictionary doesn't work :(
run.progress = 0
run.save()
run.progress = p
if "progress" in data.keys():
run.progress = data["progress"]
if "status" in data.keys():
run.status = data["status"]
run.save()
msg = '{"action":"run_progress", "data" : ' + json.dumps(run.export_client_data()) + '}'
print (msg)
notify_all(None, msg)
return web.Response()

def up_status(self, request):
run_id = request.match_info.get('run_id', -1)
status = request.match_info.get('status', None)
print("RunHandler[up_status] : taskid=" + run_id , status)
run = Run.from_celery_id(run_id)
if run is not None:
run.status = status
run.save()
msg = '{"action":"run_progress", "data" : ' + json.dumps(last_runs()) + '}'
notify_all(None, msg)
return web.Response()

async def up_data(self, request):
# 1- Retrieve data from request
data = await request.json()
run_id = data["pipeline_id"]
config = data["config"]
inputs = data["inputs"]
pass


def get_pause(self, request):
# Todo
Expand Down
90 changes: 51 additions & 39 deletions pirus/api_v1/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,59 +186,71 @@ def from_id(pipe_id):
pipe = Pipeline.objects.get(pk=pipe_id)
return pipe


@staticmethod
def delete(pipe_id):
pipe = Pipeline.from_id(pipe_id)
if pipe != None:
shutil.rmtree(pipe.path)
pipe.delete()


@staticmethod
def install(ppackage_name, ppackage_path, ppackage_file):
plog.info('I: Installation of the pipeline package : ' + ppackage_path)
# 1- Extract pipeline package
# 1- Extract pipeline metadata
try:
tar = tarfile.open(ppackage_file)
tar.extractall(path=ppackage_path)
tar.close()
xdir = [info for info in tar.getmembers() if info.name == "metadata.yaml"]
metadata = tar.extractfile(member=xdir[0])
metadata = metadata.read()
metadata = json.loads(metadata.decode())
metadata = metadata["pirus"]
except:
# TODO : manage error + remove package file
plog.info('E: [FAILLED] Extraction of PirusPipeline.tar.gz.')
raise PirusException("XXXX", "Unable to extract package. Corrupted file or wrong format.")
plog.info('I: [OK ] Extraction of PirusPipeline.tar.gz.')
plog.info('E: [FAILLED] Extraction of ' + ppackage_file)
raise PirusException("XXXX", "Unable to extract package. Corrupted file or wrong format")
plog.info('I: [OK ] Extraction of metadata from ' + ppackage_file)

# 2- Check module
metadata_file = os.path.join(ppackage_path, "metadata.yaml")
if not os.path.exists(metadata_file):
# TODO : manage error + remove package file
plog.info('E: [FAILLED] Manifest.json file extraction.')
raise PirusException("XXXX", "No manifest file found.")
metadata = None
try :
with open(metadata_file) as f:
metadata = json.load(f)
metadata = metadata["pirus"]
except :
# TODO : manage error
raise PirusException("XXXX", "Bad pirus pipeline format : Manifest file corrupted.")
plog.info('I: [OK ] Manifest.json file extraction.')

# 3- Check that mandatory fields exists
# 2- Check that mandatory fields exists
missing = ""
for k in MANIFEST_MANDATORY.keys():
if k not in metadata.keys():
missing += k + ", "
if missing != "":
missing = missing[:-2]
plog.info('E: [FAILLED] Checking validity of manifest.json file. (missing : ' + missing + ")")
raise PirusException("XXXX", "Bad pirus pipeline format : Mandory fields missing in the manifest.json file (missing : " + missing + ")")
plog.info('I: [OK ] Checking validity of manifest.json file.')

# 4- Extract pirus technicals files from the package
ffile_src = os.path.join(ppackage_path, "rootfs",metadata['form'][1:] if metadata['form'][0]=="/" else metadata['form'])
ffile_dst = os.path.join(ppackage_path, "form.json")
shutil.copyfile(ffile_src, ffile_dst)
lfile_dst = ""
if "icon" in metadata.keys():
lfile_src = os.path.join(ppackage_path, "rootfs",metadata['icon'][1:] if metadata['icon'][0]=="/" else metadata['icon'])
lfile_dst = os.path.join(ppackage_path, os.path.basename(metadata['icon']))
shutil.copyfile(lfile_src, lfile_dst)
else:
lfile_dst = os.path.join(ppackage_path, "logo.png")
shutil.copyfile(os.path.join(TEMPLATE_DIR, "logo.png", lfile_dst))
plog.info('E: [FAILLED] Checking validity of metadata (missing : ' + missing + ")")
raise PirusException("XXXX", "Bad pirus pipeline format. Mandory fields missing in the metadata : " + missing)
plog.info('I: [OK ] Checking validity of metadata')

# 3- Extract pirus technicals files from the tar file
try:
ffile_src = os.path.join("rootfs",metadata['form'][1:] if metadata['form'][0]=="/" else metadata['form'])
xdir = [info for info in tar.getmembers() if info.name == ffile_src]
file = tar.extractfile(member=xdir[0])
ffile_src = os.path.join(ppackage_path, ffile_src)
ffile_dst = os.path.join(ppackage_path, "form.json")
with open(ffile_dst, 'bw+') as f:
f.write(file.read())

lfile_src = PIPELINE_DEFAULT_ICON_PATH
lfile_dst = os.path.join(ppackage_path, "icon.png")
if "icon" in metadata.keys():
lfile_src = os.path.join("rootfs",metadata['icon'][1:] if metadata['icon'][0]=="/" else metadata['icon'])
xdir = [info for info in tar.getmembers() if info.name == lfile_src]
file = tar.extractfile(member=xdir[0])
lfile_src = os.path.join(ppackage_path, lfile_src)
lfile_dst = os.path.join(ppackage_path, os.path.basename(metadata['icon']))
with open(lfile_dst, 'bw+') as f:
f.write(file.read())
else:
shutil.copyfile(lfile_src, lfile_dst)
except:
# TODO : manage error + remove package file
plog.info('E: [FAILLED] Extraction of ' + ppackage_file)
raise PirusException("XXXX", "Error occure during extraction of pipeline technical files (form.json / icon)")
plog.info('I: [OK ] Extraction of pipeline technical files (form.json / icon)')


# 5- Save pipeline into database
metadata.update({
Expand Down
2 changes: 1 addition & 1 deletion pirus/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@
"run" : "The command line that will executed by pirus to run the pipeline.",
}

PIPELINE_DEFAULT_ICON = "pipeline_icon.pngl"
PIPELINE_DEFAULT_ICON_PATH = os.path.join(TEMPLATE_DIR , "pipeline_icon.png")
4 changes: 2 additions & 2 deletions pirus/pirus_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ def dump_context(self):


def notify_status(self, status:str):
requests.get(self.notify_url + "/s/" + status)
print ("send notify status : ", self.notify_url + "/s/" + status)
requests.post(self.notify_url, , data = {"status": status} )



Expand All @@ -90,6 +89,7 @@ def run_pipeline(self, pipe_image_alias, config, inputs):

self.run_celery_id = str(self.request.id)
self.notify_url = 'http://' + HOSTNAME + '/run/notify/' + self.run_celery_id
config["pirus"]["notify_url"] = self.notify_url

# Init path
rpath = os.path.join(RUNS_DIR, self.run_celery_id)
Expand Down
5 changes: 3 additions & 2 deletions pirus/templates/home.html
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ <h1 class="content-subhead"><i class="fa fa-file-o" aria-hidden="true"></i> File
<div class="panel panel-default">
<div class="panel-heading">
<h1 class="panel-title">
<a data-toggle="collapse" href="#newFilePanel"><i class="fa fa-plus" aria-hidden="true"></i> Uload a file</a>
<a data-toggle="collapse" href="#newFilePanel"><i class="fa fa-plus" aria-hidden="true"></i> Upload a file</a>
</h1>
</div>
<div id="newFilePanel" class="panel-collapse collapse">
Expand Down Expand Up @@ -201,7 +201,7 @@ <h1 class="content-subhead"><i class="fa fa-tasks" aria-hidden="true"></i> Runs<
<div class="alert alert-warning" role="alert">
<strong>Warning!</strong> No run have been launched.
</div>
{% endif %}
{% else %}
<table id="runsList" class="table table-striped table-bordered" cellspacing="0" width="100%">
<thead>
<tr>
Expand Down Expand Up @@ -252,6 +252,7 @@ <h1 class="content-subhead"><i class="fa fa-tasks" aria-hidden="true"></i> Runs<
</tr>
{% endfor %}
</table>
{% endif %}
</div>


Expand Down

0 comments on commit 7460891

Please sign in to comment.