Python API hangs when run twice in distributed setting #1789

Closed
SfinxCZ opened this Issue Oct 26, 2018 · 4 comments

@SfinxCZ
Contributor

SfinxCZ commented Oct 26, 2018

I have a problem with the Python API in a distributed setting: when I run training twice, the second run hangs. I've attached a Python test and a Docker image to reproduce the behavior.

Environment info

Operating System: Docker container based on the python:3.6-slim image (running on macOS)

CPU/GPU model: CPU

C++/Python/R version: Python version

Example

tests/python_package_test/test_distributed_sklearn.py

from multiprocessing import Pipe, Process
from unittest import TestCase
import random

from sklearn.datasets import make_blobs
import numpy as np
import numpy.testing as npt


def _fit_local(q):
    # Import inside the worker so that lightgbm (and its OpenMP runtime)
    # is loaded in the child process.
    import lightgbm
    X, y, weight, params = q.recv()  # receive the data shard and params
    classifier = lightgbm.LGBMClassifier(**params)
    classifier.fit(X, y, sample_weight=weight)
    classifier.booster_.free_network()  # tear down the distributed network
    q.send(classifier)  # ship the fitted model back to the parent
    q.close()


def fit_model(X, y, weight=None, time_out=120):
    parent_conn_a, child_conn_a = Pipe()
    parent_conn_b, child_conn_b = Pipe()
    process_a = Process(target=_fit_local, args=(child_conn_a,))
    process_b = Process(target=_fit_local, args=(child_conn_b,))
    # Randomly split the data into two shards, one per worker.
    idx_a = np.random.rand(X.shape[0]) > 0.5
    idx_b = ~idx_a
    X_a, y_a = X[idx_a, :], y[idx_a]
    X_b, y_b = X[idx_b, :], y[idx_b]
    if weight is not None:  # truth-testing a numpy array would raise ValueError
        w_a, w_b = weight[idx_a], weight[idx_b]
    else:
        w_a, w_b = None, None
    params_a, params_b = _build_params(time_out)
    process_a.start()
    process_b.start()
    parent_conn_a.send((X_a, y_a, w_a, params_a))
    parent_conn_b.send((X_b, y_b, w_b, params_b))
    model_a = parent_conn_a.recv()
    model_b = parent_conn_b.recv()
    process_a.join(timeout=120)
    process_b.join(timeout=120)

    # Non-distributed baseline, kept for comparison:
    # import lightgbm
    # model = lightgbm.LGBMClassifier()
    # model.fit(X, y, sample_weight=weight)
    return model_a, model_b


def _build_params(time_out):
    # Pick a random base port so consecutive runs don't collide on sockets
    # still lingering from a previous run.
    start_port = random.randint(0, 10_000) + 12400
    machines = f"127.0.0.1:{start_port},127.0.0.1:{start_port+1}"
    params_a = {
        "machines": machines,
        "local_listen_port": start_port,
        "time_out": time_out,
        "num_machines": 2,
        "verbose": 100,
        "tree_learner": "data"
    }
    params_b = {
        "machines": machines,
        "local_listen_port": start_port + 1,
        "time_out": time_out,
        "num_machines": 2,
        "verbose": 100,
        "tree_learner": "data"
    }
    return params_a, params_b


def assert_array_almost_equal(x, y, eps=1e-8):
    npt.assert_equal(x.shape, y.shape)
    # At most a fraction eps of the elements may differ.
    assert np.sum(x != y) / np.prod(x.shape) <= eps


class TestDistributedSklearn(TestCase):

    def test_run_twice(self):
        X_1, y_1 = make_blobs(n_samples=100, centers=2)
        X_2, y_2 = make_blobs(n_samples=100, centers=2)

        model_a, model_b = fit_model(X_1, y_1, time_out=20)
        self.assertIsNotNone(model_a)
        self.assertIsNotNone(model_b)
        del model_a
        del model_b

        model_a, model_b = fit_model(X_2, y_2, time_out=20)
        self.assertIsNotNone(model_a)
        self.assertIsNotNone(model_b)
        del model_a
        del model_b

Dockerfile:

FROM python:3.6-slim

RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        cmake \
        libc6-dev \
        make \
        gcc \
        g++ \
        libssl-dev \
        automake \
        libtool \
        net-tools \
        && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

ADD . /app

RUN mkdir /app/build

WORKDIR /app/build

WORKDIR /app/python-package
# Build and install the LightGBM wheel (this also compiles the C++ core).
RUN python setup.py bdist_wheel && pip install dist/*
RUN python -m pip install dask distributed pytest pytest-xdist

WORKDIR /app/tests/python_package_test

ENTRYPOINT [ "/bin/bash", "-c", "pytest ${*}", "--" ]

Steps to reproduce

  1. docker build -t lightgbm-test .
  2. docker run -it lightgbm-test /app/tests/python_package_test/test_distributed_sklearn.py::TestDistributedSklearn::test_run_twice [-ss]
@guolinke


Member

guolinke commented Oct 27, 2018

You can try num_thread=1.
It is a known problem with multiprocessing.
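A minimal sketch of that workaround, applied to the parameter dicts built by _build_params in the test above (num_threads is the canonical LightGBM spelling; num_thread is an accepted alias):

params_a, params_b = _build_params(time_out=20)
for params in (params_a, params_b):
    # Single-threaded training sidesteps the OpenMP thread pool that a
    # fork()ed child inherits in an unusable state.
    params["num_threads"] = 1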

@Laurae2


Collaborator

Laurae2 commented Oct 27, 2018

I think this is a known issue specific to OpenMP and fork?

xgboost, for instance, cannot be run in a forked process (it hangs). The user must spawn fresh processes instead of forking: a forked child only clones the parent's memory and inherits its OpenMP thread-pool state, but the pool's worker threads are not copied across fork, so the first OpenMP region in the child can deadlock.

@SfinxCZ can you run your script by spawning new processes (this requires a memory copy of the objects plus loading the libraries in each process) instead of forking?
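As a rough sketch of that suggestion, using only the standard library (the trivial _worker function is hypothetical, standing in for the training call):

import multiprocessing as mp

def _worker(conn):
    # A spawned child starts a fresh interpreter, so OpenMP-based libraries
    # such as lightgbm initialize their thread pools cleanly in the child.
    import lightgbm
    conn.send("ok")
    conn.close()

if __name__ == "__main__":
    ctx = mp.get_context("spawn")  # instead of the "fork" default on Linux
    parent_conn, child_conn = ctx.Pipe()
    proc = ctx.Process(target=_worker, args=(child_conn,))
    proc.start()
    print(parent_conn.recv())
    proc.join()

Note that under spawn the target function must be importable at module top level, its arguments must be picklable, and the parent's entry point needs the if __name__ == "__main__" guard.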

@guolinke


Member

guolinke commented Oct 27, 2018

@Laurae2 yeah, that is the root cause.
Maybe we should add some documentation to address this problem.

@SfinxCZ


Contributor

SfinxCZ commented Oct 28, 2018

@guolinke Thanks for the hints; multiprocessing.get_context("spawn") fixed the problem.
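For reference, the same fix applied to fit_model from the test above looks roughly like this (a sketch; only the Pipe/Process construction changes):

from multiprocessing import get_context

ctx = get_context("spawn")  # children no longer inherit forked OpenMP state
parent_conn_a, child_conn_a = ctx.Pipe()
parent_conn_b, child_conn_b = ctx.Pipe()
process_a = ctx.Process(target=_fit_local, args=(child_conn_a,))
process_b = ctx.Process(target=_fit_local, args=(child_conn_b,))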

@SfinxCZ SfinxCZ closed this Oct 28, 2018

Laurae2 added a commit that referenced this issue Oct 28, 2018

guolinke added a commit that referenced this issue Oct 29, 2018
