diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java index f9c0c60c2f2c6..62fcda701d948 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java @@ -19,10 +19,7 @@ import java.io.*; import java.util.concurrent.TimeUnit; -import java.util.zip.Adler32; -import java.util.zip.CRC32; -import java.util.zip.CheckedInputStream; -import java.util.zip.Checksum; +import java.util.zip.*; import com.google.common.io.ByteStreams; @@ -66,6 +63,13 @@ private static Checksum[] getChecksumsByAlgorithm(int num, String algorithm) { } } + case "CRC32C" -> { + checksums = new CRC32C[num]; + for (int i = 0; i < num; i++) { + checksums[i] = new CRC32C(); + } + } + default -> throw new UnsupportedOperationException( "Unsupported shuffle checksum algorithm: " + algorithm); } diff --git a/core/benchmarks/ChecksumBenchmark-jdk21-results.txt b/core/benchmarks/ChecksumBenchmark-jdk21-results.txt new file mode 100644 index 0000000000000..85370450f355c --- /dev/null +++ b/core/benchmarks/ChecksumBenchmark-jdk21-results.txt @@ -0,0 +1,14 @@ +================================================================================================ +Benchmark Checksum Algorithms +================================================================================================ + +OpenJDK 64-Bit Server VM 21.0.4+7-LTS on Linux 6.5.0-1025-azure +AMD EPYC 7763 64-Core Processor +Checksum Algorithms: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +CRC32 2743 2746 3 0.0 2678409.9 1.0X +CRC32C 1974 2055 70 0.0 1928129.2 1.4X +Adler32 
12689 12709 17 0.0 12391425.9 0.2X +hadoop PureJavaCrc32C 23027 23041 13 0.0 22487098.9 0.1X + + diff --git a/core/benchmarks/ChecksumBenchmark-results.txt b/core/benchmarks/ChecksumBenchmark-results.txt new file mode 100644 index 0000000000000..cce5a61abf637 --- /dev/null +++ b/core/benchmarks/ChecksumBenchmark-results.txt @@ -0,0 +1,14 @@ +================================================================================================ +Benchmark Checksum Algorithms +================================================================================================ + +OpenJDK 64-Bit Server VM 17.0.12+7-LTS on Linux 6.5.0-1025-azure +AMD EPYC 7763 64-Core Processor +Checksum Algorithms: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +CRC32 2757 2758 1 0.0 2692250.2 1.0X +CRC32C 2142 2244 116 0.0 2091901.8 1.3X +Adler32 12699 12712 15 0.0 12401205.6 0.2X +hadoop PureJavaCrc32C 23049 23066 15 0.0 22508320.3 0.1X + + diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index e9e411cc56b51..1cf6746e9f8b4 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1618,8 +1618,7 @@ package object config { .version("3.2.0") .stringConf .transform(_.toUpperCase(Locale.ROOT)) - .checkValue(Set("ADLER32", "CRC32").contains, "Shuffle checksum algorithm " + - "should be either ADLER32 or CRC32.") + .checkValues(Set("ADLER32", "CRC32", "CRC32C")) .createWithDefault("ADLER32") private[spark] val SHUFFLE_COMPRESS = diff --git a/core/src/test/scala/org/apache/spark/shuffle/ChecksumBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/ChecksumBenchmark.scala new file mode 100644 index 0000000000000..16a50fabb7ffd --- /dev/null +++ 
b/core/src/test/scala/org/apache/spark/shuffle/ChecksumBenchmark.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.  See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.  You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle + +import java.util.zip.{Adler32, CRC32, CRC32C} + +import org.apache.hadoop.util.PureJavaCrc32C + +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} + +/** + * Benchmark for Checksum Algorithms used by shuffle. + * {{{ + *   To run this benchmark: + *   1. without sbt: bin/spark-submit --class <this class> <spark core test jar> + *   2. build/sbt "core/Test/runMain <this class>" + *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/Test/runMain <this class>" + *      Results will be written to "benchmarks/ChecksumBenchmark-results.txt". 
+ * }}} + */ +object ChecksumBenchmark extends BenchmarkBase { + + val N = 1024 + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + runBenchmark("Benchmark Checksum Algorithms") { + val data: Array[Byte] = (1 until 32 * 1024 * 1024).map(_.toByte).toArray + val benchmark = new Benchmark("Checksum Algorithms", N, 3, output = output) + benchmark.addCase("CRC32") { _ => + (1 to N).foreach(_ => new CRC32().update(data)) + } + benchmark.addCase(s"CRC32C") { _ => + (1 to N).foreach(_ => new CRC32C().update(data)) + } + benchmark.addCase(s"Adler32") { _ => + (1 to N).foreach(_ => new Adler32().update(data)) + } + benchmark.addCase(s"hadoop PureJavaCrc32C") { _ => + (1 to N).foreach(_ => new PureJavaCrc32C().update(data)) + } + benchmark.run() + } + } +} diff --git a/docs/configuration.md b/docs/configuration.md index ff2f21d282a5e..81fb43b5f9ab0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1332,7 +1332,7 @@ Apart from these, the following properties are also available, and may be useful spark.shuffle.checksum.algorithm ADLER32 - The algorithm is used to calculate the shuffle checksum. Currently, it only supports built-in algorithms of JDK, e.g., ADLER32, CRC32. + The algorithm is used to calculate the shuffle checksum. Currently, it only supports built-in algorithms of JDK, e.g., ADLER32, CRC32 and CRC32C. 3.2.0