Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jiashenC committed Apr 5, 2023
1 parent 46a751f commit dfe094f
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 16 deletions.
22 changes: 6 additions & 16 deletions eva/optimizer/plan_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List

from eva.configuration.configuration_manager import ConfigurationManager
from eva.experimental.ray.planner.exchange_plan import ExchangePlan
from eva.optimizer.cost_model import CostModel
Expand Down Expand Up @@ -109,28 +107,20 @@ def post_process(self, physical_plan: AbstractPlan):
def _recursive_strip_exchange(plan: AbstractPlan, is_top: bool = False):
children = []
for child_plan in plan.children:
return_child = _recursive_strip_exchange(child_plan)
if isinstance(return_child, List):
children += return_child
else:
children += [return_child]
return_child_list = _recursive_strip_exchange(child_plan)
children += return_child_list

plan.clear_children()
for child in children:
plan.append_child(child)

if isinstance(plan, ExchangePlan):
if is_top:
assert (
len(plan.children) == 1
), "Top ExchangePlan can only have 1 child."
return plan.children[0]
else:
return plan.children
assert not is_top or len(plan.children) == 1, "Top ExchangePlan can only have 1 child."
return plan.children
else:
return plan
return [plan]

return _recursive_strip_exchange(physical_plan, True)
return _recursive_strip_exchange(physical_plan, True)[0]
else:
return physical_plan

Expand Down
2 changes: 2 additions & 0 deletions test/integration_tests/test_like.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from eva.catalog.catalog_manager import CatalogManager
from eva.configuration.constants import EVA_ROOT_DIR
from eva.server.command_handler import execute_query_fetch_all
from test.util import shutdown_ray


class LikeTest(unittest.TestCase):
Expand All @@ -30,6 +31,7 @@ def setUp(self):
execute_query_fetch_all(f"LOAD IMAGE '{meme2}' INTO MemeImages;")

def tearDown(self):
shutdown_ray()
# clean up
execute_query_fetch_all("DROP TABLE IF EXISTS MemeImages;")

Expand Down
5 changes: 5 additions & 0 deletions test/optimizer/rules/test_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ def test_supported_rules(self):

ray_enabled = ConfigurationManager().get_value("experimental", "ray")

# For the current version, we choose either the distributed or the
# sequential rule, because the optimizer does not yet have logic to
# choose one over the other. The sequential rewrite is currently
# embedded inside the distributed rule if ray is enabled. The rule
# itself has some simple heuristics to choose one over the other.
supported_implementation_rules = [
LogicalCreateToPhysical(),
LogicalRenameToPhysical(),
Expand Down
51 changes: 51 additions & 0 deletions test/optimizer/test_cascade_optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
from eva.catalog.catalog_manager import CatalogManager
from eva.models.storage.batch import Batch
from eva.server.command_handler import execute_query_fetch_all
from eva.plan_nodes.project_plan import ProjectPlan
from eva.plan_nodes.nested_loop_join_plan import NestedLoopJoinPlan
from eva.plan_nodes.storage_plan import StoragePlan
from eva.experimental.ray.planner.exchange_plan import ExchangePlan
from eva.optimizer.plan_generator import PlanGenerator


@pytest.mark.notparallel
Expand Down Expand Up @@ -59,3 +64,49 @@ def test_logical_to_physical_udf(self):
self.assertEqual(actual_batch, expected_batch)

execute_query_fetch_all("DROP TABLE IF EXISTS MyVideo;")

def test_optimizer_post_process_strip_with_branch(self):
    """Check post_process on a plan whose join has two exchange branches.

    Input tree:
        Exchange -> Project -> NestedLoopJoin -> [Exchange -> Storage,
                                                  Exchange -> Storage]

    The assertions expect the root to become the ProjectPlan and both
    join children to become the StoragePlan nodes directly.
    """
    # Two independent branches, each wrapping a storage scan in an exchange.
    br_exch_plan_1 = ExchangePlan()
    br_exch_plan_1.append_child(StoragePlan(None, None))

    br_exch_plan_2 = ExchangePlan()
    br_exch_plan_2.append_child(StoragePlan(None, None))

    nest_plan = NestedLoopJoinPlan(None)
    nest_plan.append_child(br_exch_plan_1)
    nest_plan.append_child(br_exch_plan_2)

    proj_plan = ProjectPlan(None)
    proj_plan.append_child(nest_plan)

    root_exch_plan = ExchangePlan()
    root_exch_plan.append_child(proj_plan)

    plan = PlanGenerator().post_process(root_exch_plan)

    # assertIsInstance reports the actual object on failure, unlike
    # assertTrue(isinstance(...)) which only reports "False is not true".
    self.assertIsInstance(plan, ProjectPlan)
    self.assertEqual(len(plan.children), 1)
    self.assertIsInstance(plan.children[0], NestedLoopJoinPlan)
    self.assertEqual(len(nest_plan.children), 2)
    self.assertIsInstance(nest_plan.children[0], StoragePlan)
    self.assertIsInstance(nest_plan.children[1], StoragePlan)

def test_optimizer_post_process_strip_without_branch(self):
    """Check post_process on a linear plan with no branching node.

    Input tree:
        Exchange -> Project -> Exchange -> Storage

    The assertions expect the tree to come back unchanged, with both
    ExchangePlan nodes still in place.
    """
    child_exch_plan = ExchangePlan()
    child_exch_plan.append_child(StoragePlan(None, None))

    proj_plan = ProjectPlan(None)
    proj_plan.append_child(child_exch_plan)

    root_exch_plan = ExchangePlan()
    root_exch_plan.append_child(proj_plan)

    plan = PlanGenerator().post_process(root_exch_plan)

    # assertIsInstance reports the actual object on failure, unlike
    # assertTrue(isinstance(...)) which only reports "False is not true".
    self.assertIsInstance(plan, ExchangePlan)
    self.assertEqual(len(plan.children), 1)
    self.assertIsInstance(plan.children[0], ProjectPlan)
    self.assertEqual(len(proj_plan.children), 1)
    self.assertIsInstance(proj_plan.children[0], ExchangePlan)
    self.assertEqual(len(child_exch_plan.children), 1)
    self.assertIsInstance(child_exch_plan.children[0], StoragePlan)

0 comments on commit dfe094f

Please sign in to comment.