ARROW-9754: [Rust] [DataFusion] Implement async in ExecutionPlan trait #8285

Closed
wants to merge 9 commits

Conversation

@andygrove (Member) commented Sep 26, 2020

This PR implements async in the ExecutionPlan trait. I ran the TPC-H benchmark and the performance is about the same.

Master branch:

Running benchmarks with the following options: TpchOpt { query: 1, debug: false, iterations: 3, concurrency: 24, batch_size: 4096, path: "/mnt/tpch/parquet/100-240", file_format: "parquet" }
Query 1 iteration 0 took 14365 ms
Query 1 iteration 1 took 14284 ms
Query 1 iteration 2 took 14269 ms

This PR:

Running benchmarks with the following options: TpchOpt { query: 1, debug: false, iterations: 3, concurrency: 24, batch_size: 4096, path: "/mnt/tpch/parquet/100-240", file_format: "parquet" }
Query 1 iteration 0 took 14305 ms
Query 1 iteration 1 took 14372 ms
Query 1 iteration 2 took 14323 ms

EDIT: I previously posted perf numbers that were marginally better but that was due to using a more recent Rust nightly during the development of this PR.
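
For readers unfamiliar with async trait methods in Rust: the sketch below (hypothetical names and a simplified return type, not the actual `ExecutionPlan` signature) shows the general shape of an async method on a trait using the `async-trait` crate, since native async trait methods were not available on the 2020 toolchain used here.

```rust
use async_trait::async_trait;

// Hypothetical, simplified stand-in for an execution plan trait; the real
// trait in DataFusion returns record batches rather than plain integers.
#[async_trait]
pub trait ExecutionPlanLike: Send + Sync {
    /// Asynchronously execute one partition and return its results.
    async fn execute(&self, partition: usize) -> Result<Vec<u64>, String>;
}

pub struct NoopExec;

#[async_trait]
impl ExecutionPlanLike for NoopExec {
    async fn execute(&self, _partition: usize) -> Result<Vec<u64>, String> {
        Ok(vec![])
    }
}
```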

@andygrove (Member Author)

Currently fails to compile due to

error[E0700]: hidden type for `impl Trait` captures lifetime that does not appear in bounds
   --> datafusion/src/execution/context.rs:352:10
    |
352 |     ) -> Result<()> {
    |          ^^^^^^^^^^
    |
note: hidden type `impl std::future::Future` captures the scope of call-site for function at 352:21
   --> datafusion/src/execution/context.rs:352:21
    |
352 |       ) -> Result<()> {
    |  _____________________^
353 | |         // create directory to contain the CSV files (one per partition)
354 | |         let path = path.to_string();
355 | |         fs::create_dir(&path)?;
...   |
377 | |         Ok(())
378 | |     }
    | |_____^

@BatmanAoD I was wondering if you might know what is causing this?
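
For context, a minimal reproduction of this class of E0700 error (hypothetical code, not the actual DataFusion function) is a function that returns `impl Future` while borrowing a `&str` argument, so the hidden future type captures a lifetime that does not appear in the return type's bounds:

```rust
use std::future::Future;

// Fails with E0700 on pre-2024 editions: the returned future captures the
// lifetime of `path`, but that lifetime does not appear in the `impl Trait` bounds.
fn write_csv_files(path: &str) -> impl Future<Output = ()> {
    async move {
        // Converting to an owned String *inside* the async block is too late:
        // the borrow of `path` is already part of the future's captured state.
        let path = path.to_string();
        println!("{}", path);
    }
}
```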

@andygrove (Member Author)

FYI @jorgecarleitao @alamb -- this is some groundwork for async that the scheduler will need.

@BatmanAoD (Contributor)

Hm, I'd guess it's probably because of the &str in the input args. Let me take a quick look...

@BatmanAoD (Contributor)

Yes, I think it's because the &str is actually alive in between calls to the future. I'm not sure why that is, though, since you reassign it to an owned type in the first line.
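
One common way around this (a hedged sketch, not necessarily the exact change made in this PR) is to take ownership of the borrowed data before constructing the future, or to declare the captured lifetime in the return type:

```rust
use std::future::Future;

// Option 1: convert to an owned String before the async block, so the future
// owns its data and no borrowed lifetime needs to appear in the bounds.
fn write_csv_files(path: &str) -> impl Future<Output = ()> {
    let path = path.to_string();
    async move {
        println!("{}", path);
    }
}

// Option 2: declare that the returned future may borrow from `path`.
fn write_csv_files_borrowing(path: &str) -> impl Future<Output = ()> + '_ {
    async move {
        println!("{}", path);
    }
}
```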

@jorgecarleitao (Member)

I agree with this: what we want from ExecutionPlan's execute() is not just to run something, but to present to a scheduler code that:

  1. can execute
  2. threads can switch in and out of that execution

which is exactly what async offers. 👍

@andygrove changed the title from "ARROW-9754: [Rust] [DataFusion] Implement async in ExecutionPlan trait" to "[WIP] ARROW-9754: [Rust] [DataFusion] Implement async in ExecutionPlan trait" Sep 27, 2020
@andygrove (Member Author)

Thanks @BatmanAoD that was it.

@andygrove marked this pull request as ready for review September 27, 2020 14:55
@alamb (Contributor) left a comment

I really like this change @andygrove -- I think it is a great step forward towards running DataFusion more efficiently. I would be happy if it were merged!

ctx.state.config.concurrency = 1;
ctx.register_table("aggregate_test_100", Box::new(mem_table));
ctx
unimplemented!()
Contributor

🤔

Member Author

working on this now ... about to push the fix

Member Author

The issue was that I couldn't call async code from criterion, so I had to create a separate tokio runtime and block on the async code. I expect there may be a cleaner way to do this.
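
For illustration, the usual shape of that workaround (placeholder names such as `run_query`; not the actual benchmark code from this PR) is to build a Tokio runtime once and `block_on` the async work inside criterion's synchronous `iter` closure:

```rust
use criterion::{criterion_group, criterion_main, Criterion};

// Stand-in for an async DataFusion query.
async fn run_query() {}

fn benchmark(c: &mut Criterion) {
    // criterion's closure is synchronous, so block on the async work
    // using an explicitly created Tokio runtime.
    let rt = tokio::runtime::Runtime::new().unwrap();
    c.bench_function("run_query", |b| b.iter(|| rt.block_on(run_query())));
}

criterion_group!(benches, benchmark);
criterion_main!(benches);
```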

for chunk in chunks {
let chunk = chunk.to_vec();
let input = self.input.clone();
let task: JoinHandle<Result<Vec<Arc<RecordBatch>>>> =
Contributor

Yeah, this is really much nicer in my opinion -- we spawn tasks (not threads), so we won't create more threads than CPUs, and it gives users better control over how the tasks are run. 👍
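
As a rough illustration of the pattern being discussed (placeholder types, not the actual MergeExec code), spawning one Tokio task per chunk looks like the sketch below; the tasks are multiplexed over the runtime's worker threads rather than each getting its own OS thread:

```rust
use tokio::task::JoinHandle;

// Stand-in for executing one partition and collecting its record batches.
async fn process_chunk(chunk: Vec<u64>) -> Vec<u64> {
    chunk
}

#[tokio::main]
async fn main() {
    let chunks: Vec<Vec<u64>> = vec![vec![1, 2], vec![3, 4]];

    // Spawn one task per chunk; the runtime schedules them over its
    // worker-thread pool instead of creating a thread per partition.
    let handles: Vec<JoinHandle<Vec<u64>>> = chunks
        .into_iter()
        .map(|chunk| tokio::spawn(process_chunk(chunk)))
        .collect();

    let mut results = Vec::new();
    for handle in handles {
        results.extend(handle.await.expect("task panicked"));
    }
    println!("{:?}", results);
}
```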

@@ -1 +1 @@
-nightly-2020-04-22
+nightly-2020-08-24
Contributor

I understand this upgrade was helpful during development, but I suggest we don't upgrade upon final merge.

@andygrove (Member Author)

@alamb @jorgecarleitao I just realized that once this PR is merged, I could go ahead and implement join support, because it should be relatively efficient now that MergeExec spawns tasks rather than threads. I'm not sure which is the higher priority for 2.0.0: implementing joins (just inner equijoins for now) or implementing the scheduler. What are your thoughts?

@andygrove (Member Author)

@alippai @vertexclique @svenwb fyi

@jorgecarleitao (Member) left a comment

LGTM. Really great, @andygrove.

@jorgecarleitao (Member)

I +1 the one that you think you will have the most fun working on :-)

If both are equally fun, I would go for the joins, just because feature-wise IMO it is one of the two major features we are missing (together with windowing). This is because the types of queries I typically work with use a lot of joins - it may not be as relevant for other folks.

@andygrove (Member Author)

@jorgecarleitao I know how to implement joins, but I am still learning on the scheduler front, so I think it would make more sense to ship join support in 2.0.0, which may make DataFusion more compelling for a larger audience. I could then focus on the scheduler for the next release.

@andygrove closed this in 75cdad4 Sep 27, 2020
@alamb (Contributor) commented Sep 27, 2020

Sounds like a good plan @andygrove -- regarding the scheduler, I may have time to help out in a few weeks as well, as it is directly applicable to what I am working on at work.

I actually think the move to async will help a lot (by partly constraining the implementation). I am also very happy to review / help out with Joins (but I will have limited time to do so as they are not directly relevant to what I am doing for work)

arw2019 pushed a commit to arw2019/arrow that referenced this pull request Sep 28, 2020
arw2019 pushed a commit to arw2019/arrow that referenced this pull request Sep 29, 2020
arw2019 pushed a commit to arw2019/arrow that referenced this pull request Sep 29, 2020
emkornfield pushed a commit to emkornfield/arrow that referenced this pull request Oct 16, 2020
GeorgeAp pushed a commit to sirensolutions/arrow that referenced this pull request Jun 7, 2021