Skip to content

Commit

Permalink
Fail copy job if single blob does not exist (#1981)
Browse files Browse the repository at this point in the history
* Job fail if single file does not exist

* fixed change

* fail only on a single file not existing

* fail on file not found

* fail on file not found

* fail on file not found

* cleanup

* added tests

* cleanup

* removed test
  • Loading branch information
tasherif-msft committed Jan 12, 2023
1 parent e376658 commit 708a138
Show file tree
Hide file tree
Showing 22 changed files with 387 additions and 69 deletions.
8 changes: 5 additions & 3 deletions cmd/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"
"io"
"math"
"net/url"
Expand All @@ -36,6 +35,8 @@ import (
"sync"
"time"

"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"

"github.com/Azure/azure-pipeline-go/pipeline"

"github.com/Azure/azure-storage-blob-go/azblob"
Expand Down Expand Up @@ -1113,7 +1114,9 @@ type CookedCopyCmdArgs struct {
FollowSymlinks bool
ForceWrite common.OverwriteOption // says whether we should try to overwrite
ForceIfReadOnly bool // says whether we should _force_ any overwrites (triggered by forceWrite) to work on Azure Files objects that are set to read-only
autoDecompress bool
IsSourceDir bool

autoDecompress bool

// options from flags
blockSize int64
Expand Down Expand Up @@ -1672,7 +1675,6 @@ func (cca *CookedCopyCmdArgs) ReportProgressOrExit(lcm common.LifecycleMgr) (tot
// indicate whether constrained by disk or not
isBenchmark := cca.FromTo.From() == common.ELocation.Benchmark()
perfString, diskString := getPerfDisplayText(summary.PerfStrings, summary.PerfConstraint, duration, isBenchmark)

return fmt.Sprintf("%.1f %%, %v Done, %v Failed, %v Pending, %v Skipped, %v Total%s, %s%s%s",
summary.PercentComplete,
summary.TransfersCompleted,
Expand Down
34 changes: 24 additions & 10 deletions cmd/copyEnumeratorInit.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@ type BucketToContainerNameResolver interface {
ResolveName(bucketName string) (string, error)
}

func (cca *CookedCopyCmdArgs) validateSourceDir(traverser ResourceTraverser) error {
var err error
// Ensure we're only copying a directory under valid conditions
cca.IsSourceDir, err = traverser.IsDirectory(true)
if cca.IsSourceDir &&
!cca.Recursive && // Copies the folder & everything under it
!cca.StripTopDir { // Copies only everything under it
// todo: dir only transfer, also todo: support syncing the root folder's acls on sync.
return errors.New("cannot use directory as source without --recursive or a trailing wildcard (/*)")
}
// check if error is file not found - if it is then we need to make sure it's not a wild card
if err != nil && strings.EqualFold(err.Error(), common.FILE_NOT_FOUND) && !cca.StripTopDir {
return err
}
return nil
}

func (cca *CookedCopyCmdArgs) initEnumerator(jobPartOrder common.CopyJobPartOrderRequest, ctx context.Context) (*CopyEnumerator, error) {
var traverser ResourceTraverser

Expand Down Expand Up @@ -91,18 +108,14 @@ func (cca *CookedCopyCmdArgs) initEnumerator(jobPartOrder common.CopyJobPartOrde
return nil, err
}

// Ensure we're only copying a directory under valid conditions
isSourceDir := traverser.IsDirectory(true)
if isSourceDir &&
!cca.Recursive && // Copies the folder & everything under it
!cca.StripTopDir { // Copies only everything under it
// todo: dir only transfer, also todo: support syncing the root folder's acls on sync.
return nil, errors.New("cannot use directory as source without --recursive or a trailing wildcard (/*)")
err = cca.validateSourceDir(traverser)
if err != nil {
return nil, err
}

// Check if the destination is a directory so we can correctly decide where our files land
// Check if the destination is a directory to correctly decide where our files land
isDestDir := cca.isDestDirectory(cca.Destination, &ctx)
if cca.ListOfVersionIDs != nil && (!(cca.FromTo == common.EFromTo.BlobLocal() || cca.FromTo == common.EFromTo.BlobTrash()) || isSourceDir || !isDestDir) {
if cca.ListOfVersionIDs != nil && (!(cca.FromTo == common.EFromTo.BlobLocal() || cca.FromTo == common.EFromTo.BlobTrash()) || cca.IsSourceDir || !isDestDir) {
log.Fatalf("Either source is not a blob or destination is not a local folder")
}
srcLevel, err := DetermineLocationLevel(cca.Source.Value, cca.FromTo.From(), true)
Expand Down Expand Up @@ -367,7 +380,8 @@ func (cca *CookedCopyCmdArgs) isDestDirectory(dst common.ResourceString, ctx *co
return false
}

return rt.IsDirectory(false)
isDir, _ := rt.IsDirectory(false)
return isDir
}

// Initialize the modular filters outside of copy to increase readability.
Expand Down
164 changes: 164 additions & 0 deletions cmd/copyEnumeratorInit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cmd

import (
"context"
"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/Azure/azure-storage-azcopy/v10/ste"
"github.com/Azure/azure-storage-blob-go/azblob"
chk "gopkg.in/check.v1"
)

type copyEnumeratorSuite struct{}

var _ = chk.Suite(&copyEnumeratorSuite{})

// ============================================= BLOB TRAVERSER TESTS =======================================
func (ce *copyEnumeratorSuite) TestValidateSourceDirThatExists(c *chk.C) {
bsu := getBSU()

// Generate source container and blobs
containerURL, containerName := createNewContainer(c, bsu)
defer deleteContainer(c, containerURL)
c.Assert(containerURL, chk.NotNil)

dirName := "source_dir"
createNewDirectoryStub(c, containerURL, dirName)
// set up to create blob traverser
ctx := context.WithValue(context.TODO(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
p := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{})

// List
rawBlobURLWithSAS := scenarioHelper{}.getRawBlobURLWithSAS(c, containerName, dirName)
blobTraverser := newBlobTraverser(&rawBlobURLWithSAS, p, ctx, true, true, func(common.EntityType) {}, false, common.CpkOptions{}, false, false, false)

// dir but recursive flag not set - fail
cca := CookedCopyCmdArgs{StripTopDir: false, Recursive: false}
err := cca.validateSourceDir(blobTraverser)
c.Assert(err.Error(), chk.Equals, "cannot use directory as source without --recursive or a trailing wildcard (/*)")

// dir but recursive flag set - pass
cca.Recursive = true
err = cca.validateSourceDir(blobTraverser)
c.Assert(err, chk.IsNil)
c.Assert(cca.IsSourceDir, chk.Equals, true)
}

func (ce *copyEnumeratorSuite) TestValidateSourceDirDoesNotExist(c *chk.C) {
bsu := getBSU()

// Generate source container and blobs
containerURL, containerName := createNewContainer(c, bsu)
defer deleteContainer(c, containerURL)
c.Assert(containerURL, chk.NotNil)

dirName := "source_dir/"
// set up to create blob traverser
ctx := context.WithValue(context.TODO(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
p := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{})

// List
rawBlobURLWithSAS := scenarioHelper{}.getRawBlobURLWithSAS(c, containerName, dirName)
blobTraverser := newBlobTraverser(&rawBlobURLWithSAS, p, ctx, true, true, func(common.EntityType) {}, false, common.CpkOptions{}, false, false, false)

// dir but recursive flag not set - fail
cca := CookedCopyCmdArgs{StripTopDir: false, Recursive: false}
err := cca.validateSourceDir(blobTraverser)
c.Assert(err.Error(), chk.Equals, "cannot use directory as source without --recursive or a trailing wildcard (/*)")

// dir but recursive flag set - pass
cca.Recursive = true
err = cca.validateSourceDir(blobTraverser)
c.Assert(err, chk.IsNil)
c.Assert(cca.IsSourceDir, chk.Equals, true)
}

func (ce *copyEnumeratorSuite) TestValidateSourceFileExists(c *chk.C) {
bsu := getBSU()

// Generate source container and blobs
containerURL, containerName := createNewContainer(c, bsu)
defer deleteContainer(c, containerURL)
c.Assert(containerURL, chk.NotNil)

fileName := "source_file"
_, fileName = createNewBlockBlob(c, containerURL, fileName)

ctx := context.WithValue(context.TODO(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
p := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{})

// List
rawBlobURLWithSAS := scenarioHelper{}.getRawBlobURLWithSAS(c, containerName, fileName)
blobTraverser := newBlobTraverser(&rawBlobURLWithSAS, p, ctx, true, true, func(common.EntityType) {}, false, common.CpkOptions{}, false, false, false)

cca := CookedCopyCmdArgs{StripTopDir: false, Recursive: false}
err := cca.validateSourceDir(blobTraverser)
c.Assert(err, chk.IsNil)
c.Assert(cca.IsSourceDir, chk.Equals, false)
}

func (ce *copyEnumeratorSuite) TestValidateSourceFileDoesNotExist(c *chk.C) {
bsu := getBSU()

// Generate source container and blobs
containerURL, containerName := createNewContainer(c, bsu)
defer deleteContainer(c, containerURL)
c.Assert(containerURL, chk.NotNil)

fileName := "source_file"

ctx := context.WithValue(context.TODO(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
p := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{})

// List
rawBlobURLWithSAS := scenarioHelper{}.getRawBlobURLWithSAS(c, containerName, fileName)
blobTraverser := newBlobTraverser(&rawBlobURLWithSAS, p, ctx, true, true, func(common.EntityType) {}, false, common.CpkOptions{}, false, false, false)

cca := CookedCopyCmdArgs{StripTopDir: false, Recursive: false}
err := cca.validateSourceDir(blobTraverser)
c.Assert(err.Error(), chk.Equals, common.FILE_NOT_FOUND)
c.Assert(cca.IsSourceDir, chk.Equals, false)
}

func (ce *copyEnumeratorSuite) TestValidateSourceWithWildCard(c *chk.C) {
bsu := getBSU()

// Generate source container and blobs
containerURL, containerName := createNewContainer(c, bsu)
defer deleteContainer(c, containerURL)
c.Assert(containerURL, chk.NotNil)

dirName := "source_dir_does_not_exist"
// set up to create blob traverser
ctx := context.WithValue(context.TODO(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
p := azblob.NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{})

// List
rawBlobURLWithSAS := scenarioHelper{}.getRawBlobURLWithSAS(c, containerName, dirName)
blobTraverser := newBlobTraverser(&rawBlobURLWithSAS, p, ctx, true, true, func(common.EntityType) {}, false, common.CpkOptions{}, false, false, false)

// dir but recursive flag not set - fail
cca := CookedCopyCmdArgs{StripTopDir: true, Recursive: false}
err := cca.validateSourceDir(blobTraverser)
c.Assert(err, chk.IsNil)
c.Assert(cca.IsSourceDir, chk.Equals, false)
}
4 changes: 3 additions & 1 deletion cmd/syncEnumerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
}

// verify that the traversers are targeting the same type of resources
if sourceTraverser.IsDirectory(true) != destinationTraverser.IsDirectory(true) {
sourceIsDir, _ := sourceTraverser.IsDirectory(true)
destIsDir, _ := destinationTraverser.IsDirectory(true)
if sourceIsDir != destIsDir {
return nil, errors.New("trying to sync between different resource types (either file <-> directory or directory <-> file) which is not allowed." +
"sync must happen between source and destination of the same type, e.g. either file <-> file or directory <-> directory." +
"To make sure target is handled as a directory, add a trailing '/' to the target.")
Expand Down
2 changes: 1 addition & 1 deletion cmd/zc_enumerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func newStoredObject(morpher objectMorpher, name string, relativePath string, en
// pass each StoredObject to the given objectProcessor if it passes all the filters
type ResourceTraverser interface {
Traverse(preprocessor objectMorpher, processor objectProcessor, filters []ObjectFilter) error
IsDirectory(isSource bool) bool
IsDirectory(isSource bool) (bool, error)
// isDirectory has an isSource flag for a single exception to blob.
// Blob should ONLY check remote if it's a source.
// On destinations, because blobs and virtual directories can share names, we should support placing in both ways.
Expand Down
4 changes: 2 additions & 2 deletions cmd/zc_traverser_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ func newBenchmarkTraverser(source string, incrementEnumerationCounter enumeratio
nil
}

func (t *benchmarkTraverser) IsDirectory(bool) bool {
return true
func (t *benchmarkTraverser) IsDirectory(bool) (bool, error) {
return true, nil
}

func (_ *benchmarkTraverser) toReversedString(i uint) string {
Expand Down
28 changes: 18 additions & 10 deletions cmd/zc_traverser_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,30 @@ type blobTraverser struct {
includeSnapshot bool

includeVersion bool

stripTopDir bool
}

func (t *blobTraverser) IsDirectory(isSource bool) bool {
func (t *blobTraverser) IsDirectory(isSource bool) (bool, error) {
isDirDirect := copyHandlerUtil{}.urlIsContainerOrVirtualDirectory(t.rawURL)

// Skip the single blob check if we're checking a destination.
// This is an individual exception for blob because blob supports virtual directories and blobs sharing the same name.
if isDirDirect || !isSource {
return isDirDirect
return isDirDirect, nil
}

_, _, isDirStub, err := t.getPropertiesIfSingleBlob()
_, _, isDirStub, blobErr := t.getPropertiesIfSingleBlob()

if stgErr, ok := err.(azblob.StorageError); ok {
if stgErr, ok := blobErr.(azblob.StorageError); ok {
// We know for sure this is a single blob still, let it walk on through to the traverser.
if stgErr.ServiceCode() == common.CPK_ERROR_SERVICE_CODE {
return false
return false, nil
}
}

if err == nil {
return isDirStub
if blobErr == nil {
return isDirStub, nil
}

blobURLParts := azblob.NewBlobURLParts(*t.rawURL)
Expand All @@ -95,15 +97,21 @@ func (t *blobTraverser) IsDirectory(isSource bool) bool {
msg := fmt.Sprintf("Failed to check if the destination is a folder or a file (Azure Files). Assuming the destination is a file: %s", err)
azcopyScanningLogger.Log(pipeline.LogError, msg)
}
return false
return false, nil
}

if len(resp.Segment.BlobItems) == 0 {
//Not a directory
return false
if stgErr, ok := blobErr.(azblob.StorageError); ok {
// if the blob is not found return the error to throw
if stgErr.ServiceCode() == common.BLOB_NOT_FOUND {
return false, errors.New(common.FILE_NOT_FOUND)
}
}
return false, blobErr
}

return true
return true, nil
}

func (t *blobTraverser) getPropertiesIfSingleBlob() (props *azblob.BlobGetPropertiesResponse, isBlob bool, isDirStub bool, err error) {
Expand Down
4 changes: 2 additions & 2 deletions cmd/zc_traverser_blob_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ type blobAccountTraverser struct {
cpkOptions common.CpkOptions
}

func (t *blobAccountTraverser) IsDirectory(_ bool) bool {
return true // Returns true as account traversal is inherently folder-oriented and recursive.
func (t *blobAccountTraverser) IsDirectory(_ bool) (bool, error) {
return true, nil // Returns true as account traversal is inherently folder-oriented and recursive.
}

func (t *blobAccountTraverser) listContainers() ([]string, error) {
Expand Down
6 changes: 3 additions & 3 deletions cmd/zc_traverser_blob_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ type blobVersionsTraverser struct {
cpkOptions common.CpkOptions
}

func (t *blobVersionsTraverser) IsDirectory(isSource bool) bool {
func (t *blobVersionsTraverser) IsDirectory(isSource bool) (bool, error) {
isDirDirect := copyHandlerUtil{}.urlIsContainerOrVirtualDirectory(t.rawURL)

// Skip the single blob check if we're checking a destination.
// This is an individual exception for blob because blob supports virtual directories and blobs sharing the same name.
if isDirDirect || !isSource {
return isDirDirect
return isDirDirect, nil
}

// The base blob may not exist in some cases.
return false
return false, nil
}

func (t *blobVersionsTraverser) getBlobProperties(versionID string) (props *azblob.BlobGetPropertiesResponse, err error) {
Expand Down
4 changes: 2 additions & 2 deletions cmd/zc_traverser_blobfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func newBlobFSTraverser(rawURL *url.URL, p pipeline.Pipeline, ctx context.Contex
return
}

func (t *blobFSTraverser) IsDirectory(bool) bool {
return copyHandlerUtil{}.urlIsBFSFileSystemOrDirectory(t.ctx, t.rawURL, t.p) // This gets all the fanciness done for us.
func (t *blobFSTraverser) IsDirectory(bool) (bool, error) {
return copyHandlerUtil{}.urlIsBFSFileSystemOrDirectory(t.ctx, t.rawURL, t.p), nil // This gets all the fanciness done for us.
}

func (t *blobFSTraverser) getPropertiesIfSingleFile() (*azbfs.PathGetPropertiesResponse, bool, error) {
Expand Down

0 comments on commit 708a138

Please sign in to comment.