From f3daa099bf60edbd6ebf997c00e46db1e09f6dda Mon Sep 17 00:00:00 2001 From: Jacky Li Date: Sun, 13 Mar 2016 18:44:02 -0700 Subject: [PATCH 1/3] [SQL] fix typo in DataSourceRegister ## What changes were proposed in this pull request? Fix a typo in the DataSourceRegister Scaladoc example: the method to override is `shortName()`, not `format()`. ## How was this patch tested? Found while reading through the latest code. Author: Jacky Li Closes #11686 from jackylk/patch-12. --- .../main/scala/org/apache/spark/sql/sources/interfaces.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index e251b52f6c0f9..be2d98c46d3b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -57,7 +57,7 @@ trait DataSourceRegister { * overridden by children to provide a nice alias for the data source. For example: * * {{{ - * override def format(): String = "parquet" + * override def shortName(): String = "parquet" * }}} * * @since 1.5.0 From 473263f9598d1cf880f421aae1b51eb0b6e3cf79 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 13 Mar 2016 18:47:04 -0700 Subject: [PATCH 2/3] [SPARK-13834][BUILD] Update sbt and sbt plugins for 2.x. ## What changes were proposed in this pull request? For 2.0.0, we should bring **sbt** and the **sbt plugins** up to date. This PR checks the status of each plugin and bumps the following: * sbt: 0.13.9 --> 0.13.11 * sbteclipse-plugin: 2.2.0 --> 4.0.0 * sbt-dependency-graph: 0.7.4 --> 0.8.2 * sbt-mima-plugin: 0.1.6 --> 0.1.9 * sbt-revolver: 0.7.2 --> 0.8.0 All other plugins are up to date. (Note that `sbt-avro` seems to have changed from 0.3.2 to 1.0.1, but that version is not published in the repository.) During the upgrade, this PR also updates the following MiMa exclusion. Note that the related excluding filter is already registered correctly; the difference appears to come from a change in the problem type that MiMa reports. ``` // SPARK-12896 Send only accumulator updates to driver, not TaskMetrics ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulable.this"), -ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulator.this"), +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.Accumulator.this"), ``` ## How was this patch tested? Passes the Jenkins build. Author: Dongjoon Hyun Closes #11669 from dongjoon-hyun/update_mima. --- dev/mima | 2 +- project/MimaExcludes.scala | 2 +- project/SparkBuild.scala | 5 ++--- project/build.properties | 2 +- project/plugins.sbt | 8 ++++---- 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/dev/mima b/dev/mima index b7f8d62b7d26f..c8e2df6cfcd4f 100755 --- a/dev/mima +++ b/dev/mima @@ -40,7 +40,7 @@ SPARK_PROFILES="-Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -P generate_mima_ignore "$(build/sbt $SPARK_PROFILES "export assembly/fullClasspath" | tail -n1)" generate_mima_ignore "$(build/sbt $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)" -echo -e "q\n" | build/sbt mima-report-binary-issues | grep -v -e "info.*Resolving" +echo -e "q\n" | build/sbt mimaReportBinaryIssues | grep -v -e "info.*Resolving" ret_val=$?
if [ $ret_val != 0 ]; then diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 567a717b9d24b..c4c8d8870f8b9 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -192,7 +192,7 @@ object MimaExcludes { ) ++ Seq( // SPARK-12896 Send only accumulator updates to driver, not TaskMetrics ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulable.this"), - ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.Accumulator.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.Accumulator.this"), ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.Accumulator.initialValue") ) ++ Seq( // SPARK-12692 Scala style: Fix the style violation (Space before "," or ":") diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index e74fb174725d3..d1c67eac82663 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -26,7 +26,6 @@ import sbt.Classpaths.publishTask import sbt.Keys._ import sbtunidoc.Plugin.UnidocKeys.unidocGenjavadocVersion import com.typesafe.sbt.pom.{PomBuild, SbtPomKeys} -import net.virtualvoid.sbt.graph.Plugin.graphSettings import spray.revolver.RevolverPlugin._ @@ -144,7 +143,7 @@ object SparkBuild extends PomBuild { "org.spark-project" %% "genjavadoc-plugin" % unidocGenjavadocVersion.value cross CrossVersion.full), scalacOptions <+= target.map(t => "-P:genjavadoc:out=" + (t / "java"))) - lazy val sharedSettings = graphSettings ++ sparkGenjavadocSettings ++ Seq ( + lazy val sharedSettings = sparkGenjavadocSettings ++ Seq ( javaHome := sys.env.get("JAVA_HOME") .orElse(sys.props.get("java.home").map { p => new File(p).getParentFile().getAbsolutePath() }) .map(file), @@ -241,7 +240,7 @@ object SparkBuild extends PomBuild { /* Enable shared settings on all projects */ (allProjects ++ optionallyEnabledProjects ++ assemblyProjects ++ Seq(spark, tools)) .foreach(enable(sharedSettings ++ DependencyOverrides.settings ++ - ExcludedDependencies.settings ++ Revolver.settings)) + ExcludedDependencies.settings)) /* Enable tests settings for all projects except examples, assembly and tools */ (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings)) diff --git a/project/build.properties b/project/build.properties index 86ca8755820a4..1e38156e0b577 100644 --- a/project/build.properties +++ b/project/build.properties @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -sbt.version=0.13.9 +sbt.version=0.13.11 diff --git a/project/plugins.sbt b/project/plugins.sbt index 822a7c4a82d5e..eeca94a47ce79 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,14 +1,14 @@ addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") -addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.2.0") +addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0") addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0") -addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4") +addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2") addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.8.0") -addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6") +addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.9") addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.1") @@ -16,7 +16,7 @@ addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3") addSbtPlugin("com.cavorite" % "sbt-avro" % "0.3.2") -addSbtPlugin("io.spray" % "sbt-revolver" % "0.7.2") +addSbtPlugin("io.spray" % "sbt-revolver" % "0.8.0") libraryDependencies += "org.ow2.asm" % "asm" % "5.0.3" From 184085284185011d7cc6d054b54d2d38eaf1dd77 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Sun, 13 Mar 2016 21:03:49 -0700 Subject: [PATCH 3/3] [SPARK-13823][CORE][STREAMING][SQL] Always specify Charset in String <-> byte[] conversions (and remaining Coverity items) ## What changes were proposed in this pull request? - Fixes calls to `new String(byte[])` or `String.getBytes()` that rely on the platform default encoding, to use UTF-8 - Same for `InputStreamReader` and `OutputStreamWriter` constructors - Standardizes on UTF-8 everywhere - Standardizes specifying the encoding with `StandardCharsets.UTF_8`, not the Guava constant or "UTF-8" (which would mean handling `UnsupportedEncodingException`) - (also addresses the other remaining Coverity scan issues, which are pretty trivial; these are separated into commit https://github.com/srowen/spark/commit/1deecd8d9ca986d8adb1a42d315890ce5349d29c ) ## How was this patch tested? Jenkins tests Author: Sean Owen Closes #11657 from srowen/SPARK-13823.
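For reference, here is a minimal standalone sketch (not part of the patch; the class and variable names are illustrative only) of the convention this commit standardizes on: every String <-> byte[] conversion and every reader/writer constructor names `StandardCharsets.UTF_8` explicitly instead of relying on the platform default encoding, the Guava `Charsets` constant, or the "UTF-8" string literal.

```
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;

public class Utf8ConventionExample {
  public static void main(String[] args) throws IOException {
    String original = "Hello, world!";

    // Explicit charset: no UnsupportedEncodingException to handle,
    // and no dependence on the JVM's platform default encoding.
    byte[] bytes = original.getBytes(StandardCharsets.UTF_8);
    String roundTripped = new String(bytes, StandardCharsets.UTF_8);

    // Reader and writer constructors take the charset explicitly as well.
    try (OutputStreamWriter writer =
             new OutputStreamWriter(new ByteArrayOutputStream(), StandardCharsets.UTF_8)) {
      writer.write(roundTripped);
    }
    try (InputStreamReader reader =
             new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8)) {
      System.out.println(roundTripped.equals(original) + " " + (char) reader.read());
    }
  }
}
```

The same `StandardCharsets.UTF_8` constant replaces the Guava `Charsets.UTF_8` references throughout the diff below.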
--- .../network/client/StreamInterceptor.java | 3 +- .../spark/network/protocol/Encoders.java | 8 ++--- .../spark/network/sasl/SparkSaslServer.java | 10 +++--- .../apache/spark/network/util/JavaUtils.java | 6 ++-- .../shuffle/ExternalShuffleBlockResolver.java | 12 +++---- .../ExternalShuffleBlockResolverSuite.java | 36 +++++++++++-------- .../shuffle/ExternalShuffleCleanupSuite.java | 21 ++++++----- .../apache/spark/unsafe/types/UTF8String.java | 10 ++---- .../apache/spark/api/python/PythonRDD.scala | 8 ++--- .../api/python/PythonWorkerFactory.scala | 3 +- .../WriteInputFormatTestDataGenerator.scala | 4 +-- .../scala/org/apache/spark/api/r/SerDe.scala | 3 +- .../spark/deploy/FaultToleranceTest.scala | 4 ++- .../spark/deploy/SparkSubmitArguments.scala | 3 +- .../deploy/rest/RestSubmissionClient.scala | 4 +-- .../spark/deploy/worker/DriverRunner.scala | 4 +-- .../spark/deploy/worker/ExecutorRunner.scala | 4 +-- .../scheduler/EventLoggingListener.scala | 4 +-- .../serializer/GenericAvroSerializer.scala | 3 +- .../scala/org/apache/spark/util/Utils.scala | 5 +-- .../java/org/apache/spark/JavaAPISuite.java | 4 +-- .../sort/ShuffleInMemorySorterSuite.java | 3 +- .../sort/UnsafeExternalSorterSuite.java | 2 -- .../sort/UnsafeInMemorySorterSuite.java | 3 +- .../org/apache/spark/SparkContextSuite.scala | 17 ++++----- .../spark/api/python/PythonRDDSuite.scala | 11 +++--- .../spark/deploy/SparkSubmitSuite.scala | 6 ++-- .../history/FsHistoryProviderSuite.scala | 9 ++--- .../deploy/history/HistoryServerSuite.scala | 6 ++-- .../rest/StandaloneRestSubmitSuite.scala | 4 +-- .../NettyBlockTransferSecuritySuite.scala | 7 ++-- .../apache/spark/util/FileAppenderSuite.scala | 12 +++---- .../org/apache/spark/util/UtilsSuite.scala | 16 ++++----- docs/streaming-custom-receivers.md | 6 ++-- .../streaming/JavaCustomReceiver.java | 4 ++- .../examples/streaming/CustomReceiver.scala | 4 ++- .../streaming/flume/sink/SparkSinkSuite.scala | 4 ++- .../streaming/flume/FlumeTestUtils.scala | 4 +-- .../flume/PollingFlumeTestUtils.scala | 5 +-- .../spark/streaming/kafka/KafkaUtils.scala | 4 +-- .../streaming/kinesis/KinesisTestUtils.scala | 3 +- .../kinesis/KPLBasedKinesisTestUtils.scala | 3 +- .../streaming/mqtt/MQTTInputDStream.scala | 4 ++- .../spark/streaming/mqtt/MQTTTestUtils.scala | 4 +-- .../spark/graphx/GraphLoaderSuite.scala | 3 +- .../launcher/AbstractCommandBuilder.java | 5 +-- .../spark/launcher/OutputRedirector.java | 3 +- .../SparkSubmitCommandBuilderSuite.java | 6 +--- .../mllib/api/python/PythonMLLibAPI.scala | 3 +- .../libsvm/JavaLibSVMRelationSuite.java | 4 +-- .../source/libsvm/LibSVMRelationSuite.scala | 6 ++-- .../spark/mllib/util/MLUtilsSuite.scala | 8 ++--- .../spark/sql/catalyst/parser/ParseUtils.java | 4 ++- .../sql/catalyst/expressions/literals.scala | 3 +- .../spark/sql/catalyst/util/package.scala | 3 +- .../expressions/LiteralExpressionSuite.scala | 4 ++- .../expressions/MathFunctionsSuite.scala | 6 ++-- .../expressions/MiscFunctionsSuite.scala | 19 ++++++---- .../expressions/UnsafeRowConverterSuite.scala | 14 ++++---- .../codegen/GeneratedProjectionSuite.scala | 5 ++- .../vectorized/ColumnVectorUtils.java | 3 +- .../execution/vectorized/ColumnarBatch.java | 4 +++ .../datasources/csv/CSVOptions.scala | 4 +-- .../execution/datasources/csv/CSVParser.scala | 3 +- .../datasources/csv/DefaultSource.scala | 4 +-- .../streaming/FileStreamSource.scala | 4 +-- .../spark/sql/sources/JavaSaveLoadSuite.java | 2 -- .../spark/sql/DataFrameFunctionsSuite.scala | 6 ++-- 
.../org/apache/spark/sql/DataFrameSuite.scala | 5 +-- .../spark/sql/MathExpressionsSuite.scala | 6 ++-- .../CompressionSchemeBenchmark.scala | 3 +- .../vectorized/ColumnarBatchBenchmark.scala | 4 ++- .../vectorized/ColumnarBatchSuite.scala | 21 ++++++----- .../sql/streaming/FileStreamSourceSuite.scala | 6 ++-- .../apache/spark/sql/test/SQLTestData.scala | 12 ++++--- .../sql/hive/thriftserver/CliSuite.scala | 3 +- .../HiveThriftServer2Suites.scala | 4 +-- .../apache/spark/sql/hive/HiveContext.scala | 3 +- .../hive/execution/ScriptTransformation.scala | 3 +- .../hive/JavaMetastoreDataSourcesSuite.java | 2 -- .../spark/sql/hive/orc/OrcQuerySuite.scala | 3 +- .../dstream/SocketInputDStream.scala | 4 ++- .../apache/spark/streaming/JavaAPISuite.java | 7 ++-- .../spark/streaming/JavaReceiverAPISuite.java | 4 ++- .../spark/streaming/CheckpointSuite.scala | 4 +-- .../spark/streaming/InputStreamsSuite.scala | 10 +++--- .../spark/streaming/MasterFailureTest.scala | 4 +-- .../org/apache/spark/deploy/yarn/Client.scala | 4 +-- .../deploy/yarn/BaseYarnClusterSuite.scala | 8 ++--- .../spark/deploy/yarn/YarnClusterSuite.scala | 12 +++---- .../yarn/YarnShuffleIntegrationSuite.scala | 4 +-- .../yarn/YarnSparkHadoopUtilSuite.scala | 3 +- 92 files changed, 321 insertions(+), 244 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java b/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java index 88ba3ccebdf20..b0e85bae7c309 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java @@ -34,8 +34,7 @@ class StreamInterceptor implements TransportFrameDecoder.Interceptor { private final String streamId; private final long byteCount; private final StreamCallback callback; - - private volatile long bytesRead; + private long bytesRead; StreamInterceptor( TransportResponseHandler handler, diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java index 9162d0b977f83..be217522367c5 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java @@ -17,8 +17,8 @@ package org.apache.spark.network.protocol; +import java.nio.charset.StandardCharsets; -import com.google.common.base.Charsets; import io.netty.buffer.ByteBuf; /** Provides a canonical set of Encoders for simple types. */ @@ -27,11 +27,11 @@ public class Encoders { /** Strings are encoded with their length followed by UTF-8 bytes. 
*/ public static class Strings { public static int encodedLength(String s) { - return 4 + s.getBytes(Charsets.UTF_8).length; + return 4 + s.getBytes(StandardCharsets.UTF_8).length; } public static void encode(ByteBuf buf, String s) { - byte[] bytes = s.getBytes(Charsets.UTF_8); + byte[] bytes = s.getBytes(StandardCharsets.UTF_8); buf.writeInt(bytes.length); buf.writeBytes(bytes); } @@ -40,7 +40,7 @@ public static String decode(ByteBuf buf) { int length = buf.readInt(); byte[] bytes = new byte[length]; buf.readBytes(bytes); - return new String(bytes, Charsets.UTF_8); + return new String(bytes, StandardCharsets.UTF_8); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java b/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java index 431cb67a2ae0b..b802a5af63c94 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java @@ -28,9 +28,9 @@ import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Map; -import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; @@ -187,14 +187,14 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback /* Encode a byte[] identifier as a Base64-encoded string. */ public static String encodeIdentifier(String identifier) { Preconditions.checkNotNull(identifier, "User cannot be null if SASL is enabled"); - return Base64.encode(Unpooled.wrappedBuffer(identifier.getBytes(Charsets.UTF_8))) - .toString(Charsets.UTF_8); + return Base64.encode(Unpooled.wrappedBuffer(identifier.getBytes(StandardCharsets.UTF_8))) + .toString(StandardCharsets.UTF_8); } /** Encode a password as a base64-encoded char[] array. */ public static char[] encodePassword(String password) { Preconditions.checkNotNull(password, "Password cannot be null if SASL is enabled"); - return Base64.encode(Unpooled.wrappedBuffer(password.getBytes(Charsets.UTF_8))) - .toString(Charsets.UTF_8).toCharArray(); + return Base64.encode(Unpooled.wrappedBuffer(password.getBytes(StandardCharsets.UTF_8))) + .toString(StandardCharsets.UTF_8).toCharArray(); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java index ccc527306d920..8d83ae0712f0e 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java @@ -21,11 +21,11 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import io.netty.buffer.Unpooled; @@ -68,7 +68,7 @@ public static int nonNegativeHash(Object obj) { * converted back to the same string through {@link #bytesToString(ByteBuffer)}. 
*/ public static ByteBuffer stringToBytes(String s) { - return Unpooled.wrappedBuffer(s.getBytes(Charsets.UTF_8)).nioBuffer(); + return Unpooled.wrappedBuffer(s.getBytes(StandardCharsets.UTF_8)).nioBuffer(); } /** @@ -76,7 +76,7 @@ public static ByteBuffer stringToBytes(String s) { * converted back to the same byte buffer through {@link #stringToBytes(String)}. */ public static String bytesToString(ByteBuffer b) { - return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8); + return Unpooled.wrappedBuffer(b).toString(StandardCharsets.UTF_8); } /* diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index fe933ed650caf..460110d78f15b 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -18,6 +18,7 @@ package org.apache.spark.network.shuffle; import java.io.*; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -27,7 +28,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Charsets; import com.google.common.base.Objects; import com.google.common.collect.Maps; import org.fusesource.leveldbjni.JniDBFactory; @@ -152,7 +152,7 @@ public void registerExecutor( try { if (db != null) { byte[] key = dbAppExecKey(fullId); - byte[] value = mapper.writeValueAsString(executorInfo).getBytes(Charsets.UTF_8); + byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8); db.put(key, value); } } catch (Exception e) { @@ -350,7 +350,7 @@ private static byte[] dbAppExecKey(AppExecId appExecId) throws IOException { // we stick a common prefix on all the keys so we can find them in the DB String appExecJson = mapper.writeValueAsString(appExecId); String key = (APP_KEY_PREFIX + ";" + appExecJson); - return key.getBytes(Charsets.UTF_8); + return key.getBytes(StandardCharsets.UTF_8); } private static AppExecId parseDbAppExecKey(String s) throws IOException { @@ -368,10 +368,10 @@ static ConcurrentMap reloadRegisteredExecutors(D ConcurrentMap registeredExecutors = Maps.newConcurrentMap(); if (db != null) { DBIterator itr = db.iterator(); - itr.seek(APP_KEY_PREFIX.getBytes(Charsets.UTF_8)); + itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8)); while (itr.hasNext()) { Map.Entry e = itr.next(); - String key = new String(e.getKey(), Charsets.UTF_8); + String key = new String(e.getKey(), StandardCharsets.UTF_8); if (!key.startsWith(APP_KEY_PREFIX)) { break; } @@ -418,7 +418,7 @@ private static void storeVersion(DB db) throws IOException { public static class StoreVersion { - static final byte[] KEY = "StoreVersion".getBytes(Charsets.UTF_8); + static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8); public final int major; public final int minor; diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java index 60a1b8b0451fe..d9b5f0261aaba 100644 --- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.io.CharStreams; @@ -34,15 +35,16 @@ import static org.junit.Assert.*; public class ExternalShuffleBlockResolverSuite { - static String sortBlock0 = "Hello!"; - static String sortBlock1 = "World!"; + private static final String sortBlock0 = "Hello!"; + private static final String sortBlock1 = "World!"; - static String hashBlock0 = "Elementary"; - static String hashBlock1 = "Tabular"; + private static final String hashBlock0 = "Elementary"; + private static final String hashBlock1 = "Tabular"; - static TestShuffleDataContext dataContext; + private static TestShuffleDataContext dataContext; - static TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider()); + private static final TransportConf conf = + new TransportConf("shuffle", new SystemPropertyConfigProvider()); @BeforeClass public static void beforeAll() throws IOException { @@ -50,10 +52,12 @@ public static void beforeAll() throws IOException { dataContext.create(); // Write some sort and hash data. - dataContext.insertSortShuffleData(0, 0, - new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() } ); - dataContext.insertHashShuffleData(1, 0, - new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() } ); + dataContext.insertSortShuffleData(0, 0, new byte[][] { + sortBlock0.getBytes(StandardCharsets.UTF_8), + sortBlock1.getBytes(StandardCharsets.UTF_8)}); + dataContext.insertHashShuffleData(1, 0, new byte[][] { + hashBlock0.getBytes(StandardCharsets.UTF_8), + hashBlock1.getBytes(StandardCharsets.UTF_8)}); } @AfterClass @@ -100,13 +104,15 @@ public void testSortShuffleBlocks() throws IOException { InputStream block0Stream = resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream(); - String block0 = CharStreams.toString(new InputStreamReader(block0Stream)); + String block0 = CharStreams.toString( + new InputStreamReader(block0Stream, StandardCharsets.UTF_8)); block0Stream.close(); assertEquals(sortBlock0, block0); InputStream block1Stream = resolver.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream(); - String block1 = CharStreams.toString(new InputStreamReader(block1Stream)); + String block1 = CharStreams.toString( + new InputStreamReader(block1Stream, StandardCharsets.UTF_8)); block1Stream.close(); assertEquals(sortBlock1, block1); } @@ -119,13 +125,15 @@ public void testHashShuffleBlocks() throws IOException { InputStream block0Stream = resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream(); - String block0 = CharStreams.toString(new InputStreamReader(block0Stream)); + String block0 = CharStreams.toString( + new InputStreamReader(block0Stream, StandardCharsets.UTF_8)); block0Stream.close(); assertEquals(hashBlock0, block0); InputStream block1Stream = resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream(); - String block1 = CharStreams.toString(new InputStreamReader(block1Stream)); + String block1 = CharStreams.toString( + new InputStreamReader(block1Stream, StandardCharsets.UTF_8)); block1Stream.close(); assertEquals(hashBlock1, block1); } diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java index 532d7ab8d01bd..43d0201405872 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Random; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,8 +35,8 @@ public class ExternalShuffleCleanupSuite { // Same-thread Executor used to ensure cleanup happens synchronously in test thread. - Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor(); - TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider()); + private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor(); + private TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider()); @Test public void noCleanupAndCleanup() throws IOException { @@ -123,27 +124,29 @@ public void cleanupOnlyRemovedApp() throws IOException { assertCleanedUp(dataContext1); } - private void assertStillThere(TestShuffleDataContext dataContext) { + private static void assertStillThere(TestShuffleDataContext dataContext) { for (String localDir : dataContext.localDirs) { assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists()); } } - private void assertCleanedUp(TestShuffleDataContext dataContext) { + private static void assertCleanedUp(TestShuffleDataContext dataContext) { for (String localDir : dataContext.localDirs) { assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists()); } } - private TestShuffleDataContext createSomeData() throws IOException { + private static TestShuffleDataContext createSomeData() throws IOException { Random rand = new Random(123); TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5); dataContext.create(); - dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), - new byte[][] { "ABC".getBytes(), "DEF".getBytes() } ); - dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000, - new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } ); + dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] { + "ABC".getBytes(StandardCharsets.UTF_8), + "DEF".getBytes(StandardCharsets.UTF_8)}); + dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000, new byte[][] { + "GHI".getBytes(StandardCharsets.UTF_8), + "JKLMNOPQRSTUVWXYZ".getBytes(StandardCharsets.UTF_8)}); return dataContext; } } diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index 427a8315e02b7..e16166ade4e5d 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -21,6 +21,7 @@ import java.io.*; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Map; @@ -825,14 +826,7 @@ public UTF8String translate(Map dict) { @Override public String toString() { - try { - return new String(getBytes(), "utf-8"); - } 
catch (UnsupportedEncodingException e) { - // Turn the exception into unchecked so we can find out about it at runtime, but - // don't need to add lots of boilerplate code everywhere. - throwException(e); - return "unknown"; // we will never reach here. - } + return new String(getBytes(), StandardCharsets.UTF_8); } @Override diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 05d1c31a08f22..8f306770a184f 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -19,6 +19,7 @@ package org.apache.spark.api.python import java.io._ import java.net._ +import java.nio.charset.StandardCharsets import java.util.{ArrayList => JArrayList, Collections, List => JList, Map => JMap} import scala.collection.JavaConverters._ @@ -26,7 +27,6 @@ import scala.collection.mutable import scala.language.existentials import scala.util.control.NonFatal -import com.google.common.base.Charsets.UTF_8 import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat} @@ -165,7 +165,7 @@ private[spark] class PythonRunner( val exLength = stream.readInt() val obj = new Array[Byte](exLength) stream.readFully(obj) - throw new PythonException(new String(obj, UTF_8), + throw new PythonException(new String(obj, StandardCharsets.UTF_8), writerThread.exception.getOrElse(null)) case SpecialLengths.END_OF_DATA_SECTION => // We've finished the data section of the output, but we can still @@ -624,7 +624,7 @@ private[spark] object PythonRDD extends Logging { } def writeUTF(str: String, dataOut: DataOutputStream) { - val bytes = str.getBytes(UTF_8) + val bytes = str.getBytes(StandardCharsets.UTF_8) dataOut.writeInt(bytes.length) dataOut.write(bytes) } @@ -817,7 +817,7 @@ private[spark] object PythonRDD extends Logging { private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] { - override def call(arr: Array[Byte]) : String = new String(arr, UTF_8) + override def call(arr: Array[Byte]) : String = new String(arr, StandardCharsets.UTF_8) } /** diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index a2a2f89f1e875..433764be89fb7 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -19,6 +19,7 @@ package org.apache.spark.api.python import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStreamWriter} import java.net.{InetAddress, ServerSocket, Socket, SocketException} +import java.nio.charset.StandardCharsets import java.util.Arrays import scala.collection.mutable @@ -121,7 +122,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream) // Tell the worker our port - val out = new OutputStreamWriter(worker.getOutputStream) + val out = new OutputStreamWriter(worker.getOutputStream, StandardCharsets.UTF_8) out.write(serverSocket.getLocalPort + "\n") out.flush() diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala index 
9549784aeabf5..34cb7c61d7034 100644 --- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala +++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala @@ -19,10 +19,10 @@ package org.apache.spark.api.python import java.{util => ju} import java.io.{DataInput, DataOutput} +import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ -import com.google.common.base.Charsets.UTF_8 import org.apache.hadoop.io._ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat @@ -134,7 +134,7 @@ object WriteInputFormatTestDataGenerator { sc.parallelize(intKeys).saveAsSequenceFile(intPath) sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath) sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath) - sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(UTF_8)) } + sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes(StandardCharsets.UTF_8)) } ).saveAsSequenceFile(bytesPath) val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false)) sc.parallelize(bools).saveAsSequenceFile(boolPath) diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index af815f885e8ae..c7fb192f26bd0 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -18,6 +18,7 @@ package org.apache.spark.api.r import java.io.{DataInputStream, DataOutputStream} +import java.nio.charset.StandardCharsets import java.sql.{Date, Time, Timestamp} import scala.collection.JavaConverters._ @@ -109,7 +110,7 @@ private[spark] object SerDe { val bytes = new Array[Byte](len) in.readFully(bytes) assert(bytes(len - 1) == 0) - val str = new String(bytes.dropRight(1), "UTF-8") + val str = new String(bytes.dropRight(1), StandardCharsets.UTF_8) str } diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index 434aadd2c61ac..305994a3f3543 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy import java.io._ import java.net.URL +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeoutException import scala.collection.mutable.ListBuffer @@ -348,7 +349,8 @@ private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile def readState() { try { - val masterStream = new InputStreamReader(new URL("http://%s:8080/json".format(ip)).openStream) + val masterStream = new InputStreamReader( + new URL("http://%s:8080/json".format(ip)).openStream, StandardCharsets.UTF_8) val json = JsonMethods.parse(masterStream) val workers = json \ "workers" diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 175756b80b6bb..a62096d771724 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy import java.io.{ByteArrayOutputStream, PrintStream} import java.lang.reflect.InvocationTargetException import java.net.URI +import java.nio.charset.StandardCharsets import java.util.{List => JList} 
import java.util.jar.JarFile @@ -608,7 +609,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S stream.flush() // Get the output and discard any unnecessary lines from it. - Source.fromString(new String(out.toByteArray())).getLines + Source.fromString(new String(out.toByteArray(), StandardCharsets.UTF_8)).getLines .filter { line => !line.startsWith("log4j") && !line.startsWith("usage") } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 006e2e1472580..d3e092a34c172 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.rest import java.io.{DataOutputStream, FileNotFoundException} import java.net.{ConnectException, HttpURLConnection, SocketException, URL} +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeoutException import javax.servlet.http.HttpServletResponse @@ -28,7 +29,6 @@ import scala.concurrent.duration._ import scala.io.Source import com.fasterxml.jackson.core.JsonProcessingException -import com.google.common.base.Charsets import org.apache.spark.{Logging, SPARK_VERSION => sparkVersion, SparkConf} import org.apache.spark.util.Utils @@ -211,7 +211,7 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { try { val out = new DataOutputStream(conn.getOutputStream) Utils.tryWithSafeFinally { - out.write(json.getBytes(Charsets.UTF_8)) + out.write(json.getBytes(StandardCharsets.UTF_8)) } { out.close() } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index 6049db6d989ae..7f4fe26c0d15e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -18,10 +18,10 @@ package org.apache.spark.deploy.worker import java.io._ +import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.hadoop.fs.Path @@ -174,7 +174,7 @@ private[deploy] class DriverRunner( val stderr = new File(baseDir, "stderr") val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"") val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40) - Files.append(header, stderr, UTF_8) + Files.append(header, stderr, StandardCharsets.UTF_8) CommandUtils.redirectStream(process.getErrorStream, stderr) } runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index c6687a4c63a6a..208a1bb68edb9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -18,10 +18,10 @@ package org.apache.spark.deploy.worker import java.io._ +import java.nio.charset.StandardCharsets import scala.collection.JavaConverters._ -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.spark.{Logging, SecurityManager, SparkConf} @@ -168,7 +168,7 @@ private[deploy] class ExecutorRunner( stdoutAppender = 
FileAppender(process.getInputStream, stdout, conf) val stderr = new File(executorDir, "stderr") - Files.write(header, stderr, UTF_8) + Files.write(header, stderr, StandardCharsets.UTF_8) stderrAppender = FileAppender(process.getErrorStream, stderr, conf) // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 8354e2a6112a2..2d76d08af6cdd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -19,11 +19,11 @@ package org.apache.spark.scheduler import java.io._ import java.net.URI +import java.nio.charset.StandardCharsets import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import com.google.common.base.Charsets import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} import org.apache.hadoop.fs.permission.FsPermission @@ -254,7 +254,7 @@ private[spark] object EventLoggingListener extends Logging { def initEventLog(logStream: OutputStream): Unit = { val metadata = SparkListenerLogStart(SPARK_VERSION) val metadataJson = compact(JsonProtocol.logStartToJson(metadata)) + "\n" - logStream.write(metadataJson.getBytes(Charsets.UTF_8)) + logStream.write(metadataJson.getBytes(StandardCharsets.UTF_8)) } /** diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala index 3d5b7105f0ca8..1a8e545b4f59e 100644 --- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala @@ -19,6 +19,7 @@ package org.apache.spark.serializer import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import scala.collection.mutable @@ -86,7 +87,7 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) schemaBytes.arrayOffset() + schemaBytes.position(), schemaBytes.remaining()) val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis)) - new Schema.Parser().parse(new String(bytes, "UTF-8")) + new Schema.Parser().parse(new String(bytes, StandardCharsets.UTF_8)) }) /** diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index b4c49513711c2..b5a98ce569a8a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory import java.net._ import java.nio.ByteBuffer import java.nio.channels.Channels +import java.nio.charset.StandardCharsets import java.nio.file.Files import java.util.{Locale, Properties, Random, UUID} import java.util.concurrent._ @@ -1904,7 +1905,7 @@ private[spark] object Utils extends Logging { require(file.exists(), s"Properties file $file does not exist") require(file.isFile(), s"Properties file $file is not a normal file") - val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8") + val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8) try { val properties = new Properties() properties.load(inReader) @@ -2344,7 +2345,7 @@ private[spark] class CircularBuffer(sizeInBytes: 
Int = 10240) extends java.io.Ou def read(): Int = if (iterator.hasNext) iterator.next() else -1 } - val reader = new BufferedReader(new InputStreamReader(input)) + val reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)) val stringBuilder = new StringBuilder var line = reader.readLine() while (line != null) { diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index e6a4ab7550c2a..a7e74c00793c3 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -21,6 +21,7 @@ import java.nio.channels.FileChannel; import java.nio.ByteBuffer; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -45,7 +46,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.base.Throwables; -import com.google.common.base.Charsets; import com.google.common.io.Files; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; @@ -1058,7 +1058,7 @@ public void textFiles() throws IOException { rdd.saveAsTextFile(outputDir); // Read the plain text file and check it's OK File outputFile = new File(outputDir, "part-00000"); - String content = Files.toString(outputFile, Charsets.UTF_8); + String content = Files.toString(outputFile, StandardCharsets.UTF_8); Assert.assertEquals("1\n2\n3\n4\n", content); // Also try reading it in as a text file RDD List expected = Arrays.asList("1", "2", "3", "4"); diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java index b4fa33f32a3fd..a3502708aadec 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java @@ -17,6 +17,7 @@ package org.apache.spark.shuffle.sort; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Random; @@ -41,7 +42,7 @@ public class ShuffleInMemorySorterSuite { private static String getStringFromDataPage(Object baseObject, long baseOffset, int strLength) { final byte[] strBytes = new byte[strLength]; Platform.copyMemory(baseObject, baseOffset, strBytes, Platform.BYTE_ARRAY_OFFSET, strLength); - return new String(strBytes); + return new String(strBytes, StandardCharsets.UTF_8); } @Test diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index b757ddc3b37f9..a79ed58133f1b 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -80,7 +80,6 @@ public int compare( } }; - SparkConf sparkConf; File tempDir; @Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager; @Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager; @@ -99,7 +98,6 @@ public OutputStream apply(OutputStream stream) { @Before public void setUp() { MockitoAnnotations.initMocks(this); - sparkConf = new SparkConf(); tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test"); spillFilesCreated.clear(); taskContext = mock(TaskContext.class); diff --git 
a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java index ff41768df1d8f..90849ab0bd8f3 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java @@ -17,6 +17,7 @@ package org.apache.spark.util.collection.unsafe.sort; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import org.junit.Assert; @@ -41,7 +42,7 @@ public class UnsafeInMemorySorterSuite { private static String getStringFromDataPage(Object baseObject, long baseOffset, int length) { final byte[] strBytes = new byte[length]; Platform.copyMemory(baseObject, baseOffset, strBytes, Platform.BYTE_ARRAY_OFFSET, length); - return new String(strBytes); + return new String(strBytes, StandardCharsets.UTF_8); } @Test diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 556afd08bbfe5..841fd02ae8bb6 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -18,12 +18,12 @@ package org.apache.spark import java.io.File +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit import scala.concurrent.Await import scala.concurrent.duration.Duration -import com.google.common.base.Charsets._ import com.google.common.io.Files import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} import org.apache.hadoop.mapred.TextInputFormat @@ -115,8 +115,8 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { val absolutePath2 = file2.getAbsolutePath try { - Files.write("somewords1", file1, UTF_8) - Files.write("somewords2", file2, UTF_8) + Files.write("somewords1", file1, StandardCharsets.UTF_8) + Files.write("somewords2", file2, StandardCharsets.UTF_8) val length1 = file1.length() val length2 = file2.length() @@ -243,11 +243,12 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext { try { // Create 5 text files. 
- Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1, UTF_8) - Files.write("someline1 in file2\nsomeline2 in file2", file2, UTF_8) - Files.write("someline1 in file3", file3, UTF_8) - Files.write("someline1 in file4\nsomeline2 in file4", file4, UTF_8) - Files.write("someline1 in file2\nsomeline2 in file5", file5, UTF_8) + Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", file1, + StandardCharsets.UTF_8) + Files.write("someline1 in file2\nsomeline2 in file2", file2, StandardCharsets.UTF_8) + Files.write("someline1 in file3", file3, StandardCharsets.UTF_8) + Files.write("someline1 in file4\nsomeline2 in file4", file4, StandardCharsets.UTF_8) + Files.write("someline1 in file2\nsomeline2 in file5", file5, StandardCharsets.UTF_8) sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala index 41f2a5c972b6b..05b4e67412f2e 100644 --- a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.api.python import java.io.{ByteArrayOutputStream, DataOutputStream} +import java.nio.charset.StandardCharsets import org.apache.spark.SparkFunSuite @@ -35,10 +36,12 @@ class PythonRDDSuite extends SparkFunSuite { // The correctness will be tested in Python PythonRDD.writeIteratorToStream(Iterator("a", null), buffer) PythonRDD.writeIteratorToStream(Iterator(null, "a"), buffer) - PythonRDD.writeIteratorToStream(Iterator("a".getBytes, null), buffer) - PythonRDD.writeIteratorToStream(Iterator(null, "a".getBytes), buffer) + PythonRDD.writeIteratorToStream(Iterator("a".getBytes(StandardCharsets.UTF_8), null), buffer) + PythonRDD.writeIteratorToStream(Iterator(null, "a".getBytes(StandardCharsets.UTF_8)), buffer) PythonRDD.writeIteratorToStream(Iterator((null, null), ("a", null), (null, "b")), buffer) - PythonRDD.writeIteratorToStream( - Iterator((null, null), ("a".getBytes, null), (null, "b".getBytes)), buffer) + PythonRDD.writeIteratorToStream(Iterator( + (null, null), + ("a".getBytes(StandardCharsets.UTF_8), null), + (null, "b".getBytes(StandardCharsets.UTF_8))), buffer) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 41ac60ece0eda..91fef772d13c3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -18,10 +18,10 @@ package org.apache.spark.deploy import java.io._ +import java.nio.charset.StandardCharsets import scala.collection.mutable.ArrayBuffer -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.ByteStreams import org.scalatest.{BeforeAndAfterEach, Matchers} import org.scalatest.concurrent.Timeouts @@ -593,7 +593,7 @@ class SparkSubmitSuite val tmpDir = Utils.createTempDir() val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf") - val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf)) + val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf), StandardCharsets.UTF_8) for ((key, value) <- defaults) writer.write(s"$key $value\n") writer.close() @@ -661,7 +661,7 @@ object UserClasspathFirstTest { val ccl = Thread.currentThread().getContextClassLoader() val resource = 
ccl.getResourceAsStream("test.resource") val bytes = ByteStreams.toByteArray(resource) - val contents = new String(bytes, 0, bytes.length, UTF_8) + val contents = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8) if (contents != "USER") { throw new SparkException("Should have read user resource, but instead read: " + contents) } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 8e8007f4ebf4b..5fd599e190c7c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -20,13 +20,13 @@ package org.apache.spark.deploy.history import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream, OutputStreamWriter} import java.net.URI +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit import java.util.zip.{ZipInputStream, ZipOutputStream} import scala.concurrent.duration._ import scala.language.postfixOps -import com.google.common.base.Charsets import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.hdfs.DistributedFileSystem import org.json4s.jackson.JsonMethods._ @@ -320,8 +320,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc var entry = inputStream.getNextEntry entry should not be null while (entry != null) { - val actual = new String(ByteStreams.toByteArray(inputStream), Charsets.UTF_8) - val expected = Files.toString(logs.find(_.getName == entry.getName).get, Charsets.UTF_8) + val actual = new String(ByteStreams.toByteArray(inputStream), StandardCharsets.UTF_8) + val expected = + Files.toString(logs.find(_.getName == entry.getName).get, StandardCharsets.UTF_8) actual should be (expected) totalEntries += 1 entry = inputStream.getNextEntry @@ -415,7 +416,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc if (isNewFormat) { EventLoggingListener.initEventLog(new FileOutputStream(file)) } - val writer = new OutputStreamWriter(bstream, "UTF-8") + val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8) Utils.tryWithSafeFinally { events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n")) } { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index e5cd2eddba1e8..5822261d8da75 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.history import java.io.{File, FileInputStream, FileWriter, InputStream, IOException} import java.net.{HttpURLConnection, URL} +import java.nio.charset.StandardCharsets import java.util.zip.ZipInputStream import javax.servlet.http.{HttpServletRequest, HttpServletResponse} @@ -25,7 +26,6 @@ import scala.concurrent.duration._ import scala.language.postfixOps import com.codahale.metrics.Counter -import com.google.common.base.Charsets import com.google.common.io.{ByteStreams, Files} import org.apache.commons.io.{FileUtils, IOUtils} import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} @@ -216,8 +216,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers val expectedFile = { new File(logDir, 
entry.getName) } - val expected = Files.toString(expectedFile, Charsets.UTF_8) - val actual = new String(ByteStreams.toByteArray(zipStream), Charsets.UTF_8) + val expected = Files.toString(expectedFile, StandardCharsets.UTF_8) + val actual = new String(ByteStreams.toByteArray(zipStream), StandardCharsets.UTF_8) actual should be (expected) filesCompared += 1 } diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index ee889bf144546..a7bb9aa4686eb 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -19,11 +19,11 @@ package org.apache.spark.deploy.rest import java.io.DataOutputStream import java.net.{HttpURLConnection, URL} +import java.nio.charset.StandardCharsets import javax.servlet.http.HttpServletResponse import scala.collection.mutable -import com.google.common.base.Charsets import org.json4s.JsonAST._ import org.json4s.jackson.JsonMethods._ import org.scalatest.BeforeAndAfterEach @@ -498,7 +498,7 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach { if (body.nonEmpty) { conn.setDoOutput(true) val out = new DataOutputStream(conn.getOutputStream) - out.write(body.getBytes(Charsets.UTF_8)) + out.write(body.getBytes(StandardCharsets.UTF_8)) out.close() } conn diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala index 47dbcb8fc0eaa..02806a16b9467 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.network.netty import java.io.InputStreamReader import java.nio._ -import java.nio.charset.Charset +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit import scala.concurrent.{Await, Promise} @@ -103,7 +103,8 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi val blockManager = mock[BlockDataManager] val blockId = ShuffleBlockId(0, 1, 2) val blockString = "Hello, world!" 
- val blockBuffer = new NioManagedBuffer(ByteBuffer.wrap(blockString.getBytes)) + val blockBuffer = new NioManagedBuffer(ByteBuffer.wrap( + blockString.getBytes(StandardCharsets.UTF_8))) when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer) val securityManager0 = new SecurityManager(conf0) @@ -117,7 +118,7 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi val result = fetchBlock(exec0, exec1, "1", blockId) match { case Success(buf) => val actualString = CharStreams.toString( - new InputStreamReader(buf.createInputStream(), Charset.forName("UTF-8"))) + new InputStreamReader(buf.createInputStream(), StandardCharsets.UTF_8)) actualString should equal(blockString) buf.release() Success() diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala index b367cc8358342..d30eafd2d4218 100644 --- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala @@ -18,12 +18,12 @@ package org.apache.spark.util import java.io._ +import java.nio.charset.StandardCharsets import java.util.concurrent.CountDownLatch import scala.collection.mutable.HashSet import scala.reflect._ -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.log4j.{Appender, Level, Logger} import org.apache.log4j.spi.LoggingEvent @@ -48,11 +48,11 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { test("basic file appender") { val testString = (1 to 1000).mkString(", ") - val inputStream = new ByteArrayInputStream(testString.getBytes(UTF_8)) + val inputStream = new ByteArrayInputStream(testString.getBytes(StandardCharsets.UTF_8)) val appender = new FileAppender(inputStream, testFile) inputStream.close() appender.awaitTermination() - assert(Files.toString(testFile, UTF_8) === testString) + assert(Files.toString(testFile, StandardCharsets.UTF_8) === testString) } test("rolling file appender - time-based rolling") { @@ -100,7 +100,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { val allGeneratedFiles = new HashSet[String]() val items = (1 to 10).map { _.toString * 10000 } for (i <- 0 until items.size) { - testOutputStream.write(items(i).getBytes(UTF_8)) + testOutputStream.write(items(i).getBytes(StandardCharsets.UTF_8)) testOutputStream.flush() allGeneratedFiles ++= RollingFileAppender.getSortedRolledOverFiles( testFile.getParentFile.toString, testFile.getName).map(_.toString) @@ -267,7 +267,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { // send data to appender through the input stream, and wait for the data to be written val expectedText = textToAppend.mkString("") for (i <- 0 until textToAppend.size) { - outputStream.write(textToAppend(i).getBytes(UTF_8)) + outputStream.write(textToAppend(i).getBytes(StandardCharsets.UTF_8)) outputStream.flush() Thread.sleep(sleepTimeBetweenTexts) } @@ -282,7 +282,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging { logInfo("Filtered files: \n" + generatedFiles.mkString("\n")) assert(generatedFiles.size > 1) val allText = generatedFiles.map { file => - Files.toString(file, UTF_8) + Files.toString(file, StandardCharsets.UTF_8) }.mkString("") assert(allText === expectedText) generatedFiles diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 412c0ac9d9be3..093d1bd6e5948 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStr import java.lang.{Double => JDouble, Float => JFloat} import java.net.{BindException, ServerSocket, URI} import java.nio.{ByteBuffer, ByteOrder} +import java.nio.charset.StandardCharsets import java.text.DecimalFormatSymbols import java.util.Locale import java.util.concurrent.TimeUnit @@ -28,7 +29,6 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable.ListBuffer import scala.util.Random -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.commons.lang3.SystemUtils import org.apache.hadoop.conf.Configuration @@ -268,7 +268,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { val tmpDir2 = Utils.createTempDir() val f1Path = tmpDir2 + "/f1" val f1 = new FileOutputStream(f1Path) - f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(UTF_8)) + f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(StandardCharsets.UTF_8)) f1.close() // Read first few bytes @@ -295,9 +295,9 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { test("reading offset bytes across multiple files") { val tmpDir = Utils.createTempDir() val files = (1 to 3).map(i => new File(tmpDir, i.toString)) - Files.write("0123456789", files(0), UTF_8) - Files.write("abcdefghij", files(1), UTF_8) - Files.write("ABCDEFGHIJ", files(2), UTF_8) + Files.write("0123456789", files(0), StandardCharsets.UTF_8) + Files.write("abcdefghij", files(1), StandardCharsets.UTF_8) + Files.write("ABCDEFGHIJ", files(2), StandardCharsets.UTF_8) // Read first few bytes in the 1st file assert(Utils.offsetBytes(files, 0, 5) === "01234") @@ -529,7 +529,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { try { System.setProperty("spark.test.fileNameLoadB", "2") Files.write("spark.test.fileNameLoadA true\n" + - "spark.test.fileNameLoadB 1\n", outFile, UTF_8) + "spark.test.fileNameLoadB 1\n", outFile, StandardCharsets.UTF_8) val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath) properties .filter { case (k, v) => k.startsWith("spark.")} @@ -559,7 +559,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath) val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir) val targetDir = new File(tempDir, "target-dir") - Files.write("some text", sourceFile, UTF_8) + Files.write("some text", sourceFile, StandardCharsets.UTF_8) val path = if (Utils.isWindows) { @@ -801,7 +801,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { |trap "" SIGTERM |sleep 10 """.stripMargin - Files.write(cmd.getBytes(), file) + Files.write(cmd.getBytes(StandardCharsets.UTF_8), file) file.getAbsoluteFile.setExecutable(true) val process = new ProcessBuilder(file.getAbsolutePath).start() diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 84547748618d1..732c83dc841d9 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -72,7 +72,8 @@ class CustomReceiver(host: String, port: Int) socket = new Socket(host, port) // Until stopped or connection broken continue reading - val reader = 
new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")) + val reader = new BufferedReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while(!isStopped && userInput != null) { store(userInput) @@ -135,7 +136,8 @@ public class JavaCustomReceiver extends Receiver { // connect to the server socket = new Socket(host, port); - BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + BufferedReader reader = new BufferedReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); // Until stopped or connection broken continue reading while (!isStopped() && (userInput = reader.readLine()) != null) { diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java index 5de56340c6d22..4544ad2b42ca7 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java @@ -36,6 +36,7 @@ import java.io.InputStreamReader; import java.net.ConnectException; import java.net.Socket; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Iterator; import java.util.regex.Pattern; @@ -130,7 +131,8 @@ private void receive() { try { // connect to the server socket = new Socket(host, port); - reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); + reader = new BufferedReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); // Until stopped or connection broken continue reading while (!isStopped() && (userInput = reader.readLine()) != null) { System.out.println("Received data '" + userInput + "'"); diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala index 5ce5778e42a3a..d67da270a8178 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala @@ -20,6 +20,7 @@ package org.apache.spark.examples.streaming import java.io.{BufferedReader, InputStreamReader} import java.net.Socket +import java.nio.charset.StandardCharsets import org.apache.spark.{Logging, SparkConf} import org.apache.spark.storage.StorageLevel @@ -83,7 +84,8 @@ class CustomReceiver(host: String, port: Int) logInfo("Connecting to " + host + ":" + port) socket = new Socket(host, port) logInfo("Connected to " + host + ":" + port) - val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")) + val reader = new BufferedReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while(!isStopped && userInput != null) { store(userInput) diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala index 7f6cecf9cd18d..e8ca1e716394d 100644 --- a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala +++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.flume.sink import java.net.InetSocketAddress +import 
java.nio.charset.StandardCharsets import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} import java.util.concurrent.atomic.AtomicInteger @@ -184,7 +185,8 @@ class SparkSinkSuite extends FunSuite { private def putEvents(ch: MemoryChannel, count: Int): Unit = { val tx = ch.getTransaction tx.begin() - (1 to count).foreach(x => ch.put(EventBuilder.withBody(x.toString.getBytes))) + (1 to count).foreach(x => + ch.put(EventBuilder.withBody(x.toString.getBytes(StandardCharsets.UTF_8)))) tx.commit() tx.close() } diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala index 3f87ce46e5952..945cfa7295d1d 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala @@ -19,12 +19,12 @@ package org.apache.spark.streaming.flume import java.net.{InetSocketAddress, ServerSocket} import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.util.{List => JList} import java.util.Collections import scala.collection.JavaConverters._ -import com.google.common.base.Charsets.UTF_8 import org.apache.avro.ipc.NettyTransceiver import org.apache.avro.ipc.specific.SpecificRequestor import org.apache.commons.lang3.RandomUtils @@ -65,7 +65,7 @@ private[flume] class FlumeTestUtils { val inputEvents = input.asScala.map { item => val event = new AvroFlumeEvent - event.setBody(ByteBuffer.wrap(item.getBytes(UTF_8))) + event.setBody(ByteBuffer.wrap(item.getBytes(StandardCharsets.UTF_8))) event.setHeaders(Collections.singletonMap("test", "header")) event } diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala index 9515d07c5ee5b..1a96df6e94b95 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala @@ -17,12 +17,12 @@ package org.apache.spark.streaming.flume +import java.nio.charset.StandardCharsets import java.util.{Collections, List => JList, Map => JMap} import java.util.concurrent._ import scala.collection.mutable.ArrayBuffer -import com.google.common.base.Charsets.UTF_8 import org.apache.flume.event.EventBuilder import org.apache.flume.Context import org.apache.flume.channel.MemoryChannel @@ -193,7 +193,8 @@ private[flume] class PollingFlumeTestUtils { val tx = channel.getTransaction tx.begin() for (j <- 0 until eventsPerBatch) { - channel.put(EventBuilder.withBody(s"${channel.getName}-$t".getBytes(UTF_8), + channel.put(EventBuilder.withBody( + s"${channel.getName}-$t".getBytes(StandardCharsets.UTF_8), Collections.singletonMap(s"test-$t", "header"))) t += 1 } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 0cb875c9758f9..72d9053355706 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -19,12 +19,12 @@ package org.apache.spark.streaming.kafka import java.io.OutputStream import java.lang.{Integer => JInt, Long => JLong} +import java.nio.charset.StandardCharsets import java.util.{List => JList, Map => 
JMap, Set => JSet} import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import com.google.common.base.Charsets.UTF_8 import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder} @@ -787,7 +787,7 @@ private object KafkaUtilsPythonHelper { def pickle(obj: Object, out: OutputStream, pickler: Pickler) { if (obj == this) { out.write(Opcodes.GLOBAL) - out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(UTF_8)) + out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(StandardCharsets.UTF_8)) } else { pickler.save(this) val msgAndMetaData = obj.asInstanceOf[PythonMessageAndMetadata] diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 0ace453ee9280..026387ed65d50 100644 --- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.kinesis import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -242,7 +243,7 @@ private[kinesis] class SimpleDataGenerator( val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]() data.foreach { num => val str = num.toString - val data = ByteBuffer.wrap(str.getBytes()) + val data = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8)) val putRecordRequest = new PutRecordRequest().withStreamName(streamName) .withData(data) .withPartitionKey(str) diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala index fdb270eaad8c9..0b455e574e6fa 100644 --- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala +++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.kinesis import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -51,7 +52,7 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG val shardIdToSeqNumbers = new mutable.HashMap[String, ArrayBuffer[(Int, String)]]() data.foreach { num => val str = num.toString - val data = ByteBuffer.wrap(str.getBytes()) + val data = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8)) val future = producer.addUserRecord(streamName, str, data) val kinesisCallBack = new FutureCallback[UserRecordResult]() { override def onFailure(t: Throwable): Unit = {} // do nothing diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index 079bd8a9a87ea..cbad6f7fe44dc 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.mqtt +import java.nio.charset.StandardCharsets + import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken import 
org.eclipse.paho.client.mqttv3.MqttCallback import org.eclipse.paho.client.mqttv3.MqttClient @@ -75,7 +77,7 @@ class MQTTReceiver( // Handles Mqtt message override def messageArrived(topic: String, message: MqttMessage) { - store(new String(message.getPayload(), "utf-8")) + store(new String(message.getPayload(), StandardCharsets.UTF_8)) } override def deliveryComplete(token: IMqttDeliveryToken) { diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala index 26c6dc45d5115..3680c136059a5 100644 --- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala +++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTTestUtils.scala @@ -18,10 +18,10 @@ package org.apache.spark.streaming.mqtt import java.net.{ServerSocket, URI} +import java.nio.charset.StandardCharsets import scala.language.postfixOps -import com.google.common.base.Charsets.UTF_8 import org.apache.activemq.broker.{BrokerService, TransportConnector} import org.apache.commons.lang3.RandomUtils import org.eclipse.paho.client.mqttv3._ @@ -85,7 +85,7 @@ private[mqtt] class MQTTTestUtils extends Logging { client.connect() if (client.isConnected) { val msgTopic = client.getTopic(topic) - val message = new MqttMessage(data.getBytes(UTF_8)) + val message = new MqttMessage(data.getBytes(StandardCharsets.UTF_8)) message.setQos(1) message.setRetained(true) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphLoaderSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphLoaderSuite.scala index bff9f328d4907..e55b05fa996ad 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphLoaderSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphLoaderSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.graphx import java.io.File import java.io.FileOutputStream import java.io.OutputStreamWriter +import java.nio.charset.StandardCharsets import org.apache.spark.SparkFunSuite import org.apache.spark.util.Utils @@ -30,7 +31,7 @@ class GraphLoaderSuite extends SparkFunSuite with LocalSparkContext { withSpark { sc => val tmpDir = Utils.createTempDir() val graphFile = new File(tmpDir.getAbsolutePath, "graph.txt") - val writer = new OutputStreamWriter(new FileOutputStream(graphFile)) + val writer = new OutputStreamWriter(new FileOutputStream(graphFile), StandardCharsets.UTF_8) for (i <- (1 until 101)) writer.write(s"$i 0\n") writer.close() try { diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index 46410327a5d72..20387e0f1068d 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -23,6 +23,7 @@ import java.io.FileInputStream; import java.io.InputStreamReader; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -102,7 +103,7 @@ List buildJavaCommand(String extraClassPath) throws IOException { File javaOpts = new File(join(File.separator, getConfDir(), "java-opts")); if (javaOpts.isFile()) { BufferedReader br = new BufferedReader(new InputStreamReader( - new FileInputStream(javaOpts), "UTF-8")); + new FileInputStream(javaOpts), StandardCharsets.UTF_8)); try { String line; while ((line = br.readLine()) != null) { @@ -301,7 
+302,7 @@ private Properties loadPropertiesFile() throws IOException { FileInputStream fd = null; try { fd = new FileInputStream(propsFile); - props.load(new InputStreamReader(fd, "UTF-8")); + props.load(new InputStreamReader(fd, StandardCharsets.UTF_8)); for (Map.Entry e : props.entrySet()) { e.setValue(e.getValue().toString().trim()); } diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java index 6e7120167d605..c7959aee9f888 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java +++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java @@ -21,6 +21,7 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.concurrent.ThreadFactory; import java.util.logging.Level; import java.util.logging.Logger; @@ -42,7 +43,7 @@ class OutputRedirector { OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) { this.active = true; - this.reader = new BufferedReader(new InputStreamReader(in)); + this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); this.thread = tf.newThread(new Runnable() { @Override public void run() { diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java index d36731840b1a1..a85afb58b9c98 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java @@ -199,11 +199,7 @@ private void testCmdBuilder(boolean isDriver, boolean useDefaultPropertyFile) th for (String arg : cmd) { if (arg.startsWith("-XX:MaxPermSize=")) { - if (isDriver) { - assertEquals("-XX:MaxPermSize=256m", arg); - } else { - assertEquals("-XX:MaxPermSize=256m", arg); - } + assertEquals("-XX:MaxPermSize=256m", arg); } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 132dc174a894e..53935f328ab8a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -19,6 +19,7 @@ package org.apache.spark.mllib.api.python import java.io.OutputStream import java.nio.{ByteBuffer, ByteOrder} +import java.nio.charset.StandardCharsets import java.util.{ArrayList => JArrayList, List => JList, Map => JMap} import scala.collection.JavaConverters._ @@ -1226,7 +1227,7 @@ private[spark] object SerDe extends Serializable { def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = { if (obj == this) { out.write(Opcodes.GLOBAL) - out.write((module + "\n" + name + "\n").getBytes) + out.write((module + "\n" + name + "\n").getBytes(StandardCharsets.UTF_8)) } else { pickler.save(this) // it will be memorized by Pickler saveState(obj, out, pickler) diff --git a/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java b/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java index b8ddf907d05ad..1c18b2b266fef 100644 --- a/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/source/libsvm/JavaLibSVMRelationSuite.java @@ -19,8 +19,8 @@ import 
java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; -import com.google.common.base.Charsets; import com.google.common.io.Files; import org.junit.After; @@ -55,7 +55,7 @@ public void setUp() throws IOException { tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource"); File file = new File(tempDir, "part-00000"); String s = "1 1:1.0 3:2.0 5:3.0\n0\n0 2:4.0 4:5.0 6:6.0"; - Files.write(s, file, Charsets.US_ASCII); + Files.write(s, file, StandardCharsets.UTF_8); path = tempDir.toURI().toString(); } diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala index 84fc08be09ee7..71f4926290b20 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/source/libsvm/LibSVMRelationSuite.scala @@ -17,9 +17,9 @@ package org.apache.spark.ml.source.libsvm -import java.io.{File, IOException} +import java.io.File +import java.nio.charset.StandardCharsets -import com.google.common.base.Charsets import com.google.common.io.Files import org.apache.spark.{SparkException, SparkFunSuite} @@ -42,7 +42,7 @@ class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext { """.stripMargin tempDir = Utils.createTempDir() val file = new File(tempDir, "part-00000") - Files.write(lines, file, Charsets.US_ASCII) + Files.write(lines, file, StandardCharsets.UTF_8) path = tempDir.toURI.toString } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index 70219e9ad9d3e..e542f21a1802c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -18,11 +18,11 @@ package org.apache.spark.mllib.util import java.io.File +import java.nio.charset.StandardCharsets import scala.io.Source import breeze.linalg.{squaredDistance => breezeSquaredDistance} -import com.google.common.base.Charsets import com.google.common.io.Files import org.apache.spark.SparkException @@ -84,7 +84,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { """.stripMargin val tempDir = Utils.createTempDir() val file = new File(tempDir.getPath, "part-00000") - Files.write(lines, file, Charsets.US_ASCII) + Files.write(lines, file, StandardCharsets.UTF_8) val path = tempDir.toURI.toString val pointsWithNumFeatures = loadLibSVMFile(sc, path, 6).collect() @@ -117,7 +117,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { """.stripMargin val tempDir = Utils.createTempDir() val file = new File(tempDir.getPath, "part-00000") - Files.write(lines, file, Charsets.US_ASCII) + Files.write(lines, file, StandardCharsets.UTF_8) val path = tempDir.toURI.toString intercept[SparkException] { @@ -134,7 +134,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { """.stripMargin val tempDir = Utils.createTempDir() val file = new File(tempDir.getPath, "part-00000") - Files.write(lines, file, Charsets.US_ASCII) + Files.write(lines, file, StandardCharsets.UTF_8) val path = tempDir.toURI.toString intercept[SparkException] { diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/parser/ParseUtils.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/parser/ParseUtils.java index 2520c7bb8dae4..01f89112a759b 100644 --- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/parser/ParseUtils.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/parser/ParseUtils.java @@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.parser; +import java.nio.charset.StandardCharsets; + /** * A couple of utility methods that help with parsing ASTs. * @@ -76,7 +78,7 @@ public static String unescapeSQLString(String b) { byte bVal = (byte) ((i3 - '0') + ((i2 - '0') * 8) + ((i1 - '0') * 8 * 8)); byte[] bValArr = new byte[1]; bValArr[0] = bVal; - String tmp = new String(bValArr); + String tmp = new String(bValArr, StandardCharsets.UTF_8); sb.append(tmp); i += 3; continue; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index a76517a89cc4a..e6804d096cd96 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions +import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import org.json4s.JsonAST._ @@ -109,7 +110,7 @@ object Literal { case DateType => create(0, DateType) case TimestampType => create(0L, TimestampType) case StringType => Literal("") - case BinaryType => Literal("".getBytes) + case BinaryType => Literal("".getBytes(StandardCharsets.UTF_8)) case CalendarIntervalType => Literal(new CalendarInterval(0, 0)) case arr: ArrayType => create(Array(), arr) case map: MapType => create(Map(), map) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index d9a9b6151a0be..b11365b297184 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst import java.io._ +import java.nio.charset.StandardCharsets import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{NumericType, StringType} @@ -118,7 +119,7 @@ package object util { val writer = new PrintWriter(out) t.printStackTrace(writer) writer.flush() - new String(out.toByteArray) + new String(out.toByteArray, StandardCharsets.UTF_8) } def stringOrNull(a: AnyRef): String = if (a == null) null else a.toString diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala index 124172bd66f19..450222d8cbba3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions +import java.nio.charset.StandardCharsets + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -54,7 +56,7 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Literal.default(FloatType), 0.0f) checkEvaluation(Literal.default(DoubleType), 0.0) checkEvaluation(Literal.default(StringType), "") - checkEvaluation(Literal.default(BinaryType), "".getBytes) + 
checkEvaluation(Literal.default(BinaryType), "".getBytes(StandardCharsets.UTF_8)) checkEvaluation(Literal.default(DecimalType.USER_DEFAULT), Decimal(0)) checkEvaluation(Literal.default(DecimalType.SYSTEM_DEFAULT), Decimal(0)) checkEvaluation(Literal.default(DateType), DateTimeUtils.toJavaDate(0)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala index 4ad65db0977c7..fba5f53715039 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions +import java.nio.charset.StandardCharsets + import com.google.common.math.LongMath import org.apache.spark.SparkFunSuite @@ -440,7 +442,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Hex(Literal(100800200404L)), "177828FED4") checkEvaluation(Hex(Literal(-100800200404L)), "FFFFFFE887D7012C") checkEvaluation(Hex(Literal.create(null, BinaryType)), null) - checkEvaluation(Hex(Literal("helloHex".getBytes())), "68656C6C6F486578") + checkEvaluation(Hex(Literal("helloHex".getBytes(StandardCharsets.UTF_8))), "68656C6C6F486578") // scalastyle:off // Turn off scala style for non-ascii chars checkEvaluation(Hex(Literal("三重的".getBytes("UTF8"))), "E4B889E9878DE79A84") @@ -452,7 +454,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("unhex") { checkEvaluation(Unhex(Literal.create(null, StringType)), null) - checkEvaluation(Unhex(Literal("737472696E67")), "string".getBytes) + checkEvaluation(Unhex(Literal("737472696E67")), "string".getBytes(StandardCharsets.UTF_8)) checkEvaluation(Unhex(Literal("")), new Array[Byte](0)) checkEvaluation(Unhex(Literal("F")), Array[Byte](15)) checkEvaluation(Unhex(Literal("ff")), Array[Byte](-1)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala index 75131a6170222..60d50baf511d9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscFunctionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions +import java.nio.charset.StandardCharsets + import org.apache.commons.codec.digest.DigestUtils import org.apache.spark.SparkFunSuite @@ -27,7 +29,8 @@ import org.apache.spark.sql.types._ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { test("md5") { - checkEvaluation(Md5(Literal("ABC".getBytes)), "902fbdd2b1df0c4f70b4a5d23525e932") + checkEvaluation(Md5(Literal("ABC".getBytes(StandardCharsets.UTF_8))), + "902fbdd2b1df0c4f70b4a5d23525e932") checkEvaluation(Md5(Literal.create(Array[Byte](1, 2, 3, 4, 5, 6), BinaryType)), "6ac1e56bc78f031059be7be854522c4c") checkEvaluation(Md5(Literal.create(null, BinaryType)), null) @@ -35,27 +38,31 @@ class MiscFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("sha1") { - checkEvaluation(Sha1(Literal("ABC".getBytes)), "3c01bdbb26f358bab27f267924aa2c9a03fcfdb8") + checkEvaluation(Sha1(Literal("ABC".getBytes(StandardCharsets.UTF_8))), + "3c01bdbb26f358bab27f267924aa2c9a03fcfdb8")
checkEvaluation(Sha1(Literal.create(Array[Byte](1, 2, 3, 4, 5, 6), BinaryType)), "5d211bad8f4ee70e16c7d343a838fc344a1ed961") checkEvaluation(Sha1(Literal.create(null, BinaryType)), null) - checkEvaluation(Sha1(Literal("".getBytes)), "da39a3ee5e6b4b0d3255bfef95601890afd80709") + checkEvaluation(Sha1(Literal("".getBytes(StandardCharsets.UTF_8))), + "da39a3ee5e6b4b0d3255bfef95601890afd80709") checkConsistencyBetweenInterpretedAndCodegen(Sha1, BinaryType) } test("sha2") { - checkEvaluation(Sha2(Literal("ABC".getBytes), Literal(256)), DigestUtils.sha256Hex("ABC")) + checkEvaluation(Sha2(Literal("ABC".getBytes(StandardCharsets.UTF_8)), Literal(256)), + DigestUtils.sha256Hex("ABC")) checkEvaluation(Sha2(Literal.create(Array[Byte](1, 2, 3, 4, 5, 6), BinaryType), Literal(384)), DigestUtils.sha384Hex(Array[Byte](1, 2, 3, 4, 5, 6))) // unsupported bit length checkEvaluation(Sha2(Literal.create(null, BinaryType), Literal(1024)), null) checkEvaluation(Sha2(Literal.create(null, BinaryType), Literal(512)), null) - checkEvaluation(Sha2(Literal("ABC".getBytes), Literal.create(null, IntegerType)), null) + checkEvaluation(Sha2(Literal("ABC".getBytes(StandardCharsets.UTF_8)), + Literal.create(null, IntegerType)), null) checkEvaluation(Sha2(Literal.create(null, BinaryType), Literal.create(null, IntegerType)), null) } test("crc32") { - checkEvaluation(Crc32(Literal("ABC".getBytes)), 2743272264L) + checkEvaluation(Crc32(Literal("ABC".getBytes(StandardCharsets.UTF_8))), 2743272264L) checkEvaluation(Crc32(Literal.create(Array[Byte](1, 2, 3, 4, 5, 6), BinaryType)), 2180413220L) checkEvaluation(Crc32(Literal.create(null, BinaryType)), null) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala index 68545f33e5465..1265908182b3a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions +import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import org.scalatest.Matchers @@ -77,16 +78,16 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers { val row = new SpecificMutableRow(fieldTypes) row.setLong(0, 0) row.update(1, UTF8String.fromString("Hello")) - row.update(2, "World".getBytes) + row.update(2, "World".getBytes(StandardCharsets.UTF_8)) val unsafeRow: UnsafeRow = converter.apply(row) assert(unsafeRow.getSizeInBytes === 8 + (8 * 3) + - roundedSize("Hello".getBytes.length) + - roundedSize("World".getBytes.length)) + roundedSize("Hello".getBytes(StandardCharsets.UTF_8).length) + + roundedSize("World".getBytes(StandardCharsets.UTF_8).length)) assert(unsafeRow.getLong(0) === 0) assert(unsafeRow.getString(1) === "Hello") - assert(unsafeRow.getBinary(2) === "World".getBytes) + assert(unsafeRow.getBinary(2) === "World".getBytes(StandardCharsets.UTF_8)) } test("basic conversion with primitive, string, date and timestamp types") { @@ -100,7 +101,8 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers { row.update(3, DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25"))) val unsafeRow: UnsafeRow = converter.apply(row) - assert(unsafeRow.getSizeInBytes === 8 + (8 * 4) + roundedSize("Hello".getBytes.length)) + assert(unsafeRow.getSizeInBytes === + 8 + (8 * 4) + 
roundedSize("Hello".getBytes(StandardCharsets.UTF_8).length)) assert(unsafeRow.getLong(0) === 0) assert(unsafeRow.getString(1) === "Hello") @@ -175,7 +177,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers { r.setFloat(6, 600) r.setDouble(7, 700) r.update(8, UTF8String.fromString("hello")) - r.update(9, "world".getBytes) + r.update(9, "world".getBytes(StandardCharsets.UTF_8)) r.setDecimal(10, Decimal(10), 10) r.setDecimal(11, Decimal(10.00, 38, 18), 38) // r.update(11, Array(11)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala index 1522ee34e43a5..e2a8eb8ee1d34 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen +import java.nio.charset.StandardCharsets + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -107,7 +109,8 @@ class GeneratedProjectionSuite extends SparkFunSuite { val fields = Array[DataType](StringType, struct) val unsafeProj = UnsafeProjection.create(fields) - val innerRow = InternalRow(false, 1.toByte, 2.toShort, 3, 4.0f, "".getBytes, + val innerRow = InternalRow(false, 1.toByte, 2.toShort, 3, 4.0f, + "".getBytes(StandardCharsets.UTF_8), UTF8String.fromString("")) val row1 = InternalRow(UTF8String.fromString(""), innerRow) val unsafe1 = unsafeProj(row1).copy() diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index 68f146f7a2622..b084eda6f84c1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -18,6 +18,7 @@ import java.math.BigDecimal; import java.math.BigInteger; +import java.nio.charset.StandardCharsets; import java.sql.Date; import java.util.Iterator; import java.util.List; @@ -138,7 +139,7 @@ private static void appendValue(ColumnVector dst, DataType t, Object o) { } else if (t == DataTypes.DoubleType) { dst.appendDouble(((Double)o).doubleValue()); } else if (t == DataTypes.StringType) { - byte[] b =((String)o).getBytes(); + byte[] b =((String)o).getBytes(StandardCharsets.UTF_8); dst.appendByteArray(b, 0, b.length); } else if (t instanceof DecimalType) { DecimalType dt = (DecimalType)t; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java index 9a8aedfa56b8c..09c001baaeafd 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java @@ -18,6 +18,7 @@ import java.util.Arrays; import java.util.Iterator; +import java.util.NoSuchElementException; import org.apache.commons.lang.NotImplementedException; @@ -254,6 +255,9 @@ public Row next() { while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) { ++rowId; } + if (rowId >= maxRows) { + throw new NoSuchElementException(); + } row.rowId 
= rowId++; return row; } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 38aa2dd80a4f1..6a0290c11228f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.csv -import java.nio.charset.Charset +import java.nio.charset.StandardCharsets import org.apache.spark.Logging import org.apache.spark.sql.execution.datasources.CompressionCodecs @@ -64,7 +64,7 @@ private[sql] class CSVOptions( parameters.getOrElse("sep", parameters.getOrElse("delimiter", ","))) val parseMode = parameters.getOrElse("mode", "PERMISSIVE") val charset = parameters.getOrElse("encoding", - parameters.getOrElse("charset", Charset.forName("UTF-8").name())) + parameters.getOrElse("charset", StandardCharsets.UTF_8.name())) val quote = getChar("quote", '\"') val escape = getChar("escape", '\\') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala index 8f1421844c648..8c3f63d307321 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.csv import java.io.{ByteArrayOutputStream, OutputStreamWriter, StringReader} +import java.nio.charset.StandardCharsets import com.univocity.parsers.csv.{CsvParser, CsvParserSettings, CsvWriter, CsvWriterSettings} @@ -76,7 +77,7 @@ private[sql] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) exten def writeRow(row: Seq[String], includeHeader: Boolean): String = { val buffer = new ByteArrayOutputStream() - val outputWriter = new OutputStreamWriter(buffer) + val outputWriter = new OutputStreamWriter(buffer, StandardCharsets.UTF_8) val writer = new CsvWriter(outputWriter, writerSettings) if (includeHeader) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala index aff672281d640..42c07c8a23f5e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.csv -import java.nio.charset.Charset +import java.nio.charset.{Charset, StandardCharsets} import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.io.{LongWritable, Text} @@ -161,7 +161,7 @@ class DefaultSource extends FileFormat with DataSourceRegister { sqlContext: SQLContext, options: CSVOptions, location: String): RDD[String] = { - if (Charset.forName(options.charset) == Charset.forName("UTF-8")) { + if (Charset.forName(options.charset) == StandardCharsets.UTF_8) { sqlContext.sparkContext.textFile(location) } else { val charset = options.charset diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 14ba9f69bb1d7..cce4b74ff2994 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql.execution.streaming import java.io._ +import java.nio.charset.StandardCharsets import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.io.Codec -import com.google.common.base.Charsets.UTF_8 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.spark.Logging @@ -184,7 +184,7 @@ class FileStreamSource( private def writeBatch(id: Int, files: Seq[String]): Unit = { assert(files.nonEmpty, "create a new batch without any file") val output = fs.create(new Path(metadataPath + "/" + id), true) - val writer = new PrintWriter(new OutputStreamWriter(output, UTF_8)) + val writer = new PrintWriter(new OutputStreamWriter(output, StandardCharsets.UTF_8)) try { // scalastyle:off println writer.println(FileStreamSource.VERSION) diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java index 0f9e453d26db8..9e65158eb0a33 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java @@ -40,7 +40,6 @@ public class JavaSaveLoadSuite { private transient JavaSparkContext sc; private transient SQLContext sqlContext; - String originalDefaultSource; File path; Dataset df; @@ -57,7 +56,6 @@ public void setUp() throws IOException { sqlContext = new SQLContext(_sc); sc = new JavaSparkContext(_sc); - originalDefaultSource = sqlContext.conf().defaultDataSourceName(); path = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile(); if (path.exists()) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index aff9efe4b2b16..2aa6f8d4acf7d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import java.nio.charset.StandardCharsets + import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -167,12 +169,12 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { } test("misc sha1 function") { - val df = Seq(("ABC", "ABC".getBytes)).toDF("a", "b") + val df = Seq(("ABC", "ABC".getBytes(StandardCharsets.UTF_8))).toDF("a", "b") checkAnswer( df.select(sha1($"a"), sha1($"b")), Row("3c01bdbb26f358bab27f267924aa2c9a03fcfdb8", "3c01bdbb26f358bab27f267924aa2c9a03fcfdb8")) - val dfEmpty = Seq(("", "".getBytes)).toDF("a", "b") + val dfEmpty = Seq(("", "".getBytes(StandardCharsets.UTF_8))).toDF("a", "b") checkAnswer( dfEmpty.selectExpr("sha1(a)", "sha1(b)"), Row("da39a3ee5e6b4b0d3255bfef95601890afd80709", "da39a3ee5e6b4b0d3255bfef95601890afd80709")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index e6e27ec413bb2..2333fa27ca623 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import java.io.File +import 
java.nio.charset.StandardCharsets import scala.language.postfixOps import scala.util.Random @@ -665,8 +666,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { test("showString: binary") { val df = Seq( - ("12".getBytes, "ABC.".getBytes), - ("34".getBytes, "12346".getBytes) + ("12".getBytes(StandardCharsets.UTF_8), "ABC.".getBytes(StandardCharsets.UTF_8)), + ("34".getBytes(StandardCharsets.UTF_8), "12346".getBytes(StandardCharsets.UTF_8)) ).toDF() val expectedAnswer = """+-------+----------------+ || _1| _2| diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala index 013a90875e2b2..f5a67fd782d63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/MathExpressionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import java.nio.charset.StandardCharsets + import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions.{log => logarithm} import org.apache.spark.sql.test.SharedSQLContext @@ -262,9 +264,9 @@ class MathExpressionsSuite extends QueryTest with SharedSQLContext { test("unhex") { val data = Seq(("1C", "737472696E67")).toDF("a", "b") checkAnswer(data.select(unhex('a)), Row(Array[Byte](28.toByte))) - checkAnswer(data.select(unhex('b)), Row("string".getBytes)) + checkAnswer(data.select(unhex('b)), Row("string".getBytes(StandardCharsets.UTF_8))) checkAnswer(data.selectExpr("unhex(a)"), Row(Array[Byte](28.toByte))) - checkAnswer(data.selectExpr("unhex(b)"), Row("string".getBytes)) + checkAnswer(data.selectExpr("unhex(b)"), Row("string".getBytes(StandardCharsets.UTF_8))) checkAnswer(data.selectExpr("""unhex("##")"""), Row(null)) checkAnswer(data.selectExpr("""unhex("G123")"""), Row(null)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala index 0000a5d1efd09..1aadd700d7443 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.columnar.compression import java.nio.{ByteBuffer, ByteOrder} +import java.nio.charset.StandardCharsets import org.apache.commons.lang3.RandomStringUtils import org.apache.commons.math3.distribution.LogNormalDistribution @@ -313,7 +314,7 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes { } for (i <- 0 until count) { testData.putInt(strLen) - testData.put(g().getBytes) + testData.put(g().getBytes(StandardCharsets.UTF_8)) } testData.rewind() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala index 97638a66ab473..67b3d98c1daed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import scala.util.Random @@ -357,7 +358,8 @@ object ColumnarBatchBenchmark { val maxString 
= 32 val count = 4 * 1000 - val data = Seq.fill(count)(randomString(minString, maxString)).map(_.getBytes).toArray + val data = Seq.fill(count)(randomString(minString, maxString)) + .map(_.getBytes(StandardCharsets.UTF_8)).toArray def column(memoryMode: MemoryMode) = { i: Int => val column = ColumnVector.allocate(count, BinaryType, memoryMode) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index b3c3e66fbcbd5..ed97f59ea1690 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.vectorized +import java.nio.charset.StandardCharsets + import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random @@ -329,18 +331,21 @@ class ColumnarBatchSuite extends SparkFunSuite { var idx = 0 val values = ("Hello" :: "abc" :: Nil).toArray - column.putByteArray(idx, values(0).getBytes, 0, values(0).getBytes().length) + column.putByteArray(idx, values(0).getBytes(StandardCharsets.UTF_8), + 0, values(0).getBytes(StandardCharsets.UTF_8).length) reference += values(0) idx += 1 assert(column.arrayData().elementsAppended == 5) - column.putByteArray(idx, values(1).getBytes, 0, values(1).getBytes().length) + column.putByteArray(idx, values(1).getBytes(StandardCharsets.UTF_8), + 0, values(1).getBytes(StandardCharsets.UTF_8).length) reference += values(1) idx += 1 assert(column.arrayData().elementsAppended == 8) // Just put llo - val offset = column.putByteArray(idx, values(0).getBytes, 2, values(0).getBytes().length - 2) + val offset = column.putByteArray(idx, values(0).getBytes(StandardCharsets.UTF_8), + 2, values(0).getBytes(StandardCharsets.UTF_8).length - 2) reference += "llo" idx += 1 assert(column.arrayData().elementsAppended == 11) @@ -353,7 +358,7 @@ class ColumnarBatchSuite extends SparkFunSuite { // Put a long string val s = "abcdefghijklmnopqrstuvwxyz" - column.putByteArray(idx, (s + s).getBytes) + column.putByteArray(idx, (s + s).getBytes(StandardCharsets.UTF_8)) reference += (s + s) idx += 1 assert(column.arrayData().elementsAppended == 11 + (s + s).length) @@ -473,7 +478,7 @@ class ColumnarBatchSuite extends SparkFunSuite { batch.column(0).putInt(0, 1) batch.column(1).putDouble(0, 1.1) batch.column(2).putNull(0) - batch.column(3).putByteArray(0, "Hello".getBytes) + batch.column(3).putByteArray(0, "Hello".getBytes(StandardCharsets.UTF_8)) batch.setNumRows(1) // Verify the results of the row. 
@@ -519,17 +524,17 @@ class ColumnarBatchSuite extends SparkFunSuite { batch.column(0).putNull(0) batch.column(1).putDouble(0, 2.2) batch.column(2).putInt(0, 2) - batch.column(3).putByteArray(0, "abc".getBytes) + batch.column(3).putByteArray(0, "abc".getBytes(StandardCharsets.UTF_8)) batch.column(0).putInt(1, 3) batch.column(1).putNull(1) batch.column(2).putInt(1, 3) - batch.column(3).putByteArray(1, "".getBytes) + batch.column(3).putByteArray(1, "".getBytes(StandardCharsets.UTF_8)) batch.column(0).putInt(2, 4) batch.column(1).putDouble(2, 4.4) batch.column(2).putInt(2, 4) - batch.column(3).putByteArray(2, "world".getBytes) + batch.column(3).putByteArray(2, "world".getBytes(StandardCharsets.UTF_8)) batch.setNumRows(3) def rowEquals(x: InternalRow, y: Row): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index e9d77abb8c23c..e6889bcc783b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.streaming import java.io.{ByteArrayInputStream, File, FileNotFoundException, InputStream} - -import com.google.common.base.Charsets.UTF_8 +import java.nio.charset.StandardCharsets import org.apache.spark.sql.{AnalysisException, StreamTest} import org.apache.spark.sql.catalyst.util._ @@ -392,7 +391,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext { } test("readBatch") { - def stringToStream(str: String): InputStream = new ByteArrayInputStream(str.getBytes(UTF_8)) + def stringToStream(str: String): InputStream = + new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)) // Invalid metadata assert(readBatch(stringToStream("")) === Nil) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala index 83c63e04f344a..7fa6760b71c8b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestData.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.test +import java.nio.charset.StandardCharsets + import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext, SQLImplicits} @@ -103,11 +105,11 @@ private[sql] trait SQLTestData { self => protected lazy val binaryData: DataFrame = { val df = sqlContext.sparkContext.parallelize( - BinaryData("12".getBytes, 1) :: - BinaryData("22".getBytes, 5) :: - BinaryData("122".getBytes, 3) :: - BinaryData("121".getBytes, 2) :: - BinaryData("123".getBytes, 4) :: Nil).toDF() + BinaryData("12".getBytes(StandardCharsets.UTF_8), 1) :: + BinaryData("22".getBytes(StandardCharsets.UTF_8), 5) :: + BinaryData("122".getBytes(StandardCharsets.UTF_8), 3) :: + BinaryData("121".getBytes(StandardCharsets.UTF_8), 2) :: + BinaryData("123".getBytes(StandardCharsets.UTF_8), 4) :: Nil).toDF() df.registerTempTable("binaryData") df } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 81508e134695a..54fffb971dbc3 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.thriftserver import java.io._ +import java.nio.charset.StandardCharsets import java.sql.Timestamp import java.util.Date @@ -121,7 +122,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { val process = new ProcessBuilder(command: _*).start() - val stdinWriter = new OutputStreamWriter(process.getOutputStream) + val stdinWriter = new OutputStreamWriter(process.getOutputStream, StandardCharsets.UTF_8) stdinWriter.write(queriesString) stdinWriter.flush() stdinWriter.close() diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index c05527b519daa..e89bb1c470d5a 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver import java.io.File import java.net.URL +import java.nio.charset.StandardCharsets import java.sql.{Date, DriverManager, SQLException, Statement} import scala.collection.mutable @@ -28,7 +29,6 @@ import scala.concurrent.duration._ import scala.io.Source import scala.util.{Random, Try} -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.jdbc.HiveDriver @@ -700,7 +700,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n """.stripMargin, new File(s"$tempLog4jConf/log4j.properties"), - UTF_8) + StandardCharsets.UTF_8) tempLog4jConf } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 081d849a88886..9725dcfde169c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive import java.io.File import java.net.{URL, URLClassLoader} +import java.nio.charset.StandardCharsets import java.sql.Timestamp import java.util.concurrent.TimeUnit import java.util.regex.Pattern @@ -715,7 +716,7 @@ private[hive] object HiveContext { case (null, _) => "NULL" case (d: Int, DateType) => new DateWritable(d).toString case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString - case (bin: Array[Byte], BinaryType) => new String(bin, "UTF-8") + case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8) case (decimal: java.math.BigDecimal, DecimalType()) => // Hive strips trailing zeros so use its toString HiveDecimal.create(decimal).toString diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 5e6641693798f..b6e2f1f6b3ab7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution import java.io._ +import 
java.nio.charset.StandardCharsets import java.util.Properties import javax.annotation.Nullable @@ -113,7 +114,7 @@ case class ScriptTransformation( ioschema.initOutputSerDe(output).getOrElse((null, null)) } - val reader = new BufferedReader(new InputStreamReader(inputStream)) + val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)) val outputIterator: Iterator[InternalRow] = new Iterator[InternalRow] with HiveInspectors { var curLine: String = null val scriptOutputStream = new DataInputStream(inputStream) diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java index 5a539eaec7507..e9356541c22df 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java @@ -48,7 +48,6 @@ public class JavaMetastoreDataSourcesSuite { private transient JavaSparkContext sc; private transient HiveContext sqlContext; - String originalDefaultSource; File path; Path hiveManagedPath; FileSystem fs; @@ -66,7 +65,6 @@ public void setUp() throws IOException { sqlContext = TestHive$.MODULE$; sc = new JavaSparkContext(sqlContext.sparkContext()); - originalDefaultSource = sqlContext.conf().defaultDataSourceName(); path = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile(); if (path.exists()) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 9ca07e96eb088..8cfb32f00a884 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.orc import java.io.File +import java.nio.charset.StandardCharsets import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.io.orc.CompressionKind @@ -73,7 +74,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { test("Read/write binary data") { withOrcFile(BinaryData("test".getBytes("utf8")) :: Nil) { file => val bytes = read.orc(file).head().getAs[Array[Byte]](0) - assert(new String(bytes, "utf8") === "test") + assert(new String(bytes, StandardCharsets.UTF_8) === "test") } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala index 441477479167a..f7519c10c8eb1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming.dstream import java.io._ import java.net.{ConnectException, Socket} +import java.nio.charset.StandardCharsets import scala.reflect.ClassTag import scala.util.control.NonFatal @@ -113,7 +114,8 @@ object SocketReceiver { * to '\n' delimited strings and returns an iterator to access the strings. 
*/ def bytesToLines(inputStream: InputStream): Iterator[String] = { - val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")) + val dataInputStream = new BufferedReader( + new InputStreamReader(inputStream, StandardCharsets.UTF_8)) new NextIterator[String] { protected override def getNext() = { val nextValue = dataInputStream.readLine() diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 806cea24caddb..66448fd40057d 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -18,7 +18,7 @@ package org.apache.spark.streaming; import java.io.*; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -1866,7 +1866,8 @@ public void testSocketString() { @Override public Iterable call(InputStream in) throws IOException { List out = new ArrayList<>(); - try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(in, StandardCharsets.UTF_8))) { for (String line; (line = reader.readLine()) != null;) { out.add(line); } @@ -1930,7 +1931,7 @@ public void testRawSocketStream() { private static List> fileTestPrepare(File testDir) throws IOException { File existingFile = new File(testDir, "0"); - Files.write("0\n", existingFile, Charset.forName("UTF-8")); + Files.write("0\n", existingFile, StandardCharsets.UTF_8); Assert.assertTrue(existingFile.setLastModified(1000)); Assert.assertEquals(1000, existingFile.lastModified()); return Arrays.asList(Arrays.asList("0")); diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java index d09258e0e4a85..091ccbfd85cad 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java @@ -38,6 +38,7 @@ import java.io.Serializable; import java.net.ConnectException; import java.net.Socket; +import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicLong; public class JavaReceiverAPISuite implements Serializable { @@ -126,7 +127,8 @@ private void receive() { BufferedReader in = null; try { socket = new Socket(host, port); - in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + in = new BufferedReader( + new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)); String userInput; while ((userInput = in.readLine()) != null) { store(userInput); diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index ca716cf4e6f11..9a3248b3e8175 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -18,12 +18,12 @@ package org.apache.spark.streaming import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream} +import java.nio.charset.StandardCharsets import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import com.google.common.base.Charsets import com.google.common.io.Files import 
org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -609,7 +609,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester */ def writeFile(i: Int, clock: Clock): Unit = { val file = new File(testDir, i.toString) - Files.write(i + "\n", file, Charsets.UTF_8) + Files.write(i + "\n", file, StandardCharsets.UTF_8) assert(file.setLastModified(clock.getTimeMillis())) // Check that the file's modification date is actually the value we wrote, since rounding or // truncation will break the test: diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index fa17b3a15c4b6..cc2a67187e710 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming import java.io.{BufferedWriter, File, OutputStreamWriter} import java.net.{ServerSocket, Socket, SocketException} -import java.nio.charset.Charset +import java.nio.charset.StandardCharsets import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger @@ -146,7 +146,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val testDir = Utils.createTempDir() // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") - Files.write("0\n", existingFile, Charset.forName("UTF-8")) + Files.write("0\n", existingFile, StandardCharsets.UTF_8) assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) // Set up the streaming context and input streams @@ -369,7 +369,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val testDir = Utils.createTempDir() // Create a file that exists before the StreamingContext is created: val existingFile = new File(testDir, "0") - Files.write("0\n", existingFile, Charset.forName("UTF-8")) + Files.write("0\n", existingFile, StandardCharsets.UTF_8) assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000) // Set up the streaming context and input streams @@ -393,7 +393,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val input = Seq(1, 2, 3, 4, 5) input.foreach { i => val file = new File(testDir, i.toString) - Files.write(i + "\n", file, Charset.forName("UTF-8")) + Files.write(i + "\n", file, StandardCharsets.UTF_8) assert(file.setLastModified(clock.getTimeMillis())) assert(file.lastModified === clock.getTimeMillis()) logInfo("Created file " + file) @@ -448,7 +448,7 @@ class TestServer(portToBind: Int = 0) extends Logging { try { clientSocket.setTcpNoDelay(true) val outputStream = new BufferedWriter( - new OutputStreamWriter(clientSocket.getOutputStream)) + new OutputStreamWriter(clientSocket.getOutputStream, StandardCharsets.UTF_8)) while (clientSocket.isConnected) { val msg = queue.poll(100, TimeUnit.MILLISECONDS) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala index faa9c4f0cbd6a..6406d53f8941a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming import java.io.{File, IOException} -import java.nio.charset.Charset +import java.nio.charset.StandardCharsets 
import java.util.UUID import scala.collection.JavaConverters._ @@ -371,7 +371,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) val localFile = new File(localTestDir, (i + 1).toString) val hadoopFile = new Path(testDir, (i + 1).toString) val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString) - Files.write(input(i) + "\n", localFile, Charset.forName("UTF-8")) + Files.write(input(i) + "\n", localFile, StandardCharsets.UTF_8) var tries = 0 var done = false while (!done && tries < maxTries) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 0b5ceb768cc86..1035056457830 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream, I OutputStreamWriter} import java.net.{InetAddress, UnknownHostException, URI} import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.util.{Properties, UUID} import java.util.zip.{ZipEntry, ZipOutputStream} @@ -29,7 +30,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map} import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal -import com.google.common.base.Charsets.UTF_8 import com.google.common.base.Objects import com.google.common.io.Files import org.apache.hadoop.conf.Configuration @@ -619,7 +619,7 @@ private[spark] class Client( val props = new Properties() sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) } confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE)) - val writer = new OutputStreamWriter(confStream, UTF_8) + val writer = new OutputStreamWriter(confStream, StandardCharsets.UTF_8) props.store(writer, "Spark configuration.") writer.flush() confStream.closeEntry() diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index b12e506033e39..78b57da482f70 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.yarn import java.io.{File, FileOutputStream, OutputStreamWriter} +import java.nio.charset.StandardCharsets import java.util.Properties import java.util.concurrent.TimeUnit @@ -25,7 +26,6 @@ import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.language.postfixOps -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.commons.lang3.SerializationUtils import org.apache.hadoop.yarn.conf.YarnConfiguration @@ -75,7 +75,7 @@ abstract class BaseYarnClusterSuite System.setProperty("SPARK_YARN_MODE", "true") val logConfFile = new File(logConfDir, "log4j.properties") - Files.write(LOG4J_CONF, logConfFile, UTF_8) + Files.write(LOG4J_CONF, logConfFile, StandardCharsets.UTF_8) // Disable the disk utilization check to avoid the test hanging when people's disks are // getting full. 
@@ -191,7 +191,7 @@ abstract class BaseYarnClusterSuite result: File, expected: String): Unit = { finalState should be (SparkAppHandle.State.FINISHED) - val resultString = Files.toString(result, UTF_8) + val resultString = Files.toString(result, StandardCharsets.UTF_8) resultString should be (expected) } @@ -231,7 +231,7 @@ abstract class BaseYarnClusterSuite extraConf.foreach { case (k, v) => props.setProperty(k, v) } val propsFile = File.createTempFile("spark", ".properties", tempDir) - val writer = new OutputStreamWriter(new FileOutputStream(propsFile), UTF_8) + val writer = new OutputStreamWriter(new FileOutputStream(propsFile), StandardCharsets.UTF_8) props.store(writer, "Spark properties.") writer.close() propsFile.getAbsolutePath() diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index e935163c3487f..5068c0cd208b6 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -19,13 +19,13 @@ package org.apache.spark.deploy.yarn import java.io.File import java.net.URL +import java.nio.charset.StandardCharsets import java.util.{HashMap => JHashMap} import scala.collection.mutable import scala.concurrent.duration._ import scala.language.postfixOps -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.scalatest.Matchers @@ -147,7 +147,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { private def testPySpark(clientMode: Boolean): Unit = { val primaryPyFile = new File(tempDir, "test.py") - Files.write(TEST_PYFILE, primaryPyFile, UTF_8) + Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8) // When running tests, let's not assume the user has built the assembly module, which also // creates the pyspark archive. 
Instead, let's use PYSPARK_ARCHIVES_PATH to point at the @@ -171,7 +171,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite { subdir } val pyModule = new File(moduleDir, "mod1.py") - Files.write(TEST_PYMODULE, pyModule, UTF_8) + Files.write(TEST_PYMODULE, pyModule, StandardCharsets.UTF_8) val mod2Archive = TestUtils.createJarWithFiles(Map("mod2.py" -> TEST_PYMODULE), moduleDir) val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",") @@ -245,7 +245,7 @@ private object YarnClusterDriver extends Logging with Matchers { data should be (Set(1, 2, 3, 4)) result = "success" } finally { - Files.write(result, status, UTF_8) + Files.write(result, status, StandardCharsets.UTF_8) sc.stop() } @@ -319,14 +319,14 @@ private object YarnClasspathTest extends Logging { val ccl = Thread.currentThread().getContextClassLoader() val resource = ccl.getResourceAsStream("test.resource") val bytes = ByteStreams.toByteArray(resource) - result = new String(bytes, 0, bytes.length, UTF_8) + result = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8) } catch { case t: Throwable => error(s"loading test.resource to $resultPath", t) // set the exit code if not yet set exitCode = 2 } finally { - Files.write(result, new File(resultPath), UTF_8) + Files.write(result, new File(resultPath), StandardCharsets.UTF_8) } } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala index c17e8695c24fb..1538ff75be5b3 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala @@ -18,8 +18,8 @@ package org.apache.spark.deploy.yarn import java.io.File +import java.nio.charset.StandardCharsets -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files import org.apache.commons.io.FileUtils import org.apache.hadoop.yarn.conf.YarnConfiguration @@ -104,7 +104,7 @@ private object YarnExternalShuffleDriver extends Logging with Matchers { } finally { sc.stop() FileUtils.deleteDirectory(execStateCopy) - Files.write(result, status, UTF_8) + Files.write(result, status, StandardCharsets.UTF_8) } } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 9202bd892f01b..70b8732946a2b 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn import java.io.{File, IOException} import java.lang.reflect.InvocationTargetException +import java.nio.charset.StandardCharsets import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.conf.Configuration @@ -59,7 +60,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging val args = Array("arg1", "${arg.2}", "\"arg3\"", "'arg4'", "$arg5", "\\arg6") try { val argLine = args.map(a => YarnSparkHadoopUtil.escapeForShell(a)).mkString(" ") - Files.write(("bash -c \"echo " + argLine + "\"").getBytes(), scriptFile) + Files.write(("bash -c \"echo " + argLine + "\"").getBytes(StandardCharsets.UTF_8), scriptFile) scriptFile.setExecutable(true) val proc = Runtime.getRuntime().exec(Array(scriptFile.getAbsolutePath()))
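Every hunk above applies the same convention: replace implicit platform-default charset conversions (`str.getBytes`, `new String(bytes)`, `Charset.forName("UTF-8")`, Guava's `Charsets.UTF_8`) with the JDK's `java.nio.charset.StandardCharsets.UTF_8`. A minimal sketch of the before/after is shown below; the object name `Utf8RoundTrip` is illustrative only and does not appear anywhere in the patch or the codebase.

```scala
import java.nio.charset.StandardCharsets

object Utf8RoundTrip {
  def main(args: Array[String]): Unit = {
    val s = "Hello, \u00e9\u00e8\u00ea" // non-ASCII content is what exposes the problem

    // Before: both calls silently use the JVM's platform default charset
    // (file.encoding), so the result can differ across machines and locales.
    //   val bytes = s.getBytes
    //   val back  = new String(bytes)

    // After: the charset is explicit, so the round trip is deterministic.
    val bytes = s.getBytes(StandardCharsets.UTF_8)
    val back  = new String(bytes, StandardCharsets.UTF_8)
    assert(back == s)
    println(s"${bytes.length} bytes, decoded back to: $back")
  }
}
```

Passing a `Charset` constant rather than a string name also sidesteps the `UnsupportedEncodingException` that `getBytes("UTF-8")` declares and the lookup exceptions that `Charset.forName` can throw, which is why the hunks prefer `StandardCharsets.UTF_8` even where the old string-based form was functionally correct.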