This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

packet contained unknown content: -1 #27

Closed
hengyuan-hu opened this issue Feb 10, 2022 · 3 comments

Comments

@hengyuan-hu

hengyuan-hu commented Feb 10, 2022

I am trying to run parallel worker processes that send dictionaries of tensors to an inference process, where batched neural network forwarding happens. I get the error E0209 20:15:45.299231 2754780 /tmp/pip-req-build-fpmouidj/src/tensorpipe/tensorpipe/core/listener_impl.cc:333] packet contained unknown content: -1. The program otherwise runs and terminates fine, and I believe the results are correct too. Here is minimal code to reproduce the error.

import asyncio
import multiprocessing as mp
import moolib
import torch


local_addr = "127.0.0.1:4412"

async def process(queue, callback):
    i = 0
    try:
        while True:
            # print("waiting for batch", i)
            i += 1
            return_callback, args, kwargs = await queue
            if args and kwargs:
                retval = callback(*args, **kwargs)
            elif args:
                retval = callback(*args)
            elif kwargs:
                retval = callback(**kwargs)
            else:
                retval = callback()
            return_callback(retval)
    except asyncio.CancelledError:
        print("process cancelled")
    except Exception as e:
        print(e)
        raise


linear_layer = torch.nn.Linear(1000, 1000)


def run_linear_host(x):
    a = linear_layer(x["a"])
    b = linear_layer(x["b"])
    return a + b


async def host_func(barrier):
    host = moolib.Rpc()
    host.set_name("host")
    host.listen(local_addr)
    queue = host.define_queue("linear", batch_size=1000, dynamic_batching=True)

    barrier.wait()
    print("host process passed the barrier")
    await process(queue, run_linear_host)


async def client_func(barrier, pid):
    client = moolib.Rpc()
    barrier.wait()
    client.connect(local_addr)
    print(f"process {pid} connected")

    ys = []
    for i in range(100):
        x = {
            "a": torch.rand(1000),
            "b": torch.rand(1000),
        }
        y = await client.async_("host", "linear", x)
        ys.append(y)
    print("done")


if __name__ == "__main__":
    num_thread = 10
    barrier = mp.Barrier(num_thread + 1)

    host_p = mp.Process(target=lambda: asyncio.run(host_func(barrier)))
    host_p.start()

    processes = []
    for i in range(num_thread):
        # Bind i via a default argument; a bare closure would capture the
        # loop variable late, so every client would see the final value.
        p = mp.Process(target=lambda i=i: asyncio.run(client_func(barrier, i)))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    host_p.terminate()
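As an editor's aside (not part of the original report): host_p.terminate() kills the host abruptly, which is exactly the kind of teardown that can leave connections half-open. A cooperative shutdown with a multiprocessing.Event lets the host exit its loop cleanly; this is a minimal standard-library sketch with a placeholder host_loop standing in for the real serving code:

```python
import multiprocessing as mp
import time


def host_loop(stop_event):
    # Placeholder serving loop: poll the event instead of relying
    # on an external terminate() call to end the process.
    while not stop_event.is_set():
        time.sleep(0.01)


if __name__ == "__main__":
    stop = mp.Event()
    host = mp.Process(target=host_loop, args=(stop,))
    host.start()

    # ... client work would run here ...

    stop.set()   # ask the host to leave its loop
    host.join()  # wait for a clean exit instead of terminate()
    print(host.exitcode)  # 0 on a clean exit
```

With terminate(), exitcode would instead be negative (killed by signal), and any in-flight connection state is discarded without cleanup.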

The error disappears with smaller values of num_thread (1, 2, or 3) and appears more frequently as num_thread grows.

I installed the latest main branch with pip install git+https://github.com/facebookresearch/moolib.

@tscmoo
Contributor

tscmoo commented Feb 10, 2022

AFAIK, this is caused by incoming connections being closed, or not properly established, before tensorpipe can finish its handshaking. To be honest, I'm not sure why this happens, but moolib maintains several connections, establishing and closing them as appropriate, and these errors seem to have no impact whatsoever.
I think these errors go away with #26
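To make the failure mode above concrete (an editor's toy illustration, not moolib or tensorpipe code): if a peer connects and closes before sending its handshake, the listener reads an empty header and can only log the malformed packet and drop the connection. A standard-library socket sketch:

```python
import socket
import threading


def accept_one(srv, result):
    # The listener expects a handshake header from each new peer.
    conn, _ = srv.accept()
    header = conn.recv(16)
    # An empty read means the peer closed before handshaking --
    # analogous to tensorpipe logging "packet contained unknown content".
    result["handshake_received"] = header != b""
    conn.close()


srv = socket.socket()
srv.bind(("127.0.0.1", 0))
srv.listen()
result = {}
t = threading.Thread(target=accept_one, args=(srv, result))
t.start()

# A client that connects and immediately closes, like an auxiliary
# connection torn down mid-setup.
cli = socket.create_connection(srv.getsockname())
cli.close()

t.join()
srv.close()
print(result["handshake_received"])  # False: no handshake ever arrived
```

The error is harmless in the same way: the payload was fine on other connections; only the aborted one produced the log line.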

@hengyuan-hu
Author

OK, so I should try again once that PR is merged, right?

@tscmoo
Contributor

tscmoo commented Feb 11, 2022

Yes, that would be helpful.
Let's leave this issue open until it's fixed.
