Navigation Menu

Skip to content

Commit

Permalink
Use "timeout" parameter given as a part of request messages
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 6, 2015
1 parent 18c0e16 commit 43bf6f2
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 3 deletions.
24 changes: 23 additions & 1 deletion lib/droonga/dispatcher.rb
Expand Up @@ -172,7 +172,9 @@ def process_internal_message(message)
dataset = message["dataset"] || @message["dataset"]
collector_runner = @collector_runners[dataset]
session = session_planner.create_session(id, self, collector_runner)
@engine_state.register_session(id, session)
timeout_seconds = message["timeout_seconds"] || nil
@engine_state.register_session(id, session,
:timeout_seconds => timeout_seconds)
else
logger.error("no steps error", :id => id, :message => message)
return
Expand Down Expand Up @@ -208,7 +210,13 @@ def dispatch_steps(steps)
id = @engine_state.generate_id

destinations = []
timeout_seconds = nil
steps.each do |step|
calculated_timeout_seconds = timeout_seconds_from_step(step)
if calculated_timeout_seconds
timeout_seconds = calculated_timeout_seconds
end

dataset = @catalog.dataset(step["dataset"])
if dataset
if write_step?(step)
Expand Down Expand Up @@ -236,6 +244,7 @@ def dispatch_steps(steps)
else
step["routes"] ||= [id]
end

destinations += step["routes"].collect do |route|
internal_farm_path(route)
end
Expand All @@ -244,6 +253,7 @@ def dispatch_steps(steps)
dispatch_message = {
"id" => id,
"steps" => steps,
"timeout_seconds" => timeout_seconds,
}
destinations.uniq.each do |destination|
dispatch(dispatch_message, destination)
Expand Down Expand Up @@ -293,6 +303,18 @@ def write_step?(step)
step_definition.write?
end

def timeout_seconds_from_step(step)
return nil unless step["dataset"]

step_runner = @step_runners[step["dataset"]]
return nil unless step_runner

step_definition = step_runner.find(step["command"])
return nil unless step_definition

step_definition.timeout_seconds_for_step(step)
end

private
def internal_route(route)
@engine_state.internal_route(route)
Expand Down
6 changes: 4 additions & 2 deletions lib/droonga/engine_state.rb
Expand Up @@ -124,10 +124,12 @@ def find_session(id)
@sessions[id]
end

def register_session(id, session)
def register_session(id, session, options={})
@sessions[id] = session
logger.trace("new session #{id} is registered. rest sessions=#{@sessions.size}")
session.set_timeout(@loop, DEFAULT_SESSION_TIMEOUT_SECONDS) do

timeout_seconds = options[:timeout_seconds] || DEFAULT_SESSION_TIMEOUT_SECONDS
session.set_timeout(@loop, timeout_seconds) do
logger.trace("session #{id} is timed out!")
unregister_session(id)
end
Expand Down
18 changes: 18 additions & 0 deletions lib/droonga/single_step_definition.rb
Expand Up @@ -19,6 +19,7 @@ class SingleStepDefinition
attr_accessor :handler
attr_accessor :collector
attr_writer :write
attr_writer :timeout_seconds_calculator
attr_accessor :inputs
attr_accessor :output
def initialize(plugin_module)
Expand All @@ -27,6 +28,15 @@ def initialize(plugin_module)
@handler = nil
@collector = nil
@write = false
@timeout_seconds_calculator = lambda do |step|
if step["timeout"]
return step["timeout"]
elsif step["body"]
return step["body"]["timeout"] if step["body"]["timeout"]
end
nil
end

@inputs = []
@output = {}
yield(self)
Expand All @@ -36,6 +46,14 @@ def write?
@write
end

def timeout_seconds_for_step(step)
if @timeout_seconds_calculator
@timeout_seconds_calculator.call(step)
else
nil
end
end

def handler_class
resolve_class(@handler)
end
Expand Down

0 comments on commit 43bf6f2

Please sign in to comment.