Unify train and predict pools to fix GPU OOM
This fixes GPU out-of-memory problems that happened when we had two
different pools (one for predict and one for train). When we ran train
and then predict sequentially (or vice versa), each pool wanted the
whole GPU, so out-of-memory errors happened. This won't fix
out-of-memory errors when running parallel tasks on the GPU (errors
which also happened before).

CPU deployments shouldn't be affected.

Fixes #87
Sem-Ver: bugfix
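
Purely for illustration (none of this code is in the commit; gpu_job is a hypothetical stand-in for a model that allocates GPU memory): with two executors, a sequential train-then-predict run ends up with two places each holding the device, while a single shared pool keeps one worker that both calls reuse.

    import concurrent.futures


    def gpu_job(name):
        # a real model would allocate GPU memory here (load weights, create a context)
        return name + " done"


    if __name__ == "__main__":
        # Before: one executor per task type -> two places that each want the whole GPU.
        predict_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        train_executor = concurrent.futures.ProcessPoolExecutor(max_workers=1)
        print(train_executor.submit(gpu_job, "train").result())
        print(predict_executor.submit(gpu_job, "predict").result())
        predict_executor.shutdown()
        train_executor.shutdown()

        # After: a single shared pool -> one GPU context reused by sequential jobs.
        shared_executor = concurrent.futures.ProcessPoolExecutor(max_workers=1)
        print(shared_executor.submit(gpu_job, "train").result())
        print(shared_executor.submit(gpu_job, "predict").result())
        shared_executor.shutdown()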
ignacio authored and alvarolopez committed Mar 16, 2020
1 parent 4dfa03d commit 55a9e6d
Showing 5 changed files with 67 additions and 47 deletions.
5 changes: 4 additions & 1 deletion deepaas/api/v2/predict.py
@@ -72,7 +72,10 @@ async def post(self, request, wsk_args=None):
task = self.model_obj.predict(**args)
await task

ret = task.result()
ret = task.result()['output']

if isinstance(ret, model.v2.wrapper.ReturnedFile):
ret = open(ret.filename, 'rb')

accept = args.get("accept", "application/json")
if accept != "application/json":
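Background for the change above (illustrative only, not part of this diff): predictions now run in a separate worker process, and anything the model returns must be pickled to cross back to the API process; an open file object cannot be, so the wrapper hands back a ReturnedFile that only carries the path and the handler reopens it locally. The constraint in miniature, standard library only:

    import pickle
    from collections import namedtuple

    ReturnedFile = namedtuple("ReturnedFile",
                              ("name", "filename", "content_type", "original_filename"))
    ReturnedFile.__new__.__defaults__ = (None, None, None, None)

    with open(__file__, "rb") as fobj:           # any open binary file handle
        try:
            pickle.dumps(fobj)                   # what crossing the process boundary requires
        except TypeError as exc:
            print("open files cannot be pickled:", exc)

        pickle.dumps(ReturnedFile(filename=fobj.name))   # a path-only namedtuple pickles fine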
14 changes: 4 additions & 10 deletions deepaas/config.py
@@ -42,19 +42,13 @@
"/debug" endpoint. Default is to not provide this information. This will not
provide logging information about the API itself.
"""),
cfg.IntOpt('predict-workers',
cfg.IntOpt('workers',
short='p',
default=1,
help="""
Specify the number of workers to spawn for prediction tasks. If using a CPU you
probably want to increase this number, if using a GPU probably you want to
leave it to 1. (defaults to 1)
"""),
cfg.IntOpt('train-workers',
default=1,
help="""
Specify the number of workers to spawn for training tasks. Unless you know what
you are doing you should leave this number to 1. (defaults to 1)
Specify the number of workers to spawn. If using a CPU you probably want to
increase this number, if using a GPU probably you want to leave it to 1.
(defaults to 1)
"""),
cfg.IntOpt('client-max-size',
default=0,
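Deployment-side note (a sketch, not part of this diff): with oslo.config the merged option would normally be set in the service configuration file; the section name below is an assumption ([DEFAULT]), so check the deployed deepaas.conf for the actual group.

    [DEFAULT]
    # With a GPU, keep this at 1 so only one worker process owns the device;
    # CPU-only deployments can raise it to use more cores.
    workers = 1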
88 changes: 53 additions & 35 deletions deepaas/model/v2/wrapper.py
@@ -16,10 +16,10 @@

import asyncio
import collections
import concurrent.futures
import contextlib
import datetime
import functools
import io
import multiprocessing
import multiprocessing.pool
import os
@@ -50,7 +50,7 @@
.. py:attribute:: filename
Complete file path to the temporary file in the filesyste,
Complete file path to the temporary file in the filesystem,
.. py:attribute:: content_type
@@ -61,8 +61,33 @@
Filename of the original file being uploaded.
"""

ReturnedFile = collections.namedtuple("ReturnedFile", ("name",
"filename",
"content_type",
"original_filename"))
"""Class to pass the files returned from predict in a pickable way
.. py:attribute:: name
Name of the argument where this file is being sent.
.. py:attribute:: filename
Complete file path to the temporary file in the filesystem,
.. py:attribute:: content_type
Content-type of the uploaded file
.. py:attribute:: original_filename
Filename of the original file being uploaded.
"""


# set defaults to None, mainly for compatibility (vkoz)
UploadedFile.__new__.__defaults__ = (None, None, None, None)
ReturnedFile.__new__.__defaults__ = (None, None, None, None)


class ModelWrapper(object):
@@ -75,7 +75,100 @@ class ModelWrapper(object):
:param name: Model name
:param model: Model object
:raises HTTPInternalServerError: in case that a model has defined
a reponse schema that is nod JSON schema valid (DRAFT 4)
a response schema that is not JSON schema valid (DRAFT 4)
"""
def __init__(self, name, model_obj, app):
self.name = name
@@ -84,11 +109,8 @@ def __init__(self, name, model_obj, app):

self._loop = asyncio.get_event_loop()

self._predict_workers = CONF.predict_workers
self._predict_executor = self._init_predict_executor()

self._train_workers = CONF.train_workers
self._train_executor = self._init_train_executor()
self._workers = CONF.workers
self._executor = self._init_executor()

self._setup_cleanup()

@@ -125,16 +147,10 @@ def _setup_cleanup(self):
self._app.on_cleanup.append(self._close_executors)

async def _close_executors(self, app):
self._train_executor.shutdown()
self._predict_executor.shutdown()

def _init_predict_executor(self):
n = self._predict_workers
executor = concurrent.futures.ThreadPoolExecutor(max_workers=n)
return executor
self._executor.shutdown()

def _init_train_executor(self):
n = self._train_workers
def _init_executor(self):
n = self._workers
executor = CancellablePool(max_workers=n)
return executor

@@ -168,7 +184,7 @@ def validate_response(self, response):
If the wrapped model has defined a ``response`` attribute we will
validate the response that
:param response: The reponse that will be validated.
:param response: The response that will be validated.
:raises exceptions.InternalServerError: in case the reponse cannot be
validated.
"""
@@ -213,18 +229,10 @@ def get_metadata(self):
}
return d

def _run_in_predict_pool(self, func, *args, **kwargs):
async def task(fn):
return await self._loop.run_in_executor(self._predict_executor, fn)

return self._loop.create_task(
task(functools.partial(func, *args, **kwargs))
)

def _run_in_train_pool(self, func, *args, **kwargs):
def _run_in_pool(self, func, *args, **kwargs):
fn = functools.partial(func, *args, **kwargs)
ret = self._loop.create_task(
self._train_executor.apply(fn)
self._executor.apply(fn)
)
return ret

@@ -243,17 +251,27 @@ async def warm(self):
LOG.debug("Cannot warm (initialize) model '%s'" % self.name)
return

run = self._loop.run_in_executor
executor = self._predict_executor
n = self._predict_workers
try:
n = self._workers
LOG.debug("Warming '%s' model with %s workers" % (self.name, n))
fs = [run(executor, func) for i in range(0, n)]
fs = [self._run_in_pool(func) for _ in range(0, n)]
await asyncio.gather(*fs)
LOG.debug("Model '%s' has been warmed" % self.name)
except NotImplementedError:
LOG.debug("Cannot warm (initialize) model '%s'" % self.name)

@staticmethod
def predict_wrap(predict_func, *args, **kwargs):
"""Wrapper function to allow returning files from predict
This wrapper exists because buffer objects are not pickable,
thus cannot be returned from the executor.
"""
ret = predict_func(*args, **kwargs)
if isinstance(ret, io.BufferedReader):
ret = ReturnedFile(filename=ret.name)

return ret

def predict(self, *args, **kwargs):
"""Perform a prediction on wrapped model's ``predict`` method.
@@ -280,8 +298,8 @@ def predict(self, *args, **kwargs):
# FIXME(aloga); cleanup of tmpfile here

with self._catch_error():
return self._run_in_predict_pool(
self.model_obj.predict, *args, **kwargs
return self._run_in_pool(
self.predict_wrap, self.model_obj.predict, *args, **kwargs
)

def train(self, *args, **kwargs):
@@ -296,7 +314,7 @@ def train(self, *args, **kwargs):
"""

with self._catch_error():
return self._run_in_train_pool(
return self._run_in_pool(
self.model_obj.train, *args, **kwargs
)

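The single executor created above is a CancellablePool, defined elsewhere in wrapper.py and not shown in these hunks. As a reading aid only (explicitly not the project's implementation), a minimal sketch of that kind of executor, assuming the same shape of API: an awaitable apply() and a shutdown(); cancelling the awaiting task terminates the worker process, which is what makes a running training job abortable.

    import asyncio
    import multiprocessing


    class TinyCancellablePool:
        """Toy cancellable pool; fn passed to apply() must be picklable."""

        def __init__(self, max_workers=1):
            # one single-process pool per slot so a job can be killed in isolation
            self._free = [multiprocessing.Pool(1) for _ in range(max_workers)]
            self._slots = asyncio.Semaphore(max_workers)

        async def apply(self, fn):
            await self._slots.acquire()                 # wait for a free worker
            pool = self._free.pop()
            loop = asyncio.get_running_loop()
            fut = loop.create_future()
            pool.apply_async(
                fn,
                callback=lambda r: loop.call_soon_threadsafe(fut.set_result, r),
                error_callback=lambda e: loop.call_soon_threadsafe(fut.set_exception, e),
            )
            try:
                return await fut
            except asyncio.CancelledError:
                pool.terminate()                        # hard-stop the running job
                pool = multiprocessing.Pool(1)          # replace the killed worker
                raise
            finally:
                self._free.append(pool)
                self._slots.release()

        def shutdown(self):
            for pool in self._free:
                pool.terminate()

A caller would then schedule work with something like task = loop.create_task(pool.apply(functools.partial(model.train, **kwargs))) and abort it with task.cancel(); the names mirror the wrapper above, but the snippet is only a sketch.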
2 changes: 1 addition & 1 deletion deepaas/tests/test_v2_models.py
@@ -162,7 +162,7 @@ async def test_dummy_model_with_wrapper(self, m_clean):
w = v2_wrapper.ModelWrapper("foo", v2_test.TestModel(), self.app)
task = w.predict()
await task
ret = task.result()
ret = task.result()['output']
self.assertDictEqual(
{'date': '2019-01-1',
'labels': [{'label': 'foo', 'probability': 1.0}]},
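The regression scenario itself can be driven the same way this test drives the wrapper; a hedged sketch (that TestModel.train accepts no arguments is an assumption), showing train followed by predict flowing through what is now one pool:

    w = v2_wrapper.ModelWrapper("foo", v2_test.TestModel(), self.app)
    await w.train()                       # scheduled on the unified pool
    task = w.predict()
    await task
    ret = task.result()['output']         # same pool: no second copy of the model on the GPU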
5 changes: 5 additions & 0 deletions releasenotes/notes/unify-train-predict-261e92c21d9f47d1.yaml
@@ -0,0 +1,5 @@
---
fixes:
- |
Fix [#87](https://github.com/indigo-dc/DEEPaaS/issues/87) out-of-memory
errors due to the usage of two different executor pools.
