@@ -36,7 +36,6 @@
import javax.security.sasl.SaslException;

import com.google.common.collect.ImmutableMap;
-import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -282,7 +281,7 @@ public void testFileRegionEncryption() throws Exception {
verify(callback, times(1)).onSuccess(anyInt(), any(ManagedBuffer.class));
verify(callback, never()).onFailure(anyInt(), any(Throwable.class));

-byte[] received = ByteStreams.toByteArray(response.get().createInputStream());
+byte[] received = response.get().createInputStream().readAllBytes();
assertArrayEquals(data, received);
} finally {
file.delete();
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da

import scala.jdk.CollectionConverters._

-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
@@ -202,7 +202,7 @@ class PortableDataStream(
def toArray(): Array[Byte] = {
val stream = open()
try {
-ByteStreams.toByteArray(stream)
+stream.readAllBytes()
} finally {
Closeables.close(stream, true)
}
@@ -17,7 +17,7 @@

package org.apache.spark.input

-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
import org.apache.hadoop.conf.{Configurable => HConfigurable, Configuration}
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.InputSplit
@@ -71,7 +71,7 @@ private[spark] class WholeTextFileRecordReader(
override def nextKeyValue(): Boolean = {
if (!processed) {
val fileIn = HadoopCodecStreams.createInputStream(getConf, path)
-val innerBuffer = ByteStreams.toByteArray(fileIn)
+val innerBuffer = fileIn.readAllBytes()

value = new Text(innerBuffer)
Closeables.close(fileIn, false)
3 changes: 1 addition & 2 deletions core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -21,7 +21,6 @@ import java.io.File

import scala.reflect.ClassTag

-import com.google.common.io.ByteStreams
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.config.CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME
@@ -612,7 +611,7 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext {
val compressedInputStream = CompressionCodec.createCodec(conf)
.compressedInputStream(fs.open(checkpointFile))
try {
-ByteStreams.toByteArray(compressedInputStream)
+compressedInputStream.readAllBytes()
} finally {
compressedInputStream.close()
}
@@ -25,7 +25,6 @@ import java.nio.file.{Files, Paths}
import scala.collection.mutable.ArrayBuffer
import scala.io.{Codec, Source}

-import com.google.common.io.ByteStreams
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path}
import org.scalatest.BeforeAndAfterEach
@@ -1880,8 +1879,7 @@ object SimpleApplicationTest {
object UserClasspathFirstTest {
def main(args: Array[String]): Unit = {
val ccl = Thread.currentThread().getContextClassLoader()
-val resource = ccl.getResourceAsStream("test.resource")
-val bytes = ByteStreams.toByteArray(resource)
+val bytes = ccl.getResourceAsStream("test.resource").readAllBytes()
val contents = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8)
if (contents != "USER") {
throw new SparkException("Should have read user resource, but instead read: " + contents)
@@ -22,7 +22,6 @@ import java.net.URI
import java.nio.file.Files
import java.util.zip.{ZipInputStream, ZipOutputStream}

-import com.google.common.io.ByteStreams
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfter
@@ -220,7 +219,7 @@ class SingleFileEventLogFileReaderSuite extends EventLogFileReadersSuite {

val entry = is.getNextEntry
assert(entry != null)
-val actual = ByteStreams.toByteArray(is)
+val actual = is.readAllBytes()
val expected = Files.readAllBytes(new File(logPath.toString).toPath)
assert(actual === expected)
assert(is.getNextEntry === null)
@@ -367,7 +366,7 @@ class RollingEventLogFilesReaderSuite extends EventLogFileReadersSuite {
val fileName = entry.getName.stripPrefix(logPath.getName + "/")
assert(allFileNames.contains(fileName))

-val actual = ByteStreams.toByteArray(is)
+val actual = is.readAllBytes()
val expected = Files.readAllBytes(new File(logPath.toString, fileName).toPath)
assert(actual === expected)
}
@@ -93,7 +93,7 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {

val inputStream = new ByteArrayInputStream(encryptedBytes)
val wrappedInputStream = serializerManager.wrapStream(blockId, inputStream)
-val decryptedBytes = ByteStreams.toByteArray(wrappedInputStream)
+val decryptedBytes = wrappedInputStream.readAllBytes()
val decryptedStr = new String(decryptedBytes, UTF_8)
assert(decryptedStr === plainStr)
}
@@ -141,7 +141,7 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {

val inStream = createCryptoInputStream(new FileInputStream(file), conf, key)
try {
-val inStreamData = ByteStreams.toByteArray(inStream)
+val inStreamData = inStream.readAllBytes()
assert(Arrays.equals(inStreamData, testData))
} finally {
inStream.close()
@@ -157,7 +157,7 @@

val inChannel = createReadableChannel(new FileInputStream(file).getChannel(), conf, key)
try {
-val inChannelData = ByteStreams.toByteArray(Channels.newInputStream(inChannel))
+val inChannelData = Channels.newInputStream(inChannel).readAllBytes()
assert(Arrays.equals(inChannelData, testData))
} finally {
inChannel.close()
@@ -21,7 +21,6 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
import java.nio.file.Files
import java.util.{Arrays, Random}

-import com.google.common.io.ByteStreams
import io.netty.channel.FileRegion

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
@@ -172,7 +171,7 @@ class DiskStoreSuite extends SparkFunSuite {
private def readViaInputStream(data: BlockData): Array[Byte] = {
val is = data.toInputStream()
try {
-ByteStreams.toByteArray(is)
+is.readAllBytes()
} finally {
is.close()
}
@@ -17,7 +17,7 @@

package org.apache.spark.ml.source.image

-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
@@ -76,7 +76,7 @@ private[image] case class ImageFileFormat() extends FileFormat with DataSourceRe
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val stream = fs.open(path)
val bytes = try {
-ByteStreams.toByteArray(stream)
+stream.readAllBytes()
} finally {
Closeables.close(stream, true)
}
@@ -27,7 +27,6 @@ import scala.collection.mutable
import scala.concurrent.duration._
import scala.io.Source

-import com.google.common.io.ByteStreams
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.concurrent.Eventually._
import org.scalatest.exceptions.TestFailedException
@@ -763,8 +762,7 @@ private object YarnClasspathTest extends Logging {
var result = "failure"
try {
val ccl = Thread.currentThread().getContextClassLoader()
-val resource = ccl.getResourceAsStream("test.resource")
-val bytes = ByteStreams.toByteArray(resource)
+val bytes = ccl.getResourceAsStream("test.resource").readAllBytes()
result = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8)
} catch {
case t: Throwable =>
@@ -20,8 +20,6 @@ package org.apache.spark.sql.catalyst
import java.io._
import java.nio.charset.StandardCharsets.UTF_8

-import com.google.common.io.ByteStreams
-
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.analysis.TempResolvedColumn
import org.apache.spark.sql.catalyst.expressions._
@@ -52,7 +50,7 @@ package object util extends Logging {
classLoader: ClassLoader = Utils.getSparkClassLoader): Array[Byte] = {
val inStream = classLoader.getResourceAsStream(resource)
try {
-ByteStreams.toByteArray(inStream)
+inStream.readAllBytes()
} finally {
inStream.close()
}
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.binaryfile

import java.sql.Timestamp

-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.Job
@@ -118,7 +118,7 @@ case class BinaryFileFormat() extends FileFormat with DataSourceRegister {
}
val stream = fs.open(status.getPath)
try {
-writer.write(i, ByteStreams.toByteArray(stream))
+writer.write(i, stream.readAllBytes())
} finally {
Closeables.close(stream, true)
}
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.json
import java.io.InputStream

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
-import com.google.common.io.ByteStreams
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Text
@@ -222,7 +221,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
CodecStreams.createInputStreamWithCloseResource(conf, file.toPath)
}
} { inputStream =>
-UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
+UTF8String.fromBytes(inputStream.readAllBytes())
}
}
val streamParser = parser.options.encoding
@@ -23,7 +23,7 @@ import java.sql.Timestamp

import scala.jdk.CollectionConverters._

-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
import org.mockito.Mockito.{mock, when}

@@ -134,7 +134,7 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
val fcontent = {
val stream = fs.open(fileStatus.getPath)
val content = try {
-ByteStreams.toByteArray(stream)
+stream.readAllBytes()
} finally {
Closeables.close(stream, true)
}