Skip to content

Commit

Permalink
update PR
Browse files Browse the repository at this point in the history
  • Loading branch information
shahidki31 committed Dec 6, 2018
1 parent 6dfa27a commit 7d6ad51
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 11 deletions.
12 changes: 2 additions & 10 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
Expand Up @@ -43,10 +43,6 @@ trait CompressionCodec {
def compressedOutputStream(s: OutputStream): OutputStream

def compressedInputStream(s: InputStream): InputStream

private[spark] def compressedInputStreamForPartialFrame(s: InputStream): InputStream = {
compressedInputStream(s)
}
}

private[spark] object CompressionCodec {
Expand Down Expand Up @@ -199,14 +195,10 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedInputStream(s: InputStream): InputStream = {
// Wrap the zstd input stream in a buffered input stream so that we can
// avoid overhead excessive of JNI call while trying to uncompress small amount of data.
new BufferedInputStream(new ZstdInputStream(s), bufferSize)
}

override def compressedInputStreamForPartialFrame(s: InputStream): InputStream = {
// SPARK-26283: Enable reading from open frames of zstd (for eg: zstd compressed eventLog
// Reading). By default `isContinuous` is false, and when we try to read from open frames,
// `compressedInputStream` method above throws truncated error exception. This method set
// `isContinuous` true to allow reading from open frames.
// throws truncated error exception. This method set `isContinuous` true to allow reading
// from open frames.
new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
}
}
Expand Up @@ -402,7 +402,7 @@ private[spark] object EventLoggingListener extends Logging {
val codec = codecName(log).map { c =>
codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
}
codec.map(_.compressedInputStreamForPartialFrame(in)).getOrElse(in)
codec.map(_.compressedInputStream(in)).getOrElse(in)
} catch {
case e: Throwable =>
in.close()
Expand Down

0 comments on commit 7d6ad51

Please sign in to comment.