
Merge pull request apache#22 from shivaram/master
Add support for passing Spark environment vars
concretevitamin committed Feb 16, 2014
2 parents 10228fb + e462448 commit 9a1db44
Showing 3 changed files with 57 additions and 9 deletions.
15 changes: 15 additions & 0 deletions README.md
@@ -42,6 +42,21 @@ Spark master you can launch R and then run
library(SparkR)
sc <- sparkR.init(master="local")

To increase the memory used by the driver you can export the `SPARK_MEM`
environment variable. For example, to use 1g you can run

SPARK_MEM=1g ./sparkR

In a cluster setting, you can set the amount of memory used by the executors by
passing the `spark.executor.memory` property to the SparkContext constructor.

library(SparkR)
sc <- sparkR.init(master="spark://<master>:7077",
                  sparkEnvir=list(spark.executor.memory="1g"))


## Examples, Unit tests

SparkR comes with several sample programs in the `examples` directory.
To run one of them, use `./sparkR <filename> <args>`. For example:

31 changes: 23 additions & 8 deletions pkg/R/sparkR.R
@@ -5,9 +5,11 @@ assemblyJarName <- "sparkr-assembly-0.1.jar"
sparkR.onLoad <- function(libname, pkgname) {
  assemblyJarPath <- paste(libname, "/SparkR/", assemblyJarName, sep="")
  packageStartupMessage("[SparkR] Initializing with classpath ", assemblyJarPath, "\n")

  sparkMem <- Sys.getenv("SPARK_MEM", "512m")
  .sparkREnv$libname <- libname
  .sparkREnv$assemblyJarPath <- assemblyJarPath
  .jinit(classpath=assemblyJarPath)
  .jinit(classpath=assemblyJarPath, parameters=paste("-Xmx", sparkMem, sep=""))
}
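
For context, here is a minimal usage sketch of the new `SPARK_MEM` handling (assuming the SparkR package from this repository is installed). The variable has to be set before the package is loaded, because `sparkR.onLoad` starts the JVM via `.jinit` with the corresponding `-Xmx` flag:

    # Set the driver heap size before loading SparkR; sparkR.onLoad reads
    # SPARK_MEM (default "512m") and passes it to the JVM as -Xmx.
    Sys.setenv(SPARK_MEM = "1g")
    library(SparkR)
    sc <- sparkR.init(master="local")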

#' Initialize a new Spark Context.
@@ -17,16 +19,20 @@ sparkR.onLoad <- function(libname, pkgname) {
#' @param master The Spark master URL.
#' @param appName Application name to register with cluster manager
#' @param sparkHome Spark Home directory
#' @param sparkEnvir Named list of environment variables to set on worker nodes.
#' @export
#' @examples
#'\dontrun{
#' sparkR.init("local[2]", "SparkR", "/home/spark")
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
#'                   list(spark.executor.memory="1g"))
#'}

sparkR.init <- function(
  master = "local",
  appName = "SparkR",
  sparkHome = Sys.getenv("SPARK_HOME")) {
  sparkHome = Sys.getenv("SPARK_HOME"),
  sparkEnvir = list()) {

  if (exists(".sparkRjsc", envir=.sparkREnv)) {
    return(get(".sparkRjsc", envir=.sparkREnv))
@@ -36,13 +42,22 @@ sparkR.init <- function(
    sparkHome <- normalizePath(sparkHome)
  }

  # TODO: support other constructors
  hm <- .jnew("java/util/HashMap")
  for (varname in names(sparkEnvir)) {
    hm$put(varname, sparkEnvir[[varname]])
  }

  assign(
    ".sparkRjsc",
    .jnew("org/apache/spark/api/java/JavaSparkContext", master, appName,
      as.character(sparkHome),
      as.character(.sparkREnv$assemblyJarPath)),
    envir=.sparkREnv
    J("edu.berkeley.cs.amplab.sparkr.RRDD",
      "createSparkContext",
      master,
      appName,
      as.character(sparkHome),
      .jarray(as.character(.sparkREnv$assemblyJarPath),
              "java/lang/String"),
      hm),
    envir=.sparkREnv
  )

  get(".sparkRjsc", envir=.sparkREnv)
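
To make the R-to-JVM hand-off above concrete, here is a small standalone sketch (assuming only the rJava package; the property names are purely illustrative) of how a `sparkEnvir` named list is copied into the `java.util.HashMap` that `sparkR.init` passes to `createSparkContext`:

    library(rJava)
    .jinit()  # start a JVM; SparkR does this in sparkR.onLoad

    sparkEnvir <- list(spark.executor.memory="1g", spark.cores.max="4")

    # Copy each name/value pair into a java.util.HashMap, as sparkR.init does
    hm <- .jnew("java/util/HashMap")
    for (varname in names(sparkEnvir)) {
      hm$put(varname, sparkEnvir[[varname]])
    }

    hm$get("spark.executor.memory")  # returns "1g"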
20 changes: 19 additions & 1 deletion pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/RRDD.scala
@@ -1,12 +1,13 @@
package edu.berkeley.cs.amplab.sparkr

import java.io._
import java.util.{Map => JMap}

import scala.collection.JavaConversions._
import scala.io.Source
import scala.reflect.ClassTag

import org.apache.spark.{SparkEnv, Partition, SparkException, TaskContext}
import org.apache.spark.{SparkEnv, Partition, SparkException, TaskContext, SparkConf}
import org.apache.spark.api.java.{JavaSparkContext, JavaRDD, JavaPairRDD}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
@@ -156,6 +157,23 @@ class RRDD[T: ClassTag](

object RRDD {

  def createSparkContext(
      master: String,
      appName: String,
      sparkHome: String,
      jars: Array[String],
      vars: JMap[Object, Object]): JavaSparkContext = {
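    // Build a SparkConf from the arguments passed in from the R side, then
    // copy each entry of the environment-variable map into it as a Spark property.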

    val sparkConf = new SparkConf().setMaster(master)
                                   .setAppName(appName)
                                   .setSparkHome(sparkHome)
                                   .setJars(jars)
    for ((name, value) <- vars) {
      sparkConf.set(name.asInstanceOf[String], value.asInstanceOf[String])
    }
    new JavaSparkContext(sparkConf)
  }

  /**
   * Create an RRDD given a sequence of byte arrays. Used to create RRDD when `parallelize` is
   * called from R.
