Skip to content

Commit

Permalink
..
Browse files Browse the repository at this point in the history
  • Loading branch information
vikulin committed Dec 20, 2023
1 parent 673dc9b commit 7c85939
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 109 deletions.
5 changes: 3 additions & 2 deletions src/core/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ type links struct {
core *Core
tcp *linkTCP // TCP interface support
tls *linkTLS // TLS interface support
mptcp *linkMPTCP // QUIC interface support
unix *linkUNIX // UNIX interface support
socks *linkSOCKS // SOCKS interface support
quic *linkQUIC // QUIC interface support
mptcp *linkMPTCP // QUIC interface support

// _links can only be modified safely from within the links actor
_links map[linkInfo]*link // *link is nil if connection in progress
}
Expand Down Expand Up @@ -93,10 +94,10 @@ func (l *links) init(c *Core) error {
l.core = c
l.tcp = l.newLinkTCP()
l.tls = l.newLinkTLS(l.tcp)
l.mptcp = l.newLinkMPTCP(l.tcp)
l.unix = l.newLinkUNIX()
l.socks = l.newLinkSOCKS()
l.quic = l.newLinkQUIC()
l.mptcp = l.newLinkMPTCP()
l._links = make(map[linkInfo]*link)

var listeners []ListenAddress
Expand Down
124 changes: 17 additions & 107 deletions src/core/link_mptcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,66 +5,35 @@ import (
"fmt"
"net"
"net/url"
"strconv"
"time"

"github.com/Arceliar/phony"
)

type linkMPTCP struct {
phony.Inbox
*links
listenconfig *net.ListenConfig
_listeners map[*Listener]context.CancelFunc
tcp *linkTCP
listener *net.ListenConfig
_listeners map[*Listener]context.CancelFunc
}

func (l *links) newLinkMPTCP() *linkMPTCP {
func (l *links) newLinkMPTCP(tcp *linkTCP) *linkMPTCP {
lt := &linkMPTCP{
links: l,
listenconfig: &net.ListenConfig{
tcp: tcp,
listener: &net.ListenConfig{
Control: tcp.tcpContext,
KeepAlive: -1,
},
_listeners: map[*Listener]context.CancelFunc{},
}
lt.listenconfig.Control = lt.tcp.tcpContext
lt.listenconfig.SetMultipathTCP(true)
lt.listener.Control = lt.tcp.tcpContext
lt.listener.SetMultipathTCP(true)
return lt
}

func (l *linkMPTCP) dialersFor(url *url.URL, info linkInfo) ([]*tcpDialer, error) {
host, p, err := net.SplitHostPort(url.Host)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(p)
if err != nil {
return nil, err
}
ips, err := net.LookupIP(host)
if err != nil {
return nil, err
}
dialers := make([]*tcpDialer, 0, len(ips))
for _, ip := range ips {
addr := &net.TCPAddr{
IP: ip,
Port: port,
}
dialer, err := l.dialerFor(addr, info.sintf)
if err != nil {
continue
}
dialers = append(dialers, &tcpDialer{
info: info,
dialer: dialer,
addr: addr,
})
}
return dialers, nil
}

func (l *linkMPTCP) dial(ctx context.Context, url *url.URL, info linkInfo, options linkOptions) (net.Conn, error) {
dialers, err := l.dialersFor(url, info)
dialers, err := l.tcp.dialersFor(url, info)
if err != nil {
return nil, err
}
Expand All @@ -73,6 +42,12 @@ func (l *linkMPTCP) dial(ctx context.Context, url *url.URL, info linkInfo, optio
}
for _, d := range dialers {
var conn net.Conn
d.dialer.SetMultipathTCP(true)
if d.dialer.MultipathTCP() {
l.core.log.Infof("Enabled MPTCP")
} else {
l.core.log.Infof("Enabled TCP")
}
conn, err = d.dialer.DialContext(ctx, "tcp", d.addr.String())
if err != nil {
l.core.log.Warnf("Failed to connect to %s: %s", d.addr, err)
Expand All @@ -90,70 +65,5 @@ func (l *linkMPTCP) listen(ctx context.Context, url *url.URL, sintf string) (net
hostport = fmt.Sprintf("[%s%%%s]:%s", host, sintf, port)
}
}
return l.listenconfig.Listen(ctx, "tcp", hostport)
}

func (l *linkMPTCP) dialerFor(dst *net.TCPAddr, sintf string) (*net.Dialer, error) {
if dst.IP.IsLinkLocalUnicast() {
if sintf != "" {
dst.Zone = sintf
}
if dst.Zone == "" {
return nil, fmt.Errorf("link-local address requires a zone")
}
}
dialer := &net.Dialer{
Timeout: time.Second * 5,
KeepAlive: -1,
Control: l.tcp.tcpContext,
}
dialer.SetMultipathTCP(true)
if dialer.MultipathTCP() {
l.core.log.Infof("Enabled MPTCP")
} else {
l.core.log.Warnf("MultipathTCP is not on after having been forced to on. TCP will be used.")
}
if sintf != "" {
dialer.Control = l.tcp.getControl(sintf)
ief, err := net.InterfaceByName(sintf)
if err != nil {
return nil, fmt.Errorf("interface %q not found", sintf)
}
if ief.Flags&net.FlagUp == 0 {
return nil, fmt.Errorf("interface %q is not up", sintf)
}
addrs, err := ief.Addrs()
if err != nil {
return nil, fmt.Errorf("interface %q addresses not available: %w", sintf, err)
}
for addrindex, addr := range addrs {
src, _, err := net.ParseCIDR(addr.String())
if err != nil {
continue
}
if !src.IsGlobalUnicast() && !src.IsLinkLocalUnicast() {
continue
}
bothglobal := src.IsGlobalUnicast() == dst.IP.IsGlobalUnicast()
bothlinklocal := src.IsLinkLocalUnicast() == dst.IP.IsLinkLocalUnicast()
if !bothglobal && !bothlinklocal {
continue
}
if (src.To4() != nil) != (dst.IP.To4() != nil) {
continue
}
if bothglobal || bothlinklocal || addrindex == len(addrs)-1 {
dialer.LocalAddr = &net.TCPAddr{
IP: src,
Port: 0,
Zone: sintf,
}
break
}
}
if dialer.LocalAddr == nil {
return nil, fmt.Errorf("no suitable source address found on interface %q", sintf)
}
}
return dialer, nil
return l.listener.Listen(ctx, "tcp", hostport)
}

0 comments on commit 7c85939

Please sign in to comment.