[WIP][SPARK-39202][SQL] Introduce a putByteArrays method for WritableColumnVector to support setting multiple duplicate byte[] #36571

Closed
wants to merge 10 commits into from

@LuciferYang LuciferYang commented May 17, 2022

What changes were proposed in this pull request?

This PR adds a putByteArrays method to WritableColumnVector, as follows:

public int putByteArrays(int rowId, int total, byte[] value) 

This method supports writing multiple copies of the same byte[] to a WritableColumnVector. Because the byte[] value has a fixed length, the required memory can be allocated once, instead of calling reserve(int requiredCapacity) many times.

The new method applies to ColumnVectorUtils.populate for StringType and some DecimalType cases, which correspond to filling vectorized partition columns for Parquet and ORC.
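The idea of reserving once for many identical values can be illustrated with a small standalone sketch. It does not use Spark's classes: ToyBytesVector, its fields, and the reserveCalls counter are hypothetical names invented for illustration, and returning the offset of the first copy is an assumption, since the description above does not say what the int result means.

```java
import java.util.Arrays;

/** Toy stand-in for a byte-array column vector; NOT Spark's WritableColumnVector. */
final class ToyBytesVector {
    private byte[] data = new byte[4]; // backing buffer for all values
    private final int[] offsets;       // start of each row's value in data
    private final int[] lengths;       // length of each row's value
    private int used = 0;              // bytes of data currently in use
    int reserveCalls = 0;              // counts reserve() invocations (illustration only)

    ToyBytesVector(int rowCapacity) {
        offsets = new int[rowCapacity];
        lengths = new int[rowCapacity];
    }

    /** Grows the backing buffer when the required capacity exceeds its size. */
    private void reserve(int requiredCapacity) {
        reserveCalls++;
        if (requiredCapacity > data.length) {
            data = Arrays.copyOf(data, Math.max(requiredCapacity, data.length * 2));
        }
    }

    /** Baseline: one reserve() call per row. Returns the offset written to. */
    int putByteArray(int rowId, byte[] value) {
        reserve(Math.addExact(used, value.length));
        int start = used;
        System.arraycopy(value, 0, data, start, value.length);
        offsets[rowId] = start;
        lengths[rowId] = value.length;
        used += value.length;
        return start;
    }

    /**
     * Bulk variant: writes `total` copies of `value` starting at `rowId`
     * with a single reserve() call. Returns the offset of the first copy
     * (an assumption made for this sketch).
     */
    int putByteArrays(int rowId, int total, byte[] value) {
        int needed = Math.multiplyExact(total, value.length);
        reserve(Math.addExact(used, needed)); // one capacity check for all copies
        int start = used;
        for (int i = 0; i < total; i++) {
            System.arraycopy(value, 0, data, used, value.length);
            offsets[rowId + i] = used;
            lengths[rowId + i] = value.length;
            used += value.length;
        }
        return start;
    }

    /** Returns a copy of the value stored at rowId. */
    byte[] getBytes(int rowId) {
        return Arrays.copyOfRange(data, offsets[rowId], offsets[rowId] + lengths[rowId]);
    }
}
```

In this toy version, filling 8 rows through putByteArray invokes reserve 8 times, while a single putByteArrays call invokes it once. Whether that difference is measurable in Spark itself is what the benchmarks in this PR are meant to show.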

Why are the changes needed?

It reduces the number of reserve(int requiredCapacity) calls, and therefore the number of memory allocations, when writing multiple duplicate fixed-length byte[] values to a WritableColumnVector.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • Pass GitHub Actions (GA)
  • Added a StringType partition column test scenario to DataSourceReadBenchmark.

Before

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1022-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Partitioned Table(Partition Column Type = StringType):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------------
Data column - CSV                                              19487          19517          42          0.8        1239.0       1.0X
Data column - Json                                             12943          12948           7          1.2         822.9       1.5X
Data column - Parquet Vectorized: DataPageV1                     219            224           6         72.0          13.9      89.2X
Data column - Parquet Vectorized: DataPageV2                     494            501           7         31.9          31.4      39.5X
Data column - Parquet MR: DataPageV1                            2515           2521           9          6.3         159.9       7.7X
Data column - Parquet MR: DataPageV2                            2327           2337          14          6.8         148.0       8.4X
Data column - ORC Vectorized                                     303            306           2         51.9          19.3      64.3X
Data column - ORC MR                                            2126           2130           6          7.4         135.2       9.2X
Partition column - CSV                                          6480           6482           2          2.4         412.0       3.0X
Partition column - Json                                        10564          10572          11          1.5         671.6       1.8X
Partition column - Parquet Vectorized: DataPageV1                 53             58          16        296.6           3.4     367.5X
Partition column - Parquet Vectorized: DataPageV2                 52             57          10        303.6           3.3     376.1X
Partition column - Parquet MR: DataPageV1                       1231           1232           2         12.8          78.3      15.8X
Partition column - Parquet MR: DataPageV2                       1227           1229           3         12.8          78.0      15.9X
Partition column - ORC Vectorized                                 52             57           8        300.3           3.3     372.0X
Partition column - ORC MR                                       1334           1343          11         11.8          84.8      14.6X
Both columns - CSV                                             19608          19626          25          0.8        1246.6       1.0X
Both columns - Json                                            13003          13018          22          1.2         826.7       1.5X
Both columns - Parquet Vectorized: DataPageV1                    262            269           7         60.1          16.6      74.4X
Both columns - Parquet Vectorized: DataPageV2                    538            541           6         29.3          34.2      36.3X
Both columns - Parquet MR: DataPageV1                           2569           2570           2          6.1         163.3       7.6X
Both columns - Parquet MR: DataPageV2                           2343           2361          26          6.7         148.9       8.3X
Both columns - ORC Vectorized                                    344            345           1         45.7          21.9      56.6X
Both columns - ORC MR                                           2173           2178           8          7.2         138.1       9.0X

After

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1022-azure
Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
Partitioned Table(Partition Column Type = StringType):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------------
Data column - CSV                                              22546          22554          11          0.7        1433.4       1.0X
Data column - Json                                             13638          13638           0          1.2         867.1       1.7X
Data column - Parquet Vectorized: DataPageV1                     208            214           9         75.7          13.2     108.5X
Data column - Parquet Vectorized: DataPageV2                     488            492           5         32.2          31.0      46.2X
Data column - Parquet MR: DataPageV1                            2625           2631           9          6.0         166.9       8.6X
Data column - Parquet MR: DataPageV2                            2323           2328           8          6.8         147.7       9.7X
Data column - ORC Vectorized                                     296            300           6         53.1          18.8      76.1X
Data column - ORC MR                                            2154           2156           2          7.3         136.9      10.5X
Partition column - CSV                                          6410           6434          34          2.5         407.6       3.5X
Partition column - Json                                        10021          10028          10          1.6         637.1       2.2X
Partition column - Parquet Vectorized: DataPageV1                 51             55          10        306.9           3.3     439.9X
Partition column - Parquet Vectorized: DataPageV2                 51             55           9        308.1           3.2     441.6X
Partition column - Parquet MR: DataPageV1                       1207           1209           2         13.0          76.7      18.7X
Partition column - Parquet MR: DataPageV2                       1222           1237          22         12.9          77.7      18.5X
Partition column - ORC Vectorized                                 52             55           8        304.2           3.3     436.1X
Partition column - ORC MR                                       1310           1310           0         12.0          83.3      17.2X
Both columns - CSV                                             22310          22318          11          0.7        1418.4       1.0X
Both columns - Json                                            13625          13629           5          1.2         866.3       1.7X
Both columns - Parquet Vectorized: DataPageV1                    248            256          13         63.4          15.8      90.9X
Both columns - Parquet Vectorized: DataPageV2                    529            555          50         29.7          33.7      42.6X
Both columns - Parquet MR: DataPageV1                           2634           2641          10          6.0         167.5       8.6X
Both columns - Parquet MR: DataPageV2                           2375           2377           3          6.6         151.0       9.5X
Both columns - ORC Vectorized                                    338            339           1         46.5          21.5      66.6X
Both columns - ORC MR                                           2189           2193           5          7.2         139.2      10.3X

@github-actions github-actions bot added the SQL label May 17, 2022
@LuciferYang LuciferYang marked this pull request as draft May 17, 2022 03:18
* Results will be written to "benchmarks/ColumnVectorUtilsBenchmark-results.txt".
* }}}
*/
object ColumnVectorUtilsBenchmark extends BenchmarkBase {

Will delete this after testing.

@LuciferYang

Benchmark for the ColumnVectorUtils.populate method:

  def testPopulate(valuesPerIteration: Int, length: Int): Unit = {
    val batchSize = 4096
    val onHeapColumnVector = new OnHeapColumnVector(batchSize, StringType)
    val offHeapColumnVector = new OffHeapColumnVector(batchSize, StringType)

    val benchmark = new Benchmark(
      s"Test ColumnVectorUtils.populate, row length = $length",
      valuesPerIteration * batchSize,
      output = output)

    // Build a single-field row holding a random string of `length` characters.
    val builder = new UTF8StringBuilder()
    builder.append(RandomStringUtils.random(length))
    val row = InternalRow(builder.build())

    // Each iteration resets the vector and refills it from the same row.
    benchmark.addCase("OnHeapColumnVector") { _: Int =>
      for (_ <- 0L until valuesPerIteration) {
        onHeapColumnVector.reset()
        ColumnVectorUtils.populate(onHeapColumnVector, row, 0)
      }
    }

    benchmark.addCase("OffHeapColumnVector") { _: Int =>
      for (_ <- 0L until valuesPerIteration) {
        offHeapColumnVector.reset()
        ColumnVectorUtils.populate(offHeapColumnVector, row, 0)
      }
    }
    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    val valuesPerIteration = 100000
    Seq(1, 5, 10, 15, 20).foreach { length =>
      testPopulate(valuesPerIteration, length)
    }
  }

Before

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1022-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Test ColumnVectorUtils.populate, row length = 1:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------
OnHeapColumnVector                                        3381           3404          32        121.2           8.3       1.0X
OffHeapColumnVector                                       3931           3968          53        104.2           9.6       0.9X

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1022-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Test ColumnVectorUtils.populate, row length = 5:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------
OnHeapColumnVector                                        4700           4767          96         87.2          11.5       1.0X
OffHeapColumnVector                                       5258           5356         139         77.9          12.8       0.9X

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1022-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Test ColumnVectorUtils.populate, row length = 10:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------
OnHeapColumnVector                                         4920           4934          19         83.2          12.0       1.0X
OffHeapColumnVector                                        5007           5017          14         81.8          12.2       1.0X

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1022-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Test ColumnVectorUtils.populate, row length = 15:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------
OnHeapColumnVector                                         5227           5255          40         78.4          12.8       1.0X
OffHeapColumnVector                                        5626           5731         148         72.8          13.7       0.9X

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1022-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Test ColumnVectorUtils.populate, row length = 20:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------
OnHeapColumnVector                                         5226           5263          53         78.4          12.8       1.0X
OffHeapColumnVector                                        5526           5699         244         74.1          13.5       0.9X

After

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1022-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Test ColumnVectorUtils.populate, row length = 1:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------
OnHeapColumnVector                                        3734           3742          11        109.7           9.1       1.0X
OffHeapColumnVector                                       3683           3683           0        111.2           9.0       1.0X

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1022-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Test ColumnVectorUtils.populate, row length = 5:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------
OnHeapColumnVector                                        4085           4088           4        100.3          10.0       1.0X
OffHeapColumnVector                                       4770           4771           2         85.9          11.6       0.9X

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1022-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Test ColumnVectorUtils.populate, row length = 10:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------
OnHeapColumnVector                                         4788           4789           1         85.5          11.7       1.0X
OffHeapColumnVector                                        4387           4387           0         93.4          10.7       1.1X

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1022-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Test ColumnVectorUtils.populate, row length = 15:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------
OnHeapColumnVector                                         4669           4669           0         87.7          11.4       1.0X
OffHeapColumnVector                                        5197           5198           1         78.8          12.7       0.9X

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1022-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
Test ColumnVectorUtils.populate, row length = 20:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------
OnHeapColumnVector                                         4769           4769           0         85.9          11.6       1.0X
OffHeapColumnVector                                        5441           5441           1         75.3          13.3       0.9X

@LuciferYang LuciferYang marked this pull request as ready for review May 17, 2022 10:00
@LuciferYang LuciferYang changed the title [SPARK-39202][SQL] Introduce a putByteArrays method to WritableColumnVector to support setting multiple duplicate byte[] [SPARK-39202][SQL] Introduce a putByteArrays method for WritableColumnVector to support setting multiple duplicate byte[] May 17, 2022
@LuciferYang LuciferYang marked this pull request as draft May 18, 2022 02:44
@LuciferYang LuciferYang changed the title [SPARK-39202][SQL] Introduce a putByteArrays method for WritableColumnVector to support setting multiple duplicate byte[] [WIP][SPARK-39202][SQL] Introduce a putByteArrays method for WritableColumnVector to support setting multiple duplicate byte[] May 18, 2022
@LuciferYang

Maybe it's better to use a dictionary to store the StringType partition column. I'm testing it.
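A rough sketch of what dictionary storage for a constant column could look like, as a standalone toy rather than Spark code. ToyDictionaryColumn and its members are hypothetical names invented for illustration, and this is not necessarily the design the author went on to test.

```java
import java.nio.charset.StandardCharsets;

/** Toy dictionary-encoded column for a constant value; NOT Spark's API. */
final class ToyDictionaryColumn {
    private final byte[][] dictionary; // distinct values, each stored once
    private final int[] ids;           // per-row index into the dictionary

    ToyDictionaryColumn(int numRows, byte[] constantValue) {
        // A partition column has one value per file, so one dictionary entry suffices.
        dictionary = new byte[][] { constantValue.clone() };
        // Java zero-initializes int arrays, so every row already points at entry 0.
        ids = new int[numRows];
    }

    /** Decodes the value of a row by looking up its dictionary id. */
    String getString(int rowId) {
        return new String(dictionary[ids[rowId]], StandardCharsets.UTF_8);
    }

    /** Bytes of value data actually stored, independent of the row count. */
    int storedValueBytes() {
        return dictionary[0].length;
    }
}
```

With this layout the value bytes are stored once regardless of batch size, and each row costs only one integer id, so no per-row byte copying is needed.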
