From d712e339f09c1f795478d7f49e338acbcb962cda Mon Sep 17 00:00:00 2001 From: Contextualist Date: Sat, 9 Sep 2023 14:15:15 -0700 Subject: [PATCH 1/7] refactor: pnet: clarify function HolePunching --- pkg/pnet/p2p.go | 67 +++++++++++++++++++++++++++++++------------------ 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/pkg/pnet/p2p.go b/pkg/pnet/p2p.go index ed4d126..a6f4f2d 100644 --- a/pkg/pnet/p2p.go +++ b/pkg/pnet/p2p.go @@ -81,34 +81,40 @@ func HolePunching(ctx context.Context, bridgeURL string, id string, isA bool, op } nplan := len(opts.Ports) - nA := 2 - nB := 2 + info, err := exchangeConnInfo(ctx, bridgeURL, id, opts.Ports[0], nplan, opts.UseIPv6) + if err != nil { + return nil, err + } + if conn, err = rendezvousWithTimeout(ctx, info.laddr, info.peerAddrs); err == nil { + return conn, nil + } + + // Try out the rest of nA x nB plans + var planp []int + var nA, nB int *tern(isA, &nA, &nB) = nplan - // Try out all nA x nB plans + *tern(isA, &nB, &nA) = info.peerNPlan for i := 0; i < nA; i++ { for j := 0; j < nB; j++ { - q := tern(isA, i, j) - info, err := exchangeConnInfo(ctx, bridgeURL, id, opts.Ports[q], nplan, opts.UseIPv6) - if err != nil { - return nil, err - } - *tern(isA, &nB, &nA) = info.peerNPlan - ctx1, cancel := context.WithTimeout(ctx, rendezvousTimeout) - defer cancel() - conn, err := rendezvous(ctx1, info) - if err != nil { - if errors.Is(ctx1.Err(), context.DeadlineExceeded) { - defaultLogger.Infof("rendezvous timeout for %+v", info) - continue - } - return nil, err - } + planp = append(planp, tern(isA, i, j)) + } + } + for q := range planp[1:] { + info, err := exchangeConnInfo(ctx, bridgeURL, id, q, nplan, opts.UseIPv6) + if err != nil { + return nil, err + } + if conn, err = rendezvousWithTimeout(ctx, info.laddr, info.peerAddrs); err == nil { return conn, nil } } return nil, errors.New("all rendezvous attempts failed") } +func ExchangeConnInfo(ctx context.Context, bridgeURL string, id string, port int, nplan int, useIPv6 bool) (*connInfo, error) { + return exchangeConnInfo(ctx, bridgeURL, id, port, nplan, useIPv6) +} + func exchangeConnInfo(ctx context.Context, bridgeURL string, id string, port int, nplan int, useIPv6 bool) (*connInfo, error) { client := NewHTTPClient(useIPv6, fmt.Sprintf(":%v", port)) sendReader, sendWriter := io.Pipe() @@ -196,10 +202,23 @@ func exchangeConnInfoProto(ctx context.Context, sender io.WriteCloser, chRecvOrE return &connInfo{sinfo.PriAddr, addrs, pinfo.PeerNPlan}, nil } -func rendezvous(ctx context.Context, info *connInfo) (conn net.Conn, err error) { - defaultLogger.Infof("rendezvous with %s", strings.Join(info.peerAddrs, " | ")) +func rendezvousWithTimeout(ctx context.Context, laddr string, peerAddrs []string) (conn net.Conn, err error) { + ctx1, cancel := context.WithTimeout(ctx, rendezvousTimeout) + defer cancel() + conn, err = rendezvous(ctx1, laddr, peerAddrs) + if err != nil { + if errors.Is(ctx1.Err(), context.DeadlineExceeded) { + defaultLogger.Infof("rendezvous timeout for %v -> %v", laddr, peerAddrs) + } + return nil, err + } + return conn, nil +} + +func rendezvous(ctx context.Context, laddr string, peerAddrs []string) (conn net.Conn, err error) { + defaultLogger.Infof("rendezvous with %s", strings.Join(peerAddrs, " | ")) chWin := make(chan net.Conn) - l, err := Listen(ctx, "tcp", info.laddr) + l, err := Listen(ctx, "tcp", laddr) if err != nil { return nil, fmt.Errorf("unable to set up rendezvous: %w", err) } @@ -207,8 +226,8 @@ func rendezvous(ctx context.Context, info *connInfo) (conn net.Conn, err error) cc := make(chan struct{}) defer close(cc) go accept(ctx, l, chWin, cc) - for _, peerAddr := range info.peerAddrs { - go connect(ctx, info.laddr, peerAddr, chWin, cc) + for _, peerAddr := range peerAddrs { + go connect(ctx, laddr, peerAddr, chWin, cc) } select { From 74612f3103963b7f603e911e56250e5b5b635d52 Mon Sep 17 00:00:00 2001 From: Contextualist Date: Sun, 10 Sep 2023 10:11:56 -0700 Subject: [PATCH 2/7] refactor: config: move from cmd/acp to pkg/config --- cmd/acp/main.go | 9 +++++---- {cmd/acp => pkg/config}/config.go | 10 +++++----- {cmd/acp => pkg/config}/config_test.go | 8 ++++---- 3 files changed, 14 insertions(+), 13 deletions(-) rename {cmd/acp => pkg/config}/config.go (95%) rename {cmd/acp => pkg/config}/config_test.go (89%) diff --git a/cmd/acp/main.go b/cmd/acp/main.go index 6fe32e6..2bd6353 100644 --- a/cmd/acp/main.go +++ b/cmd/acp/main.go @@ -10,6 +10,7 @@ import ( "os" tea "github.com/charmbracelet/bubbletea" + "github.com/contextualist/acp/pkg/config" "github.com/contextualist/acp/pkg/pnet" "github.com/contextualist/acp/pkg/tui" ) @@ -62,13 +63,13 @@ func main() { return } if *doSetup || *doSetupWith != "" { - checkErr(setup(*doSetupWith)) + checkErr(config.Setup(*doSetupWith)) return } filenames := flag.Args() - conf := mustGetConfig() - conf.applyDefault() + conf := config.MustGetConfig() + conf.ApplyDefault() ctx, userCancel := context.WithCancel(context.Background()) logger = tui.NewLoggerControl(*debug) @@ -77,7 +78,7 @@ func main() { tui.RunProgram(loggerModel, userCancel, *destination == "-") } -func transfer(ctx context.Context, conf *Config, filenames []string, loggerModel tea.Model) { +func transfer(ctx context.Context, conf *config.Config, filenames []string, loggerModel tea.Model) { defer logger.End() conn, err := pnet.HolePunching( diff --git a/cmd/acp/config.go b/pkg/config/config.go similarity index 95% rename from cmd/acp/config.go rename to pkg/config/config.go index 061695e..fd496f9 100644 --- a/cmd/acp/config.go +++ b/pkg/config/config.go @@ -1,4 +1,4 @@ -package main +package config import ( "crypto/rand" @@ -27,7 +27,7 @@ type Config struct { UPnP bool `json:"upnp,omitempty"` } -func (conf *Config) applyDefault() { +func (conf *Config) ApplyDefault() { if conf.Server == "" { conf.Server = "https://acp.deno.dev" } @@ -38,7 +38,7 @@ func (conf *Config) applyDefault() { var configFilename = filepath.Join(userConfigDir(), "acp", "config.json") -func setup(confStr string) (err error) { +func Setup(confStr string) (err error) { var conf *Config if confStr != "" { conf = &Config{} @@ -64,7 +64,7 @@ func setup(confStr string) (err error) { confBytes, _ := json.Marshal(&conf) confStr = string(confBytes) } - conf.applyDefault() + conf.ApplyDefault() fmt.Printf(`acp is set up on this machine. To set up another machine, run the following command there (DO NOT share the command publicly as it contains encryption keys) @@ -79,7 +79,7 @@ If you already have the executable, run return nil } -func mustGetConfig() *Config { +func MustGetConfig() *Config { conf, err := getConfig() if err != nil { if errors.Is(err, os.ErrNotExist) { diff --git a/cmd/acp/config_test.go b/pkg/config/config_test.go similarity index 89% rename from cmd/acp/config_test.go rename to pkg/config/config_test.go index 08542ec..a88adf4 100644 --- a/cmd/acp/config_test.go +++ b/pkg/config/config_test.go @@ -1,4 +1,4 @@ -package main +package config import ( "encoding/json" @@ -14,7 +14,7 @@ func TestSetup(t *testing.T) { if _, err := getConfig(); !errors.Is(err, os.ErrNotExist) { t.Fatalf("Config exists before creation; err: %v", err) } - if err := setup(""); err != nil { + if err := Setup(""); err != nil { t.Fatalf("Config initialization failed: %v", err) } conf, err := getConfig() @@ -35,7 +35,7 @@ func TestSetupWith(t *testing.T) { Ports: []int{0, 9527}, } conf0Bytes, _ := json.Marshal(&conf0) - if err := setup(string(conf0Bytes)); err != nil { + if err := Setup(string(conf0Bytes)); err != nil { t.Fatalf("Config override failed: %v", err) } conf, err := getConfig() @@ -46,7 +46,7 @@ func TestSetupWith(t *testing.T) { t.Fatalf("Config does not match the intented setup value: expect: %+v, got: %+v", conf0, conf) } - if err := setup(`{"ipv6":"true"}`); err == nil { + if err := Setup(`{"ipv6":"true"}`); err == nil { t.Fatalf("Setup failed to catch an invalid input") } } From 3ba994d593ca81a6545aaf37b2565b7a9a5fd565 Mon Sep 17 00:00:00 2001 From: Contextualist Date: Wed, 13 Sep 2023 22:09:15 -0700 Subject: [PATCH 3/7] refactor: stream: adapt HolePunching into interface --- cmd/acp/main.go | 73 ++++++++++++++++------------ cmd/acp/stream.go | 8 ---- pkg/pnet/p2p.go | 103 +++++++++------------------------------- pkg/pnet/p2p_test.go | 24 +++++----- pkg/pnet/portmapping.go | 2 +- pkg/stream/dialer.go | 57 ++++++++++++++++++++++ pkg/stream/layer.go | 13 +++++ pkg/stream/tcppunch.go | 102 +++++++++++++++++++++++++++++++++++++++ pkg/tui/status.go | 69 ++++++++++++++++----------- 9 files changed, 291 insertions(+), 160 deletions(-) create mode 100644 pkg/stream/dialer.go create mode 100644 pkg/stream/layer.go create mode 100644 pkg/stream/tcppunch.go diff --git a/cmd/acp/main.go b/cmd/acp/main.go index 2bd6353..f6e55b6 100644 --- a/cmd/acp/main.go +++ b/cmd/acp/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "encoding/base64" "errors" "flag" "fmt" @@ -12,6 +11,7 @@ import ( tea "github.com/charmbracelet/bubbletea" "github.com/contextualist/acp/pkg/config" "github.com/contextualist/acp/pkg/pnet" + "github.com/contextualist/acp/pkg/stream" "github.com/contextualist/acp/pkg/tui" ) @@ -79,47 +79,48 @@ func main() { } func transfer(ctx context.Context, conf *config.Config, filenames []string, loggerModel tea.Model) { + pnet.SetLogger(logger) + stream.SetLogger(logger) defer logger.End() - conn, err := pnet.HolePunching( - ctx, - conf.Server+"/v2/exchange", - conf.ID, - len(filenames) > 0, - pnet.HolePunchingOptions{ - UseIPv6: conf.UseIPv6, - Ports: conf.Ports, - UPnP: conf.UPnP, - }, - logger, - ) - if errors.Is(err, context.Canceled) || !checkErr(err) { - return - } - - psk, err := base64.StdEncoding.DecodeString(conf.PSK) + dialer, _ := stream.GetDialer("tcp_punch") + err := dialer.Init(*conf) if !checkErr(err) { return } - conn, err = encrypted(conn, psk) + + sinfo := pnet.SelfInfo{ChanName: conf.ID} + dialer.SetInfo(&sinfo) + info, err := pnet.ExchangeConnInfo( + ctx, + conf.Server+"/v2/exchange", + &sinfo, + conf.Ports[0], + conf.UseIPv6, + ) if !checkErr(err) { return } - stream, _ := conn.(io.ReadWriteCloser) - var status *tui.StatusControl - if !*debug { - status = tui.NewStatusControl() - stream = status.Monitor(stream) - logger.Next(tui.NewStatusModel(status)) - } - + var status interface{ Next(tea.Model) } if len(filenames) > 0 { + var s io.WriteCloser + s, err = dialer.IntoSender(ctx, *info) + if !checkErr(err) { + return + } + s, status = monitor(s) logger.Debugf("sending...") - err = sendFiles(filenames, stream) + err = sendFiles(filenames, s) } else { + var s io.ReadCloser + s, err = dialer.IntoReceiver(ctx, *info) + if !checkErr(err) { + return + } + s, status = monitor(s) logger.Debugf("receiving...") - err = receiveFiles(stream) + err = receiveFiles(s) } if !*debug { @@ -128,10 +129,22 @@ func transfer(ctx context.Context, conf *config.Config, filenames []string, logg checkErr(err) } +func monitor[T io.Closer](s T) (T, *tui.StatusControl[T]) { + var status *tui.StatusControl[T] + if !*debug { + status = tui.NewStatusControl[T]() + s = status.Monitor(s) + logger.Next(tui.NewStatusModel(status)) + } + return s, status +} + func checkErr(err error) bool { if err == nil { return true } - exitStatement = err.Error() + if !errors.Is(err, context.Canceled) { + exitStatement = err.Error() + } return false } diff --git a/cmd/acp/stream.go b/cmd/acp/stream.go index 98e647f..6f715b2 100644 --- a/cmd/acp/stream.go +++ b/cmd/acp/stream.go @@ -4,21 +4,13 @@ import ( "archive/tar" "fmt" "io" - "net" "os" "path/filepath" "strings" "github.com/klauspost/pgzip" - aead "github.com/shadowsocks/go-shadowsocks2/shadowaead" ) -func encrypted(conn net.Conn, psk []byte) (net.Conn, error) { - cipher, err := aead.Chacha20Poly1305(psk) - conn = aead.NewConn(conn, cipher) - return conn, err -} - func sendFiles(filenames []string, to io.WriteCloser) (err error) { defer to.Close() z := pgzip.NewWriter(to) diff --git a/pkg/pnet/p2p.go b/pkg/pnet/p2p.go index a6f4f2d..2bad333 100644 --- a/pkg/pnet/p2p.go +++ b/pkg/pnet/p2p.go @@ -19,16 +19,6 @@ type ( Infof(format string, a ...any) Debugf(format string, a ...any) } - - // Options for function HolePunching - HolePunchingOptions struct { - // Whether to use IPv6 instead of IPv4 for rendezvous - UseIPv6 bool - // Local port(s) to be used for rendezvous; default (nil) will be interpreted as {0} - Ports []int - // Whether to try requesting UPnP port mapping from the router - UPnP bool - } ) type ( @@ -39,23 +29,18 @@ type ( error } - connInfo struct { - laddr string - peerAddrs []string - peerNPlan int - } - - selfInfo struct { + SelfInfo struct { PriAddr string `json:"priAddr"` ChanName string `json:"chanName"` NPlan int `json:"nPlan"` } - addrPair struct { + AddrPair struct { PriAddr string `json:"priAddr"` PubAddr string `json:"pubAddr"` } - peerInfo struct { - PeerAddrs []addrPair `json:"peerAddrs"` + PeerInfo struct { + Laddr string + PeerAddrs []AddrPair `json:"peerAddrs"` PeerNPlan int `json:"peerNPlan"` } ) @@ -67,55 +52,13 @@ const ( var defaultLogger Logger -// HolePunching negotiates via a rendezvous server with a peer with the same id to establish a connection. -func HolePunching(ctx context.Context, bridgeURL string, id string, isA bool, opts HolePunchingOptions, l Logger) (conn net.Conn, err error) { +// SetLogger sets the internal logger for pnet +func SetLogger(l Logger) { defaultLogger = l - if len(opts.Ports) == 0 { - opts.Ports = []int{0} - } - if opts.UPnP { - err := addPortMapping(ctx, opts.Ports...) - if err != nil { - defaultLogger.Infof("failed to add port mapping: %v", err) - } - } - - nplan := len(opts.Ports) - info, err := exchangeConnInfo(ctx, bridgeURL, id, opts.Ports[0], nplan, opts.UseIPv6) - if err != nil { - return nil, err - } - if conn, err = rendezvousWithTimeout(ctx, info.laddr, info.peerAddrs); err == nil { - return conn, nil - } - - // Try out the rest of nA x nB plans - var planp []int - var nA, nB int - *tern(isA, &nA, &nB) = nplan - *tern(isA, &nB, &nA) = info.peerNPlan - for i := 0; i < nA; i++ { - for j := 0; j < nB; j++ { - planp = append(planp, tern(isA, i, j)) - } - } - for q := range planp[1:] { - info, err := exchangeConnInfo(ctx, bridgeURL, id, q, nplan, opts.UseIPv6) - if err != nil { - return nil, err - } - if conn, err = rendezvousWithTimeout(ctx, info.laddr, info.peerAddrs); err == nil { - return conn, nil - } - } - return nil, errors.New("all rendezvous attempts failed") } -func ExchangeConnInfo(ctx context.Context, bridgeURL string, id string, port int, nplan int, useIPv6 bool) (*connInfo, error) { - return exchangeConnInfo(ctx, bridgeURL, id, port, nplan, useIPv6) -} - -func exchangeConnInfo(ctx context.Context, bridgeURL string, id string, port int, nplan int, useIPv6 bool) (*connInfo, error) { +// ExchangeConnInfo exchanges oneself's info for the peer's info, which can be used to establish a connection +func ExchangeConnInfo(ctx context.Context, bridgeURL string, info *SelfInfo, port int, useIPv6 bool) (*PeerInfo, error) { client := NewHTTPClient(useIPv6, fmt.Sprintf(":%v", port)) sendReader, sendWriter := io.Pipe() reqCtx, cancelReq := context.WithCancel(context.Background()) @@ -136,7 +79,6 @@ func exchangeConnInfo(ctx context.Context, bridgeURL string, id string, port int } close(chRecvOrErr) }() - info := selfInfo{ChanName: id, NPlan: nplan} select { case la, ok := <-client.GetLAddr(): if !ok { // dial error @@ -151,10 +93,10 @@ func exchangeConnInfo(ctx context.Context, bridgeURL string, id string, port int return nil, context.Canceled } - return exchangeConnInfoProto(ctx, sendWriter, chRecvOrErr, &info, cancelReq) + return exchangeConnInfoProto(ctx, sendWriter, chRecvOrErr, info, cancelReq) } -func exchangeConnInfoProto(ctx context.Context, sender io.WriteCloser, chRecvOrErr <-chan readerOrError, sinfo *selfInfo, cancelReq context.CancelFunc) (*connInfo, error) { +func exchangeConnInfoProto(ctx context.Context, sender io.WriteCloser, chRecvOrErr <-chan readerOrError, sinfo *SelfInfo, cancelReq context.CancelFunc) (*PeerInfo, error) { infoEnc, _ := json.Marshal(sinfo) err := sendPacket(sender, infoEnc) if err != nil { @@ -186,26 +128,32 @@ func exchangeConnInfoProto(ctx context.Context, sender io.WriteCloser, chRecvOrE return nil, fmt.Errorf("failed to communicate with the bridge: %w", err) } defaultLogger.Debugf("recv %s", recv) - var pinfo peerInfo + var pinfo PeerInfo err = json.Unmarshal(recv, &pinfo) if err != nil { return nil, fmt.Errorf("failed to parse msg from bridge: %w", err) } + pinfo.Laddr = sinfo.PriAddr + return &pinfo, nil +} + +func flattenAddrs(aps []AddrPair) []string { var addrs []string - for _, ap := range pinfo.PeerAddrs { + for _, ap := range aps { addrs = append(addrs, ap.PriAddr) if ap.PubAddr != ap.PriAddr { addrs = append(addrs, ap.PubAddr) } } - return &connInfo{sinfo.PriAddr, addrs, pinfo.PeerNPlan}, nil + return addrs } -func rendezvousWithTimeout(ctx context.Context, laddr string, peerAddrs []string) (conn net.Conn, err error) { +// RendezvousWithTimeout performs simultaneous connection opening for TCP hole punching, with `rendezvousTimeout` +func RendezvousWithTimeout(ctx context.Context, laddr string, peerAddrs []AddrPair) (conn net.Conn, err error) { ctx1, cancel := context.WithTimeout(ctx, rendezvousTimeout) defer cancel() - conn, err = rendezvous(ctx1, laddr, peerAddrs) + conn, err = rendezvous(ctx1, laddr, flattenAddrs(peerAddrs)) if err != nil { if errors.Is(ctx1.Err(), context.DeadlineExceeded) { defaultLogger.Infof("rendezvous timeout for %v -> %v", laddr, peerAddrs) @@ -303,10 +251,3 @@ func connect(ctx context.Context, laddr, raddr string, chWin chan<- net.Conn, cc conn.Close() } } - -func tern[T any](t bool, a T, b T) T { - if t { - return a - } - return b -} diff --git a/pkg/pnet/p2p_test.go b/pkg/pnet/p2p_test.go index ada919a..9de9fd9 100644 --- a/pkg/pnet/p2p_test.go +++ b/pkg/pnet/p2p_test.go @@ -19,14 +19,14 @@ func TestExchangeConnInfoProto(t *testing.T) { chRecvOrErr := make(chan readerOrError) go func() { chRecvOrErr <- readerOrError{ReadCloser: downR}; close(chRecvOrErr) }() - sinfo0 := selfInfo{"127.0.0.1:30001", "test-exchange-proto", 1} - cInfo0 := connInfo{sinfo0.PriAddr, []string{"127.0.0.1:30002", "80.80.80.80:30003"}, 1} + sinfo0 := SelfInfo{PriAddr: "127.0.0.1:30001", ChanName: "test-exchange-proto", NPlan: 1} + cInfo0 := PeerInfo{Laddr: sinfo0.PriAddr, PeerAddrs: []AddrPair{{"127.0.0.1:30002", "80.80.80.80:30003"}}, PeerNPlan: 1} go func() { // mock server protocol clientData, err := receivePacket(upR) if err != nil { t.Errorf("error on receiving client data: %v", err) } - var sinfo selfInfo + var sinfo SelfInfo err = json.Unmarshal(clientData, &sinfo) if err != nil { t.Errorf("error on parsing client data: %v", err) @@ -34,7 +34,7 @@ func TestExchangeConnInfoProto(t *testing.T) { if sinfo != sinfo0 { t.Errorf("unexpected client data: %v", sinfo) } - err = sendPacket(downW, must(json.Marshal(&peerInfo{[]addrPair{{cInfo0.peerAddrs[0], cInfo0.peerAddrs[1]}}, cInfo0.peerNPlan}))) + err = sendPacket(downW, must(json.Marshal(&cInfo0))) if err != nil { t.Errorf("error on replying to client: %v", err) } @@ -45,7 +45,7 @@ func TestExchangeConnInfoProto(t *testing.T) { t.Fatalf("exchange proto: %v", err) } if !reflect.DeepEqual(*cInfo, cInfo0) { - t.Fatalf("connInfo from exchange proto not matched: expect: %+v, got: %+v", cInfo0, *cInfo) + t.Fatalf("PeerInfo from exchange proto not matched: expect: %+v, got: %+v", cInfo0, *cInfo) } } @@ -60,7 +60,7 @@ func TestExchangeConnInfo(t *testing.T) { if err != nil { t.Errorf("error on receiving client data: %v", err) } - var sinfo selfInfo + var sinfo SelfInfo err = json.Unmarshal(clientData, &sinfo) if err != nil { t.Errorf("error on parsing client data: %v", err) @@ -70,10 +70,10 @@ func TestExchangeConnInfo(t *testing.T) { } var rsp []byte select { - case ch1 <- must(json.Marshal(&peerInfo{[]addrPair{{sinfo.PriAddr, ra}}, nplan})): + case ch1 <- must(json.Marshal(&PeerInfo{PeerAddrs: []AddrPair{{sinfo.PriAddr, ra}}, PeerNPlan: nplan})): rsp = <-ch2 case rsp = <-ch1: - ch2 <- must(json.Marshal(&peerInfo{[]addrPair{{sinfo.PriAddr, rb}}, nplan})) + ch2 <- must(json.Marshal(&PeerInfo{PeerAddrs: []AddrPair{{sinfo.PriAddr, rb}}, PeerNPlan: nplan})) } w.WriteHeader(http.StatusOK) err = sendPacket(w, rsp) @@ -85,23 +85,23 @@ func TestExchangeConnInfo(t *testing.T) { chRaddr := make(chan string) runClient := func() { - cInfo, err := exchangeConnInfo(context.Background(), server.URL, id0, 0, nplan, false) + cInfo, err := ExchangeConnInfo(context.Background(), server.URL, &SelfInfo{ChanName: id0, NPlan: nplan}, 0, false) if err != nil { t.Errorf("exchange: %v", err) } - chRaddr <- cInfo.peerAddrs[1] + chRaddr <- cInfo.PeerAddrs[0].PubAddr } go runClient() go runClient() rx, ry := <-chRaddr, <-chRaddr if !(rx == ra && ry == rb) && !(rx == rb && ry == ra) { - t.Errorf("connInfo.peerRaddr from exchange not matched: expect: {%s,%s}, got: {%s,%s}", ra, rb, rx, ry) + t.Errorf("PeerInfo.peerRaddr from exchange not matched: expect: {%s,%s}, got: {%s,%s}", ra, rb, rx, ry) } } func TestExchangeConnInfoError(t *testing.T) { defaultLogger = &testLogger{t} - _, err := exchangeConnInfo(context.Background(), "http://localhost:40404", "test-exchange-err", 0, 1, false) + _, err := ExchangeConnInfo(context.Background(), "http://localhost:40404", &SelfInfo{ChanName: "test-exchange-err", NPlan: 1}, 0, false) var opErr *net.OpError if !errors.As(err, &opErr) || opErr.Op != "dial" { t.Fatalf("exchangeConnInfo did not return a dial error on dial failure: %v", err) diff --git a/pkg/pnet/portmapping.go b/pkg/pnet/portmapping.go index bff2712..bb3fe22 100644 --- a/pkg/pnet/portmapping.go +++ b/pkg/pnet/portmapping.go @@ -28,7 +28,7 @@ type routerClient interface { GetExternalIPAddress() (string, error) } -func addPortMapping(ctx context.Context, ports ...int) error { +func AddPortMapping(ctx context.Context, ports ...int) error { client, err := pickRouterClient(ctx) if err != nil { return fmt.Errorf("failed to find a router client: %w", err) diff --git a/pkg/stream/dialer.go b/pkg/stream/dialer.go new file mode 100644 index 0000000..0779630 --- /dev/null +++ b/pkg/stream/dialer.go @@ -0,0 +1,57 @@ +package stream + +import ( + "context" + "errors" + "fmt" + "io" + + "github.com/contextualist/acp/pkg/config" + "github.com/contextualist/acp/pkg/pnet" +) + +// A Dialer establishes a stream for sending / receiving files, with the help of a +// rendezvous service. +// +// After implementing a new Dialer: +// 1. Register +// +// func init() { +// registerDialer("dialer_name", &DialerImpl{}) +// } +// +// 2. Specify information exchange in pnet.SelfInfo and pnet.PeerInfo +// 3. (optional) Provide configurable options in config.Config +type Dialer interface { + // Initialize Dialer from config and environment, while checking availability. + // Return ErrNotAvailable if Dialer is not supported + Init(conf config.Config) error + // Populate the info struct to be sent to rendezvous service for information exchange + SetInfo(info *pnet.SelfInfo) + // Base on the info received, establish a stream as the sender + IntoSender(ctx context.Context, info pnet.PeerInfo) (io.WriteCloser, error) + // Base on the info received, establish a stream as the receiver + IntoReceiver(ctx context.Context, info pnet.PeerInfo) (io.ReadCloser, error) +} + +var allDialers = make(map[string]Dialer) + +func registerDialer(name string, d Dialer) { + allDialers[name] = d +} + +func GetDialer(name string) (Dialer, error) { + d, ok := allDialers[name] + if !ok { + return nil, fmt.Errorf("unknown dialer %s", name) + } + return d, nil +} + +var ErrNotAvailable = errors.New("this dialer is not available") + +var defaultLogger pnet.Logger + +func SetLogger(l pnet.Logger) { + defaultLogger = l +} diff --git a/pkg/stream/layer.go b/pkg/stream/layer.go new file mode 100644 index 0000000..f49a500 --- /dev/null +++ b/pkg/stream/layer.go @@ -0,0 +1,13 @@ +package stream + +import ( + "net" + + aead "github.com/shadowsocks/go-shadowsocks2/shadowaead" +) + +func encrypted(conn net.Conn, psk []byte) (net.Conn, error) { + cipher, err := aead.Chacha20Poly1305(psk) + conn = aead.NewConn(conn, cipher) + return conn, err +} diff --git a/pkg/stream/tcppunch.go b/pkg/stream/tcppunch.go new file mode 100644 index 0000000..7cae9f5 --- /dev/null +++ b/pkg/stream/tcppunch.go @@ -0,0 +1,102 @@ +package stream + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "io" + "net" + + "github.com/contextualist/acp/pkg/config" + "github.com/contextualist/acp/pkg/pnet" +) + +func init() { + registerDialer("tcp_punch", &TcpHolePunch{}) +} + +type TcpHolePunch struct { + bridgeURL string + id string + psk []byte + // Whether to use IPv6 instead of IPv4 for rendezvous + useIPv6 bool + // Local port(s) to be used for rendezvous + ports []int + // Whether to try requesting uPnP port mapping from the router + uPnP bool +} + +func (d *TcpHolePunch) Init(conf config.Config) (err error) { + d.bridgeURL = conf.Server + "/v2/exchange" + d.id = conf.ID + d.psk, err = base64.StdEncoding.DecodeString(conf.PSK) + if err != nil { + return fmt.Errorf("error decoding PSK: %w", err) + } + d.useIPv6 = conf.UseIPv6 + d.ports = conf.Ports + d.uPnP = conf.UPnP + return nil +} + +func (d *TcpHolePunch) SetInfo(info *pnet.SelfInfo) { + info.NPlan = len(d.ports) +} + +func (d *TcpHolePunch) IntoSender(ctx context.Context, info pnet.PeerInfo) (io.WriteCloser, error) { + conn, err := d.holePunching(ctx, info, true) + if err != nil { + return nil, err + } + return encrypted(conn, d.psk) +} + +func (d *TcpHolePunch) IntoReceiver(ctx context.Context, info pnet.PeerInfo) (io.ReadCloser, error) { + conn, err := d.holePunching(ctx, info, false) + if err != nil { + return nil, err + } + return encrypted(conn, d.psk) +} + +// HolePunching negotiates via a rendezvous server with a peer with the same id to establish a connection. +func (d *TcpHolePunch) holePunching(ctx context.Context, info pnet.PeerInfo, isA bool) (conn net.Conn, err error) { + if d.uPnP { + err = pnet.AddPortMapping(ctx, d.ports...) + if err != nil { + defaultLogger.Infof("failed to add port mapping: %v", err) + } + } + + nplan := len(d.ports) + if conn, err = pnet.RendezvousWithTimeout(ctx, info.Laddr, info.PeerAddrs); err == nil { + return conn, nil + } + + // Try out the rest of nA x nB plans + var planp []int + for i := 0; i < tern(isA, nplan, info.PeerNPlan); i++ { + for j := 0; j < tern(isA, info.PeerNPlan, nplan); j++ { + planp = append(planp, d.ports[tern(isA, i, j)]) + } + } + for q := range planp[1:] { + info, err := pnet.ExchangeConnInfo(ctx, d.bridgeURL, &pnet.SelfInfo{ChanName: d.id, NPlan: nplan}, q, d.useIPv6) + if err != nil { + return nil, err + } + if conn, err = pnet.RendezvousWithTimeout(ctx, info.Laddr, info.PeerAddrs); err == nil { + return conn, nil + } + } + return nil, errors.New("all rendezvous attempts failed") +} + +func tern[T any](t bool, a T, b T) T { + if t { + return a + } + return b +} diff --git a/pkg/tui/status.go b/pkg/tui/status.go index 2e29331..59e345d 100644 --- a/pkg/tui/status.go +++ b/pkg/tui/status.go @@ -13,43 +13,53 @@ import ( type ( // A StatusControl is the user handler for a StausModel - StatusControl struct { - *meteredReadWriteCloser + StatusControl[T io.Closer] struct { + *meteredReadWriteCloser[T] chNext chan tea.Msg } ) -func NewStatusControl() *StatusControl { - return &StatusControl{ +func NewStatusControl[T io.Closer]() *StatusControl[T] { + return &StatusControl[T]{ chNext: make(chan tea.Msg), } } // Monitor wraps around a read/write stream for obtaining data transfer stats -func (c *StatusControl) Monitor(stream io.ReadWriteCloser) io.ReadWriteCloser { +func (c *StatusControl[T]) Monitor(stream T) T { c.meteredReadWriteCloser = newMeteredReadWriteCloser(stream, 300*time.Millisecond) - return c.meteredReadWriteCloser + return any(c.meteredReadWriteCloser).(T) } // Next switches to the next BubbleTea Model and shut down current StatusModel -func (c *StatusControl) Next(m tea.Model) { +func (c *StatusControl[_]) Next(m tea.Model) { c.chNext <- modelSwitchMsg{m} close(c.chNext) } -type meteredReadWriteCloser struct { - io.ReadWriteCloser +type meteredReadWriteCloser[T io.Closer] struct { + reader io.ReadCloser + writer io.WriteCloser rate, total atomic.Uint64 startTime time.Time ticker *time.Ticker } -func newMeteredReadWriteCloser(inner io.ReadWriteCloser, interval time.Duration) *meteredReadWriteCloser { +// NOTE: inner should be io.ReadCloser | io.WriteCloser | io.ReadWriteCloser +func newMeteredReadWriteCloser[T io.Closer](inner T, interval time.Duration) *meteredReadWriteCloser[T] { ticker := time.NewTicker(interval) - m := &meteredReadWriteCloser{ - ReadWriteCloser: inner, - startTime: time.Now(), - ticker: ticker, + m := &meteredReadWriteCloser[T]{ + startTime: time.Now(), + ticker: ticker, + } + if r, ok := any(inner).(io.ReadCloser); ok { + m.reader = r + } + if w, ok := any(inner).(io.WriteCloser); ok { + m.writer = w + } + if m.reader == nil && m.writer == nil { + panic("inner is neither io.ReadCloser nor io.WriteCloser") } go func() { for range ticker.C { @@ -59,43 +69,46 @@ func newMeteredReadWriteCloser(inner io.ReadWriteCloser, interval time.Duration) return m } -func (m *meteredReadWriteCloser) Read(p []byte) (n int, err error) { - n, err = m.ReadWriteCloser.Read(p) +func (m *meteredReadWriteCloser[_]) Read(p []byte) (n int, err error) { + n, err = m.reader.Read(p) m.total.Add(uint64(n)) return } -func (m *meteredReadWriteCloser) Write(p []byte) (n int, err error) { - n, err = m.ReadWriteCloser.Write(p) +func (m *meteredReadWriteCloser[_]) Write(p []byte) (n int, err error) { + n, err = m.writer.Write(p) m.total.Add(uint64(n)) return } -func (m *meteredReadWriteCloser) Close() error { +func (m *meteredReadWriteCloser[_]) Close() error { m.ticker.Stop() - return m.ReadWriteCloser.Close() + if m.reader != nil { + return m.reader.Close() + } + return m.writer.Close() } // A StatusModel displays a updating stats of data transfer -type StatusModel struct { +type StatusModel[T io.Closer] struct { spinner spinner.Model - status *StatusControl + status *StatusControl[T] } -func NewStatusModel(c *StatusControl) tea.Model { - return StatusModel{ +func NewStatusModel[T io.Closer](c *StatusControl[T]) tea.Model { + return StatusModel[T]{ spinner: spinner.New(spinner.WithSpinner(spinner.Points)), status: c, } } -func (m StatusModel) waitForNext() tea.Msg { +func (m StatusModel[_]) waitForNext() tea.Msg { return <-m.status.chNext } -func (m StatusModel) Init() tea.Cmd { +func (m StatusModel[_]) Init() tea.Cmd { return tea.Batch(m.spinner.Tick, m.waitForNext) } -func (m StatusModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { +func (m StatusModel[_]) Update(msg tea.Msg) (tea.Model, tea.Cmd) { switch msg := msg.(type) { case spinner.TickMsg: var cmd tea.Cmd @@ -113,7 +126,7 @@ func (m StatusModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { return m, nil } -func (m StatusModel) View() string { +func (m StatusModel[_]) View() string { rate, total := m.status.rate.Load(), m.status.total.Load() return fmt.Sprintf("%s %6s/s %6s", m.spinner.View(), humanize.Bytes(rate), humanize.Bytes(total)) } From 2d6bfb6b3b9216557cc83a8a50e96c941e8d6860 Mon Sep 17 00:00:00 2001 From: Contextualist Date: Sat, 16 Sep 2023 11:06:23 -0700 Subject: [PATCH 4/7] feat: stream: add Tailscale support --- edge/index.ts | 11 +- go.mod | 1 + go.sum | 3 + pkg/pnet/p2p.go | 8 +- pkg/stream/tailscale.go | 253 ++++++++++++++++++++++++++++++++++++ pkg/tailscale/cli.go | 105 +++++++++++++++ pkg/tailscale/interfaces.go | 52 ++++++++ pkg/tailscale/tsaddr.go | 42 ++++++ 8 files changed, 470 insertions(+), 5 deletions(-) create mode 100644 pkg/stream/tailscale.go create mode 100644 pkg/tailscale/cli.go create mode 100644 pkg/tailscale/interfaces.go create mode 100644 pkg/tailscale/tsaddr.go diff --git a/edge/index.ts b/edge/index.ts index aad8857..3017363 100644 --- a/edge/index.ts +++ b/edge/index.ts @@ -3,7 +3,9 @@ type ConnInfo = Deno.ServeHandlerInfo interface ClientInfo { priAddr: string, chanName: string, - nPlan: number, + nPlan?: number, + tsAddr?: string, + tsCap?: number, } interface AddrPair { @@ -13,7 +15,9 @@ interface AddrPair { interface ReplyInfo { peerAddrs: AddrPair[], - peerNPlan: number, + peerNPlan?: number, + tsAddr?: string, + tsCap?: number, } @@ -25,12 +29,13 @@ async function handleExchangeV2(req: Request, connInfo: ConnInfo): Promise 0 { + logE("from tailscale file cp: %s", string(errmsg)) + } + if err := cmd.Wait(); err != nil { + logE("error running tailscale file cp: %v", err) + } + stdinb.NotifyExit() + }() + return stdinb, nil +} + +func (ts *TSCli) RunGet(ctx context.Context, targetDir string) ([]byte, error) { + return ts.run(ctx, "file", "get", "-conflict=overwrite", targetDir).CombinedOutput() +} + +type writeBlockingCloser struct { + io.WriteCloser + chExit chan struct{} +} + +func newWriteBlockingCloser(inner io.WriteCloser) *writeBlockingCloser { + chExit := make(chan struct{}) + return &writeBlockingCloser{inner, chExit} +} + +func (w *writeBlockingCloser) NotifyExit() { + close(w.chExit) +} + +func (w *writeBlockingCloser) Close() error { + err := w.WriteCloser.Close() + <-w.chExit + return err +} diff --git a/pkg/tailscale/interfaces.go b/pkg/tailscale/interfaces.go new file mode 100644 index 0000000..1702290 --- /dev/null +++ b/pkg/tailscale/interfaces.go @@ -0,0 +1,52 @@ +package tailscale + +import ( + "net" + "net/netip" + "strings" +) + +// Adapted from tailscale.com/net/interfaces interfaces.go +// +// Interface returns the current machine's Tailscale interface, if any. +// If none is found, all zero values are returned. +// A non-nil error is only returned on a problem listing the system interfaces. +func Interface() ([]netip.Addr, *net.Interface, error) { + ifs, err := net.Interfaces() + if err != nil { + return nil, nil, err + } + for _, iface := range ifs { + if !maybeTailscaleInterfaceName(iface.Name) { + continue + } + addrs, err := iface.Addrs() + if err != nil { + continue + } + var tsIPs []netip.Addr + for _, a := range addrs { + if ipnet, ok := a.(*net.IPNet); ok { + nip, ok := netip.AddrFromSlice(ipnet.IP) + nip = nip.Unmap() + if ok && IsTailscaleIP(nip) { + tsIPs = append(tsIPs, nip) + } + } + } + if len(tsIPs) > 0 { + return tsIPs, &iface, nil + } + } + return nil, nil, nil +} + +// maybeTailscaleInterfaceName reports whether s is an interface +// name that might be used by Tailscale. +func maybeTailscaleInterfaceName(s string) bool { + return s == "Tailscale" || + strings.HasPrefix(s, "wg") || + strings.HasPrefix(s, "ts") || + strings.HasPrefix(s, "tailscale") || + strings.HasPrefix(s, "utun") +} diff --git a/pkg/tailscale/tsaddr.go b/pkg/tailscale/tsaddr.go new file mode 100644 index 0000000..405c82e --- /dev/null +++ b/pkg/tailscale/tsaddr.go @@ -0,0 +1,42 @@ +package tailscale + +import ( + "net/netip" + "sync" +) + +// Adapted from tailscale.com/net/tsaddr tsaddr.go +// +// IsTailscaleIP reports whether ip is an IP address in a range that +// Tailscale assigns from. +func IsTailscaleIP(ip netip.Addr) bool { + if ip.Is4() { + return CGNATRange().Contains(ip) && !ChromeOSVMRange().Contains(ip) + } + return TailscaleULARange().Contains(ip) +} + +var ( + // CGNATRange returns the Carrier Grade NAT address range that + // is the superset range that Tailscale assigns out of. + // See https://tailscale.com/s/cgnat + // Note that Tailscale does not assign out of the ChromeOSVMRange. + CGNATRange = mustPrefix("100.64.0.0/10") + // ChromeOSVMRange returns the subset of the CGNAT IPv4 range used by + // ChromeOS to interconnect the host OS to containers and VMs. We + // avoid allocating Tailscale IPs from it, to avoid conflicts. + ChromeOSVMRange = mustPrefix("100.115.92.0/23") + // TailscaleULARange returns the IPv6 Unique Local Address range that + // is the superset range that Tailscale assigns out of. + TailscaleULARange = mustPrefix("fd7a:115c:a1e0::/48") +) + +func mustPrefix(prefix string) func() *netip.Prefix { + return sync.OnceValue(func() *netip.Prefix { + v, err := netip.ParsePrefix(prefix) + if err != nil { + panic(err) + } + return &v + }) +} From 775c033b66336a3ef26fa29b937d7e275bbc1f56 Mon Sep 17 00:00:00 2001 From: Contextualist Date: Sun, 17 Sep 2023 16:28:12 -0700 Subject: [PATCH 5/7] feat: acp: support multiple connection strategies --- cmd/acp/main.go | 26 +++++++++++++------ cmd/acp/strategy.go | 60 ++++++++++++++++++++++++++++++++++++++++++++ edge/index.ts | 2 ++ pkg/config/config.go | 16 +++++++----- pkg/pnet/p2p.go | 12 +++++---- pkg/pnet/p2p_test.go | 15 ++++++----- 6 files changed, 105 insertions(+), 26 deletions(-) create mode 100644 cmd/acp/strategy.go diff --git a/cmd/acp/main.go b/cmd/acp/main.go index f6e55b6..99e422c 100644 --- a/cmd/acp/main.go +++ b/cmd/acp/main.go @@ -83,14 +83,24 @@ func transfer(ctx context.Context, conf *config.Config, filenames []string, logg stream.SetLogger(logger) defer logger.End() - dialer, _ := stream.GetDialer("tcp_punch") - err := dialer.Init(*conf) - if !checkErr(err) { + sinfo := pnet.SelfInfo{ChanName: conf.ID} + strategy, errs := tryEach(conf.Strategy, func(name string) (s string, err error) { + var d stream.Dialer + if d, err = stream.GetDialer(name); err != nil { + return + } + if err = d.Init(*conf); err != nil { + return "", fmt.Errorf("failed to init dialer %s: %w", name, err) + } + d.SetInfo(&sinfo) + return name, nil + }) + sinfo.Strategy = strategy + if len(strategy) == 0 { + checkErr(fmt.Errorf("none of the dialers from the strategy is available: %w", errors.Join(errs...))) return } - sinfo := pnet.SelfInfo{ChanName: conf.ID} - dialer.SetInfo(&sinfo) info, err := pnet.ExchangeConnInfo( ctx, conf.Server+"/v2/exchange", @@ -105,7 +115,8 @@ func transfer(ctx context.Context, conf *config.Config, filenames []string, logg var status interface{ Next(tea.Model) } if len(filenames) > 0 { var s io.WriteCloser - s, err = dialer.IntoSender(ctx, *info) + strategyFinal := strategyConsensus(strategy, info.Strategy) + s, err = tryUntil(strategyFinal, func(dn string) (io.WriteCloser, error) { return must(stream.GetDialer(dn)).IntoSender(ctx, *info) }) if !checkErr(err) { return } @@ -114,7 +125,8 @@ func transfer(ctx context.Context, conf *config.Config, filenames []string, logg err = sendFiles(filenames, s) } else { var s io.ReadCloser - s, err = dialer.IntoReceiver(ctx, *info) + strategyFinal := strategyConsensus(info.Strategy, strategy) + s, err = tryUntil(strategyFinal, func(dn string) (io.ReadCloser, error) { return must(stream.GetDialer(dn)).IntoReceiver(ctx, *info) }) if !checkErr(err) { return } diff --git a/cmd/acp/strategy.go b/cmd/acp/strategy.go new file mode 100644 index 0000000..ec922e6 --- /dev/null +++ b/cmd/acp/strategy.go @@ -0,0 +1,60 @@ +package main + +import ( + "errors" + "fmt" +) + +// Merge strategy lists from two parties into a common one, +// following the precedency set by the first party. +func strategyConsensus(pa, pb []string) (c []string) { + pbSet := make(map[string]struct{}, len(pb)) + for _, x := range pb { + pbSet[x] = struct{}{} + } + + for _, x := range pa { + if _, ok := pbSet[x]; ok { + c = append(c, x) + } + } + logger.Debugf("strategy: a=%v, b=%v, consensus=%v", pa, pb, c) + return +} + +// Map a func onto a slice, for each element returning a result or an error +func tryEach[U, V any](a []U, fn func(U) (V, error)) (r []V, errs []error) { + for _, x := range a { + y, err := fn(x) + if err != nil { + logger.Debugf("attempt failed: %v", err) + errs = append(errs, err) + continue + } + r = append(r, y) + } + return +} + +// Map a func onto a slice, until returning the first successful result +func tryUntil[U, V any](a []U, fn func(U) (V, error)) (r V, err error) { + var errs []error + for _, x := range a { + r, err = fn(x) + if err != nil { + logger.Debugf("attempt failed: %v", err) + errs = append(errs, err) + continue + } + return + } + err = fmt.Errorf("all attempts failed: %w", errors.Join(errs...)) + return +} + +func must[T any](t T, err error) T { + if err != nil { + panic(err) + } + return t +} diff --git a/edge/index.ts b/edge/index.ts index 3017363..562b40f 100644 --- a/edge/index.ts +++ b/edge/index.ts @@ -3,6 +3,7 @@ type ConnInfo = Deno.ServeHandlerInfo interface ClientInfo { priAddr: string, chanName: string, + strategy?: string[], nPlan?: number, tsAddr?: string, tsCap?: number, @@ -15,6 +16,7 @@ interface AddrPair { interface ReplyInfo { peerAddrs: AddrPair[], + strategy?: string[], peerNPlan?: number, tsAddr?: string, tsCap?: number, diff --git a/pkg/config/config.go b/pkg/config/config.go index fd496f9..ba4f1f6 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -19,12 +19,13 @@ const ( // Config defines the user-specific information for the transfer. // In general, it needs to be consistent across all devices of a user. type Config struct { - ID string `json:"id"` - PSK string `json:"psk"` - Server string `json:"server,omitempty"` - UseIPv6 bool `json:"ipv6,omitempty"` - Ports []int `json:"ports,omitempty"` - UPnP bool `json:"upnp,omitempty"` + ID string `json:"id"` + PSK string `json:"psk"` + Server string `json:"server,omitempty"` + UseIPv6 bool `json:"ipv6,omitempty"` + Ports []int `json:"ports,omitempty"` + UPnP bool `json:"upnp,omitempty"` + Strategy []string `json:"strategy,omitempty"` } func (conf *Config) ApplyDefault() { @@ -34,6 +35,9 @@ func (conf *Config) ApplyDefault() { if len(conf.Ports) == 0 { conf.Ports = []int{0} } + if len(conf.Strategy) == 0 { + conf.Strategy = []string{"tcp_punch"} + } } var configFilename = filepath.Join(userConfigDir(), "acp", "config.json") diff --git a/pkg/pnet/p2p.go b/pkg/pnet/p2p.go index e687c6d..fa70269 100644 --- a/pkg/pnet/p2p.go +++ b/pkg/pnet/p2p.go @@ -30,11 +30,12 @@ type ( } SelfInfo struct { - PriAddr string `json:"priAddr"` - ChanName string `json:"chanName"` - NPlan int `json:"nPlan,omitempty"` - TSAddr string `json:"tsAddr,omitempty"` - TSCap uint `json:"tsCap,omitempty"` + PriAddr string `json:"priAddr"` + ChanName string `json:"chanName"` + Strategy []string `json:"strategy,omitempty"` + NPlan int `json:"nPlan,omitempty"` + TSAddr string `json:"tsAddr,omitempty"` + TSCap uint `json:"tsCap,omitempty"` } AddrPair struct { PriAddr string `json:"priAddr"` @@ -43,6 +44,7 @@ type ( PeerInfo struct { Laddr string PeerAddrs []AddrPair `json:"peerAddrs"` + Strategy []string `json:"strategy,omitempty"` PeerNPlan int `json:"peerNPlan,omitempty"` TSAddr string `json:"tsAddr,omitempty"` TSCap uint `json:"tsCap,omitempty"` diff --git a/pkg/pnet/p2p_test.go b/pkg/pnet/p2p_test.go index 9de9fd9..1d94838 100644 --- a/pkg/pnet/p2p_test.go +++ b/pkg/pnet/p2p_test.go @@ -19,8 +19,8 @@ func TestExchangeConnInfoProto(t *testing.T) { chRecvOrErr := make(chan readerOrError) go func() { chRecvOrErr <- readerOrError{ReadCloser: downR}; close(chRecvOrErr) }() - sinfo0 := SelfInfo{PriAddr: "127.0.0.1:30001", ChanName: "test-exchange-proto", NPlan: 1} - cInfo0 := PeerInfo{Laddr: sinfo0.PriAddr, PeerAddrs: []AddrPair{{"127.0.0.1:30002", "80.80.80.80:30003"}}, PeerNPlan: 1} + sinfo0 := SelfInfo{PriAddr: "127.0.0.1:30001", ChanName: "test-exchange-proto"} + cInfo0 := PeerInfo{Laddr: sinfo0.PriAddr, PeerAddrs: []AddrPair{{"127.0.0.1:30002", "80.80.80.80:30003"}}} go func() { // mock server protocol clientData, err := receivePacket(upR) if err != nil { @@ -31,7 +31,7 @@ func TestExchangeConnInfoProto(t *testing.T) { if err != nil { t.Errorf("error on parsing client data: %v", err) } - if sinfo != sinfo0 { + if !reflect.DeepEqual(sinfo, sinfo0) { t.Errorf("unexpected client data: %v", sinfo) } err = sendPacket(downW, must(json.Marshal(&cInfo0))) @@ -53,7 +53,6 @@ func TestExchangeConnInfo(t *testing.T) { defaultLogger = &testLogger{t} id0 := "test-exchange" ra, rb := "80.80.80.80:30011", "80.80.80.80:30012" - nplan := 1 ch1, ch2 := make(chan []byte), make(chan []byte) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { clientData, err := receivePacket(r.Body) @@ -70,10 +69,10 @@ func TestExchangeConnInfo(t *testing.T) { } var rsp []byte select { - case ch1 <- must(json.Marshal(&PeerInfo{PeerAddrs: []AddrPair{{sinfo.PriAddr, ra}}, PeerNPlan: nplan})): + case ch1 <- must(json.Marshal(&PeerInfo{PeerAddrs: []AddrPair{{sinfo.PriAddr, ra}}})): rsp = <-ch2 case rsp = <-ch1: - ch2 <- must(json.Marshal(&PeerInfo{PeerAddrs: []AddrPair{{sinfo.PriAddr, rb}}, PeerNPlan: nplan})) + ch2 <- must(json.Marshal(&PeerInfo{PeerAddrs: []AddrPair{{sinfo.PriAddr, rb}}})) } w.WriteHeader(http.StatusOK) err = sendPacket(w, rsp) @@ -85,7 +84,7 @@ func TestExchangeConnInfo(t *testing.T) { chRaddr := make(chan string) runClient := func() { - cInfo, err := ExchangeConnInfo(context.Background(), server.URL, &SelfInfo{ChanName: id0, NPlan: nplan}, 0, false) + cInfo, err := ExchangeConnInfo(context.Background(), server.URL, &SelfInfo{ChanName: id0}, 0, false) if err != nil { t.Errorf("exchange: %v", err) } @@ -101,7 +100,7 @@ func TestExchangeConnInfo(t *testing.T) { func TestExchangeConnInfoError(t *testing.T) { defaultLogger = &testLogger{t} - _, err := ExchangeConnInfo(context.Background(), "http://localhost:40404", &SelfInfo{ChanName: "test-exchange-err", NPlan: 1}, 0, false) + _, err := ExchangeConnInfo(context.Background(), "http://localhost:40404", &SelfInfo{ChanName: "test-exchange-err"}, 0, false) var opErr *net.OpError if !errors.As(err, &opErr) || opErr.Op != "dial" { t.Fatalf("exchangeConnInfo did not return a dial error on dial failure: %v", err) From 23650fc2ff2a5191fd616c595b55d1712e48b3f9 Mon Sep 17 00:00:00 2001 From: Contextualist Date: Mon, 18 Sep 2023 19:50:37 -0700 Subject: [PATCH 6/7] tweak: config: infer strategy during setup --- pkg/config/config.go | 19 +++++++++++++++++-- pkg/config/config_test.go | 11 ++++++----- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index ba4f1f6..ab23834 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -9,6 +9,8 @@ import ( "os" "path/filepath" "runtime" + + tsapi "github.com/contextualist/acp/pkg/tailscale" ) const ( @@ -49,6 +51,9 @@ func Setup(confStr string) (err error) { if err = json.Unmarshal([]byte(confStr), conf); err != nil { return err } + if len(conf.Strategy) == 0 { + conf.Strategy = inferStrategy() + } if err = setConfig(conf); err != nil { return err } @@ -56,8 +61,9 @@ func Setup(confStr string) (err error) { conf, err = getConfig() if errors.Is(err, os.ErrNotExist) { conf = &Config{ - ID: base64.StdEncoding.EncodeToString(randBytes(idLen)), - PSK: base64.StdEncoding.EncodeToString(randBytes(pskLen)), + ID: base64.StdEncoding.EncodeToString(randBytes(idLen)), + PSK: base64.StdEncoding.EncodeToString(randBytes(pskLen)), + Strategy: inferStrategy(), } if err = setConfig(conf); err != nil { return err @@ -122,6 +128,15 @@ func setConfig(conf *Config) error { return nil } +func inferStrategy() []string { + _, iface, _ := tsapi.Interface() + _, err := tsapi.Path() + if iface != nil || err == nil { + return []string{"tailscale", "tcp_punch"} + } + return nil +} + func userConfigDir() string { switch runtime.GOOS { case "linux", "darwin": diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index a88adf4..80ff621 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -28,11 +28,12 @@ func TestSetup(t *testing.T) { func TestSetupWith(t *testing.T) { conf0 := Config{ - ID: "AAAAAAAA", - PSK: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=", - Server: "http://localhost:8000", - UseIPv6: true, - Ports: []int{0, 9527}, + ID: "AAAAAAAA", + PSK: "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=", + Server: "http://localhost:8000", + UseIPv6: true, + Ports: []int{0, 9527}, + Strategy: []string{"tcp_punch"}, } conf0Bytes, _ := json.Marshal(&conf0) if err := Setup(string(conf0Bytes)); err != nil { From 984fbcb63384694e6928c8ad5a8c15b0b05a4d01 Mon Sep 17 00:00:00 2001 From: Contextualist Date: Mon, 18 Sep 2023 20:22:29 -0700 Subject: [PATCH 7/7] docs: Tailscale integration --- README.md | 12 +++++++++--- cmd/acp/update.go | 1 + docs/advanced.md | 11 +++++++++++ docs/mechanism.md | 3 ++- 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index c942da5..0407f55 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ Highlights (aka "Why making another file-transfer tool?"): - Designed for personal use; no need to copy-paste a token / code for each transfer -- Rendezvous service runs distributively on [serverless edge function](https://deno.com/deploy/docs), +- Rendezvous service runs distributively on [serverless edge function](https://stackoverflow.blog/2023/02/23/how-edge-functions-move-your-back-end-close-to-your-front-end/), a robust solution with low latency worldwide. ([How does this work?](docs/mechanism.md)) Other features: @@ -15,6 +15,7 @@ Other features: - Compression (gzip) - Cross platform: Linux, macOS, Windows - Support transfering multiple files and directories +- Optional [Tailscale integration](docs/advanced.md#tailscale-integration) See also [comparison table with similar tools](#similar-projects). @@ -57,7 +58,7 @@ You can run the sender and receiver in arbitrary order. Whenever both sides are up and running, they will attempt to establish a P2P connection. If you see messages such as `rendezvous timeout`, at least one side is behind a firewall or a strict NAT that prohibits P2P connection. -For advanced configuration and self-hosting (it's free & takes only 5 minutes!), check out [the docs here](docs/advanced.md). +For advanced configuration and self-hosting, check out [the docs here](docs/advanced.md). ## Similar projects @@ -68,7 +69,7 @@ For advanced configuration and self-hosting (it's free & takes only 5 minutes!), | LAN | O | O | O | O | O | | WAN (local ↔︎ remote) | O | O | O | P | O | | WAN (remote ↔︎ remote) | | P | O | P | O | -| relay | | | | P | O | +| relay | | | P | P | O | | p2p | | | O | O | O | | distributive | | | O | O | | @@ -86,3 +87,8 @@ Apart from the dependencies listed in [`go.mod`](go.mod), this project is also b - [**mholt/archiver**](https://github.com/mholt/archiver): tar/untar implementation - [**libp2p/go-reuseport**](https://github.com/libp2p/go-reuseport): address reuse for TCP hole-punching - [**egoist/bina**](https://github.com/egoist/bina): installation script +- [**Tailscale**](https://tailscale.com), as one of the connection option, provides a painstaking implementation of NAT traversal and a distributive relay service + +## Disclaimer + +This project is not associated with Deno Land Inc. or Tailscale Inc. diff --git a/cmd/acp/update.go b/cmd/acp/update.go index d95a775..fefdda9 100644 --- a/cmd/acp/update.go +++ b/cmd/acp/update.go @@ -50,6 +50,7 @@ func tryUpdate(exe string, repo string, currTag string) error { if err != nil { return fmt.Errorf("failed to update: %w", err) } + fmt.Println("acp has been updated") return nil } diff --git a/docs/advanced.md b/docs/advanced.md index 18ebfde..9fa7e3b 100644 --- a/docs/advanced.md +++ b/docs/advanced.md @@ -15,6 +15,10 @@ List of configurable options: - `[0]`: bind to a random port; - `[9527]`: bind to port 9527; - `[0,9527]`: bind to a random port and port 9527. +- `strategy` (default: `["tcp_punch"]`): List of dialers for connection attempts, ordered by preference. + Available dialers: + - `tcp_punch`: TCP hole-punching + - `tailscale`: TCP over Tailnet / Taildrop (requires Tailscale running) - `upnp` (default: `false`): Request UPnP port mapping from supported router. This may not work for random port. @@ -54,3 +58,10 @@ acp - < tmp-file # receiver acp -d - > tmp-file ``` + + +## Tailscale integration + +Tailscale has a more robust NAT traversal implementation and [distributed relay fallback](https://tailscale.com/blog/how-tailscale-works/#encrypted-tcp-relays-derp), so it is guarenteed to make connections in all cases. Acp can use Tailscale as a transport backend if you have Tailscale running on both side. + +If you have Tailscale running before installing acp, Tailscale support is automatically enabled for acp. Otherwise you can set `strategy: ["tailscale","tcp-punch"]` in config to enable Tailscale support after installing Tailscale. diff --git a/docs/mechanism.md b/docs/mechanism.md index 377c133..ebbf78d 100644 --- a/docs/mechanism.md +++ b/docs/mechanism.md @@ -59,4 +59,5 @@ The following steps take place to create such mappings and use them for P2P conn

*Still curious about the implementation?* -You can find the rendezvous service in [edge/index.ts](../edge/index.ts) and the client-size of hole-punching in [pkg/pnet/p2p.go](../pkg/pnet/p2p.go). +You can find the rendezvous service in [edge/index.ts](../edge/index.ts) and the client-side of hole-punching in [pkg/pnet/p2p.go](../pkg/pnet/p2p.go). +BTW, you would probably find it interesting to read [Tailscale's exteneded discussion on NAT traversal](https://tailscale.com/blog/how-nat-traversal-works/).