Skip to content

Commit

Permalink
db: write workflow status to DB instead of FS
Browse files Browse the repository at this point in the history
* Adds models using plain SQLAlchemy since
  pallets-eco/flask-sqlalchemy#250 looks like it won't be merged soon.

Signed-off-by: Diego Rodriguez <diego.rodriguez@cern.ch>
  • Loading branch information
Diego Rodriguez committed Oct 19, 2017
1 parent 9256fbc commit a60a6ca
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 32 deletions.
37 changes: 37 additions & 0 deletions reana_workflow_engine_yadage/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2017 CERN.
#
# REANA is free software; you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# REANA is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# REANA; if not, write to the Free Software Foundation, Inc., 59 Temple Place,
# Suite 330, Boston, MA 02111-1307, USA.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization or
# submit itself to any jurisdiction.

"""REANA Workflow Engine Yadage config."""

import os

SHARED_VOLUME = os.getenv('SHARED_VOLUME', '/reana/default')
"""Path to the mounted REANA shared volume."""

REANA_DB_FILE = './reana.db'
"""REANA SQLite db file."""

SQLALCHEMY_DATABASE_URI = \
'sqlite:///{SHARED_VOLUME}/{REANA_DB_FILE}'.format(
SHARED_VOLUME=SHARED_VOLUME,
REANA_DB_FILE=REANA_DB_FILE)
"""SQL database URI."""
39 changes: 39 additions & 0 deletions reana_workflow_engine_yadage/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2017 CERN.
#
# REANA is free software; you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# REANA is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# REANA; if not, write to the Free Software Foundation, Inc., 59 Temple Place,
# Suite 330, Boston, MA 02111-1307, USA.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization or
# submit itself to any jurisdiction.

"""Rest API endpoint for workflow management."""

from __future__ import absolute_import

from reana_workflow_engine_yadage.config import SQLALCHEMY_DATABASE_URI
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from reana_workflow_engine_yadage.models import User, Workflow # isort:skip # noqa


def load_session():
"""Load SQLAlchemy database session."""
engine = create_engine(SQLALCHEMY_DATABASE_URI)
Session = sessionmaker(bind=engine)
session = Session()
return session
75 changes: 75 additions & 0 deletions reana_workflow_engine_yadage/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2017 CERN.
#
# REANA is free software; you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# REANA is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# REANA; if not, write to the Free Software Foundation, Inc., 59 Temple Place,
# Suite 330, Boston, MA 02111-1307, USA.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization or
# submit itself to any jurisdiction.

"""Models for REANA Workflow Engine Yadage."""

from __future__ import absolute_import

import enum

from sqlalchemy import Column, DateTime, Enum, ForeignKey, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from sqlalchemy_utils import UUIDType

Base = declarative_base()


class User(Base):
"""User table"""

__tablename__ = 'user'

id_ = Column(UUIDType, primary_key=True)
api_key = Column(String(length=120))
create_date = Column(DateTime, default=func.now())
email = Column(String(length=255))
last_active_date = Column(DateTime)
workflows = relationship("Workflow", backref="user")

def __repr__(self):
"""User string represetantion."""
return '<User %r>' % self.id_


class WorkflowStatus(enum.Enum):
created = 0
running = 1
finished = 2
failed = 3


class Workflow(Base):
"""Workflow table."""

__tablename__ = 'workflow'

id_ = Column(UUIDType, primary_key=True)
create_date = Column(DateTime, default=func.now())
workspace_path = Column(String(255))
status = Column(Enum(WorkflowStatus), default=WorkflowStatus.created)
owner_id = Column(UUIDType, ForeignKey('user.id_'))

def __repr__(self):
"""Workflow string represetantion."""
return '<Workflow %r>' % self.id_
87 changes: 55 additions & 32 deletions reana_workflow_engine_yadage/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,21 @@

import logging
import os
from glob import glob

import zmq
from yadage.steering_api import steering_ctx
from yadage.utils import setupbackend_fromstring

import reana_workflow_engine_yadage.celery_zeromq
from reana_workflow_engine_yadage.celeryapp import app
from reana_workflow_engine_yadage.database import load_session
from reana_workflow_engine_yadage.models import Workflow, WorkflowStatus
from reana_workflow_engine_yadage.zeromq_tracker import ZeroMQTracker

log = logging.getLogger(__name__)


available_workflow_status = (
'running',
'finished',
'failed',
)


def create_analysis_workspace(
def create_workflow_workspace(
workflow_uuid,
user_uuid='00000000-0000-0000-0000-000000000000'):
"""Create analysis and workflow workspaces.
Expand All @@ -55,7 +49,7 @@ def create_analysis_workspace(
the workflow workspace.
:param workflow_uuid: Analysis UUID.
:return: Workflow and analysis workspace path.
:return: Tuple composed of workflow and analysis workspace paths.
"""
analysis_workspace = os.path.join(
os.getenv('SHARED_VOLUME', '/data'),
Expand All @@ -68,31 +62,45 @@ def create_analysis_workspace(
return workflow_workspace, analysis_workspace


def update_analysis_status(status, analysis_workspace, message=None):
"""Update analysis status using a status file.
def update_workflow_status(db_session, workflow_uuid, status, message=None):
"""Update database workflow status.
:param workflow_uuid: UUID which represents the workflow.
:param status: String that represents the analysis status.
:param analysis_workspace: Path to the analysis to update.
:raises: IOError, ValueError
:param status_message: String that represents the message related with the
status, if there is any.
"""
try:
if status not in available_workflow_status:
raise ValueError(
'{0} is not a valid status see the available list: {1}'.format(
status, available_workflow_status))
workflow = \
db_session.query(Workflow).filter_by(id_=workflow_uuid).first()

for status_file in glob(os.path.join(analysis_workspace, '.status.*')):
os.remove(status_file)
if not workflow:
raise Exception('Workflow {0} doesn\'t exist in database.'.format(
workflow_uuid))

with open(os.path.join(analysis_workspace, '.status.{status}'.format(
status=status)), 'a') as status_file:
if message:
status_file.write(message)

except IOError as e:
workflow.status = status
db_session.commit()
except Exception as e:
log.info(
'An error occurred while updating workflow: {0}'.format(str(e)))
raise e
except ValueError as e:


def create_workflow(db_session, workflow_uuid, workspace_path,
owner_uuid='00000000-0000-0000-0000-000000000000'):
"""Create workflow on database.
:param workflow_uuid: UUID which represents the workflow.
:param workspace: String which represents the workflow workspace path.
"""
# create workflow on database
try:
workflow = Workflow(id_=workflow_uuid,
workspace_path=workspace_path, owner_id=owner_uuid)
db_session.add(workflow)
db_session.commit()
except Exception as e:
log.info('Workflow couldn\'t be added to the database: {0}'.format(e))
raise e


Expand All @@ -108,7 +116,10 @@ def run_yadage_workflow_standalone(workflow_uuid, analysis_workspace=None,
cap_backend = setupbackend_fromstring('fromenv')

workflow_workspace, analysis_workspace = \
create_analysis_workspace(workflow_uuid)
create_workflow_workspace(workflow_uuid)

db_session = load_session()
create_workflow(db_session, workflow_uuid, analysis_workspace)

if workflow_json:
# When `yadage` is launched using an already validated workflow file.
Expand All @@ -128,19 +139,31 @@ def run_yadage_workflow_standalone(workflow_uuid, analysis_workspace=None,
**workflow_kwargs) as ys:

log.info('running workflow on context: {0}'.format(locals()))
update_analysis_status('running', analysis_workspace)
update_workflow_status(
db_session,
workflow_uuid,
WorkflowStatus.running)

ys.adage_argument(additional_trackers=[
ZeroMQTracker(socket=socket, identifier=workflow_uuid)])
log.info('added zmq tracker.. ready to go..')
log.info('zmq publishing under: %s', workflow_uuid)

update_analysis_status('finished', analysis_workspace)
update_workflow_status(
db_session,
workflow_uuid,
WorkflowStatus.finished)

log.info('workflow done')
except Exception as e:
log.info('workflow failed: {0}'.format(e))
update_analysis_status('failed', analysis_workspace, message=str(e))
update_workflow_status(
db_session,
workflow_uuid,
WorkflowStatus.failed,
message=str(e))
finally:
db_session.close()


@app.task(name='tasks.run_yadage_workflow', ignore_result=True)
Expand Down
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@

install_requires = [
'celery==3.1.17',
'enum34>=1.1.6',
'SQLAlchemy>=1.1.14',
'SQLAlchemy-Utils>=0.32.18',
'pyzmq==16.0.2',
'requests==2.11.1',
'yadage-schemas==0.7.4',
Expand Down

0 comments on commit a60a6ca

Please sign in to comment.