Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resilient example #75

Merged
merged 5 commits into from
Jun 13, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 204 additions & 0 deletions resilience.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Resilience against hardware failures\n",
"\n",
"Scenario: We have a cluster that partially consists of preemptible ressources. That is, we'll have to deal with workers suddenly being shut down during computation. While demonstrated here with a `LocalCluster`, Dask's resilience against preempted ressources is most useful with, e.g., [Dask Kubernetes](https://kubernetes.dask.org/) or [Dask Jobqueue](https://jobqueue.dask.org).\n",
"\n",
"Relevant docs: <http://distributed.dask.org/en/latest/resilience.html#hardware-failures>"
willirath marked this conversation as resolved.
Show resolved Hide resolved
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Increase resilience\n",
"\n",
"Whenever a worker shuts down, the scheduler will increment the suspiciousness counter of _all_ tasks that were assigned (not necessarily computing) to the worker in question. Whenever the suspiciousness of a task exceeds a certain threshold (3 by default), the task will be considered broken. We want to compute many tasks on only a few workers with workers shutting down randomly. So we expect the suspiciousness of all tasks to grow rapidly. Let's increase the threshold:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"\n",
"dask.config.set({'distributed.scheduler.allowed-failures': 100});"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## All other imports"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client, LocalCluster\n",
"from dask import bag as db\n",
"import os\n",
"import random\n",
"from time import sleep"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## A cluster"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster = LocalCluster(threads_per_worker=1, n_workers=4, memory_limit=400e6)\n",
"client = Client(cluster)\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## A simple workload\n",
"\n",
"We'll multiply a range of numbers by two, add some sleep to simulate some real work, and then reduce the whole sequence of doubled numbers by summing them."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def multiply_by_two(x):\n",
" sleep(0.02)\n",
" return 2 * x"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"N = 400\n",
"\n",
"x = db.from_sequence(range(N), npartitions=N // 2)\n",
"\n",
"mults = x.map(multiply_by_two)\n",
"\n",
"summed = mults.sum()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Suddenly shutting down workers"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's mark two worker process id's as non-preemptible."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"all_current_workers = [w.pid for w in cluster.scheduler.workers.values()]\n",
"non_preemptible_workers = all_current_workers[:2]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def kill_a_worker():\n",
" preemptible_workers = [\n",
" w.pid for w in cluster.scheduler.workers.values()\n",
" if w.pid not in non_preemptible_workers]\n",
" if preemptible_workers:\n",
" os.kill(random.choice(preemptible_workers), 15)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Start the computation and keep shutting down workers while it's running"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"summed = client.compute(summed)\n",
"\n",
"while not summed.done():\n",
" kill_a_worker()\n",
" sleep(3.0)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Check if results match"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(f\"`sum(range({N}))` on cluster: {summed.result()}\\t(should be {N * (N-1)})\")"
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}