
Commit

fix worker init
ikit committed Oct 13, 2016
1 parent e78574f commit 78b1532
Showing 4 changed files with 70 additions and 31 deletions.
13 changes: 6 additions & 7 deletions pirus/api_v1/handlers.py
@@ -129,15 +129,11 @@ async def upload_simple(self, request):
pirusfile = PirusFile()
pirusfile.import_data({
"file_name" : uploadFile.filename,
"file_type" : os.path.splitext(file_name)[1],
"file_type" : os.path.splitext(uploadFile.filename)[1][1:].strip().lower(),
"file_path" : file_path,
"file_size" : humansize(os.path.getsize(file_path)),
"status" : "OK",
"comments" : "",
"owner" : "",
"status" : "TMP", # DOWNLOADING, TMP, OK
"create_date" : str(datetime.datetime.now().timestamp()),
"tags" : [os.path.splitext(uploadFile.filename)[0]],
"runs_stats" : {},
"md5sum" : md5(file_path),
})
pirusfile.save()
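
upload_simple now derives file_type from the uploaded file's own name rather than an undefined file_name variable, and seeds tags with the basename. A quick sketch of what the new expressions produce (the filename is a made-up example):

```python
import os

filename = "Sample_01.FASTQ"                                    # hypothetical upload name
file_type = os.path.splitext(filename)[1][1:].strip().lower()   # -> "fastq"
tags = [os.path.splitext(filename)[0]]                          # -> ["Sample_01"]
print(file_type, tags)
```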
@@ -309,15 +305,18 @@ async def post(self, request):
data = await request.json()
pipe_id = data["pipeline_id"]
config = data["config"]
inputs = data["inputs"]

# 2- Load pipeline from database
pipeline = Pipeline.from_id(pipe_id)
if pipeline is None:
return rest_error("Unknow pipeline id " + str(pipe_id))
if config is None:
return rest_error("Config is empty")

# 3- Enqueue run of the pipeline with celery
try:
cw = run_pipeline.delay("PirusSimple", config) # pipeline.path, config)
cw = run_pipeline.delay(pipeline.lxd_alias, config, inputs)
plog.info('RUNNING | New Run start : ' + str(cw.id))
except:
# TODO : clean filesystem
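The /run handler now also reads an inputs list from the request body and forwards it, together with the pipeline's lxd_alias, to the Celery task. A minimal sketch of a client call against the updated endpoint; the host, the ids and the exact shape of each inputs entry are assumptions, not values taken from the commit:

```python
import json
import requests

payload = {
    "pipeline_id": "57ff5aa81d4b7a0c2b3d9e01",           # id of an installed Pipeline document
    "config": {"quality_threshold": 30},                  # written to inputs/config.json by the worker
    "inputs": [["57ff5b021d4b7a0c2b3d9e02", "sample_R1.fastq.gz"]],  # (file id, file name) pairs
}
resp = requests.post("http://localhost/run", data=json.dumps(payload))
print(resp.status_code)
```
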
45 changes: 27 additions & 18 deletions pirus/api_v1/model.py
@@ -24,13 +24,13 @@ class PirusFile(Document):
file_name = StringField(required=True)
file_type = StringField()
file_path = StringField()
file_size = StringField()
status = StringField()
file_size = StringField()
status = StringField() # DL, TMP, OK
comments = StringField()
owner = StringField()
locations = DynamicField() # dic {key=Run_ID : val="in"|"out"} #empty mean not used
create_date = StringField()
tags = ListField(StringField())
runs_stats = DynamicField()
md5sum = StringField()

def __str__(self):
@@ -45,9 +45,9 @@ def export_server_data(self):
"status" : self.status,
"comments" : self.comments,
"owner" : self.owner,
"locations" : self.locations,
"create_date" : self.create_date,
"tags" : self.tags,
"runs_stats" : self.runs_stats,
"md5sum" : self.md5sum,
"id": str(self.id)
}
@@ -60,26 +60,30 @@ def export_client_data(self):
"status" : self.status,
"comments" : self.comments,
"owner" : self.owner,
"locations" : self.locations,
"create_date" : self.create_date,
"tags" : self.tags,
"runs_stats" : self.runs_stats,
"md5sum" : self.md5sum,
"id": str(self.id)
}

def import_data(self, data):
try:
self.file_name = data['file_name']
self.file_type = data["file_type"]
self.file_path = data['file_path']
self.file_size = data["file_size"]
self.status = data["status"]
self.comments = data["comments"]
self.owner = data["owner"]
self.create_date = data["create_date"]
self.tags = data['tags']
self.runs_stats = data['runs_stats']
self.md5sum = data['md5sum']
self.file_name = data['file_name']
self.file_type = data["file_type"]
self.file_path = data['file_path']
self.file_size = data["file_size"]
self.status = data["status"]
self.create_date = data["create_date"]
self.md5sum = data['md5sum']
if "locations" in data.keys():
self.locations = data["locations"]
if "tags" in data.keys():
self.tags = data['tags']
if "comments" in data.keys():
self.comments = data["comments"]
if "owner" in data.keys():
self.owner = data["owner"]
except KeyError as e:
raise ValidationError('Invalid input file: missing ' + e.args[0])
return self
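
PirusFile.import_data now treats locations, tags, comments and owner as optional and only raises a validation error when a required key is missing. A standalone sketch of that required/optional pattern, with a plain class standing in for the mongoengine Document:

```python
# PirusFileLike is a hypothetical stand-in for the mongoengine Document, for illustration only.
class PirusFileLike:
    REQUIRED = ("file_name", "file_type", "file_path", "file_size",
                "status", "create_date", "md5sum")
    OPTIONAL = ("locations", "tags", "comments", "owner")

    def import_data(self, data):
        try:
            for key in self.REQUIRED:
                setattr(self, key, data[key])     # KeyError here means a required field is missing
        except KeyError as e:
            raise ValueError("Invalid input file: missing " + e.args[0])
        for key in self.OPTIONAL:
            if key in data:                       # optional fields are simply skipped when absent
                setattr(self, key, data[key])
        return self

f = PirusFileLike().import_data({
    "file_name": "sample.vcf", "file_type": "vcf", "file_path": "/var/tmp/sample.vcf",
    "file_size": "1.2 Mo", "status": "TMP",
    "create_date": "1476345600.0", "md5sum": "d41d8cd98f00b204e9800998ecf8427e",
})
print(f.file_name, getattr(f, "owner", None))
```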
@@ -97,6 +101,7 @@ def from_id(ifile_id):

class Pipeline(Document):
name = StringField(required=True)
lxd_alias = StringField()
description = StringField()
version = StringField(required=True)
version_api = StringField(required=True)
@@ -121,6 +126,7 @@ def export_server_data(self):
return {
"id" : str(self.id),
"name" : self.name,
"lxd_alias" : self.lxd_alias,
"description" : self.description,
"version" : self.version,
"version_api" : self.version_api,
@@ -165,6 +171,8 @@ def import_data(self, data):
self.dpath = data['dpath']
self.cfile = data['cfile']
self.ffile = data['ffile']
if "lxd_alias" in data.keys():
self.lxd_alias = data["lxd_alias"]
if "description" in data.keys():
self.description = data["description"]
if "version" in data.keys():
@@ -276,7 +284,8 @@ def install(ppackage_name, ppackage_path, ppackage_file):
"dpath" : manifest_data["databases"],
"cfile" : cfile_dst,
"ffile" : ffile_dst,
"lfile" : lfile_dst
"lfile" : lfile_dst,
"lxd_alias" : "pirus-pipe-" + ppackage_name
})

pipeline = Pipeline()
@@ -291,7 +300,7 @@ def install(ppackage_name, ppackage_path, ppackage_file):
plog.info('I: [OK ] Save pipeline information in database with id='+ str(pipeline.id))

# 6- Install lxd container
cmd = ["lxc", "image", "import", ppackage_file, "--alias", "pirus-pipe-" + ppackage_name]
cmd = ["lxc", "image", "import", ppackage_file, "--alias", pipeline.lxd_alias]
try:
out_tmp = '/tmp/' + ppackage_name + '-out'
err_tmp = '/tmp/' + ppackage_name + '-err'
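The LXD image alias is now stored on the Pipeline document (lxd_alias = "pirus-pipe-" + package name), so the install step and the worker agree on the same name. A hedged sketch of the two lxc calls side by side; the package name, paths and container name are placeholders, and execute stands in for the project's own subprocess helper:

```python
import subprocess

def execute(cmd):
    # stand-in for the project's execute() helper
    subprocess.check_call(cmd)

ppackage_name = "hello-pipeline"                       # hypothetical package name
lxd_alias = "pirus-pipe-" + ppackage_name              # value stored in Pipeline.lxd_alias

# install time (model.py): register the packaged image under the alias
execute(["lxc", "image", "import", "/tmp/hello-pipeline.tar.gz", "--alias", lxd_alias])
# run time (pirus_worker.py): instantiate a container for one run from that alias
execute(["lxc", "init", lxd_alias, "pirus-run-42"])
```
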
25 changes: 22 additions & 3 deletions pirus/pirus_worker.py
@@ -1,5 +1,6 @@
#!env/python3
# coding: utf-8
import ipdb;

import os
import sys
@@ -83,7 +84,7 @@ def notify_status(self, status:str):


@app.task(base=PirusTask, queue='PirusQueue', bind=True)
def run_pipeline(self, pipe_image_alias, config):
def run_pipeline(self, pipe_image_alias, config, inputs):
self.run_id = str(self.request.id)
self.notify_url = NOTIFY_URL + str(self.request.id)

@@ -113,14 +114,31 @@ def run_pipeline(self, pipe_image_alias, config):
# elog = logging.getLogger('run_err')

wlog.info('INIT | Pirus worker initialisation : ')
wlog.info('INIT | - Pipe ID : ' + pipe_image_alias)
wlog.info('INIT | - LXD alias : ' + pipe_image_alias)
wlog.info('INIT | - Run ID : ' + self.run_id)
wlog.info('INIT | Directory created : ')
wlog.info('INIT | - inputs : ' + ipath)
wlog.info('INIT | - outputs : ' + opath)
wlog.info('INIT | - logs : ' + lpath)
wlog.info('INIT | - db : ' + DATABASES_DIR)



# Init inputs
cfile = os.path.join(ipath, "config.json")
with open(cfile, 'w') as f:
f.write(json.dumps(config))
os.chmod(cfile, 0o777)

#for ifile in inputs:
#pirusfile = PirusFile.from_id(ifile[0])
#if pirusfile is None:
# pass
#else:
# Todo : manage case when different file having same file_name.
os.symlink("/var/tmp/pirus_v1/downloads/6d8b51ae-bf0f-4cc8-9165-9517dfb2a70f", os.path.join(ipath, "0031.jpg"))


# Check database, to see how many container are running and if we can create a new one for this run
wlog.info('WAITING | Looking for lxc container creation ...')
self.notify_status("WAITING")
@@ -140,7 +158,7 @@ def run_pipeline(self, pipe_image_alias, config):
self.notify_status("BUILDING")
try:
# create container
execute(["lxc", "init", "PirusBasic", c_name])
execute(["lxc", "init", pipe_image_alias, c_name])
# set up env
execute(["lxc", "config", "set", c_name, "environment.NOTIFY", 'http://' + HOSTNAME + '/run/notify/' + self.run_id + '/'])
# set up devices
@@ -163,6 +181,7 @@ def run_pipeline(self, pipe_image_alias, config):
# execute(["echo", '"/pipeline/run/run.sh > /pipeline/logs/out.log 2> /pipeline/logs/err.log"', ">", "/pipeline/run/runcontainer.sh"])
# execute(["chmod", '+x', ">", "/pipeline/run/runcontainer.sh"])
subprocess.call(["lxc", "exec", c_name, "/pipeline/run/run.sh"], stdout=open(lpath+"/out.log", "w"), stderr=open(lpath+"/err.log", "w"))

except:
wlog.info('FAILLED | Unexpected error ' + str(sys.exc_info()[0]))
self.notify_status("FAILLED")
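run_pipeline now receives the inputs list, but the commit leaves the per-file loop commented out and symlinks a single hard-coded path for testing. A hedged sketch of what the generic loop could look like; the (file id, file name) pair shape and the lookup callable (something like PirusFile.from_id) are assumptions, and handling duplicate file names remains the TODO noted in the commit:

```python
import os

def link_inputs(inputs, ipath, lookup, wlog):
    """Symlink every registered input file into the run's inputs directory."""
    for file_id, _file_name in inputs:
        pirusfile = lookup(file_id)                   # e.g. PirusFile.from_id
        if pirusfile is None:
            wlog.info('INIT | - input skipped, unknown file id : ' + str(file_id))
            continue
        target = os.path.join(ipath, pirusfile.file_name)
        if not os.path.exists(target):                # TODO: handle duplicate file_name values
            os.symlink(pirusfile.file_path, target)
            wlog.info('INIT | - input linked : ' + target)
```
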
18 changes: 15 additions & 3 deletions pirus/templates/home.html
@@ -191,7 +191,7 @@ <h1 class="content-subhead"><i class="fa fa-play" aria-hidden="true"></i> Run a
data = jsonFile["data"];
for (var i=0; i<data.length; i++)
{
json_files[i] = data[i]["file_name"] + " [" + data[i]["id"] + "]"
json_files[i] = "('" + data[i]["id"] + "', '" + data[i]["file_name"] + "')'"
}
})
json_final = json_final.replace(/"__PIRUS_INPUT_FILES__"/g, JSON.stringify(json_files))
@@ -249,12 +249,24 @@ <h1 class="content-subhead"><i class="fa fa-play" aria-hidden="true"></i> Run a
function post_run(pipe_id)
{
if (bf.validate())
{
{
var inputs = []
var config = bf.getData()
// create input list with all available files
$.ajax({ url: "http://{{ hostname }}/file", type: "GET", async: false}).done(function(jsonFile)
{
data = jsonFile["data"];
for (var i=0; i<data.length; i++)
{
inputs[i] = "('" + data[i]["id"] + "', '" + data[i]["file_name"] + "')'"
}
})

$.ajax(
{
url: "http://{{ hostname }}/run",
type: "POST",
data: "{\"pipeline_id\" : \""+ pipe_id +"\", \"config\" : " + JSON.stringify(bf.getData(), null, 4) + "}"
data: "{\"pipeline_id\" : \""+ pipe_id +"\", \"config\" : " + JSON.stringify(config, null, 4) + ", \"inputs\" : " + JSON.stringify(inputs, null, 4) + "}"
}).fail(function()
{
alert( "ERROR" );
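In the template, each inputs entry is serialized as the string "('<id>', '<name>')" with a stray trailing quote, rather than as a JSON pair. One plausible way for the server or worker to turn such entries back into pairs, not shown in the commit, is ast.literal_eval:

```python
import ast

raw_inputs = ["('57ff5b021d4b7a0c2b3d9e02', 'sample_R1.fastq.gz')'"]    # as built by post_run()
inputs = [ast.literal_eval(entry.rstrip("'")) for entry in raw_inputs]  # strip the stray quote first
print(inputs)   # [('57ff5b021d4b7a0c2b3d9e02', 'sample_R1.fastq.gz')]
```
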
