Navigation Menu

Skip to content

Commit

Permalink
Merge pull request #824 from komainu8/add_post_filter_in_logical_select
Browse files Browse the repository at this point in the history
Add post filter in logical select

Patch by Yasuhiro Horimoto. Thanks!!!
  • Loading branch information
kou committed Feb 19, 2018
2 parents 8b83982 + bb614c2 commit 36174b1
Show file tree
Hide file tree
Showing 7 changed files with 422 additions and 0 deletions.
18 changes: 18 additions & 0 deletions plugins/sharding/logical_select.rb
Expand Up @@ -28,6 +28,7 @@ class LogicalSelectCommand < Command
"match_columns",
"query",
"drilldown_filter",
"post_filter",
])

def run_body(input)
Expand Down Expand Up @@ -99,6 +100,7 @@ def cache_key(input)
key << "#{drilldown.filter}\0"
key << drilldown.dynamic_columns.cache_key
end
key << "#{input[:post_filter]}\0"
dynamic_columns = DynamicColumns.parse(input)
key << dynamic_columns.cache_key
key
Expand Down Expand Up @@ -278,6 +280,7 @@ class ExecuteContext
attr_reader :labeled_drilldowns
attr_reader :temporary_tables
attr_reader :expressions
attr_reader :post_filter
def initialize(input)
@input = input
@enumerator = LogicalEnumerator.new("logical_select", @input)
Expand All @@ -300,6 +303,8 @@ def initialize(input)
@temporary_tables = []

@expressions = []

@post_filter = @input[:post_filter]
end

def close
Expand Down Expand Up @@ -682,6 +687,7 @@ def initialize(context, shard, shard_range)
@match_columns = @context.match_columns
@query = @context.query
@filter = @context.filter
@post_filter = @context.post_filter
@sort_keys = @context.sort_keys
@result_sets = @context.result_sets
@unsorted_result_sets = @context.unsorted_result_sets
Expand Down Expand Up @@ -778,6 +784,12 @@ def filter_table
add_result_set(table.select(expression), expression)
end

def apply_post_filter(table)
expression = create_expression(table)
expression.parse(@post_filter)
table.select(expression)
end

def add_result_set(result_set, condition)
query_logger.log(:size, ":",
"select(#{result_set.size})[#{@shard.table_name}]")
Expand All @@ -794,6 +806,12 @@ def add_result_set(result_set, condition)
dynamic_column.apply(result_set, condition)
end

unless @post_filter.nil?
filtered_table = result_set
result_set = apply_post_filter(filtered_table)
@context.temporary_tables << filtered_table
end

if @sort_keys.empty?
@result_sets << result_set
else
Expand Down
@@ -0,0 +1,97 @@
plugin_register sharding
[[0,0.0,0.0],true]
table_create Logs_20170315 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170315 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170315 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
table_create Logs_20170316 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170316 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170316 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
table_create Logs_20170317 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170317 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170317 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
load --table Logs_20170315
[
{"timestamp": "2017/03/15 00:00:00", "price": 1000},
{"timestamp": "2017/03/15 01:00:00", "price": 900},
{"timestamp": "2017/03/15 02:00:00", "price": 300}
]
[[0,0.0,0.0],3]
load --table Logs_20170316
[
{"timestamp": "2017/03/16 10:00:00", "price": 530},
{"timestamp": "2017/03/16 11:00:00", "price": 520},
{"timestamp": "2017/03/16 12:00:00", "price": 110}
]
[[0,0.0,0.0],3]
load --table Logs_20170317
[
{"timestamp": "2017/03/17 20:00:00", "price": 800},
{"timestamp": "2017/03/17 21:00:00", "price": 400},
{"timestamp": "2017/03/17 22:00:00", "price": 300}
]
[[0,0.0,0.0],3]
table_create Times TABLE_PAT_KEY Time
[[0,0.0,0.0],true]
column_create Times logs_20170315 COLUMN_INDEX Logs_20170315 timestamp
[[0,0.0,0.0],true]
column_create Times logs_20170316 COLUMN_INDEX Logs_20170316 timestamp
[[0,0.0,0.0],true]
column_create Times logs_20170317 COLUMN_INDEX Logs_20170317 timestamp
[[0,0.0,0.0],true]
log_level --level debug
[[0,0.0,0.0],true]
logical_select Logs --shard_key timestamp --columns[filtered_id].stage filtered --columns[filtered_id].type UInt32 --columns[filtered_id].flags COLUMN_SCALAR --columns[filtered_id].value '_id' --filter 'price <= 900' --post_filter 'filtered_id > 1' --offset 1 --limit 3 --output_columns _id,filtered_id,price
[
[
0,
0.0,
0.0
],
[
[
[
6
],
[
[
"_id",
"UInt32"
],
[
"filtered_id",
"UInt32"
],
[
"price",
"UInt32"
]
],
[
3,
3,
300
],
[
2,
2,
520
],
[
3,
3,
110
]
]
]
]
log_level --level notice
[[0,0.0,0.0],true]
@@ -0,0 +1,57 @@
#@on-error omit
plugin_register sharding
#@on-error default

table_create Logs_20170315 TABLE_NO_KEY
column_create Logs_20170315 timestamp COLUMN_SCALAR Time
column_create Logs_20170315 price COLUMN_SCALAR UInt32

table_create Logs_20170316 TABLE_NO_KEY
column_create Logs_20170316 timestamp COLUMN_SCALAR Time
column_create Logs_20170316 price COLUMN_SCALAR UInt32

table_create Logs_20170317 TABLE_NO_KEY
column_create Logs_20170317 timestamp COLUMN_SCALAR Time
column_create Logs_20170317 price COLUMN_SCALAR UInt32

load --table Logs_20170315
[
{"timestamp": "2017/03/15 00:00:00", "price": 1000},
{"timestamp": "2017/03/15 01:00:00", "price": 900},
{"timestamp": "2017/03/15 02:00:00", "price": 300}
]

load --table Logs_20170316
[
{"timestamp": "2017/03/16 10:00:00", "price": 530},
{"timestamp": "2017/03/16 11:00:00", "price": 520},
{"timestamp": "2017/03/16 12:00:00", "price": 110}
]

load --table Logs_20170317
[
{"timestamp": "2017/03/17 20:00:00", "price": 800},
{"timestamp": "2017/03/17 21:00:00", "price": 400},
{"timestamp": "2017/03/17 22:00:00", "price": 300}
]

table_create Times TABLE_PAT_KEY Time
column_create Times logs_20170315 COLUMN_INDEX Logs_20170315 timestamp
column_create Times logs_20170316 COLUMN_INDEX Logs_20170316 timestamp
column_create Times logs_20170317 COLUMN_INDEX Logs_20170317 timestamp

#@add-important-log-levels debug
log_level --level debug
logical_select Logs \
--shard_key timestamp \
--columns[filtered_id].stage filtered \
--columns[filtered_id].type UInt32 \
--columns[filtered_id].flags COLUMN_SCALAR \
--columns[filtered_id].value '_id' \
--filter 'price <= 900' \
--post_filter 'filtered_id > 1' \
--offset 1 \
--limit 3 \
--output_columns _id,filtered_id,price
log_level --level notice
#@remove-important-log-levels debug
@@ -0,0 +1,55 @@
plugin_register sharding
[[0,0.0,0.0],true]
table_create Logs_20170315 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170315 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170315 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
table_create Logs_20170316 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170316 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170316 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
table_create Logs_20170317 TABLE_NO_KEY
[[0,0.0,0.0],true]
column_create Logs_20170317 timestamp COLUMN_SCALAR Time
[[0,0.0,0.0],true]
column_create Logs_20170317 price COLUMN_SCALAR UInt32
[[0,0.0,0.0],true]
load --table Logs_20170315
[
{"timestamp": "2017/03/15 00:00:00", "price": 1000},
{"timestamp": "2017/03/15 01:00:00", "price": 900},
{"timestamp": "2017/03/15 02:00:00", "price": 300}
]
[[0,0.0,0.0],3]
load --table Logs_20170316
[
{"timestamp": "2017/03/16 10:00:00", "price": 530},
{"timestamp": "2017/03/16 11:00:00", "price": 520},
{"timestamp": "2017/03/16 12:00:00", "price": 110}
]
[[0,0.0,0.0],3]
load --table Logs_20170317
[
{"timestamp": "2017/03/17 20:00:00", "price": 800},
{"timestamp": "2017/03/17 21:00:00", "price": 400},
{"timestamp": "2017/03/17 22:00:00", "price": 300}
]
[[0,0.0,0.0],3]
table_create Times TABLE_PAT_KEY Time
[[0,0.0,0.0],true]
column_create Times logs_20170315 COLUMN_INDEX Logs_20170315 timestamp
[[0,0.0,0.0],true]
column_create Times logs_20170316 COLUMN_INDEX Logs_20170316 timestamp
[[0,0.0,0.0],true]
column_create Times logs_20170317 COLUMN_INDEX Logs_20170317 timestamp
[[0,0.0,0.0],true]
log_level --level debug
[[0,0.0,0.0],true]
logical_select Logs --shard_key timestamp --post_filter '_id < 3' --offset 4 --limit 1 --output_columns _id,price
[[0,0.0,0.0],[[[6],[["_id","UInt32"],["price","UInt32"]],[1,800]]]]
log_level --level notice
[[0,0.0,0.0],true]
@@ -0,0 +1,52 @@
#@on-error omit
plugin_register sharding
#@on-error default

table_create Logs_20170315 TABLE_NO_KEY
column_create Logs_20170315 timestamp COLUMN_SCALAR Time
column_create Logs_20170315 price COLUMN_SCALAR UInt32

table_create Logs_20170316 TABLE_NO_KEY
column_create Logs_20170316 timestamp COLUMN_SCALAR Time
column_create Logs_20170316 price COLUMN_SCALAR UInt32

table_create Logs_20170317 TABLE_NO_KEY
column_create Logs_20170317 timestamp COLUMN_SCALAR Time
column_create Logs_20170317 price COLUMN_SCALAR UInt32

load --table Logs_20170315
[
{"timestamp": "2017/03/15 00:00:00", "price": 1000},
{"timestamp": "2017/03/15 01:00:00", "price": 900},
{"timestamp": "2017/03/15 02:00:00", "price": 300}
]

load --table Logs_20170316
[
{"timestamp": "2017/03/16 10:00:00", "price": 530},
{"timestamp": "2017/03/16 11:00:00", "price": 520},
{"timestamp": "2017/03/16 12:00:00", "price": 110}
]

load --table Logs_20170317
[
{"timestamp": "2017/03/17 20:00:00", "price": 800},
{"timestamp": "2017/03/17 21:00:00", "price": 400},
{"timestamp": "2017/03/17 22:00:00", "price": 300}
]

table_create Times TABLE_PAT_KEY Time
column_create Times logs_20170315 COLUMN_INDEX Logs_20170315 timestamp
column_create Times logs_20170316 COLUMN_INDEX Logs_20170316 timestamp
column_create Times logs_20170317 COLUMN_INDEX Logs_20170317 timestamp

#@add-important-log-levels debug
log_level --level debug
logical_select Logs \
--shard_key timestamp \
--post_filter '_id < 3' \
--offset 4 \
--limit 1 \
--output_columns _id,price
log_level --level notice
#@remove-important-log-levels debug

0 comments on commit 36174b1

Please sign in to comment.