From 184085284185011d7cc6d054b54d2d38eaf1dd77 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Sun, 13 Mar 2016 21:03:49 -0700
Subject: [PATCH] [SPARK-13823][CORE][STREAMING][SQL] Always specify Charset in String <-> byte[] conversions (and remaining Coverity items)

## What changes were proposed in this pull request?

- Fixes calls to `new String(byte[])` or `String.getBytes()` that rely on the platform default encoding, so that they use UTF-8
- Same for `InputStreamReader` and `OutputStreamWriter` constructors
- Standardizes on UTF-8 everywhere
- Standardizes on specifying the encoding with `StandardCharsets.UTF_8`, not the Guava constant or the literal "UTF-8" (which requires handling `UnsupportedEncodingException`)
- (also addresses the other remaining Coverity scan issues, which are pretty trivial; these are separated into commit https://github.com/srowen/spark/commit/1deecd8d9ca986d8adb1a42d315890ce5349d29c)

## How was this patch tested?

Jenkins tests

Author: Sean Owen

Closes #11657 from srowen/SPARK-13823.
---
 .../network/client/StreamInterceptor.java         |  3 +-
 .../spark/network/protocol/Encoders.java          |  8 ++---
 .../spark/network/sasl/SparkSaslServer.java       | 10 +++---
 .../apache/spark/network/util/JavaUtils.java      |  6 ++--
 .../shuffle/ExternalShuffleBlockResolver.java     | 12 +++----
 .../ExternalShuffleBlockResolverSuite.java        | 36 +++++++++++--------
 .../shuffle/ExternalShuffleCleanupSuite.java      | 21 ++++++-----
 .../apache/spark/unsafe/types/UTF8String.java     | 10 ++----
 .../apache/spark/api/python/PythonRDD.scala       |  8 ++---
 .../api/python/PythonWorkerFactory.scala          |  3 +-
 .../WriteInputFormatTestDataGenerator.scala       |  4 +--
 .../scala/org/apache/spark/api/r/SerDe.scala      |  3 +-
 .../spark/deploy/FaultToleranceTest.scala         |  4 ++-
 .../spark/deploy/SparkSubmitArguments.scala       |  3 +-
 .../deploy/rest/RestSubmissionClient.scala        |  4 +--
 .../spark/deploy/worker/DriverRunner.scala        |  4 +--
 .../spark/deploy/worker/ExecutorRunner.scala      |  4 +--
 .../scheduler/EventLoggingListener.scala          |  4 +--
 .../serializer/GenericAvroSerializer.scala        |  3 +-
 .../scala/org/apache/spark/util/Utils.scala       |  5 +--
 .../java/org/apache/spark/JavaAPISuite.java       |  4 +--
 .../sort/ShuffleInMemorySorterSuite.java          |  3 +-
 .../sort/UnsafeExternalSorterSuite.java           |  2 --
 .../sort/UnsafeInMemorySorterSuite.java           |  3 +-
 .../org/apache/spark/SparkContextSuite.scala      | 17 ++++-----
 .../spark/api/python/PythonRDDSuite.scala         | 11 +++---
 .../spark/deploy/SparkSubmitSuite.scala           |  6 ++--
 .../history/FsHistoryProviderSuite.scala          |  9 ++---
 .../deploy/history/HistoryServerSuite.scala       |  6 ++--
 .../rest/StandaloneRestSubmitSuite.scala          |  4 +--
 .../NettyBlockTransferSecuritySuite.scala         |  7 ++--
 .../apache/spark/util/FileAppenderSuite.scala     | 12 +++---
 .../org/apache/spark/util/UtilsSuite.scala        | 16 ++++-----
 docs/streaming-custom-receivers.md                |  6 ++--
 .../streaming/JavaCustomReceiver.java             |  4 ++-
 .../examples/streaming/CustomReceiver.scala       |  4 ++-
 .../streaming/flume/sink/SparkSinkSuite.scala     |  4 ++-
 .../streaming/flume/FlumeTestUtils.scala          |  4 +--
 .../flume/PollingFlumeTestUtils.scala             |  5 +--
 .../spark/streaming/kafka/KafkaUtils.scala        |  4 +--
 .../streaming/kinesis/KinesisTestUtils.scala      |  3 +-
 .../kinesis/KPLBasedKinesisTestUtils.scala        |  3 +-
 .../streaming/mqtt/MQTTInputDStream.scala         |  4 ++-
 .../spark/streaming/mqtt/MQTTTestUtils.scala      |  4 +--
 .../spark/graphx/GraphLoaderSuite.scala           |  3 +-
 .../launcher/AbstractCommandBuilder.java          |  5 +--
 .../spark/launcher/OutputRedirector.java          |  3 +-
 .../SparkSubmitCommandBuilderSuite.java           |  6 +---
 .../mllib/api/python/PythonMLLibAPI.scala         |  3 +-
 .../libsvm/JavaLibSVMRelationSuite.java           |  4 +--
 .../source/libsvm/LibSVMRelationSuite.scala       |  6 ++--
 .../spark/mllib/util/MLUtilsSuite.scala           |  8 ++---
 .../spark/sql/catalyst/parser/ParseUtils.java     |  4 ++-
 .../sql/catalyst/expressions/literals.scala       |  3 +-
 .../spark/sql/catalyst/util/package.scala         |  3 +-
 .../expressions/LiteralExpressionSuite.scala      |  4 ++-
 .../expressions/MathFunctionsSuite.scala          |  6 ++--
 .../expressions/MiscFunctionsSuite.scala          | 19 ++++++----
 .../expressions/UnsafeRowConverterSuite.scala     | 14 ++++----
 .../codegen/GeneratedProjectionSuite.scala        |  5 ++-
 .../vectorized/ColumnVectorUtils.java             |  3 +-
 .../execution/vectorized/ColumnarBatch.java       |  4 +++
 .../datasources/csv/CSVOptions.scala              |  4 +--
 .../execution/datasources/csv/CSVParser.scala     |  3 +-
 .../datasources/csv/DefaultSource.scala           |  4 +--
 .../streaming/FileStreamSource.scala              |  4 +--
 .../spark/sql/sources/JavaSaveLoadSuite.java      |  2 --
 .../spark/sql/DataFrameFunctionsSuite.scala       |  6 ++--
 .../org/apache/spark/sql/DataFrameSuite.scala     |  5 +--
 .../spark/sql/MathExpressionsSuite.scala          |  6 ++--
 .../CompressionSchemeBenchmark.scala              |  3 +-
 .../vectorized/ColumnarBatchBenchmark.scala       |  4 ++-
 .../vectorized/ColumnarBatchSuite.scala           | 21 ++++++-----
 .../sql/streaming/FileStreamSourceSuite.scala     |  6 ++--
 .../apache/spark/sql/test/SQLTestData.scala       | 12 ++++---
 .../sql/hive/thriftserver/CliSuite.scala          |  3 +-
 .../HiveThriftServer2Suites.scala                 |  4 +--
 .../apache/spark/sql/hive/HiveContext.scala       |  3 +-
 .../hive/execution/ScriptTransformation.scala     |  3 +-
 .../hive/JavaMetastoreDataSourcesSuite.java       |  2 --
 .../spark/sql/hive/orc/OrcQuerySuite.scala        |  3 +-
 .../dstream/SocketInputDStream.scala              |  4 ++-
 .../apache/spark/streaming/JavaAPISuite.java      |  7 ++--
 .../spark/streaming/JavaReceiverAPISuite.java     |  4 ++-
 .../spark/streaming/CheckpointSuite.scala         |  4 +--
 .../spark/streaming/InputStreamsSuite.scala       | 10 +++---
 .../spark/streaming/MasterFailureTest.scala       |  4 +--
 .../org/apache/spark/deploy/yarn/Client.scala     |  4 +--
 .../deploy/yarn/BaseYarnClusterSuite.scala        |  8 ++---
 .../spark/deploy/yarn/YarnClusterSuite.scala      | 12 +++---
 .../yarn/YarnShuffleIntegrationSuite.scala        |  4 +--
 .../yarn/YarnSparkHadoopUtilSuite.scala           |  3 +-
 92 files changed, 321 insertions(+), 244 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java b/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
index 88ba3ccebdf20..b0e85bae7c309 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
@@ -34,8 +34,7 @@ class StreamInterceptor implements TransportFrameDecoder.Interceptor {
   private final String streamId;
   private final long byteCount;
   private final StreamCallback callback;
-
-  private volatile long bytesRead;
+  private long bytesRead;
 
   StreamInterceptor(
       TransportResponseHandler handler,
diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
index 9162d0b977f83..be217522367c5 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
@@ -17,8 +17,8 @@
 package org.apache.spark.network.protocol;
 
+import java.nio.charset.StandardCharsets;
-import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf; /** Provides a canonical set of Encoders for simple types. */ @@ -27,11 +27,11 @@ public class Encoders { /** Strings are encoded with their length followed by UTF-8 bytes. */ public static class Strings { public static int encodedLength(String s) { - return 4 + s.getBytes(Charsets.UTF_8).length; + return 4 + s.getBytes(StandardCharsets.UTF_8).length; } public static void encode(ByteBuf buf, String s) { - byte[] bytes = s.getBytes(Charsets.UTF_8); + byte[] bytes = s.getBytes(StandardCharsets.UTF_8); buf.writeInt(bytes.length); buf.writeBytes(bytes); } @@ -40,7 +40,7 @@ public static String decode(ByteBuf buf) { int length = buf.readInt(); byte[] bytes = new byte[length]; buf.readBytes(bytes); - return new String(bytes, Charsets.UTF_8); + return new String(bytes, StandardCharsets.UTF_8); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java b/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java index 431cb67a2ae0b..b802a5af63c94 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java @@ -28,9 +28,9 @@ import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Map; -import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; @@ -187,14 +187,14 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback /* Encode a byte[] identifier as a Base64-encoded string. */ public static String encodeIdentifier(String identifier) { Preconditions.checkNotNull(identifier, "User cannot be null if SASL is enabled"); - return Base64.encode(Unpooled.wrappedBuffer(identifier.getBytes(Charsets.UTF_8))) - .toString(Charsets.UTF_8); + return Base64.encode(Unpooled.wrappedBuffer(identifier.getBytes(StandardCharsets.UTF_8))) + .toString(StandardCharsets.UTF_8); } /** Encode a password as a base64-encoded char[] array. 
*/ public static char[] encodePassword(String password) { Preconditions.checkNotNull(password, "Password cannot be null if SASL is enabled"); - return Base64.encode(Unpooled.wrappedBuffer(password.getBytes(Charsets.UTF_8))) - .toString(Charsets.UTF_8).toCharArray(); + return Base64.encode(Unpooled.wrappedBuffer(password.getBytes(StandardCharsets.UTF_8))) + .toString(StandardCharsets.UTF_8).toCharArray(); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java index ccc527306d920..8d83ae0712f0e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java @@ -21,11 +21,11 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import io.netty.buffer.Unpooled; @@ -68,7 +68,7 @@ public static int nonNegativeHash(Object obj) { * converted back to the same string through {@link #bytesToString(ByteBuffer)}. */ public static ByteBuffer stringToBytes(String s) { - return Unpooled.wrappedBuffer(s.getBytes(Charsets.UTF_8)).nioBuffer(); + return Unpooled.wrappedBuffer(s.getBytes(StandardCharsets.UTF_8)).nioBuffer(); } /** @@ -76,7 +76,7 @@ public static ByteBuffer stringToBytes(String s) { * converted back to the same byte buffer through {@link #stringToBytes(String)}. */ public static String bytesToString(ByteBuffer b) { - return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8); + return Unpooled.wrappedBuffer(b).toString(StandardCharsets.UTF_8); } /* diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index fe933ed650caf..460110d78f15b 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -18,6 +18,7 @@ package org.apache.spark.network.shuffle; import java.io.*; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -27,7 +28,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Charsets; import com.google.common.base.Objects; import com.google.common.collect.Maps; import org.fusesource.leveldbjni.JniDBFactory; @@ -152,7 +152,7 @@ public void registerExecutor( try { if (db != null) { byte[] key = dbAppExecKey(fullId); - byte[] value = mapper.writeValueAsString(executorInfo).getBytes(Charsets.UTF_8); + byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8); db.put(key, value); } } catch (Exception e) { @@ -350,7 +350,7 @@ private static byte[] dbAppExecKey(AppExecId appExecId) throws IOException { // we stick a common prefix on all the keys so we can find them in the DB String appExecJson = mapper.writeValueAsString(appExecId); String key = (APP_KEY_PREFIX + 
";" + appExecJson); - return key.getBytes(Charsets.UTF_8); + return key.getBytes(StandardCharsets.UTF_8); } private static AppExecId parseDbAppExecKey(String s) throws IOException { @@ -368,10 +368,10 @@ static ConcurrentMap reloadRegisteredExecutors(D ConcurrentMap registeredExecutors = Maps.newConcurrentMap(); if (db != null) { DBIterator itr = db.iterator(); - itr.seek(APP_KEY_PREFIX.getBytes(Charsets.UTF_8)); + itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8)); while (itr.hasNext()) { Map.Entry e = itr.next(); - String key = new String(e.getKey(), Charsets.UTF_8); + String key = new String(e.getKey(), StandardCharsets.UTF_8); if (!key.startsWith(APP_KEY_PREFIX)) { break; } @@ -418,7 +418,7 @@ private static void storeVersion(DB db) throws IOException { public static class StoreVersion { - static final byte[] KEY = "StoreVersion".getBytes(Charsets.UTF_8); + static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8); public final int major; public final int minor; diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index 60a1b8b0451fe..d9b5f0261aaba 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.io.CharStreams; @@ -34,15 +35,16 @@ import static org.junit.Assert.*; public class ExternalShuffleBlockResolverSuite { - static String sortBlock0 = "Hello!"; - static String sortBlock1 = "World!"; + private static final String sortBlock0 = "Hello!"; + private static final String sortBlock1 = "World!"; - static String hashBlock0 = "Elementary"; - static String hashBlock1 = "Tabular"; + private static final String hashBlock0 = "Elementary"; + private static final String hashBlock1 = "Tabular"; - static TestShuffleDataContext dataContext; + private static TestShuffleDataContext dataContext; - static TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider()); + private static final TransportConf conf = + new TransportConf("shuffle", new SystemPropertyConfigProvider()); @BeforeClass public static void beforeAll() throws IOException { @@ -50,10 +52,12 @@ public static void beforeAll() throws IOException { dataContext.create(); // Write some sort and hash data. 
- dataContext.insertSortShuffleData(0, 0, - new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() } ); - dataContext.insertHashShuffleData(1, 0, - new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() } ); + dataContext.insertSortShuffleData(0, 0, new byte[][] { + sortBlock0.getBytes(StandardCharsets.UTF_8), + sortBlock1.getBytes(StandardCharsets.UTF_8)}); + dataContext.insertHashShuffleData(1, 0, new byte[][] { + hashBlock0.getBytes(StandardCharsets.UTF_8), + hashBlock1.getBytes(StandardCharsets.UTF_8)}); } @AfterClass @@ -100,13 +104,15 @@ public void testSortShuffleBlocks() throws IOException { InputStream block0Stream = resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream(); - String block0 = CharStreams.toString(new InputStreamReader(block0Stream)); + String block0 = CharStreams.toString( + new InputStreamReader(block0Stream, StandardCharsets.UTF_8)); block0Stream.close(); assertEquals(sortBlock0, block0); InputStream block1Stream = resolver.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream(); - String block1 = CharStreams.toString(new InputStreamReader(block1Stream)); + String block1 = CharStreams.toString( + new InputStreamReader(block1Stream, StandardCharsets.UTF_8)); block1Stream.close(); assertEquals(sortBlock1, block1); } @@ -119,13 +125,15 @@ public void testHashShuffleBlocks() throws IOException { InputStream block0Stream = resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream(); - String block0 = CharStreams.toString(new InputStreamReader(block0Stream)); + String block0 = CharStreams.toString( + new InputStreamReader(block0Stream, StandardCharsets.UTF_8)); block0Stream.close(); assertEquals(hashBlock0, block0); InputStream block1Stream = resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream(); - String block1 = CharStreams.toString(new InputStreamReader(block1Stream)); + String block1 = CharStreams.toString( + new InputStreamReader(block1Stream, StandardCharsets.UTF_8)); block1Stream.close(); assertEquals(hashBlock1, block1); } diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java index 532d7ab8d01bd..43d0201405872 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Random; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,8 +35,8 @@ public class ExternalShuffleCleanupSuite { // Same-thread Executor used to ensure cleanup happens synchronously in test thread. 
- Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor(); - TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider()); + private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor(); + private TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider()); @Test public void noCleanupAndCleanup() throws IOException { @@ -123,27 +124,29 @@ public void cleanupOnlyRemovedApp() throws IOException { assertCleanedUp(dataContext1); } - private void assertStillThere(TestShuffleDataContext dataContext) { + private static void assertStillThere(TestShuffleDataContext dataContext) { for (String localDir : dataContext.localDirs) { assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists()); } } - private void assertCleanedUp(TestShuffleDataContext dataContext) { + private static void assertCleanedUp(TestShuffleDataContext dataContext) { for (String localDir : dataContext.localDirs) { assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists()); } } - private TestShuffleDataContext createSomeData() throws IOException { + private static TestShuffleDataContext createSomeData() throws IOException { Random rand = new Random(123); TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5); dataContext.create(); - dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), - new byte[][] { "ABC".getBytes(), "DEF".getBytes() } ); - dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000, - new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } ); + dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] { + "ABC".getBytes(StandardCharsets.UTF_8), + "DEF".getBytes(StandardCharsets.UTF_8)}); + dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000, new byte[][] { + "GHI".getBytes(StandardCharsets.UTF_8), + "JKLMNOPQRSTUVWXYZ".getBytes(StandardCharsets.UTF_8)}); return dataContext; } } diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index 427a8315e02b7..e16166ade4e5d 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -21,6 +21,7 @@ import java.io.*; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Map; @@ -825,14 +826,7 @@ public UTF8String translate(Map dict) { @Override public String toString() { - try { - return new String(getBytes(), "utf-8"); - } catch (UnsupportedEncodingException e) { - // Turn the exception into unchecked so we can find out about it at runtime, but - // don't need to add lots of boilerplate code everywhere. - throwException(e); - return "unknown"; // we will never reach here. 
- } + return new String(getBytes(), StandardCharsets.UTF_8); } @Override diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 05d1c31a08f22..8f306770a184f 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -19,6 +19,7 @@ package org.apache.spark.api.python import java.io._ import java.net._ +import java.nio.charset.StandardCharsets import java.util.{ArrayList => JArrayList, Collections, List => JList, Map => JMap} import scala.collection.JavaConverters._ @@ -26,7 +27,6 @@ import scala.collection.mutable import scala.language.existentials import scala.util.control.NonFatal -import com.google.common.base.Charsets.UTF_8 import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat} @@ -165,7 +165,7 @@ private[spark] class PythonRunner( val exLength = stream.readInt() val obj = new Array[Byte](exLength) stream.readFully(obj) - throw new PythonException(new String(obj, UTF_8), + throw new PythonException(new String(obj, StandardCharsets.UTF_8), writerThread.exception.getOrElse(null)) case SpecialLengths.END_OF_DATA_SECTION => // We've finished the data section of the output, but we can still @@ -624,7 +624,7 @@ private[spark] object PythonRDD extends Logging { } def writeUTF(str: String, dataOut: DataOutputStream) { - val bytes = str.getBytes(UTF_8) + val bytes = str.getBytes(StandardCharsets.UTF_8) dataOut.writeInt(bytes.length) dataOut.write(bytes) } @@ -817,7 +817,7 @@ private[spark] object PythonRDD extends Logging { private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] { - override def call(arr: Array[Byte]) : String = new String(arr, UTF_8) + override def call(arr: Array[Byte]) : String = new String(arr, StandardCharsets.UTF_8) } /** diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index a2a2f89f1e875..433764be89fb7 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -19,6 +19,7 @@ package org.apache.spark.api.python import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStreamWriter} import java.net.{InetAddress, ServerSocket, Socket, SocketException} +import java.nio.charset.StandardCharsets import java.util.Arrays import scala.collection.mutable @@ -121,7 +122,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream) // Tell the worker our port - val out = new OutputStreamWriter(worker.getOutputStream) + val out = new OutputStreamWriter(worker.getOutputStream, StandardCharsets.UTF_8) out.write(serverSocket.getLocalPort + "\n") out.flush() diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala index 9549784aeabf5..34cb7c61d7034 100644 --- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala +++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala @@ -19,10 +19,10 @@ package org.apache.spark.api.python 
import java.{util => ju} import java.io.{DataInput, DataOutput} +import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ -import com.google.common.base.Charsets.UTF_8 import org.apache.hadoop.io._ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat @@ -134,7 +134,7 @@ object WriteInputFormatTestDataGenerator { sc.parallelize(intKeys).saveAsSequenceFile(intPath) sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath) sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath) - sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(UTF_8)) } + sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(StandardCharsets.UTF_8)) } ).saveAsSequenceFile(bytesPath) val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false)) sc.parallelize(bools).saveAsSequenceFile(boolPath) diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index af815f885e8ae..c7fb192f26bd0 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -18,6 +18,7 @@ package org.apache.spark.api.r import java.io.{DataInputStream, DataOutputStream} +import java.nio.charset.StandardCharsets import java.sql.{Date, Time, Timestamp} import scala.collection.JavaConverters._ @@ -109,7 +110,7 @@ private[spark] object SerDe { val bytes = new Array[Byte](len) in.readFully(bytes) assert(bytes(len - 1) == 0) - val str = new String(bytes.dropRight(1), "UTF-8") + val str = new String(bytes.dropRight(1), StandardCharsets.UTF_8) str } diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index 434aadd2c61ac..305994a3f3543 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy import java.io._ import java.net.URL +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeoutException import scala.collection.mutable.ListBuffer @@ -348,7 +349,8 @@ private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile def readState() { try { - val masterStream = new InputStreamReader(new URL("http://%s:8080/json".format(ip)).openStream) + val masterStream = new InputStreamReader( + new URL("http://%s:8080/json".format(ip)).openStream, StandardCharsets.UTF_8) val json = JsonMethods.parse(masterStream) val workers = json \ "workers" diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 175756b80b6bb..a62096d771724 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy import java.io.{ByteArrayOutputStream, PrintStream} import java.lang.reflect.InvocationTargetException import java.net.URI +import java.nio.charset.StandardCharsets import java.util.{List => JList} import java.util.jar.JarFile @@ -608,7 +609,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S stream.flush() // Get the output and discard any unnecessary lines from it. 
- Source.fromString(new String(out.toByteArray())).getLines + Source.fromString(new String(out.toByteArray(), StandardCharsets.UTF_8)).getLines .filter { line => !line.startsWith("log4j") && !line.startsWith("usage") } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 006e2e1472580..d3e092a34c172 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.rest import java.io.{DataOutputStream, FileNotFoundException} import java.net.{ConnectException, HttpURLConnection, SocketException, URL} +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeoutException import javax.servlet.http.HttpServletResponse @@ -28,7 +29,6 @@ import scala.concurrent.duration._ import scala.io.Source import com.fasterxml.jackson.core.JsonProcessingException -import com.google.common.base.Charsets import org.apache.spark.{Logging, SPARK_VERSION => sparkVersion, SparkConf} import org.apache.spark.util.Utils @@ -211,7 +211,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { try { val out = new DataOutputStream(conn.getOutputStream) Utils.tryWithSafeFinally { - out.write(json.getBytes(Charsets.UTF_8)) + out.write(json.getBytes(StandardCharsets.UTF_8)) } { out.close() } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 6049db6d989ae..7f4fe26c0d15e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -18,10 +18,10 @@ package org.apache.spark.deploy.worker import java.io._ +import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.hadoop.fs.Path @@ -174,7 +174,7 @@ private[deploy] class DriverRunner( val stderr = new File(baseDir, "stderr") val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"") val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40) - Files.append(header, stderr, UTF_8) + Files.append(header, stderr, StandardCharsets.UTF_8) CommandUtils.redirectStream(process.getErrorStream, stderr) } runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index c6687a4c63a6a..208a1bb68edb9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -18,10 +18,10 @@ package org.apache.spark.deploy.worker import java.io._ +import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.spark.{Logging, SecurityManager, SparkConf} @@ -168,7 +168,7 @@ private[deploy] class ExecutorRunner( stdoutAppender = FileAppender(process.getInputStream, stdout, conf) val stderr = new File(executorDir, "stderr") - Files.write(header, stderr, UTF_8) + Files.write(header, stderr, StandardCharsets.UTF_8) stderrAppender = 
FileAppender(process.getErrorStream, stderr, conf) // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 8354e2a6112a2..2d76d08af6cdd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -19,11 +19,11 @@ package org.apache.spark.scheduler import java.io._ import java.net.URI +import java.nio.charset.StandardCharsets import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import com.google.common.base.Charsets import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} import org.apache.hadoop.fs.permission.FsPermission @@ -254,7 +254,7 @@ private[spark] object EventLoggingListener extends Logging { def initEventLog(logStream: OutputStream): Unit = { val metadata = SparkListenerLogStart(SPARK_VERSION) val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n" - logStream.write(metadataJson.getBytes(Charsets.UTF_8)) + logStream.write(metadataJson.getBytes(StandardCharsets.UTF_8)) } /** diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 3d5b7105f0ca8..1a8e545b4f59e 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -19,6 +19,7 @@ package org.apache.spark.serializer import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import scala.collection.mutable @@ -86,7 +87,7 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) schemaBytes.arrayOffset() + schemaBytes.position(), schemaBytes.remaining()) val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis)) - new Schema.Parser().parse(new String(bytes, "UTF-8")) + new Schema.Parser().parse(new String(bytes, StandardCharsets.UTF_8)) }) /** diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index b4c49513711c2..b5a98ce569a8a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer import java.nio.channels.Channels +import java.nio.charset.StandardCharsets import java.nio.file.Files import java.util.{Locale, Properties, Random, UUID} import java.util.concurrent._ @@ -1904,7 +1905,7 @@ private[spark] object Utils extends Logging { require(file.exists(), s"Properties file $file does not exist") require(file.isFile(), s"Properties file $file is not a normal file") - val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8") + val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8) try { val properties = new Properties() properties.load(inReader) @@ -2344,7 +2345,7 @@ private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.Ou def read(): Int = if (iterator.hasNext) iterator.next() else -1 } - val reader = new BufferedReader(new InputStreamReader(input)) + val reader = new BufferedReader(new 
InputStreamReader(input, StandardCharsets.UTF_8)) val stringBuilder = new StringBuilder var line = reader.readLine() while (line != null) { diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index e6a4ab7550c2a..a7e74c00793c3 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -21,6 +21,7 @@ import java.nio.channels.FileChannel; import java.nio.ByteBuffer; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -45,7 +46,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.base.Throwables; -import com.google.common.base.Charsets; import com.google.common.io.Files; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; @@ -1058,7 +1058,7 @@ public void textFiles() throws IOException { rdd.saveAsTextFile(outputDir); // Read the plain text file and check it's OK File outputFile = new File(outputDir, "part-00000"); - String content = Files.toString(outputFile, Charsets.UTF_8); + String content = Files.toString(outputFile, StandardCharsets.UTF_8); Assert.assertEquals("1\n2\n3\n4\n", content); // Also try reading it in as a text file RDD List expected = Arrays.asList("1", "2", "3", "4"); diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java index b4fa33f32a3fd..a3502708aadec 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java @@ -17,6 +17,7 @@ package org.apache.spark.shuffle.sort; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Random; @@ -41,7 +42,7 @@ public class ShuffleInMemorySorterSuite { private static String getStringFromDataPage(Object baseObject, long baseOffset, int strLength) { final byte[] strBytes = new byte[strLength]; Platform.copyMemory(baseObject, baseOffset, strBytes, Platform.BYTE_ARRAY_OFFSET, strLength); - return new String(strBytes); + return new String(strBytes, StandardCharsets.UTF_8); } @Test diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index b757ddc3b37f9..a79ed58133f1b 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -80,7 +80,6 @@ public int compare( } }; - SparkConf sparkConf; File tempDir; @Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager; @Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager; @@ -99,7 +98,6 @@ public OutputStream apply(OutputStream stream) { @Before public void setUp() { MockitoAnnotations.initMocks(this); - sparkConf = new SparkConf(); tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test"); spillFilesCreated.clear(); taskContext = mock(TaskContext.class); diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java index 
ff41768df1d8f..90849ab0bd8f3 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java @@ -17,6 +17,7 @@ package org.apache.spark.util.collection.unsafe.sort; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import org.junit.Assert; @@ -41,7 +42,7 @@ public class UnsafeInMemorySorterSuite { private static String getStringFromDataPage(Object baseObject, long baseOffset, int length) { final byte[] strBytes = new byte[length]; Platform.copyMemory(baseObject, baseOffset, strBytes, Platform.BYTE_ARRAY_OFFSET, length); - return new String(strBytes); + return new String(strBytes, StandardCharsets.UTF_8); } @Test diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 556afd08bbfe5..841fd02ae8bb6 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -18,12 +18,12 @@ package org.apache.spark import java.io.File +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit import scala.concurrent.Await import scala.concurrent.duration.Duration -import com.google.common.base.Charsets._ import com.google.common.io.Files import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} import org.apache.hadoop.mapred.TextInputFormat @@ -115,8 +115,8 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { val absolutePath2 = file2.getAbsolutePath try { - Files.write("somewords1", file1, UTF_8) - Files.write("somewords2", file2, UTF_8) + Files.write("somewords1", file1, StandardCharsets.UTF_8) + Files.write("somewords2", file2, StandardCharsets.UTF_8) val length1 = file1.length() val length2 = file2.length() @@ -243,11 +243,12 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { try { // Create 5 text files. 
- Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1, UTF_8) - Files.write("someline1 in file2\nsomeline2 in file2", file2, UTF_8) - Files.write("someline1 in file3", file3, UTF_8) - Files.write("someline1 in file4\nsomeline2 in file4", file4, UTF_8) - Files.write("someline1 in file2\nsomeline2 in file5", file5, UTF_8) + Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1, + StandardCharsets.UTF_8) + Files.write("someline1 in file2\nsomeline2 in file2", file2, StandardCharsets.UTF_8) + Files.write("someline1 in file3", file3, StandardCharsets.UTF_8) + Files.write("someline1 in file4\nsomeline2 in file4", file4, StandardCharsets.UTF_8) + Files.write("someline1 in file2\nsomeline2 in file5", file5, StandardCharsets.UTF_8) sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala index 41f2a5c972b6b..05b4e67412f2e 100644 --- a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.api.python import java.io.{ByteArrayOutputStream, DataOutputStream} +import java.nio.charset.StandardCharsets import org.apache.spark.SparkFunSuite @@ -35,10 +36,12 @@ class PythonRDDSuite extends SparkFunSuite { // The correctness will be tested in Python PythonRDD.writeIteratorToStream(Iterator("a", null), buffer) PythonRDD.writeIteratorToStream(Iterator(null, "a"), buffer) - PythonRDD.writeIteratorToStream(Iterator("a".getBytes, null), buffer) - PythonRDD.writeIteratorToStream(Iterator(null, "a".getBytes), buffer) + PythonRDD.writeIteratorToStream(Iterator("a".getBytes(StandardCharsets.UTF_8), null), buffer) + PythonRDD.writeIteratorToStream(Iterator(null, "a".getBytes(StandardCharsets.UTF_8)), buffer) PythonRDD.writeIteratorToStream(Iterator((null, null), ("a", null), (null, "b")), buffer) - PythonRDD.writeIteratorToStream( - Iterator((null, null), ("a".getBytes, null), (null, "b".getBytes)), buffer) + PythonRDD.writeIteratorToStream(Iterator( + (null, null), + ("a".getBytes(StandardCharsets.UTF_8), null), + (null, "b".getBytes(StandardCharsets.UTF_8))), buffer) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 41ac60ece0eda..91fef772d13c3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -18,10 +18,10 @@ package org.apache.spark.deploy import java.io._ +import java.nio.charset.StandardCharsets import scala.collection.mutable.ArrayBuffer -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.ByteStreams import org.scalatest.{BeforeAndAfterEach, Matchers} import org.scalatest.concurrent.Timeouts @@ -593,7 +593,7 @@ class SparkSubmitSuite val tmpDir = Utils.createTempDir() val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf") - val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf)) + val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf), StandardCharsets.UTF_8) for ((key, value) <- defaults) writer.write(s"$key $value\n") writer.close() @@ -661,7 +661,7 @@ object UserClasspathFirstTest { val ccl = Thread.currentThread().getContextClassLoader() val resource = 
ccl.getResourceAsStream("test.resource") val bytes = ByteStreams.toByteArray(resource) - val contents = new String(bytes, 0, bytes.length, UTF_8) + val contents = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8) if (contents != "USER") { throw new SparkException("Should have read user resource, but instead read: " + contents) } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 8e8007f4ebf4b..5fd599e190c7c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -20,13 +20,13 @@ package org.apache.spark.deploy.history import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream, OutputStreamWriter} import java.net.URI +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit import java.util.zip.{ZipInputStream, ZipOutputStream} import scala.concurrent.duration._ import scala.language.postfixOps -import com.google.common.base.Charsets import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.hdfs.DistributedFileSystem import org.json4s.jackson.JsonMethods._ @@ -320,8 +320,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc var entry = inputStream.getNextEntry entry should not be null while (entry != null) { - val actual = new String(ByteStreams.toByteArray(inputStream), Charsets.UTF_8) - val expected = Files.toString(logs.find(_.getName == entry.getName).get, Charsets.UTF_8) + val actual = new String(ByteStreams.toByteArray(inputStream), StandardCharsets.UTF_8) + val expected = + Files.toString(logs.find(_.getName == entry.getName).get, StandardCharsets.UTF_8) actual should be (expected) totalEntries += 1 entry = inputStream.getNextEntry @@ -415,7 +416,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc if (isNewFormat) { EventLoggingListener.initEventLog(new FileOutputStream(file)) } - val writer = new OutputStreamWriter(bstream, "UTF-8") + val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8) Utils.tryWithSafeFinally { events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n")) } { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index e5cd2eddba1e8..5822261d8da75 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.history import java.io.{File, FileInputStream, FileWriter, InputStream, IOException} import java.net.{HttpURLConnection, URL} +import java.nio.charset.StandardCharsets import java.util.zip.ZipInputStream import javax.servlet.http.{HttpServletRequest, HttpServletResponse} @@ -25,7 +26,6 @@ import scala.concurrent.duration._ import scala.language.postfixOps import com.codahale.metrics.Counter -import com.google.common.base.Charsets import com.google.common.io.{ByteStreams, Files} import org.apache.commons.io.{FileUtils, IOUtils} import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} @@ -216,8 +216,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers val expectedFile = { new File(logDir, 
entry.getName) } - val expected = Files.toString(expectedFile, Charsets.UTF_8) - val actual = new String(ByteStreams.toByteArray(zipStream), Charsets.UTF_8) + val expected = Files.toString(expectedFile, StandardCharsets.UTF_8) + val actual = new String(ByteStreams.toByteArray(zipStream), StandardCharsets.UTF_8) actual should be (expected) filesCompared += 1 } diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index ee889bf144546..a7bb9aa4686eb 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -19,11 +19,11 @@ package org.apache.spark.deploy.rest import java.io.DataOutputStream import java.net.{HttpURLConnection, URL} +import java.nio.charset.StandardCharsets import javax.servlet.http.HttpServletResponse import scala.collection.mutable -import com.google.common.base.Charsets import org.json4s.JsonAST._ import org.json4s.jackson.JsonMethods._ import org.scalatest.BeforeAndAfterEach @@ -498,7 +498,7 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach { if (body.nonEmpty) { conn.setDoOutput(true) val out = new DataOutputStream(conn.getOutputStream) - out.write(body.getBytes(Charsets.UTF_8)) + out.write(body.getBytes(StandardCharsets.UTF_8)) out.close() } conn diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala index 47dbcb8fc0eaa..02806a16b9467 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.network.netty import java.io.InputStreamReader import java.nio._ -import java.nio.charset.Charset +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit import scala.concurrent.{Await, Promise} @@ -103,7 +103,8 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi val blockManager = mock[BlockDataManager] val blockId = ShuffleBlockId(0, 1, 2) val blockString = "Hello, world!" 
- val blockBuffer = new NioManagedBuffer(ByteBuffer.wrap(blockString.getBytes)) + val blockBuffer = new NioManagedBuffer(ByteBuffer.wrap( + blockString.getBytes(StandardCharsets.UTF_8))) when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer) val securityManager0 = new SecurityManager(conf0) @@ -117,7 +118,7 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi val result = fetchBlock(exec0, exec1, "1", blockId) match { case Success(buf) => val actualString = CharStreams.toString( - new InputStreamReader(buf.createInputStream(), Charset.forName("UTF-8"))) + new InputStreamReader(buf.createInputStream(), StandardCharsets.UTF_8)) actualString should equal(blockString) buf.release() Success() diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala index b367cc8358342..d30eafd2d4218 100644 --- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala @@ -18,12 +18,12 @@ package org.apache.spark.util import java.io._ +import java.nio.charset.StandardCharsets import java.util.concurrent.CountDownLatch import scala.collection.mutable.HashSet import scala.reflect._ -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.log4j.{Appender, Level, Logger} import org.apache.log4j.spi.LoggingEvent @@ -48,11 +48,11 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { test("basic file appender") { val testString = (1 to 1000).mkString(", ") - val inputStream = new ByteArrayInputStream(testString.getBytes(UTF_8)) + val inputStream = new ByteArrayInputStream(testString.getBytes(StandardCharsets.UTF_8)) val appender = new FileAppender(inputStream, testFile) inputStream.close() appender.awaitTermination() - assert(Files.toString(testFile, UTF_8) === testString) + assert(Files.toString(testFile, StandardCharsets.UTF_8) === testString) } test("rolling file appender - time-based rolling") { @@ -100,7 +100,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { val allGeneratedFiles = new HashSet[String]() val items = (1 to 10).map { _.toString * 10000 } for (i <- 0 until items.size) { - testOutputStream.write(items(i).getBytes(UTF_8)) + testOutputStream.write(items(i).getBytes(StandardCharsets.UTF_8)) testOutputStream.flush() allGeneratedFiles ++= RollingFileAppender.getSortedRolledOverFiles( testFile.getParentFile.toString, testFile.getName).map(_.toString) @@ -267,7 +267,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { // send data to appender through the input stream, and wait for the data to be written val expectedText = textToAppend.mkString("") for (i <- 0 until textToAppend.size) { - outputStream.write(textToAppend(i).getBytes(UTF_8)) + outputStream.write(textToAppend(i).getBytes(StandardCharsets.UTF_8)) outputStream.flush() Thread.sleep(sleepTimeBetweenTexts) } @@ -282,7 +282,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { logInfo("Filtered files: \n" + generatedFiles.mkString("\n")) assert(generatedFiles.size > 1) val allText = generatedFiles.map { file => - Files.toString(file, UTF_8) + Files.toString(file, StandardCharsets.UTF_8) }.mkString("") assert(allText === expectedText) generatedFiles diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 412c0ac9d9be3..093d1bd6e5948 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStr import java.lang.{Double => JDouble, Float => JFloat} import java.net.{BindException, ServerSocket, URI} import java.nio.{ByteBuffer, ByteOrder} +import java.nio.charset.StandardCharsets import java.text.DecimalFormatSymbols import java.util.Locale import java.util.concurrent.TimeUnit @@ -28,7 +29,6 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable.ListBuffer import scala.util.Random -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.commons.lang3.SystemUtils import org.apache.hadoop.conf.Configuration @@ -268,7 +268,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { val tmpDir2 = Utils.createTempDir() val f1Path = tmpDir2 + "/f1" val f1 = new FileOutputStream(f1Path) - f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(UTF_8)) + f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8)) f1.close() // Read first few bytes @@ -295,9 +295,9 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { test("reading offset bytes across multiple files") { val tmpDir = Utils.createTempDir() val files = (1 to 3).map(i => new File(tmpDir, i.toString)) - Files.write("0123456789", files(0), UTF_8) - Files.write("abcdefghij", files(1), UTF_8) - Files.write("ABCDEFGHIJ", files(2), UTF_8) + Files.write("0123456789", files(0), StandardCharsets.UTF_8) + Files.write("abcdefghij", files(1), StandardCharsets.UTF_8) + Files.write("ABCDEFGHIJ", files(2), StandardCharsets.UTF_8) // Read first few bytes in the 1st file assert(Utils.offsetBytes(files, 0, 5) === "01234") @@ -529,7 +529,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { try { System.setProperty("spark.test.fileNameLoadB", "2") Files.write("spark.test.fileNameLoadA true\n" + - "spark.test.fileNameLoadB 1\n", outFile, UTF_8) + "spark.test.fileNameLoadB 1\n", outFile, StandardCharsets.UTF_8) val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath) properties .filter { case (k, v) => k.startsWith("spark.")} @@ -559,7 +559,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath) val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir) val targetDir = new File(tempDir, "target-dir") - Files.write("some text", sourceFile, UTF_8) + Files.write("some text", sourceFile, StandardCharsets.UTF_8) val path = if (Utils.isWindows) { @@ -801,7 +801,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { |trap "" SIGTERM |sleep 10 """.stripMargin - Files.write(cmd.getBytes(), file) + Files.write(cmd.getBytes(StandardCharsets.UTF_8), file) file.getAbsoluteFile.setExecutable(true) val process = new ProcessBuilder(file.getAbsolutePath).start() diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 84547748618d1..732c83dc841d9 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -72,7 +72,8 @@ class CustomReceiver(host: String, port: Int) socket = new Socket(host, port) // Until stopped or connection broken continue reading - val reader = 
new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")) + val reader = new BufferedReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while(!isStopped && userInput != null) { store(userInput) @@ -135,7 +136,8 @@ public class JavaCustomReceiver extends Receiver { // connect to the server socket = new Socket(host, port); - BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + BufferedReader reader = new BufferedReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); // Until stopped or connection broken continue reading while (!isStopped() && (userInput = reader.readLine()) != null) { diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java index 5de56340c6d22..4544ad2b42ca7 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java @@ -36,6 +36,7 @@ import java.io.InputStreamReader; import java.net.ConnectException; import java.net.Socket; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Iterator; import java.util.regex.Pattern; @@ -130,7 +131,8 @@ private void receive() { try { // connect to the server socket = new Socket(host, port); - reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + reader = new BufferedReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); // Until stopped or connection broken continue reading while (!isStopped() && (userInput = reader.readLine()) != null) { System.out.println("Received data '" + userInput + "'"); diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala index 5ce5778e42a3a..d67da270a8178 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala @@ -20,6 +20,7 @@ package org.apache.spark.examples.streaming import java.io.{BufferedReader, InputStreamReader} import java.net.Socket +import java.nio.charset.StandardCharsets import org.apache.spark.{Logging, SparkConf} import org.apache.spark.storage.StorageLevel @@ -83,7 +84,8 @@ class CustomReceiver(host: String, port: Int) logInfo("Connecting to " + host + ":" + port) socket = new Socket(host, port) logInfo("Connected to " + host + ":" + port) - val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")) + val reader = new BufferedReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while(!isStopped && userInput != null) { store(userInput) diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala index 7f6cecf9cd18d..e8ca1e716394d 100644 --- a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala +++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.flume.sink import java.net.InetSocketAddress +import 
java.nio.charset.StandardCharsets import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} import java.util.concurrent.atomic.AtomicInteger @@ -184,7 +185,8 @@ class SparkSinkSuite extends FunSuite { private def putEvents(ch: MemoryChannel, count: Int): Unit = { val tx = ch.getTransaction tx.begin() - (1 to count).foreach(x => ch.put(EventBuilder.withBody(x.toString.getBytes))) + (1 to count).foreach(x => + ch.put(EventBuilder.withBody(x.toString.getBytes(StandardCharsets.UTF_8)))) tx.commit() tx.close() } diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala index 3f87ce46e5952..945cfa7295d1d 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala @@ -19,12 +19,12 @@ package org.apache.spark.streaming.flume import java.net.{InetSocketAddress, ServerSocket} import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.util.{List => JList} import java.util.Collections import scala.collection.JavaConverters._ -import com.google.common.base.Charsets.UTF_8 import org.apache.avro.ipc.NettyTransceiver import org.apache.avro.ipc.specific.SpecificRequestor import org.apache.commons.lang3.RandomUtils @@ -65,7 +65,7 @@ private[flume] class FlumeTestUtils { val inputEvents = input.asScala.map { item => val event = new AvroFlumeEvent - event.setBody(ByteBuffer.wrap(item.getBytes(UTF_8))) + event.setBody(ByteBuffer.wrap(item.getBytes(StandardCharsets.UTF_8))) event.setHeaders(Collections.singletonMap("test", "header")) event } diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala index 9515d07c5ee5b..1a96df6e94b95 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala @@ -17,12 +17,12 @@ package org.apache.spark.streaming.flume +import java.nio.charset.StandardCharsets import java.util.{Collections, List => JList, Map => JMap} import java.util.concurrent._ import scala.collection.mutable.ArrayBuffer -import com.google.common.base.Charsets.UTF_8 import org.apache.flume.event.EventBuilder import org.apache.flume.Context import org.apache.flume.channel.MemoryChannel @@ -193,7 +193,8 @@ private[flume] class PollingFlumeTestUtils { val tx = channel.getTransaction tx.begin() for (j <- 0 until eventsPerBatch) { - channel.put(EventBuilder.withBody(s"${channel.getName}-$t".getBytes(UTF_8), + channel.put(EventBuilder.withBody( + s"${channel.getName}-$t".getBytes(StandardCharsets.UTF_8), Collections.singletonMap(s"test-$t", "header"))) t += 1 } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 0cb875c9758f9..72d9053355706 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -19,12 +19,12 @@ package org.apache.spark.streaming.kafka import java.io.OutputStream import java.lang.{Integer => JInt, Long => JLong} +import java.nio.charset.StandardCharsets import java.util.{List => JList, Map => 
JMap, Set => JSet} import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import com.google.common.base.Charsets.UTF_8 import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder} @@ -787,7 +787,7 @@ private object KafkaUtilsPythonHelper { def pickle(obj: Object, out: OutputStream, pickler: Pickler) { if (obj == this) { out.write(Opcodes.GLOBAL) - out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(UTF_8)) + out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(StandardCharsets.UTF_8)) } else { pickler.save(this) val msgAndMetaData = obj.asInstanceOf[PythonMessageAndMetadata] diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 0ace453ee9280..026387ed65d50 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.kinesis import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -242,7 +243,7 @@ private[kinesis] class SimpleDataGenerator( val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]() data.foreach { num => val str = num.toString - val data = ByteBuffer.wrap(str.getBytes()) + val data = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8)) val putRecordRequest = new PutRecordRequest().withStreamName(streamName) .withData(data) .withPartitionKey(str) diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala index fdb270eaad8c9..0b455e574e6fa 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.kinesis import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -51,7 +52,7 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]() data.foreach { num => val str = num.toString - val data = ByteBuffer.wrap(str.getBytes()) + val data = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8)) val future = producer.addUserRecord(streamName, str, data) val kinesisCallBack = new FutureCallback[UserRecordResult]() { override def onFailure(t: Throwable): Unit = {} // do nothing diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index 079bd8a9a87ea..cbad6f7fe44dc 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.mqtt +import java.nio.charset.StandardCharsets + import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken import 
org.eclipse.paho.client.mqttv3.MqttCallback import org.eclipse.paho.client.mqttv3.MqttClient @@ -75,7 +77,7 @@ class MQTTReceiver( // Handles Mqtt message override def messageArrived(topic: String, message: MqttMessage) { - store(new String(message.getPayload(), "utf-8")) + store(new String(message.getPayload(), StandardCharsets.UTF_8)) } override def deliveryComplete(token: IMqttDeliveryToken) { diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala index 26c6dc45d5115..3680c136059a5 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala @@ -18,10 +18,10 @@ package org.apache.spark.streaming.mqtt import java.net.{ServerSocket, URI} +import java.nio.charset.StandardCharsets import scala.language.postfixOps -import com.google.common.base.Charsets.UTF_8 import org.apache.activemq.broker.{BrokerService, TransportConnector} import org.apache.commons.lang3.RandomUtils import org.eclipse.paho.client.mqttv3._ @@ -85,7 +85,7 @@ private[mqtt] class MQTTTestUtils extends Logging { client.connect() if (client.isConnected) { val msgTopic = client.getTopic(topic) - val message = new MqttMessage(data.getBytes(UTF_8)) + val message = new MqttMessage(data.getBytes(StandardCharsets.UTF_8)) message.setQos(1) message.setRetained(true) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphLoaderSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphLoaderSuite.scala index bff9f328d4907..e55b05fa996ad 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphLoaderSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphLoaderSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.graphx import java.io.File import java.io.FileOutputStream import java.io.OutputStreamWriter +import java.nio.charset.StandardCharsets import org.apache.spark.SparkFunSuite import org.apache.spark.util.Utils @@ -30,7 +31,7 @@ class GraphLoaderSuite extends SparkFunSuite with LocalSparkContext { withSpark { sc => val tmpDir = Utils.createTempDir() val graphFile = new File(tmpDir.getAbsolutePath, "graph.txt") - val writer = new OutputStreamWriter(new FileOutputStream(graphFile)) + val writer = new OutputStreamWriter(new FileOutputStream(graphFile), StandardCharsets.UTF_8) for (i <- (1 until 101)) writer.write(s"$i 0\n") writer.close() try { diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index 46410327a5d72..20387e0f1068d 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -23,6 +23,7 @@ import java.io.FileInputStream; import java.io.InputStreamReader; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -102,7 +103,7 @@ List buildJavaCommand(String extraClassPath) throws IOException { File javaOpts = new File(join(File.separator, getConfDir(), "java-opts")); if (javaOpts.isFile()) { BufferedReader br = new BufferedReader(new InputStreamReader( - new FileInputStream(javaOpts), "UTF-8")); + new FileInputStream(javaOpts), StandardCharsets.UTF_8)); try { String line; while ((line = br.readLine()) != null) { @@ -301,7 
+302,7 @@ private Properties loadPropertiesFile() throws IOException { FileInputStream fd = null; try { fd = new FileInputStream(propsFile); - props.load(new InputStreamReader(fd, "UTF-8")); + props.load(new InputStreamReader(fd, StandardCharsets.UTF_8)); for (Map.Entry e : props.entrySet()) { e.setValue(e.getValue().toString().trim()); } diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java index 6e7120167d605..c7959aee9f888 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java +++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java @@ -21,6 +21,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.concurrent.ThreadFactory; import java.util.logging.Level; import java.util.logging.Logger; @@ -42,7 +43,7 @@ class OutputRedirector { OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) { this.active = true; - this.reader = new BufferedReader(new InputStreamReader(in)); + this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); this.thread = tf.newThread(new Runnable() { @Override public void run() { diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java index d36731840b1a1..a85afb58b9c98 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java @@ -199,11 +199,7 @@ private void testCmdBuilder(boolean isDriver, boolean useDefaultPropertyFile) th for (String arg : cmd) { if (arg.startsWith("-XX:MaxPermSize=")) { - if (isDriver) { - assertEquals("-XX:MaxPermSize=256m", arg); - } else { - assertEquals("-XX:MaxPermSize=256m", arg); - } + assertEquals("-XX:MaxPermSize=256m", arg); } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 132dc174a894e..53935f328ab8a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -19,6 +19,7 @@ package org.apache.spark.mllib.api.python import java.io.OutputStream import java.nio.{ByteBuffer, ByteOrder} +import java.nio.charset.StandardCharsets import java.util.{ArrayList => JArrayList, List => JList, Map => JMap} import scala.collection.JavaConverters._ @@ -1226,7 +1227,7 @@ private[spark] object SerDe extends Serializable { def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = { if (obj == this) { out.write(Opcodes.GLOBAL) - out.write((module + "\n" + name + "\n").getBytes) + out.write((module + "\n" + name + "\n").getBytes(StandardCharsets.UTF_8)) } else { pickler.save(this) // it will be memorized by Pickler saveState(obj, out, pickler) diff --git a/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java b/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java index b8ddf907d05ad..1c18b2b266fef 100644 --- a/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java @@ -19,8 +19,8 @@ import 
java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; -import com.google.common.base.Charsets; import com.google.common.io.Files; import org.junit.After; @@ -55,7 +55,7 @@ public void setUp() throws IOException { tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource"); File file = new File(tempDir, "part-00000"); String s = "1 1:1.0 3:2.0 5:3.0\n0\n0 2:4.0 4:5.0 6:6.0"; - Files.write(s, file, Charsets.US_ASCII); + Files.write(s, file, StandardCharsets.UTF_8); path = tempDir.toURI().toString(); } diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala index 84fc08be09ee7..71f4926290b20 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala @@ -17,9 +17,9 @@ package org.apache.spark.ml.source.libsvm -import java.io.{File, IOException} +import java.io.File +import java.nio.charset.StandardCharsets -import com.google.common.base.Charsets import com.google.common.io.Files import org.apache.spark.{SparkException, SparkFunSuite} @@ -42,7 +42,7 @@ class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext { """.stripMargin tempDir = Utils.createTempDir() val file = new File(tempDir, "part-00000") - Files.write(lines, file, Charsets.US_ASCII) + Files.write(lines, file, StandardCharsets.UTF_8) path = tempDir.toURI.toString } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index 70219e9ad9d3e..e542f21a1802c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -18,11 +18,11 @@ package org.apache.spark.mllib.util import java.io.File +import java.nio.charset.StandardCharsets import scala.io.Source import breeze.linalg.{squaredDistance => breezeSquaredDistance} -import com.google.common.base.Charsets import com.google.common.io.Files import org.apache.spark.SparkException @@ -84,7 +84,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { """.stripMargin val tempDir = Utils.createTempDir() val file = new File(tempDir.getPath, "part-00000") - Files.write(lines, file, Charsets.US_ASCII) + Files.write(lines, file, StandardCharsets.UTF_8) val path = tempDir.toURI.toString val pointsWithNumFeatures = loadLibSVMFile(sc, path, 6).collect() @@ -117,7 +117,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { """.stripMargin val tempDir = Utils.createTempDir() val file = new File(tempDir.getPath, "part-00000") - Files.write(lines, file, Charsets.US_ASCII) + Files.write(lines, file, StandardCharsets.UTF_8) val path = tempDir.toURI.toString intercept[SparkException] { @@ -134,7 +134,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { """.stripMargin val tempDir = Utils.createTempDir() val file = new File(tempDir.getPath, "part-00000") - Files.write(lines, file, Charsets.US_ASCII) + Files.write(lines, file, StandardCharsets.UTF_8) val path = tempDir.toURI.toString intercept[SparkException] { diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/parser/ParseUtils.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/parser/ParseUtils.java index 2520c7bb8dae4..01f89112a759b 100644 --- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/parser/ParseUtils.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/parser/ParseUtils.java @@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.parser; +import java.nio.charset.StandardCharsets; + /** * A couple of utility methods that help with parsing ASTs. * @@ -76,7 +78,7 @@ public static String unescapeSQLString(String b) { byte bVal = (byte) ((i3 - '0') + ((i2 - '0') * 8) + ((i1 - '0') * 8 * 8)); byte[] bValArr = new byte[1]; bValArr[0] = bVal; - String tmp = new String(bValArr); + String tmp = new String(bValArr, StandardCharsets.UTF_8); sb.append(tmp); i += 3; continue; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index a76517a89cc4a..e6804d096cd96 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions +import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import org.json4s.JsonAST._ @@ -109,7 +110,7 @@ object Literal { case DateType => create(0, DateType) case TimestampType => create(0L, TimestampType) case StringType => Literal("") - case BinaryType => Literal("".getBytes) + case BinaryType => Literal("".getBytes(StandardCharsets.UTF_8)) case CalendarIntervalType => Literal(new CalendarInterval(0, 0)) case arr: ArrayType => create(Array(), arr) case map: MapType => create(Map(), map) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index d9a9b6151a0be..b11365b297184 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst import java.io._ +import java.nio.charset.StandardCharsets import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{NumericType, StringType} @@ -118,7 +119,7 @@ package object util { val writer = new PrintWriter(out) t.printStackTrace(writer) writer.flush() - new String(out.toByteArray) + new String(out.toByteArray, StandardCharsets.UTF_8) } def stringOrNull(a: AnyRef): String = if (a == null) null else a.toString diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala index 124172bd66f19..450222d8cbba3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions +import java.nio.charset.StandardCharsets + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -54,7 +56,7 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Literal.default(FloatType), 0.0f) checkEvaluation(Literal.default(DoubleType), 0.0) checkEvaluation(Literal.default(StringType), "") - checkEvaluation(Literal.default(BinaryType), "".getBytes) + 
checkEvaluation(Literal.default(BinaryType), "".getBytes(StandardCharsets.UTF_8)) checkEvaluation(Literal.default(DecimalType.USER_DEFAULT), Decimal(0)) checkEvaluation(Literal.default(DecimalType.SYSTEM_DEFAULT), Decimal(0)) checkEvaluation(Literal.default(DateType), DateTimeUtils.toJavaDate(0)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala index 4ad65db0977c7..fba5f53715039 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions +import java.nio.charset.StandardCharsets + import com.google.common.math.LongMath import org.apache.spark.SparkFunSuite @@ -440,7 +442,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Hex(Literal(100800200404L)), "177828FED4") checkEvaluation(Hex(Literal(-100800200404L)), "FFFFFFE887D7012C") checkEvaluation(Hex(Literal.create(null, BinaryType)), null) - checkEvaluation(Hex(Literal("helloHex".getBytes())), "68656C6C6F486578") + checkEvaluation(Hex(Literal("helloHex".getBytes(StandardCharsets.UTF_8))), "68656C6C6F486578") // scalastyle:off // Turn off scala style for non-ascii chars checkEvaluation(Hex(Literal("三重的".getBytes("UTF8"))), "E4B889E9878DE79A84") @@ -452,7 +454,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("unhex") { checkEvaluation(Unhex(Literal.create(null, StringType)), null) - checkEvaluation(Unhex(Literal("737472696E67")), "string".getBytes) + checkEvaluation(Unhex(Literal("737472696E67")), "string".getBytes(StandardCharsets.UTF_8)) checkEvaluation(Unhex(Literal("")), new Array[Byte](0)) checkEvaluation(Unhex(Literal("F")), Array[Byte](15)) checkEvaluation(Unhex(Literal("ff")), Array[Byte](-1)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala index 75131a6170222..60d50baf511d9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions +import java.nio.charset.StandardCharsets + import org.apache.commons.codec.digest.DigestUtils import org.apache.spark.SparkFunSuite @@ -27,7 +29,8 @@ import org.apache.spark.sql.types._ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("md5") { - checkEvaluation(Md5(Literal("ABC".getBytes)), "902fbdd2b1df0c4f70b4a5d23525e932") + checkEvaluation(Md5(Literal("ABC".getBytes(StandardCharsets.UTF_8))), + "902fbdd2b1df0c4f70b4a5d23525e932") checkEvaluation(Md5(Literal.create(Array[Byte](1, 2, 3, 4, 5, 6), BinaryType)), "6ac1e56bc78f031059be7be854522c4c") checkEvaluation(Md5(Literal.create(null, BinaryType)), null) @@ -35,27 +38,31 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("sha1") { - checkEvaluation(Sha1(Literal("ABC".getBytes)), "3c01bdbb26f358bab27f267924aa2c9a03fcfdb8") + checkEvaluation(Sha1(Literal("ABC".getBytes(StandardCharsets.UTF_8))), + "3c01bdbb26f358bab27f267924aa2c9a03fcfdb8")
checkEvaluation(Sha1(Literal.create(Array[Byte](1, 2, 3, 4, 5, 6), BinaryType)), "5d211bad8f4ee70e16c7d343a838fc344a1ed961") checkEvaluation(Sha1(Literal.create(null, BinaryType)), null) - checkEvaluation(Sha1(Literal("".getBytes)), "da39a3ee5e6b4b0d3255bfef95601890afd80709") + checkEvaluation(Sha1(Literal("".getBytes(StandardCharsets.UTF_8))), + "da39a3ee5e6b4b0d3255bfef95601890afd80709") checkConsistencyBetweenInterpretedAndCodegen(Sha1, BinaryType) } test("sha2") { - checkEvaluation(Sha2(Literal("ABC".getBytes), Literal(256)), DigestUtils.sha256Hex("ABC")) + checkEvaluation(Sha2(Literal("ABC".getBytes(StandardCharsets.UTF_8)), Literal(256)), + DigestUtils.sha256Hex("ABC")) checkEvaluation(Sha2(Literal.create(Array[Byte](1, 2, 3, 4, 5, 6), BinaryType), Literal(384)), DigestUtils.sha384Hex(Array[Byte](1, 2, 3, 4, 5, 6))) // unsupported bit length checkEvaluation(Sha2(Literal.create(null, BinaryType), Literal(1024)), null) checkEvaluation(Sha2(Literal.create(null, BinaryType), Literal(512)), null) - checkEvaluation(Sha2(Literal("ABC".getBytes), Literal.create(null, IntegerType)), null) + checkEvaluation(Sha2(Literal("ABC".getBytes(StandardCharsets.UTF_8)), + Literal.create(null, IntegerType)), null) checkEvaluation(Sha2(Literal.create(null, BinaryType), Literal.create(null, IntegerType)), null) } test("crc32") { - checkEvaluation(Crc32(Literal("ABC".getBytes)), 2743272264L) + checkEvaluation(Crc32(Literal("ABC".getBytes(StandardCharsets.UTF_8))), 2743272264L) checkEvaluation(Crc32(Literal.create(Array[Byte](1, 2, 3, 4, 5, 6), BinaryType)), 2180413220L) checkEvaluation(Crc32(Literal.create(null, BinaryType)), null) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala index 68545f33e5465..1265908182b3a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions +import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import org.scalatest.Matchers @@ -77,16 +78,16 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers { val row = new SpecificMutableRow(fieldTypes) row.setLong(0, 0) row.update(1, UTF8String.fromString("Hello")) - row.update(2, "World".getBytes) + row.update(2, "World".getBytes(StandardCharsets.UTF_8)) val unsafeRow: UnsafeRow = converter.apply(row) assert(unsafeRow.getSizeInBytes === 8 + (8 * 3) + - roundedSize("Hello".getBytes.length) + - roundedSize("World".getBytes.length)) + roundedSize("Hello".getBytes(StandardCharsets.UTF_8).length) + + roundedSize("World".getBytes(StandardCharsets.UTF_8).length)) assert(unsafeRow.getLong(0) === 0) assert(unsafeRow.getString(1) === "Hello") - assert(unsafeRow.getBinary(2) === "World".getBytes) + assert(unsafeRow.getBinary(2) === "World".getBytes(StandardCharsets.UTF_8)) } test("basic conversion with primitive, string, date and timestamp types") { @@ -100,7 +101,8 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers { row.update(3, DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25"))) val unsafeRow: UnsafeRow = converter.apply(row) - assert(unsafeRow.getSizeInBytes === 8 + (8 * 4) + roundedSize("Hello".getBytes.length)) + assert(unsafeRow.getSizeInBytes === + 8 + (8 * 4) + 
roundedSize("Hello".getBytes(StandardCharsets.UTF_8).length)) assert(unsafeRow.getLong(0) === 0) assert(unsafeRow.getString(1) === "Hello") @@ -175,7 +177,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers { r.setFloat(6, 600) r.setDouble(7, 700) r.update(8, UTF8String.fromString("hello")) - r.update(9, "world".getBytes) + r.update(9, "world".getBytes(StandardCharsets.UTF_8)) r.setDecimal(10, Decimal(10), 10) r.setDecimal(11, Decimal(10.00, 38, 18), 38) // r.update(11, Array(11)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala index 1522ee34e43a5..e2a8eb8ee1d34 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen +import java.nio.charset.StandardCharsets + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -107,7 +109,8 @@ class GeneratedProjectionSuite extends SparkFunSuite { val fields = Array[DataType](StringType, struct) val unsafeProj = UnsafeProjection.create(fields) - val innerRow = InternalRow(false, 1.toByte, 2.toShort, 3, 4.0f, "".getBytes, + val innerRow = InternalRow(false, 1.toByte, 2.toShort, 3, 4.0f, + "".getBytes(StandardCharsets.UTF_8), UTF8String.fromString("")) val row1 = InternalRow(UTF8String.fromString(""), innerRow) val unsafe1 = unsafeProj(row1).copy() diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index 68f146f7a2622..b084eda6f84c1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -18,6 +18,7 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.sql.Date; import java.util.Iterator; import java.util.List; @@ -138,7 +139,7 @@ private static void appendValue(ColumnVector dst, DataType t, Object o) { } else if (t == DataTypes.DoubleType) { dst.appendDouble(((Double)o).doubleValue()); } else if (t == DataTypes.StringType) { - byte[] b =((String)o).getBytes(); + byte[] b =((String)o).getBytes(StandardCharsets.UTF_8); dst.appendByteArray(b, 0, b.length); } else if (t instanceof DecimalType) { DecimalType dt = (DecimalType)t; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java index 9a8aedfa56b8c..09c001baaeafd 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java @@ -18,6 +18,7 @@ import java.util.Arrays; import java.util.Iterator; +import java.util.NoSuchElementException; import org.apache.commons.lang.NotImplementedException; @@ -254,6 +255,9 @@ public Row next() { while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) { ++rowId; } + if (rowId >= maxRows) { + throw new NoSuchElementException(); + } row.rowId 
= rowId++; return row; } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 38aa2dd80a4f1..6a0290c11228f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.csv -import java.nio.charset.Charset +import java.nio.charset.StandardCharsets import org.apache.spark.Logging import org.apache.spark.sql.execution.datasources.CompressionCodecs @@ -64,7 +64,7 @@ private[sql] class CSVOptions( parameters.getOrElse("sep", parameters.getOrElse("delimiter", ","))) val parseMode = parameters.getOrElse("mode", "PERMISSIVE") val charset = parameters.getOrElse("encoding", - parameters.getOrElse("charset", Charset.forName("UTF-8").name())) + parameters.getOrElse("charset", StandardCharsets.UTF_8.name())) val quote = getChar("quote", '\"') val escape = getChar("escape", '\\') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala index 8f1421844c648..8c3f63d307321 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.csv import java.io.{ByteArrayOutputStream, OutputStreamWriter, StringReader} +import java.nio.charset.StandardCharsets import com.univocity.parsers.csv.{CsvParser, CsvParserSettings, CsvWriter, CsvWriterSettings} @@ -76,7 +77,7 @@ private[sql] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) exten def writeRow(row: Seq[String], includeHeader: Boolean): String = { val buffer = new ByteArrayOutputStream() - val outputWriter = new OutputStreamWriter(buffer) + val outputWriter = new OutputStreamWriter(buffer, StandardCharsets.UTF_8) val writer = new CsvWriter(outputWriter, writerSettings) if (includeHeader) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala index aff672281d640..42c07c8a23f5e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.csv -import java.nio.charset.Charset +import java.nio.charset.{Charset, StandardCharsets} import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.io.{LongWritable, Text} @@ -161,7 +161,7 @@ class DefaultSource extends FileFormat with DataSourceRegister { sqlContext: SQLContext, options: CSVOptions, location: String): RDD[String] = { - if (Charset.forName(options.charset) == Charset.forName("UTF-8")) { + if (Charset.forName(options.charset) == StandardCharsets.UTF_8) { sqlContext.sparkContext.textFile(location) } else { val charset = options.charset diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 14ba9f69bb1d7..cce4b74ff2994 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql.execution.streaming import java.io._ +import java.nio.charset.StandardCharsets import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.io.Codec -import com.google.common.base.Charsets.UTF_8 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.spark.Logging @@ -184,7 +184,7 @@ class FileStreamSource( private def writeBatch(id: Int, files: Seq[String]): Unit = { assert(files.nonEmpty, "create a new batch without any file") val output = fs.create(new Path(metadataPath + "/" + id), true) - val writer = new PrintWriter(new OutputStreamWriter(output, UTF_8)) + val writer = new PrintWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8)) try { // scalastyle:off println writer.println(FileStreamSource.VERSION) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java index 0f9e453d26db8..9e65158eb0a33 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java @@ -40,7 +40,6 @@ public class JavaSaveLoadSuite { private transient JavaSparkContext sc; private transient SQLContext sqlContext; - String originalDefaultSource; File path; Dataset df; @@ -57,7 +56,6 @@ public void setUp() throws IOException { sqlContext = new SQLContext(_sc); sc = new JavaSparkContext(_sc); - originalDefaultSource = sqlContext.conf().defaultDataSourceName(); path = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile(); if (path.exists()) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index aff9efe4b2b16..2aa6f8d4acf7d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import java.nio.charset.StandardCharsets + import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -167,12 +169,12 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { } test("misc sha1 function") { - val df = Seq(("ABC", "ABC".getBytes)).toDF("a", "b") + val df = Seq(("ABC", "ABC".getBytes(StandardCharsets.UTF_8))).toDF("a", "b") checkAnswer( df.select(sha1($"a"), sha1($"b")), Row("3c01bdbb26f358bab27f267924aa2c9a03fcfdb8", "3c01bdbb26f358bab27f267924aa2c9a03fcfdb8")) - val dfEmpty = Seq(("", "".getBytes)).toDF("a", "b") + val dfEmpty = Seq(("", "".getBytes(StandardCharsets.UTF_8))).toDF("a", "b") checkAnswer( dfEmpty.selectExpr("sha1(a)", "sha1(b)"), Row("da39a3ee5e6b4b0d3255bfef95601890afd80709", "da39a3ee5e6b4b0d3255bfef95601890afd80709")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index e6e27ec413bb2..2333fa27ca623 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import java.io.File +import 
java.nio.charset.StandardCharsets import scala.language.postfixOps import scala.util.Random @@ -665,8 +666,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { test("showString: binary") { val df = Seq( - ("12".getBytes, "ABC.".getBytes), - ("34".getBytes, "12346".getBytes) + ("12".getBytes(StandardCharsets.UTF_8), "ABC.".getBytes(StandardCharsets.UTF_8)), + ("34".getBytes(StandardCharsets.UTF_8), "12346".getBytes(StandardCharsets.UTF_8)) ).toDF() val expectedAnswer = """+-------+----------------+ || _1| _2| diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala index 013a90875e2b2..f5a67fd782d63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import java.nio.charset.StandardCharsets + import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions.{log => logarithm} import org.apache.spark.sql.test.SharedSQLContext @@ -262,9 +264,9 @@ class MathExpressionsSuite extends QueryTest with SharedSQLContext { test("unhex") { val data = Seq(("1C", "737472696E67")).toDF("a", "b") checkAnswer(data.select(unhex('a)), Row(Array[Byte](28.toByte))) - checkAnswer(data.select(unhex('b)), Row("string".getBytes)) + checkAnswer(data.select(unhex('b)), Row("string".getBytes(StandardCharsets.UTF_8))) checkAnswer(data.selectExpr("unhex(a)"), Row(Array[Byte](28.toByte))) - checkAnswer(data.selectExpr("unhex(b)"), Row("string".getBytes)) + checkAnswer(data.selectExpr("unhex(b)"), Row("string".getBytes(StandardCharsets.UTF_8))) checkAnswer(data.selectExpr("""unhex("##")"""), Row(null)) checkAnswer(data.selectExpr("""unhex("G123")"""), Row(null)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala index 0000a5d1efd09..1aadd700d7443 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.columnar.compression import java.nio.{ByteBuffer, ByteOrder} +import java.nio.charset.StandardCharsets import org.apache.commons.lang3.RandomStringUtils import org.apache.commons.math3.distribution.LogNormalDistribution @@ -313,7 +314,7 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes { } for (i <- 0 until count) { testData.putInt(strLen) - testData.put(g().getBytes) + testData.put(g().getBytes(StandardCharsets.UTF_8)) } testData.rewind() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala index 97638a66ab473..67b3d98c1daed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import scala.util.Random @@ -357,7 +358,8 @@ object ColumnarBatchBenchmark { val maxString 
= 32 val count = 4 * 1000 - val data = Seq.fill(count)(randomString(minString, maxString)).map(_.getBytes).toArray + val data = Seq.fill(count)(randomString(minString, maxString)) + .map(_.getBytes(StandardCharsets.UTF_8)).toArray def column(memoryMode: MemoryMode) = { i: Int => val column = ColumnVector.allocate(count, BinaryType, memoryMode) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index b3c3e66fbcbd5..ed97f59ea1690 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.vectorized +import java.nio.charset.StandardCharsets + import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random @@ -329,18 +331,21 @@ class ColumnarBatchSuite extends SparkFunSuite { var idx = 0 val values = ("Hello" :: "abc" :: Nil).toArray - column.putByteArray(idx, values(0).getBytes, 0, values(0).getBytes().length) + column.putByteArray(idx, values(0).getBytes(StandardCharsets.UTF_8), + 0, values(0).getBytes(StandardCharsets.UTF_8).length) reference += values(0) idx += 1 assert(column.arrayData().elementsAppended == 5) - column.putByteArray(idx, values(1).getBytes, 0, values(1).getBytes().length) + column.putByteArray(idx, values(1).getBytes(StandardCharsets.UTF_8), + 0, values(1).getBytes(StandardCharsets.UTF_8).length) reference += values(1) idx += 1 assert(column.arrayData().elementsAppended == 8) // Just put llo - val offset = column.putByteArray(idx, values(0).getBytes, 2, values(0).getBytes().length - 2) + val offset = column.putByteArray(idx, values(0).getBytes(StandardCharsets.UTF_8), + 2, values(0).getBytes(StandardCharsets.UTF_8).length - 2) reference += "llo" idx += 1 assert(column.arrayData().elementsAppended == 11) @@ -353,7 +358,7 @@ class ColumnarBatchSuite extends SparkFunSuite { // Put a long string val s = "abcdefghijklmnopqrstuvwxyz" - column.putByteArray(idx, (s + s).getBytes) + column.putByteArray(idx, (s + s).getBytes(StandardCharsets.UTF_8)) reference += (s + s) idx += 1 assert(column.arrayData().elementsAppended == 11 + (s + s).length) @@ -473,7 +478,7 @@ class ColumnarBatchSuite extends SparkFunSuite { batch.column(0).putInt(0, 1) batch.column(1).putDouble(0, 1.1) batch.column(2).putNull(0) - batch.column(3).putByteArray(0, "Hello".getBytes) + batch.column(3).putByteArray(0, "Hello".getBytes(StandardCharsets.UTF_8)) batch.setNumRows(1) // Verify the results of the row. 
@@ -519,17 +524,17 @@ class ColumnarBatchSuite extends SparkFunSuite { batch.column(0).putNull(0) batch.column(1).putDouble(0, 2.2) batch.column(2).putInt(0, 2) - batch.column(3).putByteArray(0, "abc".getBytes) + batch.column(3).putByteArray(0, "abc".getBytes(StandardCharsets.UTF_8)) batch.column(0).putInt(1, 3) batch.column(1).putNull(1) batch.column(2).putInt(1, 3) - batch.column(3).putByteArray(1, "".getBytes) + batch.column(3).putByteArray(1, "".getBytes(StandardCharsets.UTF_8)) batch.column(0).putInt(2, 4) batch.column(1).putDouble(2, 4.4) batch.column(2).putInt(2, 4) - batch.column(3).putByteArray(2, "world".getBytes) + batch.column(3).putByteArray(2, "world".getBytes(StandardCharsets.UTF_8)) batch.setNumRows(3) def rowEquals(x: InternalRow, y: Row): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index e9d77abb8c23c..e6889bcc783b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.streaming import java.io.{ByteArrayInputStream, File, FileNotFoundException, InputStream} - -import com.google.common.base.Charsets.UTF_8 +import java.nio.charset.StandardCharsets import org.apache.spark.sql.{AnalysisException, StreamTest} import org.apache.spark.sql.catalyst.util._ @@ -392,7 +391,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("readBatch") { - def stringToStream(str: String): InputStream = new ByteArrayInputStream(str.getBytes(UTF_8)) + def stringToStream(str: String): InputStream = + new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)) // Invalid metadata assert(readBatch(stringToStream("")) === Nil) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala index 83c63e04f344a..7fa6760b71c8b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.test +import java.nio.charset.StandardCharsets + import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext, SQLImplicits} @@ -103,11 +105,11 @@ private[sql] trait SQLTestData { self => protected lazy val binaryData: DataFrame = { val df = sqlContext.sparkContext.parallelize( - BinaryData("12".getBytes, 1) :: - BinaryData("22".getBytes, 5) :: - BinaryData("122".getBytes, 3) :: - BinaryData("121".getBytes, 2) :: - BinaryData("123".getBytes, 4) :: Nil).toDF() + BinaryData("12".getBytes(StandardCharsets.UTF_8), 1) :: + BinaryData("22".getBytes(StandardCharsets.UTF_8), 5) :: + BinaryData("122".getBytes(StandardCharsets.UTF_8), 3) :: + BinaryData("121".getBytes(StandardCharsets.UTF_8), 2) :: + BinaryData("123".getBytes(StandardCharsets.UTF_8), 4) :: Nil).toDF() df.registerTempTable("binaryData") df } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 81508e134695a..54fffb971dbc3 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.thriftserver import java.io._ +import java.nio.charset.StandardCharsets import java.sql.Timestamp import java.util.Date @@ -121,7 +122,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { val process = new ProcessBuilder(command: _*).start() - val stdinWriter = new OutputStreamWriter(process.getOutputStream) + val stdinWriter = new OutputStreamWriter(process.getOutputStream, StandardCharsets.UTF_8) stdinWriter.write(queriesString) stdinWriter.flush() stdinWriter.close() diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index c05527b519daa..e89bb1c470d5a 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver import java.io.File import java.net.URL +import java.nio.charset.StandardCharsets import java.sql.{Date, DriverManager, SQLException, Statement} import scala.collection.mutable @@ -28,7 +29,6 @@ import scala.concurrent.duration._ import scala.io.Source import scala.util.{Random, Try} -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.jdbc.HiveDriver @@ -700,7 +700,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n """.stripMargin, new File(s"$tempLog4jConf/log4j.properties"), - UTF_8) + StandardCharsets.UTF_8) tempLog4jConf } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 081d849a88886..9725dcfde169c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive import java.io.File import java.net.{URL, URLClassLoader} +import java.nio.charset.StandardCharsets import java.sql.Timestamp import java.util.concurrent.TimeUnit import java.util.regex.Pattern @@ -715,7 +716,7 @@ private[hive] object HiveContext { case (null, _) => "NULL" case (d: Int, DateType) => new DateWritable(d).toString case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString - case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8") + case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8) case (decimal: java.math.BigDecimal, DecimalType()) => // Hive strips trailing zeros so use its toString HiveDecimal.create(decimal).toString diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 5e6641693798f..b6e2f1f6b3ab7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution import java.io._ +import 
java.nio.charset.StandardCharsets import java.util.Properties import javax.annotation.Nullable @@ -113,7 +114,7 @@ case class ScriptTransformation( ioschema.initOutputSerDe(output).getOrElse((null, null)) } - val reader = new BufferedReader(new InputStreamReader(inputStream)) + val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)) val outputIterator: Iterator[InternalRow] = new Iterator[InternalRow] with HiveInspectors { var curLine: String = null val scriptOutputStream = new DataInputStream(inputStream) diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java index 5a539eaec7507..e9356541c22df 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java @@ -48,7 +48,6 @@ public class JavaMetastoreDataSourcesSuite { private transient JavaSparkContext sc; private transient HiveContext sqlContext; - String originalDefaultSource; File path; Path hiveManagedPath; FileSystem fs; @@ -66,7 +65,6 @@ public void setUp() throws IOException { sqlContext = TestHive$.MODULE$; sc = new JavaSparkContext(sqlContext.sparkContext()); - originalDefaultSource = sqlContext.conf().defaultDataSourceName(); path = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile(); if (path.exists()) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 9ca07e96eb088..8cfb32f00a884 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.orc import java.io.File +import java.nio.charset.StandardCharsets import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.io.orc.CompressionKind @@ -73,7 +74,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { test("Read/write binary data") { withOrcFile(BinaryData("test".getBytes("utf8")) :: Nil) { file => val bytes = read.orc(file).head().getAs[Array[Byte]](0) - assert(new String(bytes, "utf8") === "test") + assert(new String(bytes, StandardCharsets.UTF_8) === "test") } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala index 441477479167a..f7519c10c8eb1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming.dstream import java.io._ import java.net.{ConnectException, Socket} +import java.nio.charset.StandardCharsets import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -113,7 +114,8 @@ object SocketReceiver { * to '\n' delimited strings and returns an iterator to access the strings. 
*/ def bytesToLines(inputStream: InputStream): Iterator[String] = { - val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")) + val dataInputStream = new BufferedReader( + new InputStreamReader(inputStream, StandardCharsets.UTF_8)) new NextIterator[String] { protected override def getNext() = { val nextValue = dataInputStream.readLine() diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 806cea24caddb..66448fd40057d 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -18,7 +18,7 @@ package org.apache.spark.streaming; import java.io.*; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -1866,7 +1866,8 @@ public void testSocketString() { @Override public Iterable<String> call(InputStream in) throws IOException { List<String> out = new ArrayList<>(); - try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(in, StandardCharsets.UTF_8))) { for (String line; (line = reader.readLine()) != null;) { out.add(line); } @@ -1930,7 +1931,7 @@ public void testRawSocketStream() { private static List<List<String>> fileTestPrepare(File testDir) throws IOException { File existingFile = new File(testDir, "0"); - Files.write("0\n", existingFile, Charset.forName("UTF-8")); + Files.write("0\n", existingFile, StandardCharsets.UTF_8); Assert.assertTrue(existingFile.setLastModified(1000)); Assert.assertEquals(1000, existingFile.lastModified()); return Arrays.asList(Arrays.asList("0")); diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java index d09258e0e4a85..091ccbfd85cad 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java @@ -38,6 +38,7 @@ import java.io.Serializable; import java.net.ConnectException; import java.net.Socket; +import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicLong; public class JavaReceiverAPISuite implements Serializable { @@ -126,7 +127,8 @@ private void receive() { BufferedReader in = null; try { socket = new Socket(host, port); - in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + in = new BufferedReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); String userInput; while ((userInput = in.readLine()) != null) { store(userInput); diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index ca716cf4e6f11..9a3248b3e8175 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -18,12 +18,12 @@ package org.apache.spark.streaming import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream} +import java.nio.charset.StandardCharsets import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import com.google.common.base.Charsets import com.google.common.io.Files import
org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -609,7 +609,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester */ def writeFile(i: Int, clock: Clock): Unit = { val file = new File(testDir, i.toString) - Files.write(i + "\n", file, Charsets.UTF_8) + Files.write(i + "\n", file, StandardCharsets.UTF_8) assert(file.setLastModified(clock.getTimeMillis())) // Check that the file's modification date is actually the value we wrote, since rounding or // truncation will break the test: diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index fa17b3a15c4b6..cc2a67187e710 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming import java.io.{BufferedWriter, File, OutputStreamWriter} import java.net.{ServerSocket, Socket, SocketException} -import java.nio.charset.Charset +import java.nio.charset.StandardCharsets import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger @@ -146,7 +146,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val testDir = Utils.createTempDir() // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") - Files.write("0\n", existingFile, Charset.forName("UTF-8")) + Files.write("0\n", existingFile, StandardCharsets.UTF_8) assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) // Set up the streaming context and input streams @@ -369,7 +369,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val testDir = Utils.createTempDir() // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") - Files.write("0\n", existingFile, Charset.forName("UTF-8")) + Files.write("0\n", existingFile, StandardCharsets.UTF_8) assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) // Set up the streaming context and input streams @@ -393,7 +393,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val input = Seq(1, 2, 3, 4, 5) input.foreach { i => val file = new File(testDir, i.toString) - Files.write(i + "\n", file, Charset.forName("UTF-8")) + Files.write(i + "\n", file, StandardCharsets.UTF_8) assert(file.setLastModified(clock.getTimeMillis())) assert(file.lastModified === clock.getTimeMillis()) logInfo("Created file " + file) @@ -448,7 +448,7 @@ class TestServer(portToBind: Int = 0) extends Logging { try { clientSocket.setTcpNoDelay(true) val outputStream = new BufferedWriter( - new OutputStreamWriter(clientSocket.getOutputStream)) + new OutputStreamWriter(clientSocket.getOutputStream, StandardCharsets.UTF_8)) while (clientSocket.isConnected) { val msg = queue.poll(100, TimeUnit.MILLISECONDS) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala index faa9c4f0cbd6a..6406d53f8941a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming import java.io.{File, IOException} -import java.nio.charset.Charset +import java.nio.charset.StandardCharsets 
import java.util.UUID import scala.collection.JavaConverters._ @@ -371,7 +371,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) val localFile = new File(localTestDir, (i + 1).toString) val hadoopFile = new Path(testDir, (i + 1).toString) val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString) - Files.write(input(i) + "\n", localFile, Charset.forName("UTF-8")) + Files.write(input(i) + "\n", localFile, StandardCharsets.UTF_8) var tries = 0 var done = false while (!done && tries < maxTries) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 0b5ceb768cc86..1035056457830 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, I OutputStreamWriter} import java.net.{InetAddress, UnknownHostException, URI} import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.util.{Properties, UUID} import java.util.zip.{ZipEntry, ZipOutputStream} @@ -29,7 +30,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map} import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal -import com.google.common.base.Charsets.UTF_8 import com.google.common.base.Objects import com.google.common.io.Files import org.apache.hadoop.conf.Configuration @@ -619,7 +619,7 @@ private[spark] class Client( val props = new Properties() sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) } confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE)) - val writer = new OutputStreamWriter(confStream, UTF_8) + val writer = new OutputStreamWriter(confStream, StandardCharsets.UTF_8) props.store(writer, "Spark configuration.") writer.flush() confStream.closeEntry() diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index b12e506033e39..78b57da482f70 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.yarn import java.io.{File, FileOutputStream, OutputStreamWriter} +import java.nio.charset.StandardCharsets import java.util.Properties import java.util.concurrent.TimeUnit @@ -25,7 +26,6 @@ import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.language.postfixOps -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.commons.lang3.SerializationUtils import org.apache.hadoop.yarn.conf.YarnConfiguration @@ -75,7 +75,7 @@ abstract class BaseYarnClusterSuite System.setProperty("SPARK_YARN_MODE", "true") val logConfFile = new File(logConfDir, "log4j.properties") - Files.write(LOG4J_CONF, logConfFile, UTF_8) + Files.write(LOG4J_CONF, logConfFile, StandardCharsets.UTF_8) // Disable the disk utilization check to avoid the test hanging when people's disks are // getting full. 
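The streaming and YARN test hunks above all funnel file fixtures through Guava's `Files` helpers, so the fix is the same everywhere: pass `StandardCharsets.UTF_8` instead of `Charsets.UTF_8` or `Charset.forName("UTF-8")`. A minimal, self-contained sketch of that round trip (the temp file and its contents are illustrative only, not taken from the patch):

```scala
import java.io.File
import java.nio.charset.StandardCharsets

import com.google.common.io.Files

object Utf8FileRoundTrip {
  def main(args: Array[String]): Unit = {
    // Illustrative temp file, standing in for the log4j.properties / result files in the tests.
    val file = File.createTempFile("spark-utf8-demo", ".properties")
    file.deleteOnExit()

    // Write with an explicit Charset rather than Charsets.UTF_8 or Charset.forName("UTF-8").
    Files.write("spark.app.name=demo\n", file, StandardCharsets.UTF_8)

    // Read back with the same explicit Charset; the platform default never comes into play.
    val contents = Files.toString(file, StandardCharsets.UTF_8)
    assert(contents == "spark.app.name=demo\n")
  }
}
```

Because `StandardCharsets.UTF_8` is already a `java.nio.charset.Charset` constant, the change drops both the Guava `Charsets` import and the charset-name lookup, and the bytes written no longer depend on the platform default.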
@@ -191,7 +191,7 @@ abstract class BaseYarnClusterSuite result: File, expected: String): Unit = { finalState should be (SparkAppHandle.State.FINISHED) - val resultString = Files.toString(result, UTF_8) + val resultString = Files.toString(result, StandardCharsets.UTF_8) resultString should be (expected) } @@ -231,7 +231,7 @@ abstract class BaseYarnClusterSuite extraConf.foreach { case (k, v) => props.setProperty(k, v) } val propsFile = File.createTempFile("spark", ".properties", tempDir) - val writer = new OutputStreamWriter(new FileOutputStream(propsFile), UTF_8) + val writer = new OutputStreamWriter(new FileOutputStream(propsFile), StandardCharsets.UTF_8) props.store(writer, "Spark properties.") writer.close() propsFile.getAbsolutePath() diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index e935163c3487f..5068c0cd208b6 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -19,13 +19,13 @@ package org.apache.spark.deploy.yarn import java.io.File import java.net.URL +import java.nio.charset.StandardCharsets import java.util.{HashMap => JHashMap} import scala.collection.mutable import scala.concurrent.duration._ import scala.language.postfixOps -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.scalatest.Matchers @@ -147,7 +147,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { private def testPySpark(clientMode: Boolean): Unit = { val primaryPyFile = new File(tempDir, "test.py") - Files.write(TEST_PYFILE, primaryPyFile, UTF_8) + Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8) // When running tests, let's not assume the user has built the assembly module, which also // creates the pyspark archive. 
Instead, let's use PYSPARK_ARCHIVES_PATH to point at the @@ -171,7 +171,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { subdir } val pyModule = new File(moduleDir, "mod1.py") - Files.write(TEST_PYMODULE, pyModule, UTF_8) + Files.write(TEST_PYMODULE, pyModule, StandardCharsets.UTF_8) val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir) val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",") @@ -245,7 +245,7 @@ private object YarnClusterDriver extends Logging with Matchers { data should be (Set(1, 2, 3, 4)) result = "success" } finally { - Files.write(result, status, UTF_8) + Files.write(result, status, StandardCharsets.UTF_8) sc.stop() } @@ -319,14 +319,14 @@ private object YarnClasspathTest extends Logging { val ccl = Thread.currentThread().getContextClassLoader() val resource = ccl.getResourceAsStream("test.resource") val bytes = ByteStreams.toByteArray(resource) - result = new String(bytes, 0, bytes.length, UTF_8) + result = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8) } catch { case t: Throwable => error(s"loading test.resource to $resultPath", t) // set the exit code if not yet set exitCode = 2 } finally { - Files.write(result, new File(resultPath), UTF_8) + Files.write(result, new File(resultPath), StandardCharsets.UTF_8) } } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index c17e8695c24fb..1538ff75be5b3 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -18,8 +18,8 @@ package org.apache.spark.deploy.yarn import java.io.File +import java.nio.charset.StandardCharsets -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.commons.io.FileUtils import org.apache.hadoop.yarn.conf.YarnConfiguration @@ -104,7 +104,7 @@ private object YarnExternalShuffleDriver extends Logging with Matchers { } finally { sc.stop() FileUtils.deleteDirectory(execStateCopy) - Files.write(result, status, UTF_8) + Files.write(result, status, StandardCharsets.UTF_8) } } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 9202bd892f01b..70b8732946a2b 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn import java.io.{File, IOException} import java.lang.reflect.InvocationTargetException +import java.nio.charset.StandardCharsets import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.conf.Configuration @@ -59,7 +60,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", "\\arg6") try { val argLine = args.map(a => YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ") - Files.write(("bash -c \"echo " + argLine + "\"").getBytes(), scriptFile) + Files.write(("bash -c \"echo " + argLine + "\"").getBytes(StandardCharsets.UTF_8), scriptFile) scriptFile.setExecutable(true) val proc = Runtime.getRuntime().exec(Array(scriptFile.getAbsolutePath()))
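Taken together, the reader/writer hunks in this patch apply one rule: wrap raw process and socket streams with an explicit charset. A small sketch of the pattern, assuming a Unix `cat` binary on the PATH purely to have a child process to talk to (the process and text here are hypothetical, not from the patch):

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter}
import java.nio.charset.StandardCharsets

object Utf8StreamWrapping {
  def main(args: Array[String]): Unit = {
    // Hypothetical child process; `cat` just echoes stdin back on stdout.
    val process = new ProcessBuilder("cat").start()

    // Name the charset whenever a byte stream is wrapped as characters.
    val stdin = new OutputStreamWriter(process.getOutputStream, StandardCharsets.UTF_8)
    stdin.write("héllo\n")
    stdin.flush()
    stdin.close()

    val stdout = new BufferedReader(
      new InputStreamReader(process.getInputStream, StandardCharsets.UTF_8))
    val echoed = stdout.readLine()
    stdout.close()
    assert(echoed == "héllo")
  }
}
```

The same wrapping appears above in CliSuite's stdin writer, SocketReceiver.bytesToLines, the streaming TestServer, and Client's properties writer; only the underlying stream differs.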
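The remaining `String`/`byte[]` conversions touched above (HiveContext, OrcQuerySuite, YarnSparkHadoopUtilSuite) follow the same idea; a brief sketch with an illustrative string:

```scala
import java.nio.charset.StandardCharsets

object Utf8ByteConversions {
  def main(args: Array[String]): Unit = {
    val text = "test-äöü" // illustrative value only

    // Encode with the Charset overload instead of getBytes() or getBytes("utf8"),
    // so neither the platform default nor a charset-name lookup is involved.
    val bytes: Array[Byte] = text.getBytes(StandardCharsets.UTF_8)

    // Decode the same way, using the String constructor that takes a Charset.
    val decoded = new String(bytes, StandardCharsets.UTF_8)
    assert(decoded == text)
  }
}
```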