Merge branch 'master' into SPARK-3873-sql
Marcelo Vanzin committed Jan 5, 2016
2 parents 7102295 + 1cdc42d commit ff2dbe1
Showing 156 changed files with 671 additions and 1,568 deletions.
33 changes: 16 additions & 17 deletions R/pkg/R/DataFrame.R
@@ -458,7 +458,10 @@ setMethod("registerTempTable",
setMethod("insertInto",
signature(x = "DataFrame", tableName = "character"),
function(x, tableName, overwrite = FALSE) {
callJMethod(x@sdf, "insertInto", tableName, overwrite)
jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
write <- callJMethod(x@sdf, "write")
write <- callJMethod(write, "mode", jmode)
callJMethod(write, "insertInto", tableName)
})
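
For context, a hedged sketch of the user-facing call that this refactored method serves; the DataFrame `df` and the table name are placeholders, and `insertInto` expects the target table to already exist:

    insertInto(df, "people")                    # append df's rows to the existing table "people"
    insertInto(df, "people", overwrite = TRUE)  # replace the table's contents instead of appending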

#' Cache
@@ -1948,18 +1951,15 @@ setMethod("write.df",
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
"org.apache.spark.sql.parquet")
}
allModes <- c("append", "overwrite", "error", "ignore")
# nolint start
if (!(mode %in% allModes)) {
stop('mode should be one of "append", "overwrite", "error", "ignore"')
}
# nolint end
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
callJMethod(df@sdf, "save", source, jmode, options)
write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "save", path)
})

#' @rdname write.df
@@ -2013,15 +2013,14 @@ setMethod("saveAsTable",
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
"org.apache.spark.sql.parquet")
}
allModes <- c("append", "overwrite", "error", "ignore")
# nolint start
if (!(mode %in% allModes)) {
stop('mode should be one of "append", "overwrite", "error", "ignore"')
}
# nolint end
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, options)

write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
callJMethod(write, "saveAsTable", tableName)
})

#' summary
10 changes: 5 additions & 5 deletions R/pkg/R/SQLContext.R
@@ -256,9 +256,12 @@ jsonFile <- function(sqlContext, path) {

# TODO: support schema
jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
.Deprecated("read.json")
rdd <- serializeToString(rdd)
if (is.null(schema)) {
sdf <- callJMethod(sqlContext, "jsonRDD", callJMethod(getJRDD(rdd), "rdd"), samplingRatio)
read <- callJMethod(sqlContext, "read")
# samplingRatio is deprecated
sdf <- callJMethod(read, "json", callJMethod(getJRDD(rdd), "rdd"))
dataFrame(sdf)
} else {
stop("not implemented")
@@ -289,10 +292,7 @@ read.parquet <- function(sqlContext, path) {
# TODO: Implement saveasParquetFile and write examples for both
parquetFile <- function(sqlContext, ...) {
.Deprecated("read.parquet")
# Allow the user to have a more flexible definition of the text file path
paths <- lapply(list(...), function(x) suppressWarnings(normalizePath(x)))
sdf <- callJMethod(sqlContext, "parquetFile", paths)
dataFrame(sdf)
read.parquet(sqlContext, unlist(list(...)))
}

#' SQL Query
2 changes: 1 addition & 1 deletion R/pkg/R/column.R
@@ -209,7 +209,7 @@ setMethod("cast",
setMethod("%in%",
signature(x = "Column"),
function(x, table) {
jc <- callJMethod(x@jc, "in", as.list(table))
jc <- callJMethod(x@jc, "isin", as.list(table))
return(column(jc))
})

9 changes: 9 additions & 0 deletions R/pkg/R/utils.R
@@ -641,3 +641,12 @@ assignNewEnv <- function(data) {
splitString <- function(input) {
Filter(nzchar, unlist(strsplit(input, ",|\\s")))
}

convertToJSaveMode <- function(mode) {
allModes <- c("append", "overwrite", "error", "ignore")
if (!(mode %in% allModes)) {
stop('mode should be one of "append", "overwrite", "error", "ignore"') # nolint
}
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
jmode
}
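
A rough usage sketch for the helper above; it needs an active SparkR backend, since it calls into the JVM through `callJStatic`:

    jmode <- convertToJSaveMode("overwrite")  # returns a reference to the corresponding Java SaveMode value
    # convertToJSaveMode("replace")           # would stop() with the error message shown above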
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -423,12 +423,12 @@ test_that("read/write json files", {
test_that("jsonRDD() on a RDD with json string", {
rdd <- parallelize(sc, mockLines)
expect_equal(count(rdd), 3)
df <- jsonRDD(sqlContext, rdd)
df <- suppressWarnings(jsonRDD(sqlContext, rdd))
expect_is(df, "DataFrame")
expect_equal(count(df), 3)

rdd2 <- flatMap(rdd, function(x) c(x, x))
df <- jsonRDD(sqlContext, rdd2)
df <- suppressWarnings(jsonRDD(sqlContext, rdd2))
expect_is(df, "DataFrame")
expect_equal(count(df), 6)
})
13 changes: 3 additions & 10 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -662,9 +662,7 @@ private[spark] object Utils extends Logging {

private[spark] def isRunningInYarnContainer(conf: SparkConf): Boolean = {
// These environment variables are set by YARN.
// For Hadoop 0.23.X, we check for YARN_LOCAL_DIRS (we use this below in getYarnLocalDirs())
// For Hadoop 2.X, we check for CONTAINER_ID.
conf.getenv("CONTAINER_ID") != null || conf.getenv("YARN_LOCAL_DIRS") != null
conf.getenv("CONTAINER_ID") != null
}

/**
@@ -740,17 +738,12 @@
logError(s"Failed to create local root dir in $root. Ignoring this directory.")
None
}
}.toArray
}
}

/** Get the Yarn approved local directories. */
private def getYarnLocalDirs(conf: SparkConf): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
// local dirs, so lets check both. We assume one of the 2 is set.
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(conf.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(conf.getenv("LOCAL_DIRS"))
.getOrElse(""))
val localDirs = Option(conf.getenv("LOCAL_DIRS")).getOrElse("")

if (localDirs.isEmpty) {
throw new Exception("Yarn Local dirs can't be empty")
6 changes: 3 additions & 3 deletions docs/sparkr.md
@@ -385,12 +385,12 @@ The following functions are masked by the SparkR package:
</table>

Since part of SparkR is modeled on the `dplyr` package, certain functions in SparkR share names with functions in `dplyr`. Depending on the load order of the two packages, some functions from the package loaded first are masked by those in the package loaded later. In such cases, prefix the calls with the package name, for instance `SparkR::cume_dist(x)` or `dplyr::cume_dist(x)`.

You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-manual/R-devel/library/base/html/search.html).
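
As a rough illustration (assuming both packages are attached, with SparkR loaded after dplyr), the explicit prefix picks the implementation you mean regardless of the search path:

    library(dplyr)
    library(SparkR)                  # attached last, so its exports mask dplyr's where names collide
    search()                         # "package:SparkR" now precedes "package:dplyr"
    dplyr::cume_dist(c(1, 3, 2, 5))  # dplyr's vector version: 0.25 0.75 0.50 1.00
    # SparkR::cume_dist(...)         # SparkR's window-function form; the prefix makes the choice explicit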


# Migration Guide

## Upgrading From SparkR 1.6 to 1.7
## Upgrading From SparkR 1.5.x to 1.6

- Until Spark 1.6, the default mode for writes was `append`. It was changed in Spark 1.7 to `error` to match the Scala API.
- Before Spark 1.6, the default mode for writes was `append`. It was changed in Spark 1.6.0 to `error` to match the Scala API.
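
A minimal sketch of what this change means for callers; the DataFrame `df` and the output path are placeholders, and code that relied on the old default should now request `append` explicitly:

    # In Spark 1.6 the default mode is "error", so this stops if the path already exists
    write.df(df, path = "people_parquet", source = "parquet")
    # Keep the pre-1.6 behaviour by passing the mode explicitly
    write.df(df, path = "people_parquet", source = "parquet", mode = "append")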
@@ -22,15 +22,14 @@ import java.nio.ByteBuffer
import java.util.Collections

import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.{SparkConf, SparkContext}


/*
Need to create following keyspace and column family in cassandra before running this example
Start CQL shell using ./bin/cqlsh and execute following commands
@@ -23,9 +23,9 @@ import java.util.Arrays
import java.util.SortedMap

import org.apache.cassandra.db.IColumn
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat
import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat
import org.apache.cassandra.thrift._
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.hadoop.mapreduce.Job
@@ -22,7 +22,7 @@ import java.io.File

import scala.io.Source._

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

/**
@@ -18,13 +18,12 @@
// scalastyle:off println
package org.apache.spark.examples

import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.spark._


object HBaseTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HBaseTest")
@@ -20,7 +20,7 @@ package org.apache.spark.examples

import java.util.Random

import breeze.linalg.{Vector, DenseVector}
import breeze.linalg.{DenseVector, Vector}

/**
* Logistic regression based classification.
@@ -23,7 +23,7 @@ import java.util.Random
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet

import breeze.linalg.{Vector, DenseVector, squaredDistance}
import breeze.linalg.{squaredDistance, DenseVector, Vector}

import org.apache.spark.SparkContext._

@@ -20,7 +20,7 @@ package org.apache.spark.examples

import java.util.Random

import breeze.linalg.{Vector, DenseVector}
import breeze.linalg.{DenseVector, Vector}

/**
* Logistic regression based classification.
@@ -18,8 +18,8 @@
// scalastyle:off println
package org.apache.spark.examples

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
* Usage: MultiBroadcastTest [slices] [numElem]
@@ -22,7 +22,7 @@ import java.util.Random

import scala.math.exp

import breeze.linalg.{Vector, DenseVector}
import breeze.linalg.{DenseVector, Vector}
import org.apache.hadoop.conf.Configuration

import org.apache.spark._
@@ -18,7 +18,7 @@
// scalastyle:off println
package org.apache.spark.examples

import breeze.linalg.{Vector, DenseVector, squaredDistance}
import breeze.linalg.{squaredDistance, DenseVector, Vector}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
@@ -22,7 +22,7 @@ import java.util.Random

import scala.math.exp

import breeze.linalg.{Vector, DenseVector}
import breeze.linalg.{DenseVector, Vector}

import org.apache.spark._

@@ -18,8 +18,8 @@
// scalastyle:off println
package org.apache.spark.examples

import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

/**
* Computes the PageRank of URLs from an input file. Input file should
@@ -18,8 +18,8 @@
// scalastyle:off println
package org.apache.spark.examples

import scala.util.Random
import scala.collection.mutable
import scala.util.Random

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
@@ -22,14 +22,13 @@ import java.util.Random

import scala.math.exp

import breeze.linalg.{Vector, DenseVector}
import breeze.linalg.{DenseVector, Vector}
import org.apache.hadoop.conf.Configuration

import org.apache.spark._
import org.apache.spark.scheduler.InputFormatInfo
import org.apache.spark.storage.StorageLevel


/**
* Logistic regression based classification.
* This example uses Tachyon to persist rdds during computation.
@@ -19,11 +19,12 @@
package org.apache.spark.examples.graphx

import scala.collection.mutable

import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.graphx.PartitionStrategy._
import org.apache.spark.graphx.lib._
import org.apache.spark.storage.StorageLevel

/**
* Driver program for running graph algorithms.
@@ -18,11 +18,12 @@
// scalastyle:off println
package org.apache.spark.examples.graphx

import java.io.{FileOutputStream, PrintWriter}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.graphx.{GraphXUtils, PartitionStrategy}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.graphx.util.GraphGenerators
import java.io.{PrintWriter, FileOutputStream}

/**
* The SynthBenchmark application can be used to run various GraphX algorithms on
@@ -18,12 +18,12 @@
// scalastyle:off println
package org.apache.spark.examples.ml

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.ml.regression.AFTSurvivalRegression
import org.apache.spark.mllib.linalg.Vectors
// $example off$
import org.apache.spark.sql.SQLContext

/**
* An example for AFTSurvivalRegression.
@@ -18,11 +18,11 @@
// scalastyle:off println
package org.apache.spark.examples.ml

import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.ml.feature.Binarizer
// $example off$
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object BinarizerExample {
def main(args: Array[String]): Unit = {
@@ -18,11 +18,11 @@
// scalastyle:off println
package org.apache.spark.examples.ml

import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.ml.feature.Bucketizer
// $example off$
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object BucketizerExample {
def main(args: Array[String]): Unit = {