Support serializing packed tables directly for the normal shuffle path #10818
Conversation
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Making this a draft because there are still 5 unit tests failing.
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Worked around the failing tests by disabling the GPU serde, and filed issue #10823 to track the follow-up.
build
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
build
This is a quick first pass
@@ -1788,6 +1788,15 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
    .integerConf
    .createWithDefault(20)

  val SHUFFLE_GPU_SERDE_ENABLED =
    conf("spark.rapids.shuffle.serde.enabled")
Let's change this to:
spark.rapids.shuffle.serde.type
There are two types so far: "CPU" and "GPU". The way the flag is used is fine; we would still convert it to a boolean isGpuSerdeEnabled, but we would test whether spark.rapids.shuffle.serde.type == "GPU".
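A minimal sketch of what the suggested entry could look like, assuming the RapidsConf builder exposes the usual doc/internal/stringConf/checkValues/createWithDefault methods used by neighboring entries (this is not the actual PR change):

```scala
// Hedged sketch only: builder methods and wording are assumptions based on how
// other entries in RapidsConf are defined.
val SHUFFLE_SERDE_TYPE = conf("spark.rapids.shuffle.serde.type")
  .doc("Serialization format for the normal shuffle path. \"CPU\" keeps the " +
    "existing JCudfSerialization path; \"GPU\" serializes packed tables directly.")
  .internal()
  .stringConf
  .checkValues(Set("CPU", "GPU"))
  .createWithDefault("CPU")

// Call sites would still reduce it to a boolean, e.g.:
// val isGpuSerdeEnabled = get(SHUFFLE_SERDE_TYPE) == "GPU"
```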
It's internal right now, but as we learn more about this method we need to add documentation that says when to use which, or come up with smart heuristics that pick CPU/GPU automatically (so we could add another type: AUTO).
Done for the config name part
@@ -504,6 +504,7 @@ class AdaptiveQueryExecSuite
      // disable DemoteBroadcastHashJoin rule from removing BHJ due to empty partitions
      .set(SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key, "0")
      .set(RapidsConf.TEST_ALLOWED_NONGPU.key, "ShuffleExchangeExec,HashPartitioning")
      .set(RapidsConf.SHUFFLE_GPU_SERDE_ENABLED.key, "false")
We should remove these override settings since disabled is the default.
Done
import org.apache.spark.sql.vectorized.ColumnarBatch

private sealed trait TableSerde {
  protected val P_MAGIC_NUM: Int = 0x43554447 // "CUDF".asInt + 1
Since we have our own P_MAGIC_NUM, could we not use it to detect whether the data is GpuTableSerde or JCudfSerialized? This should be follow-on work, but I imagine a case where we might want to use a specific serialization format for columns of a certain type, size, or complexity versus the other.
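As a rough illustration of that follow-on idea, the read side could peek at the leading magic value to pick the deserializer. Everything below is hypothetical (the helper, the enum, and the assumption that JCudfSerialized data starts with a different magic); only P_MAGIC_NUM comes from this PR:

```scala
import java.io.{BufferedInputStream, DataInputStream}

object ShuffleSerdeFormat extends Enumeration {
  val GpuTableSerde, JCudfSerialized = Value
}

// Peek at the first 4 bytes of the shuffle stream without consuming them, then
// route to the matching deserializer based on the magic number.
def detectFormat(buffered: BufferedInputStream): ShuffleSerdeFormat.Value = {
  val pMagicNum = 0x43554447 // "CUDF".asInt + 1, as defined in TableSerde
  buffered.mark(4)
  val magic = new DataInputStream(buffered).readInt()
  buffered.reset() // the chosen deserializer re-reads the header from 'buffered'
  if (magic == pMagicNum) ShuffleSerdeFormat.GpuTableSerde
  else ShuffleSerdeFormat.JCudfSerialized // assumes any other magic means JCudf data
}
```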
whoops, I didn't mean to approve before.
Which queries were slower? It would be great to get some feedback from you on what is different between the customer query and the NDS queries. Also, which queries got faster from NDS? That would be interesting. I did also write internally as I'd like to see more standard configurations used for this benchmark on the next run, so we can compare apples-to-apples with our baseline.
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
…uffle-gpu-serde Signed-off-by: Firestarman <firestarmanllc@gmail.com>
build
Please add more context about why the test cases in #10823 are failing before merging this PR. We'd like to understand if that issue needs to be addressed as part of this PR.
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Done
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Moving this to draft since the perf is not as good as we expected. The previous 2x speedup was obtained only when setting the executor cores to 2, but it is supposed to be 16.
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@winningsix I am going to close this since we failed to find a case that can benefit from it. We can reopen it if we find one in the future.
Contributes to #10790
Fix #10841
This PR tries to accelerate the normal shuffle path by partitioning and slicing tables on the GPU.
The sliced tables are already serializable, so they can be written to the shuffle output stream directly, along with lightweight metadata (a TableMeta) to rebuild each table on the shuffle read side.
On the shuffle read side, the newly introduced PackedTableIterator reads the tables from the shuffle input stream and rebuilds them on the GPU by leveraging the existing utilities (MetaUtils, GpuCompressedColumnVector). Next, the existing GpuCoalesceBatches node concatenates the batches for the downstream operators, similar to what the Rapids Shuffle does.
This led to some perf regression in NDS runs, so the feature is disabled by default. However, we got about a 2x speedup for a customer query (only when setting the executor cores to 2, while it is supposed to be 16).
Waiting for more tests ...
Numbers for 3k parquet data on our cluster.