Merge branch 'support-add-jar-ivy' of https://github.com/AngersZhuuuu/spark into support-add-jar-ivy
AngersZhuuuu committed Oct 7, 2020
2 parents 3579de0 + 51daf9a commit d6e8caf
Showing 509 changed files with 4,307 additions and 71,305 deletions.
16 changes: 6 additions & 10 deletions .github/workflows/build_and_test.yml
@@ -168,12 +168,10 @@ jobs:
         python3.8 -m pip list
     # SparkR
     - name: Install R 4.0
+      uses: r-lib/actions/setup-r@v1
       if: contains(matrix.modules, 'sparkr')
-      run: |
-        sudo sh -c "echo 'deb https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/' >> /etc/apt/sources.list"
-        curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0xE298A3A825C0D65DFD57CBB651716619E084DAB9" | sudo apt-key add
-        sudo apt-get update
-        sudo apt-get install -y r-base r-base-dev libcurl4-openssl-dev
+      with:
+        r-version: 4.0
     - name: Install R packages
       if: contains(matrix.modules, 'sparkr')
       run: |
@@ -232,11 +230,9 @@ jobs:
         # See also https://github.com/sphinx-doc/sphinx/issues/7551.
         pip3 install flake8 'sphinx<3.1.0' numpy pydata_sphinx_theme ipython nbsphinx
     - name: Install R 4.0
-      run: |
-        sudo sh -c "echo 'deb https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/' >> /etc/apt/sources.list"
-        curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0xE298A3A825C0D65DFD57CBB651716619E084DAB9" | sudo apt-key add
-        sudo apt-get update
-        sudo apt-get install -y r-base r-base-dev libcurl4-openssl-dev
+      uses: r-lib/actions/setup-r@v1
+      with:
+        r-version: 4.0
     - name: Install R linter dependencies and SparkR
       run: |
         sudo apt-get install -y libcurl4-openssl-dev
9 changes: 9 additions & 0 deletions .github/workflows/test_report.yml
@@ -15,7 +15,16 @@ jobs:
          github_token: ${{ secrets.GITHUB_TOKEN }}
          workflow: ${{ github.event.workflow_run.workflow_id }}
          commit: ${{ github.event.workflow_run.head_commit.id }}
+     - name: Check if JUnit report XML files exist
+       run: |
+         if ls **/target/test-reports/*.xml > /dev/null 2>&1; then
+           echo '::set-output name=FILE_EXISTS::true'
+         else
+           echo '::set-output name=FILE_EXISTS::false'
+         fi
+       id: check-junit-file
      - name: Publish test report
+       if: steps.check-junit-file.outputs.FILE_EXISTS == 'true'
        uses: scacap/action-surefire-report@v1
        with:
          check_name: Report test results
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -348,6 +348,7 @@ exportMethods("%<=>%",
               "negate",
               "next_day",
               "not",
+              "nth_value",
               "ntile",
               "otherwise",
               "over",
@@ -426,6 +427,7 @@ exportMethods("%<=>%",
               "variance",
               "var_pop",
               "var_samp",
+              "vector_to_array",
               "weekofyear",
               "when",
               "window",
86 changes: 78 additions & 8 deletions R/pkg/R/functions.R
@@ -338,12 +338,29 @@ NULL
 #' tmp <- mutate(df, dist = over(cume_dist(), ws), dense_rank = over(dense_rank(), ws),
 #'               lag = over(lag(df$mpg), ws), lead = over(lead(df$mpg, 1), ws),
 #'               percent_rank = over(percent_rank(), ws),
-#'               rank = over(rank(), ws), row_number = over(row_number(), ws))
+#'               rank = over(rank(), ws), row_number = over(row_number(), ws),
+#'               nth_value = over(nth_value(df$mpg, 3), ws))
 #' # Get ntile group id (1-4) for hp
 #' tmp <- mutate(tmp, ntile = over(ntile(4), ws))
 #' head(tmp)}
 NULL

+#' ML functions for Column operations
+#'
+#' ML functions defined for \code{Column}.
+#'
+#' @param x Column to compute on.
+#' @param ... additional argument(s).
+#' @name column_ml_functions
+#' @rdname column_ml_functions
+#' @family ml functions
+#' @examples
+#' \dontrun{
+#' df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
+#' head(select(df, vector_to_array(df$features)))
+#' }
+NULL
+
 #' @details
 #' \code{lit}: A new Column is created to represent the literal value.
 #' If the parameter is a Column, it is returned unchanged.
@@ -3298,6 +3315,37 @@ setMethod("lead",
             column(jc)
           })

+#' @details
+#' \code{nth_value}: Window function: returns the value that is the \code{offset}th
+#' row of the window frame (counting from 1), and \code{null} if the size of the window
+#' frame is less than \code{offset} rows.
+#'
+#' @param offset a numeric indicating the number of the row, counting from 1, whose
+#'        value should be returned
+#' @param na.rm a logical indicating whether null values should be skipped when
+#'        determining which row to use
+#'
+#' @rdname column_window_functions
+#' @aliases nth_value nth_value,characterOrColumn-method
+#' @note nth_value since 3.1.0
+setMethod("nth_value",
+          signature(x = "characterOrColumn", offset = "numeric"),
+          function(x, offset, na.rm = FALSE) {
+            x <- if (is.character(x)) {
+              column(x)
+            } else {
+              x
+            }
+            offset <- as.integer(offset)
+            jc <- callJStatic(
+              "org.apache.spark.sql.functions",
+              "nth_value",
+              x@jc,
+              offset,
+              na.rm
+            )
+            column(jc)
+          })
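
As an aside, a minimal SparkR usage sketch for the new function (hypothetical, not part of the commit; assumes an active SparkR session, with data and column names chosen purely for illustration):

  df <- createDataFrame(mtcars)
  ws <- orderBy(windowPartitionBy("am"), "hp")
  # mpg from the third row of each frame; null while the frame still has fewer than 3 rows
  tmp <- mutate(df, third_mpg = over(nth_value(df$mpg, 3), ws))
  head(tmp)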

 #' @details
 #' \code{ntile}: Returns the ntile group id (from 1 to n inclusive) in an ordered window
 #' partition. For example, if n is 4, the first quarter of the rows will get value 1, the second
@@ -4419,10 +4467,32 @@ setMethod("current_timestamp",
 #' @aliases timestamp_seconds timestamp_seconds,Column-method
 #' @note timestamp_seconds since 3.1.0
 setMethod("timestamp_seconds",
-  signature(x = "Column"),
-  function(x) {
-    jc <- callJStatic(
-      "org.apache.spark.sql.functions", "timestamp_seconds", x@jc
-    )
-    column(jc)
-  })
+          signature(x = "Column"),
+          function(x) {
+            jc <- callJStatic(
+              "org.apache.spark.sql.functions", "timestamp_seconds", x@jc
+            )
+            column(jc)
+          })
+
+#' @details
+#' \code{vector_to_array}: Converts a column of MLlib sparse/dense vectors into
+#' a column of dense arrays.
+#'
+#' @param dtype The data type of the output array. Valid values: "float64" or "float32".
+#'
+#' @rdname column_ml_functions
+#' @aliases vector_to_array vector_to_array,Column-method
+#' @note vector_to_array since 3.1.0
+setMethod("vector_to_array",
+          signature(x = "Column"),
+          function(x, dtype = c("float64", "float32")) {
+            dtype <- match.arg(dtype)
+            jc <- callJStatic(
+              "org.apache.spark.ml.functions",
+              "vector_to_array",
+              x@jc,
+              dtype
+            )
+            column(jc)
+          })
8 changes: 8 additions & 0 deletions R/pkg/R/generics.R
@@ -1164,6 +1164,10 @@ setGeneric("months_between", function(y, x, ...) { standardGeneric("months_between") })
 #' @rdname count
 setGeneric("n", function(x) { standardGeneric("n") })

+#' @rdname column_window_functions
+#' @name NULL
+setGeneric("nth_value", function(x, offset, ...) { standardGeneric("nth_value") })
+
 #' @rdname column_nonaggregate_functions
 #' @name NULL
 setGeneric("nanvl", function(y, x) { standardGeneric("nanvl") })
@@ -1445,6 +1449,10 @@ setGeneric("var_pop", function(x) { standardGeneric("var_pop") })
 #' @name NULL
 setGeneric("var_samp", function(x) { standardGeneric("var_samp") })

+#' @rdname column_ml_functions
+#' @name NULL
+setGeneric("vector_to_array", function(x, ...) { standardGeneric("vector_to_array") })
+
 #' @rdname column_datetime_functions
 #' @name NULL
 setGeneric("weekofyear", function(x) { standardGeneric("weekofyear") })
5 changes: 4 additions & 1 deletion R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1424,7 +1424,10 @@ test_that("column functions", {
     date_trunc("quarter", c) + current_date() + current_timestamp()
   c25 <- overlay(c1, c2, c3, c3) + overlay(c1, c2, c3) + overlay(c1, c2, 1) +
     overlay(c1, c2, 3, 4)
-  c26 <- timestamp_seconds(c1)
+  c26 <- timestamp_seconds(c1) + vector_to_array(c) +
+    vector_to_array(c, "float32") + vector_to_array(c, "float64")
+  c27 <- nth_value("x", 1L) + nth_value("y", 2, TRUE) +
+    nth_value(column("v"), 3) + nth_value(column("z"), 4L, FALSE)

   # Test if base::is.nan() is exposed
   expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE))
@@ -92,10 +92,6 @@ public class ExternalShuffleBlockResolver {
   @VisibleForTesting
   final DB db;

-  private final List<String> knownManagers = Arrays.asList(
-    "org.apache.spark.shuffle.sort.SortShuffleManager",
-    "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");
-
   public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
       throws IOException {
     this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
@@ -148,10 +144,6 @@ public void registerExecutor(
       ExecutorShuffleInfo executorInfo) {
     AppExecId fullId = new AppExecId(appId, execId);
     logger.info("Registered executor {} with {}", fullId, executorInfo);
-    if (!knownManagers.contains(executorInfo.shuffleManager)) {
-      throw new UnsupportedOperationException(
-        "Unsupported shuffle manager of executor: " + executorInfo);
-    }
     try {
       if (db != null) {
         byte[] key = dbAppExecKey(fullId);
@@ -71,15 +71,6 @@ public void testBadRequests() throws IOException {
       assertTrue("Bad error message: " + e, e.getMessage().contains("not registered"));
     }

-    // Invalid shuffle manager
-    try {
-      resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
-      resolver.getBlockData("app0", "exec2", 1, 1, 0);
-      fail("Should have failed");
-    } catch (UnsupportedOperationException e) {
-      // pass
-    }
-
     // Nonexistent shuffle block
     resolver.registerExecutor("app0", "exec3",
       dataContext.createExecutorInfo(SORT_MANAGER));
@@ -233,9 +233,9 @@ public void testFetchThreeSort() throws Exception {
     exec0Fetch.releaseBuffers();
   }

-  @Test (expected = RuntimeException.class)
-  public void testRegisterInvalidExecutor() throws Exception {
-    registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
+  @Test
+  public void testRegisterWithCustomShuffleManager() throws Exception {
+    registerExecutor("exec-1", dataContext0.createExecutorInfo("custom shuffle manager"));
   }

   @Test
