diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala index 6848b0fa39c7c..040a97a14d451 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala @@ -71,6 +71,8 @@ object RebaseDateTime { -719164, -682945, -646420, -609895, -536845, -500320, -463795, -390745, -354220, -317695, -244645, -208120, -171595, -141427) + final val lastSwitchJulianDay: Int = julianGregDiffSwitchDay.last + // The first days of Common Era (CE) which is mapped to the '0001-01-01' date in Julian calendar. private final val julianCommonEraStartDay = julianGregDiffSwitchDay(0) @@ -416,6 +418,8 @@ object RebaseDateTime { // in the interval: [julianGregDiffSwitchMicros(i), julianGregDiffSwitchMicros(i+1)) private val julianGregRebaseMap = loadRebaseRecords("julian-gregorian-rebase-micros.json") + final val lastSwitchJulianTs: Long = julianGregRebaseMap.values.map(_.switches.last).max + /** * An optimized version of [[rebaseJulianToGregorianMicros(ZoneId, Long)]]. This method leverages * the pre-calculated rebasing maps to save calculation. If the rebasing map doesn't contain diff --git a/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt b/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt index 2a9322a4b462a..03e0d7b8bc575 100644 --- a/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt +++ b/sql/core/benchmarks/DateTimeRebaseBenchmark-jdk11-results.txt @@ -2,93 +2,93 @@ Rebasing dates/timestamps in Parquet datasource ================================================================================================ -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save dates to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 21171 21171 0 4.7 211.7 1.0X -before 1582, noop 11036 11036 0 9.1 110.4 1.9X -after 1582, rebase off 34321 34321 0 2.9 343.2 0.6X -after 1582, rebase on 33269 33269 0 3.0 332.7 0.6X -before 1582, rebase off 22016 22016 0 4.5 220.2 1.0X -before 1582, rebase on 23338 23338 0 4.3 233.4 0.9X +after 1582, noop 20073 20073 0 5.0 200.7 1.0X +before 1582, noop 10985 10985 0 9.1 109.9 1.8X +after 1582, rebase off 32245 32245 0 3.1 322.4 0.6X +after 1582, rebase on 31434 31434 0 3.2 314.3 0.6X +before 1582, rebase off 21590 21590 0 4.6 215.9 0.9X +before 1582, rebase on 22963 22963 0 4.4 229.6 0.9X -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load dates from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off, rebase off 12791 13089 287 7.8 127.9 1.0X -after 1582, vec off, rebase on 13203 13271 81 7.6 132.0 1.0X -after 1582, vec on, rebase off 3709 3764 49 27.0 37.1 3.4X -after 1582, vec on, rebase on 5082 5114 29 19.7 50.8 2.5X -before 1582, vec off, rebase off 
13059 13153 87 7.7 130.6 1.0X -before 1582, vec off, rebase on 14211 14236 27 7.0 142.1 0.9X -before 1582, vec on, rebase off 3687 3749 72 27.1 36.9 3.5X -before 1582, vec on, rebase on 5449 5497 56 18.4 54.5 2.3X +after 1582, vec off, rebase off 12815 12858 40 7.8 128.1 1.0X +after 1582, vec off, rebase on 13030 13167 148 7.7 130.3 1.0X +after 1582, vec on, rebase off 3705 3712 6 27.0 37.1 3.5X +after 1582, vec on, rebase on 3788 3791 3 26.4 37.9 3.4X +before 1582, vec off, rebase off 12873 12943 61 7.8 128.7 1.0X +before 1582, vec off, rebase on 14072 14165 80 7.1 140.7 0.9X +before 1582, vec on, rebase off 3694 3708 15 27.1 36.9 3.5X +before 1582, vec on, rebase on 4403 4484 81 22.7 44.0 2.9X -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save timestamps to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 2831 2831 0 35.3 28.3 1.0X -before 1582, noop 2816 2816 0 35.5 28.2 1.0X -after 1582, rebase off 15543 15543 0 6.4 155.4 0.2X -after 1582, rebase on 18391 18391 0 5.4 183.9 0.2X -before 1582, rebase off 15747 15747 0 6.4 157.5 0.2X -before 1582, rebase on 18846 18846 0 5.3 188.5 0.2X +after 1900, noop 3032 3032 0 33.0 30.3 1.0X +before 1900, noop 3043 3043 0 32.9 30.4 1.0X +after 1900, rebase off 15634 15634 0 6.4 156.3 0.2X +after 1900, rebase on 18233 18233 0 5.5 182.3 0.2X +before 1900, rebase off 15820 15820 0 6.3 158.2 0.2X +before 1900, rebase on 19921 19921 0 5.0 199.2 0.2X -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load timestamps from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off, rebase off 16126 16216 78 6.2 161.3 1.0X -after 1582, vec off, rebase on 18277 18453 165 5.5 182.8 0.9X -after 1582, vec on, rebase off 5030 5067 42 19.9 50.3 3.2X -after 1582, vec on, rebase on 8553 8583 43 11.7 85.5 1.9X -before 1582, vec off, rebase off 15828 15872 39 6.3 158.3 1.0X -before 1582, vec off, rebase on 18899 18959 103 5.3 189.0 0.9X -before 1582, vec on, rebase off 4961 5009 43 20.2 49.6 3.3X -before 1582, vec on, rebase on 9099 9140 40 11.0 91.0 1.8X +after 1900, vec off, rebase off 14987 15008 18 6.7 149.9 1.0X +after 1900, vec off, rebase on 17500 17628 210 5.7 175.0 0.9X +after 1900, vec on, rebase off 5030 5036 7 19.9 50.3 3.0X +after 1900, vec on, rebase on 5066 5109 44 19.7 50.7 3.0X +before 1900, vec off, rebase off 15094 15213 121 6.6 150.9 1.0X +before 1900, vec off, rebase on 18098 18175 101 5.5 181.0 0.8X +before 1900, vec on, rebase off 5008 5012 4 20.0 50.1 3.0X +before 1900, vec on, rebase on 8803 8848 55 11.4 88.0 1.7X ================================================================================================ Rebasing dates/timestamps in ORC datasource ================================================================================================ -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 
11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save dates to ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 21026 21026 0 4.8 210.3 1.0X -before 1582, noop 11040 11040 0 9.1 110.4 1.9X -after 1582 28171 28171 0 3.5 281.7 0.7X -before 1582 18955 18955 0 5.3 189.5 1.1X +after 1582, noop 19593 19593 0 5.1 195.9 1.0X +before 1582, noop 10581 10581 0 9.5 105.8 1.9X +after 1582 27843 27843 0 3.6 278.4 0.7X +before 1582 19435 19435 0 5.1 194.4 1.0X -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load dates from ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off 10876 10931 49 9.2 108.8 1.0X -after 1582, vec on 3900 3913 20 25.6 39.0 2.8X -before 1582, vec off 11165 11174 12 9.0 111.6 1.0X -before 1582, vec on 4208 4214 7 23.8 42.1 2.6X +after 1582, vec off 10395 10507 119 9.6 103.9 1.0X +after 1582, vec on 3921 3945 22 25.5 39.2 2.7X +before 1582, vec off 10762 10860 127 9.3 107.6 1.0X +before 1582, vec on 4194 4226 41 23.8 41.9 2.5X -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save timestamps to ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 2924 2924 0 34.2 29.2 1.0X -before 1582, noop 2820 2820 0 35.5 28.2 1.0X -after 1582 22228 22228 0 4.5 222.3 0.1X -before 1582 22590 22590 0 4.4 225.9 0.1X +after 1900, noop 3003 3003 0 33.3 30.0 1.0X +before 1900, noop 3016 3016 0 33.2 30.2 1.0X +after 1900 21804 21804 0 4.6 218.0 0.1X +before 1900 23920 23920 0 4.2 239.2 0.1X -OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load timestamps from ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off 13591 13658 59 7.4 135.9 1.0X -after 1582, vec on 7399 7488 126 13.5 74.0 1.8X -before 1582, vec off 14065 14096 30 7.1 140.7 1.0X -before 1582, vec on 7950 8127 249 12.6 79.5 1.7X +after 1900, vec off 14112 14128 17 7.1 141.1 1.0X +after 1900, vec on 7347 7459 134 13.6 73.5 1.9X +before 1900, vec off 15170 15192 27 6.6 151.7 0.9X +before 1900, vec on 8280 8312 52 12.1 82.8 1.7X diff --git a/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt b/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt index 050950571511d..a32a1ad8af89e 100644 --- a/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt +++ b/sql/core/benchmarks/DateTimeRebaseBenchmark-results.txt @@ -2,93 +2,93 @@ Rebasing dates/timestamps in Parquet datasource 
================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save dates to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 24114 24114 0 4.1 241.1 1.0X -before 1582, noop 10250 10250 0 9.8 102.5 2.4X -after 1582, rebase off 36672 36672 0 2.7 366.7 0.7X -after 1582, rebase on 37123 37123 0 2.7 371.2 0.6X -before 1582, rebase off 21925 21925 0 4.6 219.2 1.1X -before 1582, rebase on 22341 22341 0 4.5 223.4 1.1X +after 1582, noop 23088 23088 0 4.3 230.9 1.0X +before 1582, noop 10782 10782 0 9.3 107.8 2.1X +after 1582, rebase off 34821 34821 0 2.9 348.2 0.7X +after 1582, rebase on 35040 35040 0 2.9 350.4 0.7X +before 1582, rebase off 22151 22151 0 4.5 221.5 1.0X +before 1582, rebase on 24677 24677 0 4.1 246.8 0.9X -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load dates from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off, rebase off 12456 12601 126 8.0 124.6 1.0X -after 1582, vec off, rebase on 13299 13336 32 7.5 133.0 0.9X -after 1582, vec on, rebase off 3623 3660 40 27.6 36.2 3.4X -after 1582, vec on, rebase on 5160 5177 15 19.4 51.6 2.4X -before 1582, vec off, rebase off 13177 13264 76 7.6 131.8 0.9X -before 1582, vec off, rebase on 14102 14149 46 7.1 141.0 0.9X -before 1582, vec on, rebase off 3649 3670 34 27.4 36.5 3.4X -before 1582, vec on, rebase on 5652 5667 15 17.7 56.5 2.2X +after 1582, vec off, rebase off 13559 13650 79 7.4 135.6 1.0X +after 1582, vec off, rebase on 12942 12973 28 7.7 129.4 1.0X +after 1582, vec on, rebase off 3657 3689 29 27.3 36.6 3.7X +after 1582, vec on, rebase on 3859 3902 53 25.9 38.6 3.5X +before 1582, vec off, rebase off 12588 12607 17 7.9 125.9 1.1X +before 1582, vec off, rebase on 13396 13420 25 7.5 134.0 1.0X +before 1582, vec on, rebase off 3631 3650 19 27.5 36.3 3.7X +before 1582, vec on, rebase on 4706 4755 77 21.3 47.1 2.9X -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save timestamps to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 2871 2871 0 34.8 28.7 1.0X -before 1582, noop 2753 2753 0 36.3 27.5 1.0X -after 1582, rebase off 15927 15927 0 6.3 159.3 0.2X -after 1582, rebase on 19138 19138 0 5.2 191.4 0.1X -before 1582, rebase off 16137 16137 0 6.2 161.4 0.2X -before 1582, rebase on 19584 19584 0 5.1 195.8 0.1X +after 1900, noop 2681 2681 0 37.3 26.8 1.0X +before 1900, noop 3051 3051 0 32.8 30.5 0.9X +after 1900, rebase off 16901 16901 0 5.9 169.0 0.2X +after 1900, rebase on 19725 19725 0 5.1 197.3 0.1X +before 1900, rebase off 16900 16900 0 5.9 169.0 0.2X +before 1900, 
rebase on 20381 20381 0 4.9 203.8 0.1X -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load timestamps from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off, rebase off 14995 15047 47 6.7 150.0 1.0X -after 1582, vec off, rebase on 18111 18146 37 5.5 181.1 0.8X -after 1582, vec on, rebase off 4837 4873 44 20.7 48.4 3.1X -after 1582, vec on, rebase on 9542 9669 111 10.5 95.4 1.6X -before 1582, vec off, rebase off 14993 15090 94 6.7 149.9 1.0X -before 1582, vec off, rebase on 18675 18712 64 5.4 186.7 0.8X -before 1582, vec on, rebase off 4908 4923 15 20.4 49.1 3.1X -before 1582, vec on, rebase on 10128 10148 19 9.9 101.3 1.5X +after 1900, vec off, rebase off 15236 15291 62 6.6 152.4 1.0X +after 1900, vec off, rebase on 17832 18047 187 5.6 178.3 0.9X +after 1900, vec on, rebase off 4875 4901 31 20.5 48.7 3.1X +after 1900, vec on, rebase on 5354 5386 37 18.7 53.5 2.8X +before 1900, vec off, rebase off 15229 15338 108 6.6 152.3 1.0X +before 1900, vec off, rebase on 18626 18668 44 5.4 186.3 0.8X +before 1900, vec on, rebase off 4968 4975 6 20.1 49.7 3.1X +before 1900, vec on, rebase on 9913 9932 16 10.1 99.1 1.5X ================================================================================================ Rebasing dates/timestamps in ORC datasource ================================================================================================ -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save dates to ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 23977 23977 0 4.2 239.8 1.0X -before 1582, noop 10094 10094 0 9.9 100.9 2.4X -after 1582 33115 33115 0 3.0 331.2 0.7X -before 1582 19430 19430 0 5.1 194.3 1.2X +after 1582, noop 22942 22942 0 4.4 229.4 1.0X +before 1582, noop 11035 11035 0 9.1 110.4 2.1X +after 1582 31341 31341 0 3.2 313.4 0.7X +before 1582 20376 20376 0 4.9 203.8 1.1X -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load dates from ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off 10217 10241 21 9.8 102.2 1.0X -after 1582, vec on 3671 3691 31 27.2 36.7 2.8X -before 1582, vec off 10800 10874 114 9.3 108.0 0.9X -before 1582, vec on 4118 4165 74 24.3 41.2 2.5X +after 1582, vec off 10361 10378 29 9.7 103.6 1.0X +after 1582, vec on 3820 3828 11 26.2 38.2 2.7X +before 1582, vec off 10709 10720 13 9.3 107.1 1.0X +before 1582, vec on 4136 4153 15 24.2 41.4 2.5X -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Save timestamps to ORC: Best 
Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, noop 2691 2691 0 37.2 26.9 1.0X -before 1582, noop 2743 2743 0 36.5 27.4 1.0X -after 1582 21409 21409 0 4.7 214.1 0.1X -before 1582 22554 22554 0 4.4 225.5 0.1X +after 1900, noop 2888 2888 0 34.6 28.9 1.0X +before 1900, noop 2823 2823 0 35.4 28.2 1.0X +after 1900 19790 19790 0 5.1 197.9 0.1X +before 1900 20774 20774 0 4.8 207.7 0.1X -OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws +OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz Load timestamps from ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -after 1582, vec off 14752 14855 103 6.8 147.5 1.0X -after 1582, vec on 8146 8185 34 12.3 81.5 1.8X -before 1582, vec off 15247 15294 46 6.6 152.5 1.0X -before 1582, vec on 8414 8466 52 11.9 84.1 1.8X +after 1900, vec off 14649 14687 38 6.8 146.5 1.0X +after 1900, vec on 7850 7937 130 12.7 78.5 1.9X +before 1900, vec off 15354 15417 108 6.5 153.5 1.0X +before 1900, vec on 8382 8408 22 11.9 83.8 1.7X diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index cfb873ff37379..7ae60f22aa790 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -423,15 +423,8 @@ private void readIntBatch(int rowId, int num, WritableColumnVector column) throw num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (column.dataType() == DataTypes.DateType ) { if (rebaseDateTime) { - for (int i = 0; i < num; i++) { - if (defColumn.readInteger() == maxDefLevel) { - column.putInt( - rowId + i, - RebaseDateTime.rebaseJulianToGregorianDays(dataColumn.readInteger())); - } else { - column.putNull(rowId + i); - } - } + defColumn.readIntegersWithRebase( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else { defColumn.readIntegers( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); @@ -449,15 +442,8 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (originalType == OriginalType.TIMESTAMP_MICROS) { if (rebaseDateTime) { - for (int i = 0; i < num; i++) { - if (defColumn.readInteger() == maxDefLevel) { - column.putLong( - rowId + i, - RebaseDateTime.rebaseJulianToGregorianMicros(dataColumn.readLong())); - } else { - column.putNull(rowId + i); - } - } + defColumn.readLongsWithRebase( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else { defColumn.readLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java index c62dc3d86386e..2ed2e11b60c03 100644 --- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java @@ -22,6 +22,7 @@ import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.io.ParquetDecodingException; +import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import org.apache.parquet.column.values.ValuesReader; @@ -81,6 +82,33 @@ public final void readIntegers(int total, WritableColumnVector c, int rowId) { } } + // A fork of `readIntegers` to rebase the date values. For performance reasons, this method + // iterates the values twice: check if we need to rebase first, then go to the optimized branch + // if rebase is not needed. + @Override + public final void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) { + int requiredBytes = total * 4; + ByteBuffer buffer = getBuffer(requiredBytes); + boolean rebase = false; + for (int i = 0; i < total; i += 1) { + rebase |= buffer.getInt(buffer.position() + i * 4) < RebaseDateTime.lastSwitchJulianDay(); + } + if (rebase) { + for (int i = 0; i < total; i += 1) { + c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt())); + } + } else { + if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putIntsLittleEndian(rowId, total, buffer.array(), offset); + } else { + for (int i = 0; i < total; i += 1) { + c.putInt(rowId + i, buffer.getInt()); + } + } + } + } + @Override public final void readLongs(int total, WritableColumnVector c, int rowId) { int requiredBytes = total * 8; @@ -96,6 +124,33 @@ public final void readLongs(int total, WritableColumnVector c, int rowId) { } } + // A fork of `readLongs` to rebase the timestamp values. For performance reasons, this method + // iterates the values twice: check if we need to rebase first, then go to the optimized branch + // if rebase is not needed. 
+ @Override + public final void readLongsWithRebase(int total, WritableColumnVector c, int rowId) { + int requiredBytes = total * 8; + ByteBuffer buffer = getBuffer(requiredBytes); + boolean rebase = false; + for (int i = 0; i < total; i += 1) { + rebase |= buffer.getLong(buffer.position() + i * 8) < RebaseDateTime.lastSwitchJulianTs(); + } + if (rebase) { + for (int i = 0; i < total; i += 1) { + c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong())); + } + } else { + if (buffer.hasArray()) { + int offset = buffer.arrayOffset() + buffer.position(); + c.putLongsLittleEndian(rowId, total, buffer.array(), offset); + } else { + for (int i = 0; i < total; i += 1) { + c.putLong(rowId + i, buffer.getLong()); + } + } + } + } + @Override public final void readFloats(int total, WritableColumnVector c, int rowId) { int requiredBytes = total * 4; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index fe3d31ae8e746..4d72a33fcf774 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -26,6 +26,7 @@ import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; +import org.apache.spark.sql.catalyst.util.RebaseDateTime; import org.apache.spark.sql.execution.vectorized.WritableColumnVector; import java.io.IOException; @@ -203,6 +204,43 @@ public void readIntegers( } } + // A fork of `readIntegers`, which rebases the date int value (days) before filling + // the Spark column vector. + public void readIntegersWithRebase( + int total, + WritableColumnVector c, + int rowId, + int level, + VectorizedValuesReader data) throws IOException { + int left = total; + while (left > 0) { + if (this.currentCount == 0) this.readNextGroup(); + int n = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + if (currentValue == level) { + data.readIntegersWithRebase(n, c, rowId); + } else { + c.putNulls(rowId, n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (currentBuffer[currentBufferIdx++] == level) { + c.putInt(rowId + i, + RebaseDateTime.rebaseJulianToGregorianDays(data.readInteger())); + } else { + c.putNull(rowId + i); + } + } + break; + } + rowId += n; + left -= n; + currentCount -= n; + } + } + // TODO: can this code duplication be removed without a perf penalty? public void readBooleans( int total, @@ -342,6 +380,43 @@ public void readLongs( } } + // A fork of `readLongs`, which rebases the timestamp long value (microseconds) before filling + // the Spark column vector. 
+ public void readLongsWithRebase( + int total, + WritableColumnVector c, + int rowId, + int level, + VectorizedValuesReader data) throws IOException { + int left = total; + while (left > 0) { + if (this.currentCount == 0) this.readNextGroup(); + int n = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + if (currentValue == level) { + data.readLongsWithRebase(n, c, rowId); + } else { + c.putNulls(rowId, n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (currentBuffer[currentBufferIdx++] == level) { + c.putLong(rowId + i, + RebaseDateTime.rebaseJulianToGregorianMicros(data.readLong())); + } else { + c.putNull(rowId + i); + } + } + break; + } + rowId += n; + left -= n; + currentCount -= n; + } + } + public void readFloats( int total, WritableColumnVector c, @@ -508,6 +583,11 @@ public void readIntegers(int total, WritableColumnVector c, int rowId) { } } + @Override + public void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException("only readInts is valid."); + } + @Override public byte readByte() { throw new UnsupportedOperationException("only readInts is valid."); @@ -523,6 +603,11 @@ public void readLongs(int total, WritableColumnVector c, int rowId) { throw new UnsupportedOperationException("only readInts is valid."); } + @Override + public void readLongsWithRebase(int total, WritableColumnVector c, int rowId) { + throw new UnsupportedOperationException("only readInts is valid."); + } + @Override public void readBinary(int total, WritableColumnVector c, int rowId) { throw new UnsupportedOperationException("only readInts is valid."); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java index 57d92ae27ece8..809ac44cc8272 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java @@ -40,7 +40,9 @@ public interface VectorizedValuesReader { void readBooleans(int total, WritableColumnVector c, int rowId); void readBytes(int total, WritableColumnVector c, int rowId); void readIntegers(int total, WritableColumnVector c, int rowId); + void readIntegersWithRebase(int total, WritableColumnVector c, int rowId); void readLongs(int total, WritableColumnVector c, int rowId); + void readLongsWithRebase(int total, WritableColumnVector c, int rowId); void readFloats(int total, WritableColumnVector c, int rowId); void readDoubles(int total, WritableColumnVector c, int rowId); void readBinary(int total, WritableColumnVector c, int rowId); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala index 077ac28c149ee..7968836a00d0f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DateTimeRebaseBenchmark.scala @@ -49,15 +49,15 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark { .select($"seconds".cast("timestamp").as("ts")) } - private def genTsAfter1582(cardinality: Int): DataFrame = { - val start = LocalDateTime.of(1582, 10, 15, 0, 0, 0) + private def genTsAfter1900(cardinality: Int): DataFrame = { + val start 
= LocalDateTime.of(1900, 1, 31, 0, 0, 0) val end = LocalDateTime.of(3000, 1, 1, 0, 0, 0) genTs(cardinality, start, end) } - private def genTsBefore1582(cardinality: Int): DataFrame = { + private def genTsBefore1900(cardinality: Int): DataFrame = { val start = LocalDateTime.of(10, 1, 1, 0, 0, 0) - val end = LocalDateTime.of(1580, 1, 1, 0, 0, 0) + val end = LocalDateTime.of(1900, 1, 1, 0, 0, 0) genTs(cardinality, start, end) } @@ -71,34 +71,35 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark { } private def genDateAfter1582(cardinality: Int): DataFrame = { - val start = LocalDate.of(1582, 10, 15) + val start = LocalDate.of(1582, 10, 31) val end = LocalDate.of(3000, 1, 1) genDate(cardinality, start, end) } private def genDateBefore1582(cardinality: Int): DataFrame = { val start = LocalDate.of(10, 1, 1) - val end = LocalDate.of(1580, 1, 1) + val end = LocalDate.of(1580, 10, 1) genDate(cardinality, start, end) } - private def genDF(cardinality: Int, dateTime: String, after1582: Boolean): DataFrame = { - (dateTime, after1582) match { + private def genDF(cardinality: Int, dateTime: String, modernDates: Boolean): DataFrame = { + (dateTime, modernDates) match { case ("date", true) => genDateAfter1582(cardinality) case ("date", false) => genDateBefore1582(cardinality) - case ("timestamp", true) => genTsAfter1582(cardinality) - case ("timestamp", false) => genTsBefore1582(cardinality) + case ("timestamp", true) => genTsAfter1900(cardinality) + case ("timestamp", false) => genTsBefore1900(cardinality) case _ => throw new IllegalArgumentException( - s"cardinality = $cardinality dateTime = $dateTime after1582 = $after1582") + s"cardinality = $cardinality dateTime = $dateTime modernDates = $modernDates") } } private def benchmarkInputs(benchmark: Benchmark, rowsNum: Int, dateTime: String): Unit = { - benchmark.addCase("after 1582, noop", 1) { _ => - genDF(rowsNum, dateTime, after1582 = true).noop() + val year = if (dateTime == "date") 1582 else 1900 + benchmark.addCase(s"after $year, noop", 1) { _ => + genDF(rowsNum, dateTime, modernDates = true).noop() } - benchmark.addCase("before 1582, noop", 1) { _ => - genDF(rowsNum, dateTime, after1582 = false).noop() + benchmark.addCase(s"before $year, noop", 1) { _ => + genDF(rowsNum, dateTime, modernDates = false).noop() } } @@ -107,23 +108,26 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark { } private def caseName( - after1582: Boolean, + modernDates: Boolean, + dateTime: String, rebase: Option[Boolean] = None, vec: Option[Boolean] = None): String = { - val period = if (after1582) "after" else "before" + val period = if (modernDates) "after" else "before" + val year = if (dateTime == "date") 1582 else 1900 val vecFlag = vec.map(flagToStr).map(flag => s", vec $flag").getOrElse("") val rebaseFlag = rebase.map(flagToStr).map(flag => s", rebase $flag").getOrElse("") - s"$period 1582$vecFlag$rebaseFlag" + s"$period $year$vecFlag$rebaseFlag" } private def getPath( basePath: File, dateTime: String, - after1582: Boolean, + modernDates: Boolean, rebase: Option[Boolean] = None): String = { - val period = if (after1582) "after" else "before" + val period = if (modernDates) "after" else "before" + val year = if (dateTime == "date") 1582 else 1900 val rebaseFlag = rebase.map(flagToStr).map(flag => s"_$flag").getOrElse("") - basePath.getAbsolutePath + s"/${dateTime}_${period}_1582$rebaseFlag" + basePath.getAbsolutePath + s"/${dateTime}_${period}_$year$rebaseFlag" } override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { @@ -139,16 +143,16 @@ 
object DateTimeRebaseBenchmark extends SqlBasedBenchmark { rowsNum, output = output) benchmarkInputs(benchmark, rowsNum, dateTime) - Seq(true, false).foreach { after1582 => + Seq(true, false).foreach { modernDates => Seq(false, true).foreach { rebase => - benchmark.addCase(caseName(after1582, Some(rebase)), 1) { _ => + benchmark.addCase(caseName(modernDates, dateTime, Some(rebase)), 1) { _ => withSQLConf( SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> rebase.toString) { - genDF(rowsNum, dateTime, after1582) + genDF(rowsNum, dateTime, modernDates) .write .mode("overwrite") .format("parquet") - .save(getPath(path, dateTime, after1582, Some(rebase))) + .save(getPath(path, dateTime, modernDates, Some(rebase))) } } } @@ -157,16 +161,15 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark { val benchmark2 = new Benchmark( s"Load ${dateTime}s from parquet", rowsNum, output = output) - Seq(true, false).foreach { after1582 => + Seq(true, false).foreach { modernDates => Seq(false, true).foreach { vec => Seq(false, true).foreach { rebase => - benchmark2.addCase(caseName(after1582, Some(rebase), Some(vec)), 3) { _ => - withSQLConf( - SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vec.toString, - SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) { + val name = caseName(modernDates, dateTime, Some(rebase), Some(vec)) + benchmark2.addCase(name, 3) { _ => + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vec.toString) { spark.read .format("parquet") - .load(getPath(path, dateTime, after1582, Some(rebase))) + .load(getPath(path, dateTime, modernDates, Some(rebase))) .noop() } } @@ -183,13 +186,13 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark { Seq("date", "timestamp").foreach { dateTime => val benchmark = new Benchmark(s"Save ${dateTime}s to ORC", rowsNum, output = output) benchmarkInputs(benchmark, rowsNum, dateTime) - Seq(true, false).foreach { after1582 => - benchmark.addCase(caseName(after1582), 1) { _ => - genDF(rowsNum, dateTime, after1582) + Seq(true, false).foreach { modernDates => + benchmark.addCase(caseName(modernDates, dateTime), 1) { _ => + genDF(rowsNum, dateTime, modernDates) .write .mode("overwrite") .format("orc") - .save(getPath(path, dateTime, after1582)) + .save(getPath(path, dateTime, modernDates)) } } benchmark.run() @@ -198,14 +201,14 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark { s"Load ${dateTime}s from ORC", rowsNum, output = output) - Seq(true, false).foreach { after1582 => + Seq(true, false).foreach { modernDates => Seq(false, true).foreach { vec => - benchmark2.addCase(caseName(after1582, vec = Some(vec)), 3) { _ => + benchmark2.addCase(caseName(modernDates, dateTime, vec = Some(vec)), 3) { _ => withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vec.toString) { spark .read .format("orc") - .load(getPath(path, dateTime, after1582)) + .load(getPath(path, dateTime, modernDates)) .noop() } }
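A note on the new read path (illustration, not part of the patch): the key idea in VectorizedPlainValuesReader.readIntegersWithRebase and readLongsWithRebase is a two-pass strategy. The raw Parquet buffer is scanned once against the new RebaseDateTime.lastSwitchJulianDay / lastSwitchJulianTs thresholds, and the per-value rebase cost is paid only when at least one value actually predates the last Julian-to-Gregorian switch; otherwise the batch keeps the existing bulk little-endian copy path. The standalone Java sketch below mirrors that pattern under simplified assumptions: LAST_SWITCH_JULIAN_DAY is copied from the last element of julianGregDiffSwitchDay shown in the diff above, and rebaseJulianToGregorianDays is a placeholder for the real rebasing logic that consults the pre-computed switch arrays.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

public class RebaseScanSketch {
  // Last Julian day that still needs rebasing; the real constant is
  // RebaseDateTime.lastSwitchJulianDay (julianGregDiffSwitchDay.last).
  static final int LAST_SWITCH_JULIAN_DAY = -141427;

  // Placeholder for RebaseDateTime.rebaseJulianToGregorianDays.
  static int rebaseJulianToGregorianDays(int julianDays) {
    return julianDays; // the real method applies the per-interval day difference
  }

  static int[] readIntegersWithRebase(ByteBuffer buffer, int total) {
    int[] out = new int[total];
    boolean rebase = false;
    // Pass 1: absolute reads that do not move the buffer position.
    for (int i = 0; i < total; i++) {
      rebase |= buffer.getInt(buffer.position() + i * 4) < LAST_SWITCH_JULIAN_DAY;
    }
    if (rebase) {
      // Slow path: at least one ancient value, so rebase one by one.
      for (int i = 0; i < total; i++) {
        out[i] = rebaseJulianToGregorianDays(buffer.getInt());
      }
    } else {
      // Fast path: modern values only; in Spark this becomes the bulk
      // putIntsLittleEndian copy into the WritableColumnVector.
      for (int i = 0; i < total; i++) {
        out[i] = buffer.getInt();
      }
    }
    return out;
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
    buf.putInt(18000).putInt(18001).putInt(18002).putInt(18003);
    buf.flip();
    System.out.println(Arrays.toString(readIntegersWithRebase(buf, 4)));
    // -> [18000, 18001, 18002, 18003] via the fast path, since every value
    //    is far later than the 1582 switch.
  }
}

This is also why the "vec on, rebase on" rows improve most for modern data in the results above (for example, loading post-1900 timestamps from Parquet drops from roughly 8.5-9.5 s to about 5 s): the extra scan is cheap, and the common case never touches the rebase math.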
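The companion change in VectorizedRleValuesReader keeps the existing split by definition level: an RLE run whose level equals maxDefLevel hands the whole run to the bulk data.readIntegersWithRebase / readLongsWithRebase call, a run below that level becomes nulls, and PACKED groups still rebase value by value. The toy sketch below (illustrative names only, not Spark code) shows that dispatch shape for a single RLE run, with Integer.MIN_VALUE standing in for putNull.

import java.util.Arrays;

public class DefLevelDispatchSketch {
  static final int MAX_DEF_LEVEL = 1;
  static final int NULL_MARK = Integer.MIN_VALUE; // stand-in for putNull

  // Stand-in for data.readIntegersWithRebase: bulk-fill a run of non-null values.
  static void readRunWithRebase(int[] column, int rowId, int n, int[] data, int dataPos) {
    System.arraycopy(data, dataPos, column, rowId, n);
  }

  // One RLE run: all n entries share the same definition level.
  static void fillRleRun(int[] column, int rowId, int n, int defLevel, int[] data) {
    if (defLevel == MAX_DEF_LEVEL) {
      readRunWithRebase(column, rowId, n, data, 0); // whole run is non-null
    } else {
      Arrays.fill(column, rowId, rowId + n, NULL_MARK); // whole run is null
    }
  }

  public static void main(String[] args) {
    int[] column = new int[4];
    fillRleRun(column, 0, 2, MAX_DEF_LEVEL, new int[] {18000, 18001});
    fillRleRun(column, 2, 2, 0, new int[0]); // null run carries no data values
    System.out.println(Arrays.toString(column));
    // -> [18000, 18001, -2147483648, -2147483648]
  }
}

Note that the PACKED branch in the patch still calls rebaseJulianToGregorianDays / rebaseJulianToGregorianMicros per value, so the buffer-scan optimization only kicks in for RLE runs of non-null values, which is presumably the dominant case for dense date and timestamp columns.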