Commit

Merge branch 'master' into streaming-missed-options
MaxGekk committed Nov 9, 2018
2 parents da697ac + 0558d02 commit 4ca71fc
Showing 75 changed files with 968 additions and 1,080 deletions.
3 changes: 3 additions & 0 deletions R/pkg/NAMESPACE
@@ -194,6 +194,7 @@ exportMethods("%<=>%",
"acos",
"add_months",
"alias",
"approx_count_distinct",
"approxCountDistinct",
"approxQuantile",
"array_contains",
@@ -252,6 +253,7 @@ exportMethods("%<=>%",
"dayofweek",
"dayofyear",
"decode",
"degrees",
"dense_rank",
"desc",
"element_at",
@@ -334,6 +336,7 @@ exportMethods("%<=>%",
"posexplode",
"posexplode_outer",
"quarter",
"radians",
"rand",
"randn",
"rank",
73 changes: 64 additions & 9 deletions R/pkg/R/functions.R
@@ -112,7 +112,7 @@ NULL
#' df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
#' tmp <- mutate(df, v1 = log(df$mpg), v2 = cbrt(df$disp),
#' v3 = bround(df$wt, 1), v4 = bin(df$cyl),
#' v5 = hex(df$wt), v6 = toDegrees(df$gear),
#' v5 = hex(df$wt), v6 = degrees(df$gear),
#' v7 = atan2(df$cyl, df$am), v8 = hypot(df$cyl, df$am),
#' v9 = pmod(df$hp, df$cyl), v10 = shiftLeft(df$disp, 1),
#' v11 = conv(df$hp, 10, 16), v12 = sign(df$vs - 0.5),
@@ -320,23 +320,37 @@ setMethod("acos",
})

#' @details
#' \code{approxCountDistinct}: Returns the approximate number of distinct items in a group.
#' \code{approx_count_distinct}: Returns the approximate number of distinct items in a group.
#'
#' @rdname column_aggregate_functions
#' @aliases approxCountDistinct approxCountDistinct,Column-method
#' @aliases approx_count_distinct approx_count_distinct,Column-method
#' @examples
#'
#' \dontrun{
#' head(select(df, approxCountDistinct(df$gear)))
#' head(select(df, approxCountDistinct(df$gear, 0.02)))
#' head(select(df, approx_count_distinct(df$gear)))
#' head(select(df, approx_count_distinct(df$gear, 0.02)))
#' head(select(df, countDistinct(df$gear, df$cyl)))
#' head(select(df, n_distinct(df$gear)))
#' head(distinct(select(df, "gear")))}
#' @note approx_count_distinct(Column) since 3.0.0
setMethod("approx_count_distinct",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "approx_count_distinct", x@jc)
column(jc)
})

#' @details
#' \code{approxCountDistinct}: Returns the approximate number of distinct items in a group.
#'
#' @rdname column_aggregate_functions
#' @aliases approxCountDistinct approxCountDistinct,Column-method
#' @note approxCountDistinct(Column) since 1.4.0
setMethod("approxCountDistinct",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "approxCountDistinct", x@jc)
.Deprecated("approx_count_distinct")
jc <- callJStatic("org.apache.spark.sql.functions", "approx_count_distinct", x@jc)
column(jc)
})
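
Both wrappers delegate to the JVM-side `org.apache.spark.sql.functions` entry point via `callJStatic`. A minimal Scala sketch of the equivalent call, assuming a local `SparkSession` and a toy DataFrame invented for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.approx_count_distinct

val spark = SparkSession.builder().master("local[*]").appName("acd-sketch").getOrCreate()
import spark.implicits._

// Toy stand-in for the mtcars-style df used in the roxygen examples above.
val df = Seq(3, 4, 4, 5, 3, 4).toDF("gear")

// New-style name; approxCountDistinct is now just a deprecated alias for the same aggregate.
df.select(approx_count_distinct($"gear")).show()
```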

@@ -1651,7 +1665,22 @@ setMethod("tanh",
setMethod("toDegrees",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "toDegrees", x@jc)
.Deprecated("degrees")
jc <- callJStatic("org.apache.spark.sql.functions", "degrees", x@jc)
column(jc)
})

#' @details
#' \code{degrees}: Converts an angle measured in radians to an approximately equivalent angle
#' measured in degrees.
#'
#' @rdname column_math_functions
#' @aliases degrees degrees,Column-method
#' @note degrees since 3.0.0
setMethod("degrees",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "degrees", x@jc)
column(jc)
})

@@ -1665,7 +1694,22 @@ setMethod("toDegrees",
setMethod("toRadians",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "toRadians", x@jc)
.Deprecated("radians")
jc <- callJStatic("org.apache.spark.sql.functions", "radians", x@jc)
column(jc)
})

#' @details
#' \code{radians}: Converts an angle measured in degrees to an approximately equivalent angle
#' measured in radians.
#'
#' @rdname column_math_functions
#' @aliases radians radians,Column-method
#' @note radians since 3.0.0
setMethod("radians",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "radians", x@jc)
column(jc)
})
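
`degrees` and `radians` are the new names for the JVM functions that `toDegrees` and `toRadians` used to call. A small Scala sketch of the conversion in both directions, under the same local-session assumption as above:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{degrees, radians}

val spark = SparkSession.builder().master("local[*]").appName("angles-sketch").getOrCreate()
import spark.implicits._

// A few reference angles in radians.
val angles = Seq(0.0, math.Pi / 2, math.Pi, 2 * math.Pi).toDF("rad")

// radians -> degrees, then back again; the round trip recovers the originals (up to floating point).
angles.select(degrees($"rad").as("deg"))
      .select(radians($"deg").as("rad_again"))
      .show()
```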

@@ -2065,13 +2109,24 @@ setMethod("pmod", signature(y = "Column"),

#' @param rsd maximum estimation error allowed (default = 0.05).
#'
#' @rdname column_aggregate_functions
#' @aliases approx_count_distinct,Column-method
#' @note approx_count_distinct(Column, numeric) since 3.0.0
setMethod("approx_count_distinct",
signature(x = "Column"),
function(x, rsd = 0.05) {
jc <- callJStatic("org.apache.spark.sql.functions", "approx_count_distinct", x@jc, rsd)
column(jc)
})

#' @rdname column_aggregate_functions
#' @aliases approxCountDistinct,Column-method
#' @note approxCountDistinct(Column, numeric) since 1.4.0
setMethod("approxCountDistinct",
signature(x = "Column"),
function(x, rsd = 0.05) {
jc <- callJStatic("org.apache.spark.sql.functions", "approxCountDistinct", x@jc, rsd)
.Deprecated("approx_count_distinct")
jc <- callJStatic("org.apache.spark.sql.functions", "approx_count_distinct", x@jc, rsd)
column(jc)
})
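
`rsd` is the maximum estimation error allowed, as the parameter doc above states; the default is 0.05, and lower values buy accuracy at the cost of more sketch memory. A short Scala sketch of the two-argument form, with the session and data invented for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.approx_count_distinct

val spark = SparkSession.builder().master("local[*]").appName("rsd-sketch").getOrCreate()
import spark.implicits._

val df = Seq(3, 4, 4, 5, 3, 4).toDF("gear")

// Same aggregate at the default tolerance (0.05) and at a stricter 0.02.
df.select(
  approx_count_distinct($"gear").as("acd_default"),
  approx_count_distinct($"gear", rsd = 0.02).as("acd_strict")
).show()
```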

12 changes: 12 additions & 0 deletions R/pkg/R/generics.R
@@ -746,6 +746,10 @@ setGeneric("windowOrderBy", function(col, ...) { standardGeneric("windowOrderBy"
#' @name NULL
setGeneric("add_months", function(y, x) { standardGeneric("add_months") })

#' @rdname column_aggregate_functions
#' @name NULL
setGeneric("approx_count_distinct", function(x, ...) { standardGeneric("approx_count_distinct") })

#' @rdname column_aggregate_functions
#' @name NULL
setGeneric("approxCountDistinct", function(x, ...) { standardGeneric("approxCountDistinct") })
@@ -1287,10 +1291,18 @@ setGeneric("substring_index", function(x, delim, count) { standardGeneric("subst
#' @name NULL
setGeneric("sumDistinct", function(x) { standardGeneric("sumDistinct") })

#' @rdname column_math_functions
#' @name NULL
setGeneric("degrees", function(x) { standardGeneric("degrees") })

#' @rdname column_math_functions
#' @name NULL
setGeneric("toDegrees", function(x) { standardGeneric("toDegrees") })

#' @rdname column_math_functions
#' @name NULL
setGeneric("radians", function(x) { standardGeneric("radians") })

#' @rdname column_math_functions
#' @name NULL
setGeneric("toRadians", function(x) { standardGeneric("toRadians") })
4 changes: 2 additions & 2 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1379,7 +1379,7 @@ test_that("column operators", {

test_that("column functions", {
c <- column("a")
c1 <- abs(c) + acos(c) + approxCountDistinct(c) + ascii(c) + asin(c) + atan(c)
c1 <- abs(c) + acos(c) + approx_count_distinct(c) + ascii(c) + asin(c) + atan(c)
c2 <- avg(c) + base64(c) + bin(c) + bitwiseNOT(c) + cbrt(c) + ceil(c) + cos(c)
c3 <- cosh(c) + count(c) + crc32(c) + hash(c) + exp(c)
c4 <- explode(c) + expm1(c) + factorial(c) + first(c) + floor(c) + hex(c)
@@ -1388,7 +1388,7 @@ test_that("column functions", {
c7 <- mean(c) + min(c) + month(c) + negate(c) + posexplode(c) + quarter(c)
c8 <- reverse(c) + rint(c) + round(c) + rtrim(c) + sha1(c) + monotonically_increasing_id()
c9 <- signum(c) + sin(c) + sinh(c) + size(c) + stddev(c) + soundex(c) + sqrt(c) + sum(c)
c10 <- sumDistinct(c) + tan(c) + tanh(c) + toDegrees(c) + toRadians(c)
c10 <- sumDistinct(c) + tan(c) + tanh(c) + degrees(c) + radians(c)
c11 <- to_date(c) + trim(c) + unbase64(c) + unhex(c) + upper(c)
c12 <- variance(c) + ltrim(c, "a") + rtrim(c, "b") + trim(c, "c")
c13 <- lead("col", 1) + lead(c, 1) + lag("col", 1) + lag(c, 1)
3 changes: 3 additions & 0 deletions bin/docker-image-tool.sh
@@ -197,6 +197,9 @@
if ! which minikube 1>/dev/null; then
error "Cannot find minikube."
fi
if ! minikube status 1>/dev/null; then
error "Cannot contact minikube. Make sure it's running."
fi
eval $(minikube docker-env)
;;
esac
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -158,8 +158,6 @@ class BarrierTaskContext private[spark] (

override def isInterrupted(): Boolean = taskContext.isInterrupted()

override def isRunningLocally(): Boolean = taskContext.isRunningLocally()

override def addTaskCompletionListener(listener: TaskCompletionListener): this.type = {
taskContext.addTaskCompletionListener(listener)
this
7 changes: 0 additions & 7 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -96,13 +96,6 @@ abstract class TaskContext extends Serializable {
*/
def isInterrupted(): Boolean

/**
* Returns true if the task is running locally in the driver program.
* @return false
*/
@deprecated("Local execution was removed, so this always returns false", "2.0.0")
def isRunningLocally(): Boolean

/**
* Adds a (Java friendly) listener to be executed on task completion.
* This will be called in all situations - success, failure, or cancellation. Adding a listener
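
Removing `isRunningLocally()` is behavior-preserving: it has returned a hard-coded `false` since local execution was dropped in 2.0.0, so a guard like `!ctx.isRunningLocally() && cond` reduces to `cond`. A hypothetical caller, sketched in Scala purely for illustration:

```scala
import org.apache.spark.TaskContext

// Hypothetical helper, not from Spark. Before the cleanup it might have read:
//   !ctx.isRunningLocally() && bytesBuffered > spillThreshold
// Since the first term was always true, only the size check survives; the context
// parameter is kept here just to mirror the old call shape.
def shouldSpill(ctx: TaskContext, bytesBuffered: Long, spillThreshold: Long): Boolean =
  bytesBuffered > spillThreshold
```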
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -157,8 +157,6 @@ private[spark] class TaskContextImpl(
@GuardedBy("this")
override def isCompleted(): Boolean = synchronized(completed)

override def isRunningLocally(): Boolean = false

override def isInterrupted(): Boolean = reasonIfKilled.isDefined

override def getLocalProperty(key: String): String = localProperties.getProperty(key)
@@ -56,14 +56,4 @@ class ShuffleWriteMetrics private[spark] () extends Serializable {
private[spark] def decRecordsWritten(v: Long): Unit = {
_recordsWritten.setValue(recordsWritten - v)
}

// Legacy methods for backward compatibility.
// TODO: remove these once we make this class private.
@deprecated("use bytesWritten instead", "2.0.0")
def shuffleBytesWritten: Long = bytesWritten
@deprecated("use writeTime instead", "2.0.0")
def shuffleWriteTime: Long = writeTime
@deprecated("use recordsWritten instead", "2.0.0")
def shuffleRecordsWritten: Long = recordsWritten

}
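
The removed methods were plain forwarders kept for backward compatibility, so any lingering caller can rename one-for-one: `shuffleBytesWritten` → `bytesWritten`, `shuffleWriteTime` → `writeTime`, `shuffleRecordsWritten` → `recordsWritten`. A hedged Scala sketch with an invented logging helper:

```scala
import org.apache.spark.executor.ShuffleWriteMetrics

// Hypothetical helper, for illustration only: reads the canonical accessors
// that the removed legacy aliases used to forward to.
def describeShuffleWrite(m: ShuffleWriteMetrics): String =
  s"${m.recordsWritten} records, ${m.bytesWritten} bytes, write time ${m.writeTime} ns"
```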
17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -416,8 +416,9 @@ package object config {
.internal()
.doc("The chunk size in bytes during writing out the bytes of ChunkedByteBuffer.")
.bytesConf(ByteUnit.BYTE)
.checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
" ChunkedByteBuffer should not be larger than Int.MaxValue.")
.checkValue(_ <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
"The chunk size during writing out the bytes of" +
" ChunkedByteBuffer should not be larger than Int.MaxValue - 15.")
.createWithDefault(64 * 1024 * 1024)

private[spark] val CHECKPOINT_COMPRESS =
@@ -488,17 +489,19 @@
"otherwise specified. These buffers reduce the number of disk seeks and system calls " +
"made in creating intermediate shuffle files.")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
.checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
s"The file buffer size must be greater than 0 and less than" +
s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
.createWithDefaultString("32k")

private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
.doc("The file system for this buffer size after each partition " +
"is written in unsafe shuffle writer. In KiB unless otherwise specified.")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
.checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
s"The buffer size must be greater than 0 and less than" +
s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
.createWithDefaultString("32k")

private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
@@ -610,7 +613,7 @@
.internal()
.doc("For testing only, controls the size of chunks when memory mapping a file")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Int.MaxValue)
.createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)

private[spark] val BARRIER_SYNC_TIMEOUT =
ConfigBuilder("spark.barrier.sync.timeout")
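
Each of these bounds now uses `ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH`, which the updated messages spell out as `Int.MaxValue - 15` — a ceiling chosen because many JVMs refuse to allocate arrays right at `Int.MaxValue`. A standalone Scala sketch of the same style of check, with the constant and the helper written out as assumptions rather than imported from Spark:

```scala
// Assumed to match the error messages above ("Int.MaxValue - 15").
val MaxRoundedArrayLength: Int = Int.MaxValue - 15

// Hypothetical stand-in for ConfigBuilder.checkValue on a KiB-sized buffer:
// the configured size, once multiplied by 1024, must still fit in one byte array.
def checkBufferSizeKiB(v: Long): Long = {
  require(v > 0 && v <= MaxRoundedArrayLength / 1024,
    s"The buffer size must be greater than 0 and less than ${MaxRoundedArrayLength / 1024}.")
  v
}

checkBufferSizeKiB(32L)              // fine: the 32k default
// checkBufferSizeKiB(Int.MaxValue)  // would throw IllegalArgumentException
```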
@@ -47,33 +47,3 @@ case class AccumulableInfo private[spark] (
private[spark] val countFailedValues: Boolean,
// TODO: use this to identify internal task metrics instead of encoding it in the name
private[spark] val metadata: Option[String] = None)


/**
* A collection of deprecated constructors. This will be removed soon.
*/
object AccumulableInfo {

@deprecated("do not create AccumulableInfo", "2.0.0")
def apply(
id: Long,
name: String,
update: Option[String],
value: String,
internal: Boolean): AccumulableInfo = {
new AccumulableInfo(
id, Option(name), update, Option(value), internal, countFailedValues = false)
}

@deprecated("do not create AccumulableInfo", "2.0.0")
def apply(id: Long, name: String, update: Option[String], value: String): AccumulableInfo = {
new AccumulableInfo(
id, Option(name), update, Option(value), internal = false, countFailedValues = false)
}

@deprecated("do not create AccumulableInfo", "2.0.0")
def apply(id: Long, name: String, value: String): AccumulableInfo = {
new AccumulableInfo(
id, Option(name), None, Option(value), internal = false, countFailedValues = false)
}
}
15 changes: 7 additions & 8 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler
import java.io.{Externalizable, ObjectInput, ObjectOutput}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.roaringbitmap.RoaringBitmap

@@ -149,7 +148,7 @@ private[spark] class HighlyCompressedMapStatus private (
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long,
private var hugeBlockSizes: Map[Int, Byte])
private[this] var hugeBlockSizes: scala.collection.Map[Int, Byte])
extends MapStatus with Externalizable {

// loc could be null when the default constructor is called during deserialization
@@ -189,13 +188,13 @@
emptyBlocks.readExternal(in)
avgSize = in.readLong()
val count = in.readInt()
val hugeBlockSizesArray = mutable.ArrayBuffer[Tuple2[Int, Byte]]()
val hugeBlockSizesImpl = mutable.Map.empty[Int, Byte]
(0 until count).foreach { _ =>
val block = in.readInt()
val size = in.readByte()
hugeBlockSizesArray += Tuple2(block, size)
hugeBlockSizesImpl(block) = size
}
hugeBlockSizes = hugeBlockSizesArray.toMap
hugeBlockSizes = hugeBlockSizesImpl
}
}

@@ -215,7 +214,7 @@ private[spark] object HighlyCompressedMapStatus {
val threshold = Option(SparkEnv.get)
.map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
.getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
val hugeBlockSizes = mutable.Map.empty[Int, Byte]
while (i < totalNumBlocks) {
val size = uncompressedSizes(i)
if (size > 0) {
Expand All @@ -226,7 +225,7 @@ private[spark] object HighlyCompressedMapStatus {
totalSmallBlockSize += size
numSmallBlocks += 1
} else {
hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
hugeBlockSizes(i) = MapStatus.compressSize(uncompressedSizes(i))
}
} else {
emptyBlocks.add(i)
@@ -241,6 +240,6 @@
emptyBlocks.trim()
emptyBlocks.runOptimize()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
hugeBlockSizesArray.toMap)
hugeBlockSizes)
}
}
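
The change swaps an `ArrayBuffer[(Int, Byte)]` plus a final `toMap` for a `mutable.Map[Int, Byte]` filled in place, dropping the intermediate immutable copy built during both construction and deserialization. A standalone Scala sketch of the two styles, using invented block sizes and a toy size encoding in place of `MapStatus.compressSize`:

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

val sizes: Array[Long] = Array(10L, 0L, 5000000L, 7L, 9000000L)
val threshold = 1000000L
def toyCompress(size: Long): Byte = (size / 1000000L).toByte  // stand-in, not Spark's encoding

// Old style: buffer the pairs, then materialize an immutable Map at the end.
val buffered = ArrayBuffer[(Int, Byte)]()
for (i <- sizes.indices if sizes(i) > threshold) {
  buffered += Tuple2(i, toyCompress(sizes(i)))
}
val oldStyle: Map[Int, Byte] = buffered.toMap

// New style: write straight into a mutable map; no intermediate collection to throw away.
val newStyle = mutable.Map.empty[Int, Byte]
for (i <- sizes.indices if sizes(i) > threshold) {
  newStyle(i) = toyCompress(sizes(i))
}
```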
