parallel tutorial and install docs
dfm committed Oct 19, 2017
1 parent 7476b47 commit 750a82d
Showing 6 changed files with 582 additions and 79 deletions.
2 changes: 1 addition & 1 deletion docs/Makefile
@@ -18,7 +18,7 @@ I18NSPHINXOPTS = $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .

default: dirhtml
TUTORIALS = tutorials/quickstart.rst tutorials/autocorr.rst tutorials/monitor.rst \
tutorials/line.rst
tutorials/line.rst tutorials/parallel.rst

tutorials/%.rst: _static/notebooks/%.ipynb tutorials/tutorial_rst.tpl
jupyter nbconvert --template tutorials/tutorial_rst --to rst $< --output-dir tutorials
240 changes: 221 additions & 19 deletions docs/_static/notebooks/parallel.ipynb
@@ -71,34 +71,40 @@
"import numpy as np\n",
"\n",
"def log_prob(theta):\n",
" time.sleep(np.random.uniform(0.005, 0.008))\n",
" t = time.time() + np.random.uniform(0.005, 0.008)\n",
" while True:\n",
" if time.time() >= t:\n",
" break\n",
" return -0.5*np.sum(theta**2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This "
"This probability function will randomly sleep for a fraction of a second every time it is called.\n",
"This is meant to emulate a more realistic situation where the model is computationally expensive to compute.\n",
"\n",
"To start, let's sample the usual (serial) way:"
]
},
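The serial-sampling cell that produces the timing below is collapsed in this diff. A minimal sketch of what it presumably contains, with the walker count, dimensionality, and initial positions as assumptions (only the 100-step progress bar is visible in the output):

```python
import time

import emcee
import numpy as np

# Assumed setup; the exact values are hidden in the collapsed context.
np.random.seed(42)
initial = np.random.randn(32, 5)  # 32 walkers, 5 dimensions (assumed)
nwalkers, ndim = initial.shape
nsteps = 100  # matches the "100/100" progress bars in the outputs

sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prob)
start = time.time()
sampler.run_mcmc(initial, nsteps, progress=True)
end = time.time()
serial_time = end - start
print("Serial took {0:.1f} seconds".format(serial_time))
```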
{
"cell_type": "code",
"execution_count": 16,
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"100%|██████████| 100/100 [00:24<00:00, 4.14it/s]"
"100%|██████████| 100/100 [00:21<00:00, 4.71it/s]"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Serial took 24.8 seconds\n"
"Serial took 21.2 seconds\n"
]
},
{
@@ -130,27 +136,34 @@
"## Multiprocessing\n",
"\n",
"The simplest method of parallelizing emcee is to use the [multiprocessing module from the standard library](https://docs.python.org/3/library/multiprocessing.html).\n",
"To parallelize the above sampling, "
"To parallelize the above sampling, you could update the code as follows:"
]
},
{
"cell_type": "code",
"execution_count": 17,
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"100%|██████████| 100/100 [00:06<00:00, 15.71it/s]\n"
"100%|██████████| 100/100 [00:06<00:00, 16.45it/s]"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Multiprocessing took 6.4 seconds\n",
"3.9 times faster than serial\n"
"Multiprocessing took 6.2 seconds\n",
"3.4 times faster than serial\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\n"
]
}
],
@@ -167,6 +180,13 @@
" print(\"{0:.1f} times faster than serial\".format(serial_time / multi_time))"
]
},
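The body of the multiprocessing cell is collapsed above; reconstructed from the surviving `print` lines and the analogous cells later in this diff, it presumably reads:

```python
import time
from multiprocessing import Pool

import emcee

# Sketch based on the visible print statements; treat the details as
# assumptions. nwalkers, ndim, initial, nsteps, log_prob, and
# serial_time come from the earlier cells.
with Pool() as pool:
    sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prob, pool=pool)
    start = time.time()
    sampler.run_mcmc(initial, nsteps, progress=True)
    end = time.time()
    multi_time = end - start
    print("Multiprocessing took {0:.1f} seconds".format(multi_time))
    print("{0:.1f} times faster than serial".format(serial_time / multi_time))
```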
{
"cell_type": "markdown",
"metadata": {},
"source": [
"I have 4 cores on the machine where this is being tested:"
]
},
{
"cell_type": "code",
"execution_count": 6,
@@ -186,6 +206,13 @@
"print(\"{0} CPUs\".format(ncpu))"
]
},
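The first line of that cell is collapsed; given the visible `print`, it presumably counts CPUs with the standard-library call:

```python
from multiprocessing import cpu_count

ncpu = cpu_count()  # number of CPUs visible to multiprocessing
print("{0} CPUs".format(ncpu))
```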
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We don't quite get the factor of 4 runtime decrease that you might expect because there is some overhead in the parallelization, but we're getting pretty close with this example and this will get even closer for more expensive models."
]
},
{
"cell_type": "markdown",
"metadata": {},
@@ -194,14 +221,25 @@
"\n",
"Multiprocessing can only be used for distributing calculations across processors on one machine.\n",
"If you want to take advantage of a bigger cluster, you'll need to use MPI.\n",
"In that case, you need to execute the code using the `mpiexec` executable, so "
"In that case, you need to execute the code using the `mpiexec` executable, so this demo is slightly more convoluted.\n",
"For this example, we'll write the code to a file called `script.py` and then execute it using MPI, but when you really use the MPI pool, you'll probably just want to edit the script directly.\n",
"To run this example, you'll first need to install [the schwimmbad library](https://github.com/adrn/schwimmbad) because emcee no longer includes its own `MPIPool`."
]
},
{
"cell_type": "code",
"execution_count": 21,
"execution_count": 7,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"MPI took 9.0 seconds\n",
"2.4 times faster than serial\n"
]
}
],
"source": [
"with open(\"script.py\", \"w\") as f:\n",
" f.write(\"\"\"\n",
@@ -212,7 +250,10 @@
"from schwimmbad import MPIPool\n",
"\n",
"def log_prob(theta):\n",
" time.sleep(np.random.uniform(0.005, 0.008))\n",
" t = time.time() + np.random.uniform(0.005, 0.008)\n",
" while True:\n",
" if time.time() >= t:\n",
" break\n",
" return -0.5*np.sum(theta**2)\n",
"\n",
"with MPIPool() as pool:\n",
@@ -238,23 +279,184 @@
"print(\"{0:.1f} times faster than serial\".format(serial_time / mpi_time))"
]
},
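The middle of that cell (collapsed above) writes the sampling code to `script.py` and then times a run under MPI. The exact launch line is hidden; a hedged sketch, assuming one MPI process per CPU and that `mpiexec` is on the `PATH`:

```python
import subprocess
import time

# Assumed launch step; the real cell may use a notebook "!" shell
# escape instead of subprocess, and mpiexec flags vary by MPI vendor.
start = time.time()
subprocess.check_call(["mpiexec", "-n", str(ncpu), "python", "script.py"])
end = time.time()
mpi_time = end - start
```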
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There is often more overhead introduced by MPI than multiprocessing so we get less of a gain this time.\n",
"That being said, MPI is much more flexible and it can be used to scale to huge systems."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Pickling, data transfer & arguments\n",
"\n",
"All parallel Python implementations work by spinning up multiple `python` processes with identical environments then and passing information between the processes using `pickle`.\n",
"This means that the probability function [must be picklable](https://docs.python.org/3/library/pickle.html#pickle-picklable).\n",
"\n",
"Some users might hit issues when they use `args` to pass data to their model.\n",
"These args must be pickled and passed every time the model is called.\n",
"This can be a problem if you have a large dataset, as you can see here:"
]
},
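Because picklability is the requirement, it is easy to check a probability function before handing it to a pool. A quick sanity check (not part of the tutorial, just an illustration):

```python
import pickle

# Module-level functions pickle by reference and are fine.
pickle.dumps(log_prob)

# Lambdas and closures are not picklable and will fail in a pool.
try:
    pickle.dumps(lambda theta: -0.5 * sum(theta ** 2))
except Exception as err:
    print("not picklable:", err)
```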
{
"cell_type": "code",
"execution_count": 22,
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"100%|██████████| 100/100 [00:21<00:00, 4.77it/s]"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"MPI took 9.9 seconds\n",
"2.5 times faster than serial\n"
"Serial took 21.2 seconds\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\n"
]
}
],
"source": [
"print(\"MPI took {0:.1f} seconds\".format(mpi_time))\n",
"print(\"{0:.1f} times faster than serial\".format(serial_time / mpi_time))"
"def log_prob_data(theta, data):\n",
" a = data[0] # Use the data somehow...\n",
" t = time.time() + np.random.uniform(0.005, 0.008)\n",
" while True:\n",
" if time.time() >= t:\n",
" break\n",
" return -0.5*np.sum(theta**2)\n",
"\n",
"data = np.random.randn(5000, 200)\n",
"\n",
"sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prob_data, args=(data,))\n",
"start = time.time()\n",
"sampler.run_mcmc(initial, nsteps, progress=True)\n",
"end = time.time()\n",
"serial_data_time = end - start\n",
"print(\"Serial took {0:.1f} seconds\".format(serial_data_time))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We basically get no change in performance when we include the `data` argument here.\n",
"Now let's try including this naively using multiprocessing:"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"100%|██████████| 100/100 [02:18<00:00, 1.40s/it]"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Multiprocessing took 138.7 seconds\n",
"0.2 times faster(?) than serial\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\n"
]
}
],
"source": [
"with Pool() as pool:\n",
" sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prob_data, pool=pool, args=(data,))\n",
" start = time.time()\n",
" sampler.run_mcmc(initial, nsteps, progress=True)\n",
" end = time.time()\n",
" multi_data_time = end - start\n",
" print(\"Multiprocessing took {0:.1f} seconds\".format(multi_data_time))\n",
" print(\"{0:.1f} times faster(?) than serial\".format(serial_data_time / multi_data_time))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Brutal.\n",
"\n",
"We can do better than that though.\n",
"It's a bit ugly, but if we just make `data` a global variable and use that variable within the model calculation, then we take no hit at all."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"100%|██████████| 100/100 [00:06<00:00, 16.22it/s]"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Multiprocessing took 6.2 seconds\n",
"3.4 times faster than serial\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\n"
]
}
],
"source": [
"def log_prob_data_global(theta):\n",
" a = data[0] # Use the data somehow...\n",
" t = time.time() + np.random.uniform(0.005, 0.008)\n",
" while True:\n",
" if time.time() >= t:\n",
" break\n",
" return -0.5*np.sum(theta**2)\n",
"\n",
"with Pool() as pool:\n",
" sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prob_data_global, pool=pool)\n",
" start = time.time()\n",
" sampler.run_mcmc(initial, nsteps, progress=True)\n",
" end = time.time()\n",
" multi_data_global_time = end - start\n",
" print(\"Multiprocessing took {0:.1f} seconds\".format(multi_data_global_time))\n",
" print(\"{0:.1f} times faster than serial\".format(serial_data_time / multi_data_global_time))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"That's better!\n",
"This works because, in the global variable case, the dataset is only pickled and passed between processes once (when the pool is created) instead of once for every model evaluation."
]
},
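If a bare module-level global feels too implicit, a standard multiprocessing alternative (not used in this commit) is a pool initializer, which also ships the dataset to each worker exactly once:

```python
from multiprocessing import Pool

import emcee
import numpy as np

_worker_data = None

def init_worker(data):
    # Runs once in each worker process when the pool starts.
    global _worker_data
    _worker_data = data

def log_prob_data_init(theta):
    a = _worker_data[0]  # Use the data somehow...
    return -0.5 * np.sum(theta ** 2)

# data, nwalkers, ndim, initial, and nsteps come from the cells above.
with Pool(initializer=init_worker, initargs=(data,)) as pool:
    sampler = emcee.EnsembleSampler(nwalkers, ndim, log_prob_data_init, pool=pool)
    sampler.run_mcmc(initial, nsteps, progress=True)
```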
{
1 change: 1 addition & 0 deletions docs/index.rst
@@ -80,6 +80,7 @@ If you run into any issues, please don't hesitate to `open an issue on GitHub

tutorials/quickstart
tutorials/line
tutorials/parallel
tutorials/autocorr
tutorials/monitor

