[C++][Flight] See if reading/writing to gRPC get/put streams asynchronously helps performance #26337

Open
asfimport opened this issue Oct 20, 2020 · 20 comments

We don't use any asynchronous concepts in the way Flight is implemented now, i.e. IPC deconstruction/reconstruction (which may include compression!) is not performed concurrently with moving FlightData objects through the gRPC machinery, which may yield suboptimal performance.

It might be better to apply an actor-type approach where a dedicated thread retrieves and prepares the next raw IPC message (within a Future) while the current IPC message is being processed – that way reading/writing to/from the gRPC stream is not blocked on the IPC code doing its thing.

Reporter: Wes McKinney / @wesm

Note: This issue was originally created as ARROW-10351. Please see the migration documentation for further details.

Yibo Cai / @cyb70289:
Hi @wesm, @lidavidm,
I'm planning to investigate whether a gRPC async server may improve Flight performance. I'd like to know if the community is doing similar things, to avoid duplicated effort. Thanks.
I see another Jira that looks related: https://issues.apache.org/jira/browse/ARROW-1009.

David Li / @lidavidm:
We are thinking about pipelining compression and RPC streaming but have not started this yet. That also doesn't necessarily need to use the gRPC async APIs. We are overall improving support for concurrency in Arrow, e.g. ARROW-10183, ARROW-10117, ARROW-8626 (and its related issues).

David Li / @lidavidm:
FWIW @cyb70289, I actually tried pipelining the Flight client and server: https://github.com/lidavidm/arrow/tree/arrow-10351-async-compression

This way we would do I/O-bound (gRPC read/write) and CPU-bound (Arrow record batch encoding/decoding) work on separate threads with readahead on the I/O side.

In our tests it did not have any benefit. I didn't test the actual async gRPC APIs; however, unless there is a major difference in how those are implemented on the gRPC side, I'd be doubtful that they'd help by themselves unless they unlock some opportunity to parallelize/pipeline work. But if you are investigating, we'd be curious to see the results! It could definitely improve how ergonomic the APIs are and/or open a path to asyncio support in the Python bindings. It might also improve latency rather than throughput (our tests have focused on throughput).

Yibo Cai / @cyb70289:
Thanks @lidavidm, your information is very helpful.
I haven't started actual work on the async gRPC evaluation, so it may take some time. I will update when I have some solid findings.

yibocai#1:
I did a very rudimentary test interleaving data compression with data sending. It looks like it can improve performance.
A thread is spawned to prepare the next record batch while the main thread is sending the current record batch.
Basically, I separated these two lines into two different functions running in their own threads.

For streams = 1, I see a stable improvement in speed (1200 -> 1700 MB/s) and latency (90 -> 70 us).
No improvement when streams >= 4. I'm testing on a Skylake server with 32 cores and 2 NUMA nodes. See the POC code for details.
POC code at cyb70289@24a55f7

David Li / @lidavidm:
@cyb70289 interesting, thanks for the results! It seems we should be able to build this into Flight directly so that the user doesn't have to worry about it (also, we could potentially make things like the ring buffer size configurable).

What is the effect when no compression is used?

yibocai#1:
> What is the effect when no compression is used?

No effect if compression is not used. The data preparation time is trivial compared with packet delivery.

Per my profiling, 70% of CPU time is spent in compression and 30% in packet sending. So an improvement of ~30% looks reasonable when interleaving them.

David Li / @lidavidm:
Hmm, I was unable to replicate the results here. I checked out your branch and the current master branch. I'm running on an Intel Comet Lake laptop with 8 physical cores.

Current master:


> env OMP_NUM_THREADS=4 ./release/arrow-flight-benchmark -test_put -num_perf_runs=4 -num_streams=4 -num_threads=1 
Using spawned TCP server
Server running with pid 5988
Server host: localhost
Server port: 31337
Testing method: DoPut
Server host: localhost
Server port: 31337
Number of perf runs: 4
Number of concurrent gets/puts: 1
Batch size: 131040
Batches written: 39072
Bytes written: 5120000000
Nanos: 2655271083
Speed: 1838.91 MB/s
Throughput: 14714.9 batches/s
Latency mean: 65 us
Latency quantile=0.5: 65 us
Latency quantile=0.95: 75 us
Latency quantile=0.99: 82 us
Latency max: 941 us
 

This branch:


> env OMP_NUM_THREADS=4 ./release/arrow-flight-benchmark -test_put -num_perf_runs=4 -num_streams=1 -num_threads=1
Using spawned TCP server
Server running with pid 5921
Server host: localhost
Server port: 31337
Testing method: DoPut
Server host: localhost
Server port: 31337
Number of perf runs: 4
Number of concurrent gets/puts: 1
Batch size: 131040
Batches written: 9768
Bytes written: 1280000000
Nanos: 686687591
Speed: 1777.67 MB/s
Throughput: 14224.8 batches/s
Latency mean: 67 us
Latency quantile=0.5: 67 us
Latency quantile=0.95: 76 us
Latency quantile=0.99: 92 us
Latency max: 958 us

yibocai#1:
I will redo the test on an 8-core desktop.
Maybe too many threads (gRPC client, server, compression) are competing for the limited cores.

yibocai#1:
I retested on an old i7 machine with 8 cores. Running the same commands as yours, I can see stable improvements. Speed: 800 -> 1000 MB/s, Latency: 170 -> 120 us.

My POC test code is hard to use correctly. I hardcoded compression in client.cc, and the master branch won't use compression by default. You're meant to comment out the INTERLEAVE_PREPARE_AND_SEND macro to benchmark the master code path again.

I will provide a patch adding command line options to exercise both code paths easily.

David Li / @lidavidm:
I do see an improvement when both comparisons are on this branch, but with the #define toggled. That is, using a background thread helps performance when compression is enabled. However, I don't see a benefit compared to not using compression, which is what I was interested in.

I'll try to test the 3 cases here (no compression, compression, compression with background thread) on a pair of EC2 instances when I get a chance - testing over localhost probably isn't the fairest comparison.

yibocai#1:
Per my test, the data preparation time (without compression) is trivial compared with the data sending time, so it may be hard to see the benefit if compression is not enabled.
I will also do some tests across hosts over the network.

David Li / @lidavidm:
Sure, serializing the batches is otherwise too cheap to justify the thread. However, if compression + background thread can't outperform no compression at all, then there's little point to compression in the first place.

I tested two EC2 t3.xlarge instances (4 cores, 16 GB RAM). They should have ~5 Gbps of bandwidth between them. All benchmarks were run as ./release/arrow-flight-benchmark -test_put -num_perf_runs=4 -num_streams=4 -num_threads=1 -server_host=(host).

With compression, with background thread:


Using standalone TCP server
Server host: ip-172-31-68-128.ec2.internal
Server port: 31337
Testing method: DoPut
Number of perf runs: 4
Number of concurrent gets/puts: 1
Batch size: 131040
Batches written: 39072
Bytes written: 5120000000
Nanos: 9203507933
Speed: 530.538 MB/s
Throughput: 4245.34 batches/s
Latency mean: 230 us
Latency quantile=0.5: 182 us
Latency quantile=0.95: 392 us
Latency quantile=0.99: 1411 us
Latency max: 11809 us

With compression, without background thread:


Using standalone TCP server
Server host: ip-172-31-68-128.ec2.internal
Server port: 31337
Testing method: DoPut
Number of perf runs: 4
Number of concurrent gets/puts: 1
Batch size: 131040
Batches written: 39072
Bytes written: 5120000000
Nanos: 9256189526
Speed: 527.519 MB/s
Throughput: 4221.18 batches/s
Latency mean: 232 us
Latency quantile=0.5: 195 us
Latency quantile=0.95: 328 us
Latency quantile=0.99: 874 us
Latency max: 20200 us

Without compression, without background thread:


Using standalone TCP server
Server host: ip-172-31-68-128.ec2.internal
Server port: 31337
Testing method: DoPut
Number of perf runs: 4
Number of concurrent gets/puts: 1
Batch size: 131040
Batches written: 39072
Bytes written: 5120000000
Nanos: 8678223134
Speed: 562.651 MB/s
Throughput: 4502.3 batches/s
Latency mean: 216 us
Latency quantile=0.5: 55 us
Latency quantile=0.95: 1556 us
Latency quantile=0.99: 2806 us
Latency max: 21395 us

In short, for Flight, it seems compression is simply not worth it, whether or not there's a background thread. This tradeoff may change when less bandwidth is available. The p99 latency does seem better with compression, though.

And there are other factors. For instance, the benchmark uses random data, which may not compress well; a different dataset may perform better. ZSTD is relatively fast, but we aren't tuning it here for compression/decompression speed.
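For the tuning point, one knob is the ZSTD compression level on the IPC payload. A hedged sketch using Arrow C++'s `util::Codec::Create` and `ipc::IpcWriteOptions` (the exact `Result` handling and how the options get attached to a Flight call may differ between Arrow versions):

```cpp
#include <memory>

#include <arrow/ipc/options.h>
#include <arrow/util/compression.h>

// Build IPC write options with a fast ZSTD level. Lower levels trade
// compression ratio for speed; level 1 is near the fast end.
arrow::ipc::IpcWriteOptions MakeFastZstdWriteOptions() {
  auto options = arrow::ipc::IpcWriteOptions::Defaults();
  auto maybe_codec = arrow::util::Codec::Create(arrow::Compression::ZSTD,
                                                /*compression_level=*/1);
  options.codec = std::move(maybe_codec).ValueOrDie();
  return options;
}
```

This is a configuration fragment, not a complete program; in a real benchmark the resulting options would need to be plumbed through to wherever the record batches are written.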

yibocai#1:
Regarding the random test data, I do see that the compressed data is actually larger than the original uncompressed data.

Shall we replace the random test data with some real-world data? Otherwise, the compression test in the current benchmark will always be a pure loss.

David Li / @lidavidm:
@cyb70289 it would be nice to have the option to use real test data, yes. Perhaps the benchmark (both client/server) could accept a path to an IPC file as an option.

David Li / @lidavidm:
@cyb70289 I rebased the benchmark (https://github.com/lidavidm/arrow/tree/flight-poc) and ran with real data (the NYC taxi dataset, for the month of 2009/01: https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2009-01.csv).

The command in all cases was env OMP_NUM_THREADS=4 ./release/arrow-flight-benchmark -test_put -num_perf_runs=4 -num_streams=4 -num_threads=1 -data_file data.feather.

Master (no compression):


Testing method: DoPut
Using spawned TCP server
Server running with pid 20909
Server host: localhost
Server port: 31337
Server host: localhost
Server port: 31337
Number of perf runs: 4
Number of concurrent gets/puts: 1
Batch size: 5265782
Batches written: 3456
Bytes written: 18198543232
Nanos: 8007363952
Speed: 2167.44 MB/s
Throughput: 431.603 batches/s
Latency mean: 1309 us
Latency quantile=0.5: 1156 us
Latency quantile=0.95: 2135 us
Latency quantile=0.99: 2783 us
Latency max: 3876 us

flight-poc, with compression, without asynchronous compression:


Testing method: DoPut
Using spawned TCP server
Server running with pid 13773
Server host: localhost
Server port: 31337
Server host: localhost
Server port: 31337
Number of perf runs: 4
Number of concurrent gets/puts: 1
Batch size: 5265782
Batches written: 3456
Bytes written: 18198543232
Nanos: 23333072829
Speed: 743.815 MB/s
Throughput: 148.116 batches/s
Latency mean: 5666 us
Latency quantile=0.5: 5544 us
Latency quantile=0.95: 6460 us
Latency quantile=0.99: 6831 us
Latency max: 8569 us

flight-poc, with compression, with async compression:


Testing method: DoPut
Using spawned TCP server
Server running with pid 13689
Server host: localhost
Server port: 31337
Server host: localhost
Server port: 31337
Number of perf runs: 4
Number of concurrent gets/puts: 1
Batch size: 5265782
Batches written: 3456
Bytes written: 18198543232
Nanos: 22178585229
Speed: 782.533 MB/s
Throughput: 155.826 batches/s
Latency mean: 5320 us
Latency quantile=0.5: 5183 us
Latency quantile=0.95: 6227 us
Latency quantile=0.99: 6840 us
Latency max: 9255 us

So it seems with real data, things get even worse. Async compression is better than sync compression, but neither is in the ballpark of simply not compressing. Of course this is all over localhost which is likely not fair to compression so maybe I should try over EC2 next (~600MiB/s max bandwidth).

David Li / @lidavidm:
And between two EC2 t3.xlarge instances:

Without compression:


Testing method: DoPut
Using standalone TCP server
Server host: ip-172-31-73-63.ec2.internal
Server port: 31337
Number of perf runs: 4
Number of concurrent gets/puts: 1
Batch size: 5265782
Batches written: 3456
Bytes written: 18198543232
Nanos: 36150890880
Speed: 480.085 MB/s
Throughput: 95.5993 batches/s
Latency mean: 8485 us
Latency quantile=0.5: 8692 us
Latency quantile=0.95: 9233 us
Latency quantile=0.99: 10627 us
Latency max: 13944 us 

flight-poc, with sync compression:


Testing method: DoPut
Using standalone TCP server
Server host: ip-172-31-73-63.ec2.internal
Server port: 31337
Number of perf runs: 4
Number of concurrent gets/puts: 1
Batch size: 5265782
Batches written: 3456
Bytes written: 18198543232
Nanos: 38743831916
Speed: 447.955 MB/s
Throughput: 89.2013 batches/s
Latency mean: 9305 us
Latency quantile=0.5: 9312 us
Latency quantile=0.95: 9736 us
Latency quantile=0.99: 10097 us
Latency max: 11723 us 

flight-poc, with async compression:


Testing method: DoPut
Using standalone TCP server
Server host: ip-172-31-73-63.ec2.internal
Server port: 31337
Number of perf runs: 4
Number of concurrent gets/puts: 1
Batch size: 5265782
Batches written: 3456
Bytes written: 18198543232
Nanos: 36706487822
Speed: 472.818 MB/s
Throughput: 94.1523 batches/s
Latency mean: 8739 us
Latency quantile=0.5: 8726 us
Latency quantile=0.95: 9258 us
Latency quantile=0.99: 9793 us
Latency max: 12832 us 

It still doesn't seem very beneficial. Maybe if we have a very compressible dataset, and/or tune the compressor used?

yibocai#1:
It looks like compression is not very useful in normal cases. Things may be worse if the receiver has to decompress the payload.
I guess compression is only helpful when network bandwidth is limited.

What about encryption? Is it handled transparently by gRPC?

David Li / @lidavidm:
Encryption is supported in the form of TLS. In the past, benchmarks showed it had quite a big performance hit, but I haven't tested recently. That would also be a useful thing to check.

amoeba commented Dec 12, 2023

Just to add to the comment immediately above: I've been doing some benchmarking to see how much of an impact TLS has, and it can halve or even cut throughput to a third, at least for large (~GiB) streaming RPCs; this pattern holds for at least C++ and Go. See https://github.com/amoeba/grpc_bench?tab=readme-ov-file#results.
