Navigation Menu

Skip to content

Commit

Permalink
Report progress based on interval seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
piroor committed Apr 10, 2015
1 parent 3842ee8 commit c64a1ae
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
3 changes: 3 additions & 0 deletions lib/droonga/data_absorber.rb
Expand Up @@ -23,6 +23,7 @@ class DataAbsorber
include Loggable

DEFAULT_MESSAGES_PER_SECOND = 100
DEFAULT_PROGRESS_INTERVAL_SECONDS = 3

TIME_UNKNOWN = -1
PROGRESS_UNKNOWN = -1
Expand All @@ -42,6 +43,7 @@ def initialize(params)
@params = params

@messages_per_second = @params[:messages_per_second] || DEFAULT_MESSAGES_PER_SECOND
@progress_interval_seconds = @params[:progress_interval_seconds] || DEFAULT_PROGRESS_INTERVAL_SECONDS

@dataset = @params[:dataset] || CatalogGenerator::DEFAULT_DATASET
@port = @params[:port] || CatalogGenerator::DEFAULT_PORT
Expand Down Expand Up @@ -69,6 +71,7 @@ def run
"tag" => @tag,
"dataset" => @dataset,
"messagesPerSecond" => @messages_per_second,
"progressIntervalSeconds" => @progress_interval_seconds,
},
}
destination_client.subscribe(absorb_message) do |message|
Expand Down
23 changes: 19 additions & 4 deletions lib/droonga/plugins/system/absorb_data.rb
Expand Up @@ -28,6 +28,8 @@ class AbsorbDataHandler < AsyncCommand::Handler
action.synchronous = true

DEFAULT_MESSAGES_PER_SECOND = 100
DEFAULT_PROGRESS_INTERVAL_SECONDS = 3
MIN_PROGRESS_INTERVAL_SECONDS = 1

class MissingHostParameter < BadRequest
def initialize
Expand Down Expand Up @@ -61,15 +63,18 @@ def handle
serf = Serf.new(my_node_name)
serf.set_tag("absorbing", true)

@start_time = Time.now

begin
@total_n_source_records = count_total_n_source_records
@n_processed_messages = 0
dumper_error_message = dumper.run do |message|
@messenger.forward(message,
"to" => my_node_name,
"type" => message["type"])
@n_processed_messages += 1
report_progress
elapsed_time = (Time.now - @start_time).to_i
if (elapsed_time % progress_interval_seconds).zero?
report_progress
end
end
report_progress
rescue Exception => exception
Expand Down Expand Up @@ -99,7 +104,6 @@ def dumper_params
end

def report_progress
return unless (@n_processed_messages % 100).zero?
forward("#{prefix}.progress",
"nProcessedMessages" => @n_processed_messages,
"percentage" => progress_percentage,
Expand Down Expand Up @@ -135,6 +139,17 @@ def my_node_name
@messenger.engine_state.name
end

def prepare_progress_interval_seconds
interval_seconds = @request.request["progressIntervalSeconds"]
|| DEFAULT_PROGRESS_INTERVAL_SECONDS
interval_seconds = interval_seconds.to_i
[interval_seconds, MIN_PROGRESS_INTERVAL_SECONDS].max
end

def progress_interval_seconds
@progress_interval_seconds ||= prepare_progress_interval_seconds
end

def source_host
@source_host ||= @request.request["host"]
end
Expand Down

0 comments on commit c64a1ae

Please sign in to comment.