diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 8e9adce111d55..61f7a656809de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -20,10 +20,9 @@ package org.apache.spark.sql.execution.streaming
 import java.io.{InputStream, IOException, OutputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 
-import scala.collection.JavaConverters._
+import scala.io.Source
 import scala.reflect.ClassTag
 
-import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.{Path, PathFilter}
 
 import org.apache.spark.sql.SparkSession
@@ -102,11 +101,10 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
       out.write('\n')
       out.write(serializeData(data).getBytes(UTF_8))
     }
-    out.flush()
   }
 
   override def deserialize(in: InputStream): Array[T] = {
-    val lines = IOUtils.lineIterator(in, UTF_8).asScala
+    val lines = Source.fromInputStream(in, UTF_8.name()).getLines()
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file")
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 38aee031821a1..f9e24167a17ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io.OutputStream
-
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index d94c415ab2bf8..4681f2ba08c84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.io.OutputStream
 import java.util.{LinkedHashMap => JLinkedHashMap}
 import java.util.Map.Entry
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 1eb5bb57d6d02..c7235320fd6bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -22,10 +22,10 @@ import java.util.{ConcurrentModificationException, EnumSet, UUID}
 
 import scala.reflect.ClassTag
 
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.io.IOUtils
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.serializer.JavaSerializer
@@ -141,7 +141,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
       try {
         writer(metadata, output)
       } finally {
-        IOUtils.closeStream(output)
+        IOUtils.closeQuietly(output)
       }
       try {
         // Try to commit the batch
@@ -197,7 +197,7 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
       try {
         Some(deserialize(input))
      } finally {
-        IOUtils.closeStream(input)
+        IOUtils.closeQuietly(input)
       }
     } else {
       logDebug(s"Unable to find batch $batchMetadataFile")
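
For reference, here is a minimal, self-contained sketch of the `scala.io.Source` read path that the first hunk switches `deserialize` to, replacing commons-io's `IOUtils.lineIterator` plus the `JavaConverters` wrapper. The object name, the `String` element type, the `require` on the version header, and `deserializeData` are hypothetical stand-ins for the real members of `CompactibleFileStreamLog`, not the actual Spark code:

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8

import scala.io.Source

object SourceDeserializeSketch {
  // Hypothetical stand-in for CompactibleFileStreamLog.deserializeData.
  private def deserializeData(line: String): String = line

  // Mirrors the patched shape: scala.io.Source.getLines() in place of
  // IOUtils.lineIterator(in, UTF_8).asScala.
  def deserialize(in: InputStream): Array[String] = {
    val lines = Source.fromInputStream(in, UTF_8.name()).getLines()
    if (!lines.hasNext) {
      throw new IllegalStateException("Incomplete log file")
    }
    val version = lines.next() // header line, e.g. "v1"
    require(version.startsWith("v"), s"Unrecognized log version: $version")
    lines.map(deserializeData).toArray
  }

  def main(args: Array[String]): Unit = {
    val in = new ByteArrayInputStream("v1\nentry-1\nentry-2".getBytes(UTF_8))
    println(deserialize(in).mkString(", ")) // entry-1, entry-2
  }
}
```

Like `lineIterator`, `Source.fromInputStream(...).getLines()` reads lazily and leaves closing the underlying stream to the caller, so the surrounding resource handling is unaffected. The dropped `out.flush()` in `serialize` is presumably safe because the caller closes (and thereby flushes) the output stream, as the `HDFSMetadataLog` hunks show.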
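The `HDFSMetadataLog` hunks swap Hadoop's `IOUtils.closeStream` for commons-io's `IOUtils.closeQuietly`, which is equivalent in spirit: both suppress any exception thrown by `close()`. A small sketch of that behavior, assuming commons-io on the classpath; the always-failing stream is contrived purely for illustration:

```scala
import java.io.{IOException, OutputStream}

import org.apache.commons.io.IOUtils

object CloseQuietlySketch {
  def main(args: Array[String]): Unit = {
    // A stream whose close() always fails.
    val failing: OutputStream = new OutputStream {
      override def write(b: Int): Unit = ()
      override def close(): Unit = throw new IOException("close failed")
    }
    // closeQuietly swallows the IOException, mirroring what Hadoop's
    // IOUtils.closeStream did before this change.
    IOUtils.closeQuietly(failing)
    println("no exception escaped")
  }
}
```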