#
#--
# Copyright (c) 2006-2008, John Mettraux, OpenWFE.org
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# . Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# . Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# . Neither the name of the "OpenWFE" nor the names of its contributors may be
# used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#++
#
#
# "made in Japan"
#
# John Mettraux at openwfe.org
#
require 'uri'
require 'openwfe/utils'
require 'openwfe/service'
require 'openwfe/logging'
require 'openwfe/omixins'
require 'openwfe/rudefinitions'
require 'openwfe/flowexpressionid'
require 'openwfe/util/observable'
require 'openwfe/expool/parser'
require 'openwfe/expool/representation'
require 'openwfe/expressions/environment'
require 'openwfe/expressions/raw'
require 'rufus/lru' # gem 'rufus-lru'
require 'rufus/verbs' # gem 'rufus-verbs'
module OpenWFE
#
# The ExpressionPool stores expressions (pieces of workflow instance).
# It's the core of the workflow engine.
# It relies on an expression storage for actual persistence of the
# expressions.
#
class ExpressionPool
include ServiceMixin
include OwfeServiceLocator
include OwfeObservable
include FeiMixin
#
# The hash containing the wfid of the process instances currently
# paused.
#
attr_reader :paused_instances
#
# The constructor for the expression pool.
#
def initialize(service_name, application_context)
  # Explicit empty parentheses: nothing is forwarded up the ancestor chain.
  super()

  # Service bootstrap; service_init is not defined in this part of the
  # file (presumably provided by ServiceMixin - to be confirmed).
  service_init(service_name, application_context)

  # Initial state of the pool: no paused instance, no observer, running.
  @paused_instances = {}
  @observers = {}
  @stopped = false

  #@monitors = MonitorProvider.new(application_context)

  # The return value is discarded on purpose, the call is only there so
  # that engine_environment_id gets invoked once at construction time.
  engine_environment_id
end
#
# Stops this expression pool (especially its workqueue).
#
def stop
  # The flag is raised before the :stop notification is emitted.
  @stopped = true

  # Returns whatever onotify returns.
  onotify(:stop)
end
#--
# Obtains a unique monitor for an expression.
# It avoids the need for the FlowExpression instances to include
# the monitor mixin by themselves
#
#def get_monitor (fei)
# @monitors[fei]
#end
#++
#
# This method is called by the launch method. It's actually the first
# stage of that method.
# It can be useful on its own for 'validating' a launchitem and its
# process definition, as it will raise an exception in case
# of 'parameter' mismatch.
#
# There is a 'pre_launch_check' alias for this method in the
# Engine class.
#
def prepare_raw_expression(launchitem)
  # Builds (without applying it) the raw expression for a launchitem.
  #
  # launchitem : must respond to workflow_definition_url and attributes
  #
  # The definition is located in one of two ways :
  #
  # . "field:xxx" : the definition is embedded in the launchitem, it is
  #   taken out of (deleted from) launchitem.attributes['xxx']
  # . anything else : the url is handed to read_uri
  #
  # Returns the raw expression, after its check_parameters method has been
  # called with the launchitem.
  #
  # Raises a RuntimeError when the url is not set, when an embedded
  # definition is requested but not allowed, or when no definition is found.

  wfdurl = launchitem.workflow_definition_url

  raise "launchitem.workflow_definition_url not set, cannot launch" \
    unless wfdurl

  definition =
    if wfdurl.match(/\Afield:/)
      #
      # \A anchors at the very beginning of the string, whereas ^ would
      # also match after any newline found inside the url (and the
      # [6..-1] slice below would then cut at the wrong place)

      # only a literal 'true' enables embedded definitions
      raise(
        ":definition_in_launchitem_allowed not set to true, " +
        "cannot launch"
      ) if ac[:definition_in_launchitem_allowed] != true

      # 6 is the length of the "field:" prefix
      wfdfield = wfdurl[6..-1]

      # the definition is removed from the launchitem attributes
      launchitem.attributes.delete(wfdfield)
    else
      read_uri(wfdurl)
    end

  raise "didn't find process definition at '#{wfdurl}'" \
    unless definition

  raw_expression = build_raw_expression(launchitem, definition)

  raw_expression.check_parameters(launchitem)
    #
    # will raise an exception if there are requirements
    # and one of them is not met

  raw_expression
end
#
# Instantiates a workflow definition and launches it.
#
# This method call will return immediately, it could even return
# before the actual launch is completely over.
#
# Returns the FlowExpressionId instance of the root expression of
# the newly launched flow.
#
def launch(launchitem, options={})
  # Stage 1 : locate the definition and validate the launchitem
  # (prepare_raw_expression raises when something is wrong).
  root = prepare_raw_expression(launchitem)

  # Any option at all makes the root go through wrap_in_schedule.
  if options.size > 0
    root = wrap_in_schedule(root, options)
  end

  # The root of a new process instance needs an environment of its own,
  # it holds all the variables of the process instance.
  root.new_environment

  root_fei = root.fei

  # Stage 2 : notify, then hand the expression over to apply().
  workitem = build_workitem(launchitem)

  onotify(:launch, root_fei, launchitem)

  apply(root, workitem)

  root_fei
end
#
# This is the first stage of the tlaunch_child() method.
#
# (it's used by the concurrent iterator when preparing all its
# iteration children)
#
def tprepare_child(parent_exp, template, sub_id, register_child, vars)
  # parent_exp     : the expression the child is prepared for
  # template       : a FlowExpressionId (the expression is then simply
  #                  fetched) or a tree whose first element is the
  #                  expression name
  # sub_id         : suffix appended to the parent's expression id
  # register_child : when set, the child is added to parent_exp.children
  #                  and passed to update()
  # vars           : when set, the child gets a new environment built from
  #                  them, else it shares the environment of the parent
  #
  # Returns the prepared (not yet applied) expression.

  if template.is_a?(FlowExpressionId)
    return fetch_expression(template)
  end

  # the child's fei is a modified copy of the parent's one
  child_fei = parent_exp.fei.dup
  child_fei.expression_name = template.first
  child_fei.expression_id = "#{child_fei.expid}.#{sub_id}"

  child = RawExpression.new_raw(
    child_fei, nil, nil, @application_context, template)

  child.parent_id = parent_exp.fei

  if vars
    child.new_environment(vars)
  else
    child.environment_id = parent_exp.environment_id
  end

  # no need to set workitem.fei here, do_apply takes care of it

  return child unless register_child

  (parent_exp.children ||= []) << child.fei
  update(child)

  child
end
#
# Launches the given template (sexp) as the child of its
# parent expression.
#
# If register_child is set to true, this method will
# take care of adding the new child to the parent expression.
#
# (used by 'cron' and more)
#
def tlaunch_child(
  parent_exp, template, sub_id, workitem, register_child, vars=nil)

  # Stage 1 : preparation (and optional registration in the parent),
  # entirely delegated to tprepare_child.
  child = tprepare_child(
    parent_exp, template, sub_id, register_child, vars)

  # Stage 2 : notify, then apply.
  onotify(:tlaunch_child, child.fei, workitem)

  apply(child, workitem)

  # the fei of the launched child is the return value
  child.fei
end
#
# Launches a template, but makes sure the new expression has no
# parent.
#
# (used by 'listen')
#
def tlaunch_orphan(firing_exp, template, sub_id, workitem, register_child)
  # firing_exp     : the expression triggering the launch
  # template       : a tree whose first element is the expression name
  # sub_id         : suffix appended to the firing expression's id
  # register_child : when set, the orphan is nonetheless listed among
  #                  firing_exp.children
  #
  # Returns the fei of the launched orphan.

  # the orphan's fei is a modified copy of the firing expression's one
  orphan_fei = firing_exp.fei.dup
  orphan_fei.expression_id = "#{orphan_fei.expid}.#{sub_id}"
  orphan_fei.expression_name = template.first

  orphan = RawExpression.new_raw(
    orphan_fei, nil, nil, @application_context, template)

  # an orphan has, by definition, no parent...
  #orphan.parent_id = GONE_PARENT_ID
  orphan.parent_id = nil

  # ... but it still taps into the environment of the firing expression
  orphan.environment_id = firing_exp.environment_id

  if register_child
    (firing_exp.children ||= []) << orphan.fei
  end

  onotify(:tlaunch_orphan, orphan.fei, workitem)

  apply(orphan, workitem)

  orphan.fei
end
#
# Launches a subprocess.
# The resulting wfid is a subid for the wfid of the firing expression.
#
# (used by 'subprocess')
#
def launch_subprocess(firing_exp, template, forget, workitem, params)
  # firing_exp : the expression launching the subprocess
  # template   : a FlowExpressionId (the expression gets fetched), a
  #              RawExpression (used as is) or anything else, which is
  #              handed to build_raw_expression (probably an URI)
  # forget     : when set, the subprocess has no parent, else its parent
  #              is the firing expression
  # workitem   : the workitem the subprocess is applied with
  # params     : passed to new_environment for the subprocess
  #
  # Returns the fei of the root expression of the subprocess.

  source =
    case template
    when FlowExpressionId
      fetch_expression(template)
    when RawExpression
      # note : the template itself receives the application context
      template.application_context = @application_context
      template
    else
      build_raw_expression(nil, template)
    end

  # work on copies, the source expression and its fei are left alone
  sub = source.dup
  sub.fei = sub.fei.dup

  sub.parent_id = forget ? nil : firing_exp.fei

  # the wfid of the subprocess is a sub id of the firing expression's wfid
  #sub.fei.wfid = get_wfid_generator.generate
  sub.fei.wfid = "#{firing_exp.fei.wfid}.#{firing_exp.get_next_sub_id}"

  sub.new_environment(params)
  sub.store_itself

  apply(sub, workitem)

  sub.fei
end
#
# Applies a given expression (id or expression)
#
def apply(exp_or_fei, workitem)
  # Nothing is executed here : a :do_apply_reply job (flavour :apply),
  # targeting this pool, is pushed onto the workqueue.
  queue = get_workqueue

  queue.push(self, :do_apply_reply, :apply, exp_or_fei, workitem)
end
#
# Replies to a given expression
#
def reply(exp_or_fei, workitem)
  # Nothing is executed here : a :do_apply_reply job (flavour :reply),
  # targeting this pool, is pushed onto the workqueue.
  queue = get_workqueue

  queue.push(self, :do_apply_reply, :reply, exp_or_fei, workitem)
end
#
# Cancels the given expression.
# The param might be an expression instance or a FlowExpressionId
# instance.
#
def cancel(exp)
  # exp : an expression or a FlowExpressionId, resolved via fetch()
  #
  # Returns what the expression's own cancel method returned, or nil
  # when the expression could not be found.

  expression, fei = fetch(exp)

  if expression
    ldebug { "cancel() for #{fei.to_debug_s}" }

    onotify(:cancel, expression)

    # the expression cancels itself first, it is removed afterwards
    workitem = expression.cancel
    remove(expression)

    workitem
  else
    # a missing expression is not an error, it is merely logged
    linfo { "cancel() cannot cancel missing #{fei.to_debug_s}" }
    nil
  end
end
#
# Cancels the given expression and makes sure to resume the flow
# if the expression or one of its children were active.
#
# If the cancelled branch was not active, this method will take
# care of removing the cancelled expression from the parent
# expression.
#
def cancel_expression(exp)
  # exp : an expression or a FlowExpressionId, resolved via
  #       fetch_expression()
  #
  # Returns nil when the expression cannot be found, else the result of
  # the branch taken below.

  target = fetch_expression(exp)

  # fetch_expression may come back empty handed (the parent lookup below
  # guards against it as well). Like cancel() does, treat a missing
  # expression as a logged no-op instead of failing on nil.
  unless target
    linfo { "cancel_expression() cannot cancel missing expression" }
    return nil
  end

  wi = cancel(target)
    #
    # ( remember that in case of error, no wi could get returned...)

  if wi
    # the branch was active : the flow resumes in the parent, 'false'
    # because cancel() already removed the expression
    reply_to_parent(target, wi, false)
  elsif target.parent_id
    # the branch was not active : simply detach it from its parent
    parent_exp = fetch_expression(target.parent_id)
    parent_exp.remove_child(target.fei) if parent_exp
  end
end
#
# Given any expression of a process, cancels the complete process
# instance.
#
def cancel_process(exp_or_wfid)
  # exp_or_wfid : anything extract_wfid() can turn into a wfid
  #
  # Returns the result of cancel() on the root expression, raises a
  # RuntimeError if no root expression is found for the wfid.

  wfid = extract_wfid(exp_or_wfid, false)

  ldebug { "cancel_process() '#{wfid}'" }

  root_expression = fetch_root(wfid)

  if root_expression
    cancel(root_expression)
  else
    raise "no process to cancel '#{wfid}'"
  end
end
alias :cancel_flow :cancel_process
#
# Forgets the given expression (make it an orphan).
#
def forget(parent_exp, exp)
  # parent_exp : the expression currently listing exp among its children
  # exp        : an expression or a FlowExpressionId, resolved via fetch()
  #
  # Does nothing (and returns nil) when the expression cannot be found.

  expression, fei = fetch(exp)

  #ldebug { "forget() forgetting #{fei}" }

  return unless expression

  onotify(:forget, expression)

  # cut the link in both directions : parent -> child ...
  parent_exp.children.delete(fei)

  # ... and child -> parent
  #expression.parent_id = GONE_PARENT_ID
  expression.parent_id = nil

  # the now orphaned expression gets its environment duplicated and is
  # stored in its new state
  expression.dup_environment
  expression.store_itself

  ldebug { "forget() forgot #{fei}" }
end
#
# Replies to the parent of the given expression.
#
def reply_to_parent (exp, workitem, remove=true)
ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" }
workitem.last_expression_id = exp.fei
onotify :reply_to_parent, exp, workitem
if remove
remove exp
#
# remove the expression itself
exp.clean_children
#
# remove all the children of the expression
end
#
# manage tag, have to remove it so it can get 'redone' or 'undone'
# (preventing abuse)
tagname = exp.attributes["tag"] if exp.attributes
exp.delete_variable(tagname) if tagname
#
# has raw_expression been updated ?
track_child_raw_representation exp
#
# flow terminated ?
#if not exp.parent_id
if (not exp.parent_id) and (exp.fei.expid == '0')
ldebug do
"reply_to_parent() process " +
"#{exp.fei.workflow_instance_id} terminated"
end
onotify :terminate, exp