Skip to content

Commit

Permalink
Merge branch 'master' into staging-client
Browse files Browse the repository at this point in the history
  • Loading branch information
rod-hynes committed Aug 19, 2020
2 parents 7f73098 + a9f0895 commit 10cb019
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 18 deletions.
4 changes: 3 additions & 1 deletion psiphon/TCPConn.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ func proxiedTcpDial(
go func() {
conn, err := upstreamDialer("tcp", addr)
if _, ok := err.(*upstreamproxy.Error); ok {
NoticeUpstreamProxyError(err)
if config.UpstreamProxyErrorCallback != nil {
config.UpstreamProxyErrorCallback(err)
}
}
resultChannel <- upstreamDialResult{
conn: conn,
Expand Down
2 changes: 2 additions & 0 deletions psiphon/common/parameters/clientParameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ const (
StaggerConnectionWorkersPeriod = "StaggerConnectionWorkersPeriod"
StaggerConnectionWorkersJitter = "StaggerConnectionWorkersJitter"
LimitIntensiveConnectionWorkers = "LimitIntensiveConnectionWorkers"
UpstreamProxyErrorWaitDuration = "UpstreamProxyErrorWaitDuration"
IgnoreHandshakeStatsRegexps = "IgnoreHandshakeStatsRegexps"
PrioritizeTunnelProtocolsProbability = "PrioritizeTunnelProtocolsProbability"
PrioritizeTunnelProtocols = "PrioritizeTunnelProtocols"
Expand Down Expand Up @@ -281,6 +282,7 @@ var defaultClientParameters = map[string]struct {
StaggerConnectionWorkersPeriod: {value: time.Duration(0), minimum: time.Duration(0)},
StaggerConnectionWorkersJitter: {value: 0.1, minimum: 0.0},
LimitIntensiveConnectionWorkers: {value: 0, minimum: 0},
UpstreamProxyErrorWaitDuration: {value: 30 * time.Second, minimum: time.Duration(0)},
IgnoreHandshakeStatsRegexps: {value: false},
TunnelOperateShutdownTimeout: {value: 1 * time.Second, minimum: 1 * time.Millisecond, flags: useNetworkLatencyMultiplier},
TunnelPortForwardDialTimeout: {value: 10 * time.Second, minimum: 1 * time.Millisecond, flags: useNetworkLatencyMultiplier},
Expand Down
50 changes: 50 additions & 0 deletions psiphon/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1664,6 +1664,55 @@ loop:
continue
}

// upstreamProxyErrorCallback will post NoticeUpstreamProxyError when the
// tunnel dial fails due to an upstream proxy error. As the upstream proxy
// is user configured, the error message may need to be relayed to the user.

upstreamProxyErrorCallback := func(err error) {

// Do not post the notice when overall context is canceled or timed-out:
// the upstream proxy connection error is likely a result of the
// cancellation, and not a condition to be fixed by the user.
//
// Concurrency note: due to accessing controller.establishCtx,
// upstreamProxyErrorCallback must only be called while establishing;
// specifically, from the following ConnectTunnel call.

if controller.establishCtx.Err() != nil {
return
}

// Another class of non-fatal upstream proxy error arises from proxies
// which limit permitted proxied ports. In this case, some tunnels may fail
// due to dial port, while others may eventually succeed. To avoid this
// class of errors, delay posting the notice. If the upstream proxy works,
// _some_ tunnel should connect. If the upstream proxy configuration is
// broken, the error should persist and eventually get posted.

p := controller.config.GetClientParameters().Get()
workerPoolSize := p.Int(parameters.ConnectionWorkerPoolSize)
waitDuration := p.Duration(parameters.UpstreamProxyErrorWaitDuration)
p.Close()

controller.concurrentEstablishTunnelsMutex.Lock()
establishConnectTunnelCount := controller.establishConnectTunnelCount
controller.concurrentEstablishTunnelsMutex.Unlock()

// Delay until either UpstreamProxyErrorWaitDuration has elapsed (excluding
// time spent waiting for network connectivity) or, to post sooner if many
// candidates are failing, at least workerPoolSize tunnel connection
// attempts have completed. We infer that at least workerPoolSize
// candidates have completed by checking that at least 2*workerPoolSize
// candidates have started.

if time.Since(candidateServerEntry.adjustedEstablishStartTime) < waitDuration &&
establishConnectTunnelCount < 2*workerPoolSize {
return
}

NoticeUpstreamProxyError(err)
}

// Select the tunnel protocol. The selection will be made at random
// from protocols supported by the server entry, optionally limited by
// LimitTunnelProtocols.
Expand Down Expand Up @@ -1727,6 +1776,7 @@ loop:

dialParams, err := MakeDialParameters(
controller.config,
upstreamProxyErrorCallback,
canReplay,
selectProtocol,
candidateServerEntry.serverEntry,
Expand Down
6 changes: 5 additions & 1 deletion psiphon/dialParameters.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ type DialParameters struct {
// when establishment is cancelled.
func MakeDialParameters(
config *Config,
upstreamProxyErrorCallback func(error),
canReplay func(serverEntry *protocol.ServerEntry, replayProtocol string) bool,
selectProtocol func(serverEntry *protocol.ServerEntry) (string, bool),
serverEntry *protocol.ServerEntry,
Expand Down Expand Up @@ -262,8 +263,10 @@ func MakeDialParameters(
isReplay = false
}

// Set IsExchanged so that full dial parameters are stored and replayed upon success.
// Set IsExchanged such that full dial parameters are stored and replayed
// upon success.
dialParams.IsExchanged = false

dialParams.ServerEntry = serverEntry
dialParams.NetworkID = networkID
dialParams.IsReplay = isReplay
Expand Down Expand Up @@ -662,6 +665,7 @@ func MakeDialParameters(
IPv6Synthesizer: config.IPv6Synthesizer,
TrustedCACertificatesFilename: config.TrustedCACertificatesFilename,
FragmentorConfig: fragmentor.NewUpstreamConfig(p, dialParams.TunnelProtocol, dialParams.FragmentorSeed),
UpstreamProxyErrorCallback: upstreamProxyErrorCallback,
}

// Unconditionally initialize MeekResolvedIPAddress, so a valid string can
Expand Down
32 changes: 20 additions & 12 deletions psiphon/dialParameters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {

// Test: expected dial parameter fields set

dialParams, err := MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
upstreamProxyErrorCallback := func(_ error) {}

dialParams, err := MakeDialParameters(
clientConfig, upstreamProxyErrorCallback, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
if err != nil {
t.Fatalf("MakeDialParameters failed: %s", err)
}
Expand Down Expand Up @@ -201,11 +204,16 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
t.Fatalf("missing API request fields")
}

dialConfig := dialParams.GetDialConfig()
if dialConfig.UpstreamProxyErrorCallback == nil {
t.Fatalf("missing upstreamProxyErrorCallback")
}

// Test: no replay after dial reported to fail

dialParams.Failed(clientConfig)

dialParams, err = MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
if err != nil {
t.Fatalf("MakeDialParameters failed: %s", err)
}
Expand All @@ -220,7 +228,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {

testNetworkID = prng.HexString(8)

dialParams, err = MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
if err != nil {
t.Fatalf("MakeDialParameters failed: %s", err)
}
Expand All @@ -237,7 +245,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {

dialParams.Succeeded()

replayDialParams, err := MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
replayDialParams, err := MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
if err != nil {
t.Fatalf("MakeDialParameters failed: %s", err)
}
Expand Down Expand Up @@ -323,7 +331,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
t.Fatalf("SetClientParameters failed: %s", err)
}

dialParams, err = MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
if err != nil {
t.Fatalf("MakeDialParameters failed: %s", err)
}
Expand All @@ -338,7 +346,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {

time.Sleep(1 * time.Second)

dialParams, err = MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
if err != nil {
t.Fatalf("MakeDialParameters failed: %s", err)
}
Expand All @@ -353,7 +361,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {

serverEntries[0].ConfigurationVersion += 1

dialParams, err = MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
if err != nil {
t.Fatalf("MakeDialParameters failed: %s", err)
}
Expand All @@ -377,14 +385,14 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
t.Fatalf("SetClientParameters failed: %s", err)
}

dialParams, err = MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
dialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
if err != nil {
t.Fatalf("MakeDialParameters failed: %s", err)
}

dialParams.Succeeded()

replayDialParams, err = MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
replayDialParams, err = MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntries[0], false, 0, 0)
if err != nil {
t.Fatalf("MakeDialParameters failed: %s", err)
}
Expand Down Expand Up @@ -432,7 +440,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {

if i%10 == 0 {

dialParams, err := MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntry, false, 0, 0)
dialParams, err := MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntry, false, 0, 0)
if err != nil {
t.Fatalf("MakeDialParameters failed: %s", err)
}
Expand Down Expand Up @@ -461,7 +469,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
t.Fatalf("ServerEntryIterator.Next failed: %s", err)
}

dialParams, err := MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntry, false, 0, 0)
dialParams, err := MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntry, false, 0, 0)
if err != nil {
t.Fatalf("MakeDialParameters failed: %s", err)
}
Expand All @@ -483,7 +491,7 @@ func runDialParametersAndReplay(t *testing.T, tunnelProtocol string) {
t.Fatalf("ServerEntryIterator.Next failed: %s", err)
}

dialParams, err := MakeDialParameters(clientConfig, canReplay, selectProtocol, serverEntry, false, 0, 0)
dialParams, err := MakeDialParameters(clientConfig, nil, canReplay, selectProtocol, serverEntry, false, 0, 0)
if err != nil {
t.Fatalf("MakeDialParameters failed: %s", err)
}
Expand Down
1 change: 1 addition & 0 deletions psiphon/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func TestServerEntryExchange(t *testing.T) {

dialParams, err := MakeDialParameters(
config,
nil,
canReplay,
selectProtocol,
serverEntry,
Expand Down
5 changes: 5 additions & 0 deletions psiphon/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ type DialConfig struct {
// FragmentorConfig specifies whether to layer a fragmentor.Conn on top
// of dialed TCP conns, and the fragmentation configuration to use.
FragmentorConfig *fragmentor.Config

// UpstreamProxyErrorCallback is called when a dial fails due to an upstream
// proxy error. As the upstream proxy is user configured, the error message
// may need to be relayed to the user.
UpstreamProxyErrorCallback func(error)
}

// NetworkConnectivityChecker defines the interface to the external
Expand Down
8 changes: 5 additions & 3 deletions psiphon/notice.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,9 +667,11 @@ func NoticeSplitTunnelRegion(region string) {
// NoticeUpstreamProxyError reports an error when connecting to an upstream proxy. The
// user may have input, for example, an incorrect address or incorrect credentials.
func NoticeUpstreamProxyError(err error) {
singletonNoticeLogger.outputNotice(
message := err.Error()
outputRepetitiveNotice(
"UpstreamProxyError", message, 0,
"UpstreamProxyError", 0,
"message", err.Error())
"message", message)
}

// NoticeClientUpgradeDownloadedBytes reports client upgrade download progress.
Expand Down Expand Up @@ -898,7 +900,7 @@ func outputRepetitiveNotice(

state, keyFound := repetitiveNoticeStates[repetitionKey]
if !keyFound {
state = new(repetitiveNoticeState)
state = &repetitiveNoticeState{message: repetitionMessage}
repetitiveNoticeStates[repetitionKey] = state
}

Expand Down
1 change: 1 addition & 0 deletions psiphon/server/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,7 @@ func getRequestLogFields(
if index != -1 {
strValue = strValue[:index+len(target)] + "<redacted>"
}
logFields[expectedParam.name] = strValue

default:
if expectedParam.flags&requestParamLogStringAsInt != 0 {
Expand Down
1 change: 1 addition & 0 deletions psiphon/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2199,6 +2199,7 @@ func storePruneServerEntriesTest(

dialParams, err := psiphon.MakeDialParameters(
clientConfig,
nil,
func(_ *protocol.ServerEntry, _ string) bool { return true },
func(serverEntry *protocol.ServerEntry) (string, bool) {
return runConfig.tunnelProtocol, true
Expand Down
1 change: 1 addition & 0 deletions psiphon/server/sessionID_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func TestDuplicateSessionID(t *testing.T) {

dialParams, err := psiphon.MakeDialParameters(
clientConfig,
nil,
func(_ *protocol.ServerEntry, _ string) bool { return false },
func(_ *protocol.ServerEntry) (string, bool) { return "OSSH", true },
serverEntry,
Expand Down
7 changes: 7 additions & 0 deletions psiphon/tactics.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,15 @@ func fetchTactics(
return tacticsProtocols[index], true
}

// No upstreamProxyErrorCallback is set: for tunnel establishment, the
// tactics head start is short, and tunnel connections will eventually post
// NoticeUpstreamProxyError for any persistent upstream proxy error
// conditions. Non-tunnel establishment cases, such as SendFeedback, which
// use tactics are not currently expected to post NoticeUpstreamProxyError.

dialParams, err := MakeDialParameters(
config,
nil,
canReplay,
selectProtocol,
serverEntry,
Expand Down
2 changes: 1 addition & 1 deletion psiphon/upstreamproxy/proxy_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (pc *proxyConn) handshake(addr, username, password string) error {
if username == "" {
pc.httpClientConn.Close()
pc.authState = HTTP_AUTH_STATE_FAILURE
return proxyError(fmt.Errorf("ho username credentials provided for proxy auth"))
return proxyError(fmt.Errorf("no username credentials provided for proxy auth"))
}
if err == errPersistEOF {
// The server may send Connection: close,
Expand Down

0 comments on commit 10cb019

Please sign in to comment.