From 931c0bcd95087716c633eb9fd065d580a1d47353 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 4 May 2020 15:30:10 +0900 Subject: [PATCH] [SPARK-31606][SQL] Reduce the perf regression of vectorized parquet reader caused by datetime rebase ### What changes were proposed in this pull request? Push the rebase logic to the lower level of the parquet vectorized reader, to make the final code more vectorization-friendly. ### Why are the changes needed? The Parquet vectorized reader is carefully implemented to make it more likely to be vectorized by the JVM. However, the newly added datetime rebase degrades the performance a lot, as it breaks vectorization, even if the datetime values don't need to be rebased (this is very likely, as dates before 1582 are rare). ### Does this PR introduce any user-facing change? no ### How was this patch tested? Run part of the `DateTimeRebaseBenchmark` locally. The results: before this patch ``` [info] Load dates from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] after 1582, vec on, rebase off 2677 2838 142 37.4 26.8 1.0X [info] after 1582, vec on, rebase on 3828 4331 805 26.1 38.3 0.7X [info] before 1582, vec on, rebase off 2903 2926 34 34.4 29.0 0.9X [info] before 1582, vec on, rebase on 4163 4197 38 24.0 41.6 0.6X [info] Load timestamps from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] after 1900, vec on, rebase off 3537 3627 104 28.3 35.4 1.0X [info] after 1900, vec on, rebase on 6891 7010 105 14.5 68.9 0.5X [info] before 1900, vec on, rebase off 3692 3770 72 27.1 36.9 1.0X [info] before 1900, vec on, rebase on 7588 7610 30 13.2 75.9 0.5X ``` After this patch ``` [info] Load dates from parquet: Best Time(ms) Avg 
Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] after 1582, vec on, rebase off 2758 2944 197 36.3 27.6 1.0X [info] after 1582, vec on, rebase on 2908 2966 51 34.4 29.1 0.9X [info] before 1582, vec on, rebase off 2840 2878 37 35.2 28.4 1.0X [info] before 1582, vec on, rebase on 3407 3433 24 29.4 34.1 0.8X [info] Load timestamps from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative [info] ------------------------------------------------------------------------------------------------------------------------ [info] after 1900, vec on, rebase off 3861 4003 139 25.9 38.6 1.0X [info] after 1900, vec on, rebase on 4194 4283 77 23.8 41.9 0.9X [info] before 1900, vec on, rebase off 3849 3937 79 26.0 38.5 1.0X [info] before 1900, vec on, rebase on 7512 7546 55 13.3 75.1 0.5X ``` The date type is 30% faster if the values don't need to be rebased, and 20% faster if they do. The timestamp type is 60% faster if the values don't need to be rebased, and shows no difference if they do. Closes #28406 from cloud-fan/perf. 
Lead-authored-by: Wenchen Fan Co-authored-by: Maxim Gekk Signed-off-by: HyukjinKwon --- .../sql/catalyst/util/RebaseDateTime.scala | 4 + .../DateTimeRebaseBenchmark-jdk11-results.txt | 104 +++++++++--------- .../DateTimeRebaseBenchmark-results.txt | 104 +++++++++--------- .../parquet/VectorizedColumnReader.java | 22 +--- .../parquet/VectorizedPlainValuesReader.java | 55 +++++++++ .../parquet/VectorizedRleValuesReader.java | 85 ++++++++++++++ .../parquet/VectorizedValuesReader.java | 2 + .../benchmark/DateTimeRebaseBenchmark.scala | 79 ++++++------- 8 files changed, 295 insertions(+), 160 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala index 6848b0fa39c7c..040a97a14d451 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala @@ -71,6 +71,8 @@ object RebaseDateTime { -719164, -682945, -646420, -609895, -536845, -500320, -463795, -390745, -354220, -317695, -244645, -208120, -171595, -141427) + final val lastSwitchJulianDay: Int = julianGregDiffSwitchDay.last + // The first days of Common Era (CE) which is mapped to the '0001-01-01' date in Julian calendar. private final val julianCommonEraStartDay = julianGregDiffSwitchDay(0) @@ -416,6 +418,8 @@ object RebaseDateTime { // in the interval: [julianGregDiffSwitchMicros(i), julianGregDiffSwitchMicros(i+1)) private val julianGregRebaseMap = loadRebaseRecords("julian-gregorian-rebase-micros.json") + final val lastSwitchJulianTs: Long = julianGregRebaseMap.values.map(_.switches.last).max + /** * An optimized version of [[rebaseJulianToGregorianMicros(ZoneId, Long)]]. This method leverages * the pre-calculated rebasing maps to save calculation. 
If the rebasing map doesn't contain diff --git a/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt b/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt index 2a9322a4b462a..03e0d7b8bc575 100644 --- a/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt +++ b/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt @@ -2,93 +2,93 @@ Rebasing dates/timestamps in Parquet datasource ================================================================================================ -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save dates to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 21171 21171 0 4.7 211.7 1.0X -before 1582, noop 11036 11036 0 9.1 110.4 1.9X -after 1582, rebase off 34321 34321 0 2.9 343.2 0.6X -after 1582, rebase on 33269 33269 0 3.0 332.7 0.6X -before 1582, rebase off 22016 22016 0 4.5 220.2 1.0X -before 1582, rebase on 23338 23338 0 4.3 233.4 0.9X +after 1582, noop 20073 20073 0 5.0 200.7 1.0X +before 1582, noop 10985 10985 0 9.1 109.9 1.8X +after 1582, rebase off 32245 32245 0 3.1 322.4 0.6X +after 1582, rebase on 31434 31434 0 3.2 314.3 0.6X +before 1582, rebase off 21590 21590 0 4.6 215.9 0.9X +before 1582, rebase on 22963 22963 0 4.4 229.6 0.9X -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load dates from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 
1582, vec off, rebase off 12791 13089 287 7.8 127.9 1.0X -after 1582, vec off, rebase on 13203 13271 81 7.6 132.0 1.0X -after 1582, vec on, rebase off 3709 3764 49 27.0 37.1 3.4X -after 1582, vec on, rebase on 5082 5114 29 19.7 50.8 2.5X -before 1582, vec off, rebase off 13059 13153 87 7.7 130.6 1.0X -before 1582, vec off, rebase on 14211 14236 27 7.0 142.1 0.9X -before 1582, vec on, rebase off 3687 3749 72 27.1 36.9 3.5X -before 1582, vec on, rebase on 5449 5497 56 18.4 54.5 2.3X +after 1582, vec off, rebase off 12815 12858 40 7.8 128.1 1.0X +after 1582, vec off, rebase on 13030 13167 148 7.7 130.3 1.0X +after 1582, vec on, rebase off 3705 3712 6 27.0 37.1 3.5X +after 1582, vec on, rebase on 3788 3791 3 26.4 37.9 3.4X +before 1582, vec off, rebase off 12873 12943 61 7.8 128.7 1.0X +before 1582, vec off, rebase on 14072 14165 80 7.1 140.7 0.9X +before 1582, vec on, rebase off 3694 3708 15 27.1 36.9 3.5X +before 1582, vec on, rebase on 4403 4484 81 22.7 44.0 2.9X -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save timestamps to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 2831 2831 0 35.3 28.3 1.0X -before 1582, noop 2816 2816 0 35.5 28.2 1.0X -after 1582, rebase off 15543 15543 0 6.4 155.4 0.2X -after 1582, rebase on 18391 18391 0 5.4 183.9 0.2X -before 1582, rebase off 15747 15747 0 6.4 157.5 0.2X -before 1582, rebase on 18846 18846 0 5.3 188.5 0.2X +after 1900, noop 3032 3032 0 33.0 30.3 1.0X +before 1900, noop 3043 3043 0 32.9 30.4 1.0X +after 1900, rebase off 15634 15634 0 6.4 156.3 0.2X +after 1900, rebase on 18233 18233 0 5.5 182.3 0.2X +before 1900, rebase off 15820 15820 0 6.3 158.2 0.2X +before 1900, rebase on 19921 
19921 0 5.0 199.2 0.2X -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load timestamps from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off, rebase off 16126 16216 78 6.2 161.3 1.0X -after 1582, vec off, rebase on 18277 18453 165 5.5 182.8 0.9X -after 1582, vec on, rebase off 5030 5067 42 19.9 50.3 3.2X -after 1582, vec on, rebase on 8553 8583 43 11.7 85.5 1.9X -before 1582, vec off, rebase off 15828 15872 39 6.3 158.3 1.0X -before 1582, vec off, rebase on 18899 18959 103 5.3 189.0 0.9X -before 1582, vec on, rebase off 4961 5009 43 20.2 49.6 3.3X -before 1582, vec on, rebase on 9099 9140 40 11.0 91.0 1.8X +after 1900, vec off, rebase off 14987 15008 18 6.7 149.9 1.0X +after 1900, vec off, rebase on 17500 17628 210 5.7 175.0 0.9X +after 1900, vec on, rebase off 5030 5036 7 19.9 50.3 3.0X +after 1900, vec on, rebase on 5066 5109 44 19.7 50.7 3.0X +before 1900, vec off, rebase off 15094 15213 121 6.6 150.9 1.0X +before 1900, vec off, rebase on 18098 18175 101 5.5 181.0 0.8X +before 1900, vec on, rebase off 5008 5012 4 20.0 50.1 3.0X +before 1900, vec on, rebase on 8803 8848 55 11.4 88.0 1.7X ================================================================================================ Rebasing dates/timestamps in ORC datasource ================================================================================================ -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save dates to ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative 
------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 21026 21026 0 4.8 210.3 1.0X -before 1582, noop 11040 11040 0 9.1 110.4 1.9X -after 1582 28171 28171 0 3.5 281.7 0.7X -before 1582 18955 18955 0 5.3 189.5 1.1X +after 1582, noop 19593 19593 0 5.1 195.9 1.0X +before 1582, noop 10581 10581 0 9.5 105.8 1.9X +after 1582 27843 27843 0 3.6 278.4 0.7X +before 1582 19435 19435 0 5.1 194.4 1.0X -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load dates from ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off 10876 10931 49 9.2 108.8 1.0X -after 1582, vec on 3900 3913 20 25.6 39.0 2.8X -before 1582, vec off 11165 11174 12 9.0 111.6 1.0X -before 1582, vec on 4208 4214 7 23.8 42.1 2.6X +after 1582, vec off 10395 10507 119 9.6 103.9 1.0X +after 1582, vec on 3921 3945 22 25.5 39.2 2.7X +before 1582, vec off 10762 10860 127 9.3 107.6 1.0X +before 1582, vec on 4194 4226 41 23.8 41.9 2.5X -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save timestamps to ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 2924 2924 0 34.2 29.2 1.0X -before 1582, noop 2820 2820 0 35.5 28.2 1.0X -after 1582 22228 22228 0 4.5 222.3 0.1X -before 1582 22590 22590 0 4.4 225.9 0.1X +after 1900, noop 3003 3003 0 33.3 30.0 1.0X +before 1900, noop 3016 3016 0 33.2 30.2 
1.0X +after 1900 21804 21804 0 4.6 218.0 0.1X +before 1900 23920 23920 0 4.2 239.2 0.1X -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load timestamps from ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off 13591 13658 59 7.4 135.9 1.0X -after 1582, vec on 7399 7488 126 13.5 74.0 1.8X -before 1582, vec off 14065 14096 30 7.1 140.7 1.0X -before 1582, vec on 7950 8127 249 12.6 79.5 1.7X +after 1900, vec off 14112 14128 17 7.1 141.1 1.0X +after 1900, vec on 7347 7459 134 13.6 73.5 1.9X +before 1900, vec off 15170 15192 27 6.6 151.7 0.9X +before 1900, vec on 8280 8312 52 12.1 82.8 1.7X diff --git a/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt b/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt index 050950571511d..a32a1ad8af89e 100644 --- a/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt +++ b/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt @@ -2,93 +2,93 @@ Rebasing dates/timestamps in Parquet datasource ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save dates to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 24114 24114 0 4.1 241.1 1.0X -before 1582, noop 10250 10250 0 9.8 102.5 2.4X -after 1582, rebase off 36672 36672 0 2.7 366.7 0.7X -after 1582, rebase on 37123 37123 0 2.7 
371.2 0.6X -before 1582, rebase off 21925 21925 0 4.6 219.2 1.1X -before 1582, rebase on 22341 22341 0 4.5 223.4 1.1X +after 1582, noop 23088 23088 0 4.3 230.9 1.0X +before 1582, noop 10782 10782 0 9.3 107.8 2.1X +after 1582, rebase off 34821 34821 0 2.9 348.2 0.7X +after 1582, rebase on 35040 35040 0 2.9 350.4 0.7X +before 1582, rebase off 22151 22151 0 4.5 221.5 1.0X +before 1582, rebase on 24677 24677 0 4.1 246.8 0.9X -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load dates from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off, rebase off 12456 12601 126 8.0 124.6 1.0X -after 1582, vec off, rebase on 13299 13336 32 7.5 133.0 0.9X -after 1582, vec on, rebase off 3623 3660 40 27.6 36.2 3.4X -after 1582, vec on, rebase on 5160 5177 15 19.4 51.6 2.4X -before 1582, vec off, rebase off 13177 13264 76 7.6 131.8 0.9X -before 1582, vec off, rebase on 14102 14149 46 7.1 141.0 0.9X -before 1582, vec on, rebase off 3649 3670 34 27.4 36.5 3.4X -before 1582, vec on, rebase on 5652 5667 15 17.7 56.5 2.2X +after 1582, vec off, rebase off 13559 13650 79 7.4 135.6 1.0X +after 1582, vec off, rebase on 12942 12973 28 7.7 129.4 1.0X +after 1582, vec on, rebase off 3657 3689 29 27.3 36.6 3.7X +after 1582, vec on, rebase on 3859 3902 53 25.9 38.6 3.5X +before 1582, vec off, rebase off 12588 12607 17 7.9 125.9 1.1X +before 1582, vec off, rebase on 13396 13420 25 7.5 134.0 1.0X +before 1582, vec on, rebase off 3631 3650 19 27.5 36.3 3.7X +before 1582, vec on, rebase on 4706 4755 77 21.3 47.1 2.9X -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 
4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save timestamps to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 2871 2871 0 34.8 28.7 1.0X -before 1582, noop 2753 2753 0 36.3 27.5 1.0X -after 1582, rebase off 15927 15927 0 6.3 159.3 0.2X -after 1582, rebase on 19138 19138 0 5.2 191.4 0.1X -before 1582, rebase off 16137 16137 0 6.2 161.4 0.2X -before 1582, rebase on 19584 19584 0 5.1 195.8 0.1X +after 1900, noop 2681 2681 0 37.3 26.8 1.0X +before 1900, noop 3051 3051 0 32.8 30.5 0.9X +after 1900, rebase off 16901 16901 0 5.9 169.0 0.2X +after 1900, rebase on 19725 19725 0 5.1 197.3 0.1X +before 1900, rebase off 16900 16900 0 5.9 169.0 0.2X +before 1900, rebase on 20381 20381 0 4.9 203.8 0.1X -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load timestamps from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off, rebase off 14995 15047 47 6.7 150.0 1.0X -after 1582, vec off, rebase on 18111 18146 37 5.5 181.1 0.8X -after 1582, vec on, rebase off 4837 4873 44 20.7 48.4 3.1X -after 1582, vec on, rebase on 9542 9669 111 10.5 95.4 1.6X -before 1582, vec off, rebase off 14993 15090 94 6.7 149.9 1.0X -before 1582, vec off, rebase on 18675 18712 64 5.4 186.7 0.8X -before 1582, vec on, rebase off 4908 4923 15 20.4 49.1 3.1X -before 1582, vec on, rebase on 10128 10148 19 9.9 101.3 1.5X +after 1900, vec off, rebase off 15236 15291 62 6.6 152.4 1.0X +after 1900, vec off, rebase on 17832 18047 187 5.6 178.3 0.9X +after 1900, vec on, rebase off 4875 4901 31 20.5 48.7 3.1X 
+after 1900, vec on, rebase on 5354 5386 37 18.7 53.5 2.8X +before 1900, vec off, rebase off 15229 15338 108 6.6 152.3 1.0X +before 1900, vec off, rebase on 18626 18668 44 5.4 186.3 0.8X +before 1900, vec on, rebase off 4968 4975 6 20.1 49.7 3.1X +before 1900, vec on, rebase on 9913 9932 16 10.1 99.1 1.5X ================================================================================================ Rebasing dates/timestamps in ORC datasource ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save dates to ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 23977 23977 0 4.2 239.8 1.0X -before 1582, noop 10094 10094 0 9.9 100.9 2.4X -after 1582 33115 33115 0 3.0 331.2 0.7X -before 1582 19430 19430 0 5.1 194.3 1.2X +after 1582, noop 22942 22942 0 4.4 229.4 1.0X +before 1582, noop 11035 11035 0 9.1 110.4 2.1X +after 1582 31341 31341 0 3.2 313.4 0.7X +before 1582 20376 20376 0 4.9 203.8 1.1X -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load dates from ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off 10217 10241 21 9.8 102.2 1.0X -after 1582, vec on 3671 3691 31 27.2 36.7 2.8X -before 1582, vec off 10800 10874 114 9.3 108.0 0.9X -before 1582, vec on 4118 4165 74 24.3 41.2 2.5X +after 1582, vec off 10361 10378 29 9.7 103.6 
1.0X +after 1582, vec on 3820 3828 11 26.2 38.2 2.7X +before 1582, vec off 10709 10720 13 9.3 107.1 1.0X +before 1582, vec on 4136 4153 15 24.2 41.4 2.5X -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save timestamps to ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 2691 2691 0 37.2 26.9 1.0X -before 1582, noop 2743 2743 0 36.5 27.4 1.0X -after 1582 21409 21409 0 4.7 214.1 0.1X -before 1582 22554 22554 0 4.4 225.5 0.1X +after 1900, noop 2888 2888 0 34.6 28.9 1.0X +before 1900, noop 2823 2823 0 35.4 28.2 1.0X +after 1900 19790 19790 0 5.1 197.9 0.1X +before 1900 20774 20774 0 4.8 207.7 0.1X -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load timestamps from ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off 14752 14855 103 6.8 147.5 1.0X -after 1582, vec on 8146 8185 34 12.3 81.5 1.8X -before 1582, vec off 15247 15294 46 6.6 152.5 1.0X -before 1582, vec on 8414 8466 52 11.9 84.1 1.8X +after 1900, vec off 14649 14687 38 6.8 146.5 1.0X +after 1900, vec on 7850 7937 130 12.7 78.5 1.9X +before 1900, vec off 15354 15417 108 6.5 153.5 1.0X +before 1900, vec on 8382 8408 22 11.9 83.8 1.7X diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 
cfb873ff37379..7ae60f22aa790 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -423,15 +423,8 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (column.dataType() == DataTypes.DateType ) { if (rebaseDateTime) { - for (int i = 0; i < num; i++) { - if (defColumn.readInteger() == maxDefLevel) { - column.putInt( - rowId + i, - RebaseDateTime.rebaseJulianToGregorianDays(dataColumn.readInteger())); - } else { - column.putNull(rowId + i); - } - } + defColumn.readIntegersWithRebase( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else { defColumn.readIntegers( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); @@ -449,15 +442,8 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (originalType == OriginalType.TIMESTAMP_MICROS) { if (rebaseDateTime) { - for (int i = 0; i < num; i++) { - if (defColumn.readInteger() == maxDefLevel) { - column.putLong( - rowId + i, - RebaseDateTime.rebaseJulianToGregorianMicros(dataColumn.readLong())); - } else { - column.putNull(rowId + i); - } - } + defColumn.readLongsWithRebase( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else { defColumn.readLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index c62dc3d86386e..2ed2e11b60c03 100644 --- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -22,6 +22,7 @@ import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.io.ParquetDecodingException; +import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.parquet.column.values.ValuesReader; @@ -81,6 +82,33 @@ public final void readIntegers(int total, WritableColumnVector c, int rowId) { } } + // A fork of `readIntegers` to rebase the date values. For performance reasons, this method + // iterates the values twice: check if we need to rebase first, then go to the optimized branch + // if rebase is not needed. + @Override + public final void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) { + int requiredBytes = total * 4; + ByteBuffer buffer = getBuffer(requiredBytes); + boolean rebase = false; + for (int i = 0; i < total; i += 1) { + rebase |= buffer.getInt(buffer.position() + i * 4) < RebaseDateTime.lastSwitchJulianDay(); + } + if (rebase) { + for (int i = 0; i < total; i += 1) { + c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt())); + } + } else { + if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putIntsLittleEndian(rowId, total, buffer.array(), offset); + } else { + for (int i = 0; i < total; i += 1) { + c.putInt(rowId + i, buffer.getInt()); + } + } + } + } + @Override public final void readLongs(int total, WritableColumnVector c, int rowId) { int requiredBytes = total * 8; @@ -96,6 +124,33 @@ public final void readLongs(int total, WritableColumnVector c, int rowId) { } } + // A fork of `readLongs` to rebase the timestamp values. 
For performance reasons, this method + // iterates the values twice: check if we need to rebase first, then go to the optimized branch + // if rebase is not needed. + @Override + public final void readLongsWithRebase(int total, WritableColumnVector c, int rowId) { + int requiredBytes = total * 8; + ByteBuffer buffer = getBuffer(requiredBytes); + boolean rebase = false; + for (int i = 0; i < total; i += 1) { + rebase |= buffer.getLong(buffer.position() + i * 8) < RebaseDateTime.lastSwitchJulianTs(); + } + if (rebase) { + for (int i = 0; i < total; i += 1) { + c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong())); + } + } else { + if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putLongsLittleEndian(rowId, total, buffer.array(), offset); + } else { + for (int i = 0; i < total; i += 1) { + c.putLong(rowId + i, buffer.getLong()); + } + } + } + } + @Override public final void readFloats(int total, WritableColumnVector c, int rowId) { int requiredBytes = total * 4; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index fe3d31ae8e746..4d72a33fcf774 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -26,6 +26,7 @@ import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; +import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import java.io.IOException; @@ -203,6 +204,43 @@ public void readIntegers( } } + // A fork of `readIntegers`, which rebases the date int value (days) before filling + // the Spark column vector. 
+ public void readIntegersWithRebase( + int total, + WritableColumnVector c, + int rowId, + int level, + VectorizedValuesReader data) throws IOException { + int left = total; + while (left > 0) { + if (this.currentCount == 0) this.readNextGroup(); + int n = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + if (currentValue == level) { + data.readIntegersWithRebase(n, c, rowId); + } else { + c.putNulls(rowId, n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (currentBuffer[currentBufferIdx++] == level) { + c.putInt(rowId + i, + RebaseDateTime.rebaseJulianToGregorianDays(data.readInteger())); + } else { + c.putNull(rowId + i); + } + } + break; + } + rowId += n; + left -= n; + currentCount -= n; + } + } + // TODO: can this code duplication be removed without a perf penalty? public void readBooleans( int total, @@ -342,6 +380,43 @@ public void readLongs( } } + // A fork of `readLongs`, which rebases the timestamp long value (microseconds) before filling + // the Spark column vector. 
+ public void readLongsWithRebase( + int total, + WritableColumnVector c, + int rowId, + int level, + VectorizedValuesReader data) throws IOException { + int left = total; + while (left > 0) { + if (this.currentCount == 0) this.readNextGroup(); + int n = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + if (currentValue == level) { + data.readLongsWithRebase(n, c, rowId); + } else { + c.putNulls(rowId, n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (currentBuffer[currentBufferIdx++] == level) { + c.putLong(rowId + i, + RebaseDateTime.rebaseJulianToGregorianMicros(data.readLong())); + } else { + c.putNull(rowId + i); + } + } + break; + } + rowId += n; + left -= n; + currentCount -= n; + } + } + public void readFloats( int total, WritableColumnVector c, @@ -508,6 +583,11 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) { } } + @Override + public void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException("only readInts is valid."); + } + @Override public byte readByte() { throw new UnsupportedOperationException("only readInts is valid."); @@ -523,6 +603,11 @@ public void readLongs(int total, WritableColumnVector c, int rowId) { throw new UnsupportedOperationException("only readInts is valid."); } + @Override + public void readLongsWithRebase(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException("only readInts is valid."); + } + @Override public void readBinary(int total, WritableColumnVector c, int rowId) { throw new UnsupportedOperationException("only readInts is valid."); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java index 57d92ae27ece8..809ac44cc8272 100644 --- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java @@ -40,7 +40,9 @@ public interface VectorizedValuesReader { void readBooleans(int total, WritableColumnVector c, int rowId); void readBytes(int total, WritableColumnVector c, int rowId); void readIntegers(int total, WritableColumnVector c, int rowId); + void readIntegersWithRebase(int total, WritableColumnVector c, int rowId); void readLongs(int total, WritableColumnVector c, int rowId); + void readLongsWithRebase(int total, WritableColumnVector c, int rowId); void readFloats(int total, WritableColumnVector c, int rowId); void readDoubles(int total, WritableColumnVector c, int rowId); void readBinary(int total, WritableColumnVector c, int rowId); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala index 077ac28c149ee..7968836a00d0f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala @@ -49,15 +49,15 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark { .select($"seconds".cast("timestamp").as("ts")) } - private def genTsAfter1582(cardinality: Int): DataFrame = { - val start = LocalDateTime.of(1582, 10, 15, 0, 0, 0) + private def genTsAfter1900(cardinality: Int): DataFrame = { + val start = LocalDateTime.of(1900, 1, 31, 0, 0, 0) val end = LocalDateTime.of(3000, 1, 1, 0, 0, 0) genTs(cardinality, start, end) } - private def genTsBefore1582(cardinality: Int): DataFrame = { + private def genTsBefore1900(cardinality: Int): DataFrame = { val start = LocalDateTime.of(10, 1, 1, 0, 0, 0) - val end = LocalDateTime.of(1580, 1, 1, 0, 0, 0) + val end = 
LocalDateTime.of(1900, 1, 1, 0, 0, 0) genTs(cardinality, start, end) } @@ -71,34 +71,35 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark { } private def genDateAfter1582(cardinality: Int): DataFrame = { - val start = LocalDate.of(1582, 10, 15) + val start = LocalDate.of(1582, 10, 31) val end = LocalDate.of(3000, 1, 1) genDate(cardinality, start, end) } private def genDateBefore1582(cardinality: Int): DataFrame = { val start = LocalDate.of(10, 1, 1) - val end = LocalDate.of(1580, 1, 1) + val end = LocalDate.of(1580, 10, 1) genDate(cardinality, start, end) } - private def genDF(cardinality: Int, dateTime: String, after1582: Boolean): DataFrame = { - (dateTime, after1582) match { + private def genDF(cardinality: Int, dateTime: String, modernDates: Boolean): DataFrame = { + (dateTime, modernDates) match { case ("date", true) => genDateAfter1582(cardinality) case ("date", false) => genDateBefore1582(cardinality) - case ("timestamp", true) => genTsAfter1582(cardinality) - case ("timestamp", false) => genTsBefore1582(cardinality) + case ("timestamp", true) => genTsAfter1900(cardinality) + case ("timestamp", false) => genTsBefore1900(cardinality) case _ => throw new IllegalArgumentException( - s"cardinality = $cardinality dateTime = $dateTime after1582 = $after1582") + s"cardinality = $cardinality dateTime = $dateTime modernDates = $modernDates") } } private def benchmarkInputs(benchmark: Benchmark, rowsNum: Int, dateTime: String): Unit = { - benchmark.addCase("after 1582, noop", 1) { _ => - genDF(rowsNum, dateTime, after1582 = true).noop() + val year = if (dateTime == "date") 1582 else 1900 + benchmark.addCase(s"after $year, noop", 1) { _ => + genDF(rowsNum, dateTime, modernDates = true).noop() } - benchmark.addCase("before 1582, noop", 1) { _ => - genDF(rowsNum, dateTime, after1582 = false).noop() + benchmark.addCase(s"before $year, noop", 1) { _ => + genDF(rowsNum, dateTime, modernDates = false).noop() } } @@ -107,23 +108,26 @@ object DateTimeRebaseBenchmark 
extends SqlBasedBenchmark { } private def caseName( - after1582: Boolean, + modernDates: Boolean, + dateTime: String, rebase: Option[Boolean] = None, vec: Option[Boolean] = None): String = { - val period = if (after1582) "after" else "before" + val period = if (modernDates) "after" else "before" + val year = if (dateTime == "date") 1582 else 1900 val vecFlag = vec.map(flagToStr).map(flag => s", vec $flag").getOrElse("") val rebaseFlag = rebase.map(flagToStr).map(flag => s", rebase $flag").getOrElse("") - s"$period 1582$vecFlag$rebaseFlag" + s"$period $year$vecFlag$rebaseFlag" } private def getPath( basePath: File, dateTime: String, - after1582: Boolean, + modernDates: Boolean, rebase: Option[Boolean] = None): String = { - val period = if (after1582) "after" else "before" + val period = if (modernDates) "after" else "before" + val year = if (dateTime == "date") 1582 else 1900 val rebaseFlag = rebase.map(flagToStr).map(flag => s"_$flag").getOrElse("") - basePath.getAbsolutePath + s"/${dateTime}_${period}_1582$rebaseFlag" + basePath.getAbsolutePath + s"/${dateTime}_${period}_$year$rebaseFlag" } override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { @@ -139,16 +143,16 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark { rowsNum, output = output) benchmarkInputs(benchmark, rowsNum, dateTime) - Seq(true, false).foreach { after1582 => + Seq(true, false).foreach { modernDates => Seq(false, true).foreach { rebase => - benchmark.addCase(caseName(after1582, Some(rebase)), 1) { _ => + benchmark.addCase(caseName(modernDates, dateTime, Some(rebase)), 1) { _ => withSQLConf( SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> rebase.toString) { - genDF(rowsNum, dateTime, after1582) + genDF(rowsNum, dateTime, modernDates) .write .mode("overwrite") .format("parquet") - .save(getPath(path, dateTime, after1582, Some(rebase))) + .save(getPath(path, dateTime, modernDates, Some(rebase))) } } } @@ -157,16 +161,15 @@ object DateTimeRebaseBenchmark extends 
SqlBasedBenchmark { val benchmark2 = new Benchmark( s"Load ${dateTime}s from parquet", rowsNum, output = output) - Seq(true, false).foreach { after1582 => + Seq(true, false).foreach { modernDates => Seq(false, true).foreach { vec => Seq(false, true).foreach { rebase => - benchmark2.addCase(caseName(after1582, Some(rebase), Some(vec)), 3) { _ => - withSQLConf( - SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vec.toString, - SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + val name = caseName(modernDates, dateTime, Some(rebase), Some(vec)) + benchmark2.addCase(name, 3) { _ => + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vec.toString) { spark.read .format("parquet") - .load(getPath(path, dateTime, after1582, Some(rebase))) + .load(getPath(path, dateTime, modernDates, Some(rebase))) .noop() } } @@ -183,13 +186,13 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark { Seq("date", "timestamp").foreach { dateTime => val benchmark = new Benchmark(s"Save ${dateTime}s to ORC", rowsNum, output = output) benchmarkInputs(benchmark, rowsNum, dateTime) - Seq(true, false).foreach { after1582 => - benchmark.addCase(caseName(after1582), 1) { _ => - genDF(rowsNum, dateTime, after1582) + Seq(true, false).foreach { modernDates => + benchmark.addCase(caseName(modernDates, dateTime), 1) { _ => + genDF(rowsNum, dateTime, modernDates) .write .mode("overwrite") .format("orc") - .save(getPath(path, dateTime, after1582)) + .save(getPath(path, dateTime, modernDates)) } } benchmark.run() @@ -198,14 +201,14 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark { s"Load ${dateTime}s from ORC", rowsNum, output = output) - Seq(true, false).foreach { after1582 => + Seq(true, false).foreach { modernDates => Seq(false, true).foreach { vec => - benchmark2.addCase(caseName(after1582, vec = Some(vec)), 3) { _ => + benchmark2.addCase(caseName(modernDates, dateTime, vec = Some(vec)), 3) { _ => withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key 
-> vec.toString) { spark .read .format("orc") - .load(getPath(path, dateTime, after1582)) + .load(getPath(path, dateTime, modernDates)) .noop() } }