Skip to content

Commit

Permalink
Merge pull request #124 from georgia-tech-db/limit
Browse files Browse the repository at this point in the history
Limit Implementation
  • Loading branch information
gaurav274 committed Jan 7, 2021
2 parents 3c6f6d8 + 4129fdb commit 09e8a98
Show file tree
Hide file tree
Showing 16 changed files with 417 additions and 19 deletions.
51 changes: 51 additions & 0 deletions src/executor/limit_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# coding=utf-8
# Copyright 2018-2020 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator

from src.models.storage.batch import Batch
from src.executor.abstract_executor import AbstractExecutor
from src.planner.limit_plan import LimitPlan
from src.configuration.configuration_manager import ConfigurationManager


class LimitExecutor(AbstractExecutor):
"""
Limits the number of rows returned
Arguments:
node (AbstractPlan): The Limit Plan
"""

def __init__(self, node: LimitPlan):
super().__init__(node)
self._limit_count = node.limit_value
self.BATCH_MAX_SIZE = ConfigurationManager().get_value(
"executor", "batch_size")

def validate(self):
pass

def exec(self) -> Iterator[Batch]:
child_executor = self.children[0]
remaining_tuples = self._limit_count
# aggregates the batches into one large batch
for batch in child_executor.exec():
if len(batch) > remaining_tuples:
yield batch[:remaining_tuples]
return

remaining_tuples -= len(batch)
yield batch
3 changes: 3 additions & 0 deletions src/executor/plan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import pandas as pd

from src.executor.abstract_executor import AbstractExecutor
from src.executor.limit_executor import LimitExecutor
from src.executor.seq_scan_executor import SequentialScanExecutor
from src.models.storage.batch import Batch
from src.planner.abstract_plan import AbstractPlan
Expand Down Expand Up @@ -76,6 +77,8 @@ def _build_execution_tree(self, plan: AbstractPlan) -> AbstractExecutor:
executor_node = LoadDataExecutor(node=plan)
elif plan_node_type == PlanNodeType.ORDER_BY:
executor_node = OrderByExecutor(node=plan)
elif plan_node_type == PlanNodeType.LIMIT:
executor_node = LimitExecutor(node=plan)

# Build Executor Tree for children
for children in plan.children:
Expand Down
11 changes: 10 additions & 1 deletion src/optimizer/generators/seq_scan_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
# limitations under the License.
from src.optimizer.generators.base import Generator
from src.optimizer.operators import LogicalGet, Operator, LogicalFilter, \
LogicalProject, LogicalUnion, LogicalOrderBy
LogicalProject, LogicalUnion, LogicalOrderBy, LogicalLimit
from src.planner.seq_scan_plan import SeqScanPlan
from src.planner.union_plan import UnionPlan
from src.planner.storage_plan import StoragePlan
from src.planner.orderby_plan import OrderByPlan
from src.planner.limit_plan import LimitPlan


class ScanGenerator(Generator):
Expand Down Expand Up @@ -53,6 +54,11 @@ def _visit_logical_orderby(self, operator: LogicalOrderBy):
orderbyplan.append_child(self._plan)
self._plan = orderbyplan

def _visit_logical_limit(self, operator: LogicalLimit):
limitplan = LimitPlan(operator.limit_count)
limitplan.append_child(self._plan)
self._plan = limitplan

def _visit(self, operator: Operator):
if isinstance(operator, LogicalUnion):
self._visit_logical_union(operator)
Expand All @@ -64,6 +70,9 @@ def _visit(self, operator: Operator):
if isinstance(operator, LogicalOrderBy):
self._visit_logical_orderby(operator)

if isinstance(operator, LogicalLimit):
self._visit_logical_limit(operator)

if isinstance(operator, LogicalGet):
self._visit_logical_get(operator)

Expand Down
42 changes: 31 additions & 11 deletions src/optimizer/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import IntEnum, unique
from enum import IntEnum, unique, auto
from typing import List

from src.catalog.models.df_metadata import DataFrameMetadata
from src.expression.constant_value_expression import ConstantValueExpression
from src.parser.table_ref import TableRef
from src.expression.abstract_expression import AbstractExpression
from src.catalog.models.df_column import DataFrameColumn
Expand All @@ -28,16 +29,17 @@ class OperatorType(IntEnum):
"""
Manages enums for all the operators supported
"""
LOGICALGET = 1,
LOGICALFILTER = 2,
LOGICALPROJECT = 3,
LOGICALINSERT = 4,
LOGICALCREATE = 5,
LOGICALCREATEUDF = 6,
LOGICALLOADDATA = 7,
LOGICALQUERYDERIVEDGET = 8,
LOGICALUNION = 9,
LOGICALORDERBY = 10
LOGICALGET = auto()
LOGICALFILTER = auto()
LOGICALPROJECT = auto()
LOGICALINSERT = auto()
LOGICALCREATE = auto()
LOGICALCREATEUDF = auto()
LOGICALLOADDATA = auto()
LOGICALQUERYDERIVEDGET = auto()
LOGICALUNION = auto()
LOGICALORDERBY = auto()
LOGICALLIMIT = auto()


class Operator:
Expand Down Expand Up @@ -167,6 +169,24 @@ def __eq__(self, other):
and self.orderby_list == other.orderby_list)


class LogicalLimit(Operator):
def __init__(self, limit_count: ConstantValueExpression,
children: List = None):
super().__init__(OperatorType.LOGICALLIMIT, children)
self._limit_count = limit_count

@property
def limit_count(self):
return self._limit_count

def __eq__(self, other):
is_subtree_equal = super().__eq__(other)
if not isinstance(other, LogicalLimit):
return False
return (is_subtree_equal
and self.limit_count == other.limit_count)


class LogicalUnion(Operator):
def __init__(self, all: bool, children: List = None):
super().__init__(OperatorType.LOGICALUNION, children)
Expand Down
2 changes: 1 addition & 1 deletion src/optimizer/plan_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class PlanGenerator:
"""
_SCAN_NODE_TYPES = (OperatorType.LOGICALFILTER, OperatorType.LOGICALGET,
OperatorType.LOGICALPROJECT, OperatorType.LOGICALUNION,
OperatorType.LOGICALORDERBY)
OperatorType.LOGICALORDERBY, OperatorType.LOGICALLIMIT)
_INSERT_NODE_TYPE = OperatorType.LOGICALINSERT
_CREATE_NODE_TYPE = OperatorType.LOGICALCREATE
_CREATE_UDF_NODE_TYPE = OperatorType.LOGICALCREATEUDF
Expand Down
10 changes: 9 additions & 1 deletion src/optimizer/statement_to_opr_convertor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
LogicalInsert, LogicalCreate,
LogicalCreateUDF, LogicalLoadData,
LogicalQueryDerivedGet, LogicalUnion,
LogicalOrderBy)
LogicalOrderBy, LogicalLimit)
from src.parser.statement import AbstractStatement
from src.parser.select_statement import SelectStatement
from src.parser.insert_statement import InsertTableStatement
Expand Down Expand Up @@ -100,6 +100,9 @@ def visit_select(self, statement: SelectStatement):
if statement.orderby_list is not None:
self._visit_orderby(statement.orderby_list)

if statement.limit_count is not None:
self._visit_limit(statement.limit_count)

def _visit_orderby(self, orderby_list):
# orderby_list structure: List[(TupleValueExpression, EnumInt), ...]
orderby_columns = [orderbyexpr[0] for orderbyexpr in orderby_list]
Expand All @@ -108,6 +111,11 @@ def _visit_orderby(self, orderby_list):
orderby_opr.append_child(self._plan)
self._plan = orderby_opr

def _visit_limit(self, limit_count):
limit_opr = LogicalLimit(limit_count)
limit_opr.append_child(self._plan)
self._plan = limit_opr

def _visit_union(self, target, all):
left_child_plan = self._plan
self.visit_select(target)
Expand Down
10 changes: 9 additions & 1 deletion src/parser/parser_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ def visitQuerySpecification(
from_clause = None
where_clause = None
orderby_clause = None
limit_count = None

# first child will be a SELECT terminal token

Expand All @@ -355,6 +356,9 @@ def visitQuerySpecification(
elif rule_idx == evaql_parser.RULE_orderByClause:
orderby_clause = self.visit(ctx.orderByClause())

elif rule_idx == evaql_parser.RULE_limitClause:
limit_count = self.visit(ctx.limitClause())

except BaseException:
# stop parsing something bad happened
return None
Expand All @@ -365,7 +369,8 @@ def visitQuerySpecification(

select_stmt = SelectStatement(
target_list, from_clause, where_clause,
orderby_clause_list=orderby_clause)
orderby_clause_list=orderby_clause,
limit_count=limit_count)

return select_stmt

Expand Down Expand Up @@ -407,6 +412,9 @@ def visitOrderByExpression(

return self.visitChildren(ctx.expression()), sort_token

def visitLimitClause(self, ctx: evaql_parser.LimitClauseContext):
return ConstantValueExpression(self.visitChildren(ctx))

##################################################################
# LOAD STATEMENT
##################################################################
Expand Down
16 changes: 15 additions & 1 deletion src/parser/select_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from src.parser.types import StatementType
from src.expression.abstract_expression import AbstractExpression
from src.expression.constant_value_expression import ConstantValueExpression
from src.parser.table_ref import TableRef
from typing import List

Expand Down Expand Up @@ -49,6 +50,7 @@ def __init__(self, target_list: List[AbstractExpression] = None,
self._union_link = None
self._union_all = False
self._orderby_list = kwargs.get("orderby_clause_list", None)
self._limit_count = kwargs.get("limit_count", None)

@property
def union_link(self):
Expand Down Expand Up @@ -99,6 +101,14 @@ def orderby_list(self, orderby_list_new):
# orderby_list_new: List[(TupleValueExpression, int)]
self._orderby_list = orderby_list_new

@property
def limit_count(self):
return self._limit_count

@limit_count.setter
def limit_count(self, limit_count_new: ConstantValueExpression):
self._limit_count = limit_count_new

def __str__(self) -> str:
print_str = "SELECT {} FROM {} WHERE {}".format(self._target_list,
self._from_table,
Expand All @@ -112,6 +122,9 @@ def __str__(self) -> str:
if self._orderby_list is not None:
print_str += " ORDER BY " + str(self._orderby_list)

if self._limit_count is not None:
print_str += " LIMIT " + str(self._limit_count)

return print_str

def __eq__(self, other):
Expand All @@ -122,4 +135,5 @@ def __eq__(self, other):
and self.where_clause == other.where_clause
and self.union_link == other.union_link
and self.union_all == other.union_all
and self.orderby_list == other.orderby_list)
and self.orderby_list == other.orderby_list
and self.limit_count == other.limit_count)
42 changes: 42 additions & 0 deletions src/planner/limit_plan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# coding=utf-8
# Copyright 2018-2020 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from src.planner.abstract_plan import AbstractPlan
from src.planner.types import PlanNodeType
from src.expression.constant_value_expression import ConstantValueExpression


class LimitPlan(AbstractPlan):
"""
This plan is used for storing information required for limit
operations.
Arguments:
limit_count: ConstantValueExpression
A ConstantValueExpression which is the count of the
number of rows returned
"""

def __init__(self, limit_count: ConstantValueExpression):
self._limit_count = limit_count
super().__init__(PlanNodeType.LIMIT)

@property
def limit_expression(self):
return self._limit_count

@property
def limit_value(self):
return self._limit_count.value
1 change: 1 addition & 0 deletions src/planner/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ class PlanNodeType(IntEnum):
LOAD_DATA = 7
UNION = 8
ORDER_BY = 9
LIMIT = 10
# add other types
Loading

0 comments on commit 09e8a98

Please sign in to comment.