[SPARK-57268][SQL] Add Apache Arrow as a native cache format for in-memory Dataset caching #56334
base: master
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,50 @@ private[sql] object ArrowUtils { | |
|
|
||
| // todo: support more types. | ||
|
|
||
| /** | ||
| * Check if a Spark DataType is supported by Arrow. This recursively checks complex types | ||
| * (Array, Struct, Map). | ||
| * | ||
| * Note: This checks compatibility with toArrowField(), not toArrowType(). Types like | ||
| * GeometryType, GeographyType, and VariantType are not supported by toArrowType() (which only | ||
| * handles primitive Arrow types), but ARE supported by toArrowField() which converts them to | ||
| * Arrow Struct representations with metadata. Since Arrow cache uses toArrowField() via | ||
| * toArrowSchema() to create the schema, these types are supported. | ||
| */ | ||
| def isSupportedByArrow(dt: DataType): Boolean = { | ||
| dt match { | ||
| // Primitive types | ||
| case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | | ||
| _: StringType | BinaryType | NullType => | ||
| true | ||
|
|
||
| // Decimal | ||
| case _: DecimalType => true | ||
|
|
||
| // Temporal types | ||
| case DateType | TimestampType | TimestampNTZType | _: TimeType => true | ||
|
Member
cc @MaxGekk to take a look ^^
Member
[P2] Please reconcile this capability whitelist with current
Member
Author
Good catch that master now maps these through Arrow. I looked at the cache paths, though, and the physical value for
Member
Author
Follow-up filed: SPARK-57735 / #56842 adds nanosecond-timestamp support to the default in-memory cache (
Member
[P2] The prerequisite cited here has now landed in this exact tree: [ 🤖 posted by Codex on behalf of sunchao using the code-review-for-me skill 🤖 ]
||
|
|
||
| // Interval types | ||
| case _: YearMonthIntervalType | _: DayTimeIntervalType | CalendarIntervalType => true | ||
|
Member
[P2] Please do not advertise
Member
Author
Fixed with a clear diagnostic. Caching a CalendarInterval whose microseconds exceed +/-(Long.MaxValue / 1000) now throws an explanatory error (naming the type and the nanosecond-conversion limit) instead of an opaque
Member
[P2] The diagnostic fix still covers only top-level intervals. [ 🤖 posted by Codex on behalf of sunchao using the code-review-for-me skill 🤖 ]
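For readers following the thread, here is a minimal sketch of the kind of bounds check the author describes for a single interval value. It is not the PR's code: `IntervalNanos`, `microsToNanos`, and the message wording are hypothetical, and it does not address the nested-interval case raised in the last comment.

```scala
// Hypothetical helper, not taken from the PR: converts an interval's
// microsecond component to nanoseconds and fails with a descriptive
// message instead of silently overflowing a Long.
object IntervalNanos {
  // Largest microsecond magnitude whose nanosecond value still fits in a Long.
  val MaxMicros: Long = Long.MaxValue / 1000

  def microsToNanos(micros: Long): Long = {
    if (micros > MaxMicros || micros < -MaxMicros) {
      throw new ArithmeticException(
        s"CalendarInterval microseconds $micros cannot be converted to nanoseconds: " +
          s"magnitude exceeds Long.MaxValue / 1000 ($MaxMicros)")
    }
    micros * 1000L
  }
}
```

Checking the bound before multiplying keeps the failure explicit; a bare `micros * 1000L` would wrap around without any error.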
||
|
|
||
| // Complex types - recursively check element types | ||
| case ArrayType(elementType, _) => isSupportedByArrow(elementType) | ||
| case StructType(fields) => fields.forall(f => isSupportedByArrow(f.dataType)) | ||
| case MapType(keyType, valueType, _) => | ||
| isSupportedByArrow(keyType) && isSupportedByArrow(valueType) | ||
|
|
||
| // Special types | ||
| // Note: These are not in toArrowType(), but are handled by toArrowField() | ||
| case udt: UserDefinedType[_] => isSupportedByArrow(udt.sqlType) | ||
|
Member
[P2] This capability check accepts any UDT whose
Member
Author
Fixed.
||
| case _: GeometryType => true // Converted to Struct with srid + wkb fields | ||
| case _: GeographyType => true // Converted to Struct with srid + wkb fields | ||
| case _: VariantType => true // Converted to Struct with value + metadata fields | ||
|
|
||
| // Unsupported types | ||
| case _ => false | ||
| } | ||
| } | ||
|
|
||
| /** Maps data type from Spark to Arrow. NOTE: timeZoneId required for TimestampTypes */ | ||
| def toArrowType(dt: DataType, timeZoneId: String, largeVarTypes: Boolean = false): ArrowType = | ||
| TypeApiOps(dt) | ||
|
|
||
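The recursive rule in `isSupportedByArrow` above (a container type is supported only if every type nested inside it is) can be tried out without Spark on the classpath. The following is a minimal sketch over toy stand-in types; `ToyType`, `ToyOpaque`, and `ToySupport` are hypothetical names for illustration and are not part of Spark or this PR.

```scala
// Toy stand-ins for Spark's DataType hierarchy (hypothetical names).
sealed trait ToyType
case object ToyInt extends ToyType
case object ToyString extends ToyType
case object ToyOpaque extends ToyType // stands in for any unsupported type
final case class ToyArray(element: ToyType) extends ToyType
final case class ToyStruct(fields: Seq[(String, ToyType)]) extends ToyType
final case class ToyMap(key: ToyType, value: ToyType) extends ToyType

object ToySupport {
  // Mirrors the shape of the check in the diff: leaves are answered
  // directly, containers recurse into their element types.
  def isSupported(dt: ToyType): Boolean = dt match {
    case ToyInt | ToyString => true
    case ToyArray(element) => isSupported(element)
    case ToyStruct(fields) => fields.forall { case (_, t) => isSupported(t) }
    case ToyMap(key, value) => isSupported(key) && isSupported(value)
    case ToyOpaque => false
  }
}
```

With these definitions, `ToySupport.isSupported(ToyArray(ToyStruct(Seq("a" -> ToyInt))))` is `true`, while a single `ToyOpaque` buried inside a map value makes the whole type unsupported.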
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| ================================================================================================ | ||
| Arrow Cache vs Default Cache | ||
| ================================================================================================ | ||
|
|
||
| ================================================================================================ | ||
| Cache primitive types | ||
| ================================================================================================ | ||
|
|
||
| OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure | ||
| AMD EPYC 7763 64-Core Processor | ||
| Cache 5M rows with primitives: | Best Time(ms) | Avg Time(ms) | Stdev(ms) | Rate(M/s) | Per Row(ns) | Relative |
|---|---|---|---|---|---|---|
| Default cache - write + read | 1854 | 1922 | 97 | 2.7 | 370.8 | 1.0X |
| Default cache - write + read (uncompressed) | 1159 | 1165 | 8 | 4.3 | 231.8 | 1.6X |
| Arrow cache - write + read | 1300 | 1315 | 21 | 3.8 | 260.0 | 1.4X |
| Arrow cache - write + read (zstd level -1) | 1808 | 1811 | 4 | 2.8 | 361.6 | 1.0X |
| Arrow cache - write + read (zstd level 1) | 1814 | 1830 | 23 | 2.8 | 362.8 | 1.0X |
| Arrow cache - write + read (zstd level 3) | 1902 | 1929 | 39 | 2.6 | 380.4 | 1.0X |
|
|
||
|
|
||
| ================================================================================================ | ||
| Cache then filter | ||
| ================================================================================================ | ||
|
|
||
| OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure | ||
| AMD EPYC 7763 64-Core Processor | ||
| Cache 5M rows, then filter: | Best Time(ms) | Avg Time(ms) | Stdev(ms) | Rate(M/s) | Per Row(ns) | Relative |
|---|---|---|---|---|---|---|
| Default cache - filter | 1662 | 1683 | 29 | 3.0 | 332.5 | 1.0X |
| Default cache - filter (uncompressed) | 1312 | 1312 | 0 | 3.8 | 262.4 | 1.3X |
| Arrow cache - filter | 1447 | 1462 | 21 | 3.5 | 289.4 | 1.1X |
| Arrow cache - filter (zstd level -1) | 1729 | 1757 | 40 | 2.9 | 345.8 | 1.0X |
| Arrow cache - filter (zstd level 1) | 1787 | 1799 | 17 | 2.8 | 357.3 | 0.9X |
| Arrow cache - filter (zstd level 3) | 1951 | 1955 | 5 | 2.6 | 390.3 | 0.9X |
|
|
||
|
|
||
| ================================================================================================ | ||
| Cache columnar input (Parquet) | ||
| ================================================================================================ | ||
|
|
||
| OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure | ||
| AMD EPYC 7763 64-Core Processor | ||
| Cache 2M rows from Parquet: | Best Time(ms) | Avg Time(ms) | Stdev(ms) | Rate(M/s) | Per Row(ns) | Relative |
|---|---|---|---|---|---|---|
| Default cache - columnar input | 1545 | 1619 | 104 | 1.3 | 772.7 | 1.0X |
| Default cache - columnar input (uncompressed) | 1313 | 1336 | 33 | 1.5 | 656.4 | 1.2X |
| Arrow cache - columnar input | 1353 | 1378 | 35 | 1.5 | 676.7 | 1.1X |
| Arrow cache - columnar input (zstd level -1) | 1535 | 1573 | 54 | 1.3 | 767.6 | 1.0X |
| Arrow cache - columnar input (zstd level 1) | 1619 | 1622 | 5 | 1.2 | 809.6 | 1.0X |
| Arrow cache - columnar input (zstd level 3) | 1708 | 1709 | 2 | 1.2 | 853.8 | 0.9X |
|
|
||
|
|
||
| ================================================================================================ | ||
| Re-cache Arrow cached data (zero-copy test) | ||
| ================================================================================================ | ||
|
|
||
| OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure | ||
| AMD EPYC 7763 64-Core Processor | ||
| Re-cache 2M rows (zero-copy): | Best Time(ms) | Avg Time(ms) | Stdev(ms) | Rate(M/s) | Per Row(ns) | Relative |
|---|---|---|---|---|---|---|
| Default cache - cache a cached DF | 411 | 428 | 20 | 4.9 | 205.7 | 1.0X |
| Default cache - cache a cached DF (uncompressed) | 191 | 210 | 26 | 10.5 | 95.7 | 2.2X |
| Arrow cache - cache a cached DF (zero-copy) | 137 | 156 | 24 | 14.6 | 68.4 | 3.0X |
| Arrow cache - cache a cached DF (zstd level -1) | 327 | 343 | 18 | 6.1 | 163.3 | 1.3X |
| Arrow cache - cache a cached DF (zstd level 1) | 338 | 341 | 3 | 5.9 | 168.8 | 1.2X |
| Arrow cache - cache a cached DF (zstd level 3) | 352 | 357 | 3 | 5.7 | 176.2 | 1.2X |
|
|
||
|
|
||
| ================================================================================================ | ||
| Cache with column pruning (select 1 of 20 columns) | ||
| ================================================================================================ | ||
|
|
||
| OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1018-azure | ||
| AMD EPYC 7763 64-Core Processor | ||
| Cache 5M rows, select 1 column: | Best Time(ms) | Avg Time(ms) | Stdev(ms) | Rate(M/s) | Per Row(ns) | Relative |
|---|---|---|---|---|---|---|
| Default cache - select 1 of 20 columns | 10855 | 11142 | 406 | 0.5 | 2171.0 | 1.0X |
| Default cache - select 1 of 20 (uncompressed) | 4135 | 4149 | 20 | 1.2 | 827.0 | 2.6X |
| Arrow cache - select 1 of 20 | 5179 | 5280 | 144 | 1.0 | 1035.8 | 2.1X |
| Arrow cache - select 1 of 20 (zstd level -1) | 9258 | 9283 | 35 | 0.5 | 1851.7 | 1.2X |
| Arrow cache - select 1 of 20 (zstd level 1) | 9437 | 9603 | 234 | 0.5 | 1887.4 | 1.2X |
| Arrow cache - select 1 of 20 (zstd level 3) | 9778 | 9794 | 23 | 0.5 | 1955.5 | 1.1X |
|
|
||
|
|
||
|
|
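When reading these result files, it can help to know how the last three columns relate to the best time. The sketch below assumes the usual convention of Spark's benchmark output: rate and per-row cost are derived from the best time and the row count, and Relative is the first row's best time divided by the current row's. The figures in the tables above are consistent with that reading, but `BenchmarkColumns` and `derive` are hypothetical names, not part of the benchmark harness.

```scala
// Hypothetical helper that recomputes the derived benchmark columns
// from a best time, a row count, and the baseline (first-row) best time.
object BenchmarkColumns {
  final case class Derived(rateMPerSec: Double, perRowNs: Double, relative: Double)

  def derive(bestMs: Double, rows: Long, baselineBestMs: Double): Derived = {
    val rateMPerSec = rows / (bestMs / 1000.0) / 1e6 // million rows per second
    val perRowNs = bestMs * 1e6 / rows               // nanoseconds per row
    val relative = baselineBestMs / bestMs           // speedup over the first row
    Derived(rateMPerSec, perRowNs, relative)
  }
}
```

For the uncompressed Arrow row of the JDK 21 primitives run, `BenchmarkColumns.derive(1300, 5000000L, 1854)` gives roughly 3.85 M rows/s, 260.0 ns per row, and a ratio of about 1.43, in line with the reported 3.8, 260.0, and 1.4X. Small differences are expected because the reported times are themselves rounded to whole milliseconds.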
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| ================================================================================================ | ||
| Arrow Cache vs Default Cache | ||
| ================================================================================================ | ||
|
|
||
| ================================================================================================ | ||
| Cache primitive types | ||
| ================================================================================================ | ||
|
|
||
| OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure | ||
| AMD EPYC 7763 64-Core Processor | ||
| Cache 5M rows with primitives: | Best Time(ms) | Avg Time(ms) | Stdev(ms) | Rate(M/s) | Per Row(ns) | Relative |
|---|---|---|---|---|---|---|
| Default cache - write + read | 1686 | 1723 | 53 | 3.0 | 337.2 | 1.0X |
| Default cache - write + read (uncompressed) | 1045 | 1065 | 27 | 4.8 | 209.1 | 1.6X |
| Arrow cache - write + read | 1268 | 1305 | 53 | 3.9 | 253.6 | 1.3X |
| Arrow cache - write + read (zstd level -1) | 1724 | 1725 | 1 | 2.9 | 344.8 | 1.0X |
| Arrow cache - write + read (zstd level 1) | 1770 | 1794 | 34 | 2.8 | 354.0 | 1.0X |
| Arrow cache - write + read (zstd level 3) | 1857 | 1893 | 50 | 2.7 | 371.4 | 0.9X |
|
|
||
|
|
||
| ================================================================================================ | ||
| Cache then filter | ||
| ================================================================================================ | ||
|
|
||
| OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure | ||
| AMD EPYC 7763 64-Core Processor | ||
| Cache 5M rows, then filter: | Best Time(ms) | Avg Time(ms) | Stdev(ms) | Rate(M/s) | Per Row(ns) | Relative |
|---|---|---|---|---|---|---|
| Default cache - filter | 1426 | 1432 | 8 | 3.5 | 285.3 | 1.0X |
| Default cache - filter (uncompressed) | 1252 | 1274 | 31 | 4.0 | 250.4 | 1.1X |
| Arrow cache - filter | 1289 | 1295 | 8 | 3.9 | 257.8 | 1.1X |
| Arrow cache - filter (zstd level -1) | 1712 | 1716 | 7 | 2.9 | 342.4 | 0.8X |
| Arrow cache - filter (zstd level 1) | 1747 | 1759 | 16 | 2.9 | 349.5 | 0.8X |
| Arrow cache - filter (zstd level 3) | 1812 | 1848 | 50 | 2.8 | 362.4 | 0.8X |
|
|
||
|
|
||
| ================================================================================================ | ||
| Cache columnar input (Parquet) | ||
| ================================================================================================ | ||
|
|
||
| OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure | ||
| AMD EPYC 7763 64-Core Processor | ||
| Cache 2M rows from Parquet: | Best Time(ms) | Avg Time(ms) | Stdev(ms) | Rate(M/s) | Per Row(ns) | Relative |
|---|---|---|---|---|---|---|
| Default cache - columnar input | 1461 | 1486 | 35 | 1.4 | 730.6 | 1.0X |
| Default cache - columnar input (uncompressed) | 1219 | 1227 | 12 | 1.6 | 609.3 | 1.2X |
| Arrow cache - columnar input | 1253 | 1273 | 27 | 1.6 | 626.7 | 1.2X |
| Arrow cache - columnar input (zstd level -1) | 1448 | 1460 | 17 | 1.4 | 723.8 | 1.0X |
| Arrow cache - columnar input (zstd level 1) | 1504 | 1504 | 0 | 1.3 | 752.0 | 1.0X |
| Arrow cache - columnar input (zstd level 3) | 1578 | 1587 | 13 | 1.3 | 788.9 | 0.9X |
|
|
||
|
|
||
| ================================================================================================ | ||
| Re-cache Arrow cached data (zero-copy test) | ||
| ================================================================================================ | ||
|
|
||
| OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure | ||
| AMD EPYC 7763 64-Core Processor | ||
| Re-cache 2M rows (zero-copy): | Best Time(ms) | Avg Time(ms) | Stdev(ms) | Rate(M/s) | Per Row(ns) | Relative |
|---|---|---|---|---|---|---|
| Default cache - cache a cached DF | 386 | 409 | 28 | 5.2 | 193.1 | 1.0X |
| Default cache - cache a cached DF (uncompressed) | 194 | 217 | 26 | 10.3 | 96.8 | 2.0X |
| Arrow cache - cache a cached DF (zero-copy) | 132 | 144 | 10 | 15.2 | 65.9 | 2.9X |
| Arrow cache - cache a cached DF (zstd level -1) | 321 | 324 | 7 | 6.2 | 160.3 | 1.2X |
| Arrow cache - cache a cached DF (zstd level 1) | 333 | 341 | 7 | 6.0 | 166.7 | 1.2X |
| Arrow cache - cache a cached DF (zstd level 3) | 350 | 356 | 12 | 5.7 | 174.8 | 1.1X |
|
|
||
|
|
||
| ================================================================================================ | ||
| Cache with column pruning (select 1 of 20 columns) | ||
| ================================================================================================ | ||
|
|
||
| OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1018-azure | ||
| AMD EPYC 7763 64-Core Processor | ||
| Cache 5M rows, select 1 column: | Best Time(ms) | Avg Time(ms) | Stdev(ms) | Rate(M/s) | Per Row(ns) | Relative |
|---|---|---|---|---|---|---|
| Default cache - select 1 of 20 columns | 9310 | 9426 | 164 | 0.5 | 1862.0 | 1.0X |
| Default cache - select 1 of 20 (uncompressed) | 3929 | 3994 | 92 | 1.3 | 785.7 | 2.4X |
| Arrow cache - select 1 of 20 | 5150 | 5225 | 106 | 1.0 | 1030.0 | 1.8X |
| Arrow cache - select 1 of 20 (zstd level -1) | 9265 | 9376 | 156 | 0.5 | 1853.1 | 1.0X |
| Arrow cache - select 1 of 20 (zstd level 1) | 9296 | 9351 | 78 | 0.5 | 1859.3 | 1.0X |
| Arrow cache - select 1 of 20 (zstd level 3) | 9970 | 9982 | 18 | 0.5 | 1994.0 | 0.9X |
|
|
||
|
|
||
|
|
Presumably when this returns `false` for any reason, we fall back to the default cache driver; that should be made clear in the docs if it isn't already.

e.g. the doc says `supports all Spark SQL data types`, but this implementation would seem to falsify that claim ;)

Well, I guess the claim may be true, and the fallthrough at the end might be defensive... In which case maybe we'd want to log a surprisingly unsupported type :)

Good catch -- "supports all Spark SQL data types" did overstate it, and the type list was also incomplete (it omitted Time, intervals, Geometry/Geography, Variant, Null, and UDTs, which are all supported). Fixed the doc to list the actually-supported set and to describe the unsupported-type behavior.
One clarification on the mechanism, re: your fallback question: `isSupportedByArrow` here only gates `supportsColumnarInput`, which the cache framework uses to choose the columnar-vs-row input path into this same serializer -- it isn't a fallback to the default cache driver (there's no such per-type fallback). A truly unsupported type isn't silently dropped either: `toArrowSchema` throws `UNSUPPORTED_DATATYPE` when the cache is materialized. The docs now state this explicitly.
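
To make the distinction in the reply above concrete, here is a deliberately simplified model of the two behaviors: the support check only selects an input path, while schema conversion is where an unsupported type surfaces as an error. Everything in it (`ColType`, `OkType`, `BadType`, `ToyArrowSerializer`, the exception class) is a hypothetical stand-in and not the serializer from this PR.

```scala
// Hypothetical, simplified model; not the PR's serializer.
sealed trait ColType
case object OkType extends ColType  // a type Arrow can represent
case object BadType extends ColType // a type Arrow cannot represent

final class UnsupportedTypeException(msg: String) extends RuntimeException(msg)

object ToyArrowSerializer {
  def isSupportedByArrow(t: ColType): Boolean = t == OkType

  // Only chooses between columnar and row input into this same serializer.
  // Returning false does not switch to a different cache implementation.
  def supportsColumnarInput(schema: Seq[ColType]): Boolean =
    schema.forall(isSupportedByArrow)

  // Schema conversion is the point where an unsupported type fails loudly.
  def toArrowSchema(schema: Seq[ColType]): Seq[String] = schema.map {
    case OkType => "arrow-field"
    case BadType => throw new UnsupportedTypeException("UNSUPPORTED_DATATYPE")
  }
}
```

In this model a schema containing `BadType` reports `supportsColumnarInput == false` and still fails in `toArrowSchema`, which matches the described behavior: no silent drop and no per-type fallback.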