diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 7aca0fdd6e8d..ec705fc9ba06 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -21,8 +21,8 @@ use datafusion::common::utils::get_available_parallelism; use datafusion::common::{exec_datafusion_err, exec_err, DataFusionError, Result}; use datafusion_sqllogictest::{ df_value_validator, read_dir_recursive, setup_scratch_dir, should_skip_file, - should_skip_record, value_normalizer, DataFusion, DataFusionSubstraitRoundTrip, - Filter, TestContext, + should_skip_record, value_normalizer, CurrentlyExecutingSqlTracker, DataFusion, + DataFusionSubstraitRoundTrip, Filter, TestContext, }; use futures::stream::StreamExt; use indicatif::{ @@ -41,6 +41,7 @@ use crate::postgres_container::{ initialize_postgres_container, terminate_postgres_container, }; use datafusion::common::runtime::SpawnedTask; +use futures::FutureExt; use std::ffi::OsStr; use std::fs; use std::path::{Path, PathBuf}; @@ -154,6 +155,11 @@ async fn run_tests() -> Result<()> { let m_style_clone = m_style.clone(); let filters = options.filters.clone(); + let relative_path = test_file.relative_path.clone(); + + let currently_running_sql_tracker = CurrentlyExecutingSqlTracker::new(); + let currently_running_sql_tracker_clone = + currently_running_sql_tracker.clone(); SpawnedTask::spawn(async move { match ( options.postgres_runner, @@ -167,6 +173,7 @@ async fn run_tests() -> Result<()> { m_clone, m_style_clone, filters.as_ref(), + currently_running_sql_tracker_clone, ) .await? } @@ -177,12 +184,19 @@ async fn run_tests() -> Result<()> { m_clone, m_style_clone, filters.as_ref(), + currently_running_sql_tracker_clone, ) .await? } (false, true, _) => { - run_complete_file(test_file, validator, m_clone, m_style_clone) - .await? + run_complete_file( + test_file, + validator, + m_clone, + m_style_clone, + currently_running_sql_tracker_clone, + ) + .await? } (true, false, _) => { run_test_file_with_postgres( @@ -191,6 +205,7 @@ async fn run_tests() -> Result<()> { m_clone, m_style_clone, filters.as_ref(), + currently_running_sql_tracker_clone, ) .await? } @@ -200,6 +215,7 @@ async fn run_tests() -> Result<()> { validator, m_clone, m_style_clone, + currently_running_sql_tracker_clone, ) .await? } @@ -207,14 +223,42 @@ async fn run_tests() -> Result<()> { Ok(()) as Result<()> }) .join() + .map(move |result| (result, relative_path, currently_running_sql_tracker)) }) // run up to num_cpus streams in parallel .buffer_unordered(options.test_threads) - .flat_map(|result| { + .flat_map(|(result, test_file_path, current_sql)| { // Filter out any Ok() leaving only the DataFusionErrors futures::stream::iter(match result { // Tokio panic error - Err(e) => Some(DataFusionError::External(Box::new(e))), + Err(e) => { + let error = DataFusionError::External(Box::new(e)); + let current_sql = current_sql.get_currently_running_sqls(); + + if current_sql.is_empty() { + Some(error.context(format!( + "failure in {} with no currently running sql tracked", + test_file_path.display() + ))) + } else if current_sql.len() == 1 { + let sql = ¤t_sql[0]; + Some(error.context(format!( + "failure in {} for sql {sql}", + test_file_path.display() + ))) + } else { + let sqls = current_sql + .iter() + .enumerate() + .map(|(i, sql)| format!("\n[{}]: {}", i + 1, sql)) + .collect::(); + Some(error.context(format!( + "failure in {} for multiple currently running sqls: {}", + test_file_path.display(), + sqls + ))) + } + } Ok(thread_result) => thread_result.err(), }) }) @@ -247,6 +291,7 @@ async fn run_test_file_substrait_round_trip( mp: MultiProgress, mp_style: ProgressStyle, filters: &[Filter], + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Result<()> { let TestFile { path, @@ -269,7 +314,8 @@ async fn run_test_file_substrait_round_trip( test_ctx.session_ctx().clone(), relative_path.clone(), pb.clone(), - )) + ) + .with_currently_executing_sql_tracker(currently_executing_sql_tracker.clone())) }); runner.add_label("DatafusionSubstraitRoundTrip"); runner.with_column_validator(strict_column_validator); @@ -286,6 +332,7 @@ async fn run_test_file( mp: MultiProgress, mp_style: ProgressStyle, filters: &[Filter], + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Result<()> { let TestFile { path, @@ -308,7 +355,8 @@ async fn run_test_file( test_ctx.session_ctx().clone(), relative_path.clone(), pb.clone(), - )) + ) + .with_currently_executing_sql_tracker(currently_executing_sql_tracker.clone())) }); runner.add_label("Datafusion"); runner.with_column_validator(strict_column_validator); @@ -402,6 +450,7 @@ async fn run_test_file_with_postgres( mp: MultiProgress, mp_style: ProgressStyle, filters: &[Filter], + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Result<()> { use datafusion_sqllogictest::Postgres; let TestFile { @@ -417,7 +466,11 @@ async fn run_test_file_with_postgres( pb.set_message(format!("{:?}", &relative_path)); let mut runner = sqllogictest::Runner::new(|| { - Postgres::connect(relative_path.clone(), pb.clone()) + Postgres::connect_with_tracked_sql( + relative_path.clone(), + pb.clone(), + currently_executing_sql_tracker.clone(), + ) }); runner.add_label("postgres"); runner.with_column_validator(strict_column_validator); @@ -435,6 +488,7 @@ async fn run_test_file_with_postgres( _mp: MultiProgress, _mp_style: ProgressStyle, _filters: &[Filter], + _currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Result<()> { use datafusion::common::plan_err; plan_err!("Can not run with postgres as postgres feature is not enabled") @@ -445,6 +499,7 @@ async fn run_complete_file( validator: Validator, mp: MultiProgress, mp_style: ProgressStyle, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Result<()> { let TestFile { path, @@ -470,7 +525,8 @@ async fn run_complete_file( test_ctx.session_ctx().clone(), relative_path.clone(), pb.clone(), - )) + ) + .with_currently_executing_sql_tracker(currently_executing_sql_tracker.clone())) }); let col_separator = " "; @@ -497,6 +553,7 @@ async fn run_complete_file_with_postgres( validator: Validator, mp: MultiProgress, mp_style: ProgressStyle, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Result<()> { use datafusion_sqllogictest::Postgres; let TestFile { @@ -516,7 +573,11 @@ async fn run_complete_file_with_postgres( pb.set_message(format!("{:?}", &relative_path)); let mut runner = sqllogictest::Runner::new(|| { - Postgres::connect(relative_path.clone(), pb.clone()) + Postgres::connect_with_tracked_sql( + relative_path.clone(), + pb.clone(), + currently_executing_sql_tracker.clone(), + ) }); runner.add_label("postgres"); runner.with_column_validator(strict_column_validator); @@ -547,6 +608,7 @@ async fn run_complete_file_with_postgres( _validator: Validator, _mp: MultiProgress, _mp_style: ProgressStyle, + _currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, ) -> Result<()> { use datafusion::common::plan_err; plan_err!("Can not run with postgres as postgres feature is not enabled") diff --git a/datafusion/sqllogictest/src/engines/currently_executed_sql.rs b/datafusion/sqllogictest/src/engines/currently_executed_sql.rs new file mode 100644 index 000000000000..5b1979b4ee9a --- /dev/null +++ b/datafusion/sqllogictest/src/engines/currently_executed_sql.rs @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::atomic::AtomicUsize; +use std::sync::{Arc, Mutex}; + +/// Hold the currently executed SQL statements. +/// This is used to save the currently running SQLs in case of a crash. +#[derive(Clone)] +pub struct CurrentlyExecutingSqlTracker { + /// The index of the SQL statement. + /// Used to uniquely identify each SQL statement even if they are the same. + sql_index: Arc, + /// Lock to store the currently executed SQL statement. + /// It DOES NOT hold the lock for the duration of query execution and only execute the lock + /// when updating the currently executed SQL statement to allow for saving the last executed SQL + /// in case of a crash. + currently_executed_sqls: Arc>>, +} + +impl Default for CurrentlyExecutingSqlTracker { + fn default() -> Self { + Self::new() + } +} + +impl CurrentlyExecutingSqlTracker { + pub fn new() -> Self { + Self { + sql_index: Arc::new(AtomicUsize::new(0)), + currently_executed_sqls: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// Set the currently executed SQL statement. + /// + /// Returns a key to use to remove the SQL statement when done. + /// + /// We are not returning a guard that will automatically remove the SQL statement when dropped. + /// as on panic the drop can be called, and it will remove the SQL statement before we can log it. + #[must_use = "The returned index must be used to remove the SQL statement when done."] + pub fn set_sql(&self, sql: impl Into) -> usize { + let index = self + .sql_index + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.currently_executed_sqls + .lock() + .unwrap_or_else(|e| e.into_inner()) + .insert(index, sql.into()); + index + } + + /// Remove the currently executed SQL statement by the provided key that was returned by [`Self::set_sql`]. + pub fn remove_sql(&self, index: usize) { + self.currently_executed_sqls + .lock() + .unwrap_or_else(|e| e.into_inner()) + .remove(&index); + } + + /// Get the currently executed SQL statements. + pub fn get_currently_running_sqls(&self) -> Vec { + self.currently_executed_sqls + .lock() + .unwrap_or_else(|e| e.into_inner()) + .values() + .cloned() + .collect() + } +} diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs index 45deefdc9bbd..13c107408cf5 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs @@ -19,6 +19,9 @@ use std::sync::Arc; use std::{path::PathBuf, time::Duration}; use super::{error::Result, normalize, DFSqlLogicTestError}; +use crate::engines::currently_executed_sql::CurrentlyExecutingSqlTracker; +use crate::engines::output::{DFColumnType, DFOutput}; +use crate::is_spark_path; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::physical_plan::common::collect; @@ -30,13 +33,11 @@ use log::{debug, log_enabled, warn}; use sqllogictest::DBOutput; use tokio::time::Instant; -use crate::engines::output::{DFColumnType, DFOutput}; -use crate::is_spark_path; - pub struct DataFusion { ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, } impl DataFusion { @@ -45,6 +46,20 @@ impl DataFusion { ctx, relative_path, pb, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker::default(), + } + } + + /// Add a tracker that will track the currently executed SQL statement. + /// + /// This is useful for logging and debugging purposes. + pub fn with_currently_executing_sql_tracker( + self, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, + ) -> Self { + Self { + currently_executing_sql_tracker, + ..self } } @@ -79,10 +94,14 @@ impl sqllogictest::AsyncDB for DataFusion { ); } + let tracked_sql = self.currently_executing_sql_tracker.set_sql(sql); + let start = Instant::now(); let result = run_query(&self.ctx, is_spark_path(&self.relative_path), sql).await; let duration = start.elapsed(); + self.currently_executing_sql_tracker.remove_sql(tracked_sql); + if duration.gt(&Duration::from_millis(500)) { self.update_slow_count(); } diff --git a/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs b/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs index 2df93f0dede3..8ba182763f5e 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_substrait_roundtrip_engine/runner.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::{path::PathBuf, time::Duration}; +use crate::engines::currently_executed_sql::CurrentlyExecutingSqlTracker; use crate::engines::datafusion_engine::Result; use crate::engines::output::{DFColumnType, DFOutput}; use crate::{convert_batches, convert_schema_to_types, DFSqlLogicTestError}; @@ -39,6 +40,7 @@ pub struct DataFusionSubstraitRoundTrip { ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, } impl DataFusionSubstraitRoundTrip { @@ -47,6 +49,20 @@ impl DataFusionSubstraitRoundTrip { ctx, relative_path, pb, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker::default(), + } + } + + /// Add a tracker that will track the currently executed SQL statement. + /// + /// This is useful for logging and debugging purposes. + pub fn with_currently_executing_sql_tracker( + self, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, + ) -> Self { + Self { + currently_executing_sql_tracker, + ..self } } @@ -81,10 +97,14 @@ impl sqllogictest::AsyncDB for DataFusionSubstraitRoundTrip { ); } + let tracked_sql = self.currently_executing_sql_tracker.set_sql(sql); + let start = Instant::now(); let result = run_query_substrait_round_trip(&self.ctx, sql).await; let duration = start.elapsed(); + self.currently_executing_sql_tracker.remove_sql(tracked_sql); + if duration.gt(&Duration::from_millis(500)) { self.update_slow_count(); } diff --git a/datafusion/sqllogictest/src/engines/mod.rs b/datafusion/sqllogictest/src/engines/mod.rs index ef6335ddbed6..8a6f44de4391 100644 --- a/datafusion/sqllogictest/src/engines/mod.rs +++ b/datafusion/sqllogictest/src/engines/mod.rs @@ -17,6 +17,7 @@ /// Implementation of sqllogictest for datafusion. mod conversion; +mod currently_executed_sql; mod datafusion_engine; mod datafusion_substrait_roundtrip_engine; mod output; @@ -29,6 +30,8 @@ pub use datafusion_substrait_roundtrip_engine::DataFusionSubstraitRoundTrip; pub use output::DFColumnType; pub use output::DFOutput; +pub use currently_executed_sql::CurrentlyExecutingSqlTracker; + #[cfg(feature = "postgres")] mod postgres_engine; diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs index 4d310711687f..1fafc9626a3f 100644 --- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs +++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs @@ -27,6 +27,7 @@ use std::str::FromStr; use std::time::Duration; use super::conversion::*; +use crate::engines::currently_executed_sql::CurrentlyExecutingSqlTracker; use crate::engines::output::{DFColumnType, DFOutput}; use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; use indicatif::ProgressBar; @@ -59,6 +60,7 @@ pub struct Postgres { /// Relative test file path relative_path: PathBuf, pb: ProgressBar, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, } impl Postgres { @@ -118,9 +120,34 @@ impl Postgres { spawned_task: Some(spawned_task), relative_path, pb, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker::default(), }) } + /// Creates a runner for executing queries against an existing postgres connection + /// with a tracker for currently executing SQL statements. + pub async fn connect_with_tracked_sql( + relative_path: PathBuf, + pb: ProgressBar, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, + ) -> Result { + let conn = Self::connect(relative_path, pb).await?; + Ok(conn.with_currently_executing_sql_tracker(currently_executing_sql_tracker)) + } + + /// Add a tracker that will track the currently executed SQL statement. + /// + /// This is useful for logging and debugging purposes. + pub fn with_currently_executing_sql_tracker( + self, + currently_executing_sql_tracker: CurrentlyExecutingSqlTracker, + ) -> Self { + Self { + currently_executing_sql_tracker, + ..self + } + } + fn get_client(&mut self) -> &mut tokio_postgres::Client { self.client.as_mut().expect("client is shutdown") } @@ -242,6 +269,8 @@ impl sqllogictest::AsyncDB for Postgres { sql ); + let tracked_sql = self.currently_executing_sql_tracker.set_sql(sql); + let lower_sql = sql.trim_start().to_ascii_lowercase(); let is_query_sql = { @@ -258,11 +287,15 @@ impl sqllogictest::AsyncDB for Postgres { if lower_sql.starts_with("copy") { self.pb.inc(1); - return self.run_copy_command(sql).await; + let result = self.run_copy_command(sql).await; + self.currently_executing_sql_tracker.remove_sql(tracked_sql); + + return result; } if !is_query_sql { self.get_client().execute(sql, &[]).await?; + self.currently_executing_sql_tracker.remove_sql(tracked_sql); self.pb.inc(1); return Ok(DBOutput::StatementComplete(0)); } @@ -292,6 +325,8 @@ impl sqllogictest::AsyncDB for Postgres { .collect() }; + self.currently_executing_sql_tracker.remove_sql(tracked_sql); + if rows.is_empty() && types.is_empty() { Ok(DBOutput::StatementComplete(0)) } else { diff --git a/datafusion/sqllogictest/src/lib.rs b/datafusion/sqllogictest/src/lib.rs index f3a78607242c..d9319dbbbdac 100644 --- a/datafusion/sqllogictest/src/lib.rs +++ b/datafusion/sqllogictest/src/lib.rs @@ -30,6 +30,7 @@ mod engines; pub use engines::convert_batches; pub use engines::convert_schema_to_types; +pub use engines::CurrentlyExecutingSqlTracker; pub use engines::DFColumnType; pub use engines::DFOutput; pub use engines::DFSqlLogicTestError;