Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update block ID generation logic #2050

Merged
merged 4 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package common

import (
"bytes"
"crypto/md5"
"encoding/base64"
"fmt"
"net/http"
"net/url"
"runtime"
"strings"

"github.com/Azure/azure-storage-azcopy/v10/azbfs"
"github.com/google/uuid"

"github.com/Azure/azure-storage-file-go/azfile"
)
Expand Down Expand Up @@ -178,3 +182,9 @@ func GenerateFullPathWithQuery(rootPath, childPath, extraQuery string) string {
return p + "?" + extraQuery
}
}

func GenerateBlockBlobBlockID(blockNamePrefix string, index int32) string {
nakulkar-msft marked this conversation as resolved.
Show resolved Hide resolved
blockNameMd5 := md5.Sum([]byte(fmt.Sprintf("%s%05d", blockNamePrefix, index)))
blockID, _ := uuid.FromBytes(blockNameMd5[:]) //This func returns error if size of blockNameMd5 is not 16
return base64.StdEncoding.EncodeToString([]byte(blockID.String()))
}
30 changes: 29 additions & 1 deletion common/extensions_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package common

import (
chk "gopkg.in/check.v1"
"fmt"
"math/rand"
"net/url"
"strings"

"github.com/Azure/azure-storage-blob-go/azblob"
chk "gopkg.in/check.v1"
)

type extensionsTestSuite struct{}
Expand Down Expand Up @@ -141,3 +145,27 @@ func (*extensionsTestSuite) TestRedaction(c *chk.C) {
}
}
}


// TestBlockblobBlockIDGeneration verifies that for a given JobID, job part,
// file index within the part, and block index, the generated block ID is
// stable (deterministic) and has the expected encoded length.
func (*extensionsTestSuite) TestBlockblobBlockIDGeneration(c *chk.C) {
	// Make sure that for a given JobID, jobPart, an index in job part and a block index,
	// the blockID generated is consistent.
	numOfFilesPerDispatchJobPart := int32(10000) // == cmd.NumOfFilesPerDispatchJobPart
	maxNumberOfParts := int32(99999)             // Limited by the 5-digit part number in our plan file name
	azCopyBlockLength := 48                      // Current size of blocks in AzCopy

	jobId := NewUUID()
	partNum := rand.Int31n(maxNumberOfParts) // We support a max of 99999 parts
	fileIndex := rand.Int31n(numOfFilesPerDispatchJobPart)
	blockIndex := rand.Int31n(azblob.BlockBlobMaxBlocks)

	blockNamePrefix := fmt.Sprintf("%s%05d%05d", jobId.String(), partNum, fileIndex)
	blockName := GenerateBlockBlobBlockID(blockNamePrefix, blockIndex)
	c.Assert(len(blockName), chk.Equals, azCopyBlockLength)

	// Repeated generation with identical inputs must reproduce the same ID.
	for i := 1; i <= 10; i++ {
		tmp := GenerateBlockBlobBlockID(blockNamePrefix, blockIndex)
		c.Assert(tmp, chk.Equals, blockName)
	}
}
9 changes: 6 additions & 3 deletions ste/mgr-JobPartTransferMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type IJobPartTransferMgr interface {
PropertiesToTransfer() common.SetPropertiesFlags
ResetSourceSize() // sets source size to 0 (made to be used by setProperties command to make number of bytes transferred = 0)
SuccessfulBytesTransferred() int64
TransferIndex() (partNum, transferIndex uint32)
}

type TransferInfo struct {
Expand All @@ -120,9 +121,6 @@ type TransferInfo struct {
SrcBlobType azblob.BlobType // used for both S2S and for downloads to local from blob
S2SSrcBlobTier azblob.AccessTierType // AccessTierType (string) is used to accommodate service-side support matrix change.

// NumChunks is the number of chunks in which transfer will be split into while uploading the transfer.
// NumChunks is not used in case of AppendBlob transfer.
NumChunks uint16
RehydratePriority azblob.RehydratePriorityType
}

Expand Down Expand Up @@ -551,6 +549,11 @@ func (jptm *jobPartTransferMgr) ResetSourceSize() {
jptm.transferInfo.SourceSize = 0
}

// TransferIndex identifies a file (transfer) within a job: it returns the
// part number of this job part and the index of the transfer inside that
// part, as read from the part's plan.
func (jptm *jobPartTransferMgr) TransferIndex() (partNum, transferIndex uint32) {
	return uint32(jptm.jobPartMgr.Plan().PartNum), jptm.transferIndex
}

// JobHasLowFileCount returns an estimate of whether we only have a very small number of files in the overall job
// (An "estimate" because it actually only looks at the current job part)
func (jptm *jobPartTransferMgr) JobHasLowFileCount() bool {
Expand Down
16 changes: 11 additions & 5 deletions ste/sender-blockBlob.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package ste

import (
"context"
"encoding/base64"
"errors"
"fmt"
"net/url"
Expand Down Expand Up @@ -61,6 +60,7 @@ type blockBlobSenderBase struct {
atomicChunksWritten int32
atomicPutListIndicator int32
muBlockIDs *sync.Mutex
blockNamePrefix string
}

func getVerifiedChunkParams(transferInfo TransferInfo, memLimit int64) (chunkSize int64, numChunks uint32, err error) {
Expand Down Expand Up @@ -138,6 +138,11 @@ func newBlockBlobSenderBase(jptm IJobPartTransferMgr, destination string, p pipe
// Once track2 goes live, we'll not need to do this conversion/casting and can directly use CpkInfo & CpkScopeInfo
cpkToApply := common.ToClientProvidedKeyOptions(jptm.CpkInfo(), jptm.CpkScopeInfo())

// Block Names of blobs are of format noted below. We generate prefix here.
// md5-Sum{ <128 Bit GUID of AzCopy JobID><5B PartNum><5B Index in the jobPart><5B blockNum> }
partNum, transferIndex := jptm.TransferIndex()
blockNamePrefix := fmt.Sprintf("%s%05d%05d", jptm.Info().JobID.String(), partNum, transferIndex)

return &blockBlobSenderBase{
jptm: jptm,
sip: srcInfoProvider,
Expand All @@ -151,7 +156,9 @@ func newBlockBlobSenderBase(jptm IJobPartTransferMgr, destination string, p pipe
blobTagsToApply: props.SrcBlobTags.ToAzBlobTagsMap(),
destBlobTier: destBlobTier,
cpkToApply: cpkToApply,
muBlockIDs: &sync.Mutex{}}, nil
muBlockIDs: &sync.Mutex{},
blockNamePrefix: blockNamePrefix,
}, nil
}

func (s *blockBlobSenderBase) SendableEntityType() common.EntityType {
Expand Down Expand Up @@ -282,7 +289,6 @@ func (s *blockBlobSenderBase) setBlockID(index int32, value string) {
s.blockIDs[index] = value
}

func (s *blockBlobSenderBase) generateEncodedBlockID() string {
blockID := common.NewUUID().String()
return base64.StdEncoding.EncodeToString([]byte(blockID))
func (s *blockBlobSenderBase) generateEncodedBlockID(index int32) string {
return common.GenerateBlockBlobBlockID(s.blockNamePrefix, index)
}
2 changes: 1 addition & 1 deletion ste/sender-blockBlobFromLocal.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (u *blockBlobUploader) GenerateUploadFunc(id common.ChunkID, blockIndex int
func (u *blockBlobUploader) generatePutBlock(id common.ChunkID, blockIndex int32, reader common.SingleChunkReader) chunkFunc {
return createSendToRemoteChunkFunc(u.jptm, id, func() {
// step 1: generate block ID
encodedBlockID := u.generateEncodedBlockID()
encodedBlockID := u.generateEncodedBlockID(blockIndex)

// step 2: save the block ID into the list of block IDs
u.setBlockID(blockIndex, encodedBlockID)
Expand Down
2 changes: 1 addition & 1 deletion ste/sender-blockBlobFromURL.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (c *urlToBlockBlobCopier) generateCreateEmptyBlob(id common.ChunkID) chunkF
func (c *urlToBlockBlobCopier) generatePutBlockFromURL(id common.ChunkID, blockIndex int32, adjustedChunkSize int64) chunkFunc {
return createSendToRemoteChunkFunc(c.jptm, id, func() {
// step 1: generate block ID
encodedBlockID := c.generateEncodedBlockID()
encodedBlockID := c.generateEncodedBlockID(blockIndex)

// step 2: save the block ID into the list of block IDs
c.setBlockID(blockIndex, encodedBlockID)
Expand Down