Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into hugging-face-entity…
Browse files Browse the repository at this point in the history
…-extraction
  • Loading branch information
affan00733 committed May 30, 2023
2 parents 96d6901 + 97e9127 commit a918b1b
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 8 deletions.
33 changes: 32 additions & 1 deletion eva/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from eva.expression.tuple_value_expression import TupleValueExpression
from eva.parser.create_index_statement import CreateIndexStatement
from eva.parser.create_mat_view_statement import CreateMaterializedViewStatement
from eva.parser.create_statement import ColumnDefinition
from eva.parser.create_statement import ColumnDefinition, CreateTableStatement
from eva.parser.delete_statement import DeleteTableStatement
from eva.parser.explain_statement import ExplainStatement
from eva.parser.rename_statement import RenameTableStatement
Expand Down Expand Up @@ -149,6 +149,37 @@ def _bind_delete_statement(self, node: DeleteTableStatement):
if node.where_clause:
self.bind(node.where_clause)

@bind.register(CreateTableStatement)
def _bind_create_statement(self, node: CreateTableStatement):
if node.query is not None:
self.bind(node.query)
num_projected_columns = 0
for expr in node.query.target_list:
if expr.etype == ExpressionType.TUPLE_VALUE:
num_projected_columns += 1
elif expr.etype == ExpressionType.FUNCTION_EXPRESSION:
num_projected_columns += len(expr.output_objs)
else:
raise BinderError("Unsupported expression type {}.".format(expr.etype))

binded_col_list = []
idx = 0
for expr in node.query.target_list:
output_objs = [(expr.col_name, expr.col_object)] \
if expr.etype == ExpressionType.TUPLE_VALUE \
else zip(expr.projection_columns, expr.output_objs)
for col_name, output_obj in output_objs:
binded_col_list.append(
ColumnDefinition(
col_name,
output_obj.type,
output_obj.array_type,
output_obj.array_dimensions,
)
)
idx += 1
node.column_list = binded_col_list

@bind.register(CreateMaterializedViewStatement)
def _bind_create_mat_statement(self, node: CreateMaterializedViewStatement):
self.bind(node.query)
Expand Down
20 changes: 19 additions & 1 deletion eva/executor/create_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
# limitations under the License.
from eva.catalog.catalog_manager import CatalogManager
from eva.executor.abstract_executor import AbstractExecutor
from eva.executor.executor_utils import handle_if_not_exists
from eva.executor.executor_utils import ExecutorError, handle_if_not_exists
from eva.plan_nodes.create_plan import CreatePlan
from eva.plan_nodes.types import PlanOprType
from eva.storage.storage_engine import StorageEngine
from eva.utils.logging_manager import logger

Expand All @@ -28,8 +29,25 @@ def __init__(self, node: CreatePlan):
def exec(self, *args, **kwargs):
if not handle_if_not_exists(self.node.table_info, self.node.if_not_exists):
logger.debug(f"Creating table {self.node.table_info}")

catalog_entry = self.catalog.create_and_insert_table_catalog_entry(
self.node.table_info, self.node.column_list
)
storage_engine = StorageEngine.factory(catalog_entry)
storage_engine.create(table=catalog_entry)

if self.children != []:
child = self.children[0]
# only support seq scan based materialization
if child.node.opr_type not in {PlanOprType.SEQUENTIAL_SCAN, PlanOprType.PROJECT}:
err_msg = "Invalid query {}, expected {} or {}".format(
child.node.opr_type,
PlanOprType.SEQUENTIAL_SCAN,
PlanOprType.PROJECT,
)
raise ExecutorError(err_msg)

# Populate the table
for batch in child.exec():
batch.drop_column_alias()
storage_engine.write(catalog_entry, batch)
1 change: 1 addition & 0 deletions eva/optimizer/plan_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def build_optimal_physical_plan(
physical_plan = None
root_grp = optimizer_context.memo.groups[root_grp_id]
best_grp_expr = root_grp.get_best_expr(PropertyType.DEFAULT)

physical_plan = best_grp_expr.opr

for child_grp_id in best_grp_expr.children:
Expand Down
19 changes: 19 additions & 0 deletions eva/optimizer/rules/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,25 @@ def apply(self, before: LogicalCreate, context: OptimizerContext):
yield after


class LogicalCreateFromSelectToPhysical(Rule):
def __init__(self):
pattern = Pattern(OperatorType.LOGICALCREATE)
pattern.append_child(Pattern(OperatorType.DUMMY))
super().__init__(RuleType.LOGICAL_CREATE_FROM_SELECT_TO_PHYSICAL, pattern)

def promise(self):
return Promise.LOGICAL_CREATE_FROM_SELECT_TO_PHYSICAL

def check(self, before: Operator, context: OptimizerContext):
return True

def apply(self, before: LogicalCreate, context: OptimizerContext):
after = CreatePlan(before.video, before.column_list, before.if_not_exists)
for child in before.children:
after.append_child(child)
yield after


class LogicalRenameToPhysical(Rule):
def __init__(self):
pattern = Pattern(OperatorType.LOGICALRENAME)
Expand Down
2 changes: 2 additions & 0 deletions eva/optimizer/rules/rules_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class RuleType(Flag):
LOGICAL_DELETE_TO_PHYSICAL = auto()
LOGICAL_LOAD_TO_PHYSICAL = auto()
LOGICAL_CREATE_TO_PHYSICAL = auto()
LOGICAL_CREATE_FROM_SELECT_TO_PHYSICAL = auto()
LOGICAL_RENAME_TO_PHYSICAL = auto()
LOGICAL_DROP_TO_PHYSICAL = auto()
LOGICAL_CREATE_UDF_TO_PHYSICAL = auto()
Expand Down Expand Up @@ -108,6 +109,7 @@ class Promise(IntEnum):
LOGICAL_DROP_TO_PHYSICAL = auto()
LOGICAL_LOAD_TO_PHYSICAL = auto()
LOGICAL_CREATE_TO_PHYSICAL = auto()
LOGICAL_CREATE_FROM_SELECT_TO_PHYSICAL = auto()
LOGICAL_CREATE_UDF_TO_PHYSICAL = auto()
LOGICAL_SAMPLE_TO_UNIFORMSAMPLE = auto()
LOGICAL_GET_TO_SEQSCAN = auto()
Expand Down
2 changes: 2 additions & 0 deletions eva/optimizer/rules/rules_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
LogicalCreateIndexToVectorIndex,
LogicalCreateMaterializedViewToPhysical,
LogicalCreateToPhysical,
LogicalCreateFromSelectToPhysical,
LogicalCreateUDFToPhysical,
LogicalDeleteToPhysical,
LogicalDerivedGetToPhysical,
Expand Down Expand Up @@ -104,6 +105,7 @@ def __init__(self):

self._implementation_rules = [
LogicalCreateToPhysical(),
LogicalCreateFromSelectToPhysical(),
LogicalRenameToPhysical(),
LogicalDropToPhysical(),
LogicalCreateUDFToPhysical(),
Expand Down
4 changes: 4 additions & 0 deletions eva/optimizer/statement_to_opr_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ def visit_create(self, statement: AbstractStatement):
create_opr = LogicalCreate(
table_info, statement.column_list, statement.if_not_exists
)

if statement.query is not None:
self.visit_select(statement.query)
create_opr.append_child(self._plan)
self._plan = create_opr

def visit_rename(self, statement: RenameTableStatement):
Expand Down
18 changes: 18 additions & 0 deletions eva/parser/create_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from typing import List, Tuple

from eva.catalog.catalog_type import ColumnType, NdArrayType
from eva.parser.select_statement import SelectStatement
from eva.parser.statement import AbstractStatement
from eva.parser.table_ref import TableInfo
from eva.parser.types import StatementType
Expand Down Expand Up @@ -121,17 +122,24 @@ def __init__(
table_info: TableInfo,
if_not_exists: bool,
column_list: List[ColumnDefinition] = None,
query: SelectStatement = None,
):
super().__init__(StatementType.CREATE)
self._table_info = table_info
self._if_not_exists = if_not_exists
self._column_list = column_list
self._query = query

def __str__(self) -> str:
print_str = "CREATE TABLE {} ({}) \n".format(
self._table_info, self._if_not_exists
)

if self._query is not None:
print_str = "CREATE TABLE {} AS {}\n".format(
self._table_info, self._query
)

for column in self.column_list:
print_str += str(column) + "\n"

Expand All @@ -149,13 +157,22 @@ def if_not_exists(self):
def column_list(self):
return self._column_list

@property
def query(self):
return self._query

@column_list.setter
def column_list(self, value):
self._column_list = value

def __eq__(self, other):
if not isinstance(other, CreateTableStatement):
return False
return (
self.table_info == other.table_info
and self.if_not_exists == other.if_not_exists
and self.column_list == other.column_list
and self.query == other.query
)

def __hash__(self) -> int:
Expand All @@ -165,5 +182,6 @@ def __hash__(self) -> int:
self.table_info,
self.if_not_exists,
tuple(self.column_list or []),
self.query
)
)
4 changes: 2 additions & 2 deletions eva/parser/eva.lark
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ create_database: CREATE DATABASE if_not_exists? uid

create_index: CREATE INDEX uid ON table_name index_elem vector_store_type?

create_table: CREATE TABLE if_not_exists? table_name create_definitions
create_table: CREATE TABLE if_not_exists? table_name (create_definitions | (AS select_statement))

// Rename statements

Expand All @@ -30,7 +30,7 @@ rename_table: RENAME TABLE table_name TO table_name
create_udf: CREATE UDF if_not_exists? udf_name INPUT create_definitions OUTPUT create_definitions TYPE udf_type IMPL udf_impl udf_metadata* | CREATE UDF if_not_exists? udf_name IMPL udf_impl udf_metadata* | CREATE UDF if_not_exists? udf_name TYPE udf_type udf_metadata*

// Create Materialized View
create_materialized_view: CREATE MATERIALIZED VIEW if_not_exists? table_name ("(" uid_list ")") AS select_statement | CREATE MATERIALIZED VIEW if_not_exists? table_name AS select_statement
create_materialized_view: CREATE MATERIALIZED VIEW if_not_exists? table_name ("(" uid_list ")")? AS select_statement

// Details
udf_name: uid
Expand Down
5 changes: 4 additions & 1 deletion eva/parser/lark_visitor/_create_statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def create_table(self, tree):
table_info = None
if_not_exists = False
create_definitions = []
query = None

for child in tree.children:
if isinstance(child, Tree):
Expand All @@ -45,9 +46,11 @@ def create_table(self, tree):
table_info = self.visit(child)
elif child.data == "create_definitions":
create_definitions = self.visit(child)
elif child.data == "simple_select":
query = self.visit(child)

create_stmt = CreateTableStatement(
table_info, if_not_exists, create_definitions
table_info, if_not_exists, create_definitions, query=query
)
return create_stmt

Expand Down
2 changes: 1 addition & 1 deletion eva/plan_nodes/create_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from eva.catalog.models.column_catalog import ColumnCatalogEntry

from eva.parser.table_ref import TableInfo
from eva.plan_nodes.abstract_plan import AbstractPlan
from eva.plan_nodes.types import PlanOprType
Expand Down
79 changes: 77 additions & 2 deletions test/integration_tests/test_create_table_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,96 @@
# limitations under the License.
import unittest

import pytest
from test.util import (
DummyObjectDetector,
create_sample_video,
file_remove,
load_udfs_for_testing,
shutdown_ray,
)

import pandas as pd

from eva.catalog.catalog_manager import CatalogManager
from eva.configuration.constants import EVA_ROOT_DIR
from eva.executor.executor_utils import ExecutorError
from eva.models.storage.batch import Batch
from eva.server.command_handler import execute_query_fetch_all

NUM_FRAMES = 10


class CreateTableTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
pass
# reset the catalog manager before running each test
CatalogManager().reset()
video_file_path = create_sample_video()
load_query = f"LOAD VIDEO '{video_file_path}' INTO MyVideo;"
execute_query_fetch_all(load_query)
ua_detrac = f"{EVA_ROOT_DIR}/data/ua_detrac/ua_detrac.mp4"
execute_query_fetch_all(f"LOAD VIDEO '{ua_detrac}' INTO UATRAC;")
load_udfs_for_testing()

@classmethod
def tearDownClass(cls):
pass
shutdown_ray()
file_remove("dummy.avi")
file_remove("ua_detrac.mp4")
execute_query_fetch_all("DROP TABLE IF EXISTS MyVideo;")
execute_query_fetch_all("DROP TABLE IF EXISTS UATRAC;")

def setUp(self):
execute_query_fetch_all("DROP TABLE IF EXISTS dummy_table;")
execute_query_fetch_all("DROP TABLE IF EXISTS uadtrac_fastRCNN;")

def tearDown(self):
execute_query_fetch_all("DROP TABLE IF EXISTS dummy_table;")
execute_query_fetch_all("DROP TABLE IF EXISTS uadtrac_fastRCNN;")

def test_currently_cannot_create_boolean_table(self):
query = """ CREATE TABLE BooleanTable( A BOOLEAN);"""

with self.assertRaises(ExecutorError):
execute_query_fetch_all(query)

def test_should_create_table_from_select(self):
create_query = """CREATE TABLE dummy_table
AS SELECT id, DummyObjectDetector(data).label FROM MyVideo;
"""
execute_query_fetch_all(create_query)

select_query = "SELECT id, label FROM dummy_table;"
actual_batch = execute_query_fetch_all(select_query)
actual_batch.sort()

labels = DummyObjectDetector().labels
expected = [
{"dummy_table.id": i, "dummy_table.label": [labels[1 + i % 2]]}
for i in range(NUM_FRAMES)
]
expected_batch = Batch(frames=pd.DataFrame(expected))
self.assertEqual(actual_batch, expected_batch)

@pytest.mark.torchtest
def test_should_create_table_from_select_lateral_join(self):
select_query = (
"SELECT id, label, bbox FROM UATRAC JOIN LATERAL "
"Yolo(data) AS T(label, bbox, score) WHERE id < 5;"
)
query = (
"CREATE TABLE IF NOT EXISTS "
f"uadtrac_fastRCNN AS {select_query};"
)
execute_query_fetch_all(query)

select_view_query = "SELECT id, label, bbox FROM uadtrac_fastRCNN"
actual_batch = execute_query_fetch_all(select_view_query)
actual_batch.sort()

self.assertEqual(len(actual_batch), 5)
# non-trivial test case
res = actual_batch.frames
for idx in res.index:
self.assertTrue("car" in res["uadtrac_fastrcnn.label"][idx])
3 changes: 3 additions & 0 deletions test/optimizer/rules/test_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
LogicalCreateIndexToVectorIndex,
LogicalCreateMaterializedViewToPhysical,
LogicalCreateToPhysical,
LogicalCreateFromSelectToPhysical,
LogicalCreateUDFToPhysical,
LogicalDeleteToPhysical,
LogicalDerivedGetToPhysical,
Expand Down Expand Up @@ -136,6 +137,7 @@ def test_rules_promises_order(self):
Promise.LOGICAL_DROP_TO_PHYSICAL,
Promise.LOGICAL_LOAD_TO_PHYSICAL,
Promise.LOGICAL_CREATE_TO_PHYSICAL,
Promise.LOGICAL_CREATE_FROM_SELECT_TO_PHYSICAL,
Promise.LOGICAL_CREATE_UDF_TO_PHYSICAL,
Promise.LOGICAL_SAMPLE_TO_UNIFORMSAMPLE,
Promise.LOGICAL_GET_TO_SEQSCAN,
Expand Down Expand Up @@ -213,6 +215,7 @@ def test_supported_rules(self):
# has some simple heuristics to choose one over the other.
supported_implementation_rules = [
LogicalCreateToPhysical(),
LogicalCreateFromSelectToPhysical(),
LogicalRenameToPhysical(),
LogicalDropToPhysical(),
LogicalCreateUDFToPhysical(),
Expand Down

0 comments on commit a918b1b

Please sign in to comment.