Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core.DialUDP: how to close Read the udp.dispatcherConn #1611

Closed
rurirei opened this issue Feb 6, 2023 · 27 comments
Closed

core.DialUDP: how to close Read the udp.dispatcherConn #1611

rurirei opened this issue Feb 6, 2023 · 27 comments

Comments

@rurirei
Copy link
Contributor

rurirei commented Feb 6, 2023

as per: v2fly/v2ray-core#2258

a udp.dispatcherConn returned by core.DialUDP won't close it's c.Read(), blocks after the c.cache be received out.

func (c *dispatcherConn) callback(ctx context.Context, packet *udp.Packet) {
select {
case <-c.done.Wait():
packet.Payload.Release()
return
case c.cache <- packet:
default:
packet.Payload.Release()
return
}
}
func (c *dispatcherConn) ReadFrom(p []byte) (int, net.Addr, error) {
select {
case <-c.done.Wait():
return 0, nil, io.EOF
case packet := <-c.cache:
n := copy(p, packet.Payload.Bytes())
return n, &net.UDPAddr{
IP: packet.Source.Address.IP(),
Port: int(packet.Source.Port),
}, nil
}
}

possible fix: call handleInput() directly like this

func (c *dispatchConn) Read(p []byte) (int, error) {
	mb, err := c.link.Reader.ReadMultiBuffer()
	if err != nil {
		return 0, err
	}

	return mb.CopyTo(p), nil
}

or a unreliable timeout simply:

func (c *dispatcherConn) ReadFrom(p []byte) (int, net.Addr, error) {
	select {
	case <-c.done.Wait():
		return 0, nil, io.EOF
	case packet := <-c.cache:
		n := copy(p, packet.Payload.Bytes())
		return n, &net.UDPAddr{
			IP:   packet.Source.Address.IP(),
			Port: int(packet.Source.Port),
		}, nil
	}
	case <-time.After(time.Second *4):
		return 0, nil, io.EOF
	}
}
@RPRX
Copy link
Member

RPRX commented Feb 6, 2023

所以为什么需要提前返回呢?Read 阻塞是正常行为

@rurirei
Copy link
Contributor Author

rurirei commented Feb 6, 2023

though c.Close() closes the Read, i won't know the time when i should to close this conn.

所以为什么需要提前返回呢?Read 阻塞是正常行为

@rurirei
Copy link
Contributor Author

rurirei commented Feb 6, 2023

as this blocks always except i call remoteConn.Close(), but where is the time point?

var localConn net.Conn

var remoteConn, err := core.DialUDP(ctx, dest)

io.Copy(localConn, remoteConn)

@RPRX
Copy link
Member

RPRX commented Feb 6, 2023

though c.Close() closes the Read, i won't know the time when i should to close this conn.

所以为什么需要提前返回呢?Read 阻塞是正常行为

在 Xray 实现的代理协议中,超时不活跃的会被自动 Close(取决于你设置的 policy),这是在外部控制的
虽然 *ray 的代码写得跟迷宫一样,自带混淆,不细看的话很难说这个 Close 有没有被调用
同理,你可以自己在外部实现一个超时不活跃调用 Close

@rurirei
Copy link
Contributor Author

rurirei commented Feb 6, 2023

though c.Close() closes the Read, i won't know the time when i should to close this conn.

所以为什么需要提前返回呢?Read 阻塞是正常行为

在 Xray 实现的代理协议中,超时不活跃的会被自动 Close(取决于你设置的 policy),这是在外部控制的 虽然 *ray 的代码写得跟迷宫一样,自带混淆,不细看的话很难说这个 Close 有没有被调用 同理,你可以自己在外部实现一个超时不活跃调用 Close

that's why there is a issue. either v2ray or xray do not call c.Close(), as core.Dial and core.DialUDP is for external api. but those won't provide any io.EOF mechanism to close its Read method.

@RPRX
Copy link
Member

RPRX commented Feb 6, 2023

// DialUDP provides a way to exchange UDP packets through Xray instance to remote servers.
// Since it is under a proxy context, the LocalAddr() in returned PacketConn will not show the real address.
//
// TODO: SetDeadline() / SetReadDeadline() / SetWriteDeadline() are not implemented.
//
// xray:api:beta
func DialUDP(ctx context.Context, v *Instance) (net.PacketConn, error) {

...

@rurirei
Copy link
Contributor Author

rurirei commented Feb 6, 2023

同理,你可以自己在外部实现一个超时不活跃调用 Close

though a timeout is not reliable enough i thought.

@rurirei
Copy link
Contributor Author

rurirei commented Feb 6, 2023

to call handleInput() directly, rather than using c.cache channel to receive the packet is more immidiate and could give me a error, maybe? tested ok in my situation.

func (c *dispatchConn) Read(p []byte) (int, error) {
	mb, err := c.link.Reader.ReadMultiBuffer()
	if err != nil {
		return 0, err
	}

	return mb.Copy(p), nil
}

@RPRX
Copy link
Member

RPRX commented Feb 6, 2023

看起来这个 dispatcherConn 只有 API 用到了对吧

@rurirei
Copy link
Contributor Author

rurirei commented Feb 6, 2023

看起来这个 dispatcherConn 只有 API 用到了对吧

yes.

@RPRX
Copy link
Member

RPRX commented Feb 6, 2023

type Dispatcher struct {
	sync.RWMutex
	conns      map[net.Destination]*connEntry
	dispatcher routing.Dispatcher
	callback   ResponseCallback
	callClose  ...
}

给它加一个 callClose 然后在 handleInput 里 defer 调用吧,你来还是我来,我正准备发个小版本

@RPRX
Copy link
Member

RPRX commented Feb 6, 2023

这个问题本质上是因为,在代理协议中它是被控方,不用考虑 Close 别人,但作为 API 和代理协议的桥梁,它没有传导 Close 信息

所以最合适的做法是加个回调

@rurirei
Copy link
Contributor Author

rurirei commented Feb 6, 2023

type Dispatcher struct {
	sync.RWMutex
	conns      map[net.Destination]*connEntry
	dispatcher routing.Dispatcher
	callback   ResponseCallback
	callClose  ...
}

给它加一个 callClose 然后在 handleInput 里 defer 调用吧,你来还是我来,我正准备发个小版本

thanks for reply.

that won't solve, as c.cache is channel to be received out, the packets waited in c.cache would be received out or not on the time when callClose callback is called, says there is no garantee for c.cache is read out or not even then c.callClose done.

func handleInput(ctx context.Context, conn *connEntry, dest net.Destination, callback ResponseCallback) {
defer conn.cancel()
input := conn.link.Reader
timer := conn.timer
for {
select {
case <-ctx.Done():
return
default:
}
mb, err := input.ReadMultiBuffer()
if err != nil {
if !errors.Is(err, io.EOF) {
newError("failed to handle UDP input").Base(err).WriteToLog(session.ExportIDToError(ctx))
}
return
}
timer.Update()
for _, b := range mb {
callback(ctx, &udp.Packet{
Payload: b,
Source: dest,
})
}
}
}

@rurirei
Copy link
Contributor Author

rurirei commented Feb 6, 2023

the key point is that we won't know the time when c.cache would be read out.

@RPRX
Copy link
Member

RPRX commented Feb 6, 2023

XTLS 对这种问题的做法是 runtime.Gosched(),但那是时间紧急的特殊情况
实际环境中,一般都是没包后过段时间它才会返回包括 io.EOF 在内的 bug,至少足以让另一个协程处理完了,很难丢包,否则 *ray 到处都是这个问题
当然,加 100 毫秒的延迟也是一个可行的解决方案

@RPRX
Copy link
Member

RPRX commented Feb 6, 2023

或者可以判断一下,chan 里还有数据的话就处理,直到没有再返回 io.EOF

@RPRX
Copy link
Member

RPRX commented Feb 6, 2023

func (c *dispatcherConn) ReadFrom(p []byte) (int, net.Addr, error) {
	var packet *udp.Packet
	select {
	case <-c.done.Wait():
		if len(c.cache) != 0 {
			packet = <-c.cache
			break
		}
		return 0, nil, io.EOF
	case packet = <-c.cache:
	}
	return copy(p, packet.Payload.Bytes()), &net.UDPAddr{
		IP:   packet.Source.Address.IP(),
		Port: int(packet.Source.Port),
	}, nil
}

比如这样,我这边全改好了,你看没问题的话我就 commit 了

@rurirei

This comment was marked as resolved.

@rurirei
Copy link
Contributor Author

rurirei commented Feb 6, 2023

BTW is there any disadvantage of len(c.cache)?

@RPRX

This comment was marked as resolved.

@rurirei

This comment was marked as resolved.

@RPRX

This comment was marked as resolved.

@rurirei
Copy link
Contributor Author

rurirei commented Feb 6, 2023

func (c *dispatcherConn) ReadFrom(p []byte) (int, net.Addr, error) {
	var packet *udp.Packet
	select {
	case <-c.done.Wait():
		if len(c.cache) != 0 {
			packet = <-c.cache
			break
		}
		return 0, nil, io.EOF
	case packet = <-c.cache:
	}
	return copy(p, packet.Payload.Bytes()), &net.UDPAddr{
		IP:   packet.Source.Address.IP(),
		Port: int(packet.Source.Port),
	}, nil
}

比如这样,我这边全改好了,你看没问题的话我就 commit 了

sorry and that's ok.

@rurirei rurirei closed this as completed Feb 6, 2023
@RPRX
Copy link
Member

RPRX commented Feb 6, 2023

OK

BTW is there any disadvantage of len(c.cache)?

Golang 对 channel 的内置方法,应该是并发安全的,不过反正我们这里没有并发

@RPRX
Copy link
Member

RPRX commented Feb 6, 2023

写法改成了再 select 一次,我觉得这样更好,不过应该不会有人并发调用 Read 吧

麻烦你本地测一下 e0b0201

@rurirei
Copy link
Contributor Author

rurirei commented Feb 6, 2023

写法改成了再 select 一次,我觉得这样更好,不过应该不会有人并发调用 Read 吧

麻烦你本地测一下 e0b0201

thanks for reply, tested ok.

aware whether it's ok to add callClose() for all udp with NewDispatcher. (to change the ResponseCallbackFunc is not recommanded i thought).

@RPRX
Copy link
Member

RPRX commented Feb 6, 2023

aware whether it's ok to add callClose() for all udp with NewDispatcher. (to change the ResponseCallbackFunc is not recommanded i thought).

对,目前如果主动 Close 了 dispatcherConn,并不会传导到代理协议侧,由于要改的代码比较多,PR is welcomed

#1542 这个有点相似

RPRX added a commit that referenced this issue Feb 8, 2023
mwhorse46 added a commit to mwhorse46/Xray-core that referenced this issue Feb 19, 2023
rampagekiller0725 added a commit to rampagekiller0725/wox that referenced this issue Jun 29, 2023
Autumn216 added a commit to Autumn216/wox that referenced this issue Oct 31, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants