Skip to content

Commit

Permalink
Merge pull request #3095 from dolthub/dhruv/push-timeout-bug
Browse files Browse the repository at this point in the history
go/libraries/doltcore: Fix bad upload table file retries
  • Loading branch information
Dhruv Sringari committed Mar 28, 2022
2 parents 9855a09 + 8f6c203 commit c5a41a9
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 100 deletions.
85 changes: 56 additions & 29 deletions go/libraries/doltcore/remotestorage/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,9 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
details := hashToDetails[h]
switch typedLoc := loc.Location.(type) {
case *remotesapi.UploadLoc_HttpPost:
err = dcs.httpPostUpload(ctx, loc.TableFileHash, typedLoc.HttpPost, bytes.NewBuffer(data), details.ContentHash)
err = dcs.httpPostUpload(ctx, loc.TableFileHash, typedLoc.HttpPost, int64(len(data)), details.ContentHash, func() (io.ReadCloser, error) {
return io.NopCloser(bytes.NewBuffer(data)), nil
})
default:
break
}
Expand All @@ -940,34 +942,34 @@ type Sizer interface {
Size() int64
}

func (dcs *DoltChunkStore) httpPostUpload(ctx context.Context, hashBytes []byte, post *remotesapi.HttpPostTableFile, rd io.Reader, contentHash []byte) error {
return HttpPostUpload(ctx, dcs.httpFetcher, post, rd, contentHash)
// httpPostUpload uploads a table file to the location described by post by
// delegating to HttpPostUpload with this store's HTTP fetcher. contentLength,
// contentHash and getBody are passed through unchanged; hashBytes is not used
// by this method.
func (dcs *DoltChunkStore) httpPostUpload(ctx context.Context, hashBytes []byte, post *remotesapi.HttpPostTableFile, contentLength int64, contentHash []byte, getBody func() (io.ReadCloser, error)) error {
return HttpPostUpload(ctx, dcs.httpFetcher, post, contentLength, contentHash, getBody)
}

func HttpPostUpload(ctx context.Context, httpFetcher HTTPFetcher, post *remotesapi.HttpPostTableFile, rd io.Reader, contentHash []byte) error {
req, err := http.NewRequest(http.MethodPut, post.Url, rd)
if err != nil {
return err
}

if sizer, ok := rd.(Sizer); ok {
req.ContentLength = sizer.Size()
}

if len(contentHash) > 0 {
md5s := base64.StdEncoding.EncodeToString(contentHash)
req.Header.Set("Content-MD5", md5s)
}

func HttpPostUpload(ctx context.Context, httpFetcher HTTPFetcher, post *remotesapi.HttpPostTableFile, contentLength int64, contentHash []byte, getBody func() (io.ReadCloser, error)) error {
fetcher := globalHttpFetcher
if httpFetcher != nil {
fetcher = httpFetcher
}

var resp *http.Response
op := func() error {
var err error
resp, err = fetcher.Do(req.WithContext(ctx))
r, err := getBody()
if err != nil {
return err
}
req, err := http.NewRequest(http.MethodPut, post.Url, r)
if err != nil {
return err
}

req.ContentLength = contentLength

if len(contentHash) > 0 {
md5s := base64.StdEncoding.EncodeToString(contentHash)
req.Header.Set("Content-MD5", md5s)
}

resp, err := fetcher.Do(req.WithContext(ctx))

if err == nil {
defer func() {
Expand All @@ -978,7 +980,7 @@ func HttpPostUpload(ctx context.Context, httpFetcher HTTPFetcher, post *remotesa
return processHttpResp(resp, err)
}

err = backoff.Retry(op, backoff.WithMaxRetries(uploadRetryParams, uploadRetryCount))
err := backoff.Retry(op, backoff.WithMaxRetries(uploadRetryParams, uploadRetryCount))

if err != nil {
return err
Expand Down Expand Up @@ -1175,7 +1177,7 @@ func (dcs *DoltChunkStore) SupportedOperations() nbs.TableFileStoreOps {
}

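// WriteTableFile reads a table file from a reader obtained by calling getRd and writes it to the chunk store.
// getRd may be called more than once, because the upload is retried and each attempt requests a fresh reader.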
func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error {
func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentLength uint64, contentHash []byte, getRd func() (io.ReadCloser, error)) error {
dcs.logf("getting upload location for file %s with %d chunks and size %s", fileId, numChunks, humanize.Bytes(contentLength))

fileIdBytes := hash.Parse(fileId)
Expand Down Expand Up @@ -1214,7 +1216,7 @@ func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, nu
}
dcs.logf("uploading %s to %s", fileId, urlStr)

err = dcs.httpPostUpload(ctx, loc.TableFileHash, typedLoc.HttpPost, rd, contentHash)
err = dcs.httpPostUpload(ctx, loc.TableFileHash, typedLoc.HttpPost, int64(contentLength), contentHash, getRd)

if err != nil {
dcs.logf("failed to upload %s to %s. err: %s", fileId, urlStr, err.Error())
Expand Down Expand Up @@ -1330,8 +1332,33 @@ func sanitizeSignedUrl(url string) string {
}
}

// ContentLength requests the table file through getResp and reports the
// Content-Length of that response. The response body is closed without being
// read before returning; an error from closing it is reported only when no
// other error occurred.
//
// NOTE(review): http.Response.ContentLength is -1 when the length is unknown,
// which converts to a very large uint64 here — confirm callers tolerate that.
func (drtf DoltRemoteTableFile) ContentLength(ctx context.Context) (cl uint64, err error) {
	resp, err := drtf.getResp(ctx)
	if err != nil {
		return 0, err
	}

	// Surface a Close failure through the named result, but never mask an
	// error that has already been set.
	defer func() {
		if closeErr := resp.Body.Close(); err == nil {
			err = closeErr
		}
	}()

	return uint64(resp.ContentLength), nil
}

// Open returns an io.ReadCloser which can be used to read the bytes of a table file.
func (drtf DoltRemoteTableFile) Open(ctx context.Context) (io.ReadCloser, uint64, error) {
func (drtf DoltRemoteTableFile) Open(ctx context.Context) (io.ReadCloser, error) {
resp, err := drtf.getResp(ctx)
if err != nil {
return nil, err
}

return resp.Body, nil
}

func (drtf DoltRemoteTableFile) getResp(ctx context.Context) (*http.Response, error) {
if drtf.info.RefreshAfter != nil && drtf.info.RefreshAfter.AsTime().After(time.Now()) {
resp, err := drtf.dcs.csClient.RefreshTableFileUrl(ctx, drtf.info.RefreshRequest)
if err == nil {
Expand All @@ -1342,20 +1369,20 @@ func (drtf DoltRemoteTableFile) Open(ctx context.Context) (io.ReadCloser, uint64

req, err := http.NewRequestWithContext(ctx, http.MethodGet, drtf.info.Url, nil)
if err != nil {
return nil, 0, err
return nil, err
}

resp, err := drtf.dcs.httpFetcher.Do(req)
if err != nil {
return nil, 0, err
return nil, err
}

if resp.StatusCode/100 != 2 {
defer resp.Body.Close()
body := make([]byte, 4096)
n, _ := io.ReadFull(resp.Body, body)
return nil, 0, fmt.Errorf("%w: status code: %d;\nurl: %s\n\nbody:\n\n%s\n", ErrRemoteTableFileGet, resp.StatusCode, sanitizeSignedUrl(drtf.info.Url), string(body[0:n]))
return nil, fmt.Errorf("%w: status code: %d;\nurl: %s\n\nbody:\n\n%s\n", ErrRemoteTableFileGet, resp.StatusCode, sanitizeSignedUrl(drtf.info.Url), string(body[0:n]))
}

return resp.Body, uint64(resp.ContentLength), nil
return resp, nil
}
5 changes: 1 addition & 4 deletions go/libraries/utils/iohelp/read_with_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ func (rws *ReaderWithStats) Start(updateFunc func(ReadStats)) {
}()
}

// Stop "closes" ReaderWithStats. Occasionally, we might pass this ReaderWithStats as the body of
// a http.Request. Since http.Request will close the body if it is an io.Closer, we can't have ReaderWithStats conform
// to io.Closer. We want full control over the Start and Stop of ReaderWithStats.
func (rws *ReaderWithStats) Stop() error {
func (rws *ReaderWithStats) Close() error {
close(rws.closeCh)

if closer, ok := rws.rd.(io.Closer); ok {
Expand Down
38 changes: 19 additions & 19 deletions go/store/datas/pull/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pull
import (
"context"
"errors"
"io"

"github.com/cenkalti/backoff"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -85,13 +86,6 @@ func mapTableFiles(tblFiles []nbs.TableFile) ([]string, map[string]nbs.TableFile
return fileIds, fileIDtoTblFile, fileIDtoNumChunks
}

// stopWithErr stops stats and, if *err does not already hold an error, stores
// the error returned by Stop there. An existing error in *err is never
// overwritten. It is written to be deferred with the address of the caller's
// error variable.
func stopWithErr(stats *iohelp.ReaderWithStats, err *error) {
	stopErr := stats.Stop()
	if *err != nil || stopErr == nil {
		return
	}
	*err = stopErr
}

const concurrentTableFileDownloads = 3

func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<- TableFileEvent) error {
Expand Down Expand Up @@ -138,23 +132,29 @@ func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<-
return backoff.Permanent(errors.New("table file not found. please try again"))
}

rd, contentLength, err := tblFile.Open(ctx)
contentLength, err := tblFile.ContentLength(ctx)
if err != nil {
return err
}
rdStats := iohelp.NewReaderWithStats(rd, int64(contentLength))
defer stopWithErr(rdStats, &err)

rdStats.Start(func(s iohelp.ReadStats) {
report(TableFileEvent{
EventType: DownloadStats,
TableFiles: []nbs.TableFile{tblFile},
Stats: []iohelp.ReadStats{s},
})
})

report(TableFileEvent{EventType: DownloadStart, TableFiles: []nbs.TableFile{tblFile}})
err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), rdStats, contentLength, nil)
err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), contentLength, nil, func() (io.ReadCloser, error) {
rd, err := tblFile.Open(ctx)
if err != nil {
return nil, err
}
rdStats := iohelp.NewReaderWithStats(rd, int64(contentLength))

rdStats.Start(func(s iohelp.ReadStats) {
report(TableFileEvent{
EventType: DownloadStats,
TableFiles: []nbs.TableFile{tblFile},
Stats: []iohelp.ReadStats{s},
})
})

return rdStats, nil
})
if err != nil {
report(TableFileEvent{EventType: DownloadFailed, TableFiles: []nbs.TableFile{tblFile}})
return err
Expand Down
25 changes: 19 additions & 6 deletions go/store/datas/pull/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,12 @@ func (ttf *TestFailingTableFile) NumChunks() int {
return ttf.numChunks
}

func (ttf *TestFailingTableFile) Open(ctx context.Context) (io.ReadCloser, uint64, error) {
return io.NopCloser(bytes.NewReader([]byte{0x00})), 1, errors.New("this is a test error")
// ContentLength always fails: it returns a length of 1 together with a fixed
// test error.
func (ttf *TestFailingTableFile) ContentLength(ctx context.Context) (uint64, error) {
	const length uint64 = 1
	return length, errors.New("this is a test error")
}

// Open always fails: it returns a one-byte reader together with a fixed test
// error.
func (ttf *TestFailingTableFile) Open(ctx context.Context) (io.ReadCloser, error) {
	rd := io.NopCloser(bytes.NewReader([]byte{0x00}))
	return rd, errors.New("this is a test error")
}

type TestTableFile struct {
Expand All @@ -403,8 +407,12 @@ func (ttf *TestTableFile) NumChunks() int {
return ttf.numChunks
}

func (ttf *TestTableFile) Open(ctx context.Context) (io.ReadCloser, uint64, error) {
return io.NopCloser(bytes.NewReader(ttf.data)), uint64(len(ttf.data)), nil
// ContentLength reports the number of bytes held in ttf.data. It never
// returns an error.
func (ttf *TestTableFile) ContentLength(ctx context.Context) (uint64, error) {
	size := len(ttf.data)
	return uint64(size), nil
}

// Open returns a new reader over ttf.data whose Close is a no-op. Each call
// yields an independent reader starting at the first byte.
func (ttf *TestTableFile) Open(ctx context.Context) (io.ReadCloser, error) {
	src := bytes.NewReader(ttf.data)
	return io.NopCloser(src), nil
}

type TestTableFileWriter struct {
Expand Down Expand Up @@ -457,9 +465,14 @@ func (ttfs *TestTableFileStore) Size(ctx context.Context) (uint64, error) {
return sz, nil
}

func (ttfs *TestTableFileStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error {
func (ttfs *TestTableFileStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentLength uint64, contentHash []byte, getRd func() (io.ReadCloser, error)) error {
tblFile := &TestTableFileWriter{fileId, numChunks, bytes.NewBuffer(nil), ttfs}
_, err := io.Copy(tblFile, rd)
rd, err := getRd()
if err != nil {
return err
}
defer rd.Close()
_, err = io.Copy(tblFile, rd)

if err != nil {
return err
Expand Down
40 changes: 20 additions & 20 deletions go/store/datas/pull/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
Expand Down Expand Up @@ -201,36 +202,35 @@ type tempTblFile struct {
contentHash []byte
}

func (p *Puller) uploadTempTableFile(ctx context.Context, ae *atomicerr.AtomicError, tmpTblFile tempTblFile) error {
func (p *Puller) uploadTempTableFile(ctx context.Context, tmpTblFile tempTblFile) error {
fi, err := os.Stat(tmpTblFile.path)

if ae.SetIfError(err) {
return err
}

f, err := os.Open(tmpTblFile.path)

if ae.SetIfError(err) {
if err != nil {
return err
}

fileSize := fi.Size()
fWithStats := iohelp.NewReaderWithStats(f, fileSize)
fWithStats.Start(func(stats iohelp.ReadStats) {
p.addEvent(NewTFPullerEvent(UploadTableFileUpdateEvent, &TableFileEventDetails{
CurrentFileSize: fileSize,
Stats: stats,
}))
})
defer func() {
_ = fWithStats.Stop()

go func() {
_ = file.Remove(tmpTblFile.path)
}()
}()

return p.sinkDBCS.(nbs.TableFileStore).WriteTableFile(ctx, tmpTblFile.id, tmpTblFile.numChunks, fWithStats, tmpTblFile.contentLen, tmpTblFile.contentHash)
return p.sinkDBCS.(nbs.TableFileStore).WriteTableFile(ctx, tmpTblFile.id, tmpTblFile.numChunks, tmpTblFile.contentLen, tmpTblFile.contentHash, func() (io.ReadCloser, error) {
f, err := os.Open(tmpTblFile.path)
if err != nil {
return nil, err
}

fWithStats := iohelp.NewReaderWithStats(f, fileSize)
fWithStats.Start(func(stats iohelp.ReadStats) {
p.addEvent(NewTFPullerEvent(UploadTableFileUpdateEvent, &TableFileEventDetails{
CurrentFileSize: fileSize,
Stats: stats,
}))
})

return fWithStats, nil
})
}

func (p *Puller) processCompletedTables(ctx context.Context, ae *atomicerr.AtomicError, completedTables <-chan FilledWriters) {
Expand Down Expand Up @@ -270,7 +270,7 @@ func (p *Puller) processCompletedTables(ctx context.Context, ae *atomicerr.Atomi
contentHash: tblFile.wr.GetMD5(),
}

err = p.uploadTempTableFile(ctx, ae, ttf)
err = p.uploadTempTableFile(ctx, ttf)
if ae.SetIfError(err) {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/generational_chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ func (gcs *GenerationalNBS) Size(ctx context.Context) (uint64, error) {
}

// WriteTableFile will read a table file from the reader returned by getRd and write it to the new gen TableFileStore
func (gcs *GenerationalNBS) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error {
return gcs.newGen.WriteTableFile(ctx, fileId, numChunks, rd, contentLength, contentHash)
func (gcs *GenerationalNBS) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentLength uint64, contentHash []byte, getRd func() (io.ReadCloser, error)) error {
return gcs.newGen.WriteTableFile(ctx, fileId, numChunks, contentLength, contentHash, getRd)
}

// AddTableFilesToManifest adds table files to the manifest of the newgen cs
Expand Down
4 changes: 2 additions & 2 deletions go/store/nbs/nbs_metrics_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func (nbsMW *NBSMetricWrapper) Size(ctx context.Context) (uint64, error) {
}

// WriteTableFile will read a table file from the reader returned by getRd and write it to the TableFileStore
func (nbsMW *NBSMetricWrapper) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error {
return nbsMW.nbs.WriteTableFile(ctx, fileId, numChunks, rd, contentLength, contentHash)
func (nbsMW *NBSMetricWrapper) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentLength uint64, contentHash []byte, getRd func() (io.ReadCloser, error)) error {
return nbsMW.nbs.WriteTableFile(ctx, fileId, numChunks, contentLength, contentHash, getRd)
}

// AddTableFilesToManifest adds table files to the manifest
Expand Down

0 comments on commit c5a41a9

Please sign in to comment.