Preliminary bucket implementation
wesm committed May 27, 2015
1 parent 013a5b9 commit cb90310
Showing 3 changed files with 90 additions and 7 deletions.
ibis/expr/analytics.py (4 additions, 4 deletions)
@@ -51,17 +51,17 @@ def output_type(self):
 class Histogram(ir.ValueNode):
 
     def __init__(self, arg, nbins, binwidth, base, closed='left',
-                 close_extreme=True):
+                 close_extreme=True, aux_hash=None):
         self.arg = arg
         self.nbins = nbins
         self.binwidth = binwidth
         self.base = base
         self.closed = closed
         self.close_extreme = close_extreme
-        self.include_over = include_over
+        self.aux_hash = aux_hash
         ir.ValueNode.__init__(self, [self.arg, self.nbins, self.binwidth,
                                      self.base, self.closed,
-                                     self.close_extreme])
+                                     self.close_extreme, self.aux_hash])
 
     def output_type(self):
         # always undefined cardinality (for now)
@@ -115,5 +115,5 @@ def histogram(arg, nbins=None, binwidth=None, base=None, closed='left',
"""
op = Histogram(arg, nbins, binwidth, base, closed=closed,
close_extreme=close_extreme,
include_over=include_over)
aux_hash=aux_hash)
return op.to_expr()
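For orientation, here is a usage sketch, not part of this commit, of the histogram() helper defined above against an unbound ibis table. The table constructor call, the table name, and the column name 'f' are illustrative assumptions rather than code from the repository.

import ibis.expr.api as api
from ibis.expr.analytics import histogram

# Hypothetical unbound table with a single double column named 'f'.
t = api.table([('f', 'double')], 'example_table')

# Build a histogram expression over that column with ten bins.
expr = histogram(t.f, nbins=10)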
ibis/sql/compiler.py (80 additions, 3 deletions)
@@ -21,6 +21,7 @@
 from collections import defaultdict
 
 import ibis.expr.analysis as L
+import ibis.expr.api as api
 import ibis.expr.operations as ops
 import ibis.expr.types as ir

@@ -153,7 +154,9 @@ def _generate_teardown_queries(self):

     def _build_result_query(self):
         self._collect_elements()
-        self._analyze_filter_clauses()
+
+        self._analyze_select_exprs()
+        self._analyze_filter_exprs()
         self._analyze_subqueries()
         self._populate_context()

@@ -203,9 +206,83 @@ def _make_table_aliases(self, expr):
             ctx.make_alias(expr)
 
     #----------------------------------------------------------------------
-    # Filter analysis / rewrites
+    # Expr analysis / rewrites
 
+    def _analyze_select_exprs(self):
+        new_select_set = []
+
+        for expr in self.select_set:
+            new_expr = self._visit_select_expr(expr)
+            new_select_set.append(new_expr)
+
+        self.select_set = new_select_set
+
+    def _visit_select_expr(self, expr):
+        # Dumping ground for analysis of WHERE expressions
+        # - Subquery extraction
+        # - Conversion to explicit semi/anti joins
+        # - Rewrites to generate subqueries
+
+        op = expr.op()
+
+        method = '_visit_select_{}'.format(type(op).__name__)
+        if hasattr(self, method):
+            f = getattr(self, method)
+            return f(expr)
+
+        unchanged = True
+
+        if isinstance(op, ops.ValueNode):
+            new_args = []
+            for arg in op.args:
+                if isinstance(arg, ir.Expr):
+                    new_arg = self._visit_select_expr(arg)
+                    if arg is not new_arg:
+                        unchanged = False
+                    new_args.append(new_arg)
+                else:
+                    new_args.append(arg)
+
+            if not unchanged:
+                return expr._factory(type(op)(*new_args))
+            else:
+                return expr
+        else:
+            return expr
+
+    def _visit_select_Bucket(self, expr):
+        import operator
+
+        op = expr.op()
+
+        stmt = api.case()
+
+        if op.closed == 'left':
+            l_cmp = operator.le
+            r_cmp = operator.lt
+        else:
+            l_cmp = operator.lt
+            r_cmp = operator.le
+
+        bucket_id = 0
+        if op.include_under:
+            stmt = stmt.when(r_cmp(op.arg, op.buckets[0]), bucket_id)
+            bucket_id += 1
+
+        for lower, upper in zip(op.buckets, op.buckets[1:]):
+            stmt = stmt.when(l_cmp(lower, op.arg) & r_cmp(op.arg, upper),
+                             bucket_id)
+            bucket_id += 1
+
+        if op.include_over:
+            stmt = stmt.when(l_cmp(op.buckets[-1], op.arg), bucket_id)
+            bucket_id += 1
+
+        case_expr = stmt.end()
+
+        return case_expr.name(expr.get_name())
+
-    def _analyze_filter_clauses(self):
+    def _analyze_filter_exprs(self):
         # What's semantically contained in the filter predicates may need to be
         # rewritten. Not sure if this is the right place to do this, but a
         # starting point
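Concretely, the _visit_select_Bucket method above rewrites a bucket expression into a chained searched-case expression. The following sketch, assuming a numeric column d and using the same api.case() builder the commit imports, shows roughly what the expansion looks like for d.bucket([0, 10, 25, 50], include_over=True) with the default closed='left'; it is an illustration, not output of the commit.

import ibis.expr.api as api

def expand_bucket_sketch(d):
    # Mirrors the loop in _visit_select_Bucket for buckets [0, 10, 25, 50]
    # with closed='left' and include_over=True: each half-open range
    # [lower, upper) gets ids 0 through 2, and the overflow bucket gets 3.
    stmt = api.case()
    stmt = stmt.when((0 <= d) & (d < 10), 0)
    stmt = stmt.when((10 <= d) & (d < 25), 1)
    stmt = stmt.when((25 <= d) & (d < 50), 2)
    stmt = stmt.when(50 <= d, 3)
    return stmt.end()

Values below 0 match no branch in this example because include_under is not set, so they fall through to the case expression's default.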
ibis/tests/test_impala_e2e.py (6 additions, 0 deletions)
@@ -240,6 +240,12 @@ def test_builtins_1(self):

             api.literal(5).isin([i1, i4, d]),
 
+            # tier and histogram
+            d.bucket([0, 10, 25, 50, 100]),
+            d.bucket([0, 10, 25, 50], include_over=True),
+            d.bucket([0, 10, 25, 50], include_over=True, close_extreme=False),
+            d.bucket([10, 25, 50, 100], include_under=True),
+
             # coalesce-like cases
             api.coalesce(table.int_col,
                          api.null(),
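As a reading aid for the bucket expressions added to test_builtins_1 above, here is a plain-Python restatement, not part of the commit, of the bucket-id assignment implied by the compiler rewrite for the default closed='left'. It ignores close_extreme, which the rewrite shown in compiler.py does not appear to consult yet.

def bucket_id(value, buckets, include_under=False, include_over=False):
    # Half-open [lower, upper) ranges, with optional underflow and
    # overflow buckets, mirroring _visit_select_Bucket for closed='left'.
    next_id = 0
    if include_under:
        if value < buckets[0]:
            return next_id
        next_id += 1
    for lower, upper in zip(buckets, buckets[1:]):
        if lower <= value < upper:
            return next_id
        next_id += 1
    if include_over and value >= buckets[-1]:
        return next_id
    return None  # value falls outside every requested bucket

assert bucket_id(5, [0, 10, 25, 50, 100]) == 0
assert bucket_id(75, [0, 10, 25, 50], include_over=True) == 3
assert bucket_id(5, [10, 25, 50, 100], include_under=True) == 0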
