Merge in master
holdenk committed Jan 6, 2019
2 parents 789bbdd + a17851c commit 348e2f8
Showing 194 changed files with 1,517 additions and 970 deletions.
10 changes: 1 addition & 9 deletions R/README.md
@@ -39,15 +39,7 @@ To set other options like driver memory, executor memory etc. you can pass in th

#### Using SparkR from RStudio

If you wish to use SparkR from RStudio or other R frontends you will need to set some environment variables which point SparkR to your Spark installation. For example
```R
# Set this to where Spark is installed
Sys.setenv(SPARK_HOME="/Users/username/spark")
# This line loads SparkR from the installed directory
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session()
```
If you wish to use SparkR from RStudio, please refer to the [SparkR documentation](https://spark.apache.org/docs/latest/sparkr.html#starting-up-from-rstudio).

#### Making changes to SparkR

60 changes: 40 additions & 20 deletions R/pkg/R/functions.R
@@ -202,8 +202,9 @@ NULL
#' \itemize{
#' \item \code{from_json}: a structType object to use as the schema to use
#' when parsing the JSON string. Since Spark 2.3, the DDL-formatted string is
#' also supported for the schema.
#' \item \code{from_csv}: a DDL-formatted string
#' also supported for the schema. Since Spark 3.0, \code{schema_of_json} or
#' the DDL-formatted string literal can also be accepted.
#' \item \code{from_csv}: a structType object, DDL-formatted string or \code{schema_of_csv}
#' }
#' @param ... additional argument(s).
#' \itemize{
@@ -2254,40 +2255,54 @@ setMethod("date_format", signature(y = "Column", x = "character"),
column(jc)
})

setClassUnion("characterOrstructTypeOrColumn", c("character", "structType", "Column"))

#' @details
#' \code{from_json}: Parses a column containing a JSON string into a Column of \code{structType}
#' with the specified \code{schema} or array of \code{structType} if \code{as.json.array} is set
#' to \code{TRUE}. If the string is unparseable, the Column will contain the value NA.
#'
#' @rdname column_collection_functions
#' @param as.json.array indicating if input string is JSON array of objects or a single object.
#' @aliases from_json from_json,Column,characterOrstructType-method
#' @aliases from_json from_json,Column,characterOrstructTypeOrColumn-method
#' @examples
#'
#' \dontrun{
#' df2 <- sql("SELECT named_struct('date', cast('2000-01-01' as date)) as d")
#' df2 <- mutate(df2, d2 = to_json(df2$d, dateFormat = 'dd/MM/yyyy'))
#' schema <- structType(structField("date", "string"))
#' head(select(df2, from_json(df2$d2, schema, dateFormat = 'dd/MM/yyyy')))

#' df2 <- sql("SELECT named_struct('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#' schema <- structType(structField("name", "string"))
#' head(select(df2, from_json(df2$people_json, schema)))
#' head(select(df2, from_json(df2$people_json, "name STRING")))}
#' head(select(df2, from_json(df2$people_json, "name STRING")))
#' head(select(df2, from_json(df2$people_json, schema_of_json(head(df2)$people_json))))}
#' @note from_json since 2.2.0
setMethod("from_json", signature(x = "Column", schema = "characterOrstructType"),
setMethod("from_json", signature(x = "Column", schema = "characterOrstructTypeOrColumn"),
function(x, schema, as.json.array = FALSE, ...) {
if (is.character(schema)) {
schema <- structType(schema)
jschema <- structType(schema)$jobj
} else if (class(schema) == "structType") {
jschema <- schema$jobj
} else {
jschema <- schema@jc
}

if (as.json.array) {
jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
"createArrayType",
schema$jobj)
} else {
jschema <- schema$jobj
# This behavior is specific to R. Unlike the Scala and Python sides, the
# R side has an 'as.json.array' option to indicate whether the schema
# should be treated as a struct or as the element type of an array, to
# make it more R-friendly.
if (class(schema) == "Column") {
jschema <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
"createArrayType",
jschema)
} else {
jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
"createArrayType",
jschema)
}
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
@@ -2328,22 +2343,27 @@ setMethod("schema_of_json", signature(x = "characterOrColumn"),
#' If the string is unparseable, the Column will contain the value NA.
#'
#' @rdname column_collection_functions
#' @aliases from_csv from_csv,Column,character-method
#' @aliases from_csv from_csv,Column,characterOrstructTypeOrColumn-method
#' @examples
#'
#' \dontrun{
#' df <- sql("SELECT 'Amsterdam,2018' as csv")
#' csv <- "Amsterdam,2018"
#' df <- sql(paste0("SELECT '", csv, "' as csv"))
#' schema <- "city STRING, year INT"
#' head(select(df, from_csv(df$csv, schema)))}
#' head(select(df, from_csv(df$csv, schema)))
#' head(select(df, from_csv(df$csv, structType(schema))))
#' head(select(df, from_csv(df$csv, schema_of_csv(csv))))}
#' @note from_csv since 3.0.0
setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"),
setMethod("from_csv", signature(x = "Column", schema = "characterOrstructTypeOrColumn"),
function(x, schema, ...) {
if (class(schema) == "Column") {
jschema <- schema@jc
} else if (is.character(schema)) {
if (class(schema) == "structType") {
schema <- callJMethod(schema$jobj, "toDDL")
}

if (is.character(schema)) {
jschema <- callJStatic("org.apache.spark.sql.functions", "lit", schema)
} else {
stop("schema argument should be a column or character")
jschema <- schema@jc
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
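The R changes above bring SparkR in line with the Scala/Java `functions` API, where `from_json` can already take a Column produced by `schema_of_json` and `from_csv` one produced by `schema_of_csv`. The sketch below shows roughly equivalent Java calls; the class name and the sample data are illustrative and not part of this commit.

```java
import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.*;

public class SchemaInferenceSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("schema-inference-sketch")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = spark.sql(
        "SELECT '{\"name\":\"Bob\"}' AS people_json, 'Amsterdam,2018' AS csv");

    // from_json with a schema inferred by schema_of_json, mirroring the new
    // R code path that accepts a Column schema.
    df.select(from_json(col("people_json"), schema_of_json("{\"name\":\"Bob\"}"))).show(false);

    // from_csv with a schema inferred by schema_of_csv; this overload takes an
    // options map, so an empty map is passed.
    df.select(from_csv(col("csv"), schema_of_csv("Amsterdam,2018"),
        Collections.<String, String>emptyMap())).show(false);

    spark.stop();
  }
}
```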
16 changes: 14 additions & 2 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1626,6 +1626,12 @@ test_that("column functions", {
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv")))
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, structType("a INT")), "csv")))
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, schema_of_csv("1")), "csv")))
expect_equal(c[[1]][[1]]$`_c0`, 1)
c <- collect(select(df, alias(from_csv(df$col, schema_of_csv(lit("1"))), "csv")))
expect_equal(c[[1]][[1]]$`_c0`, 1)

df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, schema_of_csv("Amsterdam,2018")))
@@ -1651,7 +1657,9 @@ test_that("column functions", {
expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
df <- as.DataFrame(j)
schemas <- list(structType(structField("age", "integer"), structField("height", "double")),
"age INT, height DOUBLE")
"age INT, height DOUBLE",
schema_of_json("{\"age\":16,\"height\":176.5}"),
schema_of_json(lit("{\"age\":16,\"height\":176.5}")))
for (schema in schemas) {
s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
expect_equal(ncol(s), 1)
@@ -1691,7 +1699,11 @@ test_that("column functions", {
# check if array type in string is correctly supported.
jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
df <- as.DataFrame(list(list("people" = jsonArr)))
for (schema in list(structType(structField("name", "string")), "name STRING")) {
schemas <- list(structType(structField("name", "string")),
"name STRING",
schema_of_json("{\"name\":\"Alice\"}"),
schema_of_json(lit("{\"name\":\"Bob\"}")))
for (schema in schemas) {
arr <- collect(select(df, alias(from_json(df$people, schema, as.json.array = TRUE), "arrcol")))
expect_equal(ncol(arr), 1)
expect_equal(nrow(arr), 1)
@@ -18,11 +18,11 @@

public enum ByteUnit {
BYTE(1),
KiB(1024L),
MiB((long) Math.pow(1024L, 2L)),
GiB((long) Math.pow(1024L, 3L)),
TiB((long) Math.pow(1024L, 4L)),
PiB((long) Math.pow(1024L, 5L));
KiB(1L << 10),
MiB(1L << 20),
GiB(1L << 30),
TiB(1L << 40),
PiB(1L << 50);

ByteUnit(long multiplier) {
this.multiplier = multiplier;
@@ -50,7 +50,7 @@ public long convertTo(long d, ByteUnit u) {
}
}

public double toBytes(long d) {
public long toBytes(long d) {
if (d < 0) {
throw new IllegalArgumentException("Negative size value. Size must be positive: " + d);
}
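A minimal standalone check (not Spark code) of the two ideas behind this ByteUnit change: shifted long literals produce the same multipliers as the old Math.pow expressions, and returning long from toBytes avoids the precision a double loses above 8 PiB (2^53 bytes).

```java
public class ByteUnitSketch {
  public static void main(String[] args) {
    // 1024^n expressed as shifts equals the old Math.pow multipliers.
    System.out.println((1L << 20) == (long) Math.pow(1024, 2)); // true (MiB)
    System.out.println((1L << 50) == (long) Math.pow(1024, 5)); // true (PiB)

    // A double has only 53 bits of mantissa, so byte counts above 2^53 lose precision.
    long bytes = (1L << 53) + 1;       // exactly representable as a long
    double asDouble = (double) bytes;  // rounds to 2^53
    System.out.println((long) asDouble == bytes); // false
  }
}
```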
@@ -309,8 +309,8 @@ public int chunkFetchHandlerThreads() {
}
int chunkFetchHandlerThreadsPercent =
conf.getInt("spark.shuffle.server.chunkFetchHandlerThreadsPercent", 100);
return (int)Math.ceil(
(this.serverThreads() > 0 ? this.serverThreads() : 2 * NettyRuntime.availableProcessors()) *
chunkFetchHandlerThreadsPercent/(double)100);
int threads =
this.serverThreads() > 0 ? this.serverThreads() : 2 * NettyRuntime.availableProcessors();
return (int) Math.ceil(threads * (chunkFetchHandlerThreadsPercent / 100.0));
}
}
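The refactor above only restructures the expression for readability; the worked example below, using assumed illustrative values, shows the computation it performs.

```java
public class ChunkFetchThreadsSketch {
  public static void main(String[] args) {
    int serverThreads = 0;        // no explicit server thread count configured
    int availableProcessors = 8;  // what NettyRuntime.availableProcessors() might report
    int percent = 75;             // spark.shuffle.server.chunkFetchHandlerThreadsPercent

    // Base thread count falls back to 2 * available processors when unset.
    int threads = serverThreads > 0 ? serverThreads : 2 * availableProcessors;
    System.out.println((int) Math.ceil(threads * (percent / 100.0))); // ceil(16 * 0.75) = 12
  }
}
```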
@@ -347,10 +347,10 @@ public void testRpcHandlerDelegate() throws Exception {
verify(handler).getStreamManager();

saslHandler.channelInactive(null);
verify(handler).channelInactive(any(TransportClient.class));
verify(handler).channelInactive(isNull());

saslHandler.exceptionCaught(null, null);
verify(handler).exceptionCaught(any(Throwable.class), any(TransportClient.class));
verify(handler).exceptionCaught(isNull(), isNull());
}

@Test
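These verification changes, and the Matchers to ArgumentMatchers import moves in the hunks that follow, track Mockito 2 behavior: org.mockito.Matchers is deprecated in favor of org.mockito.ArgumentMatchers, and any(SomeType.class) no longer matches null, so calls made with null arguments are verified with isNull(). A minimal sketch, not taken from this commit:

```java
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.List;

public class NullMatcherSketch {
  public static void main(String[] args) {
    @SuppressWarnings("unchecked")
    List<String> list = mock(List.class);
    list.add(null);

    // Under Mockito 2, any(String.class) would not match the null argument above,
    // so the interaction is verified with isNull() instead.
    verify(list).add(isNull());
  }
}
```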
@@ -17,7 +17,6 @@

package org.apache.spark.network.util;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -69,7 +68,7 @@ public void testInterception() throws Exception {
decoder.channelRead(ctx, len);
decoder.channelRead(ctx, dataBuf);
verify(interceptor, times(interceptedReads)).handle(any(ByteBuf.class));
verify(ctx).fireChannelRead(any(ByteBuffer.class));
verify(ctx).fireChannelRead(any(ByteBuf.class));
assertEquals(0, len.refCnt());
assertEquals(0, dataBuf.refCnt());
} finally {
@@ -27,7 +27,7 @@
import org.mockito.ArgumentCaptor;

import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import org.apache.spark.network.buffer.ManagedBuffer;
@@ -79,6 +79,8 @@ public void testRegisterExecutor() {
@SuppressWarnings("unchecked")
@Test
public void testOpenShuffleBlocks() {
when(client.getClientId()).thenReturn("app0");

RpcResponseCallback callback = mock(RpcResponseCallback.class);

ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
@@ -28,10 +28,10 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
31 changes: 21 additions & 10 deletions common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -209,22 +209,33 @@ public static long reallocateMemory(long address, long oldSize, long newSize) {
}

/**
* Uses internal JDK APIs to allocate a DirectByteBuffer while ignoring the JVM's
* MaxDirectMemorySize limit (the default limit is too low and we do not want to require users
* to increase it).
* Allocate a DirectByteBuffer, potentially bypassing the JVM's MaxDirectMemorySize limit.
*/
public static ByteBuffer allocateDirectBuffer(int size) {
try {
long memory = allocateMemory(size);
ByteBuffer buffer = (ByteBuffer) DBB_CONSTRUCTOR.newInstance(memory, size);
if (CLEANER_CREATE_METHOD != null) {
if (CLEANER_CREATE_METHOD == null) {
// Can't set a Cleaner (see comments on field), so need to allocate via normal Java APIs
try {
DBB_CLEANER_FIELD.set(buffer,
CLEANER_CREATE_METHOD.invoke(null, buffer, (Runnable) () -> freeMemory(memory)));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException(e);
return ByteBuffer.allocateDirect(size);
} catch (OutOfMemoryError oome) {
// checkstyle.off: RegexpSinglelineJava
throw new OutOfMemoryError("Failed to allocate direct buffer (" + oome.getMessage() +
"); try increasing -XX:MaxDirectMemorySize=... to, for example, your heap size");
// checkstyle.on: RegexpSinglelineJava
}
}
// Otherwise, use internal JDK APIs to allocate a DirectByteBuffer while ignoring the JVM's
// MaxDirectMemorySize limit (the default limit is too low and we do not want to
// require users to increase it).
long memory = allocateMemory(size);
ByteBuffer buffer = (ByteBuffer) DBB_CONSTRUCTOR.newInstance(memory, size);
try {
DBB_CLEANER_FIELD.set(buffer,
CLEANER_CREATE_METHOD.invoke(null, buffer, (Runnable) () -> freeMemory(memory)));
} catch (IllegalAccessException | InvocationTargetException e) {
freeMemory(memory);
throw new IllegalStateException(e);
}
return buffer;
} catch (Exception e) {
throwException(e);
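A condensed sketch (not the Spark code above) of the fallback path this hunk adds: when the internal Cleaner hook is unavailable, allocation goes through the public ByteBuffer API, which is capped by -XX:MaxDirectMemorySize, so an OutOfMemoryError from it is re-wrapped with more actionable advice.

```java
import java.nio.ByteBuffer;

public class DirectBufferFallbackSketch {
  static ByteBuffer allocate(int size) {
    try {
      // Public API path: subject to the JVM's MaxDirectMemorySize limit.
      return ByteBuffer.allocateDirect(size);
    } catch (OutOfMemoryError oome) {
      // Re-wrap with a hint, mirroring the advice in the error message above.
      throw new OutOfMemoryError("Failed to allocate direct buffer (" + oome.getMessage()
          + "); try increasing -XX:MaxDirectMemorySize");
    }
  }

  public static void main(String[] args) {
    System.out.println(allocate(1 << 20).capacity()); // 1048576
  }
}
```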
@@ -712,7 +712,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
final long recordOffset = offset;
UnsafeAlignedOffset.putSize(base, offset, klen + vlen + uaoSize);
UnsafeAlignedOffset.putSize(base, offset + uaoSize, klen);
offset += (2 * uaoSize);
offset += (2L * uaoSize);
Platform.copyMemory(kbase, koff, base, offset, klen);
offset += klen;
Platform.copyMemory(vbase, voff, base, offset, vlen);
@@ -780,7 +780,7 @@ private void allocate(int capacity) {
assert (capacity >= 0);
capacity = Math.max((int) Math.min(MAX_CAPACITY, ByteArrayMethods.nextPowerOf2(capacity)), 64);
assert (capacity <= MAX_CAPACITY);
longArray = allocateArray(capacity * 2);
longArray = allocateArray(capacity * 2L);
longArray.zeroOut();

this.growthThreshold = (int) (capacity * loadFactor);
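The literal changes above (2L * uaoSize, capacity * 2L) address a standard Java pitfall: in an int * int product the multiplication happens in 32-bit arithmetic and only then widens to long. The particular values here may never actually overflow, but the small demo below shows the failure mode the long literals rule out.

```java
public class WideningSketch {
  public static void main(String[] args) {
    int capacity = 1 << 30;
    long wrong = capacity * 4;   // overflows to 0 in 32-bit arithmetic, then widens
    long right = capacity * 4L;  // widened to 64-bit before multiplying
    System.out.println(wrong);   // 0
    System.out.println(right);   // 4294967296
  }
}
```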
@@ -127,7 +127,7 @@ private[spark] class ExecutorAllocationManager(
// allocation is only supported for YARN and the default number of cores per executor in YARN is
// 1, but it might need to be attained differently for different cluster managers
private val tasksPerExecutorForFullParallelism =
conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
conf.get(EXECUTOR_CORES) / conf.getInt("spark.task.cpus", 1)

private val executorAllocationRatio =
conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
@@ -223,7 +223,7 @@
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutorForFullParallelism == 0) {
throw new SparkException("spark.executor.cores must not be < spark.task.cpus.")
throw new SparkException(s"${EXECUTOR_CORES.key} must not be < spark.task.cpus.")
}

if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
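The EXECUTOR_CORES change above is a config-key cleanup, but the surrounding arithmetic is worth spelling out: tasksPerExecutorForFullParallelism is an integer division of executor cores by CPUs per task, so it is 0 exactly when spark.executor.cores is smaller than spark.task.cpus, which is what the SparkException guards against. A short arithmetic sketch, in Java only for consistency with the other examples here and with illustrative values:

```java
public class TasksPerExecutorSketch {
  public static void main(String[] args) {
    int executorCores = 4;  // spark.executor.cores
    int taskCpus = 2;       // spark.task.cpus
    System.out.println(executorCores / taskCpus); // 2 tasks can run in parallel per executor

    // Misconfiguration: fewer cores per executor than one task needs.
    System.out.println(1 / 2); // 0, the condition that triggers the SparkException above
  }
}
```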
