Skip to content

Commit

Permalink
logical_range_filter: implement roughly based on filter and sort
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Feb 9, 2015
1 parent 664aeaf commit a0de830
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 18 deletions.
146 changes: 141 additions & 5 deletions plugins/sharding/logical_range_filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,26 @@ class LogicalRangeFilterCommand < Command
def run_body(input)
enumerator = LogicalEnumerator.new("logical_range_filter", input)
filter = input[:filter]
offset = input[:offset] || 0
limit = input[:limit] || 10
offset = (input[:offset] || 0).to_i
limit = (input[:limit] || 10).to_i
output_columns = input[:output_columns] || "_key, *"

result_sets = []
enumerator.each do |table, shard_key, shard_range, cover_type|
# TODO: result_sets << result_set
n_records = 0
enumerator.each do |table, shard_key, shard_range|
result_set = filter_shard(table, filter,
shard_key, shard_range,
enumerator.target_range)
next if result_set.nil?
if result_set.empty?
result_set.close if result_set.temporary?
next
end
result_sets << result_set
n_records += result_set.size
break if n_records >= offset + limit
end

if result_sets.empty?
n_elements = 0
else
Expand All @@ -35,14 +47,138 @@ def run_body(input)
n_elements += result_set.size
end
end

sort_keys = [
{
:key => enumerator.shard_key_name,
:order => :ascending,
},
]
current_offset = offset
current_limit = limit
writer.array("RESULTSET", n_elements) do
first_result_set = result_sets.first
if first_result_set
writer.write_table_columns(first_result_set, output_columns)
end
result_sets.each do |result_set|
writer.write_table_records(result_set, output_columns)
if result_set.size <= current_offset
current_offset -= result_set.size
next
end
sorted_result_set = result_set.sort(sort_keys,
:offset => current_offset,
:limit => current_limit)
writer.write_table_records(sorted_result_set, output_columns)
current_limit -= sorted_result_set.size
sorted_result_set.close
end
end

result_sets.each do |result_set|
result_set.close if result_set.temporary?
end
end

# Selects the records of one shard that fall inside target_range and, when
# a filter is given, also match it.
#
# table        - the shard's table; returned as is when the shard is fully
#                covered and filter is nil.
# filter       - filter text handed on to filter_table, or nil.
# shard_key    - object pushed into the expression as the compared value.
# shard_range  - handed to target_range.cover_type.
# target_range - supplies cover_type, min/min_border and max/max_border.
#
# Returns nil when the cover type is :none (or is not one of the types
# handled below), table itself for :all without a filter, and otherwise
# whatever filter_table returns.
def filter_shard(table, filter, shard_key, shard_range, target_range)
  coverage = target_range.cover_type(shard_range)
  return nil if coverage == :none

  # A fully covered shard needs no range condition at all.
  if coverage == :all
    return filter.nil? ? table : filter_table(table, filter)
  end

  # TODO: Enable the range index when there is no filter:
  #   if filter.nil?
  #     index_info = shard_key.find_index(Operator::LESS)
  #     if index_info
  #       range_index = index_info.index
  #       use_range_index = true
  #     end
  #   end
  use_range_index = false

  # TODO: The range index path is not implemented yet, so it yields nil
  # for every partial cover type. Planned calls per cover type:
  #   :partial_min
  #     count_n_records_in_range(range_index,
  #                              target_range.min, target_range.min_border,
  #                              nil, nil)
  #   :partial_max
  #     count_n_records_in_range(range_index,
  #                              nil, nil,
  #                              target_range.max, target_range.max_border)
  #   :partial_min_and_max
  #     count_n_records_in_range(range_index,
  #                              target_range.min, target_range.min_border,
  #                              target_range.max, target_range.max_border)
  return nil if use_range_index

  case coverage
  when :partial_min
    # shard_key >= min (or > min when the border is not :include)
    filter_table(table, filter) do |expr|
      expr.append_object(shard_key, Operator::PUSH, 1)
      expr.append_operator(Operator::GET_VALUE, 1)
      expr.append_constant(target_range.min, Operator::PUSH, 1)
      lower_operator =
        target_range.min_border == :include ? Operator::GREATER_EQUAL : Operator::GREATER
      expr.append_operator(lower_operator, 2)
    end
  when :partial_max
    # shard_key <= max (or < max when the border is not :include)
    filter_table(table, filter) do |expr|
      expr.append_object(shard_key, Operator::PUSH, 1)
      expr.append_operator(Operator::GET_VALUE, 1)
      expr.append_constant(target_range.max, Operator::PUSH, 1)
      upper_operator =
        target_range.max_border == :include ? Operator::LESS_EQUAL : Operator::LESS
      expr.append_operator(upper_operator, 2)
    end
  when :partial_min_and_max
    # between(shard_key, min, min_border, max, max_border)
    filter_table(table, filter) do |expr|
      expr.append_object(context["between"], Operator::PUSH, 1)
      expr.append_object(shard_key, Operator::PUSH, 1)
      expr.append_operator(Operator::GET_VALUE, 1)
      [
        [:min, :min_border],
        [:max, :max_border],
      ].each do |value_name, border_name|
        expr.append_constant(target_range.__send__(value_name),
                             Operator::PUSH, 1)
        expr.append_constant(target_range.__send__(border_name),
                             Operator::PUSH, 1)
      end
      expr.append_operator(Operator::CALL, 5)
    end
  end
end

# Builds an expression for table, runs table.select with it and returns the
# selection. The expression is always closed afterwards.
#
# table  - object the expression is created for and selected from.
# filter - filter text parsed into the expression. With a block it is
#          optional; without a block it is parsed unconditionally.
#
# When a block is given it receives the fresh expression first so the caller
# can append its own condition; a present filter is then parsed and joined
# to that condition with Operator::AND.
def filter_table(table, filter)
  expression = Expression.create(table)
  has_caller_condition = block_given?
  yield(expression) if has_caller_condition

  if filter || !has_caller_condition
    expression.parse(filter)
    # Only AND when there is a caller-built condition to combine with.
    expression.append_operator(Operator::AND, 2) if has_caller_condition
  end

  table.select(expression)
ensure
  # expression is nil here when Expression.create itself raised.
  expression.close if expression
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,79 @@ table_create Logs_20150203 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20150203 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20150203 memo COLUMN_SCALAR ShortText
[[0,0.0,0.0],true]
table_create Logs_20150204 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20150204 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20150204 memo COLUMN_SCALAR ShortText
[[0,0.0,0.0],true]
table_create Logs_20150205 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20150205 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20150205 memo COLUMN_SCALAR ShortText
[[0,0.0,0.0],true]
load --table Logs_20150203
[
{"timestamp": "2015-02-03 12:49:00"}
{"timestamp": "2015-02-03 12:49:00", "memo": "2015-02-03 12:49:00"}
]
[[0,0.0,0.0],1]
load --table Logs_20150204
[
{"timestamp": "2015-02-04 13:49:00"},
{"timestamp": "2015-02-04 13:50:00"}
{"timestamp": "2015-02-04 13:49:00", "memo": "2015-02-04 13:49:00"},
{"timestamp": "2015-02-04 13:50:00", "memo": "2015-02-04 13:50:00"}
]
[[0,0.0,0.0],2]
load --table Logs_20150205
[
{"timestamp": "2015-02-05 13:49:00"},
{"timestamp": "2015-02-05 13:50:00"},
{"timestamp": "2015-02-05 13:51:00"}
{"timestamp": "2015-02-05 13:49:00", "memo": "2015-02-05 13:49:00"},
{"timestamp": "2015-02-05 13:50:00", "memo": "2015-02-05 13:50:00"},
{"timestamp": "2015-02-05 13:51:00", "memo": "2015-02-05 13:51:00"}
]
[[0,0.0,0.0],3]
logical_range_filter Logs timestamp
[[0,0.0,0.0],[]]
[
[
0,
0.0,
0.0
],
[
[
[
"memo",
"ShortText"
],
[
"timestamp",
"Time"
]
],
[
"2015-02-03 12:49:00",
1422935340.0
],
[
"2015-02-04 13:49:00",
1423025340.0
],
[
"2015-02-04 13:50:00",
1423025400.0
],
[
"2015-02-05 13:49:00",
1423111740.0
],
[
"2015-02-05 13:50:00",
1423111800.0
],
[
"2015-02-05 13:51:00",
1423111860.0
]
]
]
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,32 @@ register sharding

table_create Logs_20150203 TABLE_NO_KEY
column_create Logs_20150203 timestamp COLUMN_SCALAR Time
column_create Logs_20150203 memo COLUMN_SCALAR ShortText

table_create Logs_20150204 TABLE_NO_KEY
column_create Logs_20150204 timestamp COLUMN_SCALAR Time
column_create Logs_20150204 memo COLUMN_SCALAR ShortText

table_create Logs_20150205 TABLE_NO_KEY
column_create Logs_20150205 timestamp COLUMN_SCALAR Time
column_create Logs_20150205 memo COLUMN_SCALAR ShortText

load --table Logs_20150203
[
{"timestamp": "2015-02-03 12:49:00"}
{"timestamp": "2015-02-03 12:49:00", "memo": "2015-02-03 12:49:00"}
]

load --table Logs_20150204
[
{"timestamp": "2015-02-04 13:49:00"},
{"timestamp": "2015-02-04 13:50:00"}
{"timestamp": "2015-02-04 13:49:00", "memo": "2015-02-04 13:49:00"},
{"timestamp": "2015-02-04 13:50:00", "memo": "2015-02-04 13:50:00"}
]

load --table Logs_20150205
[
{"timestamp": "2015-02-05 13:49:00"},
{"timestamp": "2015-02-05 13:50:00"},
{"timestamp": "2015-02-05 13:51:00"}
{"timestamp": "2015-02-05 13:49:00", "memo": "2015-02-05 13:49:00"},
{"timestamp": "2015-02-05 13:50:00", "memo": "2015-02-05 13:50:00"},
{"timestamp": "2015-02-05 13:51:00", "memo": "2015-02-05 13:51:00"}
]

logical_range_filter Logs timestamp

0 comments on commit a0de830

Please sign in to comment.