ARROW-4602: [Rust] [DataFusion] Integrate query optimizer with ExecutionContext

Instead of registering `DataSource` with the context, we now register `DataSourceProvider`. This trait has a `scan()` method where we can pass the projection.
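
As a rough sketch of what that could look like (only the names `DataSourceProvider`, `DataSource`, and `scan()` come from this description; the signature and return type here are assumptions, not the PR's code):

use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;

use arrow::datatypes::Schema;

// Placeholder for the existing `DataSource` trait mentioned above;
// its batch-iteration methods are elided in this sketch.
pub trait DataSource {}

// Sketch of the new provider trait: only the names `DataSourceProvider`
// and `scan()` come from the description above; everything else here is
// an assumption.
pub trait DataSourceProvider {
    /// Full schema of the underlying source, before any projection.
    fn schema(&self) -> Arc<Schema>;

    /// Scan the source, reading only the columns whose indices appear in
    /// `projection`; `None` means read all columns.
    fn scan(&self, projection: &Option<Vec<usize>>) -> Rc<RefCell<DataSource>>;
}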

`ExecutionContext` calls the optimizer rule for all SQL queries now, so that only the necessary columns are loaded from disk.
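
For example, with the thirteen-column test schema used in the README example below, a query that references only `c1` and `c12` should now read just those two columns from disk (illustrative only; the projection representation here is an assumption):

// Illustrative: the projection push-down rule rewrites the table scan so
// that only the referenced columns are read from the file.
let sql = "SELECT c1, MIN(c12) FROM aggregate_test_100 GROUP BY c1";
// Conceptually, the scan becomes:
//   provider.scan(&Some(vec![0, 11]))  // indices of c1 and c12 in the schema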

There is also a simpler API for registering CSV files with the context, with a `register_csv` method.
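
A sketch of what that could look like, assuming the method takes a table name, a file path, and a schema (the actual signature in the PR may differ):

// Sketch only: assumed parameters (table name, file path, schema).
ctx.register_csv(
    "aggregate_test_100",
    "../../testing/data/csv/aggregate_test_100.csv",
    &schema,
);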

I also added some criterion benchmarks, but they are not ideal since they load the data from disk on each run. I am working on another PR to add support for running queries against RecordBatches already loaded into memory, and will update the benchmarks to use that as part of that PR.

Author: Andy Grove <andygrove73@gmail.com>

Closes #3678 from andygrove/ARROW-4602 and squashes the following commits:

80ea3c9 <Andy Grove> Integrate projection push down rule with ExecutionContext

DataFusion

DataFusion is an in-memory query engine that uses Apache Arrow as its memory model.

Status

The current code supports single-threaded execution of a limited subset of SQL (projection, selection, and aggregates) against CSV files. Parquet files will be supported shortly.

Here is a brief example of running a SQL query against a CSV file. See the examples directory for complete examples.

// NOTE: the use paths below reflect the crate layout at the time of this
// example and may differ in later versions of arrow and datafusion.
extern crate arrow;
extern crate datafusion;

use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;

use arrow::array::{BinaryArray, Float64Array};
use arrow::datatypes::{DataType, Field, Schema};

use datafusion::datasource::csv::CsvDataSource;
use datafusion::execution::context::ExecutionContext;

fn main() {
    // create local execution context
    let mut ctx = ExecutionContext::new();

    // define schema for data source (csv file)
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Utf8, false),
        Field::new("c2", DataType::UInt32, false),
        Field::new("c3", DataType::Int8, false),
        Field::new("c4", DataType::Int16, false),
        Field::new("c5", DataType::Int32, false),
        Field::new("c6", DataType::Int64, false),
        Field::new("c7", DataType::UInt8, false),
        Field::new("c8", DataType::UInt16, false),
        Field::new("c9", DataType::UInt32, false),
        Field::new("c10", DataType::UInt64, false),
        Field::new("c11", DataType::Float32, false),
        Field::new("c12", DataType::Float64, false),
        Field::new("c13", DataType::Utf8, false),
    ]));

    // register csv file with the execution context
    let csv_datasource = CsvDataSource::new(
        "../../testing/data/csv/aggregate_test_100.csv",
        schema.clone(),
        1024, // batch size
    );
    ctx.register_datasource("aggregate_test_100", Rc::new(RefCell::new(csv_datasource)));

    // execute the query
    let sql = "SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 WHERE c11 > 0.1 AND c11 < 0.9 GROUP BY c1";
    let relation = ctx.sql(sql).unwrap();
    let mut results = relation.borrow_mut();

    // iterate over result batches
    while let Some(batch) = results.next().unwrap() {
        println!(
            "RecordBatch has {} rows and {} columns",
            batch.num_rows(),
            batch.num_columns()
        );

        // group-by key (Utf8 data is accessed as BinaryArray here)
        let c1 = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        // MIN(c12) aggregate
        let min = batch
            .column(1)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();

        // MAX(c12) aggregate
        let max = batch
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();

        for i in 0..batch.num_rows() {
            let c1_value: String = String::from_utf8(c1.value(i).to_vec()).unwrap();

            println!("{}, Min: {}, Max: {}", c1_value, min.value(i), max.value(i));
        }
    }
}