
500 Error on MacOS #1122

Closed
MikhailKravets opened this issue Apr 30, 2023 · 4 comments · Fixed by #1169

Comments

@MikhailKravets

I'm running MLServer on macOS (arm). I was following the Serving XGBoost models example. I receive a 500 error when requesting the model at http://localhost:8080/v2/models/mushroom-xgboost/versions/v0.1.0/infer.

MLServer throws the following error:

...
  File "/opt/homebrew/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/queues.py", line 126, in qsize
    return self._maxsize - self._sem._semlock._get_value()
NotImplementedError

The developers of queues.py left a comment noting that this is broken on macOS 😬

    def qsize(self):
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

Is it possible to get around this when running models with MLServer?
Thanks!
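For context, the failure is trivially reproducible with a bare multiprocessing.Queue, and easy to guard against. A minimal sketch (safe_qsize is a hypothetical helper, not part of MLServer):

```python
import multiprocessing


def safe_qsize(queue):
    """Return queue.qsize(), or None on platforms (e.g. macOS) where
    sem_getvalue() is unimplemented and qsize() raises."""
    try:
        return queue.qsize()
    except NotImplementedError:
        return None


if __name__ == "__main__":
    q = multiprocessing.Queue()
    q.put("x")
    print(safe_qsize(q))  # 1 on Linux, None on macOS
```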

@MikhailKravets
Author

I may suggest a solution based on vterron/lemon that uses a synchronized counter.

In mlserver/parallel/worker.py:

# A few imports added / changed (sys is needed for the platform check below)
import sys
from multiprocessing import Process, Value, get_context
from multiprocessing.queues import Queue


# New classes
class SharedCounter:
    """A process-safe counter backed by a multiprocessing.Value."""

    def __init__(self, n=0):
        self.count = Value('i', n)

    def increment(self, n=1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value


class SharedCounterQueue(Queue):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs, ctx=get_context())
        self._size = SharedCounter(0)

    def put(self, *args, **kwargs):
        self._size.increment(1)
        super().put(*args, **kwargs)

    def get(self, *args, **kwargs):
        # _size may be missing after the queue is pickled into a child
        # process, since Queue.__getstate__ doesn't include it
        if hasattr(self, "_size"):
            self._size.increment(-1)
        return super().get(*args, **kwargs)

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self._size.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()


# The actual update in Worker constructor
class Worker(Process):
    def __init__(
        self, settings: Settings, responses: Queue, env: Optional[Environment] = None
    ):
        super().__init__()
        self._settings = settings
        self._responses = responses

        if sys.platform == "darwin":
            self._requests = SharedCounterQueue()
            self._model_updates = SharedCounterQueue()
        else:
            # multiprocessing.queues.Queue requires an explicit ctx argument
            self._requests: Queue[ModelRequestMessage] = Queue(ctx=get_context())
            self._model_updates: Queue[ModelUpdateMessage] = Queue(ctx=get_context())
        self._env = env

        self.__executor = None

I'm sure this code is far from production-ready, but it allows us to test MLServer running models on macOS.
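The SharedCounter above relies on the lock of a multiprocessing.Value to keep increments atomic across processes. A standalone sanity check of that idea (bump is a hypothetical helper written for this demo):

```python
from multiprocessing import Process, Value


class SharedCounter:
    """A process-safe counter backed by a multiprocessing.Value."""

    def __init__(self, n=0):
        self.count = Value('i', n)

    def increment(self, n=1):
        with self.count.get_lock():  # serialize the read-modify-write
            self.count.value += n

    @property
    def value(self):
        return self.count.value


def bump(counter, times):
    for _ in range(times):
        counter.increment()


if __name__ == "__main__":
    counter = SharedCounter()
    workers = [Process(target=bump, args=(counter, 1000)) for _ in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    print(counter.value)  # 4000: no increments lost
```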

@JustasUrbonas

Same issue here. Running mlserver inside a Docker container works.

@adriangonz
Contributor

Hey @MikhailKravets @JustasUrbonas ,

Thanks for raising this one.

From this SO thread, I think the best solution may be to just use the Queue returned by the Manager() object. That one seems to have a qsize() implementation that works on macOS. The vterron/lemon implementation is (unfortunately) GPL, so we can't use it.
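For reference, the Manager-based alternative would look like this: Manager() starts a separate server process hosting a plain queue.Queue and hands back a proxy, whose qsize() works on every platform (at the cost of an extra process and proxied calls):

```python
import multiprocessing

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    q = manager.Queue()  # proxy to a queue.Queue in the manager process
    q.put("task")
    print(q.qsize())  # 1, even on macOS
```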

@MikhailKravets
Author

MikhailKravets commented May 2, 2023

Hi @adriangonz,

Unfortunately, this approach will not work. multiprocessing.Manager() creates a SyncManager, so when you run

    manager = multiprocessing.Manager()
    queue = manager.Queue()

the queue is a proxy object to a real Queue. The proxy doesn't expose all the attributes of the original Queue, which leads to further errors; for example, the coro_run method on line 137 of worker.py uses the _reader Connection, which isn't present in the proxy.
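The attribute mismatch is easy to verify: the proxy exposes only the public queue.Queue methods, not internals like the _reader pipe end that worker.py relies on. A quick check:

```python
import multiprocessing

if __name__ == "__main__":
    real = multiprocessing.Queue()
    proxy = multiprocessing.Manager().Queue()

    print(hasattr(real, "_reader"))   # True: the real Queue reads from a pipe
    print(hasattr(proxy, "_reader"))  # False: the proxy lacks the internals
```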

Anyway, I noticed that the qsize() method of the Queue seems to be used only for monitoring. So I've slightly updated the _workers_queue_monitor method in mlserver/parallel/dispatcher.py:Dispatcher, line 113:

  def _workers_queue_monitor(self, worker: Worker, worker_pid: int):
      """Get metrics from every worker request queue"""

      if sys.platform == "darwin":
          logger.warning(
              "Worker metrics are not available on macOS: "
              "multiprocessing.Queue.qsize() raises NotImplementedError "
              "because of a broken sem_getvalue()"
          )
          return
      queue_size = worker._requests.qsize()

      self.parallel_request_queue_size.labels(workerpid=str(worker_pid)).observe(
          float(queue_size)
      )

I can't say what it breaks so far, but it makes the model endpoints work 🎉


3 participants