Skip to content

Commit

Permalink
Merge branch 'pr148' into contain
Browse files Browse the repository at this point in the history
  • Loading branch information
gaurav274 committed Mar 26, 2021
2 parents 50812db + 10adee7 commit d62e7f8
Show file tree
Hide file tree
Showing 27 changed files with 433 additions and 115 deletions.
51 changes: 25 additions & 26 deletions script/install/conda_eva_environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,34 @@ channels:
- pytorch
dependencies:
- python=3.7.6
- pip
- numpy
- opencv
- scikit-learn
- pandas
- coveralls
- coverage
- pytest-cov
- autopep8
- autoflake
- pytorch::torchvision
- pytorch::pytorch
- tensorflow
- tensorboard
- pip=21.0.1
- numpy=1.20.1
- opencv=4.5.1
- scikit-learn=0.23.2
- pandas=1.2.3
- coveralls=3.0.1
- coverage=5.3
- pytest-cov=2.11.1
- autopep8=1.5.6
- autoflake=1.3
- pytorch::pytorch=1.6.0
- pytorch::torchvision=0.7.0
- cudatoolkit=10.2
- tensorflow=1.14.0
- tensorboard=1.14.0
- pillow=6.1
- sqlalchemy
- pymysql
- sqlalchemy-utils
- mock
- pytest-asyncio
- flake8
- sphinx
- sphinx_rtd_theme
- sqlalchemy=1.3.20
- pymysql=0.10.1
- sqlalchemy-utils=0.36.6
- mock=4.0.3
- pytest-asyncio=0.14.0
- flake8=3.9.0
- sphinx=3.3.0
- sphinx_rtd_theme=0.5.0
- pip:
- antlr4-python3-runtime==4.8
- pyspark==3.0.2
- petastorm==0.9.8
- pre-commit
- flake8
- pytest
- coveralls
- pre-commit==2.8.2
- pytest==6.1.2
prefix: eva
3 changes: 3 additions & 0 deletions src/executor/plan_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from src.executor.abstract_executor import AbstractExecutor
from src.executor.limit_executor import LimitExecutor
from src.executor.sample_executor import SampleExecutor
from src.executor.seq_scan_executor import SequentialScanExecutor
from src.models.storage.batch import Batch
from src.planner.abstract_plan import AbstractPlan
Expand Down Expand Up @@ -79,6 +80,8 @@ def _build_execution_tree(self, plan: AbstractPlan) -> AbstractExecutor:
executor_node = OrderByExecutor(node=plan)
elif plan_node_type == PlanNodeType.LIMIT:
executor_node = LimitExecutor(node=plan)
elif plan_node_type == PlanNodeType.SAMPLE:
executor_node = SampleExecutor(node=plan)

# Build Executor Tree for children
for children in plan.children:
Expand Down
44 changes: 44 additions & 0 deletions src/executor/sample_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# coding=utf-8
# Copyright 2018-2020 EVA
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator

from src.models.storage.batch import Batch
from src.executor.abstract_executor import AbstractExecutor
from src.planner.sample_plan import SamplePlan


class SampleExecutor(AbstractExecutor):
"""
Samples uniformly from the rows.
Arguments:
node (AbstractPlan): The Sample Plan
"""

def __init__(self, node: SamplePlan):
super().__init__(node)
self._sample_freq = node.sample_freq.value

def validate(self):
pass

def exec(self) -> Iterator[Batch]:
child_executor = self.children[0]

current = 0
for batch in child_executor.exec():
yield batch[current::self._sample_freq]
current = (current - len(batch)) % self._sample_freq
11 changes: 10 additions & 1 deletion src/optimizer/generators/seq_scan_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
# limitations under the License.
from src.optimizer.generators.base import Generator
from src.optimizer.operators import LogicalGet, Operator, LogicalFilter, \
LogicalProject, LogicalUnion, LogicalOrderBy, LogicalLimit
LogicalProject, LogicalUnion, LogicalOrderBy, LogicalLimit, LogicalSample
from src.planner.seq_scan_plan import SeqScanPlan
from src.planner.union_plan import UnionPlan
from src.planner.storage_plan import StoragePlan
from src.planner.orderby_plan import OrderByPlan
from src.planner.limit_plan import LimitPlan
from src.planner.sample_plan import SamplePlan


class ScanGenerator(Generator):
Expand Down Expand Up @@ -59,6 +60,11 @@ def _visit_logical_limit(self, operator: LogicalLimit):
limitplan.append_child(self._plan)
self._plan = limitplan

def _visit_logical_sample(self, operator: LogicalSample):
sampleplan = SamplePlan(operator.sample_freq)
sampleplan.append_child(self._plan)
self._plan = sampleplan

def _visit(self, operator: Operator):
if isinstance(operator, LogicalUnion):
self._visit_logical_union(operator)
Expand All @@ -73,6 +79,9 @@ def _visit(self, operator: Operator):
if isinstance(operator, LogicalLimit):
self._visit_logical_limit(operator)

if isinstance(operator, LogicalSample):
self._visit_logical_sample(operator)

if isinstance(operator, LogicalGet):
self._visit_logical_get(operator)

Expand Down
19 changes: 19 additions & 0 deletions src/optimizer/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class OperatorType(IntEnum):
LOGICALUNION = auto()
LOGICALORDERBY = auto()
LOGICALLIMIT = auto()
LOGICALSAMPLE = auto()


class Operator:
Expand Down Expand Up @@ -187,6 +188,24 @@ def __eq__(self, other):
and self.limit_count == other.limit_count)


class LogicalSample(Operator):
def __init__(self, sample_freq: ConstantValueExpression,
children: List = None):
super().__init__(OperatorType.LOGICALSAMPLE, children)
self._sample_freq = sample_freq

@property
def sample_freq(self):
return self._sample_freq

def __eq__(self, other):
is_subtree_equal = super().__eq__(other)
if not isinstance(other, LogicalSample):
return False
return (is_subtree_equal
and self.sample_freq == other.sample_freq)


class LogicalUnion(Operator):
def __init__(self, all: bool, children: List = None):
super().__init__(OperatorType.LOGICALUNION, children)
Expand Down
3 changes: 2 additions & 1 deletion src/optimizer/plan_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class PlanGenerator:
"""
_SCAN_NODE_TYPES = (OperatorType.LOGICALFILTER, OperatorType.LOGICALGET,
OperatorType.LOGICALPROJECT, OperatorType.LOGICALUNION,
OperatorType.LOGICALORDERBY, OperatorType.LOGICALLIMIT)
OperatorType.LOGICALORDERBY, OperatorType.LOGICALLIMIT,
OperatorType.LOGICALSAMPLE)
_INSERT_NODE_TYPE = OperatorType.LOGICALINSERT
_CREATE_NODE_TYPE = OperatorType.LOGICALCREATE
_CREATE_UDF_NODE_TYPE = OperatorType.LOGICALCREATEUDF
Expand Down
57 changes: 32 additions & 25 deletions src/optimizer/statement_to_opr_convertor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
LogicalInsert, LogicalCreate,
LogicalCreateUDF, LogicalLoadData,
LogicalQueryDerivedGet, LogicalUnion,
LogicalOrderBy, LogicalLimit)
LogicalOrderBy, LogicalLimit,
LogicalSample)
from src.parser.statement import AbstractStatement
from src.parser.select_statement import SelectStatement
from src.parser.insert_statement import InsertTableStatement
Expand All @@ -45,17 +46,26 @@ def _populate_column_map(self, dataset: DataFrameMetadata):
for column in dataset.columns:
self._column_map[column.name.lower()] = column

def visit_table_ref(self, video: TableRef):
def visit_table_ref(self, table_ref: TableRef):
"""Bind table ref object and convert to Logical get operator
Arguments:
video {TableRef} -- [Input table ref object created by the parser]
table {TableRef} -- [Input table ref object created by the parser]
"""
catalog_vid_metadata = bind_dataset(video.table_info)

self._populate_column_map(catalog_vid_metadata)
if table_ref.is_select():
# NestedQuery
self.visit_select(table_ref.table)
child_plan = self._plan
self._plan = LogicalQueryDerivedGet()
self._plan.append_child(child_plan)
else:
# Table
catalog_vid_metadata = bind_dataset(table_ref.table)
self._populate_column_map(catalog_vid_metadata)
self._plan = LogicalGet(table_ref, catalog_vid_metadata)

self._plan = LogicalGet(video, catalog_vid_metadata)
if table_ref.sample_freq:
self._visit_sample(table_ref.sample_freq)

def visit_select(self, statement: SelectStatement):
"""converter for select statement
Expand All @@ -64,21 +74,13 @@ def visit_select(self, statement: SelectStatement):
statement {SelectStatement} -- [input select statement]
"""

video = statement.from_table
if video is None:
table_ref = statement.from_table
if table_ref is None:
LoggingManager().log('From entry missing in select statement',
LoggingLevel.ERROR)
return None

if isinstance(video, SelectStatement):
# NestedQuery
self.visit_select(video)
child_plan = self._plan
self._plan = LogicalQueryDerivedGet()
self._plan.append_child(child_plan)
elif isinstance(video, TableRef):
# Table
self.visit_table_ref(video)
self.visit_table_ref(table_ref)

# Filter Operator
predicate = statement.where_clause
Expand All @@ -103,6 +105,11 @@ def visit_select(self, statement: SelectStatement):
if statement.limit_count is not None:
self._visit_limit(statement.limit_count)

def _visit_sample(self, sample_freq):
sample_opr = LogicalSample(sample_freq)
sample_opr.append_child(self._plan)
self._plan = sample_opr

def _visit_orderby(self, orderby_list):
# orderby_list structure: List[(TupleValueExpression, EnumInt), ...]
orderby_columns = [orderbyexpr[0] for orderbyexpr in orderby_list]
Expand Down Expand Up @@ -145,14 +152,14 @@ def visit_insert(self, statement: AbstractStatement):
statement {AbstractStatement} -- [input insert statement]
"""
# Bind the table reference
video = statement.table
catalog_table_id = bind_table_ref(video.table_info)
table_ref = statement.table
catalog_table_id = bind_table_ref(table_ref.table)

# Bind column_list
col_list = statement.column_list
for col in col_list:
if col.table_name is None:
col.table_name = video.table_info.table_name
col.table_name = table_ref.table.table_name
if col.table_metadata_id is None:
col.table_metadata_id = catalog_table_id
bind_columns_expr(col_list, {})
Expand All @@ -163,7 +170,7 @@ def visit_insert(self, statement: AbstractStatement):

# Ready to create Logical node
insert_opr = LogicalInsert(
video, catalog_table_id, col_list, value_list)
table_ref, catalog_table_id, col_list, value_list)
self._plan = insert_opr

def visit_create(self, statement: AbstractStatement):
Expand Down Expand Up @@ -209,11 +216,11 @@ def visit_load_data(self, statement: LoadDataStatement):
Arguments:
statement(LoadDataStatement): [Load data statement]
"""
table = statement.table
table_metainfo = bind_dataset(table.table_info)
table_ref = statement.table
table_metainfo = bind_dataset(table_ref.table)
if table_metainfo is None:
# Create a new metadata object
table_metainfo = create_video_metadata(table.table_info.table_name)
table_metainfo = create_video_metadata(table_ref.table.table_name)

load_data_opr = LogicalLoadData(table_metainfo, statement.path)
self._plan = load_data_opr
Expand Down
1 change: 1 addition & 0 deletions src/parser/evaql/evaql_lexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ OR: 'OR';
ORDER: 'ORDER';
PRIMARY: 'PRIMARY';
REFERENCES: 'REFERENCES';
SAMPLE: 'SAMPLE';
SELECT: 'SELECT';
SET: 'SET';
SHUTDOWN: 'SHUTDOWN';
Expand Down
15 changes: 12 additions & 3 deletions src/parser/evaql/evaql_parser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -206,19 +206,28 @@ tableSources
;

tableSource
: tableSourceItem joinPart* #tableSourceBase
: tableSourceItemWithSample joinPart* #tableSourceBase
;

tableSourceItemWithSample
: tableSourceItem sampleClause?
;

tableSourceItem
: tableName #atomTableItem
: tableName #atomTableItem
| (
selectStatement |
LR_BRACKET selectStatement RR_BRACKET
) #subqueryTableItem
;

sampleClause
: SAMPLE decimalLiteral
;


joinPart
: JOIN tableSourceItem
: JOIN tableSourceItemWithSample
(
ON expression
| USING '(' uidList ')'
Expand Down
4 changes: 2 additions & 2 deletions src/parser/parser_visitor/_common_clauses_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from src.expression.tuple_value_expression import TupleValueExpression

from src.parser.table_ref import TableRef, TableInfo
from src.parser.table_ref import TableInfo

from src.parser.evaql.evaql_parser import evaql_parser

Expand All @@ -34,7 +34,7 @@ def visitTableName(self, ctx: evaql_parser.TableNameContext):
table_name = self.visit(ctx.fullId())
if table_name is not None:
table_info = TableInfo(table_name=table_name)
return TableRef(table_info)
return table_info
else:
warnings.warn("Invalid from table", SyntaxWarning)

Expand Down
3 changes: 3 additions & 0 deletions src/parser/parser_visitor/_expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,6 @@ def visitExpressionsWithDefaults(
expr_list.append(expression)

return expr_list

def visitSampleClause(self, ctx: evaql_parser.SampleClauseContext):
return ConstantValueExpression(self.visitChildren(ctx))
Loading

0 comments on commit d62e7f8

Please sign in to comment.