This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

mx.nd.Custom not working in subprocess #14396

Closed
YutingZhang opened this issue Mar 12, 2019 · 5 comments · Fixed by #14451
Comments

@YutingZhang
Contributor

YutingZhang commented Mar 12, 2019

mx.nd.Custom gets stuck when called in a subprocess.

The following code to reproduce the error is from wkcn/MobulaOP#40 (comment):

from concurrent import futures

import mxnet as mx
import sys

class AdditionOP(mx.operator.CustomOp):
    def __init__(self):
        super(AdditionOP, self).__init__()
    def forward(self, is_train, req, in_data, out_data, aux):
        out_data[0][:] = in_data[0] + in_data[1]
    def backward(self, req, out_grad, in_data, out_data, in_grad, aux):
        in_grad[0][:] = out_grad[0]
        in_grad[1][:] = out_grad[0]

@mx.operator.register("AdditionOP")
class AdditionOPProp(mx.operator.CustomOpProp):
    def __init__(self):
        super(AdditionOPProp, self).__init__()
    def list_arguments(self):
        return ['a', 'b']
    def list_outputs(self):
        return ['output']
    def infer_shape(self, in_shape):
        return in_shape, [in_shape[0]]
    def create_operator(self, ctx, shapes, dtypes):
        return AdditionOP()

def foo():
    a = mx.nd.array([1, 2, 3])
    b = mx.nd.array([4, 5, 6])

    a.attach_grad()
    b.attach_grad()

    print("REC")
    with mx.autograd.record():
        c = mx.nd.Custom(a, b, op_type='AdditionOP')

    dc = mx.nd.array([7, 8, 9])
    c.backward(dc)

    print('Okay :-)')
    print('a + b = c \n {} + {} = {}'.format(a.asnumpy(), b.asnumpy(), c.asnumpy()))

def main():
    ex = futures.ProcessPoolExecutor(1)
    r = ex.submit(foo)
    r.result()

if __name__ == '__main__':
    main()

asnumpy gets stuck due to mx.nd.Custom

@mxnet-label-bot
Contributor

Hey, this is the MXNet Label Bot.
Thank you for submitting the issue! I will try and suggest some labels so that the appropriate MXNet community members can help resolve it.
Here are my recommended labels: Bug

@wkcn wkcn added the Bug label Mar 12, 2019
@wkcn
Member

wkcn commented Mar 12, 2019

I found that the custom operator has been pushed to the engine with Engine::Get()->PushSync, but it is never executed (neither forward nor backward).

@zhreshold
Member

I reckon there's a limitation of the custom op: it requires a global lock when executing the Python custom op, which might cause a deadlock when combined with a subprocess.

@eric-haibin-lin raising this for expertise on the engine part.
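
For illustration only, here is a minimal, MXNet-independent Python sketch of the failure mode described above: a lock held by a worker thread at the moment of fork is copied into the child in its locked state, and since the worker thread does not exist in the child, nothing ever releases it. The lock and worker here are hypothetical stand-ins, not MXNet internals, and the example assumes a POSIX system because it calls os.fork.

import os
import threading
import time

lock = threading.Lock()

def worker():
    # Background thread grabs the lock and holds it for a while,
    # mimicking a helper thread that is busy when the process forks.
    with lock:
        time.sleep(5)

threading.Thread(target=worker, daemon=True).start()
time.sleep(0.5)  # make sure the worker already owns the lock

pid = os.fork()
if pid == 0:
    # Child process: the lock was copied in its "held" state, but the
    # worker thread was not copied, so nobody will ever release it.
    acquired = lock.acquire(timeout=2)
    print('child acquired lock:', acquired)  # False -> would deadlock without the timeout
    os._exit(0)
else:
    os.waitpid(pid, 0)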

@anirudh2290
Member

Yes, this is because of the deadlock in the subprocess. One way to fix this is to create start and stop functions in CustomOperator, which should be called from the pthread_atfork prepare and child handlers.
Using a thread pool to manage the CustomOperator threads would make the implementation cleaner. Does anyone want to try and create a PR for this? @wkcn @arcadiaphy @YutingZhang
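
As a rough Python analogue of that idea (the actual fix would live in the C++ CustomOperator, using pthread_atfork directly), the pattern looks like the sketch below. CustomOpWorkers and its start/stop methods are hypothetical stand-ins, and os.register_at_fork simply exposes equivalent prepare/parent/child hooks at the Python level.

import os

class CustomOpWorkers:
    """Hypothetical stand-in for CustomOperator's worker threads."""

    def __init__(self):
        self.threads = []

    def start(self):
        # Recreate the worker threads (and any queues/locks they use).
        # In the proposed fix this would be something like CustomOperator::Start().
        pass

    def stop(self):
        # Signal the workers to finish and join them, so that no thread
        # is holding internal state when the process forks.
        # In the proposed fix this would be something like CustomOperator::Stop().
        pass

workers = CustomOpWorkers()
workers.start()

# Quiesce the workers right before a fork and rebuild them afterwards,
# mirroring the pthread_atfork prepare/parent/child handlers.
os.register_at_fork(
    before=workers.stop,            # prepare handler
    after_in_parent=workers.start,  # parent handler
    after_in_child=workers.start,   # child handler
)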

@arcadiaphy
Member

arcadiaphy commented Mar 16, 2019

After #14363, the threads are created when the custom operator runs, so the custom operator also needs to be executed in the main process to reproduce the bug:

from concurrent import futures

import mxnet as mx
import sys

class AdditionOP(mx.operator.CustomOp):
    def __init__(self):
        super(AdditionOP, self).__init__()
    def forward(self, is_train, req, in_data, out_data, aux):
        out_data[0][:] = in_data[0] + in_data[1]
    def backward(self, req, out_grad, in_data, out_data, in_grad, aux):
        in_grad[0][:] = out_grad[0]
        in_grad[1][:] = out_grad[0]

@mx.operator.register("AdditionOP")
class AdditionOPProp(mx.operator.CustomOpProp):
    def __init__(self):
        super(AdditionOPProp, self).__init__()
    def list_arguments(self):
        return ['a', 'b']
    def list_outputs(self):
        return ['output']
    def infer_shape(self, in_shape):
        return in_shape, [in_shape[0]]
    def create_operator(self, ctx, shapes, dtypes):
        return AdditionOP()

def foo():
    a = mx.nd.array([1, 2, 3])
    b = mx.nd.array([4, 5, 6])

    a.attach_grad()
    b.attach_grad()

    print("REC")
    with mx.autograd.record():
        c = mx.nd.Custom(a, b, op_type='AdditionOP')

    dc = mx.nd.array([7, 8, 9])
    c.backward(dc)

    print('Okay :-)')
    print('a + b = c \n {} + {} = {}'.format(a.asnumpy(), b.asnumpy(), c.asnumpy()))

def main():
    foo()  # ensure custom threads created in main process
    ex = futures.ProcessPoolExecutor(1)
    r = ex.submit(foo)
    r.result()

if __name__ == '__main__':
    main()
