Skip to content
This repository has been archived by the owner on May 26, 2018. It is now read-only.

Commit

Permalink
fix #2, with @dridk
Browse files Browse the repository at this point in the history
  • Loading branch information
ikit committed Oct 6, 2016
1 parent 8401bc3 commit 01c83be
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 28 deletions.
8 changes: 6 additions & 2 deletions pirus/api_v1/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@
app.router.add_route('POST', "/v1/run", runHdl.post)
app.router.add_route('GET', "/v1/run/{run_id}", runHdl.get_status)
app.router.add_route('GET', "/v1/run/{run_id}/status", runHdl.get_status)
app.router.add_route('GET', "/v1/run/{run_id}/log", runHdl.get_olog)
app.router.add_route('GET', "/v1/run/{run_id}/out", runHdl.get_olog)
app.router.add_route('GET', "/v1/run/{run_id}/err", runHdl.get_elog)
app.router.add_route('GET', "/v1/run/{run_id}/files", runHdl.get_plog)
app.router.add_route('GET', "/v1/run/{run_id}/log", runHdl.get_plog)
app.router.add_route('GET', "/v1/run/{run_id}/files", runHdl.get_files)
app.router.add_route('GET', "/v1/run/{run_id}/file/{filename}", runHdl.get_file)
app.router.add_route('GET', "/v1/run/{run_id}/pause", runHdl.get_pause)
app.router.add_route('GET', "/v1/run/{run_id}/play", runHdl.get_play)
app.router.add_route('GET', "/v1/run/{run_id}/stop", runHdl.get_stop)

app.router.add_route('GET', "/v1/run/notify/{run_id}/{complete}", runHdl.up_progress)
app.router.add_route('GET', "/v1/run/notify/{run_id}/status/{status}", runHdl.up_status)
Expand Down
52 changes: 26 additions & 26 deletions pirus/pirus_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import os
import sys
import time
import requests
import logging
import json
Expand Down Expand Up @@ -54,15 +55,16 @@ def setup_logger(logger_name, log_file, level=logging.INFO):
l.addHandler(fileHandler)
l.addHandler(streamHandler)

def execute(cmd, olog, elog):
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc:
# TODO FIXME => logging realtime not working :( ...
out = str(proc.stdout.read())
err = str(proc.stderr.read())
if out != "":
olog.info(out)
if err != "":
elog.info(err)
def execute(cmd, olog=None, elog=None):
subprocess.call(cmd)
# with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as proc:
# # TODO FIXME => logging realtime not working :( ...
# out = str(proc.stdout.read())
# err = str(proc.stderr.read())
# if out != "":
# olog.info(out)
# if err != "":
# elog.info(err)



Expand Down Expand Up @@ -116,8 +118,8 @@ def run_pipeline(self, pipe_image_alias, config):
setup_logger('run_out', os.path.join(lpath, "out.log"))
setup_logger('run_err', os.path.join(lpath, "err.log"))
plog = logging.getLogger('pirus_worker')
olog = logging.getLogger('run_out')
elog = logging.getLogger('run_err')
# olog = logging.getLogger('run_out')
# elog = logging.getLogger('run_err')

plog.info('INIT | Pirus worker initialisation : ')
plog.info('INIT | - Pipe ID : ' + pipe_image_alias)
Expand Down Expand Up @@ -147,14 +149,14 @@ def run_pipeline(self, pipe_image_alias, config):
self.notify_status("BUILDING")
try:
# create container
execute(["lxc", "init", "PirusBasic", c_name], olog, elog)
execute(["lxc", "init", "PirusBasic", c_name])
# set up env
execute(["lxc", "config", "set", c_name, "environment.NOTIFY", 'http://' + HOSTNAME + '/run/notify/' + self.run_id + '/'], olog, elog)
execute(["lxc", "config", "set", c_name, "environment.NOTIFY", 'http://' + HOSTNAME + '/run/notify/' + self.run_id + '/'])
# set up devices
execute(["lxc", "config", "device", "add", c_name, "pirus_inputs", "disk", "source="+ipath, "path=pipeline/inputs", "readonly=True"], olog, elog)
execute(["lxc", "config", "device", "add", c_name, "pirus_outputs", "disk", "source="+opath, "path=pipeline/outputs"], olog, elog)
execute(["lxc", "config", "device", "add", c_name, "pirus_logs", "disk", "source="+lpath, "path=pipeline/logs"], olog, elog)
execute(["lxc", "config", "device", "add", c_name, "pirus_db", "disk", "source="+DATABASES_DIR, "path=pipeline/db", "readonly=True"], olog, elog)
execute(["lxc", "config", "device", "add", c_name, "pirus_inputs", "disk", "source="+ipath, "path=pipeline/inputs", "readonly=True"])
execute(["lxc", "config", "device", "add", c_name, "pirus_outputs", "disk", "source="+opath, "path=pipeline/outputs"])
execute(["lxc", "config", "device", "add", c_name, "pirus_logs", "disk", "source="+lpath, "path=pipeline/logs"])
execute(["lxc", "config", "device", "add", c_name, "pirus_db", "disk", "source="+DATABASES_DIR, "path=pipeline/db", "readonly=True"])
# TODO => create symlink in ipath directory
# TODO => copy config file of the run in the ipath directory
except:
Expand All @@ -166,8 +168,10 @@ def run_pipeline(self, pipe_image_alias, config):
plog.info('RUN | Run the pipe !')
self.notify_status("RUN")
try:
execute(["lxc", "start", c_name], olog, elog)
execute(["lxc", "exec", c_name, "/pipeline/run/run.sh"], olog, elog)
execute(["lxc", "start", c_name])
# execute(["echo", '"/pipeline/run/run.sh > /pipeline/logs/out.log 2> /pipeline/logs/err.log"', ">", "/pipeline/run/runcontainer.sh"])
# execute(["chmod", '+x', ">", "/pipeline/run/runcontainer.sh"])
subprocess.call(["lxc", "exec", c_name, "/pipeline/run/run.sh"], stdout=open(lpath+"/out.log", "w"), stderr=open(lpath+"/err.log", "w"))
except:
plog.info('FAILLED | Unexpected error ' + str(sys.exc_info()[0]))
self.notify_status("FAILLED")
Expand All @@ -177,16 +181,12 @@ def run_pipeline(self, pipe_image_alias, config):
plog.info('STOP | Run ending')
self.notify_status("STOP")
try:
# Force the container to change results files owner to allow the server to use them
plog.info('STOP | - chown ' + str(os.getuid()) + ":" + str(os.getgid()) + ' on outputs and logs files produced by the container')
execute(["useradd", "-u", str(os.getuid()), "-g", str(os.getgid()), "lxd"], olog, elog)
execute(["lxc", "exec", c_name, "--", "chown", str(os.getuid()) + ":" + str(os.getgid()), "-Rf", "/pipeline"], olog, elog)

# Clean outputs
plog.info('STOP | - chmod 775 on outputs and logs files produced by the container')
execute(["lxc", "exec", c_name, "--", "chmode", "775", "-Rf", "/pipeline"], olog, elog)
execute(["lxc", "exec", c_name, "--", "chmod", "755", "-Rf", "/pipeline"])

plog.info('STOP | - closing and deleting the lxc container : ' + c_name)
execute(["lxc", "delete", c_name, "--force"], olog, elog)
execute(["lxc", "delete", c_name, "--force"])
except:
plog.info('FAILLED | Unexpected error ' + str(sys.exc_info()[0]))
self.notify_status("FAILLED")
Expand Down

0 comments on commit 01c83be

Please sign in to comment.