logical_range_filter: Add support for immediate un-reference of shards #1330
0422e99 to a3bce9a
@HashidaTKS |
Maybe they are due to my modification. I'll fix them. |
09e9ca6 to c5b7540
They have been fixed by rebasing to |
def reverse_each_with_index(&block)
  each_with_index_internal(:descending, &block)
end
You don't need them with the change:
diff --git a/plugins/sharding/logical_enumerator.rb b/plugins/sharding/logical_enumerator.rb
index 79e5f7003..5dde0fb8f 100644
--- a/plugins/sharding/logical_enumerator.rb
+++ b/plugins/sharding/logical_enumerator.rb
@@ -28,6 +28,7 @@ module Groonga
       private
       def each_internal(order)
+        return enum_for(__method__, order) unless block_given?
         context = Context.instance
         each_shard_with_around(order) do |prev_shard, current_shard, next_shard|
           shard_range_data = current_shard.range_data
Then you can use it like enumerator.each.with_index.
Great, I didn't know that!
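For reference, here is a minimal sketch of the enum_for idiom suggested above. ShardList and its shard names are hypothetical stand-ins, not the real LogicalEnumerator; only the `return enum_for(__method__, order) unless block_given?` line is taken from the suggested change.

```ruby
# Hypothetical stand-in for LogicalEnumerator; only the enum_for idiom
# comes from the suggested change.
class ShardList
  def initialize(names)
    @names = names
  end

  def each(&block)
    each_internal(:ascending, &block)
  end

  def reverse_each(&block)
    each_internal(:descending, &block)
  end

  private

  def each_internal(order)
    # Without a block, return an Enumerator that calls this method
    # again with the same arguments once a block is supplied.
    return enum_for(__method__, order) unless block_given?

    names = order == :descending ? @names.reverse : @names
    names.each { |name| yield(name) }
  end
end

list = ShardList.new(["Logs_20170315", "Logs_20170316"])
# No dedicated reverse_each_with_index is needed any more.
pairs = list.reverse_each.with_index.to_a
```

Because each and reverse_each return an Enumerator when called without a block, with_index can be chained onto either of them, which is why the dedicated *_with_index methods become unnecessary.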
@@ -63,9 +72,35 @@ def each_internal(order)
end
end

private
This is needless.
This method is removed.
Can we test this change by adding |
How about adding Shard#first? and Shard#last? instead of checking index?
diff --git a/plugins/sharding/logical_enumerator.rb b/plugins/sharding/logical_enumerator.rb
index 94e8c3437..0c2d334ee 100644
--- a/plugins/sharding/logical_enumerator.rb
+++ b/plugins/sharding/logical_enumerator.rb
@@ -6,7 +6,6 @@ module Groonga
attr_reader :target_range
attr_reader :logical_table
attr_reader :shard_key_name
- attr_reader :size
def initialize(command_name, input, options={})
@command_name = command_name
@input = input
@@ -23,20 +22,13 @@ module Groonga
each_internal(:descending, &block)
end
- def each_with_index(&block)
- each_with_index_internal(:ascending, &block)
- end
-
- def reverse_each_with_index(&block)
- each_with_index_internal(:descending, &block)
- end
-
def unref
@shards.each(&:unref)
end
private
def each_internal(order)
+ return enum_for(__method__, order) unless block_given?
context = Context.instance
each_shard_with_around(order) do |prev_shard, current_shard, next_shard|
shard_range_data = current_shard.range_data
@@ -72,35 +64,10 @@ module Groonga
end
end
- private
- def each_with_index_internal(order)
- index = 0
- each_internal(order) do |shard, shard_range|
- yield(shard, shard_range, index)
- index += 1
- end
- end
-
- private
- def compute_size
- context = Context.instance
- prefix = "#{@logical_table}_"
- size = 0
- context.database.each_name(:prefix => prefix) do |name|
- shard_range_raw = name[prefix.size..-1]
-
- if shard_range_raw =~ /\A(\d{4})(\d{2})\z/ ||
- shard_range_raw =~ /\A(\d{4})(\d{2})(\d{2})\z/
- size += 1
- end
- end
- size
- end
-
def each_shard_with_around(order)
context = Context.instance
prefix = "#{@logical_table}_"
- unref_immediately = @options[:unref_immediately] || false
+ unref_immediately = @options.fetch(:unref_immediately, false)
shards = [nil]
context.database.each_name(:prefix => prefix,
@@ -118,6 +85,11 @@ module Groonga
end
shard = Shard.new(name, @shard_key_name, shard_range_data)
+ previous_shard = shards.last
+ if previous_shard
+ shard.previous_shard = previous_shard
+ previous_shard.next_shard = shard
+ end
shards << shard
@shards << shard
next if shards.size < 3
@@ -154,7 +126,6 @@ module Groonga
end
@target_range = TargetRange.new(@command_name, @input)
- @size = compute_size
end
def compute_month_shard_max_day(year, month, next_shard_range)
@@ -167,12 +138,16 @@ module Groonga
class Shard
attr_reader :table_name, :key_name, :range_data
+ attr_accessor :previous_shard
+ attr_accessor :next_shard
def initialize(table_name, key_name, range_data)
@table_name = table_name
@key_name = key_name
@range_data = range_data
@table = nil
@key = nil
+ @previous_shard = nil
+ @next_shard = nil
end
def table
@@ -187,8 +162,12 @@ module Groonga
@key ||= Context.instance[full_key_name]
end
- def copy
- Shard.new(@table_name, @key_name, @range_data)
+ def first?
+ @previous_shard.nil?
+ end
+
+ def last?
+ @next_shard.nil?
end
def unref
diff --git a/plugins/sharding/logical_range_filter.rb b/plugins/sharding/logical_range_filter.rb
index add7355fa..811000903 100644
--- a/plugins/sharding/logical_range_filter.rb
+++ b/plugins/sharding/logical_range_filter.rb
@@ -118,7 +118,7 @@ module Groonga
def initialize(input)
@input = input
@use_range_index = parse_use_range_index(@input[:use_range_index])
- @enumerator = LogicalEnumerator.new("logical_range_filter",
+ @enumerator = LogicalEnumerator.new("logical_range_filter",
@input,
{:unref_immediately => true})
@order = parse_order(@input, :order)
@@ -262,13 +262,10 @@ module Groonga
end
def execute
- first_shard = nil
+ have_shard = false
have_result_set = false
each_shard_executor do |shard_executor|
- if first_shard.nil?
- first_shard = shard_executor.shard.copy
- @context.referred_objects << first_shard
- end
+ have_shard = true
shard_executor.execute
@context.consume_result_sets do |result_set|
have_result_set = true
@@ -276,7 +273,7 @@ module Groonga
end
break if @context.current_limit.zero?
end
- if first_shard.nil?
+ unless have_shard
enumerator = @context.enumerator
message =
"[logical_range_filter] no shard exists: " +
@@ -284,9 +281,12 @@ module Groonga
"shard_key: <#{enumerator.shard_key_name}>"
raise InvalidArgument, message
end
- unless have_result_set
+ return if have_result_set
+
+ each_shard_executor do |shard_executor|
+ shard = shard_executor.shard
result_set = HashTable.create(:flags => ObjectFlags::WITH_SUBREC,
- :key_type => first_shard.table)
+ :key_type => shard.table)
@context.push
@context.temporary_tables << result_set
targets = [[result_set]]
@@ -294,6 +294,7 @@ module Groonga
@context.dynamic_columns.apply_filtered(targets)
yield(result_set)
@context.shift
+ break
end
end
@@ -301,40 +302,29 @@ module Groonga
def each_shard_executor(&block)
enumerator = @context.enumerator
target_range = enumerator.target_range
+ if @context.order == :descending
+ each_method = :reverse_each
+ else
+ each_method = :each
+ end
if @context.need_look_ahead?
- if @context.order == :descending
- each_method = :reverse_each_with_index
- else
- each_method = :each_with_index
- end
- last_index = enumerator.size - 1
- executors = []
previous_executor = nil
- # Keep the previous data for window function
- is_first_executor = true
- enumerator.send(each_method) do |shard, shard_range, i|
+ enumerator.send(each_method) do |shard, shard_range|
@context.push
current_executor = ShardExecutor.new(@context, shard, shard_range)
if previous_executor
previous_executor.next_executor = current_executor
current_executor.previous_executor = previous_executor
yield(previous_executor)
- @context.shift unless is_first_executor
- is_first_executor &&= false
+ @context.shift unless previous_executor.shard.first?
+ end
+ if shard.last?
+ yield(current_executor)
+ @context.shift
end
previous_executor = current_executor
- if i >= last_index
- yield(previous_executor)
- @context.shift
- @context.shift unless is_first_executor
- end
- end
- else
- if @context.order == :descending
- each_method = :reverse_each
- else
- each_method = :each
end
+ else
enumerator.send(each_method) do |shard, shard_range|
@context.push
yield(ShardExecutor.new(@context, shard, shard_range))
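The linking done in the diff above can be tried in isolation. This is a simplified sketch: the Shard class here keeps only the neighbour links and the first?/last? predicates, and link_shards is a hypothetical helper that links everything eagerly, whereas the real enumerator links shards as it discovers them.

```ruby
# Simplified Shard: only the neighbour links and the predicates mirror
# the diff; the table/key handling is left out.
class Shard
  attr_reader :table_name
  attr_accessor :previous_shard, :next_shard

  def initialize(table_name)
    @table_name = table_name
    @previous_shard = nil
    @next_shard = nil
  end

  def first?
    @previous_shard.nil?
  end

  def last?
    @next_shard.nil?
  end
end

# Hypothetical helper: link each shard to the one found before it.
def link_shards(names)
  shards = []
  names.each do |name|
    shard = Shard.new(name)
    previous_shard = shards.last
    if previous_shard
      shard.previous_shard = previous_shard
      previous_shard.next_shard = shard
    end
    shards << shard
  end
  shards
end

shards = link_shards(["Logs_20170315", "Logs_20170316", "Logs_20170317"])
flags = shards.map { |shard| [shard.table_name, shard.first?, shard.last?] }
```

With the links in place, a caller can ask a shard about its position directly, so no shard count or running index has to be computed up front.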
def each_shard_with_around(order)
  context = Context.instance
  prefix = "#{@logical_table}_"
  unref_immediately = @options[:unref_immediately] || false
|| false is redundant.
If :unref_immediately isn't specified, @options[:unref_immediately] returns nil. nil is one of the false values.
Fixed.
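A small standalone check of the point above, using a plain Hash named options as a stand-in for @options. It also shows how Hash#fetch with a default, as used in the diff earlier in this thread, behaves when the key is present but nil.

```ruby
options = {}

# A missing key yields nil, which is already falsy in a condition.
plain = options[:unref_immediately]

# "|| false" only converts that nil into false.
with_or = options[:unref_immediately] || false

# Hash#fetch uses the default only when the key is absent.
fetched = options.fetch(:unref_immediately, false)

# When the key exists with a nil value, fetch returns the stored nil.
explicit_nil = {:unref_immediately => nil}.fetch(:unref_immediately, false)

# All four values behave the same way in "if unref_immediately".
truthiness = [plain, with_or, fetched, explicit_nil].map { |value| value ? true : false }
```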
Looks good! |
@@ -70,9 +70,24 @@ logical_range_filter Logs --shard_key timestamp --columns[day].stage filtere
]
]
]
#|-| [obj][open] <273>(<Logs_201703.timestamp>):<64>(<column:fix_size>)
#|-| [obj][open] <272>(<Logs_201703>):<51>(<table:no_key>)
It seems that Logs_201703 is not closed (other tests seem to have the same issue). I am checking this.
I have added |
We don't need to add |
I intended to verify open/close at all tests under a plan to change current tests. |
@@ -70,9 +70,22 @@ logical_range_filter Logs --shard_key timestamp --min '2017/03/15 01:00:00'
]
]
]
#|-| [obj][open] <273>(<Logs_201703.timestamp>):<64>(<column:fix_size>)
#|-| [obj][open] <272>(<Logs_201703>):<51>(<table:no_key>)
An opened table, Logs_201703 in this case, is not closed in the cases below.
- using time_classify_xxx with a dynamic column
- using a window function with a dynamic column
- specifying a nonexistent column in output_columns while also specifying filter
The select command also has these issues, so I guess this is not the matter of logical_range_filter.
The commands to reproduce these are below.
common
plugin_register functions/time
table_create Logs_20170315 TABLE_NO_KEY
column_create Logs_20170315 timestamp COLUMN_SCALAR Time
column_create Logs_20170315 price COLUMN_SCALAR UInt32
load --table Logs_20170315
[
{"timestamp": "2017/03/15 00:00:00", "price": 1000},
{"timestamp": "2017/03/15 11:00:00", "price": 900},
{"timestamp": "2017/03/15 12:00:00", "price": 300},
{"timestamp": "2017/03/15 13:00:00", "price": 200}
]
log_level --level dump
Case 1: using time_classify_xxx with dynamic column
select Logs_20170315 --columns[day].stage filtered --columns[day].type Time --columns[day].flags COLUMN_SCALAR --columns[day].value 'time_classify_day(timestamp-36000000000)' --output_columns _id
[[0,0.0,0.0],[[[4],[["_id","UInt32"]],[1],[2],[3],[4]]]]
#|-| [obj][open] <272>(<Logs_20170315>):<51>(<table:no_key>)
#|-| [obj][open] <273>(<Logs_20170315.timestamp>):<64>(<column:fix_size>)
#|-| [obj][close] <273>(<Logs_20170315.timestamp>):<64>(<column:fix_size>)
Case 2: using window function with dynamic column
select Logs_20170315 --columns[price_per_day].stage filtered --columns[price_per_day].type UInt32 --columns[price_per_day].flags COLUMN_SCALAR --columns[price_per_day].value 'window_sum(price)' --columns[price_per_day].window.group_keys 'timestamp' --output_columns _id
[[0,0.0,0.0],[[[4],[["_id","UInt32"]],[1],[2],[3],[4]]]]
#|-| [obj][open] <272>(<Logs_20170315>):<51>(<table:no_key>)
#|-| [obj][open] <274>(<Logs_20170315.price>):<64>(<column:fix_size>)
#|-| [obj][open] <273>(<Logs_20170315.timestamp>):<64>(<column:fix_size>)
#|-| [obj][close] <274>(<Logs_20170315.price>):<64>(<column:fix_size>)
#|-| [obj][close] <273>(<Logs_20170315.timestamp>):<64>(<column:fix_size>)
Case 3: specifying a nonexistent column in output_columns
select Logs_20170315 --filter 'price >= 300' --output_columns _id,non_existent
[[0,0.0,0.0],[[[3],[["_id","UInt32"]],[1],[2],[3]]]]
#|-| [obj][open] <272>(<Logs_20170315>):<51>(<table:no_key>)
#|-| [obj][open] <274>(<Logs_20170315.price>):<64>(<column:fix_size>)
#|-| [obj][close] <274>(<Logs_20170315.price>):<64>(<column:fix_size>)
"so I guess this is not the matter of logical_range_filter."
Because of this, I will create a new PR to fix this.
I think this PR should focus on logical_range_filter.
It makes sense.
- using time_classify_xxx with a dynamic column
- using a window function with a dynamic column
- specifying a nonexistent column in output_columns while also specifying filter
These cases are also fixed by the change to db.c.
Could you split it to another pull request as you said?
However, Logs_201703 is still not closed in this test case. I am investigating it.
Could you split it to another pull request as you said?
I see. I will create a new PR.
I have rebased onto master, updated the tests, and confirmed it is closed.
I have picked tests for verifying the opening and closing of objects from the points of view below.
shards are not closed in some cases, but it seems that it is not the matter of |
8ae21bd to 07f3a44
- @enumerator = LogicalEnumerator.new("logical_range_filter", @input)
+ @enumerator = LogicalEnumerator.new("logical_range_filter",
+                                     @input,
+                                     { :unref_immediately => true })
- { :unref_immediately => true })
+ :unref_immediately => true)
I have fixed it.
if shard.last?
  yield(current_executor)
  @context.shift
  break
Can we remove this break?
I have removed it.
end

if shards.size == 2
  yield(shards[0], shards[1], nil)
end
if unref_immediately
  @shards.each(&:unref)
  @shards.clear
How about using #unref?
diff --git a/plugins/sharding/logical_enumerator.rb b/plugins/sharding/logical_enumerator.rb
index 0c2d334ee..86f87837e 100644
--- a/plugins/sharding/logical_enumerator.rb
+++ b/plugins/sharding/logical_enumerator.rb
@@ -24,6 +24,7 @@ module Groonga
       def unref
         @shards.each(&:unref)
+        @shards.clear
       end
       private
@@ -104,10 +105,7 @@ module Groonga
         if shards.size == 2
           yield(shards[0], shards[1], nil)
         end
-        if unref_immediately
-          @shards.each(&:unref)
-          @shards.clear
-        end
+        unref if unref_immediately
       end
       def initialize_parameters
I have changed it to use #unref.
Additionally, I have added @shards.clear to LogicalEnumerator#unref to ensure that Shard#unref is executed only once even if LogicalEnumerator#unref is called multiple times.
Also, I have changed the code to ensure that unref if unref_immediately is executed, because a caller of this function sometimes breaks out of the loop before reaching the end of this function.
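The two guarantees described in this comment can be sketched in isolation. This is an illustration under assumptions, not the code in this PR: CountingShard and SketchEnumerator are hypothetical, and the ensure clause is just one way to make the release run when the caller breaks out of the loop early.

```ruby
# Hypothetical shard that counts how often it is released.
class CountingShard
  attr_reader :unref_count

  def initialize
    @unref_count = 0
  end

  def unref
    @unref_count += 1
  end
end

# Hypothetical enumerator; not the real LogicalEnumerator.
class SketchEnumerator
  def initialize(all_shards, unref_immediately)
    @all_shards = all_shards
    @shards = []
    @unref_immediately = unref_immediately
  end

  def each
    @all_shards.each do |shard|
      @shards << shard
      yield(shard)
    end
  ensure
    # Runs even when the caller leaves the block with break.
    unref if @unref_immediately
  end

  def unref
    @shards.each(&:unref)
    # Forget released shards so that a second call releases nothing.
    @shards.clear
  end
end

shards = [CountingShard.new, CountingShard.new, CountingShard.new]
enumerator = SketchEnumerator.new(shards, true)
enumerator.each do |_shard|
  break # the caller stops after the first shard
end
enumerator.unref # a later explicit call must not release anything twice
counts = shards.map(&:unref_count)
```

Only the shard that was actually handed out is released, and it is released exactly once even though unref ends up being called twice.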
@@ -88,8 +88,32 @@ logical_range_filter Logs --shard_key timestamp --columns[filtered_id].stage
]
]
]
#|-| [obj][open] <263>(<Logs_20170315.timestamp>):<64>(<column:fix_size>)
#|-| [obj][open] <262>(<Logs_20170315>):<51>(<table:no_key>)
And modify expected results.
I have rebased onto master and updated the tests.
#|-| [obj][close] <272>(<Times.logs_20170315>):<72>(<column:index>)
#|-| [obj][close] <271>(<Times>):<49>(<table:pat_key>)
#|-| [obj][open] <266>(<Logs_20170316.timestamp>):<64>(<column:fix_size>)
#|-| [obj][open] <265>(<Logs_20170316>):<51>(<table:no_key>)
I have rebased onto master and updated the tests.
#|-| [obj][open] <273>(<Logs_20170315.timestamp>):<64>(<column:fix_size>)
#|-| [obj][open] <272>(<Logs_20170315>):<51>(<table:no_key>)
#|-| [obj][open] <276>(<Logs_20170316.timestamp>):<64>(<column:fix_size>)
#|-| [obj][open] <275>(<Logs_20170316>):<51>(<table:no_key>)
I have rebased onto master, updated the tests, and confirmed it is closed.
#|-| [obj][close] <277>(<Logs_20170316.price>):<64>(<column:fix_size>)
#|-| [obj][close] <273>(<Logs_20170315.timestamp>):<64>(<column:fix_size>)
#|-| [obj][open] <279>(<Logs_20170317.timestamp>):<64>(<column:fix_size>)
#|-| [obj][open] <278>(<Logs_20170317>):<51>(<table:no_key>)
I have rebased onto master, updated the tests, and confirmed it is closed.
@@ -57,3 +61,5 @@ logical_range_filter Logs \
--columns[price_per_day].window.group_keys 'day' \
--output_columns _id,day,price_per_day
#@collect-query-log false
#@remove-ignore-log-pattern /\A\[io\]/
#@remove-important-log-levels dump
Could you add a new line here?
I have fixed it.
#|-| [obj][close] <262>(<Terms>):<49>(<table:pat_key>)
#|-| [obj][open] <262>(<Terms>):<49>(<table:pat_key>)
#|-| [obj][open] <272>(<Terms.users_name>):<72>(<column:index>)
#|-| [obj][open] <263>(<Users>):<49>(<table:pat_key>)
I have rebased onto master, updated the tests, and confirmed it is closed.
However, the Users table is repeatedly opened and closed.
I am concerned about this in terms of performance...
@@ -35,3 +39,6 @@ logical_range_filter Logs timestamp \
--max_border "include" \
--limit 4
#@collect-query-log false
log_level --level notice
#@remove-ignore-log-pattern /\A\[io\]/
#@remove-important-log-levels dump
Could you add a new line here?
I have fixed it.
- Abolish size and compute_size
Change to check not only debug log but also dump log
…tests" This reverts commit 8ec6682.
Fit to ensure executing unref when unref_immediately is true Fix format
ba717ae to 09a05ea
Thanks. |
Add support for immediate un-reference of shards.
Behavior before the patch
All shards are un-referenced at the end of the command.
Behavior after the patch
Shards that are no longer referenced are un-referenced immediately.