Merge remote-tracking branch 'upstream/master' into add-filter-for-innerjoin
viirya committed Mar 8, 2016
2 parents bf4777c + 46f25c2 commit 2c92f90
Showing 841 changed files with 11,822 additions and 7,346 deletions.
3 changes: 0 additions & 3 deletions .gitignore
@@ -17,8 +17,6 @@ cache
 work/
 out/
 .DS_Store
-third_party/libmesos.so
-third_party/libmesos.dylib
 build/apache-maven*
 build/zinc*
 build/scala*
@@ -60,7 +58,6 @@ dev/create-release/*final
 spark-*-bin-*.tgz
 unit-tests.log
 /lib/
-rat-results.txt
 scalastyle.txt
 scalastyle-output.xml
 R-unit-tests.log
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -111,6 +111,7 @@ exportMethods("%in%",
               "add_months",
               "alias",
               "approxCountDistinct",
+              "approxQuantile",
               "array_contains",
               "asc",
               "ascii",
7 changes: 7 additions & 0 deletions R/pkg/R/generics.R
@@ -67,6 +67,13 @@ setGeneric("crosstab", function(x, col1, col2) { standardGeneric("crosstab") })
 # @export
 setGeneric("freqItems", function(x, cols, support = 0.01) { standardGeneric("freqItems") })
 
+# @rdname statfunctions
+# @export
+setGeneric("approxQuantile",
+           function(x, col, probabilities, relativeError) {
+             standardGeneric("approxQuantile")
+           })
+
 # @rdname distinct
 # @export
 setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })
39 changes: 39 additions & 0 deletions R/pkg/R/stats.R
@@ -130,6 +130,45 @@ setMethod("freqItems", signature(x = "DataFrame", cols = "character"),
             collect(dataFrame(sct))
           })
 
+#' approxQuantile
+#'
+#' Calculates the approximate quantiles of a numerical column of a DataFrame.
+#'
+#' The result of this algorithm has the following deterministic bound:
+#' If the DataFrame has N elements and if we request the quantile at probability `p` up to error
+#' `err`, then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank
+#' of `x` is close to (p * N). More precisely,
+#'   floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
+#' This method implements a variation of the Greenwald-Khanna algorithm (with some speed
+#' optimizations). The algorithm was first presented in [[http://dx.doi.org/10.1145/375663.375670
+#' Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna.
+#'
+#' @param x A SparkSQL DataFrame.
+#' @param col The name of the numerical column.
+#' @param probabilities A list of quantile probabilities. Each number must belong to [0, 1].
+#'                      For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
+#' @param relativeError The relative target precision to achieve (>= 0). If set to zero,
+#'                      the exact quantiles are computed, which could be very expensive.
+#'                      Note that values greater than 1 are accepted but give the same result as 1.
+#' @return The approximate quantiles at the given probabilities.
+#'
+#' @rdname statfunctions
+#' @name approxQuantile
+#' @export
+#' @examples
+#' \dontrun{
+#' df <- jsonFile(sqlContext, "/path/to/file.json")
+#' quantiles <- approxQuantile(df, "key", c(0.5, 0.8), 0.0)
+#' }
+setMethod("approxQuantile",
+          signature(x = "DataFrame", col = "character",
+                    probabilities = "numeric", relativeError = "numeric"),
+          function(x, col, probabilities, relativeError) {
+            statFunctions <- callJMethod(x@sdf, "stat")
+            callJMethod(statFunctions, "approxQuantile", col,
+                        as.list(probabilities), relativeError)
+          })
+
 #' sampleBy
 #'
 #' Returns a stratified sample without replacement based on the fraction given on each stratum.
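As a concrete illustration of the rank bound documented above (an illustrative calculation, not part of this commit): with N = 100 rows, p = 0.5, and relativeError err = 0.1, approxQuantile may return any sample whose exact rank lies between floor(0.4 * 100) = 40 and ceil(0.6 * 100) = 60. A minimal Java sketch of that arithmetic:

    public class QuantileBoundDemo {
      public static void main(String[] args) {
        long n = 100;      // number of rows in the DataFrame (hypothetical)
        double p = 0.5;    // requested quantile probability
        double err = 0.1;  // relativeError passed to approxQuantile
        // The returned sample x satisfies floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
        long lo = (long) Math.floor((p - err) * n);  // 40
        long hi = (long) Math.ceil((p + err) * n);   // 60
        System.out.println("exact rank of x lies in [" + lo + ", " + hi + "]");
      }
    }

With err = 0.0, as in the example and test below, the bound collapses to the exact rank, at the cost of an exact (and potentially expensive) computation.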
8 changes: 8 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1785,6 +1785,14 @@ test_that("sampleBy() on a DataFrame", {
   expect_identical(as.list(result[2, ]), list(key = "1", count = 7))
 })
 
+test_that("approxQuantile() on a DataFrame", {
+  l <- lapply(c(0:99), function(i) { i })
+  df <- createDataFrame(sqlContext, l, "key")
+  quantiles <- approxQuantile(df, "key", c(0.5, 0.8), 0.0)
+  expect_equal(quantiles[[1]], 50)
+  expect_equal(quantiles[[2]], 80)
+})
+
 test_that("SQL error message is returned from JVM", {
   retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
   expect_equal(grepl("Table not found: blah", retError), TRUE)
3 changes: 1 addition & 2 deletions bin/beeline.cmd
@@ -17,5 +17,4 @@ rem See the License for the specific language governing permissions and
 rem limitations under the License.
 rem
 
-set SPARK_HOME=%~dp0..
-cmd /V /E /C "%SPARK_HOME%\bin\spark-class.cmd" org.apache.hive.beeline.BeeLine %*
+cmd /V /E /C "%~dp0spark-class.cmd" org.apache.hive.beeline.BeeLine %*
2 changes: 1 addition & 1 deletion bin/spark-submit.cmd
@@ -20,4 +20,4 @@ rem
 rem This is the entry point for running Spark submit. To avoid polluting the
 rem environment, it just launches a new cmd to do the real work.
 
-cmd /V /E /C spark-submit2.cmd %*
+cmd /V /E /C "%~dp0spark-submit2.cmd" %*
File renamed without changes.
@@ -24,7 +24,12 @@
  * failure.
  */
 public interface RpcResponseCallback {
-  /** Successful serialized result from server. */
+  /**
+   * Successful serialized result from server.
+   *
+   * After `onSuccess` returns, `response` will be recycled and its content will become invalid.
+   * Please copy the content of `response` if you want to use it after `onSuccess` returns.
+   */
   void onSuccess(ByteBuffer response);
 
   /** Exception either propagated from server or raised on client side. */
@@ -257,7 +257,11 @@ public ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs) {
     sendRpc(message, new RpcResponseCallback() {
       @Override
       public void onSuccess(ByteBuffer response) {
-        result.set(response);
+        ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+        copy.put(response);
+        // flip "copy" to make it readable
+        copy.flip();
+        result.set(copy);
       }
 
       @Override
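The copy above exists because, per the RpcResponseCallback contract documented earlier in this commit, `response` is recycled once onSuccess returns, so sendRpcSync must retain its own copy. For readers unfamiliar with NIO buffer semantics, a self-contained sketch (illustrative, not part of the commit) of why the flip() call is needed:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class CopyFlipDemo {
      public static void main(String[] args) {
        ByteBuffer response = ByteBuffer.wrap("pong".getBytes(StandardCharsets.UTF_8));
        ByteBuffer copy = ByteBuffer.allocate(response.remaining());
        copy.put(response);  // after put(), copy.position() == copy.limit(), so a read would see nothing
        copy.flip();         // flip() sets limit = position and position = 0, making the copy readable
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        System.out.println(new String(bytes, StandardCharsets.UTF_8));  // prints "pong"
      }
    }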
@@ -19,7 +19,6 @@
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NettyManagedBuffer;
@@ -19,7 +19,6 @@
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NettyManagedBuffer;
@@ -19,7 +19,6 @@
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NettyManagedBuffer;
@@ -20,9 +20,6 @@
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
 
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NettyManagedBuffer;
-
 /**
  * Message indicating an error when transferring a stream.
  */
@@ -20,9 +20,6 @@
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
 
-import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NettyManagedBuffer;
-
 /**
  * Request to stream data from the remote end.
  * <p>
@@ -21,7 +21,6 @@
 import io.netty.buffer.ByteBuf;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
-import org.apache.spark.network.buffer.NettyManagedBuffer;
 
 /**
  * Response to {@link StreamRequest} when the stream has been successfully opened.
@@ -33,7 +33,6 @@
 import io.netty.channel.FileRegion;
 import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.util.AbstractReferenceCounted;
-import io.netty.util.ReferenceCountUtil;
 
 import org.apache.spark.network.util.ByteArrayWritableChannel;
 import org.apache.spark.network.util.NettyUtils;
@@ -19,7 +19,6 @@
 
 import java.nio.ByteBuffer;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -159,10 +159,10 @@ private static boolean isSymlink(File file) throws IOException {
     .build();
 
   /**
-   * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count for
-   * internal use. If no suffix is provided a direct conversion is attempted.
+   * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a time count in the given unit.
+   * The unit is also considered the default if the given string does not specify a unit.
    */
-  private static long parseTimeString(String str, TimeUnit unit) {
+  public static long timeStringAs(String str, TimeUnit unit) {
     String lower = str.toLowerCase().trim();
 
     try {
@@ -195,23 +195,22 @@ private static long parseTimeString(String str, TimeUnit unit) {
    * no suffix is provided, the passed number is assumed to be in ms.
    */
   public static long timeStringAsMs(String str) {
-    return parseTimeString(str, TimeUnit.MILLISECONDS);
+    return timeStringAs(str, TimeUnit.MILLISECONDS);
   }
 
   /**
    * Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use. If
    * no suffix is provided, the passed number is assumed to be in seconds.
    */
   public static long timeStringAsSec(String str) {
-    return parseTimeString(str, TimeUnit.SECONDS);
+    return timeStringAs(str, TimeUnit.SECONDS);
   }
 
   /**
-   * Convert a passed byte string (e.g. 50b, 100kb, or 250mb) to a ByteUnit for
-   * internal use. If no suffix is provided a direct conversion of the provided default is
-   * attempted.
+   * Convert a passed byte string (e.g. 50b, 100kb, or 250mb) to the given unit. If no suffix is
+   * provided, a direct conversion to the provided unit is attempted.
    */
-  private static long parseByteString(String str, ByteUnit unit) {
+  public static long byteStringAs(String str, ByteUnit unit) {
     String lower = str.toLowerCase().trim();
 
     try {
@@ -252,7 +251,7 @@ private static long parseByteString(String str, ByteUnit unit) {
    * If no suffix is provided, the passed number is assumed to be in bytes.
    */
   public static long byteStringAsBytes(String str) {
-    return parseByteString(str, ByteUnit.BYTE);
+    return byteStringAs(str, ByteUnit.BYTE);
   }
 
   /**
@@ -262,7 +261,7 @@ public static long byteStringAsBytes(String str) {
    * If no suffix is provided, the passed number is assumed to be in kibibytes.
    */
   public static long byteStringAsKb(String str) {
-    return parseByteString(str, ByteUnit.KiB);
+    return byteStringAs(str, ByteUnit.KiB);
   }
 
   /**
@@ -272,7 +271,7 @@ public static long byteStringAsKb(String str) {
    * If no suffix is provided, the passed number is assumed to be in mebibytes.
    */
   public static long byteStringAsMb(String str) {
-    return parseByteString(str, ByteUnit.MiB);
+    return byteStringAs(str, ByteUnit.MiB);
  }
 
   /**
@@ -282,7 +281,7 @@ public static long byteStringAsMb(String str) {
    * If no suffix is provided, the passed number is assumed to be in gibibytes.
    */
   public static long byteStringAsGb(String str) {
-    return parseByteString(str, ByteUnit.GiB);
+    return byteStringAs(str, ByteUnit.GiB);
   }
 
   /**
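Renaming parseTimeString/parseByteString to the public timeStringAs/byteStringAs exposes the unit-parameterized conversions to callers. A usage sketch, assuming the suffix handling defined elsewhere in this file (us/ms/s and friends for times, binary k/m/g multiples for bytes); the expected values in the comments follow from those tables:

    import java.util.concurrent.TimeUnit;
    import org.apache.spark.network.util.ByteUnit;
    import org.apache.spark.network.util.JavaUtils;

    public class JavaUtilsDemo {
      public static void main(String[] args) {
        // A suffixed string is converted into the requested unit.
        System.out.println(JavaUtils.timeStringAs("50s", TimeUnit.MILLISECONDS));  // expected: 50000
        // Without a suffix, the number is interpreted as already being in the given unit.
        System.out.println(JavaUtils.timeStringAs("100", TimeUnit.SECONDS));       // expected: 100
        // Byte suffixes are binary: "1k" is one kibibyte.
        System.out.println(JavaUtils.byteStringAs("1k", ByteUnit.BYTE));           // expected: 1024
      }
    }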
@@ -31,8 +31,6 @@
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.ByteToMessageDecoder;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import io.netty.util.internal.PlatformDependent;
 
 /**
@@ -17,7 +17,6 @@
 
 package org.apache.spark.network.util;
 
-import java.util.Iterator;
 import java.util.LinkedList;
 
 import com.google.common.base.Preconditions;
@@ -132,7 +132,7 @@ public static void tearDown() {
     testFile.delete();
   }
 
-  class FetchResult {
+  static class FetchResult {
     public Set<Integer> successChunks;
     public Set<Integer> failedChunks;
     public List<ManagedBuffer> buffers;
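A likely motivation for making FetchResult static (and, below, TestCallback and RpcResult) is that a non-static inner class keeps an implicit reference to its enclosing suite instance, which these plain data holders do not need. A minimal illustration of the difference, not taken from this commit:

    public class Outer {
      class Inner { }          // implicitly holds a reference to Outer.this
      static class Nested { }  // holds no reference to any Outer instance

      public static void main(String[] args) {
        Nested n = new Nested();            // no enclosing instance needed
        Inner i = new Outer().new Inner();  // requires an enclosing Outer
        System.out.println(n.getClass() + " / " + i.getClass());
      }
    }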
@@ -124,8 +124,8 @@ public StreamManager getStreamManager() {
     synchronized (callback1) {
       client.sendRpc(ByteBuffer.allocate(0), callback1);
       callback1.wait(4 * 1000);
-      assert (callback1.failure != null);
-      assert (callback1.failure instanceof IOException);
+      assertNotNull(callback1.failure);
+      assertTrue(callback1.failure instanceof IOException);
     }
     semaphore.release();
   }
@@ -167,8 +167,8 @@ public StreamManager getStreamManager() {
     synchronized (callback0) {
       client0.sendRpc(ByteBuffer.allocate(0), callback0);
       callback0.wait(FOREVER);
-      assert (callback0.failure instanceof IOException);
-      assert (!client0.isActive());
+      assertTrue(callback0.failure instanceof IOException);
+      assertFalse(client0.isActive());
     }
 
     // Increment the semaphore and the second request should succeed quickly.
@@ -236,15 +236,15 @@ public StreamManager getStreamManager() {
 
     synchronized (callback1) {
       // failed at same time as previous
-      assert (callback0.failure instanceof IOException);
+      assertTrue(callback0.failure instanceof IOException);
     }
   }
 
   /**
    * Callback which sets 'success' or 'failure' on completion.
    * Additionally notifies all waiters on this callback when invoked.
    */
-  class TestCallback implements RpcResponseCallback, ChunkReceivedCallback {
+  static class TestCallback implements RpcResponseCallback, ChunkReceivedCallback {
 
     int successLength = -1;
     Throwable failure;
@@ -91,7 +91,7 @@ public static void tearDown() {
     clientFactory.close();
   }
 
-  class RpcResult {
+  static class RpcResult {
     public Set<String> successMessages;
     public Set<String> errorMessages;
   }
@@ -27,6 +27,7 @@
 
 import com.google.common.collect.Maps;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -95,7 +96,7 @@ public void run() {
         try {
           TransportClient client =
             factory.createClient(TestUtils.getLocalHost(), server1.getPort());
-          assert (client.isActive());
+          assertTrue(client.isActive());
           clients.add(client);
         } catch (IOException e) {
           failed.incrementAndGet();
@@ -115,8 +116,8 @@ public void run() {
       attempts[i].join();
     }
 
-    assert(failed.get() == 0);
-    assert(clients.size() == maxConnections);
+    Assert.assertEquals(0, failed.get());
+    Assert.assertEquals(clients.size(), maxConnections);
 
     for (TransportClient client : clients) {
      client.close();
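Replacing the bare `assert` statements in these suites with JUnit assertions matters because a Java `assert` is only evaluated when the JVM runs with -ea, so the old checks could pass silently. A minimal sketch of the difference (illustrative, requires JUnit on the classpath; not part of this commit):

    import static org.junit.Assert.assertEquals;

    public class AssertDemo {
      public static void main(String[] args) {
        // Skipped at runtime unless the JVM is launched with -ea:
        assert 1 + 1 == 3 : "never evaluated without -ea";
        // JUnit assertions always run and throw AssertionError on mismatch:
        assertEquals(2, 1 + 1);
        System.out.println("the plain assert above was silently skipped");
      }
    }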