storage/cloud: remove CloseWithError
Context cancellation is typically how operations are aborted.
When we open a new Writer we pass a context, so we must assume
that context can be cancelled, and cancellation should abort the
write operation. Having an extra CloseWithError method in the API
that also cancels the operation is duplicative.

Release note: none.
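
For illustration, a minimal caller-side sketch of the pattern this assumes (writeAll and its open parameter are hypothetical stand-ins for any context-taking Writer API such as ExternalStorage.Writer): aborting is done by cancelling the context rather than by a dedicated CloseWithError call, and Close is always invoked, with its error folded into the copy error.

package example

import (
    "context"
    "io"

    "github.com/cockroachdb/errors"
)

// writeAll streams src into a writer obtained from open. On a copy failure it
// cancels the writer's context to abort the in-flight operation and still
// calls Close, combining the two errors.
func writeAll(
    ctx context.Context,
    open func(ctx context.Context) (io.WriteCloser, error),
    src io.Reader,
) error {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    w, err := open(ctx)
    if err != nil {
        return err
    }
    if _, err := io.Copy(w, src); err != nil {
        cancel() // cancellation replaces CloseWithError(err)
        return errors.CombineErrors(w.Close(), err)
    }
    return w.Close()
}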
dt committed May 25, 2021
1 parent 040a7af commit 771a2af
Showing 17 changed files with 68 additions and 115 deletions.
6 changes: 3 additions & 3 deletions pkg/blobs/bench_test.go
@@ -90,12 +90,12 @@ func benchmarkStreamingReadFile(b *testing.B, tc *benchmarkTestCase) {
 	if err != nil {
 		b.Fatal(err)
 	}
-	w, err := writeTo.Writer(tc.fileName)
+	w, err := writeTo.Writer(context.Background(), tc.fileName)
 	if err != nil {
 		b.Fatal(err)
 	}
 	if _, err := io.Copy(w, reader); err != nil {
-		b.Fatal(errors.CombineErrors(err, w.CloseWithError(err)))
+		b.Fatal(errors.CombineErrors(err, w.Close()))
 	}
 	if err := w.Close(); err != nil {
 		b.Fatal(err)
@@ -147,7 +147,7 @@ func benchmarkStreamingWriteFile(b *testing.B, tc *benchmarkTestCase) {
 		b.Fatal(err)
 	}
 	if _, err := io.Copy(w, bytes.NewReader(content)); err != nil {
-		b.Fatal(errors.CombineErrors(err, w.CloseWithError(err)))
+		b.Fatal(errors.CombineErrors(w.Close(), err))
 	}
 	if err := w.Close(); err != nil {
 		b.Fatal(err)
35 changes: 7 additions & 28 deletions pkg/blobs/client.go
@@ -22,14 +22,6 @@ import (
 	"google.golang.org/grpc/metadata"
 )
 
-// WriteCloserWithError extends WriteCloser with an extra CloseWithError func.
-type WriteCloserWithError interface {
-	io.WriteCloser
-	// CloseWithError closes the writer with an error, which may choose to abort
-	// rather than complete any write operations.
-	CloseWithError(error) error
-}
-
 // BlobClient provides an interface for file access on all nodes' local storage.
 // Given the nodeID of the node on which the operation should occur, a blob
 // client should be able to find the correct node and call its blob service API.
@@ -40,7 +32,7 @@ type BlobClient interface {
 	ReadFile(ctx context.Context, file string, offset int64) (io.ReadCloser, int64, error)
 
 	// Writer opens the named payload on the requested node for writing.
-	Writer(ctx context.Context, file string) (WriteCloserWithError, error)
+	Writer(ctx context.Context, file string) (io.WriteCloser, error)
 
 	// List lists the corresponding filenames from the requested node.
 	// The requested node can be the current node.
@@ -82,9 +74,8 @@ func (c *remoteClient) ReadFile(
 }
 
 type streamWriter struct {
-	s      blobspb.Blob_PutStreamClient
-	buf    blobspb.StreamChunk
-	cancel func()
+	s   blobspb.Blob_PutStreamClient
+	buf blobspb.StreamChunk
 }
 
 func (w *streamWriter) Write(p []byte) (int, error) {
@@ -105,29 +96,17 @@ func (w *streamWriter) Write(p []byte) (int, error) {
 
 func (w *streamWriter) Close() error {
 	_, err := w.s.CloseAndRecv()
-	w.cancel()
 	return err
 }
 
-func (w *streamWriter) CloseWithError(err error) error {
-	if err == nil {
-		return w.Close()
-	}
-	w.cancel()
-	_, err = w.s.CloseAndRecv()
-	return err
-}
-
-func (c *remoteClient) Writer(ctx context.Context, file string) (WriteCloserWithError, error) {
+func (c *remoteClient) Writer(ctx context.Context, file string) (io.WriteCloser, error) {
 	ctx = metadata.AppendToOutgoingContext(ctx, "filename", file)
-	ctx, cancel := context.WithCancel(ctx)
 	stream, err := c.blobClient.PutStream(ctx)
 	if err != nil {
-		cancel()
 		return nil, err
 	}
 	buf := make([]byte, 0, chunkSize)
-	return &streamWriter{s: stream, buf: blobspb.StreamChunk{Payload: buf}, cancel: cancel}, nil
+	return &streamWriter{s: stream, buf: blobspb.StreamChunk{Payload: buf}}, nil
 }
 
 func (c *remoteClient) List(ctx context.Context, pattern string) ([]string, error) {
@@ -180,8 +159,8 @@ func (c *localClient) ReadFile(
 	return c.localStorage.ReadFile(file, offset)
 }
 
-func (c *localClient) Writer(ctx context.Context, file string) (WriteCloserWithError, error) {
-	return c.localStorage.Writer(file)
+func (c *localClient) Writer(ctx context.Context, file string) (io.WriteCloser, error) {
+	return c.localStorage.Writer(ctx, file)
 }
 
 func (c *localClient) List(ctx context.Context, pattern string) ([]string, error) {
2 changes: 1 addition & 1 deletion pkg/blobs/client_test.go
@@ -258,7 +258,7 @@ func TestBlobClientWriteFile(t *testing.T) {
 			return err
 		}
 		if _, err := io.Copy(w, bytes.NewReader(byteContent)); err != nil {
-			return errors.CombineErrors(w.CloseWithError(err), err)
+			return errors.CombineErrors(w.Close(), err)
 		}
 		return w.Close()
 	}()
24 changes: 13 additions & 11 deletions pkg/blobs/local_storage.go
@@ -11,6 +11,7 @@
 package blobs
 
 import (
+	"context"
 	"io"
 	"io/ioutil"
 	"os"
@@ -66,6 +67,7 @@ func (l *LocalStorage) prependExternalIODir(path string) (string, error) {
 
 type localWriter struct {
 	f         *os.File
+	ctx       context.Context
 	tmp, dest string
 }
 
@@ -74,6 +76,12 @@ func (l localWriter) Write(p []byte) (int, error) {
 }
 
 func (l localWriter) Close() error {
+	if err := l.ctx.Err(); err != nil {
+		closeErr := l.f.Close()
+		rmErr := os.Remove(l.tmp)
+		return errors.CombineErrors(err, errors.Wrap(errors.CombineErrors(rmErr, closeErr), "cleaning up"))
+	}
+
 	syncErr := l.f.Sync()
 	closeErr := l.f.Close()
 	if err := errors.CombineErrors(closeErr, syncErr); err != nil {
@@ -87,21 +95,15 @@ func (l localWriter) Close() error {
 	)
 }
 
-func (l localWriter) CloseWithError(err error) error {
-	if err == nil {
-		return l.Close()
-	}
-	closeErr := l.f.Close()
-	rmErr := os.Remove(l.tmp)
-	return errors.CombineErrors(rmErr, closeErr)
-}
-
 // Writer prepends IO dir to filename and writes the content to that local file.
-func (l *LocalStorage) Writer(filename string) (WriteCloserWithError, error) {
+func (l *LocalStorage) Writer(ctx context.Context, filename string) (io.WriteCloser, error) {
 	fullPath, err := l.prependExternalIODir(filename)
 	if err != nil {
 		return nil, err
 	}
+	if err := ctx.Err(); err != nil {
+		return nil, err
+	}
 
 	targetDir := filepath.Dir(fullPath)
 	if err = os.MkdirAll(targetDir, 0755); err != nil {
@@ -121,7 +123,7 @@ func (l *LocalStorage) Writer(filename string) (WriteCloserWithError, error) {
 	if err != nil {
 		return nil, errors.Wrap(err, "creating temporary file")
 	}
-	return localWriter{tmp: tmpFile.Name(), dest: fullPath, f: tmpFile}, nil
+	return localWriter{tmp: tmpFile.Name(), dest: fullPath, f: tmpFile, ctx: ctx}, nil
 }
 
 // ReadFile prepends IO dir to filename and reads the content of that local file.
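
As a standalone sketch of the temp-file-then-rename pattern localWriter follows (hypothetical names, standard library plus github.com/cockroachdb/errors; not this package's actual API): the constructor refuses to start once the context is cancelled, and Close either discards the temporary file on cancellation or syncs and renames it into place.

package example

import (
    "context"
    "io/ioutil"
    "os"
    "path/filepath"

    "github.com/cockroachdb/errors"
)

// atomicWriter writes to a temporary file; Close renames it to dest on
// success, or removes it if the context was cancelled.
type atomicWriter struct {
    ctx       context.Context
    f         *os.File
    tmp, dest string
}

func newAtomicWriter(ctx context.Context, dest string) (*atomicWriter, error) {
    if err := ctx.Err(); err != nil {
        return nil, err // already cancelled: don't create the file at all
    }
    f, err := ioutil.TempFile(filepath.Dir(dest), filepath.Base(dest)+".tmp")
    if err != nil {
        return nil, errors.Wrap(err, "creating temporary file")
    }
    return &atomicWriter{ctx: ctx, f: f, tmp: f.Name(), dest: dest}, nil
}

func (w *atomicWriter) Write(p []byte) (int, error) { return w.f.Write(p) }

func (w *atomicWriter) Close() error {
    if err := w.ctx.Err(); err != nil {
        // Cancelled: clean up rather than publish a partial file.
        cleanup := errors.CombineErrors(w.f.Close(), os.Remove(w.tmp))
        return errors.CombineErrors(err, errors.Wrap(cleanup, "cleaning up"))
    }
    if err := errors.CombineErrors(w.f.Sync(), w.f.Close()); err != nil {
        return errors.CombineErrors(err, os.Remove(w.tmp))
    }
    return errors.Wrap(os.Rename(w.tmp, w.dest), "renaming to destination")
}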
15 changes: 10 additions & 5 deletions pkg/blobs/service.go
@@ -72,16 +72,21 @@ func (s *Service) PutStream(stream blobspb.Blob_PutStreamServer) error {
 	}
 	reader := newPutStreamReader(stream)
 	defer reader.Close()
-	w, err := s.localStorage.Writer(filename[0])
+	ctx, cancel := context.WithCancel(stream.Context())
+	defer cancel()
+
+	w, err := s.localStorage.Writer(ctx, filename[0])
 	if err != nil {
+		cancel()
 		return err
 	}
-
 	if _, err := io.Copy(w, reader); err != nil {
-		closeErr := w.CloseWithError(err)
-		return errors.CombineErrors(err, closeErr)
+		cancel()
+		return errors.CombineErrors(w.Close(), err)
 	}
-	return w.Close()
+	err = w.Close()
+	cancel()
+	return err
 }
 
 // List implements the gRPC service.
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/testutils_test.go
@@ -273,7 +273,7 @@ func (es *generatorExternalStorage) Size(ctx context.Context, basename string) (
 
 func (es *generatorExternalStorage) Writer(
 	ctx context.Context, basename string,
-) (cloud.WriteCloserWithError, error) {
+) (io.WriteCloser, error) {
 	return nil, errors.New("unsupported")
 }
 
15 changes: 8 additions & 7 deletions pkg/sql/copy_file_upload.go
@@ -23,7 +23,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/types"
-	"github.com/cockroachdb/cockroach/pkg/storage/cloud"
 	"github.com/cockroachdb/errors"
 	"github.com/lib/pq"
 )
@@ -41,7 +40,8 @@ var _ copyMachineInterface = &fileUploadMachine{}
 
 type fileUploadMachine struct {
 	c              *copyMachine
-	w              cloud.WriteCloserWithError
+	w              io.WriteCloser
+	cancel         func()
 	failureCleanup func()
 }
 
@@ -131,7 +131,9 @@ func newFileUploadMachine(
 		return nil, err
 	}
 
-	f.w, err = store.Writer(ctx, "")
+	writeCtx, cancelWriteCtx := context.WithCancel(ctx)
+	f.cancel = cancelWriteCtx
+	f.w, err = store.Writer(writeCtx, "")
 	if err != nil {
 		return nil, err
 	}
@@ -168,11 +170,10 @@ func CopyInFileStmt(destination, schema, table string) string {
 
 func (f *fileUploadMachine) run(ctx context.Context) error {
 	err := f.c.run(ctx)
-	if err != nil {
-		err = errors.CombineErrors(f.w.CloseWithError(err), err)
-	} else {
-		err = f.w.Close()
+	if err != nil && f.cancel != nil {
+		f.cancel()
 	}
+	err = errors.CombineErrors(f.w.Close(), err)
 
 	if err != nil {
 		f.failureCleanup()
4 changes: 1 addition & 3 deletions pkg/storage/cloud/amazon/s3_storage.go
@@ -285,9 +285,7 @@ func (s *s3Storage) Settings() *cluster.Settings {
 	return s.settings
 }
 
-func (s *s3Storage) Writer(
-	ctx context.Context, basename string,
-) (cloud.WriteCloserWithError, error) {
+func (s *s3Storage) Writer(ctx context.Context, basename string) (io.WriteCloser, error) {
 	sess, err := s.newSession(ctx)
 	if err != nil {
 		return nil, err
4 changes: 1 addition & 3 deletions pkg/storage/cloud/azure/azure_storage.go
@@ -125,9 +125,7 @@ func (s *azureStorage) Settings() *cluster.Settings {
 	return s.settings
 }
 
-func (s *azureStorage) Writer(
-	ctx context.Context, basename string,
-) (cloud.WriteCloserWithError, error) {
+func (s *azureStorage) Writer(ctx context.Context, basename string) (io.WriteCloser, error) {
 	blob := s.getBlob(basename)
 	return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error {
 		_, err := azblob.UploadStreamToBlockBlob(
21 changes: 9 additions & 12 deletions pkg/storage/cloud/cloud_io.go
@@ -257,9 +257,9 @@ func CheckHTTPContentRangeHeader(h string, pos int64) (int64, error) {
 // that has a background process reading from it. It *must* be Closed().
 func BackgroundPipe(
 	ctx context.Context, fn func(ctx context.Context, pr io.Reader) error,
-) WriteCloserWithError {
+) io.WriteCloser {
 	pr, pw := io.Pipe()
-	w := &backgroundPipe{w: pw, grp: ctxgroup.WithContext(ctx)}
+	w := &backgroundPipe{w: pw, grp: ctxgroup.WithContext(ctx), ctx: ctx}
 	w.grp.GoCtx(func(ctc context.Context) error {
 		err := fn(ctx, pr)
 		if err != nil {
@@ -276,6 +276,7 @@
 type backgroundPipe struct {
 	w   *io.PipeWriter
 	grp ctxgroup.Group
+	ctx context.Context
 }
 
 // Write writes to the writer.
@@ -285,27 +286,23 @@ func (s *backgroundPipe) Write(p []byte) (int, error) {
 
 // Close closes the writer, finishing the write operation.
 func (s *backgroundPipe) Close() error {
-	err := s.w.Close()
+	err := s.w.CloseWithError(s.ctx.Err())
 	return errors.CombineErrors(err, s.grp.Wait())
 }
 
-// CloseWithError closes the Writer with an error, which may discard prior
-// writes and abort the overall write operation.
-func (s *backgroundPipe) CloseWithError(err error) error {
-	e := s.w.CloseWithError(err)
-	return errors.CombineErrors(e, s.grp.Wait())
-}
-
 // WriteFile is a helper for writing the content of a Reader to the given path
 // of an ExternalStorage.
 func WriteFile(ctx context.Context, dest ExternalStorage, basename string, src io.Reader) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	w, err := dest.Writer(ctx, basename)
 	if err != nil {
 		return errors.Wrap(err, "opening object for writing")
 	}
	 if _, err := io.Copy(w, src); err != nil {
-		closeErr := w.CloseWithError(err)
-		return errors.CombineErrors(closeErr, err)
+		cancel()
+		return errors.CombineErrors(w.Close(), err)
 	}
 	return errors.Wrap(w.Close(), "closing object")
 }
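
A self-contained sketch of the pipe-based idea behind backgroundPipe (standard library only; pipeWriter and newPipeWriter are illustrative names, not this package's API): Close hands ctx.Err() to the pipe, so a cancelled context is what the background consumer sees instead of a clean EOF, and the caller gets that error back from Close.

package main

import (
    "context"
    "fmt"
    "io"
    "io/ioutil"
)

// pipeWriter mimics backgroundPipe in miniature: Writes feed an io.Pipe that a
// background goroutine drains; Close reports ctx.Err() to the pipe's reader.
type pipeWriter struct {
    ctx  context.Context
    pw   *io.PipeWriter
    done chan error
}

func newPipeWriter(ctx context.Context, consume func(io.Reader) error) *pipeWriter {
    pr, pw := io.Pipe()
    w := &pipeWriter{ctx: ctx, pw: pw, done: make(chan error, 1)}
    go func() { w.done <- consume(pr) }()
    return w
}

func (w *pipeWriter) Write(p []byte) (int, error) { return w.pw.Write(p) }

func (w *pipeWriter) Close() error {
    // A nil ctx.Err() is a normal close (the reader sees io.EOF).
    _ = w.pw.CloseWithError(w.ctx.Err())
    return <-w.done // surface the consumer's result, like grp.Wait() above
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    w := newPipeWriter(ctx, func(r io.Reader) error {
        _, err := io.Copy(ioutil.Discard, r)
        return err
    })
    fmt.Fprint(w, "some bytes")
    cancel()               // abort: the consumer's Copy fails with context.Canceled
    fmt.Println(w.Close()) // prints "context canceled"
}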
26 changes: 6 additions & 20 deletions pkg/storage/cloud/external_storage.go
@@ -29,14 +29,6 @@ import (
 // This file is for interfaces only and should not contain any implementation
 // code. All concrete implementations should be added to pkg/storage/cloudimpl.
 
-// WriteCloserWithError extends WriteCloser with an extra CloseWithError func.
-type WriteCloserWithError interface {
-	io.WriteCloser
-	// CloseWithError closes the writer with an error, which may choose to abort
-	// rather than complete any write operations.
-	CloseWithError(error) error
-}
-
 // ExternalStorage provides an API to read and write files in some storage,
 // namely various cloud storage providers, for example to store backups.
 // Generally an implementation is instantiated pointing to some base path or
@@ -75,18 +67,12 @@ type ExternalStorage interface {
 
 	// Writer returns a writer for the requested name.
 	//
-	// A Writer *must* be closed via either Close or CloseWithError, and if
-	// closing returns a non-nil error, that error should be handled or reported
-	// to the user (likely using errors.CombineErrors(closeErr, reasonErr). This
-	// is because an implementation may wait or buffer written data until Close
-	// or a Write call may return io.EOF if the writer's stream is closed due to
-	// an error that will be reported by Close.
-	//
-	// The CloseWithError(err) alternative to Close(), if passed a non-nil error
-	// may elect to abort the operation, skip flushing any buffers or otherwise
-	// end the operation with the minimum additional work in the event the
-	// caller no longer wishes to complete it, e.g. due to some other error.
-	Writer(ctx context.Context, basename string) (WriteCloserWithError, error)
+	// A Writer *must* be closed via Close, and if closing returns a non-nil
+	// error, that error should be handled or reported to the user -- an
+	// implementation may buffer written data until Close and only then return
+	// an error, or Write may return an opaque io.EOF with the underlying cause
+	// returned by the subsequent Close().
+	Writer(ctx context.Context, basename string) (io.WriteCloser, error)
 
 	// ListFiles returns files that match a globs-style pattern. The returned
 	// results are usually relative to the base path, meaning an ExternalStorage
4 changes: 1 addition & 3 deletions pkg/storage/cloud/gcp/gcs_storage.go
@@ -156,9 +156,7 @@ func makeGCSStorage(
 	}, nil
 }
 
-func (g *gcsStorage) Writer(
-	ctx context.Context, basename string,
-) (cloud.WriteCloserWithError, error) {
+func (g *gcsStorage) Writer(ctx context.Context, basename string) (io.WriteCloser, error) {
 	w := g.bucket.Object(path.Join(g.prefix, basename)).NewWriter(ctx)
 	return w, nil
 }
4 changes: 1 addition & 3 deletions pkg/storage/cloud/httpsink/http_storage.go
@@ -175,9 +175,7 @@ func (h *httpStorage) ReadFileAt(
 	return stream.Body, size, nil
 }
 
-func (h *httpStorage) Writer(
-	ctx context.Context, basename string,
-) (cloud.WriteCloserWithError, error) {
+func (h *httpStorage) Writer(ctx context.Context, basename string) (io.WriteCloser, error) {
 	return cloud.BackgroundPipe(ctx, func(ctx context.Context, r io.Reader) error {
 		_, err := h.reqNoBody(ctx, "PUT", basename, r)
 		return err
