Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: close inprogress connect subsegments #102

Merged
merged 6 commits into from
Apr 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion strategy/sampling/centralized_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2513,12 +2513,14 @@ A:
break A
default:
// Assert that rule was added to manifest and the timestamp refreshed
ss.manifest.Lock()
if len(ss.manifest.Rules) == 1 &&
len(ss.manifest.Index) == 1 &&
ss.manifest.refreshedAt == 1500000000 {

ss.manifest.Unlock()
break A
}
ss.manifest.Unlock()
}
}
}
Expand Down
14 changes: 13 additions & 1 deletion xray/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,18 @@ var xRayBeforeSendHandler = request.NamedHandler{
var xRayAfterSendHandler = request.NamedHandler{
Name: "XRayAfterSendHandler",
Fn: func(r *request.Request) {
endSubsegment(r)
curseg := GetSegment(r.HTTPRequest.Context())

if curseg.Name == "attempt" {
// An error could have prevented the connect subsegment from closing,
// so clean it up here.
for _, subsegment := range curseg.rawSubsegments {
if subsegment.Name == "connect" && subsegment.safeInProgress() {
subsegment.Close(nil)
return
}
}
}
},
}

Expand Down Expand Up @@ -143,6 +154,7 @@ func pushHandlers(c *client.Client) {
c.Handlers.Validate.PushFrontNamed(xRayBeforeValidateHandler)
c.Handlers.Build.PushBackNamed(xRayAfterBuildHandler)
c.Handlers.Sign.PushFrontNamed(xRayBeforeSignHandler)
c.Handlers.Send.PushBackNamed(xRayAfterSendHandler)
c.Handlers.Unmarshal.PushFrontNamed(xRayBeforeUnmarshalHandler)
c.Handlers.Unmarshal.PushBackNamed(xRayAfterUnmarshalHandler)
c.Handlers.Retry.PushFrontNamed(xRayBeforeRetryHandler)
Expand Down
117 changes: 117 additions & 0 deletions xray/aws_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package xray

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/lambda"
"github.com/stretchr/testify/assert"
)

func TestClientSuccessfulConnection(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
b := []byte(`{}`)
w.WriteHeader(http.StatusOK)
w.Write(b)
}))

svc := lambda.New(session.Must(session.NewSession(&aws.Config{
Endpoint: aws.String(ts.URL),
Region: aws.String("fake-moon-1"),
Credentials: credentials.NewStaticCredentials("akid", "secret", "noop")})))

ctx, root := BeginSegment(context.Background(), "Test")

AWS(svc.Client)

_, err := svc.ListFunctionsWithContext(ctx, &lambda.ListFunctionsInput{})
root.Close(nil)
assert.NoError(t, err)

s, e := TestDaemon.Recv()
assert.NoError(t, e)

subseg := &Segment{}
assert.NotEmpty(t, s.Subsegments)
assert.NoError(t, json.Unmarshal(s.Subsegments[0], &subseg))
assert.False(t, subseg.Fault)
assert.NotEmpty(t, subseg.Subsegments)

attemptSubseg := &Segment{}
for _, sub := range subseg.Subsegments {
tempSeg := &Segment{}
assert.NoError(t, json.Unmarshal(sub, &tempSeg))
if tempSeg.Name == "attempt" {
attemptSubseg = tempSeg
break
}
}

assert.Equal(t, "attempt", attemptSubseg.Name)
assert.Zero(t, attemptSubseg.openSegments)

// Connect subsegment will contain multiple child subsegments.
// The subsegment should fail since the endpoint is not valid,
// and should not be InProgress.
connectSubseg := &Segment{}
assert.NotEmpty(t, attemptSubseg.Subsegments)
assert.NoError(t, json.Unmarshal(attemptSubseg.Subsegments[0], &connectSubseg))
assert.Equal(t, "connect", connectSubseg.Name)
assert.False(t, connectSubseg.InProgress)
assert.NotZero(t, connectSubseg.EndTime)
assert.NotEmpty(t, connectSubseg.Subsegments)

// Ensure that the 'connect' subsegments are completed.
for _, sub := range connectSubseg.Subsegments {
tempSeg := &Segment{}
assert.NoError(t, json.Unmarshal(sub, &tempSeg))
assert.False(t, tempSeg.InProgress)
assert.NotZero(t, tempSeg.EndTime)
}
}

func TestClientFailedConnection(t *testing.T) {
svc := lambda.New(session.Must(session.NewSession(&aws.Config{
Region: aws.String("fake-moon-1"),
Credentials: credentials.NewStaticCredentials("akid", "secret", "noop")})))

ctx, root := BeginSegment(context.Background(), "Test")

AWS(svc.Client)

_, err := svc.ListFunctionsWithContext(ctx, &lambda.ListFunctionsInput{})
root.Close(nil)
assert.Error(t, err)

s, e := TestDaemon.Recv()
assert.NoError(t, e)

subseg := &Segment{}
assert.NotEmpty(t, s.Subsegments)
assert.NoError(t, json.Unmarshal(s.Subsegments[0], &subseg))
assert.True(t, subseg.Fault)
// Should contain 'marshal' and 'attempt' subsegments only.
assert.Len(t, subseg.Subsegments, 2)

attemptSubseg := &Segment{}
assert.NoError(t, json.Unmarshal(subseg.Subsegments[1], &attemptSubseg))
assert.Equal(t, "attempt", attemptSubseg.Name)
assert.Zero(t, attemptSubseg.openSegments)

// Connect subsegment will contain multiple child subsegments.
// The subsegment should fail since the endpoint is not valid,
// and should not be InProgress.
connectSubseg := &Segment{}
assert.NotEmpty(t, attemptSubseg.Subsegments)
assert.NoError(t, json.Unmarshal(attemptSubseg.Subsegments[0], &connectSubseg))
assert.Equal(t, "connect", connectSubseg.Name)
assert.False(t, connectSubseg.InProgress)
assert.NotZero(t, connectSubseg.EndTime)
assert.NotEmpty(t, connectSubseg.Subsegments)
}
50 changes: 50 additions & 0 deletions xray/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,30 @@ func TestRoundTrip(t *testing.T) {
assert.Equal(t, 200, subseg.HTTP.Response.Status)
assert.Equal(t, responseContentLength, subseg.HTTP.Response.ContentLength)
assert.Equal(t, headers.RootTraceID, s.TraceID)

connectSeg := &Segment{}
for _, sub := range subseg.Subsegments {
tempSeg := &Segment{}
assert.NoError(t, json.Unmarshal(sub, &tempSeg))
if tempSeg.Name == "connect" {
connectSeg = tempSeg
break
}
}

// Ensure that a 'connect' subsegment was created and closed
assert.Equal(t, "connect", connectSeg.Name)
assert.False(t, connectSeg.InProgress)
assert.NotZero(t, connectSeg.EndTime)
assert.NotEmpty(t, connectSeg.Subsegments)

// Ensure that the 'connect' subsegments are completed.
for _, sub := range connectSeg.Subsegments {
tempSeg := &Segment{}
assert.NoError(t, json.Unmarshal(sub, &tempSeg))
assert.False(t, tempSeg.InProgress)
assert.NotZero(t, tempSeg.EndTime)
}
}

func TestRoundTripWithError(t *testing.T) {
Expand Down Expand Up @@ -196,6 +220,32 @@ func TestBadRoundTrip(t *testing.T) {
assert.Equal(t, fmt.Sprintf("%v", err), subseg.Cause.Exceptions[0].Message)
}

func TestBadRoundTripDial(t *testing.T) {
ctx, root := BeginSegment(context.Background(), "Test")
reader := strings.NewReader("")
// Make a request against an unreachable endpoint.
req := httptest.NewRequest("GET", "https://0.0.0.0:0", reader)
req = req.WithContext(ctx)
_, err := rt.RoundTrip(req)
root.Close(nil)
assert.Error(t, err)

s, e := TestDaemon.Recv()
assert.NoError(t, e)
subseg := &Segment{}
assert.NoError(t, json.Unmarshal(s.Subsegments[0], &subseg))
assert.Equal(t, fmt.Sprintf("%v", err), subseg.Cause.Exceptions[0].Message)

// Also ensure that the 'connect' subsegment is closed and showing fault
connectSeg := &Segment{}
assert.NoError(t, json.Unmarshal(subseg.Subsegments[0], &connectSeg))
assert.Equal(t, "connect", connectSeg.Name)
assert.NotZero(t, connectSeg.EndTime)
assert.False(t, connectSeg.InProgress)
assert.True(t, connectSeg.Fault)
assert.NotEmpty(t, connectSeg.Subsegments)
}

func TestRoundTripReuseDatarace(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
b := []byte(`200 - Nothing to see`)
Expand Down
22 changes: 18 additions & 4 deletions xray/httptrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ func NewHTTPSubsegments(opCtx context.Context) *HTTPSubsegments {
// GetConn begins a connect subsegment if the HTTP operation
// subsegment is still in progress.
func (xt *HTTPSubsegments) GetConn(hostPort string) {
if GetSegment(xt.opCtx).InProgress {
if GetSegment(xt.opCtx).safeInProgress() {
xt.connCtx, _ = BeginSubsegment(xt.opCtx, "connect")
}
}

// DNSStart begins a dns subsegment if the HTTP operation
// subsegment is still in progress.
func (xt *HTTPSubsegments) DNSStart(info httptrace.DNSStartInfo) {
if GetSegment(xt.opCtx).safeInProgress() {
if GetSegment(xt.opCtx).safeInProgress() && xt.connCtx != nil {
xt.dnsCtx, _ = BeginSubsegment(xt.connCtx, "dns")
}
}
Expand All @@ -71,7 +71,7 @@ func (xt *HTTPSubsegments) DNSDone(info httptrace.DNSDoneInfo) {
// ConnectStart begins a dial subsegment if the HTTP operation
// subsegment is still in progress.
func (xt *HTTPSubsegments) ConnectStart(network, addr string) {
if GetSegment(xt.opCtx).safeInProgress() {
if GetSegment(xt.opCtx).safeInProgress() && xt.connCtx != nil {
xt.connectCtx, _ = BeginSubsegment(xt.connCtx, "dial")
}
}
Expand Down Expand Up @@ -121,10 +121,14 @@ func (xt *HTTPSubsegments) TLSHandshakeDone(connState tls.ConnectionState, err e
// metadata to the subsegment. If the connection is marked as reused,
// the connect subsegment is deleted.
func (xt *HTTPSubsegments) GotConn(info *httptrace.GotConnInfo, err error) {
if xt.connCtx != nil && GetSegment(xt.opCtx).InProgress { // GetConn may not have been called (client_test.TestBadRoundTrip)
if xt.connCtx != nil && GetSegment(xt.opCtx).safeInProgress() { // GetConn may not have been called (client_test.TestBadRoundTrip)
if info != nil {
if info.Reused {
GetSegment(xt.opCtx).RemoveSubsegment(GetSegment(xt.connCtx))
xt.mu.Lock()
// Remove the connCtx context since it is no longer needed.
xt.connCtx = nil
xt.mu.Unlock()
} else {
metadata := make(map[string]interface{})
metadata["reused"] = info.Reused
Expand All @@ -136,6 +140,8 @@ func (xt *HTTPSubsegments) GotConn(info *httptrace.GotConnInfo, err error) {
AddMetadataToNamespace(xt.connCtx, "http", "connection", metadata)
GetSegment(xt.connCtx).Close(err)
}
} else if xt.connCtx != nil && GetSegment(xt.connCtx).safeInProgress() {
GetSegment(xt.connCtx).Close(err)
}

if err == nil {
Expand All @@ -156,6 +162,14 @@ func (xt *HTTPSubsegments) WroteRequest(info httptrace.WroteRequestInfo) {
xt.responseCtx = resCtx
xt.mu.Unlock()
}

// In case the GotConn http trace handler wasn't called,
// we close the connection subsegment since a connection
// had to have been acquired before attempting to write
// the request.
if xt.connCtx != nil && GetSegment(xt.connCtx).safeInProgress() {
GetSegment(xt.connCtx).Close(nil)
}
}

// GotFirstResponseByte closes the response subsegment if the HTTP
Expand Down
2 changes: 1 addition & 1 deletion xray/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (seg *Segment) RemoveSubsegment(remove *Segment) bool {
seg.rawSubsegments[len(seg.rawSubsegments)-1] = nil
seg.rawSubsegments = seg.rawSubsegments[:len(seg.rawSubsegments)-1]

seg.totalSubSegments--
seg.ParentSegment.totalSubSegments--
seg.openSegments--
return true
}
Expand Down