
Conversation

@exyi (Contributor) commented Nov 6, 2021

This is just a rebase of PR exyi#1, a first small step towards implementing Julia UDFs.

@dfdx (Owner) commented Nov 12, 2021

To continue our discussion, I see several possible options for implementing Julia workers.

Option 1. Julia + libraries

Many people look at Spark.jl as a way to run custom Julia code in a distributed environment. Currently it's kind of supported via the RDD API, but the implementation is buggy, sometimes slow, and produces cryptic error messages, so I don't really want to bring it to the UDFs. Also, with the current implementation we cannot use 3rd-party libraries unless they are already installed on the worker nodes. If we want to be really flexible about the supported operations, we need a way to bring dependencies to the executors. How can we do this?

  • We can put the responsibility of installing the dependencies onto users. For example, in AWS EMR a user can create a custom bootstrap action (see example) to instantiate a project on each worker before running the Spark application. However, if you have a permanent cluster and run multiple applications over a long period of time, recreating the cluster with new bootstrap actions is not an option.
  • Following the PySpark approach, we can pack the current environment and activate it before processing each batch of data. However, I'm not really sure Julia environments can be transferred to another host just like Python's virtualenv packed using venv-pack.
  • A reliable way to install the dependencies would be to freeze the versions in Manifest.toml and instantiate the project on each executor (see the sketch after this list). Surely, it would be a pretty slow operation, so we want to do it only once per Spark application. We could do it as a first pseudo-action before launching any actual tasks, but there's no guarantee workers won't die and be replaced with new ones where the dependencies are not yet installed.
  • My best shot in this direction is to launch a permanent Julia server that accepts connections from the JVM executors, installs the dependencies on the first call, and then processes data without this overhead. I spent a couple of days trying to design such a server, but given the number of deployment modes (local, standalone, YARN, Mesos, Kubernetes), the number of settings (multiple executors on a worker, multiple threads on an executor) and the different runtimes (e.g. AWS on-demand vs. spot instances), creating a reliable Julia server with predictable performance looks, at best, like 6 months of intensive work.
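
For reference, the instantiate-on-executor flow from the third bullet could look roughly like this. It is only a sketch: `ensure_project` is a made-up helper name, and shipping Project.toml and Manifest.toml into the executor's directory is assumed to happen elsewhere.

```julia
# Sketch only: assumes Project.toml and Manifest.toml are already in `dir`.
using Pkg

function ensure_project(dir::AbstractString)
    Pkg.activate(dir)
    # Downloads and precompiles the exact versions pinned in Manifest.toml;
    # expensive on the first call, close to a no-op on subsequent ones.
    Pkg.instantiate()
end
```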

All in all, Apache Spark looks like a terrible environment for running such workflows, and even making a custom solution based e.g. on pure Kubernetes seems way easier (see some of my musings on this topic). There's also a number of specially designed frameworks for particular tasks (e.g. training huge ML models) which we could wrap.

Option 2. Julia only

If we drop the requirement to support custom libraries, we neither need to pack the environment, nor install the libraries during initialization. However, the Julia startup time is still pretty high (Julia 1.6 takes ~1.3 seconds on my machine), so launching it for every batch still looks like a huge overhead.

Also, without custom libraries it's unclear how much value we bring. Yeah, we will be able to use simple transformations like adding two fields in a dataframe or computing their sine, but definitely not things like parsing a custom string format or running MCMC.

Option 3. Julia-to-Java compiler

In case we really only need simple transformations, instead of creating Julia processes on the worker nodes we can actually compile Julia functions to equivalent Java UDFs and use them directly. A huge advantage of this approach is that we get 100% performance and all the flexibility of the native Java/Scala implementation. The main disadvantage, of course, is that we lose all the value of Julia. Spark.jl then becomes a thin and convenient interface to modern Spark, but not more than that.

Technically, creating a simple Julia-to-Java compiler doesn't seem too complicated. I already experimented with generating Java classes at runtime here, and generating Java code from a Julia function can be achieved using a tracer (e.g. via Ghost.jl) or by directly analyzing the IR code (e.g. via IRTools.jl).
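
To make the tracer route a bit more concrete, here is a sketch of what the front end could look like with Ghost.jl's `trace` API; the actual translation of the tape to Java is the part that would have to be written.

```julia
# Sketch of the tracing front end, assuming Ghost.jl's `trace` function.
using Ghost: trace

my_udf(x, y) = x * y + sin(x)

val, tape = trace(my_udf, 1.0, 2.0)
# `tape` is a linearized record of the primitive calls (*, sin, +) with
# their arguments; a Julia-to-Java compiler would walk these operations
# and emit an equivalent Java expression for each one.
println(tape)
```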

@aviks (Collaborator) commented Dec 6, 2021

I think option 1 is the only reasonable way forward. 2, as you say, makes this somewhat pointless, and 3 is a research project that will not be feasible for many years to come.

I think leaning into what Julia already provides is the best option going forward. This basically means using the Manifest.toml as the mechanism to transfer and activate an environment. From what I can see, a manifest with a package server provides the same functionality as venv-pack: basically, you get the same libraries/dependencies on your remote host as on your local one. Am I missing something?

This does not solve the latency problem, but for batch/"production" jobs, using a startup script to instantiate/precompile is very reasonable.

For interactive jobs, the precompilation latency remains, but that's not too different from using the REPL, is it?

I suppose we'll need some way for the user to say "use this manifest to run my job". Maybe model it on how PySpark specifies environments?
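
For example (purely hypothetical, nothing like this exists in Spark.jl yet), the user-facing side could mirror PySpark's environment settings, where a packed environment is shipped via spark.archives / --archives, with a config key pointing at a project directory:

```julia
# Hypothetical sketch: the "spark.julia.project" key and this exact builder
# API are made up for illustration only.
using Spark

builder = SparkSession.builder
builder = builder.appName("julia-udf-job")
# Directory containing Project.toml and Manifest.toml to replicate on executors:
builder = builder.config("spark.julia.project", "/path/to/MyProject")
spark = builder.getOrCreate()
```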

@dfdx (Owner) commented Dec 6, 2021

> This does not solve the latency problem, but for batch/"production" jobs, using a startup script to instantiate/precompile is very reasonable.

The hard part is how to do it inside Spark executors. Spark doesn't provide any guarantees about the executor environment; we only know that the process will run inside some directory, but this directory may be on a local disk, in a YARN container, a Kubernetes pod, etc. We also don't know how long this directory will live and whether it's safe to store anything (e.g. downloaded packages) outside of it.

Now consider several options:

  1. Start a new Julia process and call Pkg.instantiate() on every chunk of data (effectively, on every call to this function). If the project is being instantiated for the first time, the dependencies will be downloaded and precompiled. Otherwise, the call to Pkg.instantiate() will only do the check. However, just starting Julia and doing this check takes 4s on my laptop, which is significantly longer than the processing time of many small tasks.
  2. Instantiate the project on all executors as the first task and don't run Pkg.instantiate() afterwards. Unfortunately, this breaks the very core assumption of Spark that any task can be safely recalculated. As a concrete issue, imagine that you lose a particular executor, which is typical e.g. for AWS spot instances: Spark will assume that the instantiation task was successful and will not repeat it on the new executor.
  3. Start a local Julia server that accepts connections from the JVM executor and instantiates the project at server start (a reduced sketch follows below). I actually tried this approach, but it turned out to be hard as hell to maintain potentially multiple Julia servers on the same machine and to set up reliable communication with the JVM.
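
For completeness, the core of option 3 could be reduced to something like the sketch below; all the genuinely hard parts (multiple servers per machine, executor lifecycle, a real wire protocol to the JVM) are exactly what it leaves out. `process_batch` is a made-up placeholder.

```julia
# Reduced sketch of a long-lived Julia server: instantiate once at startup,
# then serve requests from the JVM executor over a local socket.
using Pkg, Sockets

Pkg.activate(@__DIR__)
Pkg.instantiate()                       # the expensive part, paid once

process_batch(req::String) = uppercase(req)   # placeholder for real UDF logic

server = listen(ip"127.0.0.1", 9999)
while true
    sock = accept(server)
    @async while !eof(sock)
        req = readline(sock)            # toy protocol: one request per line
        println(sock, process_batch(req))
    end
end
```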

Maybe we can speed up (1) by writing a flag file to indicate that the project is already instantiated on this executor (see the sketch below), but all in all the absence of any guarantees about executor lifetime makes me really sad. We should also be very careful with DEPOT_PATH so as not to pollute anything outside of the executor directory, because we don't have a dedicated cleanup step. In this context, simply running Julia on Kubernetes (e.g. via K8sClusterManager.jl) looks like a much more robust foundation for distributed computing in production. So I tend to postpone this issue until I see a really good use case for custom Julia code with libraries on a Spark cluster.
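
The flag-file variant mentioned above could look like this sketch (the `.instantiated` marker and the helper name are made up; the depot path juggling is exactly the part that needs care):

```julia
# Sketch: skip the ~4s Pkg.instantiate() check once this executor is set up.
using Pkg

function ensure_project_fast(dir::AbstractString)
    # Keep the package depot inside the executor directory so nothing leaks
    # outside of it (there is no dedicated cleanup step).
    depot = joinpath(dir, ".julia_depot")
    depot in DEPOT_PATH || pushfirst!(DEPOT_PATH, depot)
    Pkg.activate(dir)
    flag = joinpath(dir, ".instantiated")
    if !isfile(flag)
        Pkg.instantiate()   # download + precompile, only on the first call
        touch(flag)
    end
end
```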

@exyi (Contributor, Author) commented Sep 10, 2022

I'd still like to see Julia UDFs, but it's not happening in this form, so I'll close this PR.

@exyi closed this Sep 10, 2022