From 276e4d68d0490f1bc2aede1ae520d017f51d5bcc Mon Sep 17 00:00:00 2001 From: Petar Maymounkov Date: Wed, 25 Mar 2015 19:34:16 -0400 Subject: [PATCH] cleanup --- pending/tele/_tele_test.go | 74 ------------------- pending/tele/addr.go | 111 ---------------------------- pending/tele/conn.go | 48 ------------ pending/tele/dialer.go | 115 ----------------------------- pending/tele/listener.go | 147 ------------------------------------- pending/tele/transport.go | 42 ----------- 6 files changed, 537 deletions(-) delete mode 100644 pending/tele/_tele_test.go delete mode 100644 pending/tele/addr.go delete mode 100644 pending/tele/conn.go delete mode 100644 pending/tele/dialer.go delete mode 100644 pending/tele/listener.go delete mode 100644 pending/tele/transport.go diff --git a/pending/tele/_tele_test.go b/pending/tele/_tele_test.go deleted file mode 100644 index 35994bf..0000000 --- a/pending/tele/_tele_test.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2013 The Go Circuit Project -// Use of this source code is governed by the license for -// The Go Circuit Project, found in the LICENSE file. -// -// Authors: -// 2013 Petar Maymounkov - -package tele - -import ( - "encoding/gob" - "fmt" - "net" - "os" - "testing" - - "github.com/gocircuit/core/use/n" -) - -type testMsg struct{} - -func init() { - gob.Register(&testMsg{}) -} - -func TestTele(t *testing.T) { - sys := &System{} - y, z := make(chan int), make(chan int) - // Listener - go func() { - x := sys.NewTransport("listener") - l := x.Listen(MustParseNetAddr("0.0.0.0:44111")) - z <- 1 - conn := l.Accept() - msg, err := conn.Read() - if err != nil { - failnow("listener read (%s)", err) - } - if _, ok := msg.(*testMsg); !ok { - failnow("listener message type") - } - if err = conn.Close(); err != nil { - failnow("listener close (%s)", err) - } - y <- 1 - }() - // Dialer - go func() { - x := sys.NewTransport("dialer") - <-z - conn, err := x.Dial(&Addr{ - ID: n.Id("listener"), - PID: os.Getpid(), - TCP: MustParseNetAddr("127.0.0.1:44111").(*net.TCPAddr), - }) - if err != nil { - failnow("dialer dial (%s)", err) - } - if err = conn.Write(&testMsg{}); err != nil { - failnow("dialer write (%s)", err) - } - if err = conn.Close(); err != nil { - failnow("dialer close (%s)", err) - } - y <- 1 - }() - <-y - <-y -} - -func failnow(format string, v ...interface{}) { - println(fmt.Sprintf(format, v...)) - os.Exit(1) -} diff --git a/pending/tele/addr.go b/pending/tele/addr.go deleted file mode 100644 index 15ea8e6..0000000 --- a/pending/tele/addr.go +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright 2013 Tumblr, Inc. -// Use of this source code is governed by the license for -// The Go Circuit Project, found in the LICENSE file. -// -// Authors: -// 2013 Petar Maymounkov - -package tele - -import ( - "encoding/gob" - "fmt" - "net" - "net/url" - "strconv" - "strings" - - "github.com/gocircuit/core/errors" - "github.com/gocircuit/core/sys" -) - -// Addr maintains a single unique instance for each addr. -// Addr object uniqueness is required by the n.Addr interface. -type Addr struct { - ID sys.Id - PID int - TCP *net.TCPAddr -} - -func init() { - gob.Register(&Addr{}) -} - -func MustParseNetAddr(s string) net.Addr { - addr, err := ParseNetAddr(s) - if err != nil { - panic(err) - } - return addr -} - -func ParseNetAddr(s string) (net.Addr, error) { - return net.ResolveTCPAddr("tcp", s) -} - -func NewNetAddr(id sys.Id, pid int, addr net.Addr) *Addr { - return &Addr{ID: id, PID: pid, TCP: addr.(*net.TCPAddr)} -} - -func NewAddr(id sys.Id, pid int, hostport string) (sys.Addr, error) { - a, err := net.ResolveTCPAddr("tcp", hostport) - if err != nil { - return nil, err - } - return &Addr{ID: id, PID: pid, TCP: a}, nil -} - -func (a *Addr) Id() sys.Id { - return a.ID -} - -func (a *Addr) String() string { - u := url.URL{ - Scheme: sys.Scheme, - Host: sanitizeTCP(a.TCP), - Path: "/" + strconv.Itoa(a.PID) + "/" + a.ID.String(), - } - return u.String() -} - -// circuit://123.3.45.0:3456/2345/R1122334455667788 -func ParseAddr(s string) (*Addr, error) { - u, err := url.Parse(s) - if err != nil { - return nil, err - } - if u.Scheme != sys.Scheme { - return nil, errors.NewError("worker address URL scheme mismatch") - } - // Net address - naddr, err := ParseNetAddr(u.Host) - if err != nil { - return nil, err - } - // Parse path - parts := strings.Split(u.Path, "/") - if len(parts) != 3 { - return nil, errors.NewError(fmt.Sprintf("parse path: %#v", parts)) - } - if parts[0] != "" { - return nil, errors.NewError("must start with slash") - } - // PID - pid, err := strconv.Atoi(parts[1]) - if err != nil { - return nil, err - } - // Worker ID - id, err := sys.ParseId(parts[2]) - if err != nil { - return nil, err - } - return &Addr{ID: id, PID: pid, TCP: naddr.(*net.TCPAddr)}, nil -} - -func sanitizeTCP(a *net.TCPAddr) string { - if len(a.IP) == 0 { - return "noaddr" - } - return a.String() -} diff --git a/pending/tele/conn.go b/pending/tele/conn.go deleted file mode 100644 index 17c082f..0000000 --- a/pending/tele/conn.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2013 The Go Circuit Project -// Use of this source code is governed by the license for -// The Go Circuit Project, found in the LICENSE file. -// -// Authors: -// 2013 Petar Maymounkov - -package tele - -import ( - "github.com/gocircuit/core/sys" - "github.com/gocircuit/core/sys/tele/blend" -) - -type Conn struct { - addr *Addr - sub *blend.Conn -} - -func NewConn(sub *blend.Conn, addr *Addr) *Conn { - return &Conn{addr: addr, sub: sub} -} - -func (c *Conn) Read() (v interface{}, err error) { - if v, err = c.sub.Read(); err != nil { - return nil, err - } - return -} - -func (c *Conn) Write(v interface{}) (err error) { - if err = c.sub.Write(v); err != nil { - return err - } - return nil -} - -func (c *Conn) Close() error { - return c.sub.Close() -} - -func (c *Conn) Abort(reason error) { - c.sub.Abort(reason) -} - -func (c *Conn) Addr() sys.Addr { - return c.addr -} diff --git a/pending/tele/dialer.go b/pending/tele/dialer.go deleted file mode 100644 index 6db4064..0000000 --- a/pending/tele/dialer.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2013 The Go Circuit Project -// Use of this source code is governed by the license for -// The Go Circuit Project, found in the LICENSE file. -// -// Authors: -// 2013 Petar Maymounkov - -package tele - -import ( - // "log" - "sync" - "time" - - "github.com/gocircuit/core/errors" - "github.com/gocircuit/core/sys" - "github.com/gocircuit/core/sys/tele/blend" -) - -// Dialer -type Dialer struct { - dialback sys.Addr - sub *blend.Transport // Encloses *blend.Dialer - sync.Mutex - open map[sys.Id]*blend.DialSession // Open dial sessions -} - -func newDialer(dialback sys.Addr, sub *blend.Transport) *Dialer { - return &Dialer{ - dialback: dialback, - sub: sub, - open: make(map[sys.Id]*blend.DialSession), - } -} - -func (d *Dialer) Dial(addr sys.Addr) (conn sys.Conn, err error) { - d.Lock() - defer d.Unlock() - // - workerID := addr.Id() - s, present := d.open[workerID] - if !present { - // Make new session to worker if one not present - s, err = d.sub.DialSession(addr.(*Addr).TCP, func() { - d.scrub(addr.Id()) - }) - if err != nil { - return nil, err - } - if err = d.auth(addr, s.Dial()); err != nil { - s.Close() - return nil, err - } - d.open[workerID] = s - go d.watch(workerID, s) // Watch for idleness and close - } - return NewConn(s.Dial(), addr.(*Addr)), nil -} - -// Idleness duration should be greater than the locus heartbeats over permanent cross-references -const IdleDuration = time.Second * 10 - -func (d *Dialer) watch(workerID sys.Id, s *blend.DialSession) { - var ready bool - for { - time.Sleep(IdleDuration) - if d.expire(workerID, s, &ready) { - return - } - } -} - -func (d *Dialer) expire(workerID sys.Id, s *blend.DialSession, ready *bool) (closed bool) { - d.Lock() - defer d.Unlock() - // - numconn, lastuse := s.NumConn() - if numconn == 0 && time.Now().Sub(lastuse) > IdleDuration { - if *ready { - delete(d.open, workerID) - // log.Printf("idle session %s expiring", s) - s.Close() - return true - } - *ready = true - } - return false -} - -func (d *Dialer) scrub(workerID sys.Id) { - d.Lock() - defer d.Unlock() - delete(d.open, workerID) -} - -func (d *Dialer) auth(addr sys.Addr, conn *blend.Conn) error { - defer conn.Close() - if err := conn.Write(&HelloMsg{ - SourceAddr: d.dialback, - TargetAddr: addr, - }); err != nil { - return err - } - msg, err := conn.Read() - if err != nil { - return err - } - switch q := msg.(type) { - case *WelcomeMsg: - return nil - case *RejectMsg: - return errors.NewError("dial rejected by remote (%s)", errors.Unpack(q.Err)) - } - return errors.NewError("unknown welcome response") -} diff --git a/pending/tele/listener.go b/pending/tele/listener.go deleted file mode 100644 index 0efb6c5..0000000 --- a/pending/tele/listener.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright 2013 The Go Circuit Project -// Use of this source code is governed by the license for -// The Go Circuit Project, found in the LICENSE file. -// -// Authors: -// 2013 Petar Maymounkov - -package tele - -import ( - "encoding/gob" - "log" - "net" - "os" - "sync" - - "github.com/gocircuit/core/errors" - "github.com/gocircuit/core/sys" - "github.com/gocircuit/core/sys/tele/blend" -) - -// Listener -type Listener struct { - addr *Addr - listener *blend.Listener - ach__ sync.Mutex - ach chan sys.Conn -} - -func newListener(workerID sys.Id, pid int, listener *blend.Listener) *Listener { - l := &Listener{ - addr: NewNetAddr(workerID, pid, listener.Addr()), // Compute what our address looks like on the outside. - listener: listener, - ach: make(chan sys.Conn), - } - go l.loop() - return l -} - -func (l *Listener) loop() { - for { - session := l.listener.AcceptSession() - go func() { - defer session.Close() - // Authenticate dialer on first connection - sourceAddr, err := l.handshake(session.Accept()) - if err != nil { - return - } - for { - conn := session.Accept() - if conn == nil { - // Nil conn signifies the session has been closed - return - } - // For now, listening sessions do not expire themselves on inactivity to prevent - // race against DialSessions, who currently hold the sole responsibility. - l.ach__.Lock() - l.ach <- NewConn(conn, sourceAddr) - l.ach__.Unlock() - } - }() - } -} - -func (l *Listener) handshake(conn *blend.Conn) (sourceAddr *Addr, err error) { - if conn == nil { - return nil, errors.NewError("listener off") - } - defer conn.Close() - // - var msg interface{} - msg, err = conn.Read() - if err != nil { - return nil, err - } - defer func() { - if err != nil { - conn.Write(&RejectMsg{err}) - } else { - err = conn.Write(&WelcomeMsg{}) - } - }() - hello, ok := msg.(*HelloMsg) - if !ok { - log.Println("rejecting", conn.RemoteAddr().String(), "unknown hello message type") - return nil, errors.NewError("rejecting unknown hello type") - } - // Accept user connections - da, ok := hello.SourceAddr.(*Addr) - if !ok { - log.Println("rejecting", conn.RemoteAddr().String(), "unknown source address type") - return nil, errors.NewError("rejecting unknown source address type") - } - reverseAddr(da, conn.RemoteAddr()) - la, ok := hello.TargetAddr.(*Addr) - if !ok { - log.Println("rejecting ", conn.RemoteAddr().String(), "unknown target address type") - return nil, errors.NewError("rejecting unknown target address type") - } - if la.Id() != l.addr.Id() { - log.Println("rejecting", conn.RemoteAddr().String(), "due to worker identity mismatch") - return nil, errors.NewError("rejecting worker identity mismatch, looks for %s, got %s", la.Id(), l.addr.Id()) - } - if la.PID != os.Getpid() { - log.Println("rejecting", conn.RemoteAddr().String(), "due to worker PID mismatch") - return nil, errors.NewError("rejecting worker PID mismatch, looks for %d, got %d", la.PID, os.Getpid()) - } - return da, nil -} - -func reverseAddr(bound *Addr, seen net.Addr) { - // var saved = bound.String() - if !bound.TCP.IP.IsUnspecified() { - return - } - bound.TCP.IP = seen.(*net.TCPAddr).IP - // log.Printf("Reverse dial address auto-completed: %s => %s", saved, bound.String()) -} - -func (l *Listener) Accept() sys.Conn { - return <-l.ach -} - -func (l *Listener) Addr() sys.Addr { - return l.addr -} - -func (l *Listener) Close() {} - -// Dialer sends HelloMsg to accepter when opening a session to advertise its workerID and local process ID -type HelloMsg struct { - SourceAddr sys.Addr - TargetAddr sys.Addr -} - -type WelcomeMsg struct{} - -type RejectMsg struct { - Err error -} - -func init() { - gob.Register(&HelloMsg{}) - gob.Register(&WelcomeMsg{}) - gob.Register(&RejectMsg{}) -} diff --git a/pending/tele/transport.go b/pending/tele/transport.go deleted file mode 100644 index 4f9c73c..0000000 --- a/pending/tele/transport.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2013 The Go Circuit Project -// Use of this source code is governed by the license for -// The Go Circuit Project, found in the LICENSE file. -// -// Authors: -// 2013 Petar Maymounkov - -// Package tele implements the circuit/use/n networking module using Teleport Transport -package tele - -import ( - "net" - "os" - - "github.com/gocircuit/core/sys" - "github.com/gocircuit/core/sys/tele" - "github.com/gocircuit/core/sys/tele/blend" -) - -// workerID is the ID for this transport endpoint. -// addr is the networking address to listen to. -func NewTransport(workerID sys.Id, addr net.Addr, key []byte) sys.Peer { - var u *blend.Transport - if len(key) == 0 { - u = tele.NewStructOverTCP() - } else { - u = tele.NewStructOverTCPWithHMAC(key) - } - l := newListener(workerID, os.Getpid(), u.Listen(addr)) - return &Transport{ - Id: workerID, - Dialer: newDialer(l.Addr(), u), - Listener: l, - } -} - -// Transport cumulatively represents the ability to listen for connections and dial into remote endpoints. -type Transport struct { - sys.Id - *Dialer - *Listener -}