maintner: don't try to download negative bytes

If the maintnerd server restarts with some uncommitted transactions,
clients may end up having downloaded more bytes than the server now
has. When that happens, we're invariably heading towards an ErrSplit
(which is fine; the alternative is more code to avoid duplicate event
processing, and that's unlikely to be worthwhile).

In such a situation, don't try to download new data from the server,
since that request is guaranteed to fail with a 416 Range Not
Satisfiable error. Do try to reuse a prefix of the existing data as
long as its checksum matches; otherwise, download the new version of
the growing file.

Fixes golang/go#26922.
Updates golang/go#51211.

Change-Id: Ide099ee0740e854cfe764db8c3b4341836a237f4
Reviewed-on: https://go-review.googlesource.com/c/build/+/414434
TryBot-Result: Gopher Robot <gobot@golang.org>
Auto-Submit: Dmitri Shuralyov <dmitshur@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
Reviewed-by: Alex Rakoczy <jenny@golang.org>
Run-TryBot: Dmitri Shuralyov <dmitshur@golang.org>
dmitshur authored and gopherbot committed Jun 29, 2022
1 parent 816bbcc commit 4443b10
Showing 2 changed files with 66 additions and 31 deletions.
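To make the failure mode concrete before the diff: syncSeg builds its Range header as bytes=len(have)-(seg.Size-1), so a client holding more bytes than the server produces an inverted range. The sketch below reproduces that with the standard library's range handling; the 101-byte client state, 50-byte segment, and test server are illustrative assumptions, not part of the patch.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"
)

func main() {
	// Stand-in for a maintnerd log segment: after a restart that lost
	// uncommitted transactions, the server has only 50 bytes.
	const segSize = 50
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		http.ServeContent(w, r, "0001.mutlog", time.Time{}, strings.NewReader(strings.Repeat("x", segSize)))
	}))
	defer srv.Close()

	// The client kept 101 bytes from before the restart.
	have := make([]byte, 101)

	req, err := http.NewRequest("GET", srv.URL, nil)
	if err != nil {
		panic(err)
	}
	// The same format syncSeg uses; here it yields bytes=101-49,
	// an inverted (negative-length) range.
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", len(have), segSize-1))

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()
	fmt.Println(res.Status) // 416 Requested Range Not Satisfiable
}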
72 changes: 43 additions & 29 deletions maintner/netsource.go
@@ -144,6 +144,7 @@ type netMutSource struct {
 	// Hooks for testing. If nil, unused:
 	testHookGetServerSegments func(context.Context, int64) ([]LogSegmentJSON, error)
 	testHookSyncSeg           func(context.Context, LogSegmentJSON) (fileSeg, []byte, error)
+	testHookOnSplit           func(sumCommon int64)
 	testHookFilePrefixSum224  func(file string, n int64) string
 }

@@ -400,6 +401,9 @@ func (ns *netMutSource) getNewSegments(ctx context.Context) ([]fileSeg, error) {
 	// and check there is in fact something new.
 	sumCommon := ns.sumCommonPrefixSize(fileSegs, ns.last)
 	if sumCommon != sumLast {
+		if fn := ns.testHookOnSplit; fn != nil {
+			fn(sumCommon)
+		}
 		// Our history diverged from the source.
 		return nil, ErrSplit
 	} else if sumCur := sumSegSize(fileSegs); sumCommon == sumCur {
@@ -464,6 +468,8 @@ func sumJSONSegSize(segs []LogSegmentJSON) (sum int64) {
 	return
 }
 
+// sumCommonPrefixSize computes the size of the longest common prefix of file segments a and b
+// that can be found quickly by checking for matching checksums between segment boundaries.
 func (ns *netMutSource) sumCommonPrefixSize(a, b []fileSeg) (sum int64) {
 	for len(a) > 0 && len(b) > 0 {
 		sa, sb := a[0], b[0]
@@ -588,38 +594,46 @@ func (ns *netMutSource) syncSeg(ctx context.Context, seg LogSegmentJSON) (_ file
 		}
 	}
 
-	// Otherwise, download.
-	req, err := http.NewRequestWithContext(ctx, "GET", segURL.String(), nil)
-	if err != nil {
-		return fileSeg{}, nil, err
-	}
-	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", len(have), seg.Size-1))
+	// Otherwise, download new data.
+	if int64(len(have)) < seg.Size {
+		req, err := http.NewRequestWithContext(ctx, "GET", segURL.String(), nil)
+		if err != nil {
+			return fileSeg{}, nil, err
+		}
+		req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", len(have), seg.Size-1))
 
-	if !ns.quiet {
-		log.Printf("Downloading %d bytes of %s ...", seg.Size-int64(len(have)), segURL)
-	}
-	res, err := http.DefaultClient.Do(req)
-	if err != nil {
-		return fileSeg{}, nil, fetchError{Err: err, PossiblyRetryable: true}
-	}
-	defer res.Body.Close()
-	if res.StatusCode/100 == 5 {
-		// Consider a 5xx server response to possibly succeed later.
-		return fileSeg{}, nil, fetchError{Err: fmt.Errorf("%s: %s", segURL.String(), res.Status), PossiblyRetryable: true}
-	} else if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusPartialContent {
-		return fileSeg{}, nil, fmt.Errorf("%s: %s", segURL.String(), res.Status)
-	}
-	slurp, err := io.ReadAll(res.Body)
-	res.Body.Close()
-	if err != nil {
-		return fileSeg{}, nil, fetchError{Err: err, PossiblyRetryable: true}
+		if !ns.quiet {
+			log.Printf("Downloading %d bytes of %s ...", seg.Size-int64(len(have)), segURL)
+		}
+		res, err := http.DefaultClient.Do(req)
+		if err != nil {
+			return fileSeg{}, nil, fetchError{Err: err, PossiblyRetryable: true}
+		}
+		defer res.Body.Close()
+		if res.StatusCode/100 == 5 {
+			// Consider a 5xx server response to possibly succeed later.
+			return fileSeg{}, nil, fetchError{Err: fmt.Errorf("%s: %s", segURL.String(), res.Status), PossiblyRetryable: true}
+		} else if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusPartialContent {
+			return fileSeg{}, nil, fmt.Errorf("%s: %s", segURL.String(), res.Status)
+		}
+		newData, err = io.ReadAll(res.Body)
+		res.Body.Close()
+		if err != nil {
+			return fileSeg{}, nil, fetchError{Err: err, PossiblyRetryable: true}
+		}
 	}
 
 	// Commit to disk.
 	var newContents []byte
-	if int64(len(slurp)) == seg.Size {
-		newContents = slurp
-	} else if int64(len(have)+len(slurp)) == seg.Size {
-		newContents = append(have, slurp...)
+	if int64(len(newData)) == seg.Size {
+		newContents = newData
+	} else if int64(len(have)+len(newData)) == seg.Size {
+		newContents = append(have, newData...)
+	} else if int64(len(have)) > seg.Size {
+		// We have more data than the server; likely because it restarted with uncommitted
+		// transactions, and so we're headed towards an ErrSplit. Reuse the longest common
+		// prefix as long as its checksum matches.
+		newContents = have[:seg.Size]
 	}
 	got224 := fmt.Sprintf("%x", sha256.Sum224(newContents))
 	if got224 != seg.SHA224 {
@@ -655,7 +669,7 @@ func (ns *netMutSource) syncSeg(ctx context.Context, seg LogSegmentJSON) (_ file
 	if !ns.quiet {
 		log.Printf("wrote %v", finalName)
 	}
-	return fileSeg{seg: seg.Number, file: finalName, size: seg.Size, sha224: seg.SHA224}, slurp, nil
+	return fileSeg{seg: seg.Number, file: finalName, size: seg.Size, sha224: seg.SHA224}, newData, nil
 }
 
 type LogSegmentJSON struct {
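The prefix-reuse path above is easy to restate in isolation. reusablePrefix below is a hypothetical helper for illustration only; the patch inlines this logic in syncSeg as newContents = have[:seg.Size], with the checksum verified by the existing got224 comparison.

package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// reusablePrefix reports whether the first segSize bytes of have can be
// reused: the client holds more bytes than the server's segment, and the
// prefix's SHA-224 matches the checksum the server advertises. If the
// checksum doesn't match, the caller must re-download the file in full.
func reusablePrefix(have []byte, segSize int64, wantSHA224 string) ([]byte, bool) {
	if int64(len(have)) <= segSize {
		return nil, false // nothing extra to trim; not this code path
	}
	prefix := have[:segSize]
	if fmt.Sprintf("%x", sha256.Sum224(prefix)) != wantSHA224 {
		return nil, false
	}
	return prefix, true
}

func main() {
	have := bytes.Repeat([]byte("x"), 101)
	want := fmt.Sprintf("%x", sha256.Sum224(have[:50]))
	if prefix, ok := reusablePrefix(have, 50, want); ok {
		fmt.Printf("reusing %d of %d bytes\n", len(prefix), len(have)) // reusing 50 of 101 bytes
	}
}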
25 changes: 23 additions & 2 deletions maintner/netsource_test.go
@@ -157,6 +157,7 @@ func TestGetNewSegments(t *testing.T) {
 
 		want          []fileSeg
 		wantSplit     bool
+		wantSumCommon int64
 		wantUnchanged bool
 	}
 	tests := []testCase{
@@ -280,6 +281,20 @@
 			prefixSum: "ffffffffff", // no match
 			wantSplit: true,
 		},
+		{
+			name: "split_error_same_first_seg_but_shorter",
+			lastSegs: []fileSeg{
+				{seg: 1, size: 101, sha224: "abc", file: "/fake/0001.mutlog"},
+			},
+			serverSegs: [][]LogSegmentJSON{
+				[]LogSegmentJSON{
+					{Number: 1, Size: 50, SHA224: "def"},
+				},
+			},
+			prefixSum:     "def", // match
+			wantSplit:     true,
+			wantSumCommon: 50,
+		},
 		{
 			name: "split_error_diff_final_seg",
 			lastSegs: []fileSeg{
@@ -292,8 +307,9 @@
 					{Number: 2, Size: 4, SHA224: "fff"},
 				},
 			},
-			prefixSum: "not_def",
-			wantSplit: true,
+			prefixSum:     "not_def",
+			wantSplit:     true,
+			wantSumCommon: 100,
 		},
 	}
 	for _, tt := range tests {
@@ -328,6 +344,11 @@ func TestGetNewSegments(t *testing.T) {
 						file:   fmt.Sprintf("/fake/%04d.mutlog", seg.Number),
 					}, nil, nil
 				},
+				testHookOnSplit: func(sumCommon int64) {
+					if got, want := sumCommon, tt.wantSumCommon; got != want {
+						t.Errorf("sumCommon = %v; want %v", got, want)
+					}
+				},
 				testHookFilePrefixSum224: func(file string, n int64) string {
 					if tt.prefixSum != "" {
 						return tt.prefixSum
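The new testHookOnSplit follows the package's existing test-hook convention: an unexported, optional function field that production code invokes behind a nil check, so a test can observe an internal event (here, the common-prefix size at the moment of a split) without exporting any API. A generic sketch of the pattern, with illustrative names not taken from maintner:

package main

import "fmt"

// counter demonstrates the test-hook convention: production call sites
// pay one nil check, and tests that want visibility install a function.
type counter struct {
	n int

	// testHookOnAdd, if non-nil, is called with each value added.
	testHookOnAdd func(delta int)
}

func (c *counter) add(delta int) {
	if fn := c.testHookOnAdd; fn != nil {
		fn(delta)
	}
	c.n += delta
}

func main() {
	c := &counter{testHookOnAdd: func(delta int) {
		fmt.Println("observed add:", delta)
	}}
	c.add(50) // prints "observed add: 50"
}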
