Skip to content

Commit

Permalink
Merge branch 'master' into clean-up
Browse files Browse the repository at this point in the history
  • Loading branch information
gaurav274 committed Jun 5, 2022
2 parents 63a6dff + 7aee654 commit 42a0eb4
Show file tree
Hide file tree
Showing 56 changed files with 1,894 additions and 141 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ wheels/
.installed.cfg
*.egg
MANIFEST
pip-wheel-metadata/

# PyInstaller
# Usually these files are written by a python script from a template
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,18 @@ SELECT id, data FROM MyVideo WHERE id < 5;
## Example Queries
1. Search frames with a car
```mysql
SELECT id, frame FROM MyVideo WHERE ['car'] <@ FastRCNNObjectDetector(frame).labels;
SELECT id, data FROM MyVideo WHERE ['car'] <@ FastRCNNObjectDetector(data).labels;
```
![QueryResult](https://georgia-tech-db.github.io/eva/Img/car.gif)

2. Search frames with a pedestrian and a car
```mysql
SELECT id, frame FROM MyVideo WHERE ['pedestrian', 'car'] <@ FastRCNNObjectDetector(frame).labels;
SELECT id, data FROM MyVideo WHERE ['pedestrian', 'car'] <@ FastRCNNObjectDetector(data).labels;
```

3. Search frames containing more than 3 cars
```mysql
SELECT id, frame FROM DETRAC WHERE array_count(FastRCNNObjectDetector(frame).labels, 'car') > 3;
SELECT id, data FROM DETRAC WHERE array_count(FastRCNNObjectDetector(data).labels, 'car') > 3;
```

## Documentation
Expand Down
19 changes: 14 additions & 5 deletions eva/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,27 @@ def _bind_load_data_statement(self, node: LoadDataStatement):

@bind.register(TableRef)
def _bind_tableref(self, node: TableRef):
if node.is_select():
if node.is_table_atom():
# Table
self._binder_context.add_table_alias(
node.alias, node.table.table_name)
bind_table_info(node.table)
elif node.is_select():
current_context = self._binder_context
self._binder_context = StatementBinderContext()
self.bind(node.select_statement)
self._binder_context = current_context
self._binder_context.add_derived_table_alias(
node.alias, node.select_statement.target_list)
elif node.is_join():
self.bind(node.join_node.left)
self.bind(node.join_node.right)
if node.join_node.predicate:
self.bind(node.join_node.predicate)
elif node.is_func_expr():
self.bind(node.func_expr)
else:
# Table
self._binder_context.add_table_alias(
node.alias, node.table.table_name)
bind_table_info(node.table)
raise ValueError(f'Unsupported node {type(node)}')

@bind.register(TupleValueExpression)
def _bind_tuple_expr(self, node: TupleValueExpression):
Expand Down
18 changes: 18 additions & 0 deletions eva/executor/executor_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import List
from eva.expression.abstract_expression import AbstractExpression
from eva.models.storage.batch import Batch


def apply_project(batch: Batch, project_list: List[AbstractExpression]):
    """Evaluate the projection expressions against ``batch``.

    Arguments:
        batch (Batch): input batch to project
        project_list (List[AbstractExpression]): expressions to evaluate

    Returns:
        The input batch unchanged when it is empty or when ``project_list``
        is empty/None; otherwise the result of combining every expression's
        output with ``Batch.merge_column_wise``.
    """
    # Nothing to project: hand the caller back exactly what it gave us.
    if batch.empty() or not project_list:
        return batch

    evaluated = []
    for expr in project_list:
        evaluated.append(expr.evaluate(batch))
    return Batch.merge_column_wise(evaluated)


def apply_predicate(batch: Batch, predicate: AbstractExpression):
    """Filter ``batch`` down to the rows accepted by ``predicate``.

    Arguments:
        batch (Batch): input batch to filter
        predicate (AbstractExpression): expression evaluated on the batch;
            may be None, in which case no filtering happens

    Returns:
        The input batch unchanged when it is empty or ``predicate`` is None;
        otherwise a new Batch holding the rows whose predicate outcome is
        greater than zero, re-indexed from 0.
    """
    if batch.empty() or predicate is None:
        return batch

    outcomes = predicate.evaluate(batch).frames
    frames = batch.frames
    # Rows are kept where the evaluated outcome is strictly positive.
    keep_mask = (outcomes > 0).to_numpy()
    surviving_rows = frames[keep_mask].reset_index(drop=True)
    return Batch(surviving_rows)
46 changes: 46 additions & 0 deletions eva/executor/function_scan_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# coding=utf-8
# Copyright 2018-2020 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator

from eva.planner.function_scan_plan import FunctionScanPlan
from eva.executor.abstract_executor import AbstractExecutor
from eva.models.storage.batch import Batch


class FunctionScanExecutor(AbstractExecutor):
    """
    Executes functional expression which yields a table of rows
    Arguments:
        node (AbstractPlan): FunctionScanPlan
    """

    def __init__(self, node: FunctionScanPlan):
        super().__init__(node)
        self.func_expr = node.func_expr

    def validate(self):
        pass

    def exec(self, *args, **kwargs) -> Iterator[Batch]:
        """Evaluate the function expression on the lateral input batch.

        Arguments:
            lateral_input (Batch): keyword argument, required; the batch the
                function expression is evaluated on.

        Yields:
            The evaluated batch, only when both the lateral input and the
            evaluation result are non-empty.

        Raises:
            AssertionError: if ``lateral_input`` is not passed as a keyword.
        """
        assert 'lateral_input' in kwargs, (
            'Key lateral_input not passed to the FunctionScan')
        lateral_input = kwargs.get('lateral_input')

        # An empty input produces no rows; returning early also guarantees
        # ``res`` is never referenced before assignment.
        if lateral_input.empty():
            return

        res = self.func_expr.evaluate(lateral_input)
        if not res.empty():
            yield res
53 changes: 53 additions & 0 deletions eva/executor/hash_join_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# coding=utf-8
# Copyright 2018-2020 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator
from eva.executor.executor_utils import apply_predicate, apply_project

from eva.models.storage.batch import Batch
from eva.executor.abstract_executor import AbstractExecutor
from eva.planner.hash_join_probe_plan import HashJoinProbePlan


class HashJoinExecutor(AbstractExecutor):
    """
    Probe phase of a hash join: joins every probe batch against the build
    batch on a per-row hash of the probe key columns.
    Arguments:
        node (AbstractPlan): HashJoinProbePlan
    """

    def __init__(self, node: HashJoinProbePlan):
        super().__init__(node)
        self.predicate = node.join_predicate
        self.join_type = node.join_type
        self.probe_keys = node.probe_keys
        self.join_project = node.join_project

    def validate(self):
        pass

    def exec(self, *args, **kwargs) -> Iterator[Batch]:
        """Yield one joined batch per (build batch, probe batch) pair.

        children[0] is the build side and children[1] the probe side. Each
        joined batch goes through the join predicate and the join projection
        before being yielded; empty results are yielded as well.
        """

        def row_hash(row):
            # Hash of the row's key values, used as the join index.
            return hash(tuple(row))

        build_side = self.children[0]
        probe_side = self.children[1]

        key_columns = []
        for key in self.probe_keys:
            key_columns.append(key.col_alias)

        for build_batch in build_side.exec():
            for probe_batch in probe_side.exec():
                # Index the probe rows by the hash of their key columns.
                # NOTE(review): presumably the build side's frames are
                # already indexed by the same row hash -- confirm against
                # the build executor.
                probe_batch.frames.index = probe_batch.frames[
                    key_columns].apply(row_hash, axis=1)

                # Inner join on the hash index, then drop that index.
                joined_frames = probe_batch.frames.merge(
                    build_batch.frames,
                    left_index=True,
                    right_index=True,
                    how='inner',
                ).reset_index(drop=True)

                filtered = apply_predicate(
                    Batch(joined_frames), self.predicate)
                yield apply_project(filtered, self.join_project)
30 changes: 30 additions & 0 deletions eva/executor/join_build_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Iterator

from eva.models.storage.batch import Batch

from eva.executor.abstract_executor import AbstractExecutor
from eva.planner.hash_join_build_plan import HashJoinBuildPlan


class BuildJoinExecutor(AbstractExecutor):
    """
    Build phase of a hash join: gathers the child's output into one batch
    whose frames are indexed by a per-row hash of the build key columns.
    Arguments:
        node (AbstractPlan): HashJoinBuildPlan
    """

    def __init__(self, node: HashJoinBuildPlan):
        super().__init__(node)
        # node.join_predicate is not consumed by the build phase.
        self.predicate = None
        self.join_type = node.join_type
        self.build_keys = node.build_keys

    def validate(self):
        pass

    def exec(self, *args, **kwargs) -> Iterator[Batch]:
        """Yield a single batch holding every non-empty child batch.

        The whole build side is held in memory (assumed to fit).
        TODO: implement a partition based hash join (grace hash join).
        """

        def row_hash(row):
            # Hash of the row's key values, used as the join index.
            return hash(tuple(row))

        child_executor = self.children[0]

        non_empty_batches = []
        for batch in child_executor.exec():
            if not batch.empty():
                non_empty_batches.append(batch)
        build_batch = Batch.concat(non_empty_batches)

        key_columns = [key.col_alias for key in self.build_keys]
        # Index the build rows by the hash of their key columns so the probe
        # phase can join on the index.
        build_batch.frames.index = build_batch.frames[key_columns].apply(
            row_hash, axis=1)
        yield build_batch
46 changes: 46 additions & 0 deletions eva/executor/lateral_join_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# coding=utf-8
# Copyright 2018-2020 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator
from eva.executor.executor_utils import apply_predicate, apply_project

from eva.models.storage.batch import Batch
from eva.executor.abstract_executor import AbstractExecutor
from eva.planner.lateral_join_plan import LateralJoinPlan


class LateralJoinExecutor(AbstractExecutor):
    """
    Executes a lateral join: the inner child is run once per outer batch and
    receives that batch as its ``lateral_input``.
    Arguments:
        node (AbstractPlan): LateralJoinPlan
    """

    def __init__(self, node: LateralJoinPlan):
        super().__init__(node)
        self.predicate = node.join_predicate
        self.join_project = node.join_project

    def validate(self):
        pass

    def exec(self, *args, **kwargs) -> Iterator[Batch]:
        """Yield every non-empty joined batch.

        children[0] is the outer side and children[1] the inner side. For
        each outer batch the inner child is executed with
        ``lateral_input=outer_batch``; each inner result is merged
        column-wise with the outer batch, then filtered by the join
        predicate and projected with the join projection.

        Yields:
            Batch: each joined batch that is non-empty after filtering and
            projection.
        """
        outer = self.children[0]
        inner = self.children[1]

        for outer_batch in outer.exec():
            for result_batch in inner.exec(lateral_input=outer_batch):
                # merge
                result_batch = Batch.merge_column_wise(
                    [outer_batch, result_batch])
                result_batch = apply_predicate(result_batch, self.predicate)
                result_batch = apply_project(result_batch, self.join_project)
                if not result_batch.empty():
                    # Must be ``yield``: a ``return`` here would stop at the
                    # first match and make exec() a plain function instead
                    # of the iterator its signature promises.
                    yield result_batch
18 changes: 18 additions & 0 deletions eva/executor/plan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

from eva.executor.abstract_executor import AbstractExecutor
from eva.executor.limit_executor import LimitExecutor
from eva.executor.predicate_executor import PredicateExecutor
from eva.executor.project_executor import ProjectExecutor
from eva.executor.sample_executor import SampleExecutor
from eva.executor.seq_scan_executor import SequentialScanExecutor
from eva.models.storage.batch import Batch
Expand All @@ -32,6 +34,10 @@
from eva.executor.storage_executor import StorageExecutor
from eva.executor.union_executor import UnionExecutor
from eva.executor.orderby_executor import OrderByExecutor
from eva.executor.hash_join_executor import HashJoinExecutor
from eva.executor.lateral_join_executor import LateralJoinExecutor
from eva.executor.join_build_executor import BuildJoinExecutor
from eva.executor.function_scan_executor import FunctionScanExecutor


class PlanExecutor:
Expand Down Expand Up @@ -87,8 +93,20 @@ def _build_execution_tree(self, plan: AbstractPlan) -> AbstractExecutor:
executor_node = LimitExecutor(node=plan)
elif plan_opr_type == PlanOprType.SAMPLE:
executor_node = SampleExecutor(node=plan)
elif plan_opr_type == PlanOprType.LATERAL_JOIN:
executor_node = LateralJoinExecutor(node=plan)
elif plan_opr_type == PlanOprType.HASH_JOIN:
executor_node = HashJoinExecutor(node=plan)
elif plan_opr_type == PlanOprType.HASH_BUILD:
executor_node = BuildJoinExecutor(node=plan)
elif plan_opr_type == PlanOprType.FUNCTION_SCAN:
executor_node = FunctionScanExecutor(node=plan)
elif plan_opr_type == PlanOprType.CREATE_MATERIALIZED_VIEW:
executor_node = CreateMaterializedViewExecutor(node=plan)
elif plan_opr_type == PlanOprType.PROJECT:
executor_node = ProjectExecutor(node=plan)
elif plan_opr_type == PlanOprType.PREDICATE_FILTER:
executor_node = PredicateExecutor(node=plan)
# Build Executor Tree for children
for children in plan.children:
executor_node.append_child(self._build_execution_tree(children))
Expand Down
39 changes: 39 additions & 0 deletions eva/executor/predicate_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# coding=utf-8
# Copyright 2018-2020 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator
from eva.executor.executor_utils import apply_predicate

from eva.models.storage.batch import Batch
from eva.executor.abstract_executor import AbstractExecutor
from eva.planner.predicate_plan import PredicatePlan


class PredicateExecutor(AbstractExecutor):
    """
    Filters the batches produced by its child with a predicate.
    Arguments:
        node (AbstractPlan): PredicatePlan
    """

    def __init__(self, node: PredicatePlan):
        super().__init__(node)
        self.predicate = node.predicate

    def validate(self):
        pass

    def exec(self, *args, **kwargs) -> Iterator[Batch]:
        """Yield each child batch after applying the predicate.

        Positional and keyword arguments are accepted for consistency with
        the other executors' ``exec(*args, **kwargs)`` signature and are
        forwarded unchanged to the child. With no arguments the behavior is
        the same as before.

        Yields:
            Batch: filtered batches; batches left empty are skipped.
        """
        child_executor = self.children[0]
        for batch in child_executor.exec(*args, **kwargs):
            batch = apply_predicate(batch, self.predicate)
            if not batch.empty():
                yield batch
40 changes: 40 additions & 0 deletions eva/executor/project_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# coding=utf-8
# Copyright 2018-2020 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator
from eva.executor.executor_utils import apply_project

from eva.models.storage.batch import Batch
from eva.executor.abstract_executor import AbstractExecutor
from eva.planner.project_plan import ProjectPlan


class ProjectExecutor(AbstractExecutor):
    """
    Projects the batches produced by its child onto a target list.
    Arguments:
        node (AbstractPlan): ProjectPlan
    """

    def __init__(self, node: ProjectPlan):
        super().__init__(node)
        self.target_list = node.target_list

    def validate(self):
        pass

    def exec(self, *args, **kwargs) -> Iterator[Batch]:
        """Yield each child batch after applying the projection.

        Positional and keyword arguments are accepted for consistency with
        the other executors' ``exec(*args, **kwargs)`` signature and are
        forwarded unchanged to the child. With no arguments the behavior is
        the same as before.

        Yields:
            Batch: projected batches; empty batches are skipped.
        """
        child_executor = self.children[0]
        for batch in child_executor.exec(*args, **kwargs):
            batch = apply_project(batch, self.target_list)

            if not batch.empty():
                yield batch
Loading

0 comments on commit 42a0eb4

Please sign in to comment.