Merge branch 'master' into SPARK-13898
rxin committed Mar 21, 2016
2 parents 59cae95 + 060a28c commit 776e1a1
Showing 597 changed files with 5,145 additions and 3,347 deletions.
R/README.md (10 changes: 5 additions & 5 deletions)
@@ -40,7 +40,7 @@ To set other options like driver memory, executor memory etc. you can pass in th
If you wish to use SparkR from RStudio or other R frontends you will need to set some environment variables which point SparkR to your Spark installation. For example
```
# Set this to where Spark is installed
-Sys.setenv(SPARK_HOME="/Users/shivaram/spark")
+Sys.setenv(SPARK_HOME="/Users/username/spark")
# This line loads SparkR from the installed directory
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
@@ -51,7 +51,7 @@ sc <- sparkR.init(master="local")

The [instructions](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark) for making contributions to Spark also apply to SparkR.
If you only make R file changes (i.e. no Scala changes) then you can just re-install the R package using `R/install-dev.sh` and test your changes.
-Once you have made your changes, please include unit tests for them and run existing unit tests using the `run-tests.sh` script as described below.
+Once you have made your changes, please include unit tests for them and run existing unit tests using the `R/run-tests.sh` script as described below.

#### Generating documentation

@@ -60,17 +60,17 @@ The SparkR documentation (Rd files and HTML files) are not a part of the source
### Examples, Unit tests

SparkR comes with several sample programs in the `examples/src/main/r` directory.
-To run one of them, use `./bin/sparkR <filename> <args>`. For example:
+To run one of them, use `./bin/spark-submit <filename> <args>`. For example:

-./bin/sparkR examples/src/main/r/dataframe.R
+./bin/spark-submit examples/src/main/r/dataframe.R

You can also run the unit-tests for SparkR by running (you need to install the [testthat](http://cran.r-project.org/web/packages/testthat/index.html) package first):

R -e 'install.packages("testthat", repos="http://cran.us.r-project.org")'
./R/run-tests.sh

### Running on YARN
-The `./bin/spark-submit` and `./bin/sparkR` can also be used to submit jobs to YARN clusters. You will need to set YARN conf dir before doing so. For example on CDH you can run
+The `./bin/spark-submit` can also be used to submit jobs to YARN clusters. You will need to set YARN conf dir before doing so. For example on CDH you can run
```
export YARN_CONF_DIR=/etc/hadoop/conf
./bin/spark-submit --master yarn examples/src/main/r/dataframe.R

TransportContext.java
@@ -43,7 +43,8 @@

/**
* Contains the context to create a {@link TransportServer}, {@link TransportClientFactory}, and to
- * setup Netty Channel pipelines with a {@link org.apache.spark.network.server.TransportChannelHandler}.
+ * setup Netty Channel pipelines with a
+ * {@link org.apache.spark.network.server.TransportChannelHandler}.
*
* There are two communication protocols that the TransportClient provides, control-plane RPCs and
* data-plane "chunk fetching". The handling of the RPCs is performed outside of the scope of the
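
Aside: for readers new to this module, here is roughly how the classes named in this Javadoc fit together. A minimal sketch built from APIs that appear elsewhere in this diff (`TransportConf` with `MapConfigProvider`, `RpcHandler#getStreamManager`, `TransportClient#sendRpc`); `createServer()`, `createClientFactory()`, and `getPort()` are assumptions based on how the test suites in this module use them, not verified signatures:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;

import org.apache.spark.network.TransportContext;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;

public class EchoRpcSketch {
  public static void main(String[] args) throws Exception {
    TransportConf conf = new TransportConf("shuffle", new MapConfigProvider(new HashMap<>()));

    // Control plane: the RpcHandler answers RPCs; its StreamManager serves the
    // data-plane "chunk fetching" described in the Javadoc above.
    RpcHandler rpcHandler = new RpcHandler() {
      @Override
      public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback cb) {
        cb.onSuccess(message);  // echo the payload back to the caller
      }

      @Override
      public StreamManager getStreamManager() {
        return new OneForOneStreamManager();
      }
    };

    TransportContext context = new TransportContext(conf, rpcHandler);
    TransportServer server = context.createServer();  // bind an ephemeral port
    TransportClientFactory factory = context.createClientFactory();
    TransportClient client = factory.createClient("localhost", server.getPort());
    client.sendRpc(ByteBuffer.wrap(new byte[]{42}), new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) { System.out.println("echoed"); }

      @Override
      public void onFailure(Throwable t) { t.printStackTrace(); }
    });
  }
}
```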

NettyManagedBuffer.java
@@ -28,7 +28,7 @@
/**
* A {@link ManagedBuffer} backed by a Netty {@link ByteBuf}.
*/
-public final class NettyManagedBuffer extends ManagedBuffer {
+public class NettyManagedBuffer extends ManagedBuffer {
private final ByteBuf buf;

public NettyManagedBuffer(ByteBuf buf) {

StreamCallback.java
@@ -21,9 +21,9 @@
import java.nio.ByteBuffer;

/**
- * Callback for streaming data. Stream data will be offered to the {@link #onData(String, ByteBuffer)}
- * method as it arrives. Once all the stream data is received, {@link #onComplete(String)} will be
- * called.
+ * Callback for streaming data. Stream data will be offered to the
+ * {@link #onData(String, ByteBuffer)} method as it arrives. Once all the stream data is received,
+ * {@link #onComplete(String)} will be called.
* <p>
* The network library guarantees that a single thread will call these methods at a time, but
* different call may be made by different threads.
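
A minimal sketch of the contract this Javadoc describes, assuming the interface is `org.apache.spark.network.client.StreamCallback` with the two methods named above plus an `onFailure` method that exists in the real interface but is not shown in this hunk:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.spark.network.client.StreamCallback;

// Counts the bytes of one stream. Per the guarantee above, only one thread calls
// these methods at a time, so the counter needs no lock even if calls hop threads.
class ByteCountingCallback implements StreamCallback {
  private long bytesReceived = 0;

  @Override
  public void onData(String streamId, ByteBuffer buf) throws IOException {
    bytesReceived += buf.remaining();  // consume each chunk as it arrives
  }

  @Override
  public void onComplete(String streamId) throws IOException {
    System.out.println(streamId + " complete: " + bytesReceived + " bytes");
  }

  @Override
  public void onFailure(String streamId, Throwable cause) throws IOException {
    System.err.println(streamId + " failed after " + bytesReceived + " bytes: " + cause);
  }
}
```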

TransportClientFactory.java
@@ -64,7 +64,7 @@ private static class ClientPool {
TransportClient[] clients;
Object[] locks;

-public ClientPool(int size) {
+ClientPool(int size) {
clients = new TransportClient[size];
locks = new Object[size];
for (int i = 0; i < size; i++) {

Message.java
@@ -33,15 +33,15 @@ public interface Message extends Encodable {
boolean isBodyInFrame();

/** Preceding every serialized Message is its type, which allows us to deserialize it. */
-public static enum Type implements Encodable {
+enum Type implements Encodable {
ChunkFetchRequest(0), ChunkFetchSuccess(1), ChunkFetchFailure(2),
RpcRequest(3), RpcResponse(4), RpcFailure(5),
StreamRequest(6), StreamResponse(7), StreamFailure(8),
OneWayMessage(9), User(-1);

private final byte id;

-private Type(int id) {
+Type(int id) {
assert id < 128 : "Cannot have more than 128 message types";
this.id = (byte) id;
}
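
The pattern in this hunk — a tag byte written ahead of every frame — is what makes the wire format self-describing. A self-contained illustration of the decode side (hypothetical `FrameType`, not Spark's actual `MessageDecoder`); it also shows why the dropped modifiers were redundant, since members of an interface are implicitly `public` and `static`, and enum constructors are implicitly `private`:

```java
// Hypothetical mirror of the tag-byte scheme above: the first byte of each
// incoming frame selects which message class to decode.
enum FrameType {
  CHUNK_FETCH_REQUEST(0), RPC_REQUEST(3), ONE_WAY_MESSAGE(9);

  private final byte id;

  FrameType(int id) {  // implicitly private: the modifier the diff removes was redundant
    assert id < 128 : "Cannot have more than 128 message types";
    this.id = (byte) id;
  }

  static FrameType fromId(byte id) {
    for (FrameType t : values()) {
      if (t.id == id) {
        return t;
      }
    }
    throw new IllegalArgumentException("Unknown frame type: " + id);
  }
}
```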

RequestMessage.java
@@ -17,8 +17,6 @@

package org.apache.spark.network.protocol;

-import org.apache.spark.network.protocol.Message;
-
/** Messages from the client to the server. */
public interface RequestMessage extends Message {
// token interface

ResponseMessage.java
@@ -17,8 +17,6 @@

package org.apache.spark.network.protocol;

-import org.apache.spark.network.protocol.Message;
-
/** Messages from the server to the client. */
public interface ResponseMessage extends Message {
// token interface

SaslMessage.java
@@ -36,11 +36,11 @@ class SaslMessage extends AbstractMessage {

public final String appId;

-public SaslMessage(String appId, byte[] message) {
+SaslMessage(String appId, byte[] message) {
this(appId, Unpooled.wrappedBuffer(message));
}

-public SaslMessage(String appId, ByteBuf message) {
+SaslMessage(String appId, ByteBuf message) {
super(new NettyManagedBuffer(message), true);
this.appId = appId;
}

OneForOneStreamManager.java
@@ -32,8 +32,8 @@
import org.apache.spark.network.client.TransportClient;

/**
- * StreamManager which allows registration of an Iterator&lt;ManagedBuffer&gt;, which are individually
- * fetched as chunks by the client. Each registered buffer is one chunk.
+ * StreamManager which allows registration of an Iterator&lt;ManagedBuffer&gt;, which are
+ * individually fetched as chunks by the client. Each registered buffer is one chunk.
*/
public class OneForOneStreamManager extends StreamManager {
private final Logger logger = LoggerFactory.getLogger(OneForOneStreamManager.class);

TransportChannelHandler.java
@@ -141,8 +141,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (responseHandler.numOutstandingRequests() > 0) {
String address = NettyUtils.getRemoteAddress(ctx.channel());
logger.error("Connection to {} has been quiet for {} ms while there are outstanding " +
"requests. Assuming connection is dead; please adjust spark.network.timeout if this " +
"is wrong.", address, requestTimeoutNs / 1000 / 1000);
"requests. Assuming connection is dead; please adjust spark.network.timeout if " +
"this is wrong.", address, requestTimeoutNs / 1000 / 1000);
client.timeOut();
ctx.close();
} else if (closeIdleConnections) {

ByteUnit.java
@@ -24,7 +24,7 @@ public enum ByteUnit {
TiB ((long) Math.pow(1024L, 4L)),
PiB ((long) Math.pow(1024L, 5L));

-private ByteUnit(long multiplier) {
+ByteUnit(long multiplier) {
this.multiplier = multiplier;
}


SystemPropertyConfigProvider.java
@@ -19,8 +19,6 @@

import java.util.NoSuchElementException;

-import org.apache.spark.network.util.ConfigProvider;
-
/** Uses System properties to obtain config values. */
public class SystemPropertyConfigProvider extends ConfigProvider {
@Override

TransportFrameDecoder.java
@@ -205,7 +205,7 @@ private boolean feedInterceptor(ByteBuf buf) throws Exception {
return interceptor != null;
}

-public static interface Interceptor {
+public interface Interceptor {

/**
* Handles data received from the remote end.

RequestTimeoutIntegrationSuite.java
@@ -44,7 +44,7 @@
* Suite which ensures that requests that go without a response for the network timeout period are
* failed, and the connection closed.
*
- * In this suite, we use 2 seconds as the connection timeout, with some slack given in the tests,
+ * In this suite, we use 10 seconds as the connection timeout, with some slack given in the tests,
* to ensure stability in different test environments.
*/
public class RequestTimeoutIntegrationSuite {
@@ -61,7 +61,7 @@ public class RequestTimeoutIntegrationSuite {
@Before
public void setUp() throws Exception {
Map<String, String> configMap = Maps.newHashMap();
configMap.put("spark.shuffle.io.connectionTimeout", "2s");
configMap.put("spark.shuffle.io.connectionTimeout", "10s");
conf = new TransportConf("shuffle", new MapConfigProvider(configMap));

defaultManager = new StreamManager() {
@@ -118,10 +118,10 @@ public StreamManager getStreamManager() {
callback0.latch.await();
assertEquals(responseSize, callback0.successLength);

-// Second times out after 2 seconds, with slack. Must be IOException.
+// Second times out after 10 seconds, with slack. Must be IOException.
TestCallback callback1 = new TestCallback();
client.sendRpc(ByteBuffer.allocate(0), callback1);
-callback1.latch.await(4, TimeUnit.SECONDS);
+callback1.latch.await(60, TimeUnit.SECONDS);
assertNotNull(callback1.failure);
assertTrue(callback1.failure instanceof IOException);

@@ -223,7 +223,7 @@ public StreamManager getStreamManager() {
// not complete yet, but should complete soon
assertEquals(-1, callback0.successLength);
assertNull(callback0.failure);
-callback0.latch.await(2, TimeUnit.SECONDS);
+callback0.latch.await(60, TimeUnit.SECONDS);
assertTrue(callback0.failure instanceof IOException);

// failed at same time as previous
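
Aside: the property this suite tunes is the same one the TransportChannelHandler log message above tells operators to adjust. In `TransportConf`, `spark.<module>.io.connectionTimeout` falls back to the global `spark.network.timeout` when unset. A sketch of raising it from application code — the values are illustrative, not recommendations:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class TimeoutConfigSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("timeout-sketch")
        // Global fallback used by all transport modules:
        .set("spark.network.timeout", "300s")
        // Module-specific override, as exercised by this suite:
        .set("spark.shuffle.io.connectionTimeout", "300s");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    System.out.println(conf.get("spark.shuffle.io.connectionTimeout"));
    jsc.stop();
  }
}
```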
@@ -17,7 +17,6 @@

package org.apache.spark.network.sasl;

-import java.lang.Override;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;


ExternalShuffleBlockHandler.java
@@ -52,7 +52,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
final ExternalShuffleBlockResolver blockManager;
private final OneForOneStreamManager streamManager;

-public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException {
+public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile)
+    throws IOException {
this(new OneForOneStreamManager(),
new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
}

ExternalShuffleBlockResolver.java
@@ -423,7 +423,9 @@ public static class StoreVersion {
public final int major;
public final int minor;

-@JsonCreator public StoreVersion(@JsonProperty("major") int major, @JsonProperty("minor") int minor) {
+@JsonCreator public StoreVersion(
+    @JsonProperty("major") int major,
+    @JsonProperty("minor") int minor) {
this.major = major;
this.minor = minor;
}
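
Aside: the annotations being re-wrapped here are what let Jackson rebuild this immutable class — its fields are `public final` with no setters, so `@JsonCreator` points Jackson at the constructor and each `@JsonProperty` binds a JSON field to a parameter. A round-trip sketch, assuming `StoreVersion` is the public nested class of `ExternalShuffleBlockResolver` in package `org.apache.spark.network.shuffle`:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.StoreVersion;

public class StoreVersionRoundTrip {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    // Serialization reads the public final fields; deserialization has no setters
    // to call, so Jackson invokes the @JsonCreator constructor with both properties.
    String json = mapper.writeValueAsString(new StoreVersion(1, 0));
    System.out.println(json);  // e.g. {"major":1,"minor":0}
    StoreVersion v = mapper.readValue(json, StoreVersion.class);
    System.out.println(v.major + "." + v.minor);  // 1.0
  }
}
```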

RetryingBlockFetcher.java
@@ -46,7 +46,7 @@ public class RetryingBlockFetcher {
* Used to initiate the first fetch for all blocks, and subsequently for retrying the fetch on any
* remaining blocks.
*/
-public static interface BlockFetchStarter {
+public interface BlockFetchStarter {
/**
* Creates a new BlockFetcher to fetch the given block ids which may do some synchronous
* bootstrapping followed by fully asynchronous block fetching.

BlockTransferMessage.java
@@ -40,13 +40,13 @@ public abstract class BlockTransferMessage implements Encodable {
protected abstract Type type();

/** Preceding every serialized message is its type, which allows us to deserialize it. */
-public static enum Type {
+public enum Type {
OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4),
HEARTBEAT(5);

private final byte id;

-private Type(int id) {
+Type(int id) {
assert id < 128 : "Cannot have more than 128 message types";
this.id = (byte) id;
}
@@ -212,7 +212,8 @@ public void onBlockFetchFailure(String blockId, Throwable t) {
};

String[] blockIds = { "shuffle_2_3_4", "shuffle_6_7_8" };
-OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener);
+OneForOneBlockFetcher fetcher =
+    new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener);
fetcher.start();
blockFetchLatch.await();
checkSecurityException(exception.get());
@@ -113,7 +113,8 @@ public void testBadMessages() {
// pass
}

-ByteBuffer unexpectedMsg = new UploadBlock("a", "e", "b", new byte[1], new byte[2]).toByteBuffer();
+ByteBuffer unexpectedMsg = new UploadBlock("a", "e", "b", new byte[1],
+    new byte[2]).toByteBuffer();
try {
handler.receive(client, unexpectedMsg, callback);
fail("Should have thrown");

Murmur3_x86_32.java
@@ -28,7 +28,7 @@ final class Murmur3_x86_32 {

private final int seed;

-public Murmur3_x86_32(int seed) {
+Murmur3_x86_32(int seed) {
this.seed = seed;
}


BitSetMethods.java
@@ -87,7 +87,8 @@ public static boolean anySet(Object baseObject, long baseOffset, long bitSetWidt
* To iterate over the true bits in a BitSet, use the following loop:
* <pre>
* <code>
- * for (long i = bs.nextSetBit(0, sizeInWords); i &gt;= 0; i = bs.nextSetBit(i + 1, sizeInWords)) {
+ * for (long i = bs.nextSetBit(0, sizeInWords); i &gt;= 0;
+ *      i = bs.nextSetBit(i + 1, sizeInWords)) {
* // operate on index i here
* }
* </code>

JavaSparkContextVarargsWorkaround.java
@@ -62,5 +62,6 @@ public final <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
// These methods take separate "first" and "rest" elements to avoid having the same type erasure
public abstract <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
public abstract JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
-public abstract <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
+public abstract <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>>
+    rest);
}

DoubleFunction.java
@@ -23,5 +23,5 @@
* A function that returns Doubles, and can be used to construct DoubleRDDs.
*/
public interface DoubleFunction<T> extends Serializable {
-public double call(T t) throws Exception;
+double call(T t) throws Exception;
}

Function2.java
@@ -23,5 +23,5 @@
* A two-argument function that takes arguments of type T1 and T2 and returns an R.
*/
public interface Function2<T1, T2, R> extends Serializable {
-public R call(T1 v1, T2 v2) throws Exception;
+R call(T1 v1, T2 v2) throws Exception;
}

Function3.java
@@ -23,5 +23,5 @@
* A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
*/
public interface Function3<T1, T2, T3, R> extends Serializable {
-public R call(T1 v1, T2 v2, T3 v3) throws Exception;
+R call(T1 v1, T2 v2, T3 v3) throws Exception;
}

PairFunction.java
@@ -26,5 +26,5 @@
* construct PairRDDs.
*/
public interface PairFunction<T, K, V> extends Serializable {
-public Tuple2<K, V> call(T t) throws Exception;
+Tuple2<K, V> call(T t) throws Exception;
}
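
Aside: the last four hunks drop the redundant `public` from Spark's Java function interfaces — interface members are implicitly public. Each is a plain single-abstract-method interface, which is exactly how the Java API consumes lambdas. A word-count sketch in which `PairFunction` and `Function2` are satisfied implicitly:

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class WordCountSketch {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local[2]", "word-count-sketch");
    JavaRDD<String> words = jsc.parallelize(Arrays.asList("a", "b", "a"));
    // mapToPair takes a PairFunction<String, String, Integer>; reduceByKey takes a
    // Function2<Integer, Integer, Integer>. Both are satisfied by lambdas here.
    JavaPairRDD<String, Integer> counts = words
        .mapToPair(w -> new Tuple2<>(w, 1))
        .reduceByKey((a, b) -> a + b);
    System.out.println(counts.collectAsMap());  // e.g. {a=2, b=1}
    jsc.stop();
  }
}
```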

TaskMemoryManager.java
@@ -67,9 +67,9 @@ public class TaskMemoryManager {

/**
* Maximum supported data page size (in bytes). In principle, the maximum addressable page size is
- * (1L &lt;&lt; OFFSET_BITS) bytes, which is 2+ petabytes. However, the on-heap allocator's maximum page
- * size is limited by the maximum amount of data that can be stored in a long[] array, which is
- * (2^32 - 1) * 8 bytes (or 16 gigabytes). Therefore, we cap this at 16 gigabytes.
+ * (1L &lt;&lt; OFFSET_BITS) bytes, which is 2+ petabytes. However, the on-heap allocator's
+ * maximum page size is limited by the maximum amount of data that can be stored in a long[]
+ * array, which is (2^32 - 1) * 8 bytes (or 16 gigabytes). Therefore, we cap this at 16 gigabytes.
*/
public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;
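
A quick check of the arithmetic behind this cap: array indices in Java are `int`s, so a `long[]` holds at most 2^31 − 1 elements of 8 bytes each, which is what the constant encodes:

```java
public class PageSizeCheck {
  public static void main(String[] args) {
    long maxBytes = ((1L << 31) - 1) * 8L;  // same expression as MAXIMUM_PAGE_SIZE_BYTES
    System.out.println(maxBytes);                        // 17179869176
    System.out.println(maxBytes / (double) (1L << 30));  // 15.99999999..., i.e. just under 16 GiB
  }
}
```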


BypassMergeSortShuffleWriter.java
@@ -98,7 +98,7 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
*/
private boolean stopping = false;

-public BypassMergeSortShuffleWriter(
+BypassMergeSortShuffleWriter(
BlockManager blockManager,
IndexShuffleBlockResolver shuffleBlockResolver,
BypassMergeSortShuffleHandle<K, V> handle,
@@ -115,7 +115,7 @@ public BypassMergeSortShuffleWriter(
this.partitioner = dep.partitioner();
this.numPartitions = partitioner.numPartitions();
this.writeMetrics = taskContext.taskMetrics().registerShuffleWriteMetrics();
-this.serializer = Serializer.getSerializer(dep.serializer());
+this.serializer = dep.serializer();
this.shuffleBlockResolver = shuffleBlockResolver;
}
