Skip to content

Commit

Permalink
check chained server through ping or recently proxied site fix #4268
Browse files Browse the repository at this point in the history
  • Loading branch information
fffw committed May 18, 2016
1 parent 3e2f1c5 commit 40341f6
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 62 deletions.
61 changes: 11 additions & 50 deletions src/github.com/getlantern/balancer/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/getlantern/withtimeout"
)

// Dialer captures the configuration for dialing arbitrary addresses.
Expand All @@ -21,14 +19,11 @@ type Dialer struct {
// OnClose: (optional) callback for when this dialer is stopped.
OnClose func()

// Check: (optional) - a function that's used to test reachability metrics
// Check: - a function that's used to test reachability metrics
// periodically or if the dialer failed to connect.
//
// Checks are scheduled at exponentially increasing intervals that are
// capped at 1 minute.
//
// If Check is not specified, a default Check will be used that makes an
// HTTP request to http://www.google.com/humans.txt using this Dialer.
// capped at MaxCheckTimeout.
Check func() bool

// Determines whether a dialer can be trusted with unencrypted traffic.
Expand All @@ -39,7 +34,8 @@ type Dialer struct {
}

var (
maxCheckTimeout = 1 * time.Minute
// The maximum wait time before checking an idle or failed dialer.
MaxCheckTimeout = 2 * time.Second
)

type dialer struct {
Expand All @@ -60,7 +56,7 @@ type dialer struct {
func (d *dialer) Start() {
d.consecSuccesses = 1 // be optimistic
d.closeCh = make(chan struct{})
d.checkTimer = time.NewTimer(maxCheckTimeout)
d.checkTimer = time.NewTimer(MaxCheckTimeout)
if d.Check == nil {
d.Check = d.defaultCheck
}
Expand All @@ -75,13 +71,6 @@ func (d *dialer) Start() {
}
return
case <-d.checkTimer.C:
// We suspect that the check process may be causing users to get blacklisted.
// At the moment, it's not strictly necessary and won't be until we do
// multiple servers with pro, so let's skip it for now.
// TODO: reenable for Pro if necessary
if true {
continue
}
log.Tracef("Start checking dialer %s", d.Label)
ok := d.Check()
if ok {
Expand Down Expand Up @@ -127,7 +116,7 @@ func (d *dialer) updateEMADialTime(t time.Duration) {
// Ref dialer.EMADialTime() for the rationale.
// The value is large enough to safely ignore decimals.
newEMA := (atomic.LoadInt64(&d.emaDialTime) + t.Nanoseconds()) / 2
log.Tracef("Dialer %s EMA(exponential moving average) dial time: %d", d.Label, newEMA)
log.Tracef("Dialer %s EMA(exponential moving average) dial time: %v", d.Label, time.Duration(newEMA))
atomic.StoreInt64(&d.emaDialTime, newEMA)
}

Expand All @@ -136,7 +125,7 @@ func (d *dialer) markSuccess() {
log.Tracef("Dialer %s consecutive successes: %d -> %d", d.Label, newCS-1, newCS)
atomic.StoreInt32(&d.consecFailures, 0)
d.muCheckTimer.Lock()
d.checkTimer.Reset(maxCheckTimeout)
d.checkTimer.Reset(MaxCheckTimeout)
d.muCheckTimer.Unlock()
}

Expand All @@ -145,43 +134,15 @@ func (d *dialer) markFailure() {
newCF := atomic.AddInt32(&d.consecFailures, 1)
log.Tracef("Dialer %s consecutive failures: %d -> %d", d.Label, newCF-1, newCF)
nextCheck := time.Duration(newCF*newCF) * 100 * time.Millisecond
if nextCheck > maxCheckTimeout {
nextCheck = maxCheckTimeout
if nextCheck > MaxCheckTimeout {
nextCheck = MaxCheckTimeout
}
d.muCheckTimer.Lock()
d.checkTimer.Reset(nextCheck)
d.muCheckTimer.Unlock()
}

func (d *dialer) defaultCheck() bool {
client := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
Dial: d.dial,
},
}
ok, timedOut, _ := withtimeout.Do(60*time.Second, func() (interface{}, error) {
req, err := http.NewRequest("GET", "http://www.google.com/humans.txt", nil)
if err != nil {
log.Errorf("Could not create HTTP request?")
return false, nil
}
if d.OnRequest != nil {
d.OnRequest(req)
}
resp, err := client.Do(req)
if err != nil {
log.Debugf("Error testing dialer %s to humans.txt: %s", d.Label, err)
return false, nil
}
if err := resp.Body.Close(); err != nil {
log.Debugf("Unable to close response body: %v", err)
}
log.Tracef("Tested dialer %s to humans.txt, status code %d", d.Label, resp.StatusCode)
return resp.StatusCode == 200, nil
})
if timedOut {
log.Errorf("Timed out checking dialer at: %v", d.Label)
}
return !timedOut && ok.(bool)
log.Errorf("No check function provided for dialer %s", d.Label)
return false
}
99 changes: 87 additions & 12 deletions src/github.com/getlantern/flashlight/client/chained.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@ import (
"fmt"
"net"
"net/http"
"strings"
"sync/atomic"
"time"

"github.com/getlantern/balancer"
"github.com/getlantern/chained"
"github.com/getlantern/idletiming"
"github.com/getlantern/withtimeout"
)

// Close connections idle for a period to avoid dangling connections.
// 1 hour is long enough to avoid interrupting normal connections but short enough
// to eliminate "too many open files" errors.
var idleTimeout = 1 * time.Hour

// Lantern internal sites won't be used as check targets.
var internalSiteSuffix = "getiantem.org"

// ForceChainedProxyAddr - If specified, all proxying will go through this address
var ForceChainedProxyAddr string

Expand Down Expand Up @@ -43,6 +49,10 @@ type ChainedServerInfo struct {

// PluggableTransportSettings: Settings for pluggable transport
PluggableTransportSettings map[string]string

// The host:port used to check this server. It is set to the most recently
// dialed address whose port is 80, excluding hosts ending in internalSiteSuffix.
checkTarget atomic.Value
}

// Dialer creates a *balancer.Dialer backed by a chained server.
Expand Down Expand Up @@ -70,18 +80,9 @@ func (s *ChainedServerInfo) Dialer(deviceID string) (*balancer.Dialer, error) {
ccfg := chained.Config{
DialServer: dial,
Label: label,
}

authToken := s.AuthToken
if ForceAuthToken != "" {
authToken = ForceAuthToken
}

ccfg.OnRequest = func(req *http.Request) {
if authToken != "" {
req.Header.Add("X-LANTERN-AUTH-TOKEN", authToken)
}
req.Header.Set("X-LANTERN-DEVICE-ID", deviceID)
OnRequest: func(req *http.Request) {
s.attachHeaders(req, deviceID)
},
}
d := chained.NewDialer(ccfg)

Expand All @@ -94,6 +95,7 @@ func (s *ChainedServerInfo) Dialer(deviceID string) (*balancer.Dialer, error) {
return nil, err
}

s.updateCheckTarget(addr)
conn = idletiming.Conn(conn, idleTimeout, func() {
log.Debugf("Proxy connection to %s via %s idle for %v, closing", addr, conn.RemoteAddr(), idleTimeout)
if err := conn.Close(); err != nil {
Expand All @@ -103,6 +105,79 @@ func (s *ChainedServerInfo) Dialer(deviceID string) (*balancer.Dialer, error) {

return conn, nil
},
Check: func() bool {
return s.check(d, deviceID)
},
OnRequest: ccfg.OnRequest,
}, nil
}

// attachHeaders decorates req with the headers sent to the chained server:
// the auth token (when one is available) and the device ID.
//
// ForceAuthToken, when non-empty, takes precedence over the server's own
// AuthToken. The auth token is appended with Header.Add, whereas the device
// ID replaces any existing value via Header.Set.
func (s *ChainedServerInfo) attachHeaders(req *http.Request, deviceID string) {
	token := ForceAuthToken
	if token == "" {
		// No global override; fall back to the per-server token.
		token = s.AuthToken
	}
	if token != "" {
		req.Header.Add("X-LANTERN-AUTH-TOKEN", token)
	}
	req.Header.Set("X-LANTERN-DEVICE-ID", deviceID)
}

// updateCheckTarget records addr (a "host:port" string) as the target that
// subsequent checks of this server will request.
//
// The address is only recorded when its port is "80" and its host does not
// end in internalSiteSuffix; in every other case (including an addr that
// cannot be split into host and port) the previously stored target, if any,
// is left untouched.
func (s *ChainedServerInfo) updateCheckTarget(addr string) {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		// Include the parse error so the log explains why addr was rejected.
		log.Errorf("failed to split port from %s: %v", addr, err)
		return
	}
	if port != "80" || strings.HasSuffix(host, internalSiteSuffix) {
		log.Tracef("Skip setting %s as check target", addr)
		return
	}
	s.checkTarget.Store(addr)
}

// check reports whether the chained server answers an HTTP HEAD request
// issued through dial.
//
// If a check target has been stored in s.checkTarget, the request goes to
// http://<target>/index.html. Otherwise it goes to
// http://ping-chained-server with the X-Lantern-Ping header set to "small".
// The usual auth/device headers are attached in both cases.
//
// The server is considered good when a response arrives within 60 seconds
// and its status code is below 500. Transport errors, 5xx responses and
// timeouts all yield false.
func (s *ChainedServerInfo) check(dial func(string, string) (net.Conn, error), deviceID string) bool {
	target, hasTarget := s.checkTarget.Load().(string)
	url := "http://ping-chained-server"
	if hasTarget {
		url = fmt.Sprintf("http://%s/index.html", target)
	}

	req, err := http.NewRequest("HEAD", url, nil)
	if err != nil {
		log.Errorf("Could not create HTTP request: %v", err)
		return false
	}
	if !hasTarget {
		req.Header.Set("X-Lantern-Ping", "small")
	}
	s.attachHeaders(req, deviceID)

	// A dedicated transport per check, with keep-alives disabled.
	transport := &http.Transport{
		DisableKeepAlives: true,
		Dial:              dial,
	}

	// NOTE(review): presumably the round trip keeps running in the background
	// if withtimeout.Do gives up first - confirm against withtimeout.
	result, timedOut, _ := withtimeout.Do(60*time.Second, func() (interface{}, error) {
		resp, rtErr := transport.RoundTrip(req)
		if rtErr != nil {
			log.Debugf("Error testing dialer %s to %s: %s", s.Addr, url, rtErr)
			return false, nil
		}
		if closeErr := resp.Body.Close(); closeErr != nil {
			log.Debugf("Unable to close response body: %v", closeErr)
		}
		summary := fmt.Sprintf("HEAD %s through chained server at %s, status code %d", url, s.Addr, resp.StatusCode)
		if resp.StatusCode >= 500 {
			// Server-side failures are logged more prominently than successes.
			log.Debug(summary)
			return false, nil
		}
		log.Trace(summary)
		return true, nil
	})
	if timedOut {
		log.Errorf("Timed out checking dialer at: %v", s.Addr)
		return false
	}
	return result.(bool)
}

0 comments on commit 40341f6

Please sign in to comment.