Skip to content

Commit

Permalink
chore(execution-engine): Some stream-related LambdaError are unjoinab…
Browse files Browse the repository at this point in the history
…le b/c: canon stream replaces normal stream, when canon stream is used, it is materialized and its size is known
  • Loading branch information
raftedproc committed Mar 9, 2023
1 parent 47a7a87 commit 233e1d3
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 17 deletions.
9 changes: 0 additions & 9 deletions air/src/execution_step/errors/catchable_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,6 @@ impl Joinable for CatchableError {
log_join!(" waiting for an argument with name '{}'", var_name);
true
}
LambdaApplierError(LambdaError::StreamNotHaveEnoughValues { stream_size, idx }) => {
log_join!(" waiting for an argument with idx '{}' on stream with size '{}'", idx, stream_size);
true
}
LambdaApplierError(LambdaError::EmptyStream) => {
log_join!(" waiting on empty stream for path ");
true
}

_ => false,
}
}
Expand Down
2 changes: 1 addition & 1 deletion air/src/execution_step/lambda_applier/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ fn select_by_path_from_stream<'value>(
let value = lambda_to_execution_error!(stream
.peekable()
.nth(idx as usize)
.ok_or(LambdaError::StreamNotHaveEnoughValues { stream_size, idx }))?;
.ok_or(LambdaError::CanonStreamNotHaveEnoughValues { stream_size, idx }))?;

let result = select_by_path_from_scalar(value, body.iter(), exec_ctx)?;
let select_result = StreamSelectResult::from_cow(result, idx);
Expand Down
2 changes: 1 addition & 1 deletion air/src/execution_step/lambda_applier/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use thiserror::Error as ThisError;
#[derive(Debug, Clone, ThisError)]
pub enum LambdaError {
#[error("lambda is applied to a stream that have only '{stream_size}' elements, but '{idx}' requested")]
StreamNotHaveEnoughValues { stream_size: usize, idx: u32 },
CanonStreamNotHaveEnoughValues { stream_size: usize, idx: u32 },

/// An error occurred while trying to apply lambda to an empty stream.
#[error("lambda is applied to an empty stream")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,12 @@ fn wait_on_empty_stream_json_path() {
)
)"#);

let result = checked_call_vm!(local_vm, <_>::default(), join_stream_script, "", "");
print_trace(&result, "");
let result = local_vm.call(&join_stream_script, "", "", <_>::default()).unwrap();
let actual_trace = trace_from_result(&result);

assert_eq!(actual_trace.len(), 2); // only the first call and canon should produce a trace
let expected_error =
CatchableError::LambdaApplierError(LambdaError::CanonStreamNotHaveEnoughValues { stream_size: 0, idx: 0 });
assert!(check_error(&result, expected_error));
}

#[test]
Expand Down
10 changes: 7 additions & 3 deletions air/tests/test_module/features/lambda/flattening.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

use air::CatchableError;
use air_test_utils::prelude::*;

use std::cell::RefCell;
Expand Down Expand Up @@ -155,11 +156,14 @@ fn flattening_empty_values() {
)
"#);

let result = checked_call_vm!(set_variable_vm, <_>::default(), script.clone(), "", "");
let result = checked_call_vm!(local_vm, <_>::default(), script, "", result.data);
let result = set_variable_vm.call(script.clone(), "", "", <_>::default()).unwrap();
let result = local_vm.call(script.clone(), "", result.data, <_>::default()).unwrap();

assert!(is_interpreter_succeded(&result));
assert!(!is_interpreter_succeded(&result));
assert_eq!(closure_call_args.args_var, Rc::new(RefCell::new(vec![])));
let expected_error =
CatchableError::LambdaApplierError(air::LambdaError::CanonStreamNotHaveEnoughValues { stream_size: 1, idx: 1 });
assert!(check_error(&result, expected_error));
}

#[test]
Expand Down

0 comments on commit 233e1d3

Please sign in to comment.