[SPARK-16367][PYSPARK] Add wheelhouse support
- Merge of apache#13599 ("virtualenv in pyspark", bug SPARK-13587)
  and apache#5408 ("wheel package support for Pyspark", bug SPARK-6764)
- Documentation updated
- Only Standalone and YARN are supported; Mesos is not supported
- Only tested with virtualenv/pip; Conda is not tested
  - Client deployment + pip install with index: OK (1 min 30 s execution)
  - Client deployment + wheelhouse without index: KO
    (cffi refuses the built wheel)

Signed-off-by: Gaetan Semet <gaetan@xeberon.net>
gsemet committed Oct 5, 2016
1 parent 7e3063e commit cf977e4
Showing 8 changed files with 687 additions and 62 deletions.
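The diff below introduces a set of `spark.pyspark.virtualenv.*` options read by `PythonWorkerFactory`, and has the worker look for `wheelhouse.zip` and `requirements.txt` among `spark.files`. As a rough usage sketch only (the application script, requirements file, and master below are illustrative placeholders, not prescribed by this commit), a job exercising the wheelhouse path might be submitted like this:

    spark-submit \
      --master yarn \
      --conf spark.pyspark.virtualenv.enabled=true \
      --conf spark.pyspark.virtualenv.type=native \
      --conf spark.pyspark.virtualenv.bin.path=virtualenv \
      --conf spark.pyspark.virtualenv.requirements=requirements.txt \
      --conf spark.pyspark.virtualenv.wheelhouse=wheelhouse.zip \
      --conf spark.pyspark.virtualenv.use_index=false \
      --files requirements.txt,wheelhouse.zip \
      my_app.py

Setting `spark.pyspark.virtualenv.use_index=false` makes the executors install only from the bundled wheels; alternatively, `spark.pyspark.virtualenv.index_url` and `spark.pyspark.virtualenv.trusted_host` can point pip at a private mirror.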
@@ -18,14 +18,18 @@
package org.apache.spark.api.python

import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStreamWriter}
import java.io.{File, FileInputStream, FileOutputStream, IOException}
import java.net.{InetAddress, ServerSocket, Socket, SocketException}
import java.nio.charset.StandardCharsets
import java.nio.file.{Paths, Files}
import java.util.Arrays
import java.util.concurrent.atomic.AtomicInteger
import java.util.zip.{ZipEntry, ZipInputStream}

import scala.collection.mutable
import scala.collection.JavaConverters._

import org.apache.commons.io.IOUtils
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.util.{RedirectThread, Utils}
@@ -50,12 +54,32 @@ private[spark] class PythonWorkerFactory(pythonExec: String,
val daemonWorkers = new mutable.WeakHashMap[Socket, Int]()
val idleWorkers = new mutable.Queue[Socket]()
var lastActivity = 0L
val sparkFiles = conf.getOption("spark.files")
val virtualEnvEnabled = conf.getBoolean("spark.pyspark.virtualenv.enabled", false)
val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
val virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "")
val virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "virtualenv")
val virtualEnvSystemSitePackages = conf.getBoolean(
"spark.pyspark.virtualenv.system_site_packages", false)
val virtualWheelhouse = conf.get("spark.pyspark.virtualenv.wheelhouse", "wheelhouse.zip")
// virtualRequirements is empty string by default
val virtualRequirements = conf.get("spark.pyspark.virtualenv.requirements", "")
val virtualIndexUrl = conf.get("spark.pyspark.virtualenv.index_url", null)
val virtualTrustedHost = conf.get("spark.pyspark.virtualenv.trusted_host", null)
val virtualInstallPackage = conf.get("spark.pyspark.virtualenv.install_package", null)
val upgradePip = conf.getBoolean("spark.pyspark.virtualenv.upgrade_pip", false)
val virtualUseIndex = conf.getBoolean("spark.pyspark.virtualenv.use_index", true)
var virtualEnvName: String = _
var virtualPythonExec: String = _

// search for "wheelhouse.zip" to trigger unzipping and installation of wheelhouse
// also search for "requirements.txt" if provided
for (filename <- sparkFiles.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten) {
logDebug("Looking inside" + filename)
val file = new File(filename)
val prefixes = Iterator.iterate(file)(_.getParentFile).takeWhile(_ != null).toList.reverse
logDebug("=> prefixes" + prefixes)
}

new MonitorThread().start()

var simpleWorkers = new mutable.WeakHashMap[Socket, Process]()
@@ -65,7 +89,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String,
envVars.getOrElse("PYTHONPATH", ""),
sys.env.getOrElse("PYTHONPATH", ""))

if (conf.getBoolean("spark.pyspark.virtualenv.enabled", false)) {
if (virtualEnvEnabled) {
setupVirtualEnv()
}

@@ -82,15 +106,73 @@ private[spark] class PythonWorkerFactory(pythonExec: String,
}
}


def unzipWheelhouse(zipFile: String, outputFolder: String): Unit = {
val buffer = new Array[Byte](1024)
try {
// output directory
val folder = new File(outputFolder)
if (!folder.exists()) {
folder.mkdir()
}

// zip file content
val zis: ZipInputStream = new ZipInputStream(new FileInputStream(zipFile))
// get the zipped file list entry
var ze: ZipEntry = zis.getNextEntry()

while (ze != null) {
if (!ze.isDirectory()) {
val fileName = ze.getName()
val newFile = new File(outputFolder + File.separator + fileName)
logDebug("Unzipping file " + newFile.getAbsoluteFile())

// create folders
new File(newFile.getParent()).mkdirs()
val fos = new FileOutputStream(newFile)
var len: Int = zis.read(buffer)

while (len > 0) {
fos.write(buffer, 0, len)
len = zis.read(buffer)
}
fos.close()
}
ze = zis.getNextEntry()
}
zis.closeEntry()
zis.close()
} catch {
case e: IOException => logError("exception caught: " + e.getMessage)
}
}

/**
* Create virtualenv using native virtualenv or conda
*
* Native Virtualenv:
* - Execute command: virtualenv -p pythonExec --no-site-packages virtualenvName
* - Execute command: python -m pip --cache-dir cache-dir install -r requirement_file
* - Create the virtualenv:
* virtualenv -p pythonExec [--system-site-packages] virtualenvName
* - if wheelhouse specified:
* - unzip wheelhouse
* - upgrade pip if set by conf (default: no)
* - install using pip:
*
* pip install -r requirement_file.txt \
* --find-links=wheelhouse \
* [--no-index] \
* [--index-url http://pypi.mirror/simple] [--trusted-host pypi.mirror] \
* [package.whl]
*
* else, if no wheelhouse is set:
*
* pip install -r requirement_file.txt \
* [--no-index] \
* [--index-url http://pypi.mirror/simple] [--trusted-host pypi.mirror] \
* [package.whl]
*
* Conda
* - Execute command: conda create --name virtualenvName --file requirement_file -y
* - Execute command: conda create --name virtualenvName --file requirement_file.txt -y
*
*/
def setupVirtualEnv(): Unit = {
@@ -100,41 +182,114 @@ private[spark] class PythonWorkerFactory(pythonExec: String,
// fetched from FileServer
val pyspark_requirements =
if (Utils.isLocalMaster(conf)) {
conf.get("spark.pyspark.virtualenv.requirements")
virtualRequirements
} else {
conf.get("spark.pyspark.virtualenv.requirements").split("/").last
virtualRequirements.split("/").last
}

logDebug("wheelhouse: " + virtualWheelhouse)
if (virtualWheelhouse != null &&
!virtualWheelhouse.isEmpty &&
Files.exists(Paths.get(virtualWheelhouse))) {
logDebug("Unziping wheelhouse archive " + virtualWheelhouse)
unzipWheelhouse(virtualWheelhouse, "wheelhouse")
}

val createEnvCommand =
if (virtualEnvType == "native") {
Arrays.asList(virtualEnvPath,
"-p", pythonExec,
"--no-site-packages", virtualEnvName)
if (virtualEnvSystemSitePackages) {
Arrays.asList(virtualEnvPath, "-p", pythonExec, "--system-site-packages", virtualEnvName)
} else {
Arrays.asList(virtualEnvPath, "-p", pythonExec, virtualEnvName)
}
} else {
Arrays.asList(virtualEnvPath,
"create", "--prefix", System.getProperty("user.dir") + "/" + virtualEnvName,
"--file", pyspark_requirements, "-y")
// Conda creates the environment and installs the packages in a single command
val basePipArgs = mutable.ListBuffer[String]()
basePipArgs += (virtualEnvPath,
"create",
"--prefix",
System.getProperty("user.dir") + "/" + virtualEnvName)
if (pyspark_requirements != null && !pyspark_requirements.isEmpty) {
basePipArgs += ("--file", pyspark_requirements)
}
basePipArgs += ("-y")
basePipArgs.toList.asJava
}
execCommand(createEnvCommand)
// virtualenv will be created in the working directory of Executor.
virtualPythonExec = virtualEnvName + "/bin/python"

// Install the Python packages with pip (only needed for a native virtualenv).
if (virtualEnvType == "native") {
execCommand(Arrays.asList(virtualPythonExec, "-m", "pip",
"--cache-dir", System.getProperty("user.home"),
"install", "-r", pyspark_requirements))
val virtualenvPipExec = virtualEnvName + "/bin/pip"
val pipUpgradeArgs = mutable.ListBuffer[String]()
if (upgradePip) {
pipUpgradeArgs += (virtualenvPipExec, "install", "--upgrade", "pip")
}
val basePipArgs = mutable.ListBuffer[String]()
basePipArgs += (virtualenvPipExec, "install")
if (pyspark_requirements != null && !pyspark_requirements.isEmpty) {
basePipArgs += ("-r", pyspark_requirements)
}
if (virtualWheelhouse != null &&
!virtualWheelhouse.isEmpty &&
Files.exists(Paths.get(virtualWheelhouse))) {
basePipArgs += ("--find-links=wheelhouse")
pipUpgradeArgs += ("--find-links=wheelhouse")
}
if (virtualIndexUrl != null && !virtualIndexUrl.isEmpty) {
basePipArgs += ("--index-url", virtualIndexUrl)
pipUpgradeArgs += ("--index-url", virtualIndexUrl)
} else if (!virtualUseIndex) {
basePipArgs += ("--no-index")
pipUpgradeArgs += ("--no-index")
}
if (virtualTrustedHost != null && !virtualTrustedHost.isEmpty) {
basePipArgs += ("--trusted-host", virtualTrustedHost)
pipUpgradeArgs += ("--trusted-host", virtualTrustedHost)
}
if (upgradePip) {
// upgrade pip in the virtualenv
execCommand(pipUpgradeArgs.toList.asJava)
}
if (virtualInstallPackage != null && !virtualInstallPackage.isEmpty) {
basePipArgs += (virtualInstallPackage)
}
execCommand(basePipArgs.toList.asJava)
}
// do not execute a second command line in "conda" mode
}

def execCommand(commands: java.util.List[String]): Unit = {
logDebug("Running command:" + commands.asScala.mkString(" "))
val pb = new ProcessBuilder(commands).inheritIO()
logDebug("Running command: " + commands.asScala.mkString(" "))

val pb = new ProcessBuilder(commands)
pb.environment().putAll(envVars.asJava)
pb.environment().putAll(System.getenv())
pb.environment().put("HOME", System.getProperty("user.home"))

val proc = pb.start()

val exitCode = proc.waitFor()
if (exitCode != 0) {
throw new RuntimeException("Fail to run command: " + commands.asScala.mkString(" "))
val errString = try {
val err = Option(proc.getErrorStream())
err.map(IOUtils.toString)
} catch {
case io: IOException => None
}

val outString = try {
val out = Option(proc.getInputStream())
out.map(IOUtils.toString)
} catch {
case io: IOException => None
}

throw new RuntimeException("Failed to run command: " + commands.asScala.mkString(" ") +
"\nOutput: " + outString.getOrElse("") +
"\nStderr: " + errString.getOrElse("")
)
}
}

@@ -183,8 +338,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String,
// Create and start the worker
val realPythonExec = if (virtualEnvEnabled) virtualPythonExec else pythonExec
logDebug(s"Starting worker with pythonExec: ${realPythonExec}")
val pb = new ProcessBuilder(Arrays.asList(realPythonExec,
"-m", "pyspark.worker"))
val pb = new ProcessBuilder(Arrays.asList(realPythonExec, "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars.asJava)
workerEnv.put("PYTHONPATH", pythonPath)
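`setupVirtualEnv()` above unzips the `wheelhouse.zip` shipped through `spark.files` into a local `wheelhouse/` directory and passes `--find-links=wheelhouse` to pip, so the wheel files should sit at the root of the archive. A minimal sketch of preparing such an archive on the client side (file names are examples; the exact packaging workflow is not mandated by this commit):

    # build wheels for the application and all of its dependencies into ./wheelhouse
    pip wheel -r requirements.txt -w wheelhouse
    # zip the .whl files at the archive root so they extract directly under wheelhouse/
    (cd wheelhouse && zip ../wheelhouse.zip *.whl)

The resulting `wheelhouse.zip` and `requirements.txt` can then be shipped to the executors with `--files requirements.txt,wheelhouse.zip`.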
@@ -505,8 +505,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| dependency conflicts.
| --repositories Comma-separated list of additional remote repositories to
| search for the maven coordinates given with --packages.
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
| on the PYTHONPATH for Python apps.
| --py-files PY_FILES Comma-separated list of .zip, .egg, .whl, or .py files to
| place on the PYTHONPATH for Python apps.
| --files FILES Comma-separated list of files to be placed in the working
| directory of each executor.
|
15 changes: 12 additions & 3 deletions docs/cluster-overview.md
@@ -87,9 +87,18 @@ The following table summarizes terms you'll see used to refer to cluster concept
<tr>
<td>Application jar</td>
<td>
A jar containing the user's Spark application. In some cases users will want to create
an "uber jar" containing their application along with its dependencies. The user's jar
should never include Hadoop or Spark libraries, however, these will be added at runtime.
A jar containing the user's Spark application (for Java and Scala drivers). In some cases
users will want to create an "uber jar" containing their application along with its
dependencies. The user's jar should never include Hadoop or Spark libraries; however, these
will be added at runtime.
</td>
</tr>
<tr>
<td>Application Wheelhouse</td>
<td>
An archive containing precompiled wheels of the user's PySpark application and its dependencies
(for the Python driver). The wheelhouse should not include jars, only Python wheel (.whl)
files built for one or more architectures.
</td>
</tr>
<tr>
2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -392,7 +392,7 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.submit.pyFiles</code></td>
<td></td>
<td>
Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
Comma-separated list of .zip, .egg, .whl, or .py files to place on the PYTHONPATH for Python apps.
</td>
</tr>
<tr>
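Since `--py-files` and `spark.submit.pyFiles` now also accept `.whl` archives, a prebuilt wheel can be distributed and placed on the PYTHONPATH of a Python application, which is typically enough for pure-Python dependencies. An illustrative invocation (the wheel and script names below are placeholders):

    spark-submit \
      --py-files mylib-0.1-py2.py3-none-any.whl \
      my_app.py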
16 changes: 8 additions & 8 deletions docs/programming-guide.md
@@ -24,7 +24,7 @@ along with if you launch Spark's interactive shell -- either `bin/spark-shell` f

<div data-lang="scala" markdown="1">

Spark {{site.SPARK_VERSION}} is built and distributed to work with Scala {{site.SCALA_BINARY_VERSION}}
by default. (Spark can be built to work with other versions of Scala, too.) To write
applications in Scala, you will need to use a compatible Scala version (e.g. {{site.SCALA_BINARY_VERSION}}.X).

@@ -211,7 +211,7 @@ For a complete list of options, run `spark-shell --help`. Behind the scenes,

In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the
variable called `sc`. Making your own SparkContext will not work. You can set which master the
context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files
context connects to using the `--master` argument, and you can add Python .zip, .egg, .whl or .py files
to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies
(e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates
to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. SonaType)
@@ -240,13 +240,13 @@ use IPython, set the `PYSPARK_DRIVER_PYTHON` variable to `ipython` when running
$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
{% endhighlight %}

To use the Jupyter notebook (previously known as the IPython notebook),

{% highlight bash %}
$ PYSPARK_DRIVER_PYTHON=jupyter ./bin/pyspark
{% endhighlight %}

You can customize the `ipython` or `jupyter` commands by setting `PYSPARK_DRIVER_PYTHON_OPTS`.

After the Jupyter Notebook server is launched, you can create a new "Python 2" notebook from
the "Files" tab. Inside the notebook, you can input the command `%pylab inline` as part of
@@ -812,7 +812,7 @@ The variables within the closure sent to each executor are now copies and thus,

In local mode, in some circumstances the `foreach` function will actually execute within the same JVM as the driver and will reference the same original **counter**, and may actually update it.

To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.

In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.

@@ -1231,8 +1231,8 @@ storage levels is:
</tr>
</table>

**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
`MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, and `DISK_ONLY_2`.*

Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.
@@ -1374,7 +1374,7 @@ res2: Long = 10

While this code used the built-in support for accumulators of type Long, programmers can also
create their own types by subclassing [AccumulatorV2](api/scala/index.html#org.apache.spark.AccumulatorV2).
The AccumulatorV2 abstract class has several methods which need to be overridden:
`reset` for resetting the accumulator to zero, `add` for adding another value into the accumulator, and `merge` for merging another same-type accumulator into this one. Other methods that may need to be overridden are described in the Scala API documentation. For example, supposing we had a `MyVector` class
representing mathematical vectors, we could write:

