Navigation Menu

Skip to content

Commit

Permalink
Count total number of source records asynchronously.
Browse files Browse the repository at this point in the history
Because Cool.io's loop is already running and synchornous API tries to run Cool.io's loop again.
  • Loading branch information
piroor committed Apr 14, 2015
1 parent f1cd9ff commit 52f9bbe
Showing 1 changed file with 22 additions and 14 deletions.
36 changes: 22 additions & 14 deletions lib/droonga/plugins/system/absorb_data.rb
Expand Up @@ -61,7 +61,10 @@ def handle
@start_time = Time.now

begin
@total_n_source_records = count_total_n_source_records
@total_n_source_records = nil
get_total_n_source_records do |count|
@total_n_source_records = count
end
dumper_error_message = dumper.run do |message|
message["dataset"] = current_dataset
@messenger.forward(message,
Expand Down Expand Up @@ -176,9 +179,9 @@ def source_dataset
Catalog::Dataset::DEFAULT_NAME
end

def source_tables
response = source_client.request("dataset" => source_dataset,
"type" => "table_list")
def get_source_tables(&block)
source_client.request("dataset" => source_dataset,
"type" => "table_list") do |response|

unless response
raise EmptyResponse.new("table_list returns nil response")
Expand All @@ -190,9 +193,12 @@ def source_tables
message_body = response["body"]
body = message_body[1]
tables = body[1..-1]
tables.collect do |table|
table_names = tables.collect do |table|
table[1]
end
yield(table_names)

end
end

def source_client_options
Expand All @@ -216,7 +222,8 @@ def source_client
@source_client ||= Droonga::Client.new(source_client_options)
end

def count_total_n_source_records
def get_total_n_source_records(&block)
get_source_tables do |source_tables|
queries = {}
source_tables.each do |table|
queries["n_records_of_#{table}"] = {
Expand All @@ -226,13 +233,12 @@ def count_total_n_source_records
},
}
end
response = source_client.request("dataset" => source_dataset,
"type" => "search",
"body" => {
"timeout" => 10,
"queries" => queries,
})

source_client.request("dataset" => source_dataset,
"type" => "search",
"body" => {
"timeout" => 10,
"queries" => queries,
}) do |response|
unless response
raise EmptyResponse.new("search returns nil response")
end
Expand All @@ -244,7 +250,9 @@ def count_total_n_source_records
response["body"].each do |query_name, result|
n_records += result["count"]
end
n_records
yield(n_records)
end
end
end

def log_tag
Expand Down

0 comments on commit 52f9bbe

Please sign in to comment.