Navigation Menu

Skip to content

Commit

Permalink
logical_count: add post_filter and filtered stage dynamic columns
Browse files Browse the repository at this point in the history
  • Loading branch information
kou committed Feb 19, 2018
1 parent 363414c commit 010f24b
Show file tree
Hide file tree
Showing 9 changed files with 477 additions and 5 deletions.
41 changes: 36 additions & 5 deletions plugins/sharding/logical_count.rb
Expand Up @@ -10,6 +10,7 @@ class LogicalCountCommand < Command
"max",
"max_border",
"filter",
"post_filter",
])

def run_body(input)
Expand Down Expand Up @@ -43,6 +44,7 @@ def cache_key(input)
key << "#{input[:max]}\0"
key << "#{input[:max_border]}\0"
key << "#{input[:filter]}\0"
key << "#{input[:post_filter]}\0"
dynamic_columns = DynamicColumns.parse(input)
key << dynamic_columns.cache_key
key
Expand All @@ -52,6 +54,7 @@ class Counter
def initialize(input, target_range)
@logger = Context.instance.logger
@filter = input[:filter]
@post_filter = input[:post_filter]
@dynamic_columns = DynamicColumns.parse(input)
@target_range = target_range
end
Expand All @@ -72,15 +75,15 @@ def count(shard, shard_range)
if cover_type == :all
log_use_range_index(false, table_name, "covered",
__LINE__, __method__)
if @filter
if @filter or @post_filter
return filtered_count_n_records(table, shard_key, cover_type)
else
return table.size
end
end

range_index = nil
if @filter
if @filter or @post_filter
log_use_range_index(false, table_name, "need filter",
__LINE__, __method__)
else
Expand Down Expand Up @@ -121,7 +124,7 @@ def log_use_range_index(use, table_name, reason, line, method)

def prepare_table(shard)
table = shard.table
return yield(table) if @filter.nil?
return yield(table) if @filter.nil? and @post_filter.nil?

@dynamic_columns.each_initial do |dynamic_column|
if table == shard.table
Expand Down Expand Up @@ -156,14 +159,42 @@ def filtered_count_n_records(table, shard_key, cover_type)
when :partial_min_and_max
expression_builder.build_partial_min_and_max(expression)
end
filtered_table = table.select(expression)
filtered_table.size
if cover_type == :all and @filter.nil?
# TODO: We can drop needless select when filtered stage dynamic
# doesn't exist.
filtered_table = table.select_all
else
filtered_table = table.select(expression)
end
if @post_filter
post_filtered_count_n_records(filtered_table)
else
filtered_table.size
end
ensure
filtered_table.close if filtered_table
expression.close if expression
end
end

def post_filtered_count_n_records(filtered_table)
@dynamic_columns.each_filtered do |dynamic_column|
dynamic_column.apply(filtered_table)
end

expression = nil
post_filtered_table = nil
begin
expression = Expression.create(filtered_table)
expression.parse(@post_filter)
post_filtered_table = filtered_table.select(expression)
post_filtered_table.size
ensure
post_filtered_table.close if post_filtered_table
expression.close if expression
end
end

def count_n_records_in_range(range_index, cover_type)
case cover_type
when :partial_min
Expand Down
@@ -0,0 +1,58 @@
plugin_register sharding
[[0,0.0,0.0],true]
table_create Logs_20170315 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170315 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170315 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
table_create Logs_20170316 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170316 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170316 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
table_create Logs_20170317 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170317 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170317 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
load --table Logs_20170315
[
{"timestamp": "2017/03/15 00:00:00", "price": 1000},
{"timestamp": "2017/03/15 01:00:00", "price": 900},
{"timestamp": "2017/03/15 02:00:00", "price": 300}
]
[[0,0.0,0.0],3]
load --table Logs_20170316
[
{"timestamp": "2017/03/16 10:00:00", "price": 530},
{"timestamp": "2017/03/16 11:00:00", "price": 520},
{"timestamp": "2017/03/16 12:00:00", "price": 110}
]
[[0,0.0,0.0],3]
load --table Logs_20170317
[
{"timestamp": "2017/03/17 20:00:00", "price": 800},
{"timestamp": "2017/03/17 21:00:00", "price": 400},
{"timestamp": "2017/03/17 22:00:00", "price": 300}
]
[[0,0.0,0.0],3]
table_create Times TABLE_PAT_KEY Time
[[0,0.0,0.0],true]
column_create Times logs_20170315 COLUMN_INDEX Logs_20170315 timestamp
[[0,0.0,0.0],true]
column_create Times logs_20170316 COLUMN_INDEX Logs_20170316 timestamp
[[0,0.0,0.0],true]
column_create Times logs_20170317 COLUMN_INDEX Logs_20170317 timestamp
[[0,0.0,0.0],true]
log_level --level debug
[[0,0.0,0.0],true]
logical_count Logs --shard_key timestamp --columns[filtered_id].stage filtered --columns[filtered_id].type UInt32 --columns[filtered_id].flags COLUMN_SCALAR --columns[filtered_id].value '_id' --filter 'price >= 200' --post_filter 'filtered_id > 1'
[[0,0.0,0.0],5]
#|d| [logical_count][select] <Logs_20170315>: covered
#|d| [logical_count][select] <Logs_20170316>: covered
#|d| [logical_count][select] <Logs_20170317>: covered
log_level --level notice
[[0,0.0,0.0],true]
@@ -0,0 +1,54 @@
#@on-error omit
plugin_register sharding
#@on-error default

table_create Logs_20170315 TABLE_NO_KEY
column_create Logs_20170315 timestamp COLUMN_SCALAR Time
column_create Logs_20170315 price COLUMN_SCALAR UInt32

table_create Logs_20170316 TABLE_NO_KEY
column_create Logs_20170316 timestamp COLUMN_SCALAR Time
column_create Logs_20170316 price COLUMN_SCALAR UInt32

table_create Logs_20170317 TABLE_NO_KEY
column_create Logs_20170317 timestamp COLUMN_SCALAR Time
column_create Logs_20170317 price COLUMN_SCALAR UInt32

load --table Logs_20170315
[
{"timestamp": "2017/03/15 00:00:00", "price": 1000},
{"timestamp": "2017/03/15 01:00:00", "price": 900},
{"timestamp": "2017/03/15 02:00:00", "price": 300}
]

load --table Logs_20170316
[
{"timestamp": "2017/03/16 10:00:00", "price": 530},
{"timestamp": "2017/03/16 11:00:00", "price": 520},
{"timestamp": "2017/03/16 12:00:00", "price": 110}
]

load --table Logs_20170317
[
{"timestamp": "2017/03/17 20:00:00", "price": 800},
{"timestamp": "2017/03/17 21:00:00", "price": 400},
{"timestamp": "2017/03/17 22:00:00", "price": 300}
]

table_create Times TABLE_PAT_KEY Time
column_create Times logs_20170315 COLUMN_INDEX Logs_20170315 timestamp
column_create Times logs_20170316 COLUMN_INDEX Logs_20170316 timestamp
column_create Times logs_20170317 COLUMN_INDEX Logs_20170317 timestamp

#@add-important-log-levels debug
log_level --level debug
logical_count Logs \
--shard_key timestamp \
--columns[filtered_id].stage filtered \
--columns[filtered_id].type UInt32 \
--columns[filtered_id].flags COLUMN_SCALAR \
--columns[filtered_id].value '_id' \
--filter 'price >= 200' \
--post_filter 'filtered_id > 1'
log_level --level notice
#@remove-important-log-levels debug
58 changes: 58 additions & 0 deletions test/command/suite/sharding/logical_count/post_filter/min.expected
@@ -0,0 +1,58 @@
plugin_register sharding
[[0,0.0,0.0],true]
table_create Logs_20170315 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170315 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170315 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
table_create Logs_20170316 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170316 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170316 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
table_create Logs_20170317 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170317 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170317 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
load --table Logs_20170315
[
{"timestamp": "2017/03/15 00:00:00", "price": 1000},
{"timestamp": "2017/03/15 01:00:00", "price": 900},
{"timestamp": "2017/03/15 02:00:00", "price": 300}
]
[[0,0.0,0.0],3]
load --table Logs_20170316
[
{"timestamp": "2017/03/16 10:00:00", "price": 530},
{"timestamp": "2017/03/16 11:00:00", "price": 520},
{"timestamp": "2017/03/16 12:00:00", "price": 110}
]
[[0,0.0,0.0],3]
load --table Logs_20170317
[
{"timestamp": "2017/03/17 20:00:00", "price": 800},
{"timestamp": "2017/03/17 21:00:00", "price": 400},
{"timestamp": "2017/03/17 22:00:00", "price": 300}
]
[[0,0.0,0.0],3]
table_create Times TABLE_PAT_KEY Time
[[0,0.0,0.0],true]
column_create Times logs_20170315 COLUMN_INDEX Logs_20170315 timestamp
[[0,0.0,0.0],true]
column_create Times logs_20170316 COLUMN_INDEX Logs_20170316 timestamp
[[0,0.0,0.0],true]
column_create Times logs_20170317 COLUMN_INDEX Logs_20170317 timestamp
[[0,0.0,0.0],true]
log_level --level debug
[[0,0.0,0.0],true]
logical_count Logs --shard_key timestamp --min "2017/03/15 01:00:00" --min_border "include" --columns[filtered_id].stage filtered --columns[filtered_id].type UInt32 --columns[filtered_id].flags COLUMN_SCALAR --columns[filtered_id].value '_id' --filter 'price >= 200' --post_filter 'filtered_id < 3'
[[0,0.0,0.0],5]
#|d| [logical_count][select] <Logs_20170315>: need filter
#|d| [logical_count][select] <Logs_20170316>: covered
#|d| [logical_count][select] <Logs_20170317>: covered
log_level --level notice
[[0,0.0,0.0],true]
56 changes: 56 additions & 0 deletions test/command/suite/sharding/logical_count/post_filter/min.test
@@ -0,0 +1,56 @@
#@on-error omit
plugin_register sharding
#@on-error default

table_create Logs_20170315 TABLE_NO_KEY
column_create Logs_20170315 timestamp COLUMN_SCALAR Time
column_create Logs_20170315 price COLUMN_SCALAR UInt32

table_create Logs_20170316 TABLE_NO_KEY
column_create Logs_20170316 timestamp COLUMN_SCALAR Time
column_create Logs_20170316 price COLUMN_SCALAR UInt32

table_create Logs_20170317 TABLE_NO_KEY
column_create Logs_20170317 timestamp COLUMN_SCALAR Time
column_create Logs_20170317 price COLUMN_SCALAR UInt32

load --table Logs_20170315
[
{"timestamp": "2017/03/15 00:00:00", "price": 1000},
{"timestamp": "2017/03/15 01:00:00", "price": 900},
{"timestamp": "2017/03/15 02:00:00", "price": 300}
]

load --table Logs_20170316
[
{"timestamp": "2017/03/16 10:00:00", "price": 530},
{"timestamp": "2017/03/16 11:00:00", "price": 520},
{"timestamp": "2017/03/16 12:00:00", "price": 110}
]

load --table Logs_20170317
[
{"timestamp": "2017/03/17 20:00:00", "price": 800},
{"timestamp": "2017/03/17 21:00:00", "price": 400},
{"timestamp": "2017/03/17 22:00:00", "price": 300}
]

table_create Times TABLE_PAT_KEY Time
column_create Times logs_20170315 COLUMN_INDEX Logs_20170315 timestamp
column_create Times logs_20170316 COLUMN_INDEX Logs_20170316 timestamp
column_create Times logs_20170317 COLUMN_INDEX Logs_20170317 timestamp

#@add-important-log-levels debug
log_level --level debug
logical_count Logs \
--shard_key timestamp \
--min "2017/03/15 01:00:00" \
--min_border "include" \
--columns[filtered_id].stage filtered \
--columns[filtered_id].type UInt32 \
--columns[filtered_id].flags COLUMN_SCALAR \
--columns[filtered_id].value '_id' \
--filter 'price >= 200' \
--post_filter 'filtered_id < 3'
log_level --level notice
#@remove-important-log-levels debug
@@ -0,0 +1,58 @@
plugin_register sharding
[[0,0.0,0.0],true]
table_create Logs_20170315 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170315 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170315 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
table_create Logs_20170316 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170316 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170316 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
table_create Logs_20170317 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170317 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170317 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
load --table Logs_20170315
[
{"timestamp": "2017/03/15 00:00:00", "price": 1000},
{"timestamp": "2017/03/15 01:00:00", "price": 900},
{"timestamp": "2017/03/15 02:00:00", "price": 300}
]
[[0,0.0,0.0],3]
load --table Logs_20170316
[
{"timestamp": "2017/03/16 10:00:00", "price": 530},
{"timestamp": "2017/03/16 11:00:00", "price": 520},
{"timestamp": "2017/03/16 12:00:00", "price": 110}
]
[[0,0.0,0.0],3]
load --table Logs_20170317
[
{"timestamp": "2017/03/17 20:00:00", "price": 800},
{"timestamp": "2017/03/17 21:00:00", "price": 400},
{"timestamp": "2017/03/17 22:00:00", "price": 300}
]
[[0,0.0,0.0],3]
table_create Times TABLE_PAT_KEY Time
[[0,0.0,0.0],true]
column_create Times logs_20170315 COLUMN_INDEX Logs_20170315 timestamp
[[0,0.0,0.0],true]
column_create Times logs_20170316 COLUMN_INDEX Logs_20170316 timestamp
[[0,0.0,0.0],true]
column_create Times logs_20170317 COLUMN_INDEX Logs_20170317 timestamp
[[0,0.0,0.0],true]
log_level --level debug
[[0,0.0,0.0],true]
logical_count Logs --shard_key timestamp --post_filter '_id < 3'
[[0,0.0,0.0],6]
#|d| [logical_count][select] <Logs_20170315>: covered
#|d| [logical_count][select] <Logs_20170316>: covered
#|d| [logical_count][select] <Logs_20170317>: covered
log_level --level notice
[[0,0.0,0.0],true]

0 comments on commit 010f24b

Please sign in to comment.