Navigation Menu

Skip to content

Commit

Permalink
logical_range_filter: support dynamic columns
Browse files Browse the repository at this point in the history
Only the "initial" and "filtered" stages are supported.

TODO:

  * Add tests for cache
  * Document
  • Loading branch information
kou committed Oct 31, 2017
1 parent 72ad95d commit 225ee45
Show file tree
Hide file tree
Showing 20 changed files with 954 additions and 5 deletions.
6 changes: 6 additions & 0 deletions plugins/sharding/dynamic_columns.rb
Expand Up @@ -54,6 +54,12 @@ def each(&block)
each_output(&block)
end

# Returns true when no dynamic-column contexts are registered for any
# of the three stages tracked by this object (initial, filtered, output).
#
# Used by callers (e.g. logical_range_filter's range-index decision) to
# cheaply detect whether any dynamic columns are in play.
def empty?
  # Use `&&` rather than `and`: `and` has very low precedence and is a
  # well-known Ruby precedence trap in boolean expressions.
  @initial_contexts.empty? &&
    @filtered_contexts.empty? &&
    @output_contexts.empty?
end

def close
each do |context|
context.close
Expand Down
52 changes: 47 additions & 5 deletions plugins/sharding/logical_range_filter.rb
Expand Up @@ -71,6 +71,8 @@ def cache_key(input)
key << "#{input[:limit]}\0"
key << "#{input[:output_columns]}\0"
key << "#{input[:use_range_index]}\0"
dynamic_columns = DynamicColumns.parse(input)
key << dynamic_columns.cache_key
key
end

Expand All @@ -81,10 +83,12 @@ class ExecuteContext
attr_reader :filter
attr_reader :offset
attr_reader :limit
attr_reader :dynamic_columns
attr_accessor :current_offset
attr_accessor :current_limit
attr_reader :result_sets
attr_reader :unsorted_result_sets
attr_reader :temporary_tables
attr_reader :threshold
def initialize(input)
@input = input
Expand All @@ -94,13 +98,16 @@ def initialize(input)
@filter = @input[:filter]
@offset = (@input[:offset] || 0).to_i
@limit = (@input[:limit] || 10).to_i
@dynamic_columns = DynamicColumns.parse(@input)

@current_offset = @offset
@current_limit = @limit

@result_sets = []
@unsorted_result_sets = []

@temporary_tables = []

@threshold = compute_threshold
end

Expand All @@ -111,6 +118,12 @@ def close
@result_sets.each do |result_set|
result_set.close if result_set.temporary?
end

@dynamic_columns.close

@temporary_tables.each do |table|
table.close
end
end

private
Expand Down Expand Up @@ -179,6 +192,12 @@ def execute
if @context.result_sets.empty?
result_set = HashTable.create(:flags => ObjectFlags::WITH_SUBREC,
:key_type => first_shard.table)
@context.dynamic_columns.each_initial do |dynamic_column|
dynamic_column.apply(result_set)
end
@context.dynamic_columns.each_filtered do |dynamic_column|
dynamic_column.apply(result_set)
end
@context.result_sets << result_set
end
end
Expand All @@ -190,6 +209,8 @@ def initialize(context, shard, shard_range)
@shard = shard
@shard_range = shard_range

@target_table = @shard.table

@filter = @context.filter
@result_sets = @context.result_sets
@unsorted_result_sets = @context.unsorted_result_sets
Expand All @@ -201,7 +222,7 @@ def initialize(context, shard, shard_range)

def execute
return if @cover_type == :none
return if @shard.table.empty?
return if @target_table.empty?

shard_key = @shard.key
if shard_key.nil?
Expand All @@ -224,6 +245,14 @@ def execute
range_index = nil
end

@context.dynamic_columns.each_initial do |dynamic_column|
if @target_table == @shard.table
@target_table = @target_table.select_all
@context.temporary_tables << @target_table
end
dynamic_column.apply(@target_table)
end

execute_filter(range_index, expression_builder)
end

Expand Down Expand Up @@ -273,11 +302,15 @@ def use_range_index?(range_index, expression_builder)
__LINE__, __method__)
end

unless @context.dynamic_columns.empty?
reason = "dynamic columns are used"
return decide_use_range_index(false, reason, __LINE__, __method__)
end

current_limit = @context.current_limit
if current_limit < 0
reason = "limit is negative: <#{current_limit}>"
return decide_use_range_index(false, reason,
__LINE__, __method__)
return decide_use_range_index(false, reason, __LINE__, __method__)
end

required_n_records = @context.current_offset + current_limit
Expand Down Expand Up @@ -449,7 +482,7 @@ def execute_filter(range_index, expression_builder)
end

def filter_shard_all(range_index, expression_builder)
table = @shard.table
table = @target_table
if @filter.nil?
if table.size <= @context.current_offset
@context.current_offset -= table.size
Expand Down Expand Up @@ -593,7 +626,7 @@ def compute_max_n_unmatched_records(data_table_size, limit)
end

def filter_table
table = @shard.table
table = @target_table
create_expression(table) do |expression|
yield(expression)
result_set = table.select(expression)
Expand All @@ -602,11 +635,20 @@ def filter_table
end

def sort_result_set(result_set)
@context.temporary_tables.delete(result_set)

if result_set.empty?
result_set.close if result_set.temporary?
return
end

@context.dynamic_columns.each_filtered do |dynamic_column|
if result_set == @shard.table
result_set = result_set.select_all
end
dynamic_column.apply(result_set)
end

if result_set.size <= @context.current_offset
@context.current_offset -= result_set.size
result_set.close if result_set.temporary?
Expand Down
@@ -0,0 +1,94 @@
plugin_register sharding
[[0,0.0,0.0],true]
table_create Logs_20170315 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170315 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170315 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
table_create Logs_20170316 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170316 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170316 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
table_create Logs_20170317 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170317 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170317 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
load --table Logs_20170315
[
{"timestamp": "2017/03/15 00:00:00", "price": 1000},
{"timestamp": "2017/03/15 01:00:00", "price": 900},
{"timestamp": "2017/03/15 02:00:00", "price": 300}
]
[[0,0.0,0.0],3]
load --table Logs_20170316
[
{"timestamp": "2017/03/16 10:00:00", "price": 530},
{"timestamp": "2017/03/16 11:00:00", "price": 520},
{"timestamp": "2017/03/16 12:00:00", "price": 110}
]
[[0,0.0,0.0],3]
load --table Logs_20170317
[
{"timestamp": "2017/03/17 20:00:00", "price": 800},
{"timestamp": "2017/03/17 21:00:00", "price": 400},
{"timestamp": "2017/03/17 22:00:00", "price": 300}
]
[[0,0.0,0.0],3]
table_create Times TABLE_PAT_KEY Time
[[0,0.0,0.0],true]
column_create Times logs_20170315 COLUMN_INDEX Logs_20170315 timestamp
[[0,0.0,0.0],true]
column_create Times logs_20170316 COLUMN_INDEX Logs_20170316 timestamp
[[0,0.0,0.0],true]
column_create Times logs_20170317 COLUMN_INDEX Logs_20170317 timestamp
[[0,0.0,0.0],true]
log_level --level debug
[[0,0.0,0.0],true]
logical_range_filter Logs --shard_key timestamp --columns[filtered_id].stage filtered --columns[filtered_id].type UInt32 --columns[filtered_id].flags COLUMN_SCALAR --columns[filtered_id].value '_id' --filter 'price <= 900' --offset 1 --limit 3 --output_columns _id,filtered_id,price
[
[
0,
0.0,
0.0
],
[
[
[
"_id",
"UInt32"
],
[
"filtered_id",
"UInt32"
],
[
"price",
"UInt32"
]
],
[
3,
3,
300
],
[
1,
1,
530
],
[
2,
2,
520
]
]
]
#|d| [logical_range_filter][select] <Logs_20170315>: dynamic columns are used
#|d| [logical_range_filter][select] <Logs_20170316>: dynamic columns are used
log_level --level notice
[[0,0.0,0.0],true]
@@ -0,0 +1,56 @@
#@on-error omit
plugin_register sharding
#@on-error default

table_create Logs_20170315 TABLE_NO_KEY
column_create Logs_20170315 timestamp COLUMN_SCALAR Time
column_create Logs_20170315 price COLUMN_SCALAR UInt32

table_create Logs_20170316 TABLE_NO_KEY
column_create Logs_20170316 timestamp COLUMN_SCALAR Time
column_create Logs_20170316 price COLUMN_SCALAR UInt32

table_create Logs_20170317 TABLE_NO_KEY
column_create Logs_20170317 timestamp COLUMN_SCALAR Time
column_create Logs_20170317 price COLUMN_SCALAR UInt32

load --table Logs_20170315
[
{"timestamp": "2017/03/15 00:00:00", "price": 1000},
{"timestamp": "2017/03/15 01:00:00", "price": 900},
{"timestamp": "2017/03/15 02:00:00", "price": 300}
]

load --table Logs_20170316
[
{"timestamp": "2017/03/16 10:00:00", "price": 530},
{"timestamp": "2017/03/16 11:00:00", "price": 520},
{"timestamp": "2017/03/16 12:00:00", "price": 110}
]

load --table Logs_20170317
[
{"timestamp": "2017/03/17 20:00:00", "price": 800},
{"timestamp": "2017/03/17 21:00:00", "price": 400},
{"timestamp": "2017/03/17 22:00:00", "price": 300}
]

table_create Times TABLE_PAT_KEY Time
column_create Times logs_20170315 COLUMN_INDEX Logs_20170315 timestamp
column_create Times logs_20170316 COLUMN_INDEX Logs_20170316 timestamp
column_create Times logs_20170317 COLUMN_INDEX Logs_20170317 timestamp

#@add-important-log-levels debug
log_level --level debug
logical_range_filter Logs \
--shard_key timestamp \
--columns[filtered_id].stage filtered \
--columns[filtered_id].type UInt32 \
--columns[filtered_id].flags COLUMN_SCALAR \
--columns[filtered_id].value '_id' \
--filter 'price <= 900' \
--offset 1 \
--limit 3 \
--output_columns _id,filtered_id,price
log_level --level notice
#@remove-important-log-levels debug
@@ -0,0 +1,25 @@
plugin_register sharding
[[0,0.0,0.0],true]
table_create Logs_20170315 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170315 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170315 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
load --table Logs_20170315
[
{"timestamp": "2017/03/15 00:00:00", "price": 1000},
{"timestamp": "2017/03/15 01:00:00", "price": 900},
{"timestamp": "2017/03/15 02:00:00", "price": 300}
]
[[0,0.0,0.0],3]
table_create Times TABLE_PAT_KEY Time
[[0,0.0,0.0],true]
column_create Times logs_20170315 COLUMN_INDEX Logs_20170315 timestamp
[[0,0.0,0.0],true]
log_level --level debug
[[0,0.0,0.0],true]
logical_range_filter Logs --shard_key timestamp --columns[price_with_tax].stage filtered --columns[price_with_tax].type UInt32 --columns[price_with_tax].flags COLUMN_SCALAR --columns[price_with_tax].value 'price * 1.08' --min "2017/04/01 00:00:00" --offset 1 --limit 3 --output_columns price,price_with_tax
[[0,0.0,0.0],[[["price","UInt32"],["price_with_tax","UInt32"]]]]
log_level --level notice
[[0,0.0,0.0],true]
@@ -0,0 +1,32 @@
#@on-error omit
plugin_register sharding
#@on-error default

table_create Logs_20170315 TABLE_NO_KEY
column_create Logs_20170315 timestamp COLUMN_SCALAR Time
column_create Logs_20170315 price COLUMN_SCALAR UInt32

load --table Logs_20170315
[
{"timestamp": "2017/03/15 00:00:00", "price": 1000},
{"timestamp": "2017/03/15 01:00:00", "price": 900},
{"timestamp": "2017/03/15 02:00:00", "price": 300}
]

table_create Times TABLE_PAT_KEY Time
column_create Times logs_20170315 COLUMN_INDEX Logs_20170315 timestamp

#@add-important-log-levels debug
log_level --level debug
logical_range_filter Logs \
--shard_key timestamp \
--columns[price_with_tax].stage filtered \
--columns[price_with_tax].type UInt32 \
--columns[price_with_tax].flags COLUMN_SCALAR \
--columns[price_with_tax].value 'price * 1.08' \
--min "2017/04/01 00:00:00" \
--offset 1 \
--limit 3 \
--output_columns price,price_with_tax
log_level --level notice
#@remove-important-log-levels debug

0 comments on commit 225ee45

Please sign in to comment.