Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions datafusion/src/datasource/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ impl CsvFile {
let schema = Arc::new(match options.schema {
Some(s) => s.clone(),
None => {
let mut filenames: Vec<String> = vec![];
common::build_file_list(path, &mut filenames, options.file_extension)?;
let filenames = common::build_file_list(path, options.file_extension)?;
if filenames.is_empty() {
return Err(DataFusionError::Plan(format!(
"No files found at {path} with file extension {file_extension}",
Expand Down
17 changes: 14 additions & 3 deletions datafusion/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,19 @@ pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatc
.map_err(DataFusionError::from)
}

/// Recursively build a list of files in a directory with a given extension
pub fn build_file_list(dir: &str, filenames: &mut Vec<String>, ext: &str) -> Result<()> {
/// Recursively builds a list of files in a directory with a given extension
pub fn build_file_list(dir: &str, ext: &str) -> Result<Vec<String>> {
let mut filenames: Vec<String> = Vec::new();
build_file_list_recurse(dir, &mut filenames, ext)?;
Ok(filenames)
}

/// Recursively build a list of files in a directory with a given extension with an accumulator list
fn build_file_list_recurse(
dir: &str,
filenames: &mut Vec<String>,
ext: &str,
) -> Result<()> {
let metadata = metadata(dir)?;
if metadata.is_file() {
if dir.ends_with(ext) {
Expand All @@ -91,7 +102,7 @@ pub fn build_file_list(dir: &str, filenames: &mut Vec<String>, ext: &str) -> Res
let path = entry.path();
if let Some(path_name) = path.to_str() {
if path.is_dir() {
build_file_list(path_name, filenames, ext)?;
build_file_list_recurse(path_name, filenames, ext)?;
} else if path_name.ends_with(ext) {
filenames.push(path_name.to_string());
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/src/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ impl CsvExec {
) -> Result<Self> {
let file_extension = String::from(options.file_extension);

let mut filenames: Vec<String> = vec![];
common::build_file_list(path, &mut filenames, file_extension.as_str())?;
let filenames = common::build_file_list(path, file_extension.as_str())?;
if filenames.is_empty() {
return Err(DataFusionError::Execution(format!(
"No files found at {path} with file extension {file_extension}",
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ impl ExecutionPlan for HashJoinExec {
num_output_rows: 0,
join_time: 0,
random_state: self.random_state.clone(),
visited_left_side: visited_left_side,
visited_left_side,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this fixes a clippy warning

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just noticed the same thing #259

is_exhausted: false,
}))
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/src/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ impl ParquetExec {
) -> Result<Self> {
// build a list of filenames from the specified path, which could be a single file or
// a directory containing one or more parquet files
let mut filenames: Vec<String> = vec![];
common::build_file_list(path, &mut filenames, ".parquet")?;
let filenames = common::build_file_list(path, ".parquet")?;
if filenames.is_empty() {
Err(DataFusionError::Plan(format!(
"No Parquet files found at path {}",
Expand Down