Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 9 additions & 128 deletions core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.security

import java.io.{Closeable, InputStream, IOException, OutputStream}
import java.io.{InputStream, OutputStream}
import java.nio.ByteBuffer
import java.nio.channels.{ReadableByteChannel, WritableByteChannel}
import java.util.Properties
Expand Down Expand Up @@ -55,10 +55,8 @@ private[spark] object CryptoStreamUtils extends Logging {
val params = new CryptoParams(key, sparkConf)
val iv = createInitializationVector(params.conf)
os.write(iv)
new ErrorHandlingOutputStream(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While using these crypto streams, I looked at this code, and it seems the workaround is no longer necessary.

new CryptoOutputStream(params.transformation, params.conf, os, params.keySpec,
new IvParameterSpec(iv)),
os)
new CryptoOutputStream(params.transformation, params.conf, os, params.keySpec,
new IvParameterSpec(iv))
}

/**
Expand All @@ -73,10 +71,8 @@ private[spark] object CryptoStreamUtils extends Logging {
val helper = new CryptoHelperChannel(channel)

helper.write(ByteBuffer.wrap(iv))
new ErrorHandlingWritableChannel(
new CryptoOutputStream(params.transformation, params.conf, helper, params.keySpec,
new IvParameterSpec(iv)),
helper)
new CryptoOutputStream(params.transformation, params.conf, helper, params.keySpec,
new IvParameterSpec(iv))
}

/**
Expand All @@ -89,10 +85,8 @@ private[spark] object CryptoStreamUtils extends Logging {
val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
ByteStreams.readFully(is, iv)
val params = new CryptoParams(key, sparkConf)
new ErrorHandlingInputStream(
new CryptoInputStream(params.transformation, params.conf, is, params.keySpec,
new IvParameterSpec(iv)),
is)
new CryptoInputStream(params.transformation, params.conf, is, params.keySpec,
new IvParameterSpec(iv))
}

/**
Expand All @@ -107,10 +101,8 @@ private[spark] object CryptoStreamUtils extends Logging {
JavaUtils.readFully(channel, buf)

val params = new CryptoParams(key, sparkConf)
new ErrorHandlingReadableChannel(
new CryptoInputStream(params.transformation, params.conf, channel, params.keySpec,
new IvParameterSpec(iv)),
channel)
new CryptoInputStream(params.transformation, params.conf, channel, params.keySpec,
new IvParameterSpec(iv))
}

def toCryptoConf(conf: SparkConf): Properties = {
Expand Down Expand Up @@ -166,117 +158,6 @@ private[spark] object CryptoStreamUtils extends Logging {

}

/**
* SPARK-25535. The commons-crypto library will throw InternalError if something goes
* wrong, and leave bad state behind in the Java wrappers, so it's not safe to use them
* afterwards. This wrapper detects that situation and avoids further calls into the
* commons-crypto code, while still allowing the underlying streams to be closed.
*
* This should be removed once CRYPTO-141 is fixed (and Spark upgrades its commons-crypto
Copy link
Member Author

@viirya viirya Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CRYPTO-141 is fixed in 1.1.0 (https://commons.apache.org/proper/commons-crypto/changes-report.html). The commons-crypto version currently used in Spark is 1.1.0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @vanzin

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vanzin left some comments at https://issues.apache.org/jira/browse/CRYPTO-141
If I find some time I'll see whether the ASF has any infra for testing this on Win32 and MacOS, since I only have a Linux dev env available.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, as I said, I happened to look at this code while working on related stuff. The workaround looks outdated based on the status of CRYPTO-141.

Let's wait for what @vanzin thinks about the fix of CRYPTO-141.

* dependency).
*/
trait BaseErrorHandler extends Closeable {

  // Set once the cipher layer has raised an InternalError; never reset afterwards.
  private var closed = false

  /** The encrypted stream that may get into an unhealthy state. */
  protected def cipherStream: Closeable

  /**
   * The stream wrapped by the encrypted stream. It is closed directly when the crypto
   * layer fails, since the cipher stream is not touched again after that point.
   */
  protected def original: Closeable

  /**
   * Evaluates `fn` unless an InternalError was seen earlier.
   *
   * An InternalError raised by `fn` marks this handler as closed, closes `original`, and is
   * then rethrown. Any other exception propagates untouched and leaves the state unchanged.
   */
  protected def safeCall[T](fn: => T): T = {
    ensureUsable()
    try fn catch {
      case fatal: InternalError =>
        closed = true
        original.close()
        throw fatal
    }
  }

  /** Closes the cipher stream, unless an earlier InternalError already closed `original`. */
  override def close(): Unit = if (!closed) cipherStream.close()

  /** Rejects calls made after the handler was marked closed by a failure. */
  private def ensureUsable(): Unit =
    if (closed) throw new IOException("Cipher stream is closed.")

}

// Visible for testing.
/** Readable channel facade that routes reads on `cipherStream` through `safeCall`. */
class ErrorHandlingReadableChannel(
    protected val cipherStream: ReadableByteChannel,
    protected val original: ReadableByteChannel)
  extends ReadableByteChannel with BaseErrorHandler {

  /** Reads from the cipher channel into `src`, guarded by `safeCall`. */
  override def read(src: ByteBuffer): Int = safeCall(cipherStream.read(src))

  /** Reports the open state of the cipher channel (not guarded by `safeCall`). */
  override def isOpen(): Boolean = cipherStream.isOpen()

}

/** Input stream facade that routes every `read` overload on `cipherStream` through `safeCall`. */
private class ErrorHandlingInputStream(
    protected val cipherStream: InputStream,
    protected val original: InputStream)
  extends InputStream with BaseErrorHandler {

  /** Single-byte read, delegated to the cipher stream. */
  override def read(): Int = safeCall(cipherStream.read())

  /** Whole-array read, delegated to the cipher stream. */
  override def read(b: Array[Byte]): Int = safeCall(cipherStream.read(b))

  /** Ranged read into `b` at `off` for up to `len` bytes, delegated to the cipher stream. */
  override def read(b: Array[Byte], off: Int, len: Int): Int =
    safeCall(cipherStream.read(b, off, len))
}

/** Writable channel facade that routes writes on `cipherStream` through `safeCall`. */
private class ErrorHandlingWritableChannel(
    protected val cipherStream: WritableByteChannel,
    protected val original: WritableByteChannel)
  extends WritableByteChannel with BaseErrorHandler {

  /** Writes `src` to the cipher channel, guarded by `safeCall`. */
  override def write(src: ByteBuffer): Int = safeCall(cipherStream.write(src))

  /** Reports the open state of the cipher channel (not guarded by `safeCall`). */
  override def isOpen(): Boolean = cipherStream.isOpen()

}

/** Output stream facade that routes `write` and `flush` on `cipherStream` through `safeCall`. */
private class ErrorHandlingOutputStream(
    protected val cipherStream: OutputStream,
    protected val original: OutputStream)
  extends OutputStream with BaseErrorHandler {

  /** Single-byte write, delegated to the cipher stream. */
  override def write(b: Int): Unit = safeCall(cipherStream.write(b))

  /** Whole-array write, delegated to the cipher stream. */
  override def write(b: Array[Byte]): Unit = safeCall(cipherStream.write(b))

  /** Ranged write of `len` bytes from `b` starting at `off`, delegated to the cipher stream. */
  override def write(b: Array[Byte], off: Int, len: Int): Unit =
    safeCall(cipherStream.write(b, off, len))

  /** Flushes the cipher stream, guarded by `safeCall`. */
  override def flush(): Unit = safeCall(cipherStream.flush())
}

private class CryptoParams(key: Array[Byte], sparkConf: SparkConf) {

val keySpec = new SecretKeySpec(key, "AES")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
package org.apache.spark.security

import java.io._
import java.nio.ByteBuffer
import java.nio.channels.{Channels, ReadableByteChannel}
import java.nio.channels.Channels
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.util.{Arrays, Random, UUID}

import com.google.common.io.ByteStreams
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._

import org.apache.spark._
import org.apache.spark.internal.config._
Expand Down Expand Up @@ -167,36 +164,6 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
}
}

test("error handling wrapper") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to remove this test case? I'm wondering if we can keep some test coverage to make sure that CRYPTO-141 is fixed.

Copy link
Member Author

@viirya viirya Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only tests the wrapper class ErrorHandlingReadableChannel. As it is removed, this test has nothing to test (it cannot be compiled).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original ticket SPARK-25535 and its PR didn't add a test for CRYPTO-141...

val wrapped = mock(classOf[ReadableByteChannel])
val decrypted = mock(classOf[ReadableByteChannel])
val errorHandler = new CryptoStreamUtils.ErrorHandlingReadableChannel(decrypted, wrapped)

when(decrypted.read(any(classOf[ByteBuffer])))
.thenThrow(new IOException())
.thenThrow(new InternalError())
.thenReturn(1)

val out = ByteBuffer.allocate(1)
intercept[IOException] {
errorHandler.read(out)
}
intercept[InternalError] {
errorHandler.read(out)
}

val e = intercept[IOException] {
errorHandler.read(out)
}
assert(e.getMessage().contains("is closed"))
errorHandler.close()

verify(decrypted, times(2)).read(any(classOf[ByteBuffer]))
verify(wrapped, never()).read(any(classOf[ByteBuffer]))
verify(decrypted, never()).close()
verify(wrapped, times(1)).close()
}

private def createConf(extra: (String, String)*): SparkConf = {
val conf = new SparkConf()
extra.foreach { case (k, v) => conf.set(k, v) }
Expand Down