Skip to content
This repository has been archived by the owner on Jul 17, 2018. It is now read-only.

Commit

Permalink
ARIA-39-Genericize-storage-models
Browse files Browse the repository at this point in the history
  • Loading branch information
mxmrlv committed Dec 22, 2016
1 parent d143772 commit c9ecc54
Show file tree
Hide file tree
Showing 24 changed files with 929 additions and 645 deletions.
25 changes: 12 additions & 13 deletions aria/__init__.py
Expand Up @@ -62,22 +62,21 @@ def application_model_storage(api, api_kwargs=None):
Initiate model storage
"""
models = [
storage.models.Plugin,
storage.models.ProviderContext,
storage.model.Plugin,

storage.models.Blueprint,
storage.models.Deployment,
storage.models.DeploymentUpdate,
storage.models.DeploymentUpdateStep,
storage.models.DeploymentModification,
storage.model.Blueprint,
storage.model.Deployment,
storage.model.DeploymentUpdate,
storage.model.DeploymentUpdateStep,
storage.model.DeploymentModification,

storage.models.Node,
storage.models.NodeInstance,
storage.models.Relationship,
storage.models.RelationshipInstance,
storage.model.Node,
storage.model.NodeInstance,
storage.model.Relationship,
storage.model.RelationshipInstance,

storage.models.Execution,
storage.models.Task,
storage.model.Execution,
storage.model.Task,
]
# if api not in _model_storage:
return storage.ModelStorage(api, items=models, api_kwargs=api_kwargs or {})
Expand Down
10 changes: 6 additions & 4 deletions aria/orchestrator/context/workflow.py
Expand Up @@ -57,8 +57,7 @@ def _create_execution(self):
execution_cls = self.model.execution.model_cls
now = datetime.utcnow()
execution = self.model.execution.model_cls(
blueprint_id=self.blueprint.id,
deployment_id=self.deployment.id,
deployment=self.deployment,
workflow_name=self._workflow_name,
created_at=now,
status=execution_cls.PENDING,
Expand Down Expand Up @@ -86,9 +85,11 @@ def nodes(self):
"""
Iterator over nodes
"""
key = 'deployment_{0}'.format(self.model.node.model_cls.name_column_name())

return self.model.node.iter(
filters={
'deployment_id': self.deployment.id
key: getattr(self.deployment, self.deployment.name_column_name())
}
)

Expand All @@ -97,9 +98,10 @@ def node_instances(self):
"""
Iterator over node instances
"""
key = 'deployment_{0}'.format(self.model.node_instance.model_cls.name_column_name())
return self.model.node_instance.iter(
filters={
'deployment_id': self.deployment.id
key: getattr(self.deployment, self.deployment.name_column_name())
}
)

Expand Down
10 changes: 5 additions & 5 deletions aria/orchestrator/workflows/api/task.py
Expand Up @@ -18,7 +18,7 @@
"""
from uuid import uuid4

from aria.storage import models
from aria.storage import model

from ... import context
from .. import exceptions
Expand Down Expand Up @@ -75,8 +75,8 @@ def __init__(self,
:param actor: the operation host on which this operation is registered.
:param inputs: operation inputs.
"""
assert isinstance(actor, (models.NodeInstance,
models.RelationshipInstance))
assert isinstance(actor, (model.NodeInstance,
model.RelationshipInstance))
super(OperationTask, self).__init__()
self.actor = actor
self.name = '{name}.{actor.id}'.format(name=name, actor=actor)
Expand All @@ -98,7 +98,7 @@ def node_instance(cls, instance, name, inputs=None, *args, **kwargs):
:param instance: the node of which this operation belongs to.
:param name: the name of the operation.
"""
assert isinstance(instance, models.NodeInstance)
assert isinstance(instance, model.NodeInstance)
return cls._instance(instance=instance,
name=name,
operation_details=instance.node.operations[name],
Expand All @@ -118,7 +118,7 @@ def relationship_instance(cls, instance, name, operation_end, inputs=None, *args
with 'source_operations' and 'target_operations'
:param inputs any additional inputs to the operation
"""
assert isinstance(instance, models.RelationshipInstance)
assert isinstance(instance, model.RelationshipInstance)
if operation_end not in [cls.TARGET_OPERATION, cls.SOURCE_OPERATION]:
raise exceptions.TaskException('The operation end should be {0} or {1}'.format(
cls.TARGET_OPERATION, cls.SOURCE_OPERATION
Expand Down
6 changes: 3 additions & 3 deletions aria/orchestrator/workflows/builtin/heal.py
Expand Up @@ -34,7 +34,7 @@ def heal(ctx, graph, node_instance_id):
:return:
"""
failing_node = ctx.model.node_instance.get(node_instance_id)
host_node = ctx.model.node_instance.get(failing_node.host_id)
host_node = ctx.model.node_instance.get(failing_node.host.id)
failed_node_instance_subgraph = _get_contained_subgraph(ctx, host_node)
failed_node_instance_ids = list(n.id for n in failed_node_instance_subgraph)

Expand Down Expand Up @@ -165,8 +165,8 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances):
def _get_contained_subgraph(context, host_node_instance):
contained_instances = [node_instance
for node_instance in context.node_instances
if node_instance.host_id == host_node_instance.id and
node_instance.id != node_instance.host_id]
if node_instance.host_fk == host_node_instance.id and
node_instance.host_fk != node_instance.id]
result = [host_node_instance]

if not contained_instances:
Expand Down
17 changes: 9 additions & 8 deletions aria/orchestrator/workflows/core/engine.py
Expand Up @@ -23,7 +23,7 @@
import networkx

from aria import logger
from aria.storage import models
from aria.storage import model
from aria.orchestrator import events

from .. import exceptions
Expand Down Expand Up @@ -82,18 +82,18 @@ def cancel_execution(self):
events.on_cancelling_workflow_signal.send(self._workflow_context)

def _is_cancel(self):
return self._workflow_context.execution.status in [models.Execution.CANCELLING,
models.Execution.CANCELLED]
return self._workflow_context.execution.status in [model.Execution.CANCELLING,
model.Execution.CANCELLED]

def _executable_tasks(self):
now = datetime.utcnow()
return (task for task in self._tasks_iter()
if task.status in models.Task.WAIT_STATES and
if task.status in model.Task.WAIT_STATES and
task.due_at <= now and
not self._task_has_dependencies(task))

def _ended_tasks(self):
return (task for task in self._tasks_iter() if task.status in models.Task.END_STATES)
return (task for task in self._tasks_iter() if task.status in model.Task.END_STATES)

def _task_has_dependencies(self, task):
return len(self._execution_graph.pred.get(task.id, {})) > 0
Expand All @@ -105,18 +105,19 @@ def _tasks_iter(self):
for _, data in self._execution_graph.nodes_iter(data=True):
task = data['task']
if isinstance(task, engine_task.OperationTask):
self._workflow_context.model.task.refresh(task.model_task)
if task.model_task.status not in model.Task.END_STATES:
self._workflow_context.model.task.refresh(task.model_task)
yield task

def _handle_executable_task(self, task):
if isinstance(task, engine_task.StubTask):
task.status = models.Task.SUCCESS
task.status = model.Task.SUCCESS
else:
events.sent_task_signal.send(task)
self._executor.execute(task)

def _handle_ended_tasks(self, task):
if task.status == models.Task.FAILED and not task.ignore_failure:
if task.status == model.Task.FAILED and not task.ignore_failure:
raise exceptions.ExecutorException('Workflow failed')
else:
self._execution_graph.remove_node(task.id)
26 changes: 13 additions & 13 deletions aria/orchestrator/workflows/core/task.py
Expand Up @@ -24,7 +24,7 @@
)

from aria import logger
from aria.storage import models
from aria.storage import model
from aria.orchestrator.context import operation as operation_context

from .. import exceptions
Expand Down Expand Up @@ -66,7 +66,7 @@ class StubTask(BaseTask):

def __init__(self, *args, **kwargs):
super(StubTask, self).__init__(*args, **kwargs)
self.status = models.Task.PENDING
self.status = model.Task.PENDING
self.due_at = datetime.utcnow()


Expand Down Expand Up @@ -106,36 +106,36 @@ class OperationTask(BaseTask):
def __init__(self, api_task, *args, **kwargs):
super(OperationTask, self).__init__(id=api_task.id, **kwargs)
self._workflow_context = api_task._workflow_context
model = api_task._workflow_context.model
model_storage = api_task._workflow_context.model

base_task_model = model.task.model_cls
if isinstance(api_task.actor, models.NodeInstance):
base_task_model = model_storage.task.model_cls
if isinstance(api_task.actor, model.NodeInstance):
context_class = operation_context.NodeOperationContext
task_model_cls = base_task_model.as_node_instance
elif isinstance(api_task.actor, models.RelationshipInstance):
elif isinstance(api_task.actor, model.RelationshipInstance):
context_class = operation_context.RelationshipOperationContext
task_model_cls = base_task_model.as_relationship_instance
else:
raise RuntimeError('No operation context could be created for {actor.model_cls}'
.format(actor=api_task.actor))
plugin = api_task.plugin
plugins = model.plugin.list(filters={'package_name': plugin.get('package_name', ''),
'package_version': plugin.get('package_version', '')},
include=['id'])
plugins = model_storage.plugin.list(filters={
'package_name': plugin.get('package_name', ''),
'package_version': plugin.get('package_version', '')
})
# Validation during installation ensures that at most one plugin can exists with provided
# package_name and package_version
plugin_id = plugins[0].id if plugins else None
operation_task = task_model_cls(
name=api_task.name,
operation_mapping=api_task.operation_mapping,
instance_id=api_task.actor.id,
instance=api_task.actor,
inputs=api_task.inputs,
status=base_task_model.PENDING,
max_attempts=api_task.max_attempts,
retry_interval=api_task.retry_interval,
ignore_failure=api_task.ignore_failure,
plugin_id=plugin_id,
execution_id=self._workflow_context.execution.id
plugin=plugins[0] if plugins else None,
execution=self._workflow_context.execution
)
self._workflow_context.model.task.put(operation_task)

Expand Down
2 changes: 1 addition & 1 deletion aria/orchestrator/workflows/executor/process.py
Expand Up @@ -111,7 +111,7 @@ def execute(self, task):

env = os.environ.copy()
# See _update_env for plugin_prefix usage
if task.plugin_id and self._plugin_manager:
if task.plugin_fk and self._plugin_manager:
plugin_prefix = self._plugin_manager.get_plugin_prefix(task.plugin)
else:
plugin_prefix = None
Expand Down
12 changes: 6 additions & 6 deletions aria/storage/__init__.py
Expand Up @@ -45,19 +45,19 @@
from . import (
exceptions,
api,
structures,
structure,
core,
filesystem_rapi,
sql_mapi,
models
model
)

__all__ = (
'exceptions',
'structures',
# 'Storage',
# 'ModelStorage',
# 'ResourceStorage',
'structure',
'Storage',
'ModelStorage',
'ResourceStorage',
'filesystem_rapi',
'sql_mapi',
'api'
Expand Down

0 comments on commit c9ecc54

Please sign in to comment.