Skip to content

Commit

Permalink
logical_select: log load status as query log
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Mar 18, 2019
1 parent e089ffd commit 54f43f9
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 31 deletions.
83 changes: 53 additions & 30 deletions plugins/sharding/logical_select.rb
Expand Up @@ -39,6 +39,8 @@ def run_body(input)
executor = Executor.new(context)
executor.execute

load_records(context)

n_results = 1
n_plain_drilldowns = context.plain_drilldown.n_result_sets
n_labeled_drilldowns = context.labeled_drilldowns.n_result_sets
Expand Down Expand Up @@ -112,6 +114,57 @@ def cache_key(input)
key
end

def resolve_target_table_name(result_set)
target_table = result_set
while target_table.domain_id
domain = Context.instance[target_table.domain_id]
break unless domain.is_a?(Table)
target_table = domain
end
target_table.name
end

def load_records(context)
load_table = context.load_table
load_columns = context.load_columns
load_values = context.load_values
return if load_table.nil?
return if load_columns.nil?
return if load_values.nil?

n_loaded_records = 0
to = Context.instance[load_table]
to_columns = []
begin
load_columns.split(/\s*,\s*/).each do |name|
to_columns << to.find_column(name)
end
context.result_sets.each do |result_set|
output_columns = result_set.parse_output_columns(load_values)
begin
output_columns.apply(to_columns)
ensure
output_columns.close
end
n_sub_loaded_records = result_set.size
query_logger.log(:size,
":",
"load(#{n_sub_loaded_records})" +
"[#{resolve_target_table_name(result_set)}]: " +
"[#{load_table}][#{to.size}]")
n_loaded_records += n_sub_loaded_records
end
ensure
to_columns.each do |column|
column.close if column.is_a?(Accessor)
end
end
query_logger.log(:size,
":",
"load(#{n_loaded_records}): " +
"[#{load_table}][#{to.size}]")
end

def write_records(writer, context)
result_sets = context.result_sets

Expand Down Expand Up @@ -542,7 +595,6 @@ def initialize(context)

def execute
execute_search
execute_load_table
if @context.plain_drilldown.have_keys?
execute_plain_drilldown
elsif @context.labeled_drilldowns.have_keys?
Expand Down Expand Up @@ -580,35 +632,6 @@ def execute_search
end
end

def execute_load_table
load_table = @context.load_table
load_columns = @context.load_columns
load_values = @context.load_values
return if load_table.nil?
return if load_columns.nil?
return if load_values.nil?

to = Context.instance[load_table]
to_columns = []
begin
load_columns.split(/\s*,\s*/).each do |name|
to_columns << to.find_column(name)
end
@context.result_sets.each do |result_set|
output_columns = result_set.parse_output_columns(load_values)
begin
output_columns.apply(to_columns)
ensure
output_columns.close
end
end
ensure
to_columns.each do |column|
column.close if column.is_a?(Accessor)
end
end
end

def execute_plain_drilldown
drilldown = @context.plain_drilldown
group_result = TableGroupResult.new
Expand Down
Expand Up @@ -34,7 +34,7 @@ load --table Logs_20150204
}
]
[[0,0.0,0.0],1]
logical_select --logical_table Logs --shard_key timestamp --load_table Logs --load_columns "_key, original_id, timestamp_text" --load_values "cast_loose(ShortText, timestamp, '') + ':' + _id, _id, timestamp" --limit 0
logical_select --logical_table Logs --shard_key timestamp --filter true --load_table Logs --load_columns "_key, original_id, timestamp_text" --load_values "cast_loose(ShortText, timestamp, '') + ':' + _id, _id, timestamp" --limit 0
[
[
0,
Expand Down Expand Up @@ -63,6 +63,16 @@ logical_select --logical_table Logs --shard_key timestamp --load_table Log
]
]
]
#>logical_select --filter "true" --limit "0" --load_columns "_key, original_id, timestamp_text" --load_table "Logs" --load_values "cast_loose(ShortText, timestamp, '') + ':' + _id, _id, timestamp" --logical_table "Logs" --shard_key "timestamp"
#:000000000000000 filter(2): true
#:000000000000000 select(2)[Logs_20150203]
#:000000000000000 filter(1): true
#:000000000000000 select(1)[Logs_20150204]
#:000000000000000 load(2)[Logs_20150203]: [Logs][2]
#:000000000000000 load(1)[Logs_20150204]: [Logs][3]
#:000000000000000 load(3): [Logs][3]
#:000000000000000 output(0)
#<000000000000000 rc=0
select --table Logs
[
[
Expand Down
Expand Up @@ -32,12 +32,15 @@ load --table Logs_20150204
}
]

#@collect-query-log true
logical_select \
--logical_table Logs \
--shard_key timestamp \
--filter true \
--load_table Logs \
--load_columns "_key, original_id, timestamp_text" \
--load_values "cast_loose(ShortText, timestamp, '') + ':' + _id, _id, timestamp" \
--limit 0
#@collect-query-log false

select --table Logs

0 comments on commit 54f43f9

Please sign in to comment.