
Commit

Merge branch 'master' into conf-opts
Marcelo Vanzin committed Mar 7, 2016
2 parents be1daed + d7eac9d commit dbfed7d
Showing 578 changed files with 5,700 additions and 2,806 deletions.
3 changes: 1 addition & 2 deletions bin/beeline.cmd
@@ -17,5 +17,4 @@ rem See the License for the specific language governing permissions and
 rem limitations under the License.
 rem
 
-set SPARK_HOME=%~dp0..
-cmd /V /E /C "%SPARK_HOME%\bin\spark-class.cmd" org.apache.hive.beeline.BeeLine %*
+cmd /V /E /C "%~dp0spark-class.cmd" org.apache.hive.beeline.BeeLine %*
2 changes: 1 addition & 1 deletion bin/spark-submit.cmd
@@ -20,4 +20,4 @@ rem
 rem This is the entry point for running Spark submit. To avoid polluting the
 rem environment, it just launches a new cmd to do the real work.
 
-cmd /V /E /C spark-submit2.cmd %*
+cmd /V /E /C "%~dp0spark-submit2.cmd" %*
1 change: 1 addition & 0 deletions checkstyle.xml
@@ -166,5 +166,6 @@
 <property name="exceptionVariableName" value="expected"/>
 </module>
 <module name="CommentsIndentation"/>
+<module name="UnusedImports"/>
 </module>
 </module>
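With the UnusedImports module enabled, Checkstyle flags import statements that are never referenced; the stale-import removals running through the Java changes below line up with that check.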

@@ -24,7 +24,12 @@
 * failure.
 */
 public interface RpcResponseCallback {
-/** Successful serialized result from server. */
+/**
+* Successful serialized result from server.
+*
+* After `onSuccess` returns, `response` will be recycled and its content will become invalid.
+* Please copy the content of `response` if you want to use it after `onSuccess` returns.
+*/
 void onSuccess(ByteBuffer response);
 
 /** Exception either propagated from server or raised on client side. */
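Given the contract documented above, a caller that needs the payload after the callback returns has to copy it first. A minimal user-side sketch (not part of this commit; the class name is made up):

import java.nio.ByteBuffer;

import org.apache.spark.network.client.RpcResponseCallback;

// Hypothetical callback that copies the payload, because the buffer backing
// "response" may be recycled as soon as onSuccess returns.
class CopyingCallback implements RpcResponseCallback {
  private volatile byte[] saved;

  @Override
  public void onSuccess(ByteBuffer response) {
    byte[] copy = new byte[response.remaining()];
    response.get(copy);   // drain the buffer into a private array
    saved = copy;         // safe to read after onSuccess has returned
  }

  @Override
  public void onFailure(Throwable e) {
    e.printStackTrace();
  }
}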

@@ -257,7 +257,11 @@ public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) {
 sendRpc(message, new RpcResponseCallback() {
 @Override
 public void onSuccess(ByteBuffer response) {
-result.set(response);
+ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+copy.put(response);
+// flip "copy" to make it readable
+copy.flip();
+result.set(copy);
 }
 
 @Override

@@ -19,7 +19,6 @@
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NettyManagedBuffer;

@@ -19,7 +19,6 @@
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NettyManagedBuffer;

@@ -19,7 +19,6 @@
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NettyManagedBuffer;

@@ -20,9 +20,6 @@
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
 
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NettyManagedBuffer;
-
 /**
 * Message indicating an error when transferring a stream.
 */

@@ -20,9 +20,6 @@
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
 
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NettyManagedBuffer;
-
 /**
 * Request to stream data from the remote end.
 * <p>

@@ -21,7 +21,6 @@
 import io.netty.buffer.ByteBuf;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NettyManagedBuffer;
 
 /**
 * Response to {@link StreamRequest} when the stream has been successfully opened.

@@ -33,7 +33,6 @@
 import io.netty.channel.FileRegion;
 import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.util.AbstractReferenceCounted;
-import io.netty.util.ReferenceCountUtil;
 
 import org.apache.spark.network.util.ByteArrayWritableChannel;
 import org.apache.spark.network.util.NettyUtils;

@@ -19,7 +19,6 @@
 
 import java.nio.ByteBuffer;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;

@@ -31,8 +31,6 @@
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.ByteToMessageDecoder;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.util.internal.PlatformDependent;
 
 /**

@@ -17,7 +17,6 @@
 
 package org.apache.spark.network.util;
 
-import java.util.Iterator;
 import java.util.LinkedList;
 
 import com.google.common.base.Preconditions;

@@ -132,7 +132,7 @@ public static void tearDown() {
 testFile.delete();
 }
 
-class FetchResult {
+static class FetchResult {
 public Set<Integer> successChunks;
 public Set<Integer> failedChunks;
 public List<ManagedBuffer> buffers;

@@ -124,8 +124,8 @@ public StreamManager getStreamManager() {
 synchronized (callback1) {
 client.sendRpc(ByteBuffer.allocate(0), callback1);
 callback1.wait(4 * 1000);
-assert (callback1.failure != null);
-assert (callback1.failure instanceof IOException);
+assertNotNull(callback1.failure);
+assertTrue(callback1.failure instanceof IOException);
 }
 semaphore.release();
 }

@@ -167,8 +167,8 @@ public StreamManager getStreamManager() {
 synchronized (callback0) {
 client0.sendRpc(ByteBuffer.allocate(0), callback0);
 callback0.wait(FOREVER);
-assert (callback0.failure instanceof IOException);
-assert (!client0.isActive());
+assertTrue(callback0.failure instanceof IOException);
+assertFalse(client0.isActive());
 }
 
 // Increment the semaphore and the second request should succeed quickly.

@@ -236,15 +236,15 @@ public StreamManager getStreamManager() {
 
 synchronized (callback1) {
 // failed at same time as previous
-assert (callback0.failure instanceof IOException);
+assertTrue(callback0.failure instanceof IOException);
 }
 }
 
 /**
 * Callback which sets 'success' or 'failure' on completion.
 * Additionally notifies all waiters on this callback when invoked.
 */
-class TestCallback implements RpcResponseCallback, ChunkReceivedCallback {
+static class TestCallback implements RpcResponseCallback, ChunkReceivedCallback {
 
 int successLength = -1;
 Throwable failure;
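The switch from bare assert statements to JUnit assertions in these suites matters because a Java assert is a no-op unless the JVM runs with -ea, so a failing condition can go unnoticed. A small illustration (not from this commit):

import static org.junit.Assert.assertTrue;

public class AssertDemo {
  public static void main(String[] args) {
    boolean ok = false;
    assert ok;        // silently skipped unless the JVM was started with -ea
    assertTrue(ok);   // always evaluated; throws java.lang.AssertionError here
  }
}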

@@ -91,7 +91,7 @@ public static void tearDown() {
 clientFactory.close();
 }
 
-class RpcResult {
+static class RpcResult {
 public Set<String> successMessages;
 public Set<String> errorMessages;
 }

@@ -27,6 +27,7 @@
 
 import com.google.common.collect.Maps;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 

@@ -95,7 +96,7 @@ public void run() {
 try {
 TransportClient client =
 factory.createClient(TestUtils.getLocalHost(), server1.getPort());
-assert (client.isActive());
+assertTrue(client.isActive());
 clients.add(client);
 } catch (IOException e) {
 failed.incrementAndGet();

@@ -115,8 +116,8 @@ public void run() {
 attempts[i].join();
 }
 
-assert(failed.get() == 0);
-assert(clients.size() == maxConnections);
+Assert.assertEquals(0, failed.get());
+Assert.assertEquals(clients.size(), maxConnections);
 
 for (TransportClient client : clients) {
 client.close();

@@ -65,7 +65,7 @@ public void testByteBufBody() throws Exception {
 assertEquals(42, result.readLong());
 assertEquals(84, result.readLong());
 
-assert(msg.release());
+assertTrue(msg.release());
 assertEquals(0, bodyPassedToNettyManagedBuffer.refCnt());
 assertEquals(0, header.refCnt());
 }

@@ -77,7 +77,7 @@ public void testDeallocateReleasesManagedBuffer() throws Exception {
 ByteBuf body = (ByteBuf) managedBuf.convertToNetty();
 assertEquals(2, body.refCnt());
 MessageWithHeader msg = new MessageWithHeader(managedBuf, header, body, body.readableBytes());
-assert(msg.release());
+assertTrue(msg.release());
 Mockito.verify(managedBuf, Mockito.times(1)).release();
 assertEquals(0, body.refCnt());
 }

@@ -94,7 +94,7 @@ private void testFileRegionBody(int totalWrites, int writesPerCall) throws Excep
 for (long i = 0; i < 8; i++) {
 assertEquals(i, result.readLong());
 }
-assert(msg.release());
+assertTrue(msg.release());
 }
 
 private ByteBuf doWrite(MessageWithHeader msg, int minExpectedWrites) throws Exception {

@@ -24,7 +24,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.spark.network.sasl.SecretKeyHolder;
 import org.apache.spark.network.util.JavaUtils;
 
 /**

@@ -56,6 +56,14 @@ public int hashCode() {
 return Objects.hashCode(appId);
 }
 
+@Override
+public boolean equals(Object o) {
+if (!(o instanceof RegisterDriver)) {
+return false;
+}
+return Objects.equal(appId, ((RegisterDriver) o).appId);
+}
+
 public static RegisterDriver decode(ByteBuf buf) {
 String appId = Encoders.Strings.decode(buf);
 return new RegisterDriver(appId);
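Defining equals alongside the existing hashCode keeps the two consistent: instances built from the same appId now compare equal and already hash alike, which is what hash-based collections and test assertions rely on. A rough usage sketch (assuming the single-argument constructor visible in decode above; the app id is made up):

RegisterDriver a = new RegisterDriver("app-20160307-0001");
RegisterDriver b = new RegisterDriver("app-20160307-0001");

System.out.println(a.equals(b));                   // true, now that equals() compares appId
System.out.println(a.hashCode() == b.hashCode());  // true, hashCode() already used appId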

@@ -109,7 +109,7 @@ public void afterEach() {
 handler.applicationRemoved(APP_ID, false /* cleanupLocalDirs */);
 }
 
-class FetchResult {
+static class FetchResult {
 public Set<String> successBlocks;
 public Set<String> failedBlocks;
 public List<ManagedBuffer> buffers;

@@ -30,7 +30,6 @@
 import org.apache.spark.network.TransportContext;
 import org.apache.spark.network.sasl.SaslServerBootstrap;
 import org.apache.spark.network.sasl.SecretKeyHolder;
-import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.server.TransportServer;
 import org.apache.spark.network.server.TransportServerBootstrap;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;

@@ -305,7 +305,7 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 }
 }
 
-assert stub != null;
+assertNotNull(stub);
 stub.when(fetchStarter).createAndStart((String[]) any(), (BlockFetchingListener) anyObject());
 String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
 new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
File renamed without changes.
2 changes: 1 addition & 1 deletion tags/pom.xml → common/tags/pom.xml
@@ -23,7 +23,7 @@
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-parent_2.11</artifactId>
 <version>2.0.0-SNAPSHOT</version>
-<relativePath>../pom.xml</relativePath>
+<relativePath>../../pom.xml</relativePath>
 </parent>
 
 <groupId>org.apache.spark</groupId>
2 changes: 1 addition & 1 deletion unsafe/pom.xml → common/unsafe/pom.xml
@@ -23,7 +23,7 @@
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-parent_2.11</artifactId>
 <version>2.0.0-SNAPSHOT</version>
-<relativePath>../pom.xml</relativePath>
+<relativePath>../../pom.xml</relativePath>
 </parent>
 
 <groupId>org.apache.spark</groupId>

@@ -193,7 +193,7 @@ class UTF8StringPropertyCheckSuite extends FunSuite with GeneratorDrivenProperty
 
 test("concat") {
 def concat(orgin: Seq[String]): String =
-if (orgin.exists(_ == null)) null else orgin.mkString
+if (orgin.contains(null)) null else orgin.mkString
 
 forAll { (inputs: Seq[String]) =>
 assert(UTF8String.concat(inputs.map(toUTF8): _*) === toUTF8(inputs.mkString))

@@ -17,8 +17,6 @@
 
 package org.apache.spark.shuffle.sort;
 
-import org.apache.spark.memory.TaskMemoryManager;
-
 /**
 * Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
 * <p>

@@ -28,7 +26,7 @@
 * </pre>
 * This implies that the maximum addressable page size is 2^27 bits = 128 megabytes, assuming that
 * our offsets in pages are not 8-byte-word-aligned. Since we have 2^13 pages (based off the
-* 13-bit page numbers assigned by {@link TaskMemoryManager}), this
+* 13-bit page numbers assigned by {@link org.apache.spark.memory.TaskMemoryManager}), this
 * implies that we can address 2^13 * 128 megabytes = 1 terabyte of RAM per task.
 * <p>
 * Assuming word-alignment would allow for a 1 gigabyte maximum page size, but we leave this
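A rough sketch of the packing described in that comment, not the actual PackedRecordPointer code: a 24-bit partition number in the top bits, then a 13-bit page number and a 27-bit offset in the low 40 bits.

// Illustrative only: packs [24-bit partition][13-bit page][27-bit offset] into one long.
final class PackedPointerSketch {
  static long pack(int partitionId, int pageNumber, long offsetInPage) {
    return ((long) (partitionId & 0xFFFFFF) << 40)
        | ((long) (pageNumber & 0x1FFF) << 27)
        | (offsetInPage & 0x7FFFFFFL);
  }

  static int partitionId(long packed) {
    return (int) (packed >>> 40);              // top 24 bits
  }

  static int pageNumber(long packed) {
    return (int) ((packed >>> 27) & 0x1FFF);   // next 13 bits
  }

  static long offsetInPage(long packed) {
    return packed & 0x7FFFFFFL;                // low 27 bits
  }
}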

@@ -30,7 +30,9 @@ final class ShuffleInMemorySorter {
 private static final class SortComparator implements Comparator<PackedRecordPointer> {
 @Override
 public int compare(PackedRecordPointer left, PackedRecordPointer right) {
-return left.getPartitionId() - right.getPartitionId();
+int leftId = left.getPartitionId();
+int rightId = right.getPartitionId();
+return leftId < rightId ? -1 : (leftId > rightId ? 1 : 0);
 }
 }
 private static final SortComparator SORT_COMPARATOR = new SortComparator();
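The rewritten comparator avoids the classic pitfall of subtraction-based int comparators, which can overflow and report the wrong order for operands of opposite sign; 24-bit partition IDs could never actually trigger it, so this reads as a defensive cleanup. An illustration of the pitfall (not from this commit):

public class SubtractionCompareDemo {
  public static void main(String[] args) {
    int a = Integer.MIN_VALUE;   // clearly less than b
    int b = 1;
    System.out.println(a - b);                   // 2147483647: overflow claims a > b
    System.out.println(Integer.compare(a, b));   // -1: the correct ordering
  }
}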

@@ -25,7 +25,6 @@
 import scala.Option;
 import scala.Product2;
 import scala.collection.JavaConverters;
-import scala.collection.immutable.Map;
 import scala.reflect.ClassTag;
 import scala.reflect.ClassTag$;
 

@@ -20,7 +20,6 @@
 import com.google.common.primitives.UnsignedLongs;
 
 import org.apache.spark.annotation.Private;
-import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.types.ByteArray;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.apache.spark.util.Utils;

@@ -17,11 +17,9 @@
 
 package org.apache.spark.util.collection.unsafe.sort;
 
-import org.apache.spark.memory.TaskMemoryManager;
-
 final class RecordPointerAndKeyPrefix {
 /**
-* A pointer to a record; see {@link TaskMemoryManager} for a
+* A pointer to a record; see {@link org.apache.spark.memory.TaskMemoryManager} for a
 * description of how these addresses are encoded.
 */
 public long recordPointer;

@@ -550,7 +550,7 @@ public UnsafeSorterIterator getIterator() throws IOException {
 /**
 * Chain multiple UnsafeSorterIterator together as single one.
 */
-class ChainedIterator extends UnsafeSorterIterator {
+static class ChainedIterator extends UnsafeSorterIterator {
 
 private final Queue<UnsafeSorterIterator> iterators;
 private UnsafeSorterIterator current;