Skip to content

Commit

Permalink
Minimise mumber of calls to folder create (#2511)
Browse files Browse the repository at this point in the history
* Minimise mumber of calls to folder create

* Fix test case

* Fix testcase

* Revert change to TestBasic_CopyS2SDir

* Revert change to e2etest/zt_basic_copy_sync_remove_test.go

* Make getProperties only for root folder

* Return early if we are creating root

* Add testcase

* Revert change to e2etest/scenario_helpers.go

* Fix testcase

---------

Co-authored-by: Gauri Prasad <51212198+gapra-msft@users.noreply.github.com>
  • Loading branch information
nakulkar-msft and gapra-msft committed Jan 20, 2024
1 parent 6d787b1 commit 5a3cbf7
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 30 deletions.
12 changes: 12 additions & 0 deletions ste/JobPartPlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ func (mmf *JobPartPlanMMF) Unmap() { (*common.MMF)(mmf).Unmap() }

// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

type IJobPartPlanHeader interface {
CommandString() string
GetRelativeSrcDstStrings(transferIndex uint32) (source string, destination string)
JobPartStatus() common.JobStatus
JobStatus() common.JobStatus
SetJobPartStatus(newJobStatus common.JobStatus)
SetJobStatus(newJobStatus common.JobStatus)
Transfer(transferIndex uint32) *JobPartPlanTransfer
TransferSrcDstRelatives(transferIndex uint32) (relSource string, relDest string)
TransferSrcDstStrings(transferIndex uint32) (source string, destination string, isFolder bool)
TransferSrcPropertiesAndMetadata(transferIndex uint32) (h common.ResourceHTTPHeaders, metadata common.Metadata, blobType blob.BlobType, blobTier blob.AccessTier, s2sGetPropertiesInBackend bool, DestLengthValidation bool, s2sSourceChangeValidation bool, s2sInvalidMetadataHandleOption common.InvalidMetadataHandleOption, entityType common.EntityType, blobVersionID string, blobSnapshotID string, blobTags common.BlobTags)
}
// JobPartPlanHeader represents the header of Job Part's memory-mapped file
type JobPartPlanHeader struct {
// Once set, the following fields are constants; they should never be modified
Expand Down
11 changes: 10 additions & 1 deletion ste/folderCreationTracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (f *nullFolderTracker) StopTracking(folder string) {
}

type jpptFolderTracker struct {
plan *JobPartPlanHeader
plan IJobPartPlanHeader
mu *sync.Mutex
contents map[string]uint32
unregisteredButCreated map[string]struct{}
Expand Down Expand Up @@ -85,6 +85,15 @@ func (f *jpptFolderTracker) CreateFolder(folder string, doCreation func() error)
return nil // Never persist to dev-null
}

if idx, ok := f.contents[folder]; ok &&
f.plan.Transfer(idx).TransferStatus() == (common.ETransferStatus.FolderCreated()) {
return nil
}

if _, ok := f.unregisteredButCreated[folder]; ok {
return nil
}

err := doCreation()
if err != nil {
return err
Expand Down
129 changes: 129 additions & 0 deletions ste/folderCreationTracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright © Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package ste

import (
"sync"
"sync/atomic"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/stretchr/testify/assert"
)

// This is mocked to test the folder creation tracker
type mockedJPPH struct {
folderName []string
index []int
status []*JobPartPlanTransfer

}

func (jpph *mockedJPPH) CommandString() string { panic("Not implemented") }
func (jpph *mockedJPPH) GetRelativeSrcDstStrings(uint32) (string, string) { panic("Not implemented") }
func (jpph *mockedJPPH) JobPartStatus() common.JobStatus { panic("Not implemented") }
func (jpph *mockedJPPH) JobStatus() common.JobStatus { panic("Not implemented") }
func (jpph *mockedJPPH) SetJobPartStatus(common.JobStatus) { panic("Not implemented") }
func (jpph *mockedJPPH) SetJobStatus(common.JobStatus) { panic("Not implemented") }
func (jpph *mockedJPPH) Transfer(idx uint32) *JobPartPlanTransfer {
return jpph.status[idx]
}
func (jpph *mockedJPPH) TransferSrcDstRelatives(uint32) (string, string) { panic("Not implemented") }
func (jpph *mockedJPPH) TransferSrcDstStrings(uint32) (string, string, bool) { panic("Not implemented") }
func (jpph *mockedJPPH) TransferSrcPropertiesAndMetadata(uint32) (common.ResourceHTTPHeaders, common.Metadata, blob.BlobType, blob.AccessTier, bool, bool, bool, common.InvalidMetadataHandleOption, common.EntityType, string, string, common.BlobTags) {
panic("Not implemented")
}


// This test verifies that when we call dir create for a directory, it is created only once,
// even if multiple routines request it to be created.
func TestFolderCreationTracker_directoryCreate(t *testing.T) {
a := assert.New(t)

// create a plan with one registered and one unregistered folder
folderReg := "folderReg"
folderUnReg := "folderUnReg"


plan := &mockedJPPH{
folderName: []string{folderReg, folderUnReg},
index: []int{0, 1},
status: []*JobPartPlanTransfer {
&JobPartPlanTransfer{atomicTransferStatus: common.ETransferStatus.NotStarted(),},
&JobPartPlanTransfer{atomicTransferStatus: common.ETransferStatus.NotStarted(),},
},
}

fct := &jpptFolderTracker{
plan: plan,
mu: &sync.Mutex{},
contents: make(map[string]uint32),
unregisteredButCreated: make(map[string]struct{}),
}

// 1. Register folder1
fct.RegisterPropertiesTransfer(folderReg, 0)

// Multiple calls to create folderReg should execute create only once.
numOfCreations := int32(0)
var wg sync.WaitGroup
doCreation := func() error{
atomic.AddInt32(&numOfCreations, 1)
plan.status[0].atomicTransferStatus = common.ETransferStatus.FolderCreated()
return nil
}

ch := make(chan bool)
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
<-ch
fct.CreateFolder(folderReg, doCreation)
wg.Done()
}()
}
close(ch) // this will cause all above go rotuines to start creating folder

wg.Wait()
a.Equal(int32(1), numOfCreations)

// similar test for unregistered folder
numOfCreations = 0
ch = make(chan bool)
doCreation = func() error{
atomic.AddInt32(&numOfCreations, 1)
plan.status[1].atomicTransferStatus = common.ETransferStatus.FolderCreated()
return nil
}
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
<-ch
fct.CreateFolder(folderUnReg, doCreation)
wg.Done()
}()
}
close(ch)

wg.Wait()
a.Equal(int32(1), numOfCreations)

}
55 changes: 26 additions & 29 deletions ste/sender-azureFile.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,39 +479,36 @@ func (d AzureFileParentDirCreator) CreateParentDirToRoot(ctx context.Context, fi
}

func (d AzureFileParentDirCreator) CreateDirToRoot(ctx context.Context, shareClient *share.Client, directoryClient *directory.Client, t FolderCreationTracker) error {
fileURLParts, err := file.ParseURL(directoryClient.URL())
if err != nil {
// ignoring error below because we're getting URL from a valid client.
fileURLParts, _ := file.ParseURL(directoryClient.URL())

// Try to create the parent directories. Split directories as segments.
segments := d.splitWithoutToken(fileURLParts.DirectoryOrFilePath, '/')
if len(segments) == 0 {
// If we are trying to create root, perform GetProperties instead.
// Azure Files has delayed creation of root, and if we do not perform GetProperties,
// some operations like SetMetadata or SetProperties will fail.
// TODO: Remove this block once the bug is fixed.
_, err := directoryClient.GetProperties(ctx, nil)
return err
}
_, err = directoryClient.GetProperties(ctx, nil)
if err != nil {
var respErr *azcore.ResponseError
if errors.As(err, &respErr) && (respErr.StatusCode == http.StatusNotFound || respErr.StatusCode == http.StatusForbidden) {
// Either the parent directory does not exist, or we may not have read permissions.
// Try to create the parent directories. Split directories as segments.
segments := d.splitWithoutToken(fileURLParts.DirectoryOrFilePath, '/')
currentDirectoryClient := shareClient.NewRootDirectoryClient() // Share directory should already exist, doesn't support creating share
// Try to create the directories
for i := 0; i < len(segments); i++ {
currentDirectoryClient = currentDirectoryClient.NewSubdirectoryClient(segments[i])
rawURL := currentDirectoryClient.URL()
recorderURL, err := url.Parse(rawURL)
if err != nil {
return err
}
recorderURL.RawQuery = ""
err = t.CreateFolder(recorderURL.String(), func() error {
_, err := currentDirectoryClient.Create(ctx, nil)
return err
})
if verifiedErr := d.verifyAndHandleCreateErrors(err); verifiedErr != nil {
return verifiedErr
}
}
} else {
currentDirectoryClient := shareClient.NewRootDirectoryClient() // Share directory should already exist, doesn't support creating share
// Try to create the directories
for i := 0; i < len(segments); i++ {
currentDirectoryClient = currentDirectoryClient.NewSubdirectoryClient(segments[i])
rawURL := currentDirectoryClient.URL()
recorderURL, err := url.Parse(rawURL)
if err != nil {
return err
}
recorderURL.RawQuery = ""
err = t.CreateFolder(recorderURL.String(), func() error {
_, err := currentDirectoryClient.Create(ctx, nil)
return err
})
if verifiedErr := d.verifyAndHandleCreateErrors(err); verifiedErr != nil {
return verifiedErr
}
}
// Directly return if parent directory exists.
return nil
}

0 comments on commit 5a3cbf7

Please sign in to comment.