Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle

import java.io.{DataInputStream, File, FileInputStream}
import java.util.zip.CheckedInputStream

import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper
import org.apache.spark.network.util.LimitedInputStream

object ShuffleChecksumUtils {

  /**
   * Ensure that the checksum values are consistent with index file and data file.
   *
   * @param numPartition number of partitions recorded in the checksum/index files
   * @param algorithm checksum algorithm name understood by
   *                  `ShuffleChecksumHelper.getChecksumByAlgorithm` (e.g. "ADLER32", "CRC32")
   * @param checksum file holding one written-side checksum (a long) per partition
   * @param data shuffle data file whose per-partition checksums are recomputed
   * @param index shuffle index file holding `numPartition + 1` long offsets into `data`
   * @return true iff every partition's recomputed checksum matches the recorded one
   */
  def compareChecksums(
      numPartition: Int,
      algorithm: String,
      checksum: File,
      data: File,
      index: File): Boolean = {
    // Load the checksums recorded at write time, one long per partition.
    val expectChecksums = Array.ofDim[Long](numPartition)
    var checksumIn: DataInputStream = null
    try {
      checksumIn = new DataInputStream(new FileInputStream(checksum))
      (0 until numPartition).foreach(i => expectChecksums(i) = checksumIn.readLong())
    } finally {
      if (checksumIn != null) {
        checksumIn.close()
      }
    }

    var dataIn: FileInputStream = null
    var indexIn: DataInputStream = null
    var checkedIn: CheckedInputStream = null
    try {
      dataIn = new FileInputStream(data)
      indexIn = new DataInputStream(new FileInputStream(index))
      var prevOffset = indexIn.readLong()
      // `forall` short-circuits on the first mismatch, replacing a nonlocal `return`
      // from inside a lambda (which is implemented by throwing NonLocalReturnControl).
      (0 until numPartition).forall { i =>
        val curOffset = indexIn.readLong()
        // Keep the length as a Long: a partition can exceed Int.MaxValue bytes, and
        // truncating with .toInt would silently read the wrong amount.
        val length = curOffset - prevOffset
        prevOffset = curOffset
        val checksumCal = ShuffleChecksumHelper.getChecksumByAlgorithm(algorithm)
        checkedIn = new CheckedInputStream(
          new LimitedInputStream(dataIn, length), checksumCal)
        // Consume the whole partition through the checked stream. A single read() call
        // may return fewer bytes than requested, so loop until `length` bytes are seen
        // (or EOF, which means the data file is shorter than the index claims).
        val buffer = new Array[Byte](8192)
        var remaining = length
        var truncated = false
        while (remaining > 0 && !truncated) {
          val n = checkedIn.read(buffer, 0, math.min(remaining, buffer.length.toLong).toInt)
          if (n < 0) {
            truncated = true
          } else {
            remaining -= n
          }
        }
        // checksum must be consistent at both write and read sides
        !truncated && checkedIn.getChecksum.getValue == expectChecksums(i)
      }
    } finally {
      if (dataIn != null) {
        dataIn.close()
      }
      if (indexIn != null) {
        indexIn.close()
      }
      if (checkedIn != null) {
        checkedIn.close()
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@

package org.apache.spark.shuffle

import java.io.{DataInputStream, File, FileInputStream}
import java.util.zip.CheckedInputStream

import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper
import org.apache.spark.network.util.LimitedInputStream
import java.io.File

trait ShuffleChecksumTestHelper {

Expand All @@ -38,46 +34,7 @@ trait ShuffleChecksumTestHelper {
assert(data.exists(), "Data file doesn't exist")
assert(index.exists(), "Index file doesn't exist")

var checksumIn: DataInputStream = null
val expectChecksums = Array.ofDim[Long](numPartition)
try {
checksumIn = new DataInputStream(new FileInputStream(checksum))
(0 until numPartition).foreach(i => expectChecksums(i) = checksumIn.readLong())
} finally {
if (checksumIn != null) {
checksumIn.close()
}
}

var dataIn: FileInputStream = null
var indexIn: DataInputStream = null
var checkedIn: CheckedInputStream = null
try {
dataIn = new FileInputStream(data)
indexIn = new DataInputStream(new FileInputStream(index))
var prevOffset = indexIn.readLong
(0 until numPartition).foreach { i =>
val curOffset = indexIn.readLong
val limit = (curOffset - prevOffset).toInt
val bytes = new Array[Byte](limit)
val checksumCal = ShuffleChecksumHelper.getChecksumByAlgorithm(algorithm)
checkedIn = new CheckedInputStream(
new LimitedInputStream(dataIn, curOffset - prevOffset), checksumCal)
checkedIn.read(bytes, 0, limit)
prevOffset = curOffset
// checksum must be consistent at both write and read sides
assert(checkedIn.getChecksum.getValue == expectChecksums(i))
}
} finally {
if (dataIn != null) {
dataIn.close()
}
if (indexIn != null) {
indexIn.close()
}
if (checkedIn != null) {
checkedIn.close()
}
}
assert(ShuffleChecksumUtils.compareChecksums(numPartition, algorithm, checksum, data, index),
"checksum must be consistent at both write and read sides")
}
}