Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow calling app to cancel enumeration and handle scanning directory errors in traverser #1803

Merged
merged 8 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions cmd/copyEnumeratorInit.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"
"log"
"net/url"
"os"
Expand All @@ -15,6 +14,8 @@ import (
"sync"
"time"

"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"

"github.com/Azure/azure-pipeline-go/pipeline"

"github.com/Azure/azure-storage-blob-go/azblob"
Expand Down Expand Up @@ -84,7 +85,7 @@ func (cca *CookedCopyCmdArgs) initEnumerator(jobPartOrder common.CopyJobPartOrde
traverser, err = InitResourceTraverser(cca.Source, cca.FromTo.From(), &ctx, &srcCredInfo,
&cca.FollowSymlinks, cca.ListOfFilesChannel, cca.Recursive, getRemoteProperties,
cca.IncludeDirectoryStubs, cca.permanentDeleteOption, func(common.EntityType) {}, cca.ListOfVersionIDs,
cca.S2sPreserveBlobTags, azcopyLogVerbosity.ToPipelineLogLevel(), cca.CpkOptions)
cca.S2sPreserveBlobTags, azcopyLogVerbosity.ToPipelineLogLevel(), cca.CpkOptions, nil /* errorChannel */)

if err != nil {
return nil, err
Expand Down Expand Up @@ -355,7 +356,9 @@ func (cca *CookedCopyCmdArgs) isDestDirectory(dst common.ResourceString, ctx *co
return false
}

rt, err := InitResourceTraverser(dst, cca.FromTo.To(), ctx, &dstCredInfo, nil, nil, false, false, false, common.EPermanentDeleteOption.None(), func(common.EntityType) {}, cca.ListOfVersionIDs, false, pipeline.LogNone, cca.CpkOptions)
rt, err := InitResourceTraverser(dst, cca.FromTo.To(), ctx, &dstCredInfo, nil,
nil, false, false, false, common.EPermanentDeleteOption.None(),
func(common.EntityType) {}, cca.ListOfVersionIDs, false, pipeline.LogNone, cca.CpkOptions, nil /* errorChannel */)

if err != nil {
return false
Expand Down
7 changes: 5 additions & 2 deletions cmd/list.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
"context"
"errors"
"fmt"
pipeline2 "github.com/Azure/azure-pipeline-go/pipeline"
"strconv"
"strings"

"github.com/Azure/azure-pipeline-go/pipeline"

"github.com/spf13/cobra"

"github.com/Azure/azure-storage-azcopy/v10/common"
Expand Down Expand Up @@ -221,7 +222,9 @@ func (cooked cookedListCmdArgs) HandleListContainerCommand() (err error) {
}
}

traverser, err := InitResourceTraverser(source, cooked.location, &ctx, &credentialInfo, nil, nil, true, false, false, common.EPermanentDeleteOption.None(), func(common.EntityType) {}, nil, false, pipeline2.LogNone, common.CpkOptions{})
traverser, err := InitResourceTraverser(source, cooked.location, &ctx, &credentialInfo, nil, nil,
true, false, false, common.EPermanentDeleteOption.None(), func(common.EntityType) {},
nil, false, pipeline.LogNone, common.CpkOptions{}, nil /* errorChannel */)

if err != nil {
return fmt.Errorf("failed to initialize traverser: %s", err.Error())
Expand Down
5 changes: 3 additions & 2 deletions cmd/removeEnumerator.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"
"strings"

"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"

"github.com/Azure/azure-pipeline-go/pipeline"

"github.com/Azure/azure-storage-azcopy/v10/azbfs"
Expand All @@ -50,7 +51,7 @@ func newRemoveEnumerator(cca *CookedCopyCmdArgs) (enumerator *CopyEnumerator, er
sourceTraverser, err = InitResourceTraverser(cca.Source, cca.FromTo.From(), &ctx, &cca.credentialInfo,
nil, cca.ListOfFilesChannel, cca.Recursive, false, cca.IncludeDirectoryStubs,
cca.permanentDeleteOption, func(common.EntityType) {}, cca.ListOfVersionIDs, false,
azcopyLogVerbosity.ToPipelineLogLevel(), cca.CpkOptions)
azcopyLogVerbosity.ToPipelineLogLevel(), cca.CpkOptions, nil /* errorChannel */)

// report failure to create traverser
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/setPropertiesEnumerator.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func setPropertiesEnumerator(cca *CookedCopyCmdArgs) (enumerator *CopyEnumerator
sourceTraverser, err = InitResourceTraverser(cca.Source, cca.FromTo.From(), &ctx, &cca.credentialInfo,
nil, cca.ListOfFilesChannel, cca.Recursive, false, cca.IncludeDirectoryStubs,
cca.permanentDeleteOption, func(common.EntityType) {}, cca.ListOfVersionIDs, false,
azcopyLogVerbosity.ToPipelineLogLevel(), cca.CpkOptions)
azcopyLogVerbosity.ToPipelineLogLevel(), cca.CpkOptions, nil /* errorChannel */)

// report failure to create traverser
if err != nil {
Expand Down
8 changes: 5 additions & 3 deletions cmd/syncEnumerator.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ import (
"context"
"errors"
"fmt"
"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"
"runtime"
"strings"
"sync/atomic"

"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"

"github.com/Azure/azure-pipeline-go/pipeline"

"github.com/Azure/azure-storage-azcopy/v10/common"
Expand Down Expand Up @@ -56,14 +57,15 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
}

// TODO: enable symlink support in a future release after evaluating the implications
// TODO: Consider passing an errorChannel so that enumeration errors during sync can be conveyed to the caller.
// GetProperties is enabled by default as sync supports both upload and download.
// This property only supports Files and S3 at the moment, but provided that Files sync is coming soon, enable to avoid stepping on Files sync work
sourceTraverser, err := InitResourceTraverser(cca.source, cca.fromTo.From(), &ctx, &srcCredInfo, nil,
nil, cca.recursive, true, cca.isHNSToHNS, common.EPermanentDeleteOption.None(), func(entityType common.EntityType) {
if entityType == common.EEntityType.File() {
atomic.AddUint64(&cca.atomicSourceFilesScanned, 1)
}
}, nil, cca.s2sPreserveBlobTags, azcopyLogVerbosity.ToPipelineLogLevel(), cca.cpkOptions)
}, nil, cca.s2sPreserveBlobTags, azcopyLogVerbosity.ToPipelineLogLevel(), cca.cpkOptions, nil /* errorChannel */)

if err != nil {
return nil, err
Expand All @@ -84,7 +86,7 @@ func (cca *cookedSyncCmdArgs) initEnumerator(ctx context.Context) (enumerator *s
if entityType == common.EEntityType.File() {
atomic.AddUint64(&cca.atomicDestinationFilesScanned, 1)
}
}, nil, cca.s2sPreserveBlobTags, azcopyLogVerbosity.ToPipelineLogLevel(), cca.cpkOptions)
}, nil, cca.s2sPreserveBlobTags, azcopyLogVerbosity.ToPipelineLogLevel(), cca.cpkOptions, nil /* errorChannel */)
if err != nil {
return nil, err
}
Expand Down
14 changes: 11 additions & 3 deletions cmd/zc_enumerator.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,12 @@ type enumerationCounterFunc func(entityType common.EntityType)
// ctx, pipeline are only required for remote resources.
// followSymlinks is only required for local resources (defaults to false)
// errorOnDirWOutRecursive is used by copy.

func InitResourceTraverser(resource common.ResourceString, location common.Location, ctx *context.Context, credential *common.CredentialInfo, followSymlinks *bool, listOfFilesChannel chan string, recursive, getProperties, includeDirectoryStubs bool, permanentDeleteOption common.PermanentDeleteOption, incrementEnumerationCounter enumerationCounterFunc, listOfVersionIds chan string, s2sPreserveBlobTags bool, logLevel pipeline.LogLevel, cpkOptions common.CpkOptions) (ResourceTraverser, error) {
// If errorChannel is non-nil, all errors encountered during enumeration will be conveyed through this channel.
// To avoid slowdowns, use a buffered channel of enough capacity.
func InitResourceTraverser(resource common.ResourceString, location common.Location, ctx *context.Context,
credential *common.CredentialInfo, followSymlinks *bool, listOfFilesChannel chan string, recursive, getProperties,
includeDirectoryStubs bool, permanentDeleteOption common.PermanentDeleteOption, incrementEnumerationCounter enumerationCounterFunc, listOfVersionIds chan string,
s2sPreserveBlobTags bool, logLevel pipeline.LogLevel, cpkOptions common.CpkOptions, errorChannel chan ErrorFileInfo) (ResourceTraverser, error) {
var output ResourceTraverser
var p *pipeline.Pipeline

Expand Down Expand Up @@ -392,7 +396,11 @@ func InitResourceTraverser(resource common.ResourceString, location common.Locat
output = newListTraverser(baseResource, location, nil, nil, recursive, toFollow, getProperties,
globChan, includeDirectoryStubs, incrementEnumerationCounter, s2sPreserveBlobTags, logLevel, cpkOptions)
} else {
output = newLocalTraverser(resource.ValueLocal(), recursive, toFollow, incrementEnumerationCounter)
if ctx != nil {
output = newLocalTraverser(*ctx, resource.ValueLocal(), recursive, toFollow, incrementEnumerationCounter, errorChannel)
Strikerzee marked this conversation as resolved.
Show resolved Hide resolved
} else {
output = newLocalTraverser(context.TODO(), resource.ValueLocal(), recursive, toFollow, incrementEnumerationCounter, errorChannel)
}
}
case common.ELocation.Benchmark():
ben, err := newBenchmarkTraverser(resource.Value, incrementEnumerationCounter)
Expand Down
4 changes: 3 additions & 1 deletion cmd/zc_traverser_list.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ func newListTraverser(parent common.ResourceString, parentType common.Location,
}

// Construct a traverser that goes through the child
traverser, err := InitResourceTraverser(source, parentType, ctx, credential, &followSymlinks, nil, recursive, getProperties, includeDirectoryStubs, common.EPermanentDeleteOption.None(), incrementEnumerationCounter, nil, s2sPreserveBlobTags, logLevel, cpkOptions)
traverser, err := InitResourceTraverser(source, parentType, ctx, credential, &followSymlinks,
nil, recursive, getProperties, includeDirectoryStubs, common.EPermanentDeleteOption.None(), incrementEnumerationCounter,
nil, s2sPreserveBlobTags, logLevel, cpkOptions, nil /* errorChannel */)
if err != nil {
return nil, err
}
Expand Down
58 changes: 43 additions & 15 deletions cmd/zc_traverser_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package cmd

import (
"context"
"errors"
"fmt"
"io/ioutil"
Expand All @@ -41,9 +42,10 @@ type localTraverser struct {
fullPath string
recursive bool
followSymlinks bool

appCtx context.Context
// a generic function to notify that a new stored object has been enumerated
incrementEnumerationCounter enumerationCounterFunc
errorChannel chan ErrorFileInfo
}

func (t *localTraverser) IsDirectory(bool) bool {
Expand Down Expand Up @@ -164,15 +166,28 @@ type symlinkTargetFileInfo struct {
name string
}

// ErrorFileInfo holds information about files and folders that failed enumeration.
type ErrorFileInfo struct {
FilePath string
FileInfo os.FileInfo
ErrorMsg error
}

func (s symlinkTargetFileInfo) Name() string {
return s.name // override the name
}

func writeToErrorChannel(errorChannel chan ErrorFileInfo, err ErrorFileInfo) {
if errorChannel != nil {
errorChannel <- err
}
}

// WalkWithSymlinks is a symlinks-aware, parallelized, version of filePath.Walk.
// Separate this from the traverser for two purposes:
// 1) Cleaner code
// 2) Easier to test individually than to test the entire traverser.
func WalkWithSymlinks(fullPath string, walkFunc filepath.WalkFunc, followSymlinks bool) (err error) {
func WalkWithSymlinks(appCtx context.Context, fullPath string, walkFunc filepath.WalkFunc, followSymlinks bool, errorChannel chan ErrorFileInfo) (err error) {

// We want to re-queue symlinks up in their evaluated form because filepath.Walk doesn't evaluate them for us.
// So, what is the plan of attack?
Expand Down Expand Up @@ -202,9 +217,10 @@ func WalkWithSymlinks(fullPath string, walkFunc filepath.WalkFunc, followSymlink
walkQueue = walkQueue[1:]
// walk contents of this queueItem in parallel
// (for simplicity of coding, we don't parallelize across multiple queueItems)
parallel.Walk(queueItem.fullPath, EnumerationParallelism, EnumerationParallelStatFiles, func(filePath string, fileInfo os.FileInfo, fileError error) error {
parallel.Walk(appCtx, queueItem.fullPath, EnumerationParallelism, EnumerationParallelStatFiles, func(filePath string, fileInfo os.FileInfo, fileError error) error {
if fileError != nil {
WarnStdoutAndScanningLog(fmt.Sprintf("Accessing '%s' failed with error: %s", filePath, fileError))
WarnStdoutAndScanningLog(fmt.Sprintf("Accessing '%s' failed with error: %v", filePath, fileError))
writeToErrorChannel(errorChannel, ErrorFileInfo{FilePath: filePath, FileInfo: fileInfo, ErrorMsg: fileError})
return nil
}
computedRelativePath := strings.TrimPrefix(cleanLocalPath(filePath), cleanLocalPath(queueItem.fullPath))
Expand Down Expand Up @@ -245,25 +261,33 @@ func WalkWithSymlinks(fullPath string, walkFunc filepath.WalkFunc, followSymlink
result, err := UnfurlSymlinks(filePath)

if err != nil {
WarnStdoutAndScanningLog(fmt.Sprintf("Failed to resolve symlink %s: %s", filePath, err))
err = fmt.Errorf("Failed to resolve symlink %s: %v", filePath, err)
Strikerzee marked this conversation as resolved.
Show resolved Hide resolved
WarnStdoutAndScanningLog(err.Error())
writeToErrorChannel(errorChannel, ErrorFileInfo{FilePath: filePath, FileInfo: fileInfo, ErrorMsg: err})
return nil
}

result, err = filepath.Abs(result)
if err != nil {
WarnStdoutAndScanningLog(fmt.Sprintf("Failed to get absolute path of symlink result %s: %s", filePath, err))
err = fmt.Errorf("Failed to get absolute path of symlink result %s: %v", filePath, err)
Strikerzee marked this conversation as resolved.
Show resolved Hide resolved
WarnStdoutAndScanningLog(err.Error())
writeToErrorChannel(errorChannel, ErrorFileInfo{FilePath: filePath, FileInfo: fileInfo, ErrorMsg: err})
return nil
}

slPath, err := filepath.Abs(filePath)
if err != nil {
WarnStdoutAndScanningLog(fmt.Sprintf("Failed to get absolute path of %s: %s", filePath, err))
err = fmt.Errorf("Failed to get absolute path of %s: %v", filePath, err)
WarnStdoutAndScanningLog(err.Error())
writeToErrorChannel(errorChannel, ErrorFileInfo{FilePath: filePath, FileInfo: fileInfo, ErrorMsg: err})
return nil
}

rStat, err := os.Stat(result)
if err != nil {
WarnStdoutAndScanningLog(fmt.Sprintf("Failed to get properties of symlink target at %s: %s", result, err))
err = fmt.Errorf("Failed to get properties of symlink target at %s: %v", result, err)
WarnStdoutAndScanningLog(err.Error())
writeToErrorChannel(errorChannel, ErrorFileInfo{FilePath: filePath, FileInfo: fileInfo, ErrorMsg: err})
return nil
}

Expand Down Expand Up @@ -304,7 +328,9 @@ func WalkWithSymlinks(fullPath string, walkFunc filepath.WalkFunc, followSymlink
result, err := filepath.Abs(filePath)

if err != nil {
WarnStdoutAndScanningLog(fmt.Sprintf("Failed to get absolute path of %s: %s", filePath, err))
err = fmt.Errorf("Failed to get absolute path of %s: %v", filePath, err)
Strikerzee marked this conversation as resolved.
Show resolved Hide resolved
WarnStdoutAndScanningLog(err.Error())
writeToErrorChannel(errorChannel, ErrorFileInfo{FilePath: filePath, FileInfo: fileInfo, ErrorMsg: err})
return nil
}

Expand Down Expand Up @@ -339,7 +365,7 @@ func (t *localTraverser) Traverse(preprocessor objectMorpher, processor objectPr
singleFileInfo, isSingleFile, err := t.getInfoIfSingleFile()

if err != nil {
azcopyScanningLogger.Log(pipeline.LogError, fmt.Sprintf("Failed to scan path %s: %s", t.fullPath, err.Error()))
azcopyScanningLogger.Log(pipeline.LogError, fmt.Sprintf("Failed to scan path %s: %v", t.fullPath, err.Error()))
Strikerzee marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("failed to scan path %s due to %s", t.fullPath, err.Error())
}

Expand Down Expand Up @@ -370,15 +396,15 @@ func (t *localTraverser) Traverse(preprocessor objectMorpher, processor objectPr
if t.recursive {
processFile := func(filePath string, fileInfo os.FileInfo, fileError error) error {
if fileError != nil {
WarnStdoutAndScanningLog(fmt.Sprintf("Accessing %s failed with error: %s", filePath, fileError))
WarnStdoutAndScanningLog(fmt.Sprintf("Accessing %s failed with error: %v", filePath, fileError))
return nil
}

var entityType common.EntityType
if fileInfo.IsDir() {
newFileInfo, err := WrapFolder(filePath, fileInfo)
if err != nil {
WarnStdoutAndScanningLog(fmt.Sprintf("Failed to get last change of target at %s: %s", filePath, err))
WarnStdoutAndScanningLog(fmt.Sprintf("Failed to get last change of target at %s: %v", filePath, err))
} else {
// fileInfo becomes nil in case we fail to wrap folder.
fileInfo = newFileInfo
Expand Down Expand Up @@ -417,7 +443,7 @@ func (t *localTraverser) Traverse(preprocessor objectMorpher, processor objectPr
}

// note: Walk includes root, so no need here to separately create StoredObject for root (as we do for other folder-aware sources)
return WalkWithSymlinks(t.fullPath, processFile, t.followSymlinks)
return WalkWithSymlinks(t.appCtx, t.fullPath, processFile, t.followSymlinks, t.errorChannel)
} else {
// if recursive is off, we only need to scan the files immediately under the fullPath
// We don't transfer any directory properties here, not even the root. (Because the root's
Expand Down Expand Up @@ -495,12 +521,14 @@ func (t *localTraverser) Traverse(preprocessor objectMorpher, processor objectPr
return
}

func newLocalTraverser(fullPath string, recursive bool, followSymlinks bool, incrementEnumerationCounter enumerationCounterFunc) *localTraverser {
func newLocalTraverser(ctx context.Context, fullPath string, recursive bool, followSymlinks bool, incrementEnumerationCounter enumerationCounterFunc, errorChannel chan ErrorFileInfo) *localTraverser {
traverser := localTraverser{
fullPath: cleanLocalPath(fullPath),
recursive: recursive,
followSymlinks: followSymlinks,
incrementEnumerationCounter: incrementEnumerationCounter}
appCtx: ctx,
incrementEnumerationCounter: incrementEnumerationCounter,
errorChannel: errorChannel}
return &traverser
}

Expand Down
8 changes: 5 additions & 3 deletions cmd/zt_generic_service_traverser_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cmd

import (
"context"

"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/Azure/azure-storage-file-go/azfile"
chk "gopkg.in/check.v1"
Expand Down Expand Up @@ -56,7 +58,7 @@ func (s *genericTraverserSuite) TestBlobFSServiceTraverserWithManyObjects(c *chk
scenarioHelper{}.generateLocalFilesFromList(c, dstDirName, objectList)

// Create a local traversal
localTraverser := newLocalTraverser(dstDirName, true, true, func(common.EntityType) {})
localTraverser := newLocalTraverser(context.TODO(), dstDirName, true, true, func(common.EntityType) {}, nil)

// Invoke the traversal with an indexer so the results are indexed for easy validation
localIndexer := newObjectIndexer()
Expand Down Expand Up @@ -172,7 +174,7 @@ func (s *genericTraverserSuite) TestServiceTraverserWithManyObjects(c *chk.C) {
scenarioHelper{}.generateLocalFilesFromList(c, dstDirName, objectList)

// Create a local traversal
localTraverser := newLocalTraverser(dstDirName, true, true, func(common.EntityType) {})
localTraverser := newLocalTraverser(context.TODO(), dstDirName, true, true, func(common.EntityType) {}, nil)

// Invoke the traversal with an indexer so the results are indexed for easy validation
localIndexer := newObjectIndexer()
Expand Down Expand Up @@ -356,7 +358,7 @@ func (s *genericTraverserSuite) TestServiceTraverserWithWildcards(c *chk.C) {
scenarioHelper{}.generateLocalFilesFromList(c, dstDirName, objectList)

// Create a local traversal
localTraverser := newLocalTraverser(dstDirName, true, true, func(common.EntityType) {})
localTraverser := newLocalTraverser(context.TODO(), dstDirName, true, true, func(common.EntityType) {}, nil)

// Invoke the traversal with an indexer so the results are indexed for easy validation
localIndexer := newObjectIndexer()
Expand Down
Loading