
Commit

merge master & resolve conflicts
WeichenXu123 committed Dec 25, 2017
2 parents 47dccdd + 33ae243 commit f7a54ae
Showing 361 changed files with 35,118 additions and 3,188 deletions.
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -230,6 +230,7 @@ exportMethods("%<=>%",
"date_add",
"date_format",
"date_sub",
"date_trunc",
"datediff",
"dayofmonth",
"dayofweek",
34 changes: 30 additions & 4 deletions R/pkg/R/functions.R
@@ -40,10 +40,17 @@ NULL
#'
#' @param x Column to compute on. In \code{window}, it must be a time Column of
#' \code{TimestampType}.
- #' @param format For \code{to_date} and \code{to_timestamp}, it is the string to use to parse
- #' Column \code{x} to DateType or TimestampType. For \code{trunc}, it is the string
- #' to use to specify the truncation method. For example, "year", "yyyy", "yy" for
- #' truncate by year, or "month", "mon", "mm" for truncate by month.
+ #' @param format The format for the given dates or timestamps in Column \code{x}. See the
+ #' format used in the following methods:
+ #' \itemize{
+ #' \item \code{to_date} and \code{to_timestamp}: it is the string to use to parse
+ #' Column \code{x} to DateType or TimestampType.
+ #' \item \code{trunc}: it is the string to use to specify the truncation method.
+ #' For example, "year", "yyyy", "yy" for truncate by year, or "month", "mon",
+ #' "mm" for truncate by month.
+ #' \item \code{date_trunc}: it is similar to \code{trunc}'s but additionally
+ #' supports "day", "dd", "second", "minute", "hour", "week" and "quarter".
+ #' }
#' @param ... additional argument(s).
#' @name column_datetime_functions
#' @rdname column_datetime_functions
@@ -3478,3 +3485,22 @@ setMethod("trunc",
x@jc, as.character(format))
column(jc)
})

#' @details
#' \code{date_trunc}: Returns timestamp truncated to the unit specified by the format.
#'
#' @rdname column_datetime_functions
#' @aliases date_trunc date_trunc,character,Column-method
#' @export
#' @examples
#'
#' \dontrun{
#' head(select(df, df$time, date_trunc("hour", df$time), date_trunc("minute", df$time),
#' date_trunc("week", df$time), date_trunc("quarter", df$time)))}
#' @note date_trunc since 2.3.0
setMethod("date_trunc",
signature(format = "character", x = "Column"),
function(format, x) {
jc <- callJStatic("org.apache.spark.sql.functions", "date_trunc", format, x@jc)
column(jc)
})
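
For reference, the setMethod above just forwards to org.apache.spark.sql.functions.date_trunc on the JVM side via callJStatic. A minimal sketch of the same call through the Java API (the session setup, class name, and the "time" column below are illustrative assumptions, not part of this commit):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.date_trunc;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DateTruncSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("date_trunc sketch").master("local[*]").getOrCreate();
    // One-row frame with a single timestamp column named "time".
    Dataset<Row> df = spark.sql(
        "SELECT CAST('2017-12-25 10:30:45' AS TIMESTAMP) AS time");
    // Truncate the timestamp to progressively coarser units.
    df.select(col("time"),
        date_trunc("hour", col("time")),     // 2017-12-25 10:00:00
        date_trunc("week", col("time")),     // 2017-12-25 00:00:00 (a Monday)
        date_trunc("quarter", col("time")))  // 2017-10-01 00:00:00
      .show(false);
    spark.stop();
  }
}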
5 changes: 5 additions & 0 deletions R/pkg/R/generics.R
@@ -1043,6 +1043,11 @@ setGeneric("date_format", function(y, x) { standardGeneric("date_format") })
#' @name NULL
setGeneric("date_sub", function(y, x) { standardGeneric("date_sub") })

#' @rdname column_datetime_functions
#' @export
#' @name NULL
setGeneric("date_trunc", function(format, x) { standardGeneric("date_trunc") })

#' @rdname column_datetime_functions
#' @export
#' @name NULL
3 changes: 3 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1418,6 +1418,8 @@ test_that("column functions", {
c22 <- not(c)
c23 <- trunc(c, "year") + trunc(c, "yyyy") + trunc(c, "yy") +
trunc(c, "month") + trunc(c, "mon") + trunc(c, "mm")
+ c24 <- date_trunc("hour", c) + date_trunc("minute", c) + date_trunc("week", c) +
+   date_trunc("quarter", c)

# Test if base::is.nan() is exposed
expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE))
@@ -1729,6 +1731,7 @@ test_that("date functions on a DataFrame", {
expect_gt(collect(select(df2, unix_timestamp()))[1, 1], 0)
expect_gt(collect(select(df2, unix_timestamp(df2$b)))[1, 1], 0)
expect_gt(collect(select(df2, unix_timestamp(lit("2015-01-01"), "yyyy-MM-dd")))[1, 1], 0)
+ expect_equal(collect(select(df2, month(date_trunc("yyyy", df2$b))))[, 1], c(1, 1))

l3 <- list(list(a = 1000), list(a = -1000))
df3 <- createDataFrame(l3)
5 changes: 4 additions & 1 deletion R/pkg/tests/run-all.R
@@ -27,7 +27,10 @@ if (.Platform$OS.type == "windows") {

# Setup global test environment
# Install Spark first to set SPARK_HOME
- install.spark()

+ # NOTE(shivaram): We set overwrite to handle any old tar.gz files or directories left behind on
+ # CRAN machines. For Jenkins we should already have SPARK_HOME set.
+ install.spark(overwrite = TRUE)

sparkRDir <- file.path(Sys.getenv("SPARK_HOME"), "R")
sparkRWhitelistSQLDirs <- c("spark-warehouse", "metastore_db")
4 changes: 3 additions & 1 deletion appveyor.yml
@@ -42,7 +42,9 @@ install:
# Install maven and dependencies
- ps: .\dev\appveyor-install-dependencies.ps1
# Required package for R unit tests
- - cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'testthat', 'e1071', 'survival'), repos='http://cran.us.r-project.org')"
+ - cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'devtools', 'e1071', 'survival'), repos='http://cran.us.r-project.org')"
+ # Here, we use the fixed version of testthat. For more details, please see SPARK-22817.
+ - cmd: R -e "devtools::install_version('testthat', version = '1.0.2', repos='http://cran.us.r-project.org')"
- cmd: R -e "packageVersion('knitr'); packageVersion('rmarkdown'); packageVersion('testthat'); packageVersion('e1071'); packageVersion('survival')"

build_script:
common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java
@@ -30,10 +30,10 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
- import io.netty.util.AbstractReferenceCounted;
import org.apache.commons.crypto.stream.CryptoInputStream;
import org.apache.commons.crypto.stream.CryptoOutputStream;

+ import org.apache.spark.network.util.AbstractFileRegion;
import org.apache.spark.network.util.ByteArrayReadableChannel;
import org.apache.spark.network.util.ByteArrayWritableChannel;

@@ -161,7 +161,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
}

- private static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
+ private static class EncryptedMessage extends AbstractFileRegion {
private final boolean isByteBuf;
private final ByteBuf buf;
private final FileRegion region;
@@ -199,10 +199,45 @@ public long position() {
}

@Override
- public long transfered() {
+ public long transferred() {
return transferred;
}

@Override
public EncryptedMessage touch(Object o) {
super.touch(o);
if (region != null) {
region.touch(o);
}
if (buf != null) {
buf.touch(o);
}
return this;
}

@Override
public EncryptedMessage retain(int increment) {
super.retain(increment);
if (region != null) {
region.retain(increment);
}
if (buf != null) {
buf.retain(increment);
}
return this;
}

@Override
public boolean release(int decrement) {
if (region != null) {
region.release(decrement);
}
if (buf != null) {
buf.release(decrement);
}
return super.release(decrement);
}

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
Preconditions.checkArgument(position == transfered(), "Invalid position.");
common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
@@ -25,17 +25,17 @@
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.channel.FileRegion;
- import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCountUtil;

import org.apache.spark.network.buffer.ManagedBuffer;
+ import org.apache.spark.network.util.AbstractFileRegion;

/**
* A wrapper message that holds two separate pieces (a header and a body).
*
* The header must be a ByteBuf, while the body can be a ByteBuf or a FileRegion.
*/
- class MessageWithHeader extends AbstractReferenceCounted implements FileRegion {
+ class MessageWithHeader extends AbstractFileRegion {

@Nullable private final ManagedBuffer managedBuffer;
private final ByteBuf header;
@@ -91,7 +91,7 @@ public long position() {
}

@Override
- public long transfered() {
+ public long transferred() {
return totalBytesTransferred;
}

@@ -160,4 +160,37 @@ private int writeNioBuffer(

return ret;
}

@Override
public MessageWithHeader touch(Object o) {
super.touch(o);
header.touch(o);
ReferenceCountUtil.touch(body, o);
return this;
}

@Override
public MessageWithHeader retain(int increment) {
super.retain(increment);
header.retain(increment);
ReferenceCountUtil.retain(body, increment);
if (managedBuffer != null) {
for (int i = 0; i < increment; i++) {
managedBuffer.retain();
}
}
return this;
}

@Override
public boolean release(int decrement) {
header.release(decrement);
ReferenceCountUtil.release(body, decrement);
if (managedBuffer != null) {
for (int i = 0; i < decrement; i++) {
managedBuffer.release();
}
}
return super.release(decrement);
}
}
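
The touch/retain/release overrides above keep the composite's reference count in step with its parts: every retain or release on the wrapper is forwarded to the header, the body, and the managed buffer. The same delegation idea in isolation, as a toy sketch against plain netty types (the class and field names are hypothetical, not Spark code):

import io.netty.buffer.ByteBuf;
import io.netty.util.AbstractReferenceCounted;

// Toy wrapper that forwards every retain/release to the buffer it owns,
// so dropping the wrapper's last reference also frees the payload.
final class ForwardingHolder extends AbstractReferenceCounted {
  private final ByteBuf payload;

  ForwardingHolder(ByteBuf payload) {
    this.payload = payload;
  }

  @Override
  public ForwardingHolder retain() {
    return retain(1);
  }

  @Override
  public ForwardingHolder retain(int increment) {
    super.retain(increment);
    payload.retain(increment);
    return this;
  }

  @Override
  public boolean release() {
    return release(1);
  }

  @Override
  public boolean release(int decrement) {
    payload.release(decrement);
    return super.release(decrement);
  }

  @Override
  public ForwardingHolder touch(Object hint) {
    payload.touch(hint);
    return this;
  }

  @Override
  protected void deallocate() {
    // The payload's final reference was already dropped in release(decrement),
    // so there is nothing left to free here.
  }
}

Note the sketch drops the payload in release(decrement) rather than in deallocate(); either spot works, as long as exactly one of them frees the payload.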
common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java
@@ -32,8 +32,8 @@
import io.netty.channel.ChannelPromise;
import io.netty.channel.FileRegion;
import io.netty.handler.codec.MessageToMessageDecoder;
- import io.netty.util.AbstractReferenceCounted;

+ import org.apache.spark.network.util.AbstractFileRegion;
import org.apache.spark.network.util.ByteArrayWritableChannel;
import org.apache.spark.network.util.NettyUtils;

@@ -129,7 +129,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)
}

@VisibleForTesting
- static class EncryptedMessage extends AbstractReferenceCounted implements FileRegion {
+ static class EncryptedMessage extends AbstractFileRegion {

private final SaslEncryptionBackend backend;
private final boolean isByteBuf;
@@ -183,10 +183,45 @@ public long position() {
* Returns an approximation of the amount of data transferred. See {@link #count()}.
*/
@Override
- public long transfered() {
+ public long transferred() {
return transferred;
}

@Override
public EncryptedMessage touch(Object o) {
super.touch(o);
if (buf != null) {
buf.touch(o);
}
if (region != null) {
region.touch(o);
}
return this;
}

@Override
public EncryptedMessage retain(int increment) {
super.retain(increment);
if (buf != null) {
buf.retain(increment);
}
if (region != null) {
region.retain(increment);
}
return this;
}

@Override
public boolean release(int decrement) {
if (region != null) {
region.release(decrement);
}
if (buf != null) {
buf.release(decrement);
}
return super.release(decrement);
}

/**
* Transfers data from the original message to the channel, encrypting it in the process.
*
53 changes: 53 additions & 0 deletions common/network-common/src/main/java/org/apache/spark/network/util/AbstractFileRegion.java
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.util;

import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;

public abstract class AbstractFileRegion extends AbstractReferenceCounted implements FileRegion {

@Override
@SuppressWarnings("deprecation")
public final long transfered() {
return transferred();
}

@Override
public AbstractFileRegion retain() {
super.retain();
return this;
}

@Override
public AbstractFileRegion retain(int increment) {
super.retain(increment);
return this;
}

@Override
public AbstractFileRegion touch() {
super.touch();
return this;
}

@Override
public AbstractFileRegion touch(Object o) {
return this;
}
}
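
A hedged sketch of what a subclass gains from this helper: it implements only the netty 4.1 transferred() spelling plus the core FileRegion methods, while the base class supplies the deprecated transfered() bridge and the covariant retain()/touch() overloads. The ByteBuf-backed region below is hypothetical, not part of this commit:

import java.io.IOException;
import java.nio.channels.WritableByteChannel;

import io.netty.buffer.ByteBuf;

import org.apache.spark.network.util.AbstractFileRegion;

// Hypothetical FileRegion backed by a single ByteBuf.
class ByteBufRegion extends AbstractFileRegion {
  private final ByteBuf buf;
  private final long count;
  private long transferred;

  ByteBufRegion(ByteBuf buf) {
    this.buf = buf;
    this.count = buf.readableBytes();
  }

  @Override
  public long position() {
    return 0;
  }

  @Override
  public long count() {
    return count;
  }

  // Only the new spelling is implemented; AbstractFileRegion's final
  // transfered() forwards deprecated callers here.
  @Override
  public long transferred() {
    return transferred;
  }

  @Override
  public long transferTo(WritableByteChannel target, long position) throws IOException {
    // Write whatever is still readable and advance the reader index to match.
    int written = target.write(buf.nioBuffer());
    buf.skipBytes(written);
    transferred += written;
    return written;
  }

  @Override
  protected void deallocate() {
    buf.release();
  }
}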
common/network-common/src/test/java/org/apache/spark/network/ProtocolSuite.java
@@ -56,7 +56,7 @@ private void testServerToClient(Message msg) {
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);

while (!serverChannel.outboundMessages().isEmpty()) {
- clientChannel.writeInbound(serverChannel.readOutbound());
+ clientChannel.writeOneInbound(serverChannel.readOutbound());
}

assertEquals(1, clientChannel.inboundMessages().size());
@@ -72,7 +72,7 @@ private void testClientToServer(Message msg) {
NettyUtils.createFrameDecoder(), MessageDecoder.INSTANCE);

while (!clientChannel.outboundMessages().isEmpty()) {
- serverChannel.writeInbound(clientChannel.readOutbound());
+ serverChannel.writeOneInbound(clientChannel.readOutbound());
}

assertEquals(1, serverChannel.inboundMessages().size());
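
For context, writeOneInbound (netty 4.1) feeds a single message through the inbound pipeline; unlike writeInbound, it does not also flush the inbound buffer or assert on the result, which suits these tests as they replay messages one at a time. A self-contained sketch of the replay loop (the empty pipelines below are purely illustrative, not the actual test fixtures):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;

public class ReplaySketch {
  public static void main(String[] args) {
    // Real tests would install an encoder on one side and a decoder on the
    // other; empty pipelines still show the outbound-to-inbound hand-off.
    EmbeddedChannel server = new EmbeddedChannel();
    EmbeddedChannel client = new EmbeddedChannel();

    server.writeOutbound(Unpooled.copiedBuffer(new byte[]{1, 2, 3}));

    // Drain everything the server wrote and replay it into the client,
    // one message at a time, as the updated tests above do.
    while (!server.outboundMessages().isEmpty()) {
      client.writeOneInbound(server.readOutbound());
    }
    client.flushInbound(); // fire channelReadComplete once, after the loop

    ByteBuf received = client.readInbound();
    System.out.println("received " + received.readableBytes() + " bytes");
    received.release();
  }
}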