Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import asyncio
from types import SimpleNamespace
from unittest.mock import patch

import pytest

from core.architecture.handlers.control.evaluate_expression_handler import (
EvaluateExpressionHandler,
)
from proto.org.apache.texera.amber.engine.architecture.rpc import (
EvaluatedValue,
EvaluatePythonExpressionRequest,
TypedValue,
)


class TestEvaluateExpressionHandler:
@pytest.fixture
def executor(self):
# A stand-in for the user's UDF instance — anything addressable as
# `self` from the evaluated expression will do.
return SimpleNamespace(state="alive")

@pytest.fixture
def handler(self, executor):
instance = EvaluateExpressionHandler.__new__(EvaluateExpressionHandler)
instance.context = SimpleNamespace(
executor_manager=SimpleNamespace(executor=executor),
tuple_processing_manager=SimpleNamespace(
current_input_tuple={"col": 42},
current_input_port_id="port-0",
),
)
return instance

def test_returns_what_the_evaluator_returns(self, handler):
sentinel = EvaluatedValue(
value=TypedValue(expression="1+1", value_ref="2", value_type="int")
)
with patch(
"core.architecture.handlers.control.evaluate_expression_handler"
".ExpressionEvaluator.evaluate",
return_value=sentinel,
) as evaluate:
result = asyncio.run(
handler.evaluate_python_expression(
EvaluatePythonExpressionRequest(expression="1+1")
)
)

assert result is sentinel
evaluate.assert_called_once()

def test_runtime_context_exposes_self_tuple_input(self, handler, executor):
with patch(
"core.architecture.handlers.control.evaluate_expression_handler"
".ExpressionEvaluator.evaluate",
return_value=EvaluatedValue(),
) as evaluate:
asyncio.run(
handler.evaluate_python_expression(
EvaluatePythonExpressionRequest(expression="self.state")
)
)

expression, runtime_context = evaluate.call_args.args
assert expression == "self.state"
assert runtime_context["self"] is executor
assert runtime_context["tuple_"] == {"col": 42}
assert runtime_context["input_"] == "port-0"

def test_runtime_context_reflects_current_tuple_at_call_time(
self, handler, executor
):
# The handler must read the *current* tuple/port out of the context on
# each call — not snapshot them at construction. Drive two calls with
# different intermediate state.
captured: list = []

def capture(_expression, runtime_context):
captured.append((runtime_context["tuple_"], runtime_context["input_"]))
return EvaluatedValue()

with patch(
"core.architecture.handlers.control.evaluate_expression_handler"
".ExpressionEvaluator.evaluate",
side_effect=capture,
):
asyncio.run(
handler.evaluate_python_expression(
EvaluatePythonExpressionRequest(expression="x")
)
)
handler.context.tuple_processing_manager.current_input_tuple = {"col": 99}
handler.context.tuple_processing_manager.current_input_port_id = "port-1"
asyncio.run(
handler.evaluate_python_expression(
EvaluatePythonExpressionRequest(expression="x")
)
)

assert captured == [({"col": 42}, "port-0"), ({"col": 99}, "port-1")]

def test_handles_none_input_tuple_and_port(self, handler):
# Before the worker has received any input, current_input_tuple and
# current_input_port_id are None. The handler must still build a
# context (the user might be evaluating `self.foo`).
handler.context.tuple_processing_manager.current_input_tuple = None
handler.context.tuple_processing_manager.current_input_port_id = None
with patch(
"core.architecture.handlers.control.evaluate_expression_handler"
".ExpressionEvaluator.evaluate",
return_value=EvaluatedValue(),
) as evaluate:
asyncio.run(
handler.evaluate_python_expression(
EvaluatePythonExpressionRequest(expression="self.state")
)
)

_expression, runtime_context = evaluate.call_args.args
assert runtime_context["tuple_"] is None
assert runtime_context["input_"] is None

def test_evaluator_exception_propagates(self, handler):
# If the evaluator raises (bad syntax, attribute error in the user's
# expression, etc.), the handler must not swallow it — the RPC layer
# is responsible for surfacing the failure to the frontend.
with patch(
"core.architecture.handlers.control.evaluate_expression_handler"
".ExpressionEvaluator.evaluate",
side_effect=AttributeError("no such attribute"),
):
with pytest.raises(AttributeError, match="no such attribute"):
asyncio.run(
handler.evaluate_python_expression(
EvaluatePythonExpressionRequest(expression="self.missing")
)
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import asyncio
from types import SimpleNamespace
from unittest.mock import MagicMock

import pytest

from core.architecture.handlers.control.replay_current_tuple_handler import (
RetryCurrentTupleHandler,
)
from core.architecture.managers.pause_manager import PauseType
from proto.org.apache.texera.amber.engine.architecture.rpc import (
EmptyRequest,
EmptyReturn,
)
from proto.org.apache.texera.amber.engine.architecture.worker import WorkerState


def _build_handler(state: WorkerState, current_tuple, remaining_iter):
instance = RetryCurrentTupleHandler.__new__(RetryCurrentTupleHandler)
state_manager = MagicMock()
state_manager.confirm_state.side_effect = lambda *states: state in states
instance.context = SimpleNamespace(
state_manager=state_manager,
tuple_processing_manager=SimpleNamespace(
current_input_tuple=current_tuple,
current_input_tuple_iter=iter(remaining_iter),
),
pause_manager=MagicMock(),
)
return instance


class TestRetryCurrentTupleHandler:
@pytest.fixture
def running_handler(self):
return _build_handler(
WorkerState.RUNNING,
current_tuple={"col": "current"},
remaining_iter=[{"col": "next"}],
)

def test_returns_empty_return(self, running_handler):
result = asyncio.run(running_handler.retry_current_tuple(EmptyRequest()))
assert isinstance(result, EmptyReturn)

def test_chains_current_tuple_back_onto_iterator(self, running_handler):
asyncio.run(running_handler.retry_current_tuple(EmptyRequest()))
# The iterator must now yield the current tuple first, then the
# tuples that were already queued.
chained = list(
running_handler.context.tuple_processing_manager.current_input_tuple_iter
)
assert chained == [{"col": "current"}, {"col": "next"}]

def test_resumes_user_and_exception_pause_in_order(self, running_handler):
asyncio.run(running_handler.retry_current_tuple(EmptyRequest()))
actual = [
call.args[0]
for call in running_handler.context.pause_manager.resume.call_args_list
]
assert actual == [PauseType.USER_PAUSE, PauseType.EXCEPTION_PAUSE]

def test_does_not_resume_debug_pause(self, running_handler):
# Unlike WorkerDebugCommandHandler, retry only releases USER and
# EXCEPTION pauses — DEBUG_PAUSE must remain in effect so an active
# debugging session is not silently dropped.
asyncio.run(running_handler.retry_current_tuple(EmptyRequest()))
resumed = {
call.args[0]
for call in running_handler.context.pause_manager.resume.call_args_list
}
assert PauseType.DEBUG_PAUSE not in resumed

def test_no_op_when_state_is_completed(self):
completed_handler = _build_handler(
WorkerState.COMPLETED,
current_tuple={"col": "current"},
remaining_iter=[{"col": "next"}],
)
result = asyncio.run(completed_handler.retry_current_tuple(EmptyRequest()))

# Iterator must be untouched (no chaining), and no pause type is
# resumed — replaying a tuple after completion is meaningless.
remaining = list(
completed_handler.context.tuple_processing_manager.current_input_tuple_iter
)
assert remaining == [{"col": "next"}]
completed_handler.context.pause_manager.resume.assert_not_called()
assert isinstance(result, EmptyReturn)

def test_chains_even_when_remaining_iter_is_exhausted(self):
handler = _build_handler(
WorkerState.RUNNING,
current_tuple={"col": "lone"},
remaining_iter=[],
)
asyncio.run(handler.retry_current_tuple(EmptyRequest()))
chained = list(
handler.context.tuple_processing_manager.current_input_tuple_iter
)
assert chained == [{"col": "lone"}]

def test_paused_state_still_chains_and_resumes(self):
# The completion guard is `if not confirm_state(COMPLETED)`, so every
# other state — RUNNING, READY, PAUSED, UNINITIALIZED — must take the
# chain+resume path. PAUSED is the most likely real-world entry point
# (the user hits "retry" while the worker is paused on an exception).
handler = _build_handler(
WorkerState.PAUSED,
current_tuple={"col": "current"},
remaining_iter=[{"col": "next"}],
)
asyncio.run(handler.retry_current_tuple(EmptyRequest()))

chained = list(
handler.context.tuple_processing_manager.current_input_tuple_iter
)
assert chained == [{"col": "current"}, {"col": "next"}]
resumed = [
call.args[0] for call in handler.context.pause_manager.resume.call_args_list
]
assert resumed == [PauseType.USER_PAUSE, PauseType.EXCEPTION_PAUSE]
Loading