ARROW-6086: [Rust] [DataFusion] Add support for partitioned Parquet data sources #5494

Closed
wants to merge 4 commits into from
@@ -37,23 +37,27 @@ use parquet::file::reader::*;

 use crate::datasource::{ScanResult, TableProvider};
 use crate::error::{ExecutionError, Result};
 use crate::execution::physical_plan::common;
 use crate::execution::physical_plan::BatchIterator;

 /// Table-based representation of a `ParquetFile`
 pub struct ParquetTable {
-    filename: String,
+    filenames: Vec<String>,
     schema: Arc<Schema>,
 }

 impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a file path
-    pub fn try_new(filename: &str) -> Result<Self> {
-        let parquet_file = ParquetFile::open(filename, None, 0)?;
-        let schema = parquet_file.projection_schema.clone();
-        Ok(Self {
-            filename: filename.to_string(),
-            schema,
-        })
+    pub fn try_new(path: &str) -> Result<Self> {
+        let mut filenames: Vec<String> = vec![];
+        common::build_file_list(path, &mut filenames, ".parquet")?;
+        if filenames.is_empty() {
+            Err(ExecutionError::General("No files found".to_string()))
+        } else {
+            let parquet_file = ParquetFile::open(&filenames[0], None, 0)?;
+            let schema = parquet_file.projection_schema.clone();

@paddyhoran (Contributor), Sep 25, 2019:

What happens if the schemas of the files differ? I guess it just fails at execution time when a different schema is encountered?

@andygrove (Author, Member), Sep 25, 2019:

Yes, this code currently assumes that all of the partitions have the same schema. It's pretty basic. I imagine we could eventually have schema merging.

@andygrove (Author, Member), Sep 25, 2019:

I will write up a JIRA to add validation that all the partitions have the same schema. That would be a nice improvement.

@paddyhoran (Contributor), Sep 25, 2019:

Makes sense. Feel free to merge.

+            Ok(Self { filenames, schema })
+        }
     }
 }
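The validation discussed above could look roughly like this; a minimal sketch, assuming a hypothetical `validate_same_schema` helper where `String` stands in for Arrow's `Schema` so the example is self-contained:

```rust
/// Hypothetical sketch: accept a table only if every partition file reports
/// the same schema as the first file, otherwise fail up front instead of at
/// execution time. `String` is a stand-in for Arrow's `Schema` type.
fn validate_same_schema(schemas: &[String]) -> Result<(), String> {
    match schemas.split_first() {
        // No files at all mirrors the "No files found" error above
        None => Err("No files found".to_string()),
        Some((first, rest)) => {
            for (i, schema) in rest.iter().enumerate() {
                if schema != first {
                    return Err(format!(
                        "schema of file {} does not match file 0",
                        i + 1
                    ));
                }
            }
            Ok(())
        }
    }
}

fn main() {
    let same = vec!["a: Int32".to_string(), "a: Int32".to_string()];
    let mixed = vec!["a: Int32".to_string(), "a: Utf8".to_string()];
    assert!(validate_same_schema(&same).is_ok());
    assert!(validate_same_schema(&mixed).is_err());
}
```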

@@ -70,17 +74,16 @@ impl TableProvider for ParquetTable {
         projection: &Option<Vec<usize>>,
         batch_size: usize,
     ) -> Result<Vec<ScanResult>> {
-        // note that this code currently assumes the filename is a file rather than a directory
-        // and therefore only returns a single partition
-        let parquet_file = match projection {
-            Some(p) => ParquetScanPartition::try_new(
-                &self.filename,
-                Some(p.clone()),
-                batch_size,
-            )?,
-            None => ParquetScanPartition::try_new(&self.filename, None, batch_size)?,
-        };
-        Ok(vec![Arc::new(Mutex::new(parquet_file))])
+        Ok(self
+            .filenames
+            .iter()
+            .map(|filename| {
+                ParquetScanPartition::try_new(filename, projection.clone(), batch_size)
+                    .and_then(|part| {
+                        Ok(Arc::new(Mutex::new(part)) as Arc<Mutex<dyn BatchIterator>>)
+                    })
+            })
+            .collect::<Result<Vec<_>>>()?)
     }
 }
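The new `scan` body maps each file to its own partition and lets `collect` fold the per-file `Result`s into one, so the first failing file aborts the whole scan. A simplified, self-contained sketch of that pattern (with a dummy `ParquetScanPartition` standing in for the real one):

```rust
use std::sync::{Arc, Mutex};

// Stand-in for the crate's ParquetScanPartition: one partition per file.
struct ParquetScanPartition {
    filename: String,
}

impl ParquetScanPartition {
    // Fallible constructor, like the real try_new; here it just checks the
    // extension so the example has a failure case.
    fn try_new(filename: &str) -> Result<Self, String> {
        if filename.ends_with(".parquet") {
            Ok(ParquetScanPartition {
                filename: filename.to_string(),
            })
        } else {
            Err(format!("not a Parquet file: {}", filename))
        }
    }
}

// One partition per filename; collect turns Iterator<Item = Result<_, _>>
// into Result<Vec<_>, _>, short-circuiting on the first error.
fn scan(filenames: &[String]) -> Result<Vec<Arc<Mutex<ParquetScanPartition>>>, String> {
    filenames
        .iter()
        .map(|f| ParquetScanPartition::try_new(f).map(|p| Arc::new(Mutex::new(p))))
        .collect()
}

fn main() {
    let files = vec!["a.parquet".to_string(), "b.parquet".to_string()];
    let partitions = scan(&files).unwrap();
    assert_eq!(partitions.len(), 2);
    assert_eq!(partitions[0].lock().unwrap().filename, "a.parquet");
    assert!(scan(&["c.csv".to_string()]).is_err());
}
```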

@@ -17,9 +17,11 @@

 //! Defines common code used in execution plans

+use std::fs;
+use std::fs::metadata;
 use std::sync::{Arc, Mutex};

-use crate::error::Result;
+use crate::error::{ExecutionError, Result};
 use crate::execution::physical_plan::BatchIterator;

 use arrow::datatypes::Schema;
@@ -75,3 +77,30 @@ pub fn collect(it: Arc<Mutex<dyn BatchIterator>>) -> Result<Vec<RecordBatch>> {
         }
     }
 }

+/// Recursively build a list of files in a directory with a given extension
+pub fn build_file_list(dir: &str, filenames: &mut Vec<String>, ext: &str) -> Result<()> {
+    let metadata = metadata(dir)?;
+    if metadata.is_file() {
+        if dir.ends_with(ext) {
+            filenames.push(dir.to_string());
+        }
+    } else {
+        for entry in fs::read_dir(dir)? {
+            let entry = entry?;
+            let path = entry.path();
+            if let Some(path_name) = path.to_str() {
+                if path.is_dir() {
+                    build_file_list(path_name, filenames, ext)?;
+                } else {
+                    if path_name.ends_with(ext) {
+                        filenames.push(path_name.to_string());
+                    }
+                }
+            } else {
+                return Err(ExecutionError::General("Invalid path".to_string()));
+            }
+        }
+    }
+    Ok(())
+}
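To see the shared helper in action, here is a self-contained copy of it (with `std::io::Error` in place of the crate's `ExecutionError`) plus a small usage example that builds a partitioned layout under the system temp directory; the `file_list_demo` directory name is made up for the example:

```rust
use std::fs;
use std::fs::metadata;
use std::io;

// Same recursion as the helper above: a file is kept if it ends in `ext`,
// a directory is descended into, and a non-UTF-8 path is an error.
fn build_file_list(dir: &str, filenames: &mut Vec<String>, ext: &str) -> io::Result<()> {
    if metadata(dir)?.is_file() {
        if dir.ends_with(ext) {
            filenames.push(dir.to_string());
        }
        return Ok(());
    }
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        let path_name = path
            .to_str()
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Invalid path"))?;
        if path.is_dir() {
            build_file_list(path_name, filenames, ext)?;
        } else if path_name.ends_with(ext) {
            filenames.push(path_name.to_string());
        }
    }
    Ok(())
}

fn main() {
    // Lay out one Parquet file in a subdirectory plus a non-Parquet file,
    // then confirm only the .parquet file is collected.
    let root = std::env::temp_dir().join("file_list_demo");
    fs::create_dir_all(root.join("part=0")).unwrap();
    fs::write(root.join("part=0").join("data.parquet"), b"").unwrap();
    fs::write(root.join("notes.txt"), b"").unwrap();

    let mut files: Vec<String> = vec![];
    build_file_list(root.to_str().unwrap(), &mut files, ".parquet").unwrap();
    assert_eq!(files.len(), 1);
    assert!(files[0].ends_with("data.parquet"));
}
```

This is why the CSV hunks below shrink: `CsvExec` drops its private copy of the same recursion and calls `common::build_file_list` with `".csv"`.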
@@ -17,12 +17,11 @@

 //! Execution plan for reading CSV files

-use std::fs;
-use std::fs::metadata;
 use std::fs::File;
 use std::sync::{Arc, Mutex};

-use crate::error::{ExecutionError, Result};
+use crate::error::Result;
+use crate::execution::physical_plan::common;
 use crate::execution::physical_plan::{BatchIterator, ExecutionPlan, Partition};
 use arrow::csv;
 use arrow::datatypes::Schema;
@@ -51,7 +50,7 @@ impl ExecutionPlan for CsvExec {
     /// Get the partitions for this execution plan. Each partition can be executed in parallel.
     fn partitions(&self) -> Result<Vec<Arc<dyn Partition>>> {
         let mut filenames: Vec<String> = vec![];
-        self.build_file_list(&self.path, &mut filenames)?;
+        common::build_file_list(&self.path, &mut filenames, ".csv")?;
         let partitions = filenames
             .iter()
             .map(|filename| {
@@ -85,33 +84,6 @@ impl CsvExec {
             batch_size,
         })
     }
-
-    /// Recursively build a list of csv files in a directory
-    fn build_file_list(&self, dir: &str, filenames: &mut Vec<String>) -> Result<()> {
-        let metadata = metadata(dir)?;
-        if metadata.is_file() {
-            if dir.ends_with(".csv") {
-                filenames.push(dir.to_string());
-            }
-        } else {
-            for entry in fs::read_dir(dir)? {
-                let entry = entry?;
-                let path = entry.path();
-                if let Some(path_name) = path.to_str() {
-                    if path.is_dir() {
-                        self.build_file_list(path_name, filenames)?;
-                    } else {
-                        if path_name.ends_with(".csv") {
-                            filenames.push(path_name.to_string());
-                        }
-                    }
-                } else {
-                    return Err(ExecutionError::General("Invalid path".to_string()));
-                }
-            }
-        }
-        Ok(())
-    }
 }

/// CSV Partition