Skip to content

Commit

Permalink
feat: stream: add Tailscale support
Browse files Browse the repository at this point in the history
  • Loading branch information
Contextualist committed Sep 16, 2023
1 parent 3ba994d commit 2d6bfb6
Show file tree
Hide file tree
Showing 8 changed files with 470 additions and 5 deletions.
11 changes: 8 additions & 3 deletions edge/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ type ConnInfo = Deno.ServeHandlerInfo
interface ClientInfo {
priAddr: string,
chanName: string,
nPlan: number,
nPlan?: number,
tsAddr?: string,
tsCap?: number,
}

interface AddrPair {
Expand All @@ -13,7 +15,9 @@ interface AddrPair {

interface ReplyInfo {
peerAddrs: AddrPair[],
peerNPlan: number,
peerNPlan?: number,
tsAddr?: string,
tsCap?: number,
}


Expand All @@ -25,12 +29,13 @@ async function handleExchangeV2(req: Request, connInfo: ConnInfo): Promise<Respo

const pubAddr = joinHostPort(connInfo.remoteAddr)
const conn = req.body!.getReader({ mode: "byob" })
const { priAddr, chanName, nPlan = 1 }: ClientInfo = JSON.parse(
const { priAddr, chanName, nPlan = 1, ...otherInfo }: ClientInfo = JSON.parse(
new TextDecoder().decode(await receivePacket(conn))
)
const reply: ReplyInfo = {
peerAddrs: [{ pubAddr, priAddr }],
peerNPlan: nPlan,
...otherInfo,
}
const x0 = JSON.stringify(reply)
//console.log(`accepted from ${x0}`)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/charmbracelet/lipgloss v0.8.0 // indirect
github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 // indirect
github.com/fsnotify/fsnotify v1.6.0
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81 h1:q2hJAaP1k2
github.com/containerd/console v1.0.4-0.20230313162750-1ae8d489ac81/go.mod h1:YynlIjWYF8myEu6sdkwKIvGQq+cOckRm6So2avqoYAk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/huin/goupnp v1.2.0 h1:uOKW26NG1hsSSbXIZ1IR7XP9Gjd1U8pnLaCMgntmkmY=
github.com/huin/goupnp v1.2.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 h1:iQTw/8FWTuc7uiaSepXwyf3o52HaUYcV+Tu66S3F5GA=
Expand Down Expand Up @@ -55,6 +57,7 @@ golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
Expand Down
8 changes: 6 additions & 2 deletions pkg/pnet/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ type (
SelfInfo struct {
PriAddr string `json:"priAddr"`
ChanName string `json:"chanName"`
NPlan int `json:"nPlan"`
NPlan int `json:"nPlan,omitempty"`
TSAddr string `json:"tsAddr,omitempty"`
TSCap uint `json:"tsCap,omitempty"`
}
AddrPair struct {
PriAddr string `json:"priAddr"`
Expand All @@ -41,7 +43,9 @@ type (
PeerInfo struct {
Laddr string
PeerAddrs []AddrPair `json:"peerAddrs"`
PeerNPlan int `json:"peerNPlan"`
PeerNPlan int `json:"peerNPlan,omitempty"`
TSAddr string `json:"tsAddr,omitempty"`
TSCap uint `json:"tsCap,omitempty"`
}
)

Expand Down
253 changes: 253 additions & 0 deletions pkg/stream/tailscale.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
package stream

import (
"context"
"errors"
"fmt"
"io"
"net/netip"
"os"
"path"
"path/filepath"
"runtime"
"strings"

"github.com/fsnotify/fsnotify"

"github.com/contextualist/acp/pkg/config"
"github.com/contextualist/acp/pkg/pnet"
tsapi "github.com/contextualist/acp/pkg/tailscale"
)

func init() {
registerDialer("tailscale", &Tailscale{tun: &tailscaleTun{}, tdrop: &taildrop{}})
}

type TSCapability uint

const (
TSTaildrop TSCapability = 1 << iota
TSTun
)

type Tailscale struct {
tun *tailscaleTun
tdrop *taildrop
capability TSCapability
}

func (d *Tailscale) Init(conf config.Config) error {
if d.tun.Init(conf) == nil {
d.capability |= TSTun
}
if d.tdrop.Init(conf) == nil {
d.capability |= TSTaildrop
}
if d.capability == 0 {
return ErrNotAvailable
}
return nil
}

func (d *Tailscale) SetInfo(info *pnet.SelfInfo) {
if d.capability&TSTun != 0 {
d.tun.SetInfo(info)
} else {
d.tdrop.SetInfo(info)
}
info.TSCap = uint(d.capability)
}

func (d *Tailscale) IntoSender(ctx context.Context, info pnet.PeerInfo) (io.WriteCloser, error) {
inner, err := d.pickImpl(d.capability, TSCapability(info.TSCap))
if err != nil {
return nil, err
}
return inner.IntoSender(ctx, info)
}

func (d *Tailscale) IntoReceiver(ctx context.Context, info pnet.PeerInfo) (io.ReadCloser, error) {
inner, err := d.pickImpl(TSCapability(info.TSCap), d.capability)
if err != nil {
return nil, err
}
return inner.IntoReceiver(ctx, info)
}

func (d *Tailscale) pickImpl(cap1, cap2 TSCapability) (Dialer, error) {
cap := cap1 & cap2
if cap&TSTun != 0 {
defaultLogger.Debugf("using Tailscale Tun")
return d.tun, nil
}
if cap&TSTaildrop != 0 {
defaultLogger.Debugf("using Taildrop")
return d.tdrop, nil
}
return nil, errors.New("neither of Tailscale Tun and Taildrop is supported on both side")
}

type tailscaleTun struct {
laddr string
}

func (d *tailscaleTun) Init(conf config.Config) error {
addrs, _, err := tsapi.Interface()
if err != nil || len(addrs) == 0 {
defaultLogger.Debugf("tailscale network interface search failed, found addrs %v: %v", addrs, err)
return ErrNotAvailable
}
laddr := fmt.Sprintf("%s:%v", addrs[0].String(), conf.Ports[0])
listener, err := pnet.Listen(context.TODO(), "tcp", laddr)
if err != nil {
defaultLogger.Debugf("listen at tailscale addr %s failed: %v", laddr, err)
return ErrNotAvailable
}
d.laddr = listener.Addr().String()
_ = listener.Close()
defaultLogger.Debugf("tailscale IP address is available")
return nil
}

func (d *tailscaleTun) SetInfo(info *pnet.SelfInfo) {
info.TSAddr = d.laddr
}

func (d *tailscaleTun) IntoSender(ctx context.Context, info pnet.PeerInfo) (io.WriteCloser, error) {
return pnet.RendezvousWithTimeout(ctx, d.laddr, []pnet.AddrPair{{PriAddr: info.TSAddr, PubAddr: info.TSAddr}})
}

func (d *tailscaleTun) IntoReceiver(ctx context.Context, info pnet.PeerInfo) (io.ReadCloser, error) {
return pnet.RendezvousWithTimeout(ctx, d.laddr, []pnet.AddrPair{{PriAddr: info.TSAddr, PubAddr: info.TSAddr}})
}

type taildrop struct {
cli *tsapi.TSCli
tsIP string
}

func (d *taildrop) Init(_ config.Config) error {
bin, err := tsapi.Path()
if err != nil {
defaultLogger.Debugf("not found: %v", err)
return ErrNotAvailable
}
d.cli = &tsapi.TSCli{Prefix: []string{bin}}

if soc, ok := os.LookupEnv("TS_SOCKET"); ok {
defaultLogger.Debugf("will be using socket %s when running tailscale", soc)
d.cli.Prefix = append(d.cli.Prefix, fmt.Sprintf("--socket=%s", soc))
}

sf, err := d.cli.RunStatus(context.TODO())
if err != nil {
defaultLogger.Debugf(err.Error())
return ErrNotAvailable
}
d.tsIP = sf.TsIPs[0]
if runtime.GOOS == "linux" && sf.Tun {
// heuristic test if tailscaled is running as root
defaultLogger.Debugf("will be running tailscale as root")
d.cli.Prefix = append([]string{"sudo"}, d.cli.Prefix...)
}

defaultLogger.Debugf("taildrop is available")
return nil
}

func (d *taildrop) SetInfo(info *pnet.SelfInfo) {
info.TSAddr = d.tsIP
}

const TmpArcName = "acp-tmp.tar.gz"

func (d *taildrop) IntoSender(ctx context.Context, info pnet.PeerInfo) (io.WriteCloser, error) {
peerIP := stripPort(info.TSAddr)
return d.cli.StartCp(ctx, TmpArcName, peerIP, defaultLogger.Infof)
}

func (d *taildrop) IntoReceiver(ctx context.Context, info pnet.PeerInfo) (r io.ReadCloser, err error) {
var tsInbox string
switch runtime.GOOS {
case "linux":
tsInbox, err = os.Getwd()
case "darwin", "windows":
var home string
home, err = os.UserHomeDir()
tsInbox = filepath.Join(home, "Downloads")
default:
return nil, fmt.Errorf("OS %s is not supported", runtime.GOOS)
}
if err != nil {
return nil, fmt.Errorf("error getting Taildrop receive dir: %w", err)
}
targetName := filepath.Join(tsInbox, TmpArcName)

defaultLogger.Infof("Taildrop receiving...")
if runtime.GOOS == "linux" {
out, err := d.cli.RunGet(ctx, ".")
if err != nil {
return nil, fmt.Errorf("error running tailscale file get: %w\noutput:\n%s", err, string(out))
}
} else {
// The tailscale daemon automatically receives the file to Downloads, so we wait for it to finish
// CAVEAT: if `targetName` already exist, tailscale daemon will receive file as `targetName 1`
err := waitFileTransferEnd(targetName)
if err != nil {
return nil, fmt.Errorf("error waiting for Taildrop receive: %w", err)
}
}
fi, err := os.Open(targetName)
if err != nil {
return nil, err
}
return &fileWithCleanup{fi}, nil
}

func stripPort(s string) string {
ap, err := netip.ParseAddrPort(s)
if err != nil {
return s
}
return ap.Addr().String()
}

func waitFileTransferEnd(fname string) (err error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("cannot create file watcher: %w", err)
}
defer watcher.Close()
parent := path.Dir(fname)
err = watcher.Add(parent)
if err != nil {
return fmt.Errorf("cannot subscribe watcher to path %s: %w", parent, err)
}

for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if !strings.HasPrefix(event.Name, fname) {
continue
}
defaultLogger.Debugf("fs event: %v", event)
if event.Has(fsnotify.Create) && event.Name == fname {
return
}
case err = <-watcher.Errors:
return
}
}
}

type fileWithCleanup struct {
*os.File
}

func (f *fileWithCleanup) Close() error {
_ = os.Remove(f.File.Name())
return f.File.Close()
}
Loading

0 comments on commit 2d6bfb6

Please sign in to comment.