Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
ARIA-92 Automatic operation task configuration
  • Loading branch information
tliron committed Apr 14, 2017
1 parent 8e1d059 commit a7e7826ed2d8b940b9e74ca15cb0284c39b01001
Show file tree
Hide file tree
Showing 45 changed files with 886 additions and 418 deletions.
@@ -43,14 +43,19 @@ def convert_to_dry(service):
for oper in interface.operations.itervalues():
convert_operation_to_dry(oper)

for group in service.groups.itervalues():
for interface in group.interfaces.itervalues():
for oper in interface.operations.itervalues():
convert_operation_to_dry(oper)


def convert_operation_to_dry(oper):
"""
Converts a single :class:`Operation` to run dryly.
"""

plugin = oper.plugin_specification.name \
if oper.plugin_specification is not None else None
plugin = oper.plugin.name \
if oper.plugin is not None else None
if oper.inputs is None:
oper.inputs = OrderedDict()
oper.inputs['_implementation'] = models.Parameter(name='_implementation',
@@ -48,6 +48,7 @@
'InterfaceTemplate',
'OperationTemplate',
'ArtifactTemplate',
'PluginSpecification',

# Service instance models
'Service',
@@ -71,7 +72,6 @@
'Parameter',
'Type',
'Metadata',
'PluginSpecification',

# Orchestration models
'Execution',
@@ -131,6 +131,9 @@ class OperationTemplate(aria_declarative_base, service_template.OperationTemplat
class ArtifactTemplate(aria_declarative_base, service_template.ArtifactTemplateBase):
pass

class PluginSpecification(aria_declarative_base, service_template.PluginSpecificationBase):
pass

# endregion


@@ -211,10 +214,6 @@ class Type(aria_declarative_base, service_common.TypeBase):
class Metadata(aria_declarative_base, service_common.MetadataBase):
pass


class PluginSpecification(aria_declarative_base, service_common.PluginSpecificationBase):
pass

# endregion


@@ -253,6 +252,7 @@ class Log(aria_declarative_base, orchestration.LogBase):
InterfaceTemplate,
OperationTemplate,
ArtifactTemplate,
PluginSpecification,

# Service instance models
Service,
@@ -276,7 +276,6 @@ class Log(aria_declarative_base, orchestration.LogBase):
Parameter,
Type,
Metadata,
PluginSpecification,

# Orchestration models
Execution,
@@ -67,13 +67,13 @@ class ExecutionBase(ModelMixin):
CANCELLING = 'cancelling'
FORCE_CANCELLING = 'force_cancelling'

STATES = [TERMINATED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING, FORCE_CANCELLING]
END_STATES = [TERMINATED, FAILED, CANCELLED]
STATES = (TERMINATED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING, FORCE_CANCELLING)
END_STATES = (TERMINATED, FAILED, CANCELLED)

VALID_TRANSITIONS = {
PENDING: [STARTED, CANCELLED],
STARTED: END_STATES + [CANCELLING],
CANCELLING: END_STATES + [FORCE_CANCELLING]
PENDING: (STARTED, CANCELLED),
STARTED: END_STATES + (CANCELLING,),
CANCELLING: END_STATES + (FORCE_CANCELLING,)
}

@orm.validates('status')
@@ -219,15 +219,52 @@ def tasks(cls):

class TaskBase(ModelMixin):
"""
A Model which represents an task
Represents the smallest unit of stateful execution in ARIA. The task state includes inputs,
outputs, as well as an atomic status, ensuring that the task can only be running once at any
given time.
Tasks may be "one shot" or may be configured to run repeatedly in the case of failure.
Tasks are often based on :class:`Operation`, and thus act on either a :class:`Node` or a
:class:`Relationship`, however this is not required.
:ivar node: The node actor (optional)
:vartype node: :class:`Node`
:ivar relationship: The relationship actor (optional)
:vartype relationship: :class:`Relationship`
:ivar plugin: The implementing plugin (set to None for default execution plugin)
:vartype plugin: :class:`Plugin`
:ivar inputs: Parameters that can be used by this task
:vartype inputs: {basestring: :class:`Parameter`}
:ivar implementation: Python path to an ``@operation`` function
:vartype implementation: basestring
:ivar max_attempts: Maximum number of retries allowed in case of failure
:vartype max_attempts: int
:ivar retry_interval: Interval between retries (in seconds)
:vartype retry_interval: int
:ivar ignore_failure: Set to True to ignore failures
:vartype ignore_failure: bool
:ivar due_at: Timestamp to start the task
:vartype due_at: datetime
:ivar execution: Assigned execution
:vartype execution: :class:`Execution`
:ivar status: Current atomic status ('pending', 'retrying', 'sent', 'started', 'success',
'failed')
:vartype status: basestring
:ivar started_at: Timestamp for when task started
:vartype started_at: datetime
:ivar ended_at: Timestamp for when task ended
:vartype ended_at: datetime
:ivar retry_count: How many retries occurred
:vartype retry_count: int
"""

__tablename__ = 'task'

__private_fields__ = ['node_fk',
'relationship_fk',
'plugin_fk',
'execution_fk',
'execution_fk'
'node_name',
'relationship_name',
'execution_name']
@@ -247,11 +284,6 @@ class TaskBase(ModelMixin):
FAILED,
)

RUNS_ON_SOURCE = 'source'
RUNS_ON_TARGET = 'target'
RUNS_ON_NODE = 'node'
RUNS_ON = (RUNS_ON_NODE, RUNS_ON_SOURCE, RUNS_ON_TARGET)

INFINITE_RETRIES = -1

@declared_attr
@@ -278,37 +310,25 @@ def execution(cls):
def inputs(cls):
return relationship.many_to_many(cls, 'parameter', prefix='inputs', dict_key='name')

status = Column(Enum(*STATES, name='status'), default=PENDING)
implementation = Column(String)
max_attempts = Column(Integer, default=1)
retry_interval = Column(Float, default=0)
ignore_failure = Column(Boolean, default=False)

# State
status = Column(Enum(*STATES, name='status'), default=PENDING)
due_at = Column(DateTime, nullable=False, index=True, default=datetime.utcnow())
started_at = Column(DateTime, default=None)
ended_at = Column(DateTime, default=None)
max_attempts = Column(Integer, default=1)
retry_count = Column(Integer, default=0)
retry_interval = Column(Float, default=0)
ignore_failure = Column(Boolean, default=False)

# Operation specific fields
implementation = Column(String)
_runs_on = Column(Enum(*RUNS_ON, name='runs_on'), name='runs_on')

@property
def has_ended(self):
return self.status in [self.SUCCESS, self.FAILED]
return self.status in (self.SUCCESS, self.FAILED)

@property
def is_waiting(self):
return self.status in [self.PENDING, self.RETRYING]

@property
def runs_on(self):
if self._runs_on == self.RUNS_ON_NODE:
return self.node
elif self._runs_on == self.RUNS_ON_SOURCE:
return self.relationship.source_node # pylint: disable=no-member
elif self._runs_on == self.RUNS_ON_TARGET:
return self.relationship.target_node # pylint: disable=no-member
return None
return self.status in (self.PENDING, self.RETRYING)

@property
def actor(self):
@@ -366,12 +386,12 @@ def execution_name(cls):
# endregion

@classmethod
def for_node(cls, instance, runs_on, **kwargs):
return cls(node=instance, _runs_on=runs_on, **kwargs)
def for_node(cls, actor, **kwargs):
return cls(node=actor, **kwargs)

@classmethod
def for_relationship(cls, instance, runs_on, **kwargs):
return cls(relationship=instance, _runs_on=runs_on, **kwargs)
def for_relationship(cls, actor, **kwargs):
return cls(relationship=actor, **kwargs)

@staticmethod
def abort(message=None):
@@ -101,7 +101,8 @@ def wrap(cls, name, value, description=None):

from . import models
return models.Parameter(name=name,
type_name=formatting.full_type_name(value),
type_name=formatting.full_type_name(value)
if value is not None else None,
value=value,
description=description)

@@ -248,59 +249,3 @@ def dump(self):
console.puts('{0}: {1}'.format(
context.style.property(self.name),
context.style.literal(self.value)))


class PluginSpecificationBase(TemplateModelMixin):
"""
Plugin specification.
:ivar name: Required plugin name
:vartype name: basestring
:ivar version: Minimum plugin version
:vartype version: basestring
"""

__tablename__ = 'plugin_specification'

__private_fields__ = ['service_template_fk']

version = Column(Text, nullable=True)

# region foreign keys

@declared_attr
def service_template_fk(cls):
"""For ServiceTemplate one-to-many to PluginSpecification"""
return relationship.foreign_key('service_template', nullable=True)

# endregion

@declared_attr
def service_template(cls):
return relationship.many_to_one(cls, 'service_template')

@property
def as_raw(self):
return collections.OrderedDict((
('name', self.name),
('version', self.version)))

def coerce_values(self, container, report_issues):
pass

def instantiate(self, container):
from . import models
return models.PluginSpecification(name=self.name,
version=self.version)

def find_plugin(self, plugins):
matching_plugins = []
for plugin in plugins:
# TODO: we need to use a version comparator
if (plugin.name == self.name) and \
((self.version is None) or (plugin.package_version >= self.version)):
matching_plugins.append(plugin)
if matching_plugins:
# Return highest version of plugin
return sorted(matching_plugins, key=lambda plugin: plugin.package_version)[-1]
return None

0 comments on commit a7e7826

Please sign in to comment.