
Commit

Merge remote-tracking branch 'origin/master' into remove-external-pkg
zsxwing committed Mar 14, 2016
2 parents ea47687 + 1840852 commit 97fcb46
Showing 93 changed files with 318 additions and 247 deletions.
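Nearly every hunk below makes the same change: Guava's com.google.common.base.Charsets is replaced with the JDK's java.nio.charset.StandardCharsets, and each String to byte[] conversion (and back) names its charset explicitly instead of relying on the JVM's platform default. As a minimal sketch of the pitfall being closed, not code from this commit (the class name is illustrative):

import java.nio.charset.StandardCharsets;

public class CharsetPitfall {
  public static void main(String[] args) {
    String s = "héllo";

    // Platform-dependent: getBytes() uses the JVM's default charset
    // (file.encoding), so the byte count can differ between machines.
    byte[] implicitBytes = s.getBytes();

    // Deterministic: always UTF-8 on every JVM. "é" takes two bytes in
    // UTF-8, so this array always has length 6.
    byte[] explicitBytes = s.getBytes(StandardCharsets.UTF_8);

    System.out.println(implicitBytes.length + " vs " + explicitBytes.length);
  }
}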
StreamInterceptor.java
@@ -34,8 +34,7 @@ class StreamInterceptor implements TransportFrameDecoder.Interceptor {
   private final String streamId;
   private final long byteCount;
   private final StreamCallback callback;
 
-  private volatile long bytesRead;
+  private long bytesRead;
 
   StreamInterceptor(
       TransportResponseHandler handler,
Encoders.java
@@ -17,8 +17,8 @@
 
 package org.apache.spark.network.protocol;
 
-import com.google.common.base.Charsets;
+import java.nio.charset.StandardCharsets;
+
 import io.netty.buffer.ByteBuf;
 
 /** Provides a canonical set of Encoders for simple types. */
@@ -27,11 +27,11 @@ public class Encoders {
   /** Strings are encoded with their length followed by UTF-8 bytes. */
   public static class Strings {
     public static int encodedLength(String s) {
-      return 4 + s.getBytes(Charsets.UTF_8).length;
+      return 4 + s.getBytes(StandardCharsets.UTF_8).length;
     }
 
     public static void encode(ByteBuf buf, String s) {
-      byte[] bytes = s.getBytes(Charsets.UTF_8);
+      byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
       buf.writeInt(bytes.length);
       buf.writeBytes(bytes);
     }
@@ -40,7 +40,7 @@ public static String decode(ByteBuf buf) {
       int length = buf.readInt();
       byte[] bytes = new byte[length];
       buf.readBytes(bytes);
-      return new String(bytes, Charsets.UTF_8);
+      return new String(bytes, StandardCharsets.UTF_8);
     }
   }
 
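For context, a round-trip sketch of the length-prefixed wire format that Encoders.Strings implements above. The standalone class and the Unpooled.buffer() allocation are assumptions for illustration; only the write/read sequence mirrors the code in the diff.

import java.nio.charset.StandardCharsets;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class EncodersRoundTrip {
  public static void main(String[] args) {
    String original = "shuffle_0_0_0";

    // Encode: a 4-byte length prefix followed by the UTF-8 bytes.
    ByteBuf buf = Unpooled.buffer();
    byte[] bytes = original.getBytes(StandardCharsets.UTF_8);
    buf.writeInt(bytes.length);
    buf.writeBytes(bytes);

    // Decode: read the length, then exactly that many bytes.
    int length = buf.readInt();
    byte[] decoded = new byte[length];
    buf.readBytes(decoded);

    System.out.println(original.equals(new String(decoded, StandardCharsets.UTF_8))); // true
  }
}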
SparkSaslServer.java
@@ -28,9 +28,9 @@
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
-import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
@@ -187,14 +187,14 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback
   /* Encode a byte[] identifier as a Base64-encoded string. */
   public static String encodeIdentifier(String identifier) {
     Preconditions.checkNotNull(identifier, "User cannot be null if SASL is enabled");
-    return Base64.encode(Unpooled.wrappedBuffer(identifier.getBytes(Charsets.UTF_8)))
-      .toString(Charsets.UTF_8);
+    return Base64.encode(Unpooled.wrappedBuffer(identifier.getBytes(StandardCharsets.UTF_8)))
+      .toString(StandardCharsets.UTF_8);
   }
 
   /** Encode a password as a base64-encoded char[] array. */
   public static char[] encodePassword(String password) {
     Preconditions.checkNotNull(password, "Password cannot be null if SASL is enabled");
-    return Base64.encode(Unpooled.wrappedBuffer(password.getBytes(Charsets.UTF_8)))
-      .toString(Charsets.UTF_8).toCharArray();
+    return Base64.encode(Unpooled.wrappedBuffer(password.getBytes(StandardCharsets.UTF_8)))
+      .toString(StandardCharsets.UTF_8).toCharArray();
   }
 }
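Both SASL helpers above share one pipeline: UTF-8 bytes, wrapped in a Netty buffer, Base64-encoded, rendered back to a String. A small standalone sketch of that pipeline (the class name and sample identifier are illustrative):

import java.nio.charset.StandardCharsets;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.base64.Base64;

public class SaslEncodingSketch {
  public static void main(String[] args) {
    String identifier = "app-1234";

    // Same shape as encodeIdentifier: UTF-8 bytes -> Base64 -> String.
    ByteBuf encoded = Base64.encode(
      Unpooled.wrappedBuffer(identifier.getBytes(StandardCharsets.UTF_8)));
    System.out.println(encoded.toString(StandardCharsets.UTF_8)); // YXBwLTEyMzQ=
  }
}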
JavaUtils.java
@@ -21,11 +21,11 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import io.netty.buffer.Unpooled;
@@ -68,15 +68,15 @@ public static int nonNegativeHash(Object obj) {
    * converted back to the same string through {@link #bytesToString(ByteBuffer)}.
    */
   public static ByteBuffer stringToBytes(String s) {
-    return Unpooled.wrappedBuffer(s.getBytes(Charsets.UTF_8)).nioBuffer();
+    return Unpooled.wrappedBuffer(s.getBytes(StandardCharsets.UTF_8)).nioBuffer();
   }
 
   /**
    * Convert the given byte buffer to a string. The resulting string can be
    * converted back to the same byte buffer through {@link #stringToBytes(String)}.
    */
   public static String bytesToString(ByteBuffer b) {
-    return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8);
+    return Unpooled.wrappedBuffer(b).toString(StandardCharsets.UTF_8);
   }
 
   /*
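stringToBytes and bytesToString are documented as inverses, and pinning both sides to UTF-8 is what makes that contract hold on every JVM rather than only on machines whose default charset happens to be UTF-8. A minimal round-trip sketch (the standalone class is an assumption for illustration):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import io.netty.buffer.Unpooled;

public class StringBytesRoundTrip {
  public static void main(String[] args) {
    // Mirrors the stringToBytes/bytesToString pair above: both directions
    // name UTF-8, so the round trip is lossless regardless of file.encoding.
    ByteBuffer b = Unpooled.wrappedBuffer("héllo".getBytes(StandardCharsets.UTF_8)).nioBuffer();
    String s = Unpooled.wrappedBuffer(b).toString(StandardCharsets.UTF_8);
    System.out.println(s); // héllo
  }
}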
ExternalShuffleBlockResolver.java
@@ -18,6 +18,7 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.*;
+import java.nio.charset.StandardCharsets;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
@@ -27,7 +28,6 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
 import com.google.common.base.Objects;
 import com.google.common.collect.Maps;
 import org.fusesource.leveldbjni.JniDBFactory;
@@ -152,7 +152,7 @@ public void registerExecutor(
     try {
       if (db != null) {
         byte[] key = dbAppExecKey(fullId);
-        byte[] value = mapper.writeValueAsString(executorInfo).getBytes(Charsets.UTF_8);
+        byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8);
         db.put(key, value);
       }
     } catch (Exception e) {
@@ -350,7 +350,7 @@ private static byte[] dbAppExecKey(AppExecId appExecId) throws IOException {
     // we stick a common prefix on all the keys so we can find them in the DB
     String appExecJson = mapper.writeValueAsString(appExecId);
     String key = (APP_KEY_PREFIX + ";" + appExecJson);
-    return key.getBytes(Charsets.UTF_8);
+    return key.getBytes(StandardCharsets.UTF_8);
   }
 
   private static AppExecId parseDbAppExecKey(String s) throws IOException {
@@ -368,10 +368,10 @@ static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(D
     ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap();
     if (db != null) {
       DBIterator itr = db.iterator();
-      itr.seek(APP_KEY_PREFIX.getBytes(Charsets.UTF_8));
+      itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
       while (itr.hasNext()) {
         Map.Entry<byte[], byte[]> e = itr.next();
-        String key = new String(e.getKey(), Charsets.UTF_8);
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
         if (!key.startsWith(APP_KEY_PREFIX)) {
           break;
         }
@@ -418,7 +418,7 @@ private static void storeVersion(DB db) throws IOException {
 
   public static class StoreVersion {
 
-    static final byte[] KEY = "StoreVersion".getBytes(Charsets.UTF_8);
+    static final byte[] KEY = "StoreVersion".getBytes(StandardCharsets.UTF_8);
 
     public final int major;
     public final int minor;
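reloadRegisteredExecutors leans on LevelDB's lexicographic key ordering: every key of interest carries APP_KEY_PREFIX, so seeking to the prefix and breaking at the first non-matching key scans exactly the relevant range. Below is a self-contained sketch of that idiom against the same leveldbjni API used above; the database path, prefix value, and sample keys are illustrative, not taken from Spark.

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;

public class PrefixScanSketch {
  // Illustrative prefix; Spark uses its own constant.
  private static final String PREFIX = "AppExecShuffleInfo";

  public static void main(String[] args) throws IOException {
    Options options = new Options();
    options.createIfMissing(true);
    DB db = JniDBFactory.factory.open(new File("/tmp/prefix-scan-demo"), options);
    try {
      db.put(key("app1"), bytes("v1"));
      db.put(key("app2"), bytes("v2"));
      db.put(bytes("Version"), bytes("1.0")); // sorts after the prefix range

      // Seek to the prefix, then stop at the first key outside it. LevelDB
      // keys are ordered lexicographically, so prefixed keys are contiguous.
      DBIterator itr = db.iterator();
      itr.seek(bytes(PREFIX));
      while (itr.hasNext()) {
        Map.Entry<byte[], byte[]> e = itr.next();
        String k = new String(e.getKey(), StandardCharsets.UTF_8);
        if (!k.startsWith(PREFIX)) {
          break;
        }
        System.out.println(k + " -> " + new String(e.getValue(), StandardCharsets.UTF_8));
      }
      itr.close();
    } finally {
      db.close();
    }
  }

  private static byte[] key(String appId) {
    return bytes(PREFIX + ";" + appId);
  }

  private static byte[] bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }
}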
ExternalShuffleBlockResolverSuite.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.io.CharStreams;
@@ -34,26 +35,29 @@
 import static org.junit.Assert.*;
 
 public class ExternalShuffleBlockResolverSuite {
-  static String sortBlock0 = "Hello!";
-  static String sortBlock1 = "World!";
+  private static final String sortBlock0 = "Hello!";
+  private static final String sortBlock1 = "World!";
 
-  static String hashBlock0 = "Elementary";
-  static String hashBlock1 = "Tabular";
+  private static final String hashBlock0 = "Elementary";
+  private static final String hashBlock1 = "Tabular";
 
-  static TestShuffleDataContext dataContext;
+  private static TestShuffleDataContext dataContext;
 
-  static TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+  private static final TransportConf conf =
+    new TransportConf("shuffle", new SystemPropertyConfigProvider());
 
   @BeforeClass
   public static void beforeAll() throws IOException {
     dataContext = new TestShuffleDataContext(2, 5);
 
     dataContext.create();
     // Write some sort and hash data.
-    dataContext.insertSortShuffleData(0, 0,
-      new byte[][] { sortBlock0.getBytes(), sortBlock1.getBytes() } );
-    dataContext.insertHashShuffleData(1, 0,
-      new byte[][] { hashBlock0.getBytes(), hashBlock1.getBytes() } );
+    dataContext.insertSortShuffleData(0, 0, new byte[][] {
+        sortBlock0.getBytes(StandardCharsets.UTF_8),
+        sortBlock1.getBytes(StandardCharsets.UTF_8)});
+    dataContext.insertHashShuffleData(1, 0, new byte[][] {
+        hashBlock0.getBytes(StandardCharsets.UTF_8),
+        hashBlock1.getBytes(StandardCharsets.UTF_8)});
   }
 
   @AfterClass
@@ -100,13 +104,15 @@ public void testSortShuffleBlocks() throws IOException {
 
     InputStream block0Stream =
       resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
-    String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
+    String block0 = CharStreams.toString(
+        new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
     block0Stream.close();
     assertEquals(sortBlock0, block0);
 
     InputStream block1Stream =
       resolver.getBlockData("app0", "exec0", "shuffle_0_0_1").createInputStream();
-    String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
+    String block1 = CharStreams.toString(
+        new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
     block1Stream.close();
     assertEquals(sortBlock1, block1);
   }
@@ -119,13 +125,15 @@ public void testHashShuffleBlocks() throws IOException {
 
     InputStream block0Stream =
       resolver.getBlockData("app0", "exec0", "shuffle_1_0_0").createInputStream();
-    String block0 = CharStreams.toString(new InputStreamReader(block0Stream));
+    String block0 = CharStreams.toString(
+        new InputStreamReader(block0Stream, StandardCharsets.UTF_8));
     block0Stream.close();
     assertEquals(hashBlock0, block0);
 
     InputStream block1Stream =
       resolver.getBlockData("app0", "exec0", "shuffle_1_0_1").createInputStream();
-    String block1 = CharStreams.toString(new InputStreamReader(block1Stream));
+    String block1 = CharStreams.toString(
+        new InputStreamReader(block1Stream, StandardCharsets.UTF_8));
     block1Stream.close();
     assertEquals(hashBlock1, block1);
   }
ExternalShuffleCleanupSuite.java
@@ -19,6 +19,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Random;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,8 +35,8 @@
 public class ExternalShuffleCleanupSuite {
 
   // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
-  Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
-  TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+  private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
+  private TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
 
   @Test
   public void noCleanupAndCleanup() throws IOException {
@@ -123,27 +124,29 @@ public void cleanupOnlyRemovedApp() throws IOException {
     assertCleanedUp(dataContext1);
   }
 
-  private void assertStillThere(TestShuffleDataContext dataContext) {
+  private static void assertStillThere(TestShuffleDataContext dataContext) {
     for (String localDir : dataContext.localDirs) {
       assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
     }
   }
 
-  private void assertCleanedUp(TestShuffleDataContext dataContext) {
+  private static void assertCleanedUp(TestShuffleDataContext dataContext) {
     for (String localDir : dataContext.localDirs) {
       assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists());
     }
   }
 
-  private TestShuffleDataContext createSomeData() throws IOException {
+  private static TestShuffleDataContext createSomeData() throws IOException {
     Random rand = new Random(123);
    TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
 
     dataContext.create();
-    dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000),
-      new byte[][] { "ABC".getBytes(), "DEF".getBytes() } );
-    dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000,
-      new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } );
+    dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000), new byte[][] {
+        "ABC".getBytes(StandardCharsets.UTF_8),
+        "DEF".getBytes(StandardCharsets.UTF_8)});
+    dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000, new byte[][] {
+        "GHI".getBytes(StandardCharsets.UTF_8),
+        "JKLMNOPQRSTUVWXYZ".getBytes(StandardCharsets.UTF_8)});
     return dataContext;
   }
 }
UTF8String.java
@@ -21,6 +21,7 @@
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Map;
 
@@ -825,14 +826,7 @@ public UTF8String translate(Map<Character, Character> dict) {
 
   @Override
   public String toString() {
-    try {
-      return new String(getBytes(), "utf-8");
-    } catch (UnsupportedEncodingException e) {
-      // Turn the exception into unchecked so we can find out about it at runtime, but
-      // don't need to add lots of boilerplate code everywhere.
-      throwException(e);
-      return "unknown"; // we will never reach here.
-    }
+    return new String(getBytes(), StandardCharsets.UTF_8);
   }
 
   @Override
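The toString() rewrite above works because the String(byte[], Charset) constructor, unlike String(byte[], String), declares no checked exception: the Charset object is already resolved, so there is no by-name lookup that could fail, and the try/catch plus the unreachable fallback can go away. A minimal contrast (standalone class assumed for illustration):

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

public class CharsetOverloads {
  public static void main(String[] args) throws UnsupportedEncodingException {
    byte[] bytes = {104, 105}; // "hi"

    // Charset-name overload: the name is looked up at runtime, so the
    // compiler forces callers to handle UnsupportedEncodingException.
    String viaName = new String(bytes, "utf-8");

    // Charset-object overload: no checked exception to handle. This is
    // what the simplified toString() relies on.
    String viaObject = new String(bytes, StandardCharsets.UTF_8);

    System.out.println(viaName.equals(viaObject)); // true
  }
}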
PythonRDD.scala
@@ -19,14 +19,14 @@ package org.apache.spark.api.python
 
 import java.io._
 import java.net._
+import java.nio.charset.StandardCharsets
 import java.util.{ArrayList => JArrayList, Collections, List => JList, Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.existentials
 import scala.util.control.NonFatal
 
-import com.google.common.base.Charsets.UTF_8
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat}
@@ -165,7 +165,7 @@ private[spark] class PythonRunner(
           val exLength = stream.readInt()
           val obj = new Array[Byte](exLength)
           stream.readFully(obj)
-          throw new PythonException(new String(obj, UTF_8),
+          throw new PythonException(new String(obj, StandardCharsets.UTF_8),
             writerThread.exception.getOrElse(null))
         case SpecialLengths.END_OF_DATA_SECTION =>
           // We've finished the data section of the output, but we can still
@@ -624,7 +624,7 @@ private[spark] object PythonRDD extends Logging {
   }
 
   def writeUTF(str: String, dataOut: DataOutputStream) {
-    val bytes = str.getBytes(UTF_8)
+    val bytes = str.getBytes(StandardCharsets.UTF_8)
     dataOut.writeInt(bytes.length)
     dataOut.write(bytes)
   }
@@ -817,7 +817,7 @@ private[spark] object PythonRDD extends Logging {
 
 private
 class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
-  override def call(arr: Array[Byte]) : String = new String(arr, UTF_8)
+  override def call(arr: Array[Byte]) : String = new String(arr, StandardCharsets.UTF_8)
 }
 
 /**
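writeUTF above frames each string as a 4-byte length followed by the raw UTF-8 bytes. Note this is not java.io.DataOutputStream#writeUTF, which writes a 2-byte length and modified UTF-8. A self-contained sketch of the framing and its inverse (the class name and in-memory streams are illustrative; in Spark the reader sits on the Python worker side):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class LengthPrefixedUtf8 {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);

    // Write: 4-byte big-endian length, then the UTF-8 payload.
    byte[] bytes = "héllo".getBytes(StandardCharsets.UTF_8);
    out.writeInt(bytes.length);
    out.write(bytes);
    out.flush();

    // Read the frame back the same way.
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
    byte[] read = new byte[in.readInt()];
    in.readFully(read);
    System.out.println(new String(read, StandardCharsets.UTF_8)); // héllo
  }
}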
PythonWorkerFactory.scala
@@ -19,6 +19,7 @@ package org.apache.spark.api.python
 
 import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStreamWriter}
 import java.net.{InetAddress, ServerSocket, Socket, SocketException}
+import java.nio.charset.StandardCharsets
 import java.util.Arrays
 
 import scala.collection.mutable
@@ -121,7 +122,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)
 
       // Tell the worker our port
-      val out = new OutputStreamWriter(worker.getOutputStream)
+      val out = new OutputStreamWriter(worker.getOutputStream, StandardCharsets.UTF_8)
       out.write(serverSocket.getLocalPort + "\n")
       out.flush()
 
(Diff truncated: the remaining changed files did not load in this view.)