Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions datafusion/sqllogictest/bin/sqllogictests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ use datafusion::common::utils::get_available_parallelism;
use datafusion::common::{exec_datafusion_err, exec_err, DataFusionError, Result};
use datafusion_sqllogictest::{
df_value_validator, read_dir_recursive, setup_scratch_dir, should_skip_file,
should_skip_record, value_normalizer, CurrentlyExecutingSqlTracker, DataFusion,
DataFusionSubstraitRoundTrip, Filter, TestContext,
should_skip_record, take_last_validation_failure, value_normalizer,
CurrentlyExecutingSqlTracker, DataFusion, DataFusionSubstraitRoundTrip, Filter,
TestContext, ValidationFailure,
};
use futures::stream::StreamExt;
use indicatif::{
Expand All @@ -33,7 +34,7 @@ use log::Level::Info;
use log::{info, log_enabled};
use sqllogictest::{
parse_file, strict_column_validator, AsyncDB, Condition, MakeConnection, Normalizer,
Record, Validator,
Record, TestErrorKind, Validator,
};

#[cfg(feature = "postgres")]
Expand All @@ -43,6 +44,7 @@ use crate::postgres_container::{
use datafusion::common::runtime::SpawnedTask;
use futures::FutureExt;
use std::ffi::OsStr;
use std::fmt::Write;
use std::fs;
use std::path::{Path, PathBuf};

Expand Down Expand Up @@ -384,6 +386,31 @@ async fn run_file_in_runner<D: AsyncDB, M: MakeConnection<Conn = D>>(
continue;
}
if let Err(err) = runner.run_async(record).await {
if let TestErrorKind::QueryResultMismatch { sql, .. } = err.kind() {
if let Some(failure) = take_last_validation_failure() {
let ValidationFailure {
expected_snapshot,
actual_snapshot,
note,
} = failure;
let mut rendered = String::new();
let _ = writeln!(
&mut rendered,
"query result mismatch (DataFusion validator):"
);
let _ = writeln!(&mut rendered, "[SQL] {sql}");
if let Some(note) = note {
let _ = writeln!(&mut rendered, "[Reason] {note}");
}
let _ = writeln!(&mut rendered, "[Expected]");
let _ = writeln!(&mut rendered, "{expected_snapshot}");
let _ = writeln!(&mut rendered, "[Actual]");
let _ = writeln!(&mut rendered, "{actual_snapshot}");
let _ = writeln!(&mut rendered, "at {}", err.location());
errs.push(rendered);
continue;
}
}
errs.push(format!("{err}"));
}
}
Expand Down
185 changes: 147 additions & 38 deletions datafusion/sqllogictest/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use itertools::Itertools;
use log::Level::Warn;
use log::{info, log_enabled, warn};
use sqllogictest::Normalizer;
use std::cell::RefCell;
use std::fs;
use std::path::{Path, PathBuf};

Expand Down Expand Up @@ -82,63 +83,135 @@ pub fn df_value_validator(
actual: &[Vec<String>],
expected: &[String],
) -> bool {
// Support ignore marker <slt:ignore> to skip volatile parts of output.
const IGNORE_MARKER: &str = "<slt:ignore>";
reset_validation_failure();

let contains_ignore_marker = expected.iter().any(|line| line.contains(IGNORE_MARKER));

let normalized_expected = expected.iter().map(normalizer).collect::<Vec<_>>();
let normalized_actual = actual
let normalized_expected: Vec<String> =
expected.iter().map(normalizer).collect::<Vec<_>>();
let normalized_actual: Vec<String> = actual
.iter()
.map(|strs| strs.iter().join(" "))
.map(|str| str.trim_end().to_string())
.collect_vec();

// If ignore marker present, perform fragment-based matching on the full snapshot.
if contains_ignore_marker {
let expected_snapshot = normalized_expected.join("\n");
let actual_snapshot = normalized_actual.join("\n");
let fragments: Vec<&str> = expected_snapshot.split(IGNORE_MARKER).collect();
let mut pos = 0;
for (i, frag) in fragments.iter().enumerate() {
if frag.is_empty() {
continue;
match compare_with_ignore_markers(&expected_snapshot, &actual_snapshot) {
Ok(()) => true,
Err(note) => {
record_validation_failure(ValidationFailure {
expected_snapshot,
actual_snapshot,
note: Some(note),
});
log_value_mismatch(&normalized_actual, &normalized_expected);
false
}
if let Some(idx) = actual_snapshot[pos..].find(frag) {
// Edge case: The following example is expected to fail
// Actual - 'foo bar baz'
// Expected - 'bar <slt:ignore>'
if (i == 0) && (idx != 0) {
return false;
}

pos += idx + frag.len();
}
} else if normalized_actual == normalized_expected {
true
} else {
log_value_mismatch(&normalized_actual, &normalized_expected);
false
}
}

/// Reports whether `relative_path` sits under the top-level `spark/` directory.
///
/// The prefix is compared on whole path components, so a sibling such as
/// `sparkle/x.slt` is not treated as a Spark path.
pub fn is_spark_path(relative_path: &Path) -> bool {
    let spark_root = Path::new("spark/");
    relative_path.strip_prefix(spark_root).is_ok()
}

/// Details captured when validation fails so that the error reporter can render a consistent
/// message (e.g. suppressing `<slt:ignore>` false positives).
///
/// A value is stored by `df_value_validator` when an `<slt:ignore>` comparison fails and is
/// later retrieved with `take_last_validation_failure`.
#[derive(Debug, Clone)]
pub struct ValidationFailure {
/// Normalized expected rows joined with `\n`; `<slt:ignore>` markers are still present.
pub expected_snapshot: String,
/// Normalized actual rows joined with `\n`.
pub actual_snapshot: String,
/// Optional human-readable explanation of why the comparison failed.
pub note: Option<String>,
}

// Per-thread slot holding the most recently recorded validation failure; it starts out empty.
// `record_validation_failure` fills it, `reset_validation_failure` clears it and
// `take_last_validation_failure` drains it.
// NOTE(review): because the slot is thread-local, a failure is only visible to code running on
// the thread that recorded it — confirm the error reporter always reads it on that same thread.
thread_local! {
static LAST_VALIDATION_FAILURE: RefCell<Option<ValidationFailure>> = const { RefCell::new(None) };
}

/// Removes and returns the validation failure most recently recorded on the current thread.
///
/// The slot is left empty afterwards, so a second call yields `None` until another failure
/// is recorded.
pub fn take_last_validation_failure() -> Option<ValidationFailure> {
    LAST_VALIDATION_FAILURE.with(|slot| slot.replace(None))
}

/// Stores `failure` as the current thread's latest validation failure.
///
/// Any failure that was already in the slot is discarded.
fn record_validation_failure(failure: ValidationFailure) {
    LAST_VALIDATION_FAILURE.with(|slot| {
        let _previous = slot.replace(Some(failure));
    });
}

/// Empties the current thread's validation-failure slot, discarding whatever it held.
fn reset_validation_failure() {
    LAST_VALIDATION_FAILURE.with(|slot| drop(slot.replace(None)));
}

/// Logs a row-by-row dump of `actual` versus `expected` at warn level.
///
/// Does nothing unless warn-level logging is enabled. For every index in `actual` it logs the
/// actual row followed by the expected row at the same index, or `"No more results"` when
/// `expected` has fewer rows.
///
/// NOTE(review): as captured here the body also contains `return false;` / `return true;`
/// lines, which do not fit a function without a return type; they look like leftovers of the
/// code this change removed — confirm against the real file.
fn log_value_mismatch(actual: &[String], expected: &[String]) {
if !log_enabled!(Warn) {
return;
}

warn!("df validation failed. actual vs expected:");
for i in 0..actual.len() {
warn!("[{i}] {}<eol>", actual[i]);
warn!(
"[{i}] {}<eol>",
if expected.len() > i {
&expected[i]
} else {
return false;
"No more results"
}
}
return true;
);
}
}

/// Marker that, inside expected output, stands for an arbitrary (possibly empty) run of
/// actual output.
const IGNORE_MARKER: &str = "<slt:ignore>";

/// Matches `actual_snapshot` against `expected_snapshot`, treating every [`IGNORE_MARKER`] in
/// the expected text as a wildcard.
///
/// The expected text is split on the marker into literal fragments which must appear in the
/// actual text in order and without overlapping:
///
/// * unless the expected text starts with a marker, the first fragment must sit at the very
///   start of the actual text;
/// * unless the expected text ends with a marker, the last fragment must sit at the very end
///   of the actual text.
///
/// Returns `Ok(())` on a match, otherwise `Err` with a human-readable reason that quotes at
/// most 32 characters of the offending actual output.
fn compare_with_ignore_markers(
    expected_snapshot: &str,
    actual_snapshot: &str,
) -> Result<(), String> {
    let fragments: Vec<&str> = expected_snapshot.split(IGNORE_MARKER).collect();
    let trailing_wildcard = expected_snapshot.ends_with(IGNORE_MARKER);
    // `split` always yields at least one item, so this cannot underflow.
    let last_index = fragments.len() - 1;
    let mut pos = 0usize;

    for (i, &frag) in fragments.iter().enumerate() {
        if frag.is_empty() {
            continue;
        }

        let tail = &actual_snapshot[pos..];

        // A final fragment that follows a wildcard has to close the output. Prefer the
        // occurrence at the very end: taking the first occurrence would wrongly report
        // "trailing data" whenever the same text also shows up earlier in the output.
        let must_close_output = !trailing_wildcard && i == last_index && i != 0;
        let found = if must_close_output && tail.ends_with(frag) {
            Some(tail.len() - frag.len())
        } else {
            tail.find(frag)
        };

        match found {
            Some(idx) => {
                // No wildcard precedes the first fragment, so it must start the output.
                if i == 0 && idx != 0 {
                    let unexpected = tail[..idx].chars().take(32).collect::<String>();
                    return Err(format!(
                        "expected output should start with fragment \"{frag}\" but actual output begins with \"{unexpected}\""
                    ));
                }
                pos += idx + frag.len();
            }
            None => {
                let preview = tail.chars().take(32).collect::<String>();
                return Err(format!(
                    "fragment \"{frag}\" not found in actual output after byte offset {pos} (remaining output starts with \"{preview}\")"
                ));
            }
        }
    }

    // Without a trailing wildcard nothing may follow the last matched fragment.
    if !trailing_wildcard && pos != actual_snapshot.len() {
        let preview = actual_snapshot[pos..].chars().take(32).collect::<String>();
        return Err(format!(
            "actual output contains additional trailing data starting with \"{preview}\""
        ));
    }

    Ok(())
}

#[cfg(test)]
Expand All @@ -156,4 +229,40 @@ mod tests {

assert!(!df_value_validator(value_normalizer, &actual, &expected));
}

#[test]
fn ignore_marker_failure_records_context() {
    // Four single-column rows "0".."3"; the expected text wants "1" directly followed by
    // "3", which the actual output never contains.
    let actual: Vec<Vec<String>> = ["0", "1", "2", "3"]
        .iter()
        .map(|cell| vec![cell.to_string()])
        .collect();
    let expected: Vec<String> = ["<slt:ignore>", "1", "3<slt:ignore>"]
        .iter()
        .map(|line| line.to_string())
        .collect();

    let matched = df_value_validator(value_normalizer, &actual, &expected);
    assert!(!matched);

    // The failed comparison must leave its context behind for the error reporter.
    let failure = take_last_validation_failure().expect("failure recorded");
    assert!(failure.expected_snapshot.contains("<slt:ignore>"));
    assert!(failure.actual_snapshot.contains("2"));
    let note = failure.note.expect("note");
    assert!(note.contains("fragment"));
}

#[test]
fn ignore_marker_requires_exact_end_without_trailing_wildcard() {
    // The expected text ends with the literal "2", but the actual output continues with "3".
    let actual: Vec<Vec<String>> = ["0", "1", "2", "3"]
        .iter()
        .map(|cell| vec![cell.to_string()])
        .collect();
    let expected: Vec<String> = ["<slt:ignore>", "2"]
        .iter()
        .map(|line| line.to_string())
        .collect();

    let matched = df_value_validator(value_normalizer, &actual, &expected);
    assert!(!matched);

    let failure = take_last_validation_failure().expect("failure recorded");
    let note = failure.note.expect("note");
    assert!(note.contains("trailing data"));
}
}